flink AllowLateness | 张洪铭的个人博客
张洪铭的个人博客

flink AllowLateness

AllowLateness:
为了处理乱序问题而产生的概念
默认情况下,当watermark通过end-of-window之后,再有之前的数据到达时,这些数据会被删除。

为了避免有些迟到的数据被删除,因此产生了allowedLateness的概念。

简单来讲,allowedLateness就是针对event time而言,对于watermark超过end-of-window之后,还允许有一段时间(也是以event time来衡量)来等待之前的数据到达,以便再次处理这些数据。

工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.opensourceteams.module.bigdata.flink.window.streaming;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
/**
* Created by zhanghongming on 2019/8/18.
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {

    private long timestamp; // event time, epoch millis
    private long userid;
    private long itemid;
    private long amount;
    private long price;

    // DateTimeFormatter is thread-safe, so a single shared instance suffices.
    private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Renders the order with its timestamp formatted in the system default time zone. */
    @Override
    public String toString() {
        String formatted = Instant.ofEpochMilli(timestamp)
                .atZone(ZoneId.systemDefault())
                .format(timeFormatter);
        return new StringBuilder("Order{")
                .append("timestamp=").append(formatted)
                .append(", userid=").append(userid)
                .append(", itemid=").append(itemid)
                .append(", amount=").append(amount)
                .append(", price=").append(price)
                .append('}')
                .toString();
    }
}

数据类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.opensourceteams.module.bigdata.flink.window.streaming;
import com.opensourceteams.module.bigdata.flink.window.ThrottledIterator;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.Serializable;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Iterator;
import java.util.Random;
@SuppressWarnings("serial")
public class WindowData {

    /** Candidate user ids for generated orders. */
    static final Long[] USERID = {101L,100L};
    /** Candidate order amounts. */
    static final Long[] AMOUNT = {10L,15L,20L,25L,30L};
    /** Candidate unit prices. */
    static final Long[] PRICE = {1000L,2000L,3000L,4000L,5000L};

    static final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Unbounded source of (timestamp, userid, itemid, amount, price) tuples.
     * Some items are back-dated to simulate out-of-order events; note the
     * %10 branch only fires for ids that are NOT divisible by 6, because the
     * %6 check comes first.
     */
    public static class GradeSource implements Iterator<Tuple5<Long,Long, Long, Long, Long>>, Serializable {

        private final Random rnd = new Random(hashCode());
        private volatile long itemid = 0L;

        @Override
        public boolean hasNext() {
            // Endless stream.
            return true;
        }

        @Override
        public Tuple5<Long, Long, Long, Long, Long> next() {
            long eventTime = System.currentTimeMillis();
            // Periodically shift events into the past to exercise lateness handling.
            if (itemid != 0) {
                if (itemid % 6 == 0) {
                    eventTime -= 6000;
                } else if (itemid % 10 == 0) {
                    eventTime -= 8000;
                }
            }
            itemid++;
            // Keep the random draws in this exact order (userid, amount, price)
            // so the generated sequence matches the original implementation.
            Tuple5<Long, Long, Long, Long, Long> record = new Tuple5<>(
                    eventTime,
                    USERID[rnd.nextInt(USERID.length)],
                    itemid,
                    AMOUNT[rnd.nextInt(AMOUNT.length)],
                    PRICE[rnd.nextInt(PRICE.length)]);
            System.out.println(Instant.ofEpochMilli(eventTime).atZone(ZoneId.systemDefault()).format(timeFormatter)+"=="+record);
            return record;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        /** Wraps a new GradeSource in a rate-limited Flink collection source. */
        public static DataStream<Tuple5<Long,Long, Long, Long, Long>> getSource(StreamExecutionEnvironment env, long rate) {
            return env.fromCollection(new ThrottledIterator<>(new GradeSource(), rate),
                    TypeInformation.of(new TypeHint<Tuple5<Long,Long, Long, Long, Long>>(){}));
        }
    }
}

工具类速率控制类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package com.opensourceteams.module.bigdata.flink.window;
import java.io.Serializable;
import java.util.Iterator;
import static java.util.Objects.requireNonNull;
/**
 * Wraps an {@link Iterator} and throttles consumption to roughly
 * {@code elementsPerSecond} elements per second by sleeping inside
 * {@link #next()} when the batch finished faster than its time budget.
 *
 * <p>The wrapped iterator must itself be {@link Serializable} so this
 * wrapper can be shipped to workers as part of a source function.
 */
public class ThrottledIterator<T> implements Iterator<T>, Serializable {

    private static final long serialVersionUID = 1L;

    /** The wrapped source; verified Serializable in the constructor. */
    @SuppressWarnings("NonSerializableFieldInSerializableClass")
    private final Iterator<T> source;

    /** Number of elements emitted between throttling checks (currently 1). */
    private final long sleepBatchSize;

    /** Target duration in ms for one batch of {@code sleepBatchSize} elements. */
    private final long sleepBatchTime;

    /** Wall-clock ms when the current batch started; 0 until the first element. */
    private long lastBatchCheckTime;

    /** Elements emitted in the current batch. */
    private long num;

    /**
     * @param source            iterator to throttle; must be {@link Serializable}
     * @param elementsPerSecond target emission rate, must be &gt;= 1
     * @throws IllegalArgumentException if {@code source} is not Serializable
     *                                  or the rate is not positive
     */
    public ThrottledIterator(Iterator<T> source, long elementsPerSecond) {
        this.source = requireNonNull(source);
        if (!(source instanceof Serializable)) {
            throw new IllegalArgumentException("source must be java.io.Serializable");
        }
        if (elementsPerSecond >= 1) {
            // One element per batch; 1000 ms / rate is the per-element budget.
            // (The previous 3000 / rate made the real rate a third of the
            // documented "elements per second".)
            this.sleepBatchSize = 1;
            this.sleepBatchTime = 1000 / elementsPerSecond;
        }
        else {
            throw new IllegalArgumentException("'elements per second' must be positive and not zero");
        }
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public T next() {
        // Throttle: if the batch completed faster than its budget, sleep for
        // the remainder. The previous branching could compute a NEGATIVE
        // sleep duration (sleepBatchTime - (elapsed - sleepBatchTime) when
        // elapsed > 2 * sleepBatchTime), making Thread.sleep throw
        // IllegalArgumentException; a batch that is already late needs no sleep.
        if (lastBatchCheckTime > 0) {
            if (++num >= sleepBatchSize) {
                num = 0;
                final long now = System.currentTimeMillis();
                final long elapsed = now - lastBatchCheckTime;
                if (elapsed < sleepBatchTime) {
                    try {
                        Thread.sleep(sleepBatchTime - elapsed);
                    } catch (InterruptedException e) {
                        // restore interrupt flag and proceed
                        Thread.currentThread().interrupt();
                    }
                }
                lastBatchCheckTime = now;
            }
        } else {
            lastBatchCheckTime = System.currentTimeMillis();
        }
        return source.next();
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}

AllowLateness实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package com.opensourceteams.module.bigdata.flink.window.streaming;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
/**
* Created by zhanghongming on 2019/8/17.
*/
/**
 * Demonstrates Flink's allowedLateness on an event-time tumbling window.
 *
 * <p>Pipeline: throttled random source -> bounded-out-of-orderness watermarks
 * (5 s) -> keyBy userid -> 5 s tumbling windows with 2 s allowed lateness ->
 * incremental aggregation into per-user {@link OrderSummary} records.
 *
 * Created by zhanghongming on 2019/8/17.
 */
public class StreamingJobAllowLateness {

    public static void main(String[] args) throws Exception {
        // Local cluster with the web UI served on http://localhost:9191.
        Configuration configuration= new Configuration(){
            {
                setInteger("rest.port",9191);
                setBoolean("local.start-webserver",true);
            }
        };
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        env.setParallelism(3).setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // NOTE(review): declared but never attached via sideOutputLateData(...),
        // so the commented-out getSideOutput(...) below would emit nothing.
        OutputTag<Order> lateOutputTag = new OutputTag<Order>("late-data"){};

        final ParameterTool params = ParameterTool.fromArgs(args);
        final long rate = params.getLong("rate", 1L); // source rate, elements/second

        DataStream<Tuple5<Long, Long, Long, Long, Long>> source = WindowData.GradeSource.getSource(env, rate);

        // Watermark = max event time seen - 5 s; f0 carries the event timestamp.
        SingleOutputStreamOperator<Tuple5<Long, Long, Long, Long, Long>> outputStreamOperator = source.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple5<Long, Long, Long, Long, Long>>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(Tuple5<Long, Long, Long, Long, Long> longLongLongLongLongTuple5) {
                        return longLongLongLongLongTuple5.f0;
                    }
                });

        // Key by userid (field 1); windows keep firing for elements arriving
        // up to 2 s past the watermark crossing end-of-window.
        // Accumulator layout: (userid, orderCount, totalAmount, totalRevenue).
        SingleOutputStreamOperator<OrderSummary> result = outputStreamOperator.keyBy(1).timeWindow(Time.seconds(5))
                .allowedLateness(Time.seconds(2))
                .aggregate(new AggregateFunction<Tuple5<Long, Long, Long, Long, Long>, Tuple4<Long, Long, Long, Long>, Tuple4<Long, Long, Long, Long>>() {
                    @Override
                    public Tuple4<Long, Long, Long, Long> createAccumulator() {
                        return Tuple4.of(0L, 0L, 0L, 0L);
                    }

                    @Override
                    public Tuple4<Long, Long, Long, Long> add(Tuple5<Long, Long, Long, Long, Long> value, Tuple4<Long, Long, Long, Long> accumulator) {
                        // value: (timestamp, userid, itemid, amount, price)
                        return Tuple4.of(value.f1, accumulator.f1 + 1, accumulator.f2 + value.f2, accumulator.f3 + value.f3 * value.f4);
                    }

                    @Override
                    public Tuple4<Long, Long, Long, Long> getResult(Tuple4<Long, Long, Long, Long> accumulator) {
                        return accumulator;
                    }

                    @Override
                    public Tuple4<Long, Long, Long, Long> merge(Tuple4<Long, Long, Long, Long> a, Tuple4<Long, Long, Long, Long> b) {
                        // FIX: both accumulators belong to the same key, so the
                        // userid (f0) must be kept, not summed — the original
                        // a.f0 + b.f0 produced a bogus userid whenever Flink
                        // merged accumulators.
                        return Tuple4.of(a.f0, a.f1 + b.f1, a.f2 + b.f2, a.f3 + b.f3);
                    }
                }, new WindowFunction<Tuple4<Long, Long, Long, Long>, OrderSummary, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple4<Long, Long, Long, Long>> input, Collector<OrderSummary> out) throws Exception {
                        input.forEach(record ->
                                out.collect(new OrderSummary(window.getStart(), window.getEnd(), record.f0, record.f1, record.f2, record.f3))
                        );
                    }
                });
        result.print();
        //result.getSideOutput(lateOutputTag).print();
        env.execute("Flink StreamingJobEventTime");
    }

    /** Keys a record by its userid (f1). Unused; keyBy(1) above does the same. */
    private static class NameKeySelector implements KeySelector<Tuple5<Long,Long,Long,Long,Long>, Long> {
        @Override
        public Long getKey(Tuple5<Long, Long, Long, Long, Long> longLongLongLongLongTuple5) throws Exception {
            return longLongLongLongLongTuple5.f1;
        }
    }

    /** Per-user, per-window aggregate with human-readable window bounds. */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderSummary{

        private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        public OrderSummary(long start,long end,long userid,long orderNum,long amountNum,long total){
            this.windowStart = Instant.ofEpochMilli(start).atZone(ZoneId.systemDefault()).format(timeFormatter);
            this.windowEnd = Instant.ofEpochMilli(end).atZone(ZoneId.systemDefault()).format(timeFormatter);
            this.userid = userid;
            this.orderNum = orderNum;
            this.amountNum = amountNum;
            this.total = total;
        }

        private String windowStart;
        private String windowEnd;
        private long userid;
        private long orderNum;
        private long amountNum;
        private long total;
    }
}

延迟的数据会被收集到窗口中:

assignTimestampsAndWatermarks 的最大乱序时间为五秒,故 23:03:20~23:03:25 窗口的数据,在 23:03:25+5=23:03:30 秒的数据到来时才被触发计算。

同理:
23:03:35~23:03:40 窗口的数据,在 23:03:45 秒后的第一个数据(23:03:45)到来时被触发了计算
由于allowedLateness(Time.seconds(2)) 允许2秒的延迟数据:
因此在 watermark &lt; end-of-window + allowedLateness 期间(即 23:03:47 秒之前)到达的数据都会被重新计算

此处输入图片的描述

坚持原创技术分享,您的支持将鼓励我继续创作!