`

redis实现异步消息处理

阅读更多

 

 如何使用redis实现消息的订阅与发布

 

 测试类

  1. /**
  2. *redis消息订阅发布测试类
  3. */
  4. @Controller
  5. @RequestMapping(value="/queue")
  6. publicclassTestControlller{
  7. @Autowired
  8. privateRedisTemplate redisService;
  9. static{
  10. newTestControlller().initThread("msgQueue");
  11. }
  12. /**订阅消息-启动消费队列数据线程**/
  13. public void initThread(String key){
  14. newThread(new blpopMessageHandler(key)).start();
  15. }
  16. /**发布消息--往队列里写数据**/
  17. @RequestMapping(value="/brpush")
  18. @UnSession
  19. @ResponseBody
  20. publicboolean brpush(String key,String name,int age)throwsException{
  21. boolean flag = redisService.rpush(key,JSON.toJSONString(newEdu(name,age)));
  22. return flag;
  23. }
  24. /**线程实时监听获取数据**/
  25. class blpopMessageHandler implementsRunnable{
  26. privateString key;
  27. privateRedisTemplate redisTemplate=(RedisTemplate)SpringContextUtil.getBean("redisTemplate");
  28. publicString getKey(){
  29. return key;
  30. }
  31. publicvoid setKey(String key){
  32. this.key = key;
  33. }
  34. public blpopMessageHandler(String key){
  35. this.key = key;
  36. }
  37. @Override
  38. publicvoid run(){
  39. do{
  40. //获取到数据
  41. String result = redisTemplate.blpop(1000,key);
  42. if(EmptyUtil.isNotEmpty(result)){
  43. //do something
  44. Edu edu = JSON.parseObject(result,Edu.class);
  45. System.out.println("【线程一】"+edu.getName()+"---->"+edu.getAge());
  46. }
  47. }while(true);
  48. }
  49. }
  50. }

model类

  1.  
  2. publicclassEduimplementsSerializable{
  3. privatestaticfinallong serialVersionUID =-6336530413316596246L;
  4. privateString name;
  5. privateint age;
  6. publicString getName(){
  7. return name;
  8. }
  9. publicvoid setName(String name){
  10. this.name = name;
  11. }
  12. publicint getAge(){
  13. return age;
  14. }
  15. publicvoid setAge(int age){
  16. this.age = age;
  17. }
  18. publicEdu(String name,int age){
  19. this.name = name;
  20. this.age = age;
  21. }
  22. @Override
  23. publicString toString(){
  24. return"Edu{"+
  25. "name='"+ name +'\''+
  26. ", age="+ age +
  27. '}';
  28. }
  29. }

操作redis类

  1.  
  2. /**
  3. * 实现Jedis对象的封装,实现操作
  4. */
  5. @Component
  6. publicclassRedisTemplate{
  7. privatestaticLogger logger =LoggerFactory.getLogger(RedisTemplate.class);
  8. // 封装一个pool池用于jedis对象的管理
  9. privatePool<Jedis> jedisPool;
  10. publicRedisTemplate(Pool<Jedis> jedisPool){
  11. this.jedisPool = jedisPool;
  12. }
  13. /**
  14. * 存储REDIS队列 顺序存储
  15. * @param key reids键名
  16. * @param value 键值
  17. */
  18. publicboolean rpush(String key,String value){
  19. Jedis jedis =null;
  20. boolean flag =false;
  21. try{
  22. jedis = jedisPool.getResource();
  23. jedis.rpush(key.getBytes("utf-8"),value.getBytes("utf-8"));
  24. flag =true;
  25. }catch(Exception e){
  26. //释放redis对象
  27. jedisPool.returnBrokenResource(jedis);
  28. e.printStackTrace();
  29. }finally{
  30. //返还到连接池
  31. close(jedis);
  32. }
  33. return flag;
  34. }
  35. /**
  36. * 获取队列数据
  37. * @param key 键名
  38. * @return
  39. */
  40. publicString lpop(String key){
  41. byte[] bytes =null;
  42. Jedis jedis =null;
  43. try{
  44. jedis = jedisPool.getResource();
  45. bytes = jedis.lpop(key.getBytes("utf-8"));
  46. }catch(Exception e){
  47. //释放redis对象
  48. jedisPool.returnBrokenResource(jedis);
  49. e.printStackTrace();
  50. }finally{
  51. //返还到连接池
  52. close(jedis);
  53. }
  54. if(EmptyUtil.isNotEmpty(bytes)){
  55. try{
  56. returnnewString(bytes,"utf-8");
  57. }catch(UnsupportedEncodingException e){
  58. e.printStackTrace();
  59. }
  60. }
  61. returnnull;
  62. }
  63. /**
  64. * 获取阻塞list中的数据
  65. * @param timeout
  66. * @param key
  67. * @return
  68. */
  69. publicString blpop(int timeout,String key){
  70. List<String> list =newArrayList<>();
  71. Jedis jedis =null;
  72. try{
  73. jedis = jedisPool.getResource();
  74. list = jedis.blpop(timeout,key);
  75. }catch(Exception e){
  76. //释放redis对象
  77. jedisPool.returnBrokenResource(jedis);
  78. e.printStackTrace();
  79. }finally{
  80. //返还到连接池
  81. close(jedis);
  82. }
  83. returnEmptyUtil.isNotEmpty(list)&& list.size()>1? list.get(1):null;
  84. }
  85. privatevoid close(Jedis jedis){
  86. try{
  87. jedisPool.returnResource(jedis);
  88. }catch(Exception e){
  89. if(jedis.isConnected()){
  90. jedis.quit();
  91. jedis.disconnect();
  92. }
  93. }
  94. }
  95. }
分享到:
评论

相关推荐

    微服务SpringBoot整合Redis基于Redis的Stream消息队列实现异步秒杀下单

    【微服务SpringBoot整合Redis基于Redis的Stream消息队列实现异步秒杀下单】这篇文章主要讲解了如何在微服务架构中使用SpringBoot整合Redis来构建一个基于Redis Stream的消息队列,以此来实现实时、高效的异步秒杀...

    PHP中利用redis实现消息队列处理高并发请求思路详解.rar

    消息队列是一种异步处理机制,它允许应用程序将任务放入队列,而无需等待其完成。后台工作者会从队列中取出任务并执行,从而实现了请求的非阻塞处理。这样,前端可以快速响应用户,而不会因为等待耗时操作而阻塞。 ...

    基于redis实现的消息队列

    消息队列的核心思想是将生产者(Producer)和消费者(Consumer)解耦,生产者将消息放入队列,而消费者从队列中取出并处理消息。这种方式可以确保即使生产者和消费者在速度、生命周期或网络方面存在差异,也不会影响...

    swoole + redis 实现短信异步发送

    使用Redis实现消息队列 为了实现短信异步发送,我们可以利用Redis的消息队列功能。具体步骤如下: 1. **确保Redis已安装** 首先确认服务器上已经安装了Redis以及PHP的Redis扩展。 2. **编写发布代码** 创建一...

    Redis延时消息队列基于swoole实现的多进程消费端

    在消息队列的上下文中,每个子进程都可以视为一个消费者,它们并行地从Redis获取和处理消息,提升了整个系统的吞吐量。 在描述中提到的“Serve 基于Swoole Server 编写的消息队列消费系统”,可能是指一个名为Serve...

    redis + ajax实现异步下拉列表加载

    "redis + ajax实现异步下拉列表加载"的主题聚焦于如何结合Redis内存数据库和Ajax技术来实现在网页中动态加载下拉列表内容。下面我们将详细探讨这两个关键技术和它们如何协同工作。 首先,Redis是一个开源的、基于...

    异步redis + java源码

    Java中,我们可以使用Jedis或Lettuce等库来实现异步Redis。 1. Jedis vs Lettuce Jedis是较早的Redis Java客户端,提供了同步和异步API。然而,对于异步支持,Jedis的实现相对较弱。Lettuce,另一方面,专注于提供...

    mqtt+springBoot+redis消息处理,

    本项目将这三者结合,实现了一个基于MQTT的设备消息处理系统,通过Spring Boot进行服务端的管理和控制,并利用Redis存储和分发消息。 首先,让我们详细了解一下MQTT协议。MQTT设计的目标是低带宽、高延迟和不可靠...

    PHP swoole和redis异步任务实现方法分析

    Redis实现异步任务的方法则有所不同,主要依赖于它的发布订阅机制: 1. 创建一个发布者(Publisher)脚本,连接Redis服务器并发布消息到特定频道(Channel)。 2. 创建一个订阅者(Subscriber)脚本,连接Redis...

    Redis从入门到精通(九)Redis实战(六)基于Redis队列实现异步秒杀下单 测试项目代码

    在秒杀场景下,我们可以将用户请求作为元素放入队列,然后通过后台进程逐个处理这些请求,实现异步处理。 1. **设置秒杀商品库存**: 在秒杀开始前,我们需要在Redis中为每个商品设置库存,可以使用`INCRBY`命令...

    PHP+Redis 的队列处理程序

    3. 错误处理和重试机制:当消费者无法处理消息时,项目可能包含一个机制将消息放回队列,或者存储在一个错误队列中,以供后续处理。 4. 监控和日志:为了确保系统的稳定运行,监控任务的进度和状态,以及记录日志,...

    基于redis实现高并发异步秒杀点评项目

    在本项目中,我们将探讨如何基于Redis实现一个高并发、异步的秒杀系统,以确保在流量高峰期能够稳定运行,避免传统数据库可能出现的性能瓶颈。 **一、Redis介绍** Redis是一个开源的内存数据结构存储系统,它可以...

    Java利用Redis实现消息队列的示例代码

    * 消息队列可以应用于多种场景,如异步处理、任务队列、消息中间件等。 * 消息队列可以提高系统的可扩展性和可靠性。 六、为什么选择Redis * Redis是高速的NoSQL数据库,适合用于实现消息队列。 * Redis具有高性能...

    基于mq和redis实现的秒杀系统

    2. **异步处理**:将秒杀操作转化为消息发送,后台服务接收到消息后再执行秒杀逻辑,提高系统响应速度。 3. **负载均衡**:多个消费者订阅同一消息队列,实现请求的分布式处理,平衡服务器负载。 4. **容错机制**:...

    Go-Gores-基于Redis的异步作业执行系统

    `Go-Gores` 是一个用Go语言编写的高效、轻量级的异步作业执行系统,它利用了Redis作为中间件来实现任务的调度与分发。设计的目标是为分布式系统提供可靠且灵活的定时任务解决方案,支持高并发和大规模数据处理场景。...

    用来记录发生的事件的web App,配置了完全支持Redis以及sidekiq异步消息处理。(毕设&课设&实训&大作业&竞赛&项

    用来记录发生的事件的web App,配置了完全支持Redis以及sidekiq异步消息处理。以dip compose up方式也可运作起来,同时,在工程根目录下也可以允许rails s 。.zip项目工程资源经过严格测试可直接运行成功且功能正常...

    利用redis rightPop 和 redis stream 实现消息队列

    4. **确认消费**: 消费者在处理完消息后,使用`XACK`命令将消息标记为已处理,从Stream的待处理消息列表中移除。 Redis Stream还提供了其他高级特性,如流的限流(通过`XLEN`和`XRANGE`命令)、消息过期策略(通过`...

Global site tag (gtag.js) - Google Analytics