package com.paile.kafka.service.impl; import java.util.Properties; import java.util.Random; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import org.apache.log4j.Logger; import com.paile.kafka.bean.MessageBean; import com.paile.kafka.service.IKafkaService; import com.paile.utils.others.Const; /*** * kafka消息服务类 * * @author libo * */ public class KafkaServiceImpl implements IKafkaService { private Logger logger = Logger.getLogger(KafkaServiceImpl.class); /*** * 发送一条消息 */ public void sendSinglePartitionMessage(String broke_list, String topic, Object message) throws Exception { Producer<Integer, Object> producer = null; try { Properties props = new Properties(); props.put("metadata.broker.list", broke_list); props.put("serializer.class", "com.paile.kafka.CustomEncoder"); props.put("key.serializer.class", "kafka.serializer.StringEncoder"); props.put("producer.type", "sync");//是否同步 sync:同步 async:异步 props.put("request.required.acks", "1"); producer = new Producer<Integer, Object>(new ProducerConfig(props)); KeyedMessage<Integer, Object> data = new KeyedMessage<Integer, Object>(topic, Const.defaultPartitionKey, message); producer.send(data); } catch (Exception e) { logger.error("发送消息到Kafka失败,", e); throw e; }finally{ if(producer!=null) producer.close(); } } /*** * 发送多分区消息 */ public void sendMutilPartitionMessage(String broke_list, String topic,MessageBean message) throws Exception { Producer<String, MessageBean> producer = null; try { Properties props = new Properties(); props.put("metadata.broker.list", broke_list); props.put("serializer.class", "com.paile.kafka.productor.CustomEncoder"); props.put("key.serializer.class", "kafka.serializer.StringEncoder"); props.put("producer.type", "async");//是否同步 sync:同步 async:异步 props.put("partitioner.class", "com.paile.kafka.productor.SimplePartitioner");//分区算法类 props.put("request.required.acks", "1"); producer = new Producer<String, MessageBean>(new ProducerConfig(props)); Random random = new Random(); int partitionKey = random.nextInt(255); KeyedMessage<String, MessageBean> data = new KeyedMessage<String, MessageBean>(topic, String.valueOf(partitionKey), message); producer.send(data); } catch (Throwable e) { logger.error("发送消息到Kafka失败,", e); System.out.println(e.getMessage()); throw new Exception(e.getMessage()); }finally{ if(producer!=null) producer.close(); } } /*** * 接收消息 */ @Override public void startConsumer(String zookeeperConnect, String groupId, String topic,int threads) throws Exception { GroupConsumerManager example = new GroupConsumerManager(zookeeperConnect, groupId, topic); try { example.run(threads); } catch (Exception e) { e.printStackTrace(); } try { Thread.sleep(10000); } catch (InterruptedException ie) { } example.shutdown(); } public static void main(String[] args){ KafkaServiceImpl service = new KafkaServiceImpl(); try { for(int i=0;i<10;i++){ MessageBean bean = new MessageBean(); bean.setId("00"+i); bean.setData("111111111111111"); bean.setImg(new byte[0]); service.sendMutilPartitionMessage("192.168.1.101:9092", "paile01",bean); } System.out.println(""); } catch (Exception e) { e.printStackTrace(); } } }
相关推荐
星环大数据平台中的Kafka消息发布与订阅是大数据处理的重要组成部分,尤其是在构建分布式消息队列时。Kafka是一个分布式的流处理平台,主要用于处理实时数据流。它具有高性能、可扩展和可靠性等特点。Kafka作为...
《C#实现Kafka消息发布与订阅:Kafka.Net实战》 Kafka是一种分布式流处理平台,由LinkedIn开发并开源,现在是Apache软件基金会的一部分。它被设计为高吞吐量、低延迟的消息系统,广泛应用于实时数据管道和流式应用...
通过Kafka消费者插件,Pentaho可以订阅Kafka主题,实时接收并处理发布在这些主题上的数据。 这个插件的使用通常包括以下步骤: 1. **配置Kafka连接**:在Kettle中,用户需要提供Kafka集群的地址(Brokers)、认证...
在描述中提到,“kettle kafka 消息者生产插件,用于集成到kettle,生产Kafka消息。亲测试可用。”这意味着这个插件已经过实际测试,可以在Kettle环境中正常工作,帮助用户创建并发送Kafka消息。在Kettle的工作流程...
**SpringBoot集成Kafka进行消息发布与订阅** 在现代微服务架构中,消息队列(Message Queue)扮演着至关重要的角色,它能够有效地解耦系统组件,提高系统的响应速度和可扩展性。Apache Kafka作为一款高吞吐量、低...
Kafka简介及使用PHP处理Kafka消息 Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息...
在本文中,我们将深入探讨如何将Spring Boot与Apache Kafka整合,以便实现发布/消费消息队列模式。Apache Kafka是一款高效、可扩展且分布式的消息中间件,而Spring Boot则是一个简化了Spring应用程序开发的框架。...
在"Kafka消息队列样例"中,生产者负责生成数据并将其发送到指定的主题。生产者可以决定如何分配消息到不同的分区,这可以通过自定义分区器实现。此外,Kafka生产者支持批量发送,以提高效率并降低网络I/O开销。 2. ...
**KAFKA分布式消息系统在Windows环境下的搭建与应用** KAFKA是一个高吞吐量的分布式消息系统,由LinkedIn开发并开源,现在是Apache软件基金会的顶级项目。它主要设计用于处理实时流数据,允许应用程序发布和订阅...
- **发送消息**:调用生产者的send方法将消息发布到指定主题。 - **创建消费者**:配置消费者参数,如group.id,订阅主题,并实现MessageListener接口处理消息。 - **消费消息**:消费者通过poll方法轮询获取新...
#### 四、Kafka消息发布与消费 **1. 发布消息** Kafka提供了简单的命令行工具用于发送消息到指定的主题。 ```bash bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_test ``` 运行此命令...
SpringBoot整合Kafka是现代微服务...通过上述步骤,我们可以构建出一个简单的Kafka消息发布和订阅系统。然而,在实际应用中,还需要考虑更多因素,如错误处理、幂等性、消息确认策略等,以确保系统的稳定性和可靠性。
Kafka是一个分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。
Apache Kafka 是一个高性能、分布式的实时消息队列系统,它最初由 LinkedIn 开发,后来成为 Apache 软件基金会的顶级项目。Kafka 的设计目标是处理大规模的数据流,它提供了高吞吐量、低延迟的消息传递能力,适合...
根据提供的文件信息,本研究文件的标题为“基于Kafka消息队列的新一代分布式电量采集方法研究”,文件的描述为“#资源达人分享计划#”,而标签则包括“分布式”、“分布式系统”、“分布式开发”、“参考文献”和...
1. **数据摄入(Source Connector)**:它可以定期读取数据库表中的新记录,将其转换为Kafka消息发布到指定的主题。这使得实时数据流处理成为可能,比如监控数据库变更、实现数据湖的实时更新等。 2. **数据分发...
本文将详细介绍一款可视化Kafka测试工具,该工具能够简化Kafka消息的生产和消费过程,并提供直观的界面来帮助我们理解Kafka的工作原理。 **工具介绍** 这款可视化Kafka测试工具的主要功能是模拟发送Topic消息到...
Kafka 主要用于构建实时数据管道和流应用,能够高效地处理大量数据流,提供发布/订阅模式的消息传递。 4. **消息队列**:消息队列是分布式系统中的一个重要组件,用于解耦应用程序之间的通信。Kafka 作为一个消息...
Kafka 是一种高性能的分布式消息队列,最初由 LinkedIn 开发,现在已成为 Apache 软件基金会的顶级项目。Kafka 主要设计用于处理大规模的日志数据,它以高吞吐量、低延迟和容错性著称。LinkedIn 的日志数据主要包含...
### Kafka消息中间件面试专题知识点解析 #### 一、Kafka的设计原理与架构 Kafka是一种分布式的流处理平台,其核心设计围绕着**主题**(topic)展开。一个主题可以视为一条或多条消息的集合。Kafka将消息的发布者称为*...