如何使用redis实现消息的订阅与发布
测试类
- /**
*redis消息订阅发布测试类
*/
@Controller
@RequestMapping(value="/queue")
publicclassTestControlller{
@Autowired
privateRedisTemplate redisService;
static{
newTestControlller().initThread("msgQueue");
}
/**订阅消息-启动消费队列数据线程**/
public void initThread(String key){
newThread(new blpopMessageHandler(key)).start();
}
/**发布消息--往队列里写数据**/
@RequestMapping(value="/brpush")
@UnSession
@ResponseBody
publicboolean brpush(String key,String name,int age)throwsException{
boolean flag = redisService.rpush(key,JSON.toJSONString(newEdu(name,age)));
return flag;
}
/**线程实时监听获取数据**/
class blpopMessageHandler implementsRunnable{
privateString key;
privateRedisTemplate redisTemplate=(RedisTemplate)SpringContextUtil.getBean("redisTemplate");
publicString getKey(){
return key;
}
publicvoid setKey(String key){
this.key = key;
}
public blpopMessageHandler(String key){
this.key = key;
}
@Override
publicvoid run(){
do{
//获取到数据
String result = redisTemplate.blpop(1000,key);
if(EmptyUtil.isNotEmpty(result)){
//do something
Edu edu = JSON.parseObject(result,Edu.class);
System.out.println("【线程一】"+edu.getName()+"---->"+edu.getAge());
}
}while(true);
}
}
}
model类
publicclassEduimplementsSerializable{
privatestaticfinallong serialVersionUID =-6336530413316596246L;
privateString name;
privateint age;
publicString getName(){
return name;
}
publicvoid setName(String name){
this.name = name;
}
publicint getAge(){
return age;
}
publicvoid setAge(int age){
this.age = age;
}
publicEdu(String name,int age){
this.name = name;
this.age = age;
}
@Override
publicString toString(){
return"Edu{"+
"name='"+ name +'\''+
", age="+ age +
'}';
}
}
操作redis类
/**
* 实现Jedis对象的封装,实现操作
*/
@Component
publicclassRedisTemplate{
privatestaticLogger logger =LoggerFactory.getLogger(RedisTemplate.class);
// 封装一个pool池用于jedis对象的管理
privatePool<Jedis> jedisPool;
publicRedisTemplate(Pool<Jedis> jedisPool){
this.jedisPool = jedisPool;
}
/**
* 存储REDIS队列 顺序存储
* @param key reids键名
* @param value 键值
*/
publicboolean rpush(String key,String value){
Jedis jedis =null;
boolean flag =false;
try{
jedis = jedisPool.getResource();
jedis.rpush(key.getBytes("utf-8"),value.getBytes("utf-8"));
flag =true;
}catch(Exception e){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
return flag;
}
/**
* 获取队列数据
* @param key 键名
* @return
*/
publicString lpop(String key){
byte[] bytes =null;
Jedis jedis =null;
try{
jedis = jedisPool.getResource();
bytes = jedis.lpop(key.getBytes("utf-8"));
}catch(Exception e){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
if(EmptyUtil.isNotEmpty(bytes)){
try{
returnnewString(bytes,"utf-8");
}catch(UnsupportedEncodingException e){
e.printStackTrace();
}
}
returnnull;
}
/**
* 获取阻塞list中的数据
* @param timeout
* @param key
* @return
*/
publicString blpop(int timeout,String key){
List<String> list =newArrayList<>();
Jedis jedis =null;
try{
jedis = jedisPool.getResource();
list = jedis.blpop(timeout,key);
}catch(Exception e){
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally{
//返还到连接池
close(jedis);
}
returnEmptyUtil.isNotEmpty(list)&& list.size()>1? list.get(1):null;
}
privatevoid close(Jedis jedis){
try{
jedisPool.returnResource(jedis);
}catch(Exception e){
if(jedis.isConnected()){
jedis.quit();
jedis.disconnect();
}
}
}
}
相关推荐
【微服务SpringBoot整合Redis基于Redis的Stream消息队列实现异步秒杀下单】这篇文章主要讲解了如何在微服务架构中使用SpringBoot整合Redis来构建一个基于Redis Stream的消息队列,以此来实现实时、高效的异步秒杀...
消息队列是一种异步处理机制,它允许应用程序将任务放入队列,而无需等待其完成。后台工作者会从队列中取出任务并执行,从而实现了请求的非阻塞处理。这样,前端可以快速响应用户,而不会因为等待耗时操作而阻塞。 ...
消息队列的核心思想是将生产者(Producer)和消费者(Consumer)解耦,生产者将消息放入队列,而消费者从队列中取出并处理消息。这种方式可以确保即使生产者和消费者在速度、生命周期或网络方面存在差异,也不会影响...
使用Redis实现消息队列 为了实现短信异步发送,我们可以利用Redis的消息队列功能。具体步骤如下: 1. **确保Redis已安装** 首先确认服务器上已经安装了Redis以及PHP的Redis扩展。 2. **编写发布代码** 创建一...
在消息队列的上下文中,每个子进程都可以视为一个消费者,它们并行地从Redis获取和处理消息,提升了整个系统的吞吐量。 在描述中提到的“Serve 基于Swoole Server 编写的消息队列消费系统”,可能是指一个名为Serve...
"redis + ajax实现异步下拉列表加载"的主题聚焦于如何结合Redis内存数据库和Ajax技术来实现在网页中动态加载下拉列表内容。下面我们将详细探讨这两个关键技术和它们如何协同工作。 首先,Redis是一个开源的、基于...
Java中,我们可以使用Jedis或Lettuce等库来实现异步Redis。 1. Jedis vs Lettuce Jedis是较早的Redis Java客户端,提供了同步和异步API。然而,对于异步支持,Jedis的实现相对较弱。Lettuce,另一方面,专注于提供...
本项目将这三者结合,实现了一个基于MQTT的设备消息处理系统,通过Spring Boot进行服务端的管理和控制,并利用Redis存储和分发消息。 首先,让我们详细了解一下MQTT协议。MQTT设计的目标是低带宽、高延迟和不可靠...
Redis实现异步任务的方法则有所不同,主要依赖于它的发布订阅机制: 1. 创建一个发布者(Publisher)脚本,连接Redis服务器并发布消息到特定频道(Channel)。 2. 创建一个订阅者(Subscriber)脚本,连接Redis...
在秒杀场景下,我们可以将用户请求作为元素放入队列,然后通过后台进程逐个处理这些请求,实现异步处理。 1. **设置秒杀商品库存**: 在秒杀开始前,我们需要在Redis中为每个商品设置库存,可以使用`INCRBY`命令...
3. 错误处理和重试机制:当消费者无法处理消息时,项目可能包含一个机制将消息放回队列,或者存储在一个错误队列中,以供后续处理。 4. 监控和日志:为了确保系统的稳定运行,监控任务的进度和状态,以及记录日志,...
在本项目中,我们将探讨如何基于Redis实现一个高并发、异步的秒杀系统,以确保在流量高峰期能够稳定运行,避免传统数据库可能出现的性能瓶颈。 **一、Redis介绍** Redis是一个开源的内存数据结构存储系统,它可以...
* 消息队列可以应用于多种场景,如异步处理、任务队列、消息中间件等。 * 消息队列可以提高系统的可扩展性和可靠性。 六、为什么选择Redis * Redis是高速的NoSQL数据库,适合用于实现消息队列。 * Redis具有高性能...
2. **异步处理**:将秒杀操作转化为消息发送,后台服务接收到消息后再执行秒杀逻辑,提高系统响应速度。 3. **负载均衡**:多个消费者订阅同一消息队列,实现请求的分布式处理,平衡服务器负载。 4. **容错机制**:...
`Go-Gores` 是一个用Go语言编写的高效、轻量级的异步作业执行系统,它利用了Redis作为中间件来实现任务的调度与分发。设计的目标是为分布式系统提供可靠且灵活的定时任务解决方案,支持高并发和大规模数据处理场景。...
用来记录发生的事件的web App,配置了完全支持Redis以及sidekiq异步消息处理。以dip compose up方式也可运作起来,同时,在工程根目录下也可以允许rails s 。.zip项目工程资源经过严格测试可直接运行成功且功能正常...
4. **确认消费**: 消费者在处理完消息后,使用`XACK`命令将消息标记为已处理,从Stream的待处理消息列表中移除。 Redis Stream还提供了其他高级特性,如流的限流(通过`XLEN`和`XRANGE`命令)、消息过期策略(通过`...