Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面:CEP篇B

Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面:CEP篇B限定子类型调用.subtype()方法可以为当前模式增加子类型限制条件。例如:pattern.subtype(SubEvent.class);这

欢迎大家来到IT世界,在知识的湖畔探索吧!

限定子类型

调用.subtype()方法可以为当前模式增加子类型限制条件。例如:

pattern.subtype(SubEvent.class);

欢迎大家来到IT世界,在知识的湖畔探索吧!

这里SubEvent是流中数据类型Event的子类型。这时,只有当事件是SubEvent类型时,才可以满足当前模式pattern的匹配条件。

简单条件(Simple Conditions)

简单条件是最简单的匹配规则,只根据当前事件的特征来决定是否接受它。这在本质上其实就是一个filter操作。代码中我们为.where()方法传入一个SimpleCondition的实例作为参数。SimpleCondition是表示“简单条件”的抽象类,内部有一个.filter()方法,唯一的参数就是当前事件。所以它可以当作FilterFunction来使用。下面是一个具体示例:

欢迎大家来到IT世界,在知识的湖畔探索吧!pattern.where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) { return value.user.startsWith("A"); } });

迭代条件(Iterative Conditions)

简单条件只能基于当前事件做判断,能够处理的逻辑比较有限。在实际应用中,我们可能需要将当前事件跟之前的事件做对比,才能判断出要不要接受当前事件。这种需要依靠之前事件来做判断的条件,就叫作“迭代条件”(Iterative Condition)。

Flink CEP中,提供了IterativeCondition抽象类。这其实是更加通用的条件表达,查看源码可以发现,.where()方法本身要求的参数类型就是IterativeCondition;而之前的SimpleCondition是它的一个子类。在IterativeCondition中同样需要实现一个filter()方法,不过与SimpleCondition中不同的是,这个方法有两个参数:除了当前事件之外,还有一个上下文Context。调用这个上下文的.getEventsForPattern()方法,传入一个模式名称,就可以拿到这个模式中已匹配到的所有数据了。下面是一个具体示例:

middle.oneOrMore() .where(new IterativeCondition<Event>() { @Override public boolean filter(Event value, Context<Event> ctx) throws Exception { // 事件中的user必须以A开头 if (!value.user.startsWith("A")) { return false; } int sum = value.amount; // 获取当前模式之前已经匹配的事件,求所有事件amount之和 for (Event event : ctx.getEventsForPattern("middle")) { sum += event.amount; } // 在总数量小于100时,当前事件满足匹配规则,可以匹配成功 return sum < 100; } });

组合条件(Combining Conditions)

独立定义多个条件,然后在外部把它们连接起来,就可以构成一个“组合条件”(Combining Condition)。最简单的组合条件,就是.where()后面再接一个.where()。因为前面提到过,一个条件就像是一个filter操作,所以每次调用.where()方法都相当于做了一次过滤,连续多次调用就表示多重过滤,最终匹配的事件自然就会同时满足所有条件。这相当于就是多个条件的“逻辑与”(AND)。而多个条件的逻辑或(OR),则可以通过.where()后加一个.or()来实现。

终止条件(Stop Conditions)

对于循环模式而言,还可以指定一个“终止条件”(Stop Condition),表示遇到某个特定事件时当前模式就不再继续循环匹配了。终止条件的定义是通过调用模式对象的.until()方法来实现的,同样传入一个IterativeCondition作为参数。需要注意的是,终止条件只与oneOrMore()或者oneOrMore().optional()结合使用。

3.2 组合模式

有了定义好的个体模式,就可以尝试按一定的顺序把它们连接起来,定义一个完整的复杂事件匹配规则了。这种将多个个体模式组合起来的完整模式,就叫作“组合模式”(Combining Pattern),为了跟个体模式区分有时也叫作“模式序列”(Pattern Sequence)。一个组合模式有以下形式:

欢迎大家来到IT世界,在知识的湖畔探索吧!Pattern<Event, ?> pattern = Pattern .<Event>begin("start").where(...) .next("next").where(...) .followedBy("follow").where(...) ...

可以看到,组合模式确实就是一个“模式序列”,是用诸如begin、next、followedBy等表示先后顺序的“连接词”将个体模式串连起来得到的。

1. 初始模式(Initial Pattern)

所有的组合模式,都必须以一个“初始模式”开头;而初始模式必须通过调用Pattern的静态方法.begin()来创建。如下所示:Pattern<Event, ?> start = Pattern.begin(“start”); 这里我们调用Pattern的.begin()方法创建了一个初始模式。传入的String类型的参数就是模式的名称;而begin方法需要传入一个类型参数,这就是模式要检测流中事件的基本类型,这里我们定义为Event。调用的结果返回一个Pattern的对象实例。

2. 近邻条件(Contiguity Conditions)

模式之间的组合是通过一些“连接词”方法实现的,这些连接词指明了先后事件之间有着怎样的近邻关系,这就是所谓的“近邻条件”(Contiguity Conditions,也叫“连续性条件”)。Flink CEP中提供了三种近邻关系:

严格近邻(Strict Contiguity)

匹配的事件严格地按顺序一个接一个出现,中间不会有任何其他事件。代码中对应的就是Pattern的.next()方法,名称上就能看出来,“下一个”自然就是紧挨着的。

宽松近邻(Relaxed Contiguity)

宽松近邻只关心事件发生的顺序,而放宽了对匹配事件的“距离”要求,也就是说两个匹配的事件之间可以有其他不匹配的事件出现。代码中对应.followedBy()方法,很明显这表示“跟在后面”就可以,不需要紧紧相邻。

Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面:CEP篇B

非确定性宽松近邻(Non-Deterministic Relaxed Contiguity)

这种近邻关系更加宽松。所谓“非确定性”是指可以重复使用之前已经匹配过的事件;这种近邻条件下匹配到的不同复杂事件,可以以同一个事件作为开始,所以匹配结果一般会比宽松近邻更多。代码中对应.followedByAny()方法。

Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面:CEP篇B

3. 其他限制条件

除了上面提到的next()、followedBy()、followedByAny()可以分别表示三种近邻条件,我们还可以用否定的“连接词”来组合个体模式。主要包括:

.notNext() 表示前一个模式匹配到的事件后面,不能紧跟着某种事件。 .notFollowedBy() 表示前一个模式匹配到的事件后面,不会出现某种事件。这里需要注意,由于notFollowedBy()是没有严格限定的;流数据不停地到来,我们永远不能保证之后“不会出现某种事件”。所以一个模式序列不能以notFollowedBy()结尾,这个限定条件主要用来表示“两个事件中间不会出现某种事件”。

另外,Flink CEP中还可以为模式指定一个时间限制,这是通过调用.within()方法实现的。方法传入一个时间参数,这是模式序列中第一个事件到最后一个事件之间的最大时间间隔,只有在这期间成功匹配的复杂事件才是有效的。下面是模式序列中所有限制条件在代码中的定义:

// 严格近邻条件 Pattern<Event, ?> strict = start.next("middle").where(...); // 宽松近邻条件 Pattern<Event, ?> relaxed = start.followedBy("middle").where(...); // 非确定性宽松近邻条件 Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...); // 不能严格近邻条件 Pattern<Event, ?> strictNot = start.notNext("not").where(...); // 不能宽松近邻条件 Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...); // 时间限制条件 middle.within(Time.seconds(10));

4. 循环模式中的近邻条件

在循环模式中,近邻关系同样有三种:严格近邻、宽松近邻以及非确定性宽松近邻。对于定义了量词(如oneOrMore()、times())的循环模式,默认内部采用的是宽松近邻。也就是说,当循环匹配多个事件时,它们中间是可以有其他不匹配事件的;相当于用单例模式分别定义、再用followedBy()连接起来。

.consecutive()

为循环模式中的匹配事件增加严格的近邻条件,保证所有匹配事件是严格连续的。也就是说,一旦中间出现了不匹配的事件,当前循环检测就会终止。这起到的效果跟模式序列中的next()一样,需要与循环量词times()、oneOrMore()配合使用。于是,检测连续三次登录失败的代码可以改成:

// 1. 定义Pattern,登录失败事件,循环检测3次 Pattern<LoginEvent, LoginEvent> pattern = Pattern .<LoginEvent>begin("fails") .where(new SimpleCondition<LoginEvent>() { @Override public boolean filter(LoginEvent loginEvent) throws Exception { return loginEvent.eventType.equals("fail"); } }).times(3).consecutive();

这样显得更加简洁;而且即使要扩展到连续100次登录失败,也只需要改动一个参数而已。

.allowCombinations()

除严格近邻外,也可以为循环模式中的事件指定非确定性宽松近邻条件,表示可以重复使用已经匹配的事件。这需要调用.allowCombinations()方法来实现,实现的效果与.followedByAny()相同。

3.3 模式组

一般来说,代码中定义的模式序列,就是我们在业务逻辑中匹配复杂事件的规则。不过在有些非常复杂的场景中,可能需要划分多个“阶段”,每个“阶段”又有一连串的匹配规则。为了应对这样的需求,Flink CEP允许我们以“嵌套”的方式来定义模式。之前在模式序列中,我们用begin()、next()、followedBy()、followedByAny()这样的“连接词”来组合个体模式,这些方法的参数就是一个个体模式的名称;而现在它们可以直接以一个模式序列作为参数,就将模式序列又一次连接组合起来了。这样得到的就是一个“模式组”(Groups of Patterns)。

四、模式的检测处理

利用Pattern API定义好模式还只是整个复杂事件处理的第一步,接下来还需要将模式应用到事件流上、检测提取匹配的复杂事件并定义处理转换的方法,最终得到想要的输出信息。

4.1 将模式应用到流上

将模式应用到事件流上的代码非常简单,只要调用CEP类的静态方法.pattern(),将数据流(DataStream)和模式(Pattern)作为两个参数传入就可以了。最终得到的是一个PatternStream:

DataStream<Event> inputStream = ... Pattern<Event, ?> pattern = ... PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern);

这里的DataStream,也可以通过keyBy进行按键分区得到KeyedStream,接下来对复杂事件的检测就会针对不同的key单独进行了。模式中定义的复杂事件,发生是有先后顺序的,这里“先后”的判断标准取决于具体的时间语义。默认情况下采用事件时间语义,那么事件会以各自的时间戳进行排序;如果是处理时间语义,那么所谓先后就是数据到达的顺序。

4.2 处理匹配事件

基于PatternStream可以调用一些转换方法,对匹配的复杂事件进行检测和处理,并最终得到一个正常的DataStream。PatternStream的转换操作主要可以分成两种:简单的选择提取(select)操作,和更加通用的处理(process)操作。与DataStream的转换类似,具体实现也是在调用API时传入一个函数类:选择操作传入的是一个PatternSelectFunction,处理操作传入的则是一个PatternProcessFunction。

1. 匹配事件的选择提取(select)

处理匹配事件最简单的方式,就是从PatternStream中直接把匹配的复杂事件提取出来,包装成想要的信息输出,这个操作就是“选择”(select)。

PatternSelectFunction

代码中基于PatternStream直接调用.select()方法,传入一个PatternSelectFunction作为参数。

PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern); DataStream<String> result = patternStream.select(new MyPatternSelectFunction());

这里的MyPatternSelectFunction是PatternSelectFunction的一个具体实现。PatternSelectFunction是Flink CEP提供的一个函数类接口,它会将检测到的匹配事件保存在一个Map里,对应的key就是这些事件的名称。这里的“事件名称”就对应着在模式中定义的每个个体模式的名称;而个体模式可以是循环模式,一个名称会对应多个事件,所以最终保存在Map里的value就是一个事件的列表(List)。下面是MyPatternSelectFunction的一个具体实现:

class MyPatternSelectFunction implements PatternSelectFunction<Event, String>{ @Override public String select(Map<String, List<Event>> pattern) throws Exception { Event startEvent = pattern.get("start").get(0); Event middleEvent = pattern.get("middle").get(0); return startEvent.toString() + " " + middleEvent.toString(); } }

可以通过名称从Map中选择提取出对应的事件。注意调用Map的.get(key)方法后得到的是一个事件的List;如果个体模式是单例的,那么List中只有一个元素,直接调用.get(0)就可以把它取出。当然,如果个体模式是循环的,List中就有可能有多个元素了。例如我们对连续登录失败检测的改进,可以将匹配到的事件包装成String类型的报警信息输出,代码如下:

// 1. 定义Pattern,登录失败事件,循环检测3次 Pattern<LoginEvent, LoginEvent> pattern = Pattern .<LoginEvent>begin("fails") .where(new SimpleCondition<LoginEvent>() { @Override public boolean filter(LoginEvent loginEvent) throws Exception { return loginEvent.eventType.equals("fail"); } }).times(3).consecutive(); // 2. 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream PatternStream<LoginEvent> patternStream = CEP.pattern(stream, pattern); // 3. 将匹配到的复杂事件选择出来,然后包装成报警信息输出 patternStream .select(new PatternSelectFunction<LoginEvent, String>() { @Override public String select(Map<String, List<LoginEvent>> map) throws Exception { // 只有一个模式,匹配到了3个事件,放在List中 LoginEvent first = map.get("fails").get(0); LoginEvent second = map.get("fails").get(1); LoginEvent third = map.get("fails").get(2); return first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + ", " + second.timestamp + ", " + third.timestamp; } }) .print("warning");

PatternFlatSelectFunction

除此之外,PatternStream还有一个类似的方法是.flatSelect(),传入的参数是一个PatternFlatSelectFunction。从名字上就能看出,这是PatternSelectFunction的“扁平化”版本;内部需要实现一个flatSelect()方法,它与之前select()的不同就在于没有返回值,而是多了一个收集器(Collector)参数out,通过调用out.collet()方法就可以实现多次发送输出数据了。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://itzsg.com/64058.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们YX

mu99908888

在线咨询: 微信交谈

邮件:itzsgw@126.com

工作时间:时刻准备着!

关注微信