1.同步调用:
核心原理:利用JDK的动态代理类创建service代理对象,然后在InvocationHandler中调用channel发送数据, 同时利用信号量同步等待结果返回。
本质我的理解是发送数据时采用“future超时模式”把异步变同步等待数据返回,这里的同步是指每次发送、接收过程的同步,即每次发送都会等待数据响应,消息传输还是异步的。
核心本质:每个请求都持有一个InvokeResult(封装一个信号量),通过invokeFuture.getResult(timeout, TimeUnit.MILLISECONDS);等待超时实现
核心代码:invokeFuture.getResult(timeout, TimeUnit.MILLISECONDS);
相关核心代码:
创建代理:
/**
* @Title: wrapSyncProxy4Service
* @Description: 为远程服务创建同步动态代理,返回代理对象
* @author 简道
* @param serviceInterface
* 远程服务接口
* @param dispatchStrategy
* 分发策略
* @return T 返回类型
*/
@SuppressWarnings("unchecked")
private static <T> T wrapSyncProxy4Service(Class<T> serviceInterface, SyncDispatchStrategy dispatchStrategy) {
if (serviceInterface == null) {
throw new IllegalArgumentException("serviceInterface can not be null.");
} else if (!serviceInterface.isInterface()) {
throw new IllegalArgumentException("serviceInterface is required to be interface.");
} else if (dispatchStrategy == null) {
throw new IllegalArgumentException("dispatchStrategy is required to be interface.");
}
InvocationHandler requestHandler = new SyncServiceRequestHandler(serviceInterface.getSimpleName(),
dispatchStrategy);
// 创建代理
T serviceProxy = (T) Proxy.newProxyInstance(getClassLoader(serviceInterface), new Class[] { serviceInterface },
requestHandler);
return serviceProxy;
}
同步请求处理:
/**
* @Title: SyncServiceRequestHandler.java
* @Package org.summercool.hsf.proxy.strategy
* @Description: 同步请求处理
* @author 简道
* @date 2011-9-30 下午3:10:10
* @version V1.0
*/
public class SyncServiceRequestHandler implements InvocationHandler {
String serviceName;
SyncDispatchStrategy dispatchStrategy;
public SyncServiceRequestHandler(String serviceName, SyncDispatchStrategy dispatchStrategy) {
if (serviceName == null) {
throw new IllegalArgumentException("serviceName can not be null.");
} else if (dispatchStrategy == null) {
throw new IllegalArgumentException("dispatchStrategy can not be null.");
}
this.serviceName = serviceName;
this.dispatchStrategy = dispatchStrategy;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RemoteServiceObject remoteServiceObject = new RemoteServiceObject();
remoteServiceObject.setMethodName(method.getName());
remoteServiceObject.setServiceName(serviceName);
remoteServiceObject.setArgs(args);
InvokeResult result = dispatchStrategy.dispatch(remoteServiceObject);
if (result.size() > 0) {
return result.getFirstValue();
}
return ReflectionUtil.getDefaultValue(method.getReturnType());
}
消息分发:
@Override
public InvokeResult dispatch(Object message) {
if (message == null) {
throw new IllegalArgumentException("Message can not be null.");
} else if (!service.isAlived()) {
throw new IllegalStateException("service is not alived.");
}
HsfChannel channel = getChannel(service.getGroups());
Object retObj = write(message, channel);
// 构建结果
InvokeResult invokeResult = new InvokeResult();
invokeResult.put(((HsfChannel) channel).getChannelGroup().getName(), retObj);
return invokeResult;
}
HsfChannel同步write:
public Object writeSync(Object msg) {
InvokeFuture<?> invokeFuture = writeAsync(msg);
Object retObj = null;
boolean invokeTimeout = false;
Integer timeout = LangUtil.parseInt(service.getOption(HsfOptions.SYNC_INVOKE_TIMEOUT), 60000);
if (invokeTimeout = (timeout != null && timeout > 0)) {
// 等待返回,直到Response返回或超时
retObj = invokeFuture.getResult(timeout, TimeUnit.MILLISECONDS);
}
if (!invokeTimeout) {
// 一直等待,直到Response返回
retObj = invokeFuture.getResult();
}
return retObj;
}
public V getResult(long timeout, TimeUnit unit) {
if (!isDone()) {
try {
if (!semaphore.tryAcquire(timeout, unit)) {
setCause(new HsfTimeoutException("time out."));
}
} catch (InterruptedException e) {
throw new HsfRuntimeException(e);
}
}
// check exception
if (cause != null) {
if (cause instanceof HsfRemoteServiceException) {
throw ((HsfRemoteServiceException) cause);
}
throw new HsfRuntimeException(cause);
}
//
return this.result;
}
思考其他同步模型思路:利用JAVA Object的 wait() notify()实现同步模型, 这种思路请求和响应信息必须存储在一个对象上才好实现同步,并且只有一个线程用来轮询发送。
理论和原理:
(1).wait()
等待对象的同步锁,需要获得该对象的同步锁才可以调用这个方法,否则编译可以通过,但运行时会收到一个异常:IllegalMonitorStateException。调用任意对象的 wait() 方法导致该线程阻塞,该线程不可继续执行,并且该对象上的锁被释放。
(2).notify()
唤醒在等待该对象同步锁的线程(只唤醒一个,如果有多个在等待),注意的是在调用此方法的时候,并不能确切的唤醒某一个等待状态的线程,而是由JVM确定唤醒哪个线程,而且不是按优先级。调用任意对象的notify()方法则导致因调用该对象的 wait()方法而阻塞的线程中随机选择的一个解除阻塞(但要等到获得锁后才真正可执行)。
伪代码:
public class RequestSendTask extends Thread
{
public RequestSendTask(Connector connector1, RequestHolder requestholder)
{
super("request_send_task");
shutdown = false;
connector = connector1;
requestHolder = requestholder;
start();
}
public void run()
{
while(!shutdown)
try
{
execute();
}
catch(Exception exception)
{
logger.error(String.format("send request to [%s:%d] error.", new Object[] {
connector.getHost(), Integer.valueOf(connector.getPort())
}), exception);
closeAndClear();
}
}
private void execute()
throws Exception
{
do
{
if(shutdown)
break;
Request request = requestHolder.getRequest();
if(request != null)
{
connector.getSession().write(request);
requestHolder.resetRequestTime(request.getRequestId());
}
} while(true);
}
public Response waitForResponse(Request request)
throws Exception
{
RequestWrap requestwrap = new RequestWrap(request);
waitingObjs.put(Integer.valueOf(request.getRequestId()), requestwrap);
requestQueue.put(Integer.valueOf(request.getRequestId()));
if(requestwrap.getStatus() != 5)
synchronized(requestwrap)
{
requestwrap.setStatus((byte)2);
requestwrap.wait();
}
if(requestwrap.getStatus() == 3)
return (Response)requestwrap.getObj();
String s = (new StringBuilder()).append(request.getBeanName()).append(".").append(request.getMethod()).toString();
switch(requestwrap.getStatus())
{
case 4: // '\004'
throw new TimeoutException((new StringBuilder()).append("invoke ").append(s).append(" timeout.").toString());
case 5: // '\005'
throw new SessionCloseException((new StringBuilder()).append("invoke ").append(s).append(" session close.").toString());
}
throw new TransportException((new StringBuilder()).append("invoke ").append(s).append(" error.").toString());
}
public void receiveResponse(Response response)
{
RequestWrap requestwrap = (RequestWrap)waitingObjs.remove(Integer.valueOf(response.getRequestId()));
if(requestwrap != null)
{
requestwrap.setObj(response);
synchronized(requestwrap)
{
requestwrap.setStatus((byte)3);
requestwrap.notify();
}
} else
{
logger.warn((new StringBuilder()).append("remove response, id: ").append(response.getRequestId()).toString());
}
}
分享到:
相关推荐
笔者工作的这几年之中,总结并开发了如下几个框架: summercool(Web 框架,已经应用于某国内大型网络公司的等重要应用)、summercool-hsf(基于Netty实现的RPC框架,已经应用国内某移动互联网公司)、 summercool-...
summercool-hsf Automatically exported from code.google.com/p/summercool-hsf 1.目前为止性能最高的RPC远程通讯框架 2.也可以做为手机长连接的Server,经测试已经达到了50W以上的性能长连接 (需调整linux内核...
summercool-ddlAutomatically exported from code.google.com/p/summercool-ddl1.依赖Xml代码 收藏代码org.summercoolsummercool-ddl1.0源码svn地址:2.准备Sql映射文件Xml代码 收藏...3.Spring配置Xml代码 收藏代码
summercool-ddl Automatically exported from code.google.com/p/summercool-ddl 学习了解使用!
笔者工作的这几年之中,总结并开发了如下几个框架: summercool( Web框架,已经应用于某国内大型网络公司的等重要应用)、summercool-hsf(基于Netty实现的RPC框架,已经应用国内某移动互联网公司)、summercool-...
3. **分布式事务**:分库分表后,跨表操作可能会涉及多个数据库,这时就需要解决分布式事务的问题。Ibatis不提供内置的分布式事务管理,但可以通过其他开源框架如Seata(原名:TCC)来实现。 4. **数据一致性**:...