Flink Streaming WordCount Code Example

墨蓝 2022-09-11 06:13

Code

    package com.zxl.flink

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    /** Flink streaming WordCount */
    object FlinkStreamWordCount {
      def main(args: Array[String]): Unit = {
        // 1. Initialize the Flink streaming environment
        val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        // Change the parallelism: every operator defaults to 1 unless overridden below
        streamEnv.setParallelism(1)
        // 2. Import the implicit conversions (TypeInformation for the Scala API)
        import org.apache.flink.streaming.api.scala._
        // 3. Read the data from a socket stream
        // val stream: DataStream[String] = streamEnv.socketTextStream("hadoop101", 8888)
        // Start the listener first: nc -lk 8888
        // A DataStream is the counterpart of a DStream in Spark
        val stream: DataStream[String] = streamEnv.socketTextStream("localhost", 8888)
        // 4. Transform and process the data
        val result: DataStream[(String, Int)] = stream.flatMap(_.split(" "))
          .map((_, 1)).setParallelism(2)
          // Grouping operator: 0 or 1 is a tuple field index. For a stream of
          // (word, count) pairs, 0 is the word and 1 is the number of occurrences.
          .keyBy(0)
          .sum(1).setParallelism(2) // aggregation operator: running sum per key
        // 5. Print the result; "结果" ("result") is the prefix of each output line
        result.print("结果").setParallelism(1)
        // 6. Start the streaming job
        streamEnv.execute("wordcount")
      }
    }
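The transformation chain in step 4 can be modeled with plain Scala collections to see what `keyBy(0).sum(1)` emits: one updated count for every incoming word, not a single final total. This is only a sketch of the semantics, not Flink code; the object and method names are hypothetical, and a real job may interleave records of different keys in another order when operators run in parallel.

```scala
// Plain-Scala model of flatMap -> map -> keyBy(0) -> sum(1); no Flink required.
// RunningWordCount and runningCounts are illustrative names, not Flink APIs.
object RunningWordCount {

  /** Returns one (word, runningCount) pair per incoming word, in arrival order. */
  def runningCounts(lines: Seq[String]): Seq[(String, Int)] = {
    // Same tokenization as the job: split each line on single spaces.
    val words = lines.flatMap(_.split(" "))
    // Carry the per-key state (word -> count) together with the emitted records.
    val start = (Map.empty[String, Int], Vector.empty[(String, Int)])
    val (_, emitted) = words.foldLeft(start) {
      case ((state, out), word) =>
        val count = state.getOrElse(word, 0) + 1
        (state.updated(word, count), out :+ (word -> count))
    }
    emitted
  }

  def main(args: Array[String]): Unit =
    runningCounts(Seq("hello flink", "hello scala")).foreach(println)
}
```

The `Map` here plays the role of Flink's keyed state: each key keeps its own running sum, which is why a word that arrives repeatedly appears in the output several times with an increasing count.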

Debugging

Send data

Run nc -lk 8888 in a terminal before starting the job, then type lines of space-separated words.
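If nc is not available (for example on Windows), a small Scala program can stand in for the `nc -lk 8888` listener mentioned in the code comments. The sketch below is an assumption, not part of the original post: `SocketLineSender` and `serveOnce` are hypothetical names, and the sample lines are made up. It accepts one connection, writes each line terminated by `\n`, and then closes.

```scala
import java.io.{OutputStreamWriter, PrintWriter}
import java.net.ServerSocket
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for `nc -lk 8888`; not part of the original post.
object SocketLineSender {

  /** Waits for one client on `server`, sends every line followed by '\n', then closes the client. */
  def serveOnce(server: ServerSocket, lines: Seq[String]): Unit = {
    val client = server.accept() // blocks until the Flink job (or any client) connects
    val out = new PrintWriter(new OutputStreamWriter(client.getOutputStream, StandardCharsets.UTF_8))
    try {
      lines.foreach(line => out.print(line + "\n")) // explicit '\n' on every platform
      out.flush()
    } finally {
      out.close()
      client.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(8888) // same port the job reads from
    try serveOnce(server, Seq("hello flink", "hello scala"))
    finally server.close()
  }
}
```

Start this program first, then launch the Flink job. Unlike `nc -lk`, it is not interactive and closes the connection after the last line has been sent.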

Console log output

    结果> (scala,1)
    结果> (hello,1)
    结果> (zxl,1)
    结果> (hello,2)
    结果> (hello,3)
    结果> (flink,1)
    结果> (hello,4)
    結果> (spark,1)
    结果> (filne,1)
    结果> (flink,2)
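Because the job prints a running total per word, the final count of each word is the last value logged for it. As a minimal sketch (the `FinalCounts` object and its regular expression are assumptions for illustration, not part of the original post), the log lines can be reduced to final counts like this:

```scala
// Hypothetical helper for reading the console log; not part of the original post.
object FinalCounts {
  // Matches lines shaped like "<label>> (<word>,<count>)", e.g. "结果> (hello,4)".
  private val Record = """^(.*)> \((.*),(\d+)\)$""".r

  /** Keeps the last count seen for each word; lines that do not match are ignored. */
  def finalCounts(log: Seq[String]): Map[String, Int] =
    log.foldLeft(Map.empty[String, Int]) { (acc, line) =>
      line match {
        case Record(_, word, n) => acc.updated(word, n.toInt)
        case _                  => acc
      }
    }

  def main(args: Array[String]): Unit = {
    val log = Seq("结果> (hello,1)", "结果> (flink,1)", "结果> (hello,2)")
    finalCounts(log).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Taking the last value is safe here because records with the same key are handled in order by a single subtask, so the counts logged for one word only increase.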

Appendix: pom file

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.zxl</groupId>
      <artifactId>Flink-SXT</artifactId>
      <version>1.0-SNAPSHOT</version>
      <dependencies>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-scala_2.11</artifactId>
          <version>1.9.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_2.11</artifactId>
          <version>1.9.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>2.7.2</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>2.7.2</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka_2.11</artifactId>
          <version>1.9.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>0.11.0.3</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-filesystem_2.11</artifactId>
          <version>1.9.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>flink-connector-redis_2.11</artifactId>
          <version>1.0</version>
        </dependency>
        <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.44</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner_2.11</artifactId>
          <version>1.9.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
          <version>1.9.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-cep-scala_2.11</artifactId>
          <version>1.9.1</version>
        </dependency>
      </dependencies>
      <build>
        <plugins>
          <!-- This plugin compiles Scala code into class files -->
          <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.4.6</version>
            <executions>
              <execution>
                <!-- Bind to Maven's compile and test-compile phases -->
                <goals>
                  <goal>compile</goal>
                  <goal>testCompile</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
            </configuration>
            <executions>
              <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </project>
