`
badqiu
  • 浏览: 673924 次
  • 性别: Icon_minigender_1
  • 来自: 珠海
社区版块
存档分类
最新评论

分布式应用上下文(Distributed ThreadLocal)

阅读更多

1.问题

单机应用内,在进程内部,我们可以使用ThreadLocal传递应用上下文的方式. 当前的 Spring Secrucity , Spring TransactionManager,  Log4J MDC, Struts2 ActionContext等等应用场景随处可见.

 

但在是分布式系统下,由于不是在同一个进程内,所以无法使用ThreadLocal. 那么什么是分布式ThreadLocal呢?就是将一个系统中的ThreadLocal信息可以传递至下一个系统,将两者的调用可以关联起来。如对应用有一个调用,我们生成一个请求ID (traceId),在后面所有分布式系统调用中,可以通过这个traceId将所有调用关联起来,这样查找调用日志都将十分方便.

 

2.实现方式

我们现在使用的通讯协议,一般都包含两部分:Header,Body. 如 Soap Header,Http Header. 通过自定义Header,可以带上我们的自定义信息。 然后在服务器端解析Header,再得到自定义信息。那么就可以完成Distributed ThreadLocal的功能。

 

 

 

如上图,通过两个拦截器,client在调用之前,将DistrbiutedThreadLocal中的信息放在soap header中,在服务端方法调用之前,从soap header中取回 DistrbiutedThreadLocal信息。

 

 

3. 实现代码.

以下为CXF webservice的实现代码,一个DistributedThreadLocal及增加了两个拦截器. hessian 也可以自定义Header,完成传递.

 

DistributedThreadLocal

 

/**
 * 分布式 ThreadLocal, 存放在ThreadLocal中的数据可以传输至另外一台机器上
 * @author badqiu
 */
public class DistributedThreadLocal {
	public static String DISTRIBUTED_THREAD_LOCAL_KEY_PREFIX = "tl_";
	
	public static ThreadLocal<Map<String, String>> threadLocal = new ThreadLocal<Map<String, String>>();

	public static void putAll(Map<String, String> map) {
		getMap().putAll(map);
	}
	
	public static void put(String key, String value) {
		getMap().put(key, value);
	}

	public static String get(String key) {
		Map<String, String> map = threadLocal.get();
		if (map == null)
			return null;
		return (String) map.get(key);
	}

	public static Map<String, String> getMap() {
		Map<String, String> map = threadLocal.get();
		if (map == null) {
			map = new HashMap();
			threadLocal.set(map);
		}
		return map;
	}

	public static void clear() {
		threadLocal.set(null);
	}

}

 

DistributedThreadLocalInSOAPHeaderInterceptor

 

/**
 * 输入(In)拦截器,用于从 WebService SOAP 的Header中取回DistributedThreadLocal中的信息,并存放在DistributedThreadLocal中
 * 
 * @author badqiu
 */
public class DistributedThreadLocalInSOAPHeaderInterceptor extends AbstractSoapInterceptor {
	
    private SAAJInInterceptor saajIn = new SAAJInInterceptor();  
    
    public DistributedThreadLocalInSOAPHeaderInterceptor() {  
        super(Phase.PRE_PROTOCOL);  
        getAfter().add(SAAJInInterceptor.class.getName());  
    }  

	public void handleMessage(SoapMessage message) throws Fault {
		SOAPMessage doc = message.getContent(SOAPMessage.class);  
        if (doc == null) {  
            saajIn.handleMessage(message);  
            doc = message.getContent(SOAPMessage.class);  
        }  
        
        Map<String,String> headers = toHeadersMap(doc);  
		DistributedThreadLocal.putAll(headers);
		
	}

	private Map toHeadersMap(SOAPMessage doc) {
		SOAPHeader header = getSOAPHeader(doc);  
        if (header == null) {  
            return new HashMap(0);  
        } 
        
        Map<String,String> headersMap = new HashMap();
        NodeList nodes = header.getChildNodes();
        for(int i=0; i<nodes.getLength(); i++) {  
        	Node item = nodes.item(i);
        	if(item.hasChildNodes()) {
        		headersMap.put(item.getLocalName(), item.getFirstChild().getNodeValue());
        	}
        }
        return headersMap;
	}

	private SOAPHeader getSOAPHeader(SOAPMessage doc) {
		SOAPHeader header;
		try {
			header = doc.getSOAPHeader();
		} catch (SOAPException e) {
			throw new RuntimeException(e);
		}
		return header;
	}

}
 

DistributedThreadLocalOutSOAPHeaderInterceptor

 

/**
 * 输出(Out)拦截器,用于将DistributedThreadLocal中的信息存放在 WebService SOAP 的Header中
 * 
 * @author badqiu
 */
public class DistributedThreadLocalOutSOAPHeaderInterceptor extends AbstractSoapInterceptor {
	
	public DistributedThreadLocalOutSOAPHeaderInterceptor() {
		super(Phase.WRITE);
	}

	public void handleMessage(SoapMessage message) throws Fault {
		
		List<Header> headers = message.getHeaders();
		Map<String,String> threadlocalMap = DistributedThreadLocal.getMap();
		
		for(Map.Entry<String, String> entry : threadlocalMap.entrySet()) {
			headers.add(getHeader(entry.getKey(), entry.getValue()));
		}
	}

	private Header getHeader(String key, String value) {
		QName qName = new QName(key);
		Document document = DOMUtils.createDocument();
		Element element = document.createElement(key);
		element.appendChild(document.createTextNode(value));
		SoapHeader header = new SoapHeader(qName, element);
		return (header);
	}
}
 

CXF spring配置文件:

 

server端: 

 

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:jaxws="http://cxf.apache.org/jaxws" xmlns:cxf="http://cxf.apache.org/core"
	xsi:schemaLocation="http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
	default-lazy-init="true">

	<description>Apache CXF的Web Service配置</description>

	<import resource="classpath:META-INF/cxf/cxf.xml" />
	<import resource="classpath:META-INF/cxf/cxf-servlet.xml" />
	<import resource="classpath:META-INF/cxf/cxf-extension-soap.xml" />

	<!-- jax-ws endpoint定义  -->
	<jaxws:endpoint address="/hello" >
		<jaxws:implementor ref="hello" />
		<jaxws:inInterceptors>
			<bean class="cn.org.rapid_framework.distributed.threadlocal.cfx.TraceIdInSOAPHeaderInterceptor"/>
		</jaxws:inInterceptors>
	</jaxws:endpoint>
	
	<!-- WebService的实现Bean定义 -->
	<bean id="hello" class="cn.org.rapid_framework.hessian.HessianTest.HelloImpl" />
</beans>

 

client端:

 

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:jaxws="http://cxf.apache.org/jaxws" xmlns:cxf="http://cxf.apache.org/core"
	xsi:schemaLocation="http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
	default-lazy-init="true">
	<description>Apache CXF Web Service Client端配置</description>

	<jaxws:client id="hello" serviceClass="cn.org.rapid_framework.hessian.HessianTest.Hello"
		address="http://localhost:8080/service/hello" >
		<jaxws:outInterceptors>
			<bean class="cn.org.rapid_framework.distributed.threadlocal.cfx.TraceIdOutSOAPHeaderInterceptor"/>
		</jaxws:outInterceptors>
	</jaxws:client>

</beans>
 

4. 应用场景.

通过分布式应用上下文,暂时想到的几个应用场景.

 

1. Log4j MDC traceId传递.  通过一个traceId,将所有相关的 操作所有的日志信息关联起来。

2. sessionId 传递, 让我们的应用也有状态,可以使用session什么的

3. Security(username,password)传递. 在需要安全调用的地方,避免污染接口,需要显式的在接口传递username,password. 相对应的 WSSecurity也可以走这个通道

 

 

 

 

 

 

分布式应用上下文的概念,全球首创,欢迎转载(因为google 搜索不到相关文章,或许早已经有相同的概念了,欢迎提醒我)。

 

 

 

 

 

4
3
分享到:
评论
1 楼 grady 2011-01-05  
很好的思路

相关推荐

    Go-分布式定时任务库distributed-cron

    分布式定时任务库 `distributed-cron` 是一个专为 Go 语言设计的高级工具,用于构建可扩展、可靠的分布式系统中的定时任务。它充分利用了 Go 的并发特性,提供了一种高效的方式来管理和执行周期性的任务,同时支持在...

    分布式应用开发技术概述

    DCOM 主要用于 Windows 平台上的分布式应用开发。与 CORBA 不同的是,DCOM 仅支持 Windows 平台,并且在某些方面与 OMG 的标准不兼容。尽管如此,Microsoft 依然在不断地完善和发展 DCOM,并尝试与其他标准如 CORBA ...

    分布式 Java文档(Distributed Java).pdf

    分布式 Java(Distributed Java)

    Java 分布式应用程序设计代码

    13. **CDI(Contexts and Dependency Injection)上下文和依赖注入**:Java EE的一部分,CDI帮助管理对象的生命周期和依赖关系,简化了分布式系统中的组件装配。 以上知识点涵盖了Java分布式应用程序设计的核心概念...

    Visual Basic6.0分布式应用程序开发

    《Visual Basic6.0分布式应用程序开发》是一本深入探讨如何使用Visual Basic6.0进行分布式系统构建的专业教程。分布式应用程序是指由多个独立组件通过网络进行通信和协作来完成任务的软件系统。在VB6.0中,开发者...

    快速分布式数据挖掘 fast distributed data mining

    本项目"快速分布式数据挖掘(fast distributed data mining)"是基于MPI(Message Passing Interface)在Linux平台上实现的C++程序,旨在解决海量数据集上的挖掘任务。 MPI是一种用于分布式内存系统间进程间通信的...

    分布式事务演示-distributed-transaction-demo.zip

    分布式事务的基本概念是在一个事务中操作多个资源,这些资源可能分布在不同的服务器或网络节点上。在传统的单体应用中,我们可以依赖ACID(原子性、一致性、隔离性和持久性)特性来确保事务的正确执行,但在分布式...

    delphi7组件与分布式应用开发

    《Delphi7组件与分布式应用开发》是一本深入探讨Delphi 7环境下组件使用和分布式应用程序构建的专业书籍。Delphi 7是Borland公司(现Embarcadero Technologies)在2002年推出的一款强大的Windows应用开发工具,以其...

    分布式哈希表(Distributed Hash Table DHT)1

    分布式哈希表(DHT,Distributed Hash Table)是一种用于分布式环境的数据存储技术,它将数据分布在网络中的多个节点上,以实现高效、可扩展的数据管理和检索。DHT的设计目标是提供一种全局一致性的哈希函数,使得...

    云计算分布式应用(百度,新浪云技术应用)

    本篇文章将深入探讨“云计算分布式应用”,特别关注百度和新浪等大型企业在云技术上的应用实践。 首先,我们来理解什么是云计算。云计算是一种通过互联网提供计算资源和服务的模式,用户无需直接管理或拥有硬件基础...

    Visual Basic 6.0 分布式应用程序开发

    分布式应用程序是指运行在多个独立的计算设备上,通过网络进行通信和数据交换的程序。这些设备可以是服务器、工作站或移动设备,它们共享资源,协同完成一个或多个任务。分布式应用的主要优点包括负载均衡、高可用性...

    Java 分布式应用程序设计

    Java 分布式应用程序设计是构建大型、可扩展和高可用性系统的关键技术。在现代企业级应用中,Java被广泛用于构建分布式系统,利用网络中的多台计算机协同工作,实现高性能、高并发处理能力。本篇文章将深入探讨Java...

    Delphi7组件及分布式应用开发

    《Delphi7组件及分布式应用开发》这一主题深入探讨了Delphi7环境下组件设计与分布式应用的构建,是IT行业尤其是软件开发领域中一个极为重要的知识点。Delphi作为一款功能强大的快速应用程序开发工具,自发布以来便受...

    java 分布式应用程序设计.rar

    Java分布式应用程序设计是Java开发中的一个重要领域,它涉及到多个计算机节点通过网络协同工作来完成复杂的任务。在Java中实现分布式应用程序通常需要理解并掌握一系列关键技术和概念,包括但不限于网络编程、远程...

    SQL SERVER 2000分布式应用.pdf

    SQL Server 2000是微软公司推出的一款关系数据库管理系统,它是SQL Server系列的一个版本,适用于建立和管理数据仓库、数据挖掘以及分布式应用程序等多种场景。分布式应用在SQL Server 2000中扮演着重要的角色,它...

    阿里云 专有云企业版 V3.8.1 企业级分布式应用服务 用户指南 20200330

    这份用户指南提供了企业级分布式应用服务的使用指南,包括法律声明、通用约定、 Enterprise Distributed Application Service(EDAS)介绍、快速入门指南、登录 EDAS 控制台等内容。 法律声明部分介绍了用户在阅读...

    阿里云分布式应用服务EDAS-产品说明.pdf

    阿里云企业级分布式应用服务(Enterprise Distributed Application Service,简称 EDAS)是一款全面解决企业在云上构建、部署和管理分布式应用的服务平台。它集成了阿里巴巴集团多年在大规模分布式系统上的技术积累...

    网络技术-网络基础-基于COM和CORBA技术的分布式应用.pdf

    分布式对象技术已经成为构建服务应用框架和软件组件的核心,并在大规模分布式应用系统开发中展现出强大的生命力。 目前,有三种典型的主流分布式技术:COM(Component Object Model)、DCOM(Distributed Component...

    阿里云 专有云Enterprise版 企业级分布式应用服务EDAS V3.1.0 用户指南 20171129.pdf

    EDAS(Enterprise Distributed Application Service)是阿里云提供的一种企业级分布式应用服务,旨在帮助用户快速构建和部署分布式应用程序。EDAS提供了多种功能,例如网络管理、安全管理、监控管理等,以帮助用户更...

Global site tag (gtag.js) - Google Analytics