/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package summation;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

/**
 * MapReduce job that adds up every comma-separated integer found in the
 * input and writes one record labelled {@code "Total Sum:"}.
 *
 * <p>The mapper emits each parsed integer under a single shared key so that
 * all values are grouped together; {@link MyReduce} then adds them up. The
 * same class is registered as the combiner to pre-aggregate map output.
 *
 * @author rakesh
 */
public class Summation {

    /** Key shared by every intermediate and final record. */
    private static final String TOTAL_LABEL = "Total Sum:";

    /**
     * Splits each input line on commas and emits every integer it contains
     * under the shared {@code "Total Sum:"} key.
     */
    public static class MyMap extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {

        // Reused for every record; the key never changes.
        private final Text mykey = new Text(TOTAL_LABEL);
        private final IntWritable myvalue = new IntWritable();

        /**
         * Emits one {@code (label, number)} pair per comma-separated token.
         *
         * @param key      input key supplied by the input format; not used
         * @param value    one line of comma-separated integers
         * @param output   collector receiving the emitted pairs
         * @param reporter progress reporter; not used
         * @throws IOException           if the collector fails
         * @throws NumberFormatException if a non-blank token is not a valid int
         */
        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString(), ",");
            while (tokenizer.hasMoreTokens()) {
                // Tolerate "1, 2, 3": parseInt rejects surrounding whitespace.
                String token = tokenizer.nextToken().trim();
                if (token.isEmpty()) {
                    continue;
                }
                myvalue.set(Integer.parseInt(token));
                // A constant key is required for a global sum: keying by the
                // number itself would group (and sum) per distinct number.
                output.collect(mykey, myvalue);
            }
        }
    }

    /**
     * Adds up all values it receives and emits the result under the
     * {@code "Total Sum:"} label. Used both as combiner and as reducer.
     */
    public static class MyReduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {

        /**
         * Sums {@code values} and emits a single {@code ("Total Sum:", sum)} pair.
         *
         * @param key      incoming key; the output is always labelled "Total Sum:"
         * @param values   the integers to add
         * @param output   collector receiving the single result pair
         * @param reporter progress reporter; not used
         * @throws IOException         if the collector fails
         * @throws ArithmeticException if the sum does not fit in an {@code int}
         */
        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            // Accumulate in a long so that overflow is detected, not wrapped.
            long sum = 0L;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            if (sum < Integer.MIN_VALUE || sum > Integer.MAX_VALUE) {
                throw new ArithmeticException(
                        "Sum " + sum + " does not fit in an int");
            }
            output.collect(new Text(TOTAL_LABEL), new IntWritable((int) sum));
        }
    }

    /**
     * Configures and runs the summation job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the
     *             output path
     * @throws Exception if the job cannot be configured or fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: Summation <input path> <output path>");
            System.exit(2);
        }

        JobConf conf = new JobConf(Summation.class);
        conf.setJobName("summation");

        conf.setMapperClass(MyMap.class);
        conf.setCombinerClass(MyReduce.class);
        conf.setReducerClass(MyReduce.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}