`
lobin
  • 浏览: 417954 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

zookeeper 连接建立会话

 
阅读更多

建立连接会话

 

建立连接会话请求响应没有头部

 

 

建立连接会话请求

class ConnectRequest {

        int protocolVersion;

        long lastZxidSeen;

        int timeOut;

        long sessionId;

        buffer passwd;

}

 

 

建立连接会话响应

class ConnectResponse {

        int protocolVersion;

        int timeOut;

        long sessionId;

        buffer passwd;

}

 

 

 

public class ConnectTest {

 

private static final Logger log = LoggerFactory.getLogger(ConnectTest.class);

 

private static SocketChannel channel;

 

@BeforeClass

public static void initialize() throws IOException {

channel = SocketChannel.open();

 

SocketAddress socketAddress = new InetSocketAddress("localhost", 2191);

channel.connect(socketAddress);

}

 

private static class WriteWorker extends Thread {

public void run() {

//for (;;) {

try {

ByteArrayOutputStream baos = new ByteArrayOutputStream();

BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);

 

ConnectRequest connectRequest = new ConnectRequest(0, 0, 10000, 0, new byte[16]);

 

boa.writeInt(-1, "len"); // We'll fill this in later

connectRequest.serialize(boa, "connect");

boa.writeBool(true, "readOnly");

baos.close();

 

ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());

bb.putInt(bb.capacity() - 4);

bb.rewind();

 

channel.write(bb);

} catch (IOException e) {

e.printStackTrace();

}

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

//}

}

}

 

private static class ReaderWorker extends Thread {

public void run() {

for (;;) {

try {

ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);

while (true) {

channel.read(lenBuffer);

if (! lenBuffer.hasRemaining()) {

break;

}

}

lenBuffer.flip();

int len = lenBuffer.getInt();

if (len < 0 || len >= ClientCnxn.packetLen) {

throw new IOException("Packet len" + len + " is out of range!");

}

 

ByteBuffer buffer = ByteBuffer.allocate(len);

while (true) {  

channel.read(buffer);

if (! buffer.hasRemaining()) {

break;

}

}

buffer.flip();

 

ByteBufferInputStream bbis = new ByteBufferInputStream(buffer);

BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);

ConnectResponse conRsp = new ConnectResponse();

conRsp.deserialize(bbia, "connect");

 

// read "is read-only" flag

        boolean isRO = false;

        try {

            isRO = bbia.readBool("readOnly");

        } catch (IOException e) {

            // this is ok -- just a packet from an old server which

            // doesn't contain readOnly field

        log.warn("Connected to an old server; r-o mode will be unavailable");

        }

        

System.out.println(ToStringBuilder.reflectionToString(conRsp, ToStringStyle.MULTI_LINE_STYLE));

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

 

@Test

public void connect() throws IOException {

Thread wt = new WriteWorker();

wt.start();

Thread rt = new ReaderWorker();

rt.start();

 

try {

wt.join();

rt.join();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

 

 

启动运行后:

 

服务器打印出如下日志:

2017-04-14 20:53:31,138 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2191:NIOServerCnxnFactory@192] - Accepted socket connection from /127.0.0.1:2544

2017-04-14 20:53:31,170 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2191:ZooKeeperServer@900] - Client attempting to establish new session at /127.0.0.1:2544

2017-04-14 20:53:31,248 [myid:1] - INFO  [CommitProcessor:1:ZooKeeperServer@645] - Established session 0x15b6c2343f90011 with negotiated timeout 10000 for client /127.0.0.1:2544

2017-04-14 20:53:42,060 [myid:1] - INFO  [CommitProcessor:1:NIOServerCnxn@1008] - Closed socket connection for client /127.0.0.1:2544 which had sessionid 0x15b6c2343f90011

 

 

 

 

客户端输出:

org.apache.zookeeper.proto.ConnectResponse@16de067[

  protocolVersion=0

  timeOut=10000

  sessionId=97790715638644753

  passwd={-18,75,33,46,-73,2,30,-80,-43,-55,-68,-125,-23,-87,6,72}

]

可以看出建立了一个新会话,会话id为:97790715638644753

 

zookeeper 请求认证

 


认证
认证请求有头部,响应有头部


请求头

class RequestHeader {
        int xid;
        int type;
}

 


认证请求

class AuthPacket {
        int type;
        ustring scheme;
        buffer auth;
}

 


响应头

class ReplyHeader {
        int xid;
        long zxid;
        int err;
}

 




public class AuthPacketTest {
 
private static final Logger log = LoggerFactory.getLogger(AuthPacketTest.class);
 
private static SocketChannel channel;
 
private static volatile boolean isSessionOpened = false;
 
private static long sid = 0;
 
@BeforeClass
public static void initialize() throws IOException {
channel = SocketChannel.open();
 
//SocketAddress socketAddress = new InetSocketAddress("localhost", 2182);
SocketAddress socketAddress = new InetSocketAddress("localhost", 2191);
channel.connect(socketAddress);
}
 
private static class WriteWorker extends Thread {
private void isSessionAlive() {
heartbeat();
}
 
private void heartbeat() {
ping();
}
 
private void ping() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
 
boa.writeInt(-1, "len"); // We'll fill this in later
 
RequestHeader h = new RequestHeader(-2, OpCode.ping);
h.serialize(boa, "header");
baos.close();
 
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
bb.putInt(bb.capacity() - 4);
bb.rewind();
 
channel.write(bb);
} catch (IOException e) {
e.printStackTrace();
}
}
 
private void auth() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
 
boa.writeInt(-1, "len"); // We'll fill this in later
 
RequestHeader requestHeader = new RequestHeader(-4, OpCode.auth);
requestHeader.serialize(boa, "header");
 
//AuthPacket authPacket = new AuthPacket(0, "auth", "".getBytes());
AuthPacket authPacket = new AuthPacket(0, "ip", "127.0.0.1".getBytes());
authPacket.serialize(boa, "request");
baos.close();
 
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
bb.putInt(bb.capacity() - 4);
bb.rewind();
 
channel.write(bb);
} catch (IOException e) {
e.printStackTrace();
}
}
 
private void connect() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
 
ConnectRequest connectRequest = new ConnectRequest(0, 0, 10000, 0, new byte[16]);
 
boa.writeInt(-1, "len"); // We'll fill this in later
connectRequest.serialize(boa, "connect");
boa.writeBool(true, "readOnly");
baos.close();
 
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
bb.putInt(bb.capacity() - 4);
bb.rewind();
 
channel.write(bb);
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
connect();
for (;;) {
isSessionAlive();
auth();
 
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
 
private static class ReaderWorker extends Thread {
private void handConnectResponse() {
try {
ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
while (true) {
channel.read(lenBuffer);
if (! lenBuffer.hasRemaining()) {
break;
}
}
lenBuffer.flip();
int len = lenBuffer.getInt();
if (len < 0 || len >= ClientCnxn.packetLen) {
throw new IOException("Packet len" + len + " is out of range!");
}
 
ByteBuffer buffer = ByteBuffer.allocate(len);
while (true) {  
channel.read(buffer);
if (! buffer.hasRemaining()) {
break;
}
}
buffer.flip();
 
ByteBufferInputStream bbis = new ByteBufferInputStream(buffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
conRsp.deserialize(bbia, "connect");
 
// read "is read-only" flag
       boolean isRO = false;
       try {
           isRO = bbia.readBool("readOnly");
       } catch (IOException e) {
           // this is ok -- just a packet from an old server which
           // doesn't contain readOnly field
       log.warn("Connected to an old server; r-o mode will be unavailable");
       }
       
       sid = conRsp.getSessionId();
       isSessionOpened = true;
System.out.println(ToStringBuilder.reflectionToString(conRsp, ToStringStyle.MULTI_LINE_STYLE));
} catch (IOException e) {
e.printStackTrace();
}
}
 
private void handleResponse() {
try {
ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
while (true) {
channel.read(lenBuffer);
if (! lenBuffer.hasRemaining()) {
break;
}
}
lenBuffer.flip();
int len = lenBuffer.getInt();
if (len < 0 || len >= ClientCnxn.packetLen) {
throw new IOException("Packet len" + len + " is out of range!");
}
 
ByteBuffer buffer = ByteBuffer.allocate(len);
while (true) {  
channel.read(buffer);
if (! buffer.hasRemaining()) {
break;
}
}
buffer.flip();
 
ByteBufferInputStream bbis = new ByteBufferInputStream(buffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
 
ReplyHeader replyHdr = new ReplyHeader();
           replyHdr.deserialize(bbia, "header");
       
System.out.println(ToStringBuilder.reflectionToString(replyHdr, ToStringStyle.MULTI_LINE_STYLE));
} catch (IOException e) {
e.printStackTrace();
}
}
 
public void run() {
for (;;) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
 
if (! isSessionOpened) {
handConnectResponse();
} else {
handleResponse();
}
}
}
}
 
@Test
public void connect() throws IOException {
Thread wt = new WriteWorker();
wt.start();
Thread rt = new ReaderWorker();
rt.start();
 
try {
wt.join();
rt.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

 






启动运行后:

服务器打印出如下日志:

2017-04-16 02:46:47,000 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2191:NIOServerCnxnFactory@192] - Accepted socket connection from /127.0.0.1:3681
2017-04-16 02:46:47,078 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2191:ZooKeeperServer@900] - Client attempting to establish new session at /127.0.0.1:3681
2017-04-16 02:46:47,203 [myid:1] - INFO  [CommitProcessor:1:ZooKeeperServer@645] - Established session 0x15b71a085380002 with negotiated timeout 10000 for client /127.0.0.1:3681
2017-04-16 02:46:47,218 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2191:ZooKeeperServer@924] - got auth packet /127.0.0.1:3681
2017-04-16 02:46:47,375 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2191:ZooKeeperServer@958] - auth success /127.0.0.1:3681
2017-04-16 02:46:58,062 [myid:1] - INFO  [CommitProcessor:1:NIOServerCnxn@1008] - Closed socket connection for client /127.0.0.1:3681 which had sessionid 0x15b71a085380002


客户端输出:

org.apache.zookeeper.proto.ConnectResponse@1e0be38[
  protocolVersion=0
  timeOut=10000
  sessionId=97796751162343430
  passwd={-32,49,98,88,-1,-10,-5,13,19,-80,10,87,71,61,41,-28}
]
org.apache.zookeeper.proto.ReplyHeader@1aaa14a[
  xid=-4
  zxid=0
  err=0
]

 

0
0
分享到:
评论

相关推荐

    ZooKeeper会话超时以及重连机制.pdf

    3. **恢复会话状态**:一旦重新建立连接成功,客户端需要恢复之前的会话状态。这一过程涉及到数据同步、状态恢复等一系列操作。 4. **同步状态信息**:客户端会从服务器获取最新的数据状态,以确保数据的一致性。 5....

    java连接zookeeper的jar包

    7. **commons-cli.jar**:Apache Commons CLI,提供解析命令行参数的功能,虽然在Java连接Zookeeper的场景中可能不是必须的,但在某些工具或应用中,可能会使用它来接收命令行参数来配置Zookeeper连接。 8. **netty...

    zookeeper-3.4.8源码包

    客户端通过TCP连接与Zookeeper服务器建立会话,心跳机制确保会话的有效性。当客户端与服务器失去连接时,客户端可以自动重连到集群中的其他服务器,保证服务的连续性。 四、Zookeeper Watcher机制 Watcher是...

    zookeeper 3.6.3 源码下载

    2. **会话(Session)**:客户端与ZooKeeper服务器建立的连接称为会话,会话期间,客户端可以发送请求并接收响应,会话具有超时机制。 3. **Watcher**:ZooKeeper提供了一种事件监听机制,称为Watcher,允许客户端...

    zookeeper版本为zookeeper-3.4.10.tar.gz

    2. **会话(Session)**:客户端与Zookeeper服务器建立的连接称为会话,会话期间,服务器会持续监控客户端的状态,如果客户端因为网络故障断开连接,Zookeeper会根据预设的超时时间决定是否关闭该会话。 3. **...

    ZooKeeper 客户端的使用(二).

    - 调用 `ZooKeeper` 的 `connect()` 方法建立连接。 2. **操作 ZNode** - **创建 ZNode**:使用 `create()` 方法,指定 ZNode 的路径、数据、权限和类型(临时或持久)。 - **读取 ZNode**:通过 `getData()` ...

    zookeeper客户端

    客户端首先通过TCP/IP协议建立连接,然后进行会话建立,期间可能涉及身份验证和权限检查。一旦会话建立,客户端就可以发送请求给服务器,包括读取、写入、监控节点变化等操作。Zookeeper保证了在会话期间,即使...

    Zookeeper中文开发指南

    2. **会话(Session)**: 用户与Zookeeper服务器建立的连接称为会话。如果会话超时或网络中断,与Zookeeper的连接将会断开。 3. **Watcher**: Watcher 是一种事件监听机制,当Znode的状态发生改变时,注册了该节点...

    zookeeper测试小程序

    1. 连接和断开Zookeeper服务器:测试程序会初始化一个Zookeeper客户端实例,通过指定的服务器地址和端口建立连接。断开连接通常发生在会话超时或者主动关闭。 2. 创建ZNode:可以创建临时或持久性的ZNode,并设置...

    zookeeper-3.6.3.zip

    4. **会话(Session)**:客户端与Zookeeper服务器之间建立的连接称为会话,会话具有超时机制,如果服务器和客户端之间的连接断开,会话将在一段时间后超时。 5. **选举算法**:Zookeeper使用Paxos或Fast-Paxos算法...

    ZooKeeper 客户端的使用(一)

    - **会话**:每个客户端与 ZooKeeper 服务器建立的连接称为会话。如果服务器与客户端之间的网络连接断开,只要在会话超时时间内重新连接,会话仍然有效。 - **Watcher**:Watcher 是 ZooKeeper 中的一种通知机制,...

    zookeeper-API开发lib

    - 会话:建立连接后,ZooKeeper服务器为客户端分配一个会话,具有一定的超时时间。如果网络中断导致会话失效,ZooKeeper会尝试重新连接,并保持已存在的会话状态。 2. **数据节点(ZNode)操作**: - `create()`...

    zookeeper-3.4.14.zip

    2. **会话(Session)**:客户端与Zookeeper服务器建立的连接被称为会话。在会话期间,客户端可以创建、更新或删除Znode,并接收服务器的watch事件通知。 3. **Watcher**:Watcher是Zookeeper的一种观察机制,允许...

    zookeeper-3.4.12版本

    通过`connect()`方法建立连接,然后可以调用`create()`, `exists()`, `getData()`, `setData()`等方法进行数据操作。 六、Zookeeper部署与配置 1. **安装**:解压`zookeeper-3.4.12`压缩包到服务器,修改`conf/zoo....

    zookeeper-3.4.6安装包

    2. **会话(Session)**:当客户端连接到Zookeeper服务器时,就会创建一个会话。会话期间,如果网络出现短暂的中断,Zookeeper能够恢复与客户端的连接,保证服务的连续性。 3. **Watcher**:Watcher是Zookeeper的一...

    zookeeper测试例子.rar

    4. **会话与心跳机制**:客户端与Zookeeper服务器建立会话,并通过心跳机制保持连接状态。如果服务器未收到心跳,会认为客户端已经断开,相应地清理资源。 5. **选举算法**:Zookeeper采用Paxos或Fast-Paxos算法...

    zookeeper-3.4.2.tar.gz

    一旦建立连接,客户端可以在该会话期间发送请求到服务器,并接收服务器的响应。 2. 节点(ZNode):Zookeeper的数据存储是以节点(ZNode)的形式存在的,每个节点都有一个唯一的路径标识。节点可以存储数据,也可以...

    zookeeper 自己学习资料

    2. **Zookeeper的工作流程**:包括客户端连接、会话建立、请求处理、数据同步等过程。 3. **Zookeeper的操作命令**:如`ls`、`get`、`set`、`create`等,用于查看、修改和创建Znodes。 4. **Zookeeper的Watcher机制*...

    zookeeper-3.4.8

    3. **会话与心跳**:客户端与服务器之间建立会话,通过心跳机制保持连接状态,当服务器感知到客户端长时间无响应,会话将超时。 4. **事件通知**:当ZNode的数据或者结构发生改变时,已设置watches的客户端会接收到...

Global site tag (gtag.js) - Google Analytics