使用原生态方式发送、接受消息
pom.xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.1</version>
</dependency>
发消息
@Test
public void TestProducer(){
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.245:9393,192.168.1.246:9393");
//“所有”设置将导致记录的完整提交阻塞,最慢的,但最持久的设置。
props.put("acks", "all");
//如果请求失败,生产者也会自动重试,即使设置成0 the producer can automatically retry.
props.put("retries", 0);
//The producer maintains buffers of unsent records for each partition.
props.put("batch.size", 16384);
//默认立即发送,这里这是延时毫秒数
props.put("linger.ms", 1);
//生产者缓冲大小,当缓冲区耗尽后,额外的发送调用将被阻塞。时间超过max.block.ms将抛出TimeoutException
props.put("buffer.memory", 33554432);
//The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//创建kafka的生产者类
// close();//Close this producer.
// close(long timeout, TimeUnit timeUnit); //This method waits up to timeout for the producer to complete the sending of all incomplete requests.
// flush() ;所有缓存记录被立刻发送
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i = 0; i < 10; i++){
producer.send(new ProducerRecord<String, String>("d-topic", Integer.toString(i), Integer.toString(i)));
}
producer.flush();
producer.close();
}
收消息
@Test
public void TestConsumer() throws InterruptedException{
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.245:9393,192.168.1.246:9393");
props.put("group.id", "GroupA");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
//从poll(拉)的回话处理时长
props.put("session.timeout.ms", "30000");
//poll的数量限制
//props.put("max.poll.records", "100");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//订阅主题列表topic
//consumer.subscribe(Arrays.asList("d-topic", "bar"));
consumer.subscribe(Arrays.asList("d-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Thread.sleep(1000);
}
}
与spring集成
pom.xml
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka
http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- 生产者配置 -->
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg index="0">
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="192.168.1.245:9393,192.168.1.246:9393"/>
<entry key="acks" value="all"/>
<entry key="retries" value="3"/>
<entry key="batch.size" value="16384"/>
<entry key="linger.ms" value="1"/>
<entry key="buffer.memory" value="33554432"/>
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"></entry>
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"></entry>
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
<!-- 生产1号 -->
<int:channel id="inputToKafka">
<int:queue/>
</int:channel>
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-template="template"
auto-startup="true"
channel="inputToKafka"
topic="d-topic">
<int:poller fixed-delay="1000" time-unit="MILLISECONDS"/>
</int-kafka:outbound-channel-adapter>
</beans>
cunsumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka
http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- 消费配置 -->
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="192.168.1.245:9393,192.168.1.246:9393"/>
<entry key="group.id" value="GroupA"/>
<entry key="enable.auto.commit" value="true"/>
<entry key="auto.commit.interval.ms" value="1000"/>
<entry key="session.timeout.ms" value="15000"/>
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
</map>
</constructor-arg>
</bean>
<!-- 创建consumerFactory bean -->
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties"/>
</constructor-arg>
</bean>
<!-- 消费1号 -->
<int:channel id="inputFromKafka">
<int:queue/>
</int:channel>
<int-kafka:message-driven-channel-adapter auto-startup="true" channel="inputFromKafka" listener-container="container1" />
<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg index="0" ref="consumerFactory"/>
<constructor-arg index="1" ref="containerProperties"/>
</bean>
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg value="d-topic"/>
</bean>
</beans>
发送、接口消息代码
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:applicationContext.xml"})
public class TestKafka {
@Autowired
@Qualifier("inputToKafka")
MessageChannel messageChannel;
@Autowired
@Qualifier("inputFromKafka")
PollableChannel pollableChannel;
@Test
public void TestSpringProducer(){
for (int i = 0; i < 15; i++) {
Message<String> message = new GenericMessage<String>("test_" + i);
boolean flag = messageChannel.send(message);
System.out.println(flag + "_" + i);
}
}
@Test
public void TestSpringConsumer(){
Message<?> received = pollableChannel.receive(1000);
while (received != null) {
System.out.println("message########" + received);
received = pollableChannel.receive(1000);
}
}
}
分享到:
相关推荐
从提供的文件名称"myMQ-demo-master"来看,这个项目很可能是采用了类似于Apache Kafka或RabbitMQ等真实消息中间件的架构设计,通过模拟实现消息队列的基本原理。这种模拟通常包括以下几个关键组件和概念: 1. 生产...
- **消息队列**:当消息量大时,可以引入消息队列(如RabbitMQ或Kafka),以异步处理消息,提高系统吞吐量。 7. **可扩展性**: - **分布式部署**:为了应对更多用户,系统可以设计成分布式,通过负载均衡将请求...
在IT行业中,网络编程是必不可少的一部分,特别是在C++这样的系统级编程语言中。本文将深入讲解如何在Linux环境下使用C++实现UDP(User Datagram Protocol)数据的发送与接收,包括单播和组播功能,并且支持指定网卡...
scratch少儿编程逻辑思维游戏源码-城堡战争.zip
内容概要:本文档汇集了来自字节跳动、腾讯、金山WPS、跟谁学和百度等大厂的Go工程师面试题,涵盖广泛的技术领域。主要包括Go语言特性(如goroutine调度、channel机制)、操作系统(进程间通信、线程调度)、计算机网络(TCP/IP协议栈、HTTP协议)、数据结构与算法(排序算法、LRU缓存)、数据库(MySQL索引优化、Redis内部机制)、分布式系统(负载均衡、服务发现)等方面的知识点。通过这些问题,不仅考察应聘者的理论基础,还测试其实际项目经验和技术深度。 适合人群:有一定Go语言编程经验和计算机基础知识的开发者,特别是准备应聘互联网大厂的中级及以上水平的后端工程师或全栈工程师。 使用场景及目标:①帮助求职者全面复习Go语言及其相关领域的核心概念;②为面试官提供有价值的参考题目,确保候选人具备解决复杂问题的能力;③指导工程师深入理解并掌握企业级应用开发所需的关键技能。 阅读建议:由于题目覆盖面广且难度较高,建议读者结合自身情况选择重点复习方向,同时配合实际编码练习加深理解。对于每个知识点,不仅要记住答案,更要理解背后的原理,这样才能在面试中灵活应对各种变体问题。
scratch少儿编程逻辑思维游戏源码-堡垒之夜(吃鸡游戏).zip
少儿编程scratch项目源代码文件案例素材-派.zip
scratch少儿编程逻辑思维游戏源码-Scratch 冒险.zip
2025 飞特舵机, Arduino版本
scratch少儿编程逻辑思维游戏源码-躲避.zip
内容概要:本文详细介绍了利用PFC5.0进行纤维混凝土三点弯曲模拟的方法。首先,作者展示了如何通过定义纤维的体积含量、长度、半径和刚度等关键参数来构建纤维网络。接着,描述了三点弯曲加载的具体实现方式,包括加载速率控制和终止条件设定。最后,提供了后处理方法,如绘制并导出力-位移曲线图,以便于分析材料破坏机制。文中还给出了若干实用建议,如纤维半径的选择范围、加载速率的初始值以及不同类型纤维的接触模型选择。 适合人群:从事材料科学尤其是混凝土材料研究的专业人士,以及对离散元法和数值模拟感兴趣的科研工作者。 使用场景及目标:适用于希望深入了解纤维混凝土力学性能的研究人员,旨在帮助他们掌握PFC5.0软件的操作技巧,优化模拟参数设置,提高实验效率。 其他说明:文中提供的代码片段可以直接应用于实际项目中,同时附带了一些实践经验分享,有助于初学者快速入门并避免常见错误。
少儿编程scratch项目源代码文件案例素材-生存V1(有BAG).zip
少儿编程scratch项目源代码文件案例素材-披萨机器人.zip
少儿编程scratch项目源代码文件案例素材-气球滑雪板.zip
少儿编程scratch项目源代码文件案例素材-使命召唤(苏联插旗).zip
1. GPIO模拟I2C 实战项目,根据正点原子 STM32F407ZGT6 进行更改; 2. 可适配STM32、GD32、HC32等MCU;
scratch少儿编程逻辑思维游戏源码-百米冲刺.zip
内容概要:本文档汇总了蓝桥杯历年试题及练习资源,涵盖编程类试题精选、硬件与单片机试题、练习资源与题库以及备考建议。编程类试题精选包括基础算法题(如数组求和、质因数分解)、经典算法案例(如最大子序列和、兰顿蚂蚁模拟)和数据结构应用(如字符全排列)。硬件与单片机试题主要涉及客观题考点,如BUCK电路和电源设计。练习资源与题库部分介绍了真题平台(如Dotcpp、CSDN专题)和专项训练包(如Python题库、Java百题集、C++真题解析)。备考建议分为分阶段练习(新手阶段、进阶提升)和模拟实战(如使用Dotcpp估分系统进行限时训练),强调按年份和组别分类练习,强化代码实现与调试能力。; 适合人群:准备参加蓝桥杯竞赛的学生及编程爱好者。; 使用场景及目标:①针对不同编程语言和难度级别的题目进行专项训练;②通过历年真题和模拟实战提高解题速度和准确性;③掌握算法设计、数据结构应用及硬件基础知识。; 阅读建议:此文档提供了丰富的试题和练习资源,建议根据自身水平选择合适的题目进行练习,并结合真题平台的估分系统和社区开源代码进行对比优化,逐步提升编程能力和竞赛水平。
内容概要:本文详细介绍了30kW储能PCS(电力转换系统)原理图的设计要点及其量产化过程中需要注意的技术细节。首先阐述了储能PCS的基本概念和重要性,接着深入探讨了主拓扑结构的选择,特别是双级式结构的优势以及关键组件如IGBT的驱动时序配置。随后讨论了控制算法的智能化改进,包括加入前馈补偿以提高系统的稳定性。此外,还强调了EMC设计、PCB布局、元件选择等方面的注意事项,并分享了一些实际生产中遇到的问题及解决方案。最后提到了自动化测试方法和散热管理策略,确保产品在各种环境下的可靠运行。 适合人群:从事储能系统设计、电力电子产品研发的工程师和技术人员。 使用场景及目标:帮助读者掌握30kW储能PCS从原理图设计到量产实施的全流程关键技术,提升产品的性能和可靠性,避免常见错误。 其他说明:文中提供了具体的代码片段和实践经验,有助于理解和应用相关理论。
少儿编程scratch项目源代码文件案例素材-喷气包多德.zip