建立连接会话
建立连接会话请求响应没有头部
建立连接会话请求
class ConnectRequest {
int protocolVersion;
long lastZxidSeen;
int timeOut;
long sessionId;
buffer passwd;
}
建立连接会话响应
class ConnectResponse {
int protocolVersion;
int timeOut;
long sessionId;
buffer passwd;
}
public class ConnectTest {
private static final Logger log = LoggerFactory.getLogger(ConnectTest.class);
private static SocketChannel channel;
@BeforeClass
public static void initialize() throws IOException {
channel = SocketChannel.open();
SocketAddress socketAddress = new InetSocketAddress("localhost", 2191);
channel.connect(socketAddress);
}
private static class WriteWorker extends Thread {
public void run() {
//for (;;) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
ConnectRequest connectRequest = new ConnectRequest(0, 0, 10000, 0, new byte[16]);
boa.writeInt(-1, "len"); // We'll fill this in later
connectRequest.serialize(boa, "connect");
boa.writeBool(true, "readOnly");
baos.close();
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
bb.putInt(bb.capacity() - 4);
bb.rewind();
channel.write(bb);
} catch (IOException e) {
e.printStackTrace();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//}
}
}
private static class ReaderWorker extends Thread {
public void run() {
for (;;) {
try {
ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
while (true) {
channel.read(lenBuffer);
if (! lenBuffer.hasRemaining()) {
break;
}
}
lenBuffer.flip();
int len = lenBuffer.getInt();
if (len < 0 || len >= ClientCnxn.packetLen) {
throw new IOException("Packet len" + len + " is out of range!");
}
ByteBuffer buffer = ByteBuffer.allocate(len);
while (true) {
channel.read(buffer);
if (! buffer.hasRemaining()) {
break;
}
}
buffer.flip();
ByteBufferInputStream bbis = new ByteBufferInputStream(buffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
conRsp.deserialize(bbia, "connect");
// read "is read-only" flag
boolean isRO = false;
try {
isRO = bbia.readBool("readOnly");
} catch (IOException e) {
// this is ok -- just a packet from an old server which
// doesn't contain readOnly field
log.warn("Connected to an old server; r-o mode will be unavailable");
}
System.out.println(ToStringBuilder.reflectionToString(conRsp, ToStringStyle.MULTI_LINE_STYLE));
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
@Test
public void connect() throws IOException {
Thread wt = new WriteWorker();
wt.start();
Thread rt = new ReaderWorker();
rt.start();
try {
wt.join();
rt.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
启动运行后:
服务器打印出如下日志:
2017-04-14 20:53:31,138 [myid:1] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2191:NIOServerCnxnFactory@192] - Accepted socket connection from /127.0.0.1:2544
2017-04-14 20:53:31,170 [myid:1] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2191:ZooKeeperServer@900] - Client attempting to establish new session at /127.0.0.1:2544
2017-04-14 20:53:31,248 [myid:1] - INFO [CommitProcessor:1:ZooKeeperServer@645] - Established session 0x15b6c2343f90011 with negotiated timeout 10000 for client /127.0.0.1:2544
2017-04-14 20:53:42,060 [myid:1] - INFO [CommitProcessor:1:NIOServerCnxn@1008] - Closed socket connection for client /127.0.0.1:2544 which had sessionid 0x15b6c2343f90011
客户端输出:
org.apache.zookeeper.proto.ConnectResponse@16de067[
protocolVersion=0
timeOut=10000
sessionId=97790715638644753
passwd={-18,75,33,46,-73,2,30,-80,-43,-55,-68,-125,-23,-87,6,72}
]
可以看出建立了一个新会话,会话id为:97790715638644753
zookeeper 请求认证
认证
认证请求有头部,响应有头部
请求头
class RequestHeader { int xid; int type; }
认证请求
class AuthPacket { int type; ustring scheme; buffer auth; }
响应头
class ReplyHeader { int xid; long zxid; int err; }
public class AuthPacketTest { private static final Logger log = LoggerFactory.getLogger(AuthPacketTest.class); private static SocketChannel channel; private static volatile boolean isSessionOpened = false; private static long sid = 0; @BeforeClass public static void initialize() throws IOException { channel = SocketChannel.open(); //SocketAddress socketAddress = new InetSocketAddress("localhost", 2182); SocketAddress socketAddress = new InetSocketAddress("localhost", 2191); channel.connect(socketAddress); } private static class WriteWorker extends Thread { private void isSessionAlive() { heartbeat(); } private void heartbeat() { ping(); } private void ping() { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); boa.writeInt(-1, "len"); // We'll fill this in later RequestHeader h = new RequestHeader(-2, OpCode.ping); h.serialize(boa, "header"); baos.close(); ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); bb.putInt(bb.capacity() - 4); bb.rewind(); channel.write(bb); } catch (IOException e) { e.printStackTrace(); } } private void auth() { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); boa.writeInt(-1, "len"); // We'll fill this in later RequestHeader requestHeader = new RequestHeader(-4, OpCode.auth); requestHeader.serialize(boa, "header"); //AuthPacket authPacket = new AuthPacket(0, "auth", "".getBytes()); AuthPacket authPacket = new AuthPacket(0, "ip", "127.0.0.1".getBytes()); authPacket.serialize(boa, "request"); baos.close(); ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); bb.putInt(bb.capacity() - 4); bb.rewind(); channel.write(bb); } catch (IOException e) { e.printStackTrace(); } } private void connect() { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); ConnectRequest connectRequest = new ConnectRequest(0, 0, 10000, 0, new byte[16]); boa.writeInt(-1, "len"); // We'll fill this in later connectRequest.serialize(boa, "connect"); boa.writeBool(true, "readOnly"); baos.close(); ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); bb.putInt(bb.capacity() - 4); bb.rewind(); channel.write(bb); } catch (IOException e) { e.printStackTrace(); } } public void run() { connect(); for (;;) { isSessionAlive(); auth(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } private static class ReaderWorker extends Thread { private void handConnectResponse() { try { ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4); while (true) { channel.read(lenBuffer); if (! lenBuffer.hasRemaining()) { break; } } lenBuffer.flip(); int len = lenBuffer.getInt(); if (len < 0 || len >= ClientCnxn.packetLen) { throw new IOException("Packet len" + len + " is out of range!"); } ByteBuffer buffer = ByteBuffer.allocate(len); while (true) { channel.read(buffer); if (! buffer.hasRemaining()) { break; } } buffer.flip(); ByteBufferInputStream bbis = new ByteBufferInputStream(buffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); ConnectResponse conRsp = new ConnectResponse(); conRsp.deserialize(bbia, "connect"); // read "is read-only" flag boolean isRO = false; try { isRO = bbia.readBool("readOnly"); } catch (IOException e) { // this is ok -- just a packet from an old server which // doesn't contain readOnly field log.warn("Connected to an old server; r-o mode will be unavailable"); } sid = conRsp.getSessionId(); isSessionOpened = true; System.out.println(ToStringBuilder.reflectionToString(conRsp, ToStringStyle.MULTI_LINE_STYLE)); } catch (IOException e) { e.printStackTrace(); } } private void handleResponse() { try { ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4); while (true) { channel.read(lenBuffer); if (! lenBuffer.hasRemaining()) { break; } } lenBuffer.flip(); int len = lenBuffer.getInt(); if (len < 0 || len >= ClientCnxn.packetLen) { throw new IOException("Packet len" + len + " is out of range!"); } ByteBuffer buffer = ByteBuffer.allocate(len); while (true) { channel.read(buffer); if (! buffer.hasRemaining()) { break; } } buffer.flip(); ByteBufferInputStream bbis = new ByteBufferInputStream(buffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); ReplyHeader replyHdr = new ReplyHeader(); replyHdr.deserialize(bbia, "header"); System.out.println(ToStringBuilder.reflectionToString(replyHdr, ToStringStyle.MULTI_LINE_STYLE)); } catch (IOException e) { e.printStackTrace(); } } public void run() { for (;;) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } if (! isSessionOpened) { handConnectResponse(); } else { handleResponse(); } } } } @Test public void connect() throws IOException { Thread wt = new WriteWorker(); wt.start(); Thread rt = new ReaderWorker(); rt.start(); try { wt.join(); rt.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }
启动运行后:
服务器打印出如下日志:
2017-04-16 02:46:47,000 [myid:1] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2191:NIOServerCnxnFactory@192] - Accepted socket connection from /127.0.0.1:3681
2017-04-16 02:46:47,078 [myid:1] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2191:ZooKeeperServer@900] - Client attempting to establish new session at /127.0.0.1:3681
2017-04-16 02:46:47,203 [myid:1] - INFO [CommitProcessor:1:ZooKeeperServer@645] - Established session 0x15b71a085380002 with negotiated timeout 10000 for client /127.0.0.1:3681
2017-04-16 02:46:47,218 [myid:1] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2191:ZooKeeperServer@924] - got auth packet /127.0.0.1:3681
2017-04-16 02:46:47,375 [myid:1] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2191:ZooKeeperServer@958] - auth success /127.0.0.1:3681
2017-04-16 02:46:58,062 [myid:1] - INFO [CommitProcessor:1:NIOServerCnxn@1008] - Closed socket connection for client /127.0.0.1:3681 which had sessionid 0x15b71a085380002
客户端输出:
org.apache.zookeeper.proto.ConnectResponse@1e0be38[
protocolVersion=0
timeOut=10000
sessionId=97796751162343430
passwd={-32,49,98,88,-1,-10,-5,13,19,-80,10,87,71,61,41,-28}
]
org.apache.zookeeper.proto.ReplyHeader@1aaa14a[
xid=-4
zxid=0
err=0
]
相关推荐
3. **恢复会话状态**:一旦重新建立连接成功,客户端需要恢复之前的会话状态。这一过程涉及到数据同步、状态恢复等一系列操作。 4. **同步状态信息**:客户端会从服务器获取最新的数据状态,以确保数据的一致性。 5....
7. **commons-cli.jar**:Apache Commons CLI,提供解析命令行参数的功能,虽然在Java连接Zookeeper的场景中可能不是必须的,但在某些工具或应用中,可能会使用它来接收命令行参数来配置Zookeeper连接。 8. **netty...
客户端通过TCP连接与Zookeeper服务器建立会话,心跳机制确保会话的有效性。当客户端与服务器失去连接时,客户端可以自动重连到集群中的其他服务器,保证服务的连续性。 四、Zookeeper Watcher机制 Watcher是...
2. **会话(Session)**:客户端与ZooKeeper服务器建立的连接称为会话,会话期间,客户端可以发送请求并接收响应,会话具有超时机制。 3. **Watcher**:ZooKeeper提供了一种事件监听机制,称为Watcher,允许客户端...
4. **会话(Session)**:客户端与Zookeeper服务器之间建立的连接称为会话,会话具有超时机制,如果服务器和客户端之间的连接断开,会话将在一段时间后超时。 5. **选举算法**:Zookeeper使用Paxos或Fast-Paxos算法...
2. **会话(Session)**:客户端与Zookeeper服务器建立的连接称为会话,会话期间,服务器会持续监控客户端的状态,如果客户端因为网络故障断开连接,Zookeeper会根据预设的超时时间决定是否关闭该会话。 3. **...
- 调用 `ZooKeeper` 的 `connect()` 方法建立连接。 2. **操作 ZNode** - **创建 ZNode**:使用 `create()` 方法,指定 ZNode 的路径、数据、权限和类型(临时或持久)。 - **读取 ZNode**:通过 `getData()` ...
客户端首先通过TCP/IP协议建立连接,然后进行会话建立,期间可能涉及身份验证和权限检查。一旦会话建立,客户端就可以发送请求给服务器,包括读取、写入、监控节点变化等操作。Zookeeper保证了在会话期间,即使...
2. **会话(Session)**: 用户与Zookeeper服务器建立的连接称为会话。如果会话超时或网络中断,与Zookeeper的连接将会断开。 3. **Watcher**: Watcher 是一种事件监听机制,当Znode的状态发生改变时,注册了该节点...
1. 连接和断开Zookeeper服务器:测试程序会初始化一个Zookeeper客户端实例,通过指定的服务器地址和端口建立连接。断开连接通常发生在会话超时或者主动关闭。 2. 创建ZNode:可以创建临时或持久性的ZNode,并设置...
- **会话**:每个客户端与 ZooKeeper 服务器建立的连接称为会话。如果服务器与客户端之间的网络连接断开,只要在会话超时时间内重新连接,会话仍然有效。 - **Watcher**:Watcher 是 ZooKeeper 中的一种通知机制,...
2. **会话(Session)**: 客户端与ZooKeeper服务器建立的连接称为会话,会话期间,服务器保证了客户端操作的顺序性。 3. **Watcher事件**: ZooKeeper提供了一种机制,即Watcher,它可以注册在特定的ZNode上,当该...
- 会话:建立连接后,ZooKeeper服务器为客户端分配一个会话,具有一定的超时时间。如果网络中断导致会话失效,ZooKeeper会尝试重新连接,并保持已存在的会话状态。 2. **数据节点(ZNode)操作**: - `create()`...
2. **会话(Session)**:客户端与Zookeeper服务器建立的连接被称为会话。在会话期间,客户端可以创建、更新或删除Znode,并接收服务器的watch事件通知。 3. **Watcher**:Watcher是Zookeeper的一种观察机制,允许...
通过`connect()`方法建立连接,然后可以调用`create()`, `exists()`, `getData()`, `setData()`等方法进行数据操作。 六、Zookeeper部署与配置 1. **安装**:解压`zookeeper-3.4.12`压缩包到服务器,修改`conf/zoo....
2. **会话(Session)**:当客户端连接到Zookeeper服务器时,就会创建一个会话。会话期间,如果网络出现短暂的中断,Zookeeper能够恢复与客户端的连接,保证服务的连续性。 3. **Watcher**:Watcher是Zookeeper的一...
4. **会话与心跳机制**:客户端与Zookeeper服务器建立会话,并通过心跳机制保持连接状态。如果服务器未收到心跳,会认为客户端已经断开,相应地清理资源。 5. **选举算法**:Zookeeper采用Paxos或Fast-Paxos算法...
一旦建立连接,客户端可以在该会话期间发送请求到服务器,并接收服务器的响应。 2. 节点(ZNode):Zookeeper的数据存储是以节点(ZNode)的形式存在的,每个节点都有一个唯一的路径标识。节点可以存储数据,也可以...
2. **Zookeeper的工作流程**:包括客户端连接、会话建立、请求处理、数据同步等过程。 3. **Zookeeper的操作命令**:如`ls`、`get`、`set`、`create`等,用于查看、修改和创建Znodes。 4. **Zookeeper的Watcher机制*...