更多内容请访问 rubyonrails.org:

Active Job 基础

本指南为您提供了创建、入队和执行后台作业所需的一切。

阅读完本指南后,您将了解

  • 如何创建作业。
  • 如何入队作业。
  • 如何在后台运行作业。
  • 如何从您的应用程序异步发送电子邮件。

1 什么是 Active Job?

Active Job 是一个用于声明作业并在各种排队后端上运行它们的框架。这些作业可以是定期计划的清理工作、计费、邮件发送等。任何可以分解成小单位工作并并行运行的作业都可以。

2 Active Job 的目的

主要目的是确保所有 Rails 应用程序都具有就绪的作业基础设施。然后我们可以基于它构建框架功能和其他宝石,而无需担心不同作业运行程序(如 Delayed Job 和 Resque)之间的 API 差异。选择您的排队后端将成为一个更重要的操作问题。您可以随意切换它们,而无需重写作业。

Rails 默认情况下带有一个异步排队实现,它使用进程内线程池运行作业。作业将异步运行,但是,在重启时,队列中的任何作业都会被丢弃。对于小型应用程序或非关键作业,这可能没问题,但大多数生产应用程序将需要选择一个持久的后端。

3 创建和入队作业

本节将提供一个分步指南,说明如何创建一个作业并将其入队。

3.1 创建作业

Active Job 提供了一个 Rails 生成器来创建作业。以下将在 app/jobs 中创建一个作业(并在 test/jobs 下附加一个测试用例)

$ bin/rails generate job guests_cleanup
invoke  test_unit
create    test/jobs/guests_cleanup_job_test.rb
create  app/jobs/guests_cleanup_job.rb

您还可以创建一个将在特定队列上运行的作业

$ bin/rails generate job guests_cleanup --queue urgent

如果您不想使用生成器,您可以在 app/jobs 内创建自己的文件,只需确保它继承自 ApplicationJob 即可。

以下是作业的示例

class GuestsCleanupJob < ApplicationJob
  queue_as :default

  def perform(*guests)
    # Do something later
  end
end

请注意,您可以根据需要定义 perform 带有任意多个参数。

如果您已经有一个抽象类,并且它的名称与 ApplicationJob 不同,您可以传递 --parent 选项来指示您想要使用不同的抽象类

$ bin/rails generate job process_payment --parent=payment_job
class ProcessPaymentJob < PaymentJob
  queue_as :default

  def perform(*args)
    # Do something later
  end
end

3.2 入队作业

使用 perform_later 入队一个作业,并且可选地使用 set。例如

# Enqueue a job to be performed as soon as the queuing system is
# free.
GuestsCleanupJob.perform_later guest
# Enqueue a job to be performed tomorrow at noon.
GuestsCleanupJob.set(wait_until: Date.tomorrow.noon).perform_later(guest)
# Enqueue a job to be performed 1 week from now.
GuestsCleanupJob.set(wait: 1.week).perform_later(guest)
# `perform_now` and `perform_later` will call `perform` under the hood so
# you can pass as many arguments as defined in the latter.
GuestsCleanupJob.perform_later(guest1, guest2, filter: "some_filter")

就是这样!

3.3 批量入队作业

您可以使用 perform_all_later 一次入队多个作业。有关更多详细信息,请参见 批量入队

4 作业执行

为了在生产环境中入队和执行作业,您需要设置一个排队后端,也就是说,您需要决定使用 Rails 使用哪个第三方排队库。Rails 本身只提供了一个进程内排队系统,它只将作业保存在内存中。如果进程崩溃或机器重置,则所有未完成的作业都会随着默认的异步后端丢失。对于小型应用程序或非关键作业,这可能没问题,但大多数生产应用程序将需要选择一个持久的后端。

4.1 后端

Active Job 为多个排队后端(Sidekiq、Resque、Delayed Job 等)内置了适配器。要获取最新的适配器列表,请参见 ActiveJob::QueueAdapters 的 API 文档。

4.2 设置后端

您可以使用 config.active_job.queue_adapter 轻松设置您的排队后端

# config/application.rb
module YourApp
  class Application < Rails::Application
    # Be sure to have the adapter's gem in your Gemfile
    # and follow the adapter's specific installation
    # and deployment instructions.
    config.active_job.queue_adapter = :sidekiq
  end
end

您也可以在每个作业的基础上配置您的后端

class GuestsCleanupJob < ApplicationJob
  self.queue_adapter = :resque
  # ...
end

# Now your job will use `resque` as its backend queue adapter, overriding what
# was configured in `config.active_job.queue_adapter`.

4.3 启动后端

由于作业与 Rails 应用程序并行运行,大多数排队库要求您启动一个库特定的排队服务(除了启动 Rails 应用程序之外),以便作业处理正常工作。请参阅库文档,了解有关启动您的队列后端的说明。

以下是一个非详尽的文档列表

5 队列

大多数适配器支持多个队列。使用 Active Job,您可以使用 queue_as 将作业安排到特定队列上运行

class GuestsCleanupJob < ApplicationJob
  queue_as :low_priority
  # ...
end

您可以在 application.rb 中使用 config.active_job.queue_name_prefix 为所有作业设置队列名称前缀

# config/application.rb
module YourApp
  class Application < Rails::Application
    config.active_job.queue_name_prefix = Rails.env
  end
end
# app/jobs/guests_cleanup_job.rb
class GuestsCleanupJob < ApplicationJob
  queue_as :low_priority
  # ...
end

# Now your job will run on queue production_low_priority on your
# production environment and on staging_low_priority
# on your staging environment

您也可以在每个作业的基础上配置前缀。

class GuestsCleanupJob < ApplicationJob
  queue_as :low_priority
  self.queue_name_prefix = nil
  # ...
end

# Now your job's queue won't be prefixed, overriding what
# was configured in `config.active_job.queue_name_prefix`.

默认的队列名称前缀分隔符是 '_'. 这可以通过在 application.rb 中设置 config.active_job.queue_name_delimiter 来更改

# config/application.rb
module YourApp
  class Application < Rails::Application
    config.active_job.queue_name_prefix = Rails.env
    config.active_job.queue_name_delimiter = "."
  end
end
# app/jobs/guests_cleanup_job.rb
class GuestsCleanupJob < ApplicationJob
  queue_as :low_priority
  # ...
end

# Now your job will run on queue production.low_priority on your
# production environment and on staging.low_priority
# on your staging environment

要从作业级别控制队列,您可以将一个代码块传递给 queue_as。该代码块将在作业上下文中执行(因此它可以访问 self.arguments),并且它必须返回队列名称

class ProcessVideoJob < ApplicationJob
  queue_as do
    video = self.arguments.first
    if video.owner.premium?
      :premium_videojobs
    else
      :videojobs
    end
  end

  def perform(video)
    # Do process video
  end
end
ProcessVideoJob.perform_later(Video.last)

如果您想要更多地控制作业运行的队列,您可以将 :queue 选项传递给 set

MyJob.set(queue: :another_queue).perform_later(record)

确保您的排队后端“监听”您的队列名称。对于某些后端,您需要指定要监听的队列。

6 优先级

某些适配器在作业级别支持优先级,其中作业可以相对于队列中的其他作业或所有队列进行优先级排序。

您可以使用 queue_with_priority 将作业安排为以特定优先级运行

class GuestsCleanupJob < ApplicationJob
  queue_with_priority 10
  # ...
end

请注意,这对于不支持优先级的适配器没有任何效果。

queue_as 类似,您也可以将一个代码块传递给 queue_with_priority 以在作业上下文中进行评估

class ProcessVideoJob < ApplicationJob
  queue_with_priority do
    video = self.arguments.first
    if video.owner.premium?
      0
    else
      10
    end
  end

  def perform(video)
    # Process video
  end
end
ProcessVideoJob.perform_later(Video.last)

您也可以将 :priority 选项传递给 set

MyJob.set(priority: 50).perform_later(record)

优先级较低的数字是否在优先级较高的数字之前或之后执行取决于适配器的实现。有关更多信息,请参阅后端文档。鼓励适配器作者将较低的数字视为更重要。

7 回调函数

Active Job 提供了钩子,可以在作业的生命周期中触发逻辑。与 Rails 中的其他回调函数一样,您可以将回调函数实现为普通方法,并使用宏样式的类方法将其注册为回调函数。

class GuestsCleanupJob < ApplicationJob
  queue_as :default

  around_perform :around_cleanup

  def perform
    # Do something later
  end

  private
    def around_cleanup
      # Do something before perform
      yield
      # Do something after perform
    end
end

宏样式的类方法也可以接收一个块。如果块内的代码很短,可以放在一行中,请考虑使用这种样式。例如,您可以为每个排队的作业发送指标。

class ApplicationJob < ActiveJob::Base
  before_enqueue { |job| $statsd.increment "#{job.class.name.underscore}.enqueue" }
end

7.1 可用回调函数

请注意,使用 perform_all_later 批量排队作业时,不会在各个作业上触发 around_enqueue 等回调函数。请参阅 批量排队回调函数

8 批量排队

您可以使用 perform_all_later 同时排队多个作业。批量排队减少了到队列数据存储(如 Redis 或数据库)的往返次数,使其成为比单独排队相同作业更高效的操作。

perform_all_later 是 Active Job 上的顶级 API。它接受实例化的作业作为参数(请注意,这与 perform_later 不同)。perform_all_later 在内部确实调用了 perform。传递给 new 的参数将在最终调用 perform 时传递给它。

以下是如何使用 GuestCleanupJob 实例调用 perform_all_later 的示例。

# Create jobs to pass to `perform_all_later`.
# The arguments to `new` are passed on to `perform`
guest_cleanup_jobs = Guest.all.map { |guest| GuestsCleanupJob.new(guest) }

# Will enqueue a separate job for each instance of `GuestCleanupJob`
ActiveJob.perform_all_later(guest_cleanup_jobs)

# Can also use `set` method to configure options before bulk enqueuing jobs.
guest_cleanup_jobs = Guest.all.map { |guest| GuestsCleanupJob.new(guest).set(wait: 1.day) }

ActiveJob.perform_all_later(guest_cleanup_jobs)

perform_all_later 记录了成功排队的作业数量,例如,如果上面的 Guest.all.map 导致了 3 个 guest_cleanup_jobs,它将记录 排队了 3 个作业到 Async(3 个 GuestsCleanupJob)(假设所有作业都被排队)。

perform_all_later 的返回值为 nil。请注意,这与返回排队作业类实例的 perform_later 不同。

8.1 排队多个 Active Job 类

使用 perform_all_later,也可以在同一个调用中排队不同的 Active Job 类实例。例如。

class ExportDataJob < ApplicationJob
  def perform(*args)
    # Export data
  end
end

class NotifyGuestsJob < ApplicationJob
  def perform(*guests)
    # Email guests
  end
end

# Instantiate job instances
cleanup_job = GuestsCleanupJob.new(guest)
export_job = ExportDataJob.new(data)
notify_job = NotifyGuestsJob.new(guest)

# Enqueues job instances from multiple classes at once
ActiveJob.perform_all_later(cleanup_job, export_job, notify_job)

8.2 批量排队回调函数

使用 perform_all_later 批量排队作业时,不会在各个作业上触发 around_enqueue 等回调函数。此行为与其他 Active Record 批量方法一致。由于回调函数在各个作业上运行,因此它们无法利用此方法的批量特性。

但是,perform_all_later 方法确实会触发一个 enqueue_all.active_job 事件,您可以使用 ActiveSupport::Notifications 订阅该事件。

方法 successfully_enqueued? 可用于确定是否成功排队了给定的作业。

8.3 队列后端支持

对于 perform_all_later,批量排队需要由 队列后端 支持。

例如,Sidekiq 有一个 push_bulk 方法,它可以将大量作业推送到 Redis 并防止往返网络延迟。GoodJob 也支持使用 GoodJob::Bulk.enqueue 方法进行批量排队。新的队列后端 Solid Queue 也添加了对批量排队的支持。

如果队列后端 *不支持* 批量排队,perform_all_later 将逐个排队作业。

9 Action Mailer

现代 Web 应用程序中最常见的作业之一是在请求-响应周期之外发送电子邮件,这样用户就不必等待它。Active Job 与 Action Mailer 集成在一起,因此您可以轻松地异步发送电子邮件。

# If you want to send the email now use #deliver_now
UserMailer.welcome(@user).deliver_now

# If you want to send the email through Active Job use #deliver_later
UserMailer.welcome(@user).deliver_later

从 Rake 任务(例如,使用 .deliver_later 发送电子邮件)使用异步队列通常不起作用,因为 Rake 很可能会结束,导致进程内线程池在任何/所有 .deliver_later 电子邮件被处理之前被删除。为了避免此问题,请使用 .deliver_now 或在开发中运行持久队列。

10 国际化

每个作业都使用创建作业时设置的 I18n.locale。如果您异步发送电子邮件,这将很有用。

I18n.locale = :eo

UserMailer.welcome(@user).deliver_later # Email will be localized to Esperanto.

11 支持的参数类型

ActiveJob 默认支持以下类型的参数。

  • 基本类型(NilClassStringIntegerFloatBigDecimalTrueClassFalseClass
  • 符号
  • 日期
  • 时间
  • 日期时间
  • ActiveSupport::TimeWithZone
  • ActiveSupport::Duration
  • Hash(键应为 StringSymbol 类型)
  • ActiveSupport::HashWithIndifferentAccess
  • 数组
  • 范围
  • 模块

11.1 GlobalID

Active Job 支持 GlobalID 用于参数。这使得将实时 Active Record 对象传递给您的作业成为可能,而不是传递类/ID 对,然后您必须手动反序列化它们。以前,作业看起来像这样。

class TrashableCleanupJob < ApplicationJob
  def perform(trashable_class, trashable_id, depth)
    trashable = trashable_class.constantize.find(trashable_id)
    trashable.cleanup(depth)
  end
end

现在,您可以简单地执行以下操作。

class TrashableCleanupJob < ApplicationJob
  def perform(trashable, depth)
    trashable.cleanup(depth)
  end
end

这适用于任何混合了 GlobalID::Identification 的类,默认情况下,该类已混合到 Active Record 类中。

11.2 序列化器

您可以扩展支持的参数类型列表。您只需要定义自己的序列化器。

# app/serializers/money_serializer.rb
class MoneySerializer < ActiveJob::Serializers::ObjectSerializer
  # Converts an object to a simpler representative using supported object types.
  # The recommended representative is a Hash with a specific key. Keys can be of basic types only.
  # You should call `super` to add the custom serializer type to the hash.
  def serialize(money)
    super(
      "amount" => money.amount,
      "currency" => money.currency
    )
  end

  # Converts serialized value into a proper object.
  def deserialize(hash)
    Money.new(hash["amount"], hash["currency"])
  end

  private
    # Checks if an argument should be serialized by this serializer.
    def klass
      Money
    end
end

并将此序列化器添加到列表中。

# config/initializers/custom_serializers.rb
Rails.application.config.active_job.custom_serializers << MoneySerializer

请注意,在初始化期间自动加载可重新加载的代码不受支持。因此,建议设置序列化器以仅加载一次,例如,通过修改 config/application.rb 如下所示。

# config/application.rb
module YourApp
  class Application < Rails::Application
    config.autoload_once_paths << "#{root}/app/serializers"
  end
end

12 异常

可以使用 rescue_from 处理作业执行期间引发的异常。

class GuestsCleanupJob < ApplicationJob
  queue_as :default

  rescue_from(ActiveRecord::RecordNotFound) do |exception|
    # Do something with the exception
  end

  def perform
    # Do something later
  end
end

如果作业中的异常没有被捕获,则该作业被称为“失败”。

12.1 重试或丢弃失败的作业

失败的作业不会被重试,除非另有配置。

可以使用 retry_ondiscard_on 分别重试或丢弃失败的作业。例如。

class RemoteServiceJob < ApplicationJob
  retry_on CustomAppException # defaults to 3s wait, 5 attempts

  discard_on ActiveJob::DeserializationError

  def perform(*args)
    # Might raise CustomAppException or ActiveJob::DeserializationError
  end
end

12.2 反序列化

GlobalID 允许序列化传递给 #perform 的完整 Active Record 对象。

如果传递的记录在作业排队后但在 #perform 方法被调用之前被删除,Active Job 将引发一个 ActiveJob::DeserializationError 异常。

13 作业测试

您可以在 测试指南 中找到有关如何测试作业的详细说明。

14 调试

如果您需要帮助找出作业的来源,您可以启用 详细日志记录



返回顶部