spark+kafka+hive

ゞ 浴缸里的玫瑰 · 2022-05-14 07:21 · 308阅读 · 0赞

使用 Spark Streaming 消费 Kafka 消息,并通过 HQL 将数据变更同步到 Hive 中。

用到的 pom 依赖:

```xml
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.11</artifactId>
    <version>1.6.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>compile</scope>
</dependency>
```

注意:下面的代码只用到了 `spark-streaming-kafka-0-10`(包 `org.apache.spark.streaming.kafka010`)。`spark-streaming-kafka_2.11:1.6.3` 是面向 Spark 1.6 / Kafka 0.8 的旧版连接器,与 Spark 2.3.1 混用可能引起依赖冲突,通常可以去掉。另外代码中用到了 Gson(`com.google.gson.Gson`),上面的依赖里没有它,需要再加入 `com.google.code.gson:gson`。

* Hive 可以通过修改配置来支持行级别的更新和删除;这里没有这样做,而是用变通的方式实现:删除是用 `insert overwrite` 重写整张表、只保留不匹配的行;更新是先删除原有行,再插入新行。因此效率不高。
* 这里消费的消息是 canal 解析 MySQL binlog 后生成的 JSON。canal 如何解析 binlog 并通过 Kafka 生产消息,参见另一篇文章:[canal+kafka实践——实时etl][canal_kafka_etl]

package com.kexin.etl; import com.google.gson.Gson; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka010.ConsumerStrategies; import
org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.io.File;
import java.util.*;
import java.util.regex.Pattern;

/**
 * Replays canal binlog change events (JSON messages consumed from Kafka) into Hive tables using
 * Spark Streaming and Spark SQL.
 *
 * <p>Hive tables are not configured for row-level UPDATE/DELETE here. Instead:
 * <ul>
 *   <li>a delete rewrites the table without the matching rows
 *       ({@code insert overwrite table t select * from t where ...});</li>
 *   <li>an update deletes the old row image ({@code beforeColumns}) and then inserts the new
 *       one ({@code map_info}).</li>
 * </ul>
 *
 * @Author KeXin
 * @Date 2018/9/4 10:05 AM
 **/
public class KafkaEtl {
    /** Root directory; each batch writes the SQL statements it executed into a subdirectory. */
    private static final String OUTPUT_DIR = "./src/main/resources/output/operations";

    /** Gson instances are thread-safe and reusable, so one is shared. */
    private static final Gson GSON = new Gson();

    static String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
    static SparkSession spark = SparkSession.builder()
            .config("spark.sql.warehouse.dir", warehouseLocation)
            .master("local[2]")
            .appName("etl")
            .enableHiveSupport()
            .getOrCreate();

    /**
     * Subscribes to every Kafka topic matching {@code etl_timely.*} and, every 2 seconds,
     * applies the received change events to Hive and logs the executed statements.
     */
    public static void main(String[] args) throws InterruptedException {
        String brokers = "127.0.0.1:9092";
        String topics = "etl_timely.*";
        Pattern pattern = Pattern.compile(topics);
        JavaStreamingContext jssc = new JavaStreamingContext(
                JavaSparkContext.fromSparkContext(spark.sparkContext()), Durations.seconds(2));

        // Only consumer settings are relevant here; serializer settings apply to producers.
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("group.id", "helloSpark");

        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.SubscribePattern(pattern, kafkaParams));

        // Example of a parsed message value:
        // {event_op_type=insert, database_name=etl_kafka, event_timestamp=1536029791000, table_name=persons
        //  , map_info={Address=, FirstName=Bill, LastName=Gates, City=, P_Id=4}}
        messages.foreachRDD((rdd, time) -> {
            System.out.println("========= " + time + "=========");
            // The SparkSession is only usable on the driver, so bring the message bodies back here
            // rather than calling spark.sql(...) inside rdd.map(...), which runs on executors.
            List<String> values = rdd.map(record -> record.value()).collect();
            if (values.isEmpty()) {
                return;
            }
            List<String> operations = new ArrayList<>(values.size());
            for (String value : values) {
                operations.add(applyEvent(value));
            }
            // saveAsTextFile refuses to write into an existing directory, so each batch gets its own.
            JavaSparkContext.fromSparkContext(rdd.context())
                    .parallelize(operations, 1)
                    .saveAsTextFile(OUTPUT_DIR + "/" + time.milliseconds());
        });

        // Start the computation
        jssc.start();
        jssc.awaitTermination();
        jssc.stop(false);
    }

    /**
     * Applies one canal change event to Hive and returns the last SQL statement executed for it.
     *
     * <p>The event must contain {@code table_name}, {@code event_op_type} and {@code map_info}
     * (the row's column values). {@code event_op_type} is {@code insert}, {@code delete}, or
     * anything else, which is treated as an update; updates must also contain
     * {@code beforeColumns} (the row's values before the change). If the target table does not
     * exist yet, it is created with one {@code varchar(255)} column per {@code map_info} key.
     *
     * @param json the Kafka message value
     * @return the SQL statement executed last for this event
     * @throws IllegalArgumentException if a required field is missing or a row image is empty
     */
    @SuppressWarnings("unchecked")
    private static String applyEvent(String json) {
        Map<String, Object> event = GSON.fromJson(json, HashMap.class);
        String table = String.valueOf(require(event, "table_name"));
        String eventType = (String) require(event, "event_op_type");
        Map<String, Object> after = (Map<String, Object>) require(event, "map_info");

        // Create the table before any DML: the update path reads the table in its delete step.
        spark.sql(createTableSql(table, after));

        String result;
        switch (eventType) {
            case "delete":
                result = deleteRowSql(table, after);
                break;
            case "insert":
                result = insertRowSql(table, after);
                break;
            default: { // update
                Map<String, Object> before = (Map<String, Object>) require(event, "beforeColumns");
                spark.sql(deleteRowSql(table, before));
                result = insertRowSql(table, after);
                break;
            }
        }
        System.out.println(result);
        spark.sql(result);
        spark.sql("select * from " + table).show();
        return result;
    }

    /** Returns {@code event.get(key)}, failing with a descriptive message when it is absent. */
    private static Object require(Map<String, Object> event, String key) {
        Object value = event.get(key);
        if (value == null) {
            throw new IllegalArgumentException("canal event has no '" + key + "' field: " + event);
        }
        return value;
    }

    /** Rejects an empty row image, for which no valid SQL can be built. */
    private static Map<String, Object> nonEmpty(Map<String, Object> row, String table) {
        if (row.isEmpty()) {
            throw new IllegalArgumentException("empty row image for table " + table);
        }
        return row;
    }

    /** Builds {@code CREATE TABLE IF NOT EXISTS} with one varchar(255) column per key of {@code row}. */
    private static String createTableSql(String table, Map<String, Object> row) {
        StringJoiner columns =
                new StringJoiner(",", "CREATE TABLE IF NOT EXISTS " + table + " (", ") USING hive");
        for (String column : nonEmpty(row, table).keySet()) {
            columns.add(column + " varchar(255)");
        }
        return columns.toString();
    }

    /**
     * Builds a statement that rewrites {@code table}, keeping every row that differs from
     * {@code row} in at least one column; this deletes the rows equal to {@code row}.
     * NOTE(review): under SQL three-valued logic, a NULL column compares as unknown, so rows
     * with NULLs that otherwise match may be dropped as well.
     */
    private static String deleteRowSql(String table, Map<String, Object> row) {
        StringJoiner where = new StringJoiner(" or ",
                "insert overwrite table " + table + " select * from " + table + " where ", "");
        for (Map.Entry<String, Object> column : nonEmpty(row, table).entrySet()) {
            where.add(column.getKey() + "!=" + quote(column.getValue()));
        }
        return where.toString();
    }

    /** Builds {@code insert into table values(...)} with {@code row}'s values in key order. */
    private static String insertRowSql(String table, Map<String, Object> row) {
        StringJoiner values = new StringJoiner(",", "insert into " + table + " values(", ")");
        for (Object value : nonEmpty(row, table).values()) {
            values.add(quote(value));
        }
        return values.toString();
    }

    /**
     * Renders {@code value} as a single-quoted Spark SQL string literal. Backslashes and quotes
     * are escaped so that column data cannot terminate the literal and inject SQL.
     */
    private static String quote(Object value) {
        String text = String.valueOf(value);
        return "'" + text.replace("\\", "\\\\").replace("'", "\\'") + "'";
    }
}

[canal_kafka_etl]: https://blog.csdn.net/qq_29721419/article/details/82388090
还没有评论,来说两句吧...