内容
1. Apache Flume Source – 目标
Source 用于从数据发生器接收数据,它将Flume Event传输到一个或多个Channel。 在本章中,我们将了解不同类型的Flume Source, 如Avro Flume Source,Thrift Source,Exec Source,JMS Source,Spooling Directory Source,Flume Kafka Source,NetCat TCP Source,NetCat UDP Source,Sequence Generator Source, 和Flume中的syslog源文件。
2. Apache Flume 支持的数据源信息
Apache Flume中提供了几种Apache Flume Source。如下:
——-add picture here——
Avro Source
监听Avro端口并接收来自外部Avro客户端的事件信息。 当与前一跳Flume Agent的Avro Sink组合时,可以创建分层收集的拓扑结构。
下表包含Avro Flume Source,其中包含属性名称,默认值和说明:
请看下列名为a1的agent的例子:
a1.sources = r1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.channels = c1 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 4141
ipFilterRules的用法
ipFilterRules定义N个由逗号分隔的ipFilters,模式规则采用下面格式:
<’allow’ or deny>:<’ip’ or ‘name’ for computer name>:<pattern> or allow/deny:ip/name:pattern
例如:ipFilterRules = allow:ip:127.*,allow:name:localhost,deny:ip:*
如下面规则,将允许客户端从本地主机访问,而拒绝来自任何其他ip的客户端;
“allow:name:localhost,deny:ip:”
下面规则,会拒绝localhost上的客户端访问,但是允许来自任何其他ip的客户端;
“deny:name:localhost,allow:ip:”
Thrift Source
Thrift Source在Thrift端口上监听并接收来自外部Thrift客户端的事件信息。 如果将它与前一跳的Flume Agent中的内置ThriftSink 相结合,它可以创建分层的收集数据拓扑结构。 可以通过启用Kerberos身份验证将其配置为以安全模式启动。 如要进行kerberos身份验证,需要设置KDC agent-principal和agent-keytab属性。
下表罗列出Thrift Flume Source的属性名称,默认和描述:
如下面名为a1的agent的例子:
a1.sources = r1 a1.channels = c1 a1.sources.r1.type = thrift a1.sources.r1.channels = c1 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 4141
Exec Source
该Apache Flume Source在启动时运行给定的Unix命令,该命令持续地输出数据到标准输出。如果将属性logStdErr设置为true,那么stderr将被丢弃。如果命令由于某种原因退出,那么Exec Flume Source也会退出并且不会生成其他数据。如cat 或tail -F [file]命令等。
请看下面名为a1的agent的例子:
a1.sources = r1 a1.channels = c1 a1.sources.r1.type = exec a1.sources.r1.command = tail -F / var / log / secure a1.sources.r1.channels = c1
在这里通过命令shell调用'command' ,为了执行,'command'作为参数传递给'shell'。该命令允许'command'使用shell中的功能。像通配符, 管道,循环,条件等等。'command'将在没有'shell'配置的情况下被直接调用。 'shell'的常见值有:'/ bin / sh -c','/ bin / ksh -c','cmd / c','powershell -Command'等。
例如,
a1.sources.tailsource-1.type = exec a1.sources.tailsource-1.shell = / bin / bash -c a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
JMS Source
JMS Flume Source从JMS目标(如队列或主题)读取消息信息。 当前Flume版本用ActiveMQ进行过测试,但该Source也应该可以与任何JMS Provider一起工作。 该Source提供了可配置的批量大小,消息选择器 (message selector),用户名及密码以及消息到事件的转换器。 需要注意的是,JMS供应商提供的JMS jar应该包含在Flume类路径中,使用plugins.d目录优先,也可使用命令行上的-classpath或flume-env.sh中的FLUME_CLASSPATH变量。
下表列出JMS Flume Source属性名称,缺省值和说明:
Spooling Directory Source
Spooling Directory Source通过将文件放入磁盘上的“Spooling”目录并使用这个“Spooling”目录来获取数据。Apache Flume Spooling Directory Source会监控指定目录下的出现的新文件,并且会解析在新文件出现的事件消息。事件消息的解析逻辑是可定制编写的。当给定文件被完全读入Channel后, 文件将会被重命名。
Spooling Directory Source是可靠的,即使Flume被重启,也不会丢失数据。只有不可变的,唯一命名的文件可以放入到Spooling Directory 中,如果违反这一约束,将会发生下述情况:
如果在将文件放入Spooling目录后写入文件,Flume将在其日志文件中输出错误并停止处理;
如果出现同样的文件名并且该文件已经被处理过, Flume将在其日志文件中输出错误,并且停止处理。
对于文件名,添加一个唯一的标识符(例如时间戳)以便在文件名移动到Spooling目录时,避免上述问题会十分有用。
Spooling Directory Source的属性名称,默认值和解释如下表所示:
如下名为agent-1的Agent的例子:
a1.channels = ch-1 a1.sources = src-1 a1.sources.src-1.type = spooldir a1.sources.src-1.channels = ch-1 a1.sources.src-1.spoolDir = / var / log / apache / flumeSpool a1.sources.src-1.fileHeader = true
Flume Kafka Source
Flume Kafka Source是一个Apache Kafka 🎺消耗者,它可以从Kafka Topic中读取消息。 如果有多个Kafka Flume Source运行,可以使用相同的Consumer Group
来进行配置,这样每个Source都将读取特定的分区 Topic。
下表列举了Flume Kafka Source 的属性名称,默认值和注释:
下面看两个Kafka Source 的例子:
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource tier1.sources.source1.channels = channel1 tier1.sources.source1.batchSize = 5000 tier1.sources.source1.batchDurationMillis = 2000 tier1.sources.source1.kafka.bootstrap.servers = localhost:9092 tier1.sources.source1.kafka.topics = test1,test2 tier1.sources.source1.kafka.consumer.group.id = custom.g.id
下面是一个正则表达式的例子:
例如,
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource tier1.sources.source1.channels = channel1 tier1.sources.source1.kafka.bootstrap.servers = localhost:9092 tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$ # the default kafka.consumer.group.id=flume is used
NetCat TCP Source
Netcat-TCP Apache Flume source用于监听给定端口并将每行文本转换为一个事件消息。 就像nc -k -l [host] [port] 命令一样。 它会打开指定的端口并监听数据。 期望的数据具有换行符的分隔文本, 每一行文本都会解析成Flume事件并通过Channel发送。
下表列举了NetCat TCP Flume Source的属性名称,缺省值和说明:
请看名为a1的Agent的例子:
a1.sources = r1 a1.channels = c1 a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 6666 a1.sources.r1.channels = c1
NetCat UDP Source
如同上节中的Netcat(TCP)Source,Apache Flume NetCat UDP Source 在给定的端口上监听,并将每行文本转换为事件消息并通过连接的channel发送。 如nc -u -k -l [host] [port] 命令一样。
下表列举了NetCat UDP Source的属性名称,默认值和说明:
请看下面名为a1的Agent的例子:
a1.sources = r1 a1.channels = c1 a1.sources.r1.type = netcatudp a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 6666 a1.sources.r1.channels = c1
Sequence Generator Source
Sequence Generator Source 是一个连续生成事件消息的发生器,其计数器从0开始,递增1并停止在totalEvents处,就是常说的简单序列。 另外,当它不能将事件消息发送到channel时,它会重新尝试发送。 该 source 主要用于测试。
下表中包含Sequence Generator Source具有属性名称,默认值和说明:
如下名为a1的Agent的例子:
a1.sources = r1 a1.channels = c1 a1.sources.r1.type = seq a1.sources.r1.channels = c1
SysLog Source
读取系统日志数据并生成Flume事件小溪。 UDP Source将整个消息视为单个事件消息。 TCP Source为由换行符('n')分隔的每个字符串创建一个新的事件消息。
如下例子:
a1.sources = r1 a1.channels = c1 a1.sources.r1.type = syslogtcp a1.sources.r1.port = 5140 a1.sources.r1.host = localhost a1.sources.r1.channels = c1