基本Flink API概念

Flink程序是实现分布式集合转换的流数据处理工具(例如,过滤,映射,更新状态,join,分组,定义时间窗口,聚合)。 流数据最初是从数据源创建的(例如,通过读取文件,kafka主题或从本地的内存中集合),结果通过接收器(Sink)返回,接收器可以将数据写入(分布式)文件或标准输出中(如命令行终端)。 Flink程序可以在各种环境中运行,独立运行或嵌入在其他程序中。 执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

根据数据源的类型,即有界或无界源,可以编写批处理程序或流程序,其中DataSet API用于批处理,DataStream API用于流数据处理。 本章将介绍两种API共有的基本概念。

1. DataSet和DataStream

Flink使用类DataSet和DataStream来表示程序中的数据。 可以将它们视为可以包含重复项的不可变数据集合。 在DataSet的情况下,数据是有限的,而对于DataStream,元素的数量可以是无限的。

这些集合在某些关键方面与常规Java集合不同。 首先,它们是不可变的,这意味着一旦创建它们就无法添加或删除元素, 也不能简单地检查里面的元素。

最初通过在Flink程序中添加源( Source )来创建集合,并通过使用诸如map,filter等API方法对它们进行转换来从这些集合中派生新集合。

2.Flink 程序执行过程

Flink程序看起来像是转换数据集合的通用程序, 每个程序包含相同的基本部分:

  • 获得执行环境,

  • 加载/创建初始数据,

  • 指定此数据的转换,

  • 指定放置计算结果的位置,

  • 触发程序执行。

现在将讲述每个步骤,请参阅相应部分以获取更多详细信息。 请注意,Java DataSet API的所有核心类都可以在org.apache.flink.api.java包中找到,而Java DataStream API的类可以在org.apache.flink.streaming.api中找到。

StreamExecutionEnvironment是所有Flink程序的基础。 可以在StreamExecutionEnvironment上使用这些静态方法获取:

getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)

通常,只需要使用getExecutionEnvironment(),这将根据上下文执行正确的操作:如果在IDE中执行程序或作为常规Java程序,它将创建一个本地环境,将执行程序 在本地机器中。 如果从程序创建了一个JAR文件,并通过命令行调用它,则Flink集群管理器将执行main方法,getExecutionEnvironment()将返回一个执行环境,用于在集群上执行程序。

对于指定数据源,执行环境有几种方法可以使用各种方法从文件中读取:可以逐行读取它们,CSV文件或使用完全自定义数据输入格式。 要将文本文件作为一系列行读取,可以使用:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");

这将提供一个DataStream,然后可以在其上应用转换来创建新的派生DataStream。

可以通过使用转换函数调用DataStream上的方法来应用转换。 例如,Map转换如下所示:

DataStream<String> input = ...;
DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

这将通过将原始集合中的每个String转换为Integer来创建新的DataStream。

一旦有了包含最终结果的DataStream,就可以通过创建接收器将其写入外部系统。 下面列举创建接收器的一些示例方法:

writeAsText(String path)
print()

需要通过调用StreamExecutionEnvironment上的execute()来触发程序执行。 根据ExecutionEnvironment的类型,将在本地计算机上触发执行或提交程序以在集群上执行。

execute()方法返回一个JobExecutionResult,它包含执行时间和结果。

3. Lazy Evaluation

所有Flink程序都是延时并计划执行的。当执行程序的main方法时,数据加载和转换不会直接发生。 而是创建每个操作并将其添加到程序的计划中。 当执行环境上的execute()调用显式触发执行时,实际执行操作。 程序是在本地执行还是在集群上执行取决于执行环境的类型。Lazy Evaluatoin 可以使Flink 构建相对复杂的程序并作为一个整体按照计划单元执行。

4. 指定 Key

某些转换(如join,coGroup,keyBy,groupBy)要求在元素集合上定义键。 其他转换(Reduce,GroupReduce,Aggregate,Windows)允许数据在应用之前在键上分组。

DataSet被分组为:

DataSet<...> input = // [...]
DataSet<...> reduced = input
  .groupBy(/*define key here*/)
  .reduceGroup(/*do something*/);

使用DataStream指定 Key:

DataStream<...> input = // [...]
DataStream<...> windowed = input
  .keyBy(/*define key here*/)
  .window(/*window specification*/);

Flink的数据模型不基于键值对。 因此,无需将数据集类型转换为键值对。 键用于对数据流进行Join 和 分组等操作。

注意:在下面的讨论中,将使用DataStream API和keyBy。 对于DataSet API,只需要用DataSet和groupBy替换。

5. 定义元组的键

最简单的情况是在元组的一个或多个字段上对元组进行分组:

DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)

上述示例中,元组在第一个字段(整数类型)上分组。

DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)

在这里,将元组分组在由第一个和第二个字段组成的复合键上。

关于嵌套元组:如果有一个带有嵌套元组的DataStream,例如:

DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;

指定keyBy(0)将导致系统使用Tuple2作为键(以Integer和Float为键)。 如果要用到嵌套在Tuple2 键值,则须使用将要解释的字段表达式来引用键。

6. 使用Field Expressions定义键

可以使用基于字符串的字段表达式来引用嵌套字段,并定义用于分组,排序,连接或coGrouping的键。

字段表达式可以非常轻松地选择(嵌套)复合类型中的字段,例如Tuple和POJO类型。

在下面的示例中,有一个WC POJO,其中包含两个字段“word”和“count”。 要按字段word分组,只需将其名称传递给keyBy()函数。

// some ordinary POJO (Plain old Java Object)
public class WC {
  public String word;
  public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);

字段表达式语法:

按字段名称选择POJO字段。 例如,“user”指的是POJO类型的“user”字段;

按字段名称或0偏移字段索引选择元组字段。 例如,“0”和“5”分别表示Java元组类型的第一和第六字段;

可以在POJO和Tuples中选择嵌套字段。 例如,“user.zip”指的是POJO的“zip”字段,其存储在POJO类型的“user”字段中。 支持任意嵌套和混合POJO和元组,例如“f1.user.zip”或“user.f3.1.zip”;

可以使用“*”通配符表达式选择完整的类型。 适用于非Tuple或POJO类型。

示例如下:

public static class WC {
  public ComplexNestedClass complex; //nested POJO
  private int count;
  // getter / setter for private field (count)
  public int getCount() {
    return count;
  }
  public void setCount(int c) {
    this.count = c;
  }
}
public static class ComplexNestedClass {
  public Integer someNumber;
  public float someFloat;
  public Tuple3<Long, Long, String> word;
  public IntWritable hadoopCitizen;
}

上面示例代码的有效字段表达式如下:

“count”:WC类中的count字段;
“complex”:递归选择POJO类型ComplexNestedClass的字段复合体的所有字段;
“complex.word.f2”:选择嵌套Tuple3的最后一个字段;
“complex.hadoopCitizen”:选择Hadoop IntWritable类型;

使用Key Selector功能定义键

定义键的另一种方法是“Key Selector”功能。 Key Selector函数将单个元素作为输入并返回元素的键。 Key可以是任何类型,并且可以从任意计算中导出。

以下示例显示了一个Key Selector函数,它只返回一个对象的字段:

// some ordinary POJO
public class WC {public String word; public int count;}
DataStream<WC> words = // [...]
KeyedStream<WC> kyed = words
  .keyBy(new KeySelector<WC, String>() {
     public String getKey(WC wc) { return wc.word; }
   });

7. 指定转换函数

大多数转换都需要用户定义的函数。 本节列出了如何指定transformation的不同方法。

实现接口

最基本的方法是实现一个提供的接口:

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
});
data.map(new MyMapFunction());

匿名类

可以将函数作为匿名类传递:

data.map(new MapFunction<String, Integer> () {
  public Integer map(String value) { return Integer.parseInt(value); }
});

Java 8 Lambdas

Flink还支持Java API中的Java 8 Lambdas:

data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);

Rich Function

用户定义函数的所有转换都可以将Rich Function作为参数。 如下:

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
});

可以写成:

class MyMapFunction extends RichMapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
});

将函数传递给Map转换:

data.map(new MyMapFunction());

Rich Function可以被定义成匿名类,如下:

data.map (new RichMapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});

除了用户定义的函数(map,reduce等)之外,Rich函数还提供了四种方法:open,close,getRuntimeContext和setRuntimeContext。 这些用于参数化函数,创建和完成本地状态,访问广播变量以及访问运行时信息(如累加器和计数器)。

8. 支持的数据类型

Flink对可以在DataSet或DataStream中的元素类型进行了一些限制。 Flink通过分析类型以确定有效的执行策略。

有七种不同类别的数据类型:

  • Java Tuples 和Scala Case Classes

  • Java POJOs

  • Primitive Types

  • Regular Classes

  • Values

  • Hadoop Writables

  • Special Types

Java Tuples 和Scala Case Classes

元组是包含固定数量的具有各种类型的字段的复合类型。 Java API提供从Tuple1到Tuple25的类。 元组的每个字段都可以是包含更多元组的任意Flink类型,从而产生嵌套元组。 可以使用字段名称tuple.f4或使用通用getter方法tuple.getField(int position)直接访问元组的字段。 字段索引从0开始。

DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
    new Tuple2<String, Integer>("hello", 1),
    new Tuple2<String, Integer>("world", 2));
wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
    @Override
    public Integer map(Tuple2<String, Integer> value) throws Exception {
        return value.f1;
    }
});
wordCounts.keyBy(0); // also valid .keyBy("f0")

POJO

如果满足以下要求,则Flink将Java和Scala类视为特殊的POJO数据类型:

该类必须为公开类;

必须有一个没有参数的公共构造函数(默认构造函数);

所有字段都是公共的,或者必须通过getter和setter函数访问。 对于名为foo的字段,getter和setter方法必须命名为getFoo()和setFoo();

Flink必须支持字段的类型。 目前,Flink使用Avro序列化任意对象(例如Date);

Flink分析POJO类型的结构,Flink了解POJO的字段。 因此,POJO类型比一般类型更容易使用。 此外,Flink可以比一般类型更有效地处理POJO;

以下示例显示了一个包含两个公共字段的简单POJO:

public class WordWithCount {
    public String word;
    public int count;
    public WordWithCount() {}
    public WordWithCount(String word, int count) {
        this.word = word;
        this.count = count;
    }
}
DataStream<WordWithCount> wordCounts = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2));
wordCounts.keyBy("word"); // key by field expression "word"

Primitive Types

Flink支持所有Java和Scala原始类型,如Integer,String和Double。

Regular Classes

Flink支持大多数Java和Scala类(API和自定义),有限制地适用于包含无法序列化的字段的类,如文件指针,I / O流或其他本机资源。 遵循Java Beans约定的类通常可以很好地工作。

所有未标识为POJO类型的类都由Flink作为常规类类型处理。 Flink将这些数据类型视为黑盒并且无法访问其内容用于有效的排序。 Flink使用序列化框架Kryo对常规类型进行反序列化。

Values

Values类型用户定义地描述它们的序列化和反序列化,不是通过通用序列化框架,而是通过使用read和write方法实现org.apache.flinktypes.Value接口来为这些操作提供自定义代码。当通用序列化效率非常低时,可以使用该类型。一个示例是将元素的稀疏向量实现为数组的数据类型,如知道数组大部分元素为零,可以对非零元素使用特殊编码,而通用序列化只能编写所有数组元素。

org.apache.flinktypes.CopyableValue接口以类似的方式支持用户自定义内部复制的逻辑。

Flink带有与基本数据类型对应的预定义值类型。 (ByteValue,ShortValue,IntValue,LongValue,FloatValue,DoubleValue,StringValue,CharValue,BooleanValue)。这些Value类型充当基本数据类型的可变变体:它们的值可以被更改,允许用户重用对象。

Hadoop Writables

可以使用实现org.apache.hadoop.Writable接口的类型。 write()和readFields()方法中定义的序列化逻辑将用于序列化。

Special Types

可以使用特殊类型,包括Scala的Either,Option和Try。 Java API有自定义Either实现。 与Scala的Either类似,它代表两种可能类型的值。 两者都可用于错误处理或需要输出两种不同类型记录的运算符。

9. Accumulators & Counters (累加器和计数器)

累加器是具有添加操作和最终累积结果的简单构造,通常在作业结束后获得并使用。

最直接的累加器是一个计数器:可以使用Accumulator.add(V value)方法递增它。在工作结束时,Flink将汇总所有部分结果并将结果发送给客户。在调试过程中, 如果想了解有关数据的信息,累加器非常有用。

Flink目前有以下内置累加器。它们中的每一个都实现了Accumulator接口。

IntCounter,LongCounter和DoubleCounter;

Histogram(直方图):离散数量的区间的直方图实现。在内部,它是一个从Integer到Integer的映射。可以使用它来计算值的分布,例如字数统计程序的每行字数分布。

下面介绍如何使用累加器:

首先,必须在要使用它的用户定义转换函数中创建累加器对象(此处为计数器):

private IntCounter numLines = new IntCounter();

其次,必须注册累加器对象,通常在Rich function的open()方法中。在这里还可以定义名称:

getRuntimeContext().addAccumulator("num-lines", this.numLines);

现在可以在运算符函数中的任何位置使用累加器,包括open()和close()方法。

this.numLines.add(1);

整个结果将存储在JobExecutionResult对象中,该对象是从执行环境的execute()方法返回的(当前这仅在执行等待作业完成时才有效)。

myJobExecutionResult.getAccumulatorResult("num-lines")

定制累加器

要实现自己的累加器,只需编写Accumulator接口的实现即可。可以选择实现Accumulator或SimpleAccumulator 接口。

基本Flink API概念

发表评论

邮箱地址不会被公开。 必填项已用*标注

四十 二 − 三十 四 =

滚动到顶部