`
cywhoyi
  • 浏览: 422648 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Batching Opertaion

阅读更多

我们常常说Batching(批量增加、批量操作...),那么Batching会增加延迟性,特别针对于本身延迟比较low的系统。

但是从我跟人角度来说,如果Batching的算法如果做得好,不仅会带来吞吐量的增加,而且也降低整个系统的延迟性。

在我们的网络传输时候,经常采用把messages/even捆绑在一起形成数据包,然后提高网络传输的吞吐量,同样我们也会类似的方式在系统中IOPS来帮助我们提高性能。



 如果上图1中场景非常容易产生竞争,而下面图是因为通过Queue,但是最终是有单一线程完成批量的操作。

The Algorithm

 

public final class NetworkBatcher
    implements Runnable
{
    private final NetworkFacade network;
    private final Queue<Message> queue;
    private final ByteBuffer buffer;

    public NetworkBatcher(final NetworkFacade network,
                          final int maxPacketSize,
                          final Queue<Message> queue)
    {
        this.network = network;
        buffer = ByteBuffer.allocate(maxPacketSize);
        this.queue = queue;
    }

    public void run()
    {
        while (!Thread.currentThread().isInterrupted())
        {
            while (null == queue.peek())
            {
                employWaitStrategy(); // block, spin, yield, etc.
            }

            Message msg;
            while (null != (msg = queue.poll()))
            {
                if (msg.size() > buffer.remaining())
                {
                    sendBuffer();
                }

                buffer.put(msg.getBytes());
            }

            sendBuffer();
        }
    }

    private void sendBuffer()
    {
        buffer.flip();
        network.send(buffer);
        buffer.clear();
    }
}

 伪代码大致地想法可能如上所述。

 

在现实中,并发下的同步以及性能指标基本会采用CAS的队列ConcurrentLinkedQueue,但是都有问题就是Queue都是无边界,当超出可控的范围,那么性能指标可能会多损耗50%+,这个也许大家对于ArrayList都有比较多的了解。

算法都是基于 single writer principle  的原则而进行编写设计的。

Batching with the Disruptor

在爆发业务场景,我们也会采用 EventHandler 用来进行加载

public final class NetworkBatchHandler
    implements EventHander<Message>
{
    private final NetworkFacade network;
    private final ByteBuffer buffer;

    public NetworkBatchHandler(final NetworkFacade network,
                               final int maxPacketSize)
    {
        this.network = network;
        buffer = ByteBuffer.allocate(maxPacketSize);
    }
    
    public void onEvent(Message msg, long sequence, boolean endOfBatch) 
        throws Exception
    {
        if (msg.size() > buffer.remaining())
        {
            sendBuffer();
        }

        buffer.put(msg.getBytes());
        
        if (endOfBatch)
        {
            sendBuffer();
        }
    } 

    private void sendBuffer()
    {
        buffer.flip();
        network.send(buffer);
        buffer.clear();
    }    
}

 Separation of IO from Work Processing

 

  • 大小: 41.5 KB
0
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics