`
517573897
  • 浏览: 9528 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

kafka之生产、消费关系

阅读更多

直入主题:Kafka是一个消息系统,通过消费端订阅生产端,从而消费所需的数据。

问题的产生原因是生成端发送大量数据,但是海量的数据只对应一个topic,且对这个topic开辟多个分区并未成功发送数据,因此自己测试了生成端发送数据至一个topic,十个分区。生产方发送数据至一个toppic,十个分区中,消费端采用十个线程采集这一个topic与十个分区的数据(建议数据量大的数据可以采用创建多个topic并每个topic对应多个分区,这个可以大大提高采集数据的效率)。结果代码如下,可直接粘贴运行:

 

生成端模拟发送数据至一个topic并创建十个分区代码(在创建一个topic多个分区的方法之下有创建一个topic一个分区的默认的方法,这里使用的上面那个方法,一个topic对应十个分区且每个分区中发送6条数据):

 

public KafkaProducer() {
Properties props = new Properties();
props.put("zookeeper.connect", "xxxx:2181,xxxx:2181,xxxx:2181");
//	props.put("zookeeper.connect", "localhost:2181");
// 指定序列化处理类,默认为kafka.serializer.DefaultEncoder,即byte[]
props.put("serializer.class", "kafka.serializer.StringEncoder");
// 同步还是异步,默认2表同步,1表异步。异步可以提高发送吞吐量,但是也可能导致丢失未发送过去的消息
props.put("producer.type", "sync");
// 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。
props.put("compression.codec", "1");
// 指定kafka节点列表,用于获取metadata(元数据),不必全部指定
props.put("metadata.broker.list", "lognn1te:6667,lognn2te:6667,logrmte:6667");

config = new ProducerConfig(props);
}
@Override
public void run() {
producer = new Producer<String, String>(config);
for(int i = 1; i <= 5; i++){ //5个分区 
	List<KeyedMessage<String, String>> messageList = new ArrayList<KeyedMessage<String, String>>(); 
	for(int j = 0; j < 6; j++){ //每个分区6条讯息
//针对topic创建相应分区数并发送数据
		messageList.add(new KeyedMessage<String, String>("wujun", "我是分区名称partition[" + i + "]", "我是发送的内容message[The " + i + " message]"));
		producer.send(messageList); 
	} 
} 
//针对topic创建一个分区并发送数据
//List<KeyedMessage<String, String>> messageList = new ArrayList<KeyedMessage<String, String>>();
//  for(int i = 1; i <= 10; i++){
//	messageList.add(new KeyedMessage<String, String>("wj",  "我是发送的内容message"+i));
//}
//producer.send(messageList); 
//	producer.close(); 
//	}
}
public static void main(String[] args) {
		Thread t = new Thread(new KafkaProducer());
		t.start();
	}
}

 以上是生成端的代码,发送10条数据至一个topic(wj)十个分区中

 

消费端代码:

 

public class testKafka implements Runnable {
public void run() {
        //所要开辟的线程数量
	final int a_numThreads = 5;
	//对应的kafka采集地址(本地测试的可以写为localhost:2181),对个地址用逗号隔开
	String zk = "lognn1te:2181,lognn2te:2181,logrmte:2181";
	//topic,与发送端生成的topic名称一致
	String topic = "wj";
	//groupid,生成端从新生成一个topic,消费端消费时最好变动groupid
	String groupId = "test";
	Properties props = new Properties();
	props.put("zookeeper.connect", zk);
	props.put("zookeeper.connectiontimeout.ms", "30000");
	props.put("group.id", groupId);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(a_numThreads));
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
	ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    //所要开辟的线程数量
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    ExecutorService executor = Executors.newFixedThreadPool(a_numThreads);
    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
    	//使用相应数量的线程数量采集数据
        executor.submit(new KafkaConsumerThread(stream, threadNumber));
        //查看消费对应的线程
        threadNumber++;
    }
}
	public static void main(String[] args) {
		Thread t = new Thread(new testKafka());
		t.start();
	}
}

 

 

 

public class KafkaConsumerThread implements Runnable {
	 private KafkaStream m_stream;
	    private int m_threadNumber;
	    public KafkaConsumerThread(KafkaStream a_stream, int a_threadNumber) {
	        m_threadNumber = a_threadNumber;
	        m_stream = a_stream;
	    }
	    public void run() {
	        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
	        while (it.hasNext()){
//	        System.out.println(Thread.currentThread().getName() + ":" +"partition:["+ mam.partition() +"]"+ "," + new String(mam.message()));
	         System.out.println("使用线程:" + m_threadNumber + "   发送的内容:" + new String(it.next().message()));
//	        System.out.println("Shutting down Thread: " + m_threadNumber);
	        }
	    }
}

  以上两个类是消费端的代码,消费端使用了5个线程去采集该topic十个分区中的数据,以下是测试的结果:

 

打印结果:

使用线程:3   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 1 message]
使用线程:3   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 1 message]
使用线程:3   发送的内容:我是发送的内容message[The 2 message]
使用线程:3   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 1 message]
使用线程:3   发送的内容:我是发送的内容message[The 2 message]
使用线程:3   发送的内容:我是发送的内容message[The 3 message]
使用线程:3   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 1 message]
使用线程:3   发送的内容:我是发送的内容message[The 2 message]
使用线程:3   发送的内容:我是发送的内容message[The 3 message]
使用线程:3   发送的内容:我是发送的内容message[The 4 message]
使用线程:4   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 1 message]
使用线程:3   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 1 message]
使用线程:3   发送的内容:我是发送的内容message[The 2 message]
使用线程:3   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 1 message]
使用线程:3   发送的内容:我是发送的内容message[The 2 message]
使用线程:3   发送的内容:我是发送的内容message[The 3 message]
使用线程:3   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 1 message]
使用线程:3   发送的内容:我是发送的内容message[The 2 message]
使用线程:3   发送的内容:我是发送的内容message[The 3 message]
使用线程:3   发送的内容:我是发送的内容message[The 4 message]
使用线程:4   发送的内容:我是发送的内容message[The 0 message]
使用线程:4   发送的内容:我是发送的内容message[The 1 message]
使用线程:4   发送的内容:我是发送的内容message[The 0 message]
使用线程:4   发送的内容:我是发送的内容message[The 1 message]
使用线程:4   发送的内容:我是发送的内容message[The 2 message]
使用线程:4   发送的内容:我是发送的内容message[The 0 message]
使用线程:4   发送的内容:我是发送的内容message[The 1 message]
使用线程:4   发送的内容:我是发送的内容message[The 2 message]
使用线程:4   发送的内容:我是发送的内容message[The 3 message]
使用线程:4   发送的内容:我是发送的内容message[The 0 message]
使用线程:4   发送的内容:我是发送的内容message[The 1 message]
使用线程:4   发送的内容:我是发送的内容message[The 2 message]
使用线程:4   发送的内容:我是发送的内容message[The 3 message]
使用线程:4   发送的内容:我是发送的内容message[The 4 message]
使用线程:4   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 1 message]
使用线程:1   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 1 message]
使用线程:1   发送的内容:我是发送的内容message[The 2 message]
使用线程:4   发送的内容:我是发送的内容message[The 0 message]
使用线程:4   发送的内容:我是发送的内容message[The 1 message]
使用线程:4   发送的内容:我是发送的内容message[The 0 message]
使用线程:4   发送的内容:我是发送的内容message[The 1 message]
使用线程:4   发送的内容:我是发送的内容message[The 2 message]
使用线程:4   发送的内容:我是发送的内容message[The 0 message]
使用线程:4   发送的内容:我是发送的内容message[The 1 message]
使用线程:4   发送的内容:我是发送的内容message[The 2 message]
使用线程:4   发送的内容:我是发送的内容message[The 3 message]
使用线程:4   发送的内容:我是发送的内容message[The 0 message]
使用线程:4   发送的内容:我是发送的内容message[The 1 message]
使用线程:4   发送的内容:我是发送的内容message[The 2 message]
使用线程:4   发送的内容:我是发送的内容message[The 3 message]
使用线程:4   发送的内容:我是发送的内容message[The 4 message]
使用线程:1   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 1 message]
使用线程:1   发送的内容:我是发送的内容message[The 2 message]
使用线程:1   发送的内容:我是发送的内容message[The 3 message]
使用线程:1   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 1 message]
使用线程:1   发送的内容:我是发送的内容message[The 2 message]
使用线程:1   发送的内容:我是发送的内容message[The 3 message]
使用线程:1   发送的内容:我是发送的内容message[The 4 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:2   发送的内容:我是发送的内容message[The 0 message]
使用线程:2   发送的内容:我是发送的内容message[The 0 message]
使用线程:2   发送的内容:我是发送的内容message[The 1 message]
使用线程:2   发送的内容:我是发送的内容message[The 0 message]
使用线程:2   发送的内容:我是发送的内容message[The 1 message]
使用线程:2   发送的内容:我是发送的内容message[The 2 message]
使用线程:2   发送的内容:我是发送的内容message[The 0 message]
使用线程:2   发送的内容:我是发送的内容message[The 1 message]
使用线程:2   发送的内容:我是发送的内容message[The 2 message]
使用线程:2   发送的内容:我是发送的内容message[The 3 message]
使用线程:2   发送的内容:我是发送的内容message[The 0 message]
使用线程:2   发送的内容:我是发送的内容message[The 1 message]
使用线程:2   发送的内容:我是发送的内容message[The 2 message]
使用线程:2   发送的内容:我是发送的内容message[The 3 message]
使用线程:2   发送的内容:我是发送的内容message[The 4 message]
使用线程:1   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 1 message]
使用线程:1   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 1 message]
使用线程:1   发送的内容:我是发送的内容message[The 2 message]
使用线程:1   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 1 message]
使用线程:1   发送的内容:我是发送的内容message[The 2 message]
使用线程:1   发送的内容:我是发送的内容message[The 3 message]
使用线程:1   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 1 message]
使用线程:1   发送的内容:我是发送的内容message[The 2 message]
使用线程:1   发送的内容:我是发送的内容message[The 3 message]
使用线程:1   发送的内容:我是发送的内容message[The 4 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 2 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 2 message]
使用线程:0   发送的内容:我是发送的内容message[The 3 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 2 message]
使用线程:0   发送的内容:我是发送的内容message[The 3 message]
使用线程:0   发送的内容:我是发送的内容message[The 4 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 2 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 2 message]
使用线程:0   发送的内容:我是发送的内容message[The 3 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 2 message]
使用线程:0   发送的内容:我是发送的内容message[The 3 message]
使用线程:0   发送的内容:我是发送的内容message[The 4 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 2 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 2 message]
使用线程:0   发送的内容:我是发送的内容message[The 3 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 2 message]
使用线程:0   发送的内容:我是发送的内容message[The 3 message]
使用线程:0   发送的内容:我是发送的内容message[The 4 message]

 

从以上的打印结果可以清楚的说明结果:

1.当使用的分区数大于开辟的线程数,消费端消费数据时会有一个线程同时采集1个以上分区的数据(不会出现一个分区对应多个线程的情况,这样采集数据会重复混乱)当某个分区中的数据较少时,采集的线程快速的采完了该分区的数据,处于空闲的状态则有可能从新分给别的分区进行采集任务。

2.当使用的分区数等于或者小于线程数且每个分区数据量比较大时,这样就会一个线程对应采集一个分区中的数据,开辟多余的线程数处于闲置状态。

 

以上是消费端,生产端对应topic与分区数量所采用的线程大小采集问题总结。

 

 

分享到:
评论

相关推荐

    kettle kafka 消息生产插件

    标题中的“kettle kafka ...例如,他们可以从关系型数据库抽取数据,经过清洗和转换,然后通过Kafka插件实时地将处理结果发布到Kafka主题,供下游系统消费。这种方式提高了数据处理的效率,也增强了系统的可扩展性。

    kafka学习之路

    ### Kafka学习之路——详解Kafka原理与架构 #### 一、Kafka简介 Kafka是一款由LinkedIn开发并开源的消息队列系统,它主要用于处理实时数据流,并能够支持在线和离线的日志处理需求。Kafka的基本特性包括高吞吐量、...

    代码:kafka数据接入到mysql中

    Kafka通过生产者(Producer)将数据发布到主题(Topic),消费者(Consumer)可以从主题中订阅并消费数据。在Kafka集群中,数据被分片存储在多个分区(Partition)中,确保高可用性和可扩展性。 接下来,我们要介绍...

    Kafka官方中文文档.pdf

    4. 连接器API(Connectors API):允许构建并运行可重用的生产者或消费者,将Kafka topics连接到已有的应用程序或数据系统,如关系型数据库。 Kafka的配置部分涵盖了Broker配置、Topic配置、Producer配置、Consumer...

    kafka 实例

    7. **代码示例**: 代码示例通常会包含如何初始化Kafka生产者和消费者,设置配置参数,创建Topic,发送和接收消息等关键操作。这些示例可以帮助初学者快速理解Kafka的工作原理,并在实际项目中应用。 8. **问题与...

    Kafka简介及使用PHP处理Kafka消息

    使用 PHP 生产、消费 Kafka 消息的例子: 1. 启动 zookeeper 和 kafka 2. 创建由 2 个 partition 组成的、名为 testtopic 的 topic 3. 使用 PHP 生产、消费 Kafka 消息 Kafka 是一种高吞吐的分布式消息系统,能够...

    kafka-manager管理工具

    - **问题排查**:通过Kafka-Manager监控消费滞后,发现并解决生产或消费的问题。 - **容量规划**:查看主题的分区和副本分布,为集群扩展提供数据支持。 - **日常运维**:定期检查集群健康状况,及时发现并处理...

    大数据采集技术-Kafka的消费模式.pptx

    生产者负责将消息发布到Kafka的主题中,而消费者则负责从主题中读取消息。如果生产者的写入速度超过消费者的读取速度,可能导致消息堆积。为了解决这个问题,Kafka引入了消费组的概念,通过多个消费者分担主题的消费...

    尚硅谷大数据技术之Kafka.pdf

    6. Kafka和Zookeeper的关系 - Kafka依赖于Zookeeper进行集群管理和元数据的维护。Zookeeper管理着Kafka集群的状态信息,如broker信息、主题信息、消费者组信息等。 7. Kafka Eagle - Kafka Eagle是Kafka的一个...

    kafka简介与kafka深入浅出两个资料.rar

    5. **消费者组(Consumer Group)**:消费者组是Kafka消费模型的关键,它允许多个消费者并行处理消息,提高处理效率。每个分区只能被消费者组中的一个消费者消费,确保无重复消费。 6. ** brokers**:Kafka集群由多...

    kafka思维导图及demo

    - **消费者(Consumer)**:消费者从Kafka主题中订阅和消费消息。消费者可以是单实例或消费者组的一部分,组内的消费者以并行方式消费消息。 - **消费者组(Consumer Group)**:消费者组是消费者实例的集合,每个...

    kafka学习笔记.doc

    4. Kafka与Zookeeper的关系Kafka使用Zookeeper作为分布式协调服务,管理集群元数据、消费者位移和群组协调。Zookeeper帮助Kafka维护集群状态,保证在节点故障时能够快速发现并转移领导者角色,同时管理消费者组成员...

    Kafka和Zookeeper集群搭建

    7. 配置生产者和消费者:编写像`RequestServiceServlet.java`这样的程序,作为Kafka的生产者或消费者,根据需要实现数据的发送和接收。 8. 测试集群:发布消息到Kafka集群,验证生产者、消费者以及整个集群的正确性...

    kafka2.4.0+zookeeper+kafka-connect集成环境包

    它可以方便地将数据引入Kafka(称为生产者)或者从Kafka导出到其他系统(称为消费者)。Kafka Connect支持各种数据源和数据接收器,如数据库、日志文件、HDFS等,使得数据集成变得更加简单和自动化。 在提供的...

    kafka集群搭建与使用

    * Producer:我们将发布(publish)消息到 Topic 的进程称之为生产者(producer)。 * Consumer:我们将订阅(subscribe)Topic 并且处理 Topic 中消息的进程称之为消费者(consumer)。 * Broker:Kafka 以集群的...

    kafka_web1.0.1.zip

    这款工具通常包含了配置、主题管理、消费者管理、生产者管理、集群监控等多种功能,旨在简化 Kafka 的日常运维工作。 Apache Kafka 是一个分布式流处理平台,被广泛应用于实时数据管道和消息系统。它的核心特性包括...

    kafka-manager-1.3.3.6.zip

    6. **Zookeeper 管理**:查看 Zookeeper 的节点和数据,对 Kafka 集群的依赖关系有更直观的理解。 二、实战技巧 1. **故障排查**:当发现 broker 或 topic 出现异常时,可以利用 Kafka Manager 查看具体错误日志,...

    Kafka源码解析及实战

    1. **Kafka架构**:理解Kafka的服务器节点(Brokers)、生产者(Producers)、消费者(Consumers)以及日志存储(Log)之间的交互机制,包括如何进行消息发布和订阅,以及副本复制策略。 2. **消息模型**:详述...

    apache-kafka-documentation.pdf

    Kafka具有高吞吐量、可扩展性和可靠性等特点,能够处理大量数据,并支持多个生产者和消费者。 入门简介(Introduction)介绍了Kafka的基本概念和术语,比如生产者(Producer)、消费者(Consumer)、主题(Topic)...

    Kafka工作原理详解

    5. **Producer**:消息生产者,负责向Kafka Broker发送消息。Producer可以选择将消息发送到特定的Topic和Partition。 6. **Consumer**:消息消费者,负责从Kafka Broker拉取消息。消费者通常归属于一个Consumer ...

Global site tag (gtag.js) - Google Analytics