Flink DataSet API编程指南

Flink中的DataSet程序是实现数据集转换的常规程序(例如,Filter,映射,连接,分组)。数据集最初是从某些来源创建的(例如,通过读取文件或从本地集合创建)。结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如命令行终端)。Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

有关Flink API 基本概念的介绍,请参阅基本概念

为了创建您自己的Flink DataSet程序,我们鼓励您从Flink程序解剖开始, 逐步添加您自己的 转换其余部分充当其他 算子操作和高级函数的参考。

示例程序

以下程序是WordCount的完整工作示例。您可以复制并粘贴代码以在本地运行它。您只需要在项目中包含正确的Flink库(请参见使用Flink链接)并指定导入。那你就准备好了!

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");

        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new LineSplitter())
            .groupBy(0)
            .sum(1);

        wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()
  }
}

回到顶部

数据集转换

数据转换将一个或多个DataSet转换为新的DataSet。程序可以将多个转换组合到复杂的程序集中。

本节简要概述了可用的转换。转换文档与示例全部转换的完整描述。


转换 描述
Map

采用一个数据元并生成一个数据元。

data.map(new MapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});
FlatMap

采用一个数据元并生成零个,一个或多个数据元。

data.flatMap(new FlatMapFunction<String, String>() {
  public void flatMap(String value, Collector<String> out) {
    for (String s : value.split(" ")) {
      out.collect(s);
    }
  }
});
MapPartition

在单个函数调用中转换并行分区。该函数将分区作为Iterable流来获取,并且可以生成任意数量的结果值。每个分区中的数据元数量取决于并行度和先前的 算子操作。

data.mapPartition(new MapPartitionFunction<String, Long>() {
  public void mapPartition(Iterable<String> values, Collector<Long> out) {
    long c = 0;
    for (String s : values) {
      c++;
    }
    out.collect(c);
  }
});
Filter

计算每个数据元的布尔函数,并保存函数返回true的数据元。
重要信息:系统假定该函数不会修改应用谓词的数据元。违反此假设可能会导致错误的结果。

data.filter(new FilterFunction<Integer>() {
  public boolean filter(Integer value) { return value > 1000; }
});
Reduce

通过将两个数据元重复组合成一个数据元,将一组数据元组合成一个数据元。Reduce可以应用于完整数据集或分组数据集。

data.reduce(new ReduceFunction<Integer> {
  public Integer reduce(Integer a, Integer b) { return a + b; }
});

如果将reduce应用于分组数据集,则可以通过提供CombineHintto 来指定运行时执行reduce的组合阶段的方式 setCombineHint在大多数情况下,基于散列的策略应该更快,特别是如果不同键的数量与输入数据元的数量相比较小(例如1/10)。

ReduceGroup

将一组数据元组合成一个或多个数据元。ReduceGroup可以应用于完整数据集或分组数据集。

data.reduceGroup(new GroupReduceFunction<Integer, Integer> {
  public void reduce(Iterable<Integer> values, Collector<Integer> out) {
    int prefixSum = 0;
    for (Integer i : values) {
      prefixSum += i;
      out.collect(prefixSum);
    }
  }
});
Aggregate

将一组值聚合为单个值。聚合函数可以被认为是内置的reduce函数。聚合可以应用于完整数据集或分组数据集。

Dataset<Tuple3<Integer, String, Double>> input = // [...]
DataSet<Tuple3<Integer, String, Double>> output = input.aggregate(SUM, 0).and(MIN, 2);

您还可以使用简写语法进行最小,最大和总和聚合。

	Dataset<Tuple3<Integer, String, Double>> input = // [...]
DataSet<Tuple3<Integer, String, Double>> output = input.sum(0).andMin(2);
	
Distinct

返回数据集的不同数据元。它相对于数据元的所有字段或字段子集从输入DataSet中删除重复条目。

data.distinct();

使用reduce函数实现Distinct。您可以通过提供CombineHintto 来指定运行时执行reduce的组合阶段的方式 setCombineHint在大多数情况下,基于散列的策略应该更快,特别是如果不同键的数量与输入数据元的数量相比较小(例如1/10)。

Join 通过创建在其键上相等的所有数据元对来连接两个数据集。可选地使用JoinFunction将数据元对转换为单个数据元,或使用FlatJoinFunction将数据元对转换为任意多个(包括无)数据元。请参阅键部分以了解如何定义连接键。
result = input1.join(input2)
               .where(0)       // key of the first input (tuple field 0)
               .equalTo(1);    // key of the second input (tuple field 1)
您可以通过Join Hints指定运行时执行连接的方式提示描述了通过分区或广播进行连接,以及它是使用基于排序还是基于散列的算法。有关可能的提示和示例的列表,请参阅“ 转换指南”
如果未指定提示,系统将尝试估算输入大小,并根据这些估计选择最佳策略。
// This executes a join by broadcasting the first data set
// using a hash table for the broadcast data
result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
               .where(0).equalTo(1);
请注意,连接转换仅适用于等连接。其他连接类型需要使用OuterJoin或CoGroup表示。
OuterJoin 在两个数据集上执行左,右或全外连接。外连接类似于常规(内部)连接,并创建在其键上相等的所有数据元对。此外,如果在另一侧没有找到匹配的Keys,则保存“外部”侧(左侧,右侧或两者都满)的记录。匹配数据元对(或一个数据元和null另一个输入值)被赋予JoinFunction以将数据元对转换为单个数据元,或者转换为FlatJoinFunction以将数据元对转换为任意多个(包括无)数据元。请参阅键部分以了解如何定义连接键。
input1.leftOuterJoin(input2) // rightOuterJoin or fullOuterJoin for right or full outer joins
      .where(0)              // key of the first input (tuple field 0)
      .equalTo(1)            // key of the second input (tuple field 1)
      .with(new JoinFunction<String, String, String>() {
          public String join(String v1, String v2) {
             // NOTE:
             // - v2 might be null for leftOuterJoin
             // - v1 might be null for rightOuterJoin
             // - v1 OR v2 might be null for fullOuterJoin
          }
      });
CoGroup

reduce 算子操作的二维变体。将一个或多个字段上的每个输入分组,然后关联组。每对组调用转换函数。请参阅keys部分以了解如何定义coGroup键。

data1.coGroup(data2)
     .where(0)
     .equalTo(1)
     .with(new CoGroupFunction<String, String, String>() {
         public void coGroup(Iterable<String> in1, Iterable<String> in2, Collector<String> out) {
           out.collect(...);
         }
      });
Cross

构建两个输入的笛卡尔积(交叉乘积),创建所有数据元对。可选择使用CrossFunction将数据元对转换为单个数据元

DataSet<Integer> data1 = // [...]
DataSet<String> data2 = // [...]
DataSet<Tuple2<Integer, String>> result = data1.cross(data2);

注:交叉是一个潜在的非常计算密集型 算子操作它甚至可以挑战大的计算集群!建议使用crossWithTiny()crossWithHuge()来提示系统的DataSet大小

Union

生成两个数据集的并集。

DataSet<String> data1 = // [...]
DataSet<String> data2 = // [...]
DataSet<String> result = data1.union(data2);
Rebalance

均匀地Rebalance 数据集的并行分区以消除数据偏差。只有类似Map的转换可能会遵循Rebalance 转换。

DataSet<String> in = // [...]
DataSet<String> result = in.rebalance()
                           .map(new Mapper());
Hash-Partition

散列分区给定键上的数据集。键可以指定为位置键,表达键和键选择器函数。

DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.partitionByHash(0)
                            .mapPartition(new PartitionMapper());
Range-Partition

Range-Partition给定键上的数据集。键可以指定为位置键,表达键和键选择器函数。

DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.partitionByRange(0)
                            .mapPartition(new PartitionMapper());
Custom Partitioning

手动指定数据分区。
注意:此方法仅适用于单个字段键。

DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.partitionCustom(Partitioner<K> partitioner, key)
Sort Partition

本地按指定顺序对指定字段上的数据集的所有分区进行排序。可以将字段指定为元组位置或字段表达式。通过链接sortPartition()调用来完成对多个字段的排序。

DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.sortPartition(1, Order.ASCENDING)
                            .mapPartition(new PartitionMapper());
First-n

返回数据集的前n个(任意)数据元。First-n可以应用于常规数据集,分组数据集或分组排序数据集。分组键可以指定为键选择器函数或字段位置键。

DataSet<Tuple2<String,Integer>> in = // [...]
// regular data set
DataSet<Tuple2<String,Integer>> result1 = in.first(3);
// grouped data set
DataSet<Tuple2<String,Integer>> result2 = in.groupBy(0)
                                            .first(3);
// grouped-sorted data set
DataSet<Tuple2<String,Integer>> result3 = in.groupBy(0)
                                            .sortGroup(1, Order.ASCENDING)
                                            .first(3);

以下转换可用于元组的数据集:

转换 描述
project

从元组中选择字段的子集

DataSet<Tuple3<Integer, Double, String>> in = // [...]
DataSet<Tuple2<String, Integer>> out = in.project(2,0);
MinBy / MaxBy

从一组元组中选择一个元组,其元组的一个或多个字段的值最小(最大)。用于比较的字段必须是有效的关键字段,即可比较。如果多个元组具有最小(最大)字段值,则返回这些元组的任意元组。MinBy(MaxBy)可以应用于完整数据集或分组数据集。

DataSet<Tuple3<Integer, Double, String>> in = // [...]
// a DataSet with a single tuple with minimum values for the Integer and String fields.
DataSet<Tuple3<Integer, Double, String>> out = in.minBy(0, 2);
// a DataSet with one tuple for each group with the minimum value for the Double field.
DataSet<Tuple3<Integer, Double, String>> out2 = in.groupBy(2)
                                                  .minBy(1);


转换 描述
Map

采用一个元素并生成一个元素。

data.map { x => x.toInt }
FlatMap

采用一个元素并生成零个,一个或多个元素。

data.flatMap { str => str.split(" ") }
MapPartition

在单个函数调用中转换并行分区。该函数将分区作为“迭代器”,并可以生成任意数量的结果值。每个分区中的元素数量取决于并行度和先前的 算子操作。

data.mapPartition { in => in map { (_, 1) } }
Filter

计算每个元素的布尔函数,并保存函数返回true的元素。
重要信息:系统假定该函数不会修改应用谓词的元素。违反此假设可能会导致错误的结果。

data.filter { _ > 1000 }
Reduce

通过将两个元素重复组合成一个元素,将一组元素组合成一个元素。Reduce可以应用于完整数据集或分组数据集。

data.reduce { _ + _ }
ReduceGroup

将一组元素组合成一个或多个元素。ReduceGroup可以应用于完整数据集或分组数据集。

data.reduceGroup { elements => elements.sum }
Aggregate

将一组值聚合为单个值。聚合函数可以被认为是内置的reduce函数。聚合可以应用于完整数据集或分组数据集。

val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input.aggregate(SUM, 0).aggregate(MIN, 2)

您还可以使用简写语法进行最小,最大和总和聚合。

val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input.sum(0).min(2)
Distinct

返回数据集的不同元素。它相对于元素的所有字段或字段子集从输入DataSet中删除重复条目。

         data.distinct()
      
Join 通过创建在其键上相等的所有元素对来连接两个数据集。可选地使用JoinFunction将元素对转换为单个元素,或使用FlatJoinFunction将元素对转换为任意多个(包括无)元素。请参阅键部分以了解如何定义连接键。
// In this case tuple fields are used as keys. "0" is the join field on the first tuple
// "1" is the join field on the second tuple.
val result = input1.join(input2).where(0).equalTo(1)
您可以通过Join Hints指定运行时执行连接的方式提示描述了通过分区或广播进行连接,以及它是使用基于排序还是基于散列的算法。有关可能的提示和示例的列表,请参阅“ 转换指南”
如果未指定提示,系统将尝试估算输入大小,并根据这些估计选择最佳策略。
// This executes a join by broadcasting the first data set
// using a hash table for the broadcast data
val result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
                   .where(0).equalTo(1)
请注意,连接转换仅适用于等连接。其他连接类型需要使用OuterJoin或CoGroup表示。
OuterJoin 在两个数据集上执行左,右或全外连接。外连接类似于常规(内部)连接,并创建在其键上相等的所有元素对。此外,如果在另一侧没有找到匹配的密钥,则保存“外部”侧(左侧,右侧或两者都满)的记录。匹配元素对(或一个元素和另一个输入的`null`值)被赋予JoinFunction以将元素对转换为单个元素,或者给予FlatJoinFunction以将元素对转换为任意多个(包括无)元素。请参阅键部分以了解如何定义连接键。
val joined = left.leftOuterJoin(right).where(0).equalTo(1) {
   (left, right) =>
     val a = if (left == null) "none" else left._1
     (a, right)
  }
CoGroup

reduce 算子操作的二维变体。将一个或多个字段上的每个输入分组,然后关联组。每对组调用转换函数。请参阅keys部分以了解如何定义coGroup键。

data1.coGroup(data2).where(0).equalTo(1)
Cross

构建两个输入的笛卡尔积(交叉乘积),创建所有元素对。可选择使用CrossFunction将元素对转换为单个元素

val data1: DataSet[Int] = // [...]
val data2: DataSet[String] = // [...]
val result: DataSet[(Int, String)] = data1.cross(data2)

注:交叉是一个潜在的非常计算密集型 算子操作它甚至可以挑战大的计算集群!建议使用crossWithTiny()crossWithHuge()来提示系统的DataSet大小

Union

生成两个数据集的并集。

data.union(data2)
Rebalance

均匀地Rebalance 数据集的并行分区以消除数据偏差。只有类似Map的转换可能会遵循Rebalance 转换。

val data1: DataSet[Int] = // [...]
val result: DataSet[(Int, String)] = data1.rebalance().map(...)
Hash-Partition

散列分区给定键上的数据集。键可以指定为位置键,表达键和键选择器函数。

val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByHash(0).mapPartition { ... }
Range-Partition

Range-Partition给定键上的数据集。键可以指定为位置键,表达键和键选择器函数。

val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByRange(0).mapPartition { ... }
Custom Partitioning

手动指定数据分区。
注意:此方法仅适用于单个字段键。

val in: DataSet[(Int, String)] = // [...]
val result = in
  .partitionCustom(partitioner: Partitioner[K], key)
Sort Partition

本地按指定顺序对指定字段上的数据集的所有分区进行排序。可以将字段指定为元组位置或字段表达式。通过链接sortPartition()调用来完成对多个字段的排序。

val in: DataSet[(Int, String)] = // [...]
val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... }
First-n

返回数据集的前n个(任意)元素。First-n可以应用于常规数据集,分组数据集或分组排序数据集。可以将分组键指定为键选择器函数,元组位置或案例类字段。

val in: DataSet[(Int, String)] = // [...]
// regular data set
val result1 = in.first(3)
// grouped data set
val result2 = in.groupBy(0).first(3)
// grouped-sorted data set
val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)

以下转换可用于元组的数据集:

转换 描述
MinBy / MaxBy

从一组元组中选择一个元组,其元组的一个或多个字段的值最小(最大)。用于比较的字段必须是有效的关键字段,即可比较。如果多个元组具有最小(最大)字段值,则返回这些元组的任意元组。MinBy(MaxBy)可以应用于完整数据集或分组数据集。

val in: DataSet[(Int, Double, String)] = // [...]
// a data set with a single tuple with minimum values for the Int and String fields.
val out: DataSet[(Int, Double, String)] = in.minBy(0, 2)
// a data set with one tuple for each group with the minimum value for the Double field.
val out2: DataSet[(Int, Double, String)] = in.groupBy(2)
                                             .minBy(1)

通过匿名模式匹配从元组,案例类和集合中提取,如下所示:

val data: DataSet[(Int, String, Double)] = // [...]
data.map {
  case (id, name, temperature) => // [...]
}

API开箱即用不支持。要使用此函数,您应该使用Scala API扩展

并行转换的可以定义为setParallelism(int)同时 name(String)指定一个自定义名称的转变这对于调试很有帮助。这同样是可能的数据源数据接收器

withParameters(Configuration)传递配置对象,可以从open()用户函数内的方法访问

回到顶部

数据源

数据源创建初始数据集,例如来自文件或Java集合。创建数据集的一般机制是在InputFormat后面抽象的 Flink附带了几种内置格式,可以从通用文件格式创建数据集。他们中的许多人在ExecutionEnvironment上都有快捷方法

基于文件的:

  • readTextFile(path)/ TextInputFormat- 按行读取文件并将其作为字符串返回。

  • readTextFileWithValue(path)/ TextValueInputFormat- 按行读取文件并将它们作为StringValues返回。StringValues是可变字符串。

  • readCsvFile(path)/ CsvInputFormat- 解析逗号(或其他字符)分隔字段的文件。返回元组或POJO的DataSet。支持基本java类型及其Value对应作为字段类型。

  • readFileOfPrimitives(path, Class)/ PrimitiveInputFormat- 解析新行(或其他字符序列)分隔的原始数据类型(如String或)的文件Integer

  • readFileOfPrimitives(path, delimiter, Class)/ PrimitiveInputFormat- 解析新行(或其他字符序列)分隔的原始数据类型的文件,例如StringInteger使用给定的分隔符。

  • readSequenceFile(Key, Value, path)/ SequenceFileInputFormat- 创建一个JobConf并从类型为SequenceFileInputFormat,Key class和Value类的指定路径中读取文件,并将它们作为Tuple2 <Key,Value>返回。

基于集合:

  • fromCollection(Collection) - 从Java Java.util.Collection创建数据集。集合中的所有数据元必须属于同一类型。

  • fromCollection(Iterator, Class) - 从迭代器创建数据集。该类指定迭代器返回的数据元的数据类型。

  • fromElements(T ...) - 根据给定的对象序列创建数据集。所有对象必须属于同一类型。

  • fromParallelCollection(SplittableIterator, Class) - 并行地从迭代器创建数据集。该类指定迭代器返回的数据元的数据类型。

  • generateSequence(from, to) - 并行生成给定间隔中的数字序列。

通用:

  • readFile(inputFormat, path)/ FileInputFormat- 接受文件输入格式。

  • createInput(inputFormat)/ InputFormat- 接受通用输入格式。

例子

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read text file from local files system
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");

// read text file from a HDFS running at nnHost:nnPort
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");

// read a CSV file with three fields
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
	                       .types(Integer.class, String.class, Double.class);

// read a CSV file with five fields, taking only two of them
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                               .includeFields("10010")  // take the first and the fourth field
	                       .types(String.class, Double.class);

// read a CSV file with three fields into a POJO (Person.class) with corresponding fields
DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                         .pojoType(Person.class, "name", "age", "zipcode");

// read a file from the specified path of type SequenceFileInputFormat
DataSet<Tuple2<IntWritable, Text>> tuples =
 env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");

// creates a set from some given elements
DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");

// generate a number sequence
DataSet<Long> numbers = env.generateSequence(1, 10000000);

// Read data from a relational database using the JDBC input format
DataSet<Tuple2<String, Integer> dbData =
    env.createInput(
      JDBCInputFormat.buildJDBCInputFormat()
                     .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                     .setDBUrl("jdbc:derby:memory:persons")
                     .setQuery("select name, age from persons")
                     .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                     .finish()
    );

// Note: Flink's program compiler needs to infer the data types of the data items which are returned
// by an InputFormat. If this information cannot be automatically inferred, it is necessary to
// manually provide the type information as shown in the examples above.

配置CSV分析

Flink为CSV解析提供了许多配置选项:

  • types(Class ... types)指定要解析的字段的类型。必须配置已解析字段的类型。 在类型为Boolean.class的情况下,“True”(不区分大小写),“False”(不区分大小写),“1”和“0”被视为布尔值。

  • lineDelimiter(String del)指定单个记录的分隔符。默认行分隔符是换行符'\n'

  • fieldDelimiter(String del)指定用于分隔记录字段的分隔符。默认字段分隔符是逗号字符','

  • includeFields(boolean ... flag)includeFields(String mask)includeFields(long bitMask)定义从输入文件中读取哪些字段(以及要忽略的字段)。默认情况下,将解析n个字段(由types()调用中的类型数定义)。

  • parseQuotedStrings(char quoteChar)启用带引号的字符串解析。如果字符串字段的第一个字符是引号字符(前导或拖尾空格未被修剪),则字符串将被解析为带引号的字符串引用字符串中的字段分隔符将被忽略。如果带引号的字符串字段的最后一个字符不是引号字符,或者引号字符出现在某个不是引用字符串字段的开头或结尾的点上(除非引号字符使用''转义,否则引用字符串解析失败)。如果启用了带引号的字符串解析并且该字段的第一个字符不是引用字符串,则该字符串将被解析为不带引号的字符串。默认情况下,禁用带引号的字符串解析。

  • ignoreComments(String commentPrefix)指定注释前缀。所有以指定注释前缀开头的行都不会被解析和忽略。默认情况下,不会忽略任何行。

  • ignoreInvalidLines()启用宽松解析,即忽略无法正确解析的行。默认情况下,禁用宽松解析,无效行引发异常。

  • ignoreFirstLine()配置InputFormat以忽略输入文件的第一行。默认情况下,不会忽略任何行。

递归遍历输入路径目录

对于基于文件的输入,当输入路径是目录时,默认情况下不会枚举嵌套文件。相反,只读取基目录中的文件,而忽略嵌套文件。可以通过recursive.file.enumeration配置参数启用嵌套文件的递归枚举,如下例所示。

// enable recursive enumeration of nested input files
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// create a configuration object
Configuration parameters = new Configuration();

// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true);

// pass the configuration to the data source
DataSet<String> logs = env.readTextFile("file:///path/with.nested/files")
			  .withParameters(parameters);

数据源创建初始数据集,例如来自文件或Java集合。创建数据集的一般机制是在InputFormat后面抽象的 Flink附带了几种内置格式,可以从通用文件格式创建数据集。他们中的许多人在ExecutionEnvironment上都有快捷方法

基于文件的:

  • readTextFile(path)/ TextInputFormat- 按行读取文件并将其作为字符串返回。

  • readTextFileWithValue(path)/ TextValueInputFormat- 按行读取文件并将它们作为StringValues返回。StringValues是可变字符串。

  • readCsvFile(path)/ CsvInputFormat- 解析逗号(或其他字符)分隔字段的文件。返回元组,案例类对象或POJO的DataSet。支持基本java类型及其Value对应作为字段类型。

  • readFileOfPrimitives(path, delimiter)/ PrimitiveInputFormat- 解析新行(或其他字符序列)分隔的原始数据类型的文件,例如StringInteger使用给定的分隔符。

  • readSequenceFile(Key, Value, path)/ SequenceFileInputFormat- 创建一个JobConf并从类型为SequenceFileInputFormat,Key class和Value类的指定路径中读取文件,并将它们作为Tuple2 <Key,Value>返回。

基于集合:

  • fromCollection(Seq) - 从Seq创建数据集。集合中的所有元素必须属于同一类型。

  • fromCollection(Iterator) - 从迭代器创建数据集。该类指定迭代器返回的元素的数据类型。

  • fromElements(elements: _*) - 根据给定的对象序列创建数据集。所有对象必须属于同一类型。

  • fromParallelCollection(SplittableIterator) - 并行地从迭代器创建数据集。该类指定迭代器返回的元素的数据类型。

  • generateSequence(from, to) - 并行生成给定间隔中的数字序列。

通用:

  • readFile(inputFormat, path)/ FileInputFormat- 接受文件输入格式。

  • createInput(inputFormat)/ InputFormat- 接受通用输入格式。

例子

val env  = ExecutionEnvironment.getExecutionEnvironment

// read text file from local files system
val localLines = env.readTextFile("file:///path/to/my/textfile")

// read text file from a HDFS running at nnHost:nnPort
val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")

// read a CSV file with three fields
val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")

// read a CSV file with five fields, taking only two of them
val csvInput = env.readCsvFile[(String, Double)](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// CSV input can also be used with Case Classes
case class MyCaseClass(str: String, dbl: Double)
val csvInput = env.readCsvFile[MyCaseClass](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// read a CSV file with three fields into a POJO (Person) with corresponding fields
val csvInput = env.readCsvFile[Person](
  "hdfs:///the/CSV/file",
  pojoFields = Array("name", "age", "zipcode"))

// create a set from some given elements
val values = env.fromElements("Foo", "bar", "foobar", "fubar")

// generate a number sequence
val numbers = env.generateSequence(1, 10000000)

// read a file from the specified path of type SequenceFileInputFormat
val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text],
 "hdfs://nnHost:nnPort/path/to/file")

配置CSV分析

Flink为CSV解析提供了许多配置选项:

  • lineDelimiter: String指定单个记录的分隔符。默认行分隔符是换行符'\n'

  • fieldDelimiter: String指定用于分隔记录字段的分隔符。默认字段分隔符是逗号字符','

  • includeFields: Array[Int]定义要从输入文件中读取的字段(以及要忽略的字段)。默认情况下,将解析n个字段(由types()调用中的类型数定义)。

  • pojoFields: Array[String]指定映射到CSV字段的POJO的字段。CSV字段的解析器将根据POJO字段的类型和顺序自动初始化。

  • parseQuotedStrings: Character启用带引号的字符串解析。如果字符串字段的第一个字符是引号字符(前导或拖尾空格未被修剪),则字符串将被解析为带引号的字符串引用字符串中的字段分隔符将被忽略。如果带引号的字符串字段的最后一个字符不是引号字符,则引用字符串解析将失败。如果启用了带引号的字符串解析并且该字段的第一个字符不是引用字符串,则该字符串将被解析为不带引号的字符串。默认情况下,禁用带引号的字符串解析。

  • ignoreComments: String指定注释前缀。所有以指定注释前缀开头的行都不会被解析和忽略。默认情况下,不会忽略任何行。

  • lenient: Boolean启用宽松解析,即忽略无法正确解析的行。默认情况下,禁用宽松解析,无效行引发异常。

  • ignoreFirstLine: Boolean配置InputFormat以忽略输入文件的第一行。默认情况下,不会忽略任何行。

递归遍历输入路径目录

对于基于文件的输入,当输入路径是目录时,默认情况下不会枚举嵌套文件。相反,只读取基目录中的文件,而忽略嵌套文件。可以通过recursive.file.enumeration配置参数启用嵌套文件的递归枚举,如下例所示。

// enable recursive enumeration of nested input files
val env  = ExecutionEnvironment.getExecutionEnvironment

// create a configuration object
val parameters = new Configuration

// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true)

// pass the configuration to the data source
env.readTextFile("file:///path/with.nested/files").withParameters(parameters)

读压缩文件

Flink目前支持输入文件的透明解压缩,如果它们标有适当的文件扩展名。特别是,这意味着不需要进一步配置输入格式,并且任何FileInputFormat支持压缩,包括自定义输入格式。请注意,压缩文件可能无法并行读取,从而影响作业可伸缩性。

下表列出了当前支持的压缩方法。


压缩方法 文件扩展名 可并行
DEFLATE .deflate no / not
GZip .gz.gzip no / not
Bzip2 .bz2 no / not
XZ .xz no / not

回到顶部

数据接收

数据接收器使用DataSet并用于存储或返回它们。使用OutputFormat描述数据接收器 算子操作 Flink带有各种内置输出格式,这些格式封装在DataSet上的 算子操作后面:

  • writeAsText()/ TextOutputFormat- 按字符串顺序写入数据元。通过调用每个数据元toString()方法获得字符串
  • writeAsFormattedText()/ TextOutputFormat- 按字符串顺序写数据元。通过为每个数据元调用用户定义的format()方法来获取字符串
  • writeAsCsv(...)/ CsvOutputFormat- 将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象toString()方法。
  • print()/ printToErr()/ print(String msg)/ printToErr(String msg)- 在标准输出/标准错误流上打印每个数据元toString()值。可选地,可以提供前缀(msg),其前缀为输出。这有助于区分不同的打印调用如果并行度大于1,则输出也将与生成输出的任务的标识符一起添加。
  • write()/ FileOutputFormat- 自定义文件输出的方法和基类。支持自定义对象到字节的转换。
  • output()/ OutputFormat- 大多数通用输出方法,用于非基于文件的数据接收器(例如将结果存储在数据库中)。

可以将DataSet输入到多个 算子操作。程序可以编写或打印数据集,同时对它们执行其他转换。

例子

标准数据接收方法:

// text data
DataSet<String> textData = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS");

// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);

// tuples as lines with pipe as the separator "a|b|c"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file");

// this writes values as strings using a user-defined TextFormatter object
values.writeAsFormattedText("file:///path/to/the/result/file",
    new TextFormatter<Tuple2<Integer, Integer>>() {
        public String format (Tuple2<Integer, Integer> value) {
            return value.f1 + " - " + value.f0;
        }
    });

使用自定义输出格式:

DataSet<Tuple3<String, Integer, Double>> myResult = [...]

// write Tuple DataSet to a relational database
myResult.output(
    // build and configure OutputFormat
    JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                    .setDBUrl("jdbc:derby:memory:persons")
                    .setQuery("insert into persons (name, age, height) values (?,?,?)")
                    .finish()
    );

本地排序输出

可以使用元组字段位置字段表达式以指定顺序在指定字段上对数据接收器的输出进行本地排序这适用于每种输出格式。

以下示例显示如何使用此函数:

DataSet<Tuple3<Integer, String, Double>> tData = // [...]
DataSet<Tuple2<BookPojo, Double>> pData = // [...]
DataSet<String> sData = // [...]

// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print();

// sort output on Double field in descending and Integer field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print();

// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("f0.author", Order.DESCENDING).writeAsText(...);

// sort output on the full tuple in ascending order
tData.sortPartition("*", Order.ASCENDING).writeAsCsv(...);

// sort atomic type (String) output in descending order
sData.sortPartition("*", Order.DESCENDING).writeAsText(...);

尚不支持全局排序的输出。

数据接收器使用DataSet并用于存储或返回它们。使用OutputFormat描述数据接收器 算子操作 Flink带有各种内置输出格式,这些格式封装在DataSet上的 算子操作后面:

  • writeAsText()/ TextOutputFormat- 按字符串顺序写入元素。通过调用每个元素toString()方法获得字符串
  • writeAsCsv(...)/ CsvOutputFormat- 将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象toString()方法。
  • print()/ printToErr()- 在标准输出/标准错误流上打印每个元素toString()值。
  • write()/ FileOutputFormat- 自定义文件输出的方法和基类。支持自定义对象到字节的转换。
  • output()/ OutputFormat- 大多数通用输出方法,用于非基于文件的数据接收器(例如将结果存储在数据库中)。

可以将DataSet输入到多个 算子操作。程序可以编写或打印数据集,同时对它们执行其他转换。

例子

标准数据接收方法:

// text data
val textData: DataSet[String] = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS")

// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS")

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE)

// tuples as lines with pipe as the separator "a|b|c"
val values: DataSet[(String, Int, Double)] = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file")

// this writes values as strings using a user-defined formatting
values map { tuple => tuple._1 + " - " + tuple._2 }
  .writeAsText("file:///path/to/the/result/file")

本地排序输出

可以使用元组字段位置字段表达式以指定顺序在指定字段上对数据接收器的输出进行本地排序这适用于每种输出格式。

以下示例显示如何使用此函数:

val tData: DataSet[(Int, String, Double)] = // [...]
val pData: DataSet[(BookPojo, Double)] = // [...]
val sData: DataSet[String] = // [...]

// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print()

// sort output on Double field in descending and Int field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print()

// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...)

// sort output on the full tuple in ascending order
tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...)

// sort atomic type (String) output in descending order
sData.sortPartition("_", Order.DESCENDING).writeAsText(...)

尚不支持全局排序的输出。

回到顶部

迭代 算子

迭代在Flink程序中实现循环。迭代 算子封装程序的一部分并重复执行,将一次迭代的结果(部分解)反馈到下一次迭代中。Flink中有两种类型的迭代:BulkIterationDeltaIteration

本节提供有关如何使用这两个 算子的快速示例。查看“ 迭代简介”页面以获取更详细的介绍。

批量迭代

要创建BulkIteration,请调用iterate(int)迭代的DataSet方法。这将返回一个IterativeDataSet,可以使用常规 算子进行转换。迭代调用的单个参数指定最大迭代次数。

要指定迭代的结束,请调用closeWith(DataSet)方法IterativeDataSet以指定应将哪个转换反馈到下一次迭代。closeWith(DataSet, DataSet)如果此DataSet为空,您可以选择指定终止条件,该条件评估第二个DataSet并终止迭代。如果未指定终止条件,则迭代将在给定的最大数量迭代后终止。

以下示例迭代地估计数量Pi。目标是计算落入单位圆的随机点数。在每次迭代中,挑选一个随机点。如果此点位于单位圆内,我们会增加计数。然后估计Pi作为结果计数除以迭代次数乘以4。

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create initial IterativeDataSet
IterativeDataSet<Integer> initial = env.fromElements(0).iterate(10000);

DataSet<Integer> iteration = initial.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer i) throws Exception {
        double x = Math.random();
        double y = Math.random();

        return i + ((x * x + y * y < 1) ? 1 : 0);
    }
});

// Iteratively transform the IterativeDataSet
DataSet<Integer> count = initial.closeWith(iteration);

count.map(new MapFunction<Integer, Double>() {
    @Override
    public Double map(Integer count) throws Exception {
        return count / (double) 10000 * 4;
    }
}).print();

env.execute("Iterative Pi Example");

您还可以查看 K-Means示例,该示例使用BulkIteration来聚类一组未标记的点。

Delta迭代

Delta迭代利用了某些算法在每次迭代中不会更改解决方案的每个数据点的事实。

除了在每次迭代中反馈的部分解决方案(称为工作集)之外,delta迭代还在迭代中维护状态(称为解决方案集),可以通过增量更新。迭代计算的结果是最后一次迭代之后的状态。有关delta迭代的基本原理的概述,请参阅迭代简介

定义DeltaIteration类似于定义BulkIteration。对于delta迭代,两个数据集构成每次迭代的输入(工作集和解决方案集),并且在每次迭代中生成两个数据集作为结果(新工作集,解决方案集delta)。

创建DeltaIteration调用iterateDelta(DataSet, int, int)(或iterateDelta(DataSet, int, int[])分别)。在初始解决方案集上调用此方法。参数是初始增量集,最大迭代次数和关键位置。返回的 DeltaIteration对象使您可以通过方法iteration.getWorkset()方式访问表示工作集和解决方案集的DataSet iteration.getSolutionSet()

下面是delta迭代语法的示例

// read the initial data sets
DataSet<Tuple2<Long, Double>> initialSolutionSet = // [...]

DataSet<Tuple2<Long, Double>> initialDeltaSet = // [...]

int maxIterations = 100;
int keyPosition = 0;

DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration = initialSolutionSet
    .iterateDelta(initialDeltaSet, maxIterations, keyPosition);

DataSet<Tuple2<Long, Double>> candidateUpdates = iteration.getWorkset()
    .groupBy(1)
    .reduceGroup(new ComputeCandidateChanges());

DataSet<Tuple2<Long, Double>> deltas = candidateUpdates
    .join(iteration.getSolutionSet())
    .where(0)
    .equalTo(0)
    .with(new CompareChangesToCurrent());

DataSet<Tuple2<Long, Double>> nextWorkset = deltas
    .filter(new FilterByThreshold());

iteration.closeWith(deltas, nextWorkset)
	.writeAsCsv(outputPath);

批量迭代

要创建BulkIteration,请调用iterate(int)迭代的DataSet方法,并指定步进函数。step函数获取当前迭代的输入DataSet,并且必须返回一个新的DataSet。迭代调用的参数是停止之后的最大迭代次数。

还有一个iterateWithTermination(int)函数接受一个返回两个DataSet的步骤函数:迭代步骤的结果和终止条件。一旦终止标准DataSet为空,就停止迭代。

以下示例迭代地估计数量Pi。目标是计算落入单位圆的随机点数。在每次迭代中,挑选一个随机点。如果此点位于单位圆内,我们会增加计数。然后估计Pi作为结果计数除以迭代次数乘以4。

val env = ExecutionEnvironment.getExecutionEnvironment()

// Create initial DataSet
val initial = env.fromElements(0)

val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
  val result = iterationInput.map { i =>
    val x = Math.random()
    val y = Math.random()
    i + (if (x * x + y * y < 1) 1 else 0)
  }
  result
}

val result = count map { c => c / 10000.0 * 4 }

result.print()

env.execute("Iterative Pi Example")

您还可以查看 K-Means示例,该示例使用BulkIteration来聚类一组未标记的点。

Delta迭代

Delta迭代利用了某些算法在每次迭代中不会更改解决方案的每个数据点的事实。

除了在每次迭代中反馈的部分解决方案(称为工作集)之外,delta迭代还在迭代中维护状态(称为解决方案集),可以通过增量更新。迭代计算的结果是最后一次迭代之后的状态。有关delta迭代的基本原理的概述,请参阅迭代简介

定义DeltaIteration类似于定义BulkIteration。对于delta迭代,两个数据集构成每次迭代的输入(工作集和解决方案集),并且在每次迭代中生成两个数据集作为结果(新工作集,解决方案集delta)。

要创建DeltaIteration,请iterateDelta(initialWorkset, maxIterations, key)在初始解决方案集上调用step函数有两个参数:(solutionSet,workset),并且必须返回两个值:(solutionSetDelta,newWorkset)。

下面是delta迭代语法的示例

// read the initial data sets
val initialSolutionSet: DataSet[(Long, Double)] = // [...]

val initialWorkset: DataSet[(Long, Double)] = // [...]

val maxIterations = 100
val keyPosition = 0

val result = initialSolutionSet.iterateDelta(initialWorkset, maxIterations, Array(keyPosition)) {
  (solution, workset) =>
    val candidateUpdates = workset.groupBy(1).reduceGroup(new ComputeCandidateChanges())
    val deltas = candidateUpdates.join(solution).where(0).equalTo(0)(new CompareChangesToCurrent())

    val nextWorkset = deltas.filter(new FilterByThreshold())

    (deltas, nextWorkset)
}

result.writeAsCsv(outputPath)

env.execute()

回到顶部

在函数中 算子操作数据对象

Flink的运行时以Java对象的形式与用户函数交换数据。函数从运行时接收输入对象作为方法参数,并返回输出对象作为结果。由于这些对象是由用户函数和运行时代码访问的,因此理解并遵循有关用户代码如何访问(即读取和修改)这些对象的规则非常重要。

用户函数从Flink的运行时接收对象,作为常规方法参数(如a MapFunction)或通过Iterable参数(如a GroupReduceFunction)。我们将运行时传递给用户函数的对象称为输入对象用户函数可以将对象作为方法返回值(如a MapFunction)或通过a Collector(如a FlatMapFunction发送到Flink运行时我们将用户函数发出的对象称为输出对象

Flink的DataSet API具有两种模式,这些模式在Flink的运行时创建或重用输入对象方面有所不同。此行为会影响用户函数如何与输入和输出对象进行交互的保证和约束。以下部分定义了这些规则,并给出了编写安全用户函数代码的编码指南。

禁用对象重用(DEFAULT)

默认情况下,Flink在禁用对象重用模式下运行。此模式可确保函数始终在函数调用中接收新的输入对象。禁用对象重用模式可提供更好的保证,并且使用起来更安全。但是,它带来了一定的处理开销,可能会导致更高的Java垃圾回收活动。下表说明了用户函数如何在禁用对象重用模式下访问输入和输出对象。

Operation 保证和限制
读取输入对象 在方法调用中,保证输入对象的值不会改变。这包括由Iterable提供的对象。例如,收集由List或Map中的Iterable提供的输入对象是安全的。请注意,在保存方法调用后,可以修改对象。在函数调用中记住对象不安全的
修改输入对象 您可以修改输入对象。
发射输入对象 您可以发出输入对象。输入对象的值在发出后可能已更改。在输出对象后,读取它是不安全的
读取输出对象 提供给收集器或作为方法结果返回的对象可能已更改其值。读取输出对象不安全的
修改输出对象 您可以在发射对象后对其进行修改并再次发射。

禁用对象重用(默认)模式的编码指南:

  • 不记得并跨方法调用读取输入对象。
  • 发射后不要读取对象。

对象重用已启用

在对象重用启用模式下,Flink的运行时最小化对象实例化的数量。这可以提高性能并可以ReduceJava垃圾收集压力。通过调用激活对象重用启用模式ExecutionConfig.enableObjectReuse()下表说明了用户函数如何在对象重用启用模式下访问输入和输出对象。

Operation 保证和限制
读取作为常规方法参数接收的输入对象 在常规方法参数中接收的输入对象不会在函数调用中修改。在离开方法调用后,可以修改对象。在函数调用中记住对象不安全的
读取从Iterable参数接收的输入对象 从Iterable接收的输入对象仅在调用next()方法之前有效。Iterable或Iterator可以多次为同一个对象实例提供服务。记住从Iterable接收的输入对象不安全的,例如,将它们放在List或Map中。
修改输入对象 除了MapFunction,FlatMapFunction,MapPartitionFunction,GroupReduceFunction,GroupCombineFunction,CoGroupFunction和InputFormat.next(重用)的输入对象外,不能修改输入对象。
发射输入对象 除了MapFunction,FlatMapFunction,MapPartitionFunction,GroupReduceFunction,GroupCombineFunction,CoGroupFunction和InputFormat.next(重用)的输入对象外,不能发出输入对象。
读取输出对象 提供给收集器或作为方法结果返回的对象可能已更改其值。读取输出对象不安全的
修改输出对象 您可以修改输出对象并再次发出。

启用对象重用的编码指南:

  • 不记得从中收到的输入对象Iterable
  • 不记得并跨方法调用读取输入对象。
  • 不要修改或发出输入对象,除了输入对象MapFunctionFlatMapFunctionMapPartitionFunctionGroupReduceFunctionGroupCombineFunctionCoGroupFunction,和InputFormat.next(reuse)
  • 要Reduce对象实例化,您始终可以发出重复修改但从不读取的专用输出对象。

回到顶部

调试

在对分布式集群中的大型数据集运行数据分析程序之前,最好确保实现的算法按预期工作。因此,实施数据分析程序通常是检查结果,调试和改进的增量过程。

Flink提供了一些很好的函数,通过支持IDE内的本地调试,测试数据的注入和结果数据的收集,显着简化了数据分析程序的开发过程。本节提供了一些如何简化Flink程序开发的提示。

本地运行环境

A LocalEnvironment在创建它的同一JVM进程中启动Flink系统。如果从IDE启动LocalEnvironment,则可以在代码中设置断点并轻松调试程序。

创建LocalEnvironment并使用如下:

final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

DataSet<String> lines = env.readTextFile(pathToTextFile);
// build your program

env.execute();
val env = ExecutionEnvironment.createLocalEnvironment()

val lines = env.readTextFile(pathToTextFile)
// build your program

env.execute()

收集数据源和接收器

通过创建输入文件和读取输出文件来完成分析程序的输入并检查其输出是很麻烦的。Flink具有特殊的数据源和接收器,由Java集合支持以简化测试。一旦程序经过测试,源和接收器可以很容易地被读取/写入外部数据存储(如HDFS)的源和接收器替换。

集合数据源可以使用如下:

final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

// Create a DataSet from a list of elements
DataSet<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataSet from any Java collection
List<Tuple2<String, Integer>> data = ...
DataSet<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataSet from an Iterator
Iterator<Long> longIt = ...
DataSet<Long> myLongs = env.fromCollection(longIt, Long.class);

集合数据接收器指定如下:

DataSet<Tuple2<String, Integer>> myResult = ...

List<Tuple2<String, Integer>> outData = new ArrayList<Tuple2<String, Integer>>();
myResult.output(new LocalCollectionOutputFormat(outData));

注意:目前,集合数据接收器仅限于本地执行,作为调试工具。

val env = ExecutionEnvironment.createLocalEnvironment()

// Create a DataSet from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)

// Create a DataSet from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)

// Create a DataSet from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)

注意:目前,集合数据源要求实现数据类型和迭代器 Serializable此外,收集数据源不能并行执行(并行度= 1)。

回到顶部

语义注释

语义注释可用于提供有关函数行为的Flink提示。它们告诉系统函数读取和评估函数输入的哪些字段以及未修改的字段从其输入转发到其输出。语义注释是加速执行的有力手段,因为它们允许系统推断在多个 算子操作中重用排序顺序或分区。使用语义注释最终可以使程序免于不必要的数据混洗或不必要的排序,并显着提高程序的性能。

注意:语义注释的使用是可选的。但是,提供语义注释时保守是绝对至关重要的!不正确的语义注释会导致Flink对您的程序做出错误的假设,并最终可能导致错误的结果。如果算子的行为不明确可预测,则不应提供注释。请仔细阅读文档。

目前支持以下语义注释。

转发字段注释

转发字段信息声明输入字段,这些输入字段未被修改,由函数转发到相同位置或输出中的另一个位置。优化程序使用此信息来推断函数是否保存了数据属性(如排序或分区)。对于输入元件,诸如一组 算子操作的函数GroupReduceGroupCombineCoGroup,和MapPartition,被定义为转发字段的所有字段必须始终共同从相同的输入元件转发。由分组函数发出的每个数据元的转发字段可以源自函数输入组的不同数据元。

使用字段表达式指定字段转发信息转发到输出中相同位置的字段可以按其位置指定。指定的位置必须对输入和输出数据类型有效,并且具有相同的类型。例如,String "f2"声明Java输入元组的第三个字段始终等于输出元组中的第三个字段。

通过将输入中的源字段和输出中的目标字段指定为字段表达式来声明未修改的字段转发到输出中的另一个位置。String "f0->f2"表示Java输入元组的第一个字段未更改,复制到Java输出元组的第三个字段。通配符表达式*可用于指代整个输入或输出类型,即"f0->*"表示函数的输出始终等于其Java输入元组的第一个字段。

可以在单个String中声明多个转发字段,方法是将它们用分号分隔为"f0; f2->f1; f3->f2"单独的字符串"f0", "f2->f1", "f3->f2"指定转发字段时,不要求声明所有转发字段,但所有声明必须正确。

可以通过在函数类定义上附加Java注释或在调用DataSet上的函数后将它们作为 算子参数传递来声明转发的字段信息,如下所示。

函数类注释
  • @ForwardedFields 用于单输入函数,例如Map和Reduce。
  • @ForwardedFieldsFirst 用于第一次输入具有两个输入的函数,例如Join和CoGroup。
  • @ForwardedFieldsSecond 用于具有两个输入的函数的第二个输入,例如Join和CoGroup。
算子参数
  • data.map(myMapFnc).withForwardedFields() 用于单输入函数,例如Map和Reduce。
  • data1.join(data2).where().equalTo().with(myJoinFnc).withForwardFieldsFirst() 对于具有两个输入(例如Join和CoGroup)的函数的第一个输入。
  • data1.join(data2).where().equalTo().with(myJoinFnc).withForwardFieldsSecond() 对于具有两个输入的函数的第二个输入,例如Join和CoGroup。

请注意,无法覆盖由 算子参数指定为类注释的字段转发信息。

以下示例显示如何使用函数类注释声明转发的字段信息:

@ForwardedFields("f0->f2")
public class MyMap implements
              MapFunction<Tuple2<Integer, Integer>, Tuple3<String, Integer, Integer>> {
  @Override
  public Tuple3<String, Integer, Integer> map(Tuple2<Integer, Integer> val) {
    return new Tuple3<String, Integer, Integer>("foo", val.f1 / 2, val.f0);
  }
}
@ForwardedFields("_1->_3")
class MyMap extends MapFunction[(Int, Int), (String, Int, Int)]{
   def map(value: (Int, Int)): (String, Int, Int) = {
    return ("foo", value._2 / 2, value._1)
  }
}

非转发字段

非转发字段信息声明所有未保存在函数输出中相同位置的字段。所有其他字段的值都被视为保存在输出中的相同位置。因此,非转发字段信息与转发字段信息相反。对于分组方式算子,如非转场信息GroupReduceGroupCombineCoGroup,和MapPartition必须满足相同的要求转发的字段信息。

重要信息:非转发字段信息的规范是可选的。但如果使用, 全部!必须指定非转发字段,因为所有其他字段都被视为在适当位置转发。将转发字段声明为非转发是安全的。

非转发字段被指定为字段表达式列表该列表可以作为单个字符串给出,字段表达式用分号分隔,也可以作为多个字符串。例如两者"f1; f3""f1", "f3"宣布一个Java元组的第二和第四场不保存到位等各个领域都在处保存。只能为具有相同输入和输出类型的函数指定非转发字段信息。

使用以下注释将未转发的字段信息指定为函数类注释:

  • @NonForwardedFields 用于单输入函数,例如Map和Reduce。
  • @NonForwardedFieldsFirst 对于具有两个输入(例如Join和CoGroup)的函数的第一个输入。
  • @NonForwardedFieldsSecond 对于具有两个输入的函数的第二个输入,例如Join和CoGroup。

以下示例显示如何声明未转发的字段信息:

@NonForwardedFields("f1") // second field is not forwarded
public class MyMap implements
              MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
  @Override
  public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> val) {
    return new Tuple2<Integer, Integer>(val.f0, val.f1 / 2);
  }
}
@NonForwardedFields("_2") // second field is not forwarded
class MyMap extends MapFunction[(Int, Int), (Int, Int)]{
  def map(value: (Int, Int)): (Int, Int) = {
    return (value._1, value._2 / 2)
  }
}

阅读字段

读取字段信息声明由函数访问和评估的所有字段,即函数用于计算其结果的所有字段。例如,在指定读取字段信息时,必须将在条件语句中计算或用于计算的字段标记为已读。只有未经修改的字段转发到输出而不评估它们的值或根本不被访问的字段不被认为是被读取的。

重要信息:读取字段信息的规范是可选的。但如果使用, 全部!必须指定读取字段。将非读取字段声明为读取是安全的。

读取字段被指定为字段表达式列表该列表可以作为单个字符串给出,字段表达式用分号分隔,也可以作为多个字符串。例如同时"f1; f3""f1", "f3"声明一个Java元组的第二和第四场被读出并通过函数进行评价。

使用以下注释将读取字段信息指定为函数类注释:

  • @ReadFields 用于单输入函数,例如Map和Reduce。
  • @ReadFieldsFirst 对于具有两个输入(例如Join和CoGroup)的函数的第一个输入。
  • @ReadFieldsSecond 对于具有两个输入的函数的第二个输入,例如Join和CoGroup。

以下示例显示如何声明读取字段信息:

@ReadFields("f0; f3") // f0 and f3 are read and evaluated by the function.
public class MyMap implements
              MapFunction<Tuple4<Integer, Integer, Integer, Integer>,
                          Tuple2<Integer, Integer>> {
  @Override
  public Tuple2<Integer, Integer> map(Tuple4<Integer, Integer, Integer, Integer> val) {
    if(val.f0 == 42) {
      return new Tuple2<Integer, Integer>(val.f0, val.f1);
    } else {
      return new Tuple2<Integer, Integer>(val.f3+10, val.f1);
    }
  }
}
@ReadFields("_1; _4") // _1 and _4 are read and evaluated by the function.
class MyMap extends MapFunction[(Int, Int, Int, Int), (Int, Int)]{
   def map(value: (Int, Int, Int, Int)): (Int, Int) = {
    if (value._1 == 42) {
      return (value._1, value._2)
    } else {
      return (value._4 + 10, value._2)
    }
  }
}

回到顶部

广播变量

除了常规的 算子操作输入之外,广播变量还允许您为 算子操作的所有并行实例提供数据集。这对于辅助数据集或与数据相关的参数化非常有用。然后,算子可以将数据集作为集合访问。

  • 广播:广播集通过名称注册withBroadcastSet(DataSet, String),和
  • 访问:可通过getRuntimeContext().getBroadcastVariable(String)目标算子访问
// 1. The DataSet to be broadcast
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);

DataSet<String> data = env.fromElements("a", "b");

data.map(new RichMapFunction<String, String>() {
    @Override
    public void open(Configuration parameters) throws Exception {
      // 3. Access the broadcast DataSet as a Collection
      Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
    }


    @Override
    public String map(String value) throws Exception {
        ...
    }
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 2. Broadcast the DataSet

broadcastSetName注册和访问广播数据集时,请确保名称(在前面的示例中)匹配。有关完整的示例程序,请查看 K-Means算法

// 1. The DataSet to be broadcast
val toBroadcast = env.fromElements(1, 2, 3)

val data = env.fromElements("a", "b")

data.map(new RichMapFunction[String, String]() {
    var broadcastSet: Traversable[String] = null

    override def open(config: Configuration): Unit = {
      // 3. Access the broadcast DataSet as a Collection
      broadcastSet = getRuntimeContext().getBroadcastVariable[String]("broadcastSetName").asScala
    }

    def map(in: String): String = {
        ...
    }
}).withBroadcastSet(toBroadcast, "broadcastSetName") // 2. Broadcast the DataSet

Make sure that the names (broadcastSetName in the previous example) match when registering and accessing broadcast data sets. For a complete example program, have a look at KMeans Algorithm.

注意:由于广播变量的内容保存在每个节点的内存中,因此不应该变得太大。对于标量值之类的简单事物,您可以简单地将参数作为函数闭包的一部分,或者使用该withParameters(...)方法传递配置。

回到顶部

分布式缓存

Flink提供了一个分布式缓存,类似于Apache Hadoop,可以在本地访问用户函数的并行实例。此函数可用于共享包含静态外部数据的文件,如字典或机器学习的回归模型。

缓存的工作原理如下。程序在其作为缓存文件的特定名称下注册本地或远程文件系统(如HDFS或S3)的文件或目录ExecutionEnvironment执行程序时,Flink会自动将文件或目录复制到所有工作程序的本地文件系统。用户函数可以查找指定名称下的文件或目录,并从worker的本地文件系统访问它。

分布式缓存使用如下:

注册中的文件或目录ExecutionEnvironment

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

// define your program and execute
...
DataSet<String> input = ...
DataSet<Integer> result = input.map(new MyMapper());
...
env.execute();

访问用户函数中的缓存文件或目录(此处为a MapFunction)。该函数必须扩展RichFunction类,因为它需要访问RuntimeContext

// extend a RichFunction to have access to the RuntimeContext
public final class MyMapper extends RichMapFunction<String, Integer> {

    @Override
    public void open(Configuration config) {

      // access cached file via RuntimeContext and DistributedCache
      File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
      // read the file (or navigate the directory)
      ...
    }

    @Override
    public Integer map(String value) throws Exception {
      // use content of cached file
      ...
    }
}

Register the file or directory in the ExecutionEnvironment.

val env = ExecutionEnvironment.getExecutionEnvironment

// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

// define your program and execute
...
val input: DataSet[String] = ...
val result: DataSet[Integer] = input.map(new MyMapper())
...
env.execute()

Access the cached file in a user function (here a MapFunction). The function must extend a RichFunction class because it needs access to the RuntimeContext.

// extend a RichFunction to have access to the RuntimeContext
class MyMapper extends RichMapFunction[String, Int] {

  override def open(config: Configuration): Unit = {

    // access cached file via RuntimeContext and DistributedCache
    val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
    // read the file (or navigate the directory)
    ...
  }

  override def map(value: String): Int = {
    // use content of cached file
    ...
  }
}

回到顶部

将参数传递给函数

可以使用构造函数或withParameters(Configuration)方法将参数传递给函数参数被序列化为函数对象的一部分并传送到所有并行任务实例。

还查看有关如何将命令行参数传递给函数最佳实践指南

通过构造函数

DataSet<Integer> toFilter = env.fromElements(1, 2, 3);

toFilter.filter(new MyFilter(2));

private static class MyFilter implements FilterFunction<Integer> {

  private final int limit;

  public MyFilter(int limit) {
    this.limit = limit;
  }

  @Override
  public boolean filter(Integer value) throws Exception {
    return value > limit;
  }
}
val toFilter = env.fromElements(1, 2, 3)

toFilter.filter(new MyFilter(2))

class MyFilter(limit: Int) extends FilterFunction[Int] {
  override def filter(value: Int): Boolean = {
    value > limit
  }
}

通过 withParameters(Configuration)

此方法将Configuration对象作为参数,将其传递给rich函数open() 方法。Configuration对象是从String键到不同值类型的Map。

DataSet<Integer> toFilter = env.fromElements(1, 2, 3);

Configuration config = new Configuration();
config.setInteger("limit", 2);

toFilter.filter(new RichFilterFunction<Integer>() {
    private int limit;

    @Override
    public void open(Configuration parameters) throws Exception {
      limit = parameters.getInteger("limit", 0);
    }

    @Override
    public boolean filter(Integer value) throws Exception {
      return value > limit;
    }
}).withParameters(config);
val toFilter = env.fromElements(1, 2, 3)

val c = new Configuration()
c.setInteger("limit", 2)

toFilter.filter(new RichFilterFunction[Int]() {
    var limit = 0

    override def open(config: Configuration): Unit = {
      limit = config.getInteger("limit", 0)
    }

    def filter(in: Int): Boolean = {
        in > limit
    }
}).withParameters(c)

全局通过 ExecutionConfig

Flink还允许将自定义配置值传递到ExecutionConfig环境接口。由于执行配置可在所有(丰富)用户函数中访问,因此自定义配置将在所有函数中全局可用。

设置自定义全局配置

Configuration conf = new Configuration();
conf.setString("mykey","myvalue");
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(conf);
val env = ExecutionEnvironment.getExecutionEnvironment
val conf = new Configuration()
conf.setString("mykey", "myvalue")
env.getConfig.setGlobalJobParameters(conf)

请注意,您还可以将自定义ExecutionConfig.GlobalJobParameters类作为全局作业参数传递给执行配置。该接口允许实现该Map<String, String> toMap()方法,方法将依次显示来自Web前端中的配置的值。

从全局配置中访问值

全局作业参数中的对象可在系统中的许多位置访问。实现RichFunction接口的所有用户函数都可以通过运行时上下文访问。

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    private String mykey;
    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
      Configuration globConf = (Configuration) globalParams;
      mykey = globConf.getString("mykey", null);
    }
    // ... more here ...

回到顶部