import java.io.IOException; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.hadoop.util.*; public class MapperOnlyHadoopJob extends Configured implements Tool { // Mapper public static class TheMapper extends Mapper{ public void setup(Context context) { } public void map(Text key, Text value, Context context) throws IOException, InterruptedException { if (key.getLength() > 0 && Character.toUpperCase(key.charAt(0)) == 'A') { context.write(key, value); } } public void cleanup(Context context) { } } // Job configuration public int run(String[] args) throws Exception { if (args.length < 2) { System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName()); return 1; } Job job = new Job(getConf(), this.getClass().getName()); // Create class representing Hadoop job. // Name of the job is the name of current class. job.setJarByClass(this.getClass()); // Use jar containing current class. job.setMapperClass(TheMapper.class); // The mapper of the job. job.setOutputKeyClass(Text.class); // Type of the output keys. job.setOutputValueClass(Text.class); // Type of the output values. job.setInputFormatClass(KeyValueTextInputFormat.class); // Input format. // Output format is the default -- TextOutputFormat FileInputFormat.addInputPath(job, new Path(args[0])); // Input path is on command line. FileOutputFormat.setOutputPath(job, new Path(args[1])); // Output path is on command line too. return job.waitForCompletion(true) ? 0 : 1; } // Main method public static void main(String[] args) throws Exception { int res = ToolRunner.run(new MapperOnlyHadoopJob(), args); System.exit(res); } }