Apache Flume Sink – Flume Sink类型

1. Apache Flume Sink

Flume Sink组件用来将数据存储到Flume的制定的目标存储中。 在本章中,将学习什么是Flume Sink。 此外,将会看到 Flume Sink 的类型如 HDFS Sink, Hive Sink, Logger Sink, Thrift Sink, IRC Sink, File Roll Sink, HBase Sink, MorphlineSolrSink, ElasticSearchSink, Kite Dataset Sink, Flume Kafka Sink, HTTP Sink, Custom Sink等。

2.什么是Apache Flume Sink

将数据存储到目标存储中时,可以使用Flume Sink组件, 目标存储可以是诸如HBase和HDFS分布式存储系统。它吸收来自Channel的事件消息,然后将其传送到目标地点。 Sink的目的地可能是另一个Flume Agent或 存储系统,如HDFS Sink。

需要注意的是,同一个agent有可能具有多个source,sink和channel。

请看下图Apache Flume Sink的类型:

——add picture here——

Apache Flume Sink

3. Apache Flume Sink的类型

下面将详细讨论一下 Flume Sink的几种类型:

HDFS Sink

当需要将事件消息写入到Hadoop分布式文件系统(HDFS)时,可以使用HDFS Sink in Flume。 它支持创建文本和序列文件,还支持对两种文件类型进行压缩。 并且可以根据时间长短,数据大小或消息事件的数量,创建适当的新文件,当创建 新文件时,意味着关闭当前文件并创建一个新文件。  

HDFS Flume Sink支持以下转义序列:

1542633238620128

1542633311702816.png

1542633350283997.png

HDFS Sink 的常用配置参数如下:

1542633450925519.png

1542633487493385.png

1542633533260899.png

请看如下 agent a1的一个例子:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = / flume / events /%y-%m-%d /%H%M /%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit =minute

Hive Sink

Hive Sink将包含分隔符文本或JSON数据的事件消息直接写入Hive表或分区中。 事件消息是通过Hive事务编写的。 如果将一组事件消息提交给Hive,对Hive查询可以检索到提交的事件消息。 Sink 写入的分区可以是预先创建的,也可以由Flume创建。 事件消息数据的字段映射到Hive表中的相应列。

请看 Hive Sink 的主要属性:

1542633667945729.png

1542634440330573.png

1542634528679705.png

下面看一个Hive Sink 的示例:

create table weblogs ( id int , msg string )
partitioned by (continent string, country string, time string)
clustered by (id) into 5 buckets
stored as orc;

代理 Hive Sink a1 如下所示:

a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = “t”
a1.sinks.k1.serializer.serdeSeparator = ‘t’
a1.sinks.k1.serializer.fieldnames =id,,msg

Logger Sink

该Sink主要在INFO级别记录事件消息,通常用于测试/调试目的。

请看下列的属性:

1542634660384026.png

请看下列 Agent a1的例子:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

Avro Sink

Avro Sink 和 Avro Source一起工作,用于构建Flume分层收集数据消息结构。 发送到此接收器的流量事件转换为Avro消息事件并发送到配置的主机名/端口。 该Sink将从已配置的Channel中以批量的方式被读取。

请看下面的主要的Avro Sink 属性:

1542634759428087.png

1542634797290941.png

如下 Agent a1的Apache Flume Avro示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

Thrift Sink

Thrift Sink 和 Thrift Source用于构建Flume分层收集数据消息结构。 发送到该接收器的流量事件转换为Thrift事件并发送到配置的主机名/端口。该Sink将从已配置的Channel中以批量的方式获取数据。

通过启用Kerberos身份验证,该sink  可以以安全模式启动。 如果要和安全模式下启动的Thrift Source进行通信,Thrift source也应以安全模式运行。 client-principal和client-keytab是Thrift sink用于向kerberos KDC进行身份验证的属性。 server-principal用于设置此 sink 配置为以安全模式连接到的Thrift source时的Principal。

主要属性如下:

1542634926414046.png

1542634968901980.png

请看如下 agent a1的示例:

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = thrift

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = 10.10.10.10

a1.sinks.k1.port = 4545

IRC Sink

IRC Sink接收来自 Channel的消息事件,并将它们转发给配置的IRC目的地。

下表列举出该Sink 的主要属性:

1542635006588005.png

请看如下 agent a1的示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

IRC Sink

IRC Sink接收来自 Channel的消息事件,并将它们转发给配置的IRC目的地。

下表列举出该Sink 的主要属性:

1542635165226408.png

1542635200111882.png

请看如下的示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = irc
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = irc.yourdomain.com
a1.sinks.k1.nick = flume
a1.sinks.k1.chan = #flume

File Roll Sink

File Flume Sink用于在本地文件系统上存储事件消息。 

下表列举了主要属性名称及其对File Roll Flume Sink的描述:

1542635327577832.png

请看如下示例:

a1.channels = c1 
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

Null Sink

Null Flume 丢弃从Channel中接收的所有事件消息。 

下表显示了属性名称及其对Null Flume Sink的描述:

1542635435762887.png

请看下面示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1

HBase Sink

该Sink将数据写入HBase。 Hbase的配置信息从 hbase-site.xml中获取。使用配置指定的HbaseEventSerializer的类将事件消息存入到Hbase存储 。 当Hbase未能写入某些事件消息时,该Flume Sink将重新写入该事务中的所有事件消息。 

下表显示属性名称及其对HBase Flume Sink的描述:

1542635573812978.png

示例如下:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1

MorphlineSolrSink

该Sink从Flume事件中提取数据并进行转换,并将事件消息实时加载到Apache Solr服务器中,后者又将查询提供给最终用户或搜索应用程序。

该 Sink非常适合将原始数据流写入HDFS(通过HdfsSink)并同时对数据进行提取,转换并加载到Solr(通过MorphlineSolrSink)的用例。 该Sink可以处理来自不同数据源的任意异构的原始数据,并将其转换为对搜索应用程序有用的数据模型。

常见的属性如下:

1542635692179322.png

示例如下:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.channel = c1
a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf
#a1.sinks.k1.morphlineId = morphline1
#a1.sinks.k1.batchSize = 1000
#a1.sinks.k1.batchDurationMillis = 1000

ElasticSearchSink

该Sink将数据写入到elasticsearch集群,可以在Kibana图形界面可以显示写入的消息事件。需要将elasticsearch 所需的lucene-core jar放置在Apache Flume安装的lib目录中。

默认情况下,ElasticSearchLogStashEventSerializer会将事件消息序列化以便elasticsearch使用。该参数可以被覆盖,请参考org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer或org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory的实现。

下面是ElasticSearch的主要属性:

1542635827695237.png

1542635901124375.png

示例如下所示:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1

Kafka Sink

通过该Sink可将事件消息数据发布到Kafka topic 上。其目标是将Flume与Kafka集成,以便基于拉式的处理系统可以处理来自各种Flume Source的数据。 目前支持Kafka 0.9.x以上系列版本。

属性如下表所示:

1542636031133044.png

示例如下:

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

Flume  HTTP Sink

HTTP Flume sink从Channel中获取事件消息,并使用HTTP POST请求将这些事件消息发送到远程服务器。事件内容作为POST主体发送。

下表显示属性名称及其对HTTP Flume Sink的描述:

1542636143399997.png

1542636191397583.png

示例如下:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = http
a1.sinks.k1.channel = c1
a1.sinks.k1.endpoint = http:// localhost:8080 / someuri
a1.sinks.k1.connectTimeout = 2000
a1.sinks.k1.requestTimeout = 2000
a1.sinks.k1.acceptHeader = application / json
a1.sinks.k1.contentTypeHeader = application / json
a1.sinks.k1.defaultBackoff = true
a1.sinks.k1.defaultRollback = true
a1.sinks.k1.defaultIncrementMetrics = false
a1.sinks.k1.backoff.4XX = false
a1.sinks.k1.rollback.4XX = false
a1.sinks.k1.incrementMetrics.4XX = true
a1.sinks.k1.backoff.200 = false
a1.sinks.k1.rollback.200 = false
a1.sinks.k1.incrementMetrics.200 = true

Custom Sink

Custom Sink是用户实现的Sink接口。 在启动Flume agent时,自定义 Sink的类及其依赖项必须包含在类路径中。 自定义Sink的类型是其FQCN。

属性如下:

1542636290842835.png

示例如下:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.example.MySink
a1.sinks.k1.channel = c1

Apache Flume Sink – Flume Sink类型

发表评论

电子邮件地址不会被公开。 必填项已用*标注

÷ 5 = 一