内容
1. Apache Flume Sink
Flume Sink组件用来将数据存储到Flume的制定的目标存储中。 在本章中,将学习什么是Flume Sink。 此外,将会看到 Flume Sink 的类型如 HDFS Sink, Hive Sink, Logger Sink, Thrift Sink, IRC Sink, File Roll Sink, HBase Sink, MorphlineSolrSink, ElasticSearchSink, Kite Dataset Sink, Flume Kafka Sink, HTTP Sink, Custom Sink等。
2.什么是Apache Flume Sink
将数据存储到目标存储中时,可以使用Flume Sink组件, 目标存储可以是诸如HBase和HDFS分布式存储系统。它吸收来自Channel的事件消息,然后将其传送到目标地点。 Sink的目的地可能是另一个Flume Agent或 存储系统,如HDFS Sink。
需要注意的是,同一个agent有可能具有多个source,sink和channel。
请看下图Apache Flume Sink的类型:
——add picture here——
Apache Flume Sink
3. Apache Flume Sink的类型
下面将详细讨论一下 Flume Sink的几种类型:
HDFS Sink
当需要将事件消息写入到Hadoop分布式文件系统(HDFS)时,可以使用HDFS Sink in Flume。 它支持创建文本和序列文件,还支持对两种文件类型进行压缩。 并且可以根据时间长短,数据大小或消息事件的数量,创建适当的新文件,当创建 新文件时,意味着关闭当前文件并创建一个新文件。
HDFS Flume Sink支持以下转义序列:
HDFS Sink 的常用配置参数如下:
请看如下 agent a1的一个例子:
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = / flume / events /%y-%m-%d /%H%M /%S a1.sinks.k1.hdfs.filePrefix = events- a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit =minute
Hive Sink
Hive Sink将包含分隔符文本或JSON数据的事件消息直接写入Hive表或分区中。 事件消息是通过Hive事务编写的。 如果将一组事件消息提交给Hive,对Hive查询可以检索到提交的事件消息。 Sink 写入的分区可以是预先创建的,也可以由Flume创建。 事件消息数据的字段映射到Hive表中的相应列。
请看 Hive Sink 的主要属性:
下面看一个Hive Sink 的示例:
create table weblogs ( id int , msg string ) partitioned by (continent string, country string, time string) clustered by (id) into 5 buckets stored as orc;
代理 Hive Sink a1 如下所示:
a1.channels = c1 a1.channels.c1.type = memory a1.sinks = k1 a1.sinks.k1.type = hive a1.sinks.k1.channel = c1 a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083 a1.sinks.k1.hive.database = logsdb a1.sinks.k1.hive.table = weblogs a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M a1.sinks.k1.useLocalTimeStamp = false a1.sinks.k1.round = true a1.sinks.k1.roundValue = 10 a1.sinks.k1.roundUnit = minute a1.sinks.k1.serializer = DELIMITED a1.sinks.k1.serializer.delimiter = “t” a1.sinks.k1.serializer.serdeSeparator = ‘t’ a1.sinks.k1.serializer.fieldnames =id,,msg
Logger Sink
该Sink主要在INFO级别记录事件消息,通常用于测试/调试目的。
请看下列的属性:
请看下列 Agent a1的例子:
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = logger a1.sinks.k1.channel = c1
Avro Sink
Avro Sink 和 Avro Source一起工作,用于构建Flume分层收集数据消息结构。 发送到此接收器的流量事件转换为Avro消息事件并发送到配置的主机名/端口。 该Sink将从已配置的Channel中以批量的方式被读取。
请看下面的主要的Avro Sink 属性:
如下 Agent a1的Apache Flume Avro示例:
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = 10.10.10.10 a1.sinks.k1.port = 4545
Thrift Sink
Thrift Sink 和 Thrift Source用于构建Flume分层收集数据消息结构。 发送到该接收器的流量事件转换为Thrift事件并发送到配置的主机名/端口。该Sink将从已配置的Channel中以批量的方式获取数据。
通过启用Kerberos身份验证,该sink 可以以安全模式启动。 如果要和安全模式下启动的Thrift Source进行通信,Thrift source也应以安全模式运行。 client-principal和client-keytab是Thrift sink用于向kerberos KDC进行身份验证的属性。 server-principal用于设置此 sink 配置为以安全模式连接到的Thrift source时的Principal。
主要属性如下:
请看如下 agent a1的示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
IRC Sink
IRC Sink接收来自 Channel的消息事件,并将它们转发给配置的IRC目的地。
下表列举出该Sink 的主要属性:
请看如下 agent a1的示例:
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = thrift a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = 10.10.10.10 a1.sinks.k1.port = 4545
IRC Sink
IRC Sink接收来自 Channel的消息事件,并将它们转发给配置的IRC目的地。
下表列举出该Sink 的主要属性:
请看如下的示例:
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = irc a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = irc.yourdomain.com a1.sinks.k1.nick = flume a1.sinks.k1.chan = #flume
File Roll Sink
File Flume Sink用于在本地文件系统上存储事件消息。
下表列举了主要属性名称及其对File Roll Flume Sink的描述:
请看如下示例:
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = file_roll a1.sinks.k1.channel = c1 a1.sinks.k1.sink.directory = /var/log/flume
Null Sink
Null Flume 丢弃从Channel中接收的所有事件消息。
下表显示了属性名称及其对Null Flume Sink的描述:
请看下面示例:
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = null a1.sinks.k1.channel = c1
HBase Sink
该Sink将数据写入HBase。 Hbase的配置信息从 hbase-site.xml中获取。使用配置指定的HbaseEventSerializer的类将事件消息存入到Hbase存储 。 当Hbase未能写入某些事件消息时,该Flume Sink将重新写入该事务中的所有事件消息。
下表显示属性名称及其对HBase Flume Sink的描述:
示例如下:
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = hbase a1.sinks.k1.table = foo_table a1.sinks.k1.columnFamily = bar_cf a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer a1.sinks.k1.channel = c1
MorphlineSolrSink
该Sink从Flume事件中提取数据并进行转换,并将事件消息实时加载到Apache Solr服务器中,后者又将查询提供给最终用户或搜索应用程序。
该 Sink非常适合将原始数据流写入HDFS(通过HdfsSink)并同时对数据进行提取,转换并加载到Solr(通过MorphlineSolrSink)的用例。 该Sink可以处理来自不同数据源的任意异构的原始数据,并将其转换为对搜索应用程序有用的数据模型。
常见的属性如下:
示例如下:
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink a1.sinks.k1.channel = c1 a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf #a1.sinks.k1.morphlineId = morphline1 #a1.sinks.k1.batchSize = 1000 #a1.sinks.k1.batchDurationMillis = 1000
ElasticSearchSink
该Sink将数据写入到elasticsearch集群,可以在Kibana图形界面可以显示写入的消息事件。需要将elasticsearch 所需的lucene-core jar放置在Apache Flume安装的lib目录中。
默认情况下,ElasticSearchLogStashEventSerializer会将事件消息序列化以便elasticsearch使用。该参数可以被覆盖,请参考org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer或org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory的实现。
下面是ElasticSearch的主要属性:
示例如下所示:
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = elasticsearch a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300 a1.sinks.k1.indexName = foo_index a1.sinks.k1.indexType = bar_type a1.sinks.k1.clusterName = foobar_cluster a1.sinks.k1.batchSize = 500 a1.sinks.k1.ttl = 5d a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer a1.sinks.k1.channel = c1
Kafka Sink
通过该Sink可将事件消息数据发布到Kafka topic 上。其目标是将Flume与Kafka集成,以便基于拉式的处理系统可以处理来自各种Flume Source的数据。 目前支持Kafka 0.9.x以上系列版本。
属性如下表所示:
示例如下:
a1.sinks.k1.channel = c1 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = mytopic a1.sinks.k1.kafka.bootstrap.servers = localhost:9092 a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 a1.sinks.k1.kafka.producer.compression.type = snappy
Flume HTTP Sink
HTTP Flume sink从Channel中获取事件消息,并使用HTTP POST请求将这些事件消息发送到远程服务器。事件内容作为POST主体发送。
下表显示属性名称及其对HTTP Flume Sink的描述:
示例如下:
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = http a1.sinks.k1.channel = c1 a1.sinks.k1.endpoint = http:// localhost:8080 / someuri a1.sinks.k1.connectTimeout = 2000 a1.sinks.k1.requestTimeout = 2000 a1.sinks.k1.acceptHeader = application / json a1.sinks.k1.contentTypeHeader = application / json a1.sinks.k1.defaultBackoff = true a1.sinks.k1.defaultRollback = true a1.sinks.k1.defaultIncrementMetrics = false a1.sinks.k1.backoff.4XX = false a1.sinks.k1.rollback.4XX = false a1.sinks.k1.incrementMetrics.4XX = true a1.sinks.k1.backoff.200 = false a1.sinks.k1.rollback.200 = false a1.sinks.k1.incrementMetrics.200 = true
Custom Sink
Custom Sink是用户实现的Sink接口。 在启动Flume agent时,自定义 Sink的类及其依赖项必须包含在类路径中。 自定义Sink的类型是其FQCN。
属性如下:
示例如下:
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = org.example.MySink a1.sinks.k1.channel = c1