`
liudunxu2
  • 浏览: 32147 次
  • 性别: Icon_minigender_1
  • 来自: 青岛
文章分类
社区版块
存档分类
最新评论

metaq的客户端自动断线重连机制

 
阅读更多
1.metaq的数据传输基于gecko
2.metaq的RemotingClientWrapper是gecko的RemotingClient的包装类,通过ConcurrentHashMap<String/* url */, Set<Object>/* references */>成员变量添加了连接的建立和关闭计数功能。
3.remoteclient的连接语句如下:
for (int i = 0; i < connCount; i++) { 
            try { 
                final TimerRef timerRef = new TimerRef(((ClientConfig) this.config).getConnectTimeout(), null); 
                final Future<NioSession> future = 
                        ((GeckoTCPConnectorController) this.controller).connect(remoteAddress, groupSet, remoteAddress, 
                            timerRef); 
                final CheckConnectFutureRunner runnable = 
                        new CheckConnectFutureRunner(future, remoteAddress, groupSet, this); 
                timerRef.setRunnable(runnable); 
                this.insertTimer(timerRef); 
            } 
            catch (final Exception e) { 
                log.error("连接" + RemotingUtils.getAddrString(remoteAddress) + "失败,启动重连任务", e); 
                this.reconnectManager.addReconnectTask(new ReconnectTask(groupSet, remoteAddress)); 
            } 
        } 

在连接建立时,会新建一个定时任务,进行连接检测,连接的代码如下:
 @Override 
        public void run() { 
            try { 
                if (!this.future.isDone() && this.future.get(10, TimeUnit.MILLISECONDS) == null) { 
                    final ReconnectManager reconnectManager = this.remotingClient.getReconnectManager(); 
                    reconnectManager.addReconnectTask(new ReconnectTask(this.groupSet, this.remoteAddress)); 
                } 
            } 
            catch (final Exception e) { 
                log.error("连接" + this.remoteAddress + "失败", e); 
            } 
        }
插入到TimerRefQueue对象中,reactor类实例使用访问者模式遍历queue对象,实现重连调用,代码如下:
  private void processMoveTimer() { 
        final long now = this.getTime(); 
        // 距离上一次检测时间超过1秒 
        if (now - this.lastMoveTimestamp >= TIMEOUT_THRESOLD && !this.timerQueue.isEmpty()) { 
            this.lastMoveTimestamp = now; 
            this.timerQueue.iterateQueue(new TimerQueueVisitor(now)); 
        } 
    }
如果需要重连,插入重连任务队列LinkedBlockingQueue<ReconnectTask>tasks,进行重连,代码如下:
 private void doReconnectTask(final ReconnectTask task) throws IOException, NotifyRemotingException { 
            log.info("Try to reconnect to " + RemotingUtils.getAddrString(task.getRemoteAddress())); 
            final TimerRef timerRef = new TimerRef(ReconnectManager.this.clientConfig.getConnectTimeout(), null); 
            try { 
                final Future<NioSession> future = 
                        ReconnectManager.this.connector.connect(task.getRemoteAddress(), task.getGroupSet(), 
                            task.getRemoteAddress(), timerRef); 
                final DefaultRemotingClient.CheckConnectFutureRunner runnable = 
                        new DefaultRemotingClient.CheckConnectFutureRunner(future, task.getRemoteAddress(), 
                            task.getGroupSet(), ReconnectManager.this.remotingClient); 
                timerRef.setRunnable(runnable); 
                ReconnectManager.this.remotingClient.insertTimer(timerRef); 
                // 标记这个任务完成 
                task.setDone(true); 
            } 
            catch (final Exception e) { 
                this.readdTask(task); 
            } 
        }


分享到:
评论

相关推荐

    metaq消息中间件服务端、客户端资源汇集

    Metamorphosis是淘宝开源的一个Java消息中间件,他类似apache-kafka,但不是一个简单的山寨拷贝,而是做了很多改进和优化,项目的主页在淘蝌蚪上。服务端、客户端、javadoc都包含在内。

    metamorphosis(metaq)

    5. 调度与重试:如果消息未能成功送达消费者,MetaQ会根据配置进行重试,直至消息被正确消费或达到最大重试次数。 五、总结 MetaQ 1.4.3版本作为一款成熟的消息队列系统,以其高性能、高可用性和丰富的功能,为...

    metaq-server-1.4.6.2客户端+服务端

    MetaQ Server 1.4.6.2版本是这个中间件的一个特定发行版,包含了服务端和客户端的组件,以及相关的Javadoc文档。下面我们将详细探讨MetaQ的核心特性、工作原理、客户端和服务端的使用,以及Javadoc文档的重要性。 *...

    Metaq原理与应用

    Metaq 是一种高性能、高可用的消息中间件,其设计灵感来源于 Kafka,但并不严格遵循任何特定的规范,如 JMS(Java Message Service)或 CORBA Notification 规范。Metaq 提供了丰富的特性来解决 Messaging System 中...

    metaQ向spark传数据

    在大数据处理领域,MetaQ和Spark是两个非常关键的组件。MetaQ是腾讯开源的一款分布式消息中间件,常用于实时数据处理系统中的消息传递。而Spark则是一个强大的、通用的并行计算框架,专为大数据分析设计,尤其擅长...

    metaq-server-1.4.6.2.tar.gz

    1. 高可用:MetaQ Server通过主备切换机制确保服务的连续性,当主节点故障时,备份节点可以快速接管服务,保证业务不受影响。 2. 高性能:采用多线程并行处理和异步I/O模型,实现高效的消息发送和接收,同时支持...

    metaq-server-1.4.6.2.zip 和原版一样就是换了个名字

    MetaQ 1.4.6.2可能包含了身份验证和授权机制,如SSL/TLS加密通信,以及基于角色的访问控制(RBAC),以保护消息的机密性和完整性。 最后,MetaQ还提供了丰富的监控和管理工具,包括日志分析、性能指标监控和Web管理...

    Metaq在JDk 7下的异常及解决方案

    《Metaq在JDK 7下的异常及其解决策略》 Metaq是一款高性能的消息中间件,广泛应用于分布式系统中,提供高效、稳定的消息传递服务。然而,在JDK 7环境下,Metaq可能会遇到一些运行异常,其中最常见的就是与物理文件...

    Metaq详细手册.docx

    3. **客户端消费状态保存**:消费状态保存在客户端,允许消费者在断线后能从上次断点继续消费,无需重新读取已处理的消息,提升了效率。 4. **分布式架构**:Metaq支持分布式部署,生产者、服务器和消费者都可以...

    MetaQ 分布式消息服务中间件.pdf

    MetaQ支持的消息过滤方式有两种:服务器端过滤和客户端过滤。服务器端过滤可以在消息到达消费者之前进行过滤,减少无用数据在网路上传输,但会增加服务器端的负担。客户端过滤则是将无用数据传输到客户端,增加了...

    metaQ的安装包

    - 容错机制:在消费者端实现重试策略,防止因短暂网络问题导致的消息丢失。 - 限流与降级:在高并发场景下,可以使用消息队列进行流量控制,必要时进行服务降级。 通过以上介绍,我们可以了解到 MetaQ 是一个强大...

    zookeeper资料

    短暂znode的生命周期仅限于客户端会话,会话结束时Zookeeper会自动删除节点。持久znode不依赖于客户端会话,只有客户端明确要删除时才会被删除。Znode还有四种形式的目录节点,分别是PERSISTENT、PERSISTENT_...

    RocketMQ技术原理

    - 心跳处理: 维持与客户端连接的活动性检测机制。 - 连接复用: 利用现有连接进行通信,以减少资源消耗。 - 超时连接: 处理长时间无响应连接的策略。 7. RocketMQ服务发现(NameServer) NameServer是RocketMQ的...

    支付宝钱包系统架构内部剖析(架构图)

    - **配套项目**:为了进一步提升MetaQ的易用性和扩展性,支付宝团队还开发了一系列配套项目,包括Python客户端、Twitter Storm的Spout、Tail4j等工具。 ### 总结 通过上述分析可以看出,支付宝钱包系统的架构设计...

    Zookeeper概述

    - **Watcher机制**:客户端可以在节点上设置Watcher,当节点状态发生变化时,Zookeeper会通知客户端。 ### ZooKeeper的应用案例 - **统一命名服务**:在分布式环境中,使用Zookeeper进行服务的统一命名,简化服务...

    支付宝之所以牛逼的原因:来看内部架构剖析

    Metamorphosis(MetaQ)是一个高性能、高可用、可扩展的分布式消息中间件,类似于LinkedIn的Kafka,具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景,在...

    Metamorphosis, 一种高可用高性能的分布式.zip

    Metamorphosis, 一种高可用高性能的分布式 #新闻MetaQ 1.4.6.2 发布。更新日志MetaQ 1.4.6.1 发布。更新日志MetaQ 1.4.5.1 发布。更新日志MetaQ 1.4.5发布。更新日志meta: 一个用于的ruby 客户端。 源代码

    阿里rocketMQ

    阿里RocketMQ是一款开源的消息中间件,它在阿里巴巴集团内部广泛使用,并且被社区接纳成为Apache顶级项目。RocketMQ的设计目标是提供低延迟、高可靠、高可扩展的消息传递服务,适用于大规模分布式系统中的消息通讯。...

    Java消息通信研究.pdf

    RPC通信机制屏蔽了底层网络通信的细节和通信报文协议,减少了编程工作量,但并没有在运维上降低成本,文件系统客户端仍需感知服务端的存在。 考虑到传统通信方式的缺点,消息队列通信机制应运而生。消息队列的主要...

Global site tag (gtag.js) - Google Analytics