jobcontrol

冷不防 2022-06-14 12:20

JobControl lets you chain several jobs together and run them as one workflow. Below is a JobControl with two jobs, where the output of the first job is the input of the second.

package hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line on whitespace and emit (token, 1) for every token.
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, ONE);
            }
        }
    }

    public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the 1s emitted for this token.
            int num = 0;
            for (IntWritable value : values) {
                num += value.get();
            }
            context.write(key, new IntWritable(num));
        }
    }

    // The second job's mapper and reducer are identical to the first job's;
    // job2 simply re-runs word count over job1's output.
    public static class WordCountMapper1 extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, ONE);
            }
        }
    }

    public static class WordCountReduce1 extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int num = 0;
            for (IntWritable value : values) {
                num += value.get();
            }
            context.write(key, new IntWritable(num));
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] argsValues = new GenericOptionsParser(conf, args).getRemainingArgs();

        JobControl jobControl = new JobControl("jobcontrol");

        Job job = Job.getInstance(conf, "word count1");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPaths(job, argsValues[0]);
        FileOutputFormat.setOutputPath(job, new Path(argsValues[1]));

        Job job2 = Job.getInstance(conf, "word count2");
        job2.setJarByClass(WordCount.class);
        job2.setMapperClass(WordCountMapper1.class);
        job2.setReducerClass(WordCountReduce1.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(IntWritable.class);
        // The second job reads the first job's output directory.
        FileInputFormat.addInputPaths(job2, argsValues[1]);
        FileOutputFormat.setOutputPath(job2, new Path(argsValues[2]));

        ControlledJob controlledJob = new ControlledJob(conf);
        controlledJob.setJob(job);
        ControlledJob controlledJob2 = new ControlledJob(conf);
        controlledJob2.setJob(job2);
        // job2 must not start until job1 has finished successfully.
        controlledJob2.addDependingJob(controlledJob);

        jobControl.addJob(controlledJob);
        jobControl.addJob(controlledJob2);

        // JobControl implements Runnable: run it in its own thread and poll until done.
        Thread thread = new Thread(jobControl);
        thread.start();
        while (true) {
            if (jobControl.allFinished()) {
                System.out.println(jobControl.getSuccessfulJobList());
                jobControl.stop();
                break;
            }
            Thread.sleep(500); // avoid spinning at full speed while waiting
        }
    }
}

The output is:

[hadoop@master local]$ hadoop fs -cat /test/test.txt
hello hadoop hello hi
[hadoop@master local]$ hadoop fs -ls /test/output
Found 2 items
-rw-r--r--   1 hadoop supergroup          0 2017-06-20 14:55 /test/output/_SUCCESS
-rw-r--r--   1 hadoop supergroup         22 2017-06-20 14:55 /test/output/part-r-00000
[hadoop@master local]$ hadoop fs -cat /test/output/part-r-00000
hadoop	1
hello	2
hi	1
[hadoop@master local]$ hadoop fs -ls /test/output1
Found 2 items
-rw-r--r--   1 hadoop supergroup          0 2017-06-20 14:57 /test/output1/_SUCCESS
-rw-r--r--   1 hadoop supergroup         30 2017-06-20 14:57 /test/output1/part-r-00000
[hadoop@master local]$ hadoop fs -cat /test/output1/part-r-00000
1	2
2	1
hadoop	1
hello	1
hi	1

Note that the second job re-tokenizes the first job's output file, so the count values themselves get counted as tokens: in job1's output the token 1 appears twice (after hadoop and hi) and the token 2 appears once (after hello), which is exactly the "1 2" and "2 1" at the top of output1.
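One caveat with the polling loop above: allFinished() also becomes true when a job fails (failed jobs just move onto the failed list), so a driver that only prints getSuccessfulJobList() will exit quietly even on failure. Below is a minimal sketch of a more defensive driver tail using the same JobControl API; the helper class name JobControlRunner and the 500 ms poll interval are illustrative choices, not part of the original post.

import java.util.List;

import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

public class JobControlRunner {
    // Runs an already-wired JobControl and returns true only if every job succeeded.
    public static boolean runAndReport(JobControl jobControl) throws InterruptedException {
        Thread thread = new Thread(jobControl);
        thread.setDaemon(true); // don't keep the JVM alive once main() returns
        thread.start();
        while (!jobControl.allFinished()) {
            Thread.sleep(500); // illustrative poll interval
        }
        List<ControlledJob> failed = jobControl.getFailedJobList();
        System.out.println("Succeeded: " + jobControl.getSuccessfulJobList());
        if (!failed.isEmpty()) {
            System.err.println("Failed: " + failed);
        }
        jobControl.stop();
        return failed.isEmpty();
    }
}

Ending main() with System.exit(JobControlRunner.runAndReport(jobControl) ? 0 : 1) then gives the driver a meaningful exit code, which matters when the job is launched from a shell script or a scheduler.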