# Redis Streams tutorial

Redis Streams tutorial 综合教程

如果您不熟悉流,请参阅 Redis Streams 介绍。如需更全面的教程,请继续阅读。

# 介绍

Redis 流数据类型是在 Redis 5.0 中引入的。Streams 对日志数据结构进行建模,但也实现了几个操作来克服典型的仅附加日志的一些限制。其中包括 O(1) 时间内的随机访问和复杂的消费策略,例如消费群体。

# 流基础知识

流是一种只能追加的数据结构。 称为XADD 的基本写入命令将新条目附加到指定的流。

每个流条目由一个或多个字段值对组成,有点像记录或 Redis 哈希:

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

XADD 上面对命令的调用使用自动生成的条目 ID将条目添加sensor-id: 1234, temperature: 19.8到 key 的流中,该条目 ID 是命令返回的 ID,特别是. 它的第一个参数是键名,第二个参数是标识流中每个条目的条目 ID。但是,在这种情况下,我们通过mystream``1518951480106-0``mystream``*因为我们希望服务器为我们生成一个新的 ID。每个新的 ID 都会单调递增,所以更简单地说,与所有过去的条目相比,添加的每个新条目都将具有更高的 ID。服务器自动生成 ID 几乎总是您想要的,而明确指定 ID 的原因非常少见。稍后我们将详细讨论这一点。每个 Stream 条目都有一个 ID 的事实与日志文件的另一个相似之处在于,可以使用行号或文件内的字节偏移量来识别给定条目。回到我们的 XADD 示例,在键名和 ID 之后,下一个参数是组成流条目的字段-值对。

只需使用以下 XLEN 命令即可获取 Stream 中的项目数:

> XLEN mystream
(integer) 1

# 条目 ID

命令返回的条目 ID XADD,并明确标识给定流中的每个条目,由两部分组成:

<millisecondsTime>-<sequenceNumber>

毫秒时间部分实际上是本地 Redis 节点生成流 ID 的本地时间,但是如果当前毫秒时间恰好小于上一个入口时间,则使用上一个入口时间代替,所以如果时钟向后跳单调递增的 ID 属性仍然成立。序列号用于在同一毫秒内创建的条目。由于序列号是 64 位宽,实际上在同一毫秒内可以生成的条目数没有限制。

这样的 ID 格式乍一看可能很奇怪,温柔的读者可能会奇怪为什么时间是 ID 的一部分。原因是 Redis 流支持按 ID 进行范围查询。因为ID与条目的生成时间有关,所以基本上可以免费查询时间范围。 XRANGE 我们将在介绍命令时很快看到这一点。

如果由于某种原因用户需要与时间无关但实际上与另一个外部系统 ID 相关联的增量 ID,如前所述,该 XADD 命令可以采用显式 ID 而不是*触发自动生成的通配符 ID,如在以下示例:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

请注意,在这种情况下,最小 ID 为 0-1,并且该命令不会接受等于或小于前一个 ID 的 ID:

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

如果您运行的是 Redis 7 或更高版本,您还可以提供一个仅包含毫秒部分的显式 ID。在这种情况下,将自动生成 ID 的序列部分。为此,请使用以下语法:

> XADD somestream 0-* baz qux
0-3

# 从 Streams 中获取数据

现在我们终于可以通过 XADD. 然而,虽然将数据附加到流中非常明显,但查询流以提取数据的方式并不那么明显。如果我们继续类比日志文件,一种明显的方法是模仿我们通常使用 Unix 命令所做的事情tail -f,也就是说,我们可能会开始监听以获取附加到流中的新消息。请注意,与 Redis 的阻塞列表操作不同,其中给定元素将到达单个客户端,该客户端在弹出式操作中阻塞,例如 BLPOP 流,我们希望多个消费者看到附加到流中的新消息(许多tail -f进程可以看到添加到日志中的内容)。使用传统术语,我们希望流能够将消息扇出到多个客户端。

然而,这只是一种潜在的访问模式。我们还可以以完全不同的方式看到流:不是作为消息传递系统,而是作为时间序列存储。在这种情况下,获取附加的新消息可能也很有用,但另一种自然查询模式是按时间范围获取消息,或者使用游标迭代消息以增量检查所有历史记录。这绝对是另一种有用的访问模式。

最后,如果我们从消费者的角度来看一个流,我们可能希望以另一种方式访问该流,即作为一个消息流,可以划分给多个正在处理此类消息的消费者,以便消费者组只能看到以单个流到达的消息的子集。通过这种方式,可以在不同的消费者之间扩展消息处理,而无需单个消费者处理所有消息:每个消费者只会得到不同的消息来处理。这基本上就是 Kafka (TM) 对消费者群体所做的事情。通过消费者组读取消息是另一种从 Redis Stream 读取的有趣模式。

Redis Streams 通过不同的命令支持上述所有三种查询模式。接下来的部分将展示它们,从最简单和最直接使用的开始:范围查询。

# 按范围查询:XRANGE 和 XREVRANGE

要按范围查询流,我们只需要指定两个 ID,startend。返回的范围将包括 ID 为 start 或 end 的元素,因此该范围包括在内。这两个特殊的 ID分别表示可能的最小和最大 ID -+

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

返回的每个条目都是一个包含两项的数组:ID 和字段值对列表。我们已经说过条目 ID 与时间有关系,因为-字符左侧的部分是在创建条目时创建流条目的本地节点的 Unix 时间(以毫秒为单位)(但请注意,流使用完全指定 XADD 的命令进行复制,因此副本将具有与主服务器相同的 ID)。这意味着我可以使用查询时间范围 XRANGE. 但是,为了这样做,我可能想省略 ID 的序列部分:如果省略,则在范围的开头将假定为 0,而在结束部分将假定为最大值可用的序列号。这样,仅使用 2 毫秒 Unix 时间进行查询,我们就能以包容的方式获得在该时间范围内生成的所有条目。例如,如果我想查询一个两毫秒的时间段,我可以使用:

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

我在这个范围内只有一个条目,但是在实际数据集中,我可以查询小时范围,或者在两毫秒内可能有很多项目,返回的结果可能很大。出于这个原因,最后 XRANGE 支持一个可选的COUNT选项。通过指定一个计数,我可以得到前N个项目。如果我想要更多,我可以获取返回的最后一个 ID,将序列部分加一,然后再次查询。让我们在下面的示例中看到这一点。我们开始添加 10 个项目 XADD (我不会显示,假设流mystream中填充了 10 个项目)。为了开始我的迭代,每个命令获取 2 个项目,我从完整范围开始,但计数为 2。

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

为了继续对接下来的两项进行迭代,我必须选择返回的最后一个 ID,即为其1519073279157-0添加前缀(。产生的排他范围间隔,(1519073279157-0在这种情况下,现在可以用作下一次调用的新开始参数: XRANGE

> XRANGE mystream (1519073279157-0 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

等等。由于 XRANGE 复杂度是O(log(N))来寻找,然后O(M)来返回 M 个元素,因此该命令具有对数时间复杂度,这意味着迭代的每一步都很快。 XRANGE 事实上的流迭代器也是如此,并且不需要XSCAN命令。

该命令 XREVRANGE 等效于 XRANGE 但以倒序返回元素,因此 for 的实际用途 XREVRANGE 是检查 Stream 中的最后一项是什么:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

请注意,该 XREVRANGE 命令以相反的顺序采用startstop参数。

# 使用 XREAD 收听新项目

当我们不想通过流中的范围访问项目时,通常我们想要的是订阅到达流的新项目。这个概念可能与 Redis Pub/Sub 相关,您订阅一个频道,或者与 Redis 阻塞列表相关,您在其中等待一个键来获取新元素,但您使用流的方式存在根本差异:

  1. 一个流可以有多个客户端(消费者)等待数据。默认情况下,每个新项目都将交付给正在等待给定流中的数据的每个消费者。这种行为与阻塞列表不同,阻塞列表中的每个消费者都将获得不同的元素。但是,散播给多个消费者的能力类似于Pub/Sub。
  2. 虽然在 Pub/Sub 中消息是一劳永逸的,并且无论如何都不会存储,而在使用阻塞列表时,当客户端接收到消息时,它会从列表中弹出(有效地删除),流以完全不同的方式工作。所有消息都无限期地附加到流中(除非用户明确要求删除条目):不同的消费者将通过记住最后收到的消息的 ID 从其角度知道什么是新消息。
  3. Streams Consumer Groups 提供了 Pub/Sub 或阻止列表无法实现的控制级别,具有相同流的不同组、已处理项目的显式确认、检查待处理项目的能力、未处理消息的声明以及每个流的连贯历史可见性单个客户端,只能查看其私人过去的消息历史记录。

提供侦听到达流中的新消息的能力的命令称为 XREAD。它比 复杂一点 XRANGE,所以我们将开始展示简单的表单,稍后将提供整个命令布局。

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

以上是 的非阻塞形式 XREAD。请注意,COUNT选项不是强制性的,实际上该命令的唯一强制性选项是STREAMS选项,它指定了一个键列表以及调用消费者已经为每个流看到的相应最大 ID,因此该命令将只向客户端提供 ID 大于我们指定的 ID 的消息。

在上面我们编写的命令中STREAMS mystream 0,我们希望 Streammystream中的所有消息的 ID 大于0-0. 正如您在上面的示例中看到的,该命令返回键名,因为实际上可以使用多个键调用此命令以同时从不同的流中读取。例如,我可以写:STREAMS mystream otherstream 0 0. 请注意,在STREAMS选项之后,我们需要提供密钥名称,然后是 ID。因此,STREAMS选项必须始终是最后一个。

除了可以一次访问多个流的事实 XREAD,并且我们能够指定我们拥有的最后一个 ID 来获取更新的消息之外,在这种简单的形式中,该命令与 XRANGE. 然而,有趣的是,我们可以通过指定BLOCK参数轻松地 XREAD 变成阻塞命令:

> XREAD BLOCK 0 STREAMS mystream $

请注意,在上面的示例中,除了删除COUNT之外,我指定了超时为 0 毫秒的新BLOCK选项(这意味着永远不会超时)。此外,mystream我没有为流传递普通 ID,而是传递了特殊 ID $。这个特殊的 ID 意味着 XREAD 应该使用已经存储在流中的最大 ID 作为最后一个 ID mystream,这样我们将只接收新消息,从我们开始收听的时间开始。这在某些方面类似于tail -fUnix 命令。

请注意,当使用BLOCK选项时,我们不必使用特殊 ID $。我们可以使用任何有效的身份证件。如果该命令能够立即处理我们的请求而不会阻塞,它将这样做,否则它将阻塞。通常,如果我们想从新条目开始消费流,我们从 ID 开始$,然后继续使用收到的最后一条消息的 ID 进行下一次调用,依此类推。

的阻塞形式 XREAD 也可以监听多个Stream,只需指定多个键名即可。如果请求可以同步服务,因为至少有一个流的元素大于我们指定的相应 ID,它会返回结果。否则,该命令将阻塞并返回第一个获取新数据的流的项目(根据指定的 ID)。

与阻塞列表操作类似,从等待数据的客户端的角度来看,阻塞流读取是*公平的,因为语义是 FIFO 风格。*当新项目可用时,为给定流阻塞的第一个客户端将是第一个被解除阻塞的客户端。

XREAD 除了COUNTBLOCK之外没有其他选项,因此它是一个非常基本的命令,具有将消费者附加到一个或多个流的特定目的。使用消费者组 API 可以使用更强大的消费流功能,但是通过消费者组读取是由名为 的不同命令实现的 XREADGROUP,本指南的下一节将介绍。

# 消费群体

当手头的任务是使用来自不同客户端的相同流时, XREAD 已经提供了一种向 N 个客户端扇出的方法,可能还使用副本以提供更多的读取可扩展性。然而,在某些问题中,我们想要做的不是向许多客户端提供相同的消息流,而是向许多客户端提供来自同一流的不同消息子集。一个明显有用的例子是处理缓慢的消息:让 N 个不同的工作人员接收流的不同部分的能力允许我们通过将不同的消息路由到准备好处理的不同工作人员来扩展消息处理。做更多的工作。

实际上,如果我们想象有三个消费者 C1、C2、C3 和一个包含消息 1、2、3、4、5、6、7 的流,那么我们想要的是根据下图提供消息:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

为了实现这一点,Redis 使用了一个叫做消费者组的概念。从实现的角度来看,Redis 消费者组与 Kafka (TM) 消费者组无关,了解这一点非常重要。然而它们在功能上相似,所以我决定保留 Kafka (TM) 的术语,因为它最初推广了这个想法。

一个消费者组就像一个伪消费者,从一个流中获取数据,实际上服务于多个消费者,提供一定的保证:

  1. 每条消息都提供给不同的消费者,因此不可能将相同的消息传递给多个消费者。
  2. 在一个消费者组中,消费者通过一个名称来标识,该名称是一个区分大小写的字符串,实现消费者的客户端必须选择该字符串。这意味着即使在断开连接后,流消费者组仍保留所有状态,因为客户端将再次声称自己是同一消费者。但是,这也意味着由客户端提供唯一标识符。
  3. 每个消费者组都有第一个从未消费过的 ID的概念,因此,当消费者请求新消息时,它可以只提供以前未传递的消息。
  4. 然而,使用消息需要使用特定命令的明确确认。Redis 将确认解释为:此消息已正确处理,因此可以将其从使用者组中逐出。
  5. 消费者组跟踪当前待处理的所有消息,即已传递给消费者组的某个消费者但尚未确认为已处理的消息。由于这个特性,当访问一个流的消息历史时,每个消费者将只能看到传递给它的消息

在某种程度上,可以将消费者组想象为有关流的某种状态

+----------------------------------------+
| consumer_group_name: mygroup           |
| consumer_group_stream: somekey         |
| last_delivered_id: 1292309234234-92    |
|                                        |
| consumers:                             |
|    "consumer-1" with pending messages  |
|       1292309234234-4                  |
|       1292309234232-8                  |
|    "consumer-42" with pending messages |
|       ... (and so forth)               |
+----------------------------------------+

如果你从这个角度来看,很容易理解消费者组可以做什么,它如何能够为消费者提供他们未决消息的历史,以及如何为消费者提供新消息。大于 的消息 ID last_delivered_id。同时,如果将消费者组视为 Redis 流的辅助数据结构,很明显单个流可以有多个消费者组,这些消费者组具有不同的消费者集。实际上,同一个流甚至可以让客户端在没有消费者组的情况下通过 XREAD 读取,而客户端 XREADGROUP 在不同的消费者组中读取。

现在是时候放大查看基本的消费者组命令了。它们是:

  • XGROUP 用于创建、销毁和管理消费者组。
  • XREADGROUP 用于通过消费者组从流中读取。
  • XACK 是允许消费者将待处理消息标记为已正确处理的命令。

# 创建消费者组

假设我已经存在一个mystream流类型的密钥,为了创建一个消费者组,我只需要执行以下操作:

> XGROUP CREATE mystream mygroup $
OK

正如您在上面的命令中看到的,在创建消费者组时,我们必须指定一个 ID,在示例中为$. 这是必需的,因为在其他状态中,消费者组必须知道在第一个消费者连接时接下来要提供什么消息,即刚刚创建组时的最后一条消息 ID是什么。如果我们$像以前那样提供,那么从现在开始只有新消息到达流中才会提供给组中的消费者。如果我们指定0消费者组将消费所有流历史记录中的消息开始。当然,您可以指定任何其他有效 ID。您所知道的是,消费者组将开始传递大于您指定的 ID 的消息。因为$表示流中当前最大的 ID,所以指定$将具有仅使用新消息的效果。

XGROUP CREATE 还支持自动创建流,如果它不存在,使用可选的MKSTREAM子命令作为最后一个参数:

> XGROUP CREATE newstream mygroup $ MKSTREAM
OK

现在已经创建了消费者组,我们可以立即尝试使用 XREADGROUP 命令通过消费者组读取消息。我们将从消费者那里得知我们将调用 Alice 和 Bob,以了解系统如何将不同的消息返回给 Alice 或 Bob。

XREADGROUP 与BLOCK非常相似 XREAD 并提供相同的选项,否则是同步命令。但是,有一个必须始终指定的强制选项,即GROUP并且有两个参数:消费者组的名称和尝试读取的消费者的名称。选项COUNT也受支持,与. XREAD

在从流中读取之前,让我们在里面放一些消息:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

注意:这里的message是字段名,fruit是关联的值,记住stream items是小字典。

是时候尝试使用消费者组阅读内容了:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

XREADGROUP 回复就像 XREAD 回复一样。但是请注意GROUP <group-name> <consumer-name>上面提供的内容。它声明我想使用消费者组从流中读取,mygroup并且我是消费者Alice。消费者每次对消费者组执行操作时,都必须指定其名称,在组内唯一标识该消费者。

上面的命令行中还有一个非常重要的细节,在强制STREAMS选项之后,为密钥请求的 IDmystream是特殊 ID >。这个特殊的 ID 只在消费者组的上下文中有效,它意味着:消息从未传递给其他消费者

这几乎总是您想要的,但是也可以指定一个真实的 ID,例如0或任何其他有效的 ID,但是,在这种情况下,发生的情况是我们要求 XREADGROUP 只向我们提供待处理消息的历史记录,在这种情况下,将永远不会在组中看到新消息。 XREADGROUP 所以根据我们指定的ID基本上有以下行为:

  • 如果 ID 是特殊 ID >,则该命令将仅返回迄今为止从未传递给其他消费者的新消息,并且作为副作用,将更新消费者组的最后一个 ID
  • 如果 ID 是任何其他有效的数字 ID,则该命令将让我们访问待处理消息的历史记录。也就是说,传递给此指定使用者(由提供的名称标识)的消息集,到目前为止从未使用 XACK.

我们可以立即指定 ID 为 0 来测试这种行为,而无需任何COUNT选项:我们只会看到唯一的待处理消息,即关于苹果的消息:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

但是,如果我们确认消息已处理,它将不再是待处理消息历史记录的一部分,因此系统将不再报告任何内容:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

如果您还不知道如何 XACK 工作,请不要担心,这个想法只是处理过的消息不再是我们可以访问的历史记录的一部分。

现在轮到 Bob 读一些东西了:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob 要求最多两条消息,并且正在通过同一组阅读mygroup。所以发生的事情是 Redis 只报告新消息。如您所见,“apple”消息没有传递,因为它已经传递给 Alice,所以 Bob 得到了橙色和草莓,等等。

这样,Alice、Bob 和组中的任何其他消费者都能够从同一流中读取不同的消息,读取他们尚未处理消息的历史记录,或者将消息标记为已处理。这允许创建不同的拓扑和语义来消费来自流的消息。

有几件事情要记住:

  • 消费者在第一次被提及时自动创建,无需显式创建。
  • 即使 XREADGROUP 您可以同时读取多个键,但是要使其正常工作,您需要在每个流中创建一个具有相同名称的使用者组。这不是常见的需求,但值得一提的是,该功能在技术上是可用的。
  • XREADGROUP 是一个写命令,因为即使它从流中读取,消费者组也会被修改为读取的副作用,因此只能在主实例上调用。

以下是使用 Ruby 语言编写的使用消费者组的消费者实现示例。Ruby 代码旨在让几乎任何有经验的程序员都能阅读,即使他们不了解 Ruby:

require 'redis'

if ARGV.length == 0
    puts "Please specify a consumer name"
    exit 1
end

ConsumerName = ARGV[0]
GroupName = "mygroup"
r = Redis.new

def process_message(id,msg)
    puts "[#{ConsumerName}] #{id} = #{msg.inspect}"
end

$lastid = '0-0'

puts "Consumer #{ConsumerName} starting..."
check_backlog = true
while true
    # Pick the ID based on the iteration: the first time we want to
    # read our pending messages, in case we crashed and are recovering.
    # Once we consumed our history, we can start getting new messages.
    if check_backlog
        myid = $lastid
    else
        myid = '>'
    end

    items = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid)

    if items == nil
        puts "Timeout!"
        next
    end

    # If we receive an empty reply, it means we were consuming our history
    # and that the history is now empty. Let's start to consume new messages.
    check_backlog = false if items[0][1].length == 0

    items[0][1].each{|i|
        id,fields = i

        # Process the message
        process_message(id,fields)

        # Acknowledge the message as processed
        r.xack(:my_stream_key,GroupName,id)

        $lastid = id
    }
end

如您所见,这里的想法是从消费历史开始,即我们的待处理消息列表。这很有用,因为消费者之前可能已经崩溃,所以在重新启动的情况下,我们希望重新读取传递给我们但没有得到确认的消息。请注意,我们可能会多次或一次处理一条消息(至少在消费者失败的情况下,但也存在 Redis 持久性和复制的限制,请参阅本主题的具体部分)。

一旦历史被消费,我们得到一个空的消息列表,我们可以切换到使用>特殊 ID 来消费新消息。

# 从永久性故障中恢复

上面的示例允许我们编写参与同一个消费者组的消费者,每个消费者都处理一个消息子集,并在从故障中恢复时重新读取刚刚传递给他们的待处理消息。然而,在现实世界中,消费者可能会永久失败并且永远无法恢复。由于任何原因停止后永远不会恢复的消费者的未决消息会发生什么?

Redis 消费者组提供了在这些情况下使用的功能,以便声明给定消费者的待处理消息,以便此类消息将更改所有权并重新分配给不同的消费者。该功能非常明确。消费者必须检查待处理消息列表,并且必须使用特殊命令声明特定消息,否则服务器将永远将消息挂起并分配给旧消费者。通过这种方式,不同的应用程序可以选择是否使用这样的功能,以及如何使用它。

这个过程的第一步只是一个命令,它提供了消费者组中未决条目的可观察性,被称为 XPENDING. 这是一个只读命令,始终可以安全调用,并且不会更改任何消息的所有权。在最简单的形式中,该命令使用两个参数调用,它们是流的名称和使用者组的名称。

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

以这种方式调用时,该命令会输出消费者组中的未决消息总数(本例中为两条),未决消息中较低和较高的消息ID,最后是消费者列表和他们的未决消息数有。我们只有 Bob 有两条待处理的消息,因为 Alice 请求的单个消息是使用 XACK.

我们可以通过向 提供更多参数来询问更多信息 XPENDING,因为完整的命令签名如下:

XPENDING <key> <groupname> [[IDLE <min-idle-time>] <start-id> <end-id> <count> [<consumer-name>]]

通过提供一个开始和结束 ID(可以是-+in 一样 XRANGE )和一个计数来控制命令返回的信息量,我们能够了解更多关于未决消息的信息。如果我们想将输出限制为给定消费者的未决消息,则使用可选的最终参数消费者名称,但在以下示例中不会使用此功能。

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

现在我们有了每条消息的详细信息:ID、消费者名称、空闲时间(以毫秒为单位),即自上次将消息传递给某个消费者以来经过了多少毫秒,最后是给定的次数消息已传递。我们有两条来自 Bob 的消息,它们空闲了 74170458 毫秒,大约 20 小时。

请注意,没有人阻止我们仅使用 XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

我们只需在参数中重复两次相同的 ID。现在我们有了一些想法,Alice 可能会决定,在 20 小时不处理消息之后,Bob 可能无法及时恢复,是时候认领这些消息并代替 Bob 恢复处理了。为此,我们使用 XCLAIM 命令。

这个命令非常复杂,并且包含完整形式的选项,因为它用于复制消费者组更改,但我们将只使用我们通常需要的参数。在这种情况下,它很简单:

XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>

基本上我们说,对于这个特定的键和组,我希望指定的消息 ID 将更改所有权,并将分配给指定的使用者名称<consumer>。但是,我们也提供了一个最小空闲时间,因此只有当提到的消息的空闲时间大于指定的空闲时间时,操作才会起作用。这很有用,因为可能有两个客户端同时重试声明一条消息:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Client 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

但是,作为副作用,声明一条消息将重置其空闲时间并增加其交付计数器的数量,因此第二个客户端将无法声明它。通过这种方式,我们避免了对消息进行微不足道的重新处理(即使在一般情况下,您无法准确地获得一次处理)。

这是命令执行的结果:

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Alice 成功认领了该消息,她现在可以处理该消息并确认它,并且即使原始消费者没有恢复,也可以继续处理。

从上面的示例中可以清楚地看出,作为成功声明给定消息的副作用,该 XCLAIM 命令也会返回它。然而,这不是强制性的。JUSTID选项可用于仅返回成功声明的消息的 ID **。**如果您想减少客户端和服务器之间使用的带宽(以及命令的性能)并且您对消息不感兴趣,这将非常有用,因为您的消费者是以重新扫描挂起历史的方式实现的不时发消息。

声明也可以通过一个单独的过程来实现:一个只检查待处理消息列表,并将空闲消息分配给似乎处于活动状态的消费者的过程。可以使用 Redis 流的可观察性功能之一获得活跃的消费者。这是下一节的主题。

# 自动认领

XAUTOCLAIM 命令在 Redis 6.2 中添加,实现了我们上面描述的声明过程。 XPENDINGXCLAIM 为不同类型的恢复机制提供基本构建块。此命令通过让 Redis 管理通用流程来优化通用流程,并为大多数恢复需求提供简单的解决方案。

XAUTOCLAIM 识别空闲的未决消息并将它们的所有权转移给消费者。该命令的签名如下所示:

XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT count] [JUSTID]

因此,在上面的示例中,我可以使用自动声明来声明一条消息,如下所示:

> XAUTOCLAIM mystream mygroup Alice 3600000 0-0 COUNT 1
1) 1526569498055-0
2) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

与 一样 XCLAIM,该命令使用声明的消息数组进行回复,但它还返回允许迭代待处理条目的流 ID。流 ID 是一个游标,我可以在下一次调用中使用它来继续声明空闲的待处理消息:

> XAUTOCLAIM mystream mygroup Lora 3600000 1526569498055-0 COUNT 1
1) 0-0
2) 1) 1526569506935-0
   2) 1) "message"
      2) "strawberry"

XAUTOCLAIM 返回“0-0”流ID作为游标时,这意味着它到达了消费者组待处理条目列表的末尾。这并不意味着没有新的空闲挂起消息,因此该过程 XAUTOCLAIM 通过从流的开头调用来继续。

# 索赔和交付柜台

您在输出中观察到的计数器 XPENDING 是每条消息的传递次数。计数器以两种方式递增:通过成功声明消息 XCLAIM 或使用 XREADGROUP 调用以访问未决消息的历史记录时。

当发生故障时,消息会被多次传递是正常的,但最终它们通常会得到处理和确认。但是,处理某些特定消息可能会出现问题,因为它已损坏或以触发处理代码中的错误的方式制作。在这种情况下,消费者将不断无法处理此特定消息。因为我们有传递尝试的计数器,所以我们可以使用该计数器来检测由于某种原因无法处理的消息。因此,一旦交付计数器达到您选择的给定大数字,将此类消息放在另一个流中并向系统管理员发送通知可能更明智。这基本上是 Redis Streams 实现死信概念的方式。

# 流可观察性

缺乏可观察性的消息系统很难使用。不知道谁在消费消息,哪些消息正在等待处理,不知道给定流中活跃的消费者组集,这使得一切都变得不透明。出于这个原因,Redis Streams 和消费者组有不同的方式来观察正在发生的事情。我们已经介绍了 XPENDING,它允许我们检查在给定时刻正在处理的消息列表,以及它们的空闲时间和传递次数。

然而,我们可能想做的还不止这些,该 XINFO 命令是一个可观察性接口,可以与子命令一起使用,以获取有关流或消费者组的信息。

此命令使用子命令来显示有关流状态及其使用者组的不同信息。例如信丰流报告有关流本身的信息。

> XINFO STREAM mystream
 1) "length"
 2) (integer) 2
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1638125141232-0"
 9) "max-deleted-entryid"
10) "0-0"
11) "entries-added"
12) (integer) 2
13) "groups"
14) (integer) 1
15) "first-entry"
16) 1) "1638125133432-0"
    2) 1) "message"
       2) "apple"
17) "last-entry"
18) 1) "1638125141232-0"
    2) 1) "message"
       2) "banana"

输出显示有关流如何在内部编码的信息,还显示流中的第一条和最后一条消息。另一条可用信息是与此流关联的消费者组的数量。我们可以进一步挖掘,询问有关消费者群体的更多信息。

> XINFO GROUPS mystream
1)  1) "name"
    2) "mygroup"
    3) "consumers"
    4) (integer) 2
    5) "pending"
    6) (integer) 2
    7) "last-delivered-id"
    8) "1638126030001-0"
    9) "entries-read"
   10) (integer) 2
   11) "lag"
   12) (integer) 0
2)  1) "name"
    2) "some-other-group"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1638126028070-0"
    9) "entries-read"
   10) (integer) 1
   11) "lag"
   12) (integer) 1

正如您在此输出和上一个输出中看到的那样,该 XINFO 命令输出一系列字段值项。因为它是一个可观察性命令,所以人类用户可以立即了解报告了哪些信息,并允许该命令通过添加更多字段来报告更多信息,而不会破坏与旧客户端的兼容性。其他必须提高带宽效率的命令,例如 XPENDING,只报告不带字段名称的信息。

上面示例的输出,其中使用了GROUPS子命令,应该清楚地观察字段名称。我们可以通过检查组中注册的消费者来更详细地检查特定消费者组的状态。

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

如果您不记得命令的语法,请向命令本身寻求帮助:

> XINFO HELP
1) XINFO <subcommand> [<arg> [value] [opt] ...]. Subcommands are:
2) CONSUMERS <key> <groupname>
3)     Show consumers of <groupname>.
4) GROUPS <key>
5)     Show the stream consumer groups.
6) STREAM <key> [FULL [COUNT <count>]
7)     Show information about the stream.
8) HELP
9)     Prints this help.

# 与 Kafka (TM) 分区的区别

Redis 流中的消费者组可能在某种程度上类似于基于 Kafka (TM) 分区的消费者组,但请注意,Redis 流实际上非常不同。分区只是逻辑的,消息只是放入单个 Redis 键中,因此为不同客户端提供服务的方式取决于谁准备好处理新消息,而不是客户端正在从哪个分区读取。例如,如果消费者 C3 在某个时刻永久失败,Redis 将继续为 C1 和 C2 提供所有到达的新消息,就好像现在只有两个逻辑分区一样。

类似地,如果给定的消费者处理消息的速度比其他消费者快得多,则该消费者将在同一时间单位内按比例接收更多消息。这是可能的,因为 Redis 明确地跟踪所有未确认的消息,并记住谁收到了哪条消息以及第一条消息的 ID 从未传递给任何消费者。

但是,这也意味着,在 Redis 中,如果你真的想将同一流中的消息划分为多个 Redis 实例,则必须使用多个键和一些分片系统,例如 Redis Cluster 或其他一些特定于应用程序的分片系统。单个 Redis 流不会自动分区到多个实例。

我们可以说,示意性地以下是正确的:

  • 如果您使用 1 个流 -> 1 个使用者,则您正在按顺序处理消息。
  • 如果您将 N 个流与 N 个消费者一起使用,那么只有给定的消费者会命中 N 个流的子集,您可以扩展上述 1 个流 -> 1 个消费者的模型。
  • 如果您使用 1 个流 -> N 个消费者,则您正在对 N 个消费者进行负载平衡,但是在这种情况下,关于同一逻辑项的消息可能会被乱序消费,因为给定的消费者处理消息 3 的速度可能比另一个消费者正在处理的速度快消息 4。

所以基本上 Kafka 分区更类似于使用 N 个不同的 Redis 键,而 Redis 消费者组是从给定流到 N 个不同消费者的消息的服务器端负载平衡系统。

# 封顶流

许多应用程序不希望永远将数据收集到流中。有时在流中最多包含给定数量的项目很有用,有时一旦达到给定大小,将数据从 Redis 移动到不在内存中且速度不快但适合存储的存储是很有用的未来几十年的历史。Redis 流对此有一些支持。一个是命令的MAXLEN选项 XADD。这个选项使用起来非常简单:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

使用MAXLEN当达到指定的长度时,旧的条目会被自动驱逐,因此流的大小保持不变。目前没有选项可以告诉流只保留不超过给定时间段的项目,因为这样的命令为了始终如一地运行,可能会阻塞很长时间以驱逐项目。例如,想象一下如果有一个插入尖峰,然后是长时间的停顿,然后是另一个插入,所有这些都具有相同的最大时间会发生什么。流将阻塞以驱逐在暂停期间变得太旧的数据。因此,用户需要进行一些规划并了解所需的最大流长度是多少。此外,虽然流的长度与所使用的内存成正比,但按时间修剪不太容易控制和预测:

然而,使用MAXLEN进行修剪可能会很昂贵:流由宏节点表示为基数树,以便非常节省内存。改变由几十个元素组成的单个宏节点并不是最优的。因此可以使用以下特殊形式的命令:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

MAXLEN~选项和实际计数之间的争论意味着,我真的不需要这恰好是 1000 个项目。它可以是 1000 或 1010 或 1030,只要确保至少保存 1000 个项目即可。使用此参数,仅当我们可以删除整个节点时才执行修剪。这使它更有效率,而且它通常是你想要的。

还有一个命令,它执行的操作与上面的MAXLEN XTRIM 选项非常相似,除了它可以自行运行:

> XTRIM mystream MAXLEN 10

或者,至于 XADD 选项:

> XTRIM mystream MAXLEN ~ 10

但是, XTRIM 旨在接受不同的修剪策略。另一种修剪策略是MINID,它驱逐 ID 低于指定 ID 的条目。

作为 XTRIM 一个明确的命令,用户应该知道不同修剪策略可能存在的缺点。

未来可能添加的另一个有用的驱逐策略 XTRIM 是通过一系列 ID 删除以方便使用, XRANGEXTRIM 在需要时将数据从 Redis 移动到其他存储系统。

# 流 API 中的特殊 ID

您可能已经注意到,在 Redis API 中可以使用几个特殊的 ID。这是一个简短的回顾,以便它们在未来更有意义。

前两个特殊 ID 是-+,并且在使用 XRANGE 命令的范围查询中使用。这两个 ID 分别表示可能的最小 ID(基本上是0-1)和可能的最大 ID(即18446744073709551615-18446744073709551615)。正如你所看到的,它写起来更干净,-+不是那些数字。

然后是我们想说的API,流中ID最大的项目的ID。这是什么$意思。因此,例如,如果我只想要新的条目, XREADGROUP 我使用这个 ID 来表示我已经拥有所有现有的条目,而不是将来插入的新条目。同样,当我创建或设置消费者组的 ID 时,我可以将最后交付的项目设置为$,以便将新条目交付给组中的消费者。

如您所见$,这并不意味着+它们是两个不同的东西,+每个可能的流中可能$的最大 ID 是,而包含给定条目的给定流中的最大 ID 是。此外,API 通常只能理解+or $,但避免加载具有多种含义的给定符号很有用。

另一个特殊的 ID 是>,这是一个特殊的含义,仅与消费者群体相关,并且仅在使用 XREADGROUP 命令时。这个特殊的 ID 意味着我们只需要迄今为止从未交付给其他消费者的条目。所以基本上>ID 是消费者组的最后交付的 ID。

最后*,只能与 XADD 命令一起使用的特殊 ID 意味着为我们自动选择新条目的 ID。

所以我们有-, +, $, >and *, 并且都有不同的含义,而且大多数时候,可以在不同的上下文中使用。

# 持久性、复制和消息安全

与任何其他 Redis 数据结构一样,Stream 被异步复制到副本并持久化到 AOF 和 RDB 文件中。然而,可能不太明显的是,消费者组的完整状态也会传播到 AOF、RDB 和副本,所以如果消息在主服务器中挂起,副本也将具有相同的信息。同样,重启后,AOF 将恢复消费者组的状态。

但是请注意,Redis 流和使用者组是使用 Redis 默认复制进行持久化和复制的,因此:

  • 如果消息的持久性在您的应用程序中很重要,则必须将 AOF 与强大的 fsync 策略一起使用。
  • 默认情况下,异步复制不保证复制 XADD 命令或消费者组状态更改:在故障转移之后,可能会丢失某些内容,具体取决于副本从主服务器接收数据的能力。
  • WAIT 命令可用于强制将更改传播到一组副本。但是请注意,虽然这使得数据丢失的可能性很小,但由 Sentinel 或 Redis Cluster 操作的 Redis 故障转移过程仅会尽最大努力检查故障转移到最新更新的副本,并且在某些特定故障条件下可能会促进缺少一些数据的副本。

因此,在使用 Redis 流和消费者组设计应用程序时,请确保了解应用程序在故障期间应具有的语义属性,并相应地进行配置,评估它对于您的用例是否足够安全。

# 从流中删除单个项目

流还有一个特殊的命令,用于从流的中间删除项目,仅通过 ID。通常对于仅附加的数据结构,这可能看起来像一个奇怪的功能,但它实际上对于涉及隐私法规的应用程序很有用。调用该命令 XDEL 并接收流的名称,后跟要删除的 ID:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

然而,在当前的实现中,直到宏节点完全为空,内存才真正被回收,所以你不应该滥用这个特性。

# 零长度流

流和其他 Redis 数据结构的区别在于,当其他数据结构不再有任何元素时,作为调用删除元素的命令的副作用,键本身将被删除。例如,当调用 ZREM 将删除排序集中的最后一个元素时,排序集将被完全删除。另一方面,流被允许保持为零元素,这既是因为使用了计数为零的MAXLEN选项( XADDXTRIM 命令),也因为 XDEL 被调用了。

之所以存在这种不对称,是因为 Streams 可能有关联的消费者组,我们不想因为流中不再有任何项目而丢失消费者组定义的状态。目前,即使没有关联的消费者组,流也不会被删除。

# 消费消息的总延迟

XRANGE 像和 XREAD 没有BLOCK 选项这样的非阻塞流命令 XREADGROUP 会像任何其他 Redis 命令一样同步服务,因此讨论此类命令的延迟是没有意义的:在 Redis 文档中检查命令的时间复杂度会更有趣。应该说,在提取范围时,流命令至少与排序集命令一样快,而且 XADD 速度非常快,如果使用流水线,可以很容易地在普通机器中每秒插入 50 万到 100 万个项目。

但是,如果我们想了解处理消息的延迟,那么延迟就变成了一个有趣的参数,在消费者组中阻塞消费者的上下文中,从通过 产生消息 XADD 的那一刻到消费者因为 XREADGROUP 返回而获得消息的那一刻与消息。

# 服务受阻消费者的工作原理

在提供执行测试的结果之前,有必要了解 Redis 使用什么模型来路由流消息(以及实际上如何管理等待数据的任何阻塞操作)。

  • 被阻塞的客户端在哈希表中被引用,该哈希表将至少有一个阻塞消费者的键映射到等待该键的消费者列表。这样,给定一个接收数据的密钥,我们可以解析所有正在等待此类数据的客户端。
  • 当发生写入时,在这种情况下,当 XADD 调用命令时,它会调用signalKeyAsReady()函数。此函数会将密钥放入需要处理的密钥列表中,因为这样的密钥可能有针对被屏蔽消费者的新数据。请注意,此类就绪键将在稍后处理,因此在同一事件循环周期的过程中,该键可能会收到其他写入。
  • 最后,在返回事件循环之前,最后处理就绪键。对于每个键,扫描等待数据的客户端列表,如果适用,这些客户端将接收到达的新数据。在流的情况下,数据是消费者请求的适用范围内的消息。

如您所见,基本上,在返回事件循环之前,客户端调用 XADD 和被阻塞以消费消息的客户端都会在输出缓冲区中获得回复,因此调用者 XADD 应该大约在同一时间收到来自 Redis 的回复消费者将收到新消息。

该模型是基于推送的,因为将数据添加到消费者缓冲区将直接通过调用操作执行 XADD,因此延迟往往是可以预测的。

# 延迟测试结果

为了检查这些延迟特性,使用多个 Ruby 程序实例执行测试,这些实例推送消息,该消息具有作为附加字段的计算机毫秒时间,以及 Ruby 程序从消费者组读取消息并处理它们。消息处理步骤包括将当前计算机时间与消息时间戳进行比较,以了解总延迟。

获得的结果:

Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%

因此,99.9% 的请求延迟 <= 2 毫秒,异常值仍然非常接近平均值。

向流中添加几百万条未确认的消息不会改变基准测试的要点,大多数查询仍然以非常短的延迟处理。

几点说明:

  • 在这里,我们每次迭代最多处理 10k 条消息,这意味着COUNT参数 XREADGROUP 设置为 10000。这增加了很多延迟,但为了让慢速消费者能够跟上消息流,这是必需的。因此,您可以预期现实世界的延迟要小得多。
  • 与今天的标准相比,用于此基准测试的系统非常慢。
Last Updated: 4/18/2023, 8:45:33 AM