`
QING____
  • 浏览: 2250678 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Spring-data-redis: pub/sub消息订阅

 
阅读更多

    Redis中pub/sub特性,可以用来实现类似与JMS的“topic”功能,只不过这些消息无法被持久化而已。spring-data-redis组件中对pub/sub提供了类似JMS的编程模式,我们通过实例来展示如何使用。

    需要注意的是,在redis中消息的订阅端(subscribe)需要独占链接,那么消息接收将是阻塞的。

    代码实例中,使用了“连接池”/“消息异步接受”“消息并发处理”,请根据需要调整相关参数。

    1) Redis中"pub/sub"的消息,为"即发即失",server不会保存消息,如果publish的消息,没有任何client处于"subscribe"状态,消息将会被丢弃.如果client在subcribe时,链接断开后重连,那么此期间的消息也将丢失.Redis server将会"尽力"将消息发送给处于subscribe状态的client,但是仍不会保证每条消息都能被正确接收.

    2) 如果期望pub/sub的消息时持久的,那么需要借助额外的功能.参见"pub/sub持久化订阅"

 

一.配置文件

 

<beans xmlns="http://www.springframework.org/schema/beans" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd" default-autowire="byName">
	<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
		<property name="maxActive" value="32"></property>
		<property name="maxIdle" value="6"></property>
		<property name="maxWait" value="15000"></property>
		<property name="minEvictableIdleTimeMillis" value="300000"></property>
		<property name="numTestsPerEvictionRun" value="3"></property>
		<property name="timeBetweenEvictionRunsMillis" value="60000"></property>
		<property name="whenExhaustedAction" value="1"></property>
	</bean>
	<bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" destroy-method="destroy">
		<property name="poolConfig" ref="jedisPoolConfig"></property>
		<property name="hostName" value="127.0.0.1"></property>
		<property name="port" value="6379"></property>
		<property name="password" value="0123456"></property>
		<property name="timeout" value="15000"></property>
		<property name="usePool" value="true"></property>
	</bean>
	<bean id="jedisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
		<property name="connectionFactory" ref="jedisConnectionFactory"></property>
		<property name="defaultSerializer">
			<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
		</property>
	</bean>
	
	<bean id="topicMessageListener" class="com.sample.redis.sdr.TopicMessageListener">
		<property name="redisTemplate" ref="jedisTemplate"></property>
	</bean>
	<bean id="topicContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer" destroy-method="destroy">
		<property name="connectionFactory" ref="jedisConnectionFactory"/>
		<property name="taskExecutor"><!-- 此处有个奇怪的问题,无法正确使用其他类型的Executor -->
			<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
				<property name="poolSize" value="3"></property>
			</bean>
		</property>
		<property name="messageListeners">
			<map>
				<entry key-ref="topicMessageListener">
					<bean class="org.springframework.data.redis.listener.ChannelTopic">
						<constructor-arg value="user:topic"/>
					</bean>
				</entry>
			</map>
		</property>
	</bean>
</beans>

 

 

二.消息发布(pub):

 

String channel = "user:topic";
//其中channel必须为string,而且“序列化”策略也是StringSerializer
//消息内容,将会根据配置文件中指定的valueSerializer进行序列化
//本例中,默认全部采用StringSerializer
//那么在消息的subscribe端也要对“发序列化”保持一致。
redisTemplate.convertAndSend(channel, "from app 1");

 

 

三.消息接收(subscribe):

   1) TopicMessageListener类:

public class TopicMessageListener implements MessageListener {

	private RedisTemplate redisTemplate;
	
	public void setRedisTemplate(RedisTemplate redisTemplate) {
		this.redisTemplate = redisTemplate;
	}

	@Override
	public void onMessage(Message message, byte[] pattern) {
		byte[] body = message.getBody();//请使用valueSerializer
		byte[] channel = message.getChannel();
		//请参考配置文件,本例中key,value的序列化方式均为string。
		//其中key必须为stringSerializer。和redisTemplate.convertAndSend对应
		String itemValue = (String)redisTemplate.getValueSerializer().deserialize(body);
		String topic = (String)redisTemplate.getStringSerializer().deserialize(channel);
		//...
	}
}

   2) 你会发现上述编程风格非常像JMS。需要注意的是消息体的反序列化。

 

3
0
分享到:
评论
6 楼 imxood 2016-04-24  
谢谢啦分享
5 楼 whuvr 2015-05-05  
亲,需要哪些JAR包,应该说明一下吧。。不然别人要搞好久。。
4 楼 QING____ 2014-04-11  
death0320 写道
你好,看了你的关于使用spring-data-redis的文章,里面主要是处理收的信息,我现在有个需要是要在订阅时就做一些操作。我看了一下源代码,发现RedisMessageListenerContainer类中有个属性是可以设置的subscriptionExecutor,我就自己写了一个implement Executor的类来做处理,主要是在redis中生成个集合,为我订阅时缓存信息用的,这个是满足了要求,也看到redis里有了一个集合了,但原来可以监听到的发布信息,就无法订阅到了,我看了一下,发布是正常的,就是无法订阅到。在spring配置如下:
<property name="subscriptionExecutor"><!-- 此处解决在订阅时所需要处理的操作 -->
      <bean class="pf.redis.spring.SubExecutor" parent="redisBase">
        <property name="clientId" value="subClient-2"/>
        <property name="channel" value="channel.test"/>
      </bean>
    </property>

很不好意思,最近比较忙,未能回复你的问题,在spring中,有些executor的实现是不能产生效果的,你先看看源码,是不是你的executor没有实现正确的spring中的某些接口。

稍后,我再重现你的问题。抱歉。
3 楼 death0320 2014-04-09  
你好,看了你的关于使用spring-data-redis的文章,里面主要是处理收的信息,我现在有个需要是要在订阅时就做一些操作。我看了一下源代码,发现RedisMessageListenerContainer类中有个属性是可以设置的subscriptionExecutor,我就自己写了一个implement Executor的类来做处理,主要是在redis中生成个集合,为我订阅时缓存信息用的,这个是满足了要求,也看到redis里有了一个集合了,但原来可以监听到的发布信息,就无法订阅到了,我看了一下,发布是正常的,就是无法订阅到。在spring配置如下:
<property name="subscriptionExecutor"><!-- 此处解决在订阅时所需要处理的操作 -->
      <bean class="pf.redis.spring.SubExecutor" parent="redisBase">
        <property name="clientId" value="subClient-2"/>
        <property name="channel" value="channel.test"/>
      </bean>
    </property>
2 楼 QING____ 2013-06-17  
rox 写道
不错的文章,非常感谢!
这里是去年底,研究Redis时,发现的一篇文章,也不错。推荐一下:
http://blog.springsource.org/2012/05/16/spring-mvc-3-2-preview-chat-sample/

谢谢,稍后拜读一下你提供的资讯.
1 楼 rox 2013-06-17  
不错的文章,非常感谢!
这里是去年底,研究Redis时,发现的一篇文章,也不错。推荐一下:
http://blog.springsource.org/2012/05/16/spring-mvc-3-2-preview-chat-sample/

相关推荐

    springMVC集成spring-data-redis

    在SpringMVC中集成Spring Data Redis,可以利用Redis的高效特性来提升应用程序的数据处理能力,例如作为session共享的存储、缓存数据或者实现发布/订阅(Pub/Sub)功能。发布/订阅是一种通信模式,允许发送者(pub)将...

    redis-spring-pub_sub

    标题 "redis-spring-pub_sub" 暗示了我们关注的是如何在Spring框架中使用Redis作为发布/订阅(pub/sub)消息系统。这个主题涵盖了两个主要方面:Redis的发布/订阅功能和Spring对它的集成。 Redis是一个高性能的键值...

    SpringDataRedis的jar包.rar

    8. **消息支持**:Spring Data Redis提供了对Redis Pub/Sub(发布/订阅)的支持,允许开发构建基于消息的应用程序,实现异步通信和解耦。 9. **Key的过期策略**:可以通过`expire()`, `expireAt()`等方法设置Redis...

    spring-data-redis-1.6.0.RELEASE最新稳定版(个人测试通过)

    5. **消息监听**:Spring Data Redis 还支持 Redis Pub/Sub(发布/订阅)模式,允许应用程序监听特定频道或模式的消息,实现事件驱动的架构。 6. **事务支持**:尽管 Redis 自身不支持传统的关系型数据库事务,但 ...

    spring-redis-websocket:使用Spring Boot和Redis PubSub的多实例React式WebSocket消息传递聊天应用程序演示

    websocket使用Spring Boot WebFlux和Redis Pub / Sub的多实例React式聊天应用程序可扩展的Java 11 Spring Boot WebFlux聊天应用程序,用于演示如何使用Reactive 使用Reactive Redis ,而无需使用任何外部Message ...

    spring-data-redis最新架包

    此外,Spring Data Redis还支持事务(Transactions)和发布/订阅(Pub/Sub)模式。通过`TransactionOperations`接口,开发者可以执行多条命令并在一个原子操作中提交或回滚。发布/订阅模式允许实时通信,通过`...

    spring-redis-session 自定义 key 和过期时间

    Spring-Redis-Session 的实现原理是基于 Redis 的 Pub/Sub 机制和 Hash 数据结构。它使用 Redis 的 Hash 结构来存储会话数据,每个会话对应一个 Hash 结构,其中包含了会话的基本信息、用户设置的属性信息和过期时间...

    spring-data-redis英文版

    7. Redis消息/发布订阅(Pub/Sub):这涉及到Redis消息传递机制,说明了如何在Spring Data Redis中发送和接收消息。 8. Redis事务:介绍如何使用Spring的@Transactional注解来管理Redis的事务。 9. 管道...

    spring data redis 官方文档

    5. **消息传递/发布订阅**:利用 Redis 的 Pub/Sub 功能实现消息传递机制,支持消息发送和接收。 6. **事务管理**:支持基于注解的事务管理,简化了复杂操作的原子性控制。 7. **管道(Pipelining)**:通过管道批量...

    Spring Redis操作手册

    - **Topic与Pub/Sub:** 实现基于主题的消息发布/订阅模型。 - **Cache支持:** 介绍了如何利用Redis作为缓存系统,提高应用性能。 #### 四、NoSQL数据库设计 - **一般经验分享:** - **不持久化业务实体:** 建议...

    spring data redis api jar

    9. **Redis Pub/Sub支持**:能够方便地订阅和发布消息,实现分布式消息通信。 10. **Spring整合**:Spring Data Redis与Spring框架深度集成,支持Spring Boot自动配置,可以快速搭建Redis相关的应用。 使用Spring ...

    spring-data-redis:通过spring框架实现对redis的封装访问

    Spring Data Redis也提供了发布/订阅(Pub/Sub)功能。通过`MessageListenerAdapter`和`SimpleMessageListenerContainer`,可以轻松实现消息监听。 六、事务支持 Spring Data Redis支持事务操作,但需要注意Redis...

    spring data for redis

    - **RedisListenerContainerFactory**: 用于创建Redis监听容器,监听Redis的消息(如PUB/SUB)。 - **RedisMessageListener**: 实现该接口可以处理接收到的Redis消息。 ### 5. Redis配置 在Spring Boot应用中,...

    spring-redis-boot-starter-1.0.0_java_

    Redis不仅仅是一个键值存储,还可以作为一个高效的发布/订阅(pub/sub)消息系统。在Spring Boot应用中,可以使用RedisTemplate或JedisConnectionFactory来实现消息的发布和订阅。这使得开发者可以构建基于事件驱动...

    spring-boot-redis.zip

    Spring Boot结合Redis的发布/订阅(Pub/Sub)模式,可以实现消息的实时同步,确保在分布式系统中的数据一致性。 总结来说,"spring-boot-redis.zip"项目展示了如何在Spring Boot应用中整合MySQL、MyBatis和Redis,...

    spring-data-redis:当使用键值存储Redis时,提供支持以提高Java开发人员的生产率。 使用熟悉的Spring概念,例如用于核心API使用和轻量级存储库样式数据访问的模板类

    6. **消息订阅/发布**:Spring Data Redis 还包含了对Redis Pub/Sub(发布/订阅)模式的支持,使得应用可以实现事件驱动的架构,订阅特定频道或者模式,接收和处理消息。 7. **持久化策略**:框架提供了过期策略和...

    SpringDataRedis.rar

    - SpringDataRedis也支持Redis的发布订阅(Pub/Sub)模式,可以注册监听器来响应特定频道或模式的消息。 8. **RedisTemplate与ReactiveRedisTemplate**: - Spring Data Redis 2.x引入了ReactiveRedisTemplate,...

    Redis 发布订阅 Demo

    Redis 发布订阅(Pub/Sub)概念** 发布订阅模式允许消息生产者(Publisher)发送消息到特定的频道(Channel),而多个消息消费者(Subscriber)可以订阅这些频道,一旦有新消息发布,所有订阅了该频道的消费者都会...

    Redis-x64-5.0.10.zip

    5. **发布/订阅**:Redis内置了发布/订阅(pub/sub)消息系统,可用于构建实时的消息传递功能,如聊天应用或者实时通知。 6. **丰富的数据类型**:Redis支持字符串、哈希、列表、集合、有序集合等多种数据结构,...

    Spring Boot 实战 - redis

    Redis还提供了发布订阅(Pub/Sub)功能,可以用于构建实时的消息传递系统。Spring Data Redis的`MessageListenerAdapter`可以帮助我们实现监听器: ```java @Bean public MessageListenerAdapter ...

Global site tag (gtag.js) - Google Analytics