最近一直忙着各种设计和文档,终于有时间来更新一点儿关于kafka的东西。之前有一篇文章讲述的是kafka Producer端的程序,也就是日志的生产者,这部分比较容易理解,业务系统将运行日志或者业务日志发送到broker中,由broker代为存储。那讲的是如何收集日志,今天要写的是如何获取日志,然后再做相关的处理。
之前写过kafka是讲日志按照topic的形式存储,一个topic会按照partition存在同一个文件夹下,目录在config/server.properties中指定,具体的存储规则可以查看之前的文章:
1
2
|
# The directory under which to store log files log. dir = /tmp/kafka-logs
|
Consumer端的目的就是为了获取log日志,然后做进一步的处理。在这里我们可以将数据的处理按照需求分为两个方向,线上和线下,也可以叫实时和离线。实时处理部分类似于网站里的站短,有消息了马上就推送到前端,这是一种对实时性要求极高的模式,kafka可以做到,当然针对站短这样的功能还有更好的处理方式,我主要将kafka线上消费功能用在了实时统计上,处理一些如实时流量汇总、各系统实时吞吐量汇总等。
这种应用,一般采用一个consumer中的一个group对应一个业务,配合多个producer提供数据,如下图模
式:
采用这种方式处理很简单,采用官网上给的例子即可解决,只是由于版本的问题,代码稍作更改即可:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
package com.a2.kafka.consumer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
public class CommonConsumer {
public static void main(String[] args) {
// specify some consumer properties
Properties props = new Properties();
props.put( "zk.connect" , "192.168.181.128:2181" );
props.put( "zk.connectiontimeout.ms" , "1000000" );
props.put( "groupid" , "test_group" );
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> map= new HashMap<String,Integer>();
map.put( "test" , 2 );
// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
Map<String, List<KafkaStream<Message>>> topicMessageStreams =
consumerConnector.createMessageStreams(map);
List<KafkaStream<Message>> streams = topicMessageStreams.get( "test" );
// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool( 4 );
// consume the messages in the threads
for ( final KafkaStream<Message> stream: streams) {
executor.submit( new Runnable() {
public void run() {
for (MessageAndMetadata<Message> msgAndMetadata: stream) {
// process message (msgAndMetadata.message())
System.out.println(msgAndMetadata.message());
}
}
});
}
}
} |
这是一个user level的API,还有一个low level的API可以从官网找到,这里就不贴出来了。这个consumer是底层采用的是一个阻塞队列,只要一有producer生产数据,那consumer就会将数据打印出来,这是不是十分符合实时性的要求。
当然这里会产生一个很严重的问题,如果你重启一下上面这个程序,那你连一条数据都抓不到,但是你去log文件中明明可以看到所有数据都好好的存在。换句话说,一旦你消费过这些数据,那你就无法再次用同一个groupid消费同一组数据了。我已经把结论说出来了,要消费同一组数据,你可以采用不同的group。
简单说下产生这个问题的原因,这个问题类似于transaction commit,在消息系统中都会有这样一个问题存在,数据消费状态这个信息到底存哪里。是存在consumer端,还是存在broker端。对于这样的争论,一般会出现三种情况:
- At most once—this handles the first case described. Messages are immediately marked as consumed, so they can't be given out twice, but many failure scenarios may lead to losing messages.
- At least once—this is the second case where we guarantee each message will be delivered at least once, but in failure cases may be delivered twice.
- Exactly once—this is what people actually want, each message is delivered once and only once.
第一种情况是将消费的状态存储在了broker端,一旦消费了就改变状态,但会因为网络原因少消费信息,第二种是存在两端,并且先在broker端将状态记为send,等consumer处理完之后将状态标记为consumed,但也有可能因为在处理消息时产生异常,导致状态标记错误等,并且会产生性能的问题。第三种当然是最好的结果。
Kafka解决这个问题采用high water mark这样的标记,也就是设置offset:
1
|
Kafka does two unusual things with respect to metadata. First the stream is partitioned on the brokers into a set of distinct partitions. The semantic meaning of these partitions is left up to the producer and the producer specifies which partition a message belongs to. Within a partition messages are stored in the order in which they arrive at the broker, and will be given out to consumers in that same order. This means that rather than store metadata for each message (marking it as consumed, say), we just need to store the "high water mark" for each combination of consumer, topic, and partition. Hence the total metadata required to summarize the state of the consumer is actually quite small. In Kafka we refer to this high-water mark as "the offset" for reasons that will become clear in the implementation section.
|
所以在每次消费信息时,log4j中都会输出不同的offset:
1
2
3
|
[FetchRunnable- 0 ] INFO : kafka.consumer.FetcherRunnable#info : FetchRunnable- 0 start fetching topic: test part: 0 offset: 0 from 192.168 . 181.128 : 9092
[FetchRunnable- 0 ] INFO : kafka.consumer.FetcherRunnable#info : FetchRunnable- 0 start fetching topic: test part: 0 offset: 15 from 192.168 . 181.128 : 9092
|
除了采用不同的groupid去抓取已经消费过的数据,kafka还提供了另一种思路,这种方式更适合线下的操作,镜像。
通过一些配置,就可以将线上产生的数据同步到镜像中去,然后再由特定的集群区处理大批量的数据,这种方式可以采用low level的API按照不同的partition和offset来抓取数据,以获得更好的并行处理效果。
http://my.oschina.net/ielts0909/blog/110280
相关推荐
【自然语言处理(NLP)】机器翻译之数据处理(数据收集、数据清洗、数据分词、数据标注、数据划分)
1、文件内容:fence-agents-rhevm-4.2.1-41.el7_9.6.rpm以及相关依赖 2、文件形式:tar.gz压缩包 3、安装指令: #Step1、解压 tar -zxvf /mnt/data/output/fence-agents-rhevm-4.2.1-41.el7_9.6.tar.gz #Step2、进入解压后的目录,执行安装 sudo rpm -ivh *.rpm 4、安装指导:私信博主,全程指导安装
1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。
适用于Matlab 2019a和b版本的基于MRAS无位置传感器控制系统设计:速度环模块采用PI与MTPA控制策略,适用于Matlab2019a和b版本 速度环模块儿分别用PI和MTPA控制策略 基于MRAS(模型参考自适应法)的无位置传感器控制系统设计。 ,关键词:Matlab 2019a/b; 速度环模块; PI控制策略; MTPA控制策略; MRAS; 无位置传感器控制系统设计。,Matlab 2019a/b版本:基于PI和MTPA控制策略的MRAS无位置传感器控制系统设计。
1、文件内容:exiv2-devel-0.27.0-4.el7_8.rpm以及相关依赖 2、文件形式:tar.gz压缩包 3、安装指令: #Step1、解压 tar -zxvf /mnt/data/output/exiv2-devel-0.27.0-4.el7_8.tar.gz #Step2、进入解压后的目录,执行安装 sudo rpm -ivh *.rpm 4、安装指导:私信博主,全程指导安装
1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。
1.Matlab实现CNN-BiLSTM卷积双向长短期记忆神经网络时间序列预测(Matlab完整源码和数据)。 2.输出MAE 、 MAPE、MSE、RMSE、R2多指标评价,运行环境Matlab2023及以上。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。 5.作者介绍:机器学习之心,博客专家认证,机器学习领域创作者,2023博客之星TOP50,主做机器学习和深度学习时序、回归、分类、聚类和降维等程序设计和案例分析,文章底部有博主联系方式。从事Matlab、Python算法仿真工作8年,更多仿真源码、数据集定制私信.
1、文件内容:dotconf-1.3-8.el7.rpm以及相关依赖 2、文件形式:tar.gz压缩包 3、安装指令: #Step1、解压 tar -zxvf /mnt/data/output/dotconf-1.3-8.el7.tar.gz #Step2、进入解压后的目录,执行安装 sudo rpm -ivh *.rpm 4、安装指导:私信博主,全程指导安装
项目已获导师指导并通过的高分毕业设计项目,可作为课程设计和期末大作业,下载即用无需修改,项目完整确保可以运行。 包含:项目源码、数据库脚本、软件工具等,该项目可以作为毕设、课程设计使用,前后端代码都在里面。 该系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值。 项目都经过严格调试,确保可以运行!可以放心下载 技术组成 语言:java 开发环境:idea 数据库:MySql8.0 部署环境:Tomcat(建议用 7.x 或者 8.x 版本),maven 数据库工具:navicat
1.Matlab实现CNN-LSTM卷积长短期记忆神经网络时间序列预测(Matlab完整源码和数据)。 2.输出MAE 、 MAPE、MSE、RMSE、R2多指标评价,运行环境Matlab2023及以上。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。 5.作者介绍:机器学习之心,博客专家认证,机器学习领域创作者,2023博客之星TOP50,主做机器学习和深度学习时序、回归、分类、聚类和降维等程序设计和案例分析,文章底部有博主联系方式。从事Matlab、Python算法仿真工作8年,更多仿真源码、数据集定制私信.
2025医学三基考试题库及答案(通用版).docx
1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。
数字简谱播放程序代码
2025医疗三基三严知识试题题库(附答案).docx
基于牛顿拉夫逊潮流计算结果的故障支路功率灵敏度分析与潮流修正量计算,开断潮流,基于牛顿拉夫逊潮流计算结果,引入灵敏度矩阵和雅可比矩阵计算支路功率对故障点注入功率的灵敏度,进而计算故障后所有支路潮流的修正量 ,核心关键词:开断潮流;牛顿拉夫逊潮流计算;灵敏度矩阵;雅可比矩阵;支路功率;故障点注入功率;故障后支路潮流修正量;计算。,基于潮流计算结果的故障后支路功率灵敏度分析及修正量计算
1、文件内容:festvox-bdl-arctic-hts-0.20061229-28.el7.rpm以及相关依赖 2、文件形式:tar.gz压缩包 3、安装指令: #Step1、解压 tar -zxvf /mnt/data/output/festvox-bdl-arctic-hts-0.20061229-28.el7.tar.gz #Step2、进入解压后的目录,执行安装 sudo rpm -ivh *.rpm 4、安装指导:私信博主,全程指导安装
1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。
OPENCV4.8.0+MINGW编译
1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。
2025最新医院收费员考试题库及答案.docx