`
huangyongxing310
  • 浏览: 496487 次
  • 性别: Icon_minigender_1
  • 来自: 广州
文章分类
社区版块
存档分类
最新评论

kafka java原生简单应用

阅读更多
kafka java原生简单应用



KafkaTestMain.java
package com;

public class KafkaTestMain extends Thread{

    private static String topic = "test-xing";

    public static void main(String[] args) {
        new KafkaConsumer(topic).start();// 消费者
        new KafkaProducer(topic).start();// 生产者
    }

}



KafkaProducer.java
package com;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class KafkaProducer extends Thread {

	private static String topic;

	public KafkaProducer(String topic) {
		super();
		this.topic = topic;
	}

	@Override
	public void run() {
		Producer producer = createProducer();
		int i = 0;
		while (true) {
			producer.send(new KeyedMessage<String, String>(topic, "message: " + i++));
			System.out.println("已经发送一则消息");
			try {
				TimeUnit.SECONDS.sleep(1);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	private Producer createProducer() {
		Properties properties = new Properties();
		properties.put("zookeeper.connect", "114.55.72.173:2181");// 声明zk IP

		properties.put("serializer.class", StringEncoder.class.getName());
		// 声明kafka broker IP
		properties.put("metadata.broker.list", "114.55.72.173:9092");
		return new Producer<String, String>(new ProducerConfig(properties));
	}
}


KafkaConsumer.java
package com;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer extends Thread {

	private String topic;
	private ConsumerConnector consumer;

	public KafkaConsumer(String topic) {
		super();
		this.topic = topic;
	}

	@Override
	public void run() {
		consumer = createConsumer();
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(topic, 1); // 一次从主题中获取一个数据
		Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
		KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据
		ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
		while (iterator.hasNext()) {
			String message = new String(iterator.next().message());
			System.out.println("接收到: " + message);
		}
	}

	private ConsumerConnector createConsumer() {
		Properties properties = new Properties();
		properties.put("zookeeper.connect", "114.55.72.173:2181");// 声明zk
		// 必须要使用别的组名称,如果生产者和消费者都在同一组,则不能访问同一组内的topic数据
		properties.put("group.id", "group_xing");

		// ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
		// properties.put("zookeeper.session.timeout.ms", "4000");

		// zk follower落后于zk leader的最长时间
		// properties.put("zookeeper.sync.time.ms", "200");

		// 往zookeeper上写offset的频率
		properties.put("auto.commit.interval.ms", "1000");

		// 消费最老消息为smallest,最新为largest
		properties.put("auto.offset.reset", "smallest");

		// 序列化类
		// properties.put("serializer.class", "kafka.serializer.StringEncoder");

		return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
	}

}


依赖包(pom.xml)
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka_2.10</artifactId>
	<version>0.8.2.0</version>
</dependency>

<dependency>
	<groupId>org.apache.zookeeper</groupId>
	<artifactId>zookeeper</artifactId>
	<version>3.4.3</version>
</dependency>



结果:
接收到: message: 0
接收到: message: 1
接收到: message: 2
已经发送一则消息
接收到: message: 3
已经发送一则消息
接收到: message: 4
已经发送一则消息
接收到: message: 5
已经发送一则消息
接收到: message: 6
已经发送一则消息
接收到: message: 7


参考原文(配置属性):http://blog.csdn.net/louisliaoxh/article/details/51516070
参考原文:http://chengjianxiaoxue.iteye.com/blog/2190488
分享到:
评论

相关推荐

    使用netty实现TCP长链接消息写入kafka以及kafka批量消费数据

    Netty是一个高性能、异步事件驱动的网络应用程序框架,常用于开发高并发、低延迟的网络应用,如TCP服务器。而Kafka是一款分布式流处理平台,它在大数据实时处理和消息传递中扮演着核心角色。 标题"使用netty实现TCP...

    Java_AutoMQ是Kafka的云原生分支,它将存储分离到S3和EBS.zip

    Java_AutoMQ是一个创新的项目,它是Apache Kafka的云原生分支,旨在提供更高效、更灵活的分布式消息传递解决方案。这个项目的核心理念是将存储层与计算层分离,允许用户将消息数据存储在Amazon S3和EBS(Elastic ...

    构建微服务云原生应用——介绍.pdf

    在当前的IT技术领域中,微服务架构和云原生应用已经成为构建和部署软件系统的主流范式。该文件标题为“构建微服务云原生应用——介绍.pdf”,顾名思义,该文档涉及的主题和知识点围绕微服务架构、云原生应用以及与之...

    Kafka-Windows-Service-master.zip

    将Kafka设置为Windows服务,我们可以利用Java的Windows服务包装器(如nssm,Non-Sucking Service Manager),它允许我们将Java应用作为Windows服务来启动。下载nssm后,指定Java的路径、Kafka的可执行jar文件以及...

    kafka-2.12-2.8.2

    3. 简单API:提供Java和Scala的API,方便开发者集成到各种应用程序中。 4. 分布式设计:Kafka集群支持跨多个服务器部署,能自动处理节点故障。 5. 时间窗口:支持基于时间的窗口操作,适用于实时数据处理。 二、...

    Kafka技术内幕:图文详解Kafka源码设计与实现 高清带书签

    1. **Kafka概述**:Kafka是一个高吞吐量、低延迟的分布式发布订阅消息系统,主要用于构建实时数据管道和流应用。它具有持久化、可扩展性和高可用性等特性。 2. **Kafka架构**:Kafka由Brokers、Producers、...

    最新版windows kafka_2.11-2.4.1.tgz

    - Windows并不是Kafka的原生支持平台,但通过JDK和一些调整,可以在Windows上运行Kafka。首先,确保安装了Java Development Kit (JDK) 8或以上版本。 - 解压`kafka_2.11-2.4.1.tgz`文件到指定目录,配置环境变量,...

    Java 抢单系统源码-派单系统平台 原生安卓苹果APP 带项目说明

    原生应用的优势在于性能优化和对各自平台特性的深度支持,如Android的Material Design和iOS的Human Interface Guidelines。 **Java核心技术:** 1. **Spring Boot框架**:作为后端开发的基础,Spring Boot简化了...

    一套云原生的Kafka管控平台,一站式Apache Kafka管控平台,脱胎于众多互联网内部多年的Kafka运营实践经验

    Know Streaming是一套云原生的Kafka管控平台,脱胎于众多互联网内部多年的Kafka运营实践经验,专注于Kafka运维管控、监控告警、资源治理、多活容灾等核心场景。在用户体验、监控、运维管控上进行了平台化、可视化、...

    kafka-streams-scala:Kafka Streams Java API周围的Thin Scala包装器

    这个库的主要目标是让 Scala 开发者能更自然地使用 Kafka Streams 功能,同时保持与原生 Java API 的紧密兼容性。 Kafka Streams 是 Apache Kafka 的一个模块,它允许开发者以流处理的方式对数据进行实时分析和处理...

    azkarra-streams::rocket:Azkarra是一个轻量级的Java框架,可简化基于Apache Kafka Streams的云原生流微服务的开发,部署和管理。

    Azkarra Streams是一个轻量级的Java框架,可轻松开发和操作Kafka Streams应用程序(Azkarra是巴斯克语,表示“快速” ) 是用于构建应用程序和微服务的客户端库,其中输入和输出数据存储在Kafka集群中。 它结合了在...

    Python-ApacheKafkaStreams的Python实现

    而当我们谈论“Python-Apache Kafka Streams”的实现时,这通常意味着开发者正在寻找或者已经实现了使用 Python 语言与 Kafka Streams API 进行交互的方法,尽管原生的 Kafka Streams 是用 Java 编写的。 Python ...

    Kafka-Grpc_kafka_assignment_源码.zip

    【标题】"Kafka-Grpc_kafka_assignment_源码" 涉及的主要知识点是Apache Kafka和gRPC的整合应用...这将有助于提升对分布式系统设计和实现的理解,对于从事大数据处理和云原生应用开发的人员来说,是非常有价值的实践。

    言讯IM即时聊天通讯软件双端原生源码后端JAVA

    首先,我们要理解“双端原生源码”意味着该软件提供了Android和iOS两端的应用程序源代码,这些源代码是使用各自平台的原生语言(Java和Swift/Objective-C)编写的。原生开发的优势在于性能更优,用户体验更佳,同时...

    销售系统项目,spring+spring mvc+mybatis+dubbo+kafka+redis+maven.zip

    2. **MyBatis**:MyBatis是一个持久层框架,它将SQL语句与Java代码分离,通过XML或注解方式配置和映射原生信息,使得开发者可以更好地控制SQL执行过程,提高了数据库操作的灵活性。 3. **Dubbo**:Dubbo是阿里巴巴...

    【派单系统平台】完整版java抢单系统源码+原生安卓苹果APP+带项目说明

    【派单系统平台】是一个基于Java技术开发的抢单系统,提供了完整的源代码,包括原生的Android和iOS应用程序,以及详细的项目说明文档。这个系统主要用于实现服务提供者与服务需求者之间的快速匹配,通常应用于物流、...

    kafak的三种java实现方式

    本篇文章将详细介绍在Java环境中,如何利用原生Java API、Spring框架以及Spring Cloud Stream来实现Kafka的集成与应用。 首先,我们来看**原生Java API**的实现方式。Apache Kafka提供了Java客户端库,允许开发者...

    即时通讯APP源码 IM原生APP高仿微信全源码无加密

    - **Android与iOS原生开发**:这款源码覆盖了Android和iOS两大主流平台,使用Java或Kotlin(Android)和Swift或Objective-C(iOS)进行原生开发,确保了良好的性能和用户体验。 - **UI设计**:高度模仿微信的界面...

    Java最著名的开源项目

    8. **Quarkus**:这是Red Hat开发的一个超快、全功能的Kubernetes原生Java框架,旨在为微服务和云原生环境提供极致的性能。 9. **Elasticsearch**:Elasticsearch是一个基于Lucene的搜索服务器,提供全文检索、分析...

Global site tag (gtag.js) - Google Analytics