风暴兼容性Beta

Flink流与Apache Storm接口兼容,因此允许重用为Storm实现的代码。

您可以:

  • Topology在Flink 执行整个Storm
  • 在Flink流处理节目中使用Storm Spout/ Bolt作为源/算子。

本文档介绍了如何在Flink中使用现有的Storm代码。

项目配置

支持Storm包含在flink-stormMaven模块中。代码驻留在org.apache.flink.storm包中。

pom.xml如果要在Flink中执行Storm代码,请将以下依赖项添加到您的

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-storm_2.11</artifactId>
	<version>1.7-SNAPSHOT</version>
</dependency>

请注意:不要添加storm-core为依赖项。它已包含在内flink-storm

请注意flink-storm不是提供的二进制Flink发行版的一部分。因此,您需要flink-storm在提交给Flink的JobManager的程序jar(也称为uber-jar或fat-jar)中包含类(及其依赖项)。字计数风暴flink-storm-examples/pom.xml的一个例子,如何正确地打包罐。

如果你想避免大尤伯杯罐子,你可以手动复制storm-core-0.9.4.jarjson-simple-1.1.jarflink-storm-1.7-SNAPSHOT.jar进入Flink的lib/每个群集节点的文件夹(之前在启动群集)。对于这种情况,仅将您自己的Spout和Bolt类(及其内部依赖项)包含在程序jar中就足够了。

执行Storm拓扑

Flink提供与Storm兼容的API(org.apache.flink.storm.api),它可以替代以下类:

  • StormSubmitter 取而代之 FlinkSubmitter
  • NimbusClientClient替换为FlinkClient
  • LocalCluster 取而代之 FlinkLocalCluster

为了向Flink提交Storm拓扑,只需使用组装拓扑的Storm 客户端代码中的Flink替换来替换使用过的Storm类实际的运行时代码,即Spouts和Bolts,可以不加修改地使用如果拓扑在远程集群执行时,参数nimbus.hostnimbus.thrift.port被用作jobmanger.rpc.addressjobmanger.rpc.port分别。如果未指定参数,则取值flink-conf.yaml

TopologyBuilder builder = new TopologyBuilder(); // the Storm topology builder

// actual topology assembling code and used Spouts/Bolts can be used as-is
builder.setSpout("source", new FileSpout(inputFilePath));
builder.setBolt("tokenizer", new BoltTokenizer()).shuffleGrouping("source");
builder.setBolt("counter", new BoltCounter()).fieldsGrouping("tokenizer", new Fields("word"));
builder.setBolt("sink", new BoltFileSink(outputFilePath)).shuffleGrouping("counter");

Config conf = new Config();
if(runLocal) { // submit to test cluster
	// replaces: LocalCluster cluster = new LocalCluster();
	FlinkLocalCluster cluster = new FlinkLocalCluster();
	cluster.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
} else { // submit to remote cluster
	// optional
	// conf.put(Config.NIMBUS_HOST, "remoteHost");
	// conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
	// replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology());
	FlinkSubmitter.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
}

在Flink流程序中嵌入Storm算子

作为替代方案,Spouts和Bolts可以嵌入到常规流处理节目中。Storm兼容层为每个提供了一个打包类,即SpoutWrapperBoltWrapperorg.apache.flink.storm.wrappers)。

每默认情况下,打包转换风暴输出元组Flink的元组类型(即,Tuple0Tuple25根据风暴元组的字段数)。对于单场输出元组,也可以转换为字段的数据类型(例如,String代替Tuple1<String>)。

由于Flink无法推断Storm 算子的输出字段类型,因此需要手动指定输出类型。为了获得正确的TypeInformation对象,TypeExtractor可以使用Flink

嵌入Spouts

要将Spout用作Flink源,请使用StreamExecutionEnvironment.addSource(SourceFunction, TypeInformation)Spout对象被传递给它的构造函数SpoutWrapper<OUT>,作为第一个参数addSource(...)泛型类型声明OUT指定源输出流的类型。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// stream has `raw` type (single field output streams only)
DataStream<String> rawInput = env.addSource(
	new SpoutWrapper<String>(new FileSpout(localFilePath), new String[] { Utils.DEFAULT_STREAM_ID }), // emit default output stream as raw type
	TypeExtractor.getForClass(String.class)); // output type

// process data stream
[...]

如果Spout发出有限数量的元组,SpoutWrapper可以通过numberOfInvocations在其构造函数中设置参数来配置为自动终止这允许Flink程序在处理完所有数据后自动关闭。默认情况下,程序将一直运行,直到手动取消

嵌入螺栓

要使用Bolt作为Flink 算子,请使用DataStream.transform(String, TypeInformation, OneInputStreamOperator)Bolt对象被传递给它的构造函数BoltWrapper<IN,OUT>,作为最后一个参数transform(...)泛型类型声明IN并分别OUT指定 算子的输入和输出流的类型。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile(localFilePath);

DataStream<Tuple2<String, Integer>> counts = text.transform(
	"tokenizer", // operator name
	TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)), // output type
	new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer())); // Bolt operator

// do further processing
[...]

嵌入式螺栓的命名属性访问

螺栓可以通过名称访问输入元组字段(另外通过索引访问)。要在嵌入式螺栓中使用此函数,您需要具有a

  1. POJO类型输入流或
  2. 元组类型输入流并指定输入模式(即名称到索引映射)

对于POJO输入类型,Flink通过反射访问字段。对于这种情况,Flink期望相应的公共成员变量或公共getter方法。例如,如果Bolt通过名称sentence(例如String s = input.getStringByField("sentence");访问字段,则输入POJO类必须具有成员变量public String sentence;或方法public String getSentence() { ... };(注意驼峰式命名)。

对于Tuple输入类型,需要使用Storm的Fields指定输入模式对于这种情况,构造函数BoltWrapper需要另外一个参数:new BoltWrapper<Tuple1<String>, ...>(..., new Fields("sentence"))输入类型是Tuple1<String>Fields("sentence")指定input.getStringByField("sentence")相当于input.getString(0)

有关示例请参阅BoltTokenizerWordCountPojoBoltTokenizerWordCountWithNames

配置喷口和螺栓

在Storm中,Spouts和Bolts可以配置一个全局分布的Map对象,该对象被赋予submitTopology(...)方法LocalClusterStormSubmitterMap是由拓扑旁边的用户提供的,并作为参数转发给呼叫Spout.open(...)Bolt.prepare(...)如果在Flink中使用FlinkTopologyBuilder执行整个拓扑,则不需要特别注意 - 它与常规Storm一样。

对于嵌入式使用,必须使用Flink的配置机制。可以在StreamExecutionEnvironmentvia中设置全局配置.getConfig().setGlobalJobParameters(...)Flink的常规Configuration课程可用于配置Spouts和Bolts。但是,Configuration不像Storm那样支持任意Keys数据类型(只String允许Keys)。因此,Flink还提供StormConfig了可以像raw一样使用的类,Map以提供与Storm的完全兼容性。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StormConfig config = new StormConfig();
// set config values
[...]

// set global Storm configuration
env.getConfig().setGlobalJobParameters(config);

// assemble program with embedded Spouts and/or Bolts
[...]

多输出流

Flink还可以处理Spout和Bolts的多个输出流的声明。如果在Flink中使用FlinkTopologyBuilder执行整个拓扑,则不需要特别注意 - 它与常规Storm一样。

对于嵌入式使用,输出流将是数据类型SplitStreamType<T>,必须使用DataStream.split(...)拆分SplitStream.select(...)Flink提供预定义输出选择StormStreamSelector<T>.split(...)已经。此外,SplitStreamTuple<T>可以使用除去打包类型SplitStreamMapper<T>

[...]

// get DataStream from Spout or Bolt which declares two output streams s1 and s2 with output type SomeType
DataStream<SplitStreamType<SomeType>> multiStream = ...

SplitStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new StormStreamSelector<SomeType>());

// remove SplitStreamType using SplitStreamMapper to get data stream of type SomeType
DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
DataStream<SomeType> s2 = splitStream.select("s2").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);

// do further processing on s1 and s2
[...]

有关完整示例,请参阅SpoutSplitExample.java

Flink Extensions

有限的喷口

在Flink中,流处理源可以是有限的,即发出有限数量的记录并在发出最后一条记录后停止。但是,Spouts通常会发出无限的流。两种方法之间的桥接是FiniteSpout除了IRichSpout包含reachedEnd()方法之外接口,其中用户可以指定停止条件。用户可以通过实现此接口而不是(或另外)来创建有限Spout IRichSpout,并reachedEnd()另外实现该方法。SpoutWrapper配置为发出有限数量的元组的FiniteSpout接口相比接口允许实现更复杂的终止标准。

尽管有限的Spout不需要将Spouts嵌入到Flink流程序中或向Flink提交整个Storm拓扑,但有些情况下它们可能会派上用场:

  • 实现原生Spout的行为与有限Flink源相同,只需要很少的修改
  • 用户想要只处理一段时间; 之后,Spout可以自动停止
  • 将文件读入流中
  • 用于测试目的

有限Spout的示例,仅发出10秒的记录:

public class TimedFiniteSpout extends BaseRichSpout implements FiniteSpout {
	[...] // implement open(), nextTuple(), ...

	private long starttime = System.currentTimeMillis();

	public boolean reachedEnd() {
		return System.currentTimeMillis() - starttime > 10000l;
	}
}

Storm兼容性示例

您可以在Maven模块中找到更多示例flink-storm-examples有关不同版本的WordCount,请参阅README.md要运行示例,您需要组装正确的jar文件。 flink-storm-examples-1.7-SNAPSHOT.jarno / not作业执行有效的jar文件(这仅仅是一个标准的Maven神器)。

有嵌入式喷口和螺栓,即例如罐WordCount-SpoutSource.jarWordCount-BoltTokenizer.jar分别。比较pom.xml看两个罐子是如何构建的。此外,整个Storm拓扑(WordCount-StormTopology.jar有一个例子

您可以通过运行这些示例中的每一个bin/flink run <jarname>.jar每个jar的清单文件中都包含正确的入口点类。

回到顶部