Flink中是怎么处理乱序数据的
在 Flink 里,“乱序”本质是 事件时间(event time) 先后顺序和 到达时间(processing time) 不一致。Flink 处理乱序数据的核心机制主要围绕:事件时间语义 + Watermark + 窗口触发/延迟 + 迟到数据处理 + 状态与一致性。
一、时间语义
Flink中有三种时间语义:
- Processing Time:按机器当前时间处理,最简单、吞吐高,但对乱序不敏感(乱序会直接导致窗口统计不准)。
- Event Time(最常用) :按事件自带时间戳处理,Flink 用 Watermark 推进“事件时间进度”,从而容忍一定乱序。
- Ingestion Time:介于两者之间(进入 Flink 时打时间戳),现在用得较少。
乱序处理几乎都建立在 Event Time 上。
二、Watermark
Watermark是乱序容忍与“事件时间进度”的关键。可以理解为:“我认为未来不会再来 时间戳 ≤ watermark 的事件了”(近似判断,允许误差)。
常见生成策略
- 有界乱序(Bounded Out-of-Orderness) :最常用
设定最大乱序程度maxOutOfOrderness,例如 5s:
watermark ≈ 当前观测到的最大事件时间 - 5s
→ 能容忍 5 秒内乱序,超过就可能变成迟到数据。 - 单调递增(Monotonous Timestamps) :适合源数据严格按时间递增(基本无乱序)。
- 自定义 WatermarkGenerator:适合多分区、多来源、需要特殊规则(比如按业务字段分组、按分区对齐等)。
关键点
- Watermark 不是“等数据到齐”,而是“推进时间并触发计算”的机制。
- 对于 并行 source,下游算子的 watermark 通常取 各输入分区 watermark 的最小值,因此某个分区卡住会拖慢整体事件时间推进。
三、窗口(Window)与触发(Trigger)
乱序数据最典型场景是做窗口聚合(滚动、滑动、会话窗口)。
Flink 的窗口何时“关窗”
- 默认基于事件时间时:当 watermark >= window_end 时触发窗口计算并输出。
允许迟到:Allowed Lateness(延迟关窗)
即使窗口第一次输出了,也可以设置:
- allowedLateness = X:窗口在
window_end 后再额外等 X 时间
在这段时间内到来的迟到事件仍会进入窗口并触发 更新输出(取决于输出模式/下游算子)。 - 超过 allowedLateness 的事件才会被视为“最终迟到”。
Trigger / Evictor(高级)
- Trigger:自定义触发逻辑(例如每来一条就触发、每 N 秒触发一次、同时满足事件数/时间等)。
- Evictor:触发前/后剔除窗口内元素(较少用,成本高,通常能用聚合/ProcessFunction替代)。
四、迟到数据(Late Events)处理
当事件到达时,它的事件时间戳已经 落后于 watermark(以及超过 allowed lateness),就会变成迟到数据。常见处理策略:
- 直接丢弃(默认)
简单但会损失数据(很多实时大盘能接受)。 - 侧输出(Side Output)收集迟到数据
把迟到数据打到一个旁路流,做补偿计算、落库、离线回补或告警。 - 允许迟到并更新结果(Allowed Lateness)
窗口结果会被修正(需要下游能接收“更新/撤回”语义,或你用 upsert sink)。 - 用更长乱序容忍(更大的 watermark 延迟)
减少迟到,但会增加延迟(latency)——典型的准确性 vs 延迟权衡。
五、有序输出与排序
如果你要的是“按事件时间严格有序输出”,Flink 不会全局帮你排序(成本太高),但你可以:
- Keyed 后用 KeyedProcessFunction + 状态 + 定时器
将事件暂存一段时间(比如 5s),等 watermark/定时器到了再按时间戳输出。 - 代价:更多状态、更高延迟、可能出现内存/状态膨胀,需要 TTL、容量控制。
六、与乱序紧密相关的运行时机制
乱序本身还会影响系统行为:
- 状态(State) :窗口/乱序缓存都依赖 state,乱序越大、allowed lateness 越大,state 留存越久。
- State TTL:防止“永不关闭”的 key 造成状态无限增长。
- Checkpoint & Exactly-Once:保证乱序场景下也能在故障恢复后维持一致结果(尤其窗口更新、迟到补偿更依赖一致性)。
- 反压(Backpressure) :watermark 推进慢、窗口堆积、state 变大,都可能导致下游慢→反压。
七、常见的选型建议
- 目标:低延迟优先,允许少量误差
用较小乱序 watermark(如 1-3s),不设置/少设置 allowed lateness,迟到侧输出做补偿。 - 目标:结果尽量准确,延迟可接受
watermark 延迟设大一些(如 10-60s),再配 allowed lateness;sink 用 upsert/幂等写支持更新。 - 目标:必须严格按事件时间有序输出
KeyedProcessFunction 缓存排序 + watermark/定时器释放,但要严格控制状态与延迟。
八、代码示例
8.1 DataStream API
8.1.1 定义Watermark
事件模型 + Watermark(容忍乱序)
public static class Event {
public String userId;
public String type; // e.g. "click"
public long eventTime; // epoch millis
public double amount; // optional
public Event() {}
public Event(String userId, String type, long eventTime, double amount) {
this.userId = userId;
this.type = type;
this.eventTime = eventTime;
this.amount = amount;
}
}
WatermarkStrategy:容忍 5 秒乱序 + source 空闲检测避免拖慢 watermark
import org.apache.flink.api.common.eventtime.*;
import java.time.Duration;
WatermarkStrategy<Event> wm = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((e, ts) -> e.eventTime)
.withIdleness(Duration.ofSeconds(30)); // 可选:分区空闲 30s 视为 idle
8.1.2 窗口聚合
// 典型大盘统计:10 秒滚动窗口,允许再等 10 秒 修正;超过 allowed lateness 的走侧输出。
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
public class WindowAggDemo {
private static final OutputTag<Event> LATE_TAG = new OutputTag<Event>("late-events") {};
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Event> source = env.fromElements(
new Event("u1","click", 1_000L, 1),
new Event("u1","click", 4_000L, 1),
new Event("u1","click", 3_000L, 1), // 乱序
new Event("u1","click", 12_000L, 1),
new Event("u1","click", 2_000L, 1) // 可能迟到
);
WatermarkStrategy<Event> wm = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(java.time.Duration.ofSeconds(5))
.withTimestampAssigner((e, ts) -> e.eventTime)
.withIdleness(java.time.Duration.ofSeconds(30));
SingleOutputStreamOperator<Event> withWm = source.assignTimestampsAndWatermarks(wm);
SingleOutputStreamOperator<Tuple2<String, Long>> cnt = withWm
.keyBy(e -> e.userId)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.allowedLateness(Time.seconds(10)) // 窗口结束后再等 10s
.sideOutputLateData(LATE_TAG) // 超过 allowed lateness 的进侧输出
.aggregate(new CountAgg());
cnt.print("WINDOW");
cnt.getSideOutput(LATE_TAG).print("LATE");
env.execute("Window Agg Out-of-Order Demo");
}
public static class CountAgg implements AggregateFunction<Event, Long, Tuple2<String, Long>> {
private String key;
@Override public Long createAccumulator() { return 0L; }
@Override public Long add(Event v, Long acc) { key = v.userId; return acc + 1; }
@Override public Tuple2<String, Long> getResult(Long acc) { return Tuple2.of(key, acc); }
@Override public Long merge(Long a, Long b) { return a + b; }
}
}
8.1.3 Join
Interval Join
// 两条流都要 assign watermarks,否则事件时间 join 不会按预期工作。
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
// left stream: orders
DataStream<Event> orders = ...; // 记得 assignTimestampsAndWatermarks(wm)
// right stream: payments
DataStream<Event> payments = ...; // 记得 assignTimestampsAndWatermarks(wm)
SingleOutputStreamOperator<String> joined =
orders.assignTimestampsAndWatermarks(wm)
.keyBy(e -> e.userId)
.intervalJoin(payments.assignTimestampsAndWatermarks(wm).keyBy(e -> e.userId))
// 匹配条件:payment.eventTime 在 order.eventTime 之后 0~600s
.between(Time.seconds(0), Time.minutes(10))
// 可选:迟到容忍(Interval Join 的迟到处理主要靠 watermark + 这个参数)
// .withLowerBoundExclusive() / .withUpperBoundExclusive() 也可用
.process(new ProcessJoinFunction<Event, Event, String>() {
@Override
public void processElement(Event left, Event right, Context ctx, Collector<String> out) {
out.collect("JOIN user=" + left.userId +
" orderTs=" + left.eventTime +
" payTs=" + right.eventTime);
}
});
joined.print("INTERVAL_JOIN");
Interval Join 的关键风险:状态膨胀
- join 本质要把一侧数据在 state 里“留一段时间等待匹配”
- 等待窗口大(between 范围大)+ 乱序大(watermark 延迟大)= state 留得更久
Window Join
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
DataStream<Event> a = ...; // with watermarks
DataStream<Event> b = ...; // with watermarks
DataStream<String> windowJoined =
a.keyBy(e -> e.userId)
.join(b.keyBy(e -> e.userId))
.where(e -> e.userId)
.equalTo(e -> e.userId)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply((left, right) -> "WIN_JOIN user=" + left.userId +
" aTs=" + left.eventTime + " bTs=" + right.eventTime);
windowJoined.print("WINDOW_JOIN");
8.2 Flink SQL
8.2.1 SQL:声明 watermark + TUMBLE 窗口聚合
CREATE TABLE events (
user_id STRING,
type STRING,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'g1',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
CREATE TABLE sink_print (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
user_id STRING,
cnt BIGINT
) WITH ('connector'='print');
INSERT INTO sink_print
SELECT window_start, window_end, user_id, COUNT(*) AS cnt
FROM TABLE(
TUMBLE(TABLE events, DESCRIPTOR(ts), INTERVAL '10' SECOND)
)
GROUP BY window_start, window_end, user_id;
8.2.2 SQL:流-流 Interval Join
-- 写法核心:两张表都要 watermark,并在 ON 条件里写清楚时间范围。
CREATE TABLE orders (
order_id STRING,
user_id STRING,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (...);
CREATE TABLE payments (
pay_id STRING,
user_id STRING,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (...);
CREATE TABLE sink_join (
order_id STRING,
pay_id STRING,
user_id STRING,
order_ts TIMESTAMP(3),
pay_ts TIMESTAMP(3)
) WITH ('connector'='print');
INSERT INTO sink_join
SELECT
o.order_id,
p.pay_id,
o.user_id,
o.ts AS order_ts,
p.ts AS pay_ts
FROM orders o
JOIN payments p
ON o.user_id = p.user_id
-- payment 在 order 之后 0~10分钟内
AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '10' MINUTE;
强烈建议)SQL 里配状态 TTL,防止 join 状态无限长
-- 让 join / 聚合等算子的 state 有 TTL(示例 2 小时,你按业务改)
SET 'table.exec.state.ttl' = '2 h';