Flink DataSet API编程指南

Flink中的DataSet程序是实现数据集转换(例如,过滤,映射,连接,分组)的。 数据集最初是从某些源创建的(例如,通过读取文件或从本地集合创建)。 结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如命令行终端)。 Flink程序可以在各种环境中运行,独立运行或嵌入到其他程序中。 执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

1. 示例程序

以下程序是WordCount的完整示例。 可以复制并粘贴代码以在本地运行它。 只需要在项目中包含正确的Flink库并指定导入。 

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> text = env.fromElements(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");
        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new LineSplitter())
            .groupBy(0)
            .sum(1);
        wordCounts.print();
    }
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

2. DataSet Transformations

数据转换将一个或多个DataSet转换为新的DataSet。 程序可以将多个转换组合到复杂的程序集中。

本节简要概述了可用的转换。 转换文档包含所有转换的完整描述和示例:

1542436069316674.png

1542436097521139.png

1542436143577361.png

1542436175291256.png

1542436214383861.png

1542436242196376.png

以下转换可用于元组的数据集:

1542436290310377.png

转换的并行性可以通过setParallelism(int)定义,withParameters(Configuration)传递Configuration对象,可以从用户函数内的open()方法访问它们。

3. Data Sources(数据源)

数据源创建初始数据集,例如来自文件或Java集合。 Flink附带了几种内置格式,可以从通用文件格式创建数据集。可以参考在ExecutionEnvironment上的方法。

基于文件:

readTextFile(path)/ TextInputFormat – 按行读取文件并将它们作为字符串返回。

readTextFileWithValue(path)/ TextValueInputFormat – 按行读取文件并将它们作为StringValues返回。 StringValues是可变字符串。

readCsvFile(path)/ CsvInputFormat – 解析基于分隔符的文件。返回元组或POJO的DataSet。支持基本java类型及其Value对应作为字段类型。

readFileOfPrimitives(path,Class)/ PrimitiveInputFormat – 解析新行(或其他char序列)分隔的原始数据类型(如String或Integer)的文件。

readFileOfPrimitives(path,delimiter,Class)/ PrimitiveInputFormat – 使用给定的分隔符解析新行(或其他char序列)分隔的原始数据类型(如String或Integer)的文件。

readSequenceFile(Key,Value,path)/ SequenceFileInputFormat – 创建JobConf并从指定路径读取文件,类型为SequenceFileInputFormat,Key class和Value类,并将它们返回为Tuple2 <Key,Value>。

基于集合:

fromCollection(Collection) – 从Java Java.util.Collection创建数据集。集合中的所有元素必须属于同一类型。

fromCollection(Iterator,Class) – 从迭代器创建数据集。该类指定迭代器返回的元素的数据类型。

fromElements(T …) – 根据给定的对象序列创建数据集。所有对象必须属于同一类型。

fromParallelCollection(SplittableIterator,Class) – 并行地从迭代器创建数据集。该类指定迭代器返回的元素的数据类型。

generateSequence(from,to) – 并行生成给定间隔中的数字序列。

通用:

readFile(inputFormat,path)/ FileInputFormat – 接受文件输入格式。

createInput(inputFormat)/ InputFormat – 接受通用输入格式。

代码示例如下:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// read text file from local files system
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
// read text file from a HDFS running at nnHost:nnPort
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");
// read a CSV file with three fields
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                       .types(Integer.class, String.class, Double.class);
// read a CSV file with five fields, taking only two of them
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                               .includeFields("10010")  // take the first and the fourth field
                       .types(String.class, Double.class);
// read a CSV file with three fields into a POJO (Person.class) with corresponding fields
DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                         .pojoType(Person.class, "name", "age", "zipcode");
// read a file from the specified path of type SequenceFileInputFormat
DataSet<Tuple2<IntWritable, Text>> tuples =
 env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");
// creates a set from some given elements
DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");
// generate a number sequence
DataSet<Long> numbers = env.generateSequence(1, 10000000);
// Read data from a relational database using the JDBC input format
DataSet<Tuple2<String, Integer> dbData =
    env.createInput(
      JDBCInputFormat.buildJDBCInputFormat()
                     .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                     .setDBUrl("jdbc:derby:memory:persons")
                     .setQuery("select name, age from persons")
                     .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                     .finish()
    );

4. Data Sink (数据接收器)

数据接收器使用DataSet并用于存储或返回它们。使用OutputFormat描述数据接收器操作。 Flink带有各种内置输出格式,这些格式封装在DataSet的方法上:

writeAsText()/ TextOutputFormat – 将元素按行顺序写入字符串。通过调用每个元素的toString()方法获得字符串。

writeAsFormattedText()/ TextOutputFormat – 将字符串行写为字符串。通过为每个元素调用用户定义的format()方法来获取字符串。

writeAsCsv(…)/ CsvOutputFormat – 将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。

print()/ printToErr()/ print(String msg)/ printToErr(String msg) – 打印标准输出/标准错误流上每个元素的toString()值。可选地,可以提供前缀(msg),其前缀为输出。这有助于区分不同的打印调用。如果并行度大于1,则输出也将与生成输出的任务的标识符一起添加。

write()/ FileOutputFormat – 自定义文件输出的方法和基类。支持自定义对象到字节的转换。

output()/ OutputFormat – 最通用的输出方法,用于非基于文件的数据接收器(例如将结果存储在数据库中)。

可以将DataSet输入到多个操作。程序可以编写或打印数据集,同时对它们执行其他转换。

代码示例如下:

// text data
DataSet<String> textData = // [...]
// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS");
// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");
// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);
// tuples as lines with pipe as the separator "a|b|c"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");
// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file");
// this writes values as strings using a user-defined TextFormatter object
values.writeAsFormattedText("file:///path/to/the/result/file",
    new TextFormatter<Tuple2<Integer, Integer>>() {
        public String format (Tuple2<Integer, Integer> value) {
            return value.f1 + " - " + value.f0;
        }
    });

5. Broadcast Variables(广播变量)

除了常规的操作输入之外,广播变量还允许为操作的所有并行实例提供数据集。 这对于辅助数据集或与数据相关的参数化非常有用, 然后,运算符操作可以将数据集作为集合访问。

Broadcast:广播集通过withBroadcastSet(DataSet,String)按名称注册

Access:可通过目标运算符的getRuntimeContext().getBroadcastVariable(String)访问。

// 1. The DataSet to be broadcast
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
DataSet<String> data = env.fromElements("a", "b");
data.map(new RichMapFunction<String, String>() {
    @Override
    public void open(Configuration parameters) throws Exception {
      // 3. Access the broadcast DataSet as a Collection
      Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
    }
    @Override
    public String map(String value) throws Exception {
        ...
    }
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 2. Broadcast the DataSet

6. Distributed Cache(分布式缓存)

Flink提供了一个分布式缓存,类似于Apache Hadoop,可以让并行的实例在本地访问某些文件和用户函数。 此功能可用于共享包含静态外部数据的文件,如机器学习的回归模型。

缓存的工作原理如下: 程序将其ExecutionEnvironment中的本地或远程文件系统(如HDFS或S3)的文件或目录注册为缓存文件。 执行程序时,Flink会自动将文件或目录复制到所有工作程序的本地文件系统。 用户函数可以查找指定的文件或目录,并从worker的本地文件系统访问它:

分布式缓存使用如下,代码示例:

在ExecutionEnvironment中注册文件或目录:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
// define your program and execute
...
DataSet<String> input = ...
DataSet<Integer> result = input.map(new MyMapper());
...
env.execute();

访问用户函数中的缓存文件或目录(此处为MapFunction)。 该函数必须扩展RichFunction类,因为它需要访问RuntimeContext:

// extend a RichFunction to have access to the RuntimeContext
public final class MyMapper extends RichMapFunction<String, Integer> {
    @Override
    public void open(Configuration config) {
      // access cached file via RuntimeContext and DistributedCache
      File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
      // read the file (or navigate the directory)
      ...
    }
    @Override
    public Integer map(String value) throws Exception {
      // use content of cached file
      ...
    }
}
Flink DataSet API编程指南

发表评论

电子邮件地址不会被公开。 必填项已用*标注

× 一 = 7