论坛首页 Java企业应用论坛

memcached java下性能测试报告、分析与问题讨论

浏览 43919 次
该帖已经被评为良好帖
作者 正文
   发表时间:2007-05-17  
alin_ass 写道
不是很喜欢memcache java client这种搞个连接池,每次发送的时候拿到一个连接,发送后阻塞在那里等回应,一般remote cache如果每次请求带个id的话,是可以在client端用一个链接做异步的,也就是request pool而不是connection pool


connection pool 的确是方便了编程,有点像服务器端的 OneThreadPerConnection 的编程风格。
如果转为 request pool ,就相当于是 event-driven 的编程风格了。这种 event-driven 风格的 api 对于使用者来说会带来比较大的困难,会增加客户端的编程难度。如果这个客户端是一个需求改变比较频繁,代码修改比较频繁的模块,那么这些增加的编程难度将带来很大的障碍。
另外对于 http 这种 request/response 方式的应用来说,在服务器端处理请求的时候,即使使用 request pool ,用异步方式来访问 memcached ,还是一样要在处理请求的 action 里等待这个异步操作的完成,相当于还是同步阻塞的,和 connection pool 没很大区别。如果客户端是一个批处理模式的应用,那么使用异步处理方式会快很多。

引用
这个帖子里的朋友有兴趣去写一个request pool而不是connection pool的mem client么?

或者谁能证明n个链接发数据和1个链接在数据吞吐量上有什么差别, connection pool只是为了编程上方便的做法


要实现 request pool 的 memclient,需要 memcached 服务器的支持。目前的 memcached 的协议并不支持。
因为要实现 request pool 的方式,这个 RequestID 是服务器必须要知道。memcached 的协议并不支持传递 RequetID 或者产生 RequestID。
服务器必须要知道 RequestID,是因为服务器必须要在 Response 中加上这个 RequestID ,这样客户端才能把 response 和 request 对应起来。RequestID 可以由客户端生成,传递给服务器,也可以由服务器在收到请求之后,产生一个 RequestID ,客户端每次发送请求之后,同步读这个 RequestID 。通常如果请求的处理时间比较长,可以考虑由服务器来产生,否则读取服务器产生的 RequestID 的消耗可能已经超过读取本来的 Response 了。
1 请登录后投票
   发表时间:2007-05-31  

timelyRain 写道:

我的项目原来使用静态HashMap来实现Key->Object的缓存,并且实现脏数据刷新.由于项目要改成集群部署.在单个jvm里运行的静态Hash结构已经无法处理脏数据问题.所以准备使用memcached做分布式缓存来解决.

从网上搜索到的资料来看 memcached能够接受较大量的请求.但其javaclient 由于大量使用同步语句、hashmap,读取流没有使用bufferedStream。造成性能并不好
为此在我的项目里我参考memcached的协议,自己实现一个客户端功能。经过测试后发现一些问题。

测试环境1:
windows xp home
迅驰T2250
1G内存
jdk1.5
server: memcached-1.2.1 nt版本
client: 自己编写
用于传输的pojo 40个属性,序列化后为750个byte
测试client与server同台机器

测试方法
填充测试时填充10w个pojo。创建100个任务,每任务负责向memcached存储1000个pojo。
读取测试时读取10000个pojo 创建100个任务,每任务读取100个pojo。
平均值均按照线程内取平均值后,各线程再取平均值。
在进行上述测试的时候 cpu占用均在90%-100%,根据上述测试结果,开2个线程时效率最高。

thread count write avg/per obj write totall read avg/per obj read total
10 2.404991 ms 29s 1.709544 ms 2s
5 0.704780 ms 18s 1.333013 ms 2s
2 0.262194 ms 15s 0.414683 ms 2s






测试环境2:
AIX5.2
IBM P650  Power4 1.5G(64bit) *4
内存8G
jdk1.4
memcached-1.2.1
libevnet1.1b
client自己编写
用于传输的pojo 40个属性,序列化后为750个byte
测试client与server同台机器

相同的测试方法。测试结果大跌眼睛  10线程时 读写速度为200ms/per object 比我的笔记本慢100倍。就没继续测试

测试环境3:
windows2000 server
xeon 1.5*4
内存8G
jdk1.5
测试时发现cpu占用不高于20%

thread count write avg/per obj write total read avg/per obj read total
20 10.266615ns 71s 23.21283ns 15s
10 4.341574ns 41s 13.30084ns 16s
5 1.298717ns 25s 9.33258ns 18s
2 1.298717ns 21s 4.02503ns 23s


 




初步测试到这里 发现的问题
1.是暂时没有达到网上宣传的1.5w个对象/s (目前测试cpu为瓶颈)
2.是aix下效率低的可怕,对aix不太熟悉。应该有些设置的参数,这点还要向大家请教。
3.超过2个线程效率就开始低。没有发挥多线程的优势,不知道大家在使用过程中有没有发现这一点,还是说我写的MemBufferedDriver有问题。

我的期望是读写速度均能稳定在0.1毫秒左右,至少 1w obj/s 这样才有可能不影响到现有系统的效率
另外大家在java下使用memcached的时候效率怎么样都是怎么用的呢

下面贴下我的client的实现
memcached的协议可以参考memcache包的doc/protocol.txt

主要是socket初次打开后不关闭,直接存放到ThreadLocal中,与memcached保持一个长练接。每次使用的时候判断连接可用还是需要重新连接。

其他存储和读取实现memcached协议。使用BufferedStream。
序列化采用实现Serializable,直接使用ObjectStream来实现。(由于都是pojo简单数据对象,尝试过实现Externalizable接口自己实现序列化和使用750个byte的String 来序列化,发现性能相差不多故放弃)

java 代码
MemBufferedDriver 为client实现
  1. public class MemBufferedDriver {   
  2.     /**  
  3.      * 存放 连到cacheserver的socket连接  
  4.      */  
  5.     private final static ThreadLocal sockPool = new ThreadLocal();   
  6.     private static String serverAddress = "localhost:11211";   
  7.   
  8.     public static final byte[] BYTE_GET = new byte[]{10310111632};   
  9.     public static final byte[] BYTE_SET = new byte[]{11510111632};   
  10.     public static final byte[] BYTE_DELETE = new byte[]{10010110810111610132};   
  11.     public static final byte[] BYTE_CRLF = new byte[]{1310};   
  12.     public static final byte[] BYTE_SPACE = new byte[]{32};   
  13.   
  14.     public static final String SERVER_STATUS_DELETED = "DELETED";   
  15.     public static final String SERVER_STATUS_NOT_FOUND = "NOT_FOUND";   
  16.     public static final String SERVER_STATUS_STORED = "STORED";   
  17.     public static final String SERVER_STATUS_ERROR = "ERROR";   
  18.     public static final String SERVER_STATUS_END = "END";   
  19.     public static final String SERVER_STATUS_VALUE = "VALUE";   
  20.   
  21.     public static final String ENCODING_TYPE = "UTF-8";   
  22.   
  23.   
  24.     public static Socket getSocket() throws UnknownHostException, IOException {   
  25.         Socket s = (Socket) MemBufferedDriver.sockPool.get();   
  26.         if (s == null || s.isClosed()) {   
  27.             s = MemBufferedDriver.reconnect();   
  28.             MemBufferedDriver.sockPool.set(s);   
  29.         }   
  30.         return s;   
  31.     }   
  32.   
  33.     private static Socket reconnect() throws UnknownHostException, IOException {   
  34.         String[] ip = MemBufferedDriver.serverAddress.split(":");   
  35.         return new Socket(ip[0], Integer.parseInt(ip[1]));   
  36.   
  37.     }   
  38.   
  39.   
  40.     public Map getMulti(String[] keys) {   
  41.         Map map = new HashMap();   
  42.         if (keys == null || keys.length <= 0return map;   
  43.   
  44.         for (int i = 0; i < keys.length; i++) {   
  45.             Object o = get(keys[i]);   
  46.             if (o != null) map.put(keys[i], o);   
  47.   
  48.         }   
  49.         return map;   
  50.     }   
  51.   
  52.     public Object[] getMultiArray(String[] keys) {   
  53.         if (keys == null || keys.length <= 0return null;   
  54.   
  55.         Object[] o = new Object[keys.length];   
  56.         for (int i = 0; i < keys.length; i++)   
  57.             o[i] = get(keys[i]);   
  58.   
  59.         return o;   
  60.     }   
  61.   
  62.     public boolean set(String key, Object obj) {   
  63.         try {   
  64.             if (obj == null || key == null || "".equals(key)) throw new Exception("对象和key 不能为空");   
  65.             Socket s = MemBufferedDriver.getSocket();   
  66.             BufferedInputStream in = new BufferedInputStream(s.getInputStream());   
  67.             BufferedOutputStream out = new BufferedOutputStream(s.getOutputStream());   
  68.   
  69.             key = encodeKey(key);   
  70.             int flag = 0;   
  71.   
  72.             //序列化对象   
  73.             byte[] bs = object2Byte(obj);   
  74.   
  75.             out.write(MemBufferedDriver.BYTE_SET);           //write cmd   
  76.             out.write(key.getBytes());     //write key   
  77.             out.write(MemBufferedDriver.BYTE_SPACE);   
  78.             out.write(String.valueOf(flag).getBytes());     //write flag   
  79.             out.write(MemBufferedDriver.BYTE_SPACE);   
  80.             out.write("0".getBytes());     //write expire date   
  81.             out.write(MemBufferedDriver.BYTE_SPACE);   
  82.             out.write(String.valueOf(bs.length).getBytes());     //object length   
  83.             out.write(MemBufferedDriver.BYTE_CRLF);   
  84.   
  85.             out.write(bs);   
  86.             out.write(MemBufferedDriver.BYTE_CRLF);   
  87.             out.flush();   
  88.   
  89.             String ret = readLine(in);   
  90.             return MemBufferedDriver.SERVER_STATUS_STORED.equals(ret);   
  91.         } catch (Exception e) {   
  92.             System.out.println(e.getMessage());   
  93.             return false;   
  94.         }   
  95.     }   
  96.   
  97.     public Object get(String key) {   
  98.         try {   
  99.             Socket s = MemBufferedDriver.getSocket();   
  100.             InputStream in = s.getInputStream();   
  101.             OutputStream out = s.getOutputStream();   
  102.             key = encodeKey(key);   
  103.             out.write(MemBufferedDriver.BYTE_GET);   
  104.             out.write(key.getBytes());   
  105.             out.write(MemBufferedDriver.BYTE_CRLF);   
  106.             out.flush();   
  107.   
  108.             return getObjectFromStream(in, out);   
  109.         } catch (Exception e) {   
  110.             System.out.println(e.getMessage());   
  111.             return null;   
  112.         }   
  113.     }   
  114.   
  115.     public boolean delete(String key) {   
  116.         try {   
  117.             Socket s = MemBufferedDriver.getSocket();   
  118.             InputStream in = s.getInputStream();   
  119.             OutputStream out = s.getOutputStream();   
  120.             key = encodeKey(key);   
  121.             out.write(MemBufferedDriver.BYTE_DELETE);   
  122.             out.write(key.getBytes());   
  123.             out.write(MemBufferedDriver.BYTE_CRLF);   
  124.             out.flush();   
  125.   
  126.             String ret = readLine(in);   
  127.             return MemBufferedDriver.SERVER_STATUS_DELETED.equals(ret) || MemBufferedDriver.SERVER_STATUS_NOT_FOUND.equals(ret);   
  128.         } catch (Exception e) {   
  129.             return false;   
  130.         }   
  131.     }   
  132.   
  133.     private Object getObjectFromStream(InputStream in, OutputStream out) throws IOException, ClassNotFoundException {   
  134.         String cmd = readLine(in);   
  135.         if (cmd.startsWith(MemBufferedDriver.SERVER_STATUS_VALUE)) {   
  136.             //return object   
  137.             String[] part = cmd.split(" ");   
  138.             String para = part[2];   
  139.             int length = Integer.parseInt(part[3]);   
  140.   
  141.             byte[] bs = new byte[length];   
  142.   
  143.             int count = 0;   
  144.             while (count < bs.length) count += in.read(bs, count, (bs.length - count));   
  145.             if (count != bs.length)   
  146.                 throw new IOException("读取数据长度错误");   
  147.             readLine(in);   
  148.             String endstr = readLine(in);   
  149.             if (MemBufferedDriver.SERVER_STATUS_END.equals(endstr))   
  150.                 return this.byte2Object(bs);   
  151.             else  
  152.                 System.out.println("结束标记错误");   
  153.   
  154.         }   
  155.         return null;   
  156.     }   
  157.   
  158.     private String encodeKey(String key) throws UnsupportedEncodingException {   
  159.         return URLEncoder.encode(key, MemBufferedDriver.ENCODING_TYPE);   
  160.     }   
  161.   
  162.   
  163.     private String readLine(InputStream in) throws IOException {   
  164.         ByteArrayOutputStream bos = new ByteArrayOutputStream();   
  165.         boolean eol = false;   
  166.         byte[] b = new byte[1];   
  167.         while (in.read(b, 01) != -1) {   
  168.             if (b[0] == 13) eol = true;   
  169.             else if (eol && b[0] == 10break;   
  170.             else  
  171.                 eol = false;   
  172.   
  173.             bos.write(b, 01);   
  174.         }   
  175.   
  176.   
  177.         if (bos.size() == 0return null;   
  178.         return bos.toString().trim();   
  179.     }   
  180.   
  181.   
  182.     private byte[] object2Byte(Object o) throws IOException {   
  183.         ByteArrayOutputStream b = new ByteArrayOutputStream();   
  184.         new ObjectOutputStream(b).writeObject(o);   
  185.         return b.toByteArray();   
  186.     }   
  187.   
  188.     private Object byte2Object(byte[] b) throws IOException, ClassNotFoundException {   
  189.         return new ObjectInputStream(new ByteArrayInputStream(b)).readObject();   
  190.     }   
  191.   
  192.   
  193.     public static void main(String[] args) throws Exception {   
  194.         MemBufferedDriver m = new MemBufferedDriver();   
  195.         System.out.println(m.set("a""DsSD"));   
  196.         System.out.println(m.get("a"));   
  197.     }   
  198.   
  199.     public static void setServerAddress(String serverAddress) {   
  200.         MemBufferedDriver.serverAddress = serverAddress;   
  201.     }   
  202. }  

 
java 代码
写入测试类

  1. public class Fill2Server extends Thread {   
  2.     public static int THREAD_COUNT = 2;   
  3.     public static Queue queue = new Queue();   
  4.     MemBufferedDriver md = new MemBufferedDriver();   
  5.   
  6.     public static void main(String[] args) throws Exception {   
  7.   
  8.         int size ;   
  9.         if (args.length == 3 && args[0] != null && args[1] != null) {   
  10.             MemDriver.setServerAddress(args[0]);   
  11.             size = Integer.parseInt(args[1]);   
  12.             THREAD_COUNT = Integer.parseInt(args[2]);   
  13.             new Fill2Server().doFill(size);   
  14.         } else  
  15.             System.out.println("参数1 连接服务器地址 ipaddress:port ,参数2填充数量,不能小于10000,参数3为使用的线程数");   
  16.   
  17.     }   
  18.   
  19.     private void doFill(int size) throws InterruptedException {   
  20.         int taskCount = size / 1000;   //每个线程负责填充1000的对象   
  21.         for (int i = 0; i < taskCount; i++) {   
  22.             Task t = new Task();   
  23.             t.setTaskId(String.valueOf(i));   
  24.             queue.add(t);   
  25.         }   
  26.   
  27.         long time = System.currentTimeMillis();   
  28.         Thread tr[] = new Thread[THREAD_COUNT];   
  29.         for (int i = 0; i < THREAD_COUNT; i++) {   
  30.             FillThread ft = new FillThread();   
  31.             (tr[i] = new Thread(ft)).start();   
  32.         }   
  33.   
  34.         //监控填充完成   
  35.         while (true) {   
  36.             boolean flag = true;   
  37.             for (int i = 0; i < THREAD_COUNT; i++)   
  38.                 flag &= tr[i].isAlive();   
  39.   
  40.             if (!flag) break;   
  41.   
  42.             Thread.sleep(1000);   
  43.         }   
  44.   
  45.         time = System.currentTimeMillis() - time;   
  46.         System.out.println("任务完成,共用" + (time / 1000) + "s");   
  47.     }   
  48.   
  49.     class FillThread implements Runnable {   
  50.         public void run() {   
  51.             Task task;   
  52.             while (true) {   
  53.                 task = (Task) queue.get();   
  54.                 if (task == nullbreak;   
  55.                 long time = System.nanoTime();   
  56.                 for (int i = 0; i < 1000; i++) {   
  57.                     TestBO b = new TestBO();   
  58.                     md.set(task.getTaskId() + i, b);   
  59.                 }   
  60.                 time = System.nanoTime() - time;   
  61.                 System.out.println(Thread.currentThread().getName() + " avg " + (time / 1000) + " ns ");   
  62.             }   
  63.         }   
  64.     }   
  65. }  

 

java 代码
读取的测试方法
  1. public class GetFromServer extends Thread {   
  2.     public static int THREAD_COUNT = 2;   
  3.     public static Queue queue = new Queue();   
  4.     MemDriver md = new MemDriver();   
  5.   
  6.     public static void main(String[] args) throws Exception {   
  7.   
  8.         int size;   
  9.         if (args.length == 3 && args[0] != null && args[1] != null) {   
  10.             MemDriver.setServerAddress(args[0]);   
  11.             size = Integer.parseInt(args[1]);   
  12.             THREAD_COUNT = Integer.parseInt(args[2]);   
  13.             new GetFromServer().doFill(size);   
  14.         } else  
  15.             System.out.println("参数1 连接服务器地址 ipaddress:port ,参数2读取数量不能小于1000,参数3为使用的线程数");   
  16.   
  17.     }   
  18.   
  19.     private void doFill(int size) throws InterruptedException {   
  20.         int taskCount = size / 100;   //每个线程负责填充1000的对象   
  21.         for (int i = 0; i < taskCount; i++) {   
  22.             Task t = new Task();   
  23.             t.setTaskId(String.valueOf(i));   
  24.             GetFromServer.queue.add(t);   
  25.         }   
  26.   
  27.         long time = System.currentTimeMillis();   
  28.         Thread tr[] = new Thread[GetFromServer.THREAD_COUNT];   
  29.         for (int i = 0; i < GetFromServer.THREAD_COUNT; i++) {   
  30.             GetFromServer.FillThread ft = new GetFromServer.FillThread();   
  31.             (tr[i] = new Thread(ft)).start();   
  32.         }   
  33.   
  34.         //监控填充完成   
  35.         while (true) {   
  36.             boolean flag = true;   
  37.             for (int i = 0; i < GetFromServer.THREAD_COUNT; i++)   
  38.                 flag &= tr[i].isAlive();   
  39.   
  40.             if (!flag) break;   
  41.   
  42.             Thread.sleep(1000);   
  43.         }   
  44.   
  45.   
  46.         time = System.currentTimeMillis() - time;   
  47.         System.out.println("任务完成,共用" + (time / 1000) + "s"
0 请登录后投票
   发表时间:2007-09-01  
codeutil 写道

昨天听同学说他的memecache用的是12G内存.存储大约1000万个键值对,查询速度很快.

不太相信能用到12G内存,估计4G就差不多了
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics