`

Apache Kafka 初步

 
阅读更多

1:

 

下载ApacheKafka,

安装:

tar -xvf kafka.tgz

 

2:

修改配置文件:

 

(1):

kafka/conf/zookeeper.properties

dataDir=/disks/sdb3/soft/Kafka/Data/ZooKeeper/2.9.2/dataDir

 

(2):

kafka/conf/server.properties

log.dirs=/disks/sdb3/soft/Kafka/Data/KafkaData/2.9.2/logs
host.name=192.168.56.3

 

此处的host.name为本机IP(重要),如果不改,则客户端会抛出:Producer connection to localhost:9092 unsuccessful 错误!

 

3:

启动:

 

启动zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

 

启动kafka:

bin/kafka-server-start.sh config/server.properties

 

4:

Java客户端:

package com.test.search.test.kafka;

import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.junit.Test;

import com.google.common.collect.Maps;

public class KafkaTest {

	private static final String KAFKA_BROKER_LIST = "192.168.56.3:9092";
	private static final String KAFKA_ZOOKEEPER_LIST = "192.168.56.3:2181";
	
	private static final String KAFKA_TOPIC = "CustomTopic";

	class Producer extends Thread {
		private final kafka.javaapi.producer.Producer<Integer, String> producer;
		private final String topic;
		private final Properties props = new Properties();

		public Producer(String topic) {
			props.put("serializer.class", "kafka.serializer.StringEncoder");
			// props.put("metadata.broker.list", "localhost:9092");
			props.put("metadata.broker.list", KAFKA_BROKER_LIST);
			props.put("request.required.acks", "1");
			// Use random partitioner. Don't need the key type. Just set it to Integer.
			// The message is of type String.
			producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
			this.topic = topic;
		}

		public void sendMsg(String msg) {
			producer.send(new KeyedMessage<Integer, String>(topic, msg));
			System.out.println("已发送: " + msg);
		}

		public void run() {
			int messageNo = 1;
			while (true) {
				String messageStr = new String("Message_" + messageNo);
				producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
				messageNo++;
			}
		}
	}

	private static ConsumerConfig createConsumerConfig() {
		Properties props = new Properties();
		props.put("zookeeper.connect", KAFKA_ZOOKEEPER_LIST);
		props.put("group.id", "0");
		props.put("zookeeper.session.timeout.ms", "400");
		props.put("zookeeper.sync.time.ms", "200");
		props.put("auto.commit.interval.ms", "1000");

		return new ConsumerConfig(props);
	}

	class Consumer extends Thread {
		private final ConsumerConnector consumer;
		private final String topic;

		public Consumer(String topic) {
			consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
			this.topic = topic;
		}
		
		public void receiveMsg() {
			Map<String, Integer> topicCountMap = Maps.newHashMap();
			topicCountMap.put(topic, new Integer(1));
			Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
			KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
			ConsumerIterator<byte[], byte[]> it = stream.iterator();
			while (it.hasNext()) {
				System.out.println(new String(it.next().message()));
			}
		}

		public void run() {
			Map<String, Integer> topicCountMap = Maps.newHashMap();
			topicCountMap.put(topic, new Integer(1));
			Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
			KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
			ConsumerIterator<byte[], byte[]> it = stream.iterator();
			while (it.hasNext())
				System.out.println(new String(it.next().message()));
		}
	}

	@Test
	public void testSendMsg() {
		Producer producer = new Producer(KAFKA_TOPIC);
		producer.sendMsg("{'cardid':21,'opt':'c'}");
	}

	@Test
	public void testReceiveMsg() {
		Consumer consumer = new Consumer(KAFKA_TOPIC);
		consumer.receiveMsg();
	}

}

 

 5:

参考:

http://kafka.apache.org/documentation.html 

http://stackoverflow.com/questions/17808988/using-kafka-java-producer-send-a-message-producer-connection-to-localhost9092

分享到:
评论

相关推荐

    有关kafka的三本电子书,电子文档

    这本书首先介绍Kafka的分布式消息传递模型,让读者对消息系统有一个初步的认识。随后,书中逐步讲解了Kafka的核心概念,例如主题(Topic)与分区(Partition),以及生产者(Producer)和消费者(Consumer)的API...

    Kafka安装包、安装文档

    **Kafka安装与配置详解** Kafka是一款高性能的分布式消息...通过以上步骤,你已经成功安装并初步了解了Kafka的基本操作。随着对Kafka的深入理解和实践,你会发现它在大数据实时处理和消息传递方面的能力非常强大。

    Kafka 实战演练 1

    Kafka实战演练1将带领我们初步了解Kafka的核心概念和基本操作,让我们一起探索这个强大的工具。 首先,Kafka是由LinkedIn开发并贡献给Apache软件基金会的一个开源项目。它的主要设计目标是提供高吞吐量的实时发布...

    haskell的kafka客户端的(非常谦虚的)开端_Haskell_.zip

    在本文中,我们将深入探讨Haskell编程语言与Apache Kafka集成的初步知识,特别是关于Haskell的Kafka客户端的实现。标题“haskell的kafka客户端的(非常谦虚的)开端”暗示了这是一个关于初学者如何在Haskell环境中...

    Kafka的初步认识

    【Kafka的初步认识】 Kafka,源自LinkedIn,是一个分布式消息系统,如今已成为Apache软件基金会的顶级项目。它设计的核心目标是提供高吞吐量、低延迟的消息传递能力,适用于实时流式数据处理和大数据量的数据传输。...

    Kafka-Pipeline:该项目旨在探索Kafka。 我创建了一个Twitter生产者,一个ElasticSearch消费者,并在一定程度上探讨了Kafka Streams

    该项目专注于研究Apache Kafka,一个高度可扩展的实时流处理平台。Kafka-Pipeline由一个Twitter生产者和一个ElasticSearch消费者组成,这为我们提供了了解Kafka核心概念的机会,包括其作为消息中间件的角色以及如何...

    基于Spark streaming+Kafka+RedisHBase的GBDT+LR推荐排序模型.zip

    接着,Spark Streaming从Kafka消费数据,利用GBDT进行特征工程和初步预测,然后通过LR进行最终的排序预测;预测结果可暂存于Redis以供快速响应,长期数据则存储在HBase中。 通过以上讲解,我们可以看到,这个推荐...

    apache-atlas-2.0.0-bin.tar.gz

    在实际应用中,Apache Atlas 可以与数据湖治理工具如Hue、Kafka、Zookeeper等配合使用,构建强大的数据治理平台,以满足企业对大数据治理的需求。通过深入理解和熟练运用Apache Atlas,你可以提升数据管理的效率,...

    Kafka_processor

    以上是对"Kafka_processor"项目的初步分析,具体实现可能根据实际需求有所不同,但以上内容涵盖了在Java环境中开发Kafka数据处理应用的基本要素。通过深入研究提供的"Kafka_processor-main"源代码,可以进一步了解该...

    Druid源码(apache-druid-0.21.1-src.tar.gz)

    5. **数据节点(Historical & Realtime)**:Historical 节点用于存储和查询 Segment,而 Realtime 节点负责实时摄入和初步处理数据,然后将其转换为 Segment 交给 Historical 节点。 6. **索引服务(Indexing ...

    Apache Flink 和 Elasticsearch 助⼒实时 OLAP 平台.pdf

    在去哪儿网的案例中,数据源首先通过Kafka进行初步的流处理,然后通过Flink进行复杂的实时计算,最终将结果实时写入Elasticsearch,由Kibana进行实时数据的可视化展示。这一完整的实时数据处理流程,保证了实时OLAP...

    开源项目-apache-arrow.zip

    6. **集成**:Apache Arrow 可以与众多大数据生态系统中的工具和框架集成,例如 Apache Spark、Hadoop、Parquet、Kafka 等,以提高整体性能和互操作性。 7. **工具和应用**:社区还开发了各种工具,如用于数据转换...

    java处理海量数据的初步解决思路

    最后,Apache Kafka是一个高吞吐量的分布式消息队列,常用于构建实时数据管道。Java开发者可以使用Kafka的Java客户端库来生产和消费消息,实现数据的异步处理。 总的来说,Java处理海量数据涉及内存管理、分片处理...

    《Hadoop at 10-the History and Evolution of the Apache Hadoop Ecosystem》

    尽管这一时期的信息并不详尽,但可以推断这是Hadoop从构思到初步实现的阶段。 2. 原始灵感(2003-2004): Hadoop的诞生受到了Google发布的关于其大规模数据处理技术(如Google File System和MapReduce)的论文的...

    大数据-数据采集平台(一)

    Kafka是由LinkedIn开发并贡献给Apache的分布式流处理平台,它兼具消息队列和数据流处理的功能。在数据采集平台中,Kafka主要作为实时数据管道,能够高效地处理和存储大量的实时数据流。Kafka的高吞吐量和低延迟特性...

    数据处理基础设施概要方案

    - **数据收集与消息队列**:首选方案是直接将结构化数据流式发布到Apache Kafka中,提高资源利用率。 #### 批量计算子系统 ##### 功能描述 该子系统主要任务是从数据收集子系统接收数据,执行批量ETL流程及数据...

    【原文】阿里巴巴数据中台实践.pdf

    这一层通常采用流式处理框架(如Apache Kafka)和批量处理工具(如Apache Spark)来实现。 #### 2. 数据存储层 经过集成层处理后的数据会被存储到相应的数据仓库或数据湖中。这里既包含了结构化数据(例如关系型...

    基于MATLAB图像处理技术的蛋鸡采食行为研究源码数据库.zip

    - 实时流处理:对于实时视频流,可能利用如Apache Kafka或Spark Streaming等技术,实现对实时数据的快速处理和响应。 6. 毕业设计相关: - 系统设计:毕业设计通常涵盖需求分析、系统设计、实现和测试等阶段,该...

    网络游戏-基于大数据分析的传输网络故障定位分析方法及系统.zip

    2. 实时流处理:结合Apache Kafka或Flink等实时数据处理工具,实现实时数据采集和分析,快速响应网络故障。 3. 可视化界面:通过图形化界面展示网络状态和故障信息,便于运维人员理解和操作。 4. 自动化响应:结合...

    基于物联网的智能酒驾检测系统.zip

    6. 实时数据分析:涉及大数据处理技术,如流式计算(Apache Kafka、Flink)和实时数据库(InfluxDB、Cassandra)。 7. 安全性与隐私保护:探讨数据加密、用户身份验证和权限管理,以保障系统的安全性和用户隐私。 ...

Global site tag (gtag.js) - Google Analytics