跳转至

8.Kafka连接

8.1 概述

Kafka Connect 是一种在 Apache Kafka 和其他系统之间可扩展且可靠地传输数据的工具。它使得快速定义将大量数据移入和移出 Kafka 的连接器变得简单。Kafka Connect 可以摄取整个数据库或将所有应用程序服务器的指标收集到 Kafka 主题中,从而使数据可用于低延迟的流处理。导出作业可以将 Kafka 主题中的数据传输到辅助存储和查询系统或批处理系统中以进行离线分析。

Kafka连接功能包括:

  • Kafka 连接器的通用框架 - Kafka Connect 标准化了其他数据系统与 Kafka 的集成,简化了连接器的开发、部署和管理
  • 分布式和独立模式 - 扩展到支持整个组织的大型集中管理服务,或缩小到开发、测试和小型生产部署
  • REST 接口 - 通过易于使用的 REST API 提交和管理到 Kafka Connect 集群的连接器
  • 自动偏移管理 - 只需来自连接器的少量信息,Kafka Connect 就可以自动管理偏移提交过程,因此连接器开发人员无需担心连接器开发中这个容易出错的部分
  • 默认情况下是分布式且可扩展的 - Kafka Connect 构建在现有的组管理协议之上。可以添加更多工作人员来扩展 Kafka Connect 集群。
  • Streaming/batch 集成 - 利用 Kafka 的现有功能,Kafka Connect 是桥接流和批数据系统的理想解决方案

8.2 用户指南

快速入门提供了如何运行独立版本的 Kafka Connect 的简短示例。本节更详细地介绍如何配置、运行和管理 Kafka Connect。

运行 Kafka 连接

Kafka Connect目前支持两种执行模式:独立(单进程)和分布式。

在独立模式下,所有工作都在单个进程中执行。此配置更易于设置和上手,并且在只有一个工作人员有意义的情况下可能很有用(例如收集日志文件),但它无法从 Kafka Connect 的某些功能(例如容错)中受益。您可以使用以下命令启动独立进程:

bin/connect-standalone.sh config/connect-standalone.properties [connector1.propertiesconnector2.properties ...]

第一个参数是工作线程的配置。这包括 Kafka 连接参数、序列化格式以及提交偏移量的频率等设置。提供的示例应该适用于使用 提供的默认配置运行的本地集群config/server.properties。需要进行调整才能与不同的配置或生产部署一起使用。所有工作人员(独立的和分布式的)都需要一些配置:

  • bootstrap.servers - 用于引导与 Kafka 的连接的 Kafka 服务器列表
  • key.converter - 转换器类,用于在 Kafka Connect 格式和写入 Kafka 的序列化形式之间进行转换。这控制写入 Kafka 或从 Kafka 读取的消息中键的格式,并且由于它独立于连接器,因此允许任何连接器使用任何序列化格式。常见格式的示例包括 JSON 和 Avro。
  • value.converter - 转换器类,用于在 Kafka Connect 格式和写入 Kafka 的序列化形式之间进行转换。这控制写入 Kafka 或从 Kafka 读取的消息中的值的格式,并且由于它独立于连接器,因此允许任何连接器使用任何序列化格式。常见格式的示例包括 JSON 和 Avro。
  • plugin.path(默认empty) - 包含 Connect 插件(连接器、转换器、转换)的路径列表。在运行快速启动之前,用户必须添加包含打包在 中的示例 FileStreamSourceConnector 和 FileStreamSinkConnector 的绝对路径connect-file-"version".jar,因为默认情况下这些连接器不包含在Connect Worker 中(请参阅CLASSPATHplugin.path 属性以获取示例)。

独立模式特有的重要配置选项是:

  • offset.storage.file.filename - 存储源连接器偏移的文件

此处配置的参数适用于 Kafka Connect 使用的生产者和消费者来访问配置、偏移量和状态主题。配置Kafka Source任务使用的生产者和Kafka Sink任务使用的消费者,可以使用相同的参数,但需要分别加上producer.和前缀consumer.。从工作配置中继承的唯一不带前缀的 Kafka 客户端参数是bootstrap.servers,在大多数情况下这就足够了,因为同一个集群通常用于所有目的。一个值得注意的例外是安全集群,它需要额外的参数来允许连接。这些参数需要在工作配置中设置最多三次,一次用于管理访问,一次用于 Kafka 源,一次用于 Kafka 接收器。

从 2.3.0 开始,可以分别使用 Kafka 源或 Kafka 接收器的前缀producer.override.consumer.override.为每个连接器单独配置客户端配置覆盖。这些覆盖包含在连接器的其余配置属性中。

其余参数是连接器配置文件。您可以包含任意数量的内容,但所有内容都将在同一进程中(在不同的线程上)执行。您还可以选择不在命令行上指定任何连接器配置文件,而是使用 REST API 在独立工作线程启动后在运行时创建连接器。

分布式模式处理工作的自动平衡,允许您动态扩展(或缩小),并在活动任务以及配置和偏移提交数据中提供容错能力。执行与独立模式非常相似:

bin/connect-distributed.sh config/connect-distributed.properties

区别在于启动的类和配置参数,它们改变了 Kafka Connect 进程决定在哪里存储配置、如何分配工作以及在哪里存储偏移量和任务状态的方式。在分布式模式下,Kafka Connect 将偏移量、配置和任务状态存储在 Kafka 主题中。建议手动创建偏移量、配置和状态的主题,以获得所需的分区数量和复制因子。如果启动 Kafka Connect 时尚未创建主题,则会使用默认的分区数和复制因子自动创建主题,这可能不太适合其使用。

特别是,除了上面提到的常见设置之外,在启动集群之前设置以下配置参数也很重要:

  • group.id(默认connect-cluster)- 集群的唯一名称,用于形成 Connect 集群组;请注意,这不能与消费者组 ID冲突
  • config.storage.topic(默认connect-configs)- 用于存储连接器和任务配置的主题;请注意,这应该是单个分区、高度复制、压缩的主题。您可能需要手动创建主题以确保正确配置,因为自动创建的主题可能有多个分区或自动配置为删除而不是压缩
  • offset.storage.topic(默认connect-offsets)- 用于存储偏移量的主题;该主题应该有许多分区,可以复制,并配置为压缩
  • status.storage.topic(默认connect-status)- 用于存储状态的主题;该主题可以有多个分区,并且应该复制并配置为压缩

请注意,在分布式模式下,连接器配置不会在命令行上传递。相反,请使用下面描述的 REST API 来创建、修改和销毁连接器。

配置连接器

连接器配置是简单的键值映射。在独立模式和分布式模式下,它们都包含在创建(或修改)连接器的 REST 请求的 JSON 有效负载中。在独立模式下,这些也可以在属性文件中定义并传递到命令行上的 Connect 进程。

大多数配置都依赖于连接器,因此此处无法概述。但是,有一些常见的选项:

  • name - 连接器的唯一名称。尝试使用相同名称再次注册将会失败。
  • connector.class - 连接器的 Java 类
  • tasks.max - 应为此连接器创建的最大任务数。如果连接器无法实现这种并行级别,它可能会创建更少的任务。
  • key.converter -(可选)覆盖工作人员设置的默认密钥转换器。
  • value.converter -(可选)覆盖工作人员设置的默认值转换器。

connector.class配置支持多种格式:该连接器的类的全名或别名。如果连接器是 org.apache.kafka.connect.file.FileStreamSinkConnector,您可以指定此全名,也可以使用 FileStreamSink 或 FileStreamSinkConnector 使配置更短一些。

接收器连接器还有一些附加选项来控制其输入。每个接收器连接器必须设置以下其中一项:

  • topics - 用作此连接器输入的以逗号分隔的主题列表
  • topics.regex - 用作此连接器输入的主题的 Java 正则表达式

对于任何其他选项,您应该查阅连接器的文档。

转换

连接器可以配置转换以进行轻量级的一次消息修改。它们可以方便地进行数据处理和事件路由。

可以在连接器配置中指定转换链。

  • transforms - 转换的别名列表,指定转换的应用顺序。
  • transforms.$alias.type - 转换的完全限定类名称。
  • transforms.$alias.$transformationSpecificConfig转换的配置属性

例如,让我们采用内置文件源连接器并使用转换来添加静态字段。

在整个示例中,我们将使用无模式 JSON 数据格式。为了使用无模式格式,我们将以下两行connect-standalone.properties从 true 更改为 false:

key.converter.schemas.enable value.converter.schemas.enable

文件源连接器将每一行读取为字符串。我们将把每一行包装在一个 Map 中,然后添加第二个字段来标识事件的起源。为此,我们使用两种转换:

  • HoistField将输入线放置在地图内
  • InsertField添加静态字段。在此示例中,我们将指示记录来自文件连接器

添加转换后,connect-file-source.properties文件如下所示:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
transforms=MakeMap, InsertSource
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source

所有以 开头的行transforms都是为了转换而添加的。您可以看到我们创建的两个转换:“InsertSource”和“MakeMap”是我们选择提供转换的别名。转换类型基于您可以在下面看到的内置转换列表。每个转换类型都有附加配置:HoistField 需要一个名为“field”的配置,它是映射中包含文件中原始字符串的字段的名称。InsertField 转换让我们指定字段名称和要添加的值。

当我们在不进行转换的情况下在示例文件上运行文件源连接器,然后使用 读取它们时kafka-console-consumer.sh,结果是:

"foo"
"bar"
"hello world"

然后,我们在将转换添加到配置文件之后创建一个新的文件连接器。这次,结果将是:

{"line":"foo","data_source":"test-file-source"}
{"line":"bar","data_source":"test-file-source"}
{"line":"hello world","data_source":"test-file-source"}

您可以看到我们读取的行现在是 JSON 映射的一部分,并且有一个带有我们指定的静态值的额外字段。这只是您可以使用转换执行的操作的示例之一。

包含的转换

Kafka Connect 包含多种广泛适用的数据和路由转换:

  • InsertField - 使用静态数据或记录元数据添加字段
  • ReplaceField - 过滤或重命名字段
  • MaskField - 将字段替换为类型的有效空值(0、空字符串等)或自定义替换(仅限非空字符串或数值)
  • ValueToKey - 将记录键替换为由记录值中的字段子集形成的新键
  • HoistField - 将整个事件包装为结构或映射中的单个字段
  • ExtractField - 从 Struct 和 Map 中提取特定字段并在结果中仅包含该字段
  • SetSchemaMetadata - 修改模式名称或版本
  • TimestampRouter - 根据原始主题和时间戳修改记录的主题。当使用需要根据时间戳写入不同表或索引的接收器时很有用
  • RegexRouter - 根据原始主题、替换字符串和正则表达式修改记录的主题
  • Filter - 从所有进一步处理中删除消息。这与谓词一起使用来选择性地过滤某些消息。
  • InsertHeader - 使用静态数据添加标题
  • HeadersFrom - 将键或值中的字段复制或移动到记录标题
  • DropHeaders - 按名称删除标头

下面列出了如何配置每个转换的详细信息:

org.apache.kafka.connect.transforms.InsertField

使用记录元数据中的属性或配置的静态值插入字段。

使用为记录键 ( org.apache.kafka.connect.transforms.InsertField$Key) 或值 ( org.apache.kafka.connect.transforms.InsertField$Value) 设计的具体转换类型。

  • 偏移量字段

    Kafka 偏移量的字段名称 - 仅适用于接收器连接器。
    后缀 with!使其成为必填字段,或?使其保持可选(默认)。

    Type:string
    Default:null
    Valid Values:
    Importance:medium

  • 分区字段

    Kafka 分区的字段名称。后缀 with!使其成为必填字段,或?使其保持可选(默认)。

    Type:string
    Default:null
    Valid Values:
    Importance:medium

  • 静态场

    静态数据字段的字段名称。后缀 with!使其成为必填字段,或?使其保持可选(默认)。

    Type:string
    Default:null
    Valid Values:
    Importance:medium

  • 静态值

    静态字段值(如果配置了字段名称)。

    Type:string
    Default:null
    Valid Values:
    Importance:medium

  • 时间戳字段

    记录时间戳的字段名称。后缀 with!使其成为必填字段,或?使其保持可选(默认)。

    Type:string
    Default:null
    Valid Values:
    Importance:medium

  • topic.字段

    Kafka 主题的字段名称。后缀 with!使其成为必填字段,或?使其保持可选(默认)。

    Type:string
    Default:null
    Valid Values:
    Importance:medium

org.apache.kafka.connect.transforms.ReplaceField

过滤或重命名字段。

使用为记录键 ( org.apache.kafka.connect.transforms.ReplaceField$Key) 或值 ( org.apache.kafka.connect.transforms.ReplaceField$Value) 设计的具体转换类型。

  • 排除

    要排除的字段。这优先于要包含的字段。

    Type:list
    Default:""
    Valid Values:
    Importance:medium

  • 包括

    要包含的字段。如果指定,则仅使用这些字段。

    Type:list
    Default:""
    Valid Values:
    Importance:medium

  • 重命名

    字段重命名映射。

    Type:list
    Default:""
    Valid Values:以冒号分隔的对的列表,例如foo:bar,abc:xyz
    Importance:medium

  • 黑名单

    已弃用。请改用排除。

    Type:list
    Default:null
    Valid Values:
    Importance:low

  • 白名单

    已弃用。使用包含代替。

    Type:list
    Default:null
    Valid Values:
    Importance:low

org.apache.kafka.connect.transforms.MaskField

使用字段类型的有效空值(即 0, false, empty string 等)屏蔽指定字段。

对于数字和字符串字段,可以指定转换为正确类型的可选替换值。

使用为记录键 ( org.apache.kafka.connect.transforms.MaskField$Key) 或值 ( org.apache.kafka.connect.transforms.MaskField$Value) 设计的具体转换类型。

  • 领域

    要屏蔽的字段的名称。

    Type:list
    Default:
    Valid Values:non-empty list
    Importance:high

  • 替代品

    自定义值替换,将应用于所有“字段”值(仅限数字或非空字符串值)。

    Type:string
    Default:null
    Valid Values:non-empty string
    Importance:low

org.apache.kafka.connect.transforms.ValueToKey

将记录键替换为由记录值中的字段子集形成的新键。

  • 领域

    要提取作为记录键的记录值上的字段名称。

    Type:list
    Default:
    Valid Values:non-empty list
    Importance:high

org.apache.kafka.connect.transforms.HoistField

如果存在架构,则使用结构中指定的字段名称包装数据;如果存在无架构数据,则使用映射包装数据。

使用为记录键 ( org.apache.kafka.connect.transforms.HoistField$Key) 或值 ( org.apache.kafka.connect.transforms.HoistField$Value) 设计的具体转换类型。

  • 场地

    将在生成的结构或映射中创建的单个字段的字段名称。

    Type:string
    Default:
    Valid Values:
    Importance:medium

org.apache.kafka.connect.transforms.ExtractField

如果存在模式,则从结构中提取指定字段;如果存在无模式数据,则从映射中提取指定字段。任何空值都会不加修改地传递。

使用为记录键 ( org.apache.kafka.connect.transforms.ExtractField$Key) 或值 ( org.apache.kafka.connect.transforms.ExtractField$Value) 设计的具体转换类型。

  • 场地

    要提取的字段名称。

    Type:string
    Default:
    Valid Values:
    Importance:medium

org.apache.kafka.connect.transforms.SetSchemaMetadata

org.apache.kafka.connect.transforms.SetSchemaMetadata$Key在记录的键 ( ) 或值 ( org.apache.kafka.connect.transforms.SetSchemaMetadata$Value) 架构 上设置架构名称、版本或两者。

  • 模式名称

    要设置的架构名称。

    Type:string
    Default:null
    Valid Values:
    Importance:high

  • 模式版本

    要设置的架构版本。

    Type:int
    Default:null
    Valid Values:
    Importance:high

org.apache.kafka.connect.transforms.TimestampRouter

根据原始主题值和记录时间戳更新记录的主题字段。

这主要对接收器连接器有用,因为主题字段通常用于确定目标系统中的等效实体名称(例如数据库表或搜索索引名称)。

  • 时间戳.格式

    与 兼容的时间戳的格式字符串java.text.SimpleDateFormat

    Type:string
    Default:yyyyMMdd
    Valid Values:
    Importance:high

  • topic.格式

    格式字符串可以包含${topic}${timestamp}作为主题和时间戳的占位符。

    Type:string
    Default:\({topic}-\){timestamp}
    Valid Values:
    Importance:high

org.apache.kafka.connect.transforms.RegexRouter

使用配置的正则表达式和替换字符串更新记录主题。

在底层,正则表达式被编译为java.util.regex.Pattern. 如果模式与输入主题匹配,java.util.regex.Matcher#replaceFirst()则与替换字符串一起使用来获取新主题。

  • 正则表达式

    用于匹配的正则表达式。

    Type:string
    Default:
    Valid Values:有效的正则表达式
    Importance:high

  • 替代品

    替换字符串。

    Type:string
    Default:
    Valid Values:
    Importance:high

org.apache.kafka.connect.transforms.Flatten

展平嵌套数据结构,通过将每个级别的字段名称与可配置的分隔符连接来生成每个字段的名称。当模式存在时适用于结构,或者在无模式数据的情况下适用于映射。数组字段及其内容不会被修改。默认分隔符是“.”。

使用为记录键 ( org.apache.kafka.connect.transforms.Flatten$Key) 或值 ( org.apache.kafka.connect.transforms.Flatten$Value) 设计的具体转换类型。

  • 分隔符

    为输出记录生成字段名称时在输入记录的字段名称之间插入的分隔符

    Type:string
    Default:.
    Valid Values:
    Importance:medium

org.apache.kafka.connect.transforms.Cast

将字段或整个键或值转换为特定类型,例如强制整数字段具有较小的宽度。从整数、浮点数、布尔值和字符串转换为任何其他类型,并将二进制转换为字符串(base64 编码)。

使用为记录键 ( org.apache.kafka.connect.transforms.Cast$Key) 或值 ( org.apache.kafka.connect.transforms.Cast$Value) 设计的具体转换类型。

  • 规格

    字段列表以及将其转换为 form field1:type,field2:type 的类型以转换映射或结构的字段。用于转换整个值的单一类型。有效类型包括 int8、int16、int32、int64、float32、float64、boolean 和 string。请注意,二进制字段只能转换为字符串。

    Type:list
    Default:
    Valid Values:以冒号分隔的对的列表,例如foo:bar,abc:xyz
    Importance:high

org.apache.kafka.connect.transforms.TimestampConverter

在不同格式(例如 Unix 纪元、字符串和连接日期/时间戳类型)之间转换时间戳。适用于单个字段或整个值。

使用为记录键 ( org.apache.kafka.connect.transforms.TimestampConverter$Key) 或值 ( org.apache.kafka.connect.transforms.TimestampConverter$Value) 设计的具体转换类型。

  • 目标类型

    所需的时间戳表示形式:字符串、unix、日期、时间或时间戳

    Type:string
    Default:
    Valid Values:[string, unix, Date, Time, Timestamp]
    Importance:high

  • 场地

    包含时间戳的字段,如果整个值是时间戳,则为空

    Type:string
    Default:""
    Valid Values:
    Importance:high

  • 格式

    与 SimpleDateFormat 兼容的时间戳格式。当 type=string 时用于生成输出,或者如果输入是字符串则用于解析输入。

    Type:string
    Default:""
    Valid Values:
    Importance:medium

  • unix.精度

    时间戳所需的 Unix 精度:秒、毫秒、微秒或纳秒。用于在 type=unix 时生成输出,或用于在输入为 Long 时解析输入。注意:此 SMT 将在与具有亚毫秒分量的值进行转换期间导致精度损失。

    Type:string
    Default:milliseconds
    Valid Values:[nanoseconds, microseconds, milliseconds, seconds]
    Importance:low

org.apache.kafka.connect.transforms.Filter

删除所有记录,从链中的后续转换中过滤它们。这旨在有条件地用于过滤掉与特定谓词匹配(或不匹配)的记录。

org.apache.kafka.connect.transforms.InsertHeader

为每条记录添加标题。

  • 标头

    标头的名称。

    Type:string
    Default:
    Valid Values:non-empty string
    Importance:high

  • 值.文字

    要设置为所有记录上的标题值的文字值。

    Type:string
    Default:
    Valid Values:non-empty string
    Importance:high

org.apache.kafka.connect.transforms.DropHeaders

从每条记录中删除一个或多个标头。

  • 标头

    要删除的标头的名称。

    Type:list
    Default:
    Valid Values:non-empty list
    Importance:high

org.apache.kafka.connect.transforms.HeaderFrom

将记录的键/值中的字段移动或复制到该记录的标题中。fields和 的相应元素headers一起标识字段以及应将其移动或复制到的标题。使用为记录键 ( org.apache.kafka.connect.transforms.HeaderFrom$Key) 或值 ( org.apache.kafka.connect.transforms.HeaderFrom$Value) 设计的具体转换类型。

  • 领域

    记录中的字段名称,其值将被复制或移动到标题。

    Type:list
    Default:
    Valid Values:non-empty list
    Importance:high

  • 标头

    标头名称的顺序与字段配置属性中列出的字段名称的顺序相同。

    Type:list
    Default:
    Valid Values:non-empty list
    Importance:high

  • 手术

    如果move要将字段移动到标题(从键/值中删除),或者copy将字段复制到标题(保留在键/值中)。

    Type:string
    Default:
    Valid Values:[move, copy]
    Importance:high

谓词

可以使用谓词配置转换,以便转换仅应用于满足某些条件的消息。特别是,当与过滤器转换谓词结合使用时,可用于有选择地过滤掉某些消息。

谓词在连接器配置中指定。

  • predicates - 应用于某些转换的谓词的别名集。
  • predicates.$alias.type - 谓词的完全限定类名。
  • predicates.$alias.$predicateSpecificConfig - 谓词的配置属性。

All transformations have the implicit config properties predicate and negate. A predicular predicate is associated with a transformation by setting the transformation's predicate config to the predicate's alias. The predicate's value can be reversed using the negate configuration property.

For example, suppose you have a source connector which produces messages to many different topics and you want to:

  • filter out the messages in the 'foo' topic entirely
  • apply the ExtractField transformation with the field name 'other_field' to records in all topics except the topic 'bar'

To do this we need first to filter out the records destined for the topic 'foo'. The Filter transformation removes records from further processing, and can use the TopicNameMatches predicate to apply the transformation only to records in topics which match a certain regular expression. TopicNameMatches's only configuration property is pattern which is a Java regular expression for matching against the topic name. The configuration would look like this:

transforms=Filter transforms.Filter.type=org.apache.kafka.connect.transforms.Filter transforms.Filter.predicate=IsFoo predicates=IsFoo predicates.IsFoo.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.IsFoo.pattern=foo

接下来,仅当记录的主题名称不是“bar”时,我们才需要应用ExtractField。我们不能直接使用 TopicNameMatches,因为这会将转换应用于匹配的主题名称,而不是不匹配的主题名称。转换的隐式negate配置属性允许我们反转谓词匹配的记录集。将其配置添加到前面的示例中,我们得到:

变换=过滤、提取 Transforms.Filter.type=org.apache.kafka.connect.transforms.Filter Transforms.Filter.predicate=IsFoo Transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key 变换.Extract.field=other_field Transforms.Extract.predicate=IsBar 变换.Extract.negate=true 谓词=IsFoo,IsBar predicates.IsFoo.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.IsFoo.pattern=foo predicates.IsBar.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.IsBar.pattern=bar

Kafka Connect 包括以下谓词:

  • TopicNameMatches - 匹配主题中名称与特定 Java 正则表达式匹配的记录。
  • HasHeaderKey - 匹配具有给定键的标头的记录。
  • RecordIsTombstone - 匹配墓碑记录,即具有空值的记录。

下面列出了如何配置每个谓词的详细信息:

org.apache.kafka.connect.transforms.predicates.HasHeaderKey

一个谓词,对于至少具有一个具有配置名称的标头的记录为真。

  • 姓名

    标头名称。

    Type:string
    Default:
    Valid Values:non-empty string
    Importance:medium

org.apache.kafka.connect.transforms.predicates.RecordIsTombstone

对于逻辑删除(即具有空值)的记录为真的谓词。

org.apache.kafka.connect.transforms.predicates.TopicNameMatches

对于主题名称与配置的正则表达式匹配的记录为 true 的谓词。

  • 图案

    用于匹配记录主题名称的 Java 正则表达式。

    Type:string
    Default:
    Valid Values:非空字符串,有效的正则表达式
    Importance:medium

休息API

由于 Kafka Connect 旨在作为服务运行,因此它还提供了用于管理连接器的 REST API。此 REST API 可在独立模式和分布式模式下使用。可以使用配置选项配置 REST API 服务器listeners。该字段应包含以下格式的侦听器列表:protocol://host:port,protocol2://host2:port2。目前支持的协议有httphttps。例如:

listeners=http://localhost:8080,https://localhost:8443

By default, if no listeners are specified, the REST server runs on port 8083 using the HTTP protocol. When using HTTPS, the configuration has to include the SSL configuration. By default, it will use the ssl.* settings. In case it is needed to use different configuration for the REST API than for connecting to Kafka brokers, the fields can be prefixed with listeners.https. When using the prefix, only the prefixed options will be used and the ssl.* options without the prefix will be ignored. Following fields can be used to configure HTTPS for the REST API:

  • ssl.keystore.location
  • ssl.keystore.password
  • ssl.keystore.type
  • ssl.key.password
  • ssl.truststore.location
  • ssl.truststore.password
  • ssl.truststore.type
  • ssl.enabled.protocols
  • ssl.provider
  • ssl.protocol
  • ssl.cipher.suites
  • ssl.keymanager.algorithm
  • ssl.secure.random.implementation
  • ssl.trustmanager.algorithm
  • ssl.endpoint.identification.algorithm
  • ssl.client.auth

The REST API is used not only by users to monitor / manage Kafka Connect. In distributed mode, it is also used for the Kafka Connect cross-cluster communication. Some requests received on the follower nodes REST API will be forwarded to the leader node REST API. In case the URI under which is given host reachable is different from the URI which it listens on, the configuration options rest.advertised.host.namerest.advertised.port and rest.advertised.listener can be used to change the URI which will be used by the follower nodes to connect with the leader. When using both HTTP and HTTPS listeners, the rest.advertised.listener option can be also used to define which listener will be used for the cross-cluster communication. When using HTTPS for communication between nodes, the same ssl.* or listeners.https options will be used to configure the HTTPS client.

以下是当前支持的 REST API 端点:

  • GET /connectors - 返回活动连接器的列表
  • POST /connectors - 创建一个新的连接器;请求正文应该是一个 JSON 对象,其中包含字符串name字段和config带有连接器配置参数的对象字段
  • GET /connectors/{name} - 获取有关特定连接器的信息
  • GET /connectors/{name}/config - 获取特定连接器的配置参数
  • PUT /connectors/{name}/config - 更新特定连接器的配置参数
  • GET /connectors/{name}/status - 获取连接器的当前状态,包括它是否正在运行、失败、暂停等,它被分配给哪个工作线程,失败时的错误信息,以及它的所有任务的状态
  • GET /connectors/{name}/tasks - 获取连接器当前运行的任务列表
  • GET /connectors/{name}/tasks/{taskid}/status - 获取任务的当前状态,包括它是否正在运行、失败、暂停等,它被分配给哪个worker,以及失败时的错误信息
  • PUT /connectors/{name}/pause - 暂停连接器及其任务,这会停止消息处理,直到连接器恢复为止。其任务占用的任何资源都会保留分配状态,这使得连接器可以在恢复后快速开始处理数据。
  • PUT /connectors/{name}/stop - 停止连接器并关闭其任务,取消分配其任务占用的任何资源。从资源使用的角度来看,这比暂停连接器更有效,但可能会导致连接器恢复后需要更长时间才能开始处理数据。
  • PUT /connectors/{name}/resume - 恢复暂停或停止的连接器(或者如果连接器未暂停或停止则不执行任何操作)
  • POST /connectors/{name}/restart?includeTasks=<true|false>&onlyFailed=<true|false> - 重新启动连接器及其任务实例。
    • “includeTasks”参数指定是否重新启动连接器实例和任务实例(“includeTasks=true”)或仅重新启动连接器实例(“includeTasks=false”),默认值(“false”)保留与早期版本相同的行为。
    • “onlyFailed”参数指定是仅重新启动具有 FAILED 状态的实例(“onlyFailed=true”)还是重新启动所有实例(“onlyFailed=false”),默认值(“false”)保留与早期版本相同的行为。
  • POST /connectors/{name}/tasks/{taskId}/restart - 重新启动单个任务(通常是因为它失败了)
  • DELETE /connectors/{name} - 删除连接器,停止所有任务并删除其配置
  • GET /connectors/{name}/topics - 获取自创建连接器或发出重置其活动主题集的请求以来特定连接器正在使用的主题集
  • PUT /connectors/{name}/topics/reset - 发送请求以清空连接器的活动主题集
  • GET /connectors/{name}/offsets - 获取连接器的当前偏移(有关更多详细信息,请参阅KIP-875 )

Kafka Connect 还提供了一个 REST API,用于获取有关连接器插件的信息:

  • GET /connector-plugins - 返回 Kafka Connect 集群中安装的连接器插件列表。请注意,API 仅检查处理请求的工作线程上的连接器,这意味着您可能会看到不一致的结果,尤其是在滚动升级期间,如果您添加新的连接器 jar
  • PUT /connector-plugins/{connector-type}/config/validate - 根据配置定义验证提供的配置值。此 API 执行每个配置验证,在验证期间返回建议值和错误消息。

以下是顶级(根)端点支持的 REST 请求:

  • GET / - 返回有关 Kafka Connect 集群的基本信息,例如服务 REST 请求的 Connect Worker 的版本(包括源代码的 git commit ID)以及连接到的 Kafka 集群 ID。

有关 REST API 的完整规范,请参阅OpenAPI 文档

Connect 中的错误报告

Kafka Connect 提供错误报告来处理各个处理阶段遇到的错误。默认情况下,转换期间或转换中遇到的任何错误都会导致连接器失败。每个连接器配置还可以通过跳过此类错误、选择性地将每个错误以及失败操作的详细信息和有问题的记录(具有各种详细级别)写入 Connect 应用程序日志来容忍此类错误。当接收器连接器处理从其 Kafka 主题消耗的消息时,这些机制还会捕获错误,并且所有错误都可以写入可配置的“死信队列”(DLQ) Kafka 主题。

要将连接器的转换器、转换或接收器连接器本身内的错误报告到日志,请errors.log.enable=true在连接器配置中设置以记录每个错误和问题记录的主题、分区和偏移量的详细信息。出于其他调试目的,设置errors.log.include.messages=true为还将问题记录键、值和标头记录到日志中(请注意,这可能会记录敏感信息)。

要将连接器的转换器、转换或接收器连接器本身内的错误报告给死信队列主题,请设置errors.deadletterqueue.topic.name和(可选)errors.deadletterqueue.context.headers.enable=true

默认情况下,连接器在出现错误或异常时立即表现出“快速失败”行为。这相当于将以下配置属性及其默认值添加到连接器配置中:

# 禁用失败重试 errors.retry.timeout=0

不记录错误及其上下文

errors.log.enable=false

不要在死信队列主题中记录错误

errors.deadletterqueue.topic.name=

第一次错误时失败

错误容差=无

可以更改这些和其他相关的连接器配置属性以提供不同的行为。例如,可以将以下配置属性添加到连接器配置中,以通过多次重试设置错误处理、记录到应用程序日志和 Kafka 主题my-connector-errors,并通过报告错误而不是使连接器任务失败来容忍所有错误:

# 重试最多 10 分钟,每次连续失败最多等待 30 秒 errors.retry.timeout=600000 errors.retry.delay.max.ms=30000

记录错误上下文以及应用程序日志,但不包括配置和消息

错误log.enable=true errors.log.include.messages=false

将错误上下文生成到 Kafka 主题中

error.deadletterqueue.topic.name=我的连接器错误

容忍所有错误。

错误容差=全部

exactly.once.support

Kafka Connect 能够为接收器连接器(从版本 0.11.0 开始)和源连接器(从版本 3.3.0 开始)提供一次性语义。请注意,对一次语义的支持高度依赖于您运行的连接器类型。即使您在集群中每个节点的配置中设置了所有正确的工作线程属性,如果连接器未设计为或无法利用 Kafka Connect 框架的功能,则可能无法实现精确一次。

水槽连接器

如果接收器连接器支持精确一次语义,要在 Connect 工作线程级别启用精确一次,您必须确保其使用者组配置为忽略中止事务中的记录。您可以通过将工作程序属性设置consumer.isolation.level为来实现此read_committed目的,或者,如果运行支持它的 Kafka Connect 版本,则使用连接器客户端配置覆盖策略,允许在各个连接器配置中consumer.override.isolation.level将该属性设置为。read_committed没有额外的 ACL 要求。

源连接器

如果源连接器支持一次性语义,则必须配置 Connect 集群以启用对一次性源连接器的框架级支持。如果针对安全的 Kafka 集群运行,可能需要额外的 ACL。请注意,对源连接器的一次性支持目前仅在分布式模式下可用;独立的 Connect 工作线程无法提供一次性语义。

工作人员配置

对于新的 Connect 集群,请在集群中每个节点的工作程序配置中将该exactly.once.source.support属性设置为。enabled对于现有集群,需要进行两次滚动升级。在第一次升级期间,该exactly.once.source.support属性应设置为preparing,在第二次升级期间,应设置为enabled

ACL要求

启用一次性源支持后,每个 Connect 工作线程的主体将需要以下 ACL:

手术 资源类型 资源名称 笔记
交易ID connect-cluster-${groupId},其中${groupId}group.id簇的
描述 交易ID connect-cluster-${groupId},其中${groupId}group.id簇的
幂等写 托管worker配置主题的Kafka集群的ID IdempotWrite ACL 自 2.8 起已被弃用,仅对于在 2.8 之前的 Kafka 集群上运行的 Connect 集群是必需的

每个连接器的主体将需要以下 ACL:

手术 资源类型 资源名称 笔记
交易ID ${groupId}-${connector}-${taskId},对于连接器将创建的每个任务,其中${groupId}group.idConnect 集群的 ,${connector}是连接器的名称,${taskId}是任务的 ID(从零开始) ${groupId}-${connector}*如果不存在与其他事务 ID 冲突的风险或者用户可以接受冲突,则可以使用通配符前缀以方便起见。
描述 交易ID ${groupId}-${connector}-${taskId},对于连接器将创建的每个任务,其中${groupId}group.idConnect 集群的 ,${connector}是连接器的名称,${taskId}是任务的 ID(从零开始) ${groupId}-${connector}*如果不存在与其他事务 ID 冲突的风险或者用户可以接受冲突,则可以使用通配符前缀以方便起见。
话题 连接器使用的偏移主题,它是offsets.storage.topic连接器配置中的属性值(如果提供)或offsets.storage.topic工作线程配置中的属性值(如果未提供)。
话题 连接器使用的偏移主题,它是offsets.storage.topic连接器配置中的属性值(如果提供)或offsets.storage.topic工作线程配置中的属性值(如果未提供)。
描述 话题 连接器使用的偏移主题,它是offsets.storage.topic连接器配置中的属性值(如果提供)或offsets.storage.topic工作线程配置中的属性值(如果未提供)。
创造 话题 连接器使用的偏移主题,它是offsets.storage.topic连接器配置中的属性值(如果提供)或offsets.storage.topic工作线程配置中的属性值(如果未提供)。 仅当连接器的偏移主题尚不存在时才需要
幂等写 源连接器写入的Kafka集群ID IdempotWrite ACL 自 2.8 起已被弃用,仅对于在 2.8 之前的 Kafka 集群上运行的 Connect 集群是必需的

8.3 连接器开发指南

本指南介绍了开发人员如何为 Kafka Connect 编写新的连接器以在 Kafka 和其他系统之间移动数据。它简要回顾了一些关键概念,然后描述了如何创建简单的连接器。

核心概念和 API

连接器和任务

要在 Kafka 和另一个系统之间复制数据,用户需要Connector为他们想要从中提取数据或向其中推送数据的系统创建一个 Kafka 系统。连接器有两种类型:SourceConnectors从另一个系统导入数据(例如JDBCSourceConnector将关系数据库导入到 Kafka)和SinkConnectors导出数据(例如HDFSSinkConnector将 Kafka 主题的内容导出到 HDFS 文件)。

Connectors自己不执行任何数据复制:它们的配置描述了要复制的数据,并且Connector负责将该作业分解为一组Tasks可以分发给工作人员的数据。它们Tasks也有两种相应的风格:SourceTaskSinkTask

完成任务后,每个人都Task必须将其数据子集复制到 Kafka 或从 Kafka 复制数据。在 Kafka Connect 中,应该始终可以将这些分配构建为一组输入和输出流,其中包含具有一致模式的记录。有时这种映射是显而易见的:一组日志文件中的每个文件都可以被视为一个流,其中每个解析的行使用相同的模式形成一条记录,并将偏移量存储为文件中的字节偏移量。在其他情况下,映射到此模型可能需要更多工作:JDBC 连接器可以将每个表映射到流,但偏移量不太清楚。一种可能的映射使用时间戳列来生成增量返回新数据的查询,并且最后查询的时间戳可以用作偏移量。

流和记录

每个流应该是一系列键值记录。键和值都可以具有复杂的结构——提供了许多基本类型,但也可以表示数组、对象和嵌套数据结构。运行时数据格式不采用任何特定的序列化格式;这种转换由框架内部处理。

除了键和值之外,记录(由源生成的记录和传送到接收器的记录)还具有关联的流 ID 和偏移量。框架使用它们定期提交已处理的数据的偏移量,以便在发生故障时,可以从上次提交的偏移量恢复处理,从而避免不必要的重新处理和重复事件。

动态连接器

并非所有作业都是静态的,因此Connector实现还负责监视外部系统是否有任何可能需要重新配置的更改。例如,在该JDBCSourceConnector示例中,Connector可能会为每个Task. 创建新表时,它必须发现这一点,以便可以通过更新其配置将新表分配给其中之一Tasks。当它注意到需要重新配置的更改(或 数量的更改Tasks)时,它会通知框架,框架会更新任何相应的Tasks.

开发一个简单的连接器

开发连接器只需要实现两个接口:ConnectorTask。Kafka 的源代码中包含一个简单的示例,位于file。该连接器适用于独立模式,并具有SourceConnector/的实现SourceTask,用于读取文件的每一行并将其作为记录发出,以及SinkConnector/SinkTask将每个记录写入文件。

本节的其余部分将逐步介绍一些代码来演示创建连接器的关键步骤,但开发人员还应该参考完整的示例源代码,因为为了简洁起见,省略了许多细节。

连接器示例

我们将作为一个简单的例子进行介绍SourceConnectorSinkConnector实现非常相似。首先创建继承自的类SourceConnector并添加一个字段,该字段将存储要传播到任务的配置信息(要发送数据的主题,以及可选的 - 要读取的文件名和最大批量大小):

公共类 FileStreamSourceConnector 扩展 SourceConnector { 私有 Map 道具;

最简单的填写方法是taskClass(),它定义了应该在工作进程中实例化以实际读取数据的类:

@覆盖 公共课<? 扩展任务>任务类(){ 返回 FileStreamSourceTask.class; }

我们将FileStreamSourceTask在下面定义该类。接下来,我们添加一些标准生命周期方法,start()并且stop()

@覆盖 公共无效开始(地图<字符串,字符串>道具){ // 初始化逻辑和资源设置可以在此方法中进行。 // 此连接器不需要执行任何操作,但我们确实会向用户记录一条有用的消息。 this.props = 道具; AbstractConfig config = new AbstractConfig(CONFIG_DEF, props); 字符串文件名 = config.getString(FILE_CONFIG); 文件名 = (文件名 == null || 文件名.isEmpty()) ? “标准输入”:config.getString(FILE_CONFIG); log.info("开始从 {} 读取文件源连接器", filename); } @覆盖 公共无效停止(){ // 无需执行任何操作,因为不需要后台监控。 }

最后,实现的真正核心是在taskConfigs(). 在这种情况下,我们只处理单个文件,因此即使我们可能被允许根据参数生成更多任务 maxTasks,我们也会返回一个只有一个条目的列表:

@覆盖 公共列表 > taskConfigs(int maxTasks) { // 请注意,如果需要,任务配置可以包含连接器配置之外的附加配置或不同于连接器配置的配置。例如, // 如果不同的任务有不同的职责,或者如果不同的任务要处理源数据流的不同子集)。 ArrayList> configs = new ArrayList<>(); // 只有一个输入流才有意义。 configs.add(props); 返回配置; }

即使有多个任务,此方法的实现通常也非常简单。它只需要确定输入任务的数量,这可能需要联系从中提取数据的远程服务,然后将它们分开。由于一些在任务之间分配工作的模式非常常见,因此提供了一些实用程序ConnectorUtils来简化这些情况。

请注意,这个简单的示例不包括动态输入。有关如何触发任务配置更新的信息,请参阅下一节中的讨论。

任务示例 - 源任务

接下来我们将描述相应的实现SourceTask。实现很短,但太长,无法在本指南中完全涵盖。我们将使用伪代码来描述大部分实现,但您可以参考完整示例的源代码。

就像连接器一样,我们需要创建一个从适当的基类继承的类Task。它还有一些标准的生命周期方法:

公共类 FileStreamSourceTask 扩展 SourceTask { 私有字符串文件名; 私有输入流流; 私有字符串主题; 私有 int 批量大小; @覆盖 公共无效开始(地图<字符串,字符串>道具){ 文件名 = props.get(FileStreamSourceConnector.FILE_CONFIG); 流 = openOrThrowError(文件名); 主题 = props.get(FileStreamSourceConnector.TOPIC_CONFIG); batchSize = props.get(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG); } @覆盖 公共同步无效停止(){ 流.close(); }

这些是稍微简化的版本,但表明这些方法应该相对简单,并且它们应该执行的唯一工作是分配或释放资源。这个实现有两点需要注意。首先,该start()方法尚未处理从先前偏移量恢复的问题,这将在后面的部分中解决。其次,stop()方法是同步的。这是必要的,因为SourceTasks它们有一个可以无限期阻塞的专用线程,因此需要通过来自 Worker 中不同线程的调用来停止它们。

接下来,我们实现任务的主要功能,即poll()从输入系统获取事件并返回 a 的方法List<SourceRecord>

@覆盖 public List poll() 抛出 InterruptedException { 尝试 { ArrayList 记录 = new ArrayList<>(); while (streamValid(stream) && 记录.isEmpty()) { LineAndOffset line = readToNextLine(stream); 如果(行!=空){ Map sourcePartition = Collections.singletonMap("文件名", 文件名); Map sourceOffset = Collections.singletonMap("position", streamOffset); 记录。添加(新的SourceRecord(sourcePartition,sourceOffset,主题,Schema.STRING_SCHEMA,行)); if (records.size() >= batchSize) { 返回记录; } } 别的 { 线程.sleep(1); } } 返回记录; } catch (IOException e) { // 底层流被终止,可能是由于调用 stop 的结果。允许返回 // null,驱动线程将在必要时处理任何关闭。 } 返回空值; }

同样,我们省略了一些细节,但我们可以看到重要的步骤:该poll()方法将被重复调用,并且对于每次调用,它将循环尝试从文件中读取记录。对于它读取的每一行,它还会跟踪文件偏移量。它使用此信息创建一个包含SourceRecord四部分信息的输出:源分区(只有一个,正在读取的单个文件)、源偏移量(文件中的字节偏移量)、输出主题名称和输出值(行,并且我们包含一个模式,指示该值始终是一个字符串)。构造函数的其他变体SourceRecord还可以包括特定的输出分区、键和标头。

请注意,此实现使用普通的 JavaInputStream接口,如果数据不可用,则可能会休眠。这是可以接受的,因为 Kafka Connect 为每个任务提供了专用线程。虽然任务实现必须符合基本poll()接口,但它们在实现方式上具有很大的灵活性。在这种情况下,基于 NIO 的实现会更高效,但这种简单的方法有效,实现速度快,并且与旧版本的 Java 兼容。

虽然示例中没有使用,但SourceTask还提供了两个 API 来提交源系统中的偏移量:commitcommitRecord。API 是为具有消息确认机制的源系统提供的。重写这些方法允许源连接器在将消息写入 Kafka 后,批量或单独地确认源系统中的消息。APIcommit将偏移量存储在源系统中,直到 . 返回的偏移量为止poll。此 API 的实现应阻塞,直到提交完成。API将每个写入 Kafka 后的commitRecord偏移量保存在源系统中。SourceRecord由于 Kafka Connect 会自动记录偏移量,SourceTask不需要实施它们。如果连接器确实需要确认源系统中的消息,通常只需要其中一个 API。

接收器任务

上一节描述了如何实现一个简单的SourceTaskSourceConnector与和不同SinkConnectorSourceTaskSinkTask具有非常不同的接口,因为SourceTask使用拉接口和SinkTask使用推接口。两者共享共同的生命周期方法,但SinkTask接口却截然不同:

公共抽象类 SinkTask 实现 Task { 公共无效初始化(SinkTaskContext上下文){ this.context = 上下文; } public Abstract void put(Collection 记录); 公共无效刷新(地图 currentOffsets){ }

SinkTask文档包含完整的详细信息,但该界面几乎与SourceTask. 该put()方法应该包含大部分实现,接受 的集合SinkRecords,执行任何所需的转换,并将它们存储在目标系统中。此方法在返回之前不需要确保数据已完全写入目标系统。事实上,在许多情况下,内部缓冲很有用,因此可以一次发送整批记录,从而减少将事件插入下游数据存储的开销。它们包含与 Kafka 主题、分区、偏移量、事件键和值以及可选标头SinkRecords基本相同的信息。SourceRecords

flush()方法在偏移提交过程中使用,允许任务从故障中恢复并从安全点恢复,从而不会错过任何事件。该方法应将任何未完成的数据推送到目标系统,然后阻塞,直到写入被确认。该offsets参数通常可以被忽略,但在某些情况下很有用,其中实现希望将偏移量信息存储在目标存储中以提供一次性交付。例如,HDFS 连接器可以执行此操作并使用原子移动操作来确保该flush()操作以原子方式将数据和偏移量提交到 HDFS 中的最终位置。

错误记录报告者

当为连接器启用错误报告ErrantRecordReporter时,连接器可以使用来报告发送到接收器连接器的各个记录的问题。以下示例显示了当未启用 DLQ 或连接器安装在不具有此报告器功能的旧版 Connect 运行时中时,连接器的SinkTask子类如何获取并使用 、安全地处理空报告器:ErrantRecordReporter

私人 ErrantRecordReporter 记者; @覆盖 公共无效开始(地图<字符串,字符串>道具){ ... 尝试 { 记者 = context.errantRecordReporter(); // 如果未启用 DLQ,则可能为 null } catch (NoSuchMethodException | NoClassDefFoundError e) { // 将发生在 2.6 之前的 Connect 运行时中 记者=空; } } @覆盖 公共无效put(集合记录){ for (SinkRecord 记录:记录) { 尝试 { // 尝试处理记录并将其发送到数据接收器 过程(记录); } catch(异常e){ if (记者!= null) { // 将错误记录发送给错误报告者 记者.报告(记录,e); } 别的 { // 没有错误报告器,所以失败 throw new ConnectException("记录失败", e); } } } }

从之前的偏移量恢复

SourceTask实现包括每个记录的流 ID(输入文件名)和偏移量(文件中的位置)。框架使用它定期提交偏移量,以便在发生故障的情况下,任务可以恢复并最小化重新处理和可能重复的事件数量(或者如果 Kafka Connect 正常停止,则从最近的偏移量恢复,例如在独立模式下或由于作业重新配置)。此提交过程完全由框架自动化,但只有连接器知道如何返回输入流中的正确位置以从该位置恢复。

为了在启动时正确恢复,任务可以使用SourceContext传递到其initialize()方法中的数据来访问偏移数据。在 中initialize(),我们将添加更多代码来读取偏移量(如果存在)并寻找该位置:

流 = new FileInputStream(文件名); Map offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, 文件名)); 如果(偏移量!= null){ Long lastRecordedOffset = (Long) offset.get("位置"); if (lastRecordedOffset!= null) eekToOffset(流,lastRecordedOffset); }

当然,您可能需要读取每个输入流的许多键。该OffsetStorageReader接口还允许您发出批量读取以有效加载所有偏移量,然后通过将每个输入流查找到适当的位置来应用它们。

一次性源连接器

支持恰好一次

随着KIP-618的通过,Kafka Connect 从 3.3.0 版本开始支持一次性源连接器。为了使源连接器能够利用此支持,它必须能够为其发出的每个记录提供有意义的源偏移量,并在与任何这些偏移量相对应的确切位置处恢复从外部系统的消耗,而不会丢失或重复消息。

定义事务边界

默认情况下,Kafka Connect 框架将为源任务从其方法返回的每批记录创建并提交一个新的 Kafka 事务poll。但是,连接器还可以定义自己的事务边界,用户可以通过在连接器的配置中设置属性来启用该边界transaction.boundaryconnector

TransactionContext如果启用,连接器的任务将可以从其访问 a SourceTaskContext,它们可以使用它来控制事务何时中止和提交。

例如,至少每十条记录提交一个事务:

私有 int 记录已发送; @覆盖 公共无效开始(地图<字符串,字符串>道具){ this.recordsSent = 0; } @覆盖 公共列表 poll() { List 记录 = fetchRecords(); 布尔值 shouldCommit = false; for (SourceRecord 记录:记录) { if (++this.recordsSent >= 10) { 应该提交=真; } } 如果(应该提交){ this.recordsSent = 0; this.context.transactionContext().commitTransaction(); } 返回记录; }

或者为每十条记录提交一个事务:

私有 int 记录已发送; @覆盖 公共无效开始(地图<字符串,字符串>道具){ this.recordsSent = 0; } @覆盖 公共列表 poll() { List 记录 = fetchRecords(); for (SourceRecord 记录:记录) { if (++this.recordsSent % 10 == 0) { this.context.transactionContext().commitTransaction(record); } } 返回记录; }

大多数连接器不需要定义自己的事务边界。但是,如果源系统中的文件或对象被分解为多个源记录,但应以原子方式传递,则它可能会很有用。此外,如果不可能为每个源记录提供唯一的源偏移量,并且具有给定偏移量的每个记录都在单个事务中传递,则它可能会很有用。

请注意,如果用户未在连接器配置中启用连接器定义的事务边界,则TransactionContext返回的context.transactionContext()将为null

验证API

源连接器开发人员可以实现一些额外的预检验证 API。

某些用户可能需要来自连接器的一次性语义。在这种情况下,他们可以在连接器的配置中设置该exactly.once.support属性。required发生这种情况时,Kafka Connect 框架将询问连接器是否可以使用指定的配置提供一次性语义。这是通过调用exactlyOnceSupport连接器上的方法来完成的。

如果连接器不支持精确一次语义,它仍然应该实现此方法,以让用户确定它无法提供精确一次语义:

@覆盖 公共 ExactlyOnceSupport strictOnceSupport(Map props) { // 该连接器在任何情况下都无法提供一次性语义 返回 ExactlyOnceSupport.UNSUPPORTED; }

否则,连接器应该检查配置,并返回ExactlyOnceSupport.SUPPORTED它是否可以提供一次性语义:

@覆盖 公共 ExactlyOnceSupport strictOnceSupport(Map props) { // 这个连接器总是可以提供exactly-once语义 返回ExactlyOnceSupport.SUPPORTED; }

此外,如果用户已将连接器配置为定义自己的事务边界,Kafka Connect 框架将询问连接器是否可以使用指定的配置定义自己的事务边界,使用以下方法canDefineTransactionBoundaries

@覆盖 公共 ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map props) { // 该连接器始终可以定义自己的事务边界 返回 ConnectorTransactionBoundaries.SUPPORTED; }

此方法仅适用于在某些情况下可以定义自己的事务边界的连接器。如果连接器永远无法定义自己的事务边界,则它不需要实现此方法。

动态输入/输出流

Kafka Connect 旨在定义批量数据复制作业,例如复制整个数据库,而不是创建许多作业来单独复制每个表。这种设计的一个结果是连接器的一组输入或输出流可能会随着时间的推移而变化。

源连接器需要监视源系统的更改,例如数据库中的表添加/删除。当他们接受更改时,他们应该通过ConnectorContext对象通知框架需要重新配置。例如,在SourceConnector

如果(输入更改()) this.context.requestTaskReconfiguration();

该框架将立即请求新的配置信息并更新任务,允许它们在重新配置之前优雅地提交进度。请注意,此SourceConnector监视当前由连接器实现。如果需要额外的线程来执行此监视,则连接器必须自行分配它。

理想情况下,用于监视更改的代码将与任务隔离,Connector并且任务不需要担心它们。然而,更改也会影响任务,最常见的是当任务的输入流之一在输入系统中被破坏时,例如,如果从数据库中删除表。如果Task遇到之前的问题(如果需要轮询更改,Connector这很常见),则需要处理后续错误。值得庆幸的是,这通常可以通过捕获并处理适当的异常来简单地处理。Connector``Task

SinkConnectors通常只需要处理流的添加,这可能会转换为输出中的新条目(例如,新的数据库表)。该框架管理 Kafka 输入的任何更改,例如当输入主题集因正则表达式订阅而更改时。SinkTasks应该期待新的输入流,这可能需要在下游系统中创建新资源,例如数据库中的新表。在这些情况下要处理的最棘手的情况可能是多个人SinkTasks第一次看到新的输入流并同时尝试创建新资源之间的冲突。SinkConnectors另一方面,通常不需要特殊代码来处理动态流集。

连接配置验证

Kafka Connect 允许您在提交要执行的连接器之前验证连接器配置,并可以提供有关错误和建议值的反馈。为了利用这一点,连接器开发人员需要提供一个实现,config()以将配置定义公开给框架。

以下代码FileStreamSourceConnector定义了配置并将其公开给框架。

静态最终 ConfigDef CONFIG_DEF = new ConfigDef() .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "源文件名。如果未指定,将使用标准输入") .define(TOPIC_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyString(), Importance.HIGH, "要将数据发布到的主题") .define(TASK_BATCH_SIZE_CONFIG, 类型.INT, DEFAULT_TASK_BATCH_SIZE, 重要性.LOW, “源任务每次轮询时可以从文件中读取的最大记录数”); 公共 ConfigDef 配置(){ 返回CONFIG_DEF; }

ConfigDef类用于指定一组预期的配置。对于每个配置,您可以指定名称、类型、默认值、文档、组信息、组中的顺序、配置值的宽度以及适合在 UI 中显示的名称。另外,您可以通过覆盖该类来提供用于单个配置验证的特殊验证逻辑Validator。此外,由于配置之间可能存在依赖关系,例如,一个配置的有效值和可见性可能会根据其他配置的值而改变。为了处理这个问题,ConfigDef允许您指定配置的依赖项,并提供一个实现来Recommender获取有效值并在给定当前配置值的情况下设置配置的可见性。

此外,validate()中的方法Connector提供了默认验证实现,该实现返回允许的配置列表以及配置错误和每个配置的推荐值。但是,它不使用建议值进行配置验证。您可以为自定义配置验证提供默认实现的覆盖,这可能会使用推荐值。

使用模式

FileStream 连接器是很好的例子,因为它们很简单,但它们也有简单的结构化数据——每一行只是一个字符串。几乎所有实用的连接器都需要具有更复杂数据格式的模式。

要创建更复杂的数据,您需要使用 Kafka Connect dataAPI。大多数结构化记录除了基本类型之外还需要与两个类交互:SchemaStruct

API 文档提供了完整的参考,但这里是一个创建Schemaand的简单示例Struct

架构 schema = SchemaBuilder.struct().name(NAME) .field("名称", Schema.STRING_SCHEMA) .field("年龄", Schema.INT_SCHEMA) .field("admin", SchemaBuilder.bool().defaultValue(false).build()) 。建造(); 结构体 = 新结构体(模式) .put("姓名", "芭芭拉·利斯科夫") .put("年龄", 75);

如果您正在实现源连接器,则需要决定何时以及如何创建模式。如果可能,您应该尽可能避免重新计算它们。例如,如果您的连接器保证具有固定架构,请静态创建它并重用单个实例。

然而,许多连接器将具有动态模式。一个简单的例子是数据库连接器。即使只考虑单个表,也不会为整个连接器预定义架构(因为它因表而异)。但它也可能无法在连接器的生命周期内针对单个表进行修复,因为用户可能会执行命令ALTER TABLE。连接器必须能够检测这些变化并做出适当的反应。

接收器连接器通常更简单,因为它们正在消耗数据,因此不需要创建模式。然而,他们应该同样小心地验证他们收到的模式是否具有预期的格式。当架构不匹配时(通常表明上游生产者正在生成无法正确转换到目标系统的无效数据),接收器连接器应抛出异常以向系统指示此错误。

Kafka连接管理

Kafka Connect 的REST 层提供了一组 API 来支持集群管理。这包括用于查看连接器配置及其任务状态以及更改其当前行为(例如更改配置和重新启动任务)的 API。

当连接器首次提交到集群时,会在连接工作线程之间触发重新平衡,以分配由新连接器的任务组成的负载。当连接器增加或减少其所需的任务数量、更改连接器的配置时,或者作为 Connect 集群有意升级的一部分或由于以下原因在组中添加或删除工作线程时,也会使用相同的重新平衡过程:失败。

在 2.3.0 之前的版本中,Connect Worker 会重新平衡集群中的全套连接器及其任务,这是一种简单的方法,可确保每个 Worker 的工作量大致相同。仍然可以通过设置来启用此行为connect.protocol=eager

从 2.3.0 开始,Kafka Connect 默认使用一种执行 增量协作重新平衡的 协议,该协议可以增量平衡 Connect 工作线程之间的连接器和任务,仅影响新任务、要删除的任务或需要从一个工作线程移至另一个工作线程的任务。其他。在重新平衡期间,其他任务不会像旧协议那样停止和重新启动。

scheduled.rebalance.max.delay.ms如果 Connect 工作线程故意或由于故障而离开组,Connect 会在触发重新平衡之前 等待。此延迟默认为五分钟 ( 300000ms),以容忍工作人员故障或升级,而无需立即重新分配离职工作人员的负载。如果该工作人员在配置的延迟内返回,它将获得之前分配的全部任务。但是,这意味着任务将保持未分配状态,直到指定的时间过去scheduled.rebalance.max.delay.ms。如果工作人员未在该时间限制内返回,Connect 将在 Connect 集群中的剩余工作人员之间重新分配这些任务。

当构成 Connect 集群的所有工作线程都配置了 时,就会启用新的 Connect 协议connect.protocol=compatible,这也是缺少此属性时的默认值。因此,当所有工作人员升级到 2.3.0 时,会自动升级到新的 Connect 协议。当最后一个工作线程加入版本 2.3.0 时,Connect 集群的滚动升级将激活增量协作重新平衡。

您可以使用 REST API 查看连接器及其任务的当前状态,包括每个连接器分配到的工作线程的 ID。例如,GET /connectors/file-source/status请求显示名为 的连接器的状态file-source

{ "name": "文件源", “连接器”: { “状态”:“正在运行”, “worker_id”:“192.168.1.208:8083” }, “任务”: [ { “id”:0, “状态”:“正在运行”, “worker_id”:“192.168.1.209:8083” } ] }

status.storage.topic连接器及其任务将状态更新发布到集群中所有工作线程都监控的 共享主题(配置为)。由于工作人员异步使用此主题,因此在通过状态 API 看到状态更改之前通常会有(短暂的)延迟。连接器或其任务之一可能处于以下状态:

  • 未分配:连接器/任务尚未分配给工作人员。
  • 正在运行:连接器/任务正在运行。
  • 已暂停:连接器/任务已被管理暂停。
  • FAILED:连接器/任务失败(通常通过引发异常,在状态输出中报告)。
  • 正在重新启动:连接器/任务正在主动重新启动或预计很快会重新启动

在大多数情况下,连接器和任务状态将匹配,但在发生更改或任务失败时,它们可能会在短时间内有所不同。例如,当连接器首次启动时,在连接器及其任务全部转换为 RUNNING 状态之前可能会有明显的延迟。当任务失败时,状态也会出现分歧,因为 Connect 不会自动重新启动失败的任务。要手动重新启动连接器/任务,您可以使用上面列出的重新启动 API。请注意,如果您在重新平衡期间尝试重新启动任务,Connect 将返回 409(冲突)状态代码。您可以在重新平衡完成后重试,但这可能没有必要,因为重新平衡会有效地重新启动集群中的所有连接器和任务。

从 2.5.0 开始,Kafka Connectstatus.storage.topic还使用 来存储与每个连接器正在使用的主题相关的信息。GET /connectors/{name}/topicsConnect Workers 使用这些每个连接器主题状态更新,通过返回连接器正在使用的主题名称集来响应对 REST 端点的请求。对 REST 端点的请求PUT /connectors/{name}/topics/reset会重置连接器的活动主题集,并允许根据连接器的最新主题使用模式填充新的主题集。连接器删除后,连接器的活动主题集也会被删除。主题跟踪默认启用,但可以通过设置禁用topic.tracking.enable=false。如果要禁止在运行时期间重置连接器活动主题的请求,请设置 Worker 属性topic.tracking.allow.reset=false

有时,暂时停止连接器的消息处理很有用。例如,如果远程系统正在进行维护,源连接器最好停止轮询新数据,而不是用异常垃圾邮件填充日志。对于此用例,Connect 提供了暂停/恢复 API。当源连接器暂停时,Connect 将停止轮询它以获取其他记录。当接收器连接器暂停时,Connect 将停止向其推送新消息。暂停状态是持久的,因此即使您重新启动集群,连接器也不会再次开始消息处理,直到任务恢复为止。请注意,在连接器的所有任务转换到 PAUSED 状态之前可能会有延迟,因为它们可能需要一些时间才能完成暂停时正在进行的任何处理。此外,失败的任务在重新启动之前不会转换为 PAUSED 状态。


我们一直在努力

apachecn/AiLearning

【布客】中文翻译组