直入主题:Kafka是一个消息系统,通过消费端订阅生产端,从而消费所需的数据。
问题的产生原因是生成端发送大量数据,但是海量的数据只对应一个topic,且对这个topic开辟多个分区并未成功发送数据,因此自己测试了生成端发送数据至一个topic,十个分区。生产方发送数据至一个toppic,十个分区中,消费端采用十个线程采集这一个topic与十个分区的数据(建议数据量大的数据可以采用创建多个topic并每个topic对应多个分区,这个可以大大提高采集数据的效率)。结果代码如下,可直接粘贴运行:
生成端模拟发送数据至一个topic并创建十个分区代码(在创建一个topic多个分区的方法之下有创建一个topic一个分区的默认的方法,这里使用的上面那个方法,一个topic对应十个分区且每个分区中发送6条数据):
public KafkaProducer() { Properties props = new Properties(); props.put("zookeeper.connect", "xxxx:2181,xxxx:2181,xxxx:2181"); // props.put("zookeeper.connect", "localhost:2181"); // 指定序列化处理类,默认为kafka.serializer.DefaultEncoder,即byte[] props.put("serializer.class", "kafka.serializer.StringEncoder"); // 同步还是异步,默认2表同步,1表异步。异步可以提高发送吞吐量,但是也可能导致丢失未发送过去的消息 props.put("producer.type", "sync"); // 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。 props.put("compression.codec", "1"); // 指定kafka节点列表,用于获取metadata(元数据),不必全部指定 props.put("metadata.broker.list", "lognn1te:6667,lognn2te:6667,logrmte:6667"); config = new ProducerConfig(props); } @Override public void run() { producer = new Producer<String, String>(config); for(int i = 1; i <= 5; i++){ //5个分区 List<KeyedMessage<String, String>> messageList = new ArrayList<KeyedMessage<String, String>>(); for(int j = 0; j < 6; j++){ //每个分区6条讯息 //针对topic创建相应分区数并发送数据 messageList.add(new KeyedMessage<String, String>("wujun", "我是分区名称partition[" + i + "]", "我是发送的内容message[The " + i + " message]")); producer.send(messageList); } } //针对topic创建一个分区并发送数据 //List<KeyedMessage<String, String>> messageList = new ArrayList<KeyedMessage<String, String>>(); // for(int i = 1; i <= 10; i++){ // messageList.add(new KeyedMessage<String, String>("wj", "我是发送的内容message"+i)); //} //producer.send(messageList); // producer.close(); // } } public static void main(String[] args) { Thread t = new Thread(new KafkaProducer()); t.start(); } }
以上是生成端的代码,发送10条数据至一个topic(wj)十个分区中
消费端代码:
public class testKafka implements Runnable { public void run() { //所要开辟的线程数量 final int a_numThreads = 5; //对应的kafka采集地址(本地测试的可以写为localhost:2181),对个地址用逗号隔开 String zk = "lognn1te:2181,lognn2te:2181,logrmte:2181"; //topic,与发送端生成的topic名称一致 String topic = "wj"; //groupid,生成端从新生成一个topic,消费端消费时最好变动groupid String groupId = "test"; Properties props = new Properties(); props.put("zookeeper.connect", zk); props.put("zookeeper.connectiontimeout.ms", "30000"); props.put("group.id", groupId); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(a_numThreads)); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); //所要开辟的线程数量 List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); ExecutorService executor = Executors.newFixedThreadPool(a_numThreads); int threadNumber = 0; for (final KafkaStream stream : streams) { //使用相应数量的线程数量采集数据 executor.submit(new KafkaConsumerThread(stream, threadNumber)); //查看消费对应的线程 threadNumber++; } } public static void main(String[] args) { Thread t = new Thread(new testKafka()); t.start(); } }
public class KafkaConsumerThread implements Runnable { private KafkaStream m_stream; private int m_threadNumber; public KafkaConsumerThread(KafkaStream a_stream, int a_threadNumber) { m_threadNumber = a_threadNumber; m_stream = a_stream; } public void run() { ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); while (it.hasNext()){ // System.out.println(Thread.currentThread().getName() + ":" +"partition:["+ mam.partition() +"]"+ "," + new String(mam.message())); System.out.println("使用线程:" + m_threadNumber + " 发送的内容:" + new String(it.next().message())); // System.out.println("Shutting down Thread: " + m_threadNumber); } } }
以上两个类是消费端的代码,消费端使用了5个线程去采集该topic十个分区中的数据,以下是测试的结果:
打印结果:
使用线程:3 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 1 message] 使用线程:3 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 1 message] 使用线程:3 发送的内容:我是发送的内容message[The 2 message] 使用线程:3 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 1 message] 使用线程:3 发送的内容:我是发送的内容message[The 2 message] 使用线程:3 发送的内容:我是发送的内容message[The 3 message] 使用线程:3 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 1 message] 使用线程:3 发送的内容:我是发送的内容message[The 2 message] 使用线程:3 发送的内容:我是发送的内容message[The 3 message] 使用线程:3 发送的内容:我是发送的内容message[The 4 message] 使用线程:4 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 1 message] 使用线程:3 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 1 message] 使用线程:3 发送的内容:我是发送的内容message[The 2 message] 使用线程:3 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 1 message] 使用线程:3 发送的内容:我是发送的内容message[The 2 message] 使用线程:3 发送的内容:我是发送的内容message[The 3 message] 使用线程:3 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 1 message] 使用线程:3 发送的内容:我是发送的内容message[The 2 message] 使用线程:3 发送的内容:我是发送的内容message[The 3 message] 使用线程:3 发送的内容:我是发送的内容message[The 4 message] 使用线程:4 发送的内容:我是发送的内容message[The 0 message] 使用线程:4 发送的内容:我是发送的内容message[The 1 message] 使用线程:4 发送的内容:我是发送的内容message[The 0 message] 使用线程:4 发送的内容:我是发送的内容message[The 1 message] 使用线程:4 发送的内容:我是发送的内容message[The 2 message] 使用线程:4 发送的内容:我是发送的内容message[The 0 message] 使用线程:4 发送的内容:我是发送的内容message[The 1 message] 使用线程:4 发送的内容:我是发送的内容message[The 2 message] 使用线程:4 发送的内容:我是发送的内容message[The 3 message] 使用线程:4 发送的内容:我是发送的内容message[The 0 message] 使用线程:4 发送的内容:我是发送的内容message[The 1 message] 使用线程:4 发送的内容:我是发送的内容message[The 2 message] 使用线程:4 发送的内容:我是发送的内容message[The 3 message] 使用线程:4 发送的内容:我是发送的内容message[The 4 message] 使用线程:4 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 1 message] 使用线程:1 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 1 message] 使用线程:1 发送的内容:我是发送的内容message[The 2 message] 使用线程:4 发送的内容:我是发送的内容message[The 0 message] 使用线程:4 发送的内容:我是发送的内容message[The 1 message] 使用线程:4 发送的内容:我是发送的内容message[The 0 message] 使用线程:4 发送的内容:我是发送的内容message[The 1 message] 使用线程:4 发送的内容:我是发送的内容message[The 2 message] 使用线程:4 发送的内容:我是发送的内容message[The 0 message] 使用线程:4 发送的内容:我是发送的内容message[The 1 message] 使用线程:4 发送的内容:我是发送的内容message[The 2 message] 使用线程:4 发送的内容:我是发送的内容message[The 3 message] 使用线程:4 发送的内容:我是发送的内容message[The 0 message] 使用线程:4 发送的内容:我是发送的内容message[The 1 message] 使用线程:4 发送的内容:我是发送的内容message[The 2 message] 使用线程:4 发送的内容:我是发送的内容message[The 3 message] 使用线程:4 发送的内容:我是发送的内容message[The 4 message] 使用线程:1 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 1 message] 使用线程:1 发送的内容:我是发送的内容message[The 2 message] 使用线程:1 发送的内容:我是发送的内容message[The 3 message] 使用线程:1 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 1 message] 使用线程:1 发送的内容:我是发送的内容message[The 2 message] 使用线程:1 发送的内容:我是发送的内容message[The 3 message] 使用线程:1 发送的内容:我是发送的内容message[The 4 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:2 发送的内容:我是发送的内容message[The 0 message] 使用线程:2 发送的内容:我是发送的内容message[The 0 message] 使用线程:2 发送的内容:我是发送的内容message[The 1 message] 使用线程:2 发送的内容:我是发送的内容message[The 0 message] 使用线程:2 发送的内容:我是发送的内容message[The 1 message] 使用线程:2 发送的内容:我是发送的内容message[The 2 message] 使用线程:2 发送的内容:我是发送的内容message[The 0 message] 使用线程:2 发送的内容:我是发送的内容message[The 1 message] 使用线程:2 发送的内容:我是发送的内容message[The 2 message] 使用线程:2 发送的内容:我是发送的内容message[The 3 message] 使用线程:2 发送的内容:我是发送的内容message[The 0 message] 使用线程:2 发送的内容:我是发送的内容message[The 1 message] 使用线程:2 发送的内容:我是发送的内容message[The 2 message] 使用线程:2 发送的内容:我是发送的内容message[The 3 message] 使用线程:2 发送的内容:我是发送的内容message[The 4 message] 使用线程:1 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 1 message] 使用线程:1 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 1 message] 使用线程:1 发送的内容:我是发送的内容message[The 2 message] 使用线程:1 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 1 message] 使用线程:1 发送的内容:我是发送的内容message[The 2 message] 使用线程:1 发送的内容:我是发送的内容message[The 3 message] 使用线程:1 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 1 message] 使用线程:1 发送的内容:我是发送的内容message[The 2 message] 使用线程:1 发送的内容:我是发送的内容message[The 3 message] 使用线程:1 发送的内容:我是发送的内容message[The 4 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 2 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 2 message] 使用线程:0 发送的内容:我是发送的内容message[The 3 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 2 message] 使用线程:0 发送的内容:我是发送的内容message[The 3 message] 使用线程:0 发送的内容:我是发送的内容message[The 4 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 2 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 2 message] 使用线程:0 发送的内容:我是发送的内容message[The 3 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 2 message] 使用线程:0 发送的内容:我是发送的内容message[The 3 message] 使用线程:0 发送的内容:我是发送的内容message[The 4 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 2 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 2 message] 使用线程:0 发送的内容:我是发送的内容message[The 3 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 2 message] 使用线程:0 发送的内容:我是发送的内容message[The 3 message] 使用线程:0 发送的内容:我是发送的内容message[The 4 message]
从以上的打印结果可以清楚的说明结果:
1.当使用的分区数大于开辟的线程数,消费端消费数据时会有一个线程同时采集1个以上分区的数据(不会出现一个分区对应多个线程的情况,这样采集数据会重复混乱)当某个分区中的数据较少时,采集的线程快速的采完了该分区的数据,处于空闲的状态则有可能从新分给别的分区进行采集任务。
2.当使用的分区数等于或者小于线程数且每个分区数据量比较大时,这样就会一个线程对应采集一个分区中的数据,开辟多余的线程数处于闲置状态。
以上是消费端,生产端对应topic与分区数量所采用的线程大小采集问题总结。
相关推荐
标题中的“kettle kafka ...例如,他们可以从关系型数据库抽取数据,经过清洗和转换,然后通过Kafka插件实时地将处理结果发布到Kafka主题,供下游系统消费。这种方式提高了数据处理的效率,也增强了系统的可扩展性。
### Kafka学习之路——详解Kafka原理与架构 #### 一、Kafka简介 Kafka是一款由LinkedIn开发并开源的消息队列系统,它主要用于处理实时数据流,并能够支持在线和离线的日志处理需求。Kafka的基本特性包括高吞吐量、...
Kafka通过生产者(Producer)将数据发布到主题(Topic),消费者(Consumer)可以从主题中订阅并消费数据。在Kafka集群中,数据被分片存储在多个分区(Partition)中,确保高可用性和可扩展性。 接下来,我们要介绍...
4. 连接器API(Connectors API):允许构建并运行可重用的生产者或消费者,将Kafka topics连接到已有的应用程序或数据系统,如关系型数据库。 Kafka的配置部分涵盖了Broker配置、Topic配置、Producer配置、Consumer...
7. **代码示例**: 代码示例通常会包含如何初始化Kafka生产者和消费者,设置配置参数,创建Topic,发送和接收消息等关键操作。这些示例可以帮助初学者快速理解Kafka的工作原理,并在实际项目中应用。 8. **问题与...
使用 PHP 生产、消费 Kafka 消息的例子: 1. 启动 zookeeper 和 kafka 2. 创建由 2 个 partition 组成的、名为 testtopic 的 topic 3. 使用 PHP 生产、消费 Kafka 消息 Kafka 是一种高吞吐的分布式消息系统,能够...
- **问题排查**:通过Kafka-Manager监控消费滞后,发现并解决生产或消费的问题。 - **容量规划**:查看主题的分区和副本分布,为集群扩展提供数据支持。 - **日常运维**:定期检查集群健康状况,及时发现并处理...
生产者负责将消息发布到Kafka的主题中,而消费者则负责从主题中读取消息。如果生产者的写入速度超过消费者的读取速度,可能导致消息堆积。为了解决这个问题,Kafka引入了消费组的概念,通过多个消费者分担主题的消费...
6. Kafka和Zookeeper的关系 - Kafka依赖于Zookeeper进行集群管理和元数据的维护。Zookeeper管理着Kafka集群的状态信息,如broker信息、主题信息、消费者组信息等。 7. Kafka Eagle - Kafka Eagle是Kafka的一个...
5. **消费者组(Consumer Group)**:消费者组是Kafka消费模型的关键,它允许多个消费者并行处理消息,提高处理效率。每个分区只能被消费者组中的一个消费者消费,确保无重复消费。 6. ** brokers**:Kafka集群由多...
- **消费者(Consumer)**:消费者从Kafka主题中订阅和消费消息。消费者可以是单实例或消费者组的一部分,组内的消费者以并行方式消费消息。 - **消费者组(Consumer Group)**:消费者组是消费者实例的集合,每个...
4. Kafka与Zookeeper的关系Kafka使用Zookeeper作为分布式协调服务,管理集群元数据、消费者位移和群组协调。Zookeeper帮助Kafka维护集群状态,保证在节点故障时能够快速发现并转移领导者角色,同时管理消费者组成员...
7. 配置生产者和消费者:编写像`RequestServiceServlet.java`这样的程序,作为Kafka的生产者或消费者,根据需要实现数据的发送和接收。 8. 测试集群:发布消息到Kafka集群,验证生产者、消费者以及整个集群的正确性...
它可以方便地将数据引入Kafka(称为生产者)或者从Kafka导出到其他系统(称为消费者)。Kafka Connect支持各种数据源和数据接收器,如数据库、日志文件、HDFS等,使得数据集成变得更加简单和自动化。 在提供的...
* Producer:我们将发布(publish)消息到 Topic 的进程称之为生产者(producer)。 * Consumer:我们将订阅(subscribe)Topic 并且处理 Topic 中消息的进程称之为消费者(consumer)。 * Broker:Kafka 以集群的...
这款工具通常包含了配置、主题管理、消费者管理、生产者管理、集群监控等多种功能,旨在简化 Kafka 的日常运维工作。 Apache Kafka 是一个分布式流处理平台,被广泛应用于实时数据管道和消息系统。它的核心特性包括...
6. **Zookeeper 管理**:查看 Zookeeper 的节点和数据,对 Kafka 集群的依赖关系有更直观的理解。 二、实战技巧 1. **故障排查**:当发现 broker 或 topic 出现异常时,可以利用 Kafka Manager 查看具体错误日志,...
1. **Kafka架构**:理解Kafka的服务器节点(Brokers)、生产者(Producers)、消费者(Consumers)以及日志存储(Log)之间的交互机制,包括如何进行消息发布和订阅,以及副本复制策略。 2. **消息模型**:详述...
Kafka具有高吞吐量、可扩展性和可靠性等特点,能够处理大量数据,并支持多个生产者和消费者。 入门简介(Introduction)介绍了Kafka的基本概念和术语,比如生产者(Producer)、消费者(Consumer)、主题(Topic)...
5. **Producer**:消息生产者,负责向Kafka Broker发送消息。Producer可以选择将消息发送到特定的Topic和Partition。 6. **Consumer**:消息消费者,负责从Kafka Broker拉取消息。消费者通常归属于一个Consumer ...