1:
下载ApacheKafka,
安装:
tar -xvf kafka.tgz
2:
修改配置文件:
(1):
kafka/conf/zookeeper.properties
中
dataDir=/disks/sdb3/soft/Kafka/Data/ZooKeeper/2.9.2/dataDir
(2):
kafka/conf/server.properties
中
log.dirs=/disks/sdb3/soft/Kafka/Data/KafkaData/2.9.2/logs host.name=192.168.56.3
此处的host.name为本机IP(重要),如果不改,则客户端会抛出:Producer connection to localhost:9092 unsuccessful 错误!
3:
启动:
启动zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
启动kafka:
bin/kafka-server-start.sh config/server.properties
4:
Java客户端:
package com.test.search.test.kafka; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import org.junit.Test; import com.google.common.collect.Maps; public class KafkaTest { private static final String KAFKA_BROKER_LIST = "192.168.56.3:9092"; private static final String KAFKA_ZOOKEEPER_LIST = "192.168.56.3:2181"; private static final String KAFKA_TOPIC = "CustomTopic"; class Producer extends Thread { private final kafka.javaapi.producer.Producer<Integer, String> producer; private final String topic; private final Properties props = new Properties(); public Producer(String topic) { props.put("serializer.class", "kafka.serializer.StringEncoder"); // props.put("metadata.broker.list", "localhost:9092"); props.put("metadata.broker.list", KAFKA_BROKER_LIST); props.put("request.required.acks", "1"); // Use random partitioner. Don't need the key type. Just set it to Integer. // The message is of type String. producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props)); this.topic = topic; } public void sendMsg(String msg) { producer.send(new KeyedMessage<Integer, String>(topic, msg)); System.out.println("已发送: " + msg); } public void run() { int messageNo = 1; while (true) { String messageStr = new String("Message_" + messageNo); producer.send(new KeyedMessage<Integer, String>(topic, messageStr)); messageNo++; } } } private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); props.put("zookeeper.connect", KAFKA_ZOOKEEPER_LIST); props.put("group.id", "0"); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } class Consumer extends Thread { private final ConsumerConnector consumer; private final String topic; public Consumer(String topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig()); this.topic = topic; } public void receiveMsg() { Map<String, Integer> topicCountMap = Maps.newHashMap(); topicCountMap.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) { System.out.println(new String(it.next().message())); } } public void run() { Map<String, Integer> topicCountMap = Maps.newHashMap(); topicCountMap.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) System.out.println(new String(it.next().message())); } } @Test public void testSendMsg() { Producer producer = new Producer(KAFKA_TOPIC); producer.sendMsg("{'cardid':21,'opt':'c'}"); } @Test public void testReceiveMsg() { Consumer consumer = new Consumer(KAFKA_TOPIC); consumer.receiveMsg(); } }
5:
参考:
http://kafka.apache.org/documentation.html
http://stackoverflow.com/questions/17808988/using-kafka-java-producer-send-a-message-producer-connection-to-localhost9092
相关推荐
这本书首先介绍Kafka的分布式消息传递模型,让读者对消息系统有一个初步的认识。随后,书中逐步讲解了Kafka的核心概念,例如主题(Topic)与分区(Partition),以及生产者(Producer)和消费者(Consumer)的API...
**Kafka安装与配置详解** Kafka是一款高性能的分布式消息...通过以上步骤,你已经成功安装并初步了解了Kafka的基本操作。随着对Kafka的深入理解和实践,你会发现它在大数据实时处理和消息传递方面的能力非常强大。
Kafka实战演练1将带领我们初步了解Kafka的核心概念和基本操作,让我们一起探索这个强大的工具。 首先,Kafka是由LinkedIn开发并贡献给Apache软件基金会的一个开源项目。它的主要设计目标是提供高吞吐量的实时发布...
在本文中,我们将深入探讨Haskell编程语言与Apache Kafka集成的初步知识,特别是关于Haskell的Kafka客户端的实现。标题“haskell的kafka客户端的(非常谦虚的)开端”暗示了这是一个关于初学者如何在Haskell环境中...
【Kafka的初步认识】 Kafka,源自LinkedIn,是一个分布式消息系统,如今已成为Apache软件基金会的顶级项目。它设计的核心目标是提供高吞吐量、低延迟的消息传递能力,适用于实时流式数据处理和大数据量的数据传输。...
该项目专注于研究Apache Kafka,一个高度可扩展的实时流处理平台。Kafka-Pipeline由一个Twitter生产者和一个ElasticSearch消费者组成,这为我们提供了了解Kafka核心概念的机会,包括其作为消息中间件的角色以及如何...
接着,Spark Streaming从Kafka消费数据,利用GBDT进行特征工程和初步预测,然后通过LR进行最终的排序预测;预测结果可暂存于Redis以供快速响应,长期数据则存储在HBase中。 通过以上讲解,我们可以看到,这个推荐...
在实际应用中,Apache Atlas 可以与数据湖治理工具如Hue、Kafka、Zookeeper等配合使用,构建强大的数据治理平台,以满足企业对大数据治理的需求。通过深入理解和熟练运用Apache Atlas,你可以提升数据管理的效率,...
以上是对"Kafka_processor"项目的初步分析,具体实现可能根据实际需求有所不同,但以上内容涵盖了在Java环境中开发Kafka数据处理应用的基本要素。通过深入研究提供的"Kafka_processor-main"源代码,可以进一步了解该...
5. **数据节点(Historical & Realtime)**:Historical 节点用于存储和查询 Segment,而 Realtime 节点负责实时摄入和初步处理数据,然后将其转换为 Segment 交给 Historical 节点。 6. **索引服务(Indexing ...
在去哪儿网的案例中,数据源首先通过Kafka进行初步的流处理,然后通过Flink进行复杂的实时计算,最终将结果实时写入Elasticsearch,由Kibana进行实时数据的可视化展示。这一完整的实时数据处理流程,保证了实时OLAP...
6. **集成**:Apache Arrow 可以与众多大数据生态系统中的工具和框架集成,例如 Apache Spark、Hadoop、Parquet、Kafka 等,以提高整体性能和互操作性。 7. **工具和应用**:社区还开发了各种工具,如用于数据转换...
最后,Apache Kafka是一个高吞吐量的分布式消息队列,常用于构建实时数据管道。Java开发者可以使用Kafka的Java客户端库来生产和消费消息,实现数据的异步处理。 总的来说,Java处理海量数据涉及内存管理、分片处理...
尽管这一时期的信息并不详尽,但可以推断这是Hadoop从构思到初步实现的阶段。 2. 原始灵感(2003-2004): Hadoop的诞生受到了Google发布的关于其大规模数据处理技术(如Google File System和MapReduce)的论文的...
Kafka是由LinkedIn开发并贡献给Apache的分布式流处理平台,它兼具消息队列和数据流处理的功能。在数据采集平台中,Kafka主要作为实时数据管道,能够高效地处理和存储大量的实时数据流。Kafka的高吞吐量和低延迟特性...
- **数据收集与消息队列**:首选方案是直接将结构化数据流式发布到Apache Kafka中,提高资源利用率。 #### 批量计算子系统 ##### 功能描述 该子系统主要任务是从数据收集子系统接收数据,执行批量ETL流程及数据...
这一层通常采用流式处理框架(如Apache Kafka)和批量处理工具(如Apache Spark)来实现。 #### 2. 数据存储层 经过集成层处理后的数据会被存储到相应的数据仓库或数据湖中。这里既包含了结构化数据(例如关系型...
- 实时流处理:对于实时视频流,可能利用如Apache Kafka或Spark Streaming等技术,实现对实时数据的快速处理和响应。 6. 毕业设计相关: - 系统设计:毕业设计通常涵盖需求分析、系统设计、实现和测试等阶段,该...
2. 实时流处理:结合Apache Kafka或Flink等实时数据处理工具,实现实时数据采集和分析,快速响应网络故障。 3. 可视化界面:通过图形化界面展示网络状态和故障信息,便于运维人员理解和操作。 4. 自动化响应:结合...
6. 实时数据分析:涉及大数据处理技术,如流式计算(Apache Kafka、Flink)和实时数据库(InfluxDB、Cassandra)。 7. 安全性与隐私保护:探讨数据加密、用户身份验证和权限管理,以保障系统的安全性和用户隐私。 ...