`
taiwei.peng
  • 浏览: 232589 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

生产者消费者模式个推

阅读更多

package com.sunline.sysconf.thread;

 

import java.util.ArrayList;

import java.util.Date;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Map.Entry;

import java.util.concurrent.BlockingQueue;

 

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

import com.sunline.common.config.DefaultConfig;

import com.sunline.persist.po.DeviceInfo;

import com.sunline.persist.po.MsgSendRecord;

import com.sunline.sysconf.dao.DeviceInfoDao;

import com.sunline.sysconf.util.AppMsgUtil;

import com.sunline.sysconf.util.ConstantsConf;

import com.sunline.sysconf.vo.PushAppBean;

 

/**

 * 消息消费线程

 * @author David

 *

 */

public class MsgConsumerThread implements Runnable{

 

private static Logger log = LoggerFactory.getLogger(MsgConsumerThread.class);

 

private volatile boolean  isRunning= true;

 

private DefaultConfig defaultConfig;

 

private DeviceInfoDao deviceInfoDao;

 

private BlockingQueue<PushAppBean> queue;

 

public MsgConsumerThread(BlockingQueue<PushAppBean> queue,DeviceInfoDao deviceInfoDao,DefaultConfig defaultConfig) {

        this.queue = queue;

        this.deviceInfoDao=deviceInfoDao;

        this.defaultConfig=defaultConfig;

    }

 

@Override

public void run() {

String sAppId=defaultConfig.getVal("app.appId");

String sAppKey=defaultConfig.getVal("app.appkey");

String sAppHost=defaultConfig.getVal("app.host");

String sAppMaster=defaultConfig.getVal("app.master");

        try {

            while (isRunning) {

            log.info("MsgConsumerThread start!"); 

            PushAppBean  pushBean = queue.poll();

                if (null != pushBean) {

  log.info("queue poll msgId:"+pushBean.getMsgId()+",queue size:"+queue.size());

                   //查询所有设备

                   List<DeviceInfo> deviceInfoList=deviceInfoDao.findDeviceTokenAll(pushBean.getDeptId());

                   int size=deviceInfoList.size();

                   if(size>0){

                  Map<String,List<String>> deviceMap=new HashMap<String,List<String>>();

                  for(DeviceInfo device:deviceInfoList){

                  String deviceType=device.getDeviceType();

                  List<String> deviceTokenList=deviceMap.get(deviceType);

                  if(null==deviceTokenList){

                  deviceTokenList=new ArrayList<String>();

                  deviceMap.put(deviceType, deviceTokenList);

                  }

                  deviceTokenList.add(device.getDeviceToken());

                  }

                  int num=deviceMap.size();

                  int record=0;

                  for(Entry<String,List<String>> entry:deviceMap.entrySet()){

                  //0 安卓系统 1 IOS系统 

                  String key=entry.getKey();

                  List<String> tokenIds=entry.getValue();

                  int type="ios".equals(key)?1:0;

                  String code=AppMsgUtil.pushMessage(sAppId, sAppKey, sAppHost, sAppMaster,pushBean, type, tokenIds);

                  log.info("send code:"+code);

                  record+=Integer.parseInt(code);

                  }

                       if(record==num){

                      MsgSendRecord sendRecord=new MsgSendRecord();

                      sendRecord.setMsgId(pushBean.getMsgId());

                      sendRecord.setCreateTime(new Date());

                      deviceInfoDao.saveMsgSendRecord(sendRecord);

                       }

                   }

                }

                Thread.sleep(ConstantsConf.THREAD_SLEEP);

                log.info("MsgConsumerThread end!"); 

            }

        } catch (Exception ce) {

            ce.printStackTrace();

            log.error("Consumer msg error",ce);

        }

}

 

}

 

 

package com.sunline.sysconf.thread;

 

import java.util.List;

import java.util.concurrent.BlockingQueue;

 

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

import com.sunline.sysconf.dao.DeviceInfoDao;

import com.sunline.sysconf.util.ConstantsConf;

import com.sunline.sysconf.vo.MsgInfoResVo;

import com.sunline.sysconf.vo.PushAppBean;

 

/**

 * 消息生产线程

 * @author David

 *

 */

public class MsgProducerThread implements Runnable{

 

private static Logger log = LoggerFactory.getLogger(MsgProducerThread.class);

 

private BlockingQueue<PushAppBean> queue;

 

private volatile boolean  isRunning= true;

 

private DeviceInfoDao deviceInfoDao;

 

public MsgProducerThread(BlockingQueue<PushAppBean> queue,DeviceInfoDao deviceInfoDao){

this.queue = queue;

this.deviceInfoDao=deviceInfoDao;

}

 

@Override

public void run() {

try {

  while (isRunning) {

  log.info("MsgProducerThread start!"); 

  //查询需要发送的消息列表

  List<MsgInfoResVo> msgList=deviceInfoDao.findMsgInfo();

  int size=msgList.size();

  if(size>0){

 for(MsgInfoResVo msgVo:msgList){

  PushAppBean appBean=new PushAppBean();

  appBean.setTitle(msgVo.getMsgTitle());

  appBean.setContent(msgVo.getMsgContent());

  appBean.setMsgType(msgVo.getMsgType());

  appBean.setMsgTime(msgVo.getMsgTime());

  appBean.setMsgId(msgVo.getMsgId());

  appBean.setDeptId(msgVo.getDeptId());

  queue.offer(appBean);

  log.info("queue offer msgId:"+msgVo.getMsgId()+",queue size:"+queue.size());

  }

 Thread.sleep((size+2)*ConstantsConf.THREAD_SLEEP);

  }else{

 Thread.sleep(ConstantsConf.THREAD_SLEEP);

  }

  log.info("MsgProducerThread end!"); 

}

} catch (Exception ce) {

  log.error("Producer msg error",ce); 

  ce.printStackTrace();

}

}

 

public void stop() {

        isRunning = false;

    }

 

}

 

package com.sunline.sysconf.util;

 

import java.util.ArrayList;

import java.util.List;

 

import org.apache.commons.lang.StringUtils;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

import com.alibaba.fastjson.JSONObject;

import com.gexin.rp.sdk.base.IPushResult;

import com.gexin.rp.sdk.base.impl.ListMessage;

import com.gexin.rp.sdk.base.impl.Target;

import com.gexin.rp.sdk.http.IGtPush;

import com.gexin.rp.sdk.template.TransmissionTemplate;

import com.sunline.sysconf.vo.PushAppBean;

import com.sunline.sysconf.vo.PushBean;

 

package com.sunline.sysconf.util;

 

import java.util.ArrayList;

import java.util.List;

 

import org.apache.commons.lang.StringUtils;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

import com.alibaba.fastjson.JSONObject;

import com.gexin.rp.sdk.base.IPushResult;

import com.gexin.rp.sdk.base.impl.ListMessage;

import com.gexin.rp.sdk.base.impl.Target;

import com.gexin.rp.sdk.base.payload.APNPayload;

import com.gexin.rp.sdk.http.IGtPush;

import com.gexin.rp.sdk.template.TransmissionTemplate;

import com.sunline.sysconf.vo.PushAppBean;

import com.sunline.sysconf.vo.PushBean;

 

public class AppMsgUtil {

 

private static Logger log = LoggerFactory.getLogger(AppMsgUtil.class);

 

/**

* 推送方法 

* @param sAppId

* @param sAppKey

* @param sAppHost

* @param sAppMaster

* @param pushBean

* @param osType

* @param tokenIds 0 安卓系统 1 IOS系统 

* @return

*/

public static String pushMessage(String sAppId, String sAppKey,String sAppHost, String sAppMaster,PushAppBean pushBean,int osType,List<String> tokenIds){

String code="";

try{

 String msgContext=pushBean.getContent();

 String msgTitle=pushBean.getTitle();

 String msgType=pushBean.getMsgType();

 String msgTime=pushBean.getMsgTime();

 //个推参数连接设置

 IGtPush push = new IGtPush(sAppHost, sAppKey, sAppMaster);

 //创建连接

 push.connect();

 //多个设备列表信息推送对象

 ListMessage listMessage = new ListMessage();

 //设备列表

 List<Target> lstTargets = new ArrayList<Target>();

 //透传消息模板

 TransmissionTemplate emp=null;

 if(0==osType){

 emp=transmissionTemplate(sAppId, sAppKey, msgContext,msgTitle,msgType,msgTime);

 }else{

 emp=iosTransmissionTemplate(sAppId, sAppKey, msgTitle, msgContext, msgType, msgTime);

 }

 for (String sTokenId : tokenIds) {

Target target = new Target();

target.setAppId(sAppId);

target.setClientId(sTokenId);

lstTargets.add(target);

 }

 //透传消息模板

 listMessage.setData(emp);

 //设置消息离线,并设置离线时间

 listMessage.setOffline(true);

     //离线有效时间,单位为毫秒,可选

 listMessage.setOfflineExpireTime(24 * 1000 * 3600);

 // 计算taskId

 String taskId = push.getContentId(listMessage);

 //发送并获取返回的结果对象

 IPushResult ret = push.pushMessageToList(taskId, lstTargets);

 //结果返回

 String result = ret.getResponse().get("result").toString().trim();

 if (!StringUtils.isBlank(result)) {

result = result.trim();

log.info("push app result:"+result);

if (result.equalsIgnoreCase("ok")) {

code="1";

log.info("推送成功,taskId:" + taskId);

} else {

code="0";

log.info("推送失败,个推返回结果:" + result + ", taskId:" + taskId);

}

}

}catch(Exception ce){

ce.printStackTrace();

log.error("push app error",ce);

code="0";

}

   return code;

}

 

/**

* 透传消息模板设置

* @param sAppId  应用id

* @param sAppKey 应用密钥

* @param context 透传消息

* @param mobileType 设备类型

* @return 透传消息模板对象

*/

public static TransmissionTemplate transmissionTemplate(String sAppId, String sAppKey,String msgContext,String msgTitle,String msgType,String msgTime) {

// 透传消息模板对象

TransmissionTemplate transmissionTemplate = new TransmissionTemplate();

transmissionTemplate.setAppId(sAppId);

transmissionTemplate.setAppkey(sAppKey);

//收到消息是否立即启动应用,1为立即启动,2则广播等待客户端自启动

transmissionTemplate.setTransmissionType(2);

//应是组装后的JSON字符串

PushBean push=new PushBean();

push.setTitle(msgTitle);

if(StringUtils.isNotBlank(msgContext) && msgContext.length() > 400) {

msgContext=msgContext.replaceAll("\\s*", "").substring(0, 400) + "...";

}

push.setContent(msgContext);

push.setMsgType(msgType);

push.setMsgTime(msgTime);

String json=JSONObject.toJSONString(push);

transmissionTemplate.setTransmissionContent(json);

// 返回透传模板对象

return transmissionTemplate;

}

  

/**

* IOS的强提醒消息模板

* @param transmissionTemplate

* @param sAppId

* @param sAppKey

* @param msgTitle

* @param msgContext

* @param msgType

* @param msgTime

* @return

*/

public static TransmissionTemplate iosTransmissionTemplate(String sAppId, String sAppKey,String msgTitle, String msgContext, String msgType,String msgTime) {

// 透传消息模板对象

TransmissionTemplate transmissionTemplate = new TransmissionTemplate();

transmissionTemplate.setAppId(sAppId);

transmissionTemplate.setAppkey(sAppKey);

//收到消息是否立即启动应用,1为立即启动,2则广播等待客户端自启动

transmissionTemplate.setTransmissionType(2); 

//应是组装后的JSON字符串

PushBean push=new PushBean();

push.setTitle(msgTitle);

if(StringUtils.isNotBlank(msgContext) && msgContext.length() > 400) {

msgContext=msgContext.replaceAll("\\s*", "").substring(0, 400) + "...";

}

push.setContent(msgContext);

push.setMsgType(msgType);

push.setMsgTime(msgTime);

String json=JSONObject.toJSONString(push);

//应是组装后的JSON字符串

transmissionTemplate.setTransmissionContent(json); 

APNPayload payload = new APNPayload();

payload.setBadge(1);

   payload.setContentAvailable(1);

   payload.setSound("default");

   //payload.setCategory("$由客户端定义"); 

   //简单模式APNPayload.SimpleMsg 

   payload.setAlertMsg(new APNPayload.SimpleAlertMsg(msgContext));

   transmissionTemplate.setAPNInfo(payload);

return transmissionTemplate; 

}

 

}

 

启动线程的主类

public class SunlineCfgServer {

 

private static Logger logger = Logger.getLogger(SunlineCfgServer.class);

 

public static void main(String[] args) {

 

ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");

logger.info("Spring container of UserHttpServer init finished...");

DefaultConfig defaultConfig = (DefaultConfig) ctx.getBean("defaultConfig");

DeviceInfoDao deviceInfoDao = (DeviceInfoDao) ctx.getBean("deviceInfoDaoImpl");

BlockingQueue<PushAppBean> queue = new LinkedBlockingQueue<PushAppBean>(ConstantsConf.QUE_LEN);

MsgProducerThread msgProducer = new MsgProducerThread(queue,deviceInfoDao);

MsgConsumerThread msgConsumer=new MsgConsumerThread(queue,deviceInfoDao,defaultConfig);

// 借助Executors

 ExecutorService service = Executors.newCachedThreadPool();

 // 启动线程

 service.execute(msgProducer);

service.execute(msgConsumer);

}

}

分享到:
评论

相关推荐

    redis 老版本生产者消费者模式

    生产者消费者模式在 Redis 中主要通过 List 数据类型来实现,如使用 `RPush` 命令将消息推入列表尾部,作为生产行为;而 `BLPop` 或 `BRPop` 命令则可以在列表为空时阻塞等待,直到有新消息到来,这是消费者的消费...

    多个生产者消费者问题

    ### 多个生产者消费者问题 #### 知识点概览 1. **生产者-消费者模式** 2. **多线程同步机制** 3. **Java中的线程控制** 4. **Vector类与同步** 5. **异常处理** #### 生产者-消费者模式 在计算机科学中,“生产者-...

    RocketMQ生产消费者模型实现

    1. Push模式:消费者订阅主题后,RocketMQ服务器主动将消息推送到消费者,消费者只需处理接收到的消息。 2. Pull模式:消费者主动从RocketMQ服务器拉取消息,可根据需要控制消费速度。 四、RocketMQ消费模型实现 1....

    ActiveMQ集群及生产者和消费者Java代码.zip

    7. **队列与主题**:在JMS中,队列是一对一的消息传递模式,每个消息只被一个消费者接收;而主题是一对多,消息可以被多个订阅者接收。在这个项目中,我们可能同时看到队列和主题的使用。 8. **连接配置**:为了...

    RocketMQ生产者和消费者Java代码示例.zip

    - RocketMQ支持多种消息模型,如Push模式(消费者主动拉取)和Pull模式(服务器推送),本示例采用的是Push模式。 - 为了保证消息的顺序性,可以使用消息键(MessageKey)进行消息分组。 这个压缩包文件中可能...

    NetMQ例子,推拉模式

    在推模式中,生产者节点(Pusher)将消息连续不断地推送到一个队列,而不关心是否有消费者正在接收。这种方式允许生产者快速地发送数据,无需等待确认,提高了系统的吞吐量。在NetMQ中,`Pusher`类用于实现这一功能...

    NetMQ例子,多线程模式,推拉模式等

    推送者负责生产和发送消息,而拉取者负责消费这些消息。这种模式适合于任务调度或者数据流处理,其中拉取者可以按需控制消息的接收速度,避免消息积压。 - **拉模式(Pub-Sub)**:不同于推模式,拉模式下有“发布...

    消息中间件 RocketMQ 开发指南

    Push模式下,Broker根据消费者的消费进度推送给消费者。 RocketMQ支持多种消息模型,如简单模型、集群模型和广播模型。简单模型中,一个消费者实例只能消费一个队列的消息;集群模型下,一组消费者实例共同消费一个...

    以消费者为中心构建食品品牌电商营销模式.zip

    在当前数字化的时代,以消费者为中心构建食品品牌电商营销模式已经成为企业成功的关键。这一模式强调了对消费者需求的深度理解、个性化服务以及通过电商平台提供卓越的购物体验。以下将详细阐述这个主题中的关键知识...

    社会距离对生鲜农产品消费者购买意愿的影响研究——基于社交电商模式.zip

    例如,平台上的农户直供模式,可以让消费者直接了解农产品的生产过程,增强了购买的信任感,从而提升购买意愿。 再者,文化距离主要体现在消费者的价值观、生活习惯与产品产地文化的匹配程度。不同的消费者群体可能...

    .NET CORE 代码使用kafka推送数据

    Kafka 作为一个消息队列,允许生产者发送消息到主题(Topic),然后消费者可以从这些主题中消费消息。 5. **Confluent.Kafka**:Confluent.Kafka 是一个高性能的 .NET 客户端,用于与 Kafka 交互。它提供了生产者和...

    RecketMq生产和消费样例代码

    你可以根据项目需求调整生产者和消费者的配置,如设置发送模式、消息重试策略、消费策略等。同时,深入学习RocketMQ的其他特性,如事务消息、延迟消息、分布式事务、消息过滤等,将有助于构建更高效、可靠的分布式...

    观察者模式.pdf

    在JavaScript中,观察者模式也被称为发布者-订阅者模式,其中包含两个主要角色:发布者(Publisher)和订阅者(Subscriber)。 1. 发布者(Publisher): 发布者是状态发生变化的对象,当它的状态改变时,会通知...

    ActiveMQ的queue和topic两种模式的示例演示参照.pdf

    在Queue中,消息被一个生产者发送,然后由一个或多个消费者接收,但每个消息仅被一个消费者消费。如果有多于一个的消费者,只有第一个连接到Queue的消费者能接收到消息,这种行为被称为“竞争消费”。 在提供的代码...

    DTC模式如何引领消费品牌企业实现创新.zip

    这种模式跳过了传统的零售商环节,让品牌直接与消费者建立联系,从而在产品设计、营销策略、客户服务等多个方面实现了深度的变革。 1. **增强品牌控制力** DTC模式使得品牌能够完全掌握产品从生产到销售的全过程...

    RocketMQ核心概念1

    RocketMQ 是一款高性能、...通过合理配置生产者、消费者、主题、消息队列、标签以及选择合适的消费模式,我们可以优化系统的性能和稳定性。同时,利用名称服务和 Broker,我们可以构建出灵活、可扩展的消息传递网络。

    C#23种设计模式_示例源代码及PDF

    观察者模式: 让多个观察者对象同时监听某一个 观察者模式 观察者模式定义了一种一队多的依赖关系, 主题对象。这个主题对象在状态上发生变化时,会通知所有观察者对象,使他们能够自动更 新自己。 20、STATE —...

    【JAVA代码之RocketMQ生产和消费数据】

    根据消费模式,可以选择Push Consumer(消息推送)或Pull Consumer(消息拉取)。Push Consumer会自动从RocketMQ服务器接收消息,而Pull Consumer需要主动请求消息。 ```java DefaultMQPushConsumer consumer = ...

    my ali rocket mq学习demo

    通过阅读和理解这两个Java文件,你可以深入了解到如何在实际应用中使用阿里RocketMQ进行消息通信,包括生产者如何发送消息,消费者如何订阅和消费消息,以及如何配置不同的消费模式。同时,这也是一个很好的学习示例...

    ActiveMQ的处理模式:PTP与PUB/SUB

    在这种模式下,每个消息只有一个消费者,消息发送者将消息放入队列,消费者从队列中取出并消费消息。队列中的一条消息在被一个消费者处理后会被自动删除,因此,同一消息不会被多个消费者处理。在SpringBoot中配置和...

Global site tag (gtag.js) - Google Analytics