Java NIO时间服务
这篇文章内容是另一篇文章《Java 实现基于Redis的分布式锁》的分支.
时间服务包括客户端和服务端, 服务端监听请求 ,若是时间请求,则返回当前服务器的时间, 各个客户端(分布式锁) 都从给服务器获取时间,已达到全局时间一致。
共三个类 TimeServer、 TimeClient和TimeClientException,下面是源码:
package cc.lixiaohui.lock.time.nio.server; import; import; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 提供简单时间服务的服务器 * * @author lixiaohui */ public class TimeServer { private ServerSocketChannel serverChannel; private Selector selector; private volatile boolean alive = true; private static final String TIME_CMD = "time"; private static final String HALT_CMD = "halt"; private static final String ERROR = "error"; private static final Logger logger = LoggerFactory.getLogger(TimeServer.class); public void start(int port) throws IOException { selector =; serverChannel =; serverChannel.configureBlocking(false); // non-blocking mode serverChannel.bind(new InetSocketAddress(port)); // interested only in accept event serverChannel.register(selector, SelectionKey.OP_ACCEPT); while (alive) { try { if ( < 0) { // no events continue; } Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key =; it.remove(); try { if (!key.isValid()) continue; if (key.isAcceptable()) { // new channel incoming SocketChannel ch = ((ServerSocketChannel); // ignore if register failed if (!registerChannel(selector, ch, SelectionKey.OP_READ)) { continue; }"new channel registered {}", ch.getRemoteAddress().toString()); } // client request if (key.isReadable()) { handleRead(key); } if (key.isWritable()) { handleWrite(key); } } catch (IOException e) { logger.error("{} exception: {}",, e); if (key != null) { key.cancel(); if ( != null) {; } } } } } catch (Exception e) { logger.error("{}", e); } } if (selector != null) { try { selector.close(); } catch (Exception e) { logger.error("error occurred when closing selector: e", e); } } } private void handleWrite(SelectionKey key) throws IOException { SocketChannel ch = (SocketChannel); try { ByteBuffer buf = (ByteBuffer) key.attachment(); if (buf != null) { writeBytesToChannel(ch, buf, key); } } catch (ClassCastException e) { logger.error("{}", e); } } private void handleRead(SelectionKey key) throws IOException { SocketChannel ch = (SocketChannel); ByteBuffer buffer = ByteBuffer.allocate(16); int read =; if (read < 4) { // not a full command, write error back, // meaning client will send command // again. writeBytesToChannel(ch, ERROR.getBytes(), key); } else { String cmd = extractCommand(buffer);"recieve {} request from {}", cmd, ch.getRemoteAddress().toString()); if (TIME_CMD.equalsIgnoreCase(cmd)) { // 回写时间 writeBytesToChannel(ch, String.valueOf(time()).getBytes(), key);"write time to {}", ch.getRemoteAddress().toString()); } else if (HALT_CMD.equalsIgnoreCase(cmd)) { // 停止服务"stopping timeserver"); stop();"timeserver stopped"); } else { writeBytesToChannel(ch, ERROR.getBytes(), key); logger.warn("unreconized command {}, will discard it.", cmd); } } } private String extractCommand(ByteBuffer buffer) { buffer.flip(); byte[] array = buffer.array(); byte[] newArray = new byte[buffer.remaining()]; System.arraycopy(array, buffer.position(), newArray, 0, buffer.remaining()); return new String(newArray); } private void writeBytesToChannel(SocketChannel ch, byte[] bs, SelectionKey key) throws IOException { ByteBuffer buf = ByteBuffer.wrap(bs); int total = buf.remaining(); int write = ch.write(buf); if (write < total) { // didn't wrote all, then write rest when next // event triggered key.attach(buf); } } private void writeBytesToChannel(SocketChannel ch, ByteBuffer buf, SelectionKey key) throws IOException { if (!buf.hasRemaining()) { return; } int total = buf.remaining(); int write = ch.write(buf); if (write < total) { // didn't wrote all, then write rest when next // event triggered key.attach(buf); } } protected void stop() { alive = false; try { serverChannel.close(); selector.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private boolean registerChannel(Selector sel, SocketChannel sc, int ops) { try { sc.configureBlocking(false); sc.register(sel, ops); } catch (Exception e) { return false; } return true; } private long time() { return System.currentTimeMillis(); } }
package cc.lixiaohui.lock.time.nio.client; import; import; import; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; /** * 时间获取客户端 * @author lixiaohui * */ public class TimeClient { private static final String TIME_CMD = "time"; private final SocketAddress address; private SocketChannel channel; public TimeClient(SocketAddress address) throws IOException { this.address = address; channel =; channel.configureBlocking(true); // blocking mode } /** * @throws TimeClientException when connection with time server is closed. * @return currentTimeMillis in server */ public long currentTimeMillis() { try { channel.write(ByteBuffer.wrap(TIME_CMD.getBytes())); ByteBuffer buf = ByteBuffer.allocate(64);; buf.flip(); // flip for use of read byte[] bytes = new byte[buf.limit() - buf.position()]; System.arraycopy(buf.array(), buf.position(), bytes, 0, bytes.length); return Long.parseLong(new String(bytes)); } catch(NumberFormatException e) { System.err.println(e); return System.currentTimeMillis(); } catch (IOException e) { throw new TimeClientException(address); } } /** * close the client, along with its connection with server. */ public void close() { try { if (channel != null) { channel.close(); } } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException { TimeClient client = new TimeClient(new InetSocketAddress("localhost", 9999)); System.out.println(client.currentTimeMillis()); //client.close();; } }
package cc.lixiaohui.lock.time.nio.client; import; public class TimeClientException extends RuntimeException { /** * */ private static final long serialVersionUID = 1L; public TimeClientException() { super(); // TODO Auto-generated constructor stub } public TimeClientException(String message) { super(message); // TODO Auto-generated constructor stub } public TimeClientException(SocketAddress address) { super(address.toString()); } }
