Killing a Flink job on YARN from code, with an automatic savepoint


1. Recently I have been building a platform, a project with a separated front end and back end; simply put, it integrates various components, so job submission and similar operations are all handled through the platform.

2. The feature implemented here is quite simple: from code, run a task that kills a Flink job on YARN and automatically generates a savepoint at the same time.

3. The parameters we need to supply are listed below; a sketch of how to look them up follows the list.

1) The YARN application ID

    String appId = "application_1600222031782_0023";

2) The Flink job ID

    String jobid = "c4d7e2ff6a35d402eaf54b9f9ca0f6c6";

3) The target savepoint path

    String savePoint = "hdfs://dev-ct6-dc-master01:8020/flink-savepoints5";
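Neither ID is something we make up; both come from the running cluster. A minimal sketch for looking them up, assuming a standard Hadoop/Flink CLI on the submit node (the `-yid` shorthand for targeting a YARN application depends on your Flink version):

    # List running YARN applications and note the application ID
    yarn application -list -appStates RUNNING

    # List the Flink jobs inside that application to get the job ID
    # (alternatively, read it off the Flink web UI of that application)
    ./bin/flink list -yid application_1600222031782_0023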

Maven dependency (the `${scala.binary.version}` and `${flink.version}` placeholders are assumed to be defined in the pom's `<properties>` section):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-yarn_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

If this does not work, set the HADOOP_HOME environment variable when running the task (the sketch below is only for reference).

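A minimal sketch of that environment setup, assuming a CDH-style installation; the paths are placeholders, not taken from the article, and in an IDE you would put the same variables into the run configuration instead:

    # Adjust the paths to your own Hadoop installation
    export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
    export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
    # Put the Hadoop client jars on the classpath for the YARN/HDFS client code
    export HADOOP_CLASSPATH=$(hadoop classpath)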

4. Code

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.client.cli.CliArgsException;
    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.GlobalConfiguration;
    import org.apache.flink.util.FlinkException;
    import org.apache.flink.yarn.YarnClusterClientFactory;
    import org.apache.flink.yarn.YarnClusterDescriptor;
    import org.apache.flink.yarn.configuration.YarnConfigOptions;
    import org.apache.hadoop.yarn.api.records.ApplicationId;

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    public class StopYarnJob {

        public static void main(String[] args) throws FlinkException, CliArgsException, ExecutionException, InterruptedException {
            // HADOOP_HOME is an environment variable, so read it with getenv (not getProperty)
            String hadoopHome = System.getenv("HADOOP_HOME");
            System.out.println("HADOOP_HOME = " + hadoopHome);

            // Directory containing flink-conf.yaml and the YARN/HDFS client configs
            String configurationDirectory = "G:/flink_working_tools/yarn-conf";
            String appId = "application_1600222031782_0023";
            String jobid = "c4d7e2ff6a35d402eaf54b9f9ca0f6c6";
            String savePoint = "hdfs://dev-ct6-dc-master01:8020/flink-savepoints5";

            // Load the Flink configuration and point it at the target YARN application
            Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);
            flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, appId);

            YarnClusterClientFactory clusterClientFactory = new YarnClusterClientFactory();
            ApplicationId applicationId = clusterClientFactory.getClusterId(flinkConfiguration);
            if (applicationId == null) {
                throw new FlinkException(
                        "No cluster id was specified. Please specify a cluster to which you would like to connect.");
            }

            // Retrieve a client for the running YARN cluster
            YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(flinkConfiguration);
            ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(applicationId).getClusterClient();

            // Stop the job with a savepoint; the future completes with the savepoint path
            JobID jobID = parseJobId(jobid);
            CompletableFuture<String> completableFuture = clusterClient.stopWithSavepoint(jobID, true, savePoint);
            String savepoint = completableFuture.get();
            System.out.println(savepoint);
        }

        private static JobID parseJobId(String jobIdString) throws CliArgsException {
            if (jobIdString == null) {
                throw new CliArgsException("Missing JobId");
            }
            final JobID jobId;
            try {
                jobId = JobID.fromHexString(jobIdString);
            } catch (IllegalArgumentException e) {
                throw new CliArgsException(e.getMessage());
            }
            return jobId;
        }
    }

5. Testing

1) First we take the example code below, package it, and submit it to the cluster.

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.log4j.Logger;

    public class SocketWindowWordCount {

        private static final Logger logger = Logger.getLogger(SocketWindowWordCount.class);

        public static void main(String[] args) throws Exception {
            // the host and the port to connect to
            final String hostname;
            final int port;
            try {
                final ParameterTool params = ParameterTool.fromArgs(args);
                hostname = params.has("hostname") ? params.get("hostname") : "localhost";
                port = params.getInt("port");
            } catch (Exception e) {
                System.err.println("No port specified. Please run 'SocketWindowWordCount " +
                        "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                        "and port is the address of the text server");
                System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                        "type the input text into the command line");
                return;
            }

            // get the execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // get input data by connecting to the socket
            DataStream<String> text = env.socketTextStream(hostname, port, "\n");
            text.print("source");

            // parse the data, key it, and keep a running count in keyed state
            DataStream<WordWithCount> windowCounts = text
                    .flatMap(new FlatMapFunction<String, WordWithCount>() {
                        @Override
                        public void flatMap(String value, Collector<WordWithCount> out) {
                            for (String word : value.split("\\s")) {
                                out.collect(new WordWithCount(word, 1L));
                            }
                        }
                    })
                    .keyBy(value -> "aaa")
                    .process(new KeyedProcessFunction<String, WordWithCount, WordWithCount>() {

                        private transient ValueState<Long> valueState;

                        @Override
                        public void open(Configuration parameters) throws Exception {
                            super.open(parameters);
                            ValueStateDescriptor<Long> valueStateDescriptor = new ValueStateDescriptor<>("totalAmountState",
                                    TypeInformation.of(new TypeHint<Long>() {
                                    }));
                            valueState = this.getRuntimeContext().getState(valueStateDescriptor);
                        }

                        @Override
                        public void processElement(WordWithCount wordWithCount, Context context, Collector<WordWithCount> out) throws Exception {
                            Long value = valueState.value();
                            Long counts = wordWithCount.count;
                            if (value != null) {
                                System.out.println("current state = " + value);
                                logger.error("current state = " + value);
                                value = value + counts;
                                valueState.update(value);
                                out.collect(wordWithCount);
                            } else {
                                value = counts;
                                valueState.update(value);
                            }
                        }
                    });

            // print the results with a single thread, rather than in parallel
            windowCounts.print().setParallelism(1);

            env.execute("Socket Window WordCount");
        }

        // ------------------------------------------------------------------------

        /**
         * Data type for words with count.
         */
        public static class WordWithCount {

            public String word;
            public long count;

            public WordWithCount() {}

            public WordWithCount(String word, long count) {
                this.word = word;
                this.count = count;
            }

            @Override
            public String toString() {
                return word + " : " + count;
            }
        }
    }

2) Script to run the job:

    export HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/jars/*
    ./bin/flink run -m yarn-cluster \
      -yD yarn.containers.vcores=2 \
      ./examples/streaming/SocketWindowWordCount.jar --hostname 192.168.6.31 --port 12345

3) On the Linux node 192.168.6.31, run `nc -l 12345` and type in a few lines of data.


4) Now look at the YARN task's printed output; suppose the last value shown is state = 11.

5) Run the main method above to kill the job, then look at HDFS: the savepoint files have been generated (a sketch of the check follows).
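A minimal sketch of that check; the `savepoint-...` subdirectory name is generated by Flink, and the exact suffix shown here is an assumption:

    # List the savepoint target directory used in the code above
    hdfs dfs -ls hdfs://dev-ct6-dc-master01:8020/flink-savepoints5
    # Expect a subdirectory such as savepoint-c4d7e2-xxxxxxxxxxxx containing a _metadata file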


6) Restart the job from the savepoint (resubmit sketch below), run `nc -l 12345` again and type one more line of data; the printed state continues from the previous run's state = 12, which confirms that the savepoint took effect.
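To restore from the savepoint when resubmitting, pass the savepoint path with `-s` (`--fromSavepoint`). A sketch; the concrete `savepoint-...` directory name is an assumption, use the path printed by the stop call:

    ./bin/flink run -m yarn-cluster \
      -yD yarn.containers.vcores=2 \
      -s hdfs://dev-ct6-dc-master01:8020/flink-savepoints5/savepoint-c4d7e2-xxxxxxxxxxxx \
      ./examples/streaming/SocketWindowWordCount.jar --hostname 192.168.6.31 --port 12345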

