上篇文章,简单介绍了HSF框架的初始化。这一篇,小编将为大家带来HSF provider的启动和服务细节。主要关注点:Server的启动,服务的注册,调用处理主流程(IO线程,业务线程)。
一. Server的启动
在某一个HSFSpringProviderBean初始化时,启动HSF Server。在HSF默认协议使用TBRemoting作为RPC框架,TBRemotingRPCProtocolComponent.registerProvider中:
providerServer.startHSFServer();
其代码如下:
Server server = new DefaultServer(configService.getHSFServerPort());
server.start();
……
DefaultServer构造:
acceptor = new SocketAcceptor(processorCount, IO_THREAD_POOL);
SocketAcceptorConfig cfg =(SocketAcceptorConfig)acceptor.getDefaultConfig();
cfg.setBacklog(BACKLOG_SIZE);
cfg.setThreadModel(ThreadModel.MANUAL);
cfg.getFilterChain().addLast(CODEC_FILTER_NAME, CODEC_FILTER);
1. 可以看到是使用了mina作为NIO框架,启动一个server的SocketAcceptor,并配置
2. 关键的一点是在filter的最后加了一个CODEC_FILTER,做为序列化的filter。后续会单独分析。
Server的start:
IoHandlerioHandler = newDefaultIoHandler(this);
try {
acceptor.bind(serverSocket,ioHandler);
started= true;
LOGGER.warn("服务器已启动:" + serverSocket);
}
1. 构造一个默认handler,作为接受RPC请求的处理入口
2. 绑定端口
已上就完成了HSF的Server启动。此时端口已经打开,可以接受请求了。
二. 服务的注册
在启动server之后,HSF会将这个provider信息注册到configserver上。入口是: metadataService.publish(metadata);
具体实现:
privatePublisher<String> doPublish(ServiceMetadata metadata) {
final String serviceUniqueName =metadata.getUniqueName();
final String data =HSFServiceTargetUtil.getTarget(configService.getHSFServerPort(), metadata);
final String publisherId =PUBLISHER_PREFIX + serviceUniqueName;
PublisherRegistration<String>registration = new PublisherRegistration<String>(publisherId,serviceUniqueName);
registration.setGroup(metadata.getGroup());
Publisher<String> publisher =PublisherRegistrar.register(registration);
publisher.publish(data);
return publisher;
}
1. 拼成TB Remoting格式的Target地址信息: 10.7.42.161:12200?CLIENTRETRYCONNECTIONTIMES=3&CLIENTRETRYCONNECTIONTIMEOUT=1000&_SERIALIZETYPE=hessian&_IDLETIMEOUT=10&_TIMEOUT=5000
2. 构造PublisherRegistration数据实体,代表一次发布,设置provider的分组,给分组调用用。
3. 注册一个新的发布者身份以发布数据
a. 默认使用IsolatedPublisherRegistrar进行注册
b. 构造默认发布者DefaultPublisher
DefaultPublisher(PublisherRegistration<T>registration, ConfigClientWorker worker) {
super(registration, worker);
......
this.regRequest = newPublisherRegReqPacket(registration.getDataId(), registration.getClientId(),
datumId != null ? datumId :UUID.randomUUID().toString()); //UUID as default datum ID.
.......
}
在父类DefaultDataClient中初始化ConfigClientWorker:
this.worker= (worker != null) ? worker : ConfigClientWorker.getDefaultWorker();
ConfigClientWorker是config server客户端的核心类,负责管理客户端task的异步执行,当然发布provider也是一个task。初始化ConfigClientWorker时,会初始化其内部的WorkThread线程和连接管理ConfigClientConnection。
可见ConfigClientWorker管理着业务线程和IO线程之间的交互,非常重要。此时,work线程还没启动,和Config server的连接也没有连接,只是初始化而已。
4. 发布数据
a. 先看看data是否可序列化,使用java序列化data
newObjectOutputStream(new OutputStream() {
@Override public void write(intb) throws IOException {}
}).writeObject(datum);
b. 通过之前创建的ConfigClientWorker提交一个任务 worker.schedule(this);
c. 将task放入阻塞队列 tasks.put(task);
d. 唤醒work线程,workerThread.signal();
e. 如果是第一次,则启动work线程
synchronized(bell){
if (isAlive()) {
bell.notifyAll();
} else {
try {
start(); // Lazy start
}catch(IllegalThreadStateException e) {
log.fatal("[Internal] Worker thread is dead.");
}
}
}
可以发现发布数据是一个异步操作,中间通过ConfigClientWorker隔开了,而WorkThread是处理task的线程,task包括发送和接受,对应ConfigClientWorker的2个属性:
// Tokeep messages from configServer. Access is shared between deliverer thread andcommunication thread.
finalQueue<ProtocolPackage> mailbox = newLinkedBlockingQueue<ProtocolPackage>(MAX_MAILBOX_SIZE);
// Access isshared between user thread and deliverer thread.
private final TaskQueue tasks =new TaskQueue();
f. Work线程执行发布任务,创建和ConfigServer的连接
privatevoid ensureConnected() throws InterruptedException {
if (connection.isConnected())return;
// Block until connected to server.
while(! connection.connect()) {
Thread.sleep(GLOBAL_RECONNECTING_DELAY);
}
}
之前创建的Connection在这里同步初始化连接,使用TB remoting框架,这里最终是通过一个共享的资源器拿到连接,有兴趣的同学可以单独研究哈。最终的mina client代码:
ConnectFutureminaConnectFuture = connector.connect(
ConnectionUrl.socketAddr(targetUrl),
localAddress,
new ClientIoHandler(this,clientKey, copyListeners),
cfg);
g. 连接创建好之后,开始发送数据 resp= sendReceive(packagee);此处最终会调用mina的session.write(connectionMsg, wfl); 发送数据是异步+wait的过程,发送完成之后会起一个定时timer到超时点时运行,返回客户端超时异常。
TimeoutHandle timeoutHandle = new TimeoutHandle();
timeoutFuture =DefaultClientManager.timer.schedule(timeoutHandle,connRequest.getRespTimeout(), TimeUnit.MILLISECONDS);
三. 服务调用
Provider注册完之后,consumer就可以通过configserver拿到地址了,发起调用了。
在server端通过mina框架的handler接受请求并处理。在mina的filter端有一个codecfilter类RemotingProtocolCodecFilter。对应的decoder和encoder为RemotingProtocolEncoder和RemotingProtocolDecoder。
在Decoder关键点:
a. 使用session保存半包的请求,二进制协议的关键
b. 按协议对ByteBuffer进行解析,输出到ProtocolDecoderOutput
/*
* 新版报文组成:
* Header(1B): 报文版本
* Header(1B): 请求/响应
* Header(1B): 报文协议(HESSIAN/JAVA)
* Header(1B): 单向/双向
* Header(1B): Reserved
* Header(4B): 通信层对象长度
* Header(1B): 应用层对象类名长度
* Header(4B): 应用层对象长度
* Body: 通信层对象
* Body: 应用层对象类名
* Body: 应用层对象
*/
Codec之后会调用DefaultIoHandler.messageReceived进行处理:
DefaultConnection conn =DefaultConnection.findBySession(session);
conn.getMsgReceivedListener().messageReceived(conn, message);
DefaultMsgListener端会执行doRequest方法:
a. 构造响应的ConnectionResponse对象
b. 拿之前注册的processor即ProviderProcessor
c. 拿之前注册的处理请求的线程池
d. 启动一个runnable,扔到线程池中执行
e. Mina io线程返回
业务线程端:
a. 调用ProviderProcessor. handleRequest进行处理
b. 切换classloader到app的classloader,不然会找不到app的类
Thread.currentThread().setContextClassLoader(servicePOJO.getClass().getClassLoader());
c. 反射调用服务方法
Object appResp =workerMethod.invoke(servicePOJO, methodArgs);
d. 返回执行结果
四. 小结
本文简单小结了HSF的服务启动和调用,典型的RPC应用,中间的一些知识点,序列化,nio,异步调用等都是java的基础知识,希望通过本文让大家增加对rpc框架的认识~~
HSF由于使用了OSGI容器,导致对app容器的依赖,扩展性上确实做的不够,tb remoting基本和mina绑死了,这也是淘宝应用的一个特点,能跑能解决业务问题就是王道~~
分享到:
相关推荐
Spring Boot、HSF(High Speed Service Framework)以及EDAS(Enterprise Distributed Application Service)是阿里巴巴提供的强大工具,帮助开发者快速构建和部署分布式服务。让我们深入探讨这些技术,并了解如何...
HSF(High Speed Service Framework)是阿里巴巴开源的一款高性能、轻量级的服务框架,主要用于构建...因此,我们主要关注HSF服务框架的相关内容,期望从"HSF服务框架共28页.pdf.zip"中获得深入的理论知识和实践经验。
4. **服务启动与停止脚本(Start & Stop Scripts)**:用于启动和停止HSF服务的shell或bat脚本。 5. **示例客户端(Sample Client)**:演示如何作为服务消费者调用HSF服务。 通过以上介绍,你对HSF的开发环境设置...
《分布式服务框架原理与实践(Dubbo,HSF)_李林锋著》这本书深入探讨了分布式服务框架的关键技术和实际应用,主要聚焦于两个知名的开源框架——Dubbo和HSF。这两者都是实现高效率、可扩展的分布式服务的核心工具,尤其...
HSF(High Speed Service Framework,高速服务框架)是阿里巴巴开源的一款高性能、轻量级的服务治理框架,主要用于构建分布式服务系统。它为开发者提供了简单易用的API,使得在Java环境中开发、发布、调用分布式服务...
HSF(High Speed Service Framework,高速服务框架)是阿里巴巴开源的一款高性能、轻量级的服务框架,主要用于构建分布式服务系统。HSF使得应用可以像调用本地方法一样调用远程服务,极大地提升了开发效率和系统的可...
淘宝HSF(High Speed Service Framework)是阿里巴巴开源的一款高性能、轻量级的服务框架,主要用于构建大规模分布式服务系统。HSF全称为High Speed Service Framework,它基于Java语言开发,旨在提高服务调用的效率...
阿里云HSF(High Speed Service Framework)用户开发指导手册是一份专门为Java开发者设计的文档,旨在帮助他们理解和高效地使用HSF服务框架。HSF是阿里巴巴开源的一款高性能、轻量级的服务治理框架,主要用于构建...
HSF(High Speed Service Framework)是阿里云EDAS中的核心组件,它是一个高性能、轻量级的Java服务框架,用于构建微服务架构。HSF-BOOT则是一个帮助开发者快速启动HSF服务的引导项目,它简化了HSF服务的开发流程,...
HSF(High Speed Service Framework,高速服务框架)是阿里巴巴开源的一款高性能、轻量级的分布式服务框架,主要用于构建大规模分布式服务系统。它基于Java语言,提供了包括服务注册、发现、调用、负载均衡、容错、...
【标题】"taobao-hsf" 是一个与阿里巴巴云服务相关的技术组件,它主要用于构建分布式服务框架。在阿里云的生态系统中,HSF(High Speed Service Framework)是为大规模分布式服务设计的一种高性能、高可用的服务化...
标题中的“taobao-HSF”指的是淘宝的High Speed Service(HSF)框架,它是一个高性能、高可用的服务治理框架,主要用于阿里巴巴集团内部的分布式服务调用。HSF提供了包括服务发现、服务调用、负载均衡、熔断保护等一...
HSF 框架的原理讲解,主要包含了知识点:rpc,动态代理,HSF可以作为微服务的基础框架进行二次开发
"淘宝好舒服"(Taobao-HSF)是一个专为阿里巴巴集团内部设计的高性能服务框架,全称为High Speed Service。这个框架主要用于实现企业级的服务化架构,提供高效、稳定、灵活的服务调用解决方案。HSF是淘宝核心的服务...
HSF(High-Speed Service Framework)是阿里巴巴集团内部广泛使用的一个高性能、轻量级的RPC框架。它主要应用于分布式系统中,提供服务间的高效通信机制。HSF的目标是简化分布式应用的开发过程,通过隐藏底层的复杂...
阿里云HSF(High Speed Service Framework)是一款高性能、轻量级的服务框架,专为构建分布式服务而设计。HSF是阿里巴巴内部广泛使用的微服务框架,它提供了服务注册、发现、调用、负载均衡、熔断、限流等功能,极大...
淘宝的HSF框架,用户手册,有兴趣的人欢迎下载~内部文档哦~