zoremq
这个不能算是mq产品,只能算是个socket的封装,并针对常用场景进行了抽取。简化了网络编程。
jeromq 是对 zoremq 框架的一个纯粹的java版本的实现(不需要依赖zoremq 动态库)
jeromq 它是基于java nio 进行的封装。
maven 依赖,仅一个jar
<dependency> <groupId>org.zeromq</groupId> <artifactId>jeromq</artifactId> <version>0.3.4</version> </dependency>
1、请求-响应模式
package zoremq; import org.zeromq.ZMQ; public class Request { public static void main(String args[]) { for (int j = 0; j < 5; j++) { new Thread(new Runnable() { public void run() { ZMQ.Context context = ZMQ.context(1); // 创建一个I/O线程的上下文 ZMQ.Socket socket = context.socket(ZMQ.REQ); // 创建一个request类型的socket,这里可以将其简单的理解为客户端,用于向response端发送数据 socket.connect("tcp://127.0.0.1:5555"); // 与response端建立连接 long now = System.currentTimeMillis(); for (int i = 0; i < 100000; i++) { String request = "hello"; socket.send(request); // 向reponse端发送数据 byte[] response = socket.recv(); // 接收response发送回来的数据 // 正在request/response模型中,send之后必须要recv之后才能继续send,这可能是为了保证整个request/response的流程走完 System.out.println("receive : " + new String(response)); } long after = System.currentTimeMillis(); System.out.println((after - now) / 1000); } }).start(); ; } } }
package zoremq; import org.zeromq.ZMQ; public class Response { public static void main(String[] args) { ZMQ.Context context = ZMQ.context(1); // 这个表示创建用于一个I/O线程的context ZMQ.Socket socket = context.socket(ZMQ.REP); // 创建一个response类型的socket,他可以接收request发送过来的请求,其实可以将其简单的理解为服务端 socket.bind("tcp://*:5555"); // 绑定端口 int i = 0; int number = 0; while (!Thread.currentThread().isInterrupted()) { i++; if (i == 10000) { i = 0; System.out.println(++number); } byte[] request = socket.recv(); // 获取request发送过来的数据 System.out.println("客户端的相应:" + new String(request)); // System.out.println("receive : " + new String(request)); String response = "world"; socket.send(response); // 向request端发送数据 // ,必须要要request端返回数据,没有返回就又recv,将会出错,这里可以理解为强制要求走完整个request/response流程 } socket.close(); // 先关闭socket context.term(); // 关闭当前的上下文 } }
2、Publish-subscribe
package zoremq.sub; import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; public class Publisher { /** * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { // TODO Auto-generated method stub Context context = ZMQ.context(1); Socket publisher = context.socket(ZMQ.PUB); publisher.bind("tcp://*:5557"); int i = 0; while (true) { Thread.currentThread().sleep(1000); publisher.send("A".getBytes(), ZMQ.SNDMORE); publisher.send("This is A".getBytes(), 0); publisher.send("B".getBytes(), ZMQ.SNDMORE); publisher.send("This is B".getBytes(), 0); } } }
package zoremq.sub; import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; public class Sub1 { public static void main(String[] args) { Context context = ZMQ.context(1); Socket subscribe = context.socket(ZMQ.SUB); subscribe.connect("tcp://127.0.0.1:5557"); subscribe.subscribe("B".getBytes()); while (true) { System.out.println(new String(subscribe.recv(0))); System.out.println(new String(subscribe.recv(0))); } } }
3、PipeLine模式
package zoremq.sub; import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; public class Sub2 { public static void main(String[] args) { Context context = ZMQ.context(1); Socket subscribe = context.socket(ZMQ.SUB); subscribe.connect("tcp://127.0.0.1:5557"); //subscribe.subscribe("topic".getBytes()); subscribe.subscribe("A".getBytes()); while (true) { System.out.println(new String(subscribe.recv(0))); System.out.println(new String(subscribe.recv(0))); } } }
3、PipeLine模式
package zoremq.PipeLine; import org.zeromq.ZMQ; public class MainPusher { public static void main(String[] args) throws InterruptedException { // 参数代表使用多少线程,大多数情况下,1个线程已经足够。 ZMQ.Context context = ZMQ.context(1); // 指定模式为Pusher ZMQ.Socket socket = context.socket(ZMQ.PUSH); socket.bind("tcp://127.0.0.1:5557"); // 绑定服务地址及端口 for (;;) { long time = System.nanoTime(); socket.send(String.valueOf(time)); System.out.println("发布了新消息,时间:" + time); Thread.sleep(2000); } } }
package zoremq.PipeLine; import org.zeromq.ZMQ; public class WorkerOne { public static void main(String[] args) { // 指定模式为pull模式 ZMQ.Socket receiver = ZMQ.context(1).socket(ZMQ.PULL); receiver.connect("tcp://127.0.0.1:5557"); // 指定模式为push模式 ZMQ.Socket sender = ZMQ.context(1).socket(ZMQ.PUSH); sender.connect("tcp://127.0.0.1:5558"); for (;;) { byte[] recs = receiver.recv(); long receiveTime = System.nanoTime(); String oriMsg = new String(recs); long pubTime = Long.valueOf(oriMsg); long costTime = receiveTime - pubTime; System.out.println("Receive: " + oriMsg + " Cost time: " + costTime); sender.send("1" + oriMsg); System.out.println("Send to sinker."); } } }
package zoremq.PipeLine; import org.zeromq.ZMQ; public class WorkerTwo { public static void main(String[] args) { // 指定模式为pull模式 ZMQ.Socket receiver = ZMQ.context(1).socket(ZMQ.PULL); receiver.connect("tcp://127.0.0.1:5557"); // 指定模式为push模式 ZMQ.Socket sender = ZMQ.context(1).socket(ZMQ.PUSH); sender.connect("tcp://127.0.0.1:5558"); for (;;) { byte[] recs = receiver.recv(); long receiveTime = System.nanoTime(); String oriMsg = new String(recs); long pubTime = Long.valueOf(oriMsg); long costTime = receiveTime - pubTime; System.out.println("Receive: " + oriMsg + " Cost time: " + costTime); sender.send("2" + oriMsg); System.out.println("Send to sinker."); } } }
package zoremq.PipeLine; import org.zeromq.ZMQ; public class Sinker { public static void main(String[] args) { ZMQ.Context context = ZMQ. context(1); // 指定模式为pull模式 ZMQ.Socket receiver = context.socket(ZMQ. PULL); receiver. bind("tcp://127.0.0.1:5558"); for (;;) { byte[] recs = receiver.recv(); long receiveTime = System. nanoTime(); String oriMsg = new String(recs); String msg = new String(recs,1,recs.length-1); long pubTime = Long. valueOf(msg); long costTime = receiveTime - pubTime; System. out.println( "Receive: " + oriMsg + " Cost time: " + costTime); } } }
参考:http://www.coderli.com
相关推荐
这个例子展示了zeromq如何处理服务器端的通信流程。 标签"zeromq jar"暗示了我们将在Java程序中使用zeromq的JAR包。在Java项目中,JAR包是库或框架的打包形式,包含了所有必要的类和资源。为了使用zeromq,我们需要...
在这个例子中,数据会从发送者经过两个并行的处理器,最后到达接收者。每个处理器都是独立运行的,可以根据需求扩展更多的处理器节点以增加并行处理能力。 在Eclipse环境下,你可以直接运行这个maven项目,由于所有...
这个简单的例子展示了zeromq pub-sub模式的基本用法。实际应用中,你可以根据需要创建多个订阅者,订阅不同的主题,以实现更复杂的网络通信架构。同时,jeromq还提供了其他高级特性,如负载均衡、身份认证和安全连接...
通过阅读和实践这些例子,可以深入理解ZeroMQ的工作原理和用法,提升在分布式系统开发中的能力。 总的来说,学习ZeroMQ可以帮助开发者构建高效、可靠和可扩展的网络应用,尤其适用于需要高性能通信的场景,如实时...
Mycos 建立在 ZeroMQ 高性能库(实际上是 Java 端口 JeroMQ)和 GSON object-json 解析器之上。 它隐藏了网络编程中常见的资源管理问题。 ###simple client-server example 也许简单的例子是描述 mycos 最有效的...