`
m635674608
  • 浏览: 5060983 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Kafka Consumer端的一些解惑

    博客分类:
  • MQ
 
阅读更多

最近一直忙着各种设计和文档,终于有时间来更新一点儿关于kafka的东西。之前有一篇文章讲述的是kafka Producer端的程序,也就是日志的生产者,这部分比较容易理解,业务系统将运行日志或者业务日志发送到broker中,由broker代为存储。那讲的是如何收集日志,今天要写的是如何获取日志,然后再做相关的处理。

 

之前写过kafka是讲日志按照topic的形式存储,一个topic会按照partition存在同一个文件夹下,目录在config/server.properties中指定,具体的存储规则可以查看之前的文章:

 

 

 

1
2
# The directory under which to store log files
log.dir=/tmp/kafka-logs

 

Consumer端的目的就是为了获取log日志,然后做进一步的处理。在这里我们可以将数据的处理按照需求分为两个方向,线上和线下,也可以叫实时和离线。实时处理部分类似于网站里的站短,有消息了马上就推送到前端,这是一种对实时性要求极高的模式,kafka可以做到,当然针对站短这样的功能还有更好的处理方式,我主要将kafka线上消费功能用在了实时统计上,处理一些如实时流量汇总、各系统实时吞吐量汇总等。

 

这种应用,一般采用一个consumer中的一个group对应一个业务,配合多个producer提供数据,如下图模
式:

 

 

采用这种方式处理很简单,采用官网上给的例子即可解决,只是由于版本的问题,代码稍作更改即可:

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.a2.kafka.consumer;
 
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
 
public class CommonConsumer {
    public static void main(String[] args) {
        // specify some consumer properties
        Properties props = new Properties();
        props.put("zk.connect", "192.168.181.128:2181");
        props.put("zk.connectiontimeout.ms", "1000000");
        props.put("groupid", "test_group");
 
        // Create the connection to the cluster
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
 
        Map<String, Integer> map=new HashMap<String,Integer>();
        map.put("test", 2);
        // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
        Map<String, List<KafkaStream<Message>>> topicMessageStreams =
            consumerConnector.createMessageStreams(map);
        List<KafkaStream<Message>> streams = topicMessageStreams.get("test");
 
        // create list of 4 threads to consume from each of the partitions
        ExecutorService executor = Executors.newFixedThreadPool(4);
 
        // consume the messages in the threads
        for(final KafkaStream<Message> stream: streams) {
          executor.submit(new Runnable() {
            public void run() {
              for(MessageAndMetadata<Message> msgAndMetadata: stream) {
                // process message (msgAndMetadata.message())
                  System.out.println(msgAndMetadata.message());
              }
            }
          });
        }
    }
}

 

这是一个user levelAPI,还有一个low levelAPI可以从官网找到,这里就不贴出来了。这个consumer是底层采用的是一个阻塞队列,只要一有producer生产数据,那consumer就会将数据打印出来,这是不是十分符合实时性的要求。

 

当然这里会产生一个很严重的问题,如果你重启一下上面这个程序,那你连一条数据都抓不到,但是你去log文件中明明可以看到所有数据都好好的存在。换句话说,一旦你消费过这些数据,那你就无法再次用同一个groupid消费同一组数据了。我已经把结论说出来了,要消费同一组数据,你可以采用不同的group

 

 

简单说下产生这个问题的原因,这个问题类似于transaction commit,在消息系统中都会有这样一个问题存在,数据消费状态这个信息到底存哪里。是存在consumer端,还是存在broker端。对于这样的争论,一般会出现三种情况:

 

 

  • At most once—this handles the first case described. Messages are immediately marked as consumed, so they can't be given out twice, but many failure scenarios may lead to losing messages.
  • At least once—this is the second case where we guarantee each message will be delivered at least once, but in failure cases may be delivered twice.
  • Exactly once—this is what people actually want, each message is delivered once and only once.

 

 

 

第一种情况是将消费的状态存储在了broker端,一旦消费了就改变状态,但会因为网络原因少消费信息,第二种是存在两端,并且先在broker端将状态记为send,等consumer处理完之后将状态标记为consumed,但也有可能因为在处理消息时产生异常,导致状态标记错误等,并且会产生性能的问题。第三种当然是最好的结果。

 

Kafka解决这个问题采用high water mark这样的标记,也就是设置offset:

 

1
Kafka does two unusual things with respect to metadata. First the stream is partitioned on the brokers into a set of distinct partitions. The semantic meaning of these partitions is left up to the producer and the producer specifies which partition a message belongs to. Within a partition messages are stored in the order in which they arrive at the broker, and will be given out to consumers in that same order. This means that rather than store metadata for each message (marking it as consumed, say), we just need to store the "high water mark" for each combination of consumer, topic, and partition. Hence the total metadata required to summarize the state of the consumer is actually quite small. In Kafka we refer to this high-water mark as "the offset" for reasons that will become clear in the implementation section.

 

所以在每次消费信息时,log4j中都会输出不同的offset

 

1
2
3
[FetchRunnable-0] INFO : kafka.consumer.FetcherRunnable#info : FetchRunnable-0 start fetching topic: test part: 0 offset: 0 from 192.168.181.128:9092
 
[FetchRunnable-0] INFO : kafka.consumer.FetcherRunnable#info : FetchRunnable-0 start fetching topic: test part: 0 offset: 15 from 192.168.181.128:9092

除了采用不同的groupid去抓取已经消费过的数据,kafka还提供了另一种思路,这种方式更适合线下的操作,镜像。

 

 

通过一些配置,就可以将线上产生的数据同步到镜像中去,然后再由特定的集群区处理大批量的数据,这种方式可以采用low levelAPI按照不同的partitionoffset来抓取数据,以获得更好的并行处理效果。

 

http://my.oschina.net/ielts0909/blog/110280

分享到:
评论

相关推荐

    很全面的kafka技术文档

    #### 八、Kafka Consumer端的一些解惑 Consumer 端的主要任务是从 Kafka 中获取消息并进行处理。需要注意的关键点包括: - **消息拉取(Polling)**:Consumer 通过轮询的方式从 Kafka 中获取消息。 - **自动提交...

    ChromeOS镜像文件.zip

    目录: ChromeOS-PC-20130222-oscome.com ChromeOS-Vanilla-4028.0.2013_04_20_1810-r706c4144 ChromeOS-Vanilla-4028.0.2013_04_20_1810-r706c4144-VirtualBox ChromeOS-Vanilla-4028.0.2013_04_20_1810-r706c4144-VMWare ChromeOS-virtualbox-20130222-OSCOME.COM ChromeOS-vmware-20130222-OSCOME.COM 网盘文件永久链接

    ieee33节点matlab模型

    IEEE33节点模型搭建,matlab

    3GPP R15 38.331 5G NR无线资源控制(RRC)协议规范解析

    3GPP R15 38.331 5G NR无线资源控制(RRC)协议规范解析

    基于ssm+mysql实现的零食商城系统(电商购物).zip(毕设&课设&实训&大作业&竞赛&项目)

    项目工程资源经过严格测试运行并且功能上ok,可实现复现复刻,拿到资料包后可实现复现出一样的项目,本人系统开发经验充足(全栈全领域),有任何使用问题欢迎随时与我联系,我会抽时间努力为您解惑,提供帮助 【资源内容】:包含源码+工程文件+说明等。答辩评审平均分达到96分,放心下载使用!可实现复现;设计报告也可借鉴此项目;该资源内项目代码都经过测试运行,功能ok 【项目价值】:可用在相关项目设计中,皆可应用在项目、毕业设计、课程设计、期末/期中/大作业、工程实训、大创等学科竞赛比赛、初期项目立项、学习/练手等方面,可借鉴此优质项目实现复刻,设计报告也可借鉴此项目,也可基于此项目来扩展开发出更多功能 【提供帮助】:有任何使用上的问题欢迎随时与我联系,抽时间努力解答解惑,提供帮助 【附带帮助】:若还需要相关开发工具、学习资料等,我会提供帮助,提供资料,鼓励学习进步 下载后请首先打开说明文件(如有);整理时不同项目所包含资源内容不同;项目工程可实现复现复刻,如果基础还行,也可在此程序基础上进行修改,以实现其它功能。供开源学习/技术交流/学习参考,勿用于商业用途。质量优质,放心下载使用,资源为网络商品(电子资料类)基于网络商品和电子资料商品的性质和特征不支持退款

    19考试真题最近的t44.txt

    19考试真题最近的t44.txt

    JSP基于SSH2新闻发布系统.zip(毕设&课设&实训&大作业&竞赛&项目)

    项目工程资源经过严格测试运行并且功能上ok,可实现复现复刻,拿到资料包后可实现复现出一样的项目,本人系统开发经验充足(全栈全领域),有任何使用问题欢迎随时与我联系,我会抽时间努力为您解惑,提供帮助 【资源内容】:包含源码+工程文件+说明等。答辩评审平均分达到96分,放心下载使用!可实现复现;设计报告也可借鉴此项目;该资源内项目代码都经过测试运行,功能ok 【项目价值】:可用在相关项目设计中,皆可应用在项目、毕业设计、课程设计、期末/期中/大作业、工程实训、大创等学科竞赛比赛、初期项目立项、学习/练手等方面,可借鉴此优质项目实现复刻,设计报告也可借鉴此项目,也可基于此项目来扩展开发出更多功能 【提供帮助】:有任何使用上的问题欢迎随时与我联系,抽时间努力解答解惑,提供帮助 【附带帮助】:若还需要相关开发工具、学习资料等,我会提供帮助,提供资料,鼓励学习进步 下载后请首先打开说明文件(如有);整理时不同项目所包含资源内容不同;项目工程可实现复现复刻,如果基础还行,也可在此程序基础上进行修改,以实现其它功能。供开源学习/技术交流/学习参考,勿用于商业用途。质量优质,放心下载使用,资源为网络商品(电子资料类)基于网络商品和电子资料商品的性质和特征不支持退款,质量优质,放心下载使用

    19考试真题最近的t49.txt

    19考试真题最近的t49.txt

    19考试真题最近的t61.txt

    19考试真题最近的t61.txt

    电动汽车充电站选址定容优化:基于MATLAB建模求解与成本最小化策略,电动汽车充电站选址定容优化:基于MATLAB的最优规划模型及初学者指南,电动汽车充电站的最优选址定容MATLAB程序 以规划期内充

    电动汽车充电站选址定容优化:基于MATLAB建模求解与成本最小化策略,电动汽车充电站选址定容优化:基于MATLAB的最优规划模型及初学者指南,电动汽车充电站的最优选址定容MATLAB程序 以规划期内充电站的总成本 (包括投资、运行和维护成本)和网损费用之和最小为目标,考虑了相关的约束条件,构造了电动汽车充电站最优规划的数学模型。 从34个位置中,选取7个充电站地址,进行选址优化 关键词:电动汽车;充电站;选址和定容 程序注释清晰,适合初学者学习 ,电动汽车; 充电站选址定容; MATLAB程序; 规划模型; 成本优化; 网损费用; 初学者学习; 程序注释清晰,基于MATLAB的电动汽车充电站选址定容优化程序:成本最小化与约束条件下的选址策略

    威纶通触摸屏图库模板程序:多尺寸适用,PS原文件可自由修改,便捷电气助手应用,威纶通触摸屏图库模板程序:多尺寸适用,PS原文件可自由修改,便捷电气助手应用,威纶通触摸屏图库模板程序(电气助手) 可直接

    威纶通触摸屏图库模板程序:多尺寸适用,PS原文件可自由修改,便捷电气助手应用,威纶通触摸屏图库模板程序:多尺寸适用,PS原文件可自由修改,便捷电气助手应用,威纶通触摸屏图库模板程序(电气助手) 可直接使用。 内附原图、PS原文件可自行修改 不同触摸屏,不同寸尺都可以使用 ,威纶通触摸屏; 图库模板程序; 电气助手; 直接使用; 原图; 修改; 兼容不同寸尺,威纶通触摸屏图库模板程序:电气助手,便捷编辑通用模板

    群辉引导7.2.2 最新 vmware workstation 已经帮忙转换好为vmdk文件 直接使用就可以

    修复 "保存'/opt/rr'的修改" 后 主菜单锁死问题. 修复 trivial 插件的语法错误. 修复 open-vm-tools 套件 缺失的 SOCKETS 驱动. 添加 vmtools 插件, 包含 qemu-ga & open-vm-tools. 4.1. 该插件会自动判断环境并启用对应的功能, 物理机也不用刻意删除该插件. 4.2. 新安装用户会默认选中, 升级用户如需要请手动添加该插件. 4.3. 如启用该插件, 请不要再在系统中安装套件. 修复 wireless 插件. 5.1. 修复 RR 下无线网络 IP 显示和刷新问题. 5.2. 修复 RR 下设置 SSID&PSK 后 DSM 下不驱动的问题. 5.3. 同步 RR 下的 SSID&PSK 到 DSM 下. 5.4. 修复 junior 模式下无线网络的支持, 已支持 无线网卡的 DSM 系统安装. (暂时不支持 intel 无线网卡) 5.5. wpa_supplicant.conf 文件位于引导盘第一个分区根目录, 纯无线环境可手动放置该文件后其启动引导.

    19考试真题最近的t66.txt

    19考试真题最近的t66.txt

    19考试真题最近的t37.txt

    19考试真题最近的t37.txt

    Arduino-Mega2560开发板-毕业设计

    Arduino_Mega2560开发板工程文件 包含 原理图 PCB图

    智能养猪系统的高精度称重算法及其Python实现(含详细可运行代码及解释)

    内容概要:本文详述了一种用于智能养猪的高精度称重系统设计及其实现方法,主要涵盖了卡尔曼滤波、数据采集与预处理、重量估算与存储等功能。文中提供了完整的Python代码示例和详细的代码解释,旨在减少噪声干扰并提高数据准确性。具体而言,通过对采集的数据进行卡尔曼滤波,去除异常值,并使用一定时间段内数据的平均值作为最终的体重估计。此外,还实现了一个简单的图形用户界面,能够实时显示称重数据和估计的重量。 适合人群:农业自动化领域的开发者和技术爱好者,尤其关注智能畜牧业的技术应用。 使用场景及目标:适用于智能养猪场的精准称重,提高养猪效率和管理水平,确保获取高精度、可靠的牲畜体重数据,帮助养殖场更好地管理饲养过程。同时,提供完整的源代码有助于相关人员理解和优化现有系统。 阅读建议:对于想要深入了解智能畜牧业相关技术的读者来说,可以通过本教程掌握从硬件接入、软件设计再到数据处理全流程的具体细节。重点关注各个关键算法的实现原理及其应用场景,从而为自己的项目带来启示与借鉴。

    基于SSM框架构建积分系统和基本商品检索系统(Spring+SpringMVC+MyBatis+Lucene+Redis+MAVEN).zip(毕设&课设&实训&大作业&竞赛&项目)

    项目工程资源经过严格测试运行并且功能上ok,可实现复现复刻,拿到资料包后可实现复现出一样的项目,本人系统开发经验充足(全栈全领域),有任何使用问题欢迎随时与我联系,我会抽时间努力为您解惑,提供帮助 【资源内容】:包含源码+工程文件+说明等。答辩评审平均分达到96分,放心下载使用!可实现复现;设计报告也可借鉴此项目;该资源内项目代码都经过测试运行,功能ok 【项目价值】:可用在相关项目设计中,皆可应用在项目、毕业设计、课程设计、期末/期中/大作业、工程实训、大创等学科竞赛比赛、初期项目立项、学习/练手等方面,可借鉴此优质项目实现复刻,设计报告也可借鉴此项目,也可基于此项目来扩展开发出更多功能 【提供帮助】:有任何使用上的问题欢迎随时与我联系,抽时间努力解答解惑,提供帮助 【附带帮助】:若还需要相关开发工具、学习资料等,我会提供帮助,提供资料,鼓励学习进步 下载后请首先打开说明文件(如有);整理时不同项目所包含资源内容不同;项目工程可实现复现复刻,如果基础还行,也可在此程序基础上进行修改,以实现其它功能。供开源学习/技术交流/学习参考,勿用于商业用途。质量优质,放心下载使用,资源为网络商品(电子资料类)基于网络商品和电子资料商品的性质和特征不支持退款

    最新更新!!!地级市-产业链韧性数据(2006-2021年)

    ## 01、数据简介 产业链韧性是指在产业链部分环节出现问题或遭受内外部冲击时,产业链仍能保持其稳定性和动态平衡,迅速做出反应并恢复正常运转的能力。这种能力体现了产业链的复杂适应性,是其能够应对各种不确定性因素和破坏性事件的重要保障。 产业链韧性是保障产业链安全稳定运行的重要基础,对于提升产业竞争力、推动经济高质量发展具有重要意义。 数据名称:地级市-产业链韧性数据 数据年份:2006-2021年 ## 02、相关数据 代码 年度 城市 产业结构HHI 获得专利数 第一产业增加值占GDP比 第二产业增加值占GDP比 第三产业增加值占GDP比 产业链韧性

    PNP发射极接地开关仿真原理图

    PNP发射极接地开关仿真原理图

    上门预约服务小程序v4.10.9+前端.zip

    上门预约服务小程序v4.10.9+前端 文章列表单图时,图标统一左侧对齐 文章内增加视频位置,显示在文章顶部 文章内底部导航增加首页、分享、自定义按钮,可跳转内部页面、其他小程序、业务域名内的H5页面,方便宣传使用

Global site tag (gtag.js) - Google Analytics