跳转至

6. 运营

以下是根据 LinkedIn 的使用情况和经验实际将 Kafka 作为生产系统运行的一些信息。请将您知道的任何其他提示发送给我们。

6.1 Kafka基本操作

本节将回顾您将在 Kafka 集群上执行的最常见操作。本节中回顾的所有工具都可以在bin/Kafka 发行版的目录下找到,如果不带参数运行,每个工具都会打印所有可能的命令行选项的详细信息。

添加和删​​除主题

您可以选择手动添加主题,也可以在数据首次发布到不存在的主题时自动创建主题。如果主题是自动创建的,那么您可能需要调整用于自动创建主题的 默认主题配置。

使用主题工具添加和修改主题:

  > bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name \
        --partitions 20 --replication-factor 3 --config x=y

复制因子控制有多少服务器将复制每条写入的消息。如果复制因子为 3,则在您失去对数据的访问权限之前最多 2 个服务器可能会发生故障。我们建议您使用 2 或 3 的复制因子,以便您可以透明地弹跳机器而不中断数据消耗。

分区计数控制主题将被分片为多少个日志。分区计数有多种影响。首先,每个分区必须完全适合一台服务器。因此,如果您有 20 个分区,则完整数据集(以及读写负载)将由不超过 20 个服务器(不包括副本)处理。最后,分区计数会影响消费者的最大并行度。概念部分对此进行了更详细的讨论。

每个分片分区日志都放置在 Kafka 日志目录下自己的文件夹中。此类文件夹的名称由主题名称、附加破折号 (-) 和分区 ID 组成。由于典型的文件夹名称长度不能超过 255 个字符,因此主题名称的长度将受到限制。我们假设分区数量永远不会超过 100,000。因此,主题名称不能超过 249 个字符。这会在文件夹名称中留下足够的空间用于短划线和可能为 5 位数长的分区 ID。

在命令行上添加的配置会覆盖服务器的默认设置,例如数据应保留的时间长度。完整的每个主题配置集记录在此处

修改主题

您可以使用同一主题工具更改主题的配置或分区。

要添加分区,您可以执行以下操作

  > bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \
        --partitions 40

请注意,分区的一种用例是对数据进行语义分区,并且添加分区不会更改现有数据的分区,因此如果消费者依赖该分区,这可能会干扰他们。也就是说,如果数据已分区hash(key) % number_of_partitions,则该分区可能会通过添加分区而被打乱,但 Kafka 不会尝试以任何方式自动重新分配数据。

添加配置:

  > bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y

要删除配置:

  > bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x

最后删除一个主题:

  > bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name

Kafka目前不支持减少主题的分区数量。

可以在此处 找到有关更改主题的复制因子的说明。

优雅关机

Kafka 集群将自动检测任何代理关闭或故障,并为该机器上的分区选举新的领导者。无论服务器发生故障还是为了维护或配置更改而故意关闭服务器,都会发生这种情况。对于后一种情况,Kafka 支持一种更优雅的机制来停止服务器,而不仅仅是杀死它。当服务器正常停止时,它将利用两个优化:

  1. 它将所有日志同步到磁盘,以避免重新启动时需要执行任何日志恢复(即验证日志尾部所有消息的校验和)。日志恢复需要时间,因此这可以加快有意重新启动的速度。
  2. 它将在关闭之前将服务器作为领导者的任何分区迁移到其他副本。这将使领导权转移更快,并将每个分区不可用的时间最小化到几毫秒。

每当服务器停止(除了硬终止)时,同步日志都会自动发生,但受控领导迁移需要使用特殊设置:

      controlled.shutdown.enable=true

请注意,只有在代理上托管的所有分区都有副本(即复制因子大于 1并且这些副本中至少有一个处于活动状态)时,受控关闭才会成功。这通常是您想要的,因为关闭最后一个副本将使该主题分区不可用。

平衡领导力

每当代理停止或崩溃时,该代理的分区的领导权就会转移到其他副本。当代理重新启动时,它只会成为其所有分区的追随者,这意味着它不会用于客户端读取和写入。

为了避免这种不平衡,Kafka 有一个首选副本的概念。如果分区的副本列表为 1、5、9,则节点 1 优先作为节点 5 或 9 的领导者,因为它在副本列表中较早。默认情况下,Kafka 集群将尝试恢复首选副本的领导地位。此行为配置为:

      auto.leader.rebalance.enable=true

您也可以将其设置为 false,但随后您需要通过运行以下命令手动恢复已恢复副本的领导权:

  > bin/kafka-leader-election.sh --bootstrap-server broker_host:port --election-type preferred --all-topic-partitions

跨机架平衡副本

机架感知功能将同一分区的副本分布在不同的机架上。这扩展了 Kafka 为代理故障提供的保证以涵盖机架故障,从而限制了机架上的所有代理同时发生故障时数据丢失的风险。该功能还可以应用于其他代理分组,例如 EC2 中的可用区域。

您可以通过向代理配置添加属性来指定代理属于特定机架:

  broker.rack=my-rack-id

当创建修改主题或重新分配副本 时,将遵守机架约束,确保副本跨越尽可能多的机架(分区将跨越 min(#racks,replication-factor) 个不同的机架)。

用于将副本分配给代理的算法可确保每个代理的领导者数量保持不变,无论代理如何跨机架分布。这确保了平衡的吞吐量。

但是,如果为机架分配了不同数量的代理,则副本的分配将不均匀。具有较少代理的机架将获得更多副本,这意味着它们将使用更多存储并将更多资源用于复制。因此,每个机架配置相同数量的代理是明智的。

集群之间的数据镜像和异地复制

Kafka 管理员可以定义跨越各个 Kafka 集群、数据中心或地理区域边界的数据流。请参阅异地复制部分以获取更多信息。

检查消费者位置

有时了解消费者的立场很有用。我们有一个工具可以显示消费者组中所有消费者的位置以及它们距离日志末尾有多远。要在使用名为my-topic 的主题的名为my-group 的消费者组上运行此工具,如下所示:

  > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

  TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
  my-topic                       0          2               4               2          consumer-1-029af89c-873c-4751-a720-cefd41a669d6   /127.0.0.1                     consumer-1
  my-topic                       1          2               3               1          consumer-1-029af89c-873c-4751-a720-cefd41a669d6   /127.0.0.1                     consumer-1
  my-topic                       2          2               3               1          consumer-2-42c1abd4-e3b2-425d-a8bb-e1ea49b29bb2   /127.0.0.1                     consumer-2

管理消费者群体

使用 ConsumerGroupCommand 工具,我们可以列出、描述或删除消费者组。可以手动删除消费者组,也可以在该组的最后提交的偏移量到期时自动删除该消费者组。仅当组没有任何活动成员时,手动删除才有效。例如,要列出所有主题的所有消费者组:

  > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

  test-consumer-group

要查看偏移量,如前所述,我们像这样“描述”消费者组:

  > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                    HOST            CLIENT-ID
  topic3          0          241019          395308          154289          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
  topic2          1          520678          803288          282610          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
  topic3          1          241018          398817          157799          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
  topic1          0          854144          855809          1665            consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
  topic2          0          460537          803290          342753          consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
  topic3          2          243655          398812          155157          consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4

有许多附加的“描述”选项可用于提供有关消费者组的更详细信息:

  • --members:此选项提供消费者组中所有活跃成员的列表。

    ```bash > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members

      CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS
      consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1       2
      consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4       1
      consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2       3
      consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0
    

    ```

  • --members --verbose:除了上面的“--members”选项报告的信息之外,此选项还提供分配给每个成员的分区。

    ```bash > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose

      CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS     ASSIGNMENT
      consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1       2               topic1(0), topic2(0)
      consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4       1               topic3(2)
      consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2       3               topic2(1), topic3(0,1)
      consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0               -
    

    ```

  • --offsets:这是默认的描述选项,提供与“--describe”选项相同的输出。

  • --state:此选项提供有用的组级别信息。

    ```bash > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state

      COORDINATOR (ID)          ASSIGNMENT-STRATEGY       STATE                #MEMBERS
      localhost:9092 (0)        range                     Stable               4
    

    ```

要手动删除一个或多个消费者组,可以使用“--delete”选项:

  > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group

  Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.

要重置消费者组的偏移量,可以使用“--reset-offsets”选项。此选项同时支持一个消费者组。它需要定义以下范围:--all-topics 或--topic。除非您使用“--from-file”方案,否则必须选择一个范围。另外,首先确保使用者实例处于非活动状态。有关更多详细信息, 请参阅 KIP-122 。

它有 3 个执行选项:

  • (默认)显示要重置的偏移量。
  • --execute : 执行 --reset-offsets 进程。
  • --export :将结果导出为 CSV 格式。

--reset-offsets 还有以下场景可供选择(至少必须选择一种场景):

  • --to-datetime :将偏移量重置为日期时间的偏移量。格式:'YYYY-MM-DDTHH:mm:SS.sss'
  • --to-earliest :将偏移量重置为最早的偏移量。
  • --to-latest :将偏移量重置为最新偏移量。
  • --shift-by :重置偏移量,将当前偏移量移动“n”,其中“n”可以是正数或负数。
  • --from-file :将偏移量重置为 CSV 文件中定义的值。
  • --to-current :将偏移量重置为当前偏移量。
  • --by-duration :将偏移量重置为从当前时间戳开始按持续时间偏移。格式:'PnDTnHnMnS'
  • --to-offset :将偏移量重置为特定偏移量。

请注意,超出范围的偏移将调整为可用的偏移端。例如,如果偏移结束为10,偏移移位请求为15,则实际上会选择10处的偏移。

例如,将消费者组的偏移量重置为最新的偏移量:

  > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest

  TOPIC                          PARTITION  NEW-OFFSET
  topic1                         0          0

如果您使用旧的高级使用者并将组元数据存储在 ZooKeeper 中(即offsets.storage=zookeeper),请传递 --zookeeper而不是--bootstrap-server

  > bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list

扩展您的集群

将服务器添加到 Kafka 集群非常简单,只需为它们分配一个唯一的代理 ID 并在新服务器上启动 Kafka 即可。但是,这些新服务器不会自动分配任何数据分区,因此除非将分区移动到它们,否则在创建新主题之前它们不会执行任何工作。因此,通常当您向集群添加机器时,您会希望将一些现有数据迁移到这些机器。

迁移数据的过程是手动启动的,但完全自动化。在幕后发生的事情是,Kafka 会将新服务器添加为它正在迁移的分区的跟随者,并允许它完全复制该分区中的现有数据。当新服务器完全复制该分区的内容并加入同步副本时,现有副本之一将删除其分区的数据。

分区重新分配工具可用于跨代理移动分区。理想的分区分布将确保所有代理的数据负载和分区大小均匀。分区重新分配工具无法自动研究 Kafka 集群中的数据分布并移动分区以获得均匀的负载分布。因此,管理员必须弄清楚应该移动哪些主题或分区。

分区重新分配工具可以在 3 种互斥的模式下运行:

  • --generate:在此模式下,给定主题列表和代理列表,该工具会生成候选重新​​分配,以将指定主题的所有分区移动到新代理。此选项仅提供了一种在给定主题和目标代理列表的情况下生成分区重新分配计划的便捷方法。
  • --execute:在此模式下,该工具根据用户提供的重新分配计划启动分区的重新分配。(使用 --reassignment-json-file 选项)。这可以是由管理员手工制作的自定义重新分配计划,也可以使用 --generate 选项提供
  • --verify:在此模式下,该工具验证上次 --execute 期间列出的所有分区的重新分配状态。状态可以是成功完成、失败或正在进行

自动将数据迁移到新机器

分区重新分配工具可用于将某些主题从当前代理集中移至新添加的代理。这在扩展现有集群时通常很有用,因为将整个主题移动到一组新的代理比一次移动一个分区更容易。当用于执行此操作时,用户应提供应移动到新代理集的主题列表以及新代理的目标列表。然后,该工具将给定主题列表的所有分区均匀分布在新的代理集上。在此移动过程中,主题的复制因子保持不变。实际上,主题输入列表的所有分区的副本都从旧的代理集移动到新添加的代理。

例如,以下示例将主题 foo1,foo2 的所有分区移动到新的代理集 5,6。在此移动结束时,主题 foo1 和 foo2 的所有分区将仅存在于代理 5,6 上。

由于该工具接受 json 文件形式的主题输入列表,因此您首先需要确定要移动的主题并创建 json 文件,如下所示:

  > cat topics-to-move.json
  {"topics": [{"topic": "foo1"},
              {"topic": "foo2"}],
  "version":1
  }

json 文件准备好后,使用分区重新分配工具生成候选分配:

  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
  Current partition replica assignment

  {"version":1,
  "partitions":[{"topic":"foo1","partition":0,"replicas":[2,1]},
                {"topic":"foo1","partition":1,"replicas":[1,3]},
                {"topic":"foo1","partition":2,"replicas":[3,4]},
                {"topic":"foo2","partition":0,"replicas":[4,2]},
                {"topic":"foo2","partition":1,"replicas":[2,1]},
                {"topic":"foo2","partition":2,"replicas":[1,3]}]
  }

  Proposed partition reassignment configuration

  {"version":1,
  "partitions":[{"topic":"foo1","partition":0,"replicas":[6,5]},
                {"topic":"foo1","partition":1,"replicas":[5,6]},
                {"topic":"foo1","partition":2,"replicas":[6,5]},
                {"topic":"foo2","partition":0,"replicas":[5,6]},
                {"topic":"foo2","partition":1,"replicas":[6,5]},
                {"topic":"foo2","partition":2,"replicas":[5,6]}]
  }

该工具生成一个候选分配,将所有分区从主题 foo1,foo2 移动到代理 5,6。但请注意,此时分区移动尚未开始,它仅告诉您当前分配和建议的新分配。应保存当前分配,以防您想回滚到当前分配。新分配应保存在 json 文件中(例如 Expand-cluster-reassignment.json),以便使用 --execute 选项输入到工具中,如下所示:

  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute
  Current partition replica assignment

  {"version":1,
  "partitions":[{"topic":"foo1","partition":0,"replicas":[2,1]},
                {"topic":"foo1","partition":1,"replicas":[1,3]},
                {"topic":"foo1","partition":2,"replicas":[3,4]},
                {"topic":"foo2","partition":0,"replicas":[4,2]},
                {"topic":"foo2","partition":1,"replicas":[2,1]},
                {"topic":"foo2","partition":2,"replicas":[1,3]}]
  }

  Save this to use as the --reassignment-json-file option during rollback
  Successfully started partition reassignments for foo1-0,foo1-1,foo1-2,foo2-0,foo2-1,foo2-2

最后,--verify 选项可以与该工具一起使用来检查分区重新分配的状态。请注意,相同的 Expand-cluster-reassignment.json(与 --execute 选项一起使用)应与 --verify 选项一起使用:

  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --verify
  Status of partition reassignment:
  Reassignment of partition [foo1,0] is completed
  Reassignment of partition [foo1,1] is still in progress
  Reassignment of partition [foo1,2] is still in progress
  Reassignment of partition [foo2,0] is completed
  Reassignment of partition [foo2,1] is completed
  Reassignment of partition [foo2,2] is completed

自定义分区分配和迁移

分区重新分配工具还可用于有选择地将分区的副本移动到一组特定的代理。当以这种方式使用时,假设用户知道重新分配计划并且不需要该工具来生成候选重新​​分配,从而有效地跳过 --generate 步骤并直接进入 --execute 步骤

例如,以下示例将主题 foo1 的分区 0 移动到代理 5,6,将主题 foo2 的分区 1 移动到代理 2,3:

第一步是在 json 文件中手工制定自定义重新分配计划:

  > cat custom-reassignment.json
  {"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}

然后,使用带有 --execute 选项的 json 文件来启动重新分配过程:

  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --execute
  Current partition replica assignment

  {"version":1,
  "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
                {"topic":"foo2","partition":1,"replicas":[3,4]}]
  }

  Save this to use as the --reassignment-json-file option during rollback
  Successfully started partition reassignments for foo1-0,foo2-1

--verify 选项可以与该工具一起使用来检查分区重新分配的状态。请注意,相同的 custom-reassignment.json (与 --execute 选项一起使用)应与 --verify 选项一起使用:

  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --verify
  Status of partition reassignment:
  Reassignment of partition [foo1,0] is completed
  Reassignment of partition [foo2,1] is completed

退役broker

分区重新分配工具尚不具备为退役代理自动生成重新分配计划的能力。因此,管理员必须制定一个重新分配计划,将要停用的代理上托管的所有分区的副本移动到其余代理。这可能相对繁琐,因为重新分配需要确保所有副本不会从已停用的代理仅移动到另一个代理。为了使这个过程变得轻松,我们计划在未来为退役broker添加工具支持。

增加复制因子

增加现有分区的复制因子很容易。只需在自定义重新分配 json 文件中指定额外的副本,并将其与 --execute 选项一起使用即可增加指定分区的复制因子。

例如,以下示例将主题 foo 的分区 0 的复制因子从 1 增加到 3。在增加复制因子之前,该分区的唯一副本存在于代理 5 上。作为增加复制因子的一部分,我们将在代理 5 上添加更多副本broker 6 和 7。

第一步是在 json 文件中手工制定自定义重新分配计划:

  > cat increase-replication-factor.json
  {"version":1,
  "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}

然后,使用带有 --execute 选项的 json 文件来启动重新分配过程:

  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute
  Current partition replica assignment

  {"version":1,
  "partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}

  Save this to use as the --reassignment-json-file option during rollback
  Successfully started partition reassignment for foo-0

--verify 选项可以与该工具一起使用来检查分区重新分配的状态。请注意,相同的increase-replication-factor.json(与--execute选项一起使用)应与--verify选项一起使用:

  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --verify
  Status of partition reassignment:
  Reassignment of partition [foo,0] is completed

您还可以使用 kafka-topics 工具验证复制因子的增加:

  > bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe
  Topic:foo PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: foo  Partition: 0    Leader: 5   Replicas: 5,6,7 Isr: 5,6,7

限制数据迁移期间的带宽使用

Kafka 允许您对复制流量进行限制,设置用于在机器之间移动副本的带宽上限。这在重新平衡集群、引导新代理或添加或删除代理时非常有用,因为它限制了这些数据密集型操作对用户产生的影响。

有两个接口可用于接合油门。最简单、最安全的方法是在调用 kafka-reassign-partitions.sh 时应用限制,但 kafka-configs.sh 也可用于直接查看和更改限制值。

例如,如果您要使用以下命令执行重新平衡,它将以不超过 50MB/s 的速度移动分区。

$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file big-cluster.json --throttle 50000000

当您执行此脚本时,您将看到油门接合:

  The inter-broker throttle limit was set to 50000000 B/s
  Successfully started partition reassignment for foo1-0

如果您希望在重新平衡期间改变油门,例如增加吞吐量,以便更快地完成,您可以通过使用传递相同的重新分配 json 文件的 --additional 选项重新运行执行命令来完成此操作:

$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --additional --execute --reassignment-json-file big-cluster.json --throttle 700000000 经纪商间限制设置为 700000000 B/s

重新平衡完成后,管理员可以使用 --verify 选项检查重新平衡的状态。如果重新平衡已完成,则将通过 --verify 命令删除限制。重要的是,管理员通过运行带有 --verify 选项的命令,在重新平衡完成后及时取消限制。如果不这样做可能会导致常规复制流量受到限制。

当执行 --verify 选项并且重新分配完成时,脚本将确认限制已被删除:

  > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092  --verify --reassignment-json-file bigger-cluster.json
  Status of partition reassignment:
  Reassignment of partition [my-topic,1] is completed
  Reassignment of partition [my-topic,0] is completed

  Clearing broker-level throttles on brokers 1,2,3
  Clearing topic-level throttles on topic my-topic

管理员还可以使用 kafka-configs.sh 验证分配的配置。有两对节流配置用于管理节流过程。第一对指油门值本身。这是在代理级别使用动态属性进行配置的:

    leader.replication.throttled.rate
    follower.replication.throttled.rate

然后是限制副本的枚举集的配置对:

    leader.replication.throttled.replicas
    follower.replication.throttled.replicas

哪些是按主题配置的。

所有四个配置值均由 kafka-reassign-partitions.sh 自动分配(如下所述)。

查看油门限制配置:

  > bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type brokers
  Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
  Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000

这显示了应用于复制协议的领导者和跟随者端的限制。默认情况下,双方都分配有相同的限制吞吐量值。

要查看受限制的副本列表:

  > bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics
  Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
      follower.replication.throttled.replicas=1:101,0:102

在这里,我们看到领导者节流应用于代理 102 上的分区 1 和代理 101 上的分区 0。同样,追随者节流应用于代理 101 上的分区 1 和代理 102 上的分区 0。

默认情况下,kafka-reassign-partitions.sh 会将领导者限制应用于重新平衡之前存在的所有副本,其中任何一个都可能是领导者。它将对所有移动目的地应用跟随油门。因此,如果代理 101,102 上有一个具有副本的分区,被重新分配给 102,103,则该分区的领导者限制将应用于 101,102,追随者限制将仅应用于 103。

如果需要,您还可以使用 kafka-configs.sh 上的 --alter 开关手动更改节流配置。

安全使用限制复制

使用限制复制时应小心。尤其:

(1) 节气门拆除:

重新分配完成后应及时移除限制(通过运行 kafka-reassign-partitions.sh --verify)。

(2) 确保进展:

如果与传入写入速率相比,限制设置得太低,复制可能无法取得进展。出现这种情况时:

max(BytesInPerSec) > 节流阀

其中 BytesInPerSec 是监控生产者写入每个代理的吞吐量的指标。

管理员可以在重新平衡期间使用以下指标监控复制是否取得进展:

kafka.server:类型=FetcherLagMetrics,名称=ConsumerLag,clientId=([-.\w]+),主题=([-.\w]+),分区=([0-9]+)

复制过程中滞后应该不断减少。如果指标没有减少,管理员应如上所述增加限制吞吐量。

设置配额

配额覆盖和默认值可以在(用户、客户端 ID)、用户或客户端 ID 级别进行配置,如此处所述。默认情况下,客户端获得无限配额。可以为每个(用户、客户端 ID)、用户或客户端 ID 组设置自定义配额。

配置自定义配额(user=user1,client-id=clientA):

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
  Updated config for entity: user-principal 'user1', client-id 'clientA'.

为 user=user1 配置自定义配额:

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1
  Updated config for entity: user-principal 'user1'.

为 client-id=clientA 配置自定义配额:

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA
  Updated config for entity: client-id 'clientA'.

通过指定 --entity-default 选项而不是--entity-name , 可以为每个(用户、客户端 ID)、用户或客户端 ID 组设置默认配额。

为 user=userA 配置默认 client-id 配额:

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-default
  Updated config for entity: user-principal 'user1', default client-id.

为用户配置默认配额:

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-default
  Updated config for entity: default user-principal.

配置 client-id 的默认配额:

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-default
  Updated config for entity: default client-id.

以下是描述给定(用户、客户端 ID)的配额的方法:

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
  Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

描述给定用户的配额:

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1
  Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

描述给定客户端 ID 的配额:

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type clients --entity-name clientA
  Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

如果未指定实体名称,则描述指定类型的所有实体。例如,描述所有用户:

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users
  Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
  Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

类似地对于(用户,客户端):

  > bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-type clients
  Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
  Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

6.2 数据中心

某些部署需要管理跨越多个数据中心的数据管道。我们推荐的方法是在每个数据中心部署一个本地 Kafka 集群,每个数据中心中的应用程序实例仅与其本地集群交互并在集群之间镜像数据(有关如何执行此操作的信息, 请参阅异地复制文档)。

这种部署模式允许数据中心充当独立实体,并允许我们集中管理和调整数据中心间的复制。即使数据中心间链路不可用,这也允许每个设施独立运行:当发生这种情况时,镜像会落后,直到链路恢复,此时它会赶上。

对于需要所有数据的全局视图的应用程序,您可以使用镜像来提供具有从所有数据中心的本地集群镜像的聚合数据的集群。这些聚合集群用于由需要完整数据集的应用程序进行读取。

这不是唯一可能的部署模式。可以通过 WAN 读取或写入远程 Kafka 集群,但显然这会增加获取集群所需的延迟。

Kafka 自然地在生产者和消费者中对数据进行批处理,因此即使在高延迟连接下也能实现高吞吐量。为了实现这一点,可能需要使用socket.send.buffer.bytessocket.receive.buffer.bytes配置来增加生产者、消费者和代理的 TCP 套接字缓冲区大小。此处记录了设置此值的适当方法。

通常建议通过高延迟链路运行跨多个数据中心的单个Kafka 集群。这将导致 Kafka 写入和 ZooKeeper 写入产生非常高的复制延迟,并且如果位置之间的网络不可用,Kafka 和 ZooKeeper 都不会在所有位置保持可用。

6.3 异地复制(跨集群数据镜像)

异地复制概述

Kafka 管理员可以定义跨越各个 Kafka 集群、数据中心或地理区域边界的数据流。组织、技术或法律要求通常需要此类事件流设置。常见场景包括:

  • 异地复制
  • 灾难恢复
  • 将边缘集群馈送到中央聚合集群
  • 集群的物理隔离(例如生产与测试)
  • 云迁移或混合云部署
  • 法律和合规要求

管理员可以使用 Kafka 的 MirrorMaker(版本 2)设置此类集群间数据流,这是一种以流式传输方式在不同 Kafka 环境之间复制数据的工具。MirrorMaker 构建在 Kafka Connect 框架之上,支持以下功能:

  • 复制主题(数据加配置)
  • 复制消费者组,包括在集群之间迁移应用程序的偏移量
  • 复制 ACL
  • 保留分区
  • 自动检测新主题和分区
  • 提供广泛的指标,例如跨多个数据中心/集群的端到端复制延迟
  • 容错和水平可扩展的操作

注意:使用 MirrorMaker 进行异地复制可跨 Kafka 集群复制数据。这种集群间复制与 Kafka 的集群内复制不同,后者在同一个 Kafka 集群内复制数据。

什么是复制流

借助 MirrorMaker,Kafka 管理员可以将主题、主题配置、消费者组及其偏移量以及 ACL 从一个或多个源 Kafka 集群复制到一个或多个目标 Kafka 集群,即跨集群环境。简而言之,MirrorMaker 使用连接器从源集群进行消费并生产到目标集群。

这些从源集群到目标集群的定向流称为复制流。它们是使用 MirrorMaker 配置文件中的格式定义的,{source_cluster}->{target_cluster}如下所述。管理员可以根据这些流程创建复杂的复制拓扑。

以下是一些示例模式:

  • 主动/主动高可用性部署:A->B, B->A
  • 主动/被动或主动/备用高可用性部署:A->B
  • 聚合(例如,从多个集群到一个集群):A->K, B->K, C->K
  • 扇出(例如,从一个集群到多个集群):K->A, K->B, K->C
  • 转发:A->B, B->C, C->D

默认情况下,流会复制所有主题和消费者组。但是,每个复制流都可以独立配置。例如,您可以定义仅将特定主题或消费者组从源集群复制到目标集群。

以下是有关如何配置从primary集群到secondary集群的数据复制(主动/被动设置)的第一个示例:

# Basic settings
clusters = primary, secondary
primary.bootstrap.servers = broker3-primary:9092
secondary.bootstrap.servers = broker5-secondary:9092

# Define replication flows
primary->secondary.enabled = true
primary->secondary.topics = foobar-topic, quux-.*

配置异地复制

以下部分介绍如何配置和运行专用 MirrorMaker 集群。如果您想在现有 Kafka Connect 集群或其他支持的部署设置中运行 MirrorMaker,请参阅KIP-382:MirrorMaker 2.0,并注意配置设置的名称可能因部署模式而异。

除了以下部分中介绍的内容之外,有关配置设置的更多示例和信息可在以下位置找到:

配置文件语法

MirrorMaker 配置文件通常命名为connect-mirror-maker.properties. 您可以在此文件中配置各种组件:

  • MirrorMaker 设置:全局设置,包括集群定义(别名)以及每个复制流的自定义设置
  • Kafka Connect 和连接器设置
  • Kafka 生产者、消费者和管理客户端设置

示例:定义 MirrorMaker 设置(稍后详细说明)。

# Global settings
clusters = us-west, us-east   # defines cluster aliases
us-west.bootstrap.servers = broker3-west:9092
us-east.bootstrap.servers = broker5-east:9092

topics = .*   # all topics to be replicated by default

# Specific replication flow settings (here: flow from us-west to us-east)
us-west->us-east.enabled = true
us-west->us.east.topics = foo.*, bar.*  # override the default above

MirrorMaker 基于 Kafka Connect 框架。Kafka Connect 文档章节中描述的任何 Kafka Connect、源连接器和接收器连接器设置都可以直接在 MirrorMaker 配置中使用,而无需更改配置设置的名称或为其添加前缀。

示例:定义 MirrorMaker 使用的自定义 Kafka Connect 设置。

# Setting Kafka Connect defaults for MirrorMaker
tasks.max = 5

大多数默认的 Kafka Connect 设置对于开箱即用的 MirrorMaker 都能很好地工作,但tasks.max. 为了在多个 MirrorMaker 进程之间均匀分配工作负载,建议根据可用硬件资源和要复制的主题分区总数 设置tasks.max为至少(最好更高)。2

您可以进一步自定义每个源或目标集群 的 MirrorMaker 的 Kafka Connect 设置(更准确地说,您可以“每个连接器”指定 Kafka Connect 工作线程级别的配置设置)。{cluster}.{config_name}使用MirrorMaker 配置文件中 的格式。

示例:为us-west集群定义自定义连接器设置。

# us-west custom settings
us-west.offset.storage.topic = my-mirrormaker-offsets

MirrorMaker 内部使用 Kafka 生产者、消费者和管理客户端。通常需要为这些客户端进行自定义设置。要覆盖默认值,请在 MirrorMaker 配置文件中使用以下格式:

  • {source}.consumer.{consumer_config_name}
  • {target}.producer.{producer_config_name}
  • {source_or_target}.admin.{admin_config_name}

示例:定义自定义生产者、消费者、管理客户端设置。

# us-west cluster (from which to consume)
us-west.consumer.isolation.level = read_committed
us-west.admin.bootstrap.servers = broker57-primary:9092

# us-east cluster (to which to produce)
us-east.producer.compression.type = gzip
us-east.producer.buffer.memory = 32768
us-east.admin.bootstrap.servers = broker8-secondary:9092

创建和启用复制流

要定义复制流,您必须首先在 MirrorMaker 配置文件中定义相应的源和目标 Kafka 集群。

  • clusters(必需):以逗号分隔的 Kafka 集群“别名”列表
  • {clusterAlias}.bootstrap.servers(必填):特定集群的连接信息;“引导”Kafka broker的逗号分隔列表

示例:定义两个集群别名primarysecondary,包括它们的连接信息。

clusters = primary, secondary
primary.bootstrap.servers = broker10-primary:9092,broker-11-primary:9092
secondary.bootstrap.servers = broker5-secondary:9092,broker6-secondary:9092

{source}->{target}.enabled = true其次,您必须根据需要 显式启用各个复制流。请记住,流是定向的:如果需要双向(双向)复制,则必须启用两个方向的流。

# Enable replication from primary to secondary
primary->secondary.enabled = true

默认情况下,复制流会将除少数特殊主题和使用者组之外的所有内容从源集群复制到目标集群,并自动检测任何新创建的主题和组。目标集群中复制主题的名称将以源集群的名称为前缀(请参阅下面的进一步部分)。例如,foo源集群中的主题将被复制到目标集群中us-west命名的主题。 us-west.foo``us-east

后续部分解释如何根据您的需要自定义此基本设置。

配置复制流

复制流的配置是顶级默认设置(例如topics)的组合,在其之上应用特定于流的设置(如果有)(例如us-west->us-east.topics)。要更改顶级默认设置,请将相应的顶级设置添加到 MirrorMaker 配置文件中。要仅覆盖特定复制流的默认值,请使用语法 format {source}->{target}.{config.name}

最重要的设置是:

  • topics:主题列表或正则表达式,定义源集群中要复制的主题(默认值topics = .*:)
  • topics.exclude:主题列表或正则表达式,用于随后排除与设置匹配的主题topics(默认值topics.exclude = .*[\-.]internal, .*.replica, __.*:)
  • groups:主题或正则表达式列表,定义源集群中要复制的消费者组(默认值groups = .*:)
  • groups.exclude:主题列表或正则表达式,用于随后排除与设置匹配的消费者组groups(默认值groups.exclude = console-consumer-.*, connect-.*, __.*:)
  • {source}->{target}.enable:设置true为启用复制流(默认值false:)

例子:

# Custom top-level defaults that apply to all replication flows
topics = .*
groups = consumer-group1, consumer-group2

# Don't forget to enable a flow!
us-west->us-east.enabled = true

# Custom settings for specific replication flows
us-west->us-east.topics = foo.*
us-west->us-east.groups = bar.*
us-west->us-east.emit.heartbeats = false

支持其他配置设置,下面列出了其中一些。在大多数情况下,您可以将这些设置保留为默认值。有关更多详细信息,请参阅MirrorMakerConfigMirrorConnectorConfig

  • refresh.topics.enabled:是否定期检查源集群中的新主题(默认:true)
  • refresh.topics.interval.seconds:在源集群中检查新主题的频率;低于默认值可能会导致性能下降(默认值:600,每十分钟)
  • refresh.groups.enabled:是否定期检查源集群中是否有新的消费者组(默认:true)
  • refresh.groups.interval.seconds:检查源集群中新消费者组的频率;低于默认值可能会导致性能下降(默认值:600,每十分钟)
  • sync.topic.configs.enabled:是否从源集群复制主题配置(默认:true)
  • sync.topic.acls.enabled:是否同步源集群的ACL(默认:true)
  • emit.heartbeats.enabled:是否定期发出心跳(默认:true)
  • emit.heartbeats.interval.seconds:发出心跳的频率(默认值:1,每隔一秒)
  • heartbeats.topic.replication.factor:MirrorMaker内部心跳主题的复制因子(默认:3)
  • emit.checkpoints.enabled:是否定期发出 MirrorMaker 的消费者偏移量(默认值:true)
  • emit.checkpoints.interval.seconds:发出检查点的频率(默认值:60,每分钟)
  • checkpoints.topic.replication.factor:MirrorMaker 内部检查点主题的复制因子(默认值:3)
  • sync.group.offsets.enabled``__consumer_offsets:只要该组中没有活动消费者连接到目标集群,是否 定期将复制的消费者组(在源集群中)的翻译偏移量写入目标集群中的主题(默认值:false)
  • sync.group.offsets.interval.seconds:消费者组偏移量同步的频率(默认值:60,每分钟)
  • offset-syncs.topic.replication.factor:MirrorMaker内部偏移同步主题的复制因子(默认值:3)

保护复制流

MirrorMaker 支持与 Kafka Connect 相同的安全设置,因此请参阅链接部分以获取更多信息。

示例:加密 MirrorMaker 与us-east集群之间的通信。

us-east.security.protocol=SSL
us-east.ssl.truststore.location=/path/to/truststore.jks
us-east.ssl.truststore.password=my-secret-password
us-east.ssl.keystore.location=/path/to/keystore.jks
us-east.ssl.keystore.password=my-secret-password
us-east.ssl.key.password=my-secret-password

目标集群中复制主题的自定义命名

目标集群中的复制主题(有时称为远程主题)根据复制策略进行重命名。MirrorMaker 使用此策略来确保来自不同集群的事件(也称为记录、消息)不会写入同一主题分区。默认情况下,根据DefaultReplicationPolicy,目标集群中复制主题的名称采用以下格式{source}.{source_topic_name}

us-west         us-east
=========       =================
                bar-topic
foo-topic  -->  us-west.foo-topic

您可以使用以下设置自定义分隔符(默认:.replication.policy.separator

# Defining a custom separator
us-west->us-east.replication.policy.separator = _

如果您需要进一步控制复制主题的命名方式,您可以在 MirrorMaker 配置中 实现自定义ReplicationPolicy并覆盖replication.policy.class(默认为)。DefaultReplicationPolicy

防止配置冲突

MirrorMaker 进程通过其目标 Kafka 集群共享配置。当针对同一目标集群运行的 MirrorMaker 进程之间的配置不同时,此行为可能会导致冲突。

例如,以下两个 MirrorMaker 进程将是活泼的:

# Configuration of process 1
A->B.enabled = true
A->B.topics = foo

# Configuration of process 2
A->B.enabled = true
A->B.topics = bar

在这种情况下,两个进程将通过 cluster 共享配置B,这会导致冲突。根据两个进程中的哪一个被选举为“领导者”,结果将是主题foo或主题bar被复制,但不会同时复制。

因此,在同一目标集群的复制流中保持 MirrorMaker 配置一致非常重要。例如,这可以通过自动化工具或为整个组织使用单个共享的 MirrorMaker 配置文件来实现。

最佳实践:从远程消费,生产到本地

为了最大限度地减少延迟(“生产者滞后”),建议将 MirrorMaker 进程放置在尽可能靠近其目标集群(即它向其生成数据的集群)的位置。这是因为 Kafka 生产者通常比 Kafka 消费者更容易遇到不可靠或高延迟的网络连接。

First DC          Second DC
==========        =========================
primary --------- MirrorMaker --> secondary
(remote)                           (local)

要运行这样的“从远程使用,生成到本地”设置,请在靠近目标集群且最好在与目标集群相同的位置运行 MirrorMaker 进程,并在命令行参数(空白分隔列表)中显式设置这些“本地”--clusters集群集群别名):

# Run in secondary's data center, reading from the remote `primary` cluster
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters secondary

--clusters secondary告诉 MirrorMaker 进程给定的集群位于附近,并阻止其复制数据或将配置发送到其他远程位置的集群。

示例:主动/被动高可用性部署

以下示例显示了将主题从主 Kafka 环境复制到辅助 Kafka 环境的基本设置,但不从辅助 Kafka 环境复制回主环境。请注意,大多数生产设置都需要进一步配置,例如安全设置。

# Unidirectional flow (one-way) from primary to secondary cluster
primary.bootstrap.servers = broker1-primary:9092
secondary.bootstrap.servers = broker2-secondary:9092

primary->secondary.enabled = true
secondary->primary.enabled = false

primary->secondary.topics = foo.*  # only replicate some topics

示例:主动/主动高可用性部署

以下示例显示了以两种方式在两个集群之间复制主题的基本设置。请注意,大多数生产设置都需要进一步配置,例如安全设置。

# Bidirectional flow (two-way) between us-west and us-east clusters
clusters = us-west, us-east
us-west.bootstrap.servers = broker1-west:9092,broker2-west:9092
Us-east.bootstrap.servers = broker3-east:9092,broker4-east:9092

us-west->us-east.enabled = true
us-east->us-west.enabled = true

关于防止复制“循环”的注意事项(其中主题最初从 A 复制到 B,然后复制的主题将再次从 B 复制到 A,依此类推):只要您在同一个 MirrorMaker 中定义上述流程配置文件中,您不需要显式添加topics.exclude设置来防止两个集群之间的复制循环。

示例:多集群异地复制

让我们将前面部分中的所有信息放在一个更大的示例中。想象一下,有三个数据中心(西、东、北),每个数据中心有两个 Kafka 集群(例如 、west-1west-2。本节中的示例显示如何配置 MirrorMaker (1) 以实现每个数据中心内的主动/主动复制,以及 (2) 跨数据中心复制 (XDCR)。

首先,在配置中定义源集群和目标集群及其复制流:

# Basic settings
clusters: west-1, west-2, east-1, east-2, north-1, north-2
west-1.bootstrap.servers = ...
west-2.bootstrap.servers = ...
east-1.bootstrap.servers = ...
east-2.bootstrap.servers = ...
north-1.bootstrap.servers = ...
north-2.bootstrap.servers = ...

# Replication flows for Active/Active in West DC
west-1->west-2.enabled = true
west-2->west-1.enabled = true

# Replication flows for Active/Active in East DC
east-1->east-2.enabled = true
east-2->east-1.enabled = true

# Replication flows for Active/Active in North DC
north-1->north-2.enabled = true
north-2->north-1.enabled = true

# Replication flows for XDCR via west-1, east-1, north-1
west-1->east-1.enabled  = true
west-1->north-1.enabled = true
east-1->west-1.enabled  = true
east-1->north-1.enabled = true
north-1->west-1.enabled = true
north-1->east-1.enabled = true

然后,在每个数据中心中,启动一个或多个 MirrorMaker,如下所示:

# In West DC:
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters west-1 west-2

# In East DC:
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters east-1 east-2

# In North DC:
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters north-1 north-2

通过此配置,任何集群生成的记录都将在数据中心内复制,并跨到其他数据中心。通过提供--clusters参数,我们确保每个 MirrorMaker 进程仅向附近的集群生成数据。

注意:--clusters从技术上讲,此处不需要该参数。没有它,MirrorMaker 也能正常工作。但是,吞吐量可能会受到数据中心之间“生产者滞后”的影响,并且您可能会产生不必要的数据传输成本。

开始异地复制

您可以根据需要运行任意数量的 MirrorMaker 进程(例如:节点、服务器)。由于 MirrorMaker 基于 Kafka Connect,因此配置为复制相同 Kafka 集群的 MirrorMaker 进程在分布式设置中运行:它们将找到彼此、共享配置(请参阅下面的部分)、负载平衡其工作等等。例如,如果您想要提高复制流的吞吐量,一种选择是并行运行其他 MirrorMaker 进程。

要启动 MirrorMaker 进程,请运行以下命令:

$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties

启动后,MirrorMaker 进程可能需要几分钟时间才开始复制数据。

或者,如前所述,您可以设置参数--clusters以确保 MirrorMaker 进程仅向附近的集群生成数据。

# Note: The cluster alias us-west must be defined in the configuration file
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties \
            --clusters us-west

测试使用者组复制时请注意:默认情况下,MirrorMaker 不会复制该 kafka-console-consumer.sh工具创建的使用者组,您可以使用该工具在命令行上测试 MirrorMaker 设置。如果您确实也想复制这些使用者组,请groups.exclude相应地设置配置(默认值groups.exclude = console-consumer-.*, connect-.*, __.*:)。请记住在完成测试后再次更新配置。

停止异地复制

您可以通过使用以下命令发送 SIGTERM 信号来停止正在运行的 MirrorMaker 进程:

$ kill <MirrorMaker pid>

应用配置更改

要使配置更改生效,必须重新启动 MirrorMaker 进程。

监控异地复制

建议监控 MirrorMaker 进程,以确保所有定义的复制流程均正常启动并运行。MirrorMaker 基于 Connect 框架构建,并继承了 Connect 的所有指标,例如source-record-poll-rate. 此外,MirrorMaker 在kafka.connect.mirror指标组下生成自己的指标。指标带有以下属性标记:

  • source:源集群的别名(例如,primary
  • target:目标集群的别名(例如,secondary
  • topic:目标集群上的复制主题
  • partition:正在复制的分区

跟踪每个复制主题的指标。可以从主题名称推断出源集群。例如,复制topic1primary->secondary产生如下指标:

  • target=secondary
  • topic=primary.topic1
  • partition=1

发出以下指标:

# MBean: kafka.connect.mirror:type=MirrorSourceConnector,target=([-.w]+),topic=([-.w]+),partition=([0-9]+)

record-count            # number of records replicated source -> target
record-age-ms           # age of records when they are replicated
record-age-ms-min
record-age-ms-max
record-age-ms-avg
replication-latency-ms  # time it takes records to propagate source->target
replication-latency-ms-min
replication-latency-ms-max
replication-latency-ms-avg
byte-rate               # average number of bytes/sec in replicated records

# MBean: kafka.connect.mirror:type=MirrorCheckpointConnector,source=([-.w]+),target=([-.w]+)

checkpoint-latency-ms   # time it takes to replicate consumer offsets
checkpoint-latency-ms-min
checkpoint-latency-ms-max
checkpoint-latency-ms-avg

这些指标不区分创建时间和日志追加时间戳。

6.4 多租户

多租户概述

作为一个高度可扩展的事件流平台,Kafka 被许多用户用作他们的中枢神经系统,实时连接来自不同团队和业务线的各种不同系统和应用程序。这种多租户集群环境需要适当的控制和管理,以确保这些不同需求的和平共存。本节重点介绍设置此类共享环境的功能和最佳实践,这将帮助您操作满足 SLA/OLA 的集群,并最大限度地减少“吵闹的邻居”造成的潜在附带损害。

多租户是一个多方面的主题,包括但不限于:

  • 为租户创建用户空间(有时称为命名空间)
  • 使用数据保留策略等配置主题
  • 通过加密、身份验证和授权来保护主题和集群
  • 通过配额和费率限制隔离租户
  • 监控与计量
  • 集群间数据共享(参见异地复制)

使用主题命名为租户创建用户空间(命名空间)

操作多租户集群的 Kafka 管理员通常需要为每个租户定义用户空间。就本节而言,“用户空间”是主题的集合,这些主题在单个实体或用户的管理下组合在一起。

在Kafka中,数据的主要单位是主题。用户可以创建并命名每个主题。他们也可以删除它们,但无法直接重命名主题。相反,要重命名主题,用户必须创建新主题,将消息从原始主题移动到新主题,然后删除原始主题。考虑到这一点,建议基于分层主题命名结构来定义逻辑空间。然后,此设置可以与安全功能(例如前缀 ACL)相结合,以隔离不同的空间和租户,同时最大限度地减少保护集群中数据的管理开销。

这些逻辑用户空间可以通过不同的方式进行分组,具体选择取决于您的组织更喜欢如何使用 Kafka 集群。最常见的分组如下。

按团队或组织单位:在这里,团队是主要的聚合者。在团队是 Kafka 基础设施主要用户的组织中,这可能是最好的分组。

主题命名结构示例:

  • <organization>.<team>.<dataset>.<event-name>
    (例如,“acme.infosec.telemetry.logins”)

按项目或产品:在这里,一个团队管理多个项目。每个项目的凭据都不同,因此所有控件和设置将始终与项目相关。

主题命名结构示例:

  • <project>.<product>.<event-name>
    (例如,“移动性.付款.可疑”)

某些信息通常不应放在主题名称中,例如可能随时间变化的信息(例如,目标消费者的名称)或者是其他地方可用的技术细节或元数据(例如,主题的分区)计数和其他配置设置)。

要强制实施主题命名结构,可以使用以下几个选项:

  • 使用前缀 ACL(参见KIP-290)强制主题名称使用公共前缀。例如,团队 A 可能只被允许创建名称以 开头的主题payments.teamA.
  • 定义自定义CreateTopicPolicy(参见KIP-108和设置create.topic.policy.class.name)以强制执行严格的命名模式。这些策略提供了最大的灵活性,并且可以涵盖复杂的模式和规则来满足组织的需求。
  • 通过使用 ACL 拒绝普通用户禁用主题创建,然后依靠外部进程代表用户创建主题(例如,脚本或您最喜欢的自动化工具包)。
  • auto.create.topics.enable=false通过在代理配置中进行设置来禁用 Kafka 功能以按需自动创建主题也可能很有用。请注意,您不应仅仅依赖此选项。

配置主题:数据保留等

Kafka 的配置由于其精细的粒度而非常灵活,并且它支持大量的按主题配置设置,以帮助管理员设置多租户集群。例如,管理员通常需要定义数据保留策略,以控制数据在主题中存储的数量和/或多长时间,并使用诸如retention.bytes(大小)和retention.ms(时间)等设置。这限制了集群内的存储消耗,并有助于遵守 GDPR 等法律要求。

保护集群和主题:身份验证、授权、加密

由于该文档有专门的一章介绍适用于任何 Kafka 部署的安全性,因此本节重点介绍多租户环境的其他注意事项。

Kafka 的安全设置分为三个主要类别,这与管理员保护其他客户端-服务器数据系统(如关系数据库和传统消息系统)的方式类似。

  1. 对 Kafka 代理和 Kafka 客户端之间、代理之间、代理和 ZooKeeper 节点之间以及代理和其他可选工具之间传输的数据进行加密。
  2. 对从 Kafka 客户端和应用程序到 Kafka 代理的连接以及从 Kafka 代理到 ZooKeeper 节点的连接进行身份验证。
  3. 对主题的创建、删除、更改配置等客户端操作进行授权;将事件写入主题或从主题读取事件;创建和删除 ACL。管理员还可以定义自定义策略以实施其他限制,例如CreateTopicPolicyand AlterConfigPolicy(请参阅KIP-108和设置create.topic.policy.class.namealter.config.policy.class.name)。

当保护多租户 Kafka 环境时,最常见的管理任务是第三类(授权),即管理用户/客户端权限,授予或拒绝对某些主题的访问,从而授予或拒绝对集群内用户存储的数据的访问。该任务主要通过访问控制列表(ACL)的设置来执行。在这里,多租户环境的管理员特别受益于将分层主题命名结构放在适当的位置(如上一节所述),因为他们可以通过前缀 ACL 方便地控制对主题的访问(--resource-pattern-type Prefixed)。这大大减少了多租户环境中保护主题的管理开销:管理员可以在更高的开发便利性(更宽松的权限,使用更少和更广泛的 ACL)与更严格的安全性(更严格的权限,使用更多和更广泛的 ACL)之间进行权衡。更窄的 ACL)。

在以下示例中,用户 Alice(ACME 公司 InfoSec 团队的新成员)被授予对名称以“acme.infosec.”开头的所有主题的写入权限,例如“acme.infosec.telemetry.logins”和“acme.infosec.logins”。 infosec.syslogs.events”。

# Grant permissions to user Alice
$ bin/kafka-acls.sh \
    --bootstrap-server broker1:9092 \
    --add --allow-principal User:Alice \
    --producer \
    --resource-pattern-type prefixed --topic acme.infosec.

您可以类似地使用此方法来隔离同一共享集群上的不同客户。

隔离租户:配额、速率限制、限制

多租户集群通常应配置配额,以防止用户(租户)占用过多集群资源,例如当他们尝试写入或读取大量数据时,或以过高的速率向代理创建请求时。这可能会导致网络饱和、垄断代理资源并影响其他客户端——所有这些都是您希望在共享环境中避免的。

客户端配额: Kafka 支持不同类型的(每用户主体)客户端配额。由于客户端的配额与客户端写入或读取哪个主题无关,因此它们是在多租户集群中分配资源的便捷且有效的工具。例如,请求速率配额通过限制代理在请求处理路径上花费的时间来帮助限制用户对代理 CPU 使用率的影响在许多情况下,在多租户集群中,使用请求速率配额隔离用户比设置传入/传出网络带宽配额影响更大,因为用于处理请求的代理 CPU 使用率过高会降低有效带宽broker可以提供服务。此外,管理员还可以定义主题操作的配额(例如创建、删除和更改),以防止 Kafka 集群因高并发主题操作而不堪重负(请参阅KIP-599和配额类型controller_mutation_rate)。

服务器配额: Kafka还支持不同类型的broker端配额。例如,管理员可以设置代理接受新连接的速率限制、设置每个代理的最大连接数或设置允许来自特定 IP 地址的最大连接数。

欲了解更多信息,请参阅配额概述如何设置配额

监控与计量

监控是一个更广泛的主题,在文档的其他部分中有介绍。任何 Kafka 环境(尤其是多租户环境)的管理员都应根据这些说明设置监控。Kafka 支持广泛的指标,例如身份验证尝试失败率、请求延迟、消费者滞后、消费者组总数、上一节中描述的配额指标等等。

例如,可以将监控配置为跟踪主题分区的大小(使用 JMX 指标kafka.log.Log.Size.<TOPIC-NAME>),从而跟踪主题中存储的数据的总大小。然后,您可以定义当共享集群上的租户即将使用过多存储空间时发出的警报。

多租户和地理复制

Kafka 允许您跨不同集群共享数据,这些集群可能位于不同的地理区域、数据中心等。除了灾难恢复等用例之外,当多租户设置需要集群间数据共享时,此功能非常有用。有关详细信息, 请参阅异地复制(跨集群数据镜像)部分。

进一步的考虑

数据合约:您可能需要使用事件模式在集群中的数据生产者和消费者之间定义数据契约。这确保写入 Kafka 的事件始终可以再次正确读取,并防止写入格式错误或损坏的事件。实现此目标的最佳方法是与集群一起部署所谓的模式注册表。(Kafka 不包含模式注册表,但有可用的第三方实现。)模式注册表管理事件模式并将模式映射到主题,以便生产者知道哪些主题正在接受哪些类型(模式)的事件,以及消费者知道如何读取和解析主题中的事件。一些注册表实现提供了更多功能,例如架构演变、存储所有架构的历史记录以及架构兼容性设置。

6.5 Kafka配置

重要的客户端配置

最重要的生产者配置是:

  • 确认
  • 压缩
  • 批量大小

最重要的消费者配置是获取大小。

所有配置都记录在配置部分中。

生产服务器配置

以下是生产服务器配置示例:

  # ZooKeeper
  zookeeper.connect=[list of ZooKeeper servers]

  # Log configuration
  num.partitions=8
  default.replication.factor=3
  log.dir=[List of directories. Kafka should have its own dedicated disk(s) or SSD(s).]

  # Other configurations
  broker.id=[An integer. Start with 0 and increment by 1 for each new broker.]
  listeners=[list of listeners]
  auto.create.topics.enable=false
  min.insync.replicas=2
  queued.max.requests=[number of concurrent requests]

我们的客户端配置在不同的用例之间存在很大差异。

6.6 Java版本

支持 Java 8、Java 11 和 Java 17。请注意,自 Apache Kafka 3.0 起,Java 8 支持已被弃用,并将在 Apache Kafka 4.0 中删除。如果启用 TLS,Java 11 及更高版本的性能会显着提高,因此强烈推荐它们(它们还包括许多其他性能改进:G1GC、CRC32C、紧凑字符串、线程本地握手等)。从安全角度来看,我们推荐最新发布的补丁版本,因为旧的免费版本已披露了安全漏洞。使用基于 OpenJDK 的 Java 实现(包括 Oracle JDK)运行 Kafka 的典型参数是:

  -Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
  -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
  -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -XX:+ExplicitGCInvokesConcurrent

作为参考,以下是使用上述 Java 参数的 LinkedIn 最繁忙集群之一(高峰期)的统计数据:

  • 60 名broker
  • 50k 分区(复制因子 2)
  • 800k 消息/秒
  • 入站 300 MB/秒,出站 1 GB/秒以上

该集群中的所有代理的 90% GC 暂停时间约为 21 毫秒,每秒少于 1 次年轻 GC。

6.7 硬件和操作系统

我们使用具有 24GB 内存的双四核 Intel Xeon 机器。

您需要足够的内存来缓冲活动的读取器和写入器。您可以通过假设您希望能够缓冲 30 秒并将内存需求计算为 write_throughput*30 来对内存需求进行粗略估计。

磁盘吞吐量很重要。我们有 8x7200 rpm SATA 驱动器。一般来说磁盘吞吐量是性能瓶颈,磁盘越多越好。根据您配置刷新行为的方式,您可能会也可能不会从更昂贵的磁盘中受益(如果您经常强制刷新,那么更高 RPM 的 SAS 驱动器可能会更好)。

操作系统

Kafka应该可以在任何unix系统上运行良好,并且已经在Linux和Solaris上进行了测试。

我们发现在 Windows 上运行时存在一些问题,并且 Windows 目前不是一个得到良好支持的平台,但我们很乐意对此进行更改。

它不太可能需要太多操作系统级别的调整,但存在三个潜在重要的操作系统级别配置:

  • 文件描述符限制:Kafka 使用文件描述符来表示日志段和打开的连接。如果代理托管许多分区,请考虑代理除了代理建立的连接数之外,还至少需要 (number_of_partitions)*(partition_size/segment_size) 来跟踪所有日志段。我们建议代理进程至少使用 100000 个允许的文件描述符作为起点。注意:mmap() 函数添加对与文件描述符 fildes 关联的文件的额外引用,该文件描述符上的后续 close() 不会删除该引用。当不再有到该文件的映射时,该引用将被删除。
  • 最大套接字缓冲区大小:可以增加以实现数据中心之间的高性能数据传输,如此处所述
  • 进程可以拥有的内存映射区域的最大数量(也称为 vm.max_map_count)。请参阅 Linux 内核文档。在考虑代理可能拥有的最大分区数时,您应该留意这个操作系统级别的属性。默认情况下,在许多 Linux 系统上,vm.max_map_count 的值约为 65535。每个分区分配的每个日志段都需要一对索引/时间索引文件,并且每个文件消耗 1 个映射区域。换句话说,每个日志段使用2个地图区域。因此,每个分区至少需要 2 个映射区域,只要它托管单个日志段即可。也就是说,在代理上创建 50000 个分区将导致分配 100000 个映射区域,并可能在具有默认 vm.max_map_count 的系统上导致代理崩溃并出现 OutOfMemoryError(映射失败)。请记住,每个分区的日志段数根据段大小、负载强度、保留策略以及通常情况下的变化而变化

磁盘和文件系统

我们建议使用多个驱动器来获得良好的吞吐量,并且不要与应用程序日志或其他操作系统文件系统活动共享用于 Kafka 数据的相同驱动器,以确保良好的延迟。您可以将这些驱动器一起 RAID 到单个卷中,也可以将每个驱动器格式化并安装为其自己的目录。由于 Kafka 具有复制功能,因此 RAID 提供的冗余也可以在应用程序级别提供。这种选择有几个权衡。

如果配置多个数据目录,分区将循环分配给数据目录。每个分区将完全位于一个数据目录中。如果分区之间的数据没有很好地平衡,这可能会导致磁盘之间的负载不平衡。

RAID 在平衡磁盘之间的负载方面可能会做得更好(尽管看起来并不总是如此),因为它在较低级别上平衡负载。RAID 的主要缺点是它通常会严重影响写入吞吐量并减少可用磁盘空间。

RAID 的另一个潜在好处是能够容忍磁盘故障。然而,我们的经验是,重建 RAID 阵列的 I/O 密集程度很高,以至于它会有效地禁用服务器,因此这并不能提供太多真正的可用性改进。

应用程序与操作系统刷新管理

Kafka 总是立即将所有数据写入文件系统,并支持配置刷新策略的功能,该策略控制何时使用刷新将数据强制从操作系统缓存中取出并写入磁盘。可以控制此刷新策略,以在一段时间后或在写入一定数量的消息后强制将数据写入磁盘。此配置有多种选择。

Kafka 最终必须调用 fsync 才能知道数据已刷新。当从任何未知的 fsync 日志段的崩溃中恢复时,Kafka 将通过检查其 CRC 来检查每条消息的完整性,并重建随附的偏移量索引文件,作为启动时执行的恢复过程的一部分。

请注意,Kafka 中的持久性不需要将数据同步到磁盘,因为故障节点始终会从其副本中恢复。

我们建议使用默认刷新设置,完全禁用应用程序 fsync。这意味着依赖操作系统完成的后台刷新和 Kafka 自己的后台刷新。这为大多数用途提供了最好的特性:无需调节旋钮、出色的吞吐量和延迟以及完全恢复保证。我们通常认为复制提供的保证比同步到本地磁盘更强,但是偏执者可能仍然更喜欢两者,并且仍然支持应用程序级 fsync 策略。

使用应用程序级别刷新设置的缺点是,它的磁盘使用模式效率较低(它给操作系统重新排序写入的余地较小),并且可能会引入延迟,因为大多数 Linux 文件系统中的 fsync 会阻止写入文件,而后台刷新执行更细粒度的页面级锁定。

一般来说,您不需要对文件系统进行任何低级调整,但在接下来的几节中,我们将讨论其中的一些内容,以防它有用。

了解 Linux 操作系统刷新行为

在 Linux 中,写入文件系统的数据保留在页面缓存中,直到必须将其写出到磁盘(由于应用程序级 fsync 或操作系统自身的刷新策略)。数据的刷新是由一组称为 pdflush 的后台线程(或在 2.6.32 后的内核中的“flusher 线程”)完成的。

Pdflush 有一个可配置的策略,可以控制缓存中可以保留多少脏数据以及必须将其写回磁盘之前的时间。此处描述了该策略。当 Pdflush 无法跟上数据写入的速率时,最终会导致写入过程阻塞,从而产生写入延迟,从而减慢数据的积累。

您可以通过以下方式查看操作系统内存使用的当前状态

> 猫 /proc/meminfo

这些值的含义在上面的链接中有描述。

与进程内缓存相比,使用页面缓存来存储将写出到磁盘的数据有几个优点:

  • I/O 调度程序会将连续的小写入批量合并为更大的物理写入,从而提高吞吐量。
  • I/O 调度程序将尝试重新排序写入,以最大限度地减少磁盘头的移动,从而提高吞吐量。
  • 它会自动使用机器上的所有可用内存

文件系统选择

Kafka 使用磁盘上的常规文件,因此它对特定文件系统没有硬依赖。然而,使用最多的两个文件系统是 EXT4 和 XFS。从历史上看,EXT4 的使用量较多,但最近对 XFS 文件系统的改进表明,它对于 Kafka 工作负载具有更好的性能特征,且不影响稳定性。

使用各种文件系统创建和挂载选项,在具有大量消息负载的集群上执行比较测试。Kafka 中受监控的主要指标是“请求本地时间”,表示追加操作所花费的时间。XFS 带来了更好的本地时间(对于最佳 EXT4 配置,本地时间为 160 毫秒,而 250 毫秒以上),并且平均等待时间更短。XFS 性能还显示磁盘性能的变化较小。

一般文件系统注释

对于用于数据目录的任何文件系统,在 Linux 系统上,建议在挂载时使用以下选项:

  • noatime:此选项禁止在读取文件时更新文件的 atime(上次访问时间)属性。这可以消除大量的文件系统写入,特别是在引导消费者的情况下。Kafka 根本不依赖 atime 属性,因此禁用它是安全的。

XFS 注释

XFS 文件系统具有大量自动调整功能,因此无论是在文件系统创建时还是在挂载时,都不需要对默认设置进行任何更改。唯一值得考虑的调整参数是:

  • Largeio:这会影响 stat 调用报告的首选 I/O 大小。虽然这可以在较大的磁盘写入上实现更高的性能,但实际上它对性能的影响很小或没有影响。
  • nobarrier:对于具有电池支持缓存的底层设备,此选项可以通过禁用定期写入刷新来提供更高的性能。但是,如果底层设备表现良好,它将向文件系统报告它不需要刷新,并且此选项将不起作用。

EXT4注释

EXT4 是 Kafka 数据目录的一个可用的文件系统选择,但是要充分利用它的性能需要调整多个安装选项。此外,这些选项在故障情况下通常是不安全的,并且会导致更多的数据丢失和损坏。对于单个代理故障,这并不是什么大问题,因为可以擦除磁盘并从集群重建副本。在多次故障的情况下,例如断电,这可能意味着底层文件系统(以及数据)损坏且难以恢复。可以调整以下选项:

  • data=writeback:Ext4 默认为 data=ordered,这对某些写入设置了强顺序。Kafka 不需要这种排序,因为它对所有未刷新的日志进行非常偏执的数据恢复。此设置消除了排序限制,并且似乎显着减少了延迟。
  • 禁用日志记录:日志记录是一种权衡:它使服务器崩溃后重新启动速度更快,但它引入了大量额外的锁定,从而增加了写入性能的差异。那些不关心重新启动时间并希望减少写入延迟峰值的主要来源的人可以完全关闭日志记录。
  • commit=num_secs:这会调整 ext4 提交其元数据日志的频率。将其设置为较低的值可以减少崩溃期间未刷新数据的丢失。将其设置为更高的值将提高吞吐量。
  • nobh:此设置控制使用 data=writeback 模式时的附加排序保证。这对于 Kafka 来说应该是安全的,因为我们不依赖于写入顺序并提高了吞吐量和延迟。
  • delalloc:延迟分配意味着文件系统在物理写入发生之前避免分配任何块。这允许 ext4 分配较大的范围而不是较小的页面,并有助于确保数据按顺序写入。此功能对于吞吐量非常有用。它似乎确实涉及文件系统中的一些锁定,这增加了一些延迟差异。

更换 KRaft 控制器磁盘

metadata.log.dir当 Kafka 配置为使用 KRaft 时,控制器将集群元数据存储在-- 或第一个日志目录(如果metadata.log.dir未配置)中指定的目录中。metadata.log.dir有关详细信息,请参阅文档。

如果由于硬件故障或需要更换硬件而导致集群元数据目录中的数据丢失,则在配置新的控制器节点时应小心。在大多数控制器拥有所有提交的数据之前,不应格式化并启动新的控制器节点。要确定大多数控制器是否具有已提交的数据,请运行该kafka-metadata-quorum.sh工具来描述复制状态:

 > bin/kafka-metadata-quorum.sh --bootstrap-server broker_host:port describe --replication
 NodeId  LogEndOffset    Lag     LastFetchTimestamp      LastCaughtUpTimestamp   Status
 1       25806           0       1662500992757           1662500992757           Leader
 ...     ...             ...     ...                     ...                     ...

检查并等待,直到Lag对于大多数控制器来说都很小。如果领导者的结束偏移量没有增加,则可以等到滞后为 0 时才获得多数;否则,您可以选择最新的领导者末端偏移量并等待所有副本都到达它。检查并等待,直到大多数控制器的LastFetchTimestamp和彼此接近。LastCaughtUpTimestamp此时,格式化控制器的元数据日志目录会更安全。这可以通过运行命令来完成kafka-storage.sh

 > bin/kafka-storage.sh format --cluster-id uuid --config server_properties

上面的命令可能bin/kafka-storage.sh format会失败并显示类似 的消息Log directory ... is already formatted。当使用组合模式并且仅丢失元数据日志目录而不丢失其他目录时,可能会发生这种情况。在这种情况下并且只有在这种情况下,您才能运行kafka-storage.sh format带有该--ignore-formatted选项的命令。

格式化日志目录后启动 KRaft 控制器。

 > /bin/kafka-server-start.sh server_properties

6.8 监控

Kafka 使用 Yammer Metrics 在服务器中进行指标报告。Java 客户端使用 Kafka Metrics,这是一个内置指标注册表,可最大程度地减少客户端应用程序中的传递依赖性。两者都通过 JMX 公开指标,并且可以配置为使用可插入统计报告器报告统计信息以连接到您的监控系统。

所有 Kafka 速率指标都有一个相应的累积计数指标,后缀为-total。例如, records-consumed-rate有一个名为 的相应指标records-consumed-total

查看可用指标的最简单方法是启动 jconsole 并将其指向正在运行的 kafka 客户端或服务器;这将允许使用 JMX 浏览所有指标。

使用 JMX 进行远程监控的安全注意事项

Apache Kafka 默认禁用远程 JMX。JMX_PORT您可以通过为使用 CLI启动的进程设置环境变量或标准 Java 系统属性来以编程方式启用远程 JMX,从而使用 JMX 启用远程监控 。在生产场景中启用远程 JMX 时,您必须启用安全性,以确保未经授权的用户无法监视或控制您的代理或应用程序以及它们运行的​​平台。KAFKA_JMX_OPTS请注意,默认情况下,Kafka 中的 JMX 身份验证处于禁用状态,并且必须通过为使用 CLI 启动的进程设置环境变量或设置适当的 Java 系统属性来覆盖生产部署的安全配置 。请参阅 使用 JMX 技术进行监控和管理 有关保护 JMX 的详细信息。

我们对以下指标进行绘图和警报:

描述 MBEAN 名称 正常值
留言率 kafka.server:类型=BrokerTopicMetrics,名称=MessagesInPerSec,主题=([-.\w]+) 每个主题的传入消息率。省略“topic=(...)”将产生全主题率。
来自客户端的字节速率 kafka.server:类型=BrokerTopicMetrics,名称=BytesInPerSec,主题=([-.\w]+) 每个主题的字节输入(来自客户端)速率。省略“topic=(...)”将产生全主题率。
其他经纪商的字节率 kafka.server:类型=BrokerTopicMetrics,名称=ReplicationBytesInPerSec,主题=([-.\w]+) 每个主题的字节(来自其他代理)速率。省略“topic=(...)”将产生全主题率。
控制器对代理的请求率 kafka.controller:类型=ControllerChannelManager,名称=RequestRateAndQueueTimeMs,brokerId=([0-9]+) ControllerChannelManager 从给定代理的队列中获取请求的速率(每秒请求数)。以及请求在从队列中取出之前在此队列中停留的时间。
控制器事件队列大小 kafka.controller:类型=ControllerEventManager,名称=EventQueueSize ControllerEventManager 队列的大小。
控制器事件队列时间 kafka.controller:类型=ControllerEventManager,名称=EventQueueTimeMs 任何事件(Idle 事件除外)在处理之前在 ControllerEventManager 队列中等待所花费的时间
请求率 kafka.network:类型=RequestMetrics,名称=RequestsPerSec,请求={Produce FetchConsumer
错误率 kafka.network:类型=RequestMetrics,名称=ErrorsPerSec,请求=([-.\w]+),错误=([-.\w]+) 每个请求类型、每个错误代码计数的响应中的错误数。如果响应包含多个错误,则所有错误都会被计算在内。error=NONE 表示响应成功。
生产请求率 kafka.server:类型=BrokerTopicMetrics,名称=TotalProduceRequestsPerSec,主题=([-.\w]+) 生成每个主题的请求率。省略“topic=(...)”将产生全主题率。
获取请求率 kafka.server:类型=BrokerTopicMetrics,名称=TotalFetchRequestsPerSec,主题=([-.\w]+) 获取每个主题的请求(来自客户或关注者)率。省略“topic=(...)”将产生全主题率。
失败的产品请求率 kafka.server:类型=BrokerTopicMetrics,名称=FailedProduceRequestsPerSec,主题=([-.\w]+) 每个主题的生成请求率失败。省略“topic=(...)”将产生全主题率。
获取请求失败率 kafka.server:类型=BrokerTopicMetrics,名称=FailedFetchRequestsPerSec,主题=([-.\w]+) 每个主题的失败获取请求(来自客户端或关注者)率。省略“topic=(...)”将产生全主题率。
请求大小(以字节为单位) kafka.network:类型=RequestMetrics,名称=RequestBytes,请求=([-.\w]+) 每种请求类型的请求大小。
临时内存大小(以字节为单位) kafka.network:type=RequestMetrics,name=TemporaryMemoryBytes,request={Produce Fetch}
消息转换时间 kafka.network:类型=RequestMetrics,名称=MessageConversionsTimeMs,请求={生产 获取}
消息转化率 kafka.server:type=BrokerTopicMetrics,name={Produce Fetch}MessageConversionsPerSec,topic=([-.\w]+)
请求队列大小 kafka.network:类型=RequestChannel,名称=RequestQueueSize 请求队列的大小。
客户端的字节输出率 kafka.server:类型=BrokerTopicMetrics,名称=BytesOutPerSec,主题=([-.\w]+) 每个主题的字节输出(至客户端)速率。省略“topic=(...)”将产生全主题率。
其他经纪商的字节输出率 kafka.server:类型=BrokerTopicMetrics,名称=ReplicationBytesOutPerSec,主题=([-.\w]+) 每个主题的字节输出(至其他代理)速率。省略“topic=(...)”将产生全主题率。
拒绝字节率 kafka.server:类型=BrokerTopicMetrics,名称=BytesRejectedPerSec,主题=([-.\w]+) 由于记录批量大小大于 max.message.bytes 配置,每个主题被拒绝的字节率。省略“topic=(...)”将产生全主题率。
由于没有为压缩主题指定密钥而导致消息验证失败率 kafka.server:类型=BrokerTopicMetrics,名称=NoKeyCompactedTopicRecordsPerSec 0
由于无效幻数导致的消息验证失败率 kafka.server:类型=BrokerTopicMetrics,名称=InvalidMagicNumberRecordsPerSec 0
由于 crc 校验和不正确导致消息验证失败率 kafka.server:类型=BrokerTopicMetrics,名称=InvalidMessageCrcRecordsPerSec 0
批次中偏移量或序列号不连续导致消息验证失败率 kafka.server:类型=BrokerTopicMetrics,名称=InvalidOffsetOrSequenceRecordsPerSec 0
日志刷新率和时间 kafka.log:类型=LogFlushStats,名称=LogFlushRateAndTimeMs
脱机日志目录数 kafka.log:type=LogManager,name=OfflineLogDirectoryCount 0
领导者选举率 kafka.controller:类型=ControllerStats,名称=LeaderElectionRateAndTimeMs 当代理失败时非零
不干净的领导人选举率 kafka.controller:类型=ControllerStats,名称=UncleanLeaderElectionsPerSec 0
控制器在代理上是否处于活动状态 kafka.controller:类型=KafkaController,名称=ActiveControllerCount 集群中只有一个代理应该有 1
待定主题删除 kafka.controller:类型=KafkaController,名称=TopicsToDeleteCount
待处理的副本删除 kafka.controller:类型=KafkaController,名称=ReplicasToDeleteCount
不符合条件的待定主题删除 kafka.controller:类型= KafkaController,名称= TopicsIneligibleToDeleteCount
不合格的待定副本删除 kafka.controller:类型= KafkaController,名称= ReplicasIneligibleToDeleteCount
复制不足的分区数量( ISR <
minIsr 分区数量 ( ISR < min.insync.replicas)
# of at minIsr 分区 ( ISR = min.insync.replicas)
生产者 ID 计数 kafka.server:类型=ReplicaManager,名称=ProducerIdCount 代理上每个副本中事务性和幂等生产者创建的所有生产者 ID 的计数
分区计数 kafka.server:类型=ReplicaManager,名称=PartitionCount 大多数甚至跨broker
离线副本计数 kafka.server:类型=ReplicaManager,名称=OfflineReplicaCount 0
领导者副本数量 kafka.server:类型=ReplicaManager,名称=LeaderCount 大多数甚至跨broker
ISR收缩率 kafka.server:类型=ReplicaManager,名称=IsrShrinksPerSec 如果代理出现故障,某些分区的 ISR 将缩小。当该代理再次启动时,一旦副本完全赶上,ISR 将扩展。除此之外,ISR收缩率和扩张率的预期值为0。
ISR扩张率 kafka.server:类型=ReplicaManager,名称=IsrExpandsPerSec 往上看
ISR 更新失败率 kafka.server:类型=ReplicaManager,名称=FailedIsrUpdatesPerSec 0
追随者和领导者副本之间的消息最大延迟 kafka.server:类型=ReplicaFetcherManager,名称=MaxLag,clientId=副本 滞后应与生产请求的最大批量大小成正比。
每个追随者副本的消息滞后 kafka.server:类型=FetcherLagMetrics,名称=ConsumerLag,clientId=([-.\w]+),主题=([-.\w]+),分区=([0-9]+) 滞后应与生产请求的最大批量大小成正比。
请求在生产者炼狱中等待 kafka.server:类型=DelayedOperationPurgatory,名称=PurgatorySize,delayedOperation=生产 如果使用 ack=-1,则非零
请求在获取炼狱中等待 kafka.server:类型=DelayedOperationPurgatory,名称=PurgatorySize,delayedOperation=Fetch 大小取决于消费者中的 fetch.wait.max.ms
请求总时间 kafka.network:type=RequestMetrics,name=TotalTimeMs,request={Produce FetchConsumer
请求在请求队列中等待的时间 kafka.network:类型=RequestMetrics,名称=RequestQueueTimeMs,请求={Produce FetchConsumer
领导者处理请求的时间 kafka.network:type=RequestMetrics,name=LocalTimeMs,request={Produce FetchConsumer
请求等待关注者的时间 kafka.network:type=RequestMetrics,name=RemoteTimeMs,request={Produce FetchConsumer
请求在响应队列中等待的时间 kafka.network:类型=RequestMetrics,名称=ResponseQueueTimeMs,请求={Produce FetchConsumer
发送响应的时间 kafka.network:类型=RequestMetrics,名称=ResponseSendTimeMs,请求={Produce FetchConsumer
消费者落后于生产者的消息数量。由消费者而非broker发布。 kafka.consumer:type=consumer-fetch-manager-metrics,client-id={client-id} 属性:records-lag-max
网络处理器空闲的平均时间比例 kafka.network:类型=SocketServer,名称=NetworkProcessorAvgIdlePercent 0 到 1 之间,理想情况下 > 0.3
由于客户端未重新进行身份验证,然后在过期时间之后使用该连接进行除重新身份验证之外的任何操作而在处理器上断开的连接数 kafka.server:类型 = 套接字服务器指标,监听器 = [SASL_PLAINTEXT SASL_SSL],networkProcessor = <#>,名称 = 过期连接杀死计数
由于客户端未重新进行身份验证,然后在过期时间之后使用连接进行除重新身份验证以外的任何操作,导致所有处理器上断开的连接总数 kafka.network:类型=SocketServer,名称=ExpiredConnectionsKilledCount 理想情况下,启用重新身份验证时为 0,这意味着不再有任何旧的、2.2.0 之前的客户端连接到该代理
请求处理程序线程空闲的平均时间比例 kafka.server:类型=KafkaRequestHandlerPool,名称=RequestHandlerAvgIdlePercent 0 到 1 之间,理想情况下 > 0.3
每个(用户、客户端 ID)、用户或客户端 ID 的带宽配额指标 kafka.server:type={Produce Fetch},user=([-.\w]+),client-id=([-.\w]+)
每个(用户、客户端 ID)、用户或客户端 ID 的请求配额指标 kafka.server:类型=请求,用户=([-.\w]+),客户端id=([-.\w]+) 两个属性。throttle-time 表示客户端被限制的时间(以毫秒为单位)。理想情况下 = 0。请求时间表示代理网络和 I/O 线程处理来自客户端组的请求所花费的时间百分比。对于(用户、客户端 ID)配额,同时指定用户和客户端 ID。如果将 per-client-id 配额应用于客户端,则不指定用户。如果应用每用户配额,则不指定 client-id。
请求免受限制 kafka.server:type=请求 except-throttle-time 表示代理网络和 I/O 线程处理不受限制的请求所花费的时间百分比。
ZooKeeper 客户端请求延迟 kafka.server:类型=ZooKeeperClientMetrics,名称=ZooKeeperRequestLatencyMs 来自代理的 ZooKeeper 请求的延迟(以毫秒为单位)。
ZooKeeper 连接状态 kafka.server:类型=SessionExpireListener,名称=SessionState 代理的 ZooKeeper 会话的连接状态,可能是 Disconnected
加载组元数据的最长时间 kafka.server:类型=组协调器指标,名称=分区加载时间最大值 从最近 30 秒内加载的消费者偏移分区中加载偏移量和分组元数据所花费的最长时间,以毫秒为单位(包括等待调度加载任务所花费的时间)
加载组元数据的平均时间 kafka.server:类型=组协调器指标,名称=分区加载时间平均 从最近 30 秒内加载的消费者偏移分区中加载偏移量和分组元数据所花费的平均时间,以毫秒为单位(包括等待调度加载任务所花费的时间)
加载交易元数据的最长时间 kafka.server:类型=事务协调器指标,名称=分区加载时间最大值 从最近 30 秒内加载的消费者偏移分区加载事务元数据所花费的最长时间,以毫秒为单位(包括等待调度加载任务所花费的时间)
加载交易元数据的平均时间 kafka.server:类型=事务协调器指标,名称=分区加载时间平均 从最近 30 秒内加载的消费者偏移分区加载事务元数据所花费的平均时间,以毫秒为单位(包括等待调度加载任务所花费的时间)
消费者组偏移计数 kafka.server:类型=GroupMetadataManager,名称=NumOffsets 消费者组承诺抵消总数
消费者组数量 kafka.server:类型=GroupMetadataManager,名称=NumGroups 消费者群体总数
每个州的消费者群体数量 kafka.server:类型= GroupMetadataManager,名称= NumGroups [准备重新平衡,完成重新平衡,空,稳定,死] 每个状态的Consumer Group数量:PreparingRebalance、CompletingRebalance、Empty、Stable、Dead
重新分配分区的数量 kafka.server:类型=ReplicaManager,名称=重新分配分区 代理上重新分配领导者分区的数量。
重新分配流量的传出字节率 kafka.server:类型=BrokerTopicMetrics,名称=ReassignmentBytesOutPerSec 0; 当分区重新分配正在进行时非零。
重新分配流量的传入字节率 kafka.server:类型=BrokerTopicMetrics,名称=ReassignmentBytesInPerSec 0; 当分区重新分配正在进行时非零。
磁盘上分区的大小(以字节为单位) kafka.log:类型=日志,名称=大小,主题=([-.\w]+),分区=([0-9]+) 磁盘上分区的大小,以字节为单位。
分区中日志段的数量 kafka.log:类型=日志,名称=NumLogSegments,主题=([-.\w]+),分区=([0-9]+) 分区中日志段的数量。
分区中的第一个偏移量 kafka.log:类型=日志,名称=LogStartOffset,主题=([-.\w]+),分区=([0-9]+) 分区中的第一个偏移量。
分区中的最后一个偏移量 kafka.log:类型=日志,名称=LogEndOffset,主题=([-.\w]+),分区=([0-9]+) 分区中的最后一个偏移量。

Kraft 监控指标

允许监控 KRaft 仲裁和元数据日志的指标集。
请注意,一些公开的指标取决于节点的角色,如process.roles

KRaft 仲裁监控指标

这些指标在 KRaft 集群中的控制器和代理上报告

指标/属性名称 描述 MBEAN 名称
当前状态 该成员的当前状态;可能的值为领导者、候选人、投票者、追随者、独立者、观察者。 kafka.server:type=raft-metrics,name=当前状态
现任领导人 当前仲裁组领导者的 id;-1表示未知。 kafka.server:type=raft-metrics,name=当前领导者
当前投票数 当前投票领导者的 id;-1 表示没有投票给任何人。 kafka.server:type=raft-metrics,name=当前投票
当前纪元 当前的法定人数纪元。 kafka.server:type=raft-metrics,name=current-epoch
高水印 该成员保持高水位线;-1(如果未知)。 kafka.server:type=raft-metrics,name=high-watermark
对数结束偏移 当前筏日志末端偏移。 kafka.server:type=raft-metrics,name=log-end-offset
未知选民连接数 连接信息未缓存的未知投票者数量。该指标的值始终为 0。 kafka.server:type=raft-metrics,name=number-unknown-voter-connections
平均提交延迟 在 raft 日志中提交条目的平均时间(以毫秒为单位)。 kafka.server:type=raft-metrics,name=commit-latency-avg
最大提交延迟 在 raft 日志中提交条目的最长时间(以毫秒为单位)。 kafka.server:type=raft-metrics,name=commit-latency-max
平均选举延迟 选举新领导者所花费的平均时间(以毫秒为单位)。 kafka.server:type=raft-metrics,name=election-latency-avg
最大选举延迟 选举新领导者所花费的最长时间(以毫秒为单位)。 kafka.server:type=raft-metrics,name=election-latency-max
获取记录率 从 raft 法定人数的领导者处获取的平均记录数。 kafka.server:type=raft-metrics,name=fetch-records-rate
追加记录率 raft 仲裁组的领导者每秒附加的平均记录数。 kafka.server:type=raft-metrics,name=append-records-rate
平均轮询空闲率 客户端 poll() 处于空闲状态而不是等待用户代码处理记录的平均时间比例。 kafka.server:type=raft-metrics,name=poll-idle-ratio-avg

KRaft 控制器监控指标

指标/属性名称 描述 MBEAN 名称
活动控制器数量 该节点上活动控制器的数量。有效值为“0”或“1”。 kafka.controller:类型=KafkaController,名称=ActiveControllerCount
事件队列时间 Ms 请求在控制器事件队列中等待的时间(以毫秒为单位)的直方图。 kafka.controller:类型=ControllerEventManager,名称=EventQueueTimeMs
事件队列处理时间 Ms 控制器事件队列中处理请求所花费的时间(以毫秒为单位)的直方图。 kafka.controller:类型=ControllerEventManager,名称=EventQueueProcessingTimeMs
受保护的broker数量 该控制器观察到的受保护broker的数量。 kafka.controller:类型=KafkaController,名称=FencedBrokerCount
活跃经纪商数量 该控制器观察到的活跃经纪商数量。 kafka.controller:类型=KafkaController,名称=ActiveBrokerCount
全球主题数 该控制器观察到的全局主题的数量。 kafka.controller:类型=KafkaController,名称=GlobalTopicCount
全局分区计数 该控制器观察到的全局分区的数量。 kafka.controller:类型=KafkaController,名称=GlobalPartitionCount
离线分区计数 该控制器观察到的离线主题分区(非内部)的数量。 kafka.controller:类型=KafkaController,名称=OfflinePartitionCount
首选副本不平衡计数 领导者不是首选领导者的主题分区的数量。 kafka.controller:类型=KafkaController,名称=PreferredReplicaImbalanceCount
元数据错误计数 该控制器节点在元数据日志处理期间遇到错误的次数。 kafka.controller:类型=KafkaController,名称=MetadataErrorCount
最后应用的记录偏移量 控制器应用的集群元数据分区中最后一条记录的偏移量。 kafka.controller:类型= KafkaController,名称= LastAppliedRecordOffset
最后提交的记录偏移量 提交给该控制器的最后一条记录的偏移量。 kafka.controller:类型= KafkaController,名称= LastCommitedRecordOffset
Last Applied Record Timestamp The timestamp of the last record from the cluster metadata partition that was applied by the Controller. kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp
Last Applied Record Lag Ms The difference between now and the timestamp of the last record from the cluster metadata partition that was applied by the controller. For active Controllers the value of this lag is always zero. kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs

KRaft Broker Monitoring Metrics

METRIC/ATTRIBUTE NAME DESCRIPTION MBEAN NAME
Last Applied Record Offset The offset of the last record from the cluster metadata partition that was applied by the broker kafka.server:type=broker-metadata-metrics,name=last-applied-record-offset
Last Applied Record Timestamp 代理应用的集群元数据分区的最后一条记录的时间戳。 kafka.server:type=broker-metadata-metrics,name=last-applied-record-timestamp
最后应用的记录滞后女士 现在与代理应用的集群元数据分区中最后一条记录的时间戳之间的差异 kafka.server:type=broker-metadata-metrics,name=last-applied-record-lag-ms
元数据加载错误计数 BrokerMetadataListener 在加载元数据日志并根据其生成新的 MetadataDelta 时遇到的错误数。 kafka.server:类型=代理元数据指标,名称=元数据加载错误计数
元数据应用错误计数 BrokerMetadataPublisher 在应用基于最新 MetadataDelta 的新 MetadataImage 时遇到的错误数。 kafka.server:类型=代理元数据指标,名称=元数据应用错误计数

生产者/消费者/连接/流的常见监控指标

以下指标可用于生产者/消费者/连接器/流实例。有关具体指标,请参阅以下部分。

指标/属性名称 描述 MBEAN 名称
连接关闭率 窗口中每秒关闭的连接数。 kafka.[生产者
连接关闭总数 窗口中关闭的总连接数。 kafka.[生产者
连接创建率 窗口中每秒建立的新连接。 kafka.[生产者
连接创建总数 窗口中建立的新连接总数。 kafka.[生产者
网络 io 速率 每秒所有连接上的平均网络操作(读取或写入)数。 kafka.[生产者
网络 io 总计 所有连接上的网络操作(读取或写入)总数。 kafka.[生产者
传出字节率 每秒发送到所有服务器的平均传出字节数。 kafka.[生产者
传出字节总数 发送到所有服务器的传出字节总数。 kafka.[生产者
请求率 每秒发送的平均请求数。 kafka.[生产者
请求总数 发送的请求总数。 kafka.[生产者
平均请求大小 窗口中所有请求的平均大小。 kafka.[生产者
请求大小最大 窗口中发送的任何请求的最大大小。 kafka.[生产者
传入字节率 字节/秒读取所有套接字。 kafka.[生产者
传入字节总数 从所有套接字读取的总字节数。 kafka.[生产者
反应速度 每秒收到的响应数。 kafka.[生产者
响应总数 收到的回复总数。 kafka.[生产者
选择率 I/O 层每秒检查要执行的新 I/O 的次数。 kafka.[生产者
选择总计 I/O 层检查要执行的新 I/O 的总次数。 kafka.[生产者
io 等待时间 ns 平均 I/O 线程等待套接字准备好进行读取或写入所花费的平均时间(以纳秒为单位)。 kafka.[生产者
io 等待时间 ns 总计 I/O 线程等待所花费的总时间(以纳秒为单位)。 kafka.[生产者
io-等待时间-总计 *已弃用* I/O 线程等待的总时间(以纳秒为单位)。替换是io-wait-time-ns-total. kafka.[生产者
io 等待比率 I/O 线程等待的时间比例。 kafka.[生产者
io 时间 ns 平均 每个选择调用的 I/O 平均时间长度(以纳秒为单位)。 kafka.[生产者
io 时间 ns 总计 I/O 线程执行 I/O 所花费的总时间(以纳秒为单位)。 kafka.[生产者
iotime-总计 *已弃用* I/O 线程执行 I/O 所花费的总时间(以纳秒为单位)。替换是io-time-ns-total. kafka.[生产者
io-比率 I/O 线程执行 I/O 所花费的时间比例。 kafka.[生产者
连接数 当前活动连接数。 kafka.[生产者
认证成功率 使用 SASL 或 SSL 成功验证的每秒连接数。 kafka.[生产者
成功的身份验证总数 使用 SASL 或 SSL 成功验证的连接总数。 kafka.[生产者
验证失败率 身份验证失败的每秒连接数。 kafka.[生产者
验证失败总数 身份验证失败的连接总数。 kafka.[生产者
重新验证成功率 使用 SASL 成功重新验证的每秒连接数。 kafka.[生产者
成功重新验证总数 使用 SASL 成功重新验证的连接总数。 kafka.[生产者
重新验证最大延迟 由于重新身份验证而观察到的最大延迟(以毫秒为单位)。 kafka.[生产者
重新验证延迟平均 由于重新身份验证而观察到的平均延迟(以毫秒为单位)。 kafka.[生产者
重新验证失败率 重新验证失败的每秒连接数。 kafka.[生产者
重新验证失败总数 重新验证失败的连接总数。 kafka.[生产者
成功认证-无-reauth-总计 由不支持重新身份验证的旧版 2.2.0 之前的 SASL 客户端成功身份验证的总连接数。只能是非零。 kafka.[生产者

生产者/消费者/连接/流的常见每个代理指标

以下指标可用于生产者/消费者/连接器/流实例。有关具体指标,请参阅以下部分。

指标/属性名称 描述 MBEAN 名称
传出字节率 节点每秒发送的平均传出字节数。 kafka.[生产者
传出字节总数 为节点发送的传出字节总数。 kafka.[生产者
请求率 节点每秒发送的平均请求数。 kafka.[生产者
请求总数 向节点发送的请求总数。 kafka.[生产者
平均请求大小 节点窗口中所有请求的平均大小。 kafka.[生产者
请求大小最大 在节点的窗口中发送的任何请求的最大大小。 kafka.[生产者
传入字节率 节点每秒接收的平均字节数。 kafka.[生产者
传入字节总数 节点接收到的总字节数。 kafka.[生产者
请求平均延迟 节点的平均请求延迟(以毫秒为单位)。 kafka.[生产者
请求最大延迟 节点的最大请求延迟(以毫秒为单位)。 kafka.[生产者
反应速度 节点每秒收到的响应数。 kafka.[生产者
响应总数 节点收到的响应总数。 kafka.[生产者

生产者监控

以下指标可用于生产者实例。

指标/属性名称 描述 MBEAN 名称
等待线程 因等待缓冲存储器将其记录排队而阻塞的用户线程数。 kafka.生产者:类型=生产者指标,客户端id=([-.\w]+)
缓冲区总字节数 客户端可以使用的最大缓冲区内存量(无论当前是否使用)。 kafka.生产者:类型=生产者指标,客户端id=([-.\w]+)
缓冲区可用字节 未使用的缓冲区内存总量(未分配或在空闲列表中)。 kafka.生产者:类型=生产者指标,客户端id=([-.\w]+)
缓冲池等待时间 附加程序等待空间分配的时间比例。 kafka.生产者:类型=生产者指标,客户端id=([-.\w]+)
缓冲池等待时间总计 *已弃用*附加程序等待空间分配的总时间(以纳秒为单位)。更换是bufferpool-wait-time-ns-total kafka.生产者:类型=生产者指标,客户端id=([-.\w]+)
缓冲池等待时间 ns 总计 附加程序等待空间分配的总时间(以纳秒为单位)。 kafka.生产者:类型=生产者指标,客户端id=([-.\w]+)
刷新时间 ns 总计 Producer 在 Producer.flush 中花费的总时间(以纳秒为单位)。 kafka.生产者:类型=生产者指标,客户端id=([-.\w]+)
txn-init-时间-ns-总计 生产者初始化交易所花费的总时间(以纳秒为单位)(对于 EOS)。 kafka.生产者:类型=生产者指标,客户端id=([-.\w]+)
txn 开始时间 ns 总计 生产者在 beginTransaction 中花费的总时间(以纳秒为单位)(对于 EOS)。 kafka.生产者:类型=生产者指标,客户端id=([-.\w]+)
txn-发送-偏移-时间-ns-总计 生产者向交易发送偏移量所花费的总时间(以纳秒为单位)(对于 EOS)。 kafka.生产者:类型=生产者指标,客户端id=([-.\w]+)
txn 提交时间 ns 总计 生产者提交交易所花费的总时间(以纳秒为单位)(对于 EOS)。 kafka.生产者:类型=生产者指标,客户端id=([-.\w]+)
txn-中止-时间-ns-总计 生产者中止交易所花费的总时间(以纳秒为单位)(对于 EOS)。 kafka.生产者:类型=生产者指标,客户端id=([-.\w]+)

生产者发送者指标

kafka.生产者:类型=生产者指标,客户端id =“{客户端id}”
属性名称描述
批量大小平均值每个分区每个请求发送的平均字节数。
最大批量大小每个分区每个请求发送的最大字节数。
批次分割率每秒平均批分割数
批次-分割-总计批次拆分总数
平均压缩率记录批次的平均压缩率,定义为压缩批次大小与未压缩大小的平均比率。
元数据时代当前使用的生产者元数据的寿命(以秒为单位)。
生产节流时间平均请求被代理限制的平均时间(以毫秒为单位)
生产节流时间最大请求被代理限制的最长时间(以毫秒为单位)
记录错误率导致错误的平均每秒记录发送数
记录错误总数导致错误的记录发送总数
记录队列平均时间记录批次在发送缓冲区中花费的平均时间(以毫秒为单位)。
记录队列最大时间记录批次在发送缓冲区中花费的最长时间(以毫秒为单位)。
记录重试率平均每秒重试记录发送次数
记录重试总数重试记录发送总数
记录发送率每秒发送的平均记录数。
记录发送总数发送的记录总数。
平均记录大小平均记录大小
记录大小最大值最大记录大小
每个请求的平均记录数每个请求的平均记录数。
请求平均延迟平均请求延迟(以毫秒为单位)
请求最大延迟最大请求延迟(以毫秒为单位)
进行中的请求当前等待响应的正在进行的请求数。
kafka.生产者:类型=生产者主题指标,客户端ID =“{客户端ID}”,主题=“{主题}”
属性名称描述
字节率主题每秒发送的平均字节数。
字节总数为主题发送的总字节数。
压缩率主题的记录批次的平均压缩率,定义为压缩批次大小与未压缩大小的平均比率。
记录错误率导致主题错误的平均每秒记录发送数
记录错误总数导致主题错误的记录发送总数
记录重试率主题的平均每秒重试记录发送次数
记录重试总数主题的重试记录发送总数
记录发送率主题每秒发送的平均记录数。
记录发送总数为某个主题发送的记录总数。

消费者监控

以下指标可用于消费者实例。

指标/属性名称 描述 MBEAN 名称
平均轮询间隔时间 poll() 调用之间的平均延迟。 kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)
最大轮询间隔时间 poll() 调用之间的最大延迟。 kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)
上次轮询秒前 自上次 poll() 调用以来的秒数。 kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)
平均轮询空闲率 消费者的 poll() 空闲时间而不是等待用户代码处理记录的平均时间比例。 kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)
承诺时间 ns 总计 消费者花费在承诺上的总时间(以纳秒为单位)。 kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)
提交同步时间 ns 总计 消费者提交偏移量所花费的总时间(以纳秒为单位)(对于 AOS)。 kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)

消费者组指标

指标/属性名称 描述 MBEAN 名称
平均提交延迟 提交请求的平均时间 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
最大提交延迟 提交请求所花费的最长时间 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
提交率 每秒提交调用的次数 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
提交总数 提交调用总数 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
指定分区 当前分配给该消费者的分区数量 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
心跳响应时间最大 接收心跳请求响应所需的最长时间 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
心率 每秒平均心跳次数 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
心跳总数 心跳总数 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
平均加入时间 群组重新加入所需的平均时间 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
最大加入时间 群组重新加入所需的最长时间 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
加入率 每秒组加入数 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
联合总计 群组加入总数 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
平均同步时间 组同步所需的平均时间 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
最大同步时间 组同步所需的最长时间 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
同步率 每秒组同步数 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
同步总计 组同步总数 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
重新平衡平均延迟 组重新平衡所需的平均时间 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
重新平衡最大延迟 组重新平衡所需的最长时间 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
重新平衡延迟总计 到目前为止组重新平衡所需的总时间 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
重新平衡总计 参与群组重新平衡的总数 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
每小时重新平衡率 每小时参与的群组重新平衡次数 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
重新平衡失败总计 组重新平衡失败的总数 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
每小时失败重新平衡率 每小时失败的组重新平衡事件数 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
上次重新平衡秒前 自上次重新平衡事件以来的秒数 kafka.consumer:类型=消费者协调器指标,客户端id=([-.\w]+)
last-heartbeat-seconds-ago The number of seconds since the last controller heartbeat kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-revoked-latency-avg The average time taken by the on-partitions-revoked rebalance listener callback kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-revoked-latency-max The max time taken by the on-partitions-revoked rebalance listener callback kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-assigned-latency-avg The average time taken by the on-partitions-assigned rebalance listener callback kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-assigned-latency-max The max time taken by the on-partitions-assigned rebalance listener callback kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-lost-latency-avg The average time taken by the on-partitions-lost rebalance listener callback kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
partitions-lost-latency-max The max time taken by the on-partitions-lost rebalance listener callback kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)

Consumer Fetch Metrics

kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
属性名称描述
字节消耗率平均每秒消耗的字节数
消耗总字节数消耗的总字节数
获取平均延迟获取请求所花费的平均时间。
获取延迟最大任何获取请求所花费的最长时间。
获取率每秒的获取请求数。
获取大小平均值每个请求获取的平均字节数
获取大小最大值每个请求获取的最大字节数
获取节流时间平均平均节流时间(以毫秒为单位)
获取节流时间最大值最大节流时间(以毫秒为单位)
获取总计获取请求的总数。
记录消耗率平均每秒消耗的记录数
消耗记录总数消耗的记录总数
最大记录滞后此窗口中任何分区的记录数的最大滞后。注意:这是基于当前偏移量而不是提交的偏移量
记录-领先-分钟此窗口中任何分区的记录数的最小领先量
每个请求的平均记录数每个请求的平均记录数
kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}",topic="{topic}"
属性名称描述
字节消耗率主题每秒消耗的平均字节数
消耗总字节数主题消耗的总字节数
获取大小平均值每个主题请求获取的平均字节数
获取大小最大值每个主题请求获取的最大字节数
记录消耗率某个主题每秒消耗的平均记录数
消耗记录总数某个主题消耗的记录总数
每个请求的平均记录数某个主题的每次请求的平均记录数
kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"
属性名称描述
首选只读副本分区的当前只读副本,如果从领导者读取,则为 -1
记录滞后分区的最新滞后时间
记录滞后平均分区的平均滞后
最大记录滞后分区的最大滞后
记录领先分区最新线索
记录领先平均分区平均导程
记录-领先-分钟隔板最小导程

连接监控

Connect 工作进程包含所有生产者和消费者指标以及特定于 Connect 的指标。工作进程本身有许多指标,而每个连接器和任务都有其他指标。[2023-05-22 16:22:33,884] INFO Metrics 调度程序关闭(org.apache.kafka.common.metrics.Metrics:693) [2023-05-22 16:22:33,886] INFO Metrics 记者关闭(org.apache.kafka.common.metrics.Metrics:693) apache.kafka.common.metrics.Metrics:703)

kafka.connect:type=connect-worker-metrics
属性名称描述
连接器数量该工作线程中运行的连接器数量。
连接器启动尝试总数该工作线程尝试启动连接器的总数。
连接器启动失败百分比该工作线程的连接器启动失败的平均百分比。
连接器启动失败总计失败的连接器启动总数。
连接器启动成功百分比该工作人员的连接器启动成功的平均百分比。
连接器启动成功总计成功启动的连接器总数。
任务计数该工作线程中运行的任务数。
任务启动尝试总数该工作线程尝试启动的任务总数。
任务启动失败百分比该工作人员任务启动失败的平均百分比。
任务启动失败总数失败的任务启动总数。
任务启动成功百分比该工作人员的任务开始成功的平均百分比。
任务启动成功总计成功启动的任务总数。
kafka.connect:type=connect-worker-metrics,连接器=“{连接器}”
属性名称描述
连接器损坏的任务计数Worker 上连接器已销毁的任务数。
连接器失败任务计数工作线程上连接器失败的任务数。
连接器暂停任务计数Worker 上连接器暂停的任务数。
连接器重新启动任务计数Worker 上连接器的重新启动任务数。
连接器运行任务计数Worker 上连接器正在运行的任务数。
连接器总任务计数连接器在工作线程上执行的任务数。
连接器未分配的任务计数工作线程上连接器未分配的任务数。
kafka.connect:type=connect-worker-rebalance-metrics
属性名称描述
完成再平衡总数该工作人员完成的重新平衡总数。
连接协[eager, compatible, sessioned]该集群使用的连接协[eager, compatible, sessioned]
时代该工作人员的纪元或代号。
领导者姓名小组领导者的姓名。
重新平衡平均时间毫秒该工作人员重新平衡所花费的平均时间(以毫秒为单位)。
重新平衡最大时间毫秒该工作人员重新平衡所花费的最长时间(以毫秒为单位)。
再平衡该工作人员当前是否正在重新平衡。
自上次重新平衡以来的时间毫秒自该工作人员完成最近一次重新平衡以来的时间(以毫秒为单位)。
kafka.connect:类型=连接器指标,连接器=“{连接器}”
属性名称描述
连接器级连接器类的名称。
连接器型连接器的类型。“源”或“汇”之一。
连接器版本连接器类的版本,由连接器报告。
地位连接器的状态。“未分配”、“正在运行”、“已暂停”、“失败”或“正在重新启动”之一。
kafka.connect:类型=连接器任务指标,连接器=“{连接器}”,任务=“{任务}”
属性名称描述
批量大小平均值任务到目前为止已处理的批次中的平均记录数。
最大批量大小任务迄今为止已处理的最大批次中的记录数。
偏移提交平均时间毫秒该任务提交偏移量所花费的平均时间(以毫秒为单位)。
偏移提交失败百分比此任务的偏移提交尝试失败的平均百分比。
偏移提交最大时间毫秒此任务提交偏移量所花费的最长时间(以毫秒为单位)。
偏移提交成功百分比此任务的偏移量提交尝试成功的平均百分比。
停顿率该任务处于暂停状态的时间比例。
运行比该任务处于运行状态的时间比例。
地位连接器任务的状态。“未分配”、“正在运行”、“已暂停”、“失败”或“正在重新启动”之一。
kafka.connect:类型=接收器任务指标,连接器=“{连接器}”,任务=“{任务}”
属性名称描述
偏移提交完成率成功完成的平均每秒偏移提交完成次数。
偏移-提交-完成-总计已成功完成的偏移量提交完成总数。
偏移提交序列号偏移量提交的当前序列号。
偏移提交跳过率每秒接收得太晚且被跳过/忽略的偏移提交完成的平均数量。
偏移提交跳过总计接收得太晚且跳过/忽略的偏移量提交完成总数。
分区计数分配给该任务的主题分区数,属于该工作线程中指定的接收器连接器。
put-batch-avg-time-ms该任务放置一批接收器记录所花费的平均时间。
put-batch-max-time-ms该任务放置一批接收器记录所花费的最长时间。
接收器记录活动计数已从 Kafka 读取但尚未由接收器任务完全提交/刷新/确认的记录数。
接收器记录活动计数平均值已从 Kafka 读取但尚未由接收器任务完全提交/刷新/确认的平均记录数。
接收器记录活动计数最大值已从 Kafka 读取但尚未由接收器任务完全提交/刷新/确认的最大记录数。
接收记录最大滞后接收器任务落后于任何主题分区的消费者位置的最大记录数延迟。
接收器记录读取率对于属于该工作进程中指定接收器连接器的该任务,每秒从 Kafka 读取的平均记录数。这是应用转换之前的情况。
接收器记录读取总数自上次重新启动该任务以来,属于该工作线程中指定接收器连接器的该任务从 Kafka 读取的记录总数。
接收器记录发送率从转换输出并发送/放入属于此工作线程中指定接收器连接器的此任务的平均每秒记录数。这是应用转换之后的结果,并且不包括转换过滤掉的任何记录。
接收记录发送总计The total number of records output from the transformations and sent/put to this task belonging to the named sink connector in this worker, since the task was last restarted.
kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
ATTRIBUTE NAMEDESCRIPTION
poll-batch-avg-time-msThe average time in milliseconds taken by this task to poll for a batch of source records.
poll-batch-max-time-msThe maximum time in milliseconds taken by this task to poll for a batch of source records.
source-record-active-countThe number of records that have been produced by this task but not yet completely written to Kafka.
source-record-active-count-avgThe average number of records that have been produced by this task but not yet completely written to Kafka.
source-record-active-count-max该任务已生成但尚未完全写入 Kafka 的最大记录数。
源记录轮询率属于该工作线程中指定源连接器的任务每秒生成/轮询(转换之前)的平均记录数。
源记录民意调查总计此任务生成/轮询(转换前)的记录总数,属于该工作线程中的指定源连接器。
源记录写入率自上次重新启动任务以来,每秒为属于此工作线程中指定源连接器的任务写入 Kafka 的平均记录数。这是应用转换之后的结果,并且不包括转换过滤掉的任何记录。
源记录写入总数自上次重新启动任务以来,针对属于此工作线程中指定源连接器的此任务写入 Kafka 的记录输出数。这是应用转换之后的结果,并且不包括转换过滤掉的任何记录。
平均交易大小任务迄今为止已提交的事务中的平均记录数。
交易大小最大值任务迄今为止已提交的最大事务中的记录数。
最小交易大小任务迄今为止已提交的最小事务中的记录数。
kafka.connect:类型=任务错误指标,连接器=“{连接器}”,任务=“{任务}”
属性名称描述
死信队列生产失败写入死信队列失败的次数。
死信队列产生请求尝试写入死信队列的次数。
最后错误时间戳此任务上次遇到错误时的纪元时间戳。
记录的总错误数记录的错误数。
总记录错误数此任务中记录处理错误的数量。
总记录失败数此任务中记录处理失败的数量。
跳过的总记录数由于错误而跳过的记录数。
总重试次数重试操作的次数。

流监控

Kafka Streams 实例包含所有生产者和消费者指标以及特定于 Streams 的其他指标。这些指标具有三个记录级别:infodebugtrace

请注意,指标有 4 层层次结构。在顶层,每个启动的 Kafka Streams 客户端都有客户端级指标。每个客户端都有流线程,有自己的指标。每个流线程都有任务,有自己的指标。每个任务都有多个处理器节点,并具有自己的指标。每个任务还​​有许多状态存储和记录缓存,它们都有自己的指标。

使用以下配置选项指定您要收集的指标:

metrics.recording.level="info"

客户指标

以下所有指标的记录级别均为info

指标/属性名称 描述 MBEAN 名称
版本 Kafka Streams 客户端的版本。 kafka.streams:type=stream-metrics,client-id=([-.\w]+)
提交 ID Kafka Streams 客户端的版本控制提交 ID。 kafka.streams:type=stream-metrics,client-id=([-.\w]+)
应用程序 ID Kafka Streams 客户端的应用程序 ID。 kafka.streams:type=stream-metrics,client-id=([-.\w]+)
拓扑描述 在 Kafka Streams 客户端中执行的拓扑的描述。 kafka.streams:type=stream-metrics,client-id=([-.\w]+)
状态 Kafka Streams 客户端的状态。 kafka.streams:type=stream-metrics,client-id=([-.\w]+)
失败的流线程 自 Kafka Streams 客户端启动以来失败的流线程数。 kafka.streams:type=stream-metrics,client-id=([-.\w]+)

线程指标

以下所有指标的记录级别均为info

指标/属性名称 描述 MBEAN 名称
平均提交延迟 该线程的所有正在运行的任务的提交平均执行时间(以毫秒为单位)。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
最大提交延迟 该线程的所有正在运行的任务的提交的最大执行时间(以毫秒为单位)。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
轮询延迟平均 消费者轮询的平均执行时间(以毫秒为单位)。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
轮询延迟最大 消费者轮询的最大执行时间(以毫秒为单位)。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
进程平均延迟 处理的平均执行时间(以毫秒为单位)。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
进程最大延迟 处理的最大执行时间(以毫秒为单位)。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
标点延迟平均 用于标点符号的平均执行时间(以毫秒为单位)。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
最大标点延迟 用于标点符号的最大执行时间(以毫秒为单位)。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
提交率 每秒的平均提交次数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
提交总数 提交调用的总数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
轮询率 每秒消费者轮询调用的平均数量。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
投票总数 消费者民意调查呼叫的总数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
处理率 每秒处理的平均记录数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
进程总数 已处理的记录总数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
标点率 每秒平均标点调用次数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
标点总计 标点调用的总数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
任务创建率 每秒创建的平均任务数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
任务创建总数 创建的任务总数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
任务完成率 每秒关闭的平均任务数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
任务关闭总计 已关闭的任务总数。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
阻塞时间 ns 总计 线程在 kafka 上阻塞的总时间。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
线程启动时间 线程启动的时间。 kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)

任务指标

以下所有指标的记录级别均为debug,但 drop-records-* 和 active-process-ratio 指标的记录级别除外info

指标/属性名称 描述 MBEAN 名称
进程平均延迟 处理的平均执行时间(以纳秒为单位)。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
进程最大延迟 处理的最大执行时间(以 ns 为单位)。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
处理率 此任务的所有源处理器节点每秒处理的平均记录数。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
进程总数 此任务的所有源处理器节点上处理的记录总数。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
平均提交延迟 提交的平均执行时间(以纳秒为单位)。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
最大提交延迟 提交的最大执行时间(以 ns 为单位)。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
提交率 每秒平均提交调用次数。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
提交总数 提交调用的总数。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
平均迟到记录 观察到的记录的平均延迟(流时间 - 记录时间戳)。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
最大迟到记录 观察到的记录的最大延迟时间(流时间 - 记录时间戳)。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
强制处理率 每秒强制执行的平均处理次数。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
强制处理总计 强制处理的总数。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
记录丢失率 此任务中丢弃的平均记录数。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
丢弃记录总数 此任务中删除的记录总数。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
活动进程比率 流线程在所有分配的活动任务中处理此任务所花费的时间比例。 kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)

处理器节点指标

以下指标仅在某些类型的节点上可用,即 process-* 指标仅适用于源处理器节点,suppression-emit-* 指标仅适用于抑制操作节点,而 record-e2e-latency- * 指标仅适用于源处理器节点和终端节点(没有后继节点的节点)。所有指标的记录级别均为debug,但 record-e2e-latency-* 指标的记录级别除外info

指标/属性名称 描述 MBEAN 名称
消耗总字节数 源处理器节点消耗的总字节数。 kafka.streams:类型=流处理器节点指标,线程id=([-.\w]+),任务id=([-.\w]+),处理器节点id=([ -.\w]+),主题=([-.\w]+)
产生的字节总数 接收器处理器节点产生的字节总数。 kafka.streams:类型=流处理器节点指标,线程id=([-.\w]+),任务id=([-.\w]+),处理器节点id=([ -.\w]+),主题=([-.\w]+)
处理率 源处理器节点每秒处理的平均记录数。 kafka.streams:类型=流处理器节点指标,线程id=([-.\w]+),任务id=([-.\w]+),处理器节点id=([ -.\w]+)
进程总数 源处理器节点每秒处理的记录总数。 kafka.streams:类型=流处理器节点指标,线程id=([-.\w]+),任务id=([-.\w]+),处理器节点id=([ -.\w]+)
抑制发射率 从抑制操作节点向下游发送记录的速率。 kafka.streams:类型=流处理器节点指标,线程id=([-.\w]+),任务id=([-.\w]+),处理器节点id=([ -.\w]+)
抑制发射总量 从抑制操作节点向下游发出的记录总数。 kafka.streams:类型=流处理器节点指标,线程id=([-.\w]+),任务id=([-.\w]+),处理器节点id=([ -.\w]+)
记录 e2e 延迟平均 记录的平均端到端延迟,通过将记录时间戳与节点完全处理该记录时的系统时间进行比较来测量。 kafka.streams:类型=流处理器节点指标,线程id=([-.\w]+),任务id=([-.\w]+),处理器节点id=([ -.\w]+)
记录 e2e 延迟最大 记录的最大端到端延迟,通过将记录时间戳与节点完全处理该记录时的系统时间进行比较来测量。 kafka.streams:类型=流处理器节点指标,线程id=([-.\w]+),任务id=([-.\w]+),处理器节点id=([ -.\w]+)
记录-e2e-延迟-分钟 记录的最小端到端延迟,通过将记录时间戳与节点完全处理该记录时的系统时间进行比较来测量。 kafka.streams:类型=流处理器节点指标,线程id=([-.\w]+),任务id=([-.\w]+),处理器节点id=([ -.\w]+)
消耗记录总数 源处理器节点消耗的记录总数。 kafka.streams:类型=流处理器节点指标,线程id=([-.\w]+),任务id=([-.\w]+),处理器节点id=([ -.\w]+),主题=([-.\w]+)
记录生成总数 接收器处理器节点生成的记录总数。 kafka.streams:类型=流处理器节点指标,线程id=([-.\w]+),任务id=([-.\w]+),处理器节点id=([ -.\w]+),主题=([-.\w]+)

状态存储指标

以下所有指标的记录级别均为debug,但 record-e2e-latency-* 指标的记录级别除外trace。请注意,该store-scope值是在StoreSupplier#metricsScope()用户自定义状态存储中指定的;对于内置状态存储,目前我们有:

  • in-memory-state
  • in-memory-lru-state
  • in-memory-window-state
  • in-memory-suppression(用于抑制缓冲液)
  • rocksdb-state(对于 RocksDB 支持的键值存储)
  • rocksdb-window-state(对于 RocksDB 支持的橱窗商店)
  • rocksdb-session-state(对于 RocksDB 支持的会话存储)

指标suppression-buffer-size-avg、suppression-buffer-size-max、suppression-buffer-count-avg和suppression-buffer-count-max仅适用于抑制缓冲区。所有其他指标均不可用于抑制缓冲区。

指标/属性名称 描述 MBEAN 名称
放置平均延迟 平均 put 执行时间(以 ns 为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
最大延迟时间 put 的最大执行时间(以 ns 为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
如果不存在则放置平均延迟 put-if-absent 的平均执行时间(以 ns 为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
如果不存在则放置最大延迟 put-if-absent 的最大执行时间(以 ns 为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
获取平均延迟 平均 get 执行时间(以 ns 为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
获取最大延迟 最大 get 执行时间(以 ns 为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
删除平均延迟 平均删除执行时间(以 ns 为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
删除最大延迟 最大删除执行时间(以 ns 为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
放置所有延迟平均 put-all 的平均执行时间(以 ns 为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
放置所有延迟最大 put-all 的最大执行时间(以 ns 为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
所有延迟平均 所有操作的平均执行时间(以 ns 为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
所有最大延迟 所有操作的最大执行时间(以 ns 为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
范围延迟平均 平均范围执行时间(以 ns 为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
范围-延迟-最大 最大范围执行时间(以 ns 为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
刷新平均延迟 平均刷新执行时间(以 ns 为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
刷新延迟最大 最大刷新执行时间(以 ns 为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
恢复平均延迟 平均恢复执行时间(以纳秒为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
最大恢复延迟 最大恢复执行时间(以 ns 为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
认沽利率 这家商店的平均成交率。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
缺席时看跌期权利率 这家商店的平均缺货率。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
获得率 这家商店的平均获得率。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
删除率 该商店的平均删除率。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
全部利率 这家商店的平均全盘率。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
全费率 该商店的平均所有营业率。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
距离速率 这家商店的平均范围率。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
冲洗率 这家商店的平均刷新率。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
恢复率 该商店的平均恢复率。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
抑制缓冲区大小平均值 采样窗口内缓冲数据的平均总大小(以字节为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),内存抑制-id=([ -.\w]+)
抑制缓冲区大小最大 采样窗口内缓冲数据的最大总大小(以字节为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),内存抑制-id=([ -.\w]+)
抑制缓冲区计数平均值 采样窗口内缓冲的平均记录数。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),内存抑制-id=([ -.\w]+)
抑制缓冲区计数最大值 采样窗口内缓冲的最大记录数。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),内存抑制-id=([ -.\w]+)
记录 e2e 延迟平均 记录的平均端到端延迟,通过将记录时间戳与节点完全处理该记录时的系统时间进行比较来测量。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
记录 e2e 延迟最大 记录的最大端到端延迟,通过将记录时间戳与节点完全处理该记录时的系统时间进行比较来测量。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
记录-e2e-延迟-分钟 记录的最小端到端延迟,通过将记录时间戳与节点完全处理该记录时的系统时间进行比较来测量。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)

RocksDB 指标

RocksDB 指标分为基于统计的指标和基于属性的指标。前者是从 RocksDB 状态存储收集的统计信息中记录的,而后者是从 RocksDB 公开的属性中记录的。RocksDB 收集的统计数据提供了一段时间内的累积测量值,例如写入状态存储的字节数。RocksDB 公开的属性提供当前测量值,例如当前使用的内存量。请注意,store-scope内置 RocksDB 状态存储当前如下:

  • rocksdb-state(对于 RocksDB 支持的键值存储)
  • rocksdb-window-state(对于 RocksDB 支持的橱窗商店)
  • rocksdb-session-state(对于 RocksDB 支持的会话存储)

RocksDB 基于统计的指标: 以下所有基于统计的指标的记录级别都是 ,debug因为在 RocksDB 中收集统计数据可能会对性能产生影响。每分钟从 RocksDB 状态存储收集基于统计数据的指标。如果状态存储由多个 RocksDB 实例组成(如 WindowStores 和 SessionStores 的情况),则每个指标都会报告状态存储的 RocksDB 实例的聚合。

指标/属性名称 描述 MBEAN 名称
字节写入率 每秒写入 RocksDB 状态存储的平均字节数。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
写入总字节数 写入 RocksDB 状态存储的总字节数。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
字节读取率 The average number of bytes read per second from the RocksDB state store. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
bytes-read-total The total number of bytes read from the RocksDB state store. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-bytes-flushed-rate The average number of bytes flushed per second from the memtable to disk. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-bytes-flushed-total The total number of bytes flushed from the memtable to disk. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-hit-ratio The ratio of memtable hits relative to all lookups to the memtable. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-flush-time-avg The average duration of memtable flushes to disc in ms. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-flush-time-min The minimum duration of memtable flushes to disc in ms. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
memtable-flush-time-max The maximum duration of memtable flushes to disc in ms. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
block-cache-data-hit-ratio The ratio of block cache hits for data blocks relative to all lookups for data blocks to the block cache. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
block-cache-index-hit-ratio The ratio of block cache hits for index blocks relative to all lookups for index blocks to the block cache. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
block-cache-filter-hit-ratio The ratio of block cache hits for filter blocks relative to all lookups for filter blocks to the block cache. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
write-stall-duration-avg The average duration of write stalls in ms. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
write-stall-duration-total The total duration of write stalls in ms. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
bytes-read-compaction-rate The average number of bytes read per second during compaction. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
bytes-written-compaction-rate The average number of bytes written per second during compaction. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
compaction-time-avg The average duration of disc compactions in ms. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
compaction-time-min The minimum duration of disc compactions in ms. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
compaction-time-max The maximum duration of disc compactions in ms. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
number-open-files The number of current open files. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
number-file-errors-total The total number of file errors occurred. kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)

RocksDB 基于属性的指标: 以下所有基于属性的指标的记录级别均为 ,info并在访问指标时进行记录。如果状态存储由多个 RocksDB 实例组成,如 WindowStores 和 SessionStores 的情况,则每个指标报告状态存储的所有 RocksDB 实例的总和,块缓存指标除外 block-cache-*。如果每个实例都使用自己的块缓存,则块缓存指标报告所有 RocksDB 实例的总和;如果在所有实例之间共享单个块缓存,则它们仅报告一个实例的记录值。

指标/属性名称 描述 MBEAN 名称
num-不可变内存表 尚未刷新的不可变内存表的数量。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
当前大小活动内存表 活动内存表的大致大小(以字节为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
当前大小所有内存表 活动和未刷新的不可变内存表的大致大小(以字节为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
调整所有内存表的大小 活动的、未刷新的不可变的和固定的不可变内存表的大致大小(以字节为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
活跃内存表数量 活动内存表中的条目数。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
num-entries-imm-mem-表 未刷新的不可变内存表中的条目数。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
num-deletes-active-mem-表 活动内存表中删除条目的数量。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
num-deletes-imm-mem-表 未刷新的不可变内存表中的删除条目数。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
内存表刷新挂起 如果内存表刷新处于挂起状态,则该指标报告 1,否则报告 0。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
运行冲水次数 当前运行的刷新次数。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
待压缩 如果至少有一个压缩待处理,则该指标报告 1,否则报告 0。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
运行压缩次数 当前正在运行的压缩数量。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
估计待压缩字节数 压缩需要在磁盘上重写的估计总字节数,以使所有级别降至目标大小以下(仅对级别压缩有效)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
sst 文件总数 所有 SST 文件的总大小(以字节为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
实时 sst 文件大小 属于最新 LSM 树的所有 SST 文件的总大小(以字节为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
实时版本数 LSM 树的实时版本数。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
块缓存容量 块缓存的容量(以字节为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
块缓存使用情况 驻留在块缓存中的条目的内存大小(以字节为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
块缓存固定使用 固定在块缓存中的条目的内存大小(以字节为单位)。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
估计键数 活动和未刷新的不可变内存表和存储中的键的估计数量。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
估计表读取器内存 用于读取 SST 表的估计内存(以字节为单位),不包括块缓存中使用的内存。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)
背景错误 背景错误总数。 kafka.streams:类型=流状态指标,线程id=([-.\w]+),任务id=([-.\w]+),[存储范围]-id=([ -.\w]+)

记录缓存指标

以下所有指标的记录级别均为debug

指标/属性名称 描述 MBEAN 名称
平均命中率 平均缓存命中率定义为缓存读取命中数与缓存读取请求总数的比率。 kafka.streams:类型=流记录缓存指标,线程id=([-.\w]+),任务id=([-.\w]+),记录缓存id=([ -.\w]+)
最小命中率 最小缓存命中率。 kafka.streams:类型=流记录缓存指标,线程id=([-.\w]+),任务id=([-.\w]+),记录缓存id=([ -.\w]+)
最大命中率 最大缓存命中率。 kafka.streams:类型=流记录缓存指标,线程id=([-.\w]+),任务id=([-.\w]+),记录缓存id=([ -.\w]+)

其他的

我们建议监控 GC 时间和其他统计信息以及各种服务器统计信息,例如 CPU 利用率、I/O 服务时间等。在客户端,我们建议监控消息/字节率(全局和每个主题)、请求率/大小/时间,在消费者方面,所有分区之间消息的最大延迟和最小获取请求率。为了让消费者跟上,最大延迟需要小于阈值,最小获取率需要大于 0。

6.9 动物园管理员

稳定版

当前的稳定分支是 3.5。Kafka 会定期更新以包含 3.5 系列中的最新版本。

ZooKeeper 弃用

随着 Apache Kafka 3.5 的发布,Zookeeper 现已被标记为已弃用。计划在 Apache Kafka 的下一个主要版本(版本 4.0)中删除 ZooKeeper,该版本计划最早于 2024 年 4 月进行。在弃用阶段,仍然支持 ZooKeeper 进行 Kafka 集群的元数据管理,但不建议这样做用于新的部署。KRaft 中仍有一小部分功能有待实现,请参阅当前缺失的功能以获取更多信息。

移民

将现有的基于 ZooKeeper 的 Kafka 集群迁移到 KRaft 目前处于预览阶段,我们预计它可以在 3.6 版本中投入生产使用。建议用户开始计划迁移到 KRaft,并开始测试以提供反馈。有关如何执行从ZooKeeper 到 KRaft实时迁移以及当前限制的详细信息,请参阅ZooKeeper 到 KRaft 迁移。

3.x 和 ZooKeeper 支持

支持 ZooKeeper 模式的最终 3.x 小版本将在发布后 12 个月内获得关键错误修复和安全修复。

ZooKeeper 和 KRaft 时间轴

有关 ZooKeeper 删除和计划的 KRaft 功能发布的暂定时间表的详细信息和更新,请参阅KIP-833

运行 ZooKeeper

在操作上,我们为健康的 ZooKeeper 安装执行以下操作:

  • 物理/硬件/网络布局中的冗余:尽量不要将它们全部放在同一个机架中,体面(但不要疯狂)的硬件,尽量保留冗余电源和网络路径等。典型的 ZooKeeper 整体有 5 个或7 台服务器,分别允许 2 台和 3 台服务器宕机。如果您的部署规模较小,那么使用 3 台服务器是可以接受的,但请记住,在这种情况下您只能容忍 1 台服务器停机。
  • I/O 隔离:如果您执行大量写入类型流量,您几乎肯定希望事务日志位于专用磁盘组上。对事务日志的写入是同步的(但为了性能而进行批处理),因此并发写入会显着影响性能。ZooKeeper 快照就是这样一种并发写入源,理想情况下应该写入与事务日志分开的磁盘组上。快照异步写入磁盘,因此通常可以与操作系统和消息日志文件共享。您可以通过 dataLogDir 参数将服务器配置为使用单独的磁盘组。
  • 应用程序隔离:除非您真正了解要安装在同一机器上的其他应用程序的应用程序模式,否则最好单独运行 ZooKeeper(尽管这可能是与硬件功能的平衡行为)。
  • 谨慎使用虚拟化:它可以工作,具体取决于您的集群布局、读/写模式和 SLA,但是虚拟化层引入的微小开销可能会累积起来并导致 ZooKeeper 失效,因为它可能对时间非常敏感
  • ZooKeeper 配置:它是 java,请确保给它“足够”的堆空间(我们通常使用 3-5G 运行它们,但这主要是由于我们这里的数据集大小)。不幸的是,我们没有一个好的公式,但请记住,允许更多的 ZooKeeper 状态意味着快照可能会变得很大,而大快照会影响恢复时间。事实上,如果快照变得太大(几 GB),那么您可能需要增加 initLimit 参数,以便为服务器提供足够的时间来恢复并加入集合。
  • 监控:JMX 和 4 字母单词 (4lw) 命令都非常有用,它们在某些情况下确实重叠(在这些情况下,我们更喜欢 4 字母命令,它们看起来更可预测,或者至少,它们与LI 监控基础设施)
  • 不要过度构建集群:大型集群,尤其是在写入大量使用模式中,意味着大量的集群内通信(写入和后续集群成员更新的仲裁),但不要构建不足(并有淹没集群的风险)。拥有更多服务器会增加您的读取能力。

总体而言,我们尝试使 ZooKeeper 系统尽可能小,以处理负载(加上标准增长容量规划)并尽可能简单。与官方版本相比,我们尽量不对配置或应用程序布局做任何花哨的事情,并尽可能保持其独立性。由于这些原因,我们倾向于跳过操作系统打包版本,因为它倾向于尝试将内容放入操作系统标准层次结构中,这可能会“混乱”,因为缺乏更好的措辞方式。

6.10 卡夫

配置

流程角色

在 KRaft 模式下,每个 Kafka 服务器都可以使用该属性配置为控制器、代理或两者process.roles。该属性可以具有以下值:

  • 如果process.roles设置为broker,则服务器充当代理。
  • 如果process.roles设置为controller,则服务器充当控制器。
  • 如果process.roles设置为broker,controller,则服务器既充当代理又充当控制器。
  • 如果process.roles根本没有设置,则假定处于 ZooKeeper 模式。

既充当代理又充当控制器的 Kafka 服务器被称为“组合”服务器。对于开发环境等小型用例,组合服务器更易于操作。主要缺点是控制器与系统其他部分的隔离程度较低。例如,无法在组合模式下与代理分开滚动或扩展控制器。在关键部署环境中不建议使用组合模式。

控制器

在KRaft模式下,选择特定的Kafka服务器作为控制器(与基于ZooKeeper的模式不同,在该模式中任何服务器都可以成为控制器)。被选为控制器的服务器将参与元数据仲裁。每个控制器都是当前活动控制器的活动控制器或热备用控制器。

Kafka 管理员通常会选择 3 或 5 个服务器来担任此角色,具体取决于成本和系统在不影响可用性的情况下应承受的并发故障数量等因素。大多数控制器必须处于活动状态才能保持可用性。3个控制器时,集群可以容忍1个控制器故障;如果有 5 个控制器,集群可以容忍 2 个控制器故障。

Kafka 集群中的所有服务器都使用该controller.quorum.voters属性发现仲裁投票者。这标识了应使用的仲裁控制器服务器。必须枚举所有控制器。每个控制器均通过其idhostport信息进行标识。例如:

controller.quorum.voters=id1@host1:port1,id2@host2:port2,id3@host3:port3

如果一个Kafka集群有3个控制器,分别名为controller1、controller2和controller3,那么controller1可能有以下配置:


process.roles=controller
node.id=1
listeners=CONTROLLER://controller1.example.com:9093
controller.quorum.voters=1@controller1.example.com:9093,2@controller2.example.com:9093,3@controller3.example.com:9093

每个broker和控制器都必须设置该controller.quorum.voters属性。属性中提供的节点 IDcontroller.quorum.voters必须与控制器服务器上的相应 ID 匹配。例如,在controller1上,node.id必须设置为1,依此类推。每个节点 ID 在特定集群中的所有服务器中必须是唯一的。任何两个服务器都不能具有相同的节点 ID,无论其process.roles值如何。

存储工具

kafka-storage.sh random-uuid命令可用于为新集群生成集群 ID。使用该命令格式化集群中的每个服务器时,必须使用此集群 ID kafka-storage.sh format

这与Kafka过去的运作方式不同。此前,Kafka会自动格式化空白存储目录,并自动生成新的集群ID。进行更改的原因之一是自动格式化有时会掩盖错误情况。这对于控制器和代理服务器维护的元数据日志尤其重要。如果大多数控制器能够以空日志目录启动,则可能会在丢失已提交数据的情况下选举领导者。

调试

元数据仲裁工具

kafka-metadata-quorum工具可用于描述集群元数据分区的运行时状态。例如,以下命令显示元数据仲裁的摘要:

  > bin/kafka-metadata-quorum.sh --bootstrap-server  broker_host:port describe --status
ClusterId:              fMCL8kv1SWm87L_Md-I2hg
LeaderId:               3002
LeaderEpoch:            2
HighWatermark:          10
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   -1
CurrentVoters:          [3000,3001,3002]
CurrentObservers:       [0,1,2]

转储日志工具

kafka-dump-log工具可用于调试集群元数据目录的日志段和快照。该工具将扫描提供的文件并解码元数据记录。例如,此命令解码并打印第一个日志段中的记录:

  > bin/kafka-dump-log.sh --cluster-metadata-decoder --files metadata_log_dir/__cluster_metadata-0/00000000000000000000.log

此命令解码并打印集群元数据快照中的记录:

  > bin/kafka-dump-log.sh --cluster-metadata-decoder --files metadata_log_dir/__cluster_metadata-0/00000000000000000100-0000000001.checkpoint

元数据外壳

kafka-metadata-shell工具可用于交互式检查集群元数据分区的状态:


  > bin/kafka-metadata-shell.sh  --snapshot metadata_log_dir/__cluster_metadata-0/00000000000000000000.log
>> ls /
brokers  local  metadataQuorum  topicIds  topics
>> ls /topics
foo
>> cat /topics/foo/0/data
{
  "partitionId" : 0,
  "topicId" : "5zoAlv-xEh9xRANKXt1Lbg",
  "replicas" : [ 1 ],
  "isr" : [ 1 ],
  "removingReplicas" : null,
  "addingReplicas" : null,
  "leader" : 1,
  "leaderEpoch" : 0,
  "partitionEpoch" : 0
}
>> exit

部署注意事项

  • Kafka 服务器process.role应设置为其中之一brokercontroller但不能同时设置两者。组合模式可以在开发环境中使用,但在关键部署环境中应避免使用。
  • 为了实现冗余,Kafka 集群应使用 3 个控制器。在关键环境中不建议使用超过 3 个控制器。在极少数情况下,出现部分网络故障时,集群元数据仲裁可能会变得不可用。此限制将在 Kafka 的未来版本中得到解决。
  • Kafka 控制器将集群的所有元数据存储在内存和磁盘上。我们认为,对于典型的 Kafka 集群,5GB 主内存和元数据日志管理器上的 5GB 磁盘空间就足够了。

缺少的功能

KRaft 模式中未完全实现以下功能:

  • 支持具有多个存储目录的JBOD配置
  • 修改独立 KRaft 控制器上的某些动态配置
  • 委托代币

ZooKeeper 到 KRaft 迁移

ZooKeeper 到 KRaft 的迁移被视为早期访问功能,不建议用于生产集群。

ZK 到 KRaft 的迁移尚不支持以下功能:

请使用 项目 JIRA和“kraft”组件报告 ZooKeeper 到 KRaft 迁移的问题。

术语

我们这里使用术语“迁移”来指将Kafka集群的元数据系统从ZooKeeper更改为KRaft并迁移现有元数据的过程。“升级”是指安装更新版本的 Kafka。不建议在执行元数据迁移的同时升级软件。

我们还使用术语“ZK 模式”来指代使用 ZooKeeper 作为元数据系统的 Kafka 代理。“KRaft 模式”是指使用 KRaft 控制器仲裁作为其元数据系统的 Kafka 代理。

准备迁移

在开始迁移之前,Kafka 代理必须升级到软件版本 3.5.0,并将“inter.broker.protocol.version”配置设置为“3.5”。有关升级说明,请参阅升级到 3.5.0

建议在迁移处于活动状态时为迁移组件启用 TRACE 级别日志记录。这可以通过将以下 log4j 配置添加到每个 KRaft 控制器的“log4j.properties”文件来完成。

log4j.logger.org.apache.kafka.metadata.migration=TRACE

在迁移期间在 KRaft 控制器和 ZK 代理上启用 DEBUG 日志记录通常很有用。

配置 KRaft 控制器仲裁

在开始迁移之前需要做两件事。首先,必须配置代理以支持迁移,其次,必须部署 KRaft 控制器仲裁。KRaft 控制器应配置与现有 Kafka 集群相同的集群 ID。这可以通过检查代理数据目录中的“meta.properties”文件之一或运行以下命令来找到。

./bin/zookeeper-shell.sh localhost:2181 获取 /cluster/id

KRaft 控制器仲裁还应配置最新metadata.version的“3.4”。有关 KRaft 部署的更多说明,请参阅上述文档

除了标准的 KRaft 配置之外,KRaft 控制器还需要启用迁移支持并提供 ZooKeeper 连接配置。

以下是准备迁移的 KRaft 控制器的示例配置:

#监听 9093 的 KRaft 集群controller.properties 示例
process.roles=控制器
节点 ID=3000
controller.quorum.voters=3000@localhost:9093
controller.listener.names=控制器
听众=控制器://:9093
# 启用迁移
Zookeeper.metadata.migration.enable=true
# ZooKeeper客户端配置
Zookeeper.connect=本地主机:2181
# 其他配置...

注意:KRaft 集群node.id值必须与任何现有的 ZK 代理不同broker.id。在 KRaft 模式中,代理和控制器共享相同的 Node ID 命名空间。

在代理上启用迁移

一旦 KRaft 控制器仲裁启动,代理将需要重新配置并重新启动。代理可以以滚动方式重新启动,以避免影响集群可用性。每个代理都需要以下配置才能与 KRaft 控制器通信并启用迁移。

以下是准备迁移的代理的示例配置:

#监听 9092 的 ZK 代理 server.properties 示例
brokerid=0
听众=纯文本://:9092
广告.listeners=PLAINTEXT://localhost:9092
Listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
# 设置有创血压
inter.broker.protocol.version=3.5
# 启用迁移
Zookeeper.metadata.migration.enable=true
# ZooKeeper客户端配置
zookeeper.connect=本地主机:2181
# KRaft 控制器仲裁配置
controller.quorum.voters=3000@localhost:9093
controller.listener.names=控制器

注意:使用必要的配置重新启动最终的 ZK 代理后,迁移将自动开始。 迁移完成后,在主控上可以观察到一条INFO级别的日志:

完成元数据从 Zookeeper 到 KRaft 的迁移

将代理迁移到 KRaft

一旦 KRaft 控制器完成元数据迁移,代理仍将以 ZK 模式运行。当 KRaft 控制器处于迁移模式时,它将继续向 ZK 模式代理发送控制器 RPC。这包括 UpdateMetadata 和 LeaderAndIsr 等 RPC。

要将代理迁移到 KRaft,只需将它们重新配置为 KRaft 代理并重新启动即可。以上面的代理配置为例,我们将替换broker.idnode.id并添加 process.roles=broker。代理在重新启动时保持相同的代理/节点 ID 非常重要。此时应该删除 Zookeeper 配置。

# 监听 9092 的 KRaft 代理 server.properties 示例
process.roles=broker
节点 ID=0
听众=纯文本://:9092
广告.listeners=PLAINTEXT://localhost:9092
Listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
# 不要设置 IBP,KRaft 使用“metadata.version”功能标志
# inter.broker.protocol.version=3.5
# 删除迁移启用标志
#zookeeper.metadata.migration.enable=true
# 删除 ZooKeeper 客户端配置
#zookeeper.connect=localhost:2181
# 保留KRaft控制器仲裁配置
controller.quorum.voters=3000@localhost:9093
controller.listener.names=控制器

每个代理都会使用 KRaft 配置重新启动,直到整个集群在 KRaft 模式下运行。

完成迁移

在 KRaft 模式下重新启动所有代理后,完成迁移的最后一步是使 KRaft 控制器退出迁移模式。这是通过从每个配置中删除“zookeeper.metadata.migration.enable”属性并一次重新启动它们来完成的。

# 监听 9093 的 KRaft 集群controller.properties 示例
process.roles=控制器
节点 ID=3000
controller.quorum.voters=3000@localhost:9093
controller.listener.names=控制器
听众=控制器://:9093
# 禁用迁移
#zookeeper.metadata.migration.enable=true
# 删除 ZooKeeper 客户端配置
#zookeeper.connect=localhost:2181
# 其他配置...

我们一直在努力

apachecn/AiLearning

【布客】中文翻译组