有状态后台任务并发库 Honeycomb 介绍

发表于 更新于

前言

Honeycomb 是一个 Elixir 库,它设计用于批量执行需要保留结果的异步或后台任务。它的核心功能是异步/后台执行和并发控制。在此前我使用一个功能类似的名为 honeydew 的库,但它已缺乏维护,为了移除对它的依赖我开发了 honeycomb 以取代它。

Honeycomb 并不一定会具有 Honeydew 的完整功能,因为我会优先实现自己需要的部分。实际上目前 Honeycomb 已经和 Honeydew 存在巨大的设计差异,它更加符合 OTP 工作模型。

掌握蜂巢

想象一下,这里有一个蜂巢,而你是蜂巢的指挥官(你可以将自己代入为蜂后)。你给每一只蜜蜂下达“采蜜”任务,蜜蜂们一只只的去执行。采蜜结束蜜蜂归巢,蜂巢逐步布满待取走的蜂蜜。

你还可以限制同时外出的蜜蜂数量(并发控制)或让蜜蜂等待片刻后再出发(延迟执行)。这就是 Honeycomb(蜂巢)库的基本使用逻辑。

并且你可以创建和指挥数个蜂巢的活动,它们不会互相干扰。

进程树

这是单棵 Honeycomb 的进程树(已省略部分模块名和 id)。当存在多个服务时,它们将根据 id 互相独立。

graph LR;
    H[Honeycomb] -- 控制 -->S[Honeycomb.Scheduler];
    H-->R[Honeycomb.Runner];
    R -- 创建 -->T1[task1];
    R -- 创建 -->T2[task2];

基础用法

这个章节介绍 Honeycomb 库的基本用法。例子不一定具有现实意义,具体的使用场景还需用户自行发掘。

加入监督树

首先需要定义自己的蜂后:

defmodule YourProject.Queen do
  use Honeycomb.Queen, id: :my_honeycomb
end

将蜂巢和蜂后搭配在一起,添加到监督树中:

children = [
  # 省略其它进程...
  {Honeycomb, queen: YourProject.Queen}
]

opts = [strategy: :one_for_one, name: YourProject.Supervisor]
Supervisor.start_link(children, opts)

接下来我们就可以用 :my_honeycomb 这个 id 作为 queen 来执行所有调用。你也可以不设置 id,那么 YourProject.Queen 这个模块名将作为 queen 参数。

执行任务

通过 gather_* 系列 API 来创建任务,它们的前三个参数都是 queennamerun。其中 run 参数就是我们的任务,它可以是一个符合 (-> any) 规范的函数,也可以是一个符合 {module(), atom(), [any()]} 规范的元组。

我们执行一个休息 10 秒的任务:

iex> Honeycomb.gather_honey :my_honeycomb, "sleep-10", fn -> :timer.sleep(10 * 1000) end

它的元组版本是这样的:

iex> Honeycomb.gather_honey :my_honeycomb, "sleep-10", {:timer, :sleep, [10 * 1000]}

这里的 sleep-10 参数值是我们为蜜蜂取的名称,它应该是独一无二的。

上述的 Honeycomb.gather_honey/3 调用立即得到返回结果:

{:ok,
 %Honeycomb.Bee{
   name: "sleep-10",
   status: :pending,
   run: #Function<43.105768164/0 in :erl_eval.expr/6>,
   expect_run_at: ~U[2024-05-07 21:32:15.810149Z],
   work_start_at: nil,
   work_end_at: nil,
   stateless: false,
   result: nil
 }}

这就是我们的蜜蜂。但此刻它还是 pending 状态,还未立即执行。因为 Honeycomb 整个系统是异步的,它并不会同步调用任务。

如果我们马上调用 Honeycomb.bees/1 会看到它已经在执行了:

iex> Honeycomb.bees :my_honeycomb
[
  %Honeycomb.Bee{
    name: "sleep-10",
    status: :running,
    run: #Function<43.105768164/0 in :erl_eval.expr/6>,
    expect_run_at: ~U[2024-05-07 21:35:56.390067Z],
    work_start_at: ~U[2024-05-07 21:35:56.390377Z],
    work_end_at: nil,
    stateless: false,
    result: nil
  }
]

此处的 Honeycomb.bees/1 函数,将返回所有的蜜蜂(以下简称 bee)。其中 expect_run_at 字段表示预期执行的时间,此处并非延迟任务,所以预期执行时间通常就是创建 bee 的时间。接着系统会立即进入队列检查并执行该任务,所以 bee 会马上转换到执行状态,并更新 work_start_at 也就是工作开始的时间。

如果是延迟任务,这两个时间会有明显差距。但此处是非延迟任务,所以它们的相差时间可以忽略不计。

等待 10 秒后,再次查看:

iex> Honeycomb.bees :my_honeycomb
[
  %Honeycomb.Bee{
    name: "sleep-10",
    status: :done,
    run: #Function<43.105768164/0 in :erl_eval.expr/6>,
    expect_run_at: ~U[2024-05-07 21:35:56.390067Z],
    work_start_at: ~U[2024-05-07 21:35:56.390377Z],
    work_end_at: ~U[2024-05-07 21:36:06.391425Z],
    stateless: false,
    result: :ok
  }
]

会发现这个 bee 已经执行完成了。其中 work_end_at 表示真实的执行结束时间,和 work_start_at 间隔大约为 10 秒。

执行结果 :ok:timer.sleep/1 的返回值)放在 result 字段中。

取走蜂蜜

在上述的例子中,我们通过 Honeycomb.bees/1 查看所有蜜蜂,从 result 字段来获得任务执行结果。但它并不是专门用来获取结果的函数,我们还可以通过 Honeycomb.bee/2 函数来获取特定的 bee 的状态:

iex> Honeycomb.bee :my_honeycomb, "sleep-10"
%Honeycomb.Bee{
  name: "sleep-10",
  status: :done,
  run: #Function<43.105768164/0 in :erl_eval.expr/6>,
  expect_run_at: ~U[2024-05-07 21:35:56.390067Z],
  work_start_at: ~U[2024-05-07 21:35:56.390377Z],
  work_end_at: ~U[2024-05-07 21:36:06.391425Z],
  stateless: false,
  result: :ok
}

上面通过 sleep-10 这个 bee 名称来直接获取相对应的 bee 的最新状态,而不是遍历每一个 bee。

很多时候我们有可能只在意结果,而不在意过程(不在意 bee 的状态)。调用 Honeycomb.harvest_honey/2 函数直接收获结果:

iex> Honeycomb.harvest_honey :my_honeycomb, "sleep-10"
{:done, :ok}

它返回了 :done:ok,分别表示执行完成后的状态和执行的结果(:timer.sleep/1 的返回值)。这里的 done 有时候会是 raised,它表示执行过程中出错了。

除此之外的都将是 {:error, reason} 这个经典错误返回结构,错误原因包括:

  • 未找到(:not_found
  • 未完成(:undone

并且 Honeycomb.harvest_honey/2 在成功调用后,bee 会被移除掉。也就是说这个函数同样的参数只能成功调用一次,“收获蜂蜜”后蜜蜂和蜂蜜(结果)都将不复存在。这很好理解,很多时候我们不用永远保留结果,取走并再次利用,和蜂巢不再相关:

iex> Honeycomb.harvest_honey :my_honeycomb, "sleep-10"
{:error, :not_found} # sleep-10 这个 bee 已经被移除了,因为蜂蜜已被取走。

错误处理

对于运行的任务,除非你需要刻意的处理某些异常,否则你不需要用 try/resuce 来包装任务函数。因为 Honeycomb 已经做了这件事。我们执行一个立即报错的任务:

iex> Honeycomb.gather_honey :my_honeycomb, "raise-now", fn -> raise "I am an error" end
{:ok,
 %Honeycomb.Bee{
   name: "raise-now",
   status: :pending,
   run: #Function<43.105768164/0 in :erl_eval.expr/6>,
   expect_run_at: ~U[2024-05-07 22:09:45.608439Z],
   work_start_at: nil,
   work_end_at: nil,
   stateless: false,
   result: nil
 }}

接着我们查看这个任务:

iex> Honeycomb.bee :my_honeycomb, "raise-now"
%Honeycomb.Bee{
  name: "raise-now",
  status: :raised,
  run: #Function<43.105768164/0 in :erl_eval.expr/6>,
  expect_run_at: ~U[2024-05-07 22:09:45.608439Z],
  work_start_at: ~U[2024-05-07 22:09:45.609694Z],
  work_end_at: ~U[2024-05-07 22:09:45.609700Z],
  stateless: false,
  result: %RuntimeError{message: "I am an error"}
}

可以看到 bee 的状态变成了 raised 而不是 done。并且 result 字段保存了 raise 发生时的异常结构体数据。所以你的任务如何出错,都不会影响 Honeycomb 的系统运作。

失败重试

任务执行失败后,Honeycomb 会收集错误结果,并将 bee 设置为 raised 状态。有时候,我们希望发生某些错误时自动重试,如网络超时。通过配置 failure_mode(失败模式)可以轻易做到这一点:

defmodule YourProject.CleanerQueen do
  alias Honeycomb.FailureMode.Retry

  use Honeycomb.Queen,
    id: :cleaner,
    failure_mode: %Retry{max_times: 5, ensure: &ensure/1}

  def ensure(error) do
    case error do
      %MatchError{term: {:error, %Telegex.RequestError{reason: :timeout}}} ->
        # 请求超时,继续重试
        :continue

      _ ->
        :halt
    end
  end
end

我们创建了一个 CleanerQueen,并添加了 failure_mode 设置。当任务执行出错后,Honeycomb 会进入失败模式,并按照模式配置做更多的工作。此处我们将失败模式配置为超时,并自行实现了 ensure/1 函数。一旦任务执行失败,便会进入这个函数确认是否重试。

我们自定义的 ensure/1 函数通过判断错误细节决定在网络超时后重试,其它情况不再重试。同时我们设置了 max_times,它限制了重试的最大次数,避免陷入无限重试中。

上面实际上是一个简化后的真实例子,它是对 Telegex 函数的包装,让 API 们具有自动重试的能力。包装如下:

def async_delete_message(chat_id, message_id) do
  run = fn -> delete_message!(chat_id, message_id) end

  Honeycomb.gather_honey(:cleaner, "delete-#{chat_id}-#{message_id}", run, stateless: true)
end

def delete_message!(chat_id, message_id) do
  {:ok, true} = Telegex.delete_message(chat_id, message_id)
end

我们包装了 Telegex.delete_message/2 函数,不处理任何错误,直接模式匹配正确的返回结果。一旦不匹配就表示出现错误,进入失败模式判断错误细节。如果是网络超时就自动重试。

理论上任何幂等性的调用,都可以这样包装,让调用更加稳定。不需要自行实现任何重试机制,因为 Honyecomb 帮我们做了。

注意 ensure/1 中的代码不应该包含耗时操作,因为这个回调函数在 Honeycomb 的调度器进程中执行,耗时操作会影响调度效率。如果此回调函数执行出错了,调度器不会安排重试。

终止任务

一旦 bee 运行起来,调用 Honeycomb.terminate_bee/2 就可以终止它。我们首先创建一个任务,它在睡眠 10 秒后输出 :hello 到控制台,并将其作为结果返回:

iex> Honeycomb.gather_honey :my_honeycomb, "hello", fn -> :timer.sleep(10 * 1000); IO.inspect(:hello) end
{:ok,
 %Honeycomb.Bee{
   name: "hello",
   status: :pending,
   run: #Function<43.105768164/0 in :erl_eval.expr/6>,
   task_pid: nil,
   create_at: ~U[2024-05-09 01:48:02.609955Z],
   expect_run_at: ~U[2024-05-09 01:48:02.609955Z],
   timer: nil,
   work_start_at: nil,
   work_end_at: nil,
   stateless: false,
   result: nil
 }}

接着,我们趁它还在运行的 10 秒内终止它:

iex> Honeycomb.terminate_bee :my_honeycomb, "hello"
{:ok,
 %Honeycomb.Bee{
   name: "hello",
   status: :terminated,
   run: #Function<43.105768164/0 in :erl_eval.expr/6>,
   task_pid: nil,
   create_at: ~U[2024-05-09 01:48:02.609955Z],
   expect_run_at: ~U[2024-05-09 01:48:02.609955Z],
   timer: nil,
   work_start_at: ~U[2024-05-09 01:48:02.610295Z],
   work_end_at: nil,
   stateless: false,
   result: nil
 }

执行终止函数后,它会立即返回最新的 bee,其 terminated 状态表示被终止。我们等待 10 秒,不会看到控制台有任何输出,重复查看这个 bee 也不会看到 result 再有更新,因为任务已经被杀死了。

注意:Honeycomb.terminate_bee/2 函数无法终止尚未运行的 bee,会返回 {:error, bad_status} 。对于尚未进入 running 状态的 bee,你可以调用 Honeycomb.cancel_bee/2 将其取消。

取消任务

调用 Honeycomb.cancel_bee/2 可以取消一个 bee 的执行。只有 bee 在 pending 状态时它才可以被取消,否则将返回 {:error, bad_staus}。取消成功时,bee 的状态为 canceled。用法和终止任务类似。

停止任务

调用 Honeycomb.stop_bee/2 可以停止一个 bee,通常它比「终止」和「取消」更好用。这是第三个和杀死任务有关的 API,你或许会有以下疑问:

  • 为什么要提供「终止」和「取消」两个 API?

    因为「终止」和「取消」的底层逻辑及语义是完全不同的,所以我刻意暴露了两个对应的独立实现。它们对 bee 的状态要求很苛刻,这有可能造成麻烦。例如:我不关心被杀死的任务正在运行或没运行。

  • 所谓的「停止」操作和它们两个又有什么区别?

    Honeycomb.stop_bee/2 函数就是为了避免这类麻烦而诞生,它同时整合了「终止」和「取消」的两种逻辑,兼容 :pending:running 两种状态下 bee 的停止操作。

上面有提到「停止」是对「终止」和「取消」两种操作的复用,所以被停止的 bee 不具有表示停止的状态。bee 可能是 terminated(对应运行中被终止),也可能是 canceled(对应运行前被取消)。

匿名任务

有时候我们的任务是大量生成的,没有一对一的外部变量能组合成具有约定性质的名称。这时候可以给 gather_* 系列函数的 name 参数传递 anon 这个特殊 atom,它会在创建 bee 时生成一个具有唯一性质的名称。如下:

iex> Honeycomb.gather_honey :my_honeycomb, :anon, fn -> :ok end
{:ok,
 %Honeycomb.Bee{
   name: "-576460752303423485",
   status: :pending,
   caller: nil,
   run: #Function<43.105768164/0 in :erl_eval.expr/6>,
   task_pid: nil,
   retry: 0,
   create_at: ~U[2024-05-10 09:22:53.888790Z],
   expect_run_at: ~U[2024-05-10 09:22:53.888790Z],
   timer: nil,
   work_start_at: nil,
   work_end_at: nil,
   stateless: false,
   result: nil
 }}
由于 :anon 并不是名称值,它是一种生成策略,所以千万不要将 :anon 作为名称去调用 Honeycomb.bee/2Honeycomb.harvest_honey/2 这类函数。

同步调用

使用 Honeycomb.gather_honey_sync/4 函数可以模拟同步调用,它会阻塞当前的调用进程直到收到执行结果,并将结果作为返回值返回。直白来说,这个函数可以像同步执行任务那样等待任务执行完成,并返回任务的结果:

iex> Honeycomb.gather_honey_sync :my_honeycomb, :anon, fn -> :timer.sleep(2 * 1000); :hello end
:hello

阻塞 2 秒后,直接得到执行结果 :hello。有些时候同步 API 十分有价值,它可以包装任何需要返回值的函数,给人一种底层并非一个异步系统的错觉。

仍要强调此类同步 API 只是模拟同步,任务实际上仍然进入到调度系统中。任务也会受到其它机制的影响,如并发控制、失败模式。

传递 timeout 选项可以设置超时,超时后返回 {:error, :sync_timeout}。调用超时指的是指定时间到达,但任务仍然没有执行结束。注意:超时发生后,任务会将被立即终止

同步 API 生成的是无状态任务,也就是说执行完成后会自动删除。毕竟我们已经得到结果了。

进阶用法

从上面的教程中,我们可以简单的下一个定义,Honeycomb 就是一个异步任务的执行和结果收集系统。但它的作用不仅如此,这个章节会涉及一些更复杂的使用场景。

并发控制

修改我们自己的 Queen 模块,增加 concurrency 参数:

defmodule YourProject.Queen do
  use Honeycomb.Queen, id: :my_honeycomb, concurrency: 10
end

此时,我们的 :my_honeycomb 蜂巢已具有并发控制能力。它会始终保证仅同时执行 10 个任务,多余的任务将按照顺序进入队列等待执行。等待的 bee 在被运行之前,状态将一直是 pending,而 work_start_at 一直是 nil。任务开始后,通过计算 expect_run_atwork_start_at 的差异即可得到相关 bee 等待的时长。

延迟重试

失败重试章节我们了解到了 Honeycomb 的重试机制,但它还有更进一步的使用方法。在 ensure/1 回调中,可以返回第三个值 {:continue, delay},这里的 delay 表示延迟此次重试的时间。

带有延迟的重试和立即重试有着底层机制上的巨大区别。当我们直接返回 :continue 时,调度系统会立即重新分配 runner 再次执行。这个过程是无视现有等待队列的,就好像把所有重试都视作一个整体,除非重试结束,否则这个任务不算完成。而延迟重试(哪怕是延迟 0 毫秒)会将任务重新入队,参与到调度中。就好像这个任务被重新加入系统了,它需要再次排队。所以哪怕是返回 0 毫秒的延迟都是具有实际意义的。

我们可以这样进一步优化对 Telegex 的 API 调用的重试机制,如下:

def ensure(error) do
  case error do
    %MatchError{term: {:error, %Telegex.RequestError{reason: :timeout}}} ->
      # 请求超时,执行重试
      :continue

    %MatchError{
      term:
        {:error,
          %Telegex.Error{
            description: <<"Too Many Requests: retry after " <> second>>,
            error_code: 429
          }}
    } ->
      # 按照错误消息中的秒数等待重试
      {:continue, String.to_integer(second) * 1000}

    _ ->
      :halt
  end
end

这个 ensure/1 函数按照 API 响应中提示的等待时间安排重试延迟,保障 API 调用的成功率。

延迟执行

添加或启动任务的入口是 Honeycomb.gather_honey/4 函数,它的第四个参数是一些可选参数的 keyword。我们传递 delay 选项,用于延迟执行任务:

iex> Honeycomb.gather_honey :my_honeycomb, "delay-run", fn -> :ok end, delay: 10 * 1000
{:ok,
 %Honeycomb.Bee{
   name: "delay-run",
   status: :pending,
   run: #Function<43.105768164/0 in :erl_eval.expr/6>,
   create_at: ~U[2024-05-07 22:36:35.216562Z],
   expect_run_at: ~U[2024-05-07 22:36:45.216562Z],
   work_start_at: nil,
   work_end_at: nil,
   stateless: false,
   result: nil
 }}

我们创建了一个名为 delay-run 的 bee,但我们的任务函数并未添加任何阻塞调用。从 create_atexpect_run_at 的差异可以看到系统预期在 10 秒后执行它。等待 10 秒,再次查看这个 bee:

iex> Honeycomb.bee :my_honeycomb, "delay-run"
%Honeycomb.Bee{
  name: "delay-run",
  status: :done,
  run: #Function<43.105768164/0 in :erl_eval.expr/6>,
  create_at: ~U[2024-05-07 22:36:35.216562Z],
  expect_run_at: ~U[2024-05-07 22:36:45.216562Z],
  work_start_at: ~U[2024-05-07 22:36:45.221104Z],
  work_end_at: ~U[2024-05-07 22:36:45.221107Z],
  stateless: false,
  result: :ok
}

完美,10 秒后任务被准时执行。不过,如果 Honeycomb 系统中存在并发限制,就不一定会准时执行了。你也可以调用 Honeycomb.gather_honey_after/5 函数,它直接传递数字值作为延迟时间,更为方便。

无状态任务

如果你只是想利用 Honeycomb 的异步执行和并发控制等功能,而你的任务的结果并不重要,那么可以将任务设置为 stateless(无状态)。所有无状态任务在执行结束后会清理自身,但在运行结束之前它仍然存在。这是一个例子:

iex> Honeycomb.gather_honey :my_honeycomb, "sleep-5", fn -> :timer.sleep(5000) end, stateless: true
{:ok,
 %Honeycomb.Bee{
   name: "sleep-5",
   status: :pending,
   run: #Function<43.105768164/0 in :erl_eval.expr/6>,
   create_at: ~U[2024-05-07 22:43:35.516373Z],
   expect_run_at: ~U[2024-05-07 22:43:35.516373Z],
   work_start_at: nil,
   work_end_at: nil,
   stateless: true,
   result: nil
 }}

在 5 秒内,我们仍然可以查询到这个 bee:

iex> Honeycomb.bee :my_honeycomb, "sleep-5"
%Honeycomb.Bee{
  name: "sleep-5",
  status: :running,
  run: #Function<43.105768164/0 in :erl_eval.expr/6>,
  create_at: ~U[2024-05-07 22:43:35.516373Z],
  expect_run_at: ~U[2024-05-07 22:43:35.516373Z],
  work_start_at: ~U[2024-05-07 22:43:35.517164Z],
  work_end_at: nil,
  stateless: true,
  result: nil
}

但 5 秒后(执行结束),这个 bee 就自动清理了:

iex> Honeycomb.bee :my_honeycomb, "sleep-5"
nil

无状态任务可以避免每次都要调用 Honeycomb.harvest_honey/2 主动清理的麻烦,给不在意结果的任务提供便利。

多服务

上面有提到过,你可以创建多个 Honeycomb 系统,并各自独立使用。如:

defmodule YourProject.AnotherQueen do
  use Honeycomb.Queen, id: :another, concurrency: 10
end
children = [
  # 省略其它进程...
  {Honeycomb, queen: YourProject.Queen},
  {Honeycomb, queen: YourProject.AnotherQueen},
]

opts = [strategy: :one_for_one, name: YourProject.Supervisor]
Supervisor.start_link(children, opts)

上面创建了两个 Honeycomb 系统,其中 my_honeycomb 不限制并发数量,而 another 仅允许同时进行 10 个任务。它们可以针对性的用于不同的场合。

日志元数据

有时候你不确定日志中的内容是否来自于 Hoenycomb,或来自哪一个 Honeycomb 系统。你可以添加 honeycomb 字段到日志的元数据中,如下:

config :logger, :console,
  # 省略其它配置...
  metadata: [:honeycomb]

来自 Honeycomb 的日志就会包含 queen 的 id,效果:

honeycomb=my_honeycomb [debug] retry bee: -576460752303423484
honeycomb=another [debug] retry bee: -576460752303423487

结束语

Honeycomb 库的设计不仅如此,它还有一个未完成的路线图。这个库是个人对某些场景下的一些经验总结,它肯定不适合所有场合。但有限的场景中,我也会让它尽可能稳定和可靠,并支持更多周边设施(如速率限制、存储后端)。

作者头像 一点点入门知识 打赏作者
订阅我的 Telegram 频道

订阅频道第一时间掌握作者博客的最新动态,获取更多的分享。

本文由作者按照 CC BY 4.0 进行授权
分享:

相关文章