`
tjuxiaoqiang
  • 浏览: 25987 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

kestrel 源码分析之三 PersistentQueue对应一个内存中的队列

阅读更多
PersistentQueue类主要用于在内存中维护一个队列,该类是Kestrel的核心类,是真正对于一个队列的维护。我们看到上文中QueueCollection中主要是调用下面的setup方法,下面我们看看setup方法到底做了什么
def setup() {
    synchronized {
      queueSize = 0
      replayJournal()
    }
  }

def replayJournal() {
    if (!config.keepJournal) return

    log.info("Replaying transaction journal for '%s'", name)
    xidCounter = 0

    journal.replay {
      case JournalItem.Add(item) =>
        _add(item)
        // when processing the journal, this has to happen after:
        if (!journal.inReadBehind && queueSize >= config.maxMemorySize.inBytes) {
          log.info("Dropping to read-behind for queue '%s' (%d bytes)", name, queueSize)
          journal.startReadBehind()
        }
      case JournalItem.Remove => _remove(false, None)
      case JournalItem.RemoveTentative(xid) =>
        _remove(true, Some(xid))
        xidCounter = xid
      case JournalItem.SavedXid(xid) => xidCounter = xid
      case JournalItem.Unremove(xid) => _unremove(xid)
      case JournalItem.ConfirmRemove(xid) => openTransactions.remove(xid)
      case JournalItem.Continue(item, xid) =>
        openTransactions.remove(xid)
        _add(item)
      case x => log.error("Unexpected item in journal: %s", x)
    }

    log.info("Finished transaction journal for '%s' (%d items, %d bytes) xid=%d", name, queueLength,
             journal.size, xidCounter)
    journal.open()

    // now, any unfinished transactions must be backed out.
    for (xid <- openTransactionIds) {
      journal.unremove(xid)
      _unremove(xid)
    }
  }

我们看到setup方法主要是回放journal文件中的redo Log。我们看到该类主要是读Journal文件,并解析为JournalItem对象,并根据对象类型对PersistentQueue进行入队和出队。我们看到一共有7种不同的日志类型,下面我们看看几个主要的类型日志的结构
ADDX:
Remove:

我们会看到在回放日志文件的同时,他会记录所有到目前为止所有的没有结束的事务在openTransactions中,其中他们的key的集合为openTransactionIds,对这些没有结束的事务做unremove操作。
另外还有一个关键是在对队列进行Add和Remove操作时调用checkRotateJournal()方法,即检查是否需要对日志文件进行切分、删除等操作。下面我们看看具体的代码
// you are holding the lock, and config.keepJournal is true.
  private def checkRotateJournal() {
    /*
     * if the queue is empty, and the journal is larger than defaultJournalSize, rebuild it.
     * if the queue is smaller than maxMemorySize, and the combined journals are larger than
     *   maxJournalSize, rebuild them. (we are not in read-behind.)
     * if the current journal is larger than maxMemorySize, rotate to a new file. if the combined
     *   journals are larger than maxJournalSize, checkpoint in preparation for rebuilding the
     *   older files in the background.
     */
    if ((journal.size >= config.defaultJournalSize.inBytes && queueLength == 0) ||
        (journal.size + journal.archivedSize > config.maxJournalSize.inBytes &&
         queueSize < config.maxMemorySize.inBytes)) {
      log.info("Rewriting journal file for '%s' (qsize=%d)", name, queueSize)
      journal.rewrite(openTransactionIds.map { openTransactions(_) }, queue)
    } else if (journal.size > config.maxMemorySize.inBytes) {
      log.info("Rotating journal file for '%s' (qsize=%d)", name, queueSize)
      val setCheckpoint = (journal.size + journal.archivedSize > config.maxJournalSize.inBytes)
      journal.rotate(openTransactionIds.map { openTransactions(_) }, setCheckpoint)
    }
  }

1. 当内存中queue为空,并且日志文件的大小大于defaultJournalSize时,将创建一个新的日志文件并删除该时间点之前的文件。
2. 当之前的日志文件大小加上当前日志文件大小大于maxJournalSize,并且queue中数据的大小小于maxMemorySize时,将创建一个新的日志文件并删除该时间点之前的文件。
3. 当日志文件的大小大于maxMemorySize,将创建新的日志文件,之后达到的消息将存入新的文件。
分享到:
评论

相关推荐

    Kestrel持久化队列服务器

    Kestrel是一个强大的消息队列服务器,以其简洁的设计、高效的性能和良好的持久化策略在实时处理和分布式系统中发挥着重要作用。通过深入研究其源码和测试用例,我们可以更好地理解其内部工作机制,并可能进行定制化...

    征服 Kestrel

    Kestrel,这个名字在IT行业中通常指的是Microsoft开发的一款开源、高性能、异步网络库,它是ASP.NET Core框架的一部分,用于构建web服务器。Kestrel设计的目标是提供一个轻量级、可靠且可扩展的平台,能够处理高并发...

    征服 Kestrel + XMemcached

    Kestrel是一个高性能、异步的分布式消息队列,而XMemcached则是一个广泛使用的Java客户端,用于连接到Memcached缓存服务器。在这里,我们将会探讨这两个技术的基本概念、它们在IT领域的应用以及如何将它们结合使用。...

    在.NET 6.0上使用Kestrel配置和自定义HTTPS.doc

    在 ASP.NET Core 的第一个版本中,微软使用了 libuv,然后在其顶部添加了一个名为 Kestrel 的层。现在,Kestrel 已经发展成熟,是一个能运行 ASP.NET Core 应用的微软系的 HTTP 服务器。 二、Kestrel 的角色 IIS ...

    征服 Kestrel + XMemcached + Spring TaskExecutor

    标题中的“征服 Kestrel + XMemcached + Spring TaskExecutor”揭示了本次讨论的核心内容,涉及到三个关键的技术组件:Kestrel、XMemcached以及Spring的TaskExecutor。这些技术都是在构建高性能、可扩展的分布式系统...

    ASP.NET Core Kestrel 中使用 HTTPS (SSL)

    这一步就不详细说了,有免费的和收费的,申请完成之后会给你一个*.pfx结尾的文件。  添加NuGet包  nuget中查找然后再程序中添加引用Microsoft.AspNetCore.Server.Kestrel.Https  配置  把*.pfx结尾的文件...

    kesterl源文件包

    kestrel项目源文件包

    Kestrel框架的使用demo

    Kestrel框架是ASP.NET Core的一部分,它是一个高度可配置、高性能的Web服务器,被设计用于构建跨平台的现代Web应用。Kestrel自.NET Core 1.0版本起就已成为默认的Web服务器,支持Windows、Linux、macOS等操作系统,...

    Python库 | kestrel-lang-1.0.5.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:kestrel-lang-1.0.5.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    一个超级简单方便的网关,基于Kestrel+Yarp实现的网关 支持动态配置路由,支持动态配置集群,支持动态配置HTTPS证书

    **FastGateway:基于Kestrel+Yarp的...总之,FastGateway凭借其基于Kestrel和Yarp的高效实现,以及动态配置的能力,为开发者提供了一个强大而灵活的API网关解决方案,满足了现代Web应用对性能、安全和易用性的需求。

    web服务器KestrelHttpServer.zip

    KestrelHttpServer是ASP.NET 5框架中的一个核心组件,它作为Web服务器的角色,为开发者提供了轻量级、高性能的解决方案。这个开源项目使得开发人员能够构建高度可定制的Web应用程序,并在各种操作系统上运行,包括...

    addlog-kestrel

    addlog-kestrel

    message_system_test_report.rar_ActiveMQ java_activemq_httpsqs_me

    在IT行业中,消息队列(Message Queue)是用于在分布式系统中解耦组件、提高处理效率和实现异步通信的关键技术。本测试报告涉及到多个消息队列服务的安装部署及性能测试,包括ActiveMQ、HTTPSQS、Kestrel和MemcacheQ...

    Kestrel封装成WindowServer.zip

    背景 ...新建一个webapi项目,如下图 添加Controller using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Mvc; namespace WebApiNet_v5.Controllers { [Route("api/[controller]")]

    kestrel.node:Node.js 的 Kestrel 客户端

    红隼节点Node.js 的 Kestrel 客户端安装 npm install kestrel.node用法 var Kestrel = require ( 'kestrel.node' ) ;var client = new Kestrel ( 'localhost:22133' ) ;// get can optionally take a timeout in ...

    .NET-KestrelHttpServer一个用于ASPNETCore的跨平台Web服务器

    `.NET-KestrelHttpServer` 是ASP.NET Core框架中的一个重要组成部分,它是一个高度可配置、高性能的跨平台Web服务器。Kestrel被设计为能够直接与操作系统交互,从而提供高效的服务,支持HTTP/1.1和HTTP/2协议。本文...

    kestrel-task-executor:Kestrel + XMemcached + Spring TaskExecutor

    这个项目名为"kestrel-task-executor",它结合了Kestrel消息队列、XMemcached缓存客户端以及Spring的TaskExecutor框架,构建了一个强大的任务处理平台。以下将详细解析这个项目的组成部分和它们如何协同工作。 1. *...

    Python库 | kestrel_lang-1.1.0-py3-none-any.whl

    "kestrel_lang-1.1.0-py3-none-any.whl"就是这样一个Python库的发行版,它适用于Python 3解释器。 首先,我们来了解一下`.whl`文件。`.whl`是Python的二进制分发格式,它是Python Wheel项目的产物。Wheel格式旨在...

    ASP.NET Core 因为 Nginx 配置 Connection 为 Upgrade 导致 Kestrel 返回 400

    ` 设置不当时,Nginx 可能无法正确地将升级请求转发给 Kestrel,导致 Kestrel 无法识别出这是一个 WebSocket 请求,从而返回 400 坏请求错误。Kestrel 期望 `Connection` 字段包含 `upgrade`,而不仅仅是 `Upgrade`...

Global site tag (gtag.js) - Google Analytics