import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.hadoop.util.*; public class ArticlesAndWords extends Configured implements Tool { public static class TheMapper extends Mapper{ private Text word = new Text(); private IntWritable zero = new IntWritable(0); private IntWritable one = new IntWritable(1); public void map(Text key, Text value, Context context) throws IOException, InterruptedException { context.write(key, zero); for (String token : value.toString().split("\\W+")) { word.set(token); context.write(word, one); } } } public static class ThePartitioner extends Partitioner { public int getPartition(Text key, IntWritable value, int numPartitions) { return value.get(); } } public static class TheCombiner extends Reducer { public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { for (IntWritable value : values) { context.write(key, value); return; } } } public static class TheReducer extends Reducer { public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } } public int run(String[] args) throws Exception { if (args.length < 2) { System.err.printf("Usage: %s.jar in-path out-path", this.getClass().getName()); return 1; } Job job = new Job(getConf(), this.getClass().getName()); job.setJarByClass(this.getClass()); job.setMapperClass(TheMapper.class); job.setPartitionerClass(ThePartitioner.class); job.setCombinerClass(TheCombiner.class); job.setReducerClass(TheReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setMapOutputValueClass(IntWritable.class); job.setInputFormatClass(KeyValueTextInputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new ArticlesAndWords(), args); System.exit(res); } }