会导致“多个workers" (单个 worker 下工作正常)的状态下,导入 CSV 文件之后,不能正常的根据导入的 device进行 query remote, update emails . 这两个操作。
根据Randy的要求, 导入CSV之后,可以自动的查询远程,然后更新本地的device信息。 所以后来的代码是这样做的:
1. 建立优先级是 0 的JOB (最高), 来导入CSV
2. 建立优先级是 10 的JOB, (中级) , 来查询远程远程DEVICE数据。
3. 建立优先级是 20 的JOB (最低), 来更新本地DEVICE数据。
可是实际上发现,在多个WORKER下,上述情况不成里。 会有不少 优先级 = 20 的JOB,会在 优先级 = 10 的JOB执行完成之前被执行。
经过研究,发现了 delayed_job 的执行机制: 某个JOB在被执行时,会被设置 lock = true ,然后等它执行完毕了,worker再删掉这个job. 所以,在下面这个情况下, 低优先级的JOB会在高优先级的JOB未完成之前被执行:
job1, p = 10, 耗时 10s
job2, p = 10, 耗时 10s
job3, p = 10, 耗时 10s
job4, p = 20, 耗时 5s
job5, p = 20, 耗时 5s
在 worker = 5 (总之只要满足 > 3 这个条件) 时, 就会出现:
job1, job2, job3 被执行
job4, job5 也被执行
结果 job4, job5 被执行完之后, job 1, 2,3 都没执行完。
所以为了解决这个问题,我加入了"wait job" , 专门用于等待。
4 loop do
5 unless preconditional_jobs_exist?(priority_of_this_job)
6 break
7 end
8 Rails.logger.info "=== preconditional_jobs_exist:(priority < #{priority_of_this_job}), sleep 10 seconds"
9 sleep 10
10 end
最新(正在测试中)的代码是这样的:
1. 建立优先级是 0 的JOB (最高), 来导入CSV
2. 建立优先级是 10 的JOB, (中级) , 来查询远程远程DEVICE数据。
2.5 建立优先级是 15 的 WAIT JOB , 用来专门等待,确定 所有的 高优先级JOB 完成。
3. 建立优先级是 20 的JOB (最低), 来更新本地DEVICE数据。
目前从LOG来看, 这些JOB不是严格按照优先级执行的。 而是有极少数(3~ 5%) 会忽略优先级被执行。
所以这个问题还需要进一步的排查。
分享到:
相关推荐
在某些场景下,我们需要将消息延迟发送,例如定时任务、订单超时处理等,这时`rabbitmq_delayed_message_exchange`插件就派上了用场。 **插件介绍** `rabbitmq_delayed_message_exchange`是RabbitMQ的一个扩展,它...
安装一个插件即可:https://www.rabbitmq.com/community-plugins.html ,下载rabbitmq_delayed_message_exchange插件,然后解压放置到RabbitMQ的插件目录。 接下来,进入RabbitMQ的安装目录下的sbin目录,执行下面...
延迟的工作Mongoid后端 要求 Mongoid 5或更高版本。... script/rails runner 'Delayed::Backend::Mongoid::Job.create_indexes' 生成script/delayed_job : rails generate delayed_job 就是这样。 正常使用 。
RabbitMQ延迟消息交换机(rabbitmq_delayed_message_exchange)则是RabbitMQ中的一个特色插件,允许我们设定消息在特定时间后才被投递,这对于实现定时任务、延迟通知等功能具有极大的价值。本文将深入探讨RabbitMQ ...
rails g delayed_job:active_recordrake db:migrate锁定作业时出现问题您可以尝试使用旧版锁定代码。 通常速度较慢,但对某些人来说效果更好。 Delayed::Backend::ActiveRecord.configuration.reserve_sql_...
在RabbitMQ 3.7版本中,引入了一个新的特性——延迟消息交换机(Delayed Message Exchange),这为处理定时任务和延迟触发的业务逻辑提供了便利。本文将深入探讨这一功能及其应用场景。 ### 1. 延迟消息交换机介绍 ...
交换机是RabbitMQ的核心组件之一,它负责接收生产者发送的消息,并根据预设的路由规则将消息分发到一个或多个队列。在默认情况下,RabbitMQ提供了多种类型的交换机,如Direct、Fanout、Topic和Header。而延时交换机...
安装一个插件即可:https://www.rabbitmq.com/community-plugins.html ,下载rabbitmq_delayed_message_exchange插件,然后解压放置到RabbitMQ的插件目录。 接下来,进入RabbitMQ的安装目录下的sbin目录,执行下面...
delay_job 支持多个后端来存储作业队列。 。 如果你打算使用的delayed_job与活动记录,添加delayed_job_active_record到你Gemfile 。 gem 'delayed_job_active_record' 如果你打算使用的delayed_job与Mongoid,...
【RabbitMQ】延时队列插件(delayed_message_exchange) 查找Docker容器中的RabbitMQ镜像 2.上传rabbitmq_delayed_message_exchange-20171201-3.7.x.ez插件到Linux文件夹中 3.拷贝插件文件到rabbitMQ的Docker容器中
标题中的“rabbitmq_delayed_3.6.x延迟插件.rar”表明这是一个关于RabbitMQ的3.6.x版本的延迟消息插件。RabbitMQ是业界广泛使用的开源消息代理和队列服务器,它允许应用程序之间进行异步通信。延迟队列在某些场景下...
在这个压缩包文件"rabbitmq_delayed_message_exchange-20171215-3.6.x.zip"中,我们关注的核心是延迟消息交换(Delayed Message Exchange)功能,这是RabbitMQ在3.6.x版本中引入的一个重要特性。该功能允许用户设置...
这个功能在很多业务场景中非常有用,比如订单超时处理、定时任务调度等,它允许我们设置一个精确的延迟时间,让消息在那个时间点之后才被消费。 首先,我们需要理解RabbitMQ的基础概念。RabbitMQ是一个开源的消息...
截止2021.08.19日适配官网最新版rabbitmq3.9.3的消息延时队列插件,包内含有rabbitmq_delayed_message_exchange-3.9.0.ez格式、zip格式、tar.gz格式安装包任你选择~~
【RabbitMQ】延时队列插件(delayed_message_exchange) 查找Docker容器中的RabbitMQ镜像 ...3.拷贝插件文件到rabbitMQ的Docker容器中的/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.7/plugins 目录下
在没有这个插件的情况下,RabbitMQ默认立即处理并转发消息到队列。通过使用延迟交换机,我们可以设置一个延迟时间,使得消息在指定的时间点才被消费。 **插件安装与配置** 在RabbitMQ中,插件可以通过命令行工具...
必须定义一个且只有一个 worker 的队列: Delayed :: Worker . queues = [ :some_queue ] # or with URL to avoid SQS requests Delayed :: Backend :: Sqs . queue_url = '...
标题中提到的"rabbitmq_delayed_message_exchange-3.8.0.ez.zip"是一个针对RabbitMQ 3.8X系列的延时队列插件安装包。这个插件允许我们创建特殊类型的交换机,这些交换机可以将消息放入一个具有特定延迟时间的队列,...
rabbitmq 延迟队列插件 rabbitmq_delayed_message_exchange_3.8.17 解压即用 输入命令进行安装 .\rabbitmq-plugins enable rabbitmq_delayed_message_exchange