在上一篇中“基于Pipe管道通信”中很多朋友反应说只有代码没有理论,看起来不知道过程。本文将过程补上,而且提供基于Pipe的另种通信方式-监听方式的实现。
Pipe是Peer之间通信主要机制之一。JXTA中的Pipe和传统的管道存在着本质的区别。它不是内存中存在的数据,也不是硬盘上的文件,而只是一个XML广告文档。因此,其是抽象的概念。其创建过程在这里也称为绑定过程,可以在不同时刻动态地绑定到不同的“物理管道末端”。其具有两个明显的特点:
1、Pipe可以绑定到任何端点
2、Pipe是单向的、不可靠的、异步的
JXTA中的Pipe中存在三种类型:JxtaUnicast,JxtaUnicastSecure,JxtaPropagate。
JxtaUnicast,JxtaUnicastSecure类型管道是单向的,可以在一个输入管道和一个或多个输出管道之间创建,而JxtaPropagate类型的管道则支持多个输入端点。
下面给出其具体通信过程:
1、作为接收方的Peer需要具备一个PipeAdvertisement,然后根据这个PipeAdvertisement创建一个InputPipe,然后等待Messages的到达。
2、发送数据方的Peer需要使用相同的PipeAdvertisement,然后根据这个PipeAdvertisement来创建OutputPipe以发送数据。要创建OutputPipe,它要先发送一个Pipe Binding Query Message给所有它知道的Peers。
3、接收方收到这个Pipe Binding Query Message,看看自己缓存的Pipes中有没有匹配的PipeID。如果有,它就回复一个Pipe Binding Answer Message(里面包含了自己的PeerAdvertisement)给接收方。
4、发送方接收到Pipe Binding Answer Message后,将PeerAdvertisement从中抽取出来。然后使用PeerAdvertisement中的Endpoint信息来创建OutputPipe,这样发送方才可以发送数据。
以上就是JXTA中基于Pipe的通信过程,下面我们基于Pipe,利用信息监听接口实现通信。在给出代码之前,先给出下面两个类所用到的Pipe中的主要类的描述。
PipeService:管道服务实现的接口,用于创建输入,输出管道以及创建管道注册监听器
InputPipe:输入管道的接口,为注册监听器的时候也可以等待信息
OutputPipe:输出管道的接口,用来发送信息
PipeMsgListener(之前版本是InputPipeListener):由JXTA程序实现的的接口,处理输入管道消息达到事件
OutputPipeListener:由需要处理的输出管道事件的应用程序实现的接口,通常在管道绑定完成的时候发生。
PipeMsgEvent:注册到输入管道监听器的类,表示消息的到达。
OutputPipeEvent:注册到输出管道监听器的类,可以用来绑定或者解析成功后的管道的ID或者输出管道的实体。
写了这么,有些人应该看不下去了,呵呵。好,下面就给出代码,在这里我使用的PipeAdvertisement与以前的不同,以前是在程序中生成,在这里是读pipe.adv来返回管道广告。该文件直接在工程下面,注意:如果读不到该文件,应该是路径问题。
public class PipeServer implements PipeMsgListener {
static PeerGroup netPg = null;
transient NetworkConfigurator config;
private PipeService pipeService;
private PipeAdvertisement pipeAdv;
private InputPipe inputPipe = null;
public PipeServer() {
config = null;
try {
config = new NetworkConfigurator();
config.setPrincipal("Pipe1");
config.setPassword("888888888");
config.save();
netPg = new NetPeerGroupFactory().getInterface();
} catch (Exception e) {
e.printStackTrace();
System.exit(-1);
}
pipeService = netPg.getPipeService();
pipeAdv = PipeClient.getPipeAdvertisement();
}
public static void main(String args[]) {
PipeServer server = new PipeServer();
server.start();
}
/**
* 创建输入管道并监听信息
*/
public void start() {
try {
System.out.println("服务端=创建输入管道");
// 创建输入管道并注册等待信息的到达
inputPipe = pipeService.createInputPipe(pipeAdv, this);
} catch (IOException io) {
io.printStackTrace();
return;
}
if (inputPipe == null) {
System.out.println("服务端=不能打开输入管道");
System.exit(-1);
}
System.out.println("服务端=等待输出管道中的信息......");
}
/**
* 关闭输入管道及退出程序
*/
public void stop() {
inputPipe.close();
System.exit(-1);
}
/**
* 要实现PipeMsgListener接口该方法,用来接收输出管道的中信息,并打印在控制台
*/
public void pipeMsgEvent(PipeMsgEvent event) {
Message msg;
try {
// PipeMsgEvent事件中包含信息
msg = event.getMessage();
if (msg == null) {
System.out.println("服务端=空信息");
return;
}
} catch (Exception e) {
e.printStackTrace();
return;
}
// 取得所有信息元素
Message.ElementIterator en = msg.getMessageElements();
if (!en.hasNext()) {
return;
}
// 取得content元素下的内容
MessageElement msgElement = msg.getMessageElement(null, PipeClient.MESSAGE_NAME_SPACE);
// 接收信息
if (msgElement.toString() == null) {
System.out.println("服务端=接到空的信息 ");
} else {
System.out.println("服务端=接收到的信息 :" + msgElement.toString());
}
}
}
首先运行的是上面这个类,如果运行顺序错了,会连接不到,而发送不了信息。
public class PipeClient implements OutputPipeListener {
public final static String MESSAGE_NAME_SPACE = "content";
private PipeService pipeService;
private PipeAdvertisement pipeAdv;
private OutputPipe outputPipe;
private NetworkConfigurator config;
private PeerGroup netPg = null;
public PipeClient() {
config = null;
try {
config = new NetworkConfigurator();
config.setPrincipal("Pipe2");
config.setPassword("888888888");
config.save();
netPg = new NetPeerGroupFactory().getInterface();
} catch (Exception e) {
e.printStackTrace();
System.exit(-1);
}
// 获取管道服务
pipeService = netPg.getPipeService();
// 创建管道广告
pipeAdv = getPipeAdvertisement();
}
public static void main(String args[]) {
// 连接目标Peer
PipeClient client = new PipeClient();
client.start();
}
/**
* 创建管道广告
*/
public static PipeAdvertisement getPipeAdvertisement() {
PipeAdvertisement advertisement=null;
FileInputStream is;
try {
is = new FileInputStream("pipe.adv");//通过本地特定管道广告来创建管道
advertisement=(PipeAdvertisement)AdvertisementFactory.newAdvertisement(MimeMediaType.XMLUTF8, is);
is.close();
} catch (Exception e) {
e.printStackTrace();
}
return advertisement;
}
/**
* 创建输出管道
*/
public synchronized void start() {
try {
// 一旦输出管道创建完成,outputPipeEvent()就被激活
pipeService.createOutputPipe(pipeAdv, this);
} catch (IOException e) {
System.out.println("客户端=创建输出管道失败");
e.printStackTrace();
System.exit(-1);
}
}
/**
* 要实现OutputPipeListener中的该方法
* 当OutputPipe创建完成则调用该方法
*/
public void outputPipeEvent(OutputPipeEvent event) {
System.out.println("收到输出管道创建事件");
// 获取输出管道对象
outputPipe = event.getOutputPipe();
Message msg;
try {
System.out.println("发送信息中......");
// 实例化要发送的信息对象
msg = new Message();
// 字符串信息元素中添加要发送的内容“Testing Pipe"
StringMessageElement sme = new StringMessageElement(MESSAGE_NAME_SPACE, "Testing Pipe", null);
msg.addMessageElement(null, sme);
// 发送信息
outputPipe.send(msg);
System.out.println("客户端:信息已经发送");
} catch (IOException e) {
System.out.println("客户端:发送信息失败");
e.printStackTrace();
System.exit(-1);
}
stop();
}
/**
* 关闭输出管道和退出程序
*/
public void stop() {
outputPipe.close();
System.exit(-1);
}
}
终于完了,这是我写的最长一篇啦。今天就写到这了。以后会继续把我学到的JXTA P2P通信方面的知识共享给大家。
忘记提供pipe.adv,真不意思。pipe.adv文件内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<jxta:PipeAdvertisement>
<Name>Pipe tutorial</Name>
<Id>urn:jxta:uuid-59616261646162614E50472050325033C0C1DE89719B456691A596B983BA0E1004</Id>
<Type>JxtaUnicast</Type>
</jxta:PipeAdvertisement>
分享到:
相关推荐
该包内包含多个方法,比如绑定输入管道并执行监听的bind()方法,发送和接收信息的run()方法,绑定输出管道并建立通信连接的connect()方法,通过输出管道发送信息的sendMessage()方法,设定输入和输出管道监听器的...
接下来,我们来看`JxtaBiDiPipe`,这是一个双向管道,它提供了一种同步的、全双工的通信方式。与`JxtaServerPipe`不同,`JxtaBiDiPipe`允许两个节点同时进行读写操作,这对于需要双向即时交互的应用非常有用。创建`...
TCP监听在JXTA中的作用在于,它使得P2P网络中的节点能够开放端口,接收其他节点的连接请求,从而实现通信。JXTA的TCP监听主要通过Rendezvous服务来实现,Rendezvous节点相当于P2P网络中的路由中心,它们维护着网络...
4. **管道(Pipes)与通道(Rendevous)**:JXTA的通信机制基于管道,管道可以是单向或双向,而rendezvous节点则用于帮助非直连的节点建立连接。myjxta的源码会解释如何建立、管理和维护这些通信通道。 5. **安全与...
JXTA通过提供诸如广告、发现、管道、身份管理和资源交换等功能,使得在P2P环境中实现这些特性变得可能。 **JXTA的主要组件包括:** 1. **边缘服务(Edge Services)**:这是P2P网络的入口点,允许节点加入或离开...
2. **管道(Pipe)通信**:详细讲解了JXTA管道的概念,它是P2P通信的基础,支持一对多和多对多的消息传递。 3. **同步与异步管道**:对比分析了两种管道类型的特点,同步管道保证消息顺序,而异步管道提供更高的并发...
JXTA(Java XML-based Peer-to-Peer Technology)是Oracle公司推出的一种基于XML的P2P(Peer-to-Peer,对等网络)技术,旨在提供一个开放、分散的网络平台,使得节点之间可以进行直接的信息交换和服务共享。...
通过JXTA的广告系统,可以发现并连接到其他对等体,建立通信管道,实现文件共享、聊天或其他协作功能。源码分析可以帮助我们理解这些过程的具体实现。 4. **模块化设计** JXSE的源码结构清晰,采用模块化设计,...
JXSE 2.5支持JXTA的多个子协议,如Rendezvous Protocol(rendezvous服务发现)、Pipe Protocol(数据传输管道)和广告协议(advertisements,用于发布和查找服务或资源)。 **3. JXSE服务** JXSE 2.5提供了多种...