`

工作中使用Kafka接收消息代码整理

 
阅读更多

前段时间工作中使用到Kafka接收消息,简单整理一下:

Properties props = new Properties();
props.put("zookeeper.connect", zkServer);
props.put("group.id",groupId);
//使用zk集群管理
ConsumerConfig conf = new ConsumerConfig(props);
kafka.javaapi.consumer.ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(conf);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topicId, 1);
Map<String, List<KafkaStream<String, LogInfoBean>>> consumerMap =
consumer.createMessageStreams(topicCountMap,new StringDecoder(new VerifiableProperties()), new DiggerEventDecoder());
List<KafkaStream<String,LogInfoBean>> streams = consumerMap.get(topicId);
KafkaStream<String,LogInfoBean> stream = streams.get(0);
ConsumerIterator<String,LogInfoBean> it = stream.iterator();
//没有消息时会阻塞线程
while (it.hasNext()){
LogInfoBean msgBean = it.next().message();
String logMessage = msgBean.getMsg();
//处理接收到的消息
//...
}

 

分享到:
评论

相关推荐

    网络信息交流工具【此代码由codefang.com收集整理】

    5. **消息队列与存储**:对于大量消息的处理,可以使用消息队列(如RabbitMQ、Kafka)来缓存和分发信息,同时数据库(如MySQL、MongoDB)用于持久化存储。 6. **实时通信**:实时聊天可能需要用到WebSocket技术,它...

    【面试宝典】2021年超全超详细的最新大数据开发面试题,附答案解析(一版).pdf

    MapReduce中ReduceTask的工作机制** - ReduceTask从各个MapTask拉取中间结果。 - 对拉取的数据进行合并、排序和分组。 - 对每组数据执行reduce函数。 - 写入最终结果到HDFS。 **12. MapReduce中shuffle阶段** - ...

    个人整理的3年以上JAVA开发面试资料

    6. **拦截器**:Spring MVC支持在请求处理过程中使用拦截器(Interceptor)来实现某些功能,例如权限验证、日志记录等。拦截器可以通过实现`HandlerInterceptor`接口或者继承`HandlerInterceptorAdapter`类来定义,...

    大数据面试整理

    在上述代码中,`MyMapper`类负责将输入文本拆分成单词并生成键值对(&lt;单词, 1&gt;)。`MyReducer`类则将所有相同的单词键对应的计数值(IntWritable)进行累加,最终输出每个单词及其总数。在Job配置中,设置了Mapper和...

    Thingsboard项目详细说明.doc

    thingsboard项目中使用了多个第三方包或插件,例如Apache Kafka、Apache Cassandra等。这些包或插件提供了额外的功能和作用,扩展了thingsboard的功能。 ThingsBoard 设备连接协议 thingsboard项目中的设备连接...

    JAVASendMail

    在描述中提到的“整理网上的例子”,通常指的是开发者可能参考的代码片段或教程。这些例子可能包括如何配置SMTP服务器信息,如何创建并设置`Message`对象,以及如何通过`Transport.send(Message)`方法发送邮件。常见...

    通用新闻采集系统通用新闻采集系统

    为了应对新闻数据的实时更新,新闻采集系统往往需要结合消息队列(如RabbitMQ或Kafka)实现异步处理,确保快速响应和处理大量数据。同时,采用分布式爬虫架构可以并行处理多个任务,提高采集效率。 六、反爬虫策略...

    reopositorioSD:分布式系统主题代码的个人存储库

    在“reopositorioSD:分布式系统主题代码的个人存储库”中,我们可以推测这是一个个人项目,专注于收集和整理与分布式系统相关的编程代码。这个存储库可能包含了各种实现分布式系统功能的示例代码、框架、算法和工具...

    (2024)跳槽涨薪必备精选面试题.pdf

    5. **Tomcat中为什么要使用自定义类加载器** - 解决类路径冲突问题。 - 提高安全性,避免不同应用间类的相互干扰。 6. **Tomcat如何进行优化?** - 调整 JVM 参数。 - 使用连接池管理数据库连接。 - 启用缓存...

    大数据工程师 Flink技术与实战 源码笔记下载

    "资料.txt"可能是笔记的文本形式,包含了上述所有知识点的详细内容,可能是作者在学习过程中整理的笔记要点,也可能包含了一些源代码示例和实战项目经验分享。 **总结** 这个压缩包资源对于正在学习或已经在使用...

    后端Java.pdf

    18. **中间件的理解**:对于Java后端开发者而言,中间件是支撑业务逻辑运行的重要组件,如Dubbo是分布式服务框架,MQ是消息队列系统,Redis是内存数据库,kafka是分布式流处理平台,zk是分布式协调服务。 19. **...

    Bat面试题汇总&详解

    消息中间件如RabbitMQ、Kafka、ActiveMQ等,用于解耦、异步处理,保证消息一致性主要通过确认机制和重试策略。 【数据库篇】 数据库锁机制包括行锁、表锁等,用于解决并发控制。事务隔离级别有读未提交、读已提交...

    java面试题目

    - **定义**:Apache Kafka是一种高性能的消息队列中间件,主要用于构建实时数据管道和流式应用。 - **特点**: - **高吞吐量**:支持大规模数据流的高效处理。 - **低延迟**:数据传输延迟低,适用于实时应用场景...

Global site tag (gtag.js) - Google Analytics