`
tjuxiaoqiang
  • 浏览: 25914 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

kestrel 源码分析之四 Journal消息持久化类

阅读更多
上文我们看到PersistentQueue类的实现,它就代表每个消息队列在服务其中的实现,另外我们会看到每个PersistentQueue类包含一个Journal对象,该对象主要是负责队列的持久化操作。对于文件的读写我们用Java NIO来实现。下面我们来看一看主要的几个方法
def fillReadBehind(gotItem: QItem => Unit)(gotCheckpoint: Checkpoint => Unit): Unit = {
    val pos = if (replayer.isDefined) replayer.get.position else writer.position
    val filename = if (replayerFilename.isDefined) replayerFilename.get else queueName

    reader.foreach { rj =>
      if (rj.position == pos && readerFilename.get == filename) {
        // we've caught up.
        rj.close()
        reader = None
        readerFilename = None
      } else {
        readJournalEntry(rj) match {
          case (JournalItem.Add(item), _) =>
            gotItem(item)
          case (JournalItem.Remove, _) =>
            removesSinceReadBehind -= 1
          case (JournalItem.ConfirmRemove(_), _) =>
            removesSinceReadBehind -= 1
          case (JournalItem.Continue(item, xid), _) =>
            removesSinceReadBehind -= 1
            gotItem(item)
          case (JournalItem.EndOfFile, _) =>
            // move to next file and try again.
            val oldFilename = readerFilename.get
            rj.close()
            readerFilename = Journal.journalAfter(queuePath, queueName, readerFilename.get)
            reader = Some(new FileInputStream(new File(queuePath, readerFilename.get)).getChannel)
            log.info("Read-behind on '%s' moving from file %s to %s", queueName, oldFilename, readerFilename.get)
            if (checkpoint.isDefined && checkpoint.get.filename == oldFilename) {
              gotCheckpoint(checkpoint.get)
            }
            fillReadBehind(gotItem)(gotCheckpoint)
          case (_, _) =>
        }
      }
    }
  }


Kestrel对于过期数据的处理很巧妙,他通过两种方式来触发对过期数据的检查,一种是定时任务,我们在Kestrel.scala看到的定时器,就是设置定期去检查数据是否过期,第二种是当消费队列中的数据时会检查数据是否过期,这两种方法检查的都是队列头的数据。

当你需要对队列中的数据进行消费或者检查过期数据时,都需要调用fillReadBehind方法,因为你的内存是有一定空间的,它不一定能够把之前所有持久化的数据都导入内存中,所以当内存中一旦有空闲空间,都会从文件中继续读取数据到内存。

另外一个有意思的地方是,Journal对象会记录一个removesSinceReadBehind对象,这个对象记录从读文件开始到现在所有 ReadFile 模式下接受的remove操作的个数,这样每在文件中读到一个remove操作,就对其进行减一,就收一个pop操作就对该变量加一。这个方法能够保证服务在重启时replay这些日志文件时,消息没有丢失,但是对于消息的重复发送,不能进行保证。

对于事物,Kestrel会在PersistentQueue中保存一个Map中,用于存储该事物<事物ID,Item>, 当确认消费时,从map中删除这个事物,如果不提交,则把该消息插入到队列的首位,并从Map中删除该数据。

Kestrel中如果对消息队列选择了持久化,那么,客户的每一个操作都会记录为日志。这个日志正如Mysql一样,通过回放能够恢复之前的数据。
分享到:
评论

相关推荐

    Kestrel持久化队列服务器

    4. **持久化**: 当内存中的消息达到一定数量或者超过预设时间时,Kestrel会将消息持久化到磁盘,防止数据丢失。 ### Kestrel的工作原理 1. **内存缓冲区**: Kestrel首先将接收到的消息存储在内存中,提供快速的...

    征服 Kestrel

    通过阅读和分析源码,我们可以深入理解web服务器的工作原理,如如何处理请求和响应、如何进行TCP连接管理、如何实现异步I/O等。源码的结构清晰,易于理解和调试,可以帮助开发者定制自己的服务器行为或者优化性能。 ...

    征服 Kestrel + XMemcached

    例如,在电商网站中,用户添加商品到购物车的操作可以先通过Kestrel进行排队,然后后台服务再将这些信息持久化到数据库,同时利用XMemcached缓存用户购物车状态,提供快速响应。 **学习资源与实践** 要深入了解...

    在.NET 6.0上使用Kestrel配置和自定义HTTPS.doc

    四、Kestrel 配置步骤 我们可以通过以下步骤来配置 Kestrel: 1. 配置 Kestrel,我们可以在 CreateHostBuilder 方法中使用 UseKestrel 方法来配置 Kestrel。 2. 在 .NET Core 6.0 中,我们可以使用 var builder = ...

    kesterl源文件包

    kestrel项目源文件包

    征服 Kestrel + XMemcached + Spring TaskExecutor

    它的核心特性包括持久化、多客户端支持以及基于TCP协议的高效通信。在实际应用中,开发者可以将Kestrel作为任务队列,处理异步任务和批量操作,以减轻应用程序的负担并提高响应速度。 XMemcached是Java社区中一个...

    ASP.NET Core Kestrel 中使用 HTTPS (SSL)

    在ASP.NET Core中,如果在Kestrel中想使用HTTPS对站点进行加密传输,可以按照如下方式  申请证书  这一步就不详细说了,有免费的和收费的,申请完成之后会给你一个*.pfx结尾的文件。  添加NuGet包  nuget中...

    Python库 | kestrel-lang-1.0.5.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:kestrel-lang-1.0.5.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    Kestrel框架的使用demo

    **四、Kestrel与IIS/IIS Express的配合** 虽然Kestrel可以直接对外提供服务,但为了生产环境的安全和性能考虑,通常会将其部署在IIS或IIS Express后面。这样,IIS可以处理网络层面的连接,Kestrel则专注于处理应用...

    addlog-kestrel

    addlog-kestrel

    kestrel.node:Node.js 的 Kestrel 客户端

    红隼节点Node.js 的 Kestrel 客户端安装 npm install kestrel.node用法 var Kestrel = require ( 'kestrel.node' ) ;var client = new Kestrel ( 'localhost:22133' ) ;// get can optionally take a timeout in ...

    web服务器KestrelHttpServer.zip

    5. **可扩展性**:Kestrel设计为模块化,可以通过中间件系统轻松扩展其功能。 6. **集成能力**:Kestrel可以与反向代理服务器如Nginx或IIS集成,实现负载均衡和静态文件服务等功能。 **三、KestrelHttpServer的...

    Kestrel:Kestrel是在I386上运行的实验内核-开源

    Kestrel是不是Unix或Windows的内核。

    Kestrel封装成WindowServer.zip

    这篇文章主要是记录如何将Kestrel的服务封装在WindowService中 关于WindowsServer 请参考如下这篇文章 .netcore worker service (辅助角色服务) 的上手入门,包含linux和windows服务部署 开发服务 之前做过.net5...

    ASP.NET Core 因为 Nginx 配置 Connection 为 Upgrade 导致 Kestrel 返回 400

    4. **WebSocket**:提供持久连接的网络协议,允许双向通信。 5. **HTTP 首部字段**:如 `Connection` 和 `Upgrade`,在 WebSocket 协议升级过程中起关键作用。 6. **Nginx 配置**:`proxy_set_header` 用于设置转发...

    Python库 | kestrel_lang-1.1.0-py3-none-any.whl

    # 接下来,你可以调用库提供的函数或类,具体取决于库的功能 result = kestrel_lang.some_function() ``` Python库的开发遵循一定的规范,如PEP 8编码风格、文档字符串(docstrings)以及测试框架如unittest或...

    message_system_test_report.rar_ActiveMQ java_activemq_httpsqs_me

    在性能测试中,会关注其缓存效率、消息持久化策略以及高可用性。 在所有这些测试报告中,除了详细记录每个消息队列的安装步骤和配置细节,还会包含性能测试的具体指标和结果,例如每秒处理消息数(TPS)、平均响应...

Global site tag (gtag.js) - Google Analytics