Spark Streaming: consuming Kafka data from a specified offset


I. Introduction

When a Spark Streaming application exits unexpectedly, data keeps being pushed into Kafka. Because Kafka consumption defaults to reading from the latest offset, the messages produced while the job was down are skipped on restart, which means they are lost. To avoid this, we need to record the offsets of every batch we consume, so that the next run can check them and resume reading from the recorded offsets.
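
The idea, sketched below, is to keep the last consumed offsets in some external store and read them back on start-up. This is only a minimal sketch and not part of the original post: the OffsetStore object and the plain-text file format are hypothetical stand-ins for whatever store you actually use (ZooKeeper, HBase, HDFS, a database, ...). Section III shows how such a Map[TopicAndPartition, Long] is then passed to KafkaUtils.createDirectStream.

    import java.io.{File, PrintWriter}
    import scala.io.Source
    import kafka.common.TopicAndPartition

    // Hypothetical helper (not from the original post): persist the offsets consumed so far
    // to a plain text file ("topic,partition,offset" per line) and load them on restart.
    object OffsetStore {
      def save(path: String, offsets: Map[TopicAndPartition, Long]): Unit = {
        val writer = new PrintWriter(new File(path))
        try offsets.foreach { case (tp, off) => writer.println(s"${tp.topic},${tp.partition},$off") }
        finally writer.close()
      }

      def load(path: String): Map[TopicAndPartition, Long] = {
        val f = new File(path)
        if (!f.exists()) Map.empty[TopicAndPartition, Long]
        else Source.fromFile(f).getLines().map { line =>
          val Array(topic, partition, offset) = line.split(",")
          TopicAndPartition(topic, partition.toInt) -> offset.toLong
        }.toMap
      }
    }

If load returns an empty map (first run, or the store was wiped), the job can fall back to auto.offset.reset; otherwise the stored offsets become the starting point of the direct stream.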

II. Environment

kafka-0.9.0, spark-1.6.0, jdk-1.7, scala-2.10.5, IntelliJ IDEA 16

III. Implementation

1. Add the Spark and Kafka dependencies (pom.xml)

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns="http://maven.apache.org/POM/4.0.0"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.ngaa</groupId>
      <artifactId>test-my</artifactId>
      <version>1.0-SNAPSHOT</version>
      <inceptionYear>2008</inceptionYear>
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!--add maven release-->
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <!-- Scala version -->
        <scala.version>2.10.5</scala.version>
        <!-- Scala version on the test machine -->
        <test.scala.version>2.11.7</test.scala.version>
        <jackson.version>2.3.0</jackson.version>
        <!-- slf4j version -->
        <slf4j-version>1.7.20</slf4j-version>
        <!-- CDH Spark -->
        <spark.cdh.version>1.6.0-cdh5.8.0</spark.cdh.version>
        <spark.streaming.cdh.version>1.6.0-cdh5.8.0</spark.streaming.cdh.version>
        <kafka.spark.cdh.version>1.6.0-cdh5.8.0</kafka.spark.cdh.version>
        <!-- CDH Hadoop -->
        <hadoop.cdh.version>2.6.0-cdh5.8.0</hadoop.cdh.version>
        <!-- httpclient must be compatible with the Hadoop version shipped with CDH (cd /opt/cloudera/parcels/CDH/lib/hadoop/lib) -->
        <httpclient.version>4.2.5</httpclient.version>
        <!-- httpcore -->
        <httpcore.version>4.2.5</httpcore.version>
        <!-- fastjson -->
        <fastjson.version>1.1.39</fastjson.version>
      </properties>
      <repositories>
        <repository>
          <id>scala-tools.org</id>
          <name>Scala-Tools Maven2 Repository</name>
          <url>http://scala-tools.org/repo-releases</url>
        </repository>
        <!-- Repository used to resolve the CDH dependency jars -->
        <repository>
          <id>cloudera</id>
          <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
      </repositories>
      <pluginRepositories>
        <pluginRepository>
          <id>scala-tools.org</id>
          <name>Scala-Tools Maven2 Repository</name>
          <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
      </pluginRepositories>
      <dependencies>
        <!-- fastjson -->
        <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>${fastjson.version}</version>
        </dependency>
        <!-- httpclient -->
        <dependency>
          <groupId>org.apache.httpcomponents</groupId>
          <artifactId>httpclient</artifactId>
          <version>${httpclient.version}</version>
        </dependency>
        <!-- httpcore -->
        <dependency>
          <groupId>org.apache.httpcomponents</groupId>
          <artifactId>httpcore</artifactId>
          <version>${httpcore.version}</version>
        </dependency>
        <!-- slf4j -->
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
          <version>${slf4j-version}</version>
        </dependency>
        <!-- Hadoop -->
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>${hadoop.cdh.version}</version>
          <exclusions>
            <exclusion>
              <groupId>javax.servlet</groupId>
              <artifactId>*</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>${hadoop.cdh.version}</version>
          <exclusions>
            <exclusion>
              <groupId>javax.servlet</groupId>
              <artifactId>*</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-hdfs</artifactId>
          <version>${hadoop.cdh.version}</version>
          <exclusions>
            <exclusion>
              <groupId>javax.servlet</groupId>
              <artifactId>*</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
        <!-- Spark / Scala -->
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>${scala.version}</version>
        </dependency>
        <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-databind</artifactId>
          <version>${jackson.version}</version>
        </dependency>
        <!-- Spark Streaming and its Kafka integration -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.10</artifactId>
          <version>${spark.streaming.cdh.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka_2.10</artifactId>
          <version>${kafka.spark.cdh.version}</version>
        </dependency>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.12</version>
          <scope>test</scope>
        </dependency>
        <!-- Spark assembly from a local path on the Windows development machine -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-assembly_2.10</artifactId>
          <version>${spark.cdh.version}</version>
          <scope>system</scope>
          <systemPath>D:/crt_send_document/spark-assembly-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar</systemPath>
        </dependency>
        <!-- Spark assembly from a local path on the Linux test machine -->
        <!--<dependency>-->
        <!--<groupId>org.apache.spark</groupId>-->
        <!--<artifactId>spark-assembly_2.10</artifactId>-->
        <!--<version>${spark.cdh.version}</version>-->
        <!--<scope>system</scope>-->
        <!--<systemPath>/opt/cloudera/parcels/CDH/lib/spark/lib/spark-examples-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar-->
        <!--</systemPath>-->
        <!--</dependency>-->
        <!-- Spark assembly from the central repository -->
        <!--<dependency>-->
        <!--<groupId>org.apache.spark</groupId>-->
        <!--<artifactId>spark-assembly_2.10</artifactId>-->
        <!--<version>${spark.cdh.version}</version>-->
        <!--</dependency>-->
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-server-web-proxy -->
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-yarn-server-web-proxy</artifactId>
          <version>2.6.0-cdh5.8.0</version>
        </dependency>
      </dependencies>
      <!-- Maven packaging -->
      <build>
        <finalName>test-my</finalName>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
          <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.2</version>
            <executions>
              <execution>
                <goals>
                  <goal>compile</goal>
                  <goal>testCompile</goal>
                </goals>
              </execution>
            </executions>
            <configuration>
              <scalaVersion>${scala.version}</scalaVersion>
              <args>
                <arg>-target:jvm-1.7</arg>
              </args>
            </configuration>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-eclipse-plugin</artifactId>
            <configuration>
              <downloadSources>true</downloadSources>
              <buildcommands>
                <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
              </buildcommands>
              <additionalProjectnatures>
                <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
              </additionalProjectnatures>
              <classpathContainers>
                <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
              </classpathContainers>
            </configuration>
          </plugin>
          <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
              <archive>
                <manifest>
                  <mainClass></mainClass>
                </manifest>
              </archive>
            </configuration>
            <executions>
              <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
      <reporting>
        <plugins>
          <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <configuration>
              <scalaVersion>${scala.version}</scalaVersion>
            </configuration>
          </plugin>
        </plugins>
      </reporting>
    </project>

2. Create the test class

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, TaskContext}
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.slf4j.LoggerFactory

    /**
      * Created by yangjf on 2016/12/18
      * Update date:
      * Time: 11:10
      * Description: read Kafka data starting from specified offsets
      * Result of Test:
      * Command:
      * Email: jifei.yang@ngaa.com.cn
      */
    object ReadBySureOffsetTest {
      val logger = LoggerFactory.getLogger(ReadBySureOffsetTest.getClass)

      def main(args: Array[String]) {
        // Set log levels
        Logger.getLogger("org.apache.kafka").setLevel(Level.ERROR)
        Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR)
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
        logger.info("Main program of the consume-from-specified-offset test starts")
        if (args.length < 1) {
          System.err.println("Your arguments were " + args.mkString(","))
          logger.info("Main program exits unexpectedly")
          System.exit(1)
        }
        // e.g. hdfs://hadoop1:8020/user/root/spark/checkpoint
        val Array(checkpointDirectory) = args
        logger.info("Checkpoint directory: " + checkpointDirectory)
        val ssc = StreamingContext.getOrCreate(checkpointDirectory,
          () => {
            createContext(checkpointDirectory)
          })
        logger.info("Starting the streaming context")
        ssc.start()
        ssc.awaitTermination()
      }

      def createContext(checkpointDirectory: String): StreamingContext = {
        // Configuration
        val brokers = "hadoop3:9092,hadoop4:9092"
        val topics = "20161218a"
        // Batch interval in seconds (the original default was 5)
        val split_rdd_time = 8
        // Create the context
        val sparkConf = new SparkConf()
          .setAppName("SendSampleKafkaDataToApple").setMaster("local[2]")
          .set("spark.app.id", "streaming_kafka")
        val ssc = new StreamingContext(sparkConf, Seconds(split_rdd_time))
        ssc.checkpoint(checkpointDirectory)
        // Create a direct Kafka stream from the brokers and topics
        val topicsSet: Set[String] = topics.split(",").toSet // (not needed by the fromOffsets variant of createDirectStream)
        // Kafka parameters
        val kafkaParams: Map[String, String] = Map[String, String](
          "metadata.broker.list" -> brokers,
          "group.id" -> "apple_sample",
          "serializer.class" -> "kafka.serializer.StringEncoder"
          // "auto.offset.reset" -> "largest"  // reset to the latest offset (default)
          // "auto.offset.reset" -> "smallest" // reset to the earliest offset
          // "auto.offset.reset" -> "none"     // throw an exception if no previous offset is found for the consumer group
        )
        /**
          * Read Kafka data starting from the specified offsets.
          * Note: with the exactly-once mechanism of the direct approach, each record is delivered to the streaming job only once.
          * Given the starting offsets, reading resumes from where the previous streaming run stopped.
          */
        val offsetList = List((topics, 0, 22753623L), (topics, 1, 327041L)) // (topic, partition, offset)
        val fromOffsets = setFromOffsets(offsetList) // build the starting-offset map
        val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message()) // map each MessageAndMetadata to (topic, message)
        // Consume from the specified offsets using the direct API; for details see
        // "http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$"
        val messages: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
        // Process the data
        messages.foreachRDD(mess => {
          // Get the offset ranges of this batch
          val offsetsList = mess.asInstanceOf[HasOffsetRanges].offsetRanges
          mess.foreachPartition(lines => {
            lines.foreach(line => {
              val o: OffsetRange = offsetsList(TaskContext.get.partitionId)
              logger.info("++++++++++++++++++++++++++++++ record the offset here +++++++++++++++++++++++++++++++++++++++")
              logger.info(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
              logger.info("+++++++++++++++++++++++++++++++ process the record here ++++++++++++++++++++++++++++++++++++++")
              logger.info("The kafka line is " + line)
            })
          })
        })
        ssc
      }

      // Build the Map of starting offsets
      def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = {
        var fromOffsets: Map[TopicAndPartition, Long] = Map()
        for (offset <- list) {
          val tp = TopicAndPartition(offset._1, offset._2) // topic and partition id
          fromOffsets += (tp -> offset._3)                 // starting offset
        }
        fromOffsets
      }
    }
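
The listing above only logs each batch's offset ranges where the comment says "record the offset here". Below is a minimal, hedged sketch (not from the original post) of actually persisting those offsets, reusing the hypothetical OffsetStore helper from the introduction, so that the next launch can build fromOffsets from the stored values instead of the hard-coded offsetList. In production the store would more typically be ZooKeeper, HBase or a database, and the offsets should be written only after the batch's output has succeeded.

    // Sketch only: wire the hypothetical OffsetStore into the job above.
    val offsetFile = "/tmp/streaming_kafka_offsets.txt" // hypothetical location

    // After processing each batch, record the end of every offset range as the next starting point.
    messages.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... write the batch's results out first, then commit the offsets ...
      val next = ranges.map(r => TopicAndPartition(r.topic, r.partition) -> r.untilOffset).toMap
      OffsetStore.save(offsetFile, next)
    }

    // When building the stream in createContext, prefer stored offsets over the hard-coded list.
    val stored = OffsetStore.load(offsetFile)
    val fromOffsets: Map[TopicAndPartition, Long] =
      if (stored.nonEmpty) stored else setFromOffsets(offsetList)

Note that because the example uses StreamingContext.getOrCreate with a checkpoint directory, offsets are also recovered from the checkpoint automatically; an external store like this mainly helps when the checkpoint cannot be reused, for example after the application code changes.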

IV. References

  1. Spark API: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$
  2. Kafka configuration reference: http://kafka.apache.org/documentation.html#configuration
  3. Kafka SimpleConsumer example: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
  4. Spark Streaming + Kafka integration (offset handling): http://spark.apache.org/docs/1.6.0/streaming-kafka-integration.html
  5. Kafka API docs (KafkaConsumer): http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Reposted from: https://blog.csdn.net/high2011/article/details/53706446
