7、Structured Streaming Kafka

1、pom.xml

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
        <scala.version>2.11.4</scala.version>
        <spark.version>2.4.1</spark.version>
    </properties>
    <repositories>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
        <!-- Kafka source for Structured Streaming; keep it on the same version as Spark itself -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <!-- Gson is used by the utility class below for JSON parsing -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.34</version>
        </dependency>
    </dependencies>

2、Bean

Create the beans the job needs. The Kafka message body is JSON that Gson maps onto DateBean:

    // One record from Kafka: a session/room plus the list of chat/gift items it carries
    @JsonIgnoreProperties(ignoreUnknown = true)
    class DateBean extends Serializable {
      var item: Array[ItemBean] = Array.empty
      var sid = ""
      var roomid = ""
      var sourcelink = ""
    }
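
The post only lists DateBean, but the utility and driver below also use ItemBean, MessageBean, and GiftBean. A minimal sketch of those classes, with fields and types inferred from how the code accesses them (the definitions shown here are assumptions, not the author's originals):

    // Hypothetical ItemBean: one chat or gift entry inside a DateBean (fields inferred from the parser)
    @JsonIgnoreProperties(ignoreUnknown = true)
    class ItemBean extends Serializable {
      var typeName = ""   // "chat" for messages, anything else is treated as a gift
      var f = ""          // sender id
      var fname = ""      // sender name
      var time = ""       // epoch seconds as a string
      var content = ""    // chat text
      var price = ""      // gift price as a string
      var count = ""      // gift count
      var g_type = ""     // gift type
      var gname = ""      // gift name
      var gid = ""        // gift id
    }

    // Hypothetical flattened output beans produced by KafkaStreamMapUtil
    class MessageBean extends Serializable {
      var p = ""; var r = ""; var f = ""
      var content = ""; var timestamp = ""; var date = ""
    }

    class GiftBean extends Serializable {
      var p = ""; var r = ""; var f = ""; var fname = ""
      var g_type = ""; var price = 0.0; var count = ""
      var timestamp = ""; var date = ""; var g_name = ""; var g_id = ""
    }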

3、Utility class

    import java.text.SimpleDateFormat
    import java.util.Date

    import com.google.gson.{Gson, JsonParser}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.DataFrame

    object KafkaStreamMapUtil {

      // Parses the JSON in the Kafka "value" column of each row into arrays of message and gift beans.
      def kafkaStreamToDStream(df: DataFrame, chatTypeName: String): RDD[Array[(Array[MessageBean], Array[GiftBean])]] = {
        df.rdd.map(row => {
          val jsonParse = new JsonParser()
          val gs = new Gson()
          var msg: Array[(Array[MessageBean], Array[GiftBean])] = Array.empty
          val sdf = new SimpleDateFormat("yyyyMMdd")
          try {
            val je = jsonParse.parse(row.getAs[String]("value"))
            // The payload may be a single object or an array of objects
            var dbArr: Array[DateBean] = null
            if (je.isJsonArray) {
              dbArr = gs.fromJson(je, classOf[Array[DateBean]])
            } else {
              dbArr = Array(gs.fromJson(je, classOf[DateBean]))
            }
            msg = dbArr.map(m => {
              var mArr: Array[MessageBean] = Array.empty
              var gArr: Array[GiftBean] = Array.empty
              if (m != null && m.item != null && m.item.length > 0) {
                val p = m.sid
                val r = m.roomid
                m.item.foreach(x => {
                  if (x != null) {
                    if (x.typeName.equals(chatTypeName)) {
                      // Chat message
                      val mBean = new MessageBean
                      mBean.p = p
                      mBean.r = r
                      mBean.f = x.f
                      mBean.timestamp = x.time
                      mBean.content = x.content
                      mBean.date = sdf.format(new Date(x.time.toLong * 1000))
                      mArr :+= mBean
                    } else if (!x.price.trim.equals("")) {
                      // Gift record
                      val gBean = new GiftBean
                      gBean.p = p
                      gBean.r = r
                      gBean.f = x.f
                      gBean.fname = x.fname
                      gBean.g_type = x.g_type
                      gBean.price = x.price.toDouble
                      gBean.count = x.count
                      gBean.timestamp = x.time
                      gBean.g_name = x.gname
                      gBean.g_id = x.gid
                      gBean.date = sdf.format(new Date(x.time.toLong * 1000))
                      gArr :+= gBean
                    }
                  }
                })
              }
              (mArr, gArr)
            })
          } catch {
            case e: Exception => println(e.getMessage)
          }
          msg
        })
      }
    }
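
Before wiring the utility into the streaming query, it can be sanity-checked locally against a hand-built DataFrame. This is a minimal sketch; the sample JSON payload is a hypothetical record shaped to match the bean sketch above, not data from the original post.

    import org.apache.spark.sql.SparkSession

    object KafkaStreamMapUtilTest {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("KafkaStreamMapUtilTest").master("local[*]").getOrCreate()
        import spark.implicits._

        // Hypothetical payload: one chat item and one gift item in a single room
        val sample =
          """{"sid":"s1","roomid":"r1","item":[
            |{"typeName":"chat","f":"u1","time":"1700000000","content":"hello"},
            |{"typeName":"gift","f":"u2","fname":"user2","g_type":"1","price":"9.9",
            | "count":"2","time":"1700000000","gname":"rose","gid":"g100"}]}""".stripMargin

        // Build a one-row DataFrame with the same "value" column the Kafka source would provide
        val df = Seq(sample).toDF("value")
        val parsed = KafkaStreamMapUtil.kafkaStreamToDStream(df, "chat").collect()
        parsed.foreach(_.foreach { case (msgs, gifts) =>
          println(s"messages=${msgs.length}, gifts=${gifts.length}")
        })
        spark.stop()
      }
    }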

4、Main class

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.{DataFrame, SparkSession}

    object KafkaSourceApp {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)

        val spark = SparkSession.builder()
          .appName("structured_streaming_kafka_App")
          .master("local[*]")
          .getOrCreate()

        // Kafka source: replace brokerid/topic with your own bootstrap servers and topic
        val df = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "brokerid")
          .option("subscribe", "topic")
          .load()

        // Only the message body is needed, so cast the binary value column to a string
        val strDF = df.selectExpr("CAST(value AS STRING)")
        val chatTypeName = "chat"

        strDF.writeStream
          .option("checkpointLocation", "path/")
          .foreachBatch((batchDF: DataFrame, id: Long) => {
            println(s"batchid = $id")
            import spark.implicits._
            val transRDD = KafkaStreamMapUtil.kafkaStreamToDStream(batchDF, chatTypeName)

            val msgDF = transRDD.flatMap(x => x)
              .flatMap(_._1)
              .map(e => (e.p, e.r, e.f, e.content, e.timestamp, e.date))
              .toDF("p", "r", "f", "content", "timestamp", "date")
              .selectExpr("*", "p as plat") // the original aliased a platform_id column that is never produced; p is assumed to carry it

            val giftDF = transRDD.flatMap(x => x)
              .flatMap(_._2)
              .map(e => (e.p, e.r, e.f, e.fname, e.g_type, e.price, e.count, e.timestamp, e.date, e.g_name, e.g_id))
              .toDF("p", "r", "f", "fname", "g_type",
                "price", "count", "timestamp", "date", "g_name", "g_id")
              .selectExpr("*", "p as plat")

            msgDF.show()
            giftDF.show()
          })
          .start()
          .awaitTermination()
      }
    }
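
The pom pulls in mysql-connector-java, but the driver above only calls show() on each batch; presumably the batch DataFrames are meant to land in MySQL. A minimal sketch of a JDBC batch sink, assuming a local MySQL instance; the URL, credentials, and table names are placeholders, not values from the original post:

    import java.util.Properties
    import org.apache.spark.sql.{DataFrame, SaveMode}

    object MySqlBatchSink {
      // Hypothetical helper: write one micro-batch to a MySQL table via Spark's built-in JDBC writer.
      def writeBatch(batch: DataFrame, table: String): Unit = {
        val props = new Properties()
        props.setProperty("user", "root")                 // placeholder credentials
        props.setProperty("password", "password")
        props.setProperty("driver", "com.mysql.jdbc.Driver")
        batch.write
          .mode(SaveMode.Append)                          // append each micro-batch
          .jdbc("jdbc:mysql://localhost:3306/live", table, props)
      }
    }

Inside foreachBatch this would be called as MySqlBatchSink.writeBatch(msgDF, "chat_message") and MySqlBatchSink.writeBatch(giftDF, "gift_record"), instead of (or after) the two show() calls.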
