Apache Flume Source – Flume Source类型

1. Apache Flume Source – 目标

Source 用于从数据发生器接收数据,它将Flume Event传输到一个或多个Channel。 在本章中,我们将了解不同类型的Flume Source, 如Avro Flume Source,Thrift Source,Exec Source,JMS Source,Spooling Directory Source,Flume Kafka Source,NetCat TCP Source,NetCat UDP Source,Sequence Generator Source, 和Flume中的syslog源文件。

2. Apache Flume 支持的数据源信息

Apache Flume中提供了几种Apache Flume Source。如下:

——-add picture here——

Avro Source

监听Avro端口并接收来自外部Avro客户端的事件信息。 当与前一跳Flume Agent的Avro Sink组合时,可以创建分层收集的拓扑结构。 

下表包含Avro Flume Source,其中包含属性名称,默认值和说明:

1542617610426429.png

1542617654517313.png

请看下列名为a1的agent的例子:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

ipFilterRules的用法

ipFilterRules定义N个由逗号分隔的ipFilters,模式规则采用下面格式:

<’allow’ or deny>:<’ip’ or ‘name’ for computer name>:<pattern> or allow/deny:ip/name:pattern

例如:ipFilterRules = allow:ip:127.*,allow:name:localhost,deny:ip:*

如下面规则,将允许客户端从本地主机访问,而拒绝来自任何其他ip的客户端;

“allow:name:localhost,deny:ip:”

下面规则,会拒绝localhost上的客户端访问,但是允许来自任何其他ip的客户端;

“deny:name:localhost,allow:ip:”

Thrift Source

Thrift Source在Thrift端口上监听并接收来自外部Thrift客户端的事件信息。 如果将它与前一跳的Flume Agent中的内置ThriftSink 相结合,它可以创建分层的收集数据拓扑结构。 可以通过启用Kerberos身份验证将其配置为以安全模式启动。 如要进行kerberos身份验证,需要设置KDC agent-principal和agent-keytab属性。

下表罗列出Thrift Flume Source的属性名称,默认和描述:

1542617848641854.png

1542617887756528.png

 如下面名为a1的agent的例子:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

Exec Source

该Apache Flume Source在启动时运行给定的Unix命令,该命令持续地输出数据到标准输出。如果将属性logStdErr设置为true,那么stderr将被丢弃。如果命令由于某种原因退出,那么Exec Flume Source也会退出并且不会生成其他数据。如cat 或tail -F [file]命令等。

请看下面名为a1的agent的例子:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F / var / log / secure
a1.sources.r1.channels = c1

在这里通过命令shell调用'command' ,为了执行,'command'作为参数传递给'shell'。该命令允许'command'使用shell中的功能。像通配符, 管道,循环,条件等等。'command'将在没有'shell'配置的情况下被直接调用。 'shell'的常见值有:'/ bin / sh -c','/ bin / ksh -c','cmd / c','powershell -Command'等。

例如,

a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = / bin / bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done

JMS Source

JMS Flume Source从JMS目标(如队列或主题)读取消息信息。 当前Flume版本用ActiveMQ进行过测试,但该Source也应该可以与任何JMS Provider一起工作。 该Source提供了可配置的批量大小,消息选择器 (message selector),用户名及密码以及消息到事件的转换器。 需要注意的是,JMS供应商提供的JMS jar应该包含在Flume类路径中,使用plugins.d目录优先,也可使用命令行上的-classpath或flume-env.sh中的FLUME_CLASSPATH变量。

下表列出JMS Flume Source属性名称,缺省值和说明:

1542618088696125.png

1542618160937207.png

Spooling Directory Source

Spooling Directory Source通过将文件放入磁盘上的“Spooling”目录并使用这个“Spooling”目录来获取数据。Apache Flume Spooling Directory Source会监控指定目录下的出现的新文件,并且会解析在新文件出现的事件消息。事件消息的解析逻辑是可定制编写的。当给定文件被完全读入Channel后,  文件将会被重命名。

Spooling Directory Source是可靠的,即使Flume被重启,也不会丢失数据。只有不可变的,唯一命名的文件可以放入到Spooling Directory 中,如果违反这一约束,将会发生下述情况:

如果在将文件放入Spooling目录后写入文件,Flume将在其日志文件中输出错误并停止处理;

如果出现同样的文件名并且该文件已经被处理过, Flume将在其日志文件中输出错误,并且停止处理。

对于文件名,添加一个唯一的标识符(例如时间戳)以便在文件名移动到Spooling目录时,避免上述问题会十分有用。

Spooling Directory Source的属性名称,默认值和解释如下表所示:

1542618314919422.png

1542618386598431.png

1542618443631404.png

1542618488261291.png

如下名为agent-1的Agent的例子:

a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = / var / log / apache / flumeSpool
a1.sources.src-1.fileHeader = true

Flume Kafka Source

Flume Kafka Source是一个Apache Kafka &#x1f3ba;消耗者,它可以从Kafka Topic中读取消息。 如果有多个Kafka Flume Source运行,可以使用相同的Consumer Group

来进行配置,这样每个Source都将读取特定的分区 Topic。

下表列举了Flume Kafka Source 的属性名称,默认值和注释:

1542630255815538.png

1542630316862310.png

下面看两个Kafka Source 的例子:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1,test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id

下面是一个正则表达式的例子:

例如,

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used

NetCat TCP Source

Netcat-TCP Apache Flume source用于监听给定端口并将每行文本转换为一个事件消息。 就像nc -k -l [host] [port] 命令一样。 它会打开指定的端口并监听数据。 期望的数据具有换行符的分隔文本, 每一行文本都会解析成Flume事件并通过Channel发送。

下表列举了NetCat TCP Flume Source的属性名称,缺省值和说明:

1542630449156295.png

请看名为a1的Agent的例子:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

NetCat UDP Source

 如同上节中的Netcat(TCP)Source,Apache Flume NetCat UDP Source 在给定的端口上监听,并将每行文本转换为事件消息并通过连接的channel发送。 如nc -u -k -l [host] [port] 命令一样。

下表列举了NetCat UDP Source的属性名称,默认值和说明:

1542630589769869.png

请看下面名为a1的Agent的例子:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

Sequence Generator Source

Sequence Generator Source 是一个连续生成事件消息的发生器,其计数器从0开始,递增1并停止在totalEvents处,就是常说的简单序列。 另外,当它不能将事件消息发送到channel时,它会重新尝试发送。 该 source 主要用于测试。 

下表中包含Sequence Generator Source具有属性名称,默认值和说明:

1542630694398224.png

如下名为a1的Agent的例子:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1

SysLog Source

读取系统日志数据并生成Flume事件小溪。 UDP Source将整个消息视为单个事件消息。 TCP  Source为由换行符('n')分隔的每个字符串创建一个新的事件消息。

如下例子:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

Apache Flume Source – Flume Source类型

发表评论

电子邮件地址不会被公开。 必填项已用*标注

÷ 五 = 二