Apache Beam

Apache Beam provides an advanced unified programming model, allowing you to implement batch and streaming data processing jobs that can run on any execution engine.

Apache Beam is:

  • UNIFIED - Use a single programming model for both batch and streaming use cases.
  • PORTABLE - Execute pipelines on multiple execution environments, including Apache Apex, Apache Flink, Apache Spark, and Google Cloud Dataflow.
  • EXTENSIBLE - Write and share new SDKs, IO connectors, and transformation libraries.

Get Started

To use Beam for your data processing tasks, start by reading the Beam Overview and performing the steps in the Quickstart. Then dive into the Documentation section for in-depth concepts and reference materials for the Beam model, SDKs, and runners.

Contribute

Beam is an Apache Software Foundation project, available under the Apache v2 license. Beam is an open source community and contributions are greatly appreciated! If you’d like to contribute, please see the Contribute section.

Apache Beam Overview

Apache Beam is an open source, unified programming model that you can use to create a data processing pipeline. You start by building a program that defines the pipeline using one of the open source Beam SDKs. The pipeline is then executed by one of Beam’s supported distributed processing back-ends, which include Apache Apex, Apache Flink, Apache Spark, and Google Cloud Dataflow.

Beam is particularly useful for Embarrassingly Parallel data processing tasks, in which the problem can be decomposed into many smaller bundles of data that can be processed independently and in parallel. You can also use Beam for Extract, Transform, and Load (ETL) tasks and pure data integration. These tasks are useful for moving data between different storage media and data sources, transforming data into a more desirable format, or loading data onto a new system.

Apache Beam SDKs

The Beam SDKs provide a unified programming model that can represent and transform data sets of any size, whether the input is a finite data set from a batch data source, or an infinite data set from a streaming data source. The Beam SDKs use the same classes to represent both bounded and unbounded data, and the same transforms to operate on that data. You use the Beam SDK of your choice to build a program that defines your data processing pipeline.

Beam currently supports the following language-specific SDKs:
Language | SDK Status
Java | Active Development
Python | Coming Soon
Other | TBD

Apache Beam Pipeline Runners

The Beam Pipeline Runners translate the data processing pipeline you define with your Beam program into the API compatible with the distributed processing back-end of your choice. When you run your Beam program, you’ll need to specify the appropriate runner for the back-end where you want to execute your pipeline.

Beam currently supports Runners that work with the following distributed processing back-ends:
Runner | Status
Apache Apex | In Development
Apache Flink | In Development
Apache Spark | In Development
Google Cloud Dataflow | In Development
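
For example, you can select the runner directly in code. The following is a minimal sketch of my own (not from the Beam docs quoted above); it assumes the Beam Java SDK and the Direct Runner artifact are on the classpath, and class names can shift between Beam versions:

```java
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerSelectionSketch {
  public static void main(String[] args) {
    // Build the pipeline options and pick the runner explicitly.
    // The DirectRunner executes the pipeline on the local machine and is
    // the default when nothing is configured; switching to another runner
    // (Flink, Spark, ...) means swapping in its dependency and this class.
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(DirectRunner.class);

    Pipeline p = Pipeline.create(options);
    // ... apply the transforms that make up the pipeline here ...
    p.run().waitUntilFinish();
  }
}
```

The quickstart below takes the other route and selects the Direct Runner through a Maven profile (-Pdirect-runner) on the command line.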

Quickstart: a quick Hello World
Environment setup:
JDK 1.7+ and Maven

Generate the example project with the following Maven command:

$ mvn archetype:generate \
-DarchetypeRepository=https://repository.apache.org/content/groups/snapshots \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=LATEST \
-DgroupId=org.example \
-DartifactId=word-count-beam \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false

Inspect the generated files:

[root@zhm1 Beam]# ls
word-count-beam
[root@zhm1 Beam]# cd word-count-beam/
[root@zhm1 word-count-beam]# ls src/main/java/org/apache/beam/examples/
common DebuggingWordCount.java MinimalWordCount.java WindowedWordCount.java WordCount.java

Run the WordCount example:

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
> -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner

Results:

[root@zhm1 word-count-beam]# ll -lh counts*
-rw-r--r--. 1 root root 618 1月 19 14:02 counts-00000-of-00005
-rw-r--r--. 1 root root 596 1月 19 14:02 counts-00001-of-00005
-rw-r--r--. 1 root root 585 1月 19 14:02 counts-00002-of-00005
-rw-r--r--. 1 root root 581 1月 19 14:02 counts-00003-of-00005
-rw-r--r--. 1 root root 593 1月 19 14:02 counts-00004-of-00005
[root@zhm1 word-count-beam]# cat counts-00000-of-00005
work: 1
IS: 1
versions: 1
direct: 3
specified: 1
incubating: 1
more: 1
snapshots: 4
submission: 1
...
The generated project contains four WordCount examples:

  • Minimal WordCount demonstrates the basic principles involved in building a pipeline.
  • WordCount introduces some of the more common best practices in creating re-usable and maintainable pipelines.
  • Debugging WordCount introduces logging and debugging practices.
  • Windowed WordCount demonstrates how you can use Beam’s programming model to handle both bounded and unbounded datasets.

Let's walk through the MinimalWordCount code:

```java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
/**
* An example that counts words in Shakespeare.
*
* <p>This class, {@link MinimalWordCount}, is the first in a series of four successively more
* detailed 'word count' examples. Here, for simplicity, we don't show any error-checking or
* argument processing, and focus on construction of the pipeline, which chains together the
* application of core transforms.
*
* <p>Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally the
* {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional
* concepts.
*
* <p>Concepts:
*
* <pre>
* 1. Reading data from text files
* 2. Specifying 'inline' transforms
* 3. Counting items in a PCollection
* 4. Writing data to text files
* </pre>
*
* <p>No arguments are required to run this pipeline. It will be executed with the DirectRunner. You
* can see the results in the output files in your current working directory, with names like
* "wordcounts-00001-of-00005. When running on a distributed service, you would use an appropriate
* file service.
*/
public class MinimalWordCount {
public static void main(String[] args) {
// Create a PipelineOptions object. This object lets us set various execution
// options for our pipeline, such as the runner you wish to use. This example
// will run with the DirectRunner by default, based on the class path configured
// in its dependencies.
PipelineOptions options = PipelineOptionsFactory.create();
// The default is equivalent to options.setRunner(DirectRunner.class);
// Create the Pipeline object with the options we defined above.
Pipeline p = Pipeline.create(options);
// Apply the pipeline's transforms.
// Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set
// of input text files. TextIO.Read returns a PCollection where each element is one line from
// the input text (a set of Shakespeare's texts).
// This example reads a public data set consisting of the complete works of Shakespeare.
p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
// TextIO.Read is the first PTransform applied to the pipeline
// Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a
// DoFn (defined in-line) on each element that tokenizes the text line into individual words.
// The ParDo returns a PCollection<String>, where each element is an individual word in
// Shakespeare's collected texts.
.apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
for (String word : c.element().split("[^a-zA-Z']+")) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}))
// Concept #3: Apply the Count transform to our PCollection of individual words. The Count
// transform returns a new PCollection of key/value pairs, where each key represents a unique
// word in the text. The associated value is the occurrence count for that word.
.apply(Count.<String>perElement())
// Apply a MapElements transform that formats our PCollection of word counts into a printable
// string, suitable for writing to an output file.
.apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}))
// Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline.
// TextIO.Write writes the contents of a PCollection (in this case, our PCollection of
// formatted strings) to a series of text files.
//
// By default, it will write to a set of files with names like wordcount-00001-of-00005
.apply(TextIO.Write.to("wordcounts"));
// Run the pipeline.
p.run().waitUntilFinish();
}
}
```

Creating the pipeline

First, create a PipelineOptions object; it lets you set various execution options for the pipeline, such as the runner you want to use:

PipelineOptions options = PipelineOptionsFactory.create();

The next step is to create a Pipeline object using the options we just constructed. The Pipeline object builds up the graph of transforms to be executed, associated with that particular pipeline:

Pipeline p = Pipeline.create(options);
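
A side note of my own, not part of MinimalWordCount: the fuller WordCount examples typically build the options from command-line arguments so that flags such as --inputFile, --output, or --runner can be passed at launch. A minimal sketch of that pattern, using the same PipelineOptionsFactory class as above:

```java
// Sketch: deriving the pipeline options from command-line flags instead
// of the hard-coded defaults that PipelineOptionsFactory.create() gives.
PipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);
```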

Applying pipeline transforms

The MinimalWordCount pipeline contains several transforms to read data into the pipeline, manipulate or otherwise transform the data, and write out the results. Each transform represents an operation in the pipeline.

Each transform takes some kind of input (data or otherwise) and produces some output data. The input and output data are represented by the SDK class PCollection. PCollection is a special class, provided by the Beam SDK, that you can use to represent a data set of virtually any size, including unbounded data sets.
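
As a small illustration of my own (not in the original walkthrough), a PCollection does not have to come from a file: for quick experiments you can build a bounded one from in-memory data with the SDK's Create transform, for example:

```java
// Sketch: a small bounded PCollection built from in-memory strings.
// Needs: import org.apache.beam.sdk.transforms.Create;
//        import org.apache.beam.sdk.values.PCollection;
PCollection<String> lines =
    p.apply(Create.of("to be or not to be", "that is the question"));
```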


The MinimalWordCount pipeline contains five transforms:

1. A text file Read transform, applied to the Pipeline object itself, producing a PCollection as output. Each element in the output PCollection represents one line of text from the input file. This example happens to use input data stored in a publicly accessible Google Cloud Storage bucket ("gs://").

p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
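
A side note of my own: the same Read transform also accepts a local file pattern, which is essentially what the earlier WordCount run did through --inputFile=pom.xml. A rough sketch in the same SDK version as this example:

```java
// Sketch: pointing the read at a local file instead of the public
// Google Cloud Storage bucket used by MinimalWordCount.
p.apply(TextIO.Read.from("pom.xml"))
```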

2. A ParDo transform that invokes a DoFn (defined inline as an anonymous class) on each element, tokenizing each line of text into individual words. The input to this transform is the PCollection of text lines produced by the previous TextIO.Read transform, and the ParDo outputs a new PCollection in which each element is an individual word from the text. For example, the line "to be, or not to be" is split into the words to, be, or, not, to, be.

.apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
for (String word : c.element().split("[^a-zA-Z']+")) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}))

3. The SDK-provided Count transform is a generic transform that takes a PCollection of any type and returns a PCollection of key/value pairs. Each key represents a unique element from the input collection, and each value represents the number of times that key appeared in it.

In this pipeline, the input to Count is the PCollection of individual words produced by the previous ParDo, and the output is a PCollection of key/value pairs in which each key is a unique word from the text and the associated value is its occurrence count. For example, the elements the, cat, the would yield the pairs (the, 2) and (cat, 1).

.apply(Count.<String>perElement())

4. The next transform formats each key/value pair of unique word and occurrence count into a printable string suitable for writing to an output file.

MapElements is a higher-level composite transform that encapsulates a simple ParDo: for each element in the input PCollection, MapElements applies a function that produces exactly one output element. Here, MapElements invokes a SimpleFunction (defined inline as an anonymous class) that does the formatting. As input, MapElements takes the PCollection of key/value pairs produced by Count, and it produces a new PCollection of printable strings.

.apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}))

5. A text file Write transform. It takes the final PCollection of formatted strings as input and writes each element to an output text file. Each element in the input PCollection becomes one line of text in the resulting output files.

.apply(TextIO.Write.to("wordcounts"));

Note that the Write transform produces a trivial result value of type PDone, which in this case is ignored.

Running the pipeline

Run the pipeline by calling its run method, which hands the pipeline off to the pipeline runner you specified when creating it:

p.run().waitUntilFinish();

Note that the run method is asynchronous. To block until execution finishes, append waitUntilFinish to the call, as in the snippet above.
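
To make the asynchronous behaviour concrete, here is a small sketch of my own; PipelineResult is the handle that run() returns:

```java
// Sketch: run() submits the pipeline and returns immediately;
// waitUntilFinish() then blocks until execution completes.
// Needs: import org.apache.beam.sdk.PipelineResult;
PipelineResult result = p.run();
// ... other work could go here while the pipeline executes ...
result.waitUntilFinish();
```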
