`

【原创】同步转异步+RPC的一个POS行业应用-关键技术实现

阅读更多



简单回顾下业务模型:收银台<=>POS代理服务器<=>POS机,三者之间进行通讯,POS代理作为一个报文转换和同步转异步角色。

下面介绍下几个关键技术实现:

1、报文

这里的报文,指的是POS代理服务器跟POS通讯之间约定的报文。根据POS交易类型,支付、刷卡、打印等,约定每个交易报文包含什么字段信息和长度,其中一个比较特别字段是UUID,这个字段是每个报文的关键字段,
具有唯一性,每个报文都不同,主要用来实现同步转异步中,POS返回数据给代理服务器后找回原来发送指令的channel,并最终把转换后的数据发送给收银台。

之所以要找到原来的channel,是因为同步转异步的过程中,channel是被临时保存起来的。


2、同步转异步关键代码

public class PosResponseFuture<I> {


private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();

private String uuid;//消息序列号

//psoresponse使用
private final static Map<String, PosResponseFuture> futures = new ConcurrentHashMap<String, PosResponseFuture>();
private final static Object synLock = new Object();

public I write2pos(boolean broadcastFlag,MsgRequest msg) throws PosConnException,TimeOutException,TryLaterException {


synchronized(synLock)
{
long st = System.currentTimeMillis();

lock.lock();
try {
this.uuid = msg.getId();
futures.put(this.uuid, this);//把当前调用环境保存起来

//向pos发送消息
log.debug("向POS发送消息:{}",msg);

PosIntContext.write2pos(msg);

int timeout = PosIntContext.getApiTimeout();
if (msg.getTimeout()!=-1)
{
timeout = msg.getTimeout();
log.debug("超时设置:{}",timeout);
}
//这里是同步转异步关键
//程序执行到这里,一直处于阻塞状态,直到POS返回
//这里还设置了一个超时时间,避免POS出现故障,导致调用一直在等待
done.await(timeout,TimeUnit.SECONDS);
if (!isDone())
{
throw new TimeOutException("超时("+timeout+"秒)");
}

} catch (InterruptedException e) {
log.error("write2pos InterruptedException: "+e.getMessage());
throw new PosConnException(e);
} catch (TimeOutException e) {
throw e;
} catch (PosConnException e) {
throw e;
} catch (TryLaterException e) {
throw e;
}
finally {
this.release();
lock.unlock();
}

long en = System.currentTimeMillis();

log.debug("{} 执行时间:{}",msg.toString(),(en-st));
//POS执行完成,正常返回
if (response instanceof MsgResponse)
{
return (I)response;
}

return null;
}

}

/**
* pos返回消息回调
* @Title: received
* @Description: TODO
* @param @param response
* @return void
* @throws
*/
public static void received(MsgResponse response) {
//用主键取回调用环境
PosResponseFuture<?> future = futures.remove(response.getId());

if (future != null) {
future.doReceived(response);
}

}

/**
* 检测返回值
* @Title: isDone
* @Description: TODO
* @param @return
* @return boolean
* @throws
*/
private boolean isDone() {
return this.response != null;//null代表超时
}

/**
* 接受到返回
* @Title: doReceived
* @Description: TODO
* @param @param response
* @return void
* @throws
*/
private void doReceived(MsgResponse response) {
lock.lock();//同步控制,线程安全
try {
this.response = response;
done.signal();//notify,通知线程往下执行
} finally {
lock.unlock();
}
}

/**
* 释放资源
* @Title: release
* @Description: TODO
* @param
* @return void
* @throws
*/
private void release()
{
PosResponseFuture<I> tmp = futures.remove(this.uuid);
if (tmp!=null)
{

log.debug("释放资源:{}",new Object[]{this.uuid,tmp.getProcessMsg()});
}
else
{
log.debug("释放资源:NULL!");
}
}

public static void main(String args[])
{
}

}

 



3、POS代理服务器暴露RPC调用接口关键代码

public class Client {
	
	//这个代码包含了rpc调用的核心
	@SuppressWarnings("unchecked")
	public <T> T getProxy(Class<T> interfaceClass,final  String host,final  int port) {
		return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
				new Class<?>[] { interfaceClass }, new InvocationHandler() {
					//其实就是一个AOP拦截
					public Object invoke(Object proxy, Method method,Object[] arguments) throws Throwable {
						
						Socket socket = null;
						ObjectOutputStream output = null;
						ObjectInputStream input = null;
						
						try
						{
							//把需要调用的类、方法和参数,序列化传输到RPC服务器
							//等待远端调用完成返回结果
							socket = new Socket(host, port);
							output = new ObjectOutputStream(socket.getOutputStream());
							output.writeUTF(method.getName());
							output.writeObject(method.getParameterTypes());
							output.writeObject(arguments);
							
							input = new ObjectInputStream(socket.getInputStream());
							
							return input.readObject();
						}
						catch(Exception e)
						{
							throw e;
						}
						finally
						{
							if (socket!=null)
							{
								socket.close();
							}
							if (output!=null)
							{
								output.close();
							}
							if (input!=null)
							{
								input.close();
							}
						}
						
					}
					
				});
	}


	public static void main(String args[])
	{
		HelloService helloService = new Client().getProxy(HelloService.class,"localhost",8080);
		
		long st = System.currentTimeMillis();
		for (int i=0; i<1; i++)
		{
			System.out.println(i+"> "+helloService.sayHello("哈哈"));
		}
		long en = System.currentTimeMillis();
		System.out.println("耗时:"+(en-st));
	}

}



public class Server {

	private int port = 8888;
	
	public void rpcServer()
		throws Exception
	{
		ServerSocket server = null;
		
		try
		{
			server = new ServerSocket(port);
			
			for(;;) 
			{
				final Socket socket = server.accept();
				System.out.println(socket.getRemoteSocketAddress());
				new Thread(new Runnable() {

					@Override
					public void run() {

						ObjectOutputStream output = null;
						ObjectInputStream input = null;

						try
						{
							input = new ObjectInputStream(socket.getInputStream());//接受rpc client请求
							String methodName = input.readUTF();//调用方法名
							Class<?>[] parameterTypes = (Class<?>[])input.readObject();
							Object[] arguments = (Object[])input.readObject();//调用参数

							output = new ObjectOutputStream(socket.getOutputStream());
							Method method =  new HelloServiceImp().getClass().getMethod(methodName, parameterTypes);
							Object result = method.invoke(new HelloServiceImp(), arguments);//执行调用
							output.writeObject(result);//回写结果
						}
						catch(Exception e)
						{
							e.printStackTrace();
						}
						finally
						{
							try
							{
								if (output!=null)
								{
									output.close();
								}
								if (input!=null)
								{
									input.close();
								}
							}
							catch(Exception e)
							{
							}
						}
						
					}
				
				
				}).start();
			}
		}
		catch(Exception e)
		{
			throw e;
		}
		finally
		{
			if (server!=null)
			{
				server.close();
			}
		}
		
	
	}

	
	public static void main(String args[]) throws Exception
	{
		new Server().rpcServer();
	}

	
	
}


public interface HelloService {
	public String sayHello(String input);
}

public class HelloServiceImp implements HelloService {

	@Override
	public String sayHello(String input) {
		return input + " wellcome.";
	}

}

 

分享到:
评论

相关推荐

    ASP+ajax+jQuery--顶-踩-无刷新【点赞】程式3.0 修正调试版

    ASP.NET、Ajax 和 jQuery 是构建动态网页应用的三个关键技术,它们在实现无刷新用户体验方面扮演着重要角色。本文将深入探讨这些技术如何协同工作,以及如何利用它们来创建一个"顶-踩-无刷新【点赞】"功能的程序。 ...

    RPC-client异步收发核心细节

    RPC允许程序像调用本地方法一样调用另一个地址空间的方法或函数。在RPC中,客户端和服务端之间的通信通常涉及到数据序列化、网络传输以及错误处理等多个环节。本文将重点讨论RPC-client在异步收发模式下的核心组件与...

    异步&同步加载树节点----zTree(一)

    同步加载是指浏览器在执行代码时,如果遇到一个耗时的操作(如加载大量数据),会暂停后续操作,直到这个操作完成。而异步加载则不同,它允许浏览器在进行耗时操作的同时继续处理其他任务,提高用户体验。 zTree...

    异步的 RPC

    **异步RPC(远程过程调用)...总结来说,异步RPC是现代分布式系统中提升性能和可扩展性的关键技术之一,它通过优化通信模式,实现了客户端和服务器之间的高效协作,为构建大规模、高并发的互联网应用提供了有力支撑。

    PyPI 官网下载 | awesome_rpc-0.1.0-py3-none-any.whl

    `awesome_rpc`这个名字暗示这可能是一个用于实现远程过程调用(RPC)的库。 描述中提到的"资源来自pypi官网,解压后可用。资源全名:awesome_rpc-0.1.0-py3-none-any.whl"告诉我们这个资源是`awesome_rpc`的特定...

    jrpc:C ++ 17中的JSON-RPC实现

    jrpc是一个异步多线程的RPC框架, 采用json格式的序列化/反序列化方案, 传输协议为. 框架的结构如下图所示: 位于框架底层, 向下调用Linux socket API, 向上提供消息回调. 此外,网络库还具有定时器, 线程池, 日志输出...

    Python库 | aiohttp_json_rpc-0.12.2-py3-none-any.whl

    `aiohttp_json_rpc` 是一个 Python 库,专门用于实现基于 JSON-RPC 的异步网络通信。JSON-RPC(JavaScript Object Notation Remote Procedure Call)是一种轻量级的远程过程调用协议,它使用 JSON 格式进行数据交换...

    从根上理解高性能、高并发(四):深入操作系统,彻底理解同步与异步-其它分享_专项技术区 - 即时通讯开发者社区!.pdf

    同步指的是程序执行时,所有任务都是顺序执行的,每个任务都需要等待前一个任务完成后才能继续执行。异步则是程序执行时,多个任务可以并发执行,每个任务都不需要等待其他任务完成。 知识点2:同步和异步在高并发...

    C#TCP/IP同步和异步通信

    例如,可能存在一个Server类用于启动监听,一个Client类用于建立连接,它们都有同步和异步版本的发送和接收数据的方法。 在实际应用中,同步通信适用于简单、低并发的场景,而异步通信适合高并发和实时性要求较高的...

    android-json-rpc

    总的来说,android-json-rpc是Android开发中的一个强大工具,它极大地简化了JSON-RPC通信的实现,使开发者能更专注于业务逻辑,而不是底层通信细节。通过合理使用这个库,可以构建高效、可靠的Android应用。

    json-rpc-java-talk-20050225.pdf

    - **简单应用分析:**演讲中可能展示了如何使用JSON-RPC-Java构建一个简单的Web应用实例,包括应用架构、客户端与服务器端交互的过程等。 - **高级主题:** - 异步操作:如何实现客户端与服务器之间的异步通信。 -...

    Apache Thrift.rar+RPC+微服务+异步通信+安全认证+服务发现等

    Thrift服务定义与实现 Thrift跨语言服务开发实践 Thrift编译器与代码生成 Thrift客户端与服务器端通信 Thrift异常处理与错误码 Thrift性能优化与调优 Thrift与RPC原理深入解析 Thrift在微服务架构中的应用 Thrift...

    proe二次开发同步异步两种模式的区别

    * 同步模式有多进程模式和动态链接库模式两个子模式,异步模式有简单异步模式和完全异步模式两个子模式。 在 PROE 二次开发中,选择合适的模式取决于具体的应用场景和开发需求。了解同步模式和异步模式的区别可以...

    《异步加载图片(一)》源码----仅实现异步加载 .

    这是博文《异步加载图片(一)》对应的源码,这段代码仅实现了异步加载,对于滑动时暂停,停划时加载的问题会在《异步加载图片(二)》中讲解,这篇文章的地址:...

    公路测量GPS控制网同步环和异步环闭合差计算

    在GPS控制网的设计与实施过程中,同步环和异步环的闭合差计算是非常关键的技术环节,它直接影响到整个控制网的质量和可靠性。 #### 二、GPS控制网基础概念 **GPS控制网**:利用全球定位系统(GPS)技术建立的空间...

    json-rpc-python

    总的来说,json-rpc-python库是Python开发者实现JSON-RPC通信的一个实用工具,它提供了一套简洁的API来构建服务器和客户端,便于在分布式系统中实现跨进程或跨网络的远程调用。在学习和使用这个库时,理解JSON-RPC...

    Python库 | lnd_rpc-0.7.1.post3-py3-none-any.whl

    1. **RPC(Remote Procedure Call)**:RPC是一种分布式计算技术,它允许程序在一个进程中调用另一个进程中的函数或方法,就像它们在同一进程中一样。在`lnd_rpc`中,RPC用于客户端和LND服务器之间的通信,使得...

    QT C++ http get、post 同步异步请求

    本项目中,我们关注的是GET和POST两种请求方式,以及它们的同步和异步实现。下面将详细介绍这两种请求方式以及同步与异步的区别。 1. **QT中的HTTP GET请求** GET请求是最常见的HTTP请求类型,用于从服务器获取资源...

    RPC的简单实现

    RPC(Remote Procedure Call)是一种计算机通信协议,允许一个程序在某个网络中的一个计算机上执行远程操作,就像它在本地执行一样。本篇文章将探讨RPC的基本原理、实现方式以及其在IT领域的应用。 **1. RPC的基本...

Global site tag (gtag.js) - Google Analytics