zoremq
这个不能算是mq产品,只能算是个socket的封装,并针对常用场景进行了抽取。简化了网络编程。
jeromq 是对 zoremq 框架的一个纯粹的java版本的实现(不需要依赖zoremq 动态库)
jeromq 它是基于java nio 进行的封装。
maven 依赖,仅一个jar
<dependency> <groupId>org.zeromq</groupId> <artifactId>jeromq</artifactId> <version>0.3.4</version> </dependency>
1、请求-响应模式
package zoremq; import org.zeromq.ZMQ; public class Request { public static void main(String args[]) { for (int j = 0; j < 5; j++) { new Thread(new Runnable() { public void run() { ZMQ.Context context = ZMQ.context(1); // 创建一个I/O线程的上下文 ZMQ.Socket socket = context.socket(ZMQ.REQ); // 创建一个request类型的socket,这里可以将其简单的理解为客户端,用于向response端发送数据 socket.connect("tcp://127.0.0.1:5555"); // 与response端建立连接 long now = System.currentTimeMillis(); for (int i = 0; i < 100000; i++) { String request = "hello"; socket.send(request); // 向reponse端发送数据 byte[] response = socket.recv(); // 接收response发送回来的数据 // 正在request/response模型中,send之后必须要recv之后才能继续send,这可能是为了保证整个request/response的流程走完 System.out.println("receive : " + new String(response)); } long after = System.currentTimeMillis(); System.out.println((after - now) / 1000); } }).start(); ; } } }
package zoremq; import org.zeromq.ZMQ; public class Response { public static void main(String[] args) { ZMQ.Context context = ZMQ.context(1); // 这个表示创建用于一个I/O线程的context ZMQ.Socket socket = context.socket(ZMQ.REP); // 创建一个response类型的socket,他可以接收request发送过来的请求,其实可以将其简单的理解为服务端 socket.bind("tcp://*:5555"); // 绑定端口 int i = 0; int number = 0; while (!Thread.currentThread().isInterrupted()) { i++; if (i == 10000) { i = 0; System.out.println(++number); } byte[] request = socket.recv(); // 获取request发送过来的数据 System.out.println("客户端的相应:" + new String(request)); // System.out.println("receive : " + new String(request)); String response = "world"; socket.send(response); // 向request端发送数据 // ,必须要要request端返回数据,没有返回就又recv,将会出错,这里可以理解为强制要求走完整个request/response流程 } socket.close(); // 先关闭socket context.term(); // 关闭当前的上下文 } }
2、Publish-subscribe
package zoremq.sub; import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; public class Publisher { /** * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { // TODO Auto-generated method stub Context context = ZMQ.context(1); Socket publisher = context.socket(ZMQ.PUB); publisher.bind("tcp://*:5557"); int i = 0; while (true) { Thread.currentThread().sleep(1000); publisher.send("A".getBytes(), ZMQ.SNDMORE); publisher.send("This is A".getBytes(), 0); publisher.send("B".getBytes(), ZMQ.SNDMORE); publisher.send("This is B".getBytes(), 0); } } }
package zoremq.sub; import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; public class Sub1 { public static void main(String[] args) { Context context = ZMQ.context(1); Socket subscribe = context.socket(ZMQ.SUB); subscribe.connect("tcp://127.0.0.1:5557"); subscribe.subscribe("B".getBytes()); while (true) { System.out.println(new String(subscribe.recv(0))); System.out.println(new String(subscribe.recv(0))); } } }
3、PipeLine模式
package zoremq.sub; import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; public class Sub2 { public static void main(String[] args) { Context context = ZMQ.context(1); Socket subscribe = context.socket(ZMQ.SUB); subscribe.connect("tcp://127.0.0.1:5557"); //subscribe.subscribe("topic".getBytes()); subscribe.subscribe("A".getBytes()); while (true) { System.out.println(new String(subscribe.recv(0))); System.out.println(new String(subscribe.recv(0))); } } }
3、PipeLine模式
package zoremq.PipeLine; import org.zeromq.ZMQ; public class MainPusher { public static void main(String[] args) throws InterruptedException { // 参数代表使用多少线程,大多数情况下,1个线程已经足够。 ZMQ.Context context = ZMQ.context(1); // 指定模式为Pusher ZMQ.Socket socket = context.socket(ZMQ.PUSH); socket.bind("tcp://127.0.0.1:5557"); // 绑定服务地址及端口 for (;;) { long time = System.nanoTime(); socket.send(String.valueOf(time)); System.out.println("发布了新消息,时间:" + time); Thread.sleep(2000); } } }
package zoremq.PipeLine; import org.zeromq.ZMQ; public class WorkerOne { public static void main(String[] args) { // 指定模式为pull模式 ZMQ.Socket receiver = ZMQ.context(1).socket(ZMQ.PULL); receiver.connect("tcp://127.0.0.1:5557"); // 指定模式为push模式 ZMQ.Socket sender = ZMQ.context(1).socket(ZMQ.PUSH); sender.connect("tcp://127.0.0.1:5558"); for (;;) { byte[] recs = receiver.recv(); long receiveTime = System.nanoTime(); String oriMsg = new String(recs); long pubTime = Long.valueOf(oriMsg); long costTime = receiveTime - pubTime; System.out.println("Receive: " + oriMsg + " Cost time: " + costTime); sender.send("1" + oriMsg); System.out.println("Send to sinker."); } } }
package zoremq.PipeLine; import org.zeromq.ZMQ; public class WorkerTwo { public static void main(String[] args) { // 指定模式为pull模式 ZMQ.Socket receiver = ZMQ.context(1).socket(ZMQ.PULL); receiver.connect("tcp://127.0.0.1:5557"); // 指定模式为push模式 ZMQ.Socket sender = ZMQ.context(1).socket(ZMQ.PUSH); sender.connect("tcp://127.0.0.1:5558"); for (;;) { byte[] recs = receiver.recv(); long receiveTime = System.nanoTime(); String oriMsg = new String(recs); long pubTime = Long.valueOf(oriMsg); long costTime = receiveTime - pubTime; System.out.println("Receive: " + oriMsg + " Cost time: " + costTime); sender.send("2" + oriMsg); System.out.println("Send to sinker."); } } }
package zoremq.PipeLine; import org.zeromq.ZMQ; public class Sinker { public static void main(String[] args) { ZMQ.Context context = ZMQ. context(1); // 指定模式为pull模式 ZMQ.Socket receiver = context.socket(ZMQ. PULL); receiver. bind("tcp://127.0.0.1:5558"); for (;;) { byte[] recs = receiver.recv(); long receiveTime = System. nanoTime(); String oriMsg = new String(recs); String msg = new String(recs,1,recs.length-1); long pubTime = Long. valueOf(msg); long costTime = receiveTime - pubTime; System. out.println( "Receive: " + oriMsg + " Cost time: " + costTime); } } }
参考:http://www.coderli.com
相关推荐
JeroMQ Pure Java implementation of libzmq (http://zeromq.org). Features Based on libzmq 4.1.7. ZMTP/3.0 (http://rfc.zeromq.org/spec:23). tcp:// protocol and inproc:// is compatible with ...
JeroMQ是libzmq(ZeroMQ)的纯Java实现。这个是Android可用的jar包。 4.1.7版
jeromq-0.3.5.jar 线程间通讯
标题“jeromq-master.zip”指的是一个包含Java实现的libzmq源代码的压缩包,名为"jeromq-master"。libzmq,通常被称为ZeroMQ,是一个开源的消息队列库,它为开发者提供了高性能、轻量级的异步消息传递机制。这个库...
标题中的“ZMQ的纯java实现JeroMQ jar包及javadoc”指的是JeroMQ,这是一个完全用Java语言实现的零拷贝(ZeroMQ)消息队列库。ZeroMQ,通常简称为ZMQ,是一个高性能的开源消息中间件,它提供了一个轻量级的消息传递...
JeroMQ libzmq( )的纯Java实现。 特征 基于libzmq 4.1.7。 ZMTP / 3.0( )。 tcp://协议和inproc://与zeromq兼容。 ipc://协议仅在jeromq之间起作用(内部使用tcp://127.0.0.1:port)。 有价证券 。 与...
这个例子展示了zeromq如何处理服务器端的通信流程。 标签"zeromq jar"暗示了我们将在Java程序中使用zeromq的JAR包。在Java项目中,JAR包是库或框架的打包形式,包含了所有必要的类和资源。为了使用zeromq,我们需要...
在这个例子中,数据会从发送者经过两个并行的处理器,最后到达接收者。每个处理器都是独立运行的,可以根据需求扩展更多的处理器节点以增加并行处理能力。 在Eclipse环境下,你可以直接运行这个maven项目,由于所有...
标签“JeroMQ”是指ZeroMQ的Java实现,它与JZMQ是相关的,都属于Java语言对ZeroMQ的接口。“Android”标签表明这个项目是针对Android平台的。“ZeroMQ”标签则再次强调了这是关于ZeroMQ技术的内容。 在压缩包子文件...
官方离线安装包,亲测可用
官方离线安装包,亲测可用
在这个场景中,我们关注的是一个名为"LabVertxMQ"的项目,它是一个研究和测试实验室,专门针对使用Vert.X与JEroMQ结合创建半功能的TestNG测试上下文。这个项目对于理解和实践基于Java的异步事件驱动编程、消息队列...
在给定的“jzmq源码.zip”压缩包中,包含的是jeromq项目的源代码,jeromq是jzmq的一个分支,提供了更稳定和优化的实现。 **ZeroMQ简介** ZeroMQ,又称为ØMQ或0MQ,是一个轻量级的消息中间件,它提供了一种灵活的、...
这个简单的例子展示了zeromq pub-sub模式的基本用法。实际应用中,你可以根据需要创建多个订阅者,订阅不同的主题,以实现更复杂的网络通信架构。同时,jeromq还提供了其他高级特性,如负载均衡、身份认证和安全连接...
1. 下载JeroMQ的jar文件,并将其添加到你的项目类路径中。如果你使用的是Maven,可以在pom.xml文件中添加依赖: ```xml <groupId>org.zeromq <artifactId>jeromq <version>0.5.2</version> <!-- 按照实际版本号...
描述中提到的 "Pure Java ZeroMQ" 没有提供额外的具体信息,但我们可以推测,这可能是一个Java版的ZeroMQ实现,例如JeroMQ,它是ZeroMQ的一个纯Java绑定,允许Java开发者充分利用ZeroMQ的功能。 **知识点一:ZeroMQ...
【描述】"jeromq.zip, ZeroMQ POJO实现纯Java ZeroMQ" 指的是 Jeromq,它是 ZeroMQ 的一个纯Java实现。ZeroMQ 是一个高性能的消息传递库,它提供了一种轻量级的、面向消息的中间件,支持多种协议,并可在多种语言中...
jar包,亲测可用
描述中提到了 "jeromq.zip",这是一个纯 Java 实现的 ZeroMQ(又称 ØMQ、零MQ)库,名为 "JeroMQ"。ZeroMQ 是一个高级的消息队列库,它提供了分布式计算中的多种通信模式,如发布/订阅、请求/响应、推拉模式等。...
通过阅读和实践这些例子,可以深入理解ZeroMQ的工作原理和用法,提升在分布式系统开发中的能力。 总的来说,学习ZeroMQ可以帮助开发者构建高效、可靠和可扩展的网络应用,尤其适用于需要高性能通信的场景,如实时...