Event Time | Zhang Hongming's Personal Blog

Event Time


Brief introduction:

Processing time:
Processing time refers to the system time of the machine that is executing the respective operation.

Event time:
Event time is the time that each individual event occurred on its producing device.
Event time programs must specify how to generate event time watermarks,
which is the mechanism that signals progress in event time.

Ingestion time:
Ingestion time is the time that events enter Flink. At the source operator each record gets the source's current time as a timestamp, and time-based operations (like time windows) refer to that timestamp.

Detailed introduction (from the official documentation):

Processing time: Processing time refers to the system time of the machine that is executing the respective operation.

When a streaming program runs on processing time, all time-based operations (like time windows) use the system clock of the machine that runs the respective operator. An hourly processing time window will include all records that arrived at a specific operator between the times when the system clock indicated the full hour. For example, if an application begins running at 9:15am, the first hourly processing time window will include events processed between 9:15am and 10:00am, the next window will include events processed between 10:00am and 11:00am, and so on.

Processing time is the simplest notion of time and requires no coordination between streams and machines. It provides the best performance and the lowest latency. However, in distributed and asynchronous environments processing time does not provide determinism, because it is susceptible to the speed at which records arrive in the system (e.g. from the message queue), to the speed at which records flow between operators inside the system, and to outages (scheduled, or otherwise).
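The hourly window boundaries described above come down to simple modular arithmetic on a timestamp; under processing time that timestamp is just the machine's wall clock. The plain-Java sketch below illustrates the mapping (the class and method names are illustrative, though the formula mirrors what Flink's tumbling window assigner computes for offset 0):

```java
public class WindowMath {

    /** Start of the tumbling window of size windowSizeMs that contains timestampMs. */
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        // Processing time: the current system clock of this machine.
        long now = System.currentTimeMillis();
        long hourMs = 60 * 60 * 1000L;
        System.out.println("current hourly window starts at " + windowStart(now, hourMs));
    }
}
```

With a 5-second window, the timestamp 1218182888000 (used in the examples below) lands in the window starting at 1218182885000, which matches the 16:08:05–16:08:10 window in the event-time output.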

Event time: Event time is the time that each individual event occurred on its producing device. This time is typically embedded within the records before they enter Flink, and that event timestamp can be extracted from each record. In event time, the progress of time depends on the data, not on any wall clocks. Event time programs must specify how to generate event time watermarks, which is the mechanism that signals progress in event time. That watermarking mechanism is described in a later section below.

In a perfect world, event time processing would yield completely consistent and deterministic results, regardless of when events arrive or how they are ordered. However, unless the events are known to arrive in order (by timestamp), event time processing incurs some latency while waiting for out-of-order events. As it is only possible to wait for a finite period of time, this places a limit on how deterministic event time applications can be.

Assuming all of the data has arrived, event time operations will behave as expected and produce correct and consistent results even when processing out-of-order or late events, or when reprocessing historic data. For example, an hourly event time window will contain all records that carry an event timestamp falling into that hour, regardless of the order in which they arrive or when they are processed. (See the section on late events below for more information.)

Note that sometimes when event time programs are processing live data in real time, they will use some processing time operations in order to guarantee that they are progressing in a timely fashion.

Ingestion time: Ingestion time is the time that events enter Flink. At the source operator each record gets the source's current time as a timestamp, and time-based operations (like time windows) refer to that timestamp.

Ingestion time sits conceptually between event time and processing time. Compared to processing time, it is slightly more expensive, but gives more predictable results. Because ingestion time uses stable timestamps (assigned once at the source), different window operations over the records will refer to the same timestamp, whereas in processing time each window operator may assign a record to a different window (based on the local system clock and any transport delay).

Compared to event time, ingestion time programs cannot handle any out-of-order events or late data, but the programs do not have to specify how to generate watermarks.

Internally, ingestion time is treated much like event time, but with automatic timestamp assignment and automatic watermark generation.
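Neither of the examples below uses ingestion time. Enabling it would only require changing the time characteristic (a configuration sketch against the same Flink 1.x API the examples use); no `assignTimestampsAndWatermarks` call is needed, since timestamps and watermarks are then generated automatically:

```java
// Ingestion time: each record is stamped with the source's wall clock on entry,
// so the explicit timestamp/watermark assigner used in the event-time example
// below is not needed.
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
```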

StreamingJobEventTime example

package org.apache.flink.streaming.examples.wordcount.util;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/**
 * Created by zhanghongming on 2019/8/17.
 */
public class StreamingJobEventTime {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration() {
            {
                setInteger("rest.port", 9191);
                setBoolean("local.start-webserver", true);
            }
        };
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        env.setParallelism(3).setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.fromElements(
                new Order(1218182888000L, 100L, 1001L, 10L, 1000L),
                new Order(1218182889000L, 101L, 1002L, 15L, 2000L),
                new Order(1218182890000L, 100L, 1003L, 20L, 3000L),
                new Order(1218182891000L, 101L, 1004L, 25L, 4000L),
                new Order(1218182892000L, 100L, 1005L, 30L, 5000L),
                new Order(1218182893000L, 101L, 1006L, 35L, 6000L)
        ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Order element) {
                return element.getTimestamp();
            }
        }).keyBy("userid")
                .timeWindow(Time.seconds(5))
                .aggregate(new AggregateFunction<Order, Tuple4<Long, Long, Long, Long>, Tuple4<Long, Long, Long, Long>>() {
                    @Override
                    public Tuple4<Long, Long, Long, Long> createAccumulator() {
                        return Tuple4.of(0L, 0L, 0L, 0L);
                    }

                    @Override
                    public Tuple4<Long, Long, Long, Long> add(Order value, Tuple4<Long, Long, Long, Long> accumulator) {
                        return Tuple4.of(value.userid, accumulator.f1 + 1, accumulator.f2 + value.amount, accumulator.f3 + value.amount * value.price);
                    }

                    @Override
                    public Tuple4<Long, Long, Long, Long> getResult(Tuple4<Long, Long, Long, Long> accumulator) {
                        return accumulator;
                    }

                    @Override
                    public Tuple4<Long, Long, Long, Long> merge(Tuple4<Long, Long, Long, Long> a, Tuple4<Long, Long, Long, Long> b) {
                        return Tuple4.of(a.f0 + b.f0, a.f1 + b.f1, a.f2 + b.f2, a.f3 + b.f3);
                    }
                }, new WindowFunction<Tuple4<Long, Long, Long, Long>, OrderSummary, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple4<Long, Long, Long, Long>> input, Collector<OrderSummary> out) throws Exception {
                        input.forEach(record ->
                                out.collect(new OrderSummary(window.getStart(), window.getEnd(), record.f0, record.f1, record.f2, record.f3))
                        );
                    }
                }).print();
        env.execute("Flink StreamingJobEventTime");
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private long timestamp;
        private long userid;
        private long itemid;
        private long amount;
        private long price;
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderSummary {
        private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        public OrderSummary(long start, long end, long userid, long orderNum, long amountNum, long total) {
            this.windowStart = Instant.ofEpochMilli(start).atZone(ZoneId.systemDefault()).format(timeFormatter);
            this.windowEnd = Instant.ofEpochMilli(end).atZone(ZoneId.systemDefault()).format(timeFormatter);
            this.userid = userid;
            this.orderNum = orderNum;
            this.amountNum = amountNum;
            this.total = total;
        }

        private String windowStart;
        private String windowEnd;
        private long userid;
        private long orderNum;
        private long amountNum;
        private long total;
    }
}

Execution results:

2> StreamingJobEventTime.OrderSummary(windowStart=2008-08-08 16:08:05, windowEnd=2008-08-08 16:08:10, userid=100, orderNum=1, amountNum=10, total=10000)
2> StreamingJobEventTime.OrderSummary(windowStart=2008-08-08 16:08:10, windowEnd=2008-08-08 16:08:15, userid=100, orderNum=2, amountNum=50, total=210000)
1> StreamingJobEventTime.OrderSummary(windowStart=2008-08-08 16:08:05, windowEnd=2008-08-08 16:08:10, userid=101, orderNum=1, amountNum=15, total=30000)
1> StreamingJobEventTime.OrderSummary(windowStart=2008-08-08 16:08:10, windowEnd=2008-08-08 16:08:15, userid=101, orderNum=2, amountNum=60, total=310000)

StreamingJobProcessingTime example

package org.apache.flink.streaming.examples.wordcount.util;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/**
 * Created by zhanghongming on 2019/8/17.
 */
public class StreamingJobProcessingTime {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration() {
            {
                setInteger("rest.port", 9191);
                setBoolean("local.start-webserver", true);
            }
        };
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        env.setParallelism(1).setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.fromElements(
                new Order(1218182888000L, 100L, 1001L, 10L, 1000L),
                new Order(1218182889000L, 101L, 1002L, 15L, 2000L),
                new Order(1218182890000L, 100L, 1003L, 20L, 3000L),
                new Order(1218182891000L, 101L, 1004L, 25L, 4000L),
                new Order(1218182892000L, 100L, 1005L, 30L, 5000L),
                new Order(1218182893000L, 101L, 1006L, 35L, 6000L)
        ).keyBy("userid")
                .timeWindow(Time.seconds(5))
                .aggregate(new AggregateFunction<Order, Tuple4<Long, Long, Long, Long>, Tuple4<Long, Long, Long, Long>>() {
                    @Override
                    public Tuple4<Long, Long, Long, Long> createAccumulator() {
                        return Tuple4.of(0L, 0L, 0L, 0L);
                    }

                    @Override
                    public Tuple4<Long, Long, Long, Long> add(Order value, Tuple4<Long, Long, Long, Long> accumulator) {
                        // Slow down processing so the records spread across several
                        // processing-time windows.
                        try {
                            Thread.sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return Tuple4.of(value.userid, accumulator.f1 + 1, accumulator.f2 + value.amount, accumulator.f3 + value.amount * value.price);
                    }

                    @Override
                    public Tuple4<Long, Long, Long, Long> getResult(Tuple4<Long, Long, Long, Long> accumulator) {
                        return accumulator;
                    }

                    @Override
                    public Tuple4<Long, Long, Long, Long> merge(Tuple4<Long, Long, Long, Long> a, Tuple4<Long, Long, Long, Long> b) {
                        return Tuple4.of(a.f0 + b.f0, a.f1 + b.f1, a.f2 + b.f2, a.f3 + b.f3);
                    }
                }, new WindowFunction<Tuple4<Long, Long, Long, Long>, OrderSummary, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple4<Long, Long, Long, Long>> input, Collector<OrderSummary> out) throws Exception {
                        input.forEach(record ->
                                out.collect(new OrderSummary(window.getStart(), window.getEnd(), record.f0, record.f1, record.f2, record.f3))
                        );
                    }
                }).print();
        env.execute("Flink StreamingJobProcessingTime");
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private long timestamp;
        private long userid;
        private long itemid;
        private long amount;
        private long price;
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderSummary {
        private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        public OrderSummary(long start, long end, long userid, long orderNum, long amountNum, long total) {
            this.windowStart = Instant.ofEpochMilli(start).atZone(ZoneId.systemDefault()).format(timeFormatter);
            this.windowEnd = Instant.ofEpochMilli(end).atZone(ZoneId.systemDefault()).format(timeFormatter);
            this.userid = userid;
            this.orderNum = orderNum;
            this.amountNum = amountNum;
            this.total = total;
        }

        private String windowStart;
        private String windowEnd;
        private long userid;
        private long orderNum;
        private long amountNum;
        private long total;
    }
}

Execution results:

StreamingJobProcessingTime.OrderSummary(windowStart=2019-08-17 22:03:35, windowEnd=2019-08-17 22:03:40, userid=100, orderNum=1, amountNum=10, total=10000)
StreamingJobProcessingTime.OrderSummary(windowStart=2019-08-17 22:03:40, windowEnd=2019-08-17 22:03:45, userid=101, orderNum=1, amountNum=15, total=30000)
StreamingJobProcessingTime.OrderSummary(windowStart=2019-08-17 22:03:45, windowEnd=2019-08-17 22:03:50, userid=100, orderNum=1, amountNum=20, total=60000)
StreamingJobProcessingTime.OrderSummary(windowStart=2019-08-17 22:03:45, windowEnd=2019-08-17 22:03:50, userid=101, orderNum=1, amountNum=25, total=100000)
StreamingJobProcessingTime.OrderSummary(windowStart=2019-08-17 22:03:50, windowEnd=2019-08-17 22:03:55, userid=100, orderNum=1, amountNum=30, total=150000)
StreamingJobProcessingTime.OrderSummary(windowStart=2019-08-17 22:03:50, windowEnd=2019-08-17 22:03:55, userid=101, orderNum=1, amountNum=35, total=210000)

Watermarks

The mechanism in Flink to measure progress in event time is watermarks. Watermarks flow as part of the data stream and carry a timestamp t. A Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t' <= t (i.e. events with timestamps older than or equal to the watermark).
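The BoundedOutOfOrdernessTimestampExtractor used in the examples above implements exactly this contract: the watermark trails the highest event timestamp seen so far by a fixed delay. Stripped of the Flink API, its core logic is roughly the following (a plain-Java sketch; the class and method names are illustrative):

```java
public class BoundedOutOfOrdernessWatermark {

    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    public BoundedOutOfOrdernessWatermark(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Track the highest event timestamp seen so far; out-of-order events never move it back. */
    public void onEvent(long eventTimestampMs) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestampMs);
    }

    /** Watermark(t): no more elements with timestamp <= t are expected. */
    public long currentWatermark() {
        return maxSeenTimestamp - maxOutOfOrdernessMs;
    }
}
```

With a 5-second bound, seeing the timestamp 1218182893000 advances the watermark to 1218182888000: every event older than that is considered late from this point on.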


Watermarks are crucial for out-of-order streams, where the events are not ordered by their timestamps. In general, a watermark is a declaration that by that point in the stream, all events up to a certain timestamp should have arrived. Once a watermark reaches an operator, the operator can advance its internal event time clock to the value of the watermark.


Allowed Lateness:

When working with event-time windowing, it can happen that elements arrive late, i.e. the watermark that Flink uses to keep track of the progress of event time is already past the end timestamp of a window to which an element belongs. See the event time section, and especially the discussion of late elements, for a more thorough treatment of how Flink deals with event time.

By default, late elements are dropped when the watermark is past the end of the window. However, Flink allows specifying a maximum allowed lateness for window operators. Allowed lateness specifies by how much time elements can be late before they are dropped, and its default value is 0. Elements that arrive after the watermark has passed the end of the window, but before it passes the end of the window plus the allowed lateness, are still added to the window. Depending on the trigger used, a late but not dropped element may cause the window to fire again. This is the case for the EventTimeTrigger.
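The drop decision itself comes down to a single comparison. The sketch below is plain Java with illustrative names; Flink's window operator compares against the window's last covered timestamp (end - 1) plus the allowed lateness in essentially the same way:

```java
public class LatenessCheck {

    /**
     * True if an element belonging to this window can no longer be added,
     * i.e. it is dropped (or routed to a side output, as in the example below).
     */
    public static boolean isWindowLate(long windowEndMs, long allowedLatenessMs,
                                       long currentWatermarkMs) {
        // The window's last covered timestamp is windowEndMs - 1.
        return (windowEndMs - 1) + allowedLatenessMs <= currentWatermarkMs;
    }
}
```

For a window ending at 1218182890000 with 2 seconds of allowed lateness, an element is still accepted while the watermark is at 1218182891000, but dropped once it reaches 1218182892000.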

Allowed Lateness example:

package org.apache.flink.streaming.examples.wordcount.util;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/**
 * Created by zhanghongming on 2019/8/17.
 */
public class StreamingJobAllowLateness {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration() {
            {
                setInteger("rest.port", 9191);
                setBoolean("local.start-webserver", true);
            }
        };
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        env.setParallelism(3).setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        final OutputTag<Order> lateOutputTag = new OutputTag<Order>("late-data"){};
        SingleOutputStreamOperator<OrderSummary> result = env.fromElements(
                new Order(1218182888000L, 100L, 1001L, 10L, 1000L),
                new Order(1218182889000L, 101L, 1002L, 15L, 2000L),
                new Order(1218182890000L, 100L, 1003L, 20L, 3000L),
                new Order(1218182891000L, 101L, 1004L, 25L, 4000L),
                new Order(1218182892000L, 100L, 1005L, 30L, 5000L),
                new Order(1218182893000L, 101L, 1006L, 35L, 6000L),
                new Order(1218182896000L, 101L, 1006L, 35L, 6000L),
                new Order(1218182888000L, 100L, 1007L, 40L, 7000L)
        ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Order element) {
                return element.getTimestamp();
            }
        }).keyBy("userid")
                .timeWindow(Time.seconds(5))
                .sideOutputLateData(lateOutputTag)
                .allowedLateness(Time.seconds(2))
                .aggregate(new AggregateFunction<Order, Tuple4<Long, Long, Long, Long>, Tuple4<Long, Long, Long, Long>>() {
                    @Override
                    public Tuple4<Long, Long, Long, Long> createAccumulator() {
                        return Tuple4.of(0L, 0L, 0L, 0L);
                    }

                    @Override
                    public Tuple4<Long, Long, Long, Long> add(Order value, Tuple4<Long, Long, Long, Long> accumulator) {
                        return Tuple4.of(value.userid, accumulator.f1 + 1, accumulator.f2 + value.amount, accumulator.f3 + value.amount * value.price);
                    }

                    @Override
                    public Tuple4<Long, Long, Long, Long> getResult(Tuple4<Long, Long, Long, Long> accumulator) {
                        return accumulator;
                    }

                    @Override
                    public Tuple4<Long, Long, Long, Long> merge(Tuple4<Long, Long, Long, Long> a, Tuple4<Long, Long, Long, Long> b) {
                        return Tuple4.of(a.f0 + b.f0, a.f1 + b.f1, a.f2 + b.f2, a.f3 + b.f3);
                    }
                }, new WindowFunction<Tuple4<Long, Long, Long, Long>, OrderSummary, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple4<Long, Long, Long, Long>> input, Collector<OrderSummary> out) throws Exception {
                        input.forEach(record ->
                                out.collect(new OrderSummary(window.getStart(), window.getEnd(), record.f0, record.f1, record.f2, record.f3))
                        );
                    }
                });
        result.print();
        result.getSideOutput(lateOutputTag).print();
        env.execute("Flink StreamingJobAllowLateness");
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private long timestamp;
        private long userid;
        private long itemid;
        private long amount;
        private long price;
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderSummary {
        private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        public OrderSummary(long start, long end, long userid, long orderNum, long amountNum, long total) {
            this.windowStart = Instant.ofEpochMilli(start).atZone(ZoneId.systemDefault()).format(timeFormatter);
            this.windowEnd = Instant.ofEpochMilli(end).atZone(ZoneId.systemDefault()).format(timeFormatter);
            this.userid = userid;
            this.orderNum = orderNum;
            this.amountNum = amountNum;
            this.total = total;
        }

        private String windowStart;
        private String windowEnd;
        private long userid;
        private long orderNum;
        private long amountNum;
        private long total;
    }
}

The records shown in black in the output are the late data emitted via the side output.
