1.1. 背景
系统的有些业务时需要定时发消息通知。但是这些消息又不是有规律可循的。比如,商品的优惠是限时的。在之前的实现是有一个排查任务每5分钟都去去商品表中查询哪些有做活动的商品,并比较是否过了限时折扣的时间。但是类似的排程多了,就会出现在某个时候数据库的资源使用率特别高。
1.2. 解决思路
1、将参与限时活动的商品保存在另外一张表。
2、使用消息队列机制,选择限时商品的时候将商品信息和限时的时间传入消息队列。
3、创建一个定时任务。
4、当时间到了定时任务就将在限时商品表删除此商品。
1.3. 定时消息任务的实现
这边就不去操作数据库了,就演示一下要如何实现这样的定时任务。同时也不演示kafka是如何搭建的,这边就直接用起来。
生产者代码
生产者代码主要实现了将 商品信息、数据库链接信息、定时时间传入kafka中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from pykafka import KafkaClient
import simplejson asjson
import logging
import time
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
logging.basicConfig()
if__name__=='__main__':
# 可接受多个Client这是重点
client=KafkaClient(hosts="192.168.137.12:9092")
# 选择一个topic
topic=client.topics['test']
# 创建一个生产者
producer=topic.get_producer()
producer.start()
# 生产消息
msg_dict={
"sleep_time":10,
"db_config":{
"database" :"test",
"host" :"192.168.137.12",
"user" :"root",
"password" :"root"
},
"table" :"msg",
"msg" :"Hello World"
}
msg=json.dumps(msg_dict)
producer.produce(msg)
producer.stop()
|
消费者代码
消费者代码中主要是实现了将接收的消息放入定时任务中(timer)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from pykafka.common import OffsetType
from pykafka import KafkaClient
from threading import Timer
import simplejson asjson
import logging
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
logging.basicConfig()
def func(msg):
'''
定时执行的任务
'''
print msg
if__name__=='__main__':
# 可接受多个Client这是重点
client=KafkaClient(hosts="192.168.137.12:9092")
# 查看所有topic
client.topics
# 选择一个topic
topic=client.topics['test']
# 使用这种一个topic只能允许一个consumer_group消费
balanced_consumer=topic.get_balanced_consumer(
consumer_group='test_group1',
auto_commit_enable=True,
zookeeper_connect='localhost:2181'
)
formessage inbalanced_consumer:
ifmessage isnotNone:
# 创建定时任务
timer=Timer(10,func,args=[message.value])
timer.start()
|
1.4. 补充
之前我说过消息信息需要存入数据库,这边的原因是主要是怕这个消费者程序奔溃重启是还能恢复。还有其实我们应该使用zookeeper开发分布式程序。当一个消费者程序崩溃了另外一个需要马上接进来(有兴趣的可以去研究一下kazoo,并实现分布式程序和leader选举)。
分享到:
相关推荐
STM32F103搭载FreeRTOS系统,工程中有2个任务,任务一1秒发送一次数字至任务队列,任务二接收来自任务一和串口中断的填充数据并打印。串口采用DMA+闲时中断方式,串口接收的数据转发到队列中。工程将USART重定义到...
消息队列延迟定时任务是软件开发中一种常见且重要的技术,它主要用于处理那些需要在特定时间点执行的任务,比如订单超时处理、定时发送邮件等。在这个场景中,Redis作为一个功能丰富的键值存储系统,被广泛用作消息...
搭载FreeRTOS系统,任务一向消息队列填充数字,任务二从消息队列提取数据并发送到串口1,同时有LED灯跟随数据传送亮灭。 这里我们的课程设计内容。 对于STM32和FreeRTOS初学者以及想了解RTOS的任务机制与消息队列的...
延迟队列是一种非常实用的解决方案,而Redis也具备延迟队列的功能,这里博主将和大家分享基于Redis的Zset数据类型+定时任务实现延迟队列。 redis常见的实现延迟队列的方案 ❶ 通过过期key通知实现 ❷ 通过Zset数据...
通过以上分析,我们可以看出“1 消息队列MQ+多线程任务+业务处理”这一主题涉及到的技术深度和广度,它涵盖了软件设计中的并发处理、消息传递、系统优化等多个重要方面。在实际开发中,熟练掌握这些知识点对于提升...
队列的使用场景广泛,如邮件发送、定时任务、批量数据处理等,通过队列可以避免一次性处理大量请求导致的服务器压力,提升系统响应速度。 PHP-Redis队列则是在PHP中利用Redis内存数据库作为队列存储的解决方案。...
"使用消息队列+js实现分布式服务器热切换业务处理功能" 这个标题揭示了我们要探讨的核心技术点。首先,"消息队列"是分布式系统中常用的一种中间件,它用于在不同服务之间传递消息,降低耦合度,提高系统的可扩展性和...
这里,我们探讨的主题是“Go-管理任务定时将任务放入消息队列直到任务关闭”,这个系统设计主要涉及到以下几个核心概念和技术: 1. **任务管理**:任务管理是整个系统的基石,包括创建、更新、查询以及删除任务。...
rabbitmq+消息队列+仿QQ的java代码+SDA实习报告
消息队列规划: MSG[0] = 1 ---> 表示接收到上位机的请求命令 MSG[0] = 2 ---> 表示接收到任务5的的SD2405时钟芯片的时间 MSG[1] = 年 MSG[2] = 月 MSG[3] = 日 MSG[4] = 时 MSG[5] = 分 MSG[6] = ...
总结起来,"swing+项目管理+消息队列+socket通信.7z"项目综合运用了Swing来构建用户界面,利用Apache POI操作Excel进行项目管理,借助消息队列实现组件间的异步通信,以及通过Socket实现实时交流。这样的设计确保了...
编程语言+JAVAspring+消息队列+异步通信**:这是一个关于JAVAspring编程语言的消息队列的异步通信的资源,适合有一定JAVAspring基础的开发者。它介绍了JAVAspring的消息队列的概念、原理和作用,以及如何使用JAVA...
定时任务可能用于触发生产者生成数据,或者消费者从队列中取数据的逻辑,以便在预设的时间间隔内执行特定的操作。 4. **Spring框架集成**:Spring框架简化了多线程编程,提供了声明式事务管理、自动依赖注入等特性...
在IT行业中,消息队列(Message Queue)是一种重要的中间件技术,它允许应用程序之间通过异步通信进行数据交换。在本教程中,我们将探讨如何整合Spring框架与ActiveMQ消息队列,实现前后台的消息传递。这有助于提升...
消息队列是分布式系统中的一种重要技术,它用于在不同组件之间传递消息,实现解耦、异步处理和负载均衡。RabbitMQ是一款广泛应用的消息中间件,基于AMQP(Advanced Message Queuing Protocol)协议,提供了高可用性...
本教程将重点讨论如何在SpringBoot2.x框架下,结合RabbitMQ消息队列,实现消息的100%可靠性投递,并通过定时任务确保消息无丢失。首先,我们要理解RabbitMQ的核心概念及其在SpringBoot中的集成。 RabbitMQ是一个...
读书笔记:高性能秒杀系统redis限流+redis缓存+kafka消息队列+mysql乐观锁