MapReduce Tutorial : Mappers, running Java Hadoop jobs, counters
We start by going through a simple Hadoop job with Mapper only.
A mapper which processes (key, value) pairs of types (Kin, Vin) and produces (key, value) pairs of types (Kout, Vout) must be a subclass of Mapper<Kin, Vin, Kout, Vout>. In our case, the mapper is a subclass of Mapper<Text, Text, Text, Text>.
The mapper must define a map method and may also provide setup and cleanup methods:
public static class TheMapper extends Mapper<Text, Text, Text, Text> {
  public void setup(Context context) throws IOException, InterruptedException {}

  public void map(Text key, Text value, Context context) throws IOException, InterruptedException {}

  public void cleanup(Context context) throws IOException, InterruptedException {}
}
Outputting (key, value) pairs is performed using the MapContext<Kin, Vin, Kout, Vout> object (Context is an abbreviation for this type), with the method context.write(Kout key, Vout value).
Here is the source of the whole Hadoop job:
- MapperOnlyHadoopJob.java
import java.io.IOException;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class MapperOnlyHadoopJob extends Configured implements Tool {
  // Mapper
  public static class TheMapper extends Mapper<Text, Text, Text, Text> {
    public void setup(Context context) {
    }

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      if (key.getLength() > 0 && Character.toUpperCase(key.charAt(0)) == 'A') {
        context.write(key, value);
      }
    }

    public void cleanup(Context context) {
    }
  }

  // Job configuration
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName());
      return 1;
    }

    Job job = new Job(getConf(), this.getClass().getName()); // Create class representing Hadoop job.
                                                             // Name of the job is the name of current class.
    job.setJarByClass(this.getClass());                      // Use jar containing current class.
    job.setMapperClass(TheMapper.class);                     // The mapper of the job.
    job.setOutputKeyClass(Text.class);                       // Type of the output keys.
    job.setOutputValueClass(Text.class);                     // Type of the output values.
    job.setInputFormatClass(KeyValueTextInputFormat.class);  // Input format.
                                                             // Output format is the default -- TextOutputFormat.
    FileInputFormat.addInputPath(job, new Path(args[0]));    // Input path is on command line.
    FileOutputFormat.setOutputPath(job, new Path(args[1]));  // Output path is on command line too.

    return job.waitForCompletion(true) ? 0 : 1;
  }

  // Main method
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new MapperOnlyHadoopJob(), args);

    System.exit(res);
  }
}
Remarks
- The filename must be the same as the name of the top-level class – this is enforced by the Java compiler. But the top-level class can contain any number of nested classes.
- Multiple jobs can be submitted from one class, either in sequence or in parallel (see the sketch after these remarks).
- A mismatch of types is usually detected by the compiler, but sometimes it is detected only at runtime. In that case an exception is raised and the program crashes. For example, the default output key class is LongWritable – if Text had not been specified, the program would crash.
- VIM users: The code completion plugin does not complete the context variable, because it does not understand that Context is an abbreviation for MapContext<Text, Text, Text, Text>. If the type MapContext<Text, Text, Text, Text> is used instead of Context, the code compiles and code completion on context works.
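To illustrate the remark about submitting multiple jobs from one class, here is a minimal sketch of a run method that chains two jobs in sequence. The mapper classes FirstMapper and SecondMapper and the intermediate path are made up for illustration; they are assumed to be nested classes of the same top-level class, analogous to TheMapper above.

public int run(String[] args) throws Exception {
  Path input = new Path(args[0]);
  Path intermediate = new Path(args[1] + "-intermediate"); // hypothetical path for the intermediate data
  Path output = new Path(args[1]);

  Job first = new Job(getConf(), "first pass");
  first.setJarByClass(this.getClass());
  first.setMapperClass(FirstMapper.class);                 // hypothetical mapper of the first job
  first.setOutputKeyClass(Text.class);
  first.setOutputValueClass(Text.class);
  first.setInputFormatClass(KeyValueTextInputFormat.class);
  FileInputFormat.addInputPath(first, input);
  FileOutputFormat.setOutputPath(first, intermediate);
  if (!first.waitForCompletion(true)) return 1;            // wait for the first job before starting the second

  Job second = new Job(getConf(), "second pass");
  second.setJarByClass(this.getClass());
  second.setMapperClass(SecondMapper.class);               // hypothetical mapper of the second job
  second.setOutputKeyClass(Text.class);
  second.setOutputValueClass(Text.class);
  second.setInputFormatClass(KeyValueTextInputFormat.class);
  FileInputFormat.addInputPath(second, intermediate);
  FileOutputFormat.setOutputPath(second, output);
  return second.waitForCompletion(true) ? 0 : 1;
}

For parallel execution, job.submit() returns immediately after submitting the job; the jobs can then be polled with isComplete() or waited for with waitForCompletion.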
Running the job
The official way of running Hadoop jobs is to use the /SGE/HADOOP/active/bin/hadoop script. Jobs submitted through this script can be configured using Hadoop properties only. Therefore a wrapper script is provided, with options similar to those of the Perl API runner:
- /net/projects/hadoop/bin/hadoop job.jar [-Dname=value -Dname=value …] input output_path – executes the given job locally in a single thread. It is useful for debugging.
- /net/projects/hadoop/bin/hadoop job.jar -jt cluster_master [-r number_of_reducers] [-Dname=value -Dname=value …] input output_path – submits the job to the given cluster_master.
- /net/projects/hadoop/bin/hadoop job.jar -c number_of_machines [-w secs_to_wait_after_job_finishes] [-r number_of_reducers] [-Dname=value -Dname=value …] input output_path – creates a new cluster with the specified number of machines, executes the given job on it, and then waits for the specified number of seconds before the cluster stops.
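The -Dname=value options become Hadoop properties of the job's Configuration (the MapperOnlyHadoopJob above uses ToolRunner, which parses them). As a minimal sketch, a mapper can read such a property in its setup method; the property name mapper.prefix and the class PrefixMapper are made up for illustration, and the class is meant to be nested in the job class like TheMapper above, so the same imports apply:

// Hypothetical mapper reading a property supplied on the command line as -Dmapper.prefix=...
public static class PrefixMapper extends Mapper<Text, Text, Text, Text> {
  private String prefix;

  public void setup(Context context) {
    // getConfiguration() exposes the job Configuration, including the -D properties.
    prefix = context.getConfiguration().get("mapper.prefix", "A");
  }

  public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
    if (key.toString().startsWith(prefix)) {
      context.write(key, value);
    }
  }
}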
Exercise 1
Download the MapperOnlyHadoopJob.java, compile it and run it using:
wget --no-check-certificate 'https://wiki.ufal.ms.mff.cuni.cz/_export/code/courses:mapreduce-tutorial:step-24?codeblock=1' -O 'MapperOnlyHadoopJob.java'
make -f /net/projects/hadoop/java/Makefile MapperOnlyHadoopJob.jar
rm -rf step-24-out-sol; /net/projects/hadoop/bin/hadoop MapperOnlyHadoopJob.jar -r 0 /home/straka/wiki/cs-text-small step-24-out-sol
less step-24-out-sol/part-*
Mind the -r 0 switch – specifying -r 0 disables the reducer. If the switch were not given, one reducer of the default type IdentityReducer would be used. The IdentityReducer outputs every (key, value) pair it is given.
- When using -r 0, the job runs faster, as the mappers write the output directly to disk. But there are as many output files as there were mappers, and the (key, value) pairs are stored in no particular order. (The number of reducers can also be set directly in the job code, as sketched after this list.)
- When not specifying -r 0 (i.e., using -r 1 with the IdentityReducer), the job produces the same (key, value) pairs, but this time they are in one output file, sorted according to the key. Of course, the job runs slower in this case.
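The -r switch belongs to the wrapper script. In the job code itself, the number of reducers can be set with job.setNumReduceTasks; a minimal sketch, to be placed in the run method of the job above:

job.setNumReduceTasks(0);   // 0 disables the reduce phase; the mappers write their output directly
// job.setNumReduceTasks(1) would use a single reducer and produce one sorted output file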
Counters
As in the Perl API, a mapper (or a reducer) can increment various counters by using context.getCounter("Group", "Name").increment(value):
public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
  ...
  context.getCounter("Group", "Name").increment(value);
  ...
}
The getCounter method returns a Counter object, so if a counter is incremented frequently, the getCounter method needs to be called only once and the returned Counter object can be reused:
public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
  ...
  Counter words = context.getCounter("Mapper", "Number of words");
  for (String word : value.toString().split("\\W+")) {
    ...
    words.increment(1);
  }
}
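After the job has finished, the counter values can also be read back in the driver. A minimal sketch, assuming a Job object configured as in the run method above:

// In run(), read the counter once the job has finished successfully.
if (!job.waitForCompletion(true)) return 1;
long words = job.getCounters().findCounter("Mapper", "Number of words").getValue();
System.err.printf("The mappers counted %d words.%n", words);
return 0;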
Example 2
TODO: step-24.txt, threeletterwords.java
Step 23: Predefined formats and types. | Overview | Step 25: Reducers, combiners and partitioners. |