package com.sunline.sysconf.thread;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.sunline.common.config.DefaultConfig;
import com.sunline.persist.po.DeviceInfo;
import com.sunline.persist.po.MsgSendRecord;
import com.sunline.sysconf.dao.DeviceInfoDao;
import com.sunline.sysconf.util.AppMsgUtil;
import com.sunline.sysconf.util.ConstantsConf;
import com.sunline.sysconf.vo.PushAppBean;
/**
* 消息消费线程
* @author David
*
*/
public class MsgConsumerThread implements Runnable{
private static Logger log = LoggerFactory.getLogger(MsgConsumerThread.class);
private volatile boolean isRunning= true;
private DefaultConfig defaultConfig;
private DeviceInfoDao deviceInfoDao;
private BlockingQueue<PushAppBean> queue;
public MsgConsumerThread(BlockingQueue<PushAppBean> queue,DeviceInfoDao deviceInfoDao,DefaultConfig defaultConfig) {
this.queue = queue;
this.deviceInfoDao=deviceInfoDao;
this.defaultConfig=defaultConfig;
}
@Override
public void run() {
String sAppId=defaultConfig.getVal("app.appId");
String sAppKey=defaultConfig.getVal("app.appkey");
String sAppHost=defaultConfig.getVal("app.host");
String sAppMaster=defaultConfig.getVal("app.master");
try {
while (isRunning) {
log.info("MsgConsumerThread start!");
PushAppBean pushBean = queue.poll();
if (null != pushBean) {
log.info("queue poll msgId:"+pushBean.getMsgId()+",queue size:"+queue.size());
//查询所有设备
List<DeviceInfo> deviceInfoList=deviceInfoDao.findDeviceTokenAll(pushBean.getDeptId());
int size=deviceInfoList.size();
if(size>0){
Map<String,List<String>> deviceMap=new HashMap<String,List<String>>();
for(DeviceInfo device:deviceInfoList){
String deviceType=device.getDeviceType();
List<String> deviceTokenList=deviceMap.get(deviceType);
if(null==deviceTokenList){
deviceTokenList=new ArrayList<String>();
deviceMap.put(deviceType, deviceTokenList);
}
deviceTokenList.add(device.getDeviceToken());
}
int num=deviceMap.size();
int record=0;
for(Entry<String,List<String>> entry:deviceMap.entrySet()){
//0 安卓系统 1 IOS系统
String key=entry.getKey();
List<String> tokenIds=entry.getValue();
int type="ios".equals(key)?1:0;
String code=AppMsgUtil.pushMessage(sAppId, sAppKey, sAppHost, sAppMaster,pushBean, type, tokenIds);
log.info("send code:"+code);
record+=Integer.parseInt(code);
}
if(record==num){
MsgSendRecord sendRecord=new MsgSendRecord();
sendRecord.setMsgId(pushBean.getMsgId());
sendRecord.setCreateTime(new Date());
deviceInfoDao.saveMsgSendRecord(sendRecord);
}
}
}
Thread.sleep(ConstantsConf.THREAD_SLEEP);
log.info("MsgConsumerThread end!");
}
} catch (Exception ce) {
ce.printStackTrace();
log.error("Consumer msg error",ce);
}
}
}
package com.sunline.sysconf.thread;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.sunline.sysconf.dao.DeviceInfoDao;
import com.sunline.sysconf.util.ConstantsConf;
import com.sunline.sysconf.vo.MsgInfoResVo;
import com.sunline.sysconf.vo.PushAppBean;
/**
* 消息生产线程
* @author David
*
*/
public class MsgProducerThread implements Runnable{
private static Logger log = LoggerFactory.getLogger(MsgProducerThread.class);
private BlockingQueue<PushAppBean> queue;
private volatile boolean isRunning= true;
private DeviceInfoDao deviceInfoDao;
public MsgProducerThread(BlockingQueue<PushAppBean> queue,DeviceInfoDao deviceInfoDao){
this.queue = queue;
this.deviceInfoDao=deviceInfoDao;
}
@Override
public void run() {
try {
while (isRunning) {
log.info("MsgProducerThread start!");
//查询需要发送的消息列表
List<MsgInfoResVo> msgList=deviceInfoDao.findMsgInfo();
int size=msgList.size();
if(size>0){
for(MsgInfoResVo msgVo:msgList){
PushAppBean appBean=new PushAppBean();
appBean.setTitle(msgVo.getMsgTitle());
appBean.setContent(msgVo.getMsgContent());
appBean.setMsgType(msgVo.getMsgType());
appBean.setMsgTime(msgVo.getMsgTime());
appBean.setMsgId(msgVo.getMsgId());
appBean.setDeptId(msgVo.getDeptId());
queue.offer(appBean);
log.info("queue offer msgId:"+msgVo.getMsgId()+",queue size:"+queue.size());
}
Thread.sleep((size+2)*ConstantsConf.THREAD_SLEEP);
}else{
Thread.sleep(ConstantsConf.THREAD_SLEEP);
}
log.info("MsgProducerThread end!");
}
} catch (Exception ce) {
log.error("Producer msg error",ce);
ce.printStackTrace();
}
}
public void stop() {
isRunning = false;
}
}
package com.sunline.sysconf.util;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import com.gexin.rp.sdk.base.IPushResult;
import com.gexin.rp.sdk.base.impl.ListMessage;
import com.gexin.rp.sdk.base.impl.Target;
import com.gexin.rp.sdk.http.IGtPush;
import com.gexin.rp.sdk.template.TransmissionTemplate;
import com.sunline.sysconf.vo.PushAppBean;
import com.sunline.sysconf.vo.PushBean;
package com.sunline.sysconf.util;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import com.gexin.rp.sdk.base.IPushResult;
import com.gexin.rp.sdk.base.impl.ListMessage;
import com.gexin.rp.sdk.base.impl.Target;
import com.gexin.rp.sdk.base.payload.APNPayload;
import com.gexin.rp.sdk.http.IGtPush;
import com.gexin.rp.sdk.template.TransmissionTemplate;
import com.sunline.sysconf.vo.PushAppBean;
import com.sunline.sysconf.vo.PushBean;
public class AppMsgUtil {
private static Logger log = LoggerFactory.getLogger(AppMsgUtil.class);
/**
* 推送方法
* @param sAppId
* @param sAppKey
* @param sAppHost
* @param sAppMaster
* @param pushBean
* @param osType
* @param tokenIds 0 安卓系统 1 IOS系统
* @return
*/
public static String pushMessage(String sAppId, String sAppKey,String sAppHost, String sAppMaster,PushAppBean pushBean,int osType,List<String> tokenIds){
String code="";
try{
String msgContext=pushBean.getContent();
String msgTitle=pushBean.getTitle();
String msgType=pushBean.getMsgType();
String msgTime=pushBean.getMsgTime();
//个推参数连接设置
IGtPush push = new IGtPush(sAppHost, sAppKey, sAppMaster);
//创建连接
push.connect();
//多个设备列表信息推送对象
ListMessage listMessage = new ListMessage();
//设备列表
List<Target> lstTargets = new ArrayList<Target>();
//透传消息模板
TransmissionTemplate emp=null;
if(0==osType){
emp=transmissionTemplate(sAppId, sAppKey, msgContext,msgTitle,msgType,msgTime);
}else{
emp=iosTransmissionTemplate(sAppId, sAppKey, msgTitle, msgContext, msgType, msgTime);
}
for (String sTokenId : tokenIds) {
Target target = new Target();
target.setAppId(sAppId);
target.setClientId(sTokenId);
lstTargets.add(target);
}
//透传消息模板
listMessage.setData(emp);
//设置消息离线,并设置离线时间
listMessage.setOffline(true);
//离线有效时间,单位为毫秒,可选
listMessage.setOfflineExpireTime(24 * 1000 * 3600);
// 计算taskId
String taskId = push.getContentId(listMessage);
//发送并获取返回的结果对象
IPushResult ret = push.pushMessageToList(taskId, lstTargets);
//结果返回
String result = ret.getResponse().get("result").toString().trim();
if (!StringUtils.isBlank(result)) {
result = result.trim();
log.info("push app result:"+result);
if (result.equalsIgnoreCase("ok")) {
code="1";
log.info("推送成功,taskId:" + taskId);
} else {
code="0";
log.info("推送失败,个推返回结果:" + result + ", taskId:" + taskId);
}
}
}catch(Exception ce){
ce.printStackTrace();
log.error("push app error",ce);
code="0";
}
return code;
}
/**
* 透传消息模板设置
*
* @param sAppId 应用id
* @param sAppKey 应用密钥
* @param context 透传消息
* @param mobileType 设备类型
* @return 透传消息模板对象
*/
public static TransmissionTemplate transmissionTemplate(String sAppId, String sAppKey,String msgContext,String msgTitle,String msgType,String msgTime) {
// 透传消息模板对象
TransmissionTemplate transmissionTemplate = new TransmissionTemplate();
transmissionTemplate.setAppId(sAppId);
transmissionTemplate.setAppkey(sAppKey);
//收到消息是否立即启动应用,1为立即启动,2则广播等待客户端自启动
transmissionTemplate.setTransmissionType(2);
//应是组装后的JSON字符串
PushBean push=new PushBean();
push.setTitle(msgTitle);
if(StringUtils.isNotBlank(msgContext) && msgContext.length() > 400) {
msgContext=msgContext.replaceAll("\\s*", "").substring(0, 400) + "...";
}
push.setContent(msgContext);
push.setMsgType(msgType);
push.setMsgTime(msgTime);
String json=JSONObject.toJSONString(push);
transmissionTemplate.setTransmissionContent(json);
// 返回透传模板对象
return transmissionTemplate;
}
/**
* IOS的强提醒消息模板
* @param transmissionTemplate
* @param sAppId
* @param sAppKey
* @param msgTitle
* @param msgContext
* @param msgType
* @param msgTime
* @return
*/
public static TransmissionTemplate iosTransmissionTemplate(String sAppId, String sAppKey,String msgTitle, String msgContext, String msgType,String msgTime) {
// 透传消息模板对象
TransmissionTemplate transmissionTemplate = new TransmissionTemplate();
transmissionTemplate.setAppId(sAppId);
transmissionTemplate.setAppkey(sAppKey);
//收到消息是否立即启动应用,1为立即启动,2则广播等待客户端自启动
transmissionTemplate.setTransmissionType(2);
//应是组装后的JSON字符串
PushBean push=new PushBean();
push.setTitle(msgTitle);
if(StringUtils.isNotBlank(msgContext) && msgContext.length() > 400) {
msgContext=msgContext.replaceAll("\\s*", "").substring(0, 400) + "...";
}
push.setContent(msgContext);
push.setMsgType(msgType);
push.setMsgTime(msgTime);
String json=JSONObject.toJSONString(push);
//应是组装后的JSON字符串
transmissionTemplate.setTransmissionContent(json);
APNPayload payload = new APNPayload();
payload.setBadge(1);
payload.setContentAvailable(1);
payload.setSound("default");
//payload.setCategory("$由客户端定义");
//简单模式APNPayload.SimpleMsg
payload.setAlertMsg(new APNPayload.SimpleAlertMsg(msgContext));
transmissionTemplate.setAPNInfo(payload);
return transmissionTemplate;
}
}
启动线程的主类
public class SunlineCfgServer {
private static Logger logger = Logger.getLogger(SunlineCfgServer.class);
public static void main(String[] args) {
ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
logger.info("Spring container of UserHttpServer init finished...");
DefaultConfig defaultConfig = (DefaultConfig) ctx.getBean("defaultConfig");
DeviceInfoDao deviceInfoDao = (DeviceInfoDao) ctx.getBean("deviceInfoDaoImpl");
BlockingQueue<PushAppBean> queue = new LinkedBlockingQueue<PushAppBean>(ConstantsConf.QUE_LEN);
MsgProducerThread msgProducer = new MsgProducerThread(queue,deviceInfoDao);
MsgConsumerThread msgConsumer=new MsgConsumerThread(queue,deviceInfoDao,defaultConfig);
// 借助Executors
ExecutorService service = Executors.newCachedThreadPool();
// 启动线程
service.execute(msgProducer);
service.execute(msgConsumer);
}
}
相关推荐
生产者消费者模式在 Redis 中主要通过 List 数据类型来实现,如使用 `RPush` 命令将消息推入列表尾部,作为生产行为;而 `BLPop` 或 `BRPop` 命令则可以在列表为空时阻塞等待,直到有新消息到来,这是消费者的消费...
### 多个生产者消费者问题 #### 知识点概览 1. **生产者-消费者模式** 2. **多线程同步机制** 3. **Java中的线程控制** 4. **Vector类与同步** 5. **异常处理** #### 生产者-消费者模式 在计算机科学中,“生产者-...
1. Push模式:消费者订阅主题后,RocketMQ服务器主动将消息推送到消费者,消费者只需处理接收到的消息。 2. Pull模式:消费者主动从RocketMQ服务器拉取消息,可根据需要控制消费速度。 四、RocketMQ消费模型实现 1....
7. **队列与主题**:在JMS中,队列是一对一的消息传递模式,每个消息只被一个消费者接收;而主题是一对多,消息可以被多个订阅者接收。在这个项目中,我们可能同时看到队列和主题的使用。 8. **连接配置**:为了...
- RocketMQ支持多种消息模型,如Push模式(消费者主动拉取)和Pull模式(服务器推送),本示例采用的是Push模式。 - 为了保证消息的顺序性,可以使用消息键(MessageKey)进行消息分组。 这个压缩包文件中可能...
在推模式中,生产者节点(Pusher)将消息连续不断地推送到一个队列,而不关心是否有消费者正在接收。这种方式允许生产者快速地发送数据,无需等待确认,提高了系统的吞吐量。在NetMQ中,`Pusher`类用于实现这一功能...
推送者负责生产和发送消息,而拉取者负责消费这些消息。这种模式适合于任务调度或者数据流处理,其中拉取者可以按需控制消息的接收速度,避免消息积压。 - **拉模式(Pub-Sub)**:不同于推模式,拉模式下有“发布...
Push模式下,Broker根据消费者的消费进度推送给消费者。 RocketMQ支持多种消息模型,如简单模型、集群模型和广播模型。简单模型中,一个消费者实例只能消费一个队列的消息;集群模型下,一组消费者实例共同消费一个...
在当前数字化的时代,以消费者为中心构建食品品牌电商营销模式已经成为企业成功的关键。这一模式强调了对消费者需求的深度理解、个性化服务以及通过电商平台提供卓越的购物体验。以下将详细阐述这个主题中的关键知识...
例如,平台上的农户直供模式,可以让消费者直接了解农产品的生产过程,增强了购买的信任感,从而提升购买意愿。 再者,文化距离主要体现在消费者的价值观、生活习惯与产品产地文化的匹配程度。不同的消费者群体可能...
你可以根据项目需求调整生产者和消费者的配置,如设置发送模式、消息重试策略、消费策略等。同时,深入学习RocketMQ的其他特性,如事务消息、延迟消息、分布式事务、消息过滤等,将有助于构建更高效、可靠的分布式...
Kafka 作为一个消息队列,允许生产者发送消息到主题(Topic),然后消费者可以从这些主题中消费消息。 5. **Confluent.Kafka**:Confluent.Kafka 是一个高性能的 .NET 客户端,用于与 Kafka 交互。它提供了生产者和...
在JavaScript中,观察者模式也被称为发布者-订阅者模式,其中包含两个主要角色:发布者(Publisher)和订阅者(Subscriber)。 1. 发布者(Publisher): 发布者是状态发生变化的对象,当它的状态改变时,会通知...
在Queue中,消息被一个生产者发送,然后由一个或多个消费者接收,但每个消息仅被一个消费者消费。如果有多于一个的消费者,只有第一个连接到Queue的消费者能接收到消息,这种行为被称为“竞争消费”。 在提供的代码...
这种模式跳过了传统的零售商环节,让品牌直接与消费者建立联系,从而在产品设计、营销策略、客户服务等多个方面实现了深度的变革。 1. **增强品牌控制力** DTC模式使得品牌能够完全掌握产品从生产到销售的全过程...
RocketMQ 是一款高性能、...通过合理配置生产者、消费者、主题、消息队列、标签以及选择合适的消费模式,我们可以优化系统的性能和稳定性。同时,利用名称服务和 Broker,我们可以构建出灵活、可扩展的消息传递网络。
观察者模式: 让多个观察者对象同时监听某一个 观察者模式 观察者模式定义了一种一队多的依赖关系, 主题对象。这个主题对象在状态上发生变化时,会通知所有观察者对象,使他们能够自动更 新自己。 20、STATE —...
根据消费模式,可以选择Push Consumer(消息推送)或Pull Consumer(消息拉取)。Push Consumer会自动从RocketMQ服务器接收消息,而Pull Consumer需要主动请求消息。 ```java DefaultMQPushConsumer consumer = ...
通过阅读和理解这两个Java文件,你可以深入了解到如何在实际应用中使用阿里RocketMQ进行消息通信,包括生产者如何发送消息,消费者如何订阅和消费消息,以及如何配置不同的消费模式。同时,这也是一个很好的学习示例...
在这种模式下,每个消息只有一个消费者,消息发送者将消息放入队列,消费者从队列中取出并消费消息。队列中的一条消息在被一个消费者处理后会被自动删除,因此,同一消息不会被多个消费者处理。在SpringBoot中配置和...