`

【原创】同步转异步+RPC的一个POS行业应用-关键技术实现

阅读更多



简单回顾下业务模型:收银台<=>POS代理服务器<=>POS机,三者之间进行通讯,POS代理作为一个报文转换和同步转异步角色。

下面介绍下几个关键技术实现:

1、报文

这里的报文,指的是POS代理服务器跟POS通讯之间约定的报文。根据POS交易类型,支付、刷卡、打印等,约定每个交易报文包含什么字段信息和长度,其中一个比较特别字段是UUID,这个字段是每个报文的关键字段,
具有唯一性,每个报文都不同,主要用来实现同步转异步中,POS返回数据给代理服务器后找回原来发送指令的channel,并最终把转换后的数据发送给收银台。

之所以要找到原来的channel,是因为同步转异步的过程中,channel是被临时保存起来的。


2、同步转异步关键代码

public class PosResponseFuture<I> {


private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();

private String uuid;//消息序列号

//psoresponse使用
private final static Map<String, PosResponseFuture> futures = new ConcurrentHashMap<String, PosResponseFuture>();
private final static Object synLock = new Object();

public I write2pos(boolean broadcastFlag,MsgRequest msg) throws PosConnException,TimeOutException,TryLaterException {


synchronized(synLock)
{
long st = System.currentTimeMillis();

lock.lock();
try {
this.uuid = msg.getId();
futures.put(this.uuid, this);//把当前调用环境保存起来

//向pos发送消息
log.debug("向POS发送消息:{}",msg);

PosIntContext.write2pos(msg);

int timeout = PosIntContext.getApiTimeout();
if (msg.getTimeout()!=-1)
{
timeout = msg.getTimeout();
log.debug("超时设置:{}",timeout);
}
//这里是同步转异步关键
//程序执行到这里,一直处于阻塞状态,直到POS返回
//这里还设置了一个超时时间,避免POS出现故障,导致调用一直在等待
done.await(timeout,TimeUnit.SECONDS);
if (!isDone())
{
throw new TimeOutException("超时("+timeout+"秒)");
}

} catch (InterruptedException e) {
log.error("write2pos InterruptedException: "+e.getMessage());
throw new PosConnException(e);
} catch (TimeOutException e) {
throw e;
} catch (PosConnException e) {
throw e;
} catch (TryLaterException e) {
throw e;
}
finally {
this.release();
lock.unlock();
}

long en = System.currentTimeMillis();

log.debug("{} 执行时间:{}",msg.toString(),(en-st));
//POS执行完成,正常返回
if (response instanceof MsgResponse)
{
return (I)response;
}

return null;
}

}

/**
* pos返回消息回调
* @Title: received
* @Description: TODO
* @param @param response
* @return void
* @throws
*/
public static void received(MsgResponse response) {
//用主键取回调用环境
PosResponseFuture<?> future = futures.remove(response.getId());

if (future != null) {
future.doReceived(response);
}

}

/**
* 检测返回值
* @Title: isDone
* @Description: TODO
* @param @return
* @return boolean
* @throws
*/
private boolean isDone() {
return this.response != null;//null代表超时
}

/**
* 接受到返回
* @Title: doReceived
* @Description: TODO
* @param @param response
* @return void
* @throws
*/
private void doReceived(MsgResponse response) {
lock.lock();//同步控制,线程安全
try {
this.response = response;
done.signal();//notify,通知线程往下执行
} finally {
lock.unlock();
}
}

/**
* 释放资源
* @Title: release
* @Description: TODO
* @param
* @return void
* @throws
*/
private void release()
{
PosResponseFuture<I> tmp = futures.remove(this.uuid);
if (tmp!=null)
{

log.debug("释放资源:{}",new Object[]{this.uuid,tmp.getProcessMsg()});
}
else
{
log.debug("释放资源:NULL!");
}
}

public static void main(String args[])
{
}

}

 



3、POS代理服务器暴露RPC调用接口关键代码

public class Client {
	
	//这个代码包含了rpc调用的核心
	@SuppressWarnings("unchecked")
	public <T> T getProxy(Class<T> interfaceClass,final  String host,final  int port) {
		return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
				new Class<?>[] { interfaceClass }, new InvocationHandler() {
					//其实就是一个AOP拦截
					public Object invoke(Object proxy, Method method,Object[] arguments) throws Throwable {
						
						Socket socket = null;
						ObjectOutputStream output = null;
						ObjectInputStream input = null;
						
						try
						{
							//把需要调用的类、方法和参数,序列化传输到RPC服务器
							//等待远端调用完成返回结果
							socket = new Socket(host, port);
							output = new ObjectOutputStream(socket.getOutputStream());
							output.writeUTF(method.getName());
							output.writeObject(method.getParameterTypes());
							output.writeObject(arguments);
							
							input = new ObjectInputStream(socket.getInputStream());
							
							return input.readObject();
						}
						catch(Exception e)
						{
							throw e;
						}
						finally
						{
							if (socket!=null)
							{
								socket.close();
							}
							if (output!=null)
							{
								output.close();
							}
							if (input!=null)
							{
								input.close();
							}
						}
						
					}
					
				});
	}


	public static void main(String args[])
	{
		HelloService helloService = new Client().getProxy(HelloService.class,"localhost",8080);
		
		long st = System.currentTimeMillis();
		for (int i=0; i<1; i++)
		{
			System.out.println(i+"> "+helloService.sayHello("哈哈"));
		}
		long en = System.currentTimeMillis();
		System.out.println("耗时:"+(en-st));
	}

}



public class Server {

	private int port = 8888;
	
	public void rpcServer()
		throws Exception
	{
		ServerSocket server = null;
		
		try
		{
			server = new ServerSocket(port);
			
			for(;;) 
			{
				final Socket socket = server.accept();
				System.out.println(socket.getRemoteSocketAddress());
				new Thread(new Runnable() {

					@Override
					public void run() {

						ObjectOutputStream output = null;
						ObjectInputStream input = null;

						try
						{
							input = new ObjectInputStream(socket.getInputStream());//接受rpc client请求
							String methodName = input.readUTF();//调用方法名
							Class<?>[] parameterTypes = (Class<?>[])input.readObject();
							Object[] arguments = (Object[])input.readObject();//调用参数

							output = new ObjectOutputStream(socket.getOutputStream());
							Method method =  new HelloServiceImp().getClass().getMethod(methodName, parameterTypes);
							Object result = method.invoke(new HelloServiceImp(), arguments);//执行调用
							output.writeObject(result);//回写结果
						}
						catch(Exception e)
						{
							e.printStackTrace();
						}
						finally
						{
							try
							{
								if (output!=null)
								{
									output.close();
								}
								if (input!=null)
								{
									input.close();
								}
							}
							catch(Exception e)
							{
							}
						}
						
					}
				
				
				}).start();
			}
		}
		catch(Exception e)
		{
			throw e;
		}
		finally
		{
			if (server!=null)
			{
				server.close();
			}
		}
		
	
	}

	
	public static void main(String args[]) throws Exception
	{
		new Server().rpcServer();
	}

	
	
}


public interface HelloService {
	public String sayHello(String input);
}

public class HelloServiceImp implements HelloService {

	@Override
	public String sayHello(String input) {
		return input + " wellcome.";
	}

}

 

分享到:
评论

相关推荐

    Dubbo视频教程--高级篇--第24节--简易版支付系统介绍

    ### Dubbo视频教程--高级篇--第...本教程通过具体的支付系统实例,详细讲解了在实际应用中如何使用Dubbo框架进行分布式系统开发,以及如何利用分布式文件系统、消息队列和缓存等关键技术点来支持系统的高性能和稳定性。

    基于Dubbo的分布式系统架构介绍

    总结来说,基于Dubbo的分布式系统架构是一个复杂的工程,它涉及到大量的技术选型、系统规划、服务治理和高可用保障。通过本文的介绍,我们可以了解到构建一个高性能、高可用的第三方支付系统所需的理论知识和实践...

    嵌入式八股文面试题库资料知识宝典-深圳禾苗通信科技有限公司.zip

    嵌入式八股文面试题库资料知识宝典-深圳禾苗通信科技有限公司.zip

    Arduino UART实验例程【正点原子EPS32S3】

    Arduino UART实验例程,开发板:正点原子EPS32S3,本人主页有详细实验说明可供参考。

    电力弹簧技术在主动配电网规划与运行优化调度中的应用研究

    内容概要:本文详细探讨了电力弹簧技术在主动配电网规划及运行优化调度中的应用。首先介绍了电力弹簧技术作为智能电网调控手段的优势,如自适应性强、响应速度快、节能环保等。接着阐述了主动配电网规划的目标和策略,包括优化电网结构、提高能源利用效率和降低故障风险。随后讨论了运行优化调度的原则和方法,强调了实时监测、智能调度策略以及优化调度模型的重要性。最后通过实际案例分析展示了电力弹簧技术在提升电网稳定性、可靠性和能效方面的显著效果,展望了其广阔的应用前景。 适合人群:从事电力系统规划、运行管理的研究人员和技术人员,以及对智能电网感兴趣的学者和学生。 使用场景及目标:适用于希望深入了解电力弹簧技术及其在主动配电网规划和运行优化调度中具体应用的专业人士。目标是掌握电力弹簧技术的工作原理、优势及其在实际项目中的实施方法。 其他说明:本文不仅提供了理论分析,还有具体的案例支持,有助于读者全面理解电力弹簧技术的实际应用价值。

    honor_1.145_testgray20250427.apk

    honor_1.145_testgray20250427.apk

    嵌入式八股文面试题库资料知识宝典-【开发】嵌入式开源项目&库&资料.zip

    嵌入式八股文面试题库资料知识宝典-【开发】嵌入式开源项目&库&资料.zip

    鸿蒙生态HarmonyOS:万物互联时代的操作系统革新与发展路径

    内容概要:本文详细介绍了华为推出的面向全场景的分布式操作系统HarmonyOS。HarmonyOS旨在打破设备间的壁垒,实现万物互联,通过分布式软总线和分布式任务调度等核心技术,让不同设备协同工作,如手机、平板、智能家居等设备间无缝流转任务。其应用生态涵盖教育、金融、出行等多个领域,华为通过资金、技术支持和流量扶持吸引开发者,推动生态繁荣。HarmonyOS从2019年首次发布至今,经历了多个版本迭代,性能和安全性不断提升,用户体验更加智能便捷。尽管面临应用生态丰富度不足、市场竞争压力等挑战,华为通过优化开发工具、加强市场推广等策略积极应对。未来,HarmonyOS将在分布式技术、AI融合和隐私安全等方面持续创新,并在智能家居、车联网、工业互联网等领域拓展生态。 适合人群:对操作系统技术感兴趣的专业人士、开发者、科技爱好者。 使用场景及目标:①了解HarmonyOS的技术架构和分布式技术的特点;②探讨HarmonyOS在智能家居、车联网等领域的应用前景;③评估HarmonyOS对现有操作系统市场的潜在影响。 阅读建议:HarmonyOS作为一款面向全场景的操作系统,不仅涉及技术实现,还包括生态建设和用户体验。因此,在阅读过程中,应重点关注其技术优势、应用场景及未来发展潜力,结合自身需求思考其在实际生活和工作中的应用价值。

    少儿编程scratch项目源代码文件案例素材-简单杀戮.zip

    少儿编程scratch项目源代码文件案例素材-简单杀戮.zip

    基于阻抗控制和工艺优化的机器人磨抛技术研究.pdf

    基于阻抗控制和工艺优化的机器人磨抛技术研究.pdf

    少儿编程scratch项目源代码文件案例素材-扛住别被压.zip

    少儿编程scratch项目源代码文件案例素材-扛住别被压.zip

    【操作系统领域】HarmonyOS架构解析:分布式设计与全场景智能应用的创新实践

    内容概要:本文详细介绍了华为自主研发的面向全场景的分布式操作系统——HarmonyOS的架构设计及其在智能家居、智能穿戴、智慧出行等领域的应用。HarmonyOS采用分层架构,包括内核层、系统服务层、框架层和应用层,各层分工明确,协同工作,为用户提供稳定、高效、智能的操作系统。其核心特性包括分布式架构、微内核设计、组件化开发和一次开发多端部署,这些特性使得不同设备能够实现互联互通和资源共享,为用户带来无缝的全场景智能体验。此外,文章还探讨了HarmonyOS面临的生态建设和兼容性挑战,以及未来的发展前景和技术创新方向。 适合人群:对操作系统架构感兴趣的科技爱好者、智能设备开发者及相关行业从业者。 使用场景及目标:①了解HarmonyOS架构设计及其在智能家居、智能穿戴、智慧出行等领域的具体应用;②掌握HarmonyOS的核心特性,如分布式架构、微内核设计、组件化开发和一次开发多端部署;③探讨HarmonyOS面临的挑战及其未来发展方向。 其他说明:HarmonyOS的出现不仅为华为在智能设备领域的发展提供了有力支撑,也为整个行业的创新发展注入了新的活力。作为科技爱好者和关注者,我们应持续关注HarmonyOS的发展,共同见证它在智能设备领域创造更多的辉煌。

    嵌入式八股文面试题库资料知识宝典-linux驱动开发.zip

    嵌入式八股文面试题库资料知识宝典-linux驱动开发.zip

    开关磁阻电机技术参数与建模技术深度解析:4kW电机性能详述

    内容概要:本文深入探讨了一款额定功率为4kW的开关磁阻电机,详细介绍了其性能参数如额定功率、转速、效率、输出转矩和脉动率等。同时,文章还展示了利用RMxprt、Maxwell 2D和3D模型对该电机进行仿真的方法和技术,通过外电路分析进一步研究其电气性能和动态响应特性。最后,文章提供了基于RMxprt模型的MATLAB仿真代码示例,帮助读者理解电机的工作原理及其性能特点。 适合人群:从事电机设计、工业自动化领域的工程师和技术人员,尤其是对开关磁阻电机感兴趣的科研工作者。 使用场景及目标:适用于希望深入了解开关磁阻电机特性和建模技术的研究人员,在新产品开发或现有产品改进时作为参考资料。 其他说明:文中提供的代码示例仅用于演示目的,实际操作时需根据所用软件的具体情况进行适当修改。

    嵌入式八股文面试题库资料知识宝典-新岸线.zip

    嵌入式八股文面试题库资料知识宝典-新岸线.zip

    基于支持向 量机和余弦相似度的故障诊断方法.pdf

    基于支持向 量机和余弦相似度的故障诊断方法.pdf

    Objective-C+ARKit实现图片识别、平面捕捉、人脸识别+源码(毕业设计&课程设计&项目开发)

    Objective-C+ARKit实现图片识别、平面捕捉、人脸识别+源码,适合毕业设计、课程设计、项目开发。项目源码已经过严格测试,可以放心参考并在此基础上延申使用 ARKit实现图片识别、平面捕捉、人脸识别 ARKit需要ios11 以及 A11处理器或更高版本设备支持 Objective-C+ARKit实现图片识别、平面捕捉、人脸识别+源码,适合毕业设计、课程设计、项目开发。项目源码已经过严格测试,可以放心参考并在此基础上延申使用 ARKit实现图片识别、平面捕捉、人脸识别 ARKit需要ios11 以及 A11处理器或更高版本设备支持~ Objective-C+ARKit实现图片识别、平面捕捉、人脸识别+源码,适合毕业设计、课程设计、项目开发。项目源码已经过严格测试,可以放心参考并在此基础上延申使用 ARKit实现图片识别、平面捕捉、人脸识别 ARKit需要ios11 以及 A11处理器或更高版本设备支持

    少儿编程scratch项目源代码文件案例素材-火柴人大战 中世纪战争.zip

    少儿编程scratch项目源代码文件案例素材-火柴人大战 中世纪战争.zip

    嵌入式八股文面试题库资料知识宝典-并行科技笔试题.zip

    嵌入式八股文面试题库资料知识宝典-并行科技笔试题.zip

    嵌入式八股文面试题库资料知识宝典-进程线程.zip

    嵌入式八股文面试题库资料知识宝典-进程线程.zip

Global site tag (gtag.js) - Google Analytics