欢迎大家来到IT世界,在知识的湖畔探索吧!
在大数据生态中,数据分析系统在数据创造价值过程中起着非常关键的作用,直接影响业务决策效率以及决策质量。Apache Doris 作为一款支持对海量大数据进行快速分析的 MPP 数据库,在数据分析领域有着简单易用、高性能等优点。近日,Apache Doris 组织了一场线上 Meetup,作业帮带来了《Doris 在作业帮实时数仓中的应用实践》的主题分享。
大家下午好。很感谢大家参加全球 100 案例峰会预热沙龙关于 Doris 的线上 MeetUp。
下面我来介绍下 Doris 在作业帮实时数仓中的应用与实践。
这次的分享主要分三个主题:
1. 首先是所在团队的业务与背景介绍
2. 其次会介绍下基于 Doris,作业帮的查询系统是如何构建的,以及主要解决的问题
3. 未来的规划
我所在团队是作业帮大数据团队,主要负责建设公司级数仓,向各个产品线提供面向业务的数据信息,如到课时长、答题情况等业务数据以及如 pv、uv、活跃等流量类数据,服务于拉新、教学、BI 等多个重要业务线。
在数仓体系中,大数据团队主要负责到 ODS-DWS 的建设,从 DWS 到 ADS 一般是数仓系统和业务线系统的边界。
在过去,由于缺失有效、统一的查询系统,我们探索了很多模式来支持各个业务线发展。
- 有些业务线对大数据相关技术比较了解,熟悉 spark 等计算系统,可以自己处理计算。因此会选用 kafka 接收数据后使用 spark 计算的模式来对接大数据团队;但是其他业务线不一定熟悉这套技术栈,因此这种方案的主要问题无法复制到其他业务线。且 Spark 集群跨越多个业务线使用,本身就给业务线带来了额外的维护成本。
- 既然 Kafka+Spark 的模式无法大范围推广,我们又探索了基于 ES 的方案,即大数据将数据写入 ES 中,然后业务先直接访问 ES 来获取数据,但是发现一方面高性能的使用 ES,本身就具有很高的成本,对 ES 得非常熟悉,这对于业务线来说很难有精力去做,其次,由于使用 ES 的系统质量参差不齐,偶会还发生将 ES 集群打垮的问题,稳定性也不可控,最后 ES-Sql 语法完备性不足,如不支持 join、多列 group by(6.3 版本)等。
- 因此我们又探索开发 API 接口,希望在稳定性上可以有更好的解决方案。虽然 API 可以可控,但是由于 API 不提供 Sql 功能,基于需求场景不断 case by case 的 API 开发反而成了影响交付效率的主要瓶颈点。
- 上述多是支持查询明细数据,一旦涉及到大规模的流量类查询,如 pv、uv,只好引入 druid 类系统,但是 duird 的接口和其他系统的接口不一致,用户往往又得学习,且 Druid 不支持明细,一旦需要明细,就需要到 ES 去查询,由于涉及两套系统,有时候还得处理明细数据和聚合数据不一致的问题。
随着需求越来越多,系统也越来越难以维护,交付效率也特别低,需求排队非常严重。因此,提供有效而统一的查询系统,对于实时数仓建设在提高业务支持效率、降低维护成本上都具有非常重大的意义。
经过过去数月的探索与实践,我们确立了以 Doris 为基础的实时查询系统。同时也对整个实时数仓的数据计算系统做了一次大的重构,最终整体的架构图如下:
如图所示(从下到上),原始业务层日志经数据摄入系统进入数仓,在数据清洗计算层,我们将原来基 Spark 系统升级到了 Flink,并且基于 Flink-Sql 提供了统一的数据开发框架,从原有的代码开发升级到 Sql 开发来提升数据的研发效率。
其后查询系统将 Kafka 的数据实时同步到查询引擎内,并通过 OpenAPI 的统一接口对外提供查询服务。
基于 Doris 的查询系统上线后,我们面对一个需求,不用像过去一样做方案调研、开发接口、联调测试,现在只要把数据写入,业务层就可以基于 sql 自己完成数据查询、业务开发,交付效率(数据计算好到提供可读服务)从过去的数人周加快到小时级。
在性能方面,过去基于 ES 或者 mysql 来做,当查询的数据量较大时,我们只能忍受数十个小时到数分钟的延迟,基于 Doris 的方案,加快到分钟级甚至秒级。
Doris 的整体架构非常简单,不依赖任何第三方组件,社区支持度也非常好,从上线到今,我们只需做一些轻量级的运维规范,即可保证高稳定性。
所以说,通过引入 Doris,解决了作业帮内实时数仓查询交付慢、查询慢的痛点问题,对于后续数仓的系统发展起到了非常关键的作用。
接下来,重点讲下查询系统的工作,分两部分:查询系统的架构选型以及原理,以及应用 & 实践.
在讲查询引擎之前,先讲下业务场景。
作业帮内,业务场景主要分两种:一种是传统的流量类,比如算 pv、uv、活跃……,作业帮内很多时候还需要看进一步的明细,比如作业帮主 App 在每天各个小时的活跃用户数,还要看 作业帮主 App 每个小时内各个版本的活跃用户数。
第二种是面向我们业务线的工作台,比如教学的老师。比如我们的老师上完课后,会看下自己班内的同学们的出勤数据、课堂测验数据等。
这两种场景下,考虑到调研成本、团队技术生态、维护成本等多种因素,我们最后选择了 Doris 作为我们的查询引擎。主要是 Doris 可在上述两种场景下都可以统一的满足业务的需求。
首先介绍下 Doris。
Doris 是 mpp 架构的查询引擎。
整体架构非常简单,只有 FE、BE 两个服务,FE 负责 Sql 解析、规划以及元数据存储,BE 负责 Sql-Plan 的执行以及数据的存储,整体运行不依赖任何第三方系统,功能也非常丰富如支持丰富的数据更新模型、Mysql 协议、智能路由等。对于业务线部署运维到使用都非常友好。
接下来讲下用 Doris 如何解决我们前面提到的业务场景下的问题。
Doris 有多种数据模型,流量类场景常用的是聚合模型。比如对于前面提到的场景,我们会吧作业帮主 App 各个版本的明细数据存到 base 表中,如果直接从 base 表中读取跨天级的聚合数据,由于数据行比较多,可能会出现查询延迟的问题,因此我们会对常用的天级数据做一次 rollup,这样通过预聚合,来减少查询的数据量,可以加快查询的延迟。
要高效的使用 Doris 的聚合模型,前提都是基于 key 列做数据行筛选,如果使用 value 列,Doris 需要把相关的行全部聚合计算后方可决策是否属于结果集,因此效率比较低。
而对于教研工作台,前面提到的都是基于 value 的筛选,因此使用了 Doris on ES 的模型。主要是考虑到 可以发挥 ES 的任意列检索的能力,来加快查询速度。
在我们的实践中,发现 Doris on ES 相比直接裸用 ES 或社区的其他方案如 Presto on ES 在性能上有很大的提升,接下来介绍下 Doris on ES 高性能的设计原理。
Doris on ES 整体的架构如图,FE 负责查询 ES 的元数据信息如 location、shard 等,BE 负责从 ES 数据节点扫描数据。
Doris on ES 高性能,相比裸用 ES,有几个优化点:
裸用 ES 时,ES 采用的是 Query then Fetch 的模式,比如请求 1000 条文档,ES 有 10 个分片,这时候每个分片都会给协调返回 1000 个 doc id,然后 协调节点其实拿到了 10 * 1000 个 doc id,然后选择 1000 个。这样其实每个分片多返回了 900 个.
Doris on ES 则绕过了协调节点直接去操作 datanode。它会在每个 datanode 上查询符合预期的 docid,这样不会有过多的 docid 返回。
其次,Doris 从 ES 扫描数据时,也做了很多优化。比如在扫描速度上,采用了顺序扫描、列存优化、谓词下推等,在数据从 ES 传输到 Doris 时,采用就近原则如 BE 会优先访问本机的 datanode、source filter 来过滤不用的字段等来加速传输速度。
在我们的调研中,Doris on ES 的性能,比 Presto on ES 快了有数十倍。
在作业帮内,除了上面介绍的基于 Doris 的数据模型做的基础应用,要完整的支持业务、保证稳定性、提高效率,还需要其他周边的系统建设。
接下来介绍下基于 Doris,作业帮查询系统架构的整体设计以及工作模式。
这是作业帮查询系统的总体架构。
从上往下,首先是我们平台,包括各个报表平台、元数据管理平台等,主要来提高各个场景的人效。
其下红色部分为我们统一的 api 接口层,这里我们主要是制定了 api 的规范比如请求响应方式、返回码等,来减少系统之间对接的成本。
基于 api 除了提供了主要的读写接口外,也包含了周边的服务建设,比如元数据管理、调度系统等。
接下来就基于一个完整的流程来介绍下各部分系统。
首先是元数据。Doris 基于 mysql 语法建表,已经有元数据,我们这里做元数据,有几个额外的考虑:
- 首先是保障查询性能方面:如果一个表在建表时配置写错,那么查询性能会非常差,比如 ES 的 index mapping 中关闭了 docvalue,或者 Doris 表未启动列存模式,那么查询就会退化成行存模式,性能会比较低,因此为了最大化性能,就需要将建表的过程全部自动化且规范化。这是其一。
- Doris 自身存储是有强 Schema 约束的,比如一个字符串的长度。但是 ES 并没有明确的长度约束,对于一个 keyword 类型的字段,写入 128B 或者 256B 都可以成功,但这会导致一个问题,当把一张 es 表同步到 Doris 表时,同步的成功率无法保障。另外,一旦 Doris 表声明的类型(如 bigint)和 ES index 的类型不一致(如 keyword)时,也会导致 Sql 运行失败。因此需要构建统一的数据模型来避免这类问题。
- 第三:使用效率。我们在使用过程中,建表、删除表、修改表是一个常见的操作,为了让各个业务线的同学(不管是否了解 Doris)都可以快速的建表,这也是要做统一元数据、统一模型的基础。
- 最后,前面也提到了我们整个计算系统也在重构为 flink-sql。flink-sql 则会强依赖元数据,比如 table on kafka、table on redis……
要统一元数据,统一数据模型,就得抽象整个数据表的结构,来管理好不同存储上的表,我们基于 env、db、table 为基本单位来管理表,database、table 大家相对熟悉,env 是我们引入的新 namespace,主要用于提供不同集群 / 业务线的定义,如百度云的数仓集群、腾讯云的数仓集群,表单元下主要包含 field(列类型、值域)、index(如 rollup、bitmap 索引等)、storage(存储属性)。
关于列属性,主要是规范化类型系统,考虑到 json-schema 由于其校验规则丰富、描述能力强,因此对于列值的约束统一使用 json-schema 来做。
对于数据类型,我们设计了公共数据类型以及私有数据类型。公共类如 varchar、int 等,这些在不同的存储系统都有对应的实现,也支持私有类型如 Doris::bitmap,方便私有系统的兼容和扩展。通过这个模式可以将基于各个存储系统的表做了统一的管理。
这是我们线上的真实的一张表。里面包含了列信息以及对应的存储配置。
左图中的纵向红框是 json-schema 的描述,来规范化值域。横向红框为 ES 表的一些 meta 字段,比如 docid、数据更新时间。这些字段可以方便追查数据问题、以及用作数据筛选。
因为我们统一了数据模型,因此可以很方便的对所有表统一设置要增加这些 meta 字段。
通过元数据的统一管理,构建的表质量都非常高。所有的表都在最大化性能的提供查询服务,且由于数据导致的查询不可用 case 为 0。且对于任何业务线的同学,不管是否了解 Doris,都可以分钟级构建出这样一张高质量的表。
建好表后,就是数据的写以及读。统一基于 openapi 来做。
做 api 接口其实本质上也是为了在提供系统能力的前提下,进一步保障系统的稳定性和易用性。
比如要控制业务线的误用(如连接数打满),提供统一的入口方便写 es、Doris,且控制数据质量……
首先介绍下数据写接口。
由于统一了表模型,因此可以很方便的提供统一的写入接口协议。用户也无须关注实际表的存储是 es 还是 Doris 以及处理异构系统的系统。
第二,统一了写接口,就可以统一的对写入的数据会做校验检查,如数据的大小、类型等,这样可以保证数据写入的质量与准确性。这样对于数据的二次加工非常重要。
第三,接入协议中还增加了关键词,如数据的版本。可以解决数据的乱序问题,以及建立统一的写入监控。如下图是我们整个写入数据流的 qps 以及端到端(数据写入存储时间以及数据生产时间)的延迟分位值,这样可以让系统提高可观测性、白盒化。
接下来讲一个具体的场景,写入端是如何解决乱序问题的。
常态下我们的实时数据流是经过 flink 或 spark 计算后写入 kafka,然后由查询系统同步到 Doris/es 中。
当需要修数时,如果直接写入,会导致同一个 key 的数据被互相覆盖,因此为了避免数据被乱序覆盖,就得必须停掉实时流,这个会导致数据时效性式受损。
因此我们基于写入端做了改进,实时数据流、离线修复数据流各自写入不同的 topic,同步服务对每个 topic 做限速消费,如实时流时效性要求高,可以配额调的大些,保证配额,离线时效性则允许配额小点,或者在业务低峰期将配额调大,并基于数据 key& 列版本存储做了过滤。这样可以保证时效性的前提下,修数也可以按照预期进行。
最后是读的部分。
在提供 sql 能力的前提下,我们也做了一些额外的方案,比如缓存、统一的系统配置。对于系统延迟、稳定性提升都有很大的改进。并且由于统一了读接口,上述的这些改造,对于业务线来说都是透明的。
除了常规下面向低延迟的读,还有一类场景面向吞吐的读。
介绍下场景,比如 要统计统计某个学部下(各个老师)的学生上课情况:上课人数、上课时长等。
在过去,我们是基于 spark/flink 来处理这类问题,如 spark 消费 kafka 中的课中数据,对于每一条数据,会去 redis 中查询教师信息来补全维度。
常态下,当课中数据到达的时候,教师信息是就绪的,因此没有什么问题。可是在异常下,如维度流迟到、存储查询失败等,会导致课中流到达时,无法获取对应的教师信息,也就无法计算相关维度如学部的统计。
过去面临这种情况时,只能遇到这种异常,如重试如果无法解决,只能丢弃或者紧急人工干预,比如在尾标就绪后再重新回刷课中表,一旦遇到上游 kafka 数据过期就只能从 ods 层或者离线修复,效率特别低,用户体验也非常差。
基于 Doris 模式下,我们使用微批调度的模式。
调度系统会定期(分钟级)执行一个调度任务,基于 sql join 完成数据的选取。这样哪怕在异常下,课中流查不到教师数据,这样 join 的结果只是包含了可以查到教师数据的信息,
待教师数据就绪后,即可自动补全这部分课中数据的维度。整个过程全部自动化来容错。效率非常高。
因此这个模式的主要好处:
- 业务端延迟可控、稳定性好。整个过程主要取决于调度的周期和 Sql 执行时长。调度周期可控,且由于 Doris on ES 的高性能,Sql 执行时长几乎都可以在分钟内完成。
- 数据修复成本低、维护方便。一旦数据有异常,可以自动触发对应的数据窗口进行重新计算。
最后,讲下其他方面的建议实践,这些相对简单,但是在实际的应用中非常容易忽视。
- ADS 层表,尤其是面向平台侧的应用,慎用 join。Doris 的 join 策略比较多,如 broadcast、shuffer 等,如果使用需要了解原理,属于高级用户的使用范畴。对于强调快速迭代的场景下,可以使用微批模式来略降低数据更新的延迟,提高数据查询的效率。
- 使用 Doris on ES 时,尤其是在 ES 集群负载很高的情况下,在延迟允许的情况下建议将 es 的扫描超时时间设置大一点,如 30s 甚至更久。
- Batch size,不是越大越好。我们实践中发现 4096 下最好,可以最高达到每秒 30w 的扫描速度。
- Doris 使用 bitmap 做精确去重时,有时候会发现 Sql 延迟比较高,但是系统 cpu 利用率低,可以通过调大 fragment_instance_num 的值。
- 运维 Doris 时,建议使用 supervisor,可以帮助避免很多服务异常挂掉的问题;机器全部开启 ulimit –c,避免出 core 时无法高效定位
- 当前我们在使用 master 版本,主要是考虑到 bugfix 很及时,但是也要避免新代码、feature 的 bug 引入,因此我们会关注社区的 issue、并做好 case 回归、固化使用模式等一系列手段来保障 master 在实际生产中的稳定性。
最后,讲下规划。
Doris 在作业帮实时数仓的建设中发挥了很关键的作用。
在实际的应用中,我们也发现了一些当前的一些不足。
如 Doris on ES 在面对大表的 join 查询时,目前延迟还比较大,因此需要进一步的优化解决;
Doris 自身的 olap 表可以做动态分区,对于 ES 表目前可控性还不足;
其次,当 ES 修改表后,如增加字段,只能删除 Doris 表重建,可能会有短暂的表不可用,需要自动化同步或者支持在线热修改;
最后 Doris on ES 可以支持更多的谓词下推,如 count 等。
我们也希望可以和社区一起,把 Doris 建设的越来越好。
好的。我的分享到此结束。谢谢大家。
问题 1:Doris on ES V.S. sparksql on ES,在功能上和性能上咱们调研过吗?对于使用那个您这边有什么建议吗?
- SparkSql on ES 和 Doris on ES 虽然都是 Sql,但是在实际的生产环境中使用差异还是比较大的。
- 功能上来说,SparkSql 和 Doris-Sql 需要考虑语法的兼容性问题,毕竟是两个系统,语法兼容其实很难。一旦不一致就需要用户端面向不同的系统做适配。
- 性能上,SparkSql 或者 Doris on ES,虽然访问 ES 的原理都差不多,但是实现上可能会有 diff,这些 diff 会导致性能上差异比较大,如 SparkSql 的 connector 是不支持列存模式的。
- 场景上,如果使用 SparkSql 建议可以使用在流计算场景,更多的是解决吞吐的问题,类似的系统应该是 Flink-Sql。可以吧数据按照行扫出来后,基于 Spark 的分布式计算能力、yarn 的资源管理走流计算的模式。Doris on ES 更适合走低延迟的场景。
问题 2:Doris 支持 Hive Metastore,和 Flink SQL 是什么关系?刚才讲的太快,有点没听懂
- Doris 其实是不支持 Hive MetaStore 的。只是可以从 HDFS 上 load 文件,然后在 Doris 的 load 语法中指定对应的列。
- FlinkSql 和这块关系不大。不过我理解你说的应该是我们的元数据,这部分背景是因为 Flink-Sql 运行时需要设置 ddl 语句,比如一张基于 redis 的表都有哪些列,类型是什么,这些需要统一的管理起来,目前是存储到了我们的元数据系统中。通过接口和 Flink 系统完成对接。
问题 3:_version 字段是一个内部字段?需要用户端写入的时候指定,还是系统自动创建?和 HBase 的 version 的应用场景有区别吗?
- _version 是我们数据流的一个内置协议字段。在数流转过程中,用户只要设置值即可,不需要显示创建。具体的值可以根据数据字段的写入服务来设置,比如在 ods 层,应该是采集侧服务来写入,如果在中间的 flink 清洗环节,应该是 flink 系统来设置,尽量让架构服务统一设置,保证稳定性。
- _version 字段最终会映射到存储系统中的 UpdateTime 字段,这个也是架构负责写入的。不需要业务侧关注。
- HBase 的 version 更多是用于多版本的管理,比如数据的回滚等。这里查询系统的 _version 更多是为了保证数据的时鲜性,即用户从查询系统读到的数据始终是最新的。这么做的前提主要是因为查询系统比如 ES 对于数据列多版本支持不太好,对于数据流更新时如果没有版本管理,容易导致乱序覆盖。和 HBase 的 version 场景还不同。
- ES 内部也有一个 _version,但是这个 _version 一般是 ES 内部使用,用于高并发下乐观锁的实现。和当前的场景都不一样。
关注我并转发此篇文章,私信我“领取资料”,即可免费获得InfoQ价值4999元迷你书,点击文末「了解更多」,即可移步InfoQ官网,获取最新资讯~
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://itzsg.com/76461.html