论坛首页 Java企业应用论坛

多线程发送消息之流控

浏览 4787 次
精华帖 (0) :: 良好帖 (3) :: 新手帖 (12) :: 隐藏帖 (0)
作者 正文
   发表时间:2011-06-24   最后修改:2011-06-29
近期接触到流量,由于旧的版本流控不准确,所以相对其改进一下

整体方案:使用jdk1.5的信号量机制,没法送一条消息,信号量加1,当信号量达到上限时,阻塞,等待定时线程来清理(每100毫秒尝试清理一次)

1.首先想到使用Semaphore来实现,不过测试时发现,由于semaphore不能够重入,这导致,在1秒钟内一个线程发送了一条之后,就会阻塞,这样一秒钟每个线程只能发送一条消息。

2.采用ArrayBlockingQueue来实现,每发送一条调用一次put方法(put时如果发现超过1秒钟,则立即清0),定时线程每100毫秒尝试清理一次,但是发现用ArrayBlockingQueue似乎不够轻便,因为其内部维护了一个对象数组。

3.自定义个可重入的信号量类,参照ArrayBlockingQueue来实现,去掉其中的对象数组,值保留put和clear方法,(put方法改名为acquire)。

public class ReentrantSemaphore
{
    private int size;
    private final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;
    private int count;
    
    public ReentrantSemaphore(int capacity) {
        size = capacity;
        lock = new ReentrantLock();
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            count = 0;
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }
    
    public void acquire(int x) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == size)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            count+=x;
        } finally {
            lock.unlock();
        }
    }
}


经过测试,这种方式可以比较好的实现,如果其他人有更好的实现,希望能一起讨论。

完整代码在附件中:
   发表时间:2011-06-24  
直接一个计数器不就搞定的事情呀
0 请登录后投票
   发表时间:2011-06-25  
直接计数不准确,每次到达上限,你sleep多长时间算好呢?
假设只有一个线程,sleep时间较长的话,会使得下一秒发送次数变少。

如果sleep时间较短,多个线程时(数目较多),高并发的情况下性能会比较差, 而使用基于AQS的锁,性能不会降低很多,这也是我使用并发类的主要原因。

0 请登录后投票
   发表时间:2011-06-27  
samttsch 写道
直接计数不准确,每次到达上限,你sleep多长时间算好呢?
假设只有一个线程,sleep时间较长的话,会使得下一秒发送次数变少。

如果sleep时间较短,多个线程时(数目较多),高并发的情况下性能会比较差, 而使用基于AQS的锁,性能不会降低很多,这也是我使用并发类的主要原因。


牛逼人必有牛逼之处。。。
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics