`
kevin.xqw
  • 浏览: 10038 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

高并发消息处理器

阅读更多

来源:

工作需要实现消息路由的中间层模块

测试结果:

 

2W客户, 每客户10 Request -> MQEngine -> 本地tomcat

消息转发并发量: 6.7K 零丢包

设计图



1. MQEngine

package lightmq;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MQEngine<E, P extends MessageHandler<E>> {
	
	/**
     * 消息队列
     * TODO 优化改为 LightQueue(内部实现为queue组)
     */
    final Queue<E> queue = new ConcurrentLinkedQueue<E>();
    //final Queue<E> queue1 = new LightQueue<E>(10);
    
    /**
     * handler class
     */
    private Class<? extends MessageHandler<E>> handlerClass;
    
    /**
     * 消费者线程池
     */
    ExecutorService consumerES;    
    
    /**
     * 消费者数量
     */
    private int consumerSize = 1;
    
    private Runnable[] consumers;
    
    /**
     * 构造函数
     * 
     * @param c 处理器类
     */
    public MQEngine(Class<? extends MessageHandler<E>> c){
        this(1, 1, c);
    }
    
    /**
     * 构�?函数 
     * 
     * @param threadPoolSize 线程池大�?
     * @param consumerSize 消息者数�?
     * @param c 处理器类�?
     */
    public MQEngine(int threadPoolSize, int consumerSize, Class<? extends MessageHandler<E>> c){
        consumerES = Executors.newFixedThreadPool(threadPoolSize);
        this.handlerClass = c;
        this.consumerSize = consumerSize;        
    }
    
    /**
     * 启动消费者线�?
     * @param consumerSize
     * @param c
     */
    public void start() {
        final Class<? extends MessageHandler<E>> c = this.handlerClass;
        
        class ConsumerTask implements Runnable{            
            
            @Override
            public void run() {
                MessageHandler<E> p = null;
                try {
                    p = c.newInstance();
                } catch (InstantiationException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                } catch (IllegalAccessException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }
                // 
                int i = 0;
                while(true){
                    try {
                        if (!queue.isEmpty()) {
                            p.consume(queue.poll());
                        }
                        i++;
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    // 每执行100次
                    if(10==i){
                        synchronized (this) {
                            try {
                                i = 0;
                                wait(100);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }                            
                    }
                }
            }
        }
        
        this.consumers = new Runnable[this.consumerSize];
        for (int i = 0; i < this.consumers.length; i++) {
            consumers[i] = new ConsumerTask();
        }
        
        for (int i = 0; i < consumers.length; i++) {
            consumerES.execute(consumers[i]);
        }
    }

    /**
     * 
     * @param e
     */
    public void push(E e){
        queue.add(e);
        for (int i = 0; i < this.consumers.length; i++) {
            synchronized (consumers[i]) {
                consumers[i].notify();
            }            
        }        
    }
    
    /**
     * 
     */
    public void destory(){
        this.queue.clear();
        this.consumerES.shutdown();
    }
	
}

 

2. MessageHandler

 

 

package lightmq;

/**
 * 消息处理器
 * 由子类派生
 * @author kevin
 *
 * @param <E>
 */
public abstract class MessageHandler<E> {
	
	public abstract void consume(E e);

}

 

3.MyMessageHandler

 

package lightmq;

import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.conn.NHttpClientConnectionManager;

/**
 * 业务相关消息处理器
 * @author kevin
 *
 */
public class MyMessageHandler extends MessageHandler<String>{
	
	static AtomicLong sentCount = new AtomicLong(0);
    
    static NHttpClientConnectionManager connMgr;
    
    @Override
    public void consume(String e) {
        sendToTomcat(e);
    }
    
    private CloseableHttpAsyncClient httpclient = null;
    
    public MyMessageHandler(){
        
        try {
            connMgr = new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor());
            httpclient = HttpAsyncClients.createMinimal(connMgr);
            httpclient.start();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    /**
     * 发给sc
     * @param message
     */
    private void sendToTomcat(String message){
        long startTime = System.currentTimeMillis();        
        
        try {
            
            
            // http[GET]请求, 
            final HttpGet request1 = new HttpGet("http://localhost");
            Future<HttpResponse> future = httpclient.execute(request1, null);
            
            // and wait until a response is received
            HttpResponse response1;
            response1 = future.get();
            System.out.println("message " + message + ":" + request1.getRequestLine() + "->" + response1.getStatusLine());
            System.out.println(message + " Sent; Cost:" + (System.currentTimeMillis() - startTime) + "; Succeed  Sent: " + sentCount.incrementAndGet());
        } catch (Exception e1) {
            System.err.println(e1.getMessage());
        } finally{
            // 关闭链接
            if(null!=httpclient){
                try {
                    //httpclient.close();
                } catch (Exception e1) {
                    //e1.printStackTrace();
                    System.err.println(e1.getMessage());
                }
            }
        }
    }
}

 4.M2Queue

 

package lightmq;

import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * 
 * @author kevin.xu
 *
 * @param <V>
 */
public class M2Queue<V> implements Queue<V>{
	/**
     * 队列数组
     */
    private Queue<V> queues[];
    
    /**
     * 
     * @param initQueueSize
     */
    public M2Queue(int initQueueSize) {
        queues = new Queue[initQueueSize];
        for (int i = 0; i < queues.length; i++) {
            queues[i] = new ConcurrentLinkedQueue<V>();
        }
    }

    @Override
    public int size() {
        return 0;
    }

    @Override
    public boolean isEmpty() {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean contains(Object o) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public Iterator<V> iterator() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public Object[] toArray() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public <T> T[] toArray(T[] a) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public boolean remove(Object o) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean addAll(Collection<? extends V> c) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean retainAll(Collection<?> c) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public void clear() {
        // TODO Auto-generated method stub
        
    }

    @Override
    public boolean add(V e) {
        return offer(e);
    }

    /**
     * 添加到元素最少的队列中
     */
    @Override
    public boolean offer(V e) {
        return queues[getSmallestQueueIndex()].offer(e);
    }

    /**
     * 从元素最大的队列中remove
     */
    @Override
    public V remove() {
        return queues[getLargestQueueIndex()].remove();
    }

    /**
     * 从元素最大的队列中poll
     */
    @Override
    public V poll() {
        return queues[getLargestQueueIndex()].poll();
    }

    /**
     * 从元素最大的队列中element
     */
    @Override
    public V element() {
        return queues[getLargestQueueIndex()].element();
    }

    /**
     * 先从记录数最多的queue里peek
     */
    @Override
    public V peek() {
        return queues[getLargestQueueIndex()].peek();
    }
    
    /**
     * 返回最少记录数的queue
     * 
     * @return
     */
    private int getSmallestQueueIndex(){
        int index = 0;
        if (queues.length > 1) {
            for (int i = index; i < queues.length; i++) {
                if(queues[i].size() > queues[i+1].size()){
                    index = i+1;
                }
            }
        }
        return index;
    }
    
    /**
     * 返回最多记录数的queue
     * 
     * @return
     */
    private int getLargestQueueIndex(){
        int index = 0;
        if (queues.length > 1) {
            for (int i = index; i < queues.length; i++) {
                if(queues[i].size() < queues[i+1].size()){
                    index = i+1;
                }
            }
        }
        return index;
    }
}

 5.TestMQ

 

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

import lightmq.MQEngine;
import lightmq.MyMessageHandler;

/**
 * MQEngine测试类
 * @author kevin
 *
 */
public class TestMQ {
	public static void main(String[] args) {	
		final AtomicLong l = new AtomicLong(0);
		// 
		final MQEngine<String, MyMessageHandler> mq = new MQEngine<String, MyMessageHandler>(10, 50, MyMessageHandler.class);		
		mq.start();
		// 模拟客户并发数
		final int PRODUCER_SIZE = 200000;
		// 模拟每个客户平均请求次数
		final int REQUEST_TIME = 10;
		
		ExecutorService es = Executors.newFixedThreadPool(10);
		for (int i = 0; i < PRODUCER_SIZE; i++) {
			es.execute(new Runnable() {
				@Override
				public void run() {
					for (int i = 0; i < REQUEST_TIME; i++) {						
						mq.push(String.valueOf(l.incrementAndGet()));
					}
				}
			});
		}
		
		
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
		System.out.println(mq.size());
		
	}
}

 

13
6
分享到:
评论
5 楼 kevin.xqw 2014-01-10  
iablee 写道
TestMQ 中 17行 mq是不是要调用start()方法?


是的,应该加上mq.start();
谢谢指正
4 楼 iablee 2013-12-31  
TestMQ 中 17行 mq是不是要调用start()方法?
3 楼 qingling600 2013-12-31  
MyMessageHandler
56行有问题,没有办法编译过
2 楼 kevin.xqw 2013-12-31  
linde 写道
MQEngine
110行有问题吗?


谢谢linde指正,是要改为 consumerES.execute(consumers[i]);
1 楼 linde 2013-12-30  
MQEngine
110行有问题吗?

相关推荐

    ACE 服务器源码,高并发的高性能处理器和源码

    ACE(Adaptive Communication Environment)是一个跨平台的网络编程框架,被广泛用于开发高并发、高性能的网络应用。这个压缩包包含的文件主要是ACE服务器的源码及相关组件,可以帮助我们深入理解ACE的工作机制以及...

    基于众核处理器的高并发视频转码与分发系统.pdf

    【基于众核处理器的高并发视频转码与分发系统】是针对海量视频应用需求而设计的一种高效解决方案。系统利用36核众核处理器平台,旨在实现高并发的视频转码和网络分发,以应对日益增长的用户规模和多样化终端设备的...

    多线程 高并发

    在IT领域,多线程和高并发是两个关键的概念,特别是在服务器端开发、分布式系统以及高性能计算中。这里,我们主要探讨的是如何通过编写多线程并发程序来优化应用程序的性能,提高系统的处理能力。 首先,多线程是指...

    C#高并发SOCKET服务器和客户端完整工程实例源码.zip

    本示例源码提供了C#语言实现的高并发SOCKET服务器和客户端的完整工程实例,这为开发者提供了学习和实践网络通信机制的机会。C#作为一种强大的.NET平台语言,拥有丰富的库支持,使得构建这样的系统变得相对简单。 ...

    高并发高性能服务器

    在IT领域,构建高并发高性能服务器是至关重要的技术挑战,特别是在大数据时代,处理大量用户请求、实时数据交换以及保持高效的服务响应成为系统设计的核心。本资料包提供的"高并发高性能服务器"源码提供了深入理解这...

    C#版支持高并发的HTTP服务器源码

    在高并发场景下,传统的同步I/O模型可能会导致线程池耗尽,影响系统性能。而异步处理则允许服务器在等待I/O操作完成时执行其他任务,显著提高了资源利用率和吞吐量。 异步编程在C#中主要通过`async`和`await`关键字...

    基于Node.js的高并发电商购物系统设计与实现

    【基于Node.js的高并发电商购物系统设计与实现】 在当今互联网时代,电子商务网站面临着日益增长的并发访问压力,特别是由于直播营销和各种电商节日活动的兴起,这使得企业对高性能、高并发的系统需求更加迫切。...

    Java并发编程与高并发解决方案-学习笔记

    ### Java并发编程与高并发解决方案知识点总结 #### 一、并发与高并发基本概念 ##### 1.1 并发 - **定义**: 指一个程序在同一时刻拥有两个或更多的线程,这些线程可以在单核或多核处理器上运行。 - **单核处理器上...

    Java并发编程与高并发解决方案-学习笔记-www.itmuch.com.pdf

    在探讨Java并发编程与高并发解决方案的过程中,我们会涉及到一系列核心概念和相关技术。本文将基于文档《Java并发编程与高并发解决方案-学习笔记***.pdf》中提供的内容,来详细阐述并发编程和高并发的基本概念、CPU...

    高通量众核处理器设计.pdf

    【结论】随着对高并发、强实时处理能力的需求日益增长,高通量众核处理器有望成为未来数据中心的核心处理器,引领高性能计算领域的发展。 高通量计算的关键在于如何有效地处理和传输大量数据,众核处理器则是实现这...

    高性能高并发服务器架构.pdf

    ### 高性能高并发服务器架构的关键知识点 #### 1. 高性能高并发服务器架构概述 - **定义**: 高性能高并发服务器架构是指能够同时处理大量用户请求,并且能够保持稳定性能的服务器架构。 - **重要性**: 在当前互联网...

    JavaWeb并发编程与高并发解决方案.docx

    ### JavaWeb并发编程与高并发解决方案 #### 一、并发编程概述 在现代软件开发中,尤其是对于基于JavaWeb的应用程序来说,面对大量的用户请求和数据处理任务时,高效的并发处理能力至关重要。并发编程旨在利用多...

    易语言MySql高并发操作模块源码 效率很高

    【MySQL高并发操作模块源码】是一个专门设计用于处理大量并发请求的软件组件,它能够高效地处理数据库的读写操作。在高并发环境中,系统的性能往往受到数据库操作的限制,因为数据库是系统中的瓶颈。这个模块的目标...

    高并发线程配置建议-合理配置

    在高并发环境中,正确配置线程池至关重要,因为它直接影响到系统的性能和响应速度。线程池的设置应根据任务的特性和系统资源来确定,主要包括CPU密集型任务和IO密集型任务。 对于CPU密集型任务,这类任务主要依赖...

    多线程与高并发-电子.pdf

    多线程与高并发是计算机科学中非常重要的两个概念,它们在提高软件程序的性能、响应速度和资源利用率方面起着至关重要的作用。在当今的互联网时代,特别是在产业互联网和5G技术的推动下,多线程和高并发的应用变得...

    java高并发编程推荐超好的一本电子书

    根据提供的信息,我们可以深入探讨Java高并发编程的相关知识点。高并发是现代软件系统设计中一个非常重要的方面,尤其是在云计算和大数据处理领域。下面将详细解释Java高并发编程的基础概念、核心技术以及实现技巧。...

    webserver(1)_linux高并发服务器_源码

    在IT领域,尤其是在服务器开发和运维中,"Linux高并发服务器"是一个至关重要的主题。这里的“webserver(1)_linux高并发服务器_源码”是一个针对该主题的实践项目,它采用Preactor模式并利用了Linux的epoll LT模型来...

    高性能PowerPC处理器.pdf

    此外,其CoreConnect 128位处理器局域总线(PLB)采用了双向纵横结构,每段都有独立的128位读/写数据总线,可以实现并发处理,片上总带宽达到了惊人的10.4GB/s(在166MHz下)。这种高带宽设计确保了数据传输的快速性...

    Java高并发笔记.pdf

    ### Java高并发核心知识点解析 #### 一、同步与异步 **同步(Synchronous)** - **定义**:同步操作是指在发起一个调用时,调用者必须等待该调用返回结果之后才能继续执行接下来的操作。 - **应用场景**:在日常生活...

    高性能,高并发TCP server

    在IT行业中,构建一个高性能、高并发的TCP服务器是许多应用程序的基础,特别是在网络服务、游戏服务器、实时数据传输等领域。Boost.Asio库是C++中实现这类服务的有力工具,它提供了一种高效且灵活的方式来处理I/O...

Global site tag (gtag.js) - Google Analytics