A Look at Storm's WindowedBolt

This article takes a look at Storm's WindowedBolt.

Example

@Test
public void testSlidingTupleTsTopology() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("integer", new RandomIntegerSpout(), 1);
    BaseWindowedBolt baseWindowedBolt = new SlidingWindowSumBolt()
            // windowLength, slidingInterval
            .withWindow(new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS))
            // withTimestampField uses the given tuple field as the tuple's timestamp
            .withTimestampField("timestamp")
            // watermark = the minimum, across input streams, of the latest tuple timestamp, minus the lag;
            // withWatermarkInterval sets how often the watermark is computed (default 1 second).
            // When a watermark fires, all windows up to the watermark timestamp are evaluated
            .withWatermarkInterval(new BaseWindowedBolt.Duration(1, TimeUnit.SECONDS))
            // withLag tolerates out-of-order data: a tuple whose timestamp is less than lastWaterMarkTs
            // (the largest watermark computed so far) is late and is not processed
            .withLag(new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS));
    builder.setBolt("slidingSum", baseWindowedBolt, 1).shuffleGrouping("integer");
    builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingSum");
    SubmitHelper.submitRemote("slideWindowTopology", builder.createTopology());
}

  • The example sets four options: withWindow (a 5-second window that slides every 3 seconds), withTimestampField, withWatermarkInterval and withLag.
  • SlidingWindowSumBolt
public class SlidingWindowSumBolt extends BaseWindowedBolt {
    private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowSumBolt.class);

    private int sum = 0;
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        /*
         * The inputWindow gives a view of
         * (a) all the events in the window
         * (b) events that expired since last activation of the window
         * (c) events that newly arrived since last activation of the window
         */
        List<Tuple> tuplesInWindow = inputWindow.get();
        List<Tuple> newTuples = inputWindow.getNew();
        List<Tuple> expiredTuples = inputWindow.getExpired();

        LOG.debug("Events in current window: " + tuplesInWindow.size());
        /*
         * Instead of iterating over all the tuples in the window to compute
         * the sum, the values for the new events are added and old events are
         * subtracted. Similar optimizations might be possible in other
         * windowing computations.
         */
        for (Tuple tuple : newTuples) {
            sum += (int) tuple.getValue(0);
        }
        for (Tuple tuple : expiredTuples) {
            sum -= (int) tuple.getValue(0);
        }
        collector.emit(new Values(sum));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sum"));
    }
}
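  • TupleWindow exposes three views of the data: all tuples currently in the window (get()), tuples that arrived since the window was last activated (getNew()), and tuples that expired since then (getExpired()). SlidingWindowSumBolt uses the last two to update its running sum instead of re-summing the whole window.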
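To see why adding getNew() and subtracting getExpired() yields the same result as re-summing get(), here is a minimal Storm-free sketch. It assumes a simplified count-based window that activates after every slidingInterval tuples, and it defines "new" and "expired" as the set differences between consecutive windows; IncrementalSumSketch and its methods are hypothetical, not part of Storm.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified count-based window (not Storm code). A window activates after every
// slidingInterval tuples and holds the last windowLength tuples; tuples are tracked by position.
final class IncrementalSumSketch {

    // Running sum per activation, maintained like SlidingWindowSumBolt: add new, subtract expired.
    static List<Integer> incrementalSums(List<Integer> stream, int windowLength, int slidingInterval) {
        List<Integer> emitted = new ArrayList<>();
        int prevFrom = 0;
        int prevTo = 0; // previous window = stream[prevFrom, prevTo)
        int sum = 0;
        for (int to = slidingInterval; to <= stream.size(); to += slidingInterval) {
            int from = Math.max(0, to - windowLength); // current window = stream[from, to)
            // "new": positions in the current window but not in the previous one
            for (int i = Math.max(from, prevTo); i < to; i++) {
                sum += stream.get(i);
            }
            // "expired": positions in the previous window but not in the current one
            for (int i = prevFrom; i < Math.min(prevTo, from); i++) {
                sum -= stream.get(i);
            }
            emitted.add(sum);
            prevFrom = from;
            prevTo = to;
        }
        return emitted;
    }

    // Reference result: recompute every window from scratch, i.e. iterate over get().
    static List<Integer> directSums(List<Integer> stream, int windowLength, int slidingInterval) {
        List<Integer> emitted = new ArrayList<>();
        for (int to = slidingInterval; to <= stream.size(); to += slidingInterval) {
            int sum = 0;
            for (int i = Math.max(0, to - windowLength); i < to; i++) {
                sum += stream.get(i);
            }
            emitted.add(sum);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> stream = List.of(4, 8, 15, 16, 23, 42, 7);
        System.out.println(incrementalSums(stream, 3, 2)); // [12, 39, 81]
        System.out.println(directSums(stream, 3, 2));      // [12, 39, 81]
    }
}
```

With a window of 3 tuples sliding by 2, both methods emit [12, 39, 81] for the sample stream; the incremental version only touches the tuples that changed between activations, which is the saving the comment in SlidingWindowSumBolt describes.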

WindowedBolt

IWindowedBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IWindowedBolt.java

/**
 * A bolt abstraction for supporting time and count based sliding & tumbling windows.
 */
public interface IWindowedBolt extends IComponent {
    /**
     * This is similar to the {@link org.apache.storm.task.IBolt#prepare(Map, TopologyContext, OutputCollector)} except that while emitting,
     * the tuples are automatically anchored to the tuples in the inputWindow.
     */
    void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector);

    /**
     * Process the tuple window and optionally emit new tuples based on the tuples in the input window.
     */
    void execute(TupleWindow inputWindow);

    void cleanup();

    /**
     * Return a {@link TimestampExtractor} for extracting timestamps from a tuple for event time based processing, or null for processing
     * time.
     *
     * @return the timestamp extractor
     */
    TimestampExtractor getTimestampExtractor();
}
  • IWindowedBolt is stateless: the window's tuples are held only in memory.
  • IWindowedBolt has an abstract implementation, BaseWindowedBolt, whose subclasses include BaseStatefulWindowedBolt and JoinBolt.

IStatefulWindowedBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IStatefulWindowedBolt.java

/**
 * A windowed bolt abstraction for supporting windowing operation with state.
 */
public interface IStatefulWindowedBolt<T extends State> extends IStatefulComponent<T>, IWindowedBolt {
    /**
     * If the stateful windowed bolt should have its windows persisted in state and maintain a subset of events in memory.
     * <p>
     * The default is to keep all the window events in memory.
     * </p>
     *
     * @return true if the windows should be persisted
     */
    default boolean isPersistent() {
        return false;
    }

    /**
     * The maximum number of window events to keep in memory.
     */
    default long maxEventsInMemory() {
        return 1_000_000L; // default
    }
}
  • In version 1.2.2 IStatefulWindowedBolt declared no methods of its own; version 2.0.0 adds two default methods, isPersistent and maxEventsInMemory.
  • isPersistent determines whether a PersistentWindowedBoltExecutor or a StatefulWindowedBoltExecutor is created.
  • maxEventsInMemory determines how many events WindowState keeps in memory; the rest are moved into the KeyValueState (HBaseKeyValueState, InMemoryKeyValueState or RedisKeyValueState).
  • IStatefulWindowedBolt has an abstract implementation, BaseStatefulWindowedBolt.
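A small stand-alone sketch of how the two default methods behave and how a bolt would override them. StatefulWindowSettings and ExecutorChoiceSketch are hypothetical stand-ins, not Storm types, and chooseExecutor only models the isPersistent decision described above by returning an executor class name.

```java
// Hypothetical stand-in for the two default methods of IStatefulWindowedBolt (not a Storm type).
interface StatefulWindowSettings {
    // Same default as IStatefulWindowedBolt.isPersistent(): keep all window events in memory
    default boolean isPersistent() {
        return false;
    }

    // Same default as IStatefulWindowedBolt.maxEventsInMemory()
    default long maxEventsInMemory() {
        return 1_000_000L;
    }
}

final class ExecutorChoiceSketch {
    // Models the isPersistent() decision; returns the name of the executor that would wrap the bolt.
    static String chooseExecutor(StatefulWindowSettings bolt) {
        return bolt.isPersistent() ? "PersistentWindowedBoltExecutor" : "StatefulWindowedBoltExecutor";
    }

    public static void main(String[] args) {
        StatefulWindowSettings defaults = new StatefulWindowSettings() { };
        StatefulWindowSettings persistent = new StatefulWindowSettings() {
            @Override
            public boolean isPersistent() {
                return true;
            }

            @Override
            public long maxEventsInMemory() {
                return 10_000L; // fewer events in memory; per the text, the rest go to KeyValueState
            }
        };
        System.out.println(chooseExecutor(defaults) + " " + defaults.maxEventsInMemory());
        // prints "StatefulWindowedBoltExecutor 1000000"
        System.out.println(chooseExecutor(persistent) + " " + persistent.maxEventsInMemory());
        // prints "PersistentWindowedBoltExecutor 10000"
    }
}
```

Because both methods are Java default methods, an existing 1.x-style stateful windowed bolt keeps compiling unchanged and simply gets the in-memory behavior.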

withWindow and withTumblingWindow

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java

    /**
     * Tuple count based sliding window configuration.
     *
     * @param windowLength    the number of tuples in the window
     * @param slidingInterval the number of tuples after which the window slides
     */
    public BaseWindowedBolt withWindow(Count windowLength, Count slidingInterval) {
        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    }

    /**
     * Time duration based sliding window configuration.
     *
     * @param windowLength    the time duration of the window
     * @param slidingInterval the time duration after which the window slides
     */
    public BaseWindowedBolt withWindow(Duration windowLength, Duration slidingInterval) {
        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    }

    /**
     * A time duration based tumbling window.
     *
     * @param duration the time duration after which the window tumbles
     */
    public BaseWindowedBolt withTumblingWindow(Duration duration) {
        return withWindowLength(duration).withSlidingInterval(duration);
    }

    /**
     * A count based tumbling window.
     *
     * @param count the number of tuples after which the window tumbles
     */
    public BaseWindowedBolt withTumblingWindow(Count count) {
        return withWindowLength(count).withSlidingInterval(count);
    }
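  • The abstract class BaseWindowedBolt defines several withWindow overloads, which set the windowLength and slidingInterval parameters; each parameter can be expressed in one of two dimensions, a Duration (time) or a Count (number of tuples).
  • withWindow configures a sliding window, while withTumblingWindow configures a tumbling window.
  • As the method bodies show, withTumblingWindow passes the same value for windowLength and slidingInterval.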
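The overlap difference between withWindow and withTumblingWindow can be made concrete with a simplified model in which the k-th time window ends at k * slidingInterval and spans windowLength before that point. This is an assumption for illustration, not Storm's exact trigger and eviction logic, and WindowBoundsSketch is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model (not Storm code): the k-th window ends at k * slidingIntervalMs
// and covers [end - windowLengthMs, end), clipped at 0.
final class WindowBoundsSketch {

    static List<long[]> windows(long windowLengthMs, long slidingIntervalMs, int count) {
        List<long[]> result = new ArrayList<>();
        for (int k = 1; k <= count; k++) {
            long end = k * slidingIntervalMs;
            result.add(new long[] {Math.max(0, end - windowLengthMs), end});
        }
        return result;
    }

    // Starts and ends only move forward, so if any two windows overlap, some neighbouring pair does too.
    static boolean overlaps(List<long[]> ws) {
        for (int i = 1; i < ws.size(); i++) {
            if (ws.get(i)[0] < ws.get(i - 1)[1]) {
                return true;
            }
        }
        return false;
    }

    static String format(List<long[]> ws) {
        StringBuilder sb = new StringBuilder();
        for (long[] w : ws) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append('[').append(w[0]).append(", ").append(w[1]).append(')');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Like withWindow(Duration 5 s, Duration 3 s) in the example topology
        List<long[]> sliding = windows(5_000, 3_000, 4);
        // Like withTumblingWindow(Duration 5 s): windowLength == slidingInterval
        List<long[]> tumbling = windows(5_000, 5_000, 3);
        System.out.println(format(sliding) + " overlap=" + overlaps(sliding));
        // prints "[0, 3000) [1000, 6000) [4000, 9000) [7000, 12000) overlap=true"
        System.out.println(format(tumbling) + " overlap=" + overlaps(tumbling));
        // prints "[0, 5000) [5000, 10000) [10000, 15000) overlap=false"
    }
}
```

In the same model, a slidingInterval larger than windowLength would leave gaps between consecutive windows, so some tuples would fall into no window at all.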

WindowedBoltExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

public class WindowedBoltExecutor implements IRichBolt {
    public static final String LATE_TUPLE_FIELD = "late_tuple";
    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
    private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
    private static final int DEFAULT_MAX_LAG_MS = 0; // no lag

    //......

    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map<String, Object> topoConf,
                                                   TopologyContext context, Collection<Event<Tuple>> queue, boolean stateful) {

        WindowManager<Tuple> manager = stateful
            ? new StatefulWindowManager<>(lifecycleListener, queue)
            : new WindowManager<>(lifecycleListener, queue);

        Count windowLengthCount = null;
        Duration slidingIntervalDuration = null;
        Count slidingIntervalCount = null;
        // window length
        if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
            windowLengthCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
        } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
            windowLengthDuration = new Duration(
                ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
                TimeUnit.MILLISECONDS);
        }

        // sliding interval
        if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
            slidingIntervalCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
        } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
            slidingIntervalDuration =
                new Duration(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(), TimeUnit.MILLISECONDS);
        } else {
            // default is a sliding window of count 1
            slidingIntervalCount = new Count(1);
        }
        // tuple ts
        if (timestampExtractor != null) {
            // late tuple stream
            lateTupleStream = (String) topoConf.get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
            if (lateTupleStream != null) {
                if (!context.getThisStreams().contains(lateTupleStream)) {
                    throw new IllegalArgumentException(
                        "Stream for late tuples must be defined with the builder method withLateTupleStream");
                }
            }
            // max lag
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) {
                maxLagMs = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)).intValue();
            } else {
                maxLagMs = DEFAULT_MAX_LAG_MS;
            }
            // watermark interval
            int watermarkInterval;
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)) {
                watermarkInterval = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)).intValue();
            } else {
                watermarkInterval = DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
            }
            waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkInterval,
                                                                    maxLagMs, getComponentStreams(context));
        } else {
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM)) {
                throw new IllegalArgumentException("Late tuple stream can be defined only when specifying a timestamp field");
            }
        }
        // validate
        validate(topoConf, windowLengthCount, windowLengthDuration,
                 slidingIntervalCount, slidingIntervalDuration);
        evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration);
        triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration,
                                         manager, evictionPolicy);
        manager.setEvictionPolicy(evictionPolicy);
        manager.setTriggerPolicy(triggerPolicy);
        return manager;
    }

    @Override
    public void execute(Tuple input) {
        if (isTupleTs()) {
            long ts = timestampExtractor.extractTimestamp(input);
            if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
                windowManager.add(input, ts);
            } else {
                if (lateTupleStream != null) {
                    windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
                } else {
                    LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts);
                }
                windowedOutputCollector.ack(input);
            }
        } else {
            windowManager.add(input);
        }
    }

    //......
}
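  • initWindowManager reads the maximum lag (maxLagMs) from Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, defaulting to 0 (no lag), and passes it to the WaterMarkEventGenerator constructor.
  • If waterMarkEventGenerator.track returns false, the tuple is late: it is emitted on the late-tuple stream when Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM is configured, otherwise a log line of the form "Received a late tuple {} with ts {}. This will not be processed." is written. In both cases the tuple is acked.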
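The lag default and the late-tuple branch of execute can be modeled without Storm. In this hypothetical sketch, MAX_LAG_KEY is a made-up key standing in for Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, the on-time test is the ts >= lastWaterMarkTs rule used by track, and the ack that Storm performs for every late tuple is not modeled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not Storm code) of how the max lag is resolved and how a tuple is routed.
final class LateTupleRoutingSketch {
    // Same default as DEFAULT_MAX_LAG_MS in WindowedBoltExecutor
    static final int DEFAULT_MAX_LAG_MS = 0;
    // Made-up key standing in for Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS
    static final String MAX_LAG_KEY = "max.lag.ms";

    enum Route { WINDOW, LATE_STREAM, LOG_ONLY }

    // Read the lag the way initWindowManager does: Number.intValue() if present, else the default.
    static int resolveMaxLagMs(Map<String, Object> topoConf) {
        if (topoConf.containsKey(MAX_LAG_KEY)) {
            return ((Number) topoConf.get(MAX_LAG_KEY)).intValue();
        }
        return DEFAULT_MAX_LAG_MS;
    }

    // lateTupleStream == null means no late-tuple stream was configured.
    static Route route(long ts, long lastWaterMarkTs, String lateTupleStream) {
        if (ts >= lastWaterMarkTs) {
            return Route.WINDOW; // on time: added to the window manager
        }
        return lateTupleStream != null ? Route.LATE_STREAM : Route.LOG_ONLY;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        System.out.println(resolveMaxLagMs(conf));         // 0
        conf.put(MAX_LAG_KEY, 5000L);                       // a Long works too, via Number
        System.out.println(resolveMaxLagMs(conf));         // 5000
        System.out.println(route(10_000L, 9_000L, null));  // WINDOW
        System.out.println(route(8_000L, 9_000L, "late")); // LATE_STREAM
        System.out.println(route(8_000L, 9_000L, null));   // LOG_ONLY
    }
}
```

Reading the value through Number rather than casting to Integer mirrors the original code and tolerates configs where the number was deserialized as a Long.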

WaterMarkEventGenerator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java

public class WaterMarkEventGenerator<T> implements Runnable {
    /**
     * Creates a new WatermarkEventGenerator.
     *
     * @param windowManager The window manager this generator will submit watermark events to
     * @param intervalMs    The generator will check if it should generate a watermark event with this interval
     * @param eventTsLagMs  The max allowed lag behind the last watermark event before an event is considered late
     * @param inputStreams  The input streams this generator is expected to handle
     */
    public WaterMarkEventGenerator(WindowManager<T> windowManager, int intervalMs,
                                   int eventTsLagMs, Set<GlobalStreamId> inputStreams) {
        this.windowManager = windowManager;
        streamToTs = new ConcurrentHashMap<>();

        ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("watermark-event-generator-%d")
            .setDaemon(true)
            .build();
        executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);

        this.interval = intervalMs;
        this.eventTsLag = eventTsLagMs;
        this.inputStreams = inputStreams;
    }

    public void start() {
        this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
    }

    //......

    /**
     * Tracks the timestamp of the event in the stream, returns true if the event can be considered for processing or false if its a late
     * event.
     */
    public boolean track(GlobalStreamId stream, long ts) {
        Long currentVal = streamToTs.get(stream);
        if (currentVal == null || ts > currentVal) {
            streamToTs.put(stream, ts);
        }
        checkFailures();
        return ts >= lastWaterMarkTs;
    }

    @Override
    public void run() {
        try {
            long waterMarkTs = computeWaterMarkTs();
            if (waterMarkTs > lastWaterMarkTs) {
                this.windowManager.add(new WaterMarkEvent<>(waterMarkTs));
                lastWaterMarkTs = waterMarkTs;
            }
        } catch (Throwable th) {
            LOG.error("Failed while processing watermark event ", th);
            throw th;
        }
    }

    /**
     * Computes the min ts across all streams.
     */
    private long computeWaterMarkTs() {
        long ts = 0;
        // only if some data has arrived on each input stream
        if (streamToTs.size() >= inputStreams.size()) {
            ts = Long.MAX_VALUE;
            for (Map.Entry<GlobalStreamId, Long> entry : streamToTs.entrySet()) {
                ts = Math.min(ts, entry.getValue());
            }
        }
        return ts - eventTsLag;
    }
}
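  • track compares the tuple's timestamp with lastWaterMarkTs to decide whether the tuple should be processed: it returns ts >= lastWaterMarkTs.
  • lastWaterMarkTs is updated in WaterMarkEventGenerator's run method. streamToTs records the latest (largest) timestamp seen on each input stream; computeWaterMarkTs takes the minimum of those values, but only once every input stream has delivered at least one tuple, and subtracts eventTsLag. The result is waterMarkTs.
  • lastWaterMarkTs is replaced (and a WaterMarkEvent is added to the window manager) only when waterMarkTs is larger, so as run keeps recomputing waterMarkTs, lastWaterMarkTs always holds the largest watermark computed so far.
  • The start method schedules run at a fixed rate of watermarkInterval, so run executes once every watermarkInterval.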
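The watermark bookkeeping can be replayed by hand with a single-threaded model. This is a sketch under stated assumptions, not Storm code: streams are plain strings rather than GlobalStreamId, run() is called explicitly instead of being scheduled every watermarkInterval, checkFailures() and the WaterMarkEvent sent to the window manager are omitted, and lastWaterMarkTs starts at 0. WatermarkSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, single-threaded model of WaterMarkEventGenerator's bookkeeping (not Storm code).
final class WatermarkSketch {
    private final Map<String, Long> streamToTs = new HashMap<>(); // latest ts seen per stream
    private final Set<String> inputStreams;
    private final long eventTsLag;
    private long lastWaterMarkTs = 0; // starts at 0 in this model

    WatermarkSketch(Set<String> inputStreams, long eventTsLagMs) {
        this.inputStreams = inputStreams;
        this.eventTsLag = eventTsLagMs;
    }

    // Same rule as track(): keep the max ts per stream, accept the tuple if ts >= lastWaterMarkTs.
    boolean track(String stream, long ts) {
        Long currentVal = streamToTs.get(stream);
        if (currentVal == null || ts > currentVal) {
            streamToTs.put(stream, ts);
        }
        return ts >= lastWaterMarkTs;
    }

    // Same rule as computeWaterMarkTs(): min over streams once every stream has reported, minus the lag.
    long computeWaterMarkTs() {
        long ts = 0;
        if (streamToTs.size() >= inputStreams.size()) {
            ts = Long.MAX_VALUE;
            for (long streamTs : streamToTs.values()) {
                ts = Math.min(ts, streamTs);
            }
        }
        return ts - eventTsLag;
    }

    // Same rule as run(): the watermark only ever moves forward.
    void run() {
        long waterMarkTs = computeWaterMarkTs();
        if (waterMarkTs > lastWaterMarkTs) {
            lastWaterMarkTs = waterMarkTs;
        }
    }

    long lastWaterMarkTs() {
        return lastWaterMarkTs;
    }

    public static void main(String[] args) {
        // Two input streams and a 5 s lag, like withLag(5 s) in the example topology
        WatermarkSketch gen = new WatermarkSketch(Set.of("s1", "s2"), 5_000);
        gen.track("s1", 20_000);
        gen.run();
        System.out.println(gen.lastWaterMarkTs());  // 0: s2 has not reported yet
        gen.track("s2", 12_000);
        gen.run();
        System.out.println(gen.lastWaterMarkTs());  // 7000 = min(20000, 12000) - 5000
        System.out.println(gen.track("s1", 6_000)); // false: 6000 < 7000, a late tuple
        System.out.println(gen.track("s2", 7_000)); // true: equal to the watermark is still on time
        gen.track("s2", 30_000);
        gen.run();
        System.out.println(gen.lastWaterMarkTs());  // 15000 = min(20000, 30000) - 5000
    }
}
```

The trace shows two properties of the real generator: the watermark does not move until every input stream has reported, and the slowest stream (the minimum) holds it back, so a larger lag only widens the tolerance for out-of-order tuples.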

Summary

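  • Storm's windowed bolts come in two flavors, IWindowedBolt and IStatefulWindowedBolt; the former is stateless, the latter stateful.
  • A window has two key parameters, windowLength and slidingInterval, each expressed either as a Duration or as a Count.
  • BaseWindowedBolt's withTumblingWindow sets windowLength and slidingInterval to the same value; a tumbling window is therefore a special sliding window whose windows do not overlap.
  • WaterMarkEventGenerator schedules a task that, every watermarkInterval, computes waterMarkTs (the minimum, across input streams, of the latest tuple timestamp, minus the lag) and updates lastWaterMarkTs if the new value is larger.
  • WaterMarkEventGenerator.track decides whether a tuple should be processed: if the tuple's timestamp is less than lastWaterMarkTs it returns false, and the tuple is then sent to the late-tuple stream if Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM is configured, or only logged otherwise.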

If reposting, please credit the source: https://itzsg.com/68055.html