- 浏览: 519932 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (563)
- 工作经验 (12)
- 数据库 (13)
- Servlet (10)
- Struts2 (1)
- Spring (25)
- Eclipse (5)
- Hibernate (5)
- Eclips (8)
- HTTP (7)
- J2EE (21)
- EHcache (1)
- HTML (11)
- 工具插件使用 (20)
- JPA (2)
- 杂谈 (17)
- 数据结构与算法 (3)
- Cloud Foundry (1)
- 安全 (10)
- J2SE (57)
- SQL (9)
- DB2 (6)
- 操作系统 (2)
- 设计模式 (1)
- 版本代码管理工具 (13)
- 面试 (10)
- 代码规范 (3)
- Tomcat (12)
- Ajax (5)
- 异常总结 (11)
- REST (2)
- 云 (2)
- RMI (3)
- SOA (1)
- Oracle (12)
- Javascript (20)
- jquery (7)
- JSP自定义标签 (2)
- 电脑知识 (5)
- 浏览器 (3)
- 正则表达式 (3)
- 建站解决问题 (38)
- 数据库设计 (3)
- git (16)
- log4j (1)
- 每天100行代码 (1)
- socket (0)
- java设计模式 耿祥义著 (0)
- Maven (14)
- ibatis (7)
- bug整理 (2)
- 邮件服务器 (8)
- Linux (32)
- TCP/IP协议 (5)
- java多线程并发 (7)
- IO (1)
- 网页小工具 (2)
- Flash (2)
- 爬虫 (1)
- CSS (6)
- JSON (1)
- 触发器 (1)
- java并发 (12)
- ajaxfileupload (1)
- js验证 (1)
- discuz (2)
- Mysql (14)
- jvm (2)
- MyBatis (10)
- POI (1)
- 金融 (1)
- VMWare (0)
- Redis (4)
- 性能测试 (2)
- PostgreSQL (1)
- 分布式 (2)
- Easy UI (1)
- C (1)
- 加密 (6)
- Node.js (1)
- 事务 (2)
- zookeeper (3)
- Spring MVC (2)
- 动态代理 (3)
- 日志 (2)
- 微信公众号 (2)
- IDEA (1)
- 保存他人遇到的问题 (1)
- webservice (11)
- memcached (3)
- nginx (6)
- 抓包 (1)
- java规范 (1)
- dubbo (3)
- xwiki (1)
- quartz (2)
- 数字证书 (1)
- spi (1)
- 学习编程 (6)
- dom4j (1)
- 计算机系统知识 (2)
- JAVA系统知识 (1)
- rpcf (1)
- 单元测试 (2)
- php (1)
- 内存泄漏cpu100%outofmemery (5)
- zero_copy (2)
- mac (3)
- hive (3)
- 分享资料整理 (0)
- 计算机网络 (1)
- 编写操作系统 (1)
- springboot (1)
最新评论
-
masuweng:
亦论一次OutOfMemoryError的定位与解错 -
变脸小伙:
引用[color=red][/color]百度推广中运用的技术 ...
Spring 3 mvc中返回pdf,json,xml等不同的view -
Vanillva:
不同之处是什么??
Mybatis中的like查询 -
thrillerzw:
转了。做个有理想的程序员
有理想的程序员必须知道的15件事 -
liujunhui1988:
觉得很有概括力
15 个必须知道的 Java 面试问题(2年工作经验)
源:http://san-yun.iteye.com/blog/1897250
评:写的 调用链很清晰
dubbo是阿里巴巴开源的单一长连接服务框架,底层通信采用nio框架,支持netty,mina,grizzly,默认是netty。对dubbo比较感兴趣的是:
1. client端的线程模型是什么样的?
传统的io client是请求应答模式,发送请求-->等待远程应答。dubbo底层是异步IO的,所有请求复用单一长连接,所以调用都不会阻在IO上,而是阻在Future超时wait上。
2. server端的线程模型是什么样的?
这个比较成熟了,现在一般的server都是基于nio,一批io thread负责处理io,一批worker thread负责处理业务。
一. 快速启动
学习dubbo最好的方式是快速运行起来,由于dubbo还是比较重量级的产品,之前遇到一些问题。
server端:
Java代码 收藏代码
import java.io.IOException;
import com.alibaba.dubbo.config.ApplicationConfig;
import com.alibaba.dubbo.config.ProtocolConfig;
import com.alibaba.dubbo.config.ServiceConfig;
import com.duitang.dboss.client.test.BlogQueryService;
import com.duitang.dboss.client.test.BlogQueryServiceImpl;
public class DubboServerTester {
public static void main(String[] args) throws IOException {
BlogQueryService blogQueryService = new BlogQueryServiceImpl();
ApplicationConfig application = new ApplicationConfig();
application.setName("dubbo-test");
ProtocolConfig protocol = new ProtocolConfig();
protocol.setName("dubbo");
protocol.setPort(8989);
protocol.setThreads(200);
// RegistryConfig registry = new RegistryConfig();
// registry.setAddress("10.20.130.230:9090");
// registry.setUsername("aaa");
// registry.setPassword("bbb");
ServiceConfig<BlogQueryService> service = new ServiceConfig<BlogQueryService>(); // 此实例很重,封装了与注册中心的连接,请自行缓存,否则可能造成内存和连接泄漏
service.setApplication(application);
// service.setRegistry(registry);
service.setRegister(false);
service.setProtocol(protocol); // 多个协议可以用setProtocols()
service.setInterface(BlogQueryService.class);
service.setRef(blogQueryService);
service.setVersion("1.0.0");
// 暴露及注册服务
service.export();
System.out.println("Press any key to exit.");
System.in.read();
}
}
注意:dubbo export服务默认依赖于RegistryConfig,如果没有配置RegistryConfig会报错.可以通过service.setRegister(false)禁用。
client:
Java代码 收藏代码
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import com.alibaba.dubbo.config.ApplicationConfig;
import com.alibaba.dubbo.config.ReferenceConfig;
import com.duitang.dboss.client.test.BlogQueryService;
public class DubboClientTester {
public static void main(String[] args) throws InterruptedException, IOException {
ApplicationConfig application = new ApplicationConfig();
application.setName("dubbo-test");
ReferenceConfig<BlogQueryService> reference = new ReferenceConfig<BlogQueryService>();
reference.setUrl("dubbo://127.0.0.1:8989/com.duitang.dboss.client.test.BlogQueryService");
reference.setTimeout(500);
reference.setConnections(10);
reference.setApplication(application);
reference.setInterface(BlogQueryService.class);
reference.setVersion("1.0.0");
final BlogQueryService blogQueryService = reference.get();
long begin = System.currentTimeMillis();
System.out.println(blogQueryService.test());
long end = System.currentTimeMillis();
System.out.println(" cost:" + (end - begin));
ExecutorService es = Executors.newFixedThreadPool(50, new NamedThreadFactory("my test"));
List<Callable<String>> tasks = new ArrayList<Callable<String>>();
for (int i = 0; i < 100000; ++i) {
tasks.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("run");
System.out.println(blogQueryService.test());
System.out.println("run success");
return null;
}
});
}
List<Future<String>> futurelist = es.invokeAll(tasks);
for (Future<String> future : futurelist) {
try {
String result = future.get();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("------------------------------------------------------------------------------------------------------------------------------------------------\r\n");
}
es.shutdown();
System.out.println("end");
System.in.read();
}
static class NamedThreadFactory implements ThreadFactory {
private static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
private final AtomicInteger mThreadNum = new AtomicInteger(1);
private final String mPrefix;
private final boolean mDaemo;
private final ThreadGroup mGroup;
public NamedThreadFactory(){
this("pool-" + POOL_SEQ.getAndIncrement(), false);
}
public NamedThreadFactory(String prefix){
this(prefix, false);
}
public NamedThreadFactory(String prefix, boolean daemo){
mPrefix = prefix + "-thread-";
mDaemo = daemo;
SecurityManager s = System.getSecurityManager();
mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
}
public Thread newThread(Runnable runnable) {
String name = mPrefix + mThreadNum.getAndIncrement();
Thread ret = new Thread(mGroup, runnable, name, 0);
ret.setDaemon(mDaemo);
return ret;
}
public ThreadGroup getThreadGroup() {
return mGroup;
}
}
}
1. 通过setUrl("")来实现远程服务直连。
2. 需要注意的是默认connection只有一个,可以通过setConnections()来指定connection pool。在高负载环境下,nio的单连接也会遇到瓶颈,此时你可以通过设置连接池来让更多的连接分担dubbo的请求负载,从而提高系统的吞吐量。”
二. 代码流程
这里重点分析一下client的调用过程,client调用分为三个部分:
1). 初始化,建立连接。
2). 发送请求。
3). 等待远程应答。
(一).初始化
1. DubboProtocol.initClient()
2. Exchangers.connect(URL url, ExchangeHandler handler)
3. Exchangers.getExchanger(url).connect(url, handler)
4. HeaderExchanger.connect(URL url, ExchangeHandler handler)
5. return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
6. Transporters.getTransporter().connect(URL url, ChannelHandler handler)
7. NettyTransporter.connect(URL url, ChannelHandler listener)
8. new NettyClient(url, listener) //timeout默认值:timeout=1000;connectTimeout=3000;
9. NettyClient.doOpen() //创建netty的ClientBootstrap
bootstrap = new ClientBootstrap(channelFactory);
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout()); //注意:此timeout是timeout,而非connectTimeout
10. AbstractClient.connect()
11. NettyClient.doConnect() //如果远程地址无法连接,抛出timeout异常流程结束。
ChannelFuture future = bootstrap.connect(getConnectAddress());
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
(二).发送请求
1.DubboInvoker.doInvoke(Invocation invocation) //currentClient.request(invocation, timeout).get()
2.HeaderExchangeClient.request(invocation, timeout)
3.HeaderExchangeChannel.request(Invocation invocation,timeout)
4.AbstractPeer.send(Request request)
5.NettyChannel.send(Object message, boolean sent)
6.NioClientSocketChannel.write(message)
7.NettyHandler.writeRequested(ChannelHandlerContext ctx, MessageEvent e)
8.AbstractPeer.sent(Channel ch, Request request)
(三).等待远程应答
在调用DubboInvoker.doInvoke(Invocation invocation)中实际是调用currentClient.request(invocation, timeout).get(),此方法会返回DefaultFuture,调用get方法会阻塞直到超时,在阻塞的同时netty的io线程会接收到远程应答,如果收到响应会产生io事件调用NettyHandler.messageReceived。
1.NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
2.AbstractPeer.received(Channel ch, Object msg)
3.MultiMessageHandler.received(Channel channel, Object message)
4.AllChannelHandler.received(Channel channel, Object message)
5.DecodeHandler.received(Channel channel, Object message)
6.HeaderExchangeHandler.received(Channel channel, Object message)
7.DefaultFuture.received(Channel channel, Response response) //注意是static方法
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
}
三. dubbo client的核心
我认为dubbo client的核心在DefaultFuture。所以远程调用都不会阻在IO上,而是阻在Future超时wait上,下面忽略掉远程调用把future抽取出来。
下面是代码实现
Java代码 收藏代码
package executor;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
public class Commands {
private ExecutorService senders = Executors.newCachedThreadPool();
private ExecutorService receviers = Executors.newCachedThreadPool();
private AtomicLong counter = new AtomicLong();
public CommandResponse execute(Callable<Object> task, int timeout) {
Future<Object> result = senders.submit(task);
long id = counter.getAndIncrement();
CommandFuture commandFuture = new CommandFuture(id);
receviers.submit(new ReceiveWorker(id, result));
return commandFuture.get(timeout);
}
static class ReceiveWorker implements Runnable {
private Future<Object> result;
private Long id;
public ReceiveWorker(Long id, Future<Object> result){
super();
this.result = result;
this.id = id;
}
@Override
public void run() {
try {
Object obj = result.get();
CommandFuture.received(new CommandResponse(id, obj));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
public void shutdown() {
senders.shutdown();
receviers.shutdown();
}
}
Java代码 收藏代码
package executor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class CommandFuture {
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
private CommandResponse response;
private static final Map<Long, CommandFuture> FUTURES = new ConcurrentHashMap<Long, CommandFuture>();
public CommandFuture(Long id){
FUTURES.put(id, this);
}
public boolean isDone() {
return response != null;
}
public CommandResponse get(int timeout) {
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start >= timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException("timeout");
}
}
return response;
}
public void doReceived(CommandResponse response) {
lock.lock();
try {
this.response = response;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
}
public static void received(CommandResponse response) {
try {
CommandFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
System.out.println("some error!");
}
} finally {
// CHANNELS.remove(response.getId());
}
}
}
Java代码 收藏代码
package executor;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
public class Commands {
private ExecutorService senders = Executors.newCachedThreadPool();
private ExecutorService receviers = Executors.newCachedThreadPool();
private AtomicLong counter = new AtomicLong();
public CommandResponse execute(Callable<Object> task, int timeout) {
Future<Object> result = senders.submit(task);
long id = counter.getAndIncrement();
CommandFuture commandFuture = new CommandFuture(id);
receviers.submit(new ReceiveWorker(id, result));
return commandFuture.get(timeout);
}
static class ReceiveWorker implements Runnable {
private Future<Object> result;
private Long id;
public ReceiveWorker(Long id, Future<Object> result){
super();
this.result = result;
this.id = id;
}
@Override
public void run() {
try {
Object obj = result.get();
CommandFuture.received(new CommandResponse(id, obj));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
public void shutdown() {
senders.shutdown();
receviers.shutdown();
}
}
下面是jstack
评:写的 调用链很清晰
dubbo是阿里巴巴开源的单一长连接服务框架,底层通信采用nio框架,支持netty,mina,grizzly,默认是netty。对dubbo比较感兴趣的是:
1. client端的线程模型是什么样的?
传统的io client是请求应答模式,发送请求-->等待远程应答。dubbo底层是异步IO的,所有请求复用单一长连接,所以调用都不会阻在IO上,而是阻在Future超时wait上。
2. server端的线程模型是什么样的?
这个比较成熟了,现在一般的server都是基于nio,一批io thread负责处理io,一批worker thread负责处理业务。
一. 快速启动
学习dubbo最好的方式是快速运行起来,由于dubbo还是比较重量级的产品,之前遇到一些问题。
server端:
Java代码 收藏代码
import java.io.IOException;
import com.alibaba.dubbo.config.ApplicationConfig;
import com.alibaba.dubbo.config.ProtocolConfig;
import com.alibaba.dubbo.config.ServiceConfig;
import com.duitang.dboss.client.test.BlogQueryService;
import com.duitang.dboss.client.test.BlogQueryServiceImpl;
public class DubboServerTester {
public static void main(String[] args) throws IOException {
BlogQueryService blogQueryService = new BlogQueryServiceImpl();
ApplicationConfig application = new ApplicationConfig();
application.setName("dubbo-test");
ProtocolConfig protocol = new ProtocolConfig();
protocol.setName("dubbo");
protocol.setPort(8989);
protocol.setThreads(200);
// RegistryConfig registry = new RegistryConfig();
// registry.setAddress("10.20.130.230:9090");
// registry.setUsername("aaa");
// registry.setPassword("bbb");
ServiceConfig<BlogQueryService> service = new ServiceConfig<BlogQueryService>(); // 此实例很重,封装了与注册中心的连接,请自行缓存,否则可能造成内存和连接泄漏
service.setApplication(application);
// service.setRegistry(registry);
service.setRegister(false);
service.setProtocol(protocol); // 多个协议可以用setProtocols()
service.setInterface(BlogQueryService.class);
service.setRef(blogQueryService);
service.setVersion("1.0.0");
// 暴露及注册服务
service.export();
System.out.println("Press any key to exit.");
System.in.read();
}
}
注意:dubbo export服务默认依赖于RegistryConfig,如果没有配置RegistryConfig会报错.可以通过service.setRegister(false)禁用。
client:
Java代码 收藏代码
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import com.alibaba.dubbo.config.ApplicationConfig;
import com.alibaba.dubbo.config.ReferenceConfig;
import com.duitang.dboss.client.test.BlogQueryService;
public class DubboClientTester {
public static void main(String[] args) throws InterruptedException, IOException {
ApplicationConfig application = new ApplicationConfig();
application.setName("dubbo-test");
ReferenceConfig<BlogQueryService> reference = new ReferenceConfig<BlogQueryService>();
reference.setUrl("dubbo://127.0.0.1:8989/com.duitang.dboss.client.test.BlogQueryService");
reference.setTimeout(500);
reference.setConnections(10);
reference.setApplication(application);
reference.setInterface(BlogQueryService.class);
reference.setVersion("1.0.0");
final BlogQueryService blogQueryService = reference.get();
long begin = System.currentTimeMillis();
System.out.println(blogQueryService.test());
long end = System.currentTimeMillis();
System.out.println(" cost:" + (end - begin));
ExecutorService es = Executors.newFixedThreadPool(50, new NamedThreadFactory("my test"));
List<Callable<String>> tasks = new ArrayList<Callable<String>>();
for (int i = 0; i < 100000; ++i) {
tasks.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("run");
System.out.println(blogQueryService.test());
System.out.println("run success");
return null;
}
});
}
List<Future<String>> futurelist = es.invokeAll(tasks);
for (Future<String> future : futurelist) {
try {
String result = future.get();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("------------------------------------------------------------------------------------------------------------------------------------------------\r\n");
}
es.shutdown();
System.out.println("end");
System.in.read();
}
static class NamedThreadFactory implements ThreadFactory {
private static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
private final AtomicInteger mThreadNum = new AtomicInteger(1);
private final String mPrefix;
private final boolean mDaemo;
private final ThreadGroup mGroup;
public NamedThreadFactory(){
this("pool-" + POOL_SEQ.getAndIncrement(), false);
}
public NamedThreadFactory(String prefix){
this(prefix, false);
}
public NamedThreadFactory(String prefix, boolean daemo){
mPrefix = prefix + "-thread-";
mDaemo = daemo;
SecurityManager s = System.getSecurityManager();
mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
}
public Thread newThread(Runnable runnable) {
String name = mPrefix + mThreadNum.getAndIncrement();
Thread ret = new Thread(mGroup, runnable, name, 0);
ret.setDaemon(mDaemo);
return ret;
}
public ThreadGroup getThreadGroup() {
return mGroup;
}
}
}
1. 通过setUrl("")来实现远程服务直连。
2. 需要注意的是默认connection只有一个,可以通过setConnections()来指定connection pool。在高负载环境下,nio的单连接也会遇到瓶颈,此时你可以通过设置连接池来让更多的连接分担dubbo的请求负载,从而提高系统的吞吐量。”
二. 代码流程
这里重点分析一下client的调用过程,client调用分为三个部分:
1). 初始化,建立连接。
2). 发送请求。
3). 等待远程应答。
(一).初始化
1. DubboProtocol.initClient()
2. Exchangers.connect(URL url, ExchangeHandler handler)
3. Exchangers.getExchanger(url).connect(url, handler)
4. HeaderExchanger.connect(URL url, ExchangeHandler handler)
5. return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
6. Transporters.getTransporter().connect(URL url, ChannelHandler handler)
7. NettyTransporter.connect(URL url, ChannelHandler listener)
8. new NettyClient(url, listener) //timeout默认值:timeout=1000;connectTimeout=3000;
9. NettyClient.doOpen() //创建netty的ClientBootstrap
bootstrap = new ClientBootstrap(channelFactory);
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout()); //注意:此timeout是timeout,而非connectTimeout
10. AbstractClient.connect()
11. NettyClient.doConnect() //如果远程地址无法连接,抛出timeout异常流程结束。
ChannelFuture future = bootstrap.connect(getConnectAddress());
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
(二).发送请求
1.DubboInvoker.doInvoke(Invocation invocation) //currentClient.request(invocation, timeout).get()
2.HeaderExchangeClient.request(invocation, timeout)
3.HeaderExchangeChannel.request(Invocation invocation,timeout)
4.AbstractPeer.send(Request request)
5.NettyChannel.send(Object message, boolean sent)
6.NioClientSocketChannel.write(message)
7.NettyHandler.writeRequested(ChannelHandlerContext ctx, MessageEvent e)
8.AbstractPeer.sent(Channel ch, Request request)
(三).等待远程应答
在调用DubboInvoker.doInvoke(Invocation invocation)中实际是调用currentClient.request(invocation, timeout).get(),此方法会返回DefaultFuture,调用get方法会阻塞直到超时,在阻塞的同时netty的io线程会接收到远程应答,如果收到响应会产生io事件调用NettyHandler.messageReceived。
1.NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
2.AbstractPeer.received(Channel ch, Object msg)
3.MultiMessageHandler.received(Channel channel, Object message)
4.AllChannelHandler.received(Channel channel, Object message)
5.DecodeHandler.received(Channel channel, Object message)
6.HeaderExchangeHandler.received(Channel channel, Object message)
7.DefaultFuture.received(Channel channel, Response response) //注意是static方法
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
}
三. dubbo client的核心
我认为dubbo client的核心在DefaultFuture。所以远程调用都不会阻在IO上,而是阻在Future超时wait上,下面忽略掉远程调用把future抽取出来。
下面是代码实现
Java代码 收藏代码
package executor;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
public class Commands {
private ExecutorService senders = Executors.newCachedThreadPool();
private ExecutorService receviers = Executors.newCachedThreadPool();
private AtomicLong counter = new AtomicLong();
public CommandResponse execute(Callable<Object> task, int timeout) {
Future<Object> result = senders.submit(task);
long id = counter.getAndIncrement();
CommandFuture commandFuture = new CommandFuture(id);
receviers.submit(new ReceiveWorker(id, result));
return commandFuture.get(timeout);
}
static class ReceiveWorker implements Runnable {
private Future<Object> result;
private Long id;
public ReceiveWorker(Long id, Future<Object> result){
super();
this.result = result;
this.id = id;
}
@Override
public void run() {
try {
Object obj = result.get();
CommandFuture.received(new CommandResponse(id, obj));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
public void shutdown() {
senders.shutdown();
receviers.shutdown();
}
}
Java代码 收藏代码
package executor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class CommandFuture {
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
private CommandResponse response;
private static final Map<Long, CommandFuture> FUTURES = new ConcurrentHashMap<Long, CommandFuture>();
public CommandFuture(Long id){
FUTURES.put(id, this);
}
public boolean isDone() {
return response != null;
}
public CommandResponse get(int timeout) {
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start >= timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException("timeout");
}
}
return response;
}
public void doReceived(CommandResponse response) {
lock.lock();
try {
this.response = response;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
}
public static void received(CommandResponse response) {
try {
CommandFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
System.out.println("some error!");
}
} finally {
// CHANNELS.remove(response.getId());
}
}
}
Java代码 收藏代码
package executor;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
public class Commands {
private ExecutorService senders = Executors.newCachedThreadPool();
private ExecutorService receviers = Executors.newCachedThreadPool();
private AtomicLong counter = new AtomicLong();
public CommandResponse execute(Callable<Object> task, int timeout) {
Future<Object> result = senders.submit(task);
long id = counter.getAndIncrement();
CommandFuture commandFuture = new CommandFuture(id);
receviers.submit(new ReceiveWorker(id, result));
return commandFuture.get(timeout);
}
static class ReceiveWorker implements Runnable {
private Future<Object> result;
private Long id;
public ReceiveWorker(Long id, Future<Object> result){
super();
this.result = result;
this.id = id;
}
@Override
public void run() {
try {
Object obj = result.get();
CommandFuture.received(new CommandResponse(id, obj));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
public void shutdown() {
senders.shutdown();
receviers.shutdown();
}
}
下面是jstack
相关推荐
### Dubbo超时机制导致的雪崩连接问题分析 #### 一、背景介绍 在分布式系统中,服务间调用非常频繁,为了提高系统的稳定性和可用性,通常会采用诸如Dubbo这样的微服务框架来进行服务治理。Dubbo作为一款高性能、轻...
【标题】"dubbo运行实例"揭示了这是一个关于运行Dubbo服务的实际操作案例。Dubbo是阿里巴巴开源的一个高性能、轻量级的服务治理框架,它主要应用于分布式系统的微服务架构中,提供服务的发布、发现、调用等功能。 ...
【压缩包子文件的文件名称列表】"dubbo-master"通常表示这是Dubbo项目的主分支或者完整版本,可能包含了Dubbo的所有模块,包括核心库、服务治理模块、协议支持、SPI扩展机制、以及相关的示例和文档。 深入讲解这些...
在`dubbo-samples-monitor`模块中,你可以看到如何配置监控并查看服务的运行状态。 9. **API与XML配置** Dubbo既支持API编程,也支持基于XML的配置方式。在示例中,两者都有涉及,帮助开发者理解如何灵活选择配置...
Dubbo提供了服务注册、服务发现、负载均衡、容错机制、调用链路监控等多种功能。它支持多种协议,如RPC、HTTP、Hessian等,能够灵活地与各种中间件集成。Dubbo的核心特性包括: 1. **服务注册与发现**:服务提供者...
Dubbo SPI(Service Provider Interface)是阿里巴巴开源的Dubbo框架中的一个重要特性,它提供了一种动态发现服务提供者和加载实现类的机制,使得服务消费者无需关心服务提供者的具体实现,增强了系统的可扩展性和...
1. **访问登录页面**:Dubbo Admin启动成功后,可以通过浏览器访问`http://[your_server_ip]:[port]/dubbo-admin/`(其中`[your_server_ip]`为运行Tomcat服务器的IP地址,`[port]`为Tomcat监听的端口号,默认为8080...
【Dubbo分布式架构可运行代码Demo】是一个针对初学者和开发者设计的实例,旨在帮助他们理解和实践Dubbo的分布式架构。这个Demo包含了基础的配置和运行示例,以直观、简洁的方式展示了如何在实际项目中应用Dubbo。...
这份资料旨在帮助开发者深入理解Dubbo的内部工作机制,提升在实际项目中的应用能力。 1. **服务注册与发现** - ZooKeeper作为默认注册中心:Dubbo如何利用ZooKeeper进行服务注册和发现,以及心跳检测和会话保持的...
"的应用来演示Dubbo的基本用法,采用Maven作为项目构建工具,方便开发者直接导入到IDE中运行。 首先,我们需要了解Dubbo的核心概念。Dubbo是一个基于Java的RPC(Remote Procedure Call)框架,它提供了服务发现、...
通过这三个组件的组合,我们可以了解到Dubbo的核心运作机制:消费者如何查找并调用服务,提供者如何暴露服务,以及如何通过监控来维护系统的健康运行。此外,Dubbo还支持多种通信协议、序列化方式、服务注册中心,...
1. **JDK**: Dubbo运行在Java平台上,所以确保你已经安装了JDK,并且版本至少为1.8。设置好`JAVA_HOME`环境变量以便后续使用。 2. **Maven**:Dubbo项目通常使用Maven作为构建工具,因此你需要安装Maven并配置好`...
1. **Dubbo Admin**: Dubbo Admin是一个图形化界面,主要用于监控服务提供者和服务消费者的运行状态,可以查看服务的注册信息、调用统计、配置信息等。它提供了服务树视图,展示服务的层次结构;接口调用视图,展示...
在dubbo的小demo中,Maven作为依赖管理工具,负责管理项目依赖关系,包括Dubbo、SpringBoot等库的版本和引入,使得开发环境更加简洁,便于项目的构建和运行。 【SpringBoot的使用】 SpringBoot简化了Spring应用的...
本文对dubbo源码进行了深入的解析,涵盖了dubbo的架构、核心机制分析、扩展点加载流程、代理机制、远程调用流程、集群和容错处理、监控机制等多个方面。通过阅读和理解这些内容,可以更好地掌握dubbo的内部工作机制...
本篇文章将深入探讨`dubbo.xsd`文件及其在Dubbo中的作用,旨在帮助读者理解Dubbo的服务治理机制。 首先,`dubbo.xsd`是Dubbo项目的核心配置文件之一,它定义了Dubbo服务的各种配置元素和属性。通过这个文件,开发者...
总的来说,"incubator-dubbo-dubbo-2.5.8" 提供了一个完整的解决方案,涵盖了分布式服务的开发、运行和维护,是企业级微服务架构的重要组成部分。下载并使用这个版本,可以极大地提高开发效率和系统稳定性。
Dubbo 框架具有许多特性,如透明化的远程方法调用、软负载均衡及容错机制、服务自动注册与发现等。 Dubbo 框架采用全 Spring 配置方式,透明化接入应用,对应用没有任何 API 侵入。 在 Dubbo 框架中,节点角色包括 ...
1. 启动dubbo-admin:运行dubbo-admin-server-0.1.jar文件,一般通过Java的jar命令来启动,例如`java -jar dubbo-admin-server-0.1.jar`。 2. 访问Web界面:在浏览器中输入启动后的服务器地址,通常默认是`...
Dubbo作为一款优秀的分布式服务框架,不仅提供了高性能的RPC服务调用机制,还通过其完善的架构设计解决了分布式系统中的服务管理和监控问题。通过本文的学习,读者不仅对Dubbo有了初步的认识,而且也能了解到如何...