精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2008-10-30
uniseraph 大牛写了一个用mina实现的memcached java client.最近正好在研究mina,来的早不如来的巧,现在跟大家分享一下我今天一天的劳动成果,望大家指教 该项目的基本思想是:使用一个IoSession池,当前默认是使用SimpleCircularQueue,借助于mina的机制,实现高并发请求的。目前只有set和get功能。现在从set开始分析。 在session建立之后,通过MemcachedClientHandler#sessionCreated(IoSession)在每个session中加入一个List和一个Map,在MemcachedClient中的set方法中构造SetCommand然后使用pool发送。MemcachedMutexIoSessionPool#send()方法可以选择是否使用异步调用,但是我感觉这里的异步调用有些问题,同步当然是没有问题,通过holder.getContext().await()就可以阻塞send方法等待结果。但是如果是异步方式的话,返回null,我感觉不妥,客户端得到null?如果是取得数据的话,是否可以返回一个MemcachedResponse的子类,但是想得到结果时阻塞呢?当然这只是我自己YY的。MemcachedMutexIoSessionPool#send()代码如下。 public MemcachedResponse send(MemcachedCommand message, boolean sync) { IoSession session = selectSession(); RequestContextHolder holder = new RequestContextHolder(); holder.getContext().setCommand(message); if (sync) { session.write(holder); try { holder.getContext().await(); } catch (InterruptedException e) { throw new RuntimeException(e); } if (!holder.getContext().isSuccess()) { if (log.isInfoEnabled()) log.info(String.format("time out")); } return holder.getContext().getReponse(); } else { session.write(holder); return null; } } 调用sessio.write(holder)后,通过await方法阻塞,但是MINA会去调用encoder对消息进行encode,这里使用的是CommandEncoder,代码如下: public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { RequestContextHolder holder = (RequestContextHolder) message; IoBuffer buf = holder.getContext().getCommand().toBuffer( MemcachedConstants.DEFAULT_CHARSET); buf.setAutoExpand(true); buf.flip(); ConcurrentHashMap<IoBuffer, RequestContext> encodedMessage2Message = (ConcurrentHashMap<IoBuffer, RequestContext>) session .getAttribute("encodedMessage2Message"); RequestContext newItem = encodedMessage2Message.putIfAbsent(buf, holder.getContext()); if (newItem == null) { out.write(buf); } else { if(log.isDebugEnabled()){ log.debug("reusing command."); } holder.setContext(newItem); } } 首先将command转换成IoBuffer,然后把IoBuffer和ReuestContext对于放入session中的map中。 消息发送完毕后,MINA的filter机制开始工作,AppendFilter中的messageSent方法拦截session,将刚刚session中map存储的RequestContext取出来,放入到session中的队列中。在memcached通讯层分析 中提到过memcache在同一个socket上,命令处理是按顺序的。根据这个就可以按照先后顺序将RequestContext加入到队列中就可以。 AppenderFilter代码: public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { Object obj = writeRequest.getMessage(); if (obj instanceof IoBuffer) { IoBuffer buf = (IoBuffer) obj; if (buf.limit() != 0) { ConcurrentHashMap<IoBuffer, RequestContext> encodedMessage2Message = SessionUtil .getEncodedMessage2Message(session); LinkedBlockingQueue<RequestContext> queue = SessionUtil .getCommandQueue(session); RequestContext item = encodedMessage2Message.remove(buf); if (item == null) { log.error("Can't find the command :" + buf + "in the map"); log.error("map :" + encodedMessage2Message.toString()); throw new IllegalStateException(); } queue.add(item); } } nextFilter.messageSent(session, writeRequest); } 然后就是response的decode,这里使用LinedReponseDecoder,在decode时,先使用TextlineDecoder首先进行decode,这里有一个hint,就是decode方法需要带一个ProtocolDecoderOutput的参数,其实ProtocolDecoderOutput只是decode方法的一个callback,这里,首先从session中取出先前存储的list,在TextlineDecoder decode完成后,将结果加到context中。具体看代码: public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { final List<String> context = (List<String>) session .getAttribute(MemcachedConstants.LINES_CONTEXT); decoder.decode(session, in, new ProtocolDecoderOutput() { public void flush() { /** * ignore */ } public void write(Object message) { context.add(message.toString()); } }); 最后decode方法再根据先前的先来先服务原则,解析对于的response就可以了。这部分比较简单了。
总结这个项目,重点是借助于mina的处理机制,对mina有很深了解的看这个项目应该不难,主要是思维比较好。 学习中……
声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
浏览 3085 次