上文我们看到PersistentQueue类的实现,它就代表每个消息队列在服务其中的实现,另外我们会看到每个PersistentQueue类包含一个Journal对象,该对象主要是负责队列的持久化操作。对于文件的读写我们用Java NIO来实现。下面我们来看一看主要的几个方法
def fillReadBehind(gotItem: QItem => Unit)(gotCheckpoint: Checkpoint => Unit): Unit = {
val pos = if (replayer.isDefined) replayer.get.position else writer.position
val filename = if (replayerFilename.isDefined) replayerFilename.get else queueName
reader.foreach { rj =>
if (rj.position == pos && readerFilename.get == filename) {
// we've caught up.
rj.close()
reader = None
readerFilename = None
} else {
readJournalEntry(rj) match {
case (JournalItem.Add(item), _) =>
gotItem(item)
case (JournalItem.Remove, _) =>
removesSinceReadBehind -= 1
case (JournalItem.ConfirmRemove(_), _) =>
removesSinceReadBehind -= 1
case (JournalItem.Continue(item, xid), _) =>
removesSinceReadBehind -= 1
gotItem(item)
case (JournalItem.EndOfFile, _) =>
// move to next file and try again.
val oldFilename = readerFilename.get
rj.close()
readerFilename = Journal.journalAfter(queuePath, queueName, readerFilename.get)
reader = Some(new FileInputStream(new File(queuePath, readerFilename.get)).getChannel)
log.info("Read-behind on '%s' moving from file %s to %s", queueName, oldFilename, readerFilename.get)
if (checkpoint.isDefined && checkpoint.get.filename == oldFilename) {
gotCheckpoint(checkpoint.get)
}
fillReadBehind(gotItem)(gotCheckpoint)
case (_, _) =>
}
}
}
}
Kestrel对于过期数据的处理很巧妙,他通过两种方式来触发对过期数据的检查,一种是定时任务,我们在Kestrel.scala看到的定时器,就是设置定期去检查数据是否过期,第二种是当消费队列中的数据时会检查数据是否过期,这两种方法检查的都是队列头的数据。
当你需要对队列中的数据进行消费或者检查过期数据时,都需要调用fillReadBehind方法,因为你的内存是有一定空间的,它不一定能够把之前所有持久化的数据都导入内存中,所以当内存中一旦有空闲空间,都会从文件中继续读取数据到内存。
另外一个有意思的地方是,Journal对象会记录一个removesSinceReadBehind对象,这个对象记录从读文件开始到现在所有 ReadFile 模式下接受的remove操作的个数,这样每在文件中读到一个remove操作,就对其进行减一,就收一个pop操作就对该变量加一。这个方法能够保证服务在重启时replay这些日志文件时,消息没有丢失,但是对于消息的重复发送,不能进行保证。
对于事物,Kestrel会在PersistentQueue中保存一个Map中,用于存储该事物<事物ID,Item>, 当确认消费时,从map中删除这个事物,如果不提交,则把该消息插入到队列的首位,并从Map中删除该数据。
Kestrel中如果对消息队列选择了持久化,那么,客户的每一个操作都会记录为日志。这个日志正如Mysql一样,通过回放能够恢复之前的数据。
分享到:
相关推荐
4. **持久化**: 当内存中的消息达到一定数量或者超过预设时间时,Kestrel会将消息持久化到磁盘,防止数据丢失。 ### Kestrel的工作原理 1. **内存缓冲区**: Kestrel首先将接收到的消息存储在内存中,提供快速的...
通过阅读和分析源码,我们可以深入理解web服务器的工作原理,如如何处理请求和响应、如何进行TCP连接管理、如何实现异步I/O等。源码的结构清晰,易于理解和调试,可以帮助开发者定制自己的服务器行为或者优化性能。 ...
例如,在电商网站中,用户添加商品到购物车的操作可以先通过Kestrel进行排队,然后后台服务再将这些信息持久化到数据库,同时利用XMemcached缓存用户购物车状态,提供快速响应。 **学习资源与实践** 要深入了解...
四、Kestrel 配置步骤 我们可以通过以下步骤来配置 Kestrel: 1. 配置 Kestrel,我们可以在 CreateHostBuilder 方法中使用 UseKestrel 方法来配置 Kestrel。 2. 在 .NET Core 6.0 中,我们可以使用 var builder = ...
kestrel项目源文件包
它的核心特性包括持久化、多客户端支持以及基于TCP协议的高效通信。在实际应用中,开发者可以将Kestrel作为任务队列,处理异步任务和批量操作,以减轻应用程序的负担并提高响应速度。 XMemcached是Java社区中一个...
在ASP.NET Core中,如果在Kestrel中想使用HTTPS对站点进行加密传输,可以按照如下方式 申请证书 这一步就不详细说了,有免费的和收费的,申请完成之后会给你一个*.pfx结尾的文件。 添加NuGet包 nuget中...
资源分类:Python库 所属语言:Python 资源全名:kestrel-lang-1.0.5.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059
**四、Kestrel与IIS/IIS Express的配合** 虽然Kestrel可以直接对外提供服务,但为了生产环境的安全和性能考虑,通常会将其部署在IIS或IIS Express后面。这样,IIS可以处理网络层面的连接,Kestrel则专注于处理应用...
addlog-kestrel
红隼节点Node.js 的 Kestrel 客户端安装 npm install kestrel.node用法 var Kestrel = require ( 'kestrel.node' ) ;var client = new Kestrel ( 'localhost:22133' ) ;// get can optionally take a timeout in ...
5. **可扩展性**:Kestrel设计为模块化,可以通过中间件系统轻松扩展其功能。 6. **集成能力**:Kestrel可以与反向代理服务器如Nginx或IIS集成,实现负载均衡和静态文件服务等功能。 **三、KestrelHttpServer的...
Kestrel是不是Unix或Windows的内核。
这篇文章主要是记录如何将Kestrel的服务封装在WindowService中 关于WindowsServer 请参考如下这篇文章 .netcore worker service (辅助角色服务) 的上手入门,包含linux和windows服务部署 开发服务 之前做过.net5...
4. **WebSocket**:提供持久连接的网络协议,允许双向通信。 5. **HTTP 首部字段**:如 `Connection` 和 `Upgrade`,在 WebSocket 协议升级过程中起关键作用。 6. **Nginx 配置**:`proxy_set_header` 用于设置转发...
# 接下来,你可以调用库提供的函数或类,具体取决于库的功能 result = kestrel_lang.some_function() ``` Python库的开发遵循一定的规范,如PEP 8编码风格、文档字符串(docstrings)以及测试框架如unittest或...
在性能测试中,会关注其缓存效率、消息持久化策略以及高可用性。 在所有这些测试报告中,除了详细记录每个消息队列的安装步骤和配置细节,还会包含性能测试的具体指标和结果,例如每秒处理消息数(TPS)、平均响应...