flink sliding window join | 张洪铭的个人博客
张洪铭的个人博客

flink sliding window join

接上篇文章flink operator 具体分析下window join
官网地址:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/joining.html

根据官网例子从demo实现:

工具类:模拟source:
GreenSource 发送:("tom",0),("jerry",1),("alice",2),("tom",3),("tom",4)
OrangeSource 发送:("tom",0),("tom",1),("tom",2),("tom",3),("tom",4)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package com.opensourceteams.module.bigdata.flink.window;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Random;
/**
 * Sample data generators for the sliding-window-join demo.
 *
 * <p>GreenSource emits (tom,0), (jerry,1), (alice,2), (tom,3), (tom,4);
 * OrangeSource emits (tom,0) through (tom,4). Both sources are bounded:
 * exactly five elements, then {@code hasNext()} returns false.
 */
@SuppressWarnings("serial")
public class WindowJoinSampleData {

    /** Keys for the green stream (mixed names; three of five are "tom"). */
    static final String[] KEY1 = {"tom", "jerry", "alice", "tom", "tom"};
    /** Keys for the orange stream (all "tom"). */
    static final String[] KEY2 = {"tom", "tom", "tom", "tom", "tom"};
    /** Values paired with the keys, in emission order. */
    static final int[] VALUE = {0, 1, 2, 3, 4};

    /** Bounded source of (name, value) pairs taken from KEY1/VALUE. */
    public static class GreenSource implements Iterator<Tuple2<String, Integer>>, Serializable {

        // Index of the next element to emit. This is an instance field on
        // purpose: the original used a mutable static counter, which is
        // shared by every instance in the JVM and is NOT carried along when
        // the iterator is serialized and shipped to a task.
        private int cursor;

        @Override
        public boolean hasNext() {
            return cursor < KEY1.length;
        }

        @Override
        public Tuple2<String, Integer> next() {
            if (!hasNext()) {
                // Iterator contract: signal exhaustion explicitly instead of
                // letting the array access throw IndexOutOfBoundsException.
                throw new java.util.NoSuchElementException();
            }
            Tuple2<String, Integer> tuple = new Tuple2<>(KEY1[cursor], VALUE[cursor]);
            cursor++;
            return tuple;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        /** Wraps a fresh GreenSource in a ThrottledIterator emitting at {@code rate} elements/second. */
        public static DataStream<Tuple2<String, Integer>> getSource(StreamExecutionEnvironment env, long rate) {
            return env.fromCollection(new ThrottledIterator<>(new GreenSource(), rate),
                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}));
        }
    }

    /** Bounded source of (name, value) pairs taken from KEY2/VALUE. */
    public static class OrangeSource implements Iterator<Tuple2<String, Integer>>, Serializable {

        // Per-instance cursor; see the note on GreenSource.cursor.
        private int cursor;

        @Override
        public boolean hasNext() {
            return cursor < KEY2.length;
        }

        @Override
        public Tuple2<String, Integer> next() {
            if (!hasNext()) {
                throw new java.util.NoSuchElementException();
            }
            Tuple2<String, Integer> tuple = new Tuple2<>(KEY2[cursor], VALUE[cursor]);
            cursor++;
            return tuple;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        /** Wraps a fresh OrangeSource in a ThrottledIterator emitting at {@code rate} elements/second. */
        public static DataStream<Tuple2<String, Integer>> getSource(StreamExecutionEnvironment env, long rate) {
            return env.fromCollection(new ThrottledIterator<>(new OrangeSource(), rate),
                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}));
        }
    }
}

模拟频率发送,每隔五秒发送一次数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package com.opensourceteams.module.bigdata.flink.window;
import java.io.Serializable;
import java.util.Iterator;
import static java.util.Objects.requireNonNull;
/**
 * Wraps an iterator and throttles its emission rate.
 *
 * <p>With {@code elementsPerSecond = 1} an element is released roughly every
 * 5000 ms — the blog's deliberate 5x slowdown of the standard 1000 ms base,
 * so window behavior can be observed (see the surrounding article text).
 *
 * <p>The wrapped source must itself be {@link Serializable} because Flink
 * serializes the iterator into the job graph.
 */
public class ThrottledIterator<T> implements Iterator<T>, Serializable {

    private static final long serialVersionUID = 1L;

    @SuppressWarnings("NonSerializableFieldInSerializableClass")
    private final Iterator<T> source;

    /** Number of elements emitted between throttle checks (always 1 here). */
    private final long sleepBatchSize;
    /** Target time budget, in milliseconds, for one batch of elements. */
    private final long sleepBatchTime;

    /** Wall-clock time of the last throttle check; 0 until the first element. */
    private long lastBatchCheckTime;
    /** Elements emitted since the last throttle check. */
    private long num;

    /**
     * @param source            the iterator to throttle; must be Serializable
     * @param elementsPerSecond target rate, must be >= 1
     * @throws IllegalArgumentException if the source is not Serializable or the rate is < 1
     */
    public ThrottledIterator(Iterator<T> source, long elementsPerSecond) {
        this.source = requireNonNull(source);
        if (!(source instanceof Serializable)) {
            throw new IllegalArgumentException("source must be java.io.Serializable");
        }
        if (elementsPerSecond >= 1) {
            // 5000 ms base (not 1000): intentional 5x slowdown for observation.
            this.sleepBatchSize = 1;
            this.sleepBatchTime = 5000 / elementsPerSecond;
        } else {
            throw new IllegalArgumentException("'elements per second' must be positive and not zero");
        }
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public T next() {
        // Delay if we are ahead of schedule. The first call only records the
        // start time; subsequent calls sleep off whatever remains of the batch
        // budget. BUG FIX: the previous branching could compute a NEGATIVE
        // sleep duration when elapsed > 2 * sleepBatchTime, which makes
        // Thread.sleep throw IllegalArgumentException and kills the source;
        // it also slept even when the iterator was already behind schedule.
        if (lastBatchCheckTime > 0) {
            if (++num >= sleepBatchSize) {
                num = 0;
                final long now = System.currentTimeMillis();
                final long elapsed = now - lastBatchCheckTime;
                if (elapsed < sleepBatchTime) {
                    try {
                        Thread.sleep(sleepBatchTime - elapsed);
                    } catch (InterruptedException e) {
                        // restore interrupt flag and proceed
                        Thread.currentThread().interrupt();
                    }
                }
                // Re-read the clock so sleep time is not counted against the next batch.
                lastBatchCheckTime = System.currentTimeMillis();
            }
        } else {
            lastBatchCheckTime = System.currentTimeMillis();
        }
        return source.next();
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}

主函数:为观察方便,将时间扩大5000倍:window滑动窗口5秒滑动一次,窗口大小为10秒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.opensourceteams.module.bigdata.flink.window;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * Sliding-window join demo: joins the orange and green sample streams on the
 * name key inside a sliding window (default: 10 s size, 5 s slide).
 *
 * <p>CLI options: {@code --windowSize} (ms), {@code --windowSlide} (ms),
 * {@code --rate} (elements per second for the throttled sources).
 */
@SuppressWarnings("serial")
public class SlingdingWindowJoin {

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final long windowSize = params.getLong("windowSize", 10000);
        // BUG FIX: the slide was read from key "windowSize" (copy-paste), so
        // --windowSlide was ignored and --windowSize silently overrode the slide.
        final long windowSlide = params.getLong("windowSlide", 5000);
        final long rate = params.getLong("rate", 1L);

        // obtain execution environment, run this example in "ingestion time"
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // create the data sources for both grades and salaries
        DataStream<Tuple2<String, Integer>> orangeStream = WindowJoinSampleData.OrangeSource.getSource(env, rate);
        DataStream<Tuple2<String, Integer>> greenStream = WindowJoinSampleData.GreenSource.getSource(env, rate);

        // run the actual window join program
        // for testability, this functionality is in a separate method.
        DataStream<String> joinedStream = runWindowJoin(orangeStream, greenStream, windowSize, windowSlide);

        // print the results with a single thread, rather than in parallel
        joinedStream.print().setParallelism(1);

        // execute program
        env.execute("Slingding Window Join Example");
    }

    /**
     * Joins the two streams on the tuple's name field within a sliding window.
     *
     * @param grades      first input stream of (name, value) pairs
     * @param salaries    second input stream of (name, value) pairs
     * @param windowSize  window size in milliseconds
     * @param windowSlide window slide in milliseconds
     * @return a stream of "value1,value2" strings, one per matched pair
     */
    public static DataStream<String> runWindowJoin(
            DataStream<Tuple2<String, Integer>> grades,
            DataStream<Tuple2<String, Integer>> salaries,
            long windowSize, long windowSlide) {
        return grades.join(salaries)
                .where(new NameKeySelector())
                .equalTo(new NameKeySelector())
                .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSize), Time.milliseconds(windowSlide)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
                    @Override
                    public String join(
                            Tuple2<String, Integer> first,
                            Tuple2<String, Integer> second) {
                        return first.f1 + "," + second.f1;
                    }
                });
    }

    /** Extracts the name field (f0) as the join key. */
    private static class NameKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
        @Override
        public String getKey(Tuple2<String, Integer> value) {
            return value.f0;
        }
    }
}

预期结果:
此处输入图片的描述

运行结果:
此处输入图片的描述

坚持原创技术分享,您的支持将鼓励我继续创作!