flink源码2 | 张洪铭的个人博客
张洪铭的个人博客

flink源码2

根据上篇文章内容扩展一下chaining demo

首先写一个streaming的 wordcount

public class StreamingChainingDemo {

@Data
@AllArgsConstructor
@NoArgsConstructor
public static class KeyCount{
    private String key;
    private int count;
}

public static void main(String[] args) throws Exception {
    // set up the streaming execution environment

    Configuration configuration= new Configuration(){
        {
            setInteger("rest.port",9191);
            setBoolean("local.start-webserver",true);
        }
    };
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

    env.setParallelism(2).setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
    dataStreamSource.flatMap((String line, Collector<KeyCount> out)
    -> {
         Stream.of(line.split("\\s+")).forEach(value -> out.collect(new KeyCount(value,1)));
       }
    ).returns(Types.POJO(KeyCount.class))
    .keyBy(new KeySelector<KeyCount, Object>() {
        @Override
        public Object getKey(KeyCount value) throws Exception {
            return value.getKey();
        }
    }).timeWindow(Time.seconds(10)).sum("count").print();
    // execute program
    env.execute("Flink StreamingChainingDemo");
}

}

运行如上代码后我们看WEB UI
此处输入图片的描述
可以看到keyBy操作和sink是chaining在一起的。
如果我们在print()的后面加上.disableChaining()

此处输入图片的描述

可以看到keyBy和sink是forward的并非chaining在一起

此时我们在returns(Types.POJO(KeyCount.class))
后面增加.filter(word-> !””.equals(word))
大家猜flat和filter会chaining在一起吗?

此处输入图片的描述

yeah,你答对了吗?flat和filter会chaining在一起。

这时 我引入并行度的概念:
在.filter(word-> !””.equals(word))
后面机上.setParallelism(3)

就会发现flat和filter是Reblance的关系
此处输入图片的描述

StreamOperator 源码解析
这个接口继承 CheckpointListener, KeyContext, Disposable, Serializable

提供了如下方法:

生命周期相关:
open
close
dispose
prepareSnapshotPreBarrier

容错与状态

snapshotState
initializeState

与StreamRecord相关

setKeyContextElement1
setKeyContextElement2

chain相关

getChainingStrategy
setChainingStrategy

监控相关

getMetricGroup
getOperatorID

AbstractStreamOperator,OneInputStreamOperator与TwoInputStreamOperator接口继承自StreamOperator

OneInputStreamOperator有3个方法

processElement
processWatermark
processLatencyMarker

TwoInputStreamOperator有6个方法

processElement1
processElement2
processWatermark1
processWatermark2
processLatencyMarker1
processLatencyMarker2

AbstractStreamOperator 重要的变量:后面会将具体的用法用处
决定是否在生成JobGraph时对算子进行Chaining优化:

protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD; 

3个与状态相关的变量

private transient AbstractKeyedStateBackend<?> keyedStateBackend;
private transient DefaultKeyedStateStore keyedStateStore;
private transient OperatorStateBackend operatorStateBackend;

监控相关的变量

protected transient OperatorMetricGroup metrics;
protected transient LatencyStats latencyStats;

方法作用和父类大同小异此处略

AbstractStreamOperator的子类抽象类AbstractUdfStreamOperator
这个抽象类同时实现了OutputTypeConfigurable接口并重写了setOutputType方法设置了输出类型

最后我们来看
OneInputStreamOperator这个类的实现类:

StreamFilter,StreamMap与StreamFlatMap算子在实现的processElement分别调用传入的FilterFunction,MapFunction, FlatMapFunction的udf将element传到下游。其中StreamFlatMap用到了TimestampedCollector,它是output的一层封装,将timestamp加入到StreamRecord中发送到下游。

StreamGroupedReduce与StreamGroupedFold算子相似的点是都涉及到了操作状态, 所以在覆盖open方法时通过创建一个状态的描述符以及调用AbstractStreamOperator实现的getPartitionedState方法获取了一个stateBackend的操作句柄。在processElement方法中借助这个句柄获取当前状态值,在用UDF将新的元素聚合进去并更新状态值,最后输出到下游。不同的是Fold的输出类型可能不一样(所以实现了OutputTypeConfigurable接口的setOutputType方法),并且有初始值。

ProcessOperator, LegacyKeyedProcessOperator(@Deprecated)
ProcessFunction是比较灵活的UDF,允许用户通过在processElement的时候可以通过传入的ctx操作TimerService
注册ProcessingTimeTimer和EventTimeTimer,并且通过实现方法onTimer就可以在Timer被触发的时候执行回调的逻辑。

StreamSink:
SimpleContext,可以获取processingTime,watermark和element的时间戳。
GenericWriteAheadSink提供了一个可以被实现为Exactly once的sink的抽象类
AsyncWaitOperator提供了异步处理的能力,是一个比较特殊的算子,对元素的处理和备份恢复都比较特殊。element的输出通过一个Emitter对象来实现
TimestampsAndPeriodicWatermarksOperator,TimestampsAndPunctuatedWatermarksOperator通过TimestampAssigner提取timestamp并生按照规则生成watermark

和TwoInputStreamOperator这个类的实现类
CoStreamMap, CoStreamFlatMap基本与单流的逻辑没什么区别,只是针对两个流的Function做类似的处理。
IntervalJoinOperator对双流的元素根据提供的ProcessJoinFunction做内连接,并且每个元素都有失效时间。在processElement方法中,每当一个流的元素到达,会将它加入对应流的buffer,并且遍历另一个流的buffer找到所有join的选项。最后再根据失效时间注册一个状态清理的Timer防止buffer无限增长。

CoBroadcastWithKeyedOperator和CoBroadcastWithNonKeyedOperator提供了对(Keyed)BroadcastProcessFunction的支持,和CoProcess有一些类似,只是Broadcast的Stream只有读权限,没有写权限。并且可以通过context直接获得BroadcastState

CoProcessOperator和KeyedCoProcessOperator本质上与单流的处理也没有什么区别,但是提供了双流之间共享状态的可能。CoProcessOperator也被用来实现NonWindowJoin

坚持原创技术分享,您的支持将鼓励我继续创作!