1 什么是 Active Job?
Active Job 是一个用于声明作业并在各种排队后端上运行它们的框架。这些作业可以是定期计划的清理工作、计费、邮件发送等。任何可以分解成小单位工作并并行运行的作业都可以。
2 Active Job 的目的
主要目的是确保所有 Rails 应用程序都具有就绪的作业基础设施。然后我们可以基于它构建框架功能和其他宝石,而无需担心不同作业运行程序(如 Delayed Job 和 Resque)之间的 API 差异。选择您的排队后端将成为一个更重要的操作问题。您可以随意切换它们,而无需重写作业。
Rails 默认情况下带有一个异步排队实现,它使用进程内线程池运行作业。作业将异步运行,但是,在重启时,队列中的任何作业都会被丢弃。对于小型应用程序或非关键作业,这可能没问题,但大多数生产应用程序将需要选择一个持久的后端。
3 创建和入队作业
本节将提供一个分步指南,说明如何创建一个作业并将其入队。
3.1 创建作业
Active Job 提供了一个 Rails 生成器来创建作业。以下将在 app/jobs
中创建一个作业(并在 test/jobs
下附加一个测试用例)
$ bin/rails generate job guests_cleanup
invoke test_unit
create test/jobs/guests_cleanup_job_test.rb
create app/jobs/guests_cleanup_job.rb
您还可以创建一个将在特定队列上运行的作业
$ bin/rails generate job guests_cleanup --queue urgent
如果您不想使用生成器,您可以在 app/jobs
内创建自己的文件,只需确保它继承自 ApplicationJob
即可。
以下是作业的示例
class GuestsCleanupJob < ApplicationJob
queue_as :default
def perform(*guests)
# Do something later
end
end
请注意,您可以根据需要定义 perform
带有任意多个参数。
如果您已经有一个抽象类,并且它的名称与 ApplicationJob
不同,您可以传递 --parent
选项来指示您想要使用不同的抽象类
$ bin/rails generate job process_payment --parent=payment_job
class ProcessPaymentJob < PaymentJob
queue_as :default
def perform(*args)
# Do something later
end
end
3.2 入队作业
使用 perform_later
入队一个作业,并且可选地使用 set
。例如
# Enqueue a job to be performed as soon as the queuing system is
# free.
GuestsCleanupJob.perform_later guest
# Enqueue a job to be performed tomorrow at noon.
GuestsCleanupJob.set(wait_until: Date.tomorrow.noon).perform_later(guest)
# Enqueue a job to be performed 1 week from now.
GuestsCleanupJob.set(wait: 1.week).perform_later(guest)
# `perform_now` and `perform_later` will call `perform` under the hood so
# you can pass as many arguments as defined in the latter.
GuestsCleanupJob.perform_later(guest1, guest2, filter: "some_filter")
就是这样!
3.3 批量入队作业
您可以使用 perform_all_later
一次入队多个作业。有关更多详细信息,请参见 批量入队。
4 作业执行
为了在生产环境中入队和执行作业,您需要设置一个排队后端,也就是说,您需要决定使用 Rails 使用哪个第三方排队库。Rails 本身只提供了一个进程内排队系统,它只将作业保存在内存中。如果进程崩溃或机器重置,则所有未完成的作业都会随着默认的异步后端丢失。对于小型应用程序或非关键作业,这可能没问题,但大多数生产应用程序将需要选择一个持久的后端。
4.1 后端
Active Job 为多个排队后端(Sidekiq、Resque、Delayed Job 等)内置了适配器。要获取最新的适配器列表,请参见 ActiveJob::QueueAdapters
的 API 文档。
4.2 设置后端
您可以使用 config.active_job.queue_adapter
轻松设置您的排队后端
# config/application.rb
module YourApp
class Application < Rails::Application
# Be sure to have the adapter's gem in your Gemfile
# and follow the adapter's specific installation
# and deployment instructions.
config.active_job.queue_adapter = :sidekiq
end
end
您也可以在每个作业的基础上配置您的后端
class GuestsCleanupJob < ApplicationJob
self.queue_adapter = :resque
# ...
end
# Now your job will use `resque` as its backend queue adapter, overriding what
# was configured in `config.active_job.queue_adapter`.
4.3 启动后端
由于作业与 Rails 应用程序并行运行,大多数排队库要求您启动一个库特定的排队服务(除了启动 Rails 应用程序之外),以便作业处理正常工作。请参阅库文档,了解有关启动您的队列后端的说明。
以下是一个非详尽的文档列表
5 队列
大多数适配器支持多个队列。使用 Active Job,您可以使用 queue_as
将作业安排到特定队列上运行
class GuestsCleanupJob < ApplicationJob
queue_as :low_priority
# ...
end
您可以在 application.rb
中使用 config.active_job.queue_name_prefix
为所有作业设置队列名称前缀
# config/application.rb
module YourApp
class Application < Rails::Application
config.active_job.queue_name_prefix = Rails.env
end
end
# app/jobs/guests_cleanup_job.rb
class GuestsCleanupJob < ApplicationJob
queue_as :low_priority
# ...
end
# Now your job will run on queue production_low_priority on your
# production environment and on staging_low_priority
# on your staging environment
您也可以在每个作业的基础上配置前缀。
class GuestsCleanupJob < ApplicationJob
queue_as :low_priority
self.queue_name_prefix = nil
# ...
end
# Now your job's queue won't be prefixed, overriding what
# was configured in `config.active_job.queue_name_prefix`.
默认的队列名称前缀分隔符是 '_'. 这可以通过在 application.rb
中设置 config.active_job.queue_name_delimiter
来更改
# config/application.rb
module YourApp
class Application < Rails::Application
config.active_job.queue_name_prefix = Rails.env
config.active_job.queue_name_delimiter = "."
end
end
# app/jobs/guests_cleanup_job.rb
class GuestsCleanupJob < ApplicationJob
queue_as :low_priority
# ...
end
# Now your job will run on queue production.low_priority on your
# production environment and on staging.low_priority
# on your staging environment
要从作业级别控制队列,您可以将一个代码块传递给 queue_as
。该代码块将在作业上下文中执行(因此它可以访问 self.arguments
),并且它必须返回队列名称
class ProcessVideoJob < ApplicationJob
queue_as do
video = self.arguments.first
if video.owner.premium?
:premium_videojobs
else
:videojobs
end
end
def perform(video)
# Do process video
end
end
ProcessVideoJob.perform_later(Video.last)
如果您想要更多地控制作业运行的队列,您可以将 :queue
选项传递给 set
MyJob.set(queue: :another_queue).perform_later(record)
确保您的排队后端“监听”您的队列名称。对于某些后端,您需要指定要监听的队列。
6 优先级
某些适配器在作业级别支持优先级,其中作业可以相对于队列中的其他作业或所有队列进行优先级排序。
您可以使用 queue_with_priority
将作业安排为以特定优先级运行
class GuestsCleanupJob < ApplicationJob
queue_with_priority 10
# ...
end
请注意,这对于不支持优先级的适配器没有任何效果。
与 queue_as
类似,您也可以将一个代码块传递给 queue_with_priority
以在作业上下文中进行评估
class ProcessVideoJob < ApplicationJob
queue_with_priority do
video = self.arguments.first
if video.owner.premium?
0
else
10
end
end
def perform(video)
# Process video
end
end
ProcessVideoJob.perform_later(Video.last)
您也可以将 :priority
选项传递给 set
MyJob.set(priority: 50).perform_later(record)
优先级较低的数字是否在优先级较高的数字之前或之后执行取决于适配器的实现。有关更多信息,请参阅后端文档。鼓励适配器作者将较低的数字视为更重要。
7 回调函数
Active Job 提供了钩子,可以在作业的生命周期中触发逻辑。与 Rails 中的其他回调函数一样,您可以将回调函数实现为普通方法,并使用宏样式的类方法将其注册为回调函数。
class GuestsCleanupJob < ApplicationJob
queue_as :default
around_perform :around_cleanup
def perform
# Do something later
end
private
def around_cleanup
# Do something before perform
yield
# Do something after perform
end
end
宏样式的类方法也可以接收一个块。如果块内的代码很短,可以放在一行中,请考虑使用这种样式。例如,您可以为每个排队的作业发送指标。
class ApplicationJob < ActiveJob::Base
before_enqueue { |job| $statsd.increment "#{job.class.name.underscore}.enqueue" }
end
7.1 可用回调函数
请注意,使用 perform_all_later
批量排队作业时,不会在各个作业上触发 around_enqueue
等回调函数。请参阅 批量排队回调函数。
8 批量排队
您可以使用 perform_all_later
同时排队多个作业。批量排队减少了到队列数据存储(如 Redis 或数据库)的往返次数,使其成为比单独排队相同作业更高效的操作。
perform_all_later
是 Active Job 上的顶级 API。它接受实例化的作业作为参数(请注意,这与 perform_later
不同)。perform_all_later
在内部确实调用了 perform
。传递给 new
的参数将在最终调用 perform
时传递给它。
以下是如何使用 GuestCleanupJob
实例调用 perform_all_later
的示例。
# Create jobs to pass to `perform_all_later`.
# The arguments to `new` are passed on to `perform`
guest_cleanup_jobs = Guest.all.map { |guest| GuestsCleanupJob.new(guest) }
# Will enqueue a separate job for each instance of `GuestCleanupJob`
ActiveJob.perform_all_later(guest_cleanup_jobs)
# Can also use `set` method to configure options before bulk enqueuing jobs.
guest_cleanup_jobs = Guest.all.map { |guest| GuestsCleanupJob.new(guest).set(wait: 1.day) }
ActiveJob.perform_all_later(guest_cleanup_jobs)
perform_all_later
记录了成功排队的作业数量,例如,如果上面的 Guest.all.map
导致了 3 个 guest_cleanup_jobs
,它将记录 排队了 3 个作业到 Async(3 个 GuestsCleanupJob)
(假设所有作业都被排队)。
perform_all_later
的返回值为 nil
。请注意,这与返回排队作业类实例的 perform_later
不同。
8.1 排队多个 Active Job 类
使用 perform_all_later
,也可以在同一个调用中排队不同的 Active Job 类实例。例如。
class ExportDataJob < ApplicationJob
def perform(*args)
# Export data
end
end
class NotifyGuestsJob < ApplicationJob
def perform(*guests)
# Email guests
end
end
# Instantiate job instances
cleanup_job = GuestsCleanupJob.new(guest)
export_job = ExportDataJob.new(data)
notify_job = NotifyGuestsJob.new(guest)
# Enqueues job instances from multiple classes at once
ActiveJob.perform_all_later(cleanup_job, export_job, notify_job)
8.2 批量排队回调函数
使用 perform_all_later
批量排队作业时,不会在各个作业上触发 around_enqueue
等回调函数。此行为与其他 Active Record 批量方法一致。由于回调函数在各个作业上运行,因此它们无法利用此方法的批量特性。
但是,perform_all_later
方法确实会触发一个 enqueue_all.active_job
事件,您可以使用 ActiveSupport::Notifications
订阅该事件。
方法 successfully_enqueued?
可用于确定是否成功排队了给定的作业。
8.3 队列后端支持
对于 perform_all_later
,批量排队需要由 队列后端 支持。
例如,Sidekiq 有一个 push_bulk
方法,它可以将大量作业推送到 Redis 并防止往返网络延迟。GoodJob 也支持使用 GoodJob::Bulk.enqueue
方法进行批量排队。新的队列后端 Solid Queue
也添加了对批量排队的支持。
如果队列后端 *不支持* 批量排队,perform_all_later
将逐个排队作业。
9 Action Mailer
现代 Web 应用程序中最常见的作业之一是在请求-响应周期之外发送电子邮件,这样用户就不必等待它。Active Job 与 Action Mailer 集成在一起,因此您可以轻松地异步发送电子邮件。
# If you want to send the email now use #deliver_now
UserMailer.welcome(@user).deliver_now
# If you want to send the email through Active Job use #deliver_later
UserMailer.welcome(@user).deliver_later
从 Rake 任务(例如,使用 .deliver_later
发送电子邮件)使用异步队列通常不起作用,因为 Rake 很可能会结束,导致进程内线程池在任何/所有 .deliver_later
电子邮件被处理之前被删除。为了避免此问题,请使用 .deliver_now
或在开发中运行持久队列。
10 国际化
每个作业都使用创建作业时设置的 I18n.locale
。如果您异步发送电子邮件,这将很有用。
I18n.locale = :eo
UserMailer.welcome(@user).deliver_later # Email will be localized to Esperanto.
11 支持的参数类型
ActiveJob 默认支持以下类型的参数。
- 基本类型(
NilClass
、String
、Integer
、Float
、BigDecimal
、TrueClass
、FalseClass
) 符号
日期
时间
日期时间
ActiveSupport::TimeWithZone
ActiveSupport::Duration
Hash
(键应为String
或Symbol
类型)ActiveSupport::HashWithIndifferentAccess
数组
范围
模块
类
11.1 GlobalID
Active Job 支持 GlobalID 用于参数。这使得将实时 Active Record 对象传递给您的作业成为可能,而不是传递类/ID 对,然后您必须手动反序列化它们。以前,作业看起来像这样。
class TrashableCleanupJob < ApplicationJob
def perform(trashable_class, trashable_id, depth)
trashable = trashable_class.constantize.find(trashable_id)
trashable.cleanup(depth)
end
end
现在,您可以简单地执行以下操作。
class TrashableCleanupJob < ApplicationJob
def perform(trashable, depth)
trashable.cleanup(depth)
end
end
这适用于任何混合了 GlobalID::Identification
的类,默认情况下,该类已混合到 Active Record 类中。
11.2 序列化器
您可以扩展支持的参数类型列表。您只需要定义自己的序列化器。
# app/serializers/money_serializer.rb
class MoneySerializer < ActiveJob::Serializers::ObjectSerializer
# Converts an object to a simpler representative using supported object types.
# The recommended representative is a Hash with a specific key. Keys can be of basic types only.
# You should call `super` to add the custom serializer type to the hash.
def serialize(money)
super(
"amount" => money.amount,
"currency" => money.currency
)
end
# Converts serialized value into a proper object.
def deserialize(hash)
Money.new(hash["amount"], hash["currency"])
end
private
# Checks if an argument should be serialized by this serializer.
def klass
Money
end
end
并将此序列化器添加到列表中。
# config/initializers/custom_serializers.rb
Rails.application.config.active_job.custom_serializers << MoneySerializer
请注意,在初始化期间自动加载可重新加载的代码不受支持。因此,建议设置序列化器以仅加载一次,例如,通过修改 config/application.rb
如下所示。
# config/application.rb
module YourApp
class Application < Rails::Application
config.autoload_once_paths << "#{root}/app/serializers"
end
end
12 异常
可以使用 rescue_from
处理作业执行期间引发的异常。
class GuestsCleanupJob < ApplicationJob
queue_as :default
rescue_from(ActiveRecord::RecordNotFound) do |exception|
# Do something with the exception
end
def perform
# Do something later
end
end
如果作业中的异常没有被捕获,则该作业被称为“失败”。
12.1 重试或丢弃失败的作业
失败的作业不会被重试,除非另有配置。
可以使用 retry_on
或 discard_on
分别重试或丢弃失败的作业。例如。
class RemoteServiceJob < ApplicationJob
retry_on CustomAppException # defaults to 3s wait, 5 attempts
discard_on ActiveJob::DeserializationError
def perform(*args)
# Might raise CustomAppException or ActiveJob::DeserializationError
end
end
12.2 反序列化
GlobalID 允许序列化传递给 #perform
的完整 Active Record 对象。
如果传递的记录在作业排队后但在 #perform
方法被调用之前被删除,Active Job 将引发一个 ActiveJob::DeserializationError
异常。
13 作业测试
您可以在 测试指南 中找到有关如何测试作业的详细说明。
14 调试
如果您需要帮助找出作业的来源,您可以启用 详细日志记录。