- timelyRain
- 等级:
- 文章: 7
- 积分: 280
|
我的项目原来使用静态HashMap来实现Key->Object的缓存,并且实现脏数据刷新.由于项目要改成集群部署.在单个jvm里运行的静态Hash结构已经无法处理脏数据问题.所以准备使用memcached做分布式缓存来解决.
从网上搜索到的资料来看 memcached能够接受较大量的请求.但其javaclient 由于大量使用同步语句、hashmap,读取流没有使用bufferedStream。造成性能并不好
为此在我的项目里我参考memcached的协议,自己实现一个客户端功能。经过测试后发现一些问题。
测试环境1:
windows xp home
迅驰T2250
1G内存
jdk1.5
server: memcached-1.2.1 nt版本
client: 自己编写
用于传输的pojo 40个属性,序列化后为750个byte
测试client与server同台机器
测试方法
填充测试时填充10w个pojo。创建100个任务,每任务负责向memcached存储1000个pojo。
读取测试时读取10000个pojo 创建100个任务,每任务读取100个pojo。
平均值均按照线程内取平均值后,各线程再取平均值。
在进行上述测试的时候 cpu占用均在90%-100%,根据上述测试结果,开2个线程时效率最高。
thread count |
write avg/per obj |
write totall |
read avg/per obj |
read total |
10 |
2.404991 ms |
29s |
1.709544 ms |
2s |
5 |
0.704780 ms |
18s |
1.333013 ms |
2s |
2 |
0.262194 ms |
15s |
0.414683 ms |
2s |
测试环境2:
AIX5.2
IBM P650 Power4 1.5G(64bit) *4
内存8G
jdk1.4
memcached-1.2.1
libevnet1.1b
client自己编写
用于传输的pojo 40个属性,序列化后为750个byte
测试client与server同台机器
相同的测试方法。测试结果大跌眼睛 10线程时 读写速度为200ms/per object 比我的笔记本慢100倍。就没继续测试
测试环境3:
windows2000 server
xeon 1.5*4
内存8G
jdk1.5
测试时发现cpu占用不高于20%
thread count |
write avg/per obj |
write total |
read avg/per obj |
read total |
20 |
10.266615ns |
71s |
23.21283ns |
15s |
10 |
4.341574ns |
41s |
13.30084ns |
16s |
5 |
1.298717ns |
25s |
9.33258ns |
18s |
2 |
1.298717ns |
21s |
4.02503ns |
23s |
初步测试到这里 发现的问题
1.是暂时没有达到网上宣传的1.5w个对象/s (目前测试cpu为瓶颈)
2.是aix下效率低的可怕,对aix不太熟悉。应该有些设置的参数,这点还要向大家请教。
3.超过2个线程效率就开始低。没有发挥多线程的优势,不知道大家在使用过程中有没有发现这一点,还是说我写的MemBufferedDriver有问题。
我的期望是读写速度均能稳定在0.1毫秒左右,至少 1w obj/s 这样才有可能不影响到现有系统的效率
另外大家在java下使用memcached的时候效率怎么样都是怎么用的呢
下面贴下我的client的实现
memcached的协议可以参考memcache包的doc/protocol.txt
主要是socket初次打开后不关闭,直接存放到ThreadLocal中,与memcached保持一个长练接。每次使用的时候判断连接可用还是需要重新连接。
其他存储和读取实现memcached协议。使用BufferedStream。
序列化采用实现Serializable,直接使用ObjectStream来实现。(由于都是pojo简单数据对象,尝试过实现Externalizable接口自己实现序列化和使用750个byte的String 来序列化,发现性能相差不多故放弃)
java 代码
MemBufferedDriver 为client实现
- public class MemBufferedDriver {
-
-
-
- private final static ThreadLocal sockPool = new ThreadLocal();
- private static String serverAddress = "localhost:11211";
-
- public static final byte[] BYTE_GET = new byte[]{103, 101, 116, 32};
- public static final byte[] BYTE_SET = new byte[]{115, 101, 116, 32};
- public static final byte[] BYTE_DELETE = new byte[]{100, 101, 108, 101, 116, 101, 32};
- public static final byte[] BYTE_CRLF = new byte[]{13, 10};
- public static final byte[] BYTE_SPACE = new byte[]{32};
-
- public static final String SERVER_STATUS_DELETED = "DELETED";
- public static final String SERVER_STATUS_NOT_FOUND = "NOT_FOUND";
- public static final String SERVER_STATUS_STORED = "STORED";
- public static final String SERVER_STATUS_ERROR = "ERROR";
- public static final String SERVER_STATUS_END = "END";
- public static final String SERVER_STATUS_VALUE = "VALUE";
-
- public static final String ENCODING_TYPE = "UTF-8";
-
-
- public static Socket getSocket() throws UnknownHostException, IOException {
- Socket s = (Socket) MemBufferedDriver.sockPool.get();
- if (s == null || s.isClosed()) {
- s = MemBufferedDriver.reconnect();
- MemBufferedDriver.sockPool.set(s);
- }
- return s;
- }
-
- private static Socket reconnect() throws UnknownHostException, IOException {
- String[] ip = MemBufferedDriver.serverAddress.split(":");
- return new Socket(ip[0], Integer.parseInt(ip[1]));
-
- }
-
-
- public Map getMulti(String[] keys) {
- Map map = new HashMap();
- if (keys == null || keys.length <= 0) return map;
-
- for (int i = 0; i < keys.length; i++) {
- Object o = get(keys[i]);
- if (o != null) map.put(keys[i], o);
-
- }
- return map;
- }
-
- public Object[] getMultiArray(String[] keys) {
- if (keys == null || keys.length <= 0) return null;
-
- Object[] o = new Object[keys.length];
- for (int i = 0; i < keys.length; i++)
- o[i] = get(keys[i]);
-
- return o;
- }
-
- public boolean set(String key, Object obj) {
- try {
- if (obj == null || key == null || "".equals(key)) throw new Exception("对象和key 不能为空");
- Socket s = MemBufferedDriver.getSocket();
- BufferedInputStream in = new BufferedInputStream(s.getInputStream());
- BufferedOutputStream out = new BufferedOutputStream(s.getOutputStream());
-
- key = encodeKey(key);
- int flag = 0;
-
-
- byte[] bs = object2Byte(obj);
-
- out.write(MemBufferedDriver.BYTE_SET);
- out.write(key.getBytes());
- out.write(MemBufferedDriver.BYTE_SPACE);
- out.write(String.valueOf(flag).getBytes());
- out.write(MemBufferedDriver.BYTE_SPACE);
- out.write("0".getBytes());
- out.write(MemBufferedDriver.BYTE_SPACE);
- out.write(String.valueOf(bs.length).getBytes());
- out.write(MemBufferedDriver.BYTE_CRLF);
-
- out.write(bs);
- out.write(MemBufferedDriver.BYTE_CRLF);
- out.flush();
-
- String ret = readLine(in);
- return MemBufferedDriver.SERVER_STATUS_STORED.equals(ret);
- } catch (Exception e) {
- System.out.println(e.getMessage());
- return false;
- }
- }
-
- public Object get(String key) {
- try {
- Socket s = MemBufferedDriver.getSocket();
- InputStream in = s.getInputStream();
- OutputStream out = s.getOutputStream();
- key = encodeKey(key);
- out.write(MemBufferedDriver.BYTE_GET);
- out.write(key.getBytes());
- out.write(MemBufferedDriver.BYTE_CRLF);
- out.flush();
-
- return getObjectFromStream(in, out);
- } catch (Exception e) {
- System.out.println(e.getMessage());
- return null;
- }
- }
-
- public boolean delete(String key) {
- try {
- Socket s = MemBufferedDriver.getSocket();
- InputStream in = s.getInputStream();
- OutputStream out = s.getOutputStream();
- key = encodeKey(key);
- out.write(MemBufferedDriver.BYTE_DELETE);
- out.write(key.getBytes());
- out.write(MemBufferedDriver.BYTE_CRLF);
- out.flush();
-
- String ret = readLine(in);
- return MemBufferedDriver.SERVER_STATUS_DELETED.equals(ret) || MemBufferedDriver.SERVER_STATUS_NOT_FOUND.equals(ret);
- } catch (Exception e) {
- return false;
- }
- }
-
- private Object getObjectFromStream(InputStream in, OutputStream out) throws IOException, ClassNotFoundException {
- String cmd = readLine(in);
- if (cmd.startsWith(MemBufferedDriver.SERVER_STATUS_VALUE)) {
-
- String[] part = cmd.split(" ");
- String para = part[2];
- int length = Integer.parseInt(part[3]);
-
- byte[] bs = new byte[length];
-
- int count = 0;
- while (count < bs.length) count += in.read(bs, count, (bs.length - count));
- if (count != bs.length)
- throw new IOException("读取数据长度错误");
- readLine(in);
- String endstr = readLine(in);
- if (MemBufferedDriver.SERVER_STATUS_END.equals(endstr))
- return this.byte2Object(bs);
- else
- System.out.println("结束标记错误");
-
- }
- return null;
- }
-
- private String encodeKey(String key) throws UnsupportedEncodingException {
- return URLEncoder.encode(key, MemBufferedDriver.ENCODING_TYPE);
- }
-
-
- private String readLine(InputStream in) throws IOException {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- boolean eol = false;
- byte[] b = new byte[1];
- while (in.read(b, 0, 1) != -1) {
- if (b[0] == 13) eol = true;
- else if (eol && b[0] == 10) break;
- else
- eol = false;
-
- bos.write(b, 0, 1);
- }
-
-
- if (bos.size() == 0) return null;
- return bos.toString().trim();
- }
-
-
- private byte[] object2Byte(Object o) throws IOException {
- ByteArrayOutputStream b = new ByteArrayOutputStream();
- new ObjectOutputStream(b).writeObject(o);
- return b.toByteArray();
- }
-
- private Object byte2Object(byte[] b) throws IOException, ClassNotFoundException {
- return new ObjectInputStream(new ByteArrayInputStream(b)).readObject();
- }
-
-
- public static void main(String[] args) throws Exception {
- MemBufferedDriver m = new MemBufferedDriver();
- System.out.println(m.set("a", "DsSD"));
- System.out.println(m.get("a"));
- }
-
- public static void setServerAddress(String serverAddress) {
- MemBufferedDriver.serverAddress = serverAddress;
- }
- }
java 代码
写入测试类
- public class Fill2Server extends Thread {
- public static int THREAD_COUNT = 2;
- public static Queue queue = new Queue();
- MemBufferedDriver md = new MemBufferedDriver();
-
- public static void main(String[] args) throws Exception {
-
- int size ;
- if (args.length == 3 && args[0] != null && args[1] != null) {
- MemDriver.setServerAddress(args[0]);
- size = Integer.parseInt(args[1]);
- THREAD_COUNT = Integer.parseInt(args[2]);
- new Fill2Server().doFill(size);
- } else
- System.out.println("参数1 连接服务器地址 ipaddress:port ,参数2填充数量,不能小于10000,参数3为使用的线程数");
-
- }
-
- private void doFill(int size) throws InterruptedException {
- int taskCount = size / 1000;
- for (int i = 0; i < taskCount; i++) {
- Task t = new Task();
- t.setTaskId(String.valueOf(i));
- queue.add(t);
- }
-
- long time = System.currentTimeMillis();
- Thread tr[] = new Thread[THREAD_COUNT];
- for (int i = 0; i < THREAD_COUNT; i++) {
- FillThread ft = new FillThread();
- (tr[i] = new Thread(ft)).start();
- }
-
-
- while (true) {
- boolean flag = true;
- for (int i = 0; i < THREAD_COUNT; i++)
- flag &= tr[i].isAlive();
-
- if (!flag) break;
-
- Thread.sleep(1000);
- }
-
- time = System.currentTimeMillis() - time;
- System.out.println("任务完成,共用" + (time / 1000) + "s");
- }
-
- class FillThread implements Runnable {
- public void run() {
- Task task;
- while (true) {
- task = (Task) queue.get();
- if (task == null) break;
- long time = System.nanoTime();
- for (int i = 0; i < 1000; i++) {
- TestBO b = new TestBO();
- md.set(task.getTaskId() + i, b);
- }
- time = System.nanoTime() - time;
- System.out.println(Thread.currentThread().getName() + " avg " + (time / 1000) + " ns ");
- }
- }
- }
- }
java 代码
读取的测试方法
- public class GetFromServer extends Thread {
- public static int THREAD_COUNT = 2;
- public static Queue queue = new Queue();
- MemDriver md = new MemDriver();
-
- public static void main(String[] args) throws Exception {
-
- int size;
- if (args.length == 3 && args[0] != null && args[1] != null) {
- MemDriver.setServerAddress(args[0]);
- size = Integer.parseInt(args[1]);
- THREAD_COUNT = Integer.parseInt(args[2]);
- new GetFromServer().doFill(size);
- } else
- System.out.println("参数1 连接服务器地址 ipaddress:port ,参数2读取数量不能小于1000,参数3为使用的线程数");
-
- }
-
- private void doFill(int size) throws InterruptedException {
- int taskCount = size / 100;
- for (int i = 0; i < taskCount; i++) {
- Task t = new Task();
- t.setTaskId(String.valueOf(i));
- GetFromServer.queue.add(t);
- }
-
- long time = System.currentTimeMillis();
- Thread tr[] = new Thread[GetFromServer.THREAD_COUNT];
- for (int i = 0; i < GetFromServer.THREAD_COUNT; i++) {
- GetFromServer.FillThread ft = new GetFromServer.FillThread();
- (tr[i] = new Thread(ft)).start();
- }
-
-
- while (true) {
- boolean flag = true;
- for (int i = 0; i < GetFromServer.THREAD_COUNT; i++)
- flag &= tr[i].isAlive();
-
- if (!flag) break;
-
- Thread.sleep(1000);
- }
-
-
- time = System.currentTimeMillis() - time;
- System.out.println("任务完成,共用" + (time / 1000) + "s");
- }
-
-
- class FillThread implements
声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
|
返回顶楼 |
|
|
- sorphi
- 等级:
- 性别:
- 文章: 352
- 积分: 765
- 来自: 北京
|
你的测试中,只有一个data center,socket成了瓶颈
|
返回顶楼 |
|
|
- timelyRain
- 等级:
- 文章: 7
- 积分: 280
|
sorphi 写道 你的测试中,只有一个data center,socket成了瓶颈
从测试结果上看 在win2000 4C 8G上跑效率依然没有提高,cpu负荷很低, 瓶颈是在sock上,但具体瓶颈是在哪里呢。
另外aix下结果非常差,是不是有参数需要设置呢
|
返回顶楼 |
|
|
- sorphi
- 等级:
- 性别:
- 文章: 352
- 积分: 765
- 来自: 北京
|
对AIX也不熟悉,抱歉
你的client线程再多,都阻塞在一个socket通道上了。你试试多开几个memcached server(同一机器上可以不同端口),再看看效果?
|
返回顶楼 |
|
|
- timelyRain
- 等级:
- 文章: 7
- 积分: 280
|
memcache server 本身应该支持多线程访问的。 如果多开监听端口。每个监听端口都是一个实例的。向缓存压数据就要压多个server。
这样应该不行吧。
|
返回顶楼 |
|
|
- timelyRain
- 等级:
- 文章: 7
- 积分: 280
|
大家也说说自己项目里memcached的用法。存储数据量、访问压力和效率吧
|
返回顶楼 |
|
|
- codeutil
- 等级:
- 文章: 666
- 积分: 752
|
昨天听同学说他的memecache用的是12G内存.存储大约1000万个键值对,查询速度很快.
|
返回顶楼 |
|
|
- timelyRain
- 等级:
- 文章: 7
- 积分: 280
|
codeutil 写道
昨天听同学说他的memecache用的是12G内存.存储大约1000万个键值对,查询速度很快.
系统使用的是什么语言体系 memcached server跑在什么os下的呢,client用的什么 server的参数咋配的呢
使用场景是什么。能介绍一下么
|
返回顶楼 |
|
|
- 我想我是海
- 等级:
- 文章: 105
- 积分: 362
|
关注一下。 Memcached 的Java Client在02-28-2007发布了更新版本。据自己说性能有提高。 不知LZ有没测试过自己写的Client及官方的JavaClient的差别呢?
|
返回顶楼 |
|
|
- codeutil
- 等级:
- 文章: 666
- 积分: 752
|
php,全部是linux.应用场景,一个小型的条件匹配搜索.例如交友平台查找符合条件的联系人.
timelyRain 写道 codeutil 写道
昨天听同学说他的memecache用的是12G内存.存储大约1000万个键值对,查询速度很快.
系统使用的是什么语言体系 memcached server跑在什么os下的呢,client用的什么 server的参数咋配的呢
使用场景是什么。能介绍一下么
|
返回顶楼 |
|
|