API迁移指南

自Flink 1.2以来,有一些API已被更改。大多数更改都记录在其特定文档中。以下是API更改的综合列表以及升级到Flink 1.3时迁移详细信息的链接。

TypeSerializer 界面变化

这主要适用TypeSerializer于为其状态实施自定义用户

从Flink 1.3开始,添加了两个与保存点恢复的串行器兼容性相关的其他方法。 有关如何实现这些方法的更多详细信息,请参阅 处理序列化程序升级和兼容性

ProcessFunction 总是一个 RichFunction

在Flink 1.2中,引入了ProcessFunction其丰富的变体RichProcessFunction自Flink 1.3以来,RichProcessFunction已被删除,ProcessFunction现在始终RichFunction可以访问生命周期方法和运行时上下文。

Flink 1.3中的CEP库附带了许多新函数,这些函数导致API发生了一些变化。有关详细信息,请访问CEP迁移文档

在Flink 1.3中,为了确保用户可以使用他们自己的自定义日志记录框架,核心Flink工件现在可以清除特定的记录器依赖项。

示例和快速入门原型已经指定了记录器,不应受到影响。对于其他自定义项目,请确保添加记录器依赖项。例如,在Maven中pom.xml,您可以添加:

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
</dependency>

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>

状态文档中所述,Flink有两种类型的状态:被Keys化状态 非被Keys化状态(也称为算子状态)。这两种类型都可用于 算子和用户定义的函数。本文档将指导您完成将Flink 1.1函数代码迁移到Flink 1.2的过程,并介绍Flink 1.2中引入的一些重要内部更改,这些更改涉及Flink 1.1中对齐窗口 算子的弃用(请参阅对齐处理时间窗口 算子)。

迁移过程将有两个目标:

  1. 允许您的函数利用Flink 1.2中引入的新函数,例如重新缩放,

  2. 确保您的新Flink 1.2作业能够从其Flink 1.1前身生成的保存点恢复执行。

按照本指南中的步骤 算子操作后,您可以将正在运行的作业从Flink 1.1迁移到Flink 1.2,只需在Flink 1.1作业中使用保存点并将其作为起点提供给Flink 1.2作业。这将允许Flink 1.2作业从其Flink 1.1前任中断的位置恢复执行。

示例用户函数

作为本文档其余部分的运行示例,我们将使用CountMapperBufferingSink 函数。第一个是具有被Keys化状态的函数的示例,而第二个是具有非被Keys化状态的函数Flink 1.1中上述两个函数的代码如下:

public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private transient ValueState<Integer> counter;

    private final int numberElements;

    public CountMapper(int numberElements) {
        this.numberElements = numberElements;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        counter = getRuntimeContext().getState(
            new ValueStateDescriptor<>("counter", Integer.class, 0));
    }

    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
        int count = counter.value() + 1;
        counter.update(count);

        if (count % numberElements == 0) {
            out.collect(Tuple2.of(value.f0, count));
            counter.update(0); // reset to 0
        }
    }
}

public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
    Checkpointed<ArrayList<Tuple2<String, Integer>>> {

    private final int threshold;

    private ArrayList<Tuple2<String, Integer>> bufferedElements;

    BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public ArrayList<Tuple2<String, Integer>> snapshotState(
        long checkpointId, long checkpointTimestamp) throws Exception {
        return bufferedElements;
    }

    @Override
    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
        bufferedElements.addAll(state);
    }
}

CountMapper是一个RichFlatMapFunction假定按表格分组的输入流 (word, 1)该函数为每个传入的键(ValueState<Integer> counter保存一个计数器,如果某个单词的出现次数超过用户提供的阈值,则会发出一个包含单词本身和出现次数的元组。

BufferingSink是一个SinkFunction接收元件(的潜在的输出CountMapper),并直到达到某个用户指定的阈值,将它们发射到最终水槽之前缓冲它们。这是避免对数据库或外部存储系统进行许多昂贵调用的常用方法。为了以容错方式进行缓冲,缓冲数据元保存在列表(bufferedElements)中,该列表定期检查点。

状态API迁移

要利用Flink 1.2的新函数,应修改上面的代码以使用新的状态抽象。完成这些更改后,您将能够更改作业的并行度(向上或向下扩展),并确保您的新版本的作业将从其前任停止的位置开始。

Keys状态:在深入研究迁移过程的细节之前需要注意的是,如果您的函数只有被Keys化状态,那么Flink 1.1的完全相同的代码也适用于Flink 1.2,完全支持新函数和完全向后兼容性。可以仅针对更好的代码组织进行更改,但这仅仅是一种风格问题。

如上所述,本节的其余部分重点介绍非被Keys化状态

重新缩放和新状态抽象

第一个修改是从旧Checkpointed<T extends Serializable>状态接口到新状态接口的转换在Flink 1.2中,有状态函数可以实现更通用的CheckpointedFunction 接口或ListCheckpointed<T extends Serializable>接口,它在语义上更接近旧接口 Checkpointed

在这两种情况中,非键合状态被预期是一个List序列化的对象,彼此独立的,因而在重新缩放获再分配。换句话说,这些对象是可以重新分区非被Keys化状态的最细粒度。例如,如果并行性1 BufferingSink 包含数据元的检查点状态,(test1, 2)并且(test2, 2)当将并行性增加到2时,(test1, 2)可能最终在任务0中,而(test2, 2)将进入任务1。

有关被Keys化状态和非被Keys化状态重新缩放的原则的更多详细信息,请参阅状态文档

ListCheckpointed

ListCheckpointed接口需要实现两种方法:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;

它们的语义与旧Checkpointed界面中的对应物相同唯一的区别是现在snapshotState()应该将对象列表返回到检查点,如前所述,并且 restoreState必须在恢复时处理此列表。如果状态不是重新分区,可以随时返回Collections.singletonList(MY_STATE)snapshotState()更新的代码BufferingSink 包括在下面:

public class BufferingSinkListCheckpointed implements
        SinkFunction<Tuple2<String, Integer>>,
        ListCheckpointed<Tuple2<String, Integer>>,
        CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSinkListCheckpointed(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        this.bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public List<Tuple2<String, Integer>> snapshotState(
            long checkpointId, long timestamp) throws Exception {
        return this.bufferedElements;
    }

    @Override
    public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
        if (!state.isEmpty()) {
            this.bufferedElements.addAll(state);
        }
    }

    @Override
    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
        // this is from the CheckpointedRestoring interface.
        this.bufferedElements.addAll(state);
    }
}

如代码所示,更新的函数也实现了CheckpointedRestoring接口。这是出于向后兼容性原因,更多细节将在本节末尾解释。

CheckpointedFunction

CheckpointedFunction接口需要再次执行两种方法:

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

与在Flink 1.1中一样,snapshotState()每当执行检查点时都会调用它,但是每次初始化用户定义的函数时,都会调用initializeState()它(它的对应部分restoreState()),而不是仅在我们从故障中恢复的情况下。鉴于此,initializeState()不仅是初始化不同类型状态的地方,而且还包括状态恢复逻辑。CheckpointedFunction接口的实现 BufferingSink如下所示。

public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
        CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedState = context.getOperatorStateStore().
            getSerializableListState("buffered-elements");

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }

    @Override
    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
        // this is from the CheckpointedRestoring interface.
        this.bufferedElements.addAll(state);
    }
}

initializeState把参数作为一个FunctionInitializationContext这用于初始化非被Keys化状态“容器”。这是一个类型的容器,ListState其中非被Keys化状态对象将在检查点存储:

this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");

在初始化容器之后,我们使用isRestored()上下文方法来检查我们是否在失败后恢复。如果是这样true我们正在恢复,则应用恢复逻辑。

如修改的代码所示,在状态初始化期间恢复的BufferingSink这个ListState被保存在类变量中以供将来使用snapshotState()在那里,ListState被清除由先前的检查点包含的所有对象,然后填充我们要设置检查点新的。

作为旁注,被Keys化状态也可以在initializeState()方法中初始化这可以使用FunctionInitializationContext给定的参数来完成,而不是RuntimeContextFlink 1.1的情况。如果CheckpointedFunction要在CountMapper示例中使用接口,则open()可以删除方法,new snapshotState()initializeState()方法如下所示:

public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
        implements CheckpointedFunction {

    private transient ValueState<Integer> counter;

    private final int numberElements;

    public CountMapper(int numberElements) {
        this.numberElements = numberElements;
    }

    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
        int count = counter.value() + 1;
        counter.update(count);

        if (count % numberElements == 0) {
            out.collect(Tuple2.of(value.f0, count));
            counter.update(0); // reset to 0
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // all managed, nothing to do.
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        counter = context.getKeyedStateStore().getState(
            new ValueStateDescriptor<>("counter", Integer.class, 0));
    }
}

请注意,该snapshotState()方法为空,因为Flink本身负责在检查点时SNAPSHOT托管的被Keys化状态。

到目前为止,我们已经看到如何修改我们的函数以利用Flink 1.2引入的新函数。剩下的问题是“我可以确保我的修改后的(Flink 1.2)作业将从我从Flink 1.1运行的作业停止的位置开始吗?”。

答案是肯定的,而这样做的方式非常简单。对于被Keys化状态,您必须什么都不做。Flink将负责从Flink 1.1恢复状态。对于非被Keys化状态,新函数必须实现CheckpointedRestoring接口,如上面的代码所示。这有一个方法,熟悉Flink 1.1 restoreState()的旧Checkpointed接口。如修改后的代码所示BufferingSink,该restoreState()方法与其前身相同。

对齐处理时间窗口 算子

在Flink 1.1中,只有在没有指定的逐出器或触发器的处理时间运行timeWindow()被Key化的数据流上的命令才会实例化特殊类型WindowOperator这可以是一个AggregatingProcessingTimeWindowOperator或一个AccumulatingProcessingTimeWindowOperator这两个 算子都被称为对齐窗口 算子,因为它们假设它们的输入数据元按顺序到达。这在处理时间内有效,因为数据元在到达窗口算子时获得挂钟时间的时间戳。这些 算子仅限于使用内存状态后台,并且具有优化的数据结构,用于存储利用有序输入数据元到达的每窗口数据元。

在Flink 1.2中,不推荐使用对齐的窗口 算子,并且所有窗口 算子操作都通过泛型 WindowOperator此迁移不需要更改Flink 1.1作业的代码,因为Flink将透明地读取Flink 1.1保存点中对齐的窗口 算子存储的状态,将其转换为与泛型相兼容的格式WindowOperator,并使用通用的WindowOperator

注意尽管已弃用,但您仍然可以使用Flink 1.2中的对齐窗口 算子,通过特殊WindowAssigners介绍来实现此目的。这些assigners是 SlidingAlignedProcessingTimeWindowsTumblingAlignedProcessingTimeWindowsassigners,滑动和翻滚分别窗口。使用对齐窗口的Flink 1.2作业必须是一项新工作,因为在使用这些 算子时无法从Flink 1.1保存点恢复执行。

注意对齐的窗口 算子不提供重新缩放函数,也不提供与Flink 1.1的向后兼容性

在Flink 1.2中使用对齐窗口 算子的代码如下所示:

// for tumbling windows
DataStream<Tuple2<String, Integer>> window1 = source
    .keyBy(0)
    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
    .apply(your-function)

// for sliding windows
DataStream<Tuple2<String, Integer>> window1 = source
    .keyBy(0)
    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
    .apply(your-function)
// for tumbling windows
val window1 = source
    .keyBy(0)
    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
    .apply(your-function)

// for sliding windows
val window2 = source
    .keyBy(0)
    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
    .apply(your-function)

回到顶部