`
linpyi
  • 浏览: 62952 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

阻塞队列发送消息

    博客分类:
  • java
 
阅读更多

消息发送服务,需要发送短信的值需要往阻塞队列中put消息信息即可

 

SmsServer.putTask(new LendSms(map));

 

 

public class SmsServer {

    private static final Logger log = LoggerFactory.getLogger(ProTask.class);

    private static final ExecutorService executorService = Executors.newFixedThreadPool(5);

    private static final BlockingQueue taskQueue = new ArrayBlockingQueue(10000);

    private static WebApplicationContext webApplicationContext;

    private static Client client;

    public static void initSmsServer(WebApplicationContext webApplicationContext) {

        SmsServer.webApplicationContext = webApplicationContext;

        /**
         * 初始化短信服务
         * */
        String softwareSerialNo = PropertiesUtil.getValue("sms.softwareSerialNo").toString();
        String key = PropertiesUtil.getValue("sms.key").toString();
        client = SingletonSmsClient.INSTANCE.getClient(softwareSerialNo,key);

        /**
         * 开始任务
         * */
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        List taskList = Lists.newArrayList();
                        taskList.add(SmsServer.taskQueue.take());
                        taskQueue.drainTo(taskList,19);
                        executorService.submit(new ProTask(SmsServer.webApplicationContext,taskList));
                    } catch (InterruptedException e) {
                        log.error("sms server error:",e);
                    }
                }
            }
        }).start();

     }


    public static void putMsg(SmsMessageEntity smsMessageEntity) throws InterruptedException {
        taskQueue.put(smsMessageEntity);
    }

    public static void putTask(AbstractMessagePro abstractMessagePro) throws InterruptedException {
        taskQueue.put(abstractMessagePro);
    }

    public static SmsMessageEntity sendMessage(SmsMessageEntity smsMessageEntity) throws RemoteException {
        int status = client.sendSMS(new String[]{smsMessageEntity.getMobile()},smsMessageEntity.getMessage(), "", 5);
        smsMessageEntity.setStatus(status==0?0:1);
        smsMessageEntity.setFirmStatus(status);
        return smsMessageEntity;
    }

 

消息抽象类

public abstract class AbstractMessagePro {

    protected WebApplicationContext webApplicationContext;

    protected Object object;

    protected abstract List<SmsMessageEntity> toMessages();

    protected void setWebApplicationContext(WebApplicationContext webApplicationContext){
        this.webApplicationContext = webApplicationContext;
    }

}

 

消息执行task

public class ProTask extends Thread {

    private static final Logger log = LoggerFactory.getLogger(ProTask.class);

    private WebApplicationContext webApplicationContext;

    private List taskList;

    public ProTask(WebApplicationContext webApplicationContext,List taskList){
        this.taskList = taskList;
        this.webApplicationContext = webApplicationContext;
    }

    @Override
    public void run(){
        ISmsMessageDao iSmsMessageDao = (ISmsMessageDao) webApplicationContext.getBean("iSmsMessageDao");
        List<SmsMessageEntity> taskResult = Lists.newArrayList();
        for (Object o:taskList) {
            if(o instanceof SmsMessageEntity) {
                SmsMessageEntity smsMessageEntity = (SmsMessageEntity)o;
                taskResult.add(smsMessageEntity);
            }else{
                AbstractMessagePro abstractMessagePro = (AbstractMessagePro)o;
                abstractMessagePro.setWebApplicationContext(webApplicationContext);
                List temp = abstractMessagePro.toMessages();
                taskResult.addAll(temp);
            }
        }

        List<SmsMessageEntity> results = Lists.newArrayListWithCapacity(taskResult.size());
        for (SmsMessageEntity smsMessageEntity:taskResult) {
            try {
                log.info("send message to {},content {}",new String[]{smsMessageEntity.getMobile(),smsMessageEntity.getMessage()});
                smsMessageEntity = SmsServer.sendMessage(smsMessageEntity);
            } catch (RemoteException e) {
                log.error("task error",e);
            }finally {
                smsMessageEntity.setSmsMessageNo(StringUtil.getUUID());
                results.add(smsMessageEntity);
            }
        }
        iSmsMessageDao.inserList(results);
    }
}

 

监听启动smsserver

public class SystemInitializedListener extends ContextLoaderListener {

    private static final Logger log = LoggerFactory.getLogger(SystemInitializedListener.class);

    @Override
    public void contextInitialized(ServletContextEvent event) {
        ServletContext servletContext = event.getServletContext();
        WebApplicationContext webApplicationContext = WebApplicationContextUtils.getRequiredWebApplicationContext(servletContext);
        SmsServer.initSmsServer(webApplicationContext);
    }

}

 

分享到:
评论

相关推荐

    并发-线程池和阻塞队列

    此外,在大数据处理、消息中间件等场景中,线程池和阻塞队列也起到了关键作用。 总之,理解和掌握线程池与阻塞队列的原理和使用方法,是提升Java并发编程能力的重要一步。它们为开发人员提供了一种强大而灵活的工具...

    java线程聊天室(阻塞队列实现)

    生产者线程(如用户发送消息的线程)将消息放入阻塞队列,而消费者线程(如处理和分发消息的线程)从队列中取出消息。通过这种方式,生产者和消费者可以异步工作,提高系统吞吐量。 登录机制是聊天室的重要组成部分...

    生产者/消费者模式 阻塞队列 LinkedBlockingQueue

    例如,在消息中间件中,生产者负责发布消息,消费者负责接收并处理消息,两者通过阻塞队列进行通信,提高了系统的并行性和解耦性。 了解并熟练运用生产者/消费者模式以及像LinkedBlockingQueue这样的阻塞队列,对于...

    阻塞队列(Blocking Queue)是一个支持两个附加操作的队列.txt

    阻塞队列可以实现消息的可靠传递,并且支持异步处理。发送者将消息放入队列,而接收者取出消息并处理。 了解和掌握阻塞队列的概念及其在实际编程中的应用,对于开发高效、安全的并发程序是非常关键的。使用阻塞队列...

    Java并发编程(21)并发新特性-阻塞队列和阻塞栈(含代

    在Java并发编程中,阻塞队列和阻塞栈是两个重要的并发数据结构,它们在多线程环境下的高效通信和资源管理中扮演着至关重要的角色。这些数据结构源自Java的并发包`java.util.concurrent`,是实现并发设计模式如生产者...

    xxjyjy5.rar_LINUX消息队列_linux 消息队列_linux 消息_linux 消息队列_消息队列

    msgget用于创建或打开一个消息队列并获取其标识符,msgsnd用于向消息队列发送消息,而msgrcv则用于从消息队列接收消息。消息队列中的消息通常包含一个类型标识,使得接收进程可以选择接收特定类型的消息。 三、消息...

    消息分发框架(基于JAVA阻塞队列实现、 生产者消费者模型)

    综上所述,"消息分发框架(基于JAVA阻塞队列实现、生产者消费者模型)"是一个关键的并发处理组件,通过Java提供的并发工具和设计模式,实现了高效、稳定的消息处理。在实际应用中,需要根据业务需求进行适当的性能...

    msg_creat.rar_LINUX消息队列_linux 消息队列_linux 消息队列_消息队列

    标题"msg_creat.rar_LINUX消息队列_linux 消息队列_linux 消息队列_消息队列"和描述"生成一个消息队列,同时向该消息队列发送一串信息"指向的核心概念是创建和使用Linux的消息队列。 在Linux中,消息队列由内核管理...

    实时接收发送消息(接收消息线程阻塞,发送消息线程唤醒)

    在标题和描述中提到的“实时接收发送消息(接收消息线程阻塞,发送消息线程唤醒)”是一个典型的并发编程问题,涉及到线程管理和通信。以下是对这个主题的详细讲解: 1. **线程与并发**: - 线程是程序执行的最小...

    3.1.7.阻塞队列、原子操作的原理分析1

    阻塞队列、原子操作的原理分析1】 在Java编程中,阻塞队列(Blocking Queue)是一种线程安全的数据结构,它在多线程环境中扮演着重要的角色,特别是在并发处理和解耦组件之间通信时。阻塞队列的核心特性在于它能够...

    消息队列,消息队列的使用场景,C,C++

    - FreeRTOS的消息队列具有容量限制,当队列满时,新消息将被丢弃或阻塞发送任务,直到有空间可用。 - 创建消息队列:使用`xQueueCreate()`函数创建一个消息队列,并指定队列长度和每个消息的大小。 - 发送消息:...

    linux使用消息队列实现进程间双向通信

    4. **消息过滤**:发送方可以选择向特定标识符的消息队列发送消息,接收方也可以通过消息类型来筛选接收的消息。 在我们的示例中,`msg_quene_test_server`和`msg_queue_test_client`两个程序展示了如何创建、发送...

    day19_阻塞队列、线程池、File类、递归.pdf

    本文主要讲解了Java中的阻塞队列、线程池以及File类的相关知识,并涉及到了递归的概念。阻塞队列是并发编程中的一种重要工具,它在多线程环境下的生产者-消费者模型中起到关键作用。线程池则是为了优化线程管理,...

    ActiveMQ的队列queue模式(事务、应答、转发模式、阻塞消息)

    本文将深入探讨ActiveMQ中的队列(Queue)模式,包括事务、应答、转发以及MessageConsumer的receive阻塞消息处理方式。 ### 1. ActiveMQ队列(Queue)模式 在ActiveMQ中,队列是一种点对点的消息传递模型,每个...

    并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法

    ### 并发队列 ConcurrentLinkedQueue 和阻塞队列 LinkedBlockingQueue 用法详解 #### 一、并发队列 ConcurrentLinkedQueue 概述 `ConcurrentLinkedQueue` 是 Java 并发包 `java.util.concurrent` 提供的一个高性能...

    ActiveMQ学习笔记之九--发送消息到队列中

    这篇"ActiveMQ学习笔记之九--发送消息到队列中"主要探讨的是如何通过编程方式向ActiveMQ队列发送消息,这对于理解和应用消息中间件至关重要。 首先,我们要理解ActiveMQ中的队列(Queue)概念。队列是一种先进先出...

    易语言消息队列

    发送消息(消息队列句柄, 消息ID, 消息参数1, 消息参数2) // 发送消息到队列 .整数型 取得状态, 消息ID, 消息参数1, 消息参数2 获取消息(取得状态, 消息队列句柄, 消息ID, 消息参数1, 消息参数2) // 获取队列中的...

    消息队列——message

    在Linux系统中,消息队列是一种可靠的存储数据的方式,它允许进程将数据结构作为消息发送,并在合适的时候由其他进程接收。这种通信方式比传统的管道、共享内存或者信号量更灵活,因为它提供了数据的结构化存储和...

    抢占式OS消息队列例程

    3. **消息发送**:任务可以向消息队列发送消息,如果队列未满,消息会被存储;如果队列已满,发送操作可能被阻塞,直到队列有空间为止,或者根据配置采用丢弃策略。 4. **消息接收**:任务从消息队列接收消息,遵循...

Global site tag (gtag.js) - Google Analytics