kafka-->storm-->mongodb

待我称王封你为后i 2022-01-09 07:47

**Goal:** read data from Kafka through a spout, count the occurrences of each word in a bolt, and keep those counts up to date in MongoDB.

The spout's nextTuple method effectively runs inside an endless while loop; every record it emits to a bolt triggers one call to that bolt's execute method. In short, a spout emits data and a bolt processes it.

**MongoUtil: MongoDB utility class**

```java
package storm;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;

public class MongoUtil {

    private MongoUtil() {}

    private static MongoClient mongo;
    private static DB db;
    private static DBCollection collection;

    static {
        mongo = new MongoClient("192.168.170.185", 27017);
        db = mongo.getDB("mySpout");
        collection = db.getCollection("myBolt");
    }

    // how many documents with _id = 1 exist (0 or 1)
    public static Long getCount() {
        return collection.count(new BasicDBObject("_id", 1L));
    }

    public static void insert(String substring) {
        DBObject obj = new BasicDBObject();
        obj.put("_id", 1);
        obj.put("bolt", substring);
        collection.insert(obj);
    }

    public static void update(String substring) {
        DBObject obj = new BasicDBObject();
        obj.put("_id", 1);
        DBObject obj2 = collection.findOne(obj);
        obj2.put("bolt", substring);
        collection.update(obj, obj2);
    }
}
```
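MongoUtil keeps the insert and update paths separate, and ReportBolt later decides which one to call based on getCount(). With the same legacy driver API, both branches could be folded into a single upsert; a minimal sketch under that assumption (the saveResult method name is made up for illustration):

```java
// Sketch: one upsert instead of separate insert()/update() calls.
// `collection` is the same static DBCollection that MongoUtil initializes.
public static void saveResult(String substring) {
    DBObject query = new BasicDBObject("_id", 1);
    DBObject doc = new BasicDBObject("_id", 1).append("bolt", substring);
    // upsert = true: insert the document if it does not exist yet, otherwise replace it
    collection.update(query, doc, true, false);
}
```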
**SentenceSpout: the spout that reads data from Kafka and emits it**

```java
package storm;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class SentenceSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private ConsumerIterator<String, String> it;
    private Map conf;

    @Override
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
        // Do the initialization in open(); doing it earlier (e.g. in the constructor)
        // can fail because the spout is serialized and shipped to the worker.
        this.conf = map;
        this.collector = collector;

        Properties props = new Properties();
        // ZooKeeper connection
        props.put("zookeeper.connect", "192.168.170.185:2181");
        // consumer group
        props.put("group.id", "testgroup");
        // ZooKeeper timeouts
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        // serializer class
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ConsumerConfig config = new ConsumerConfig(props);
        this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);

        // createMessageStreams may only be called once per connector,
        // so build the stream here instead of inside nextTuple().
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("helloworld", 1);
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get("helloworld").get(0);
        this.it = stream.iterator();
    }

    @Override
    public void nextTuple() {
        // hasNext() blocks until a message arrives; each Kafka message
        // is emitted as one "sentence" tuple.
        while (it.hasNext()) {
            this.collector.emit(new Values(it.next().message()));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}
```

**SplitSentenceBolt: the bolt that splits sentences into words**

```java
package storm;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SplitSentenceBolt extends BaseRichBolt {

    private OutputCollector collector;
    private Map stormConf;

    @Override
    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
        this.stormConf = map;
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        // split each sentence on spaces and emit one tuple per word
        String str = tuple.getStringByField("sentence");
        String[] split = str.split(" ");
        for (String word : split) {
            this.collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```
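Note that SentenceSpout emits without a message ID and the bolts emit unanchored tuples, so Storm does not track or replay anything here. If at-least-once processing were needed, the bolt would anchor its output to the incoming tuple and ack it; a hedged sketch of what SplitSentenceBolt.execute could look like under that approach:

```java
@Override
public void execute(Tuple tuple) {
    String sentence = tuple.getStringByField("sentence");
    for (String word : sentence.split(" ")) {
        // anchor each emitted word to the input tuple, so a failure downstream
        // causes the original sentence to be replayed from the spout
        this.collector.emit(tuple, new Values(word));
    }
    // acknowledge the input tuple once all derived tuples have been emitted
    this.collector.ack(tuple);
}
```

For anchoring to matter, the spout would also have to emit with a message ID and implement ack/fail; the code in this post keeps things simple and skips reliability.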
**WordCountBolt: the counting bolt**

```java
package storm;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountBolt extends BaseRichBolt {

    private Map boltconf;
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
        this.boltconf = map;
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        // increment the running count for this word and emit the new total
        String word = tuple.getStringByField("word");
        this.counts.put(word, this.counts.containsKey(word) ? this.counts.get(word) + 1 : 1L);
        this.collector.emit(new Values(word, counts.get(word)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
```

**ReportBolt: the bolt that prints the results and writes them to MongoDB**

```java
package storm;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class ReportBolt extends BaseRichBolt {

    private HashMap<String, Long> counts = null;
    private Map boltconf;
    private StringBuffer buf = null;

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        this.boltconf = arg0;
        this.counts = new HashMap<String, Long>();
        this.buf = new StringBuffer();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = tuple.getLongByField("count");
        this.counts.put(word, count);

        // print the current totals
        System.out.println("------ word counts ------");
        List<String> keys = new ArrayList<String>();
        keys.addAll(this.counts.keySet());
        buf.append("{");
        for (String key : keys) {
            buf.append(key + ":" + this.counts.get(key)).append(",");
            System.out.println(key + " : " + this.counts.get(key));
        }
        System.out.println("-------------------------");
        buf.append("}");

        // drop the trailing comma before "}", then insert the snapshot into
        // MongoDB on the first call and update it on every later call
        String substring = buf.delete(buf.length() - 2, buf.length() - 1).toString();
        long existing = MongoUtil.getCount();
        if (existing <= 0) {
            MongoUtil.insert(substring);
        } else {
            MongoUtil.update(substring);
        }
        buf = buf.delete(0, buf.length());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // terminal bolt: no output fields to declare
    }

    /*
    @Override
    public Map<String, Object> getComponentConfiguration() {
        HashMap<String, Object> hashMap = new HashMap<String, Object>();
        hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return hashMap;
    }
    */
}
```
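The commented-out getComponentConfiguration hints at an alternative design: instead of rebuilding the result string and hitting MongoDB on every single tuple, ReportBolt could ask Storm for a tick tuple every 10 seconds and flush only then. A sketch of how execute might distinguish tick tuples, assuming that block is uncommented and using org.apache.storm.Constants (already among the imports); flushToMongo is a hypothetical helper, not part of the original code:

```java
// inside ReportBolt
private boolean isTickTuple(Tuple tuple) {
    // tick tuples come from Storm's system component on the system tick stream
    return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
            && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
}

@Override
public void execute(Tuple tuple) {
    if (isTickTuple(tuple)) {
        flushToMongo();   // hypothetical helper: build the string and call MongoUtil
        return;
    }
    this.counts.put(tuple.getStringByField("word"), tuple.getLongByField("count"));
}
```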
**WordCountTopology: the topology that wires the Storm components together**

```java
package storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {

    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) throws Exception {
        // instantiate the spout and bolts
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        // build the topology
        TopologyBuilder builder = new TopologyBuilder();
        // register SentenceSpout
        builder.setSpout(SENTENCE_SPOUT_ID, spout);
        // register SplitSentenceBolt, subscribing to SentenceSpout;
        // shuffleGrouping distributes tuples randomly and evenly across SplitSentenceBolt instances
        builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
        // register WordCountBolt, subscribing to SplitSentenceBolt;
        // fieldsGrouping routes tuples with the same "word" value to the same WordCountBolt instance
        builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        // register ReportBolt, subscribing to WordCountBolt;
        // globalGrouping routes all tuples to the single ReportBolt instance
        builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

        // configuration object
        Config conf = new Config();

        // LocalCluster simulates a complete Storm cluster inside the local JVM.
        // Local mode is a simple way to develop and test, avoiding repeated deployments
        // to a distributed cluster, and it also makes breakpoint debugging easy.
        LocalCluster cluster = new LocalCluster();

        // submit the topology to the local cluster
        cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());

        // let the topology run for a while, then kill it and shut the cluster down
        Thread.sleep(300000000);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}
```

Reposted from: https://www.cnblogs.com/wangjing666/p/6894015.html