0 0

生产者生产了短信,2个小时后消费者才将短信发送出去,问题出在了哪里20

才发现这里有个问答模块

转发一下

整个算法大致是:
多个线程同时生产短信,放到队列中(存在优先级,且先进先出),
一个消费线程从队列中拿出短信发送出去

目前出现了一个现象
生产者生产了短信,2个小时后消费者才将短信发送出去

而这2个小时的时间里面,又生产的其他短信都比较正常,都能马上发送出去,就这个停在队列里面,像是放假了不起床

并且程序运行了快半年了都没有出现这个毛病


生产者方面

public void sendSms(String xml) throws RemoteException {
		log.info("Rmi接口接收到了xml"+xml);
		MessageThread messageThread = Factory.getMessageThread();
		messageThread.setXml(xml);
		VerifyResult verifyResult = messageThread.verifyXml();
		if (verifyResult.isValid()||verifyResult.equals(VerifyResult.XMLBEAN_HAS_DUPLE_PHONE)) {
			log.info("过滤成功,开始放入发送队列");
			new Thread(messageThread).start();
		} else {
			log.error("短信过滤掉了,原因:"+verifyResult.getMessage());
		}
}



MessageThread run 方法

		
public void run() {
	if (verified) {
		ShortMessage shortMessage = xmlBean.convert2SM();
		shortMessage.setServiceNo(shortMsgServiceNo.getServiceNo(xmlBean));
		if (log4j.isDebugEnabled()) {
			log4j.debug("shortMessage: 开始放入队列:" + shortMessage);
		}
		synchronized (queue) {
			queue.push(shortMessage);
			queue.notifyAll();
		}
	} else {
		throw new ServiceException("请先检验在执行,当前没有检验");
	}
}




消费者


		
public void run() {
	stop=false;
	if(log.isDebugEnabled()){
		log.debug("SmsSendManager start run");
	}
	while (true){			
		ShortMessage sm =null;
		synchronized (queue) {
			while (queue.isEmpty()&&!stop) {
				try {
					queue.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
					log.error(e);
					System.exit(0);
				}
			}
			sm = queue.pop();
		}
		if(stop){
			return;
		}
		if (sm == null || !sm.isFull()) {
			log.info("ShortMessage is null or is not valid");
			continue;
		}
		log.info("will sender ShortMessage:"+sm);
		sendShortMessage(sm);
	}
}


队列结构:
public class SmsQueue {
	
	private LinkedList<ShortMessage>[] queuePool;
	
	@SuppressWarnings("unchecked")
	public SmsQueue(int poolSize){
		if(poolSize<=0||poolSize>20)
			throw new ServiceException("初始化队列出错 应该 在0-20 而实际是"+poolSize);
		queuePool = new LinkedList[poolSize];
		for(int i=0;i<poolSize;i++){
			queuePool[i] = new LinkedList<ShortMessage>();
		}
	}
	
	/**
	 * 把短信放到队列尾
	 * @param shortMessage 短信 {@link #addFirst(ShortMessage)}
	 */
	public void push(ShortMessage shortMessage){
		if(shortMessage!=null&&shortMessage.isFull()){
			int index = shortMessage.getPriority();			
			synchronized (queuePool) {
				if (index <= 0 || index >= queuePool.length) {
					queuePool[0].addLast(shortMessage);
				} else {
					queuePool[index].addLast(shortMessage);
				}
			}			
		}
	}
	
	/**
	 * 从队列头拿出短信,如果整个队列为空,返回null
	 * @return
	 */
	public ShortMessage pop(){
		synchronized (queuePool) {
			for (int i = queuePool.length; i > 0; i--) {
				if (!queuePool[i - 1].isEmpty()) {
					return queuePool[i - 1].removeFirst();
				}
			}
		}		
		return null;
	}
	/**
	 * 获得队列共有多少优先级
	 * @return
	 */
	public int getQueueWidth(){
		synchronized(queuePool){
			return queuePool.length;
		}
	}
	/**
	 * 获得当前队列状态 没有返回空 list
	 * @return
	 */
	public List<QueueStatus> getStatus(){
		List<QueueStatus> retList = new ArrayList<QueueStatus>();
		int i=0;		
		synchronized (queuePool) {
			for (LinkedList<ShortMessage> queue : queuePool) {
				QueueStatus status = new QueueStatus();
				status.setPrority(++i);
				status.setSize(queue.size());
				retList.add(status);
			}
		}		
		return retList;
	}
	
	/**
	 * 获得所有没有发送出去的对象,发回的list 中对象引用不能修改
	 * 由于对象已经在内存中 所以不考虑分页
	 * @return
	 */
	public List<ShortMessage> getAllShortMessage(){
		List<ShortMessage> retList = new ArrayList<ShortMessage>();
		synchronized (queuePool) {
			for (LinkedList<ShortMessage> queue : queuePool) {
				for (ShortMessage shortMessage : queue) {
					retList.add(shortMessage);
				}
			}
		}
		return Collections.unmodifiableList(retList);
	}
	
	/**
	 * @return id ==null 或者没有找到 返回 false
	 * @param id
	 * @return
	 */
	public boolean removeSMById(String id){
		if(id==null)
			return false;
		synchronized (queuePool) {
			for (LinkedList<ShortMessage> queue : queuePool) {
				for (ShortMessage shortMessage : queue) {
					if(shortMessage.getId().equals(id)){
						queue.remove(shortMessage);
						return true;
					}
				}
			}
			return false;
		}
	}
	
	/**
	 * 对列中共有多少短信
	 * @return
	 */
	public int totalShortMessageNum(){		
		int size=0;
		synchronized(queuePool){
			for(LinkedList list:queuePool){
				if(!list.isEmpty()){
					size+=list.size();
				}
			}
			return size;
		}
	}

	/**
	 * 队列是否为空
	 * @return
	 */
	public boolean isEmpty() {
		synchronized (queuePool) {
			for (LinkedList list : queuePool) {
				if (!list.isEmpty()) {
					return false;
				}
			}
		}		
		return true;
	}
	
	/**
	 * 清空队列
	 *
	 */
	public void clear(){
		synchronized (queuePool) {
			for (LinkedList list : queuePool) {
				list.clear();
			}
		}		
	}
	
	/**
	 * 把短信放到队列头 (发送短信失败的时候使用)
	 * @param sm
	 */
	public void addFirst(ShortMessage sm) {
		synchronized (queuePool) {
			queuePool[queuePool.length - 1].addFirst(sm);
		}				
	}
}



问题补充:
你在造轮子啊。。。。。

为什么说我在造轮子呢,有现成的东西可以使用么


========================
按照你的描述 问题在生产者和消费者的可能不太 看看发送模块是否有异样日志

正式环境下面消费者(发送短信)消费一次需要几秒甚至一分多

而我在测试环境下面消费者使用了mock
发送了几万条一点问题都没有

不知道什么原因

另外生产环境下消费者是其他厂商提供的api
2008年5月13日 17:12

2个答案 按时间排序 按投票排序

0 0

你在造轮子啊。。。。。

2008年5月23日 17:15
0 0


按照你的描述  问题在生产者和消费者的可能不太  看看发送模块是否有异样日志

2008年5月23日 12:12

相关推荐

    生产者消费者问题

    生产者消费者问题,C++。生产者-消费者(producer-consumer)问题,也称作有界缓冲区(bounded-buffer)问题,两个进程共享一个公共的固定大小的缓冲区。其中一个是生产者,用于将消息放入缓冲区;另外一个是消费者...

    pv操作解决生产者与消费者问题

    "pv操作解决生产者与消费者问题" pv操作是解决生产者与消费者问题的常用方法之一。在本例子中,我们使用C语言来实现pv操作,以解决生产者与消费者问题。 首先,让我们了解什么是生产者与消费者问题。生产者与消费...

    生产者消费者问题c++实现

    生产者消费者问题是多线程编程中的一个经典案例,它展示了如何通过线程间的协作来解决资源的同步和异步操作。在C++中,我们可以利用标准库中的互斥量(mutex)、条件变量(condition_variable)等工具来实现这个问题...

    生产者消费者的c++代码实现

    生产者消费者问题是一个经典的进程同步问题,该问题最早由 Dijkstra 提出,用以演示他提出的信号量机制。在同一个进程地址空间内执行的两个线程。生产者线程生产物品,然后将物品放置在一个空缓冲区中供消费者线程...

    生产者-消费者问题的模拟实现(课设含源代码).doc

    生产者-消费者问题是操作系统中经典的进程同步问题,它模拟了实际生产环境中的资源分配与消耗。在这个问题中,生产者进程负责生成数据并放入有限大小的缓冲区,而消费者进程则从缓冲区取出数据进行消费。为了确保...

    Win丨linux丨操作系统实验二:生产者——消费者问题

    操作系统实验二:生产者——消费者问题 1. 在Windows操作系统上,利用Win32 API提供的信号量机制,编写应用程序实现生产者——消费者问题。 2. 在Linux操作系统上,利用Pthread API提供的信号量机制,编写应用程序...

    操作系统课程设计生产者和消费者问题源代码

    在Windows2000环境下,创建一个控制台进程,在此进程中创建n个线程来模拟生产者或者消费者。这些线程的信息由本程序定义的“测试用例文件”中予以指定。 该文件的格式和含义如下: 3 1 P 3 2 P 4 3 C 4 1 4 P 2 5 C ...

    生产者消费者问题C#

    生产者消费者问题是多线程编程中的一个经典案例,它展示了如何通过线程间的协作来解决资源的并发访问问题。在C#中,我们可以利用System.Threading命名空间提供的工具来实现这一模型。下面将详细阐述这个问题的背景、...

    操作系统实验,生产者与消费者问题

    实验四、生产者消费者问题(15分) • 一个大小为3的缓冲区,初始为空 • 2个生产者 – 随机等待一段时间,往缓冲区添加数据, – 若缓冲区已满,等待消费者取走数据后再添加 – 重复6次 • 3个消费者 – 随机...

    利用数组解决生产者消费者问题

    生产者消费者问题是多线程编程中的经典模型,用于模拟两个或多个并发执行的实体(生产者和消费者)共享有限资源的情况。在这个问题中,生产者负责生成数据并放入缓冲区,而消费者则从缓冲区取出数据进行处理。当缓冲...

    pv.rar_pv_pv操作_生产者消费者_生产者消费者问题_生产者消费者问题 c

    在IT领域,生产者消费者问题是多线程编程中一个经典的同步问题,主要涉及进程或线程间的通信与协作。此问题描述的是一个系统中有两个角色:生产者和消费者,生产者负责生成数据,而消费者负责消费这些数据。为了保证...

    生产者消费者问题C/C++源程序

    操作系统中典型的同步问题,m个生产者,n个消费者链接在具有k个单位缓冲区的有界环形缓冲区上,生产者和消费者是并发线程,只要缓冲区未满,生产者线程就可以生产一件产品放入其中,只要缓冲区不空,消费者就可以...

    生产者消费者问题 进程实现

    它描述了两个或多个进程之间的协作关系,其中一个或多个生产者进程生产数据,并将其存储在缓冲区中,而一个或多个消费者进程则从缓冲区中取出数据进行处理。 在这个问题中,生产者和消费者之间需要通过某种机制来...

    实验一 生产者消费者问题

    生产者消费者问题是多线程编程中的经典模型,用于展示如何在并发环境中协调生产者和消费者之间的数据处理。在这个问题中,生产者线程负责生成数据,而消费者线程则负责消费这些数据。为了保证系统的稳定性和正确性,...

    C语言实现生产者消费者问题

    C语言实现生产者消费者问题,分配具有n个缓冲区的缓冲池,作为共享资源。 定义两个资源型信号量empty 和full,empty信号量表示当前空的缓冲区数量,full表示当前满的缓冲区数量。 定义互斥信号量mutex,当某个进程...

    用多进程同步方法演示“生产者-消费者”问题

    1、设计目的:通过研究Linux的进程机制和信号量,实现生产者消费者问题的并发控制。 2、说明:有界缓冲区内设有20个存储单元,放入取出的产品设定为1-20个整数。 3、设计要求: 生产者和消费者进程的数目不固定,可...

    生产者与消费者的问题

    在操作系统领域,生产者与消费者问题是一个经典的多线程同步问题,它涉及到进程间的通信和资源的共享。这个问题源于现实世界中的一个简单场景:生产者制造产品,而消费者消费这些产品。在计算机科学中,生产者通常是...

    编程实现生产者消费者或读写者的同步问题

    本文将详细介绍如何通过编程方式解决生产者消费者问题,这是一个经典的多线程同步问题。在本例中,我们将采用C++语言结合Windows API来实现。该示例程序展示了如何在同一个进程地址空间内,让多个线程(生产者线程与...

    生产者消费者问题,MFC实现

    生产者消费者问题是多线程编程中的经典模型,用于展示如何高效地在多个线程之间共享资源。MFC(Microsoft Foundation Classes)是微软提供的一套面向对象的C++库,用于构建Windows应用程序。在这个问题中,我们将...

Global site tag (gtag.js) - Google Analytics