kafka+sparkStreaming+mysql

不念不忘少年蓝@ 2023-02-21 12:26 128阅读 0赞

# 一、说明 #

1、一个程序模拟用户每个时间点到达的地方和走的步数信息,并实时写入kafka主题;sparkStreaming实时从kafka消费这些信息进行分析并存储到mysql(本例省略分析步骤,直接存储到mysql);

2、sparkStreaming写入mysql的推荐思路如下图:

![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h5al9raW5n_size_16_color_FFFFFF_t_70][]

3、mysql中要提前创建表:

-- One row per step-count event consumed from Kafka.
create table if not exists walk_info (
    user       varchar(20)   not null, -- simulated user name
    counttime  datetime      not null, -- event time, written as 'yyyy-MM-dd HH:mm:ss'
    walkplace  nvarchar(100) not null, -- national character set so Chinese place names can be stored
    newwalknum int           not null  -- steps added by this event
);

# 二、 代码实现 #

## 1、maven依赖 ##

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.spark.demo</groupId>
    <artifactId>sparkProgram</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.8</version> </dependency>
        <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>2.11.8</version> </dependency>
        <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-reflect</artifactId> <version>2.11.8</version> </dependency>
        <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.2.1</version> </dependency>
        <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.2.1</version> </dependency>
        <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.2.1</version> <!--<scope>provided</scope>--> </dependency>
        <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.2.1</version> </dependency>
        <!-- NOTE(review): the next entry is a Scala 2.10 / Spark 1.6 artifact sitting next to Scala 2.11 / Spark 2.2.1 artifacts; the code in this article only imports org.apache.spark.streaming.kafka010, so confirm whether it can be removed -->
        <dependency><!-- Spark Streaming Kafka --> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>1.6.3</version> </dependency>
        <!--
https://mvnrepository.com/artifact/org.apache.spark/spark-hive --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.2.1</version> </dependency> <dependency> <groupId>org.gavaghan</groupId> <artifactId>geodesy</artifactId> <version>1.1.3</version> </dependency> <dependency> <groupId>com.github.scopt</groupId> <artifactId>scopt_2.11</artifactId> <version>3.7.0</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.2.4</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.codehaus.jettison/jettison --> <dependency> <groupId>org.codehaus.jettison</groupId> <artifactId>jettison</artifactId> <version>1.1</version> </dependency> <!-- https://mvnrepository.com/artifact/net.sf.json-lib/json-lib --> <dependency> <groupId>net.sf.json-lib</groupId> <artifactId>json-lib</artifactId> <version>2.4</version> <classifier>jdk15</classifier> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.4.2</version> </dependency> <!-- Oracle JDBC driver. NOTE(review): not referenced by the code shown in this article; confirm it is needed and which repository provides this version --> <dependency> <groupId>com.oracle</groupId> <artifactId>ojdbc6</artifactId> <version>12.1.0.1-atlassian-hosted</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.6</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.2.6</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive-thriftserver --> <dependency> <groupId>org.apache.spark</groupId> 
<artifactId>spark-hive-thriftserver_2.11</artifactId> <version>2.1.1</version> <scope>provided</scope> </dependency> <!-- NOTE(review): the spark-hive-thriftserver version above (2.1.1) differs from the 2.2.1 used by the other org.apache.spark 2.11 artifacts in this POM; confirm this is intended --> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.3</version> </dependency> <!-- NOTE(review): the four org.apache.kafka artifacts below declare three different versions (0.11.0.0, 1.0.2, 2.4.0); confirm they are compatible with each other --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.11.0.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>1.0.2</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-scala --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams-scala_2.11</artifactId> <version>2.4.0</version> </dependency> <!-- MySQL JDBC driver; MySqlConnectionPool loads com.mysql.jdbc.Driver by name --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.38</version> </dependency> <dependency> <groupId>commons-dbcp</groupId> <artifactId>commons-dbcp</artifactId> <version>1.4</version> </dependency> </dependencies> <build> <finalName>CountWalk-1.0.0</finalName> <pluginManagement> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.0</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.12.4</version> </plugin> </plugins> </pluginManagement> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <!-- <descriptor>src/main/resources/assembly.xml</descriptor> --> 
<appendAssemblyId>false</appendAssemblyId> </configuration> </plugin>
<!-- Copy the dependency jars into the lib directory -->
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> <executions> <execution> <id>copy</id> <phase>package</phase> <goals> <goal>copy-dependencies</goal> </goals> <configuration> <outputDirectory> ${project.build.directory}/lib </outputDirectory> </configuration> </execution> </executions> </plugin>
<plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <version>2.15.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin>
</plugins> </build> </project>

## 2、模拟数据代码 ##

package com.hyj.util

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.codehaus.jettison.json.JSONObject

import scala.util.Random

/**
 * Producer that publishes simulated walking events to a Kafka topic.
 *
 * Scenario: once per second one event is emitted for a user, carrying the
 * time of the measurement, the place the user is at and the number of newly
 * added steps.
 */
object KafkaEventProducer {

  // Simulated users
  private val users = Array(
    "zhangSan",
    "liSi",
    "wangWu",
    "xiaoQiang"
  )

  // Index of the user returned by the previous getUser() call (-1 before the first call)
  private var pointer = -1

  // Returns the next user in round-robin order (the selection is not random)
  def getUser(): String = {
    pointer = (pointer + 1) % users.length
    users(pointer)
  }

  val random = new Random()

  // Returns the newly added step count: a random value between 1 and users.length inclusive
  def getNewStepNum(): Int = {
    random.nextInt(users.length) + 1
  }

  // Returns the measurement time as epoch milliseconds
  def getTime(): Long = {
    System.currentTimeMillis()
  }

  // Places a user can be walking at
  val walkPlace = Array(
    "操场南门",
    "操场东门",
    "操场北门",
    "操场西门",
    "操场东南门",
    "操场西北门",
    "操场西南门",
    "操场东北门"
  )

  // Returns a randomly chosen place
  def getWalkPlace(): String = {
    walkPlace(random.nextInt(walkPlace.length))
  }

  /** Sends one JSON event per second to the topic until the process is stopped. */
  def main(args: Array[String]): Unit = {
    val topic = "topic_walkCount"
    val brokers = "192.168.230.21:6667,192.168.230.22:6667,192.168.230.23:6667"

    // Producer configuration. The legacy "metadata.broker.list" key is not
    // set: KafkaProducer takes its broker list from "bootstrap.servers".
    val props = new Properties()
    props.setProperty("bootstrap.servers", brokers)
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    try {
      while (true) {
        val event = new JSONObject()
        event.put("user", getUser())
          .put("count_time", TimeUtil.tranTimeToString(getTime().toString))
          .put("walk_place", getWalkPlace())
          .put("new_walkNum", getNewStepNum())
        println(event.toString())
        // Send the event as the record value (no key)
        producer.send(new ProducerRecord[String, String](topic, event.toString))
        Thread.sleep(1000)
      }
    } finally {
      // Flush buffered records and release the producer when the loop is aborted by an exception
      producer.close()
    }
  }
}

## 3、数据库连接池 ##

package com.hyj.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;

/**
 * Minimal JDBC connection pool shared by everything running in one JVM.
 *
 * The pool is filled lazily with INITIAL_SIZE connections on the first
 * getConnection() call. A borrowed connection must be handed back with
 * returnConnection(Connection) so that it can be reused.
 */
public class MySqlConnectionPool {

    // NOTE(review): endpoint and credentials are hard-coded for the demo; move them to configuration before real use.
    private static final String JDBC_URL =
            "jdbc:mysql://192.168.230.21:3306/test?characterEncoding=utf8&useSSL=true";
    private static final String JDBC_USER = "root";
    private static final String JDBC_PASSWORD = "123456";
    private static final int INITIAL_SIZE = 5;

    // Idle connections; every access happens inside a synchronized static method.
    private static LinkedList<Connection> connectionQueue;

    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /**
     * Borrows a connection, opening a new one when no idle connection is left.
     *
     * @return an open connection, never null
     * @throws IllegalStateException if a connection cannot be opened
     */
    public synchronized static Connection getConnection() {
        try {
            if (connectionQueue == null) {
                connectionQueue = new LinkedList<Connection>();
                for (int i = 0; i < INITIAL_SIZE; i++) {
                    connectionQueue.push(openConnection());
                }
            }
            Connection idle = connectionQueue.poll();
            // An exhausted pool used to hand out null; open an extra connection instead.
            return idle != null ? idle : openConnection();
        } catch (Exception e) {
            // Fail here with the real cause instead of letting the caller hit a NullPointerException later.
            throw new IllegalStateException("Unable to obtain a MySQL connection", e);
        }
    }

    /**
     * Puts a borrowed connection back into the pool; null is ignored.
     *
     * @param conn the connection obtained from getConnection()
     */
    public synchronized static void returnConnection(Connection conn) {
        if (conn == null) {
            return;
        }
        if (connectionQueue == null) {
            connectionQueue = new LinkedList<Connection>();
        }
        connectionQueue.push(conn);
    }

    // Opens one physical connection with the pool's fixed settings.
    private static Connection openConnection() throws Exception {
        return DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
    }
}

## 4、工具类 ##

package com.hyj.util

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

object TimeUtil {

  // Parses a "yyyy-MM-dd HH:mm:ss" string into epoch milliseconds
  def tranTimeToLong(tm:String) :Long={
    val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    fm.parse(tm).getTime()
  }

  // Formats epoch milliseconds (passed as a decimal string) as "yyyy-MM-dd HH:mm:ss"
  def tranTimeToString(tm:String) :String={
    val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    fm.format(new Date(tm.toLong))
  }

  // Returns the last day of the current month as "yyyy-MM-dd 23:59:59"
  def getLastDateOfMonth():String={
    val now: Date = new Date();
    val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
    val dateNow = dateFormat.format(now);
    val day
=dateNow.substring(0,4)+dateNow.substring(5,7)+dateNow.substring(8,10);
    val year=dateNow.substring(0,4).toInt;
    val month=dateNow.substring(5,7).toInt;
    val cal = Calendar.getInstance();
    cal.set(Calendar.YEAR, year);
    // Calendar months are zero-based, so month-1 is the current month
    // (month-2 would select the previous month instead)
    cal.set(Calendar.MONTH, month-1);
    cal.set(Calendar.DAY_OF_MONTH,cal.getActualMaximum(Calendar.DATE));
    new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime())+" 23:59:59"
  }

  // Returns the current time as "yyyy-MM-dd HH:mm:ss"
  def getCurrentTime():String={
    val now: Date = new Date();
    val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    dateFormat.format(now);
  }

  // Returns endTime - startTime in whole seconds; both are "yyyy-MM-dd HH:mm:ss" strings
  def getTimeDifferenceOfTwoTime(startTime:String,endTime:String):Int={
    val l_time=(tranTimeToLong(endTime)-tranTimeToLong(startTime))/1000
    l_time.toInt
  }

  def main(args: Array[String]): Unit = {
    println(getCurrentTime())
    println(getTimeDifferenceOfTwoTime(getCurrentTime(),"2020-03-11 14:35:40"))
  }
}

## 5、主程序 ##

package com.hyj.main

// NOTE(review): RedisUtils is not used by the code below and is not defined in this article; confirm it exists or drop it from the import.
import com.hyj.util.{MySqlConnectionPool, RedisUtils}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.codehaus.jettison.json.JSONObject

/**
 * Consumes the simulated walking events (user, measurement time, place,
 * newly added steps) from Kafka and stores every event as one row of the
 * MySQL table walk_info.
 */
object kafka2sparkStreaming2mySql {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka2sparkStreaming2mySql")
      .setMaster("local[1]")

    // Batch interval of the stream: 1 second
    val ssc = new StreamingContext(conf, Seconds(1))

    // Log level; other values include INFO and DEBUG
    ssc.sparkContext.setLogLevel("WARN")
    ssc.checkpoint("checkpoint")

    val topic = "topic_walkCount"
    val groupId = "t03"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" ->
"192.168.230.21:6667,192.168.230.22:6667,192.168.230.23:6667",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      // Start from the earliest offset when the group has no committed offset yet
      "auto.offset.reset" -> "earliest",
      // Offsets are committed manually after each batch has been written
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array(topic)
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, // distribute partitions evenly across executors
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    stream.foreachRDD(rdd => {
      // Offset ranges consumed by this batch, one per Kafka partition
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd.foreachPartition(partitions => {
        // Do not borrow a connection for a partition that has nothing to write
        if (partitions.nonEmpty) {
          val conn = MySqlConnectionPool.getConnection
          try {
            // Bound parameters instead of string concatenation: values containing
            // quotes can neither break nor alter the statement.
            val stmt = conn.prepareStatement(
              "insert into walk_info(user,counttime,walkplace,newwalknum) values(?,?,?,?)")
            try {
              partitions.foreach(records => {
                val record = new JSONObject(records.value())
                println(record.toString)
                stmt.setString(1, record.getString("user"))
                stmt.setString(2, record.getString("count_time"))
                stmt.setString(3, record.getString("walk_place"))
                stmt.setInt(4, record.getInt("new_walkNum"))
                stmt.addBatch()
              })
              // Send all rows of this partition in one batch
              stmt.executeBatch()
            } finally {
              stmt.close()
            }
          } finally {
            // Hand the connection back even when the write failed
            MySqlConnectionPool.returnConnection(conn)
          }
        }
      })

      // Commit the offsets manually once the batch has been written
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

# 三、出现的问题 #

java.sql.SQLException: Incorrect string value: '\xE6\x93\x8D\xE5\x9C\xBA...'
for column walkplace

原因: 报错中的字节 \xE6\x93\x8D\xE5\x9C\xBA 是"操场"的UTF-8编码,说明walkplace列当前使用的字符集无法存储中文(通常是表的默认字符集不支持中文,例如latin1)。

解决办法: 将创建表的sql中walkplace列的类型从varchar改为nvarchar(MySQL中nvarchar使用utf8字符集,可以存储中文);也可以在建表时把表或该列的字符集指定为utf8mb4。

# 四、效果 #

mysql> select * from walk_info;
+-----------+---------------------+-----------------+------------+
| user      | counttime           | walkplace       | newwalknum |
+-----------+---------------------+-----------------+------------+
| zhangSan  | 2020-06-30 16:45:52 | 操场西门        |          1 |
| liSi      | 2020-06-30 16:45:54 | 操场西南门      |          3 |
| wangWu    | 2020-06-30 16:45:55 | 操场北门        |          3 |
| xiaoQiang | 2020-06-30 16:45:56 | 操场东南门      |          3 |
| zhangSan  | 2020-06-30 16:45:57 | 操场北门        |          4 |
| liSi      | 2020-06-30 16:45:58 | 操场西北门      |          1 |
| zhangSan  | 2020-06-30 16:51:51 | 操场西南门      |          4 |
| liSi      | 2020-06-30 16:51:53 | 操场西北门      |          3 |
| wangWu    | 2020-06-30 16:51:54 | 操场北门        |          3 |
| xiaoQiang | 2020-06-30 16:51:55 | 操场西北门      |          3 |
| zhangSan  | 2020-06-30 16:51:56 | 操场南门        |          2 |
| liSi      | 2020-06-30 16:51:57 | 操场西南门      |          3 |
| wangWu    | 2020-06-30 16:51:58 | 操场东门        |          3 |
| xiaoQiang | 2020-06-30 16:51:59 | 操场东北门      |          2 |
| zhangSan  | 2020-06-30 16:52:00 | 操场东南门      |          4 |
+-----------+---------------------+-----------------+------------+
15 rows in set (0.00 sec)

mysql>

[watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h5al9raW5n_size_16_color_FFFFFF_t_70]: https://img-blog.csdnimg.cn/20200630170012211.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h5al9raW5n,size_16,color_FFFFFF,t_70
还没有评论,来说两句吧...