1
vvtf 2022-08-31 14:16:50 +08:00 1
2 种思路,
1 是使用滑动窗口, 窗口大小为 24h, 然后去重; 2 是记录一个上次 a 出现的时间, 如果大于 24h 就重新计算, 小于 24h 就跳过. 下面是方法 1 的代码: ds .windowAll(TumblingProcessingTimeWindows.of(Time.hours(24))) .process(new ProcessAllWindowFunction<String, String, TimeWindow>() { private static final long serialVersionUID = 1L; @Override public void process(ProcessAllWindowFunction<String, String, TimeWindow>.Context ctx, Iterable<String> values, Collector<String> out) throws Exception { boolean repeat = false; for (String value : values) { if (value.toLowerCase().indexOf("a") > -1) { if (repeat) { continue; } repeat = true; } out.collect(value); } } }) .print(); |
2
vvtf 2022-08-31 14:18:16 +08:00 1
if (value.toLowerCase().indexOf("a") > -1)
这个判断是我的测试代码, 你可以改成你的 json 方式判断. |
4
ihehe 2022-08-31 14:52:10 +08:00 via iPhone 1
一楼的滚动窗口是有问题的,24 小时才触发一次
这种简单的去重 keyby 后存个 state 就可以了 |