1 什么是 Action Cable?
Action Cable 将 WebSockets 无缝地整合到您的 Rails 应用程序的其余部分。它允许您以与 Rails 应用程序其余部分相同的风格和形式用 Ruby 编写实时功能,同时保持高性能和可扩展性。它是一个全栈产品,提供客户端 JavaScript 框架和服务器端 Ruby 框架。您可以访问使用 Active Record 或您选择的 ORM 编写的整个域模型。
2 术语
Action Cable 使用 WebSockets 而不是 HTTP 请求-响应协议。Action Cable 和 WebSockets 都引入了一些不太熟悉的术语。
2.1 连接
连接 是客户端-服务器关系的基础。单个 Action Cable 服务器可以处理多个连接实例。它每个 WebSocket 连接都有一个连接实例。如果用户使用多个浏览器选项卡或设备,则单个用户可能对您的应用程序打开了多个 WebSockets。
2.2 消费者
WebSocket 连接的客户端称为消费者。在 Action Cable 中,消费者是由客户端 JavaScript 框架创建的。
2.3 频道
每个消费者都可以依次订阅多个频道。每个频道封装一个逻辑工作单元,类似于典型 MVC 设置中的控制器。例如,您可以拥有一个 ChatChannel
和一个 AppearancesChannel
,而消费者可以订阅其中一个或两个频道。至少,消费者应该订阅一个频道。
2.4 订阅者
当消费者订阅一个频道时,他们充当订阅者。订阅者与频道之间的连接,毫不意外地称为订阅。消费者可以多次充当给定频道的订阅者。例如,消费者可以同时订阅多个聊天室。(请记住,一个物理用户可能拥有多个消费者,每个选项卡/设备对您的连接打开一个)。
2.5 发布/订阅
发布/订阅 或 发布-订阅 是指一种消息队列范式,其中信息发送者(发布者)将数据发送到抽象的接收者类别(订阅者),而无需指定单个接收者。Action Cable 使用这种方法在服务器和多个客户端之间进行通信。
2.6 广播
广播是一种发布/订阅链接,其中广播者发送的任何内容都会直接发送到正在流式传输该广播名称的频道订阅者。每个频道可以流式传输零个或多个广播。
3 服务器端组件
3.1 连接
对于服务器接受的每个 WebSocket,都会实例化一个连接对象。此对象成为从此创建的所有频道订阅的父级。连接本身除了身份验证和授权之外,不处理任何特定应用程序逻辑。WebSocket 连接的客户端称为连接消费者。每个用户将为他们打开的每个浏览器选项卡、窗口或设备创建一个消费者-连接对。
连接是 ApplicationCable::Connection
的实例,它扩展了 ActionCable::Connection::Base
。在 ApplicationCable::Connection
中,您可以授权传入连接,并在用户可以识别的情况下继续建立连接。
3.1.1 连接设置
# app/channels/application_cable/connection.rb
module ApplicationCable
class Connection < ActionCable::Connection::Base
identified_by :current_user
def connect
self.current_user = find_verified_user
end
private
def find_verified_user
if verified_user = User.find_by(id: cookies.encrypted[:user_id])
verified_user
else
reject_unauthorized_connection
end
end
end
end
这里 identified_by
指定了一个连接标识符,该标识符可用于稍后查找特定连接。请注意,任何标记为标识符的内容都将在从连接创建的任何频道实例上自动创建一个同名代理。
此示例依赖于以下事实:您已在应用程序的其他地方处理了用户的身份验证,并且成功的身份验证会设置一个带有用户 ID 的加密 cookie。
尝试新连接时,cookie 会自动发送到连接实例,您使用它来设置 current_user
。通过使用此相同的当前用户标识连接,您还可以确保您可以稍后检索所有由给定用户打开的连接(如果用户被删除或未授权,则可能断开所有这些连接)。
如果您的身份验证方法包括使用会话,并且您使用 cookie 存储会话,并且您的会话 cookie 名称为 _session
,并且用户 ID 密钥为 user_id
,则可以使用此方法
verified_user = User.find_by(id: cookies.encrypted["_session"]["user_id"])
3.1.2 异常处理
默认情况下,未捕获的异常会被捕获并记录到 Rails 的日志记录器中。如果您想全局拦截这些异常并将它们报告给外部错误跟踪服务(例如),您可以使用 rescue_from
# app/channels/application_cable/connection.rb
module ApplicationCable
class Connection < ActionCable::Connection::Base
rescue_from StandardError, with: :report_error
private
def report_error(e)
SomeExternalBugtrackingService.notify(e)
end
end
end
3.1.3 连接回调
ActionCable::Connection::Callbacks
提供了在将命令发送到客户端时调用的回调挂钩,例如订阅、取消订阅或执行操作时
3.2 频道
频道 封装了一个逻辑工作单元,类似于典型 MVC 设置中的控制器。默认情况下,Rails 会创建一个父 ApplicationCable::Channel
类(它扩展了 ActionCable::Channel::Base
),用于封装频道之间的共享逻辑,当您第一次使用频道生成器时。
3.2.1 父频道设置
# app/channels/application_cable/channel.rb
module ApplicationCable
class Channel < ActionCable::Channel::Base
end
end
您自己的频道类可以像以下示例一样
# app/channels/chat_channel.rb
class ChatChannel < ApplicationCable::Channel
end
# app/channels/appearance_channel.rb
class AppearanceChannel < ApplicationCable::Channel
end
然后,消费者可以订阅这两个频道中的任何一个或两个。
3.2.2 订阅
消费者订阅频道,充当订阅者。他们的连接称为订阅。然后根据频道消费者发送的标识符将产生的消息路由到这些频道订阅。
# app/channels/chat_channel.rb
class ChatChannel < ApplicationCable::Channel
# Called when the consumer has successfully
# become a subscriber to this channel.
def subscribed
end
end
3.2.3 异常处理
与 ApplicationCable::Connection
一样,您也可以在特定频道上使用 rescue_from
来处理引发的异常
# app/channels/chat_channel.rb
class ChatChannel < ApplicationCable::Channel
rescue_from "MyError", with: :deliver_error_message
private
def deliver_error_message(e)
# broadcast_to(...)
end
end
3.2.4 频道回调
ActionCable::Channel::Callbacks
提供了在频道生命周期中调用的回调挂钩
before_subscribe
after_subscribe
(别名为on_subscribe
)before_unsubscribe
after_unsubscribe
(别名为on_unsubscribe
)
4 客户端组件
4.1 连接
消费者需要在其端点上拥有一个连接实例。这可以使用以下 JavaScript 来建立,默认情况下由 Rails 生成
4.1.1 连接消费者
// app/javascript/channels/consumer.js
// Action Cable provides the framework to deal with WebSockets in Rails.
// You can generate new channels where WebSocket features live using the `bin/rails generate channel` command.
import { createConsumer } from "@rails/actioncable"
export default createConsumer()
这将准备一个消费者,该消费者将在默认情况下连接到服务器上的 /cable
。在您也指定了您有兴趣订阅的至少一个订阅之前,连接将不会建立。
消费者可以选择接受一个参数,该参数指定要连接到的 URL。这可以是一个字符串,也可以是一个在打开 WebSocket 时调用的返回字符串的函数。
// Specify a different URL to connect to
createConsumer('wss://example.com/cable')
// Or when using websockets over HTTP
createConsumer('https://ws.example.com/cable')
// Use a function to dynamically generate the URL
createConsumer(getWebSocketURL)
function getWebSocketURL() {
const token = localStorage.get('auth-token')
return `wss://example.com/cable?token=${token}`
}
4.1.2 订阅者
消费者通过创建对给定频道的订阅而成为订阅者
// app/javascript/channels/chat_channel.js
import consumer from "./consumer"
consumer.subscriptions.create({ channel: "ChatChannel", room: "Best Room" })
// app/javascript/channels/appearance_channel.js
import consumer from "./consumer"
consumer.subscriptions.create({ channel: "AppearanceChannel" })
虽然这会创建订阅,但响应接收数据的必要功能将在稍后介绍。
消费者可以多次充当给定通道的订阅者。例如,消费者可以同时订阅多个聊天室。
// app/javascript/channels/chat_channel.js
import consumer from "./consumer"
consumer.subscriptions.create({ channel: "ChatChannel", room: "1st Room" })
consumer.subscriptions.create({ channel: "ChatChannel", room: "2nd Room" })
5 客户端-服务器交互
5.1 流
流提供了一种机制,通过该机制,通道将发布的内容(广播)路由到其订阅者。例如,以下代码使用 stream_from
订阅名为 chat_Best Room
的广播,当 :room
参数的值为 "Best Room"
时。
# app/channels/chat_channel.rb
class ChatChannel < ApplicationCable::Channel
def subscribed
stream_from "chat_#{params[:room]}"
end
end
然后,在 Rails 应用程序的其他地方,可以通过调用 broadcast
广播到这样的房间。
ActionCable.server.broadcast("chat_Best Room", { body: "This Room is Best Room." })
如果您有一个与模型相关的流,那么广播名称可以从通道和模型生成。例如,以下代码使用 stream_for
订阅像 posts:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE
这样的广播,其中 Z2lkOi8vVGVzdEFwcC9Qb3N0LzE
是 Post 模型的 GlobalID。
class PostsChannel < ApplicationCable::Channel
def subscribed
post = Post.find(params[:id])
stream_for post
end
end
然后,可以通过调用 broadcast_to
广播到此通道。
PostsChannel.broadcast_to(@post, @comment)
5.2 广播
广播是一种发布/订阅链接,其中发布者传输的任何内容都会直接路由到正在流式传输该命名广播的通道订阅者。每个通道可以流式传输零个或多个广播。
广播纯粹是一个在线队列并且是时间相关的。如果消费者没有流式传输(订阅给定通道),那么如果他们稍后连接,他们将不会收到广播。
5.3 订阅
当消费者订阅通道时,他们充当订阅者。这种连接称为订阅。然后,根据电缆消费者发送的标识符,将传入消息路由到这些通道订阅。
// app/javascript/channels/chat_channel.js
import consumer from "./consumer"
consumer.subscriptions.create({ channel: "ChatChannel", room: "Best Room" }, {
received(data) {
this.appendLine(data)
},
appendLine(data) {
const html = this.createLine(data)
const element = document.querySelector("[data-chat-room='Best Room']")
element.insertAdjacentHTML("beforeend", html)
},
createLine(data) {
return `
<article class="chat-line">
<span class="speaker">${data["sent_by"]}</span>
<span class="body">${data["body"]}</span>
</article>
`
}
})
5.4 将参数传递给通道
在创建订阅时,您可以将参数从客户端传递到服务器端。例如
# app/channels/chat_channel.rb
class ChatChannel < ApplicationCable::Channel
def subscribed
stream_from "chat_#{params[:room]}"
end
end
传递给 subscriptions.create
作为第一个参数的对象成为电缆通道中的 params 哈希。关键字 channel
是必需的
// app/javascript/channels/chat_channel.js
import consumer from "./consumer"
consumer.subscriptions.create({ channel: "ChatChannel", room: "Best Room" }, {
received(data) {
this.appendLine(data)
},
appendLine(data) {
const html = this.createLine(data)
const element = document.querySelector("[data-chat-room='Best Room']")
element.insertAdjacentHTML("beforeend", html)
},
createLine(data) {
return `
<article class="chat-line">
<span class="speaker">${data["sent_by"]}</span>
<span class="body">${data["body"]}</span>
</article>
`
}
})
# Somewhere in your app this is called, perhaps
# from a NewCommentJob.
ActionCable.server.broadcast(
"chat_#{room}",
{
sent_by: "Paul",
body: "This is a cool chat app."
}
)
5.5 重新广播消息
一个常见的用例是将一个客户端发送的消息重新广播到任何其他连接的客户端。
# app/channels/chat_channel.rb
class ChatChannel < ApplicationCable::Channel
def subscribed
stream_from "chat_#{params[:room]}"
end
def receive(data)
ActionCable.server.broadcast("chat_#{params[:room]}", data)
end
end
// app/javascript/channels/chat_channel.js
import consumer from "./consumer"
const chatChannel = consumer.subscriptions.create({ channel: "ChatChannel", room: "Best Room" }, {
received(data) {
// data => { sent_by: "Paul", body: "This is a cool chat app." }
}
})
chatChannel.send({ sent_by: "Paul", body: "This is a cool chat app." })
重新广播将被所有连接的客户端接收,包括发送消息的客户端。请注意,参数与您订阅通道时相同。
6 全栈示例
以下设置步骤对于两个示例都适用
6.1 示例 1:用户出现
这是一个简单的示例,展示了一个跟踪用户是否在线以及他们所处页面的通道。(这对于创建诸如在用户在线时在用户名旁边显示一个绿色圆点之类的存在功能很有用)。
创建服务器端外观通道
# app/channels/appearance_channel.rb
class AppearanceChannel < ApplicationCable::Channel
def subscribed
current_user.appear
end
def unsubscribed
current_user.disappear
end
def appear(data)
current_user.appear(on: data["appearing_on"])
end
def away
current_user.away
end
end
当订阅启动时,subscribed
回调会被触发,我们借此机会说“当前用户确实出现了”。该出现/消失 API 可以由 Redis、数据库或其他任何东西支持。
创建客户端外观通道订阅
// app/javascript/channels/appearance_channel.js
import consumer from "./consumer"
consumer.subscriptions.create("AppearanceChannel", {
// Called once when the subscription is created.
initialized() {
this.update = this.update.bind(this)
},
// Called when the subscription is ready for use on the server.
connected() {
this.install()
this.update()
},
// Called when the WebSocket connection is closed.
disconnected() {
this.uninstall()
},
// Called when the subscription is rejected by the server.
rejected() {
this.uninstall()
},
update() {
this.documentIsActive ? this.appear() : this.away()
},
appear() {
// Calls `AppearanceChannel#appear(data)` on the server.
this.perform("appear", { appearing_on: this.appearingOn })
},
away() {
// Calls `AppearanceChannel#away` on the server.
this.perform("away")
},
install() {
window.addEventListener("focus", this.update)
window.addEventListener("blur", this.update)
document.addEventListener("turbo:load", this.update)
document.addEventListener("visibilitychange", this.update)
},
uninstall() {
window.removeEventListener("focus", this.update)
window.removeEventListener("blur", this.update)
document.removeEventListener("turbo:load", this.update)
document.removeEventListener("visibilitychange", this.update)
},
get documentIsActive() {
return document.visibilityState === "visible" && document.hasFocus()
},
get appearingOn() {
const element = document.querySelector("[data-appearing-on]")
return element ? element.getAttribute("data-appearing-on") : null
}
})
6.1.1 客户端-服务器交互
客户端通过
createConsumer()
连接到 服务器。(consumer.js
)。服务器通过current_user
识别此连接。客户端通过
consumer.subscriptions.create({ channel: "AppearanceChannel" })
订阅外观通道。(appearance_channel.js
)服务器识别到外观通道已启动新的订阅,并运行其
subscribed
回调,在current_user
上调用appear
方法。(appearance_channel.rb
)客户端识别到已建立订阅,并调用
connected
(appearance_channel.js
),这反过来又调用install
和appear
。appear
在服务器上调用AppearanceChannel#appear(data)
,并提供一个数据哈希{ appearing_on: this.appearingOn }
。这是可能的,因为服务器端通道实例会自动公开类上声明的所有公共方法(减去回调),以便可以通过订阅的perform
方法以远程过程调用的形式访问这些方法。服务器接收通过
current_user
(appearance_channel.rb
)识别的连接对外观通道上的appear
操作的请求。服务器从数据哈希中检索具有:appearing_on
键的数据,并将其设置为传递给current_user.appear
的:on
键的值。
6.2 示例 2:接收新的网络通知
外观示例都是关于通过 WebSocket 连接将服务器功能公开给客户端调用。但是,WebSocket 的好处是它是一条双向通道。所以,现在,让我们展示一个服务器在客户端上调用操作的示例。
这是一个网络通知通道,它允许您在广播到相关流时触发客户端的网络通知。
创建服务器端网络通知通道
# app/channels/web_notifications_channel.rb
class WebNotificationsChannel < ApplicationCable::Channel
def subscribed
stream_for current_user
end
end
创建客户端网络通知通道订阅
// app/javascript/channels/web_notifications_channel.js
// Client-side which assumes you've already requested
// the right to send web notifications.
import consumer from "./consumer"
consumer.subscriptions.create("WebNotificationsChannel", {
received(data) {
new Notification(data["title"], { body: data["body"] })
}
})
将内容广播到应用程序其他地方的网络通知通道实例
# Somewhere in your app this is called, perhaps from a NewCommentJob
WebNotificationsChannel.broadcast_to(
current_user,
title: "New things!",
body: "All the news fit to print"
)
WebNotificationsChannel.broadcast_to
调用将消息放置在当前订阅适配器的发布/订阅队列中,每个用户都使用单独的广播名称。对于 ID 为 1 的用户,广播名称将为 web_notifications:1
。
通过调用 received
回调,该通道已收到指示,将到达 web_notifications:1
的所有内容直接流式传输到客户端。作为参数传递的数据是作为服务器端广播调用的第二个参数发送的哈希,它被 JSON 编码以进行跨线传输,并被解压缩以作为 received
接收的数据参数。
6.3 更完整的示例
有关如何在 Rails 应用程序中设置 Action Cable 以及添加通道的完整示例,请参阅 rails/actioncable-examples 存储库。
7 配置
Action Cable 有两个必需的配置:订阅适配器和允许的请求来源。
7.1 订阅适配器
默认情况下,Action Cable 在 config/cable.yml
中查找配置文件。该文件必须为每个 Rails 环境指定一个适配器。有关适配器的更多信息,请参阅 依赖项 部分。
development:
adapter: async
test:
adapter: test
production:
adapter: redis
url: redis://10.10.3.153:6381
channel_prefix: appname_production
7.1.1 适配器配置
以下是可供最终用户使用的订阅适配器列表。
7.1.1.1 异步适配器
异步适配器适用于开发/测试,不应在生产环境中使用。
7.1.1.2 Redis 适配器
Redis 适配器要求用户提供指向 Redis 服务器的 URL。此外,可以提供 channel_prefix
以避免在将同一 Redis 服务器用于多个应用程序时发生通道名称冲突。有关更多详细信息,请参阅 Redis 发布/订阅文档。
Redis 适配器还支持 SSL/TLS 连接。所需的 SSL/TLS 参数可以在配置 YAML 文件的 ssl_params
键中传递。
production:
adapter: redis
url: rediss://10.10.3.153:tls_port
channel_prefix: appname_production
ssl_params:
ca_file: "/path/to/ca.crt"
传递给 ssl_params
的选项将直接传递给 OpenSSL::SSL::SSLContext#set_params
方法,并且可以是 SSL 上下文的任何有效属性。有关其他可用属性,请参阅 OpenSSL::SSL::SSLContext 文档。
如果您在防火墙后面使用自签名证书用于 Redis 适配器,并选择跳过证书检查,那么 ssl verify_mode
应设置为 OpenSSL::SSL::VERIFY_NONE
。
除非您完全了解安全隐患,否则不建议在生产环境中使用 VERIFY_NONE
。为了为 Redis 适配器设置此选项,配置应为 ssl_params: { verify_mode: <%= OpenSSL::SSL::VERIFY_NONE %> }
。
7.1.1.3 PostgreSQL 适配器
PostgreSQL 适配器使用 Active Record 的连接池,因此应用程序的 config/database.yml
数据库配置用于其连接。这在将来可能会改变。 #27214
PostgreSQL 对 NOTIFY
(用于发送通知的幕后命令)有 8000 字节限制,这在处理大型有效负载时可能是一个约束。
7.2 允许的请求来源
Action Cable 只会接受来自指定来源的请求,这些来源作为数组传递到服务器配置。来源可以是字符串或正则表达式的实例,将针对它们执行匹配检查。
config.action_cable.allowed_request_origins = ["https://rubyonrails.com", %r{http://ruby.*}]
要禁用并允许来自任何来源的请求
config.action_cable.disable_request_forgery_protection = true
默认情况下,Action Cable 在开发环境中运行时允许来自 localhost:3000 的所有请求。
7.3 消费者配置
要配置 URL,请在 HTML 布局 HEAD 中添加对 action_cable_meta_tag
的调用。这使用一个 URL 或路径,通常通过环境配置文件中的 config.action_cable.url
设置。
7.4 工作池配置
工作池用于独立于服务器主线程运行连接回调和通道操作。Action Cable 允许应用程序配置工作池中同时处理的线程数量。
config.action_cable.worker_pool_size = 4
此外,请注意,您的服务器必须提供至少与工作线程数量相同的数据库连接。默认的工作池大小设置为 4,这意味着您必须提供至少 4 个数据库连接。您可以在 config/database.yml
中通过 pool
属性更改它。
7.5 客户端日志记录
客户端日志记录默认情况下处于禁用状态。您可以通过将 ActionCable.logger.enabled
设置为 true 来启用它。
import * as ActionCable from '@rails/actioncable'
ActionCable.logger.enabled = true
7.6 其他配置
另一个常见的配置选项是应用于每个连接日志记录器的日志标签。以下是一个示例,它使用用户帐户 ID(如果可用),否则使用“no-account”进行标记
config.action_cable.log_tags = [
-> request { request.env["user_account_id"] || "no-account" },
:action_cable,
-> request { request.uuid }
]
有关所有配置选项的完整列表,请参阅 ActionCable::Server::Configuration
类。
8 运行独立电缆服务器
Action Cable 可以作为 Rails 应用程序的一部分运行,也可以作为独立服务器运行。在开发中,作为 Rails 应用程序的一部分运行通常是可以的,但在生产环境中,您应该将其作为独立服务器运行。
8.1 在应用程序中
Action Cable 可以与您的 Rails 应用程序一起运行。例如,要监听 /websocket
上的 WebSocket 请求,请将该路径指定到 config.action_cable.mount_path
# config/application.rb
class Application < Rails::Application
config.action_cable.mount_path = "/websocket"
end
如果在布局中调用了action_cable_meta_tag
,你可以使用ActionCable.createConsumer()
连接到电缆服务器。否则,将路径作为第一个参数指定给createConsumer
(例如:ActionCable.createConsumer("/websocket")
)。
对于创建的每个服务器实例,以及服务器生成的每个 worker,您还将拥有一个新的 Action Cable 实例,但 Redis 或 PostgreSQL 适配器会将消息同步到所有连接。
8.2 独立
电缆服务器可以与您的正常应用程序服务器分离。它仍然是一个 Rack 应用程序,但它是自己的 Rack 应用程序。推荐的基本设置如下
# cable/config.ru
require_relative "../config/environment"
Rails.application.eager_load!
run ActionCable.server
然后启动服务器
$ bundle exec puma -p 28080 cable/config.ru
这将在端口 28080 上启动一个电缆服务器。要告诉 Rails 使用此服务器,请更新您的配置
# config/environments/development.rb
Rails.application.configure do
config.action_cable.mount_path = nil
config.action_cable.url = "ws://127.0.0.1:28080" # use wss:// in production
end
最后,确保您已正确配置了消费者.
8.3 注意
WebSocket 服务器无法访问会话,但可以访问 cookie。当您需要处理身份验证时,这很有用。您可以在此文章中看到使用 Devise 的一种方法。
9 依赖项
Action Cable 提供了一个订阅适配器接口来处理其 pubsub 内部机制。默认情况下,包含异步、内联、PostgreSQL 和 Redis 适配器。新 Rails 应用程序中的默认适配器是异步 (async
) 适配器。
Ruby 部分构建在websocket-driver、nio4r 和 concurrent-ruby 之上。
10 部署
Action Cable 由 WebSockets 和线程的组合提供支持。框架管道和用户指定的通道工作都在内部通过利用 Ruby 的原生线程支持来处理。这意味着您可以毫无问题地使用所有现有的 Rails 模型,只要您没有犯任何线程安全错误。
Action Cable 服务器实现了 Rack 套接字劫持 API,从而允许使用多线程模式来内部管理连接,而与应用程序服务器是多线程还是单线程无关。
因此,Action Cable 可与 Unicorn、Puma 和 Passenger 等流行服务器一起使用。
11 测试
您可以在测试指南中找到有关如何测试您的 Action Cable 功能的详细说明。