作者: 康凯森
日期: 2019-01-01
分类: OLAP
Doris 最新版本已经支持 Stream Load,本文简介下 Doris Stream Load 的核心原理以及我如何在 Doris 外围实现 Exactly Once 的 Kafka To Doris 实时数据同步。
Doris Stream Load 实现的核心思想和 HBase,Druid 比较类似:在内存中实现一个面向写优化的 MemTable(MemStore),当 MemTable 的大小超过一定阈值时,进行 flush,将 MemTable 转为带有前缀索引的 SSTable(HBase 中是将 MemStore 转为 HFile, Druid 中是将 Map 转为带倒排索引的 Segment)
Doris 新增了 GlobalTransactionMgr 来保证单次 Stream Load 的原子性。
下面我简介下 Doris 一次 Stream Load 的流程:
会的,当数据在插入到内存的 SkipList 时,Aggregate 模型 和 Uniq 模型的数据会进行聚合
不可查, 和 HBase,Druid 不同,Doris 只有每次 Stream Load 的事务状态变为 VISIBLE 后,数据才可查。Doris 目前并没有实现内存中数据的 Scanner。
如果是和此次 Stream Load 无关的 BE 节点,肯定不会受影响;
如果是负责数据摄入的 BE 节点挂掉,此次 Stream Load 会失败,事务会标记为 ABORTED;
如果是数据所在的 BE 节点超过半数挂掉,此次 Stream Load 会失败,事务会标记为 ABORTED。
因为 FE 的 Master 节点是事务管理者,中间会有多次 BE 和 FE 的请求交互,如果 FE 的 Master 节点在 BE 请求时挂掉,此次导入也会失败。FE 的非 Master 节点挂掉应该不会有啥影响。
CSV 格式,默认的列分隔符是 '\t',行分隔符是 '\n'。列分隔符可以自定义。
目前 Doris 社区最新版已经提供了Stream Load 的能力,可以通过 Http 接口进行流式数据的实时导入,但是还没有对接 Kafka 等数据源。
而我司用户对 Kafka To Doris 实时导入的需求十分迫切,所以我在 Doris 外围实现了 Kafka To Doris 的 Exactly Once 实时数据同步。
如上图所示,Kafka To Doris 的功能是由 Doris-Load 这个 Java 进程实现,该进程可以运行在任意可以 ping 通 Kafka 集群和 Doris 集群的节点上。
Doris-Load 进程包含以下 5 类线程:
OffsetsRecord
: 代表 Mysql Meta 中一行记录List<String> recordsBuffer
: 待发送的一批 Kafka ConsumerRecordLong curSequenceId
: 当前的 SequenceIdLong lastCommittedSequenceId
: 所有小于等于 lastCommittedSequenceId 的 SequenceId 对应的 Kafka 消息已成功 Load 进 Doris, 也意味着在 Mysql Meta 中所有 SequenceId 小于等于 lastCommittedSequenceId 的记录都可以删除Map<Long, OffsetsRecord> pendingRecords
: 等待 Doris response 的 Kafka Record,key 是 SequenceIdList<Long> pendingCommittedIds
: SequenceId 对应的数据已经成功 Load 进 Doris,等待更新 Mysql Meta 中的 kafka_committed 字段保证 Kafka 数据的 Exactly Once 消费等价于同时保证消费 Kafka 数据不丢且不重。
如何保证消费 Kafka 数据时不丢?
精确记录 Kafka 消费的 Offset,只有在确认 Kafka 消息成功被 Doris 消费时,对应的 Kafka 消息才会在 Mysql Meta 中被标记为 Committed。
如何保证消费 Kafka 数据时不重?
核心是借助 Doris Stream Load 的 Label 机制: Doris 的每个 Stream Load Http 请求可以增加一个 Label 的 Http Header,Doris 可以保证相同 Label 的数据在 7 天(可配置)内只能 Load 一次,重复提交会报错,而且可以根据 Label 查询每个 Load 请求的状态。
所以我们只需要让 Load 请求的 Label(dorisDb_dorisTable_sequence_id)和 Kafka 消息的 offsets 严格对应,就可以保证不会重复消费 Kafka 的消息,即使当 后发送的 Http 请求成功但是先发送的 Http 请求失败时 也可以保证。
定期 check Kafka topic 的 Partition 数量,如果发现有变化,则重新 assign。
目前需要重启 Doris-Load 进程。 后续可以考虑将 Doris 建模和 Doris-Load 进程的管理集成到一个系统(Doris 管理平台),做到用户在 Schema 变更后,自动重启 Doris-Load 进程。
根据配置决定是否可以容忍错误数据,如果不可以容忍,则 Doris-Load 进程退出;否则每批数据可以容忍指定百分比的错误数据。
目前 Doris-Load 进程最耗 CPU,最慢的部分就是 Kafka 消息的 Json 解析。 我 Json 解析用的是 Jackson 框架,开始时是直接把 Json 当做一个 map 来解析,后来发现在实际使用中 Doris 的字段数是很有可能小于 Kafka 的字段数的,所以就采用了运行时根据 Doris 的元数据动态生成类,让 Jackson 根据生成的类解析 Json 的方式。这种方式不仅可以提高解析速度,同时还可以更容易的保证导入时列的顺序,进行 Null 值的处理。
由于 Doris 的 Http Load 有 label 机制,所以每次 Http Load 是幂等的,所以我们可以 Catch FE 或 BE 连接失败的异常,并进行重试。
让 0.10 的 KafkaConsumer 使用 V0 版本的 API,即可支持 0.8 的 Kafka 集群,同时需要保证每次在 assign 和 seek 时必须包含所有的 partition,否则在 pull 数据时会触发 updateFetchPositions 逻辑,就会报 SchemaException。
Kafka To Doris 在我司12月6号已上线,目前线上已运行了10多个同步流程,期间也完善了一些异常处理,监控报警。后续会参考 Druid Kafka Indexing Service 启动一个 Service,暴露以下 API: 管理 Doris-Laod 进程的启动,关闭和重启;查看每个 Doris-Load 进程的存活状态,消费状态。