Combiner Application

r囧r小猫 2024-04-19 07:00
The job below counts how many rating records each movie has and writes out the top N hottest movies; a Combiner is plugged in so the (movie, 1) pairs are pre-aggregated on the map side before being shuffled to the reducer.

    package MovieCount;

    import Use.Rates.UserRateTopN;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.codehaus.jackson.map.ObjectMapper;

    import java.io.IOException;
    import java.util.Comparator;
    import java.util.Map;
    import java.util.TreeMap;

    public class RateHotn {

        public static class RateHotMap extends Mapper<LongWritable, Text, Text, IntWritable> {
            // ObjectMapper is a Jackson class that converts between Java objects and JSON.
            // Its writeValue family of methods serializes a Java object to JSON and writes it out in
            // different forms: String (writeValueAsString), byte array (writeValueAsBytes), Writer,
            // File, OutputStream and DataOutput.
            // Its readValue family of methods deserializes JSON into a Java object from sources such
            // as String, byte array, Reader, File, URL and InputStream.
            ObjectMapper objectMapper = new ObjectMapper();

            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                // Each input line is one JSON rating record.
                UserRateTopN userRateTopN = objectMapper.readValue(line, UserRateTopN.class);
                String movie = userRateTopN.getMovie();
                // Emit (movie, 1) so the counts can be summed downstream.
                context.write(new Text(movie), new IntWritable(1));
            }
        }

        public static class RateHotnReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
            // TreeMap ordered by descending count, so the hottest movies come first.
            // Note: movies with identical counts share a key and will overwrite each other.
            TreeMap<IntWritable, Text> map;

            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                map = new TreeMap<>(new Comparator<IntWritable>() {
                    @Override
                    public int compare(IntWritable o1, IntWritable o2) {
                        return o2.compareTo(o1);
                    }
                });
            }

            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int count = 0;
                for (IntWritable value : values) {
                    count = count + value.get();
                }
                map.put(new IntWritable(count), new Text(key));
            }

            @Override
            protected void cleanup(Context context) throws IOException, InterruptedException {
                Configuration conf = context.getConfiguration();
                // "suibian" carries the top-N size set by the driver (default 3).
                int suibian = conf.getInt("suibian", 3);
                for (int i = 0; i < suibian; i++) {
                    Map.Entry<IntWritable, Text> entry = map.pollFirstEntry();
                    IntWritable count = entry.getKey();
                    Text movie = entry.getValue();
                    context.write(movie, count);
                }
            }
        }

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            conf.setInt("suibian", Integer.parseInt(args[0]));
            Job job = Job.getInstance(conf);
            job.setJarByClass(RateHotn.class);
            job.setMapperClass(RateHotMap.class);
            job.setCombinerClass(RateHotnCombiner.class);
            job.setReducerClass(RateHotnReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.setInputPaths(job, new Path(""));
            FileOutputFormat.setOutputPath(job, new Path(""));
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }
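The mapper deserializes each line into a UserRateTopN bean imported from Use.Rates, which the post does not show. Below is a minimal sketch of what that bean presumably looks like, assuming the input is the usual movie-rating JSON with uid, movie, rate and timeStamp fields; the field names other than movie are assumptions, not taken from the post.

    package Use.Rates;

    // Hypothetical sketch of the bean the mapper deserializes into. Only getMovie()
    // is used by the job above; the other fields are assumed from a typical rating
    // record. Jackson uses the default constructor plus the setters to populate it.
    public class UserRateTopN {
        private String uid;
        private String movie;
        private String rate;
        private String timeStamp;

        public String getUid() { return uid; }
        public void setUid(String uid) { this.uid = uid; }

        public String getMovie() { return movie; }
        public void setMovie(String movie) { this.movie = movie; }

        public String getRate() { return rate; }
        public void setRate(String rate) { this.rate = rate; }

        public String getTimeStamp() { return timeStamp; }
        public void setTimeStamp(String timeStamp) { this.timeStamp = timeStamp; }
    }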
The Combiner is implemented as a separate Reducer subclass in the same package:

    package MovieCount;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class RateHotnCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                // Sum the values rather than counting them: the combiner may run more
                // than once, so an incoming value can already be a partial sum.
                count += value.get();
            }
            context.write(key, new IntWritable(count));
        }
    }
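Why the combiner sums value.get() instead of just counting the values: Hadoop may apply the combiner zero, one, or several times on the map side, so by the time it runs, an incoming value may already be a partial sum rather than a 1. The combiner's aggregation therefore has to be the same sum the reducer performs. Here is a small standalone sketch (not part of the job; the class name is mine) illustrating the point.

    import java.util.Arrays;
    import java.util.List;

    // Demonstrates that summing partial sums in several passes gives the same
    // total as summing the raw map outputs once, which is what lets Hadoop apply
    // the combiner an arbitrary number of times.
    public class CombinerSumCheck {
        // Same aggregation the reducer and the combiner perform.
        static int sum(List<Integer> values) {
            int count = 0;
            for (int v : values) {
                count += v;
            }
            return count;
        }

        public static void main(String[] args) {
            // Five map outputs of 1 for the same movie key.
            List<Integer> mapOutputs = Arrays.asList(1, 1, 1, 1, 1);

            // First combiner pass over two spills: (1,1,1) -> 3 and (1,1) -> 2.
            List<Integer> afterFirstPass = Arrays.asList(sum(mapOutputs.subList(0, 3)),
                                                         sum(mapOutputs.subList(3, 5)));

            // A second pass (or the reducer) must still arrive at 5.
            System.out.println(sum(afterFirstPass));  // prints 5
        }
    }

If the combiner merely counted occurrences, the second pass above would report 2 instead of 5, and the final output would undercount the popular movies.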
