Java实战-hadoop patition 分区简介和自定义

逃离我推掉我的手 2021-06-10 20:38 661阅读 0赞

0 简介:

0) 类比于新生入学,不同的学生实现分配好了宿舍,然后进入到不同的宿舍(reduce task)

如果map发送来的数据量太大,意味着这些数据都到这个默认reduce节点执行,没有发挥reduce

并行计算的目的,IO压力也很大。 这就是分区的原因。

a) 默认下分配一个区

b) 分配几个区,则对应几个reduce任务,每个任务在执行的时候都会公用reduce内的代码

c) 自定义分区下 返回的分区数量一定要和 定义的reduce任务相同,具体来说就是:

自定义分区类 extends HashPartitioner,重写getPartition时,返回的分支个数要和

job.setNumReduceTasks(X); 中的X个数相同

如果分区格式和reducetask任务个数不同下,在hadoop不同版本中的运行情况如下:

Java代码

  1. HashPartitioner.java key.hashcode() & integer.maxvalue % numreducetasks = 模1恒等于0 返回值恒为0 返回值是分区的标记或者索引 part-00000 part-00001 等等
  2. 默认的是job.setPartitionerClass(HashPartitioner.class) 自定义分区返回的是索引数字,从0开始依次递增1返回。
  3. 以 手机号和座机号写在一个文件中为例:
  4. 如果分区数量 大于/小于 reduce数量时,
  5. 2个分区 1个reduce —-> hadoop2中依旧能正常执行 只不过不会分区 所有数据都写到一个输出中 hadoop1中会报错
  6. 2个分区 4个reduce —-> hadoop2中依旧能正常执行 输出结果写到4个区中,第一个分区结果为手机号 第二个为座机号 剩下两个为空文件 所有数据都写到一个输出中

d) 需要打包放在hadoop环境内运行,否则在本机运行eg:eclipse环境下,会报错如下:

Java代码

  1. 14/12/09 14:12:58 WARN mapred.LocalJobRunner: job_local_0001
  2. java.io.IOException: Illegal partition for 84138413 (1)

map-shuffle-reduce过程图如下:

format_png

1 代码

结果处理成2个区, 一个是放手机号的 一个是放固话的:

  1. package partition;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import java.net.URI;
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.fs.FileSystem;
  8. import org.apache.hadoop.fs.Path;
  9. import org.apache.hadoop.io.LongWritable;
  10. import org.apache.hadoop.io.Text;
  11. import org.apache.hadoop.io.Writable;
  12. import org.apache.hadoop.mapreduce.Job;
  13. import org.apache.hadoop.mapreduce.Mapper;
  14. import org.apache.hadoop.mapreduce.Reducer;
  15. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  16. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  17. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  18. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  19. import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
  20. /**
  21. *
  22. * 实现单词计数功能,指定分区个数(分区下必须通过打包方式来运行)
  23. * 1 自定义规约
  24. * 1.1 规约定义好处:Combiner发生在Map端,对数据进行规约处理,数据量变小了,传送到reduce端的数据量变小了,传输时间变短,作业的整体时间变短
  25. * 1.2 因为不是所有的算法都适合使用Combiner处理,例如求平均数,因此Combiner不作为MR运行的标配
  26. * 1.3 Combiner本身已经执行了reduce操作仅仅是处理一个任务所接收的文件中的数据,不能跨map任务执行;只有reduce可以接收多个map任务处理的数据
  27. * 这也是Combiner本身已经执行了reduce操作,为什么在Reducer阶段还要执行reduce操作的原因。
  28. * 2 自定义分区
  29. * 2.1 分区运行必须打成jar运行
  30. * 2.2 map分几个区,则reduce有几个任务数量,每个reduce任务将对应一个输出文件
  31. * 2.3 分区不是越多越好,要根据业务需求,分区太多,也会造成资源创建,等待等消耗
  32. * 2.4 多个reduce任务在运行的好处是提高整体job的运行效率
  33. *
  34. * 结果处理成2个区, 一个是放手机号的 一个是放固话的
  35. * [root@master ~]# hadoop fs -ls /out
  36. Warning: $HADOOP_HOME is deprecated.
  37. Found 4 items
  38. -rw-r--r-- 1 root supergroup 0 2014-08-24 16:02 /out/_SUCCESS
  39. drwxr-xr-x - root supergroup 0 2014-08-24 16:02 /out/_logs
  40. -rw-r--r-- 1 root supergroup 556 2014-08-24 16:02 /out/part-r-00000
  41. -rw-r--r-- 1 root supergroup 79 2014-08-24 16:02 /out/part-r-00001
  42. */
  43. public class KpiAppPatition {
  44. // 0 定义操作地址
  45. static final String FILE_ROOT = "hdfs://master:9000/";
  46. static final String INPUT_PATH = "hdfs://master:9000/hello";
  47. static final String OUT_PATH = "hdfs://master:9000/out";
  48. public static void main(String[] args) throws Exception{
  49. Configuration conf = new Configuration();
  50. FileSystem fileSystem = FileSystem.get(new URI(FILE_ROOT),conf);
  51. Path outpath = new Path(OUT_PATH);
  52. if(fileSystem.exists(outpath)){
  53. fileSystem.delete(outpath, true);
  54. }
  55. // 0 定义干活的人
  56. Job job = new Job(conf);
  57. job.setJarByClass(KpiAppPatition.class);
  58. // 1.1 告诉干活的人 输入流位置 读取hdfs中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数
  59. FileInputFormat.setInputPaths(job, INPUT_PATH);
  60. // 指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
  61. job.setInputFormatClass(TextInputFormat.class);
  62. //1.2 指定自定义的map类
  63. job.setMapperClass(MyMapper2.class);
  64. job.setMapOutputKeyClass(Text.class);
  65. job.setMapOutputValueClass(KpiWritable2.class);
  66. //1.3 分区
  67. job.setPartitionerClass(KpiPartitioner.class);
  68. job.setNumReduceTasks(2);
  69. //1.4 TODO 排序、分组 目前按照默认方式执行
  70. //1.5 TODO 规约
  71. //2.2 指定自定义reduce类
  72. job.setReducerClass(MyReducer2.class);
  73. job.setOutputKeyClass(Text.class);
  74. job.setOutputValueClass(LongWritable.class);
  75. //2.3 指定写出到哪里
  76. FileOutputFormat.setOutputPath(job, outpath);
  77. job.setOutputFormatClass(TextOutputFormat.class);
  78. // 让干活的人干活
  79. job.waitForCompletion(true);
  80. }
  81. }
  82. // 自定义分区写法如 注意返回的值是从0开始依次累加1的int值,不能跳跃
  83. // 否则报错 说找不到编号为X的
  84. // 你的reduce有几个,那么就会从0开始以1为累加数字返回对应个数的分区编码 然后
  85. // 在去你代码里找对应编码 代码中随意返回patition的num 找不到就会报错
  86. class KpiPartitioner extends HashPartitioner<Text, KpiWritable2>{
  87. @Override
  88. public int getPartition(Text key, KpiWritable2 value, int numReduceTasks) {
  89. System.out.println("KpiPartitioner numReduceTasks is : " + numReduceTasks );
  90. return (key.toString().length()==11)?0:1; // key为key2 即 电话号码,这里 如果是手机号(11位)则返回0,否则返回1 这样会生成2个分区,1个存放手机号的 1个存放固话的
  91. }
  92. }
  93. /**
  94. * 将 <k1,v1> ---> <k2,v2>
  95. * @author zm
  96. */
  97. class MyMapper2 extends Mapper<LongWritable, Text, Text, KpiWritable2>{
  98. /**
  99. * key 表示k1 即 当前行号
  100. * value 表示v1 即当前行内容
  101. */
  102. @Override
  103. protected void map(LongWritable k1, Text v1, Context context)
  104. throws IOException, InterruptedException {
  105. //格式: 1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
  106. String[] elements = v1.toString().split("\t");
  107. String phoneNum = elements[1];
  108. KpiWritable2 v2 = new KpiWritable2(elements[6],elements[7],elements[8],elements[9]);
  109. Text k2 = new Text(phoneNum);
  110. context.write(k2, v2);
  111. }
  112. }
  113. /**
  114. * 将 <k2,v2> ---> <k3,v3>
  115. * @author zm
  116. */
  117. class MyReducer2 extends Reducer<Text, KpiWritable2,Text, LongWritable>{
  118. protected void reduce(Text k2, Iterable<KpiWritable2> v2s,
  119. org.apache.hadoop.mapreduce.Reducer.Context context)
  120. throws IOException, InterruptedException {
  121. long upPackNum = 0L;
  122. long downPackNum = 0L;
  123. long upPayLoad = 0L;
  124. long downPayLoad = 0L;
  125. for(KpiWritable2 kpiWritable1 : v2s){
  126. upPackNum = kpiWritable1.upPackNum;
  127. downPackNum = kpiWritable1.downPackNum;
  128. upPayLoad = kpiWritable1.upPayLoad;
  129. downPayLoad = kpiWritable1.downPayLoad;
  130. }
  131. KpiWritable2 v3 = new KpiWritable2(upPackNum+"",downPackNum+"",upPayLoad+"",downPayLoad+"");
  132. context.write(k2, v3);
  133. }
  134. }
  135. /**
  136. * 自定义类型类,里面封装 上网流量信息
  137. * @author zm
  138. *
  139. */
  140. class KpiWritable2 implements Writable{
  141. long upPackNum; // 上传数据包个数
  142. long downPackNum;// 下载数据包个数
  143. long upPayLoad;// 上传数据
  144. long downPayLoad;// 下载数据
  145. public KpiWritable2(String upPackNum,String downPackNum,String upPayLoad,String downPayLoad){
  146. this.upPackNum = Long.parseLong(upPackNum);
  147. this.downPackNum = Long.parseLong(downPackNum);
  148. this.upPayLoad = Long.parseLong(upPayLoad);
  149. this.downPayLoad = Long.parseLong(downPayLoad);
  150. }
  151. public KpiWritable2(){}
  152. @Override
  153. public void write(DataOutput out) throws IOException {
  154. // 先写后读
  155. out.writeLong(this.upPackNum);
  156. out.writeLong(this.downPackNum);
  157. out.writeLong(this.upPayLoad);
  158. out.writeLong(this.downPayLoad);
  159. }
  160. @Override
  161. public void readFields(DataInput in) throws IOException {
  162. // 读取的时候, 按照写方法的顺序( 队列方式) 顺序读取
  163. this.upPackNum = in.readLong();
  164. this.downPackNum = in.readLong();
  165. this.upPayLoad = in.readLong();
  166. this.downPayLoad = in.readLong();
  167. }
  168. @Override
  169. public String toString() {
  170. return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
  171. }
  172. }

发表评论

表情:
评论列表 (有 0 条评论,661人围观)

还没有评论,来说两句吧...

相关阅读

    相关 hadoop 定义OutputFormat

    前言 在某些业务场景下,需要对原始的数据进行合理的分类输出,减少后续的程序处理数据带来的麻烦,其实这也属于ETL中的一种,比如,我们收集到了一份原始的日志,主体字段为区域

    相关 hadoop 定义分区

    分区概念 分区这个词对很多同学来说并不陌生,比如Java很多中间件中,像kafka的分区,mysql的分区表等,分区存在的意义在于将数据按照业务规则进行合理的划分,方便后