Flink DataStream API 编程指南

Flink DataStream API 编程指南

Flink中的DataStream程序是对数据流进行转换(例如,过滤、更新状态、定义窗口、聚合)的常用方式。数据流起于各种sources(例如,消息队列,socket流,文件)。通过sinks返回结果,例如将数据写入文件或标准输出(例如命令行终端)。Flink程序可以运行在各种上下文环境中,独立或嵌入其他程序中。 执行过程可能发生在本地JVM或在由许多机器组成的集群上。

1. 示例程序

以下是基于流式窗口进行word count的一个完整可运行的程序示例,它从网络socket中以5秒的窗口统计单词个数。你可以复制并粘贴代码用以本地运行。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);
        dataStream.print();
        env.execute("Window WordCount");
    }
    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

要运行示例程序,首先在终端启动netcat作为输入流:

nc -lk 9999

然后输入一些单词,回车换行输入新一行的单词。这些输入将作为示例程序的输入。如果要使得某个单词的计数大于1,请在5秒钟内重复输入相同的字词(如果5秒钟输入相同单词对你来说太快,请把示例程序中的窗口大小从5秒调大 )。

2. DataStream Transformations

数据的Transformations可以将一个或多个DataStream转换为一个新的DataStream。程序可以将多种Transformations组合成复杂的拓扑结构。

本节对所有可用Transformations进行详细说明。

1542424543878159.png

1542424593317374.png

1542424628543465.png

1542424668510560.png

1542424827213526.png

1542424870167070.png

以下transformations可用于元组类型的DataStream:

1542424926217302.png

3. 物理分区

在一个transformation之后,Flink也提供了底层API以允许用户在必要时精确控制流分区,参见如下的函数:

1542424999509973.png

4. 任务链接(chaining) 和资源组

链接(chaining)两个依次的transformations意味着将它们运行在同一个线程中以获得更好的性能。Flink默认情况下尽可能进行该链接操作(比如两个依次的map transformations),同时Flink根据需要提供API对链接进行细粒度控制:

如果不想在整个Job上进行默认的链接优化,可以设置StreamExecutionEnvironment.disableOperatorChaining()。下面的函数可用于更细粒度的控制链接。注意这些函数只能用在一个DataStream transformation之后,因为它们是指向先前的transformation。例如,你可以someStream.map(…).startNewChain(), 但是你不能someStream.startNewChain().

Flink中的一个资源组是一个slot, 必要时可以手动隔离不同slots中的operators。

1542425074135859.png

5. 数据Sources

Sources是程序读取输入的地方。可以通过StreamExecutionEnvironment.addSource(sourceFunction)将Source添加到程序中。Flink提供了若干已经实现好了的source functions,当然也可以通过实现SourceFunction来自定义非并行的source或者实现ParallelSourceFunction接口或者扩展RichParallelSourceFunction来自定义并行的source,

StreamExecutionEnvironment中可以使用以下几个已实现的stream sources:

基于文件:

readTextFile(path) – 读取文本文件,即符合TextInputFormat规范的文件,并将其作为字符串返回。

readFile(fileInputFormat, path) – 根据指定的文件输入格式读取文件(一次)。

readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) – 这是上面两个方法内部调用的方法。它根据给定的fileInputFormat和读取路径读取文件。根据提供的watchType,这个source可以定期(每隔interval毫秒)监测给定路径的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次路径对应文件的数据并退出(FileProcessingMode.PROCESS_ONCE)。你可以通过pathFilter进一步排除掉需要处理的文件。

实现:

在具体实现上,Flink把文件读取过程分为两个子任务,即目录监控和数据读取。每个子任务都由单独的实体实现。目录监控由单个非并行(并行度为1)的任务执行,而数据读取由并行运行的多个任务执行。后者的并行性等于作业的并行性。单个目录监控任务的作用是扫描目录(根据watchType定期扫描或仅扫描一次),查找要处理的文件并把文件分割成切分片(splits),然后将这些切分片分配给下游reader。reader负责读取数据。每个切分片只能由一个reader读取,但一个reader可以逐个读取多个切分片。

注意:

如果watchType设置为FileProcessingMode.PROCESS_CONTINUOUSLY,则当文件被修改时,其内容将被重新处理。这会打破“exactly-once”语义,因为在文件末尾附加数据将导致其所有内容被重新处理。

如果watchType设置为FileProcessingMode.PROCESS_ONCE,则source仅扫描路径一次然后退出,而不等待reader完成文件内容的读取。当然reader会继续阅读,直到读取所有的文件内容。关闭source后就不会再有检查点。这可能导致节点故障后的恢复速度较慢,因为该作业将从最后一个检查点恢复读取。

基于 Socket:

socketTextStream – 从socket读取。元素可以用分隔符切分。

基于集合:

fromCollection(Collection) – 从Java的Java.util.Collection创建数据流。集合中的所有元素类型必须相同。

fromCollection(Iterator, Class) – 从一个迭代器中创建数据流。Class指定了该迭代器返回元素的类型。

fromElements(T …) – 从给定的对象序列中创建数据流。所有对象类型必须相同。

fromParallelCollection(SplittableIterator, Class) – 从一个迭代器中创建并行数据流。Class指定了该迭代器返回元素的类型。

generateSequence(from, to) – 创建一个生成指定区间范围内的数字序列的并行数据流。

自定义:

addSource – 添加一个新的source function。例如,可以addSource(new FlinkKafkaConsumer08<>(…))以从Apache Kafka读取数据。

6. 数据Sinks

数据sinks消费DataStream并将其发往文件、socket、外部系统或进行打印。Flink自带多种内置的输出格式,这些都被封装在对DataStream的操作背后:

writeAsText() / TextOutputFormat – 将元素以字符串形式写入。字符串    通过调用每个元素的toString()方法获得。

writeAsCsv(…) / CsvOutputFormat – 将元组写入逗号分隔的csv文件。行和字段    分隔符均可配置。每个字段的值来自对象的toString()方法。

print() / printToErr() – 打印每个元素的toString()值到标准输出/错误输出流。可以配置前缀信息添加到输出,以区分不同print的结果。如果并行度大于1,则task id也会添加到输出前缀上。

writeUsingOutputFormat() / FileOutputFormat – 自定义文件输出的方法/基类。支持自定义的对象到字节的转换。

writeToSocket – 根据SerializationSchema把元素写到socket。

addSink – 调用自定义sink function。Flink自带了很多连接其他系统的连接器(connectors)(如      Apache Kafka),这些连接器都实现了sink function。

请注意,DataStream上的write*()方法主要用于调试目的。它们没有参与Flink的检查点机制,这意味着这些function通常都有 at-least-once语义。数据刷新到目标系统取决于OutputFormat的实现。这意味着并非所有发送到OutputFormat的元素都会立即在目标系统中可见。此外,在失败的情况下,这些记录可能会丢失。

为了可靠,在把流写到文件系统时,使用flink-connector-filesystem来实现exactly-once。此外,通过.addSink(…)方法自定义的实现可以参与Flink的检查点机制以实现exactly-once语义。 

7. 迭代(Iterations)

迭代流程序实现一个step function并将其嵌入到IterativeStream中。由于这样的DataStream程序可能永远不会结束,所以没有最大迭代次数。事实上你需要指定哪一部分的流被反馈到迭代过程,哪个部分通过split 或filter transformation向下游转发。在这里,我们展示一个使用过滤器的例子。首先,我们定义一个IterativeStream

IterativeStream<Integer> iteration = input.iterate();

然后,我们使用一系列transformations来指定在循环内执行的逻辑(这里示意一个简单的map transformation)

DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

要关闭迭代并定义迭代尾部,需要调用IterativeStream的closeWith(feedbackStream)方法。传给closeWith function的DataStream将被反馈给迭代的头部。一种常见的模式是使用filter来分离流中需要反馈的部分和需要继续发往下游的部分。这些filter可以定义“终止”逻辑,以控制元素是流向下游而不是反馈迭代。

iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);

默认情况下,反馈流的分区将自动设置为与迭代的头部的输入分区相同。用户可以在closeWith方法中设置一个可选的boolean标志来覆盖默认行为。

例如,如下程序从一系列整数连续减1,直到它们达到零:

DataStream<Long> someIntegers = env.generateSequence(0, 1000);
IterativeStream<Long> iteration = someIntegers.iterate();
DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  @Override
  public Long map(Long value) throws Exception {
    return value - 1 ;
  }
});
DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value > 0);
  }
});
iteration.closeWith(stillGreaterThanZero);
DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value <= 0);
  }
});

8. 执行参数

StreamExecutionEnvironment包含ExecutionConfig,它允许为作业运行时进行配置。

更多配置参数请参阅execution configuration (https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/execution_configuration.html)。下面的参数是DataStream API特有的:

enableTimestamps() / disableTimestamps(): 如果启用,则从source发出的每一条记录都会附加一个时间戳。 areTimestampsEnabled() 返回当前是否启用该值。

setAutoWatermarkInterval(long milliseconds): 设置自动发射watermark的间隔,可以通过long getAutoWatermarkInterval()获取当前的发射间隔。

9. 延迟控制

默认情况下,元素不会逐个传输(这将导致不必要的网络流量),而是被缓冲的。缓冲(实际是在机器之间传输)的大小可以在Flink配置文件中设置。虽然这种方法对于优化吞吐量有好处,但是当输入流不够快时,它可能会导致延迟问题。要控制吞吐量和延迟,你可以在execution environment(或单个operator)上使用env.setBufferTimeout(timeoutMillis)来设置缓冲区填满的最大等待时间。如果超过该最大等待时间,即使缓冲区未满,也会被自动发送出去。该最大等待时间默认值为100 ms。

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

为了最大化吞吐量,可以设置setBufferTimeout(-1),这样就没有了超时机制,缓冲区只有在满时才会发送出去。为了最小化延迟,可以把超时设置为接近0的值(例如5或10 ms)。应避免将该超时设置为0,因为这样可能导致性能严重下降。

10. 调试

在分布式集群中运行Streaming程序之前,最好确保实现的算法可以正常工作。因此,实施数据分析程序通常是一个渐进的过程:检查结果,调试和改进。

Flink提供了诸多特性来大幅简化数据分析程序的开发:你可以在IDE中进行本地调试,注入测试数据,收集结果数据。本节给出一些如何简化Flink程序开发的指导。

本地执行环境

LocalStreamEnvironment会在其所在的进程中启动一个Flink引擎. 如果你在IDE中启动LocalEnvironment,你可以在你的代码中设置断点,轻松调试你的程序。

一个LocalEnvironment的创建和使用示例如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<String> lines = env.addSource(/* some source */);
// build your program
env.execute();

基于集合的数据Sources

Flink提供了基于Java集合实现的特殊数据sources用于测试。一旦程序通过测试,它的sources和sinks可以方便的替换为从外部系统读写的sources和sinks。

基于集合的数据Sources可以像这样使用:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);

注意:目前,集合数据source要求数据类型和迭代器实现Serializable。并行度 = 1)。

迭代的数据Sink

Flink还提供了一个sink来收集DataStream的测试和调试结果。它可以这样使用:

import org.apache.flink.contrib.streaming.DataStreamUtils
DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)

Flink DataStream API 编程指南

发表评论

电子邮件地址不会被公开。 必填项已用*标注

二 × = 16