MapReduce Tutorial : Mappers, running Java Hadoop jobs
We start by going through a simple Hadoop job that uses only a Mapper.
A mapper which processes (key, value) pairs of types (Kin, Vin) and produces (key, value) pairs of types (Kout, Vout) must be a subclass of Mapper<Kin, Vin, Kout, Vout>. In our case, the mapper is a subclass of Mapper<Text, Text, Text, Text>.
The mapper must define a map method and may also override the setup and cleanup methods:
public static class TheMapper extends Mapper<Text, Text, Text, Text> {
  public void setup(Context context) throws IOException, InterruptedException {}

  public void map(Text key, Text value, Context context) throws IOException, InterruptedException {}

  public void cleanup(Context context) throws IOException, InterruptedException {}
}
Outputting (key, value) pairs is performed using the MapContext<Kin, Vin, Kout, Vout> object (Context is an abbreviation for this type), via the method context.write(Kout key, Vout value).
Here is the source of the whole Hadoop job:
- MapperOnlyHadoopJob.java
import java.io.IOException;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class MapperOnlyHadoopJob extends Configured implements Tool {
  // Mapper
  public static class TheMapper extends Mapper<Text, Text, Text, Text> {
    public void setup(Context context) {
    }

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      if (key.getLength() > 0 && Character.toUpperCase(key.charAt(0)) == 'A') {
        context.write(key, value);
      }
    }

    public void cleanup(Context context) {
    }
  }

  // Job configuration
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName());
      return 1;
    }

    Job job = new Job(getConf(), this.getClass().getName());  // Create class representing Hadoop job.
    job.setJarByClass(this.getClass());                        // Use jar containing current class.

    job.setMapperClass(TheMapper.class);                       // The mapper of the job.

    job.setOutputKeyClass(Text.class);                         // Type of the output keys.
    job.setOutputValueClass(Text.class);                       // Type of the output values.

    job.setInputFormatClass(KeyValueTextInputFormat.class);    // Input format.
    // Output format is the default -- TextOutputFormat.

    FileInputFormat.addInputPath(job, new Path(args[0]));      // Input path is on command line.
    FileOutputFormat.setOutputPath(job, new Path(args[1]));    // Output path is on command line too.

    return job.waitForCompletion(true) ? 0 : 1;
  }

  // Main method
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new MapperOnlyHadoopJob(), args);

    System.exit(res);
  }
}
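To see what the job does: KeyValueTextInputFormat splits each input line at the first tab character into a key and a value. With an illustrative input line "Alice&lt;TAB&gt;42" (not part of any tutorial data), the mapper receives the pair (Alice, 42) and writes it to the output because the key starts with 'A'; a line "Bob&lt;TAB&gt;7" produces no output.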
Running the job
Download the source and compile it.
The official way of running Hadoop jobs is to use the /SGE/HADOOP/active/bin/hadoop script. Jobs submitted through this script can be configured using Hadoop properties only. Therefore, a wrapper script is provided, with options similar to those of the Perl API runner:
- net/projects/hadoop/bin/hadoop [-r number_of_reducers] job.jar [generic Hadoop properties] input_path output_path – executes the given job locally in a single thread. It is useful for debugging.
- net/projects/hadoop/bin/hadoop -jt cluster_master [-r number_of_reducers] job.jar [generic Hadoop properties] input_path output_path – submits the job to the given cluster_master.
- net/projects/hadoop/bin/hadoop -c number_of_machines [-w secs_to_wait_after_job_finishes] [-r number_of_reducers] job.jar [generic Hadoop properties] input_path output_path – creates a new cluster with the specified number of machines, executes the given job on it, and then waits the specified number of seconds before the cluster stops.
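As a purely illustrative invocation (the jar name, machine count, and paths here are made up, not taken from the tutorial), running the compiled job on a freshly allocated two-machine cluster could look like:

net/projects/hadoop/bin/hadoop -c 2 MapperOnlyHadoopJob.jar my-input-dir my-output-dir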