`
longgangbai
  • 浏览: 7280851 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论
阅读更多

本文链接: http://www.54chen.com/java-ee/linkedin-kafka-usage.html

1、  概述

Kafka是Linkedin于2010年12月份开源的消息系统,它主要用于处理活跃的流式数据。活跃的流式数据在web网站应用中非常常见,这些数据包括网站的pv、用户访问了什么内容,搜索了什么内容等。 这些数据通常以日志的形式记录下来,然后每隔一段时间进行一次统计处理。

传统的日志分析系统提供了一种离线处理日志信息的可扩展方案,但若要进行实时处理,通常会有较大延迟。而现有的消(队列)系统能够很好的处理实时或者近似实时的应用,但未处理的数据通常不会写到磁盘上,这对于Hadoop之类(一小时或者一天只处理一部分数据)的离线应用而言,可能存在问题。Kafka正是为了解决以上问题而设计的,它能够很好地离线和在线应用。

2、  设计目标

(1)数据在磁盘上存取代价为O(1)。一般数据在磁盘上是使用BTree存储的,存取代价为O(lgn)。

(2)高吞吐率。即使在普通的节点上每秒钟也能处理成百上千的message。

(3)显式分布式,即所有的producer、broker和consumer都会有多个,均为分布式的。

(4)支持数据并行加载到Hadoop中。

3、  KafKa部署结构


kafka是显式分布式架构,producer、broker(Kafka)和consumer都可以有多个。Kafka的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。几个基本概念:

(1)message(消息)是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。如果consumer订阅了这个主题,那么新发布的消息就会广播给这些consumer。

(2)Kafka是显式分布式的,多个producer、consumer和broker可以运行在一个大的集群上,作为一个逻辑整体对外提供服务。对于consumer,多个consumer可以组成一个group,这个message只能传输给某个group中的某一个consumer.

4、  KafKa关键技术点

(1)  zero-copy

在Kafka上,有两个原因可能导致低效:1)太多的网络请求 2)过多的字节拷贝。为了提高效率,Kafka把message分成一组一组的,每次请求会把一组message发给相应的consumer。 此外, 为了减少字节拷贝,采用了sendfile系统调用。为了理解sendfile原理,先说一下传统的利用socket发送文件要进行拷贝:

Sendfile系统调用:

(2) Exactly once message transfer

怎样记录每个consumer处理的信息的状态?在Kafka中仅保存了每个consumer已经处理数据的offset。这样有两个好处:1)保存的数据量少 2)当consumer出错时,重新启动consumer处理数据时,只需从最近的offset开始处理数据即可。

(3)Push/pull

Producer 向Kafka(push)推数据,consumer 从kafka 拉(pull)数据。

(4)负载均衡和容错

Producer和broker之间没有负载均衡机制。
broker和consumer之间利用zookeeper进行负载均衡。所有broker和consumer都会在zookeeper中进行注册,且zookeeper会保存他们的一些元数据信息。如果某个broker和consumer发生了变化,所有其他的broker和consumer都会得到通知。

【参考资料】

【1】Kafka主页:http://sna-projects.com/kafka/design.php

【2】Zero-copy原理:https://www.ibm.com/developerworks/linux/library/j-zerocopy/

【3】Kafka与Hadoop:http://sna-projects.com/sna/media/kafka_hadoop.pdf

kafka 介绍

 

kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性:

通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
支持通过kafka服务器和消费机集群来分区消息。
支持Hadoop并行数据加载。

 

设计侧重高吞吐量,用于好友动态,相关性统计,排行统计,访问频率控制,批处理等系统。大部分的消息中间件能够处理实时性要求高的消息/数据,但是对于队列中大量未处理的消息/数据在持久性方面比较弱。

kakfa的consumer使用拉的方式工作。

安装kafka
下载:http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz

> tar xzf kafka-.tgz
> cd kafka-
> ./sbt update
> ./sbt package
启动zkserver:
bin/zookeeper-server-start.sh config/zookeeper.properties
启动server:
bin/kafka-server-start.sh config/server.properties
就是这么简单。

使用kafka

  1. import java.util.Arrays;  
  2. import java.util.List;  
  3. import java.util.Properties;  
  4. import kafka.javaapi.producer.SyncProducer;  
  5. import kafka.javaapi.message.ByteBufferMessageSet;  
  6. import kafka.message.Message;  
  7. import kafka.producer.SyncProducerConfig;  
  8.   
  9. ...  
  10.   
  11. Properties props = new Properties();  
  12. props.put(“zk.connect”, “127.0.0.1:2181”);  
  13. props.put("serializer.class""kafka.serializer.StringEncoder");  
  14. ProducerConfig config = new ProducerConfig(props);  
  15. Producer<String, String> producer = new Producer<String, String>(config);  
  16.   
  17. Send a single message  
  18.   
  19. // The message is sent to a randomly selected partition registered in ZK  
  20. ProducerData<String, String> data = new ProducerData<String, String>("test-topic""test-message");  
  21. producer.send(data);      
  22.   
  23. producer.close();  

这样就是一个标准的producer。

consumer的代码

  1. // specify some consumer properties  
  2. Properties props = new Properties();  
  3. props.put("zk.connect""localhost:2181");  
  4. props.put("zk.connectiontimeout.ms""1000000");  
  5. props.put("groupid""test_group");  
  6.   
  7. // Create the connection to the cluster  
  8. ConsumerConfig consumerConfig = new ConsumerConfig(props);  
  9. ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);  
  10.   
  11. // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume  
  12. Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =   
  13.     consumerConnector.createMessageStreams(ImmutableMap.of("test"4));  
  14. List<KafkaMessageStream<Message>> streams = topicMessageStreams.get("test");  
  15.   
  16. // create list of 4 threads to consume from each of the partitions   
  17. ExecutorService executor = Executors.newFixedThreadPool(4);  
  18.   
  19. // consume the messages in the threads  
  20. for(final KafkaMessageStream<Message> stream: streams) {  
  21.   executor.submit(new Runnable() {  
  22.     public void run() {  
  23.       for(Message message: stream) {  
  24.         // process message  
  25.       }   
  26.     }  
  27.   });  
  28. }  

 

分享到:
评论

相关推荐

    分布式消息系统Kafka.pdf

    本文主要介绍了分布式消息系统Kafka的概述、架构、应用场景、工作原理等方面的知识点。 1. Kafka 概述 Kafka 是一个快速、可扩展的、高吞吐的、可容错的分布式“发布-订阅”消息系统。使用 Scala 与 Java 语言编写...

    基于分布式的发布订阅消息系统Kafka

    **基于分布式的发布订阅消息系统Kafka** Kafka是一种高性能、可扩展的分布式消息系统,由LinkedIn开发并贡献给了Apache软件基金会。它被设计为一个实时处理大量数据的平台,适用于大数据流处理、日志聚合、网站活性...

    浅谈分布式消息技术:Kafka.docx

    Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献...

    kafka介绍及部署

    Kafka是Linkedin于2010年12月份开源的消息系统,它主要用于处理活跃的流式数据和运营数据,同时也是支持通用的消息语义(messaging semantics)。其中活跃的流式数据包括页面访问量(page view)、被查看内容方面的...

    kafka原理介绍及参数.pptx

    消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 顺序保证  在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证...

    Apache Kafka实战.pdf

    Kafka是一个高吞吐量、低延迟的消息发布订阅系统,常用于构建实时数据管道和流应用程序。以下是根据书中的内容提取的关键知识点: 1. **Kafka基础**:Kafka是一个分布式流处理平台,它允许发布和订阅持久化的消息流...

    Apache Kafka入门介绍.zip

    总之,Apache Kafka 是一个强大而灵活的分布式消息系统,适用于构建实时数据管道和流应用。通过了解它的核心概念、应用场景以及 Java API 的使用,开发者可以充分利用 Kafka 的优势,构建高性能的大数据处理系统。

    java实现flink订阅Kerberos认证的Kafka消息示例源码

    而Kafka是一个分布式消息系统,广泛用于构建实时数据管道和流应用。为了在Flink中安全地连接到Kafka,我们需要使用Kerberos协议,这是一个广泛采用的网络身份验证协议,可以提供互操作性和可扩展性。 1. **Kerberos...

    kafka.pdf 介绍 为何使用消息系统

    1.1. 为何使用消息系统 1.1.1. 解耦 在项目启动之初来预测将来项目会碰到什么需求, 是极其困难的。 消息系统在处理过程中间插入了一个隐含的、 基于数据的接口层, 两边的处理过程都要实现这一接口。 这允许你独立...

    Kafka与spring集成

    Kafka分布式消息系统是一种高吞吐量、可扩展、基于发布订阅模式的消息系统,广泛应用于大数据处理、实时数据处理和日志处理等领域。Spring Framework是一种流行的Java应用程序框架,提供了一个广泛的编程模型和配置...

    kafka安装及详细介绍

    Kafka 是一个高吞吐的分布式消息队列系统,具有生产者消费者模式,先进先出(FIFO)保证顺序,不丢失数据,默认每隔 7 天清理数据。事件记录了一个事实,即世界或企业中发生的“某些事情”。事件具有键、值、时间戳...

    深入剖析Kafka设计原理:如何构建高效的消息系统

    本文详细解析了Kafka的设计原理,重点介绍了Kafka作为一种高效的分布式消息系统的核心组件和机制。首先,文档解释了Kafka的总控制器(Controller)的作用,它负责管理集群中的分区和副本状态,并在必要时进行Leader...

    Kafka技术内幕:图文详解Kafka源码设计与实现+书签.pdf+源码

    以下将详细介绍Kafka的一些关键技术点: 1. **发布/订阅模型**:Kafka采用发布/订阅模型,生产者发布消息到主题(Topic),消费者则订阅这些主题以接收消息。每个主题可以被分成多个分区(Partition),保证了消息...

    大数据kafka的api详细介绍和使用,包含kafka的安装部署

    Kafka是一款由LinkedIn开发并贡献给Apache的开源分布式消息系统,主要用于处理实时数据流。其强大的性能、高吞吐量和可扩展性使其成为大数据领域的重要组件。本文将深入探讨Kafka的API,以及如何进行安装、部署和...

    Kafka .Net Framework4.0 版本

    本篇将详细介绍Kafka .Net Framework 4.0版本以及其相关知识点。 首先,Kafka .Net是.NET社区开发的一个C#和F#接口,它为.NET开发者提供了与Kafka集群通信的便利。这个库允许开发者发送和接收消息,管理主题和分区...

    kafka连接工具客户端.rar

    本篇文章将重点介绍在Windows系统下使用的Kafka连接工具客户端,以及如何高效地利用这些工具提升Kafka操作的便捷性和效率。 首先,"kafka连接工具客户端.rar"这个压缩包中包含的是针对Windows平台设计的Kafka可视化...

    kafka-demo

    Kafka,由LinkedIn开源并贡献给Apache基金会,是一款高吞吐量的分布式消息系统,被广泛应用于实时数据流处理和大数据领域。Kafka的核心特性包括高可用性、高可扩展性和消息持久化,使其在大数据实时处理中占据重要...

    Apache Kafka 基本介绍.zip

    Kafka最初设计为高吞吐量、低延迟的消息发布订阅系统,现在已经发展成为了一个全面的数据流平台,用于构建实时数据管道和流应用。 **一、Kafka的核心概念** 1. **主题(Topics)**: 主题是Kafka中的数据流,可以...

    jemter-kafka连接器

    在现代大数据处理系统中,Apache Kafka作为一款分布式消息中间件,广泛应用于实时数据流处理。为了验证和优化Kafka系统的性能,开发者通常需要进行大规模并发数据发送的测试。这时,JMeter,一个开源的性能测试工具...

Global site tag (gtag.js) - Google Analytics