`
taiwei.peng
  • 浏览: 233940 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

redis 发布 订阅

阅读更多

1.服务端
package com.dz.im.tools;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
* 消息发布端
* @author David
*
*/
public class PubClient {

// Redis服务器IP
private static String ADDR = "192.168.0.11";

// Redis的端口号
private static int PORT = 6379;

// 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
private static int MAX_ACTIVE = 500;

// 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。
private static int MAX_IDLE = 100;

// 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;
private static int MAX_WAIT = 1000;

// 当调用borrow Object方法时,是否进行有效性检查
private static Boolean TEST_ON_BORROW = false;

// 当调用return Object方法时,是否进行有效性检查
private static Boolean TEST_ON_RETURN = false;

// 超时时间
private static int TIMEOUT = 1000000;

private static JedisPool jedisPool = null;

/**
* 初始化Redis连接池
*/
private static void initialPool() {
try {
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxActive(MAX_ACTIVE);
config.setMaxIdle(MAX_IDLE);
config.setMaxWait(MAX_WAIT);
config.setTestOnBorrow(TEST_ON_BORROW);
config.setTestOnReturn(TEST_ON_RETURN);
jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);
} catch (Exception e) {
e.printStackTrace();
LoggerUtil.MyLogger.error("init jedispoll error :%s", e);
}
}

/**
* 在多线程环境同步初始化
*/
private static synchronized void poolInit() {
if (jedisPool == null) {
initialPool();
}
}

/**
* 释放jedis资源
*
* @param jedis
*/
private static void returnResource(final Jedis jedis) {
if (jedis != null && jedisPool != null) {
jedisPool.returnResource(jedis);
}
}

/**
* 销毁连接
*
* @param shardedJedis
*/
private static void returnBrokenResource(final Jedis jedis) {
try {
jedisPool.returnBrokenResource(jedis);
} catch (Exception e) {
LoggerUtil.MyLogger.error("returnBrokenResource error", e);
}
}

/**
* 推入消息到redis消息通道
*
* @param channel
* @param message
*/
public void publish(String channel, String message) {
Jedis jedis = null;
if (jedisPool == null) {
poolInit();
}
try {
if (jedisPool != null) {
jedis = jedisPool.getResource();
jedis.publish(channel, message);
}
} catch (Exception ex) {
ex.printStackTrace();
returnBrokenResource(jedis);
} finally {
returnResource(jedis);
}
}

public void close(String channel){ 
Jedis jedis = null;
if (jedisPool == null) {
poolInit();
}
try {
if (jedisPool != null) {
jedis = jedisPool.getResource();
jedis.publish(channel, "quit");
jedis.del(channel);
}
} catch (Exception ex) {
ex.printStackTrace();
returnBrokenResource(jedis);
} finally {
returnResource(jedis);
}


}

2.客户端
package com.dz.im.tools;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
* 消息订阅端
*
* @author David
*
*/
public class SubClient {

// Redis服务器IP
private static String ADDR = "192.168.0.11";

// Redis的端口号
private static int PORT = 6379;

// 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
private static int MAX_ACTIVE = 500;

// 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。
private static int MAX_IDLE = 100;

// 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;
private static int MAX_WAIT = 1000;

// 当调用borrow Object方法时,是否进行有效性检查
private static Boolean TEST_ON_BORROW = false;

// 当调用return Object方法时,是否进行有效性检查
private static Boolean TEST_ON_RETURN = false;

// 超时时间
private static int TIMEOUT = 1000000;

private static JedisPool jedisPool = null;

/**
* 初始化Redis连接池
*/
private static void initialPool() {
try {
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxActive(MAX_ACTIVE);
config.setMaxIdle(MAX_IDLE);
config.setMaxWait(MAX_WAIT);
config.setTestOnBorrow(TEST_ON_BORROW);
config.setTestOnReturn(TEST_ON_RETURN);
jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);
} catch (Exception e) {
e.printStackTrace();
LoggerUtil.MyLogger.error("init jedispoll error :%s", e);
}
}

/**
* 在多线程环境同步初始化
*/
private static synchronized void poolInit() {
if (jedisPool == null) {
initialPool();
}
}

/**
* 释放jedis资源
*
* @param jedis
*/
private static void returnResource(final Jedis jedis) {
if (jedis != null && jedisPool != null) {
jedisPool.returnResource(jedis);
}
}

/**
* 销毁连接
*
* @param shardedJedis
*/
private static void returnBrokenResource(final Jedis jedis) {
try {
jedisPool.returnBrokenResource(jedis);
} catch (Exception e) {
LoggerUtil.MyLogger.error("returnBrokenResource error", e);
}
}

/**
* 推入消息到redis消息通道
*
* @param channel
* @param message
*/
public void subscribe(JedisPubSub listener, String channel) {
Jedis jedis = null;
if (jedisPool == null) {
poolInit();
}
try {
if (jedisPool != null) {
jedis = jedisPool.getResource();
jedis.subscribe(listener, channel);
}
} catch (Exception ex) {
ex.printStackTrace();
returnBrokenResource(jedis);
} finally {
returnResource(jedis);
}
}
}

3.客户端监听
package com.dz.im.tools;

import java.text.SimpleDateFormat;
import java.util.Date;

import redis.clients.jedis.JedisPubSub;

/**
* 订阅者消息处理器
* @author David
*
*/
public class PrintListener extends JedisPubSub{

@Override
public void onMessage(String channel, String message) {
Date curDate=new Date();
SimpleDateFormat df=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String time = df.format(curDate);
        System.out.println("message receive:" + message + ",channel:" + channel + "..." + time); 
        //此处我们可以取消订阅 
        if(message.equalsIgnoreCase("quit")){ 
            this.unsubscribe(channel); 
        } 
}

@Override
public void onPMessage(String arg0, String arg1, String arg2) {

}

@Override
public void onPSubscribe(String arg0, int arg1) {

}

@Override
public void onPUnsubscribe(String arg0, int arg1) {

}

@Override
public void onSubscribe(String arg0, int arg1) {

}

@Override
public void onUnsubscribe(String arg0, int arg1) {

}
}
4.服务端测试类
package com.dz.im.tools;

import org.apache.commons.lang.RandomStringUtils;

public class PubSubTestMain {

public static void main(String[] args) throws Exception {
PubClient pubClient = new PubClient();
String channel = "public-test";
int i = 0;
while (i < 200) {
String message = "测试的所得税法大的"+RandomStringUtils.random(6, true, true);
pubClient.publish(channel, message);
i++;
Thread.sleep(1000);
}
}
}

 

5.客户端测试类

package com.dz.im.tools;

 

import redis.clients.jedis.JedisPubSub;

 

public class Demo {

 

private static final String channel="public-test";

 

public static void main(String[] args) {

try {

while(true){

SubClient subClient = new SubClient();

JedisPubSub listener = new PrintListener();

subClient.subscribe(listener, channel);

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

 

参考http://shift-alt-ctrl.iteye.com/blog/1867454 这篇文章

 

 

分享到:
评论

相关推荐

    C# Redis发布订阅Demo

    综上所述,C# Redis发布订阅Demo涵盖了Redis Pub/Sub机制的使用以及Key过期通知的处理。通过这个Demo,开发者可以学习到如何在C#应用中利用Redis进行实时数据通信和监控,这对于构建分布式系统和实时应用非常有价值...

    redis发布订阅小案例

    在这个“redis发布订阅小案例”中,我们将探讨如何使用Java来实现Redis的发布订阅功能。首先,我们需要引入Jedis库,这是Java操作Redis的常用客户端库。确保在项目中已经添加了Jedis的依赖,例如通过Maven或Gradle。...

    基于netcore 3.0的redis发布订阅示例代码

    在本文中,我们将深入探讨如何使用.NET Core 3.0框架与Redis进行发布/订阅(Pub/Sub)通信。Redis是一个高性能的键值存储系统,它提供了丰富的数据结构,如字符串、哈希表、集合和有序集合。在.NET Core 3.0中,我们...

    Redis发布订阅.net实现

    标题"Redis发布订阅.net实现"所涉及的关键知识点包括: 1. Redis的发布订阅机制: 发布订阅模式在Redis中由`PUBLISH`和`SUBSCRIBE`命令支持。发布者通过`PUBLISH`命令将消息发送到一个特定的频道,而订阅者则通过`...

    Redis 发布订阅 Demo

    **Redis发布订阅模式详解** Redis 是一款高性能的键值存储系统,它支持多种数据结构,如字符串、哈希、列表、集合和有序集合等。在实际应用中,Redis 不仅可以作为缓存服务,还提供了丰富的消息通信机制,其中之一...

    可以工作在nginx tcp负载均衡环境中的集群聊天服务器和客户端源码,使用了redis发布订阅消息队列.zip

    可以工作在nginx tcp负载均衡环境中的集群聊天服务器和客户端源码 基于nuduo库实现 使用了redis发布订阅消息队列 数据库采用MySQL 可以工作在nginx tcp负载均衡环境中的集群聊天服务器和客户端源码 基于nuduo库实现 ...

    基于Java环境下的Redis发布订阅的设计与实现.pdf

    基于Java环境下的Redis发布订阅设计与实现 本文研究了基于Java环境下的Redis发布订阅设计与实现,以满足实时聊天系统、微博以及分布式架构等应用对数据通信中间件的需求。Redis的发布订阅功能可以实现实时通信、...

    C# Redis发布与订阅系统源码

    本项目提供了一个基于C#的Redis发布与订阅(Publish/Subscribe)系统的源码,帮助开发者了解如何在Windows Forms(Winform)应用中实现这一功能。 首先,我们需要了解Redis的发布/订阅模式。在这个模式下,发送者...

    .net core如何使用Redis发布订阅

    在探讨.NET Core如何使用Redis进行发布订阅之前,首先需要了解Redis发布订阅的基本概念。Redis的发布订阅(pub/sub)是一种消息通信模式,它包括发布者(publisher)和订阅者(subscriber)。发布者负责发布消息,而...

    基于Redis 发布订阅(pub/sub)的dtalk Device Talk前端设备控制框架设计源码

    该项目是dtalk(Device Talk)前端设备控制框架,基于Redis 发布订阅(pub/sub)机制设计,源码包含270个文件,涵盖119个Java源文件、61个PNG图片文件、21个Shell脚本文件、20个批处理文件、8个XML配置文件、7个属性文件...

    基于Redis发布订阅(pub/sub)系统实现的前端设备控制框架(java)

    基于Redis发布订阅(pub/sub)系统实现的前端设备控制框架(java)。在基于物联网的应用中,各种前端设备需要能被远程管理,当所有的设备都能直接通过ip地址访问时,设计设备管理系统时,只需要把前端设备当作一个简单的...

    QT实现redis订阅发布功能

    在本文中,我们将深入探讨如何使用Qt框架结合hiredis-1.0.0库来实现Redis的订阅发布功能。Redis是一种高性能的键值存储系统,广泛用于数据缓存、消息队列以及实时数据处理等领域。Qt则是一个跨平台的应用程序开发...

    Redis发布订阅SignalR即时通讯.rar

    本案例包含redis的发布订阅功能,以及dotnet core+SignalR实现的简单即时通信,并提供文档笔记。本案例初衷是想结合redis的发布订阅功能+websocket实现消息客户端页面订阅指定的消息,并在客户端页面进行显示;

    python 19、REDIS基础 2-1_Redis发布订阅_Day02_am.mp4

    python 19、REDIS基础 2-1_Redis发布订阅_Day02_am.mp4

    dtalk(Device Talk)基于Redis发布订阅(pub/sub)系统实现的前端设备控制框架(java)

    dtalk就是为了实现上述的目标而开发的一个Redis发布订阅(pub/sub)系统实现的前端设备控制框架,在dtalk框架上,Redis服务器用于提供中转服务。前端设备通过订阅特定的频道接收管理发送的请求消息,执行对应的功能。...

    SpringBoot + Redis实现事件的发布订阅功能

    本话题主要探讨如何利用SpringBoot和Redis实现事件的发布订阅功能,这对于实现分布式系统中的异步通信和解耦至关重要。 首先,我们需要理解SpringBoot的核心特性。SpringBoot是Spring框架的一个简化版本,它旨在...

    第11周-第14章节-Python3.5-Redis 发布订阅及本节作业.mp4

    第11周-第14章节-Python3.5-Redis 发布订阅及本节作业.mp4

    redis订阅与发布.zip

    在 Redis 中,订阅与发布(Pub/Sub)是一种消息通信模式,用于实现实时的消息传递。在这个模式下,发布者将消息发送到特定的频道,而订阅者则可以监听并接收这些频道上的消息。这种机制无需直接交互,而是通过中间的...

    Redis 发布订阅

    Redis 发布订阅(pub/sub)是 Redis 数据库中一种基于发布-订阅模式的消息通信机制,它允许应用程序通过发送和接收消息来实现不同组件之间的异步通信。在这个模式中,发送者(publisher)向特定的频道(channel)...

    Redis发布订阅和实现.NET客户端详解

    发布订阅在设计模式中可以类比为观察者模式,它解决了对象之间一对多依赖关系的问题。当发布者(Publisher)向一个频道(Channel)发布消息时,所有订阅了该频道的订阅者(Subscriber)都会收到这个消息。 Redis...

Global site tag (gtag.js) - Google Analytics