`

kafka小试

 
阅读更多

我使用的kafka版本 kafka_2.8.0-0.8.1.1.tgz

参考了官网手册http://kafka.apache.org/documentation.html#quickstart

和http://blog.csdn.net/hxpjava1/article/details/19160665  版本低一下,里面有些代码不兼容

  1. 下载kafka 地址http://mirrors.hust.edu.cn/apache/kafka/0.8.1.1/kafka_2.8.0-0.8.1.1.tgz
tar -xzf kafka_2.9.2-0.8.1.1.tgz
cd kafka_2.9.2-0.8.1.1

    2.启动服务

       首先要启动zookeeper      

bin/zookeeper-server-start.sh config/zookeeper.properties &

      启动kafaka

bin/kafka-server-start.sh config/server.properties &

   3.创建topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

   查看是否创建成功

bin/kafka-topics.sh --list --zookeeper localhost:2181

  4.发送消息

import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "test.kafka.com:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", "key", "测试");
        producer.send(data);
        producer.close();
        System.out.println("结束");
    }
}

 5.接收消息

 

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerSample {
	
	public static void main(String[] args) {  
        // specify some consumer properties  
		Properties props = new Properties();  
       props.put("group.id", "test-consumer-group");
	   props.put("zookeeper.connect", "test.kafka.com:2181");
       props.put("zookeeper.session.timeout.ms", "400");
       props.put("zookeeper.sync.time.ms", "200");
       props.put("auto.commit.interval.ms", "1000");
		
		// Create the connection to the cluster  
		ConsumerConfig consumerConfig = new ConsumerConfig(props);  
		ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);  
		
		        // create 4 partitions of the stream for topic “test-topic”, to allow 4 threads to consume  
		HashMap<String, Integer> map = new HashMap<String, Integer>();  
		map.put("test", 4); 
		 Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =  
		        consumerConnector.createMessageStreams(map);  
		List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("test");  
		
		        // create list of 4 threads to consume from each of the partitions   
		ExecutorService executor = Executors.newFixedThreadPool(4);  
		
		        // consume the messages in the threads  
		for (final KafkaStream<byte[], byte[]> stream : streams) {  
		    executor.submit(new Runnable() {  
		        public void run() {  
		            for (MessageAndMetadata<byte[], byte[]> msgAndMetadata : stream) { 
		            	System.out.println("topic:"+msgAndMetadata.topic());
		                String tmp = new String(msgAndMetadata.message());
		                System.out.println("message key: " + new String(msgAndMetadata.key())); 
		                System.out.println("message content: " + tmp);  
		            }  
		        }  
		    });  
		}  

	}
}

 

  6.注意的地方

     test.kafka.com  为域名映射,可以自己映射到自己的kafka的ip地址

     如果发送消息失败 看下防火墙是否关闭

    对于group.id可以查看config/consumer.properties的配置

  7.如果出现FailedToSendMessageException: Failed to send messages after 3 tries错误

     修改config/server.properties  链接zookeeper为

    zookeeper.connect=127.0.0.1:2181

    配置的时候最好通过域名映射添加topic

  8.maven配置文件

<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.0.0.Final</version>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.14</version>
		</dependency>
		<dependency>
		  <groupId>org.apache.kafka</groupId>
		  <artifactId>kafka_2.9.2</artifactId>
		  <version>0.8.1.1</version>
		</dependency> 
		<dependency>
		  <groupId>org.scala-lang</groupId>
		  <artifactId>scala-library</artifactId>
		  <version>2.9.3</version>
		</dependency>
		<dependency>
			<groupId>com.yammer.metrics</groupId>
			<artifactId>metrics-core</artifactId>
			<version>2.2.0</version>
		</dependency>
		<dependency>
			<groupId>com.101tec</groupId>
			<artifactId>zkclient</artifactId>
			<version>0.3</version>
		</dependency>
	</dependencies>

 

分享到:
评论

相关推荐

    kafkatool 连接kafka工具

    **Kafka Tool 连接 Kafka 工具详解** 在大数据处理和实时流处理领域,Apache Kafka 是一个不可或缺的组件,它作为一个分布式的消息中间件,提供高效、可扩展且可靠的发布订阅服务。为了方便管理和操作 Kafka 集群,...

    Kafka管理工具Kafka Tool

    **Kafka Tool:高效管理Apache Kafka集群的利器** Apache Kafka是一个分布式的流处理平台,广泛应用于大数据实时处理、日志聚合、消息系统等多个领域。在Kafka的实际操作中,管理和监控集群是至关重要的任务,而...

    kafka可视化工具--kafkatool

    **Kafka工具详解——Kafkatool** Kafka作为一个分布式流处理平台,广泛应用于大数据实时处理和消息传递。然而,管理Kafka集群和操作其组件(如topics、partitions、offsets等)可能会变得复杂,这时就需要一些可视...

    springboot 基于spring-kafka动态创建kafka消费者

    在Spring Boot应用中,我们可以利用Spring Kafka框架来与Apache Kafka进行集成,实现高效的消息传递。本文将详细探讨如何在Spring Boot项目中基于Spring Kafka动态创建Kafka消费者。 首先,了解Kafka基本概念:...

    kafka2种工具 kafkatool-64bit.exe kafka-eagle-bin-1.4.6.tar.gz

    在IT行业中,Kafka是一种广泛使用的分布式流处理平台,它由Apache软件基金会开发,主要用于构建实时数据管道和流应用。本文将围绕标题和描述中提到的两种Kafka工具——kafkatool-64bit.exe和kafka-eagle-bin-1.4.6....

    StormStorm集成Kafka 从Kafka中读取数据

    本文将深入探讨如何实现Storm与Kafka的集成,重点在于如何从Kafka中读取数据。 **一、整合说明** Apache Storm是一个开源的分布式实时计算系统,它能够持续处理无限的数据流,确保每个事件都得到精确一次(Exactly...

    5、kafka监控工具Kafka-Eagle介绍及使用

    Apache Kafka 是一个分布式流处理平台,常用于构建实时的数据管道和应用。Kafka 提供了高吞吐量、低延迟的消息传递能力,是大数据领域中重要的消息队列(MQ)解决方案。Kafka-Eagle 是针对 Kafka 集群设计的一款高效...

    kafka-java-demo 基于java的kafka生产消费者示例

    【Kafka基础知识】 Kafka是由Apache开发的分布式流处理平台,它主要被设计用来处理实时数据流。在大数据处理领域,Kafka常被用于构建实时数据管道和流应用,能够高效地处理大量的实时数据。 【Java与Kafka的结合】...

    Kafka Tool linux版本,适用于kafka0.11及以上

    **Kafka Tool for Linux: 管理与使用Apache Kafka集群的高效工具** Apache Kafka是一款分布式流处理平台,常用于构建实时数据管道和流应用。Kafka Tool是针对Kafka集群进行管理和操作的一款图形用户界面(GUI)工具...

    Kafka详细课程讲义

    **Kafka详细课程讲义** 本课程主要涵盖了Apache Kafka的核心概念、安装配置、架构解析、API使用以及监控与面试知识点,旨在帮助学习者全面理解并掌握这一强大的分布式流处理平台。 **第 1 章 Kafka 概述** Apache...

    Kafka技术内幕:图文详解Kafka源码设计与实现+书签.pdf+源码

    《Kafka技术内幕:图文详解Kafka源码设计与实现》是一本深入解析Apache Kafka的专著,旨在帮助读者理解Kafka的核心设计理念、内部机制以及源码实现。这本书结合图文并茂的方式,使得复杂的概念变得更为易懂。同时,...

    Kafka尚硅谷.rar

    **Kafka概述** Kafka是由LinkedIn开发并贡献给Apache软件基金会的一个开源消息系统,它是一个高性能、可扩展的分布式消息中间件。Kafka最初设计的目标是处理网站活动流数据,但随着时间的发展,它已被广泛应用于...

    Kafka技术内幕-图文详解Kafka源码设计与实现

    Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...

    kafka安装包-2.13-3.6.2

    **Kafka介绍** Apache Kafka是一款高性能、分布式的消息中间件,由LinkedIn开发并捐献给Apache软件基金会。它最初设计的目标是构建一个实时的数据管道,能够高效地处理大量的数据流,同时支持发布订阅和队列模型,...

    最新版kafka kafka_2.12-2.5.1.tgz

    **Kafka 2.5.1 知识点详解** Kafka 是一个分布式流处理平台,由 Apache 软件基金会开发,广泛应用于大数据实时处理、日志收集、消息系统等多个领域。`kafka_2.12-2.5.1` 是 Kafka 的一个特定版本,针对 Scala 2.12 ...

    使用netty实现TCP长链接消息写入kafka以及kafka批量消费数据

    在IT行业中,网络通信和大数据处理是两个至关重要的领域,Netty和Kafka分别是这两个领域的佼佼者。Netty是一个高性能、异步事件驱动的网络应用程序框架,常用于开发高并发、低延迟的网络应用,如TCP服务器。而Kafka...

    Kafka the Definitive Guide 2nd Edition

    Kafka the Definitive Guide Kafka 是一个分布式流媒体平台,用于构建实时数据处理和流媒体处理系统。下面是 Kafka 的一些重要知识点: 1. Kafka 概述 Kafka 是一个基于发布/订阅模式的消息队列系统,由 LinkedIn...

    kafka资源下载kafka_2.11-2.0.0.tgz

    ### 关于Kafka资源下载kafka_2.11-2.0.0.tgz的知识点 #### Kafka简介 Apache Kafka是一种开源的消息队列服务,它最初由LinkedIn开发,并于2011年成为Apache软件基金会的一个顶级项目。Kafka因其高性能、可扩展性和...

    Kafka Tool 2.0.4.zip

    Kafka Tool 2.0.4是一款专为Kafka设计的强大的客户端工具,尤其适用于Mac操作系统。它提供了一种直观且可视化的界面,让用户能够轻松地连接到Kafka服务并进行各种操作,包括但不限于管理Topic、监控集群状态以及进行...

    Kafka Tool Mac版本,适用于kafka0.11及以上

    **Kafka Tool for Mac: 管理与使用Apache Kafka集群的理想选择** Kafka Tool是一款专为Apache Kafka设计的强大管理工具,尤其适用于Mac用户。它提供了直观的图形用户界面(GUI),使得对Kafka集群的操作变得简单易...

Global site tag (gtag.js) - Google Analytics