Apache Kafka Connect

1. 什么是Kafka Connect

Kafka Connect是一个用于在Apache Kafka和其他系统之间可靠且可伸缩地传输数据的工具。它使得快速定义将大量数据集合移入和移出Kafka的连接器变得简单。Kafka Connect可以把整个数据库或从所有应用程序服务器收集到的指标导入到Kafka主题,使这些数据可用于低延迟的流处理。导出作业可以将Kafka主题中的数据传递到辅助存储和查询系统,也可以传输到批处理系统以进行离线分析。

pic

2. Kafka Connect的特征

pic

a. Kafka连接器的通用框架

    Kafka Connect标准化了其他数据系统与Kafka的集成,简化了连接器的开发,部署和管理。

b. 分布式和单节点模式

    可以支持大到整个组织的大型集中管理服务,小到开发,测试和小型生产部署。 

c. REST 接口

    通过易于使用的REST API提交和管理连接到Kafka 集群集群的连接器。

d.自动偏移管理

    只需来自连接器的一些信息,Kafka Connect就可以自动管理偏移提交过程,因此连接器开发人员无需担心连接器开发中易出错的这一部分

e. 默认是分布式和可伸缩的

    Kafka Connect建立在现有的组管理协议之上可以,添加更多工作节点来扩展Kafka Connect集群。

f. 流式/批量集成

    利用Kafka现有的功能,Kafka Connect是桥接流媒体(bridging streaming)和批量数据系统(batch data system)的理想解决方案。

3. 为什么选择Kafka Connect

我们知道有许多工具能够向Kafka写入数据或从Kafka读取数据,或者导入和导出数据,比如Flume name,为什么我们要用Kafka Connect?这里我们列出了Kafka Connect的主要优势:

pic

a. 失败后自动恢复

    对于传递给Kafka Connect的每条记录,连接器都会附加一条源位置的信息,所以,当发生失败时,Kafka Connect会自动将此信息提供给连接器。通过这种方式,连接器可以在失败的地方恢复。

b. 自动容错

    因为Kafka Connect节点构建了Kafka集群,所以自动容错成为可能。如果一个节点发生故障,它正在进行的工作将重新分配给其他节点。

c. 简单并行

    连接器可以定义数据导入或导出任务,尤其是并行执行的任务。

4. Kafka Connect概念 

a. 连接器和任务

  • 连接器Connector)用来在Kafka和其他系统直接复制数据。

  • 连接器有两种形式:SourceConnectors从另一个系统导入数据(例如JDBCSourceConnector将关系数据库导入Kafka)和SinkConnectors导出数据(例如HDFSSinkConnectorKafka主题的内容导出到HDFS文件)。

  • 在子线程中执行连接器及其相关任务的操作系统进程(基于Java)就是我们所说的Kafka Connect工作节点(worker)。

  • 连接器本身不执行任何数据复制:它们的配置描述了要复制的数据,连接器负责将该作业分解为可以分发给worker的一组任务。 这些任务也有两种:SourceTaskSinkTask 

b. 流和记录

  • 每个流应该是一系列键值记录。键和值都可以具有复杂的结构 – 可以是基本类型,但也可以是数组,对象或者嵌套数据结构。运行时数据格式不假定任何特定的序列化格式,此转换由框架内部处理。

  • 除了键和值之外,记录(由源生成的和传递到接收器的记录)都具有关联的流ID和偏移量。框架使用这些信息来定期提交已处理的数据的偏移量,以便在发生故障时,数据处理可以从上次提交的偏移量恢复,从而避免不必要的重新处理和事件重复。

c. 动态连接器

并非所有作业都是静态的,因此连接器实现还负责监视外部系统是否存在可能需要重新配置的任何更改。例如,在JDBCSourceConnector示例中,连接器可能会为每个任务分配一组表。当创建新表时,连接器必须发现有新的表创建,以便通过更新其配置将新表分配给其中一个任务。当连接器注意到需要重新配置(或任务数量的更改)的更改时,它会通知框架然后框架就更新任何相关的任务。

我们可以说Kafka Connect不是转换重要数据的选择,但为了定义基本数据转换,最新版本的Kafka Connect允许为连接器配置参数。对于“源”连接器,此功能将任务的输入转换为AVROJSON格式,这些转换在将数据写入Kafka主题之前发生。对于“接收器”连接器来说,改功能认为作为输入的Kafka主题的数据已经是AVROJSON格式。

Kafka Connect支持两种模式:分布式模式和单节点模式。

5. Kafka Connect的依赖关系

Kafka Connect节点需要连接到Kafka消息代理集群,无论它是以单节点模式还是以分布式模式运行。

对于分布式模式,Kafka Connect没有其他依赖性,即使连接器配置信息存储在Kafka的消息主题中,Kafka Connect节点也完全无状态。

对于单节点模式,我们需要少量的本地磁盘存储空间来存储“当前位置”和连接器配置。

6. 分布式模式 

分布式模式处理工作的自动平衡,允许你动态扩展(或向下),并在活动任务以及配置和偏移提交数据中提供容错。 

通过传入Kafka 代理地址,几个“内部使用”的Kafka主题的名称和“组ID”参数,我们可以启动Kafka Connect worker实例(即java进程)。通过“内部使用”的Kafka主题,每个worker实例与属于同一组ID的其他worker实例进行协调。这里,一切都是通过Kafka消息代理完成的,不需要其他外部协调机制(没有Zookeeper等)。虽然可以自动定义这些主题,但系统管理员最好使用适当的设置明确定义它们。

Worker之间通过主题就如何最好地在可用的worker组中分发连接器和任务集进行协商。如果一个worker进程终止,集群就会重新平衡,以便在剩余的worker上公平地分配工作。如果启动了一个新的worker,则重新平衡可确保它接管现有worker的某些工作。 

7. 单节点模式 

在独立模式下,所有工作都在一个过程中执行。这种配置更易于设置和使用,并且在只有一个worker的情况下(例如收集日志文件)可能很有用,但它不具有Kafka Connect的某些功能,例如容错。

我们可以说,它是一个简单的分布式模式,只是worker实例在Kafka消息代理中不使用内部主题。这个单节点进程运行所有指定的连接器及其生成的任务。

由于单节点模式将当前源偏移量存储在本地文件中,因此它不会使用Kafka Connect“内部主题”进行存储。在单节点模式下要把有关要执行的连接器的信息作为命令行参数传给执行脚本。

8. REST API 

基本上,每个工作器实例都启动一个嵌入式Web服务器。因此,通过这个Web服务器,Kafka Connect为状态查询和配置公开了一个REST API。对于处于分布式模式的工worker,通过此REST API设定的配置将保存在Kafka消息代理的内部主题中,但是,对于单节点模式的workerREST API没有太大意义。

通过包装worker REST APIConfluent Control Center提供了大量的UI去管理Kafka Connect

NagiosREST调用可以监控Kafka Connect守护程序的来定期获取系统状态。

9. Kafka连接器类型

可以通过实现特定的Java接口来创建连接器。我们有一套现有的连接器,也有一个工具可以编写自定义连接器。

“Confluent Control Center开源版”下载包中有几个连接器,它们是:

  • JDBC

  • HDFS

  • S3

  • Elasticsearch

但是,没有办法单独下载这些连接器,但我们可以从Confluent Open Source中提取它们,因为它们是开源的,我们也可以下载并将其复制到标准的Kafka安装中。

10. Kafka Connect worker连接Kafka消息代理

出于管理目的,在分布式模式中,每个worker建立一个与Kafka消息代理集群的连接。我们把在worker配置文件定义的设置称为“顶级”设置。 

每个连接器都建立了与Kafka消息代理集群的单独连接(套接字集)。这些连接的许多设置都是从“顶级”设置继承的,但是可以使用配置前缀“consumer.”(由sink使用)或“producer.”(由source使用)来覆盖它们,以便让承载生产数据的连接与承载管理消息的连接使用不同的Kafka消息代理的网络设置。不能为所有的接收器(消费者)或这源(生产者)配置每个连接器设置。

11. 标准的JDBC 源连接器

连接器hub site列出了JDBC源连接器,此连接器是Confluent Open Source下载的一部分。我们无法单独下载JDBC源连接器,因对于已安装Apache的“纯”Kafka软件包而非Confluent软件包的用户,必须从Confluent软件包中提取此连接器并将其复制。

JDBC源连接器有各种配置选项:

  • 要扫描的数据库,指定为JDBC URL

  • 轮询间隔。

  • 一个正则表达式,指定要监视的表;对于每个表,都有一个单独的Kafka主题。

  • 具有递增ID”SQL列,在这种情况下,连接器可以检测新记录(select id> last-known-id)。

  • 具有可更新的时间戳的SQL列,在这种情况下,连接器可以检测新的/已修改的记录(select where timestamp > last-known-timestamp)。

要注意的是,尽管连接器具有复制多个表的能力,但“递增id”和“时间戳”列名是全局的即,当复制多个表时,它们的列必须遵循相同的命名约定。

12. Kafka Connect的安全 

对于使用Kerberos安全的Kafka消息代理,Kafka Connectv0.10.1.0)工作得非常好,使用SSL加密连接的消息代理也可以正常工作。

但是,不论通过Kerberos还是SSL,都无法保护Kafka Connect节点公开的REST API,尽管有一个功能请求来追踪这个问题。因此,在配置一个安全的集群时,必须配置外部代理(例如Apache HTTP)以充当REST服务的安全网关。

13. Kafka Connect的局限性

尽管有很多优点,Kafka Connect也有一些局限性:

  • 目前,连接器的选择非常少。

  • 商业和开源功能的分离非常差。

  • 缺乏配置工具。

  • 糟糕/原始的部署自定义连接器(插件)的方法。

  • 非常以Java / Scala为中心。 

因此,目前,至少在没有购买商业工具的情况下,Kafka  Connect感觉更像是一个“工具包”,而不是打包解决方案(packaged solution)。


Apache Kafka Connect

发表评论

邮箱地址不会被公开。 必填项已用*标注

一 × = 六

滚动到顶部