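Hadoop: topN

向右看齐 2022-05-26 02:16

Goals for this section:

1. Work through a top-N example to learn the MapReduce (MR) development workflow.
2. Learn to consult the [official API][API].

We are given a dataset in which each line of text records a date and time together with the temperature at that moment.

![20180423205929607][]

Q: For each month of each year, find the two highest temperatures.

Analysis: sort the records by year and month ascending and by temperature descending. Then group by year and month, so that all records from the same month end up in one group. Within each group the records are already ordered from hottest to coldest, so the answer is the first two.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Job driver class.
 * Configures everything related to the job and submits it.
 * @author devin
 */
public class TempJob {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Local test environment: set the active NameNode and active ResourceManager by hand
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:8020");
        conf.set("yarn.resourcemanager.hostname", "slave3");

        // Instantiate the job
        Job job = Job.getInstance(conf);
        // Job entry point
        job.setJarByClass(TempJob.class);

        // Map settings: the custom mapper class and the map output key/value types
        job.setMapperClass(TempMap.class);
        job.setMapOutputKeyClass(Weather.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Shuffle settings: partitioning, sorting, grouping
        job.setPartitionerClass(TempPartition.class);
        job.setSortComparatorClass(TempSort.class);
        job.setGroupingComparatorClass(TempGroup.class);

        // Reduce settings: number of reduce tasks and the custom reducer class
        job.setNumReduceTasks(3);
        job.setReducerClass(TempReduce.class);
        // Declare the final output types so they match TempReduce
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Input file to process
        Path inPath = new Path("/temperature/input/temp2.txt");
        FileInputFormat.addInputPath(job, inPath);

        // Output directory for the final result; delete it first if it already exists
        Path outPath = new Path("/temperature/output");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        // Submit the job and wait for it to finish
        boolean flag = job.waitForCompletion(true);
        if (flag) {
            System.out.println("Job success!");
        }
    }
}
```

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Holds the fields used for sorting.
 * Implementing WritableComparable makes the class serializable and deserializable
 * (for transfer between machines) and comparable (for sorting).
 */
public class Weather implements WritableComparable<Weather> {
    // Fields used for sorting
    private int year;
    private int month;
    private int day;
    private int temperature;

    public int getYear() { return year; }
    public void setYear(int year) { this.year = year; }
    public int getMonth() { return month; }
    public void setMonth(int month) { this.month = month; }
    public int getDay() { return day; }
    public void setDay(int day) { this.day = day; }
    public int getTemperature() { return temperature; }
    public void setTemperature(int temperature) { this.temperature = temperature; }

    // Deserialization: fields must be read in the same order they are written
    @Override
    public void readFields(DataInput input) throws IOException {
        this.year = input.readInt();
        this.month = input.readInt();
        this.day = input.readInt();
        this.temperature = input.readInt();
    }

    // Serialization
    @Override
    public void write(DataOutput output) throws IOException {
        output.writeInt(this.year);
        output.writeInt(this.month);
        output.writeInt(this.day);
        output.writeInt(this.temperature);
    }

    /**
     * Custom sort order:
     * first by year, ascending;
     * if the years are equal, by month, ascending;
     * if year and month are both equal, by temperature, descending.
     */
    @Override
    public int compareTo(Weather o) {
        int i = Integer.compare(this.year, o.getYear());
        if (i == 0) {
            int j = Integer.compare(this.month, o.getMonth());
            if (j == 0) {
                return -Integer.compare(this.temperature, o.getTemperature());
            }
            return j;
        }
        return i;
    }
}
```

```java
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

// Assumption: the original post does not show its imports; Commons Lang's
// StringUtils (a Hadoop 2.x dependency) matches the split(String, String) call used below.
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Custom mapper. Reads each line of text, extracts the usable fields
 * (date, time, temperature), wraps them in a JavaBean that becomes the map
 * output key, and emits the temperature as the map output value.
 *
 * @author devin
 */
public class TempMap extends Mapper<LongWritable, Text, Weather, IntWritable> {
    /**
     * key: byte offset of the line
     * value: text of the line
     * context: output context
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            // Split the line on the tab and pull out the bean fields
            String[] str = StringUtils.split(value.toString(), "\t");
            String dataTime = str[0];
            String tempVal = str[1];

            // Wrap each line in a Weather object and set its fields
            Weather w = new Weather();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date date = sdf.parse(dataTime);
            Calendar cal = Calendar.getInstance();
            cal.setTime(date);
            w.setYear(cal.get(Calendar.YEAR));
            w.setMonth(cal.get(Calendar.MONTH) + 1); // Calendar.MONTH is zero-based
            w.setDay(cal.get(Calendar.DAY_OF_MONTH));
            // Strip the trailing "c" unit from the temperature field
            int temperature = Integer.parseInt(tempVal.substring(0, tempVal.lastIndexOf("c")));
            w.setTemperature(temperature);

            // Map output: <weather, temperature>
            context.write(w, new IntWritable(temperature));
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}
```

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * Custom partitioning rule: keep it simple.
 */
public class TempPartition extends HashPartitioner<Weather, IntWritable> {
    // Receives the map output key and value plus the number of reduce tasks.
    // Assumes every year in the dataset is >= 1949; an earlier year would
    // produce a negative partition number, which Hadoop rejects.
    @Override
    public int getPartition(Weather weather, IntWritable value, int numReduceTasks) {
        return (weather.getYear() - 1949) % numReduceTasks;
    }
}
```

```java
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * After the custom partitioner has assigned partitions, the data in each
 * partition has to be sorted by some rule.
 * Once sorted, the map-side shuffle is complete (spilling is ignored here).
 */
public class TempSort extends WritableComparator {
    // The constructor must call the parent constructor, passing the key type
    // and whether to create key instances
    public TempSort() {
        super(Weather.class, true);
    }

    @Override
    public int compare(WritableComparable k1, WritableComparable k2) {
        Weather w1 = (Weather) k1;
        Weather w2 = (Weather) k2;
        // The sort order for Weather objects is already defined in the class itself
        return w1.compareTo(w2);
        // It could also be defined here as follows:
        // int i = Integer.compare(w1.getYear(), w2.getYear());
        // if (i == 0) {
        //     int j = Integer.compare(w1.getMonth(), w2.getMonth());
        //     if (j == 0) {
        //         return -Integer.compare(w1.getTemperature(), w2.getTemperature());
        //     }
        //     return j;
        // }
        // return i;
    }
}
```

```java
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Reduce-side shuffle: each reducer fetches the data for its own partition
 * from the map side, then groups it with this custom grouping comparator.
 * @author lenovo
 */
public class TempGroup extends WritableComparator {
    public TempGroup() {
        super(Weather.class, true);
    }

    /**
     * Custom grouping rule:
     * group by the year and month fields, so objects with the same year and
     * month fall into the same group.
     */
    @Override
    public int compare(WritableComparable k1, WritableComparable k2) {
        Weather w1 = (Weather) k1;
        Weather w2 = (Weather) k2;
        int i = Integer.compare(w1.getYear(), w2.getYear());
        if (i == 0) {
            return Integer.compare(w1.getMonth(), w2.getMonth());
        }
        return i;
    }
}
```

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Custom reducer.
 * Input key: the JavaBean for the group
 * Input values: the value (temperature) of each object in the group
 * Output key: the result string
 * Output value: null
 */
public class TempReduce extends Reducer<Weather, IntWritable, Text, NullWritable> {
    @Override
    protected void reduce(Weather weather, Iterable<IntWritable> iterable, Context context)
            throws IOException, InterruptedException {
        // Emit the top 2; values arrive already sorted by temperature, descending
        int num = 0;
        for (IntWritable iter : iterable) {
            num++;
            if (num > 2) {
                break;
            }
            // Hadoop reuses the key object and refreshes its fields as the value
            // iterator advances, so getDay() belongs to the current value's record
            String str = weather.getYear() + "-" + weather.getMonth() + "-"
                    + weather.getDay() + "\t" + iter.get();
            context.write(new Text(str), NullWritable.get());
        }
    }
}
```

Run results:

![20180423211852429][]![20180423211858382][]![20180423211903766][]

[API]: http://hadoop.apache.org/docs/r2.7.6/api/index.html
[20180423205929607]: /images/20220526/706739bde9024256927bef6116b72713.png
[20180423211852429]: https://img-blog.csdn.net/20180423211852429
[20180423211858382]: https://img-blog.csdn.net/20180423211858382
[20180423211903766]: /images/20220526/15591793d5f54140922ab49fb19231a8.png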
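
To check the sort-then-group logic without a cluster, the same idea can be sketched in plain Java. This is only a minimal sketch and not part of the job above: the class `TopTwoSketch`, the `Reading` holder, and the sample readings are all hypothetical, and the sorting happens in memory with `java.util` rather than in Hadoop's shuffle. `compareForSort` mirrors the ordering in `Weather.compareTo`, and `compareForGroup` mirrors `TempGroup`.

```java
import java.util.ArrayList;
import java.util.List;

public class TopTwoSketch {

    /** Hypothetical stand-in for the Weather key: one temperature reading. */
    public static final class Reading {
        final int year, month, day, temperature;

        public Reading(int year, int month, int day, int temperature) {
            this.year = year;
            this.month = month;
            this.day = day;
            this.temperature = temperature;
        }
    }

    /** Grouping order, as in TempGroup: year, then month. */
    public static int compareForGroup(Reading a, Reading b) {
        int byYear = Integer.compare(a.year, b.year);
        return byYear != 0 ? byYear : Integer.compare(a.month, b.month);
    }

    /** Sort order, as in Weather.compareTo: year asc, month asc, temperature desc. */
    public static int compareForSort(Reading a, Reading b) {
        int byGroup = compareForGroup(a, b);
        return byGroup != 0 ? byGroup : Integer.compare(b.temperature, a.temperature);
    }

    /** Sorts a copy of the readings, then keeps the first two of each (year, month) group. */
    public static List<String> topTwoPerMonth(List<Reading> readings) {
        List<Reading> sorted = new ArrayList<>(readings);
        sorted.sort(TopTwoSketch::compareForSort);

        List<String> out = new ArrayList<>();
        Reading groupHead = null; // first reading of the current group
        int emitted = 0;          // how many readings of this group were kept
        for (Reading r : sorted) {
            if (groupHead == null || compareForGroup(groupHead, r) != 0) {
                groupHead = r;    // a new (year, month) group starts here
                emitted = 0;
            }
            if (emitted < 2) {
                out.add(r.year + "-" + r.month + "-" + r.day + "\t" + r.temperature);
                emitted++;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample data, for illustration only
        List<Reading> sample = new ArrayList<>();
        sample.add(new Reading(1949, 10, 1, 34));
        sample.add(new Reading(1949, 10, 2, 38));
        sample.add(new Reading(1949, 10, 3, 36));
        sample.add(new Reading(1950, 1, 1, 32));
        sample.add(new Reading(1950, 1, 2, 41));
        sample.add(new Reading(1949, 11, 5, 20));
        for (String line : topTwoPerMonth(sample)) {
            System.out.println(line);
        }
    }
}
```

Because the sort comparator is strictly finer than the grouping comparator, every group is a contiguous run in the sorted list, which is why a single pass with a counter is enough. The MapReduce job relies on the same property, with the partitioner additionally making sure that all records of one month reach the same reducer.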