`

Java里快如闪电的线程间通讯

    博客分类:
  • JAVA
 
阅读更多

这个故事源自一个很简单的想法:创建一个对开发人员友好的、简单轻量的线程间通讯框架,完全不用锁、同步器、信号量、等待和通知,在Java里开发一个轻量、无锁的线程内通讯框架;并且也没有队列、消息、事件或任何其他并发专用的术语或工具。

只用普通的老式Java接口实现POJO的通讯。

它可能跟Akka的类型化actor类似,但作为一个必须超级轻量,并且要针对单台多核计算机进行优化的新框架,那个可能有点过了。

当actor跨越不同JVM实例(在同一台机器上,或分布在网络上的不同机器上)的进程边界时,Akka框架很善于处理进程间的通讯。

但对于那种只需要线程间通讯的小型项目而言,用Akka类型化actor可能有点儿像用牛刀杀鸡,不过类型化actor仍然是一种理想的实现方式。

我花了几天时间,用动态代理,阻塞队列和缓存线程池创建了一个解决方案。

图一是这个框架的高层次架构:


图一: 框架的高层次架构

 

SPSC队列是指单一生产者/单一消费者队列。MPSC队列是指多生产者/单一消费者队列。

派发线程负责接收Actor线程发送的消息,并把它们派发到对应的SPSC队列中去。

接收到消息的Actor线程用其中的数据调用相应的actor实例中的方法。借助其他actor的代理,actor实例可以将消息发送到MPSC队列中,然后消息会被发送给目标actor线程。

我创建了一个简单的例子来测试,就是下面这个打乒乓球的程序:

public interface PlayerA (
  void pong(long ball); //发完就忘的方法调用 
}
public interface PlayerB {   
  void ping(PlayerA playerA, long ball); //发完就忘的方法调用 
}    
public class PlayerAImpl implements PlayerA {    
  @Override    
  public void pong(long ball) {    
  }    
}
public class PlayerBImpl implements PlayerB {   
  @Override    
  public void ping(PlayerA playerA, long ball) {    
    playerA.pong(ball);    
  }    
}
public class PingPongExample {   
  public void testPingPong() {
    // 管理器隐藏了线程间通讯的复杂性
    // 控制actor代理,actor实现和线程  
    ActorManager manager = new ActorManager();
    // 在管理器内注册actor实现 
    manager.registerImpl(PlayerAImpl.class);    
    manager.registerImpl(PlayerBImpl.class);
    //创建actor代理。代理会将方法调用转换成内部消息。 
    //会在线程间发给特定的actor实例。    
    PlayerA playerA = manager.createActor(PlayerA.class);    
    PlayerB playerB = manager.createActor(PlayerB.class);    
    for(int i = 0; i < 1000000; i++) {    
       playerB.ping(playerA, i);     
   }    
}

经过测试,速度大约在每秒500,000 次乒/乓左右;还不错吧。然而跟单线程的运行速度比起来,我突然就感觉没那么好了。在 单线程 中运行的代码每秒速度能达到20亿 (2,681,850,373)!

居然差了5,000 多倍。太让我失望了。在大多数情况下,单线程代码的效果都比多线程代码更高效。

我开始找原因,想看看我的乒乓球运动员们为什么这么慢。经过一番调研和测试,我发现是阻塞队列的问题,我用来在actor间传递消息的队列影响了性能。

2: 只有一个生产者和一个消费者的SPSC队列

所以我发起了一场竞赛,要将它换成Java里最快的队列。我发现了Nitsan Wakart的 博客 。他发了几篇文章介绍单一生产者/单一消费者(SPSC)无锁队列的实现。这些文章受到了Martin Thompson的演讲 终极性能的无锁算法的启发。

跟基于私有锁的队列相比,无锁队列的性能更优。在基于锁的队列中,当一个线程得到锁时,其它线程就要等着锁被释放。而在无锁的算法中,某个生产者线程生产消息时不会阻塞其它生产者线程,消费者也不会被其它读取队列的消费者阻塞。

在Martin Thompson的演讲以及在Nitsan的博客中介绍的SPSC队列的性能简直令人难以置信—— 超过了100M ops/sec。比JDK的并发队列实现还要快10倍 (在4核的 Intel Core i7 上的性能大约在 8M ops/sec 左右)。

我怀着极大的期望,将所有actor上连接的链式阻塞队列都换成了无锁的SPSC队列。可惜,在吞吐量上的性能测试并没有像我预期的那样出现大幅提升。不过很快我就意识到,瓶颈并不在SPSC队列上,而是在多个生产者/单一消费者(MPSC)那里。

用SPSC队列做MPSC队列的任务并不那么简单;在做put操作时,多个生产者可能会覆盖掉彼此的值。SPSC 队列就没有控制多个生产者put操作的代码。所以即便换成最快的SPSC队列,也解决不了我的问题。

为了处理多个生产者/单一消费者的情况,我决定启用LMAX Disruptor ——一个基于环形缓冲区的高性能进程间消息库。

3: 单一生产者和单一消费者的LMAX Disruptor

借助Disruptor,很容易实现低延迟、高吞吐量的线程间消息通讯。它还为生产者和消费者的不同组合提供了不同的用例。几个线程可以互不阻塞地读取环形缓冲中的消息:

4: 单一生产者和两个消费者的LMAX Disruptor    

下面是有多个生产者写入环形缓冲区,多个消费者从中读取消息的场景。

5: 两个生产者和两个消费者的LMAX Disruptor

经过对性能测试的快速搜索,我找到了 三个发布者和一个消费者的吞吐量测试。 这个真是正合我意,它给出了下面这个结果:

 

LinkedBlockingQueue

Disruptor

Run 0

4,550,625 ops/sec

11,487,650 ops/sec

Run 1

4,651,162 ops/sec

11,049,723 ops/sec

Run 2

4,404,316 ops/sec

11,142,061 ops/sec

在3 个生产者/1个 消费者场景下, Disruptor要比LinkedBlockingQueue快两倍多。然而这跟我所期望的性能上提升10倍仍有很大差距。

这让我觉得很沮丧,并且我的大脑一直在搜寻解决方案。就像命中注定一样,我最近不在跟人拼车上下班,而是改乘地铁了。突然灵光一闪,我的大脑开始将车站跟生产者消费者对应起来。在一个车站里,既有生产者(车和下车的人),也有消费者(同一辆车和上车的人)。

我创建了 Railway类,并用AtomicLong追踪从一站到下一站的列车。我先从简单的场景开始,只有一辆车的铁轨。

public class RailWay {  
 private final Train train = new Train();  
 // stationNo追踪列车并定义哪个车站接收到了列车
 private final AtomicInteger stationIndex = new AtomicInteger();
// 会有多个线程访问这个方法,并等待特定车站上的列车 
public Train waitTrainOnStation(final int stationNo) {
  
   while (stationIndex.get() % stationCount != stationNo) {
    Thread.yield(); // 为保证高吞吐量的消息传递,这个是必须的。
                   //但在等待列车时它会消耗CPU周期 
   }  
   // 只有站号等于stationIndex.get() % stationCount时,这个忙循环才会返回

   return train;
 }
// 这个方法通过增加列车的站点索引将这辆列车移到下一站
  public void sendTrain() {
    stationIndex.getAndIncrement();
   }
  }

为了测试,我用的条件跟在Disruptor性能测试中用的一样,并且也是测的SPSC队列——测试在线程间传递long值。我创建了下面这个Train类,其中包含了一个long数组:

public class Train {   
  //   
  public static int CAPACITY = 2*1024;
  private final long[] goodsArray; // 传输运输货物的数组

  private int index;

  public Train() {   
      goodsArray = new long[CAPACITY];     
 }

 public int goodsCount() { //返回货物数量    
  return index;    
 }    
 public void addGoods(long i) { // 向列车中添加条目    
  goodsArray[index++] = i;    
 }    
 public long getGoods(int i) { //从列车中移走条目    
  index--;    
  return goodsArray[i];    
 }    
}

然后我写了一个简单的测试 :两个线程通过列车互相传递long值。

6: 使用单辆列车的单一生产者和单一消费者Railway

public void testRailWay() {   
  final Railway railway = new Railway();    
  final long n = 20000000000l;    
  //启动一个消费者进程 
  new Thread() {    
   long lastValue = 0;
   @Override   
   public void run() {    
    while (lastValue < n) {    
      Train train = railway.waitTrainOnStation(1); //在#1站等列车
      int count = train.goodsCount();    
      for (int i = 0; i < count; i++) {    
        lastValue = train.getGoods(i); // 卸货   
      }    
      railway.sendTrain(); //将当前列车送到第一站 
     }    
   }    
 }.start();

final long start = System.nanoTime();
long i = 0;   
while (i < n) {    
 Train train = railway.waitTrainOnStation(0); // 在#0站等列车    
 int capacity = train.getCapacity();    
 for (int j = 0; j < capacity; j++) {    
   train.addGoods((int)i++); // 将货物装到列车上 
 }    
 railway.sendTrain();
 if (i % 100000000 == 0) { //每隔100M个条目测量一次性能 
    final long duration = System.nanoTime() - start;    
    final long ops = (i * 1000L * 1000L * 1000L) / duration;    
    System.out.format("ops/sec = %,d\n", ops);    
    System.out.format("trains/sec = %,d\n", ops / Train.CAPACITY);    
    System.out.format("latency nanos = %.3f%n\n", 
    duration / (float)(i) * (float)Train.CAPACITY);    
  }    
 }    
}

在不同的列车容量下运行这个测试,结果惊着我了:

容量

吞吐量: ops/sec

延迟: ns

1

5,190,883

192.6

2

10,282,820

194.5

32

104,878,614

305.1

256

344,614,640

742. 9

2048

608,112,493

3,367.8

32768

767,028,751

42,720.7

在列车容量达到32,768时,两个线程传送消息的吞吐量达到了767,028,751 ops/sec。比Nitsan博客中的SPSC队列快了几倍。

继续按铁路列车这个思路思考,我想知道如果有两辆列车会怎么样?我觉得应该能提高吞吐量,同时还能降低延迟。每个车站都会有它自己的列车。当一辆列车在第一个车站装货时,第二辆列车会在第二个车站卸货,反之亦然。

7: 使用两辆列车的单一生产者和单一消费者Railway

下面是吞吐量的结果:

容量

吞吐量: ops/sec

延时: ns

1

7,492,684

133.5

2

14,754,786

135.5

32

174,227,656

183.7

256

613,555,475

417.2

2048

940,144,900

2,178.4

32768

797,806,764

41,072.6

结果是惊人的;比单辆列车的结果快了1.4倍多。列车容量为一时,延迟从192.6纳秒降低到133.5纳秒;这显然是一个令人鼓舞的迹象。

因此我的实验还没结束。列车容量为2048的两个线程传递消息的延迟为2,178.4 纳秒,这太高了。我在想如何降低它,创建一个有很多辆列车 的例子:

8: 使用多辆列车的单一生产者和单一消费者Railway 

我还把列车容量降到了1个long值,开始玩起了列车数量。下面是测试结果:

列车数量

吞吐量: ops/sec

延迟: ns

2

10,917,951

91.6

32

31,233,310

32.0

256

42,791,962

23.4

1024

53,220,057

18.8

32768

71,812,166

13.9

用32,768 列车在线程间发送一个long值的延迟降低到了13.9 纳秒。通过调整列车数量和列车容量,当延时不那么高,吞吐量不那么低时,吞吐量和延时就达到了最佳平衡。

对于单一生产者和单一消费者(SPSC)而言,这些数值很棒;但我们怎么让它在有多个生产者和消费者时也能生效呢?答案很简单,添加更多的车站!

9:一个生产者和两个消费者的Railway

每个线程都等着下一趟列车,装货/卸货,然后把列车送到下一站。在生产者往列车上装货时,消费者在从列车上卸货。列车周而复始地从一个车站转到另一个车站。

为了测试单一生产者/多消费者(SPMC) 的情况,我创建了一个有8个车站的Railway测试。 一个车站属于一个生产者,而另外7个车站属于消费者。结果是:

列车数量 = 256 ,列车容量 = 32:

 ops/sec = 116,604,397     延迟(纳秒) = 274.4

列车数量= 32,列车容量= 256:

 ops/sec = 432,055,469     延迟(纳秒) = 592.5

如你所见,即便有8个工作线程,测试给出的结果也相当好-- 32辆容量为256个long的列车吞吐量为432,055,469 ops/sec。在测试期间,所有CPU内核的负载都是100%。

10:在测试有8个车站的Railway 期间的CPU 使用情况

在玩这个Railway算法时,我几乎忘了我最初的目标:提升多生产者/单消费者情况下的性能。

11:三个生产者和一个消费者的 Railway 

我创建了3个生产者和1个消费者的新测试。每辆列车一站一站地转圈,而每个生产者只给每辆车装1/3容量的货。消费者取出每辆车上三个生产者给出的全部三项货物。性能测试给出的平均结果如下所示:

 ops/sec = 162,597,109  列车/秒 = 54,199,036     延迟(纳秒) = 18.5

结果相当棒。生产者和消费者工作的速度超过了160M ops/sec。

为了填补差异,下面给出相同情况下的Disruptor结果- 3个生产者和1个消费者

Run 0, Disruptor=11,467,889 ops/sec
Run 1, Disruptor=11,280,315 ops/sec
Run 2, Disruptor=11,286,681 ops/sec
Run 3, Disruptor=11,254,924 ops/sec

下面是另一个批量消息的Disruptor 3P:1C 测试 (10 条消息每批):

Run 0, Disruptor=116,009,280 ops/sec
Run 1, Disruptor=128,205,128 ops/sec
Run 2, Disruptor=101,317,122 ops/sec
Run 3, Disruptor=98,716,683 ops/sec;

最后是用带LinkedBlockingQueue 实现的Disruptor 在3P:1C场景下的测试结果:

Run 0, BlockingQueue=4,546,281 ops/sec
Run 1, BlockingQueue=4,508,769 ops/sec
Run 2, BlockingQueue=4,101,386 ops/sec
Run 3, BlockingQueue=4,124,561 ops/sec

如你所见,Railway方式的平均吞吐量是162,597,109 ops/sec,而Disruptor在同样的情况下的最好结果只有128,205,128 ops/sec。至于 LinkedBlockingQueue,最好的结果只有4,546,281 ops/sec。

Railway算法为事件批处理提供了一种可以显著增加吞吐量的简易办法。通过调整列车容量或列车数量,很容易达成想要的吞吐量/延迟。

另外, 当同一个线程可以用来消费消息,处理它们并向环中返回结果时,通过混合生产者和消费者,Railway也能用来处理复杂的情况:

12: 混合生产者和消费者的Railway

最后,我会提供一个经过优化的超高吞吐量 单生产者/单消费者测试:

13:单个生产者和单个消费者的Railway

它的平均结果为:吞吐量超过每秒15亿 (1,569,884,271)次操作,延迟为1.3 微秒。如你所见,本文开头描述的那个规模相同的单线程测试的结果是每秒2,681,850,373。

 
 
分享到:
评论

相关推荐

    JAVA100例之实例64 JAVA线程间通讯

    在"JAVA100例之实例64 JAVA线程间通讯"这个主题中,我们将深入探讨Java中实现线程间通信的几种主要方法。 1. **共享数据**:最直观的线程间通信方式是通过共享内存空间,即共享变量。只要对共享变量的操作是线程...

    Java中利用管道实现线程间的通讯

    Java中的线程间通信是多线程编程中的关键部分,以确保不同线程之间的协作和数据交换。在Java中,管道(Pipe)流提供了一种有效的方法,允许数据从一个线程...理解并掌握这些技术对于编写高效的多线程Java程序至关重要。

    彻底明白Java的多线程-线程间的通信.doc

    本文将深入讲解如何在Java中实现多线程以及线程间的通信。 首先,我们要理解一个虚假的多线程示例。在例1中,创建了两个`TestThread`对象,分别调用`go(0)`和`go(1)`方法。尽管每个方法都在无限循环中调用了`Thread...

    c++ 多线程编程之三----线程间通讯

    C++ 多线程编程之三----线程间通讯 C++ 多线程编程中,线程间通讯是非常重要的一部分。线程间通讯可以让不同的线程之间进行信息传递,实现协作和同步。在多线程编程中,线程间通讯可以使用全局变量、自定义消息等...

    Java多线程机制(讲述java里面与多线程有关的函数)

    掌握Java的多线程机制对于编写高效、并发友好的应用程序至关重要,它涉及到线程的创建、管理、同步以及线程间的通信等多个方面。理解并熟练运用这些概念和方法,可以帮助开发者构建出更稳定、高效的系统。

    java 线程工具类 java 线程工具类

    java 线程工具类 java 线程工具类java 线程工具类 java 线程工具类java 线程工具类 java 线程工具类java 线程工具类 java 线程工具类java 线程工具类 java 线程工具类java 线程工具类 java 线程工具类java 线程工具...

    java多线程Demo

    Java多线程是Java编程中的一个重要概念,它允许程序同时执行多个任务,提高了程序的效率和响应速度。在Java中,实现多线程有两种主要方式:继承Thread类和实现Runnable接口。 1. 继承Thread类: 当我们创建一个新...

    jeromq-0.3.5.jar 线程间通讯

    jeromq-0.3.5.jar 线程间通讯

    线程 JAVA java线程 java线程第3版 java线程第2版第3版合集

    电子书相关:包含4个有关JAVA线程的电子书(几乎涵盖全部有关线程的书籍) OReilly.Java.Threads.3rd.Edition.Sep.2004.eBook-DDU Java Thread Programming (Sams) java线程第二版中英文 java线程第二版中英文 ...

    java多线程代码案例(创建线程,主线程,线程优先级,线程组,线程同步,线程间的通信)

    Java提供了多种线程间通信的手段,如`BlockingQueue`、`Future`、`ExecutorService`等。其中,`wait()`, `notify()`, `notifyAll()`是基于对象监视器的通信方式,用于在线程间传递信号。`BlockingQueue`则提供了...

    xiancheng.zip_线程间通讯

    下面我们将围绕标题“线程间通讯”和描述“通过代码实现线程间通讯,实现等待、唤醒等机制”来深入讲解相关知识点。 1. **线程的基本概念**: - **线程**是操作系统分配处理器时间的基本单位,一个进程中可以有多...

    线程的几种控制方式以及线程间的几种通信方式

    4. **线程间通信**:线程间通信允许线程之间交换信息,Java提供了多种机制,如`wait()`, `notify()`, `notifyAll()`,Python中则有`Condition`对象。 5. **线程休眠**:Java的`Thread.sleep()`方法可以让线程暂停...

    java多线程经典案例

    Java中,可以通过wait()、notify()和notifyAll()这三个Object类的方法来实现线程间的通信。这些方法必须在同步环境中使用,否则会抛出异常。此外,Java 5引入了BlockingQueue阻塞队列,它是一种线程安全的数据结构,...

    Java多线程示例之线程控制

    在Java编程中,多线程是一种常见的并发处理方式,它能充分利用CPU资源,提高程序的执行效率。本示例主要探讨了如何通过两种方法来控制Java中的线程数量,以达到优化性能和防止内存不足的目的。 首先,我们来看...

    Java 多线程间的通讯.doc

    Java 多线程间的通信是Java并发编程中的一个重要概念,主要解决的是线程间协作的问题。在多线程环境中,不同线程可能需要共享资源或按照特定顺序执行任务,这时就需要线程之间的通信来协调执行流程。传统的同步机制...

    Java并发编程线程间通讯实现过程详解

    "Java并发编程线程间通讯实现过程详解" Java并发编程线程间通讯实现过程详解是Java编程中非常重要的一部分,涉及到多线程之间的数据共享、同步和通信。在Java中,线程间通讯可以通过多种方式实现,本文将通过示例...

    java多线程文件传输

    在Java中,多线程可以提高程序的执行效率,尤其在处理并发任务时,如大文件的上传、下载和传输。下面将详细探讨相关知识点。 1. **线程基础** - **线程定义**:线程是操作系统分配CPU时间的基本单位,是程序执行的...

    关于线程(java)两天的课件

    线程在Java编程中扮演着至关重要的角色,它是程序中的执行单元,允许应用程序同时执行多个任务。本课程将深入探讨线程的基础知识,包括如何创建线程、理解线程的状态以及如何有效地管理线程资源,特别是通过线程池来...

    Java的多线程(java基础)

    Java提供了多种方法来控制线程,如synchronized关键字用于实现互斥,保证同一时间只有一个线程访问特定资源;wait(), notify()和notifyAll()方法用于线程间的同步与通信,实现线程间的数据交换。 此外,Java还提供...

    java多线程的讲解和实战

    4. **线程优先级与调度**:Java的`Thread`类提供了设置线程优先级的方法,如`setPriority(int priority)`,但实际线程调度依赖于操作系统的策略,优先级并不保证绝对的执行顺序。 5. **守护线程(Daemon)**:守护...

Global site tag (gtag.js) - Google Analytics