`
lizhensan
  • 浏览: 377174 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

jeromq 例子

    博客分类:
  • java
 
阅读更多

zoremq 

这个不能算是mq产品,只能算是个socket的封装,并针对常用场景进行了抽取。简化了网络编程。

jeromq 是对 zoremq 框架的一个纯粹的java版本的实现(不需要依赖zoremq 动态库)

 

jeromq 它是基于java nio 进行的封装。

 

 

maven 依赖,仅一个jar

<dependency>
			<groupId>org.zeromq</groupId>
			<artifactId>jeromq</artifactId>
			<version>0.3.4</version>
		</dependency>

 

 

 

1、请求-响应模式

 

package zoremq;

import org.zeromq.ZMQ;

public class Request {
	public static void main(String args[]) {
		for (int j = 0; j < 5; j++) {
			new Thread(new Runnable() {
				public void run() {
					ZMQ.Context context = ZMQ.context(1); // 创建一个I/O线程的上下文
					ZMQ.Socket socket = context.socket(ZMQ.REQ); // 创建一个request类型的socket,这里可以将其简单的理解为客户端,用于向response端发送数据

					socket.connect("tcp://127.0.0.1:5555"); // 与response端建立连接
					long now = System.currentTimeMillis();
					for (int i = 0; i < 100000; i++) {
						String request = "hello";
						socket.send(request); // 向reponse端发送数据
						byte[] response = socket.recv(); // 接收response发送回来的数据
															// 正在request/response模型中,send之后必须要recv之后才能继续send,这可能是为了保证整个request/response的流程走完
						System.out.println("receive : " + new String(response));
					}
					long after = System.currentTimeMillis();

					System.out.println((after - now) / 1000);
				}

			}).start();
			;
		}

	}
}

 

package zoremq;

import org.zeromq.ZMQ;

public class Response {
	public static void main(String[] args) {
		ZMQ.Context context = ZMQ.context(1); // 这个表示创建用于一个I/O线程的context

		ZMQ.Socket socket = context.socket(ZMQ.REP); // 创建一个response类型的socket,他可以接收request发送过来的请求,其实可以将其简单的理解为服务端
		socket.bind("tcp://*:5555"); // 绑定端口
		int i = 0;
		int number = 0;
		while (!Thread.currentThread().isInterrupted()) {
			i++;
			if (i == 10000) {
				i = 0;
				System.out.println(++number);
			}
			byte[] request = socket.recv(); // 获取request发送过来的数据
			System.out.println("客户端的相应:" + new String(request));
			// System.out.println("receive : " + new String(request));
			String response = "world";
			socket.send(response); // 向request端发送数据
									// ,必须要要request端返回数据,没有返回就又recv,将会出错,这里可以理解为强制要求走完整个request/response流程
		}
		socket.close(); // 先关闭socket
		context.term(); // 关闭当前的上下文

	}
}

 

 

 

2、Publish-subscribe

 

 

package zoremq.sub;

import org.zeromq.ZMQ; 
import org.zeromq.ZMQ.Context; 
import org.zeromq.ZMQ.Socket; 

public class Publisher { 

    /** 
     * @param args 
     * @throws InterruptedException 
     */ 
    public static void main(String[] args) throws InterruptedException { 
    // TODO Auto-generated method stub 
    Context context = ZMQ.context(1); 
    Socket publisher = context.socket(ZMQ.PUB); 
    publisher.bind("tcp://*:5557"); 
    int i = 0; 
    while (true) { 
        Thread.currentThread().sleep(1000); 
        publisher.send("A".getBytes(), ZMQ.SNDMORE); 
        publisher.send("This is A".getBytes(), 0); 
        publisher.send("B".getBytes(), ZMQ.SNDMORE); 
        publisher.send("This is B".getBytes(), 0); 
    } 
    } 

}

 

package zoremq.sub;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

public class Sub1 {
	public static void main(String[] args) {
		Context context = ZMQ.context(1);
		Socket subscribe = context.socket(ZMQ.SUB);
		subscribe.connect("tcp://127.0.0.1:5557");
		subscribe.subscribe("B".getBytes());
		while (true) {
			System.out.println(new String(subscribe.recv(0)));
			System.out.println(new String(subscribe.recv(0)));
		}
	}

}

 3、PipeLine模式

 

 

package zoremq.sub;

import org.zeromq.ZMQ; 
import org.zeromq.ZMQ.Context; 
import org.zeromq.ZMQ.Socket; 

public class Sub2 { 
    public static void main(String[] args) { 
    Context context = ZMQ.context(1); 
    Socket subscribe = context.socket(ZMQ.SUB); 
    subscribe.connect("tcp://127.0.0.1:5557"); 
    //subscribe.subscribe("topic".getBytes()); 
    subscribe.subscribe("A".getBytes()); 
    while (true) { 
        System.out.println(new String(subscribe.recv(0))); 
        System.out.println(new String(subscribe.recv(0))); 
    } 
    } 
}

 3、PipeLine模式

package zoremq.PipeLine;

import org.zeromq.ZMQ;
public class MainPusher {
	public static void main(String[] args) throws InterruptedException {
		// 参数代表使用多少线程,大多数情况下,1个线程已经足够。
		ZMQ.Context context = ZMQ.context(1);
		// 指定模式为Pusher
		ZMQ.Socket socket = context.socket(ZMQ.PUSH);
		socket.bind("tcp://127.0.0.1:5557"); // 绑定服务地址及端口
		for (;;) {
			long time = System.nanoTime();
			socket.send(String.valueOf(time));
			System.out.println("发布了新消息,时间:" + time);
			Thread.sleep(2000);
		}
	}
}

 

package zoremq.PipeLine;

import org.zeromq.ZMQ;

public class WorkerOne {
	public static void main(String[] args) {
		// 指定模式为pull模式
		ZMQ.Socket receiver = ZMQ.context(1).socket(ZMQ.PULL);
		receiver.connect("tcp://127.0.0.1:5557");
		// 指定模式为push模式
		ZMQ.Socket sender = ZMQ.context(1).socket(ZMQ.PUSH);
		sender.connect("tcp://127.0.0.1:5558");
		for (;;) {
			byte[] recs = receiver.recv();
			long receiveTime = System.nanoTime();
			String oriMsg = new String(recs);
			long pubTime = Long.valueOf(oriMsg);
			long costTime = receiveTime - pubTime;
			System.out.println("Receive: " + oriMsg + " Cost time: " + costTime);
			sender.send("1" + oriMsg);
			System.out.println("Send to sinker.");
		}
	}
}

 

package zoremq.PipeLine;

import org.zeromq.ZMQ;

public class WorkerTwo {
	public static void main(String[] args) {
		// 指定模式为pull模式
		ZMQ.Socket receiver = ZMQ.context(1).socket(ZMQ.PULL);
		receiver.connect("tcp://127.0.0.1:5557");
		// 指定模式为push模式
		ZMQ.Socket sender = ZMQ.context(1).socket(ZMQ.PUSH);
		sender.connect("tcp://127.0.0.1:5558");
		for (;;) {
			byte[] recs = receiver.recv();
			long receiveTime = System.nanoTime();
			String oriMsg = new String(recs);
			long pubTime = Long.valueOf(oriMsg);
			long costTime = receiveTime - pubTime;
			System.out.println("Receive: " + oriMsg + " Cost time: " + costTime);
			sender.send("2" + oriMsg);
			System.out.println("Send to sinker.");
		}
	}
}

 

package zoremq.PipeLine;
import org.zeromq.ZMQ;
 
public class Sinker   {
     public static void main(String[] args) {
          ZMQ.Context context = ZMQ. context(1);
           // 指定模式为pull模式
          ZMQ.Socket receiver = context.socket(ZMQ. PULL);
          receiver. bind("tcp://127.0.0.1:5558");
           for (;;) {
               byte[] recs = receiver.recv();
               long receiveTime = System. nanoTime();
              String oriMsg = new String(recs);
              String msg = new String(recs,1,recs.length-1);
               long pubTime = Long. valueOf(msg);
               long costTime = receiveTime - pubTime;
              System. out.println( "Receive: " + oriMsg + " Cost time: " + costTime);
          }
     }
}

 

 

参考:http://www.coderli.com

 

分享到:
评论

相关推荐

    Android代码-jeromq

    JeroMQ Pure Java implementation of libzmq (http://zeromq.org). Features Based on libzmq 4.1.7. ZMTP/3.0 (http://rfc.zeromq.org/spec:23). tcp:// protocol and inproc:// is compatible with ...

    JeroMQ的jar包

    JeroMQ是libzmq(ZeroMQ)的纯Java实现。这个是Android可用的jar包。 4.1.7版

    jeromq-0.3.5.jar 线程间通讯

    jeromq-0.3.5.jar 线程间通讯

    jeromq-master.zip

    标题“jeromq-master.zip”指的是一个包含Java实现的libzmq源代码的压缩包,名为"jeromq-master"。libzmq,通常被称为ZeroMQ,是一个开源的消息队列库,它为开发者提供了高性能、轻量级的异步消息传递机制。这个库...

    ZMQ的纯java实现JeroMQ jar包及javadoc

    标题中的“ZMQ的纯java实现JeroMQ jar包及javadoc”指的是JeroMQ,这是一个完全用Java语言实现的零拷贝(ZeroMQ)消息队列库。ZeroMQ,通常简称为ZMQ,是一个高性能的开源消息中间件,它提供了一个轻量级的消息传递...

    jeromq:纯Java ZeroMQ

    JeroMQ libzmq( )的纯Java实现。 特征 基于libzmq 4.1.7。 ZMTP / 3.0( )。 tcp://协议和inproc://与zeromq兼容。 ipc://协议仅在jeromq之间起作用(内部使用tcp://127.0.0.1:port)。 有价证券 。 与...

    zermq最简单的java版实例(带jar包)

    这个例子展示了zeromq如何处理服务器端的通信流程。 标签"zeromq jar"暗示了我们将在Java程序中使用zeromq的JAR包。在Java项目中,JAR包是库或框架的打包形式,包含了所有必要的类和资源。为了使用zeromq,我们需要...

    zeromq的parallel-pipeline并行处理模式的jave实现

    在这个例子中,数据会从发送者经过两个并行的处理器,最后到达接收者。每个处理器都是独立运行的,可以根据需求扩展更多的处理器节点以增加并行处理能力。 在Eclipse环境下,你可以直接运行这个maven项目,由于所有...

    纯java版本的ZeroMQ,可用在Android上

    标签“JeroMQ”是指ZeroMQ的Java实现,它与JZMQ是相关的,都属于Java语言对ZeroMQ的接口。“Android”标签表明这个项目是针对Android平台的。“ZeroMQ”标签则再次强调了这是关于ZeroMQ技术的内容。 在压缩包子文件...

    sclo-cassandra3-jeromq-0.3.6-3.el7.noarch.rpm

    官方离线安装包,亲测可用

    sclo-cassandra3-jeromq-javadoc-0.3.6-3.el7.noarch.rpm

    官方离线安装包,亲测可用

    LabVertxMQ:用于使用Vert.X从JEroMQ项目创建半功能TestNG测试上下文的研究,评估和测试实验室

    在这个场景中,我们关注的是一个名为"LabVertxMQ"的项目,它是一个研究和测试实验室,专门针对使用Vert.X与JEroMQ结合创建半功能的TestNG测试上下文。这个项目对于理解和实践基于Java的异步事件驱动编程、消息队列...

    jzmq源码.zip

    在给定的“jzmq源码.zip”压缩包中,包含的是jeromq项目的源代码,jeromq是jzmq的一个分支,提供了更稳定和优化的实现。 **ZeroMQ简介** ZeroMQ,又称为ØMQ或0MQ,是一个轻量级的消息中间件,它提供了一种灵活的、...

    zeromq的pub-sub订阅模式的jave实现

    这个简单的例子展示了zeromq pub-sub模式的基本用法。实际应用中,你可以根据需要创建多个订阅者,订阅不同的主题,以实现更复杂的网络通信架构。同时,jeromq还提供了其他高级特性,如负载均衡、身份认证和安全连接...

    zeromq简易demo搭建(java版本)

    1. 下载JeroMQ的jar文件,并将其添加到你的项目类路径中。如果你使用的是Maven,可以在pom.xml文件中添加依赖: ```xml &lt;groupId&gt;org.zeromq &lt;artifactId&gt;jeromq &lt;version&gt;0.5.2&lt;/version&gt; &lt;!-- 按照实际版本号...

    Pure Java ZeroMQ .zip

    描述中提到的 "Pure Java ZeroMQ" 没有提供额外的具体信息,但我们可以推测,这可能是一个Java版的ZeroMQ实现,例如JeroMQ,它是ZeroMQ的一个纯Java绑定,允许Java开发者充分利用ZeroMQ的功能。 **知识点一:ZeroMQ...

    fabric3-ftp-spi-1.9.6.zip

    【描述】"jeromq.zip, ZeroMQ POJO实现纯Java ZeroMQ" 指的是 Jeromq,它是 ZeroMQ 的一个纯Java实现。ZeroMQ 是一个高性能的消息传递库,它提供了一种轻量级的、面向消息的中间件,支持多种协议,并可在多种语言中...

    bristleback-core-0.3.5-sources.jar

    jar包,亲测可用

    crux-runtime-5.1.0.zip

    描述中提到了 "jeromq.zip",这是一个纯 Java 实现的 ZeroMQ(又称 ØMQ、零MQ)库,名为 "JeroMQ"。ZeroMQ 是一个高级的消息队列库,它提供了分布式计算中的多种通信模式,如发布/订阅、请求/响应、推拉模式等。...

    Zmq使用实例多种语言C,C++,OC,Python等

    通过阅读和实践这些例子,可以深入理解ZeroMQ的工作原理和用法,提升在分布式系统开发中的能力。 总的来说,学习ZeroMQ可以帮助开发者构建高效、可靠和可扩展的网络应用,尤其适用于需要高性能通信的场景,如实时...

Global site tag (gtag.js) - Google Analytics