全国免费咨询:

13245491521

VR图标白色 VR图标黑色
X

中高端软件定制开发服务商

与我们取得联系

13245491521     13245491521

2020-05-04_Flink高级应用模式第二辑:应用模式的动态更新

您的位置:首页 >> 新闻 >> 行业资讯

Flink高级应用模式第二辑:应用模式的动态更新 作者 | Alexander Fedulov 译者 | 王强 策划 | 钰莹 在 本系列的第一篇文章 中,我们对欺诈检测引擎的目标和所需功能给出了高层次的描述。我们还解释了如何让 Apache Flink 中的数据分区基于可修改的规则来定制,替代使用硬编码的 KeysExtractor 实现。 我们特意略过了关于如何初始化应用的规则,以及在运行时有哪些方法来更新这些规则的细节内容。在这篇文章中我们将具体介绍这些细节。你将学习如何将第一部分中描述的数据分区方法与动态配置结合起来使用。只要共同使用这两种模式,调整很多业务逻辑时就不用再重新编译代码和重新部署 Flink 作业了。 规则广播 首先我们来看一下先前定义的数据处理管道: DataStreamAlert alerts = transactions .process(new DynamicKeyFunction()) .keyBy((keyed) - keyed.getKey()); .process(new DynamicAlertFunction()) DynamicKeyFunction 提供动态数据分区,而 DynamicAlertFunction 负责执行处理事务的主要逻辑,并根据已定义的规则发送警报消息。 本系列的第一篇文章简化了用例,并假定应用的规则集已预先初始化,可以通过 DynamicKeyFunction 中的 List访问。 public class DynamicKeyFunction extends ProcessFunctionTransaction, KeyedTransaction, String, Integer { /* Simplified */ ListRule rules = /* Rules that are initialized somehow.*/; ... } 显然,在初始化阶段就可以直接在 Flink 作业的代码内部向这个列表添加规则(创建一个 List 对象,使用它的 add 方法)。这样做的主要缺点是每次修改规则后都需要重新编译作业。在现实的欺诈检测系统中规则会经常更改,因此从业务和运营角度来看,这种方法是不可接受的。我们需要另一种方式。 接下来是在上篇文章中引入的规则定义示例: 图 1:规则定义 上一篇文章提到,DynamicKeyFunction 使用 groupingKeyNames 来提取消息键。该规则第二部分中的参数由 DynamicAlertFunction 使用:它们定义所执行操作的实际逻辑及其参数(例如警报触发阈值)。这意味着在 DynamicKeyFunction 和 DynamicAlertFunction 中必须存在相同的规则。为了获得这个结果,我们将使用 Apache Flink 的 数据分发广播机制。 下图展示了我们正在构建系统的最终作业图: 图 2:欺诈检测 Flink 作业的作业图 事务处理管道的主要模块有: 事务源(Transaction Source),它并行消费来自 Kafka 分区的事务消息。 动态键函数(Dynamic Key Function),使用一个动态键执行数据强化(enrichment)。后续的 keyBy 对这个动态键进行哈希处理,并在随后的运算符的所有并行实例之间对数据进行分区操作。 动态警报函数 (Dynamic Alert Function),可生成一个数据窗口并基于该窗口创建警报。 Apache Flink 内部的数据交换 上面的作业图还指出了运算符之间的各种数据交换模式。为了解广播模式的工作机制,我们先走一小段弯路,讨论 Apache Flink 的分布式运行时中存在哪些消息传播方法。 事务源之后的 FORWARD 连接意味着事务源运算符的一个并行实例消费的所有数据,都将精确传输到后续 DynamicKeyFunction 运算符的一个实例上。它还指出两个连接的运算符(在上述情况下为 12)并行度相同。此通信模式如图 3 所示。橙色圆圈表示事务,虚线矩形表示相联运算符的并行实例。 图 3:跨运算符实例传递的 FORWARD 消息 DynamicKeyFunction 和 DynamicAlertFunction 之间的 HASH 连接意味着,对于每个消息都将计算一个哈希码,并且消息将在下一个运算符的可用并行实例之间平均分配。需要使用 keyBy 从 Flink 显式“请求”这样的连接。 图 4:在运算符实例之间传递的哈希消息(通过keyBy) REBALANCE 分布是由对 rebalance() 的显式调用或并行度的更改(对于图 2 中的作业图而言,为 12-1)引起的。调用 rebalance() 会使数据以循环方式重新分区,并且在某些情况下可以帮助减轻数据偏斜。 图 5:跨运算符实例传递的 REBALANCE 消息 图 2 中的欺诈检测作业图包含一个附加数据源:规则源(Rules Source)。它还从 Kafka 消费。规则通过 BROADCAST 通道“混合”到主处理数据流中。在运算符之间传输数据的其他方法(例如 forward、hash 或 rebalance),会让每个消息只可在接收的运算符的并行实例之一中处理;相比之下,broadcast 会让每个消息在 broadcast stream 连接的运算符的所有并行实例的输入上可用。这使得 broadcast 方法适用于多种需要影响所有消息处理的任务,而无需考虑它们的键或源分区。 图 6:跨运算符实例传递的 BROADCAST 消息 注意: 实际上 Flink 中有一些更特殊的数据分区方案,我们在这里没有提到。如果你想了解更多信息,请参阅 Flink有关流分区的文档。 广播状态模式 为了使用规则源,我们需要将其“连接”到主数据流: // Streams setup DataStreamTransaction transactions = [...] DataStreamRule rulesUpdateStream = [...] BroadcastStreamRule rulesStream = rulesUpdateStream.broadcast(RULES_STATE_DESCRIPTOR); // Processing pipeline setup DataStreamAlert alerts = transactions .connect(rulesStream) .process(new DynamicKeyFunction()) .keyBy((keyed) - keyed.getKey()) .connect(rulesStream) .process(new DynamicAlertFunction()) 如你所见,可以调用 broadcast 方法并指定状态描述符,从任何常规流中创建广播流。Flink 假定在处理主数据流的事件时需要存储和检索广播的数据,因此总是从该状态描述符自动创建相应的广播状态(broadcast state)。这与其他的 Apache Flink 状态类型是不一样的,其他类型中你需要在处理函数的 open() 方法中对其进行初始化。另请注意,广播状态始终具有键值格式(MapState)。 public static final MapStateDescriptorInteger, Rule RULES_STATE_DESCRIPTOR = new MapStateDescriptor("rules", Integer.class, Rule.class); 连接到 rulesStream 会导致处理函数的签名发生某些变化。上一篇文章在这里做了一点简化,用的是 ProcessFunction。但是,DynamicKeyFunction 实际上是一个 BroadcastProcessFunction。 public abstract class BroadcastProcessFunctionIN1, IN2, OUT { public abstract void processElement(IN1 value, ReadOnlyContext ctx, CollectorOUT out) throws Exception; public abstract void processBroadcastElement(IN2 value, Context ctx, CollectorOUT out) throws Exception; } 这里的区别在于增加了 processBroadcastElement 方法,规则流的消息将通过该方法到达。下面新版本的 DynamicKeyFunction 允许在运行时通过这个流,修改数据分配键的列表: public class DynamicKeyFunction extends BroadcastProcessFunctionTransaction, Rule, KeyedTransaction, String, Integer { @Override public void processBroadcastElement(Rule rule, Context ctx, CollectorKeyedTransaction, String, Integer out) { BroadcastStateInteger, Rule broadcastState = ctx.getBroadcastState(RULES_STATE_DESCRIPTOR); broadcastState.put(rule.getRuleId(), rule); } @Override public void processElement(Transaction event, ReadOnlyContext ctx, CollectorKeyedTransaction, String, Integer out){ ReadOnlyBroadcastStateInteger, Rule rulesState = ctx.getBroadcastState(RULES_STATE_DESCRIPTOR); for (Map.EntryInteger, Rule entry : rulesState.immutableEntries()) { final Rule rule = entry.getValue(); out.collect( new Keyed( event, KeysExtractor.getKey(rule.getGroupingKeyNames(), event), rule.getRuleId())); } } } 在上面的代码中,processElement() 接收事务,而 processBroadcastElement() 接收规则更新。创建新规则后将按图 6 所示分配,并使用 processBroadcastState 将其保存在运算符的所有并行实例中。我们使用规则的 ID 作为存储和引用各个规则的键。我们不再迭代硬编码的 List,而是迭代动态更新的广播状态的条目。 在将规则存储在广播 MapState 中时,DynamicAlertFunction 遵循相同的逻辑。如第一部分中所述,processElement 输入中的每个消息会由一个特定规则处理,并通过 DynamicKeyFunction 带有相应 ID 的“预标记”。我们需要做的就是使用提供的 ID 从 BroadcastState 中检索相应规则的定义,并根据该规则所需的逻辑对其进行处理。在这一阶段,我们还将消息添加到内部函数状态,以便在所需的数据时间窗口上执行计算。我们将在欺诈检测系列的最后一篇文章中探讨如何做到这一点。 小结 本文,我们继续研究了使用 Apache Flink 构建的欺诈检测系统的用例。我们研究了在并行运算符实例之间分配数据的各种方式,而最重要的是探讨了广播状态。我们演示了如何通过广播状态模式提供的功能来配合和增强动态分区(本系列第一部分中介绍的一种模式)。在运行时发送动态更新的能力是 Apache Flink 的强大功能,适用于其他多种用例,例如控制状态(清除 / 插入 / 修复)、运行 A/B 实验或执行 ML 模型系数的更新等。 原文链接:https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html 你也「在看」吗??? 阅读原文

上一篇:2021-10-15_二十年老程序员的二十条心得:面试几乎没用,警惕很久没写过代码的“大牛” 下一篇:2025-02-21_大语言模型系统评估新框架:微观指标构建方法论

TAG标签:

14
网站开发网络凭借多年的网站建设经验,坚持以“帮助中小企业实现网络营销化”为宗旨,累计为4000多家客户提供品质建站服务,得到了客户的一致好评。如果您有网站建设网站改版域名注册主机空间手机网站建设网站备案等方面的需求...
请立即点击咨询我们或拨打咨询热线:13245491521 13245491521 ,我们会详细为你一一解答你心中的疑难。
项目经理在线

相关阅读 更多>>

猜您喜欢更多>>

我们已经准备好了,你呢?
2022我们与您携手共赢,为您的企业营销保驾护航!

不达标就退款

高性价比建站

免费网站代备案

1对1原创设计服务

7×24小时售后支持

 

全国免费咨询:

13245491521

业务咨询:13245491521 / 13245491521

节假值班:13245491521()

联系地址:

Copyright © 2019-2025      ICP备案:沪ICP备19027192号-6 法律顾问:律师XXX支持

在线
客服

技术在线服务时间:9:00-20:00

在网站开发,您对接的直接是技术员,而非客服传话!

电话
咨询

13245491521
7*24小时客服热线

13245491521
项目经理手机

微信
咨询

加微信获取报价