Flink中的DataSet程序是实现数据集转换(例如,过滤,映射,连接,分组)的。 数据集最初是从某些源创建的(例如,通过读取文件或从本地集合创建)。 结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如命令行终端)。 Flink程序可以在各种环境中运行,独立运行或嵌入到其他程序中。 执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。
内容
1. 示例程序
以下程序是WordCount的完整示例。 可以复制并粘贴代码以在本地运行它。 只需要在项目中包含正确的Flink库并指定导入。
public class WordCountExample { public static void main(String[] args) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<String> text = env.fromElements( "Who's there?", "I think I hear them. Stand, ho! Who's there?"); DataSet<Tuple2<String, Integer>> wordCounts = text .flatMap(new LineSplitter()) .groupBy(0) .sum(1); wordCounts.print(); } public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> out) { for (String word : line.split(" ")) { out.collect(new Tuple2<String, Integer>(word, 1)); } } } }
2. DataSet Transformations
数据转换将一个或多个DataSet转换为新的DataSet。 程序可以将多个转换组合到复杂的程序集中。
本节简要概述了可用的转换。 转换文档包含所有转换的完整描述和示例:
以下转换可用于元组的数据集:
转换的并行性可以通过setParallelism(int)定义,withParameters(Configuration)传递Configuration对象,可以从用户函数内的open()方法访问它们。
3. Data Sources(数据源)
数据源创建初始数据集,例如来自文件或Java集合。 Flink附带了几种内置格式,可以从通用文件格式创建数据集。可以参考在ExecutionEnvironment上的方法。
基于文件:
readTextFile(path)/ TextInputFormat – 按行读取文件并将它们作为字符串返回。
readTextFileWithValue(path)/ TextValueInputFormat – 按行读取文件并将它们作为StringValues返回。 StringValues是可变字符串。
readCsvFile(path)/ CsvInputFormat – 解析基于分隔符的文件。返回元组或POJO的DataSet。支持基本java类型及其Value对应作为字段类型。
readFileOfPrimitives(path,Class)/ PrimitiveInputFormat – 解析新行(或其他char序列)分隔的原始数据类型(如String或Integer)的文件。
readFileOfPrimitives(path,delimiter,Class)/ PrimitiveInputFormat – 使用给定的分隔符解析新行(或其他char序列)分隔的原始数据类型(如String或Integer)的文件。
readSequenceFile(Key,Value,path)/ SequenceFileInputFormat – 创建JobConf并从指定路径读取文件,类型为SequenceFileInputFormat,Key class和Value类,并将它们返回为Tuple2 <Key,Value>。
基于集合:
fromCollection(Collection) – 从Java Java.util.Collection创建数据集。集合中的所有元素必须属于同一类型。
fromCollection(Iterator,Class) – 从迭代器创建数据集。该类指定迭代器返回的元素的数据类型。
fromElements(T …) – 根据给定的对象序列创建数据集。所有对象必须属于同一类型。
fromParallelCollection(SplittableIterator,Class) – 并行地从迭代器创建数据集。该类指定迭代器返回的元素的数据类型。
generateSequence(from,to) – 并行生成给定间隔中的数字序列。
通用:
readFile(inputFormat,path)/ FileInputFormat – 接受文件输入格式。
createInput(inputFormat)/ InputFormat – 接受通用输入格式。
代码示例如下:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // read text file from local files system DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile"); // read text file from a HDFS running at nnHost:nnPort DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile"); // read a CSV file with three fields DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file") .types(Integer.class, String.class, Double.class); // read a CSV file with five fields, taking only two of them DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file") .includeFields("10010") // take the first and the fourth field .types(String.class, Double.class); // read a CSV file with three fields into a POJO (Person.class) with corresponding fields DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file") .pojoType(Person.class, "name", "age", "zipcode"); // read a file from the specified path of type SequenceFileInputFormat DataSet<Tuple2<IntWritable, Text>> tuples = env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file"); // creates a set from some given elements DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar"); // generate a number sequence DataSet<Long> numbers = env.generateSequence(1, 10000000); // Read data from a relational database using the JDBC input format DataSet<Tuple2<String, Integer> dbData = env.createInput( JDBCInputFormat.buildJDBCInputFormat() .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") .setDBUrl("jdbc:derby:memory:persons") .setQuery("select name, age from persons") .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)) .finish() );
4. Data Sink (数据接收器)
数据接收器使用DataSet并用于存储或返回它们。使用OutputFormat描述数据接收器操作。 Flink带有各种内置输出格式,这些格式封装在DataSet的方法上:
writeAsText()/ TextOutputFormat – 将元素按行顺序写入字符串。通过调用每个元素的toString()方法获得字符串。
writeAsFormattedText()/ TextOutputFormat – 将字符串行写为字符串。通过为每个元素调用用户定义的format()方法来获取字符串。
writeAsCsv(…)/ CsvOutputFormat – 将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。
print()/ printToErr()/ print(String msg)/ printToErr(String msg) – 打印标准输出/标准错误流上每个元素的toString()值。可选地,可以提供前缀(msg),其前缀为输出。这有助于区分不同的打印调用。如果并行度大于1,则输出也将与生成输出的任务的标识符一起添加。
write()/ FileOutputFormat – 自定义文件输出的方法和基类。支持自定义对象到字节的转换。
output()/ OutputFormat – 最通用的输出方法,用于非基于文件的数据接收器(例如将结果存储在数据库中)。
可以将DataSet输入到多个操作。程序可以编写或打印数据集,同时对它们执行其他转换。
代码示例如下:
// text data DataSet<String> textData = // [...] // write DataSet to a file on the local file system textData.writeAsText("file:///my/result/on/localFS"); // write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS"); // write DataSet to a file and overwrite the file if it exists textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE); // tuples as lines with pipe as the separator "a|b|c" DataSet<Tuple3<String, Integer, Double>> values = // [...] values.writeAsCsv("file:///path/to/the/result/file", "\n", "|"); // this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines values.writeAsText("file:///path/to/the/result/file"); // this writes values as strings using a user-defined TextFormatter object values.writeAsFormattedText("file:///path/to/the/result/file", new TextFormatter<Tuple2<Integer, Integer>>() { public String format (Tuple2<Integer, Integer> value) { return value.f1 + " - " + value.f0; } });
5. Broadcast Variables(广播变量)
除了常规的操作输入之外,广播变量还允许为操作的所有并行实例提供数据集。 这对于辅助数据集或与数据相关的参数化非常有用, 然后,运算符操作可以将数据集作为集合访问。
Broadcast:广播集通过withBroadcastSet(DataSet,String)按名称注册
Access:可通过目标运算符的getRuntimeContext().getBroadcastVariable(String)访问。
// 1. The DataSet to be broadcast DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3); DataSet<String> data = env.fromElements("a", "b"); data.map(new RichMapFunction<String, String>() { @Override public void open(Configuration parameters) throws Exception { // 3. Access the broadcast DataSet as a Collection Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName"); } @Override public String map(String value) throws Exception { ... } }).withBroadcastSet(toBroadcast, "broadcastSetName"); // 2. Broadcast the DataSet
6. Distributed Cache(分布式缓存)
Flink提供了一个分布式缓存,类似于Apache Hadoop,可以让并行的实例在本地访问某些文件和用户函数。 此功能可用于共享包含静态外部数据的文件,如机器学习的回归模型。
缓存的工作原理如下: 程序将其ExecutionEnvironment中的本地或远程文件系统(如HDFS或S3)的文件或目录注册为缓存文件。 执行程序时,Flink会自动将文件或目录复制到所有工作程序的本地文件系统。 用户函数可以查找指定的文件或目录,并从worker的本地文件系统访问它:
分布式缓存使用如下,代码示例:
在ExecutionEnvironment中注册文件或目录:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // register a file from HDFS env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile") // register a local executable file (script, executable, ...) env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true) // define your program and execute ... DataSet<String> input = ... DataSet<Integer> result = input.map(new MyMapper()); ... env.execute();
访问用户函数中的缓存文件或目录(此处为MapFunction)。 该函数必须扩展RichFunction类,因为它需要访问RuntimeContext:
// extend a RichFunction to have access to the RuntimeContext public final class MyMapper extends RichMapFunction<String, Integer> { @Override public void open(Configuration config) { // access cached file via RuntimeContext and DistributedCache File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile"); // read the file (or navigate the directory) ... } @Override public Integer map(String value) throws Exception { // use content of cached file ... } }