`
wbj0110
  • 浏览: 1610025 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

如何让Java以光的速度跨线程通信?

阅读更多

一个比Disruptor吞吐量等性能指标更好的框架,使用Railway算法,将线程之间的消费发送参考现实生活中火车在站点之间搬运货物。

目标起始于一个简单的想法:创建一个开发人员友好的,简单的,轻量级线程间的通信框架,无需使用任何锁,同步器,信号量,等待,通知以及没有队列,消息,事件或任何其它并发特定的语法或工具。

只是一个Java接口接受到POJO以后在其背后实现这个通信,这个主意很类似Akka的Actors,但是它也许是有点矫枉过正,特别是对于单个多核计算机上线程间的通信优化必须是轻量的。

Akka的伟大之处是跨进程通信,特别是Actor是能够跨越不同JVM节点实现分布式通信。

无论如何,你可能觉得使用Akka在一个小型项目上有些过度,因为你只需要线程之间的通信,但是你还是想使用类似Actor这种做法模式。

该文章作者使用了动态代理 堵塞队列和一个缓存的线程池创建了这个解决方案,如图:


SPSC队列是一个Single Producer/Single Consumer 队列(单生产者/单消费者),而MPSC是一个Multi Producer/Single Consumer队列。

Dispatcher线程从Actor线程接受到消息,然后发送到相应的SPSC中。

Actor线程从接受的消息中使用数据,调用相应的actor类的方法,Actor实例都是发送消息给MPSC队列,然后再从Actor线程那里得到消息。

下面是ping-pong案例:

public interface PlayerA (
  void pong(long ball); //send and forget method call 
}
public interface PlayerB {   
  void ping(PlayerA playerA, long ball); //send and forget method call    
}    
 public class PlayerAImpl implements PlayerA {    
  @Override    
  @ublic void pong(long ball) {    
  }    
}
public class PlayerBImpl implements PlayerB {   
  @Override    
  public void ping(PlayerA playerA, long ball) {    
    playerA.pong(ball);    
  }    
}
public class PingPongExample {   
  public void testPingPong() {
// this manager hides the complexity of inter-thread communications   // and it takes control over actor proxies, actor implementations and threads    
    ActorManager manager = new ActorManager();
// registers actor implementations inside the manager   
    manager.registerImpl(PlayerAImpl.class);    
    manager.registerImpl(PlayerBImpl.class);
//Create actor proxies. Proxies convert method calls into internal messages    //which would be sent between threads to a specific actor instance.    
   PlayerA playerA = manager.createActor(PlayerA.class);    
   PlayerB playerB = manager.createActor(PlayerB.class);    
    for(int i = 0; i < 1000000; i++) {    
       playerB.ping(playerA, i);     
   }    
}


这两个play能够每秒打500,000个乒乓。但是如果和单个线程执行速度相比,还是很差的,同样代码在单个线程可以到达每秒两百万个。

作者开始研究缓慢的原因,在一些校验和测试以后,他认为是Actors之间发送消息影响了整体性能:


作者找到一个SPSC单生产者和单消费者的无锁队列,http://www.infoq.com/presentations/Lock-Free-Algorithms

无锁队列提供比锁队列更好的性能。锁队列中在当一个线程获得锁,其他线程将被阻塞,直到该锁被释放的。在无锁算法的情况下,生产者线程可以产生消息,但不阻止其他生产者线程,以及其他消费者,而从队列中读取的消费者不会被阻塞。

这个无锁队列据测试结果是超过每秒100M ops,是JDK的并发队列实现的10倍。

但是作者使用这个无锁队列提到SPSC 以后,并没有产生明显性能提升,他立即意识到这个框架的性能瓶颈不是在SPSC,而是在多个生产者/单个消费者(MPSC)那里。

多个生产者如果使用SPSC会覆盖彼此的值,因为SPSC并没有一个对生产者的控制机制,即使最快的SPSC也不适合。

对于MPSC作者找到了LMAX的disruptor,一个通过Ringbuffer实现的高性能线程间通信库包。



使用Disruptor很容易实现非常低延迟,高吞吐量的线程间消息通信。它还提供了用例对生产者和消费者的不同组合。多个线程可以从环形缓冲区中读取而不会阻塞对方:



多生产者和多消费者:


三个生产者/一个消费者测试结果显示,Disruptor都是两倍于LinkedBlockingQueue 。

但是使用Disruptor后的这个框架性能还是没有达到预期,作者从上下班的地铁中得到灵感,在某个站点同一车厢出来的人是生产者,进去的是消费者。

建立一个Railway类,使用AtomicLong来跟踪地铁在站与站之间的传递,下面是一个single-train railway:

public class RailWay {  
 private final Train train = new Train();  
 //站台号码stationNo 跟踪火车,定义哪个站点接受火车
 private final AtomicInteger stationIndex = new AtomicInteger();
//多线程访问这个方法,也就是在特定站点等待火车
public Train waitTrainOnStation(final int stationNo) {
  
   while (stationIndex.get() % stationCount != stationNo) {
    Thread.yield(); // this is necessary to keep a high throughput of message passing.   //But it eats CPU cycles while waiting for a train  
   }  
   // the busy loop returns only when the station number will match  // stationIndex.get() % stationCount condition

   return train;
 }
//这个方法通过增加火车站台号将火车移到下一个站点。  public void sendTrain() {
    stationIndex.getAndIncrement();
   }
  }



参考Disruptor,创建线程间传递long值:

public class Train {   
  //   
  public static int CAPACITY = 2*1024;
  private final long[] goodsArray; // array to transfer freight goods

  private int index;

  public Train() {   
      goodsArray = new long[CAPACITY];     
 }

 public int goodsCount() { // returns the count of goods    
  return index;    
 }    
 public void addGoods(long i) { // adds item to the train    
  goodsArray[index++] = i;    
 }    
 public long getGoods(int i) { //removes the item from the train    
  index--;    
  return goodsArray[i];    
 }    
}


如下图两个线程传递long:


使用一列火车实现单个生产者单个消费者:

public void testRailWay() {   
  final Railway railway = new Railway();    
  final long n = 20000000000l;    
  //starting a consumer thread    
  new Thread() {    
   long lastValue = 0;
   @Override   
   public void run() {    
    while (lastValue < n) {    
      Train train = railway.waitTrainOnStation(1); //waits for the train at the station #1    
      int count = train.goodsCount();    
      for (int i = 0; i < count; i++) {    
        lastValue = train.getGoods(i); // unload goods    
      }    
      railway.sendTrain(); //sends the current train to the first station.    
     }    
   }    
 }.start();

 final long start = System.nanoTime();
long i = 0;   
while (i < n) {    
 Train train = railway.waitTrainOnStation(0); // waits for the train on the station #0    
 int capacity = train.getCapacity();    
 for (int j = 0; j < capacity; j++) {    
   train.addGoods((int)i++); // adds goods to the train    
 }    
 railway.sendTrain();
 if (i % 100000000 == 0) { //measures the performance per each 100M items   
    final long duration = System.nanoTime() - start;|    
    final long ops = (i * 1000L * 1000L * 1000L) / duration;    
    System.out.format("ops/sec = %,d\n", ops);    
    System.out.format("trains/sec = %,d\n", ops / Train.CAPACITY);    
    System.out.format("latency nanos = %.3f%n\n", 
                       duration / (float)(i) * (float) Train.CAPACITY);    
  }    
 }    
}


通过测试,它达到 767,028,751 ops/sec ,是Nitsan’s blog.(第一个采用)的SPSC队列的几倍。

下面假设如果能有两列火车,每个站点有自己的火车,一个火车在第一个站点加载货物,第二列火车在第二个站点加载货物:


经过测试吞吐量是单列火车的1.4被,延迟从192.6纳秒降低到133.5纳秒。

但是线程间传输消息延迟是因为火车容量2048导致2178.4纳秒,通过增加火车降低这个延迟,如下图:


当在两个线程之间使用32,768 列火车传递一个long值,其延迟降低到13.9纳秒。到此吞吐量和延迟达到了一个平衡。

这只是SPSC的实现,纳秒多个生产者如何实现呢?答案是加入更多站点。



每个线程等待下一列火车,然后加载卸装消息,再把火车发到下一个站,而生产者线程放入消息到火车而消费者是从其中获得,火车总是从一个站到另外一个站循环不断移动。

测试了SPMC单个生产者和多个消费者,使用8个站点,一个属于生产者,剩余7个是消费者。

火车数量是256 火车容量是32时,测试结果是:吞吐量和延迟:
ops/sec = 116,604,397
latency nanos = 274.4
而火车数量是32而火车容量是256时:
ops/sec = 432,055,469
latency nanos = 592.5

后者相对是一个好的结果,延迟虽然提高,但是吞吐量提高的倍数要高得多。

分享到:
评论

相关推荐

    JAVA程序与PLC之间的通信

    9. **异步通信**:为了提高性能和响应速度,Java的多线程和非阻塞I/O(如NIO)可以用于实现与PLC的异步通信,提高系统的并发处理能力。 10. **调试与日志**:在开发过程中,日志记录和调试工具是必不可少的,它们...

    java语言UDP通信

    Java语言中的UDP(User Datagram Protocol)通信是一种基于无连接的数据包网络协议,相较于TCP,它提供了更低的延迟和更高的吞吐量,但不保证数据的顺序和可靠性。在Java中,我们通过`java.net`包中的`...

    java网络即时通信系统

    Java网络即时通信系统是一种基于Java技术构建的实时通信平台,它允许用户通过互联网进行快速、高效的数据交换。这种系统通常采用客户端-服务器架构,能够支持多种通信协议,如TCP/IP、HTTP、WebSocket等,以实现不同...

    Java模块与C网关通信

    在IT行业中,不同编程语言之间的通信是常见的需求,特别是在分布式系统和微服务架构中。...实现这种跨语言通信需要对不同语言的特性和通信机制有深入理解,同时进行充分的测试以确保系统的稳定性和可靠性。

    java与c++通过socket通信

    Java和C++之间的Socket通信是跨语言网络编程的一个常见应用场景,尤其在系统集成、设备通讯或者混合编程中。Socket提供了一种基于TCP/IP协议的进程间通信(IPC)方式,可以实现在不同操作系统上的进程之间的数据传输...

    Java多线程技术及其在网络编程中的应用.pdf

    ### Java多线程技术及其在网络编程中的...通过对多线程的支持,Java能够轻松地处理复杂的网络通信问题,实现高效的数据传输和服务。无论是对于服务器端还是客户端的开发,理解并熟练掌握Java多线程技术都是非常必要的。

    即时通信系统(Java实现)Java源码

    Java作为一种跨平台、面向对象的语言,其强大的网络编程能力使得它成为开发此类系统的理想选择。 1. **Java基础知识**:Java是一种广泛使用的高级编程语言,具有“一次编写,到处运行”的特性,其语法基于C++但更...

    跨线程提交数据

    在多线程编程中,"跨线程提交数据"是一个重要的概念,特别是在并发处理和优化应用程序性能时。本文将深入探讨这一主题,旨在为初学者提供一个全面的理解。 首先,我们要明白什么是线程。线程是操作系统分配CPU时间...

    JAVA编写网络通信程序

    Java语言以其强大的特性和广泛的应用范围成为开发网络通信程序的理想选择。 **3.1 简单性** - Java语言的设计力求简洁,去除了C++中一些不必要的复杂性,如指针、运算符重载等。同时,Java内置了自动垃圾回收机制...

    即时通信系统(Java实现)_java系统_即时通信_

    综上所述,Java实现的即时通信系统涉及了网络编程、多线程、数据库操作、安全机制等多个方面,这些知识点构成了系统的基础架构。通过对这些技术的熟练掌握和灵活运用,可以构建出高效、可靠的即时通信平台。在实际...

    JAVA串口通信实例(GUI图像化界面)

    2. **RXTX库**:由于javax.comm库的局限性,RXTX应运而生,它是一个开源的、跨平台的Java串口通信库,支持Java 5及以上版本,并且在Windows、Linux、Mac OS X等多个平台上都能良好运行。RXTX库提供了与javax.comm...

    java使用socket和c++通信

    本话题聚焦于"java使用socket和c++通信",这是一个典型的跨语言、跨平台的网络编程场景。Java和C++都是广泛应用的编程语言,它们之间的通信能够实现多种系统的交互和集成。以下是关于这一主题的详细知识讲解。 首先...

    JAVA多线程端口扫描器

    通过学习和实践这个Java多线程端口扫描器项目,开发者不仅可以深化对Java多线程编程的理解,还能掌握网络通信和并发控制的关键技能。此外,对于网络安全分析和服务器管理等领域也有实际的应用价值。

    java串口通信.zip

    Java串口通信是一种在计算机与外部设备之间进行数据交换的技术,尤其适用于低速或中速的数据传输场景,如连接扫码枪、打印机等硬件设备。在本案例中,我们看到一个名为"java串口通信.zip"的压缩包,它可能包含了实现...

    基于Java的即时通信软件

    Java作为一种跨平台、面向对象的编程语言,因其强大的网络功能和丰富的类库,被广泛用于开发各种复杂的应用程序,包括即时通信软件。 1. **Java基础知识**:Java的核心特性包括平台独立性(JVM使得Java代码可以在...

    Java即时通信工具源码

    1. **Java技术栈**:作为主要的开发语言,Java以其跨平台性、强大的类库和面向对象特性成为构建此类应用的理想选择。在源码中,你会看到如何利用Java的Socket编程实现网络连接,以及如何使用多线程处理并发请求。 2...

    Java课程设计 Java多线程以及图形用户界面编程详解(完整代码)

    3. 线程同步:Java提供多种机制来处理线程间的通信和同步,如synchronized关键字、wait()和notify()方法,以及Lock和Condition接口,以防止数据竞争和死锁。 4. 线程池:Executor框架提供了线程池的概念,允许...

    java实现windows蓝牙与android设备通信

    Java 实现Windows与Android设备通过蓝牙通信是一种常见且实用的技术,尤其在物联网和移动设备交互的场景中。本文将深入探讨如何使用Java的BlueCove库在Windows x64平台上建立蓝牙服务端,并与Android设备进行数据...

    Java串口通信实现二台电脑的串口通信

    Java串口通信是一种在两台计算机之间通过串行接口进行数据传输的技术,广泛应用于设备控制、数据采集和嵌入式系统...在实际应用中,还需考虑线程安全、异常处理以及资源管理等多方面因素,以保证通信的稳定性和可靠性。

Global site tag (gtag.js) - Google Analytics