`
littie1987
  • 浏览: 133870 次
  • 性别: Icon_minigender_1
  • 来自: 长沙
社区版块
存档分类
最新评论

Kafka消息发布

    博客分类:
  • Java
 
阅读更多
package com.paile.kafka.service.impl;

import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.log4j.Logger;

import com.paile.kafka.bean.MessageBean;
import com.paile.kafka.service.IKafkaService;
import com.paile.utils.others.Const;

/***
 * kafka消息服务类
 * 
 * @author libo
 * 
 */
public class KafkaServiceImpl implements IKafkaService {

	private Logger logger = Logger.getLogger(KafkaServiceImpl.class);
	/***
	 * 发送一条消息
	 */
	public void sendSinglePartitionMessage(String broke_list, String topic, Object message)
			throws Exception {
			Producer<Integer, Object> producer = null;
			try {
				Properties props = new Properties();
				props.put("metadata.broker.list", broke_list);
				props.put("serializer.class", "com.paile.kafka.CustomEncoder");
				props.put("key.serializer.class", "kafka.serializer.StringEncoder");
				props.put("producer.type", "sync");//是否同步 sync:同步   async:异步
				props.put("request.required.acks", "1");
				producer = new Producer<Integer, Object>(new ProducerConfig(props));
				
				KeyedMessage<Integer, Object> data = new KeyedMessage<Integer, Object>(topic, Const.defaultPartitionKey, message);  
				producer.send(data);
			} catch (Exception e) {
				logger.error("发送消息到Kafka失败,", e);
				throw e;
			}finally{
				if(producer!=null)
					producer.close();
			}
	}
	/***
	 * 发送多分区消息
	 */
	public void sendMutilPartitionMessage(String broke_list,
			String topic,MessageBean message)
			throws Exception {
			Producer<String, MessageBean> producer = null;
			try {
				Properties props = new Properties();
				props.put("metadata.broker.list", broke_list);
				props.put("serializer.class", "com.paile.kafka.productor.CustomEncoder");
				props.put("key.serializer.class", "kafka.serializer.StringEncoder");
				props.put("producer.type", "async");//是否同步 sync:同步   async:异步
				props.put("partitioner.class", "com.paile.kafka.productor.SimplePartitioner");//分区算法类
				props.put("request.required.acks", "1");
				producer = new Producer<String, MessageBean>(new ProducerConfig(props));
				
				Random random = new Random();
				int partitionKey = random.nextInt(255);
				KeyedMessage<String, MessageBean> data = new KeyedMessage<String, MessageBean>(topic, 
						String.valueOf(partitionKey), message);  
				producer.send(data);
			} catch (Throwable e) {
				logger.error("发送消息到Kafka失败,", e);
				System.out.println(e.getMessage());
				throw new Exception(e.getMessage());
			}finally{
				if(producer!=null)
					producer.close();
			}
	}
	/***
	 * 接收消息
	 */
	@Override
	public void startConsumer(String zookeeperConnect, String groupId, String topic,int threads)
			throws Exception {
        GroupConsumerManager example = new GroupConsumerManager(zookeeperConnect, groupId, topic);
        try {
			example.run(threads);
		} catch (Exception e) {
			e.printStackTrace();
		}
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
 
        }
        example.shutdown();
	}
	
	public static void main(String[] args){
		KafkaServiceImpl service = new KafkaServiceImpl();
		try {
				for(int i=0;i<10;i++){
					MessageBean bean = new MessageBean();
					bean.setId("00"+i);
					bean.setData("111111111111111");
					bean.setImg(new byte[0]);
					service.sendMutilPartitionMessage("192.168.1.101:9092", "paile01",bean);
				}
				System.out.println("");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

 

分享到:
评论

相关推荐

    星环大数据平台_Kafka消息发布与订阅.pdf

    星环大数据平台中的Kafka消息发布与订阅是大数据处理的重要组成部分,尤其是在构建分布式消息队列时。Kafka是一个分布式的流处理平台,主要用于处理实时数据流。它具有高性能、可扩展和可靠性等特点。Kafka作为...

    C#_Kafka_Demo.rar

    《C#实现Kafka消息发布与订阅:Kafka.Net实战》 Kafka是一种分布式流处理平台,由LinkedIn开发并开源,现在是Apache软件基金会的一部分。它被设计为高吞吐量、低延迟的消息系统,广泛应用于实时数据管道和流式应用...

    kettle kafka 消息者插件

    通过Kafka消费者插件,Pentaho可以订阅Kafka主题,实时接收并处理发布在这些主题上的数据。 这个插件的使用通常包括以下步骤: 1. **配置Kafka连接**:在Kettle中,用户需要提供Kafka集群的地址(Brokers)、认证...

    kettle kafka 消息生产插件

    在描述中提到,“kettle kafka 消息者生产插件,用于集成到kettle,生产Kafka消息。亲测试可用。”这意味着这个插件已经过实际测试,可以在Kettle环境中正常工作,帮助用户创建并发送Kafka消息。在Kettle的工作流程...

    springboot集成kafka进行消息发布和订阅jar

    **SpringBoot集成Kafka进行消息发布与订阅** 在现代微服务架构中,消息队列(Message Queue)扮演着至关重要的角色,它能够有效地解耦系统组件,提高系统的响应速度和可扩展性。Apache Kafka作为一款高吞吐量、低...

    Kafka简介及使用PHP处理Kafka消息

    Kafka简介及使用PHP处理Kafka消息 Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息...

    springboot整合kafka的发布/消费demo项目源码

    在本文中,我们将深入探讨如何将Spring Boot与Apache Kafka整合,以便实现发布/消费消息队列模式。Apache Kafka是一款高效、可扩展且分布式的消息中间件,而Spring Boot则是一个简化了Spring应用程序开发的框架。...

    kafka消息队列样例

    在"Kafka消息队列样例"中,生产者负责生成数据并将其发送到指定的主题。生产者可以决定如何分配消息到不同的分区,这可以通过自定义分区器实现。此外,Kafka生产者支持批量发送,以提高效率并降低网络I/O开销。 2. ...

    KAFKA分布式消息系统(window)

    **KAFKA分布式消息系统在Windows环境下的搭建与应用** KAFKA是一个高吞吐量的分布式消息系统,由LinkedIn开发并开源,现在是Apache软件基金会的顶级项目。它主要设计用于处理实时流数据,允许应用程序发布和订阅...

    Kafka 消息队列(高清版)深入理解Kafka:核心设计与实践原理.zip

    - **发送消息**:调用生产者的send方法将消息发布到指定主题。 - **创建消费者**:配置消费者参数,如group.id,订阅主题,并实现MessageListener接口处理消息。 - **消费消息**:消费者通过poll方法轮询获取新...

    linux中如何操作kafka

    #### 四、Kafka消息发布与消费 **1. 发布消息** Kafka提供了简单的命令行工具用于发送消息到指定的主题。 ```bash bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_test ``` 运行此命令...

    springboot整合kafka相关讲解

    SpringBoot整合Kafka是现代微服务...通过上述步骤,我们可以构建出一个简单的Kafka消息发布和订阅系统。然而,在实际应用中,还需要考虑更多因素,如错误处理、幂等性、消息确认策略等,以确保系统的稳定性和可靠性。

    kafka订阅消息系统

    Kafka是一个分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。

    最新ELK集群 => KafKa消息队列.pdf

    Apache Kafka 是一个高性能、分布式的实时消息队列系统,它最初由 LinkedIn 开发,后来成为 Apache 软件基金会的顶级项目。Kafka 的设计目标是处理大规模的数据流,它提供了高吞吐量、低延迟的消息传递能力,适合...

    基于Kafka消息队列的新一代分布式电量采集方法研究.pdf

    根据提供的文件信息,本研究文件的标题为“基于Kafka消息队列的新一代分布式电量采集方法研究”,文件的描述为“#资源达人分享计划#”,而标签则包括“分布式”、“分布式系统”、“分布式开发”、“参考文献”和...

    kafka-connect-jdbc-4.1.1.zip

    1. **数据摄入(Source Connector)**:它可以定期读取数据库表中的新记录,将其转换为Kafka消息发布到指定的主题。这使得实时数据流处理成为可能,比如监控数据库变更、实现数据湖的实时更新等。 2. **数据分发...

    可视化kafka测试工具

    本文将详细介绍一款可视化Kafka测试工具,该工具能够简化Kafka消息的生产和消费过程,并提供直观的界面来帮助我们理解Kafka的工作原理。 **工具介绍** 这款可视化Kafka测试工具的主要功能是模拟发送Topic消息到...

    .NET CORE 代码使用kafka推送数据

    Kafka 主要用于构建实时数据管道和流应用,能够高效地处理大量数据流,提供发布/订阅模式的消息传递。 4. **消息队列**:消息队列是分布式系统中的一个重要组件,用于解耦应用程序之间的通信。Kafka 作为一个消息...

    Kafka分布式消息系统

    Kafka 是一种高性能的分布式消息队列,最初由 LinkedIn 开发,现在已成为 Apache 软件基金会的顶级项目。Kafka 主要设计用于处理大规模的日志数据,它以高吞吐量、低延迟和容错性著称。LinkedIn 的日志数据主要包含...

    Kafka消息中间件面试专题Kafka消息中间件面试专题

    ### Kafka消息中间件面试专题知识点解析 #### 一、Kafka的设计原理与架构 Kafka是一种分布式的流处理平台,其核心设计围绕着**主题**(topic)展开。一个主题可以视为一条或多条消息的集合。Kafka将消息的发布者称为*...

Global site tag (gtag.js) - Google Analytics