http://dongxicheng.org/search-engine/kafka/
http://shift-alt-ctrl.iteye.com/blog/1930791
kafka作为分布式日志收集或系统监控服务,我们有必要在合适的场合使用它。kafka的部署包括zookeeper环境/kafka环境,同时还需要进行一些配置操作.接下来介绍如何使用kafka.
我们使用3个zookeeper实例构建zk集群,使用2个kafka broker构建kafka集群.
其中kafka为0.8V,zookeeper为3.4.5V
一.Zookeeper集群构建
我们有3个zk实例,分别为zk-0,zk-1,zk-2;如果你仅仅是测试使用,可以使用1个zk实例.
1) zk-0
调整配置文件:
- clientPort=2181
- server.0=127.0.0.1:2888:3888
- server.1=127.0.0.1:2889:3889
- server.2=127.0.0.1:2890:3890
- ##只需要修改上述配置,其他配置保留默认值
启动zookeeper
- ./zkServer.sh start
2) zk-1
调整配置文件(其他配置和zk-0一只):
- clientPort=2182
- ##只需要修改上述配置,其他配置保留默认值
启动zookeeper
- ./zkServer.sh start
3) zk-2
调整配置文件(其他配置和zk-0一只):
- clientPort=2183
- ##只需要修改上述配置,其他配置保留默认值
启动zookeeper
- ./zkServer.sh start
二. Kafka集群构建
因为Broker配置文件涉及到zookeeper的相关约定,因此我们先展示broker配置文件.我们使用2个kafka broker来构建这个集群环境,分别为kafka-0,kafka-1.
1) kafka-0
在config目录下修改配置文件为:
- broker.id=0
- port=9092
- num.network.threads=2
- num.io.threads=2
- socket.send.buffer.bytes=1048576
- socket.receive.buffer.bytes=1048576
- socket.request.max.bytes=104857600
- log.dir=./logs
- num.partitions=2
- log.flush.interval.messages=10000
- log.flush.interval.ms=1000
- log.retention.hours=168
- #log.retention.bytes=1073741824
- log.segment.bytes=536870912
- num.replica.fetchers=2
- log.cleanup.interval.mins=10
- zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
- zookeeper.connection.timeout.ms=1000000
- kafka.metrics.polling.interval.secs=5
- kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
- kafka.csv.metrics.dir=/tmp/kafka_metrics
- kafka.csv.metrics.reporter.enabled=false
因为kafka用scala语言编写,因此运行kafka需要首先准备scala相关环境。
- > cd kafka-0
- > ./sbt update
- > ./sbt package
- > ./sbt assembly-package-dependency
其中最后一条指令执行有可能出现异常,暂且不管。 启动kafka broker:
- > JMS_PORT=9997 bin/kafka-server-start.sh config/server.properties &
因为zookeeper环境已经正常运行了,我们无需通过kafka来挂载启动zookeeper.如果你的一台机器上部署了多个kafka broker,你需要声明JMS_PORT.
2) kafka-1
- broker.id=1
- port=9093
- ##其他配置和kafka-0保持一致
然后和kafka-0一样执行打包命令,然后启动此broker.
- > JMS_PORT=9998 bin/kafka-server-start.sh config/server.properties &
到目前为止环境已经OK了,那我们就开始展示编程实例吧。
三.项目准备
项目基于maven构建,不得不说kafka java客户端实在是太糟糕了;构建环境会遇到很多麻烦。建议参考如下pom.xml;其中各个依赖包必须版本协调一致。如果kafka client的版本和kafka server的版本不一致,将会有很多异常,比如"broker id not exists"等;因为kafka从0.7升级到0.8之后(正名为2.8.0),client与server通讯的protocol已经改变.
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.test</groupId>
- <artifactId>test-kafka</artifactId>
- <packaging>jar</packaging>
- <name>test-kafka</name>
- <url>http://maven.apache.org</url>
- <version>1.0.0</version>
- <dependencies>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.14</version>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.8.0</artifactId>
- <version>0.8.0-beta1</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>2.8.1</version>
- </dependency>
- <dependency>
- <groupId>com.yammer.metrics</groupId>
- <artifactId>metrics-core</artifactId>
- <version>2.2.0</version>
- </dependency>
- <dependency>
- <groupId>com.101tec</groupId>
- <artifactId>zkclient</artifactId>
- <version>0.3</version>
- </dependency>
- </dependencies>
- <build>
- <finalName>test-kafka-1.0</finalName>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- <filtering>true</filtering>
- </resource>
- </resources>
- <plugins>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>2.3.2</version>
- <configuration>
- <source>1.5</source>
- <target>1.5</target>
- <encoding>gb2312</encoding>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-resources-plugin</artifactId>
- <version>2.2</version>
- <configuration>
- <encoding>gbk</encoding>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </project>
四.Producer端代码
1) producer.properties文件:此文件放在/resources目录下
- #partitioner.class=
- metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
- ##,127.0.0.1:9093
- producer.type=sync
- compression.codec=0
- serializer.class=kafka.serializer.StringEncoder
- ##在producer.type=async时有效
- #batch.num.messages=100
2) LogProducer.java代码样例
- package com.test.kafka;
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.List;
- import java.util.Properties;
- import kafka.javaapi.producer.Producer;
- import kafka.producer.KeyedMessage;
- import kafka.producer.ProducerConfig;
- public class LogProducer {
- private Producer<String,String> inner;
- public LogProducer() throws Exception{
- Properties properties = new Properties();
- properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));
- ProducerConfig config = new ProducerConfig(properties);
- inner = new Producer<String, String>(config);
- }
- public void send(String topicName,String message) {
- if(topicName == null || message == null){
- return;
- }
- KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);
- inner.send(km);
- }
- public void send(String topicName,Collection<String> messages) {
- if(topicName == null || messages == null){
- return;
- }
- if(messages.isEmpty()){
- return;
- }
- List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
- for(String entry : messages){
- KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);
- kms.add(km);
- }
- inner.send(kms);
- }
- public void close(){
- inner.close();
- }
- /**
- * @param args
- */
- public static void main(String[] args) {
- LogProducer producer = null;
- try{
- producer = new LogProducer();
- int i=0;
- while(true){
- producer.send("test-topic", "this is a sample" + i);
- i++;
- Thread.sleep(2000);
- }
- }catch(Exception e){
- e.printStackTrace();
- }finally{
- if(producer != null){
- producer.close();
- }
- }
- }
- }
五.Consumer端
1) consumer.properties:文件位于/resources目录下
- zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
- ##,127.0.0.1:2182,127.0.0.1:2183
- # timeout in ms for connecting to zookeeper
- zookeeper.connectiontimeout.ms=1000000
- #consumer group id
- group.id=test-group
- #consumer timeout
- #consumer.timeout.ms=5000
- auto.commit.enable=true
- auto.commit.interval.ms=60000
2) LogConsumer.java代码样例
- package com.test.kafka;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Properties;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import kafka.consumer.Consumer;
- import kafka.consumer.ConsumerConfig;
- import kafka.consumer.ConsumerIterator;
- import kafka.consumer.KafkaStream;
- import kafka.javaapi.consumer.ConsumerConnector;
- import kafka.message.MessageAndMetadata;
- public class LogConsumer {
- private ConsumerConfig config;
- private String topic;
- private int partitionsNum;
- private MessageExecutor executor;
- private ConsumerConnector connector;
- private ExecutorService threadPool;
- public LogConsumer(String topic,int partitionsNum,MessageExecutor executor) throws Exception{
- Properties properties = new Properties();
- properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties"));
- config = new ConsumerConfig(properties);
- this.topic = topic;
- this.partitionsNum = partitionsNum;
- this.executor = executor;
- }
- public void start() throws Exception{
- connector = Consumer.createJavaConsumerConnector(config);
- Map<String,Integer> topics = new HashMap<String,Integer>();
- topics.put(topic, partitionsNum);
- Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
- List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
- threadPool = Executors.newFixedThreadPool(partitionsNum);
- for(KafkaStream<byte[], byte[]> partition : partitions){
- threadPool.execute(new MessageRunner(partition));
- }
- }
- public void close(){
- try{
- threadPool.shutdownNow();
- }catch(Exception e){
- //
- }finally{
- connector.shutdown();
- }
- }
- class MessageRunner implements Runnable{
- private KafkaStream<byte[], byte[]> partition;
- MessageRunner(KafkaStream<byte[], byte[]> partition) {
- this.partition = partition;
- }
- public void run(){
- ConsumerIterator<byte[], byte[]> it = partition.iterator();
- while(it.hasNext()){
- //connector.commitOffsets();手动提交offset,当autocommit.enable=false时使用
- MessageAndMetadata<byte[],byte[]> item = it.next();
- System.out.println("partiton:" + item.partition());
- System.out.println("offset:" + item.offset());
- executor.execute(new String(item.message()));//UTF-8,注意异常
- }
- }
- }
- interface MessageExecutor {
- public void execute(String message);
- }
- /**
- * @param args
- */
- public static void main(String[] args) {
- LogConsumer consumer = null;
- try{
- MessageExecutor executor = new MessageExecutor() {
- public void execute(String message) {
- System.out.println(message);
- }
- };
- consumer = new LogConsumer("test-topic", 2, executor);
- consumer.start();
- }catch(Exception e){
- e.printStackTrace();
- }finally{
- // if(consumer != null){
- // consumer.close();
- // }
- }
- }
- }
需要提醒的是,上述LogConsumer类中,没有太多的关注异常情况,必须在MessageExecutor.execute()方法中抛出异常时的情况.
在测试时,建议优先启动consumer,然后再启动producer,这样可以实时的观测到最新的消息。
相关推荐
本文主要介绍了分布式消息系统Kafka的概述、架构、应用场景、工作原理等方面的知识点。 1. Kafka 概述 Kafka 是一个快速、可扩展的、高吞吐的、可容错的分布式“发布-订阅”消息系统。使用 Scala 与 Java 语言编写...
**基于分布式的发布订阅消息系统Kafka** Kafka是一种高性能、可扩展的分布式消息系统,由LinkedIn开发并贡献给了Apache软件基金会。它被设计为一个实时处理大量数据的平台,适用于大数据流处理、日志聚合、网站活性...
消息队列Kafka是一种分布式、高吞吐量、高可扩展性的消息服务系统。它最初是由LinkedIn公司开发,并于2011年捐赠给了Apache软件基金会,成为了开源项目之一。Kafka被设计为一个能够处理大规模数据流的平台,其高性能...
【Kafka介绍】 Apache Kafka是由LinkedIn开发并随后贡献给Apache软件基金会的一个开源流处理平台。Kafka最初设计的目的是为了处理大规模的实时数据流,它能够处理来自各种数据源的活跃流式数据,如页面访问统计、...
Kafka 提供了高吞吐量、低延迟的消息传递能力,是大数据领域中重要的消息队列(MQ)解决方案。Kafka-Eagle 是针对 Kafka 集群设计的一款高效、易用的监控工具,旨在提供对 Kafka 的深度监控和管理。 Kafka-Eagle 的...
Kafka是由LinkedIn于2010年12月开源的一款强大的消息系统,它主要用于处理活跃的流式数据,如网站的PV(页面浏览量)、用户行为数据等。传统日志分析系统虽然能够提供一种可扩展的离线处理日志信息的方案,但在实时...
Kafka 是一款开源的分布式消息系统,以其高吞吐量、低延迟的特点,在大数据处理领域有着广泛的应用。Kafka 由 LinkedIn 开发并贡献给 Apache 软件基金会,最终成为其顶级项目之一。Kafka 的核心设计理念是为实时数据...
它的分区(partitioning)和复制(replication)特性,进一步增强了消息系统的可用性和容错性。Kafka不仅能够处理海量数据的实时流式处理,还能够作为数据仓库与外部系统进行数据交换,支持各种实时计算任务和离线...
作者提供的其他相关文章,如 Java API 的使用、Kafka 的关键概念详解、分区和副本介绍,以及 Kafka 监控工具 Kafka-Eagle 的使用,将更深入地探讨 Kafka 的功能和操作。 总之,Apache Kafka 是一个强大的工具,适用...
### Kafka介绍 #### Kafka概述 Kafka是一种分布式发布-订阅消息系统,最初由LinkedIn公司开发,后成为Apache软件基金会的顶级项目。Kafka主要使用Scala语言编写,具有高吞吐量、可持久化、分布式扩展性强等特点。它...
Kafka的核心功能包括发布订阅消息系统、高吞吐量的数据处理以及持久化数据存储。然而,对于开发者和运维人员来说,测试和监控Kafka集群的性能和功能往往需要借助一些工具。本文将详细介绍一款可视化Kafka测试工具,...
**Kafka介绍** Apache Kafka是一款高性能、分布式的消息中间件,由LinkedIn开发并捐献给Apache软件基金会。它最初设计的目标是构建一个实时的数据管道,能够高效地处理大量的数据流,同时支持发布订阅和队列模型,...
本主题将详细介绍如何利用Logback和SLF4J来将日志记录到Kafka队列中,以及支持日志解析和过滤等扩展功能。 首先,我们需要理解SLF4J的工作原理。SLF4J提供了一组API,允许我们在应用程序中插入日志语句,而具体的...
消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 顺序保证 在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证...
Kafka是一个高吞吐量、低延迟的消息发布订阅系统,常用于构建实时数据管道和流应用程序。以下是根据书中的内容提取的关键知识点: 1. **Kafka基础**:Kafka是一个分布式流处理平台,它允许发布和订阅持久化的消息流...
总之,Apache Kafka 是一个强大而灵活的分布式消息系统,适用于构建实时数据管道和流应用。通过了解它的核心概念、应用场景以及 Java API 的使用,开发者可以充分利用 Kafka 的优势,构建高性能的大数据处理系统。
而Kafka是一个分布式消息系统,广泛用于构建实时数据管道和流应用。为了在Flink中安全地连接到Kafka,我们需要使用Kerberos协议,这是一个广泛采用的网络身份验证协议,可以提供互操作性和可扩展性。 1. **Kerberos...
1.1. 为何使用消息系统 1.1.1. 解耦 在项目启动之初来预测将来项目会碰到什么需求, 是极其困难的。 消息系统在处理过程中间插入了一个隐含的、 基于数据的接口层, 两边的处理过程都要实现这一接口。 这允许你独立...
内容概要:文章详细介绍了如何使用 Java、Kafka 和 ZooKeeper 搭建一个高吞吐量的消息系统,涵盖了从环境准备、组件简介到实际编码的全过程。具体包括 Kafka 和 ZooKeeper 的基本概念、安装配置、生产者和消费者的 ...
这部分内容着重描述了Kafka作为高性能、可伸缩消息系统的内部工作机制。 操作章节介绍了如何进行基本的Kafka操作,如添加和移除topics、更改topics、优雅地关闭Kafka、检查consumer的位置、集群间做数据镜像、扩展...