`
wyuxiao729
  • 浏览: 34439 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

osgi实战项目(osmp)一步一步玩转osgi之服务发现与服务路由(5)

    博客分类:
  • osgi
阅读更多

这一节里主要讲解osmp的服务发现与路由。osmp通过osmp-http组件对外发布了一个cxf的restful服务,所有的请求都通过这个restful这个接口接受请求并解析请求后再调用osgi的服务完成请求后返回到前端。

 

request->osmp-http的restful接口->解析请求->osgi服务发现->服务路由->调用服务->返回-->组装返回参数->返回

 

osmp通过osmp-service组件来对服务进行统一管理,主要功能包括服务的监听、bundle的监听、服务容器管理、服务注册到zookeeper等功能。

 

在osgi里可以通过ServiceTracker 服务跟踪器来跟踪某一类接口服务的新增、修改、删除等操作,通过BundlerContext.addBundleListener()注册 bundle监听器(BundleListener)来监听bundle各生命周期的事件,比如bundle的安装、卸载、更新、启动、停止等事件。

 

osmp为了便于对服务的统一管理,要求所有的服务都必须实现 osmp-intf-define里的定义的BaseDataService接口。这样我们就可以通过服务跟踪器跟踪到BaseDataService接口的服务新增、修改、删除等事件!

具体代码可以参照osmp-service里 ServiceWatcher.java类

 

/*   
 * Project: OSMP
 * FileName: ServiceWatcher.java
 * version: V1.0
 */
package com.osmp.service.watch;

import java.util.Date;
import java.util.UUID;

import org.osgi.framework.BundleContext;
import org.osgi.framework.BundleEvent;
import org.osgi.framework.BundleListener;
import org.osgi.framework.ServiceReference;
import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.osgi.context.BundleContextAware;
import org.springframework.util.Assert;

import com.osmp.intf.define.config.FrameConst;
import com.osmp.intf.define.interceptor.ServiceInterceptor;
import com.osmp.intf.define.service.BaseDataService;
import com.osmp.intf.define.service.ZookeeperService;
import com.osmp.service.bean.DataServiceInfo;
import com.osmp.service.bean.InterceptorInfo;
import com.osmp.service.factory.ServiceFactoryImpl;
import com.osmp.service.manager.ServiceStateManager;
import com.osmp.service.registration.ServiceContainer;
import com.osmp.service.util.ServiceUtil;
import com.osmp.utils.net.RequestInfoHelper;

/**
 * 
 * Description:服务注册、注销、监听
 * @author: wangkaiping
 * @date: 2016年8月9日 上午10:27:15上午10:51:30
 */
public class ServiceWatcher implements BundleContextAware, InitializingBean, DisposableBean {
	private Logger logger = LoggerFactory.getLogger(ServiceWatcher.class);

	private ServiceTracker dataServiceTracker;
	private ServiceTracker serviceInterceptorTracker;

	private ServiceStateManager serviceStateManager;
	private ServiceFactoryImpl serviceFactoryImpl;
	private BundleContext context;
	private ZookeeperService zookeeper;
	private final static String NODE_CHANGE = "/osmp/nodechange";

	@Override
	public void setBundleContext(BundleContext context) {
		this.context = context;
	}

	public void setServiceFactoryImpl(ServiceFactoryImpl serviceFactoryImpl) {
		this.serviceFactoryImpl = serviceFactoryImpl;
	}

	public void setServiceStateManager(ServiceStateManager serviceStateManager) {
		this.serviceStateManager = serviceStateManager;
	}

	public void setZookeeper(ZookeeperService zookeeper) {
		this.zookeeper = zookeeper;
	}

	@Override
	public void destroy() throws Exception {
		if (dataServiceTracker != null) {
			dataServiceTracker.close();
		}
		if (serviceInterceptorTracker != null) {
			serviceInterceptorTracker.close();
		}

		logger.info("服务监听结束");
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		Assert.notNull(context);
		Assert.notNull(serviceStateManager);
		Assert.notNull(serviceFactoryImpl);
		dataServiceTracker = new ServiceTracker(context, BaseDataService.class.getName(),
				new DataServiceTrackerCustomizer());
		serviceInterceptorTracker = new ServiceTracker(context, ServiceInterceptor.class.getName(),
				new ServiceInterceptorTrackerCustomizer());

		dataServiceTracker.open(true);
		serviceInterceptorTracker.open(true);

		context.addBundleListener(new BundleListener() {

			@Override
			public void bundleChanged(BundleEvent event) {
				if (event.getType() == BundleEvent.UNINSTALLED) {
					String name = event.getBundle().getSymbolicName();
					try {
						logger.info("uninstall bundle " + name);
						zookeeper.deleteNodeByBundle(name);
						ServiceWatcher.this.nodeUpdate();
					} catch (Exception e) {
						logger.error(
								"zookeeper delete service by bundle, bundle name : "
										+ name, e);
					}
				}
			}
		});

		logger.info("服务监听启动");
	}

	// dataService监听
	private class DataServiceTrackerCustomizer implements ServiceTrackerCustomizer {
		@Override
		public Object addingService(ServiceReference reference) {
			BaseDataService bsService = (BaseDataService) context.getService(reference);
			String bundleName = reference.getBundle().getSymbolicName();
			String bundleVersion = reference.getBundle().getVersion().toString();
			Object name = reference.getProperty(FrameConst.SERVICE_NAME);
			if (name == null || "".equals(name)) {
				logger.error("组件" + bundleName + "(" + bundleVersion
						+ ") dataService服务name未设置");
				return bsService;
			}
			Object mark = reference.getProperty(FrameConst.SERVICE_MARK);
			ServiceContainer.getInstance().putDataService(bundleName, bundleVersion, name.toString(), bsService);
			DataServiceInfo info = new DataServiceInfo();
			info.setBundle(bundleName);
			info.setVersion(bundleVersion);
			info.setName(name.toString());
			info.setState(1);
			info.setUpdateTime(new Date());
			info.setMark(mark == null ? "" : mark.toString());
			serviceStateManager.updateDataService(info);
			String path = ZookeeperService.ROOT_PATH
					+ ZookeeperService.SERVICE + "/"
					+ RequestInfoHelper.getLocalIp() + "/";
			logger.debug("register service path: " + path + " bundle : " + bundleName + " to zookeeper ");
			ServiceWatcher.this.registerService(path, info);
			return bsService;
		}

		@Override
		public void modifiedService(ServiceReference reference, Object service) {
		}

		@Override
		public void removedService(ServiceReference reference, Object service) {
			String bundleName = reference.getBundle().getSymbolicName();
			String bundleVersion = reference.getBundle().getVersion()
					.toString();
			Object name = reference.getProperty(FrameConst.SERVICE_NAME);
			if (name == null || "".equals(name)) {
				logger.error("组件" + bundleName + "(" + bundleVersion
						+ ") dataService服务name未设置");
				return;
			}
			ServiceContainer.getInstance().removeDataService(bundleName, bundleVersion, name.toString());
			Object mark = reference.getProperty(FrameConst.SERVICE_MARK);
			DataServiceInfo info = new DataServiceInfo();
			info.setBundle(bundleName);
			info.setVersion(bundleVersion);
			info.setName(name.toString());
			info.setState(0);
			info.setMark(mark == null ? "" : mark.toString());
			info.setUpdateTime(new Date());
			serviceStateManager.updateDataService(info);
			System.out.println("===============remove service bundleName:"
					+ bundleName + " name: " + name.toString() + " mark: "
					+ mark.toString());
			String path = ZookeeperService.ROOT_PATH
					+ ZookeeperService.SERVICE + "/"
					+ RequestInfoHelper.getLocalIp() + "/";
			ServiceWatcher.this.unRegisterService(path, info);
		}

	}

	// ServiceInterceptor监听
	private class ServiceInterceptorTrackerCustomizer implements ServiceTrackerCustomizer {
		@Override
		public Object addingService(ServiceReference reference) {
			ServiceInterceptor sicpt = (ServiceInterceptor) context.getService(reference);
			String bundleName = reference.getBundle().getSymbolicName();
			String bundleVersion = reference.getBundle().getVersion().toString();
			Object name = reference.getProperty(FrameConst.SERVICE_NAME);
			if (name == null || "".equals(name)) {
				logger.error("组件" + bundleName + "(" + bundleVersion
						+ ") serviceInterceptor服务name未设置");
				return sicpt;
			}
			Object mark = reference.getProperty(FrameConst.SERVICE_MARK);
			ServiceContainer.getInstance().putInterceptor(
					ServiceUtil.generateServiceName(bundleName, bundleVersion, name.toString()), sicpt);
			InterceptorInfo info = new InterceptorInfo();
			info.setBundle(bundleName);
			info.setVersion(bundleVersion);
			info.setName(name.toString());
			info.setState(1);
			info.setUpdateTime(new Date());
			info.setMark(mark == null ? "" : mark.toString());
			serviceStateManager.updateServiceInterceptor(info);
			return sicpt;
		}

		@Override
		public void modifiedService(ServiceReference reference, Object service) {
		}

		@Override
		public void removedService(ServiceReference reference, Object service) {
			String bundleName = reference.getBundle().getSymbolicName();
			String bundleVersion = reference.getBundle().getVersion().toString();
			Object name = reference.getProperty(FrameConst.SERVICE_NAME);
			if (name == null || "".equals(name)) {
				logger.error("组件" + bundleName + "(" + bundleVersion
						+ ") serviceInterceptor服务name未设置");
				return;
			}
			Object mark = reference.getProperty(FrameConst.SERVICE_MARK);
			ServiceContainer.getInstance().removeInterceptor(
					ServiceUtil.generateServiceName(bundleName, bundleVersion, name.toString()));
			InterceptorInfo info = new InterceptorInfo();
			info.setBundle(bundleName);
			info.setVersion(bundleVersion);
			info.setName(name.toString());
			info.setState(0);
			info.setMark(mark == null ? "" : mark.toString());
			info.setUpdateTime(new Date());
			serviceStateManager.updateServiceInterceptor(info);
		}

	}

	/**
	 * 向zookeeper注册服务
	 * 
	 * @param path
	 * @param ds
	 */
	public void registerService(String path, DataServiceInfo ds) {
		String bundle = ds.getBundle();
		String cname = ds.getMark();
		String name = ds.getName();
		String version = ds.getVersion();
		String status = String.valueOf(ds.getState());
		try {
			if (!zookeeper.exists(path + name)) {
				zookeeper.createNode(path + name);
			}
			if (!zookeeper.exists(path + name + "/bundle")) {
				zookeeper.createNode(path + name + "/bundle", bundle);
			} else {
				zookeeper.setNodeData(path + name + "/bundle", bundle);
			}
			if (!zookeeper.exists(path + name + "/cname")) {
				zookeeper.createNode(path + name + "/cname", cname);
			} else {
				zookeeper.setNodeData(path + name + "/cname", cname);
			}
			if (!zookeeper.exists(path + name + "/version")) {
				zookeeper.createNode(path + name + "/version", version);
			} else {
				zookeeper.setNodeData(path + name + "/version", version);
			}
			if (!zookeeper.exists(path + name + "/status")) {
				zookeeper.createNode(path + name + "/status", status);
			} else {
				zookeeper.setNodeData(path + name + "/status", status);
			}
			this.nodeUpdate();
		} catch (Exception e) {
			e.printStackTrace();
			logger.error("zookeeper register service fail, service name : "
					+ name, e);
		}
	}

	/**
	 * 卸载服务
	 * 
	 * @param path
	 * @param ds
	 */
	public void unRegisterService(String path, DataServiceInfo ds) {
		String name = ds.getName();
		try {
			if (zookeeper.exists(path + name + "/status")) {
				zookeeper.setNodeData(path + name + "/status",
						String.valueOf(ds.getState()));
				this.nodeUpdate();
			}
		} catch (Exception e) {
			e.printStackTrace();
			logger.error("zookeeper unRegister service fail, service name : "
					+ name, e);
		}
	}
	
	/**
	 * 更新节点状态变化
	 */
	public void nodeUpdate(){
		String data = UUID.randomUUID().toString();
		try {
			if(zookeeper.exists(NODE_CHANGE)){
				zookeeper.setNodeData(NODE_CHANGE, data);
			}else{
				zookeeper.createNode(NODE_CHANGE);
				zookeeper.setNodeData(NODE_CHANGE, data);
			}
		} catch (Exception e) {
			logger.error("更新节点变化状态错误", e);
		}
	}

}

 

  1. ServiceWatcher 实现 BundleContextAware接口,将BundleContext 设置进来。
  2. 在ServiceWatcher初始化的时候 实例化了两个服务跟踪器 (ServiceTracker)分别用来跟踪 BaseDataService、ServiceInterceptor 两类接口的服务。
  3. 在ServiceWatcher初始化的时候 给BundleContext新增Bunlde监听用来监听bundle的事件
  4. ServiceTracker需要传递一个ServiceTrackerCustomizer实例来具体执行监听的事件,在这里我们通过DataServiceTrackerCustomizer来具体执行BaseDataService接口服务跟踪操作。
  5. BaseDataService 服务被发布到osgi容器里的时候,会自动调用DataServiceTrackerCustomizer.addingService(ServiceReference reference) 方法,通过BundleContext.getService(ServiceReference reference),我们可以获取到当前被发布到osgi容器里的服务。
  6. 通过获取bundleName、bundleVersion、以前发布服务时定义的tag标签(name)组成唯一的key(bundleName+bundleVersion+name) 以获取到service为value。将此保存到服务容器(Map)里。

注:osmp-service里将监听到的服务同时保存到数据库里和注册到zookeeper请暂时忽略,稍后osmp注册中会作详细讲解。

 

服务发现和路由:

 

osmp-http接收到请求后解析服务名称,通过服务查询此服务是否绑定了拦截器,如果绑定了拦截器,则先执行拦截器链,拦截器如果执行失败则直接返回,如果拦截器执行成功则通过服务名称获取服务,获取服务成功后直接执行服务的execute方法,将并结果返回!

 

这里讲的简单点儿,具体osmp封装了一层代理实现,有兴趣的可以直接查看osmp-http源码。

 

至此osmp的服务发现和服务调用功能就讲到这里!

 

 

 

0
0
分享到:
评论

相关推荐

    《OSGi实战》完整中文版

    《 OSGi实战》是学习OSGi的全面指导,利用与架构和开发人员相关的示例清楚地讲解OSGi概念,同时探讨了很多实践场景和技术,阐述了开发人员有多需要OSGi,怎么将OSGi嵌入其他容器中,将遗留系统移入OSGi的最佳实践,...

    OSGI实战教程

    在OSGI实战教程中,首先需要了解OSGI(Open Services Gateway Initiative)是一个由众多IT公司共同制定的Java模块化标准规范,旨在实现软件组件的热插拔和服务动态管理。OSGI技术允许应用程序通过动态地安装、启动、...

    osgi,林昊写的osgi实战和进阶

    林昊所著的《OSGI实战》与《OSGI进阶》是深入理解OSGI技术的重要参考资料,适合对Java模块化系统感兴趣的初学者和有经验的开发者。 在《OSGI实战》中,作者林昊可能会详细讲解以下几个核心知识点: 1. **OSGI基础*...

    osgi实战(pdf超请版)

    ### OSGI实战知识点概述 #### 一、序言与背景介绍 - **背景与动机**:作者在工作之初便对插件体系结构产生了浓厚兴趣,尤其关注ant、maven等构建工具及其插件系统。这表明作者对于软件模块化、可扩展性的重视。 - ...

    OSGi实战(OSGi规范制定者亲自撰写,汇集Apache项目技术实战经验),完整扫描版

    为了弥补OSGi规范在应用指导方面的不足,四位活跃在OSGi开发第一线的技术专家联手打造了《OSGi实战》。《OSGi实战》面向OSGi规范的使用者,系统、全面、深入地阐述OSGi的重要特性及其使用方法。《OSGi实战》还介绍了...

    OSGI实战和OSGI进阶打包提供

    "OSGI实战"和"OSGI进阶"两份文档提供了深入理解OSGI的理论和实践指导,涵盖基础概念、核心API、实战案例以及高级特性,是学习OSGI的宝贵资料。其中,"OSGI实战.pdf"着重于实践操作,而"osgiopendoc2.pdf"可能包含了...

    OSGI资料,OSGI进阶,OSGI实战,OSGI入门和整合Spring

    本资料集合涵盖了OSGI的入门、进阶以及实战应用,特别是与Spring框架的整合。 《OSGI进阶》着重介绍了OSGI的核心概念和技术细节,包括: 1. **模块系统**:OSGI基于JAR文件作为模块的基本单位,每个模块都有自己的...

    OSGi 入门+进阶+实战

    5. **Blueprint或Declarative Services**:这两种是OSGi中的服务配置方式,Blueprint更接近XML,而Declarative Services使用注解,简化了服务的声明和管理。 6. **远程服务**:OSGi Remote Services允许Bundle之间...

    OSGI实战+进阶

    3. **远程服务**:学习如何在分布式环境中使用OSGI服务,以及如何处理网络延迟和服务发现的问题。 4. **Blueprint和Declarative Services**:这两个是OSGI中常见的服务配置方式,它们提供了一种声明式的XML格式来...

    OSGI实战中文版

    **OSGI实战中文版** OSGi(Open Services Gateway Initiative)是一种Java模块化系统,它允许开发者将应用程序分解为独立的、可热插拔的模块,称为服务。这些服务可以互相发现并交互,提供了灵活的组件化开发环境。...

    OSGI实战及源码

    5. **配置管理**:学习如何使用OSGI的配置Admin服务来动态管理和更新模块的配置。 6. **部署与更新**:介绍如何在OSGI运行时环境中部署和更新模块,以及如何处理模块间的依赖关系。 7. **Blueprint和Declarative ...

    OSGi实战

    资源名称:OSGi实战内容简介:为了弥补OSGi规范在应用指导方面的不足,四位活跃在OSGi开发第一线的技术专家联手打造了《OSGi实战》。《OSGi实战》面向OSGi规范的使用者,系统、全面、深入地阐述OSGi的重要特性及其...

    基于osgi框架实战源码

    2. **服务导向**:OSGi提供了一种服务注册和发现机制,允许模块之间通过服务进行通信,增强了组件间的解耦。 3. **动态性**:OSGi环境中,bundle可以在运行时动态地安装、启动、停止、更新和卸载,这对于软件维护和...

    OSGI合集 OSGi原理与最佳实践

    网上收集的OSGI资料. 包括: OSGi原理与最佳实践(精选版).pdf OSGI实战和源码.rar osgi进阶.pdf Introduce.OSGi.ppt OSGi.in.action.ppt r4.cmpn.pdf r4.core.pdf r4.enterprise.pdf

    OSGI实战.docx

    技术角度可能包括了OSGI的模块化结构、服务发现机制以及如何管理模块间的依赖。 三. OSGI带来什么 这部分将探讨OSGI为开发者和企业带来的好处,例如: 1. 动态性:OSGI允许在运行时安装、卸载、更新模块,无需重启...

    OSGI实战.pdf

    《OSGI实战》这本书主要涵盖了OSGI(Open Service Gateway Initiative)技术在实际开发中的应用与实践,它是一种Java模块化系统,旨在解决大型软件系统的复杂性问题。以下将详细阐述OSGI的核心概念、优势以及其在...

    OSGI实战(PDF)

    4. **服务**:OSGI服务是一种组件间通信的方式,通过服务注册和查找机制,bundle可以发现并使用其他bundle提供的功能。 在阅读《OSGI实战》这本书时,你会了解到如何: 1. **配置环境**:设置开发环境,如Eclipse ...

    OSGi实战 实例源代码

    这个“OSGi实战 实例源代码”是BlueDavy的opendoc《OSGi实战》一书中的配套实践代码,为初学者提供了深入理解OSGi机制的宝贵资源。通过这些源代码,读者可以更好地掌握OSGi的核心概念,包括服务、模块化、依赖管理和...

    osgi 实战 pdf

    《OSGi实战》一书由BlueDavy撰写,是一份详尽的OSGi入门与实战指南,适合初学者和有经验的开发者深入了解OSGi框架及其应用。以下是对该书籍核心知识点的总结: ### OSGi简介 OSGi(Open Service Gateway Initiative...

Global site tag (gtag.js) - Google Analytics