flink源码3--StreamGraph->JobGraph->ExecutionGraph | 张洪铭的个人博客
张洪铭的个人博客

flink源码3--StreamGraph->JobGraph->ExecutionGraph

此处输入图片的描述

我们接着分析 StreamExecutionEnvironment 及其实现类,也就是调用 env.execute("Flink StreamingChainingDemo") 时到底发生了什么。
这里最新版 Flink 的代码结构和以前不同:首先调用的是 StreamExecutionEnvironment 的 execute 方法:

/**
 * Triggers execution of the program under the given job name.
 *
 * @param jobName name of the job; must not be null
 * @return the result of executing the StreamGraph generated for this job
 * @throws Exception whatever executing the generated StreamGraph throws
 */
public JobExecutionResult execute(String jobName) throws Exception {
    // checkNotNull returns its (validated) argument, so validation runs before graph generation.
    return execute(getStreamGraph(
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.")));
}

/**
 * Generates the StreamGraph for the transformations registered so far, named {@code jobName}.
 *
 * @param jobName the job name to set on the generator
 * @return the generated StreamGraph
 */
@Internal
public StreamGraph getStreamGraph(String jobName) {
    return getStreamGraphGenerator()
        .setJobName(jobName)
        .generate();
}
/**
 * Creates a StreamGraphGenerator over the registered transformations, configured from this
 * environment's settings.
 *
 * @return a configured generator
 * @throws IllegalStateException if no transformations (operators) have been registered
 */
private StreamGraphGenerator getStreamGraphGenerator() {
    if (transformations.isEmpty()) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
    }

    // Copy the environment-level settings onto the generator.
    // NOTE(review): the blog author observed stateBackend=null, chaining=true,
    // timeCharacteristic=ProcessingTime (default) and bufferTimeout=100 in a local run;
    // these depend on the environment configuration, so confirm per job.
    return new StreamGraphGenerator(transformations, config, checkpointCfg)
        .setStateBackend(defaultStateBackend)
        .setChaining(isChainingEnabled)
        .setUserArtifacts(cacheFile)
        .setTimeCharacteristic(timeCharacteristic)
        .setDefaultBufferTimeout(bufferTimeout);
}

getStreamGraphGenerator 根据 transformations、config、checkpointCfg 初始化 StreamGraphGenerator,随后由 generate() 生成 StreamGraph。

JobGraph:
接着进入实现类 LocalStreamEnvironment 的 execute(StreamGraph) 方法,StreamGraph 正是在这里被转换为 JobGraph 的:

/**
 * Runs the given StreamGraph on a local embedded MiniCluster and blocks until the job result
 * is available.
 *
 * <p>The StreamGraph is first translated into a JobGraph. A Configuration is then assembled
 * from the job configuration, a few local defaults and the user's settings, which override
 * earlier values.
 *
 * @param streamGraph the graph to execute
 * @return the result returned by {@code miniCluster.executeJobBlocking(jobGraph)}
 * @throws Exception if translation, cluster start-up or job execution fails
 */
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
        // StreamGraph -> JobGraph translation.
        JobGraph jobGraph = streamGraph.getJobGraph();
        jobGraph.setAllowQueuedScheduling(true);

        // Start from the job's own configuration, then apply local defaults.
        Configuration configuration = new Configuration();
        configuration.addAll(jobGraph.getJobConfiguration());
        configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

        // add (and override) the settings with what the user defined
        configuration.addAll(this.configuration);

        // Only fill in the REST bind port if the user did not configure one.
        // NOTE(review): "0" presumably means "pick any free port" — confirm in RestOptions docs.
        if (!configuration.contains(RestOptions.BIND_PORT)) {
            configuration.setString(RestOptions.BIND_PORT, "0");
        }

        // Slots per TaskManager default to the job's maximum parallelism unless configured.
        int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
            .setConfiguration(configuration)
            .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
            .build();

        if (LOG.isInfoEnabled()) {
            LOG.info("Running job on local embedded Flink mini cluster");
        }

        MiniCluster miniCluster = new MiniCluster(cfg);

        try {
            miniCluster.start();
            // Record the actual REST port in the local configuration.
            // NOTE(review): this Configuration was already handed to cfg above; whether this
            // write is visible to the cluster depends on whether MiniClusterConfiguration copies it.
            configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());

            // Blocks until the job finishes (see executeJobBlocking).
            return miniCluster.executeJobBlocking(jobGraph);
        }
        finally {
            // Always clear the registered transformations and shut the cluster down,
            // whether the job succeeded or failed.
            transformations.clear();
            miniCluster.close();
        }
    }

streamGraph.getJobGraph() 的实现如下,它最终委托给 StreamingJobGraphGenerator.createJobGraph:

/**
 * Translates this StreamGraph into a JobGraph.
 *
 * @param jobID the id to use for the JobGraph, may be null
 * @return the generated JobGraph
 * @throws UnsupportedOperationException if the job is iterative and checkpointing is enabled
 *     without being explicitly forced
 */
@SuppressWarnings("deprecation")
@Override
public JobGraph getJobGraph(@Nullable JobID jobID) {
    // Checkpointing is temporarily forbidden for iterative jobs unless the user forced it:
    // records in transit during a snapshot would be lost upon failure.
    final boolean unsupportedIterativeCheckpointing = isIterative()
        && checkpointConfig.isCheckpointingEnabled()
        && !checkpointConfig.isForceCheckpointing();

    if (unsupportedIterativeCheckpointing) {
        throw new UnsupportedOperationException(
            "Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "
                + "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "
                + "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
    }

    return StreamingJobGraphGenerator.createJobGraph(this, jobID);
}


/**
 * Creates a JobGraph for the given StreamGraph, using a new generator instance per call.
 *
 * @param streamGraph the graph to translate
 * @param jobID the id to use for the JobGraph, may be null
 * @return the generated JobGraph
 */
public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
    final StreamingJobGraphGenerator generator = new StreamingJobGraphGenerator(streamGraph, jobID);
    return generator.createJobGraph();
}


/**
 * Translates the StreamGraph into the JobGraph held by this generator.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>copy the schedule mode;</li>
 *   <li>compute operator hashes, plus legacy hashes;</li>
 *   <li>build operator chains, JobVertices and JobEdges;</li>
 *   <li>set physical edges, slot sharing and co-location;</li>
 *   <li>configure checkpointing and register user artifacts;</li>
 *   <li>attach the ExecutionConfig last.</li>
 * </ol>
 *
 * @return the populated JobGraph
 * @throws IllegalConfigurationException if the ExecutionConfig cannot be serialized
 */
private JobGraph createJobGraph() {
    // Use the schedule mode configured on the StreamGraph.
    jobGraph.setScheduleMode(streamGraph.getScheduleMode());

    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

    // Generate legacy version hashes for backwards compatibility.
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }

    // Populated during chaining: chain head id -> (primary hash, legacy hash) pairs.
    Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();

    setChaining(hashes, legacyHashes, chainedOperatorHashes);

    setPhysicalEdges();

    setSlotSharingAndCoLocation();

    configureCheckpointing();

    JobGraphGenerator.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);

    // Set the ExecutionConfig last, once it has been finalized.
    try {
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
    } catch (IOException e) {
        // Chain the serialization failure so the offending type shows up in the stack trace.
        throw new IllegalConfigurationException(
            "Could not serialize the ExecutionConfig. "
                + "This indicates that non-serializable types (like custom serializers) were registered",
            e);
    }

    return jobGraph;
}

上面 setChaining(hashes, legacyHashes, chainedOperatorHashes) 的核心是 createChain 方法,而 createChain 判断两个节点能否链接(chain)的关键是 isChainable:

/**
 * Decides whether the source and target StreamNodes of {@code edge} can be chained into one task.
 *
 * <p>The ten conditions are checked in a fixed order, and the first one that fails rejects
 * chaining.
 *
 * @param edge the edge between the two candidate nodes
 * @param streamGraph the graph containing the edge
 * @return true only if every chaining condition holds
 */
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode source = streamGraph.getSourceVertex(edge);
    StreamNode target = streamGraph.getTargetVertex(edge);

    StreamOperatorFactory<?> sourceFactory = source.getOperatorFactory();
    StreamOperatorFactory<?> targetFactory = target.getOperatorFactory();

    // 1. The downstream node must have exactly one input edge.
    if (target.getInEdges().size() != 1) {
        return false;
    }
    // 2./3. Both the downstream and the upstream operator factory must be present.
    if (targetFactory == null || sourceFactory == null) {
        return false;
    }
    // 4. Both nodes must be in the same slot sharing group.
    if (!source.isSameSlotSharingGroup(target)) {
        return false;
    }
    // 5. The downstream chaining strategy must be ALWAYS.
    if (targetFactory.getChainingStrategy() != ChainingStrategy.ALWAYS) {
        return false;
    }
    // 6. The upstream chaining strategy must be HEAD or ALWAYS.
    if (sourceFactory.getChainingStrategy() != ChainingStrategy.HEAD
            && sourceFactory.getChainingStrategy() != ChainingStrategy.ALWAYS) {
        return false;
    }
    // 7. The edge must use a ForwardPartitioner.
    if (!(edge.getPartitioner() instanceof ForwardPartitioner)) {
        return false;
    }
    // 8. The edge's shuffle mode must NOT be BATCH.
    if (edge.getShuffleMode() == ShuffleMode.BATCH) {
        return false;
    }
    // 9. Upstream and downstream parallelism must be equal.
    if (source.getParallelism() != target.getParallelism()) {
        return false;
    }
    // 10. Chaining must be enabled on the graph.
    return streamGraph.isChainingEnabled();
}

只有上述 10 个条件同时满足时,一条 StreamEdge 的源 StreamNode 和目标 StreamNode 才能链接(chain)在一起执行。

/**
 * Recursively builds the operator chain headed by {@code startNodeId}, adding
 * {@code currentNodeId} as the next member of that chain.
 *
 * <p>How out-edges of the current node are handled:
 * <ul>
 *   <li>Edges accepted by {@code isChainable} are followed inside the same chain with
 *       {@code chainIndex + 1}.</li>
 *   <li>Every other out-edge is recorded as an output of the chain, and its target starts a
 *       new chain of its own at chain index 0.</li>
 * </ul>
 * When {@code currentNodeId} is the chain head, a JobVertex is created via
 * {@code createJobVertex}, and every collected chain output is connected to its downstream
 * JobVertex.
 *
 * @param startNodeId id of the StreamNode heading the chain being built
 * @param currentNodeId id of the StreamNode currently being added to the chain
 * @param hashes primary operator hashes keyed by StreamNode id
 * @param legacyHashes hashes produced by the legacy hashers, keyed by StreamNode id
 * @param chainIndex position of {@code currentNodeId} within the chain (the head uses 0)
 * @param chainedOperatorHashes accumulates (primary, legacy) hash pairs per chain head
 * @return the edges leaving the chain from this node and all of its chained successors, or an
 *     empty list if {@code startNodeId} is already in {@code builtVertices}
 */
private List<StreamEdge> createChain(
            Integer startNodeId,
            Integer currentNodeId,
            Map<Integer, byte[]> hashes,
            List<Map<Integer, byte[]>> legacyHashes,
            int chainIndex,
            Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {

        // Skip chain heads that were already processed (presumably their JobVertex exists).
        if (!builtVertices.contains(startNodeId)) {

            // Edges that leave the chain, collected from this node and all chained successors.
            List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

            List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
            List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

            StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);

            // Split this node's out-edges by whether they can stay inside the chain.
            for (StreamEdge outEdge : currentNode.getOutEdges()) {
                if (isChainable(outEdge, streamGraph)) {
                    chainableOutputs.add(outEdge);
                } else {
                    nonChainableOutputs.add(outEdge);
                }
            }

            // Chainable successors join this chain; their chain-leaving edges become ours.
            for (StreamEdge chainable : chainableOutputs) {
                transitiveOutEdges.addAll(
                        createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
            }

            for (StreamEdge nonChainable : nonChainableOutputs) {
                transitiveOutEdges.add(nonChainable);// non-chainable edge: it leaves the chain, so record it as a chain output
                createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);// the target starts a NEW chain of its own (chain index 0)
            }
            // (primary, legacy) hash pairs collected for the chain head; created empty on first use
            List<Tuple2<byte[], byte[]>> operatorHashes =
                chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
            // primary hash of the current node, which also becomes its OperatorID
            byte[] primaryHashBytes = hashes.get(currentNodeId);
            OperatorID currentOperatorId = new OperatorID(primaryHashBytes);
            // pair the primary hash with this node's hash from each legacy hasher
            for (Map<Integer, byte[]> legacyHash : legacyHashes) {
                operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
            }
            // chain name and resources for this node are derived from its chainable outputs
            // NOTE(review): the blog author notes the name is joined with "->" — confirm in createChainedName
            chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
            chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
            chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

            // Input/output formats are registered on the chain head's format container.
            if (currentNode.getInputFormat() != null) {
                getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
            }

            if (currentNode.getOutputFormat() != null) {
                getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
            }
            // chain head: createJobVertex builds the JobVertex and returns its StreamConfig;
            // other chain members get a StreamConfig over a fresh Configuration
            StreamConfig config = currentNodeId.equals(startNodeId)
                    ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                    : new StreamConfig(new Configuration());
            // per-node config; NOTE(review): the blog author notes this sets serializers, the
            // StreamOperator and checkpointing (AT_LEAST_ONCE by default) — confirm in setVertexConfig
            setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

            if (currentNodeId.equals(startNodeId)) {

                config.setChainStart();
                config.setChainIndex(0);
                config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                config.setOutEdgesInOrder(transitiveOutEdges);
                config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

                // Downstream chains were built by the recursive calls above, presumably so that
                // connect() can look up their JobVertex in jobVertices.
                for (StreamEdge edge : transitiveOutEdges) {
                    connect(startNodeId, edge);// create the JobEdge from this chain's JobVertex to the target's JobVertex
                }
                // hand the head the StreamConfigs of all non-head chain members
                // NOTE(review): the blog author notes these are stored under the head's "chainedTaskConfig_" key
                config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

            } else {
                // Non-head member: register its StreamConfig under the chain head.
                chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());

                config.setChainIndex(chainIndex);
                StreamNode node = streamGraph.getStreamNode(currentNodeId);
                config.setOperatorName(node.getOperatorName());
                chainedConfigs.get(startNodeId).put(currentNodeId, config);
            }

            config.setOperatorID(currentOperatorId);

            // A node with no chainable outputs is the last operator of its chain.
            if (chainableOutputs.isEmpty()) {
                config.setChainEnd();
            }
            return transitiveOutEdges;

        } else {
            return new ArrayList<>();
        }
    }

在链头节点处,遍历 transitiveOutEdges,并将每一条 StreamEdge 作为参数传入 connect() 函数:

/**
 * Creates the physical connection for one chain output edge. A JobEdge is built from the
 * JobVertex of the chain head {@code headOfChain} to the JobVertex of the edge's target node.
 *
 * @param headOfChain id of the StreamNode heading the upstream chain
 * @param edge a StreamEdge leaving that chain
 * @throws UnsupportedOperationException if the edge's shuffle mode is not PIPELINED, BATCH or
 *     UNDEFINED
 */
private void connect(Integer headOfChain, StreamEdge edge) {
    // Remember physical edges in the order they are connected.
    physicalEdgesInOrder.add(edge);

    final Integer targetId = edge.getTargetId();
    final JobVertex upstreamJobVertex = jobVertices.get(headOfChain);
    final JobVertex downstreamJobVertex = jobVertices.get(targetId);

    // Count one more input on the downstream vertex.
    // NOTE(review): presumably StreamConfig writes through to the vertex's Configuration.
    final StreamConfig targetConfig = new StreamConfig(downstreamJobVertex.getConfiguration());
    targetConfig.setNumberOfInputs(targetConfig.getNumberOfInputs() + 1);

    final StreamPartitioner<?> partitioner = edge.getPartitioner();

    // Map the edge's shuffle mode to the result partition type of the produced data set.
    final ResultPartitionType partitionType;
    switch (edge.getShuffleMode()) {
        case PIPELINED:
            partitionType = ResultPartitionType.PIPELINED_BOUNDED;
            break;
        case BATCH:
            partitionType = ResultPartitionType.BLOCKING;
            break;
        case UNDEFINED:
            // Fall back to the graph-wide blockingConnectionsBetweenChains flag.
            partitionType = streamGraph.isBlockingConnectionsBetweenChains()
                    ? ResultPartitionType.BLOCKING
                    : ResultPartitionType.PIPELINED_BOUNDED;
            break;
        default:
            throw new UnsupportedOperationException("Data exchange mode " +
                edge.getShuffleMode() + " is not supported yet.");
    }

    // Forward and rescale partitioners connect subtasks pointwise; all others need ALL_TO_ALL.
    final boolean pointwise =
            partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner;
    final JobEdge jobEdge = downstreamJobVertex.connectNewDataSetAsInput(
            upstreamJobVertex,
            pointwise ? DistributionPattern.POINTWISE : DistributionPattern.ALL_TO_ALL,
            partitionType);

    // The ship strategy name is what the web interface shows for this edge.
    jobEdge.setShipStrategyName(partitioner.toString());

    if (LOG.isDebugEnabled()) {
        LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(),
                headOfChain, targetId);
    }
}


/**
 * Connects {@code input} as a producer for this vertex.
 *
 * <p>A new IntermediateDataSet of the given partition type is created on {@code input}. A
 * JobEdge from that data set to this vertex is then registered both as one of this vertex's
 * inputs and as a consumer of the data set.
 *
 * @param input the producing JobVertex
 * @param distPattern how producer subtasks are connected to consumer subtasks
 * @param partitionType result partition type of the new data set
 * @return the newly created JobEdge
 */
public JobEdge connectNewDataSetAsInput(
        JobVertex input,
        DistributionPattern distPattern,
        ResultPartitionType partitionType) {
    // JobVertices are linked through an intermediate data set owned by the producer.
    final IntermediateDataSet producedDataSet = input.createAndAddResultDataSet(partitionType);
    final JobEdge inputEdge = new JobEdge(producedDataSet, this, distPattern);

    // Register the edge on both sides: as our input and as a consumer of the data set.
    inputs.add(inputEdge);
    producedDataSet.addConsumer(inputEdge);
    return inputEdge;
}

最后附上一幅图:

此处输入图片的描述

execute 方法最后执行的是 miniCluster.executeJobBlocking(jobGraph)。
中间一系列 Akka 的 RPC 通信过程省略不表,对并发编程感兴趣的读者可以研究一下 Akka 和 Actor 模型。

ExecutionGraph:

/**
 * Submits {@code job} to this cluster and blocks until its result is available.
 *
 * @param job the JobGraph to run; must not be null
 * @return the JobExecutionResult, converted with the current thread's context class loader
 * @throws JobExecutionException if the JobResult cannot be retrieved or converted
 * @throws InterruptedException if interrupted while waiting for the result
 */
public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
    checkNotNull(job, "job is null");

    // Submit first; request the result once the submission future has completed.
    final CompletableFuture<JobResult> resultFuture = submitJob(job)
        .thenCompose((JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));

    final JobResult jobResult;
    try {
        jobResult = resultFuture.get();
    } catch (ExecutionException executionFailure) {
        throw new JobExecutionException(
            job.getJobID(),
            "Could not retrieve JobResult.",
            ExceptionUtils.stripExecutionException(executionFailure));
    }

    try {
        return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
    } catch (IOException | ClassNotFoundException conversionFailure) {
        throw new JobExecutionException(job.getJobID(), conversionFailure);
    }
}
/**
 * Asynchronously submits {@code jobGraph} to the dispatcher.
 *
 * <p>Job files are handled first by {@code uploadAndSetJobFiles}, using the BLOB server
 * address. Once that step and the dispatcher gateway are both available, the job is submitted
 * through the gateway.
 *
 * @param jobGraph the job to submit; queued scheduling is enabled on it as a side effect
 * @return a future completing with a JobSubmissionResult for the job's id once the gateway's
 *     submission future completes
 */
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
    final CompletableFuture<DispatcherGateway> gatewayFuture = getDispatcherGatewayFuture();

    // Queued scheduling must be allowed in Flip-6 mode because slots are requested
    // from the ResourceManager.
    jobGraph.setAllowQueuedScheduling(true);

    final CompletableFuture<InetSocketAddress> blobAddressFuture = createBlobServerAddress(gatewayFuture);
    final CompletableFuture<Void> uploadFuture = uploadAndSetJobFiles(blobAddressFuture, jobGraph);

    // Wait for both the upload and the gateway, then flatten the gateway's own future.
    final CompletableFuture<Acknowledge> acknowledgement = uploadFuture
        .thenCombine(
            gatewayFuture,
            (Void uploaded, DispatcherGateway gateway) -> gateway.submitJob(jobGraph, rpcTimeout))
        .thenCompose(Function.identity());

    return acknowledgement.thenApply(
        (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
}
坚持原创技术分享,您的支持将鼓励我继续创作!