Kafka + Flink Example (on macOS)
Requires a Java environment and depends on ZooKeeper.
Kafka
Download and install
Configure the Java environment.
Extract ZooKeeper and Kafka to the /opt directory.
Start ZooKeeper
cd /opt/zookeeper-3.5.8
bin/zkServer.sh start
Start Kafka
cd /opt/kafka_2.12-2.7.0
bin/kafka-server-start.sh config/server.properties
Check which ports are listening (netstat -tnlp is the Linux form; on macOS use lsof instead)
lsof -nP -iTCP -sTCP:LISTEN
Test: create a topic
cd /opt/kafka_2.12-2.7.0
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic flink-test
Simulate receiving messages (console consumer)
cd /opt/kafka_2.12-2.7.0
bin/kafka-console-consumer.sh --topic flink-test --from-beginning --bootstrap-server 127.0.0.1:9092
Simulate sending messages (console producer)
cd /opt/kafka_2.12-2.7.0
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic flink-test
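Once the Maven project described below is set up, you can also drive the topic from code instead of the console producer. A minimal sketch, assuming the kafka-clients jar is on the classpath (it arrives transitively with the Flink Kafka connector); the TestProducer class name is just illustrative:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "127.0.0.1:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // try-with-resources closes the producer, which flushes any buffered records.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("flink-test", "hello flink hello kafka"));
        }
    }
}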
Now we can write the Java program!
It needs a Maven project.
Pull in the Flink core package, flink-streaming, and the Flink Kafka connector:
<properties>
    <flink.version>1.11.3</flink.version>
</properties>

<repositories>
    <repository>
        <id>aliyun</id>
        <name>aliyun</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
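A note on the artifacts: the _2.12 suffix is the Scala binary version the Flink jars were built against, and flink-clients is what lets the job below run in an embedded local environment straight from the IDE.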
package com.hi7s.tech;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class StreamWordCount {

    // Point these at your brokers; for the single-node setup above that is 127.0.0.1:9092 / 127.0.0.1:2181.
    private static final String KAFKA_URL = "192.168.0.106:9092,192.168.0.107:9092";
    private static final String ZK_URL = "192.168.0.106:2181,192.168.0.107:2181";
    private static final String TOPIC = "flink-test";
    private static final String GROUP_ID = "test-consumer-group";
    private static final Properties PROP = new Properties();

    static {
        PROP.setProperty("bootstrap.servers", KAFKA_URL);
        // zookeeper.connect is a legacy setting; the modern Kafka consumer only needs bootstrap.servers.
        PROP.setProperty("zookeeper.connect", ZK_URL);
        PROP.setProperty("group.id", GROUP_ID);
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every second; Kafka offsets are committed when checkpoints complete.
        env.enableCheckpointing(1000);
        // Event time is enabled here, but this job has no windows, so it behaves the same as processing time.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Consume the flink-test topic created above, deserializing each record as a plain string.
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(TOPIC, new SimpleStringSchema(), PROP);

        // Split each line into words, key by the word (tuple field 0), and keep a running count (field 1).
        DataStream<Tuple2<String, Integer>> keyedStream = env.addSource(kafkaConsumer)
                .flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);
        keyedStream.print();

        env.execute("Flink Streaming Java API Skeleton");
    }

    // Splits an input line into lowercase words and emits a (word, 1) pair for each one.
    static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String val, Collector<Tuple2<String, Integer>> collector) {
            for (String word : val.toLowerCase().split("\\W+")) {
                if (StringUtils.isNotBlank(word)) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}
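To try it end to end: start the job (it runs locally thanks to flink-clients), then type a line such as hello world hello into the console producer. The running counts print roughly like this (the numeric prefix is the printing subtask index and depends on your local parallelism; records with the same key always land on the same subtask):

3> (hello,1)
8> (world,1)
3> (hello,2)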