本文链接: http://www.54chen.com/java-ee/linkedin-kafka-usage.html
1、 概述
Kafka是Linkedin于2010年12月份开源的消息系统,它主要用于处理活跃的流式数据。活跃的流式数据在web网站应用中非常常见,这些数据包括网站的pv、用户访问了什么内容,搜索了什么内容等。 这些数据通常以日志的形式记录下来,然后每隔一段时间进行一次统计处理。
传统的日志分析系统提供了一种离线处理日志信息的可扩展方案,但若要进行实时处理,通常会有较大延迟。而现有的消(队列)系统能够很好的处理实时或者近似实时的应用,但未处理的数据通常不会写到磁盘上,这对于Hadoop之类(一小时或者一天只处理一部分数据)的离线应用而言,可能存在问题。Kafka正是为了解决以上问题而设计的,它能够很好地离线和在线应用。
2、 设计目标
(1)数据在磁盘上存取代价为O(1)。一般数据在磁盘上是使用BTree存储的,存取代价为O(lgn)。
(2)高吞吐率。即使在普通的节点上每秒钟也能处理成百上千的message。
(3)显式分布式,即所有的producer、broker和consumer都会有多个,均为分布式的。
(4)支持数据并行加载到Hadoop中。
3、 KafKa部署结构
kafka是显式分布式架构,producer、broker(Kafka)和consumer都可以有多个。Kafka的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。几个基本概念:
(1)message(消息)是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。如果consumer订阅了这个主题,那么新发布的消息就会广播给这些consumer。
(2)Kafka是显式分布式的,多个producer、consumer和broker可以运行在一个大的集群上,作为一个逻辑整体对外提供服务。对于consumer,多个consumer可以组成一个group,这个message只能传输给某个group中的某一个consumer.
4、 KafKa关键技术点
(1) zero-copy
在Kafka上,有两个原因可能导致低效:1)太多的网络请求 2)过多的字节拷贝。为了提高效率,Kafka把message分成一组一组的,每次请求会把一组message发给相应的consumer。 此外, 为了减少字节拷贝,采用了sendfile系统调用。为了理解sendfile原理,先说一下传统的利用socket发送文件要进行拷贝:
Sendfile系统调用:
(2) Exactly once message transfer
怎样记录每个consumer处理的信息的状态?在Kafka中仅保存了每个consumer已经处理数据的offset。这样有两个好处:1)保存的数据量少 2)当consumer出错时,重新启动consumer处理数据时,只需从最近的offset开始处理数据即可。
(3)Push/pull
Producer 向Kafka(push)推数据,consumer 从kafka 拉(pull)数据。
(4)负载均衡和容错
Producer和broker之间没有负载均衡机制。
broker和consumer之间利用zookeeper进行负载均衡。所有broker和consumer都会在zookeeper中进行注册,且zookeeper会保存他们的一些元数据信息。如果某个broker和consumer发生了变化,所有其他的broker和consumer都会得到通知。
【参考资料】
【1】Kafka主页:http://sna-projects.com/kafka/design.php
【2】Zero-copy原理:https://www.ibm.com/developerworks/linux/library/j-zerocopy/
【3】Kafka与Hadoop:http://sna-projects.com/sna/media/kafka_hadoop.pdf
kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性:
通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
支持通过kafka服务器和消费机集群来分区消息。
支持Hadoop并行数据加载。
设计侧重高吞吐量,用于好友动态,相关性统计,排行统计,访问频率控制,批处理等系统。大部分的消息中间件能够处理实时性要求高的消息/数据,但是对于队列中大量未处理的消息/数据在持久性方面比较弱。
kakfa的consumer使用拉的方式工作。
安装kafka
下载:http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz
> tar xzf kafka-.tgz
> cd kafka-
> ./sbt update
> ./sbt package
启动zkserver:
bin/zookeeper-server-start.sh config/zookeeper.properties
启动server:
bin/kafka-server-start.sh config/server.properties
就是这么简单。
使用kafka
- import java.util.Arrays;
- import java.util.List;
- import java.util.Properties;
- import kafka.javaapi.producer.SyncProducer;
- import kafka.javaapi.message.ByteBufferMessageSet;
- import kafka.message.Message;
- import kafka.producer.SyncProducerConfig;
- ...
- Properties props = new Properties();
- props.put(“zk.connect”, “127.0.0.1:2181”);
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- ProducerConfig config = new ProducerConfig(props);
- Producer<String, String> producer = new Producer<String, String>(config);
- Send a single message
- // The message is sent to a randomly selected partition registered in ZK
- ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message");
- producer.send(data);
- producer.close();
这样就是一个标准的producer。
consumer的代码
- // specify some consumer properties
- Properties props = new Properties();
- props.put("zk.connect", "localhost:2181");
- props.put("zk.connectiontimeout.ms", "1000000");
- props.put("groupid", "test_group");
- // Create the connection to the cluster
- ConsumerConfig consumerConfig = new ConsumerConfig(props);
- ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
- // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
- Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
- consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));
- List<KafkaMessageStream<Message>> streams = topicMessageStreams.get("test");
- // create list of 4 threads to consume from each of the partitions
- ExecutorService executor = Executors.newFixedThreadPool(4);
- // consume the messages in the threads
- for(final KafkaMessageStream<Message> stream: streams) {
- executor.submit(new Runnable() {
- public void run() {
- for(Message message: stream) {
- // process message
- }
- }
- });
- }
相关推荐
本文主要介绍了分布式消息系统Kafka的概述、架构、应用场景、工作原理等方面的知识点。 1. Kafka 概述 Kafka 是一个快速、可扩展的、高吞吐的、可容错的分布式“发布-订阅”消息系统。使用 Scala 与 Java 语言编写...
**基于分布式的发布订阅消息系统Kafka** Kafka是一种高性能、可扩展的分布式消息系统,由LinkedIn开发并贡献给了Apache软件基金会。它被设计为一个实时处理大量数据的平台,适用于大数据流处理、日志聚合、网站活性...
消息队列Kafka是一种分布式、高吞吐量、高可扩展性的消息服务系统。它最初是由LinkedIn公司开发,并于2011年捐赠给了Apache软件基金会,成为了开源项目之一。Kafka被设计为一个能够处理大规模数据流的平台,其高性能...
【Kafka介绍】 Apache Kafka是由LinkedIn开发并随后贡献给Apache软件基金会的一个开源流处理平台。Kafka最初设计的目的是为了处理大规模的实时数据流,它能够处理来自各种数据源的活跃流式数据,如页面访问统计、...
Kafka 提供了高吞吐量、低延迟的消息传递能力,是大数据领域中重要的消息队列(MQ)解决方案。Kafka-Eagle 是针对 Kafka 集群设计的一款高效、易用的监控工具,旨在提供对 Kafka 的深度监控和管理。 Kafka-Eagle 的...
Kafka是由LinkedIn于2010年12月开源的一款强大的消息系统,它主要用于处理活跃的流式数据,如网站的PV(页面浏览量)、用户行为数据等。传统日志分析系统虽然能够提供一种可扩展的离线处理日志信息的方案,但在实时...
Kafka 是一款开源的分布式消息系统,以其高吞吐量、低延迟的特点,在大数据处理领域有着广泛的应用。Kafka 由 LinkedIn 开发并贡献给 Apache 软件基金会,最终成为其顶级项目之一。Kafka 的核心设计理念是为实时数据...
它的分区(partitioning)和复制(replication)特性,进一步增强了消息系统的可用性和容错性。Kafka不仅能够处理海量数据的实时流式处理,还能够作为数据仓库与外部系统进行数据交换,支持各种实时计算任务和离线...
作者提供的其他相关文章,如 Java API 的使用、Kafka 的关键概念详解、分区和副本介绍,以及 Kafka 监控工具 Kafka-Eagle 的使用,将更深入地探讨 Kafka 的功能和操作。 总之,Apache Kafka 是一个强大的工具,适用...
### Kafka介绍 #### Kafka概述 Kafka是一种分布式发布-订阅消息系统,最初由LinkedIn公司开发,后成为Apache软件基金会的顶级项目。Kafka主要使用Scala语言编写,具有高吞吐量、可持久化、分布式扩展性强等特点。它...
Kafka的核心功能包括发布订阅消息系统、高吞吐量的数据处理以及持久化数据存储。然而,对于开发者和运维人员来说,测试和监控Kafka集群的性能和功能往往需要借助一些工具。本文将详细介绍一款可视化Kafka测试工具,...
**Kafka介绍** Apache Kafka是一款高性能、分布式的消息中间件,由LinkedIn开发并捐献给Apache软件基金会。它最初设计的目标是构建一个实时的数据管道,能够高效地处理大量的数据流,同时支持发布订阅和队列模型,...
本主题将详细介绍如何利用Logback和SLF4J来将日志记录到Kafka队列中,以及支持日志解析和过滤等扩展功能。 首先,我们需要理解SLF4J的工作原理。SLF4J提供了一组API,允许我们在应用程序中插入日志语句,而具体的...
Kafka是一个高吞吐量、低延迟的消息发布订阅系统,常用于构建实时数据管道和流应用程序。以下是根据书中的内容提取的关键知识点: 1. **Kafka基础**:Kafka是一个分布式流处理平台,它允许发布和订阅持久化的消息流...
消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 顺序保证 在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证...
总之,Apache Kafka 是一个强大而灵活的分布式消息系统,适用于构建实时数据管道和流应用。通过了解它的核心概念、应用场景以及 Java API 的使用,开发者可以充分利用 Kafka 的优势,构建高性能的大数据处理系统。
而Kafka是一个分布式消息系统,广泛用于构建实时数据管道和流应用。为了在Flink中安全地连接到Kafka,我们需要使用Kerberos协议,这是一个广泛采用的网络身份验证协议,可以提供互操作性和可扩展性。 1. **Kerberos...
1.1. 为何使用消息系统 1.1.1. 解耦 在项目启动之初来预测将来项目会碰到什么需求, 是极其困难的。 消息系统在处理过程中间插入了一个隐含的、 基于数据的接口层, 两边的处理过程都要实现这一接口。 这允许你独立...
内容概要:文章详细介绍了如何使用 Java、Kafka 和 ZooKeeper 搭建一个高吞吐量的消息系统,涵盖了从环境准备、组件简介到实际编码的全过程。具体包括 Kafka 和 ZooKeeper 的基本概念、安装配置、生产者和消费者的 ...
这部分内容着重描述了Kafka作为高性能、可伸缩消息系统的内部工作机制。 操作章节介绍了如何进行基本的Kafka操作,如添加和移除topics、更改topics、优雅地关闭Kafka、检查consumer的位置、集群间做数据镜像、扩展...