flink源码1 | 张洪铭的个人博客
张洪铭的个人博客

flink源码1

nc -lk 9000
bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname localhost --port 9000

编译flink
git clone https://github.com/apache/flink.git
cd flink
mvn clean package -DskipTests # this will take up to 10 minutes

Flink源码阅读:
从读取文件开始:
例如env.readFileStream
共有如下DataSource:

fromElements
fromElements
fromCollection
fromCollection
fromCollection
fromCollection
fromParallelCollection
fromParallelCollection
fromParallelCollection
readTextFile
readTextFile
readFile
readFile
readFile
readFileStream
readFile
socketTextStream
socketTextStream
socketTextStream
socketTextStream
socketTextStream
createInput
createInput
createInput
createFileInput
addSource
addSource
addSource
addSource

/**
 * Builds a {@code DataStream<String>} from the files found under {@code filePath}.
 *
 * <p>A {@link FileMonitoringFunction} is registered as a source named
 * "Read File Stream source"; the {@code Tuple3<String, Long, Long>} records it emits
 * are then flat-mapped through a {@code FileReadFunction}.
 * NOTE(review): FileReadFunction is not shown here — presumably it turns each
 * (path, start, end) record into the text of that file range; confirm.
 *
 * @param filePath       path handed to the monitoring function
 * @param intervalMillis value handed to the monitoring function as its interval
 * @param watchType      which file changes the monitoring function should report
 * @return the stream produced by the flat-map step
 */
public DataStream<String> readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType) {
    FileMonitoringFunction monitor = new FileMonitoringFunction(filePath, intervalMillis, watchType);
    DataStream<Tuple3<String, Long, Long>> fileRanges = addSource(monitor, "Read File Stream source");

    return fileRanges.flatMap(new FileReadFunction());
}

第三个参数分为:

ONLY_NEW_FILES, // Only new files will be processed. 仅处理新增文件
REPROCESS_WITH_APPENDED, // When some files are appended, all contents of the files will be processed. 当文件内容增加了之后会重新处理整个文件。
PROCESS_ONLY_APPENDED // When some files are appended, only appended  contents will be processed. 当文件内容增加了之后只处理新增加内容

FileMonitoringFunction 实现了SourceFunction接口
接口SourceFunction有两个方法:
run 业务逻辑方法
cancel 取消数据源的数据产生
FileMonitoringFunction实现了这两个方法

/**
 * Polls {@code path} for changed files until {@code isRunning} becomes false, emitting
 * one {@code (filePath, startOffset, endOffset)} tuple per file reported by
 * {@code listNewFiles}.
 *
 * <p>What is emitted depends on {@code watchType}:
 * <ul>
 *   <li>{@code ONLY_NEW_FILES} / {@code REPROCESS_WITH_APPENDED}: the tuple
 *       {@code (file, 0, -1)}; per the original author's note this means "read the
 *       file from start to end".</li>
 *   <li>{@code PROCESS_ONLY_APPENDED}: the tuple {@code (file, lastOffset, currentLength)},
 *       where {@code lastOffset} is the value remembered in {@code offsetOfFiles}
 *       (0 if the file has not been seen yet).</li>
 * </ul>
 * After each pass the thread sleeps for {@code interval}.
 *
 * @param ctx context used to emit the tuples
 * @throws Exception propagated from URI parsing, file-system access or the sleep
 */
@Override
public void run(SourceContext<Tuple3<String, Long, Long>> ctx) throws Exception {
    final FileSystem fs = FileSystem.get(new URI(path));

    while (isRunning) {
        for (String changedFile : listNewFiles(fs)) {
            final boolean emitWholeFile = watchType == WatchType.ONLY_NEW_FILES
                    || watchType == WatchType.REPROCESS_WITH_APPENDED;

            if (emitWholeFile) {
                // (0, -1): whole-file range, and -1 is also what gets remembered as the offset.
                Tuple3<String, Long, Long> wholeFile = new Tuple3<>(changedFile, 0L, -1L);
                ctx.collect(wholeFile);
                offsetOfFiles.put(changedFile, -1L);
                continue;
            }

            if (watchType != WatchType.PROCESS_ONLY_APPENDED) {
                continue;
            }

            final long fileSize = fs.getFileStatus(new Path(changedFile)).getLen();
            final long offset = offsetOfFiles.containsKey(changedFile)
                    ? offsetOfFiles.get(changedFile)
                    : 0;

            // Emit only the appended part: from the previously recorded offset to the current end.
            Tuple3<String, Long, Long> appendedRange = new Tuple3<>(changedFile, offset, fileSize);
            ctx.collect(appendedRange);
            offsetOfFiles.put(changedFile, fileSize);

            LOG.info("File processed: {}, {}, {}", changedFile, offset, fileSize);
        }

        Thread.sleep(interval);
    }
}

/**
 * Lists the entries directly under {@code path} that pass {@code isFiltered}.
 *
 * <p>For every accepted entry the full path string is added to the result and the
 * entry's modification time is recorded in {@code modificationTimes} under its file name.
 * When {@code listStatus} returns {@code null} a warning is logged and an empty list
 * is returned.
 * NOTE(review): isFiltered is defined outside this snippet; its exact rules
 * (new-file vs. newer-modification-time) should be checked there.
 *
 * @param fileSystem file system used to list {@code path}
 * @return path strings of the accepted entries, possibly empty
 * @throws IOException if listing the path fails
 */
private List<String> listNewFiles(FileSystem fileSystem) throws IOException {
    final List<String> accepted = new ArrayList<>();

    // Status of each file/directory under the watched path.
    final FileStatus[] entries = fileSystem.listStatus(new Path(path));
    if (entries == null) {
        LOG.warn("Path does not exist: {}", path);
        return accepted;
    }

    for (FileStatus entry : entries) {
        final Path entryPath = entry.getPath();
        final String name = entryPath.getName();
        final long modifiedAt = entry.getModificationTime();

        if (isFiltered(name, modifiedAt)) {
            continue;
        }

        accepted.add(entryPath.toString());
        // Remember the modification time that was just reported for this file name.
        modificationTimes.put(name, modifiedAt);
    }

    return accepted;
}

接下来我们看cancel方法,它只做了一件事:把isRunning置为false,从而让run方法中的循环停止

/**
 * Requests termination of the source by clearing {@code isRunning}.
 *
 * <p>The polling loop in {@code run} tests this flag at the top of each pass, so the
 * loop ends once the current pass (including its sleep) has finished.
 */
@Override
public void cancel() {
    isRunning = false;
}

对FileMonitoringFunction的实现清楚之后,回到StreamExecutionEnvironment中,看addSource方法。

/**
 * Adds a source with the given name and no explicit type information.
 *
 * <p>Delegates to {@code addSource(function, sourceName, typeInfo)} with a
 * {@code null} {@code typeInfo}.
 *
 * @param function   the source function
 * @param sourceName name given to the source
 * @param <OUT>      type of the elements produced by the source
 * @return the data stream source returned by the three-argument overload
 */
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName) {
    final TypeInformation<OUT> unspecifiedTypeInfo = null;
    return addSource(function, sourceName, unspecifiedTypeInfo);
}

/**
 * Adds a source to the environment and wraps it in a {@code DataStreamSource}.
 *
 * <p>The element type is resolved in this order:
 * <ol>
 *   <li>if {@code function} implements {@code ResultTypeQueryable}, the type it reports
 *       replaces whatever was passed in {@code typeInfo};</li>
 *   <li>if the type is still {@code null}, it is extracted from the function's class
 *       via {@code TypeExtractor.createTypeInfo};</li>
 *   <li>if extraction throws {@code InvalidTypesException}, a {@code MissingTypeInfo}
 *       carrying the source name and the exception is used instead.</li>
 * </ol>
 *
 * @param function   the source function
 * @param sourceName name given to the source
 * @param typeInfo   element type information, may be {@code null}
 * @param <OUT>      type of the elements produced by the source
 * @return the newly created data stream source
 */
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {

    TypeInformation<OUT> resolvedTypeInfo = typeInfo;

    // A function that can report its own produced type takes precedence.
    if (function instanceof ResultTypeQueryable) {
        resolvedTypeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
    }

    // Otherwise fall back to extracting the type from the function's class.
    if (resolvedTypeInfo == null) {
        try {
            resolvedTypeInfo = TypeExtractor.createTypeInfo(
                    SourceFunction.class,
                    function.getClass(), 0, null, null);
        } catch (final InvalidTypesException e) {
            // Extraction failed: keep the source name and the failure in a placeholder.
            resolvedTypeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
        }
    }

    // Closure cleaning; per the original author's note this trims what gets
    // serialized and guards against serialization errors.
    clean(function);

    // The source counts as parallel only when it implements ParallelSourceFunction.
    final boolean parallelSource = function instanceof ParallelSourceFunction;

    final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
    return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, parallelSource, sourceName);
}

由于FileMonitoringFunction实现的是SourceFunction而不是ParallelSourceFunction,故isParallel为false,即并行度为1
上面看到的ChainingStrategy这个枚举类实际有三个枚举值:
ALWAYS 表示尽可能的与前后operator chaining
NEVER 表示不会chaining
HEAD 表示只会chaining后面。具体后面详细讲解其作用

接下来看最后一个函数DataStreamSource

/**
 * Creates a data stream source backed by a {@code SourceTransformation}.
 *
 * <p>The transformation is created with the environment's parallelism; a source that
 * is not parallel is then set to parallelism 1.
 *
 * @param environment the execution environment that owns the source
 * @param outTypeInfo type information of the produced elements
 * @param operator    the source operator
 * @param isParallel  whether the source may run with parallelism above 1
 * @param sourceName  name given to the transformation
 */
public DataStreamSource(
        StreamExecutionEnvironment environment,
        TypeInformation<T> outTypeInfo,
        StreamSource<T, ?> operator,
        boolean isParallel,
        String sourceName) {
    super(environment,
            new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));

    this.isParallel = isParallel;
    if (isParallel) {
        return;
    }

    // Non-parallel sources always run as a single instance.
    setParallelism(1);
}

/**
 * Creates the operator by passing both arguments straight to the superclass constructor.
 *
 * @param environment    the execution environment
 * @param transformation the transformation this stream represents
 */
protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, Transformation<T> transformation) {
    super(environment, transformation);
}

public DataStream(StreamExecutionEnvironment environment, Transformation<T> transformation) {
    this.environment = Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
    this.transformation = Preconditions.checkNotNull(transformation, "Stream Transformation must not be null.");
}

后面就是一系列transform操作,将在后面的章节中详细介绍。

坚持原创技术分享,您的支持将鼓励我继续创作!