Apache Kylin CubeHFileJob 终极优化


作者: 康凯森

日期: 2017-10-22

分类: OLAP


前言

Apache Kylin Cube 构建原理 一文中我们介绍了Kylin 的Cube构建原理,我们知道目前Kylin的Cube最终是要存储在HBase中,所以Cube构建的最后一步需要生成HFile,关于生成HFile的原理,注意事项,优化技巧我在Hive 数据 bulkload 导入 HBase中有介绍,本文主要介绍我是如何对Kylin中的HFile生成进行进一步优化,从而避免CubeHFileJob的Reducer阶段OOM。

背景

我们知道,目前Kylin中生成HFile的reducer十分简单,就是用的HBase中的KeyValueSortReducer。 其代码很短, 逻辑也十分简单,就是利用TreeSet排序后,再输出。 但是这样会有一个问题,就是当1个rowkey对应的KeyValue很大且很多时,经常会OOM,而这种情况在Kylin Merge的时候十分容易发生。

    TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
    for (KeyValue kv: kvs) {
      try {
        map.add(kv.clone());
      } catch (CloneNotSupportedException e) {
        throw new java.io.IOException(e);
      }
    }
    context.setStatus("Read " + map.getClass());
    int index = 0;
    for (KeyValue kv: map) {
      context.write(row, kv);
      if (++index % 100 == 0) context.setStatus("Wrote " + index);
    }

前几天和我们HBase同学无意中聊起时,他提到可以将排序放到Shuffle阶段,这样Reducer节点就不用排序,自然不会有OOM问题,并且他很早已经向HBase社区提交了Patch,已经合入了HBase master分支 HBASE-13897。于是我就准备优化下Kylin的CubeHFileJob。

HBase的HFile Shuffle排序实现原理

  1. 将Reducer的key通过createKeyOnly方法将完整的KeyValue改为仅HBase的Rowkey的KeyValue。
  2. 实现一个自定义的KeyValueWritableComparable利用CellComparator.COMPARATOR.compar对Rowkey进行排序。
  3. 实现一个自定义的KeyValueWritableComparablePartitioner同样利用CellComparator.COMPARATOR.compar进行分区,为了保证全局有序,分隔点是根据Region的startKey确定的,一个reducer处理1个region。
  4. Reducer中没有任何额外操作,直接输出记录。

HBase中的实现不能直接在Kylin中使用:

  1. HBase是根据Region来进行Recuer切分的,但是在Kylin中是根据HFile来切分Reducer的。只根据Region来切分的话,Reducer的个数会很少,Reducer执行就会很慢。
  2. HBase中获取Region startKey的时候,是通过直接访问HBase来获取的,但是在Kylin中我们不希望在MR中访问HBase。

Kylin目前的CubeHFileJob实现原理

  1. Kylin在Cube构建的前几步会估算出Cuboid的大小。
  2. 在建HBase表的时候,根据统计信息计算出分裂点,写入临时文件。分裂点的Key是ShardID,或者ShardID+Cuboid,其中ShardID对应Region,ShardID+Cuboid对应HFile。Key的类型是ImmutableBytesWritable,Value是NullWritable。
  3. CubeHFileJob的mapper阶段就是Cuboid的格式转化为KeyValue。
  4. CubeHFileJob的reducer阶段利用KeyValueSortReducer对KeyValue进行排序。
  5. CubeHFileJob是利用TotalOrderPartitioner进行全局排序的。

Kylin的HFile Shuffle排序实现原理

  1. TotalOrderPartitioner 可以接受的partition key有两种:BinaryComparable 和 RawComparator,所以我们依旧可以使用TotalOrderPartitioner来实现全局排序,只要我们的sortComparatorClass是BinaryComparable或者RawComparator的子类, 不需要像HBase中自定义Partitioner 类,自然也就不需要访问HBase。
  2. KeyValueWritableComparable 是借助CellComparator.compareStatic来比较KeyValue的,这样的前提是我们必须在Comparator反序列化出KeyValue,而实际上我们可以利用KeyValue.KVComparator()直接对字节进行排序,不需要反序列化。
  3. Kylin中确定HFile的分割点是在生成Cuboid之前,而此时我们只知道cuboid,不知道具体的rowkey,不知道具体的列族和列,所以不能直接构造出具体的真实KeyValue, 但是kylin的cuboid是Rowkey的起始部分,所以我们可以用KeyValue.createFirstOnRow方法根据cuboid构造出第一个合法的可能的KeyValue。

综上,实际我们只需要实现一个RowKeyWritable,然后将PartitionFile和CubeHFileJob Mapper的Key都改为RowKeyWritable就可以。

总结

虽然我们的灵感来源于HBase,但最终没有使用HBase现有的类,我们的实现更简洁,更优秀。

在实现过程中有一个坑需要注意,就是TotalOrderPartitioner在setconf的时候用的是ComparatorClass的确是mapreduce.job.output.key.comparator.class,但是其调用的方法是comparator.compare(splitPoints[i], splitPoints[i+1]),并不是compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)。也就是说我们仅仅继承WritableComparator实现一个Comparator是不够的,我们还必须实现WritableComparable接口来重写compareTo(T o)方法,因为WritableComparator的compare方法调用了compareTo方法。


《OLAP 性能优化指南》欢迎 Star&共建

《OLAP 性能优化指南》

欢迎关注微信公众号