MapReducer-wordCount 我就是我 2022-04-03 02:37 253阅读 0赞 public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //读取一行 String wordLine = value.toString(); //根据空格拆分单词 String[] wordArr = wordLine.split(" "); for (String word : wordArr) { //把每个单词作为Key、1作为value输送到Reducer context.write(new Text(word), new IntWritable(1)); } } } public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { //map会把相同key发送到一个reducer,把他们的第一个key作为key,把每个值放入迭代器 @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //定义一个统计数 int count = 0; for (IntWritable intWritable : values) { count = count + intWritable.get(); } //写出一行, context.write(key, new IntWritable(count)); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //输入路径设置为hdfs的input文件夹下所有文件 String input = "hdfs://localhost:9000/wordCount/input"; //输出路径设置为hdfs的output文件夹下 String output = "hdfs://localhost:9000/wordCount/output/wordRest"; //也可以设置为本地 String output2 = "file:D:/aa"; Configuration configuration = new Configuration(); //获得一个job实例 Job job = Job.getInstance(configuration); //设置为当前的class job.setJarByClass(WordCountDriver.class); //设置Map是哪个 job.setMapperClass(WordCountMapper.class); //设置Reducer是哪个 job.setReducerClass(WordCountReducer.class); //设置Map输出key是哪个 job.setMapOutputKeyClass(Text.class); //设置Map输出value是哪个 job.setMapOutputValueClass(IntWritable.class); //设置Reducer输出key是哪个 job.setOutputKeyClass(Text.class); //设置Reducer输出value是哪个 job.setOutputValueClass(IntWritable.class); //设置输入路径,也就是从哪里读取文件 FileInputFormat.setInputPaths(job,new Path(input)); //设置输入路径,也就是从哪里写文件 FileOutputFormat.setOutputPath(job, new Path(output2)); //提交job boolean res = job.waitForCompletion(true); System.exit(res?0:1); }
还没有评论,来说两句吧...