使用 Phoenix 框架开发原生 WebSocket 服务

发表于 更新于

前言

在 Phoenix 框架中,一般使用 Channel 实现和前端的双工通信,它基于 WebSocket 并支持长轮询作为后备模式。Channels 的设计已经非常好了,但偶尔情况下我们仍需要原生的 WebSocket 后端,例如为某个现成的前端/客户端产品做适配。

本文将告诉你如何用 Phoenix 做原生 WebSocket 开发。这是一篇新手向教程。过程十分简单!

例子

这里有一个蓝色的画布,实时接收从 WebSocket 服务器发来的随机座标,并以该座标为中心生成涟漪:

如果座标在前端随机,那么打开多个页面显示的涟漪会各不相同。但此处的座标来自于后端,无论你打开多少个页面它们刷新出的涟漪都完全的同步。我们将以这个需求开发与之适配的 WebSocket 后端。

实现

首先你需要新建一个或基于已存在的 Phoenix 项目,我们将基于这个项目做开发。在本文的例子中,我将项目命名为 homepage,因为本文的 WebSocket 服务由我的主页应用做支撑。

订阅者

我们需要一个 GenServer 服务来维护所有的 WebSocket 客户端连接。由于所有客户端只是单方面的获取同一个实时数据,我将其称之为订阅者。

完整代码如下:

defmodule HomepageWeb.Random.Subscribers do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{list: []}, name: __MODULE__)
  end

  @impl true
  def init(init_arg) do
    {:ok, init_arg}
  end

  @doc """
  增加一个订阅者。
  """
  def add(pid) do
    GenServer.cast(__MODULE__, {:add, pid})
  end

  @doc """
  删除一个订阅者。
  """
  def remove(pid) do
    GenServer.cast(__MODULE__, {:remove, pid})
  end

  @doc """
  获取所有订阅者。
  """
  def all do
    GenServer.call(__MODULE__, :all)
  end

  @impl true
  def handle_cast({:add, pid}, state) do
    {:noreply, Map.put(state, :list, [pid | state.list])}
  end

  @impl true
  def handle_cast({:remove, pid}, state) do
    {:noreply, Map.put(state, :list, List.delete(state.list, pid))}
  end

  @impl true
  def handle_call(:all, _from, state) do
    {:reply, state.list, state}
  end
end

注意上述的每一个订阅者实际上都是进程 pid。我们将这些进程存储在一个列表中,并维护新增和删除,以便在有新数据时向所有订阅者广播。

记得把 HomepageWeb.Random.Subscribers 加入监督树,放到 Endpoint 模块的上面。

Socket

WebSocket 需实现 Phoenix.Socket.Transport 行为。以下是完整代码:

defmodule HomepageWeb.Random.Socket do
  @behaviour Phoenix.Socket.Transport

  require Logger

  alias HomepageWeb.Random.Subscribers

  @impl true
  def child_spec(_opts) do
    # We won't spawn any process, so let's ignore the child spec
    :ignore
  end

  @impl true
  def connect(state) do
    Logger.info("New connection")

    :ok = Subscribers.add(self())

    {:ok, state}
  end

  @impl true
  def init(state) do
    # Now we are effectively inside the process that maintains the socket.
    {:ok, state}
  end

  @impl true
  def handle_in({text, _opts}, state) do
    {:reply, :ok, {:text, text}, state}
  end

  @impl true
  def handle_info({:message, [x, y] = message}, state) do
    Logger.debug("Send position to #{inspect(self())}: #{inspect(message)}")

    {:reply, :ok, {:text, "[#{x}, #{y}]"}, state}
  end

  @impl true
  def handle_info(_, state) do
    {:ok, state}
  end

  @impl true
  def terminate(_reason, _state) do
    Logger.info("Connection terminated")

    :ok = Subscribers.remove(self())

    :ok
  end
end

上面几乎是一个最简化的实现,我们将新连接的进程 pid 加入到订阅者列表中,在连接终止时将其移除。同时我们添加了一个处理 {:message, message} 消息的 handle_info/2 函数,这是为了让其它进程能通过与此进程通信的方式实现 WebSocket 消息的主动发送。

HomepageWeb.Random.Socket 作为 socket 添加到 Endpoint 代码中:

defmodule HomepageWeb.Endpoint do
  use Phoenix.Endpoint, otp_app: :homepage

  # 省略此模块的绝大多数代码,保留 `/live` 提供定位:
  socket "/live", Phoenix.LiveView.Socket,
  # ...

  socket "/random", HomepageWeb.Random.Socket, websocket: true
end

这里我将我们自己的 WebSocket 放到了 LiveView 的 socket 下方。类似于路由策略,不冲突的路径顺序可以随意。

生产者

现在,我们实现最后一个模块。此模块负责生成随机座标,并将座标发送给所有订阅者。以下是完整代码:

defmodule HomepageWeb.Random.Producer do
  use GenServer

  @ratio 2

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{random: [random_x(), random_y()]}, name: __MODULE__)
  end

  @impl true
  def init(init_arg) do
    schedule_next()

    {:ok, init_arg}
  end

  def schedule_next do
    Process.send_after(self(), :next, 3000)
  end

  @impl true
  def handle_info(:next, state) do
    for pid <- HomepageWeb.Random.Subscribers.all() do
      send(pid, {:message, state.random})
    end

    schedule_next()

    {:noreply, %{random: [random_x(), random_y()]}}
  end

  def random_x, do: :rand.uniform(1000)
  def random_y, do: :rand.uniform(trunc(1000 / @ratio))
end

上面这个 GenServer 服务会自主运行,每三秒推送一次坐标,同时生成下一个随机座标。

记得把 HomepageWeb.Random.Producer 加入监督树,放在 HomepageWeb.Random.Subscribers 模块的下面。

现在可以启动 Phoenix 应用程序了!我们可以用命令行工具 websocat 来测试,路径是 /random/websocket。然后你会发现所有的新连接收到的最新座标数据都是一样的,同步的。

如果你也要基于此 WebSocket 制作动画,切记让画布(或视图容器)保持和代码中相同的长宽比例。

总结

重点是要理解 WebSocket 在 Phoenix 中的抽象,每一个进程维护一个连接。有新连接时,进程被创建断开时进程被销毁,我们仅需实现对应的回调即可。

还有就是我们只能通过返回 {:reply, :ok, {:text, message}, state} 的方式发消息,但由于每一个连接就是一个进程,我们向代表该连接的进程发消息,然后进程处理我们消息并将其返回便可实现向客户端的主动发送。

结束语

这就是 Phoenix 开发原生 WebSocket 后端代码的教程了。顺带一提此页面的动画使用 p5.js 制作,我喜欢有创意的东西,如果你也一样可参考此文了解和学习。

加入我们

如果你也是 Elixir 开发者/爱好者,这里有一些我创建的群组:

添加 QQ 群时请填写来源为“博客”。注意请不要灌水,谢谢。

作者头像 一点点入门知识 打赏作者
本文由作者按照 CC BY 4.0 进行授权
分享:

相关文章