Hive 数据 bulkload 导入 HBase


作者: 康凯森

日期: 2016-04-26

分类: HBase


Hive 数据 bulkload 导入 HBase 原理

主要工作就是将Hive的数据格式转化为HBase中的数据格式。bulkload导入 HBase 绕过写memstore,直接写HFile,不影响线上业务,并且做到实时生效。

Hive 数据 bulkload 导入 HBase 步骤

生成HFile

其实质就是一个mapreduce任务。 map 阶段 : 将Hive表中的数据转化为HBase中的KeyValue结构。 reduce 阶段: 由于HFile要求写入的数据是按照(rowkey,family,column,ts)有序排列,所有这一步主要利用TreeSet对map阶段生成的keyvalue进行排序。

BulkLoad

其实就是一个HDFS上的mv操作,将生成的HFile mv到HBASE表的相应分区内,及时生效,导入过程不影响线上数据读写操作。 主要是调用Hbase的LoadIncrementalHFiles类。

Hive 数据 bulkload 导入 HBase 注意事项

数据格式

一般在生成keyvalue的时候把Hive中的数据都变为了String类型。

HFile的 KeyValue必须有序

因为HFile的 KeyValue 必须有序,所以要求Hive源数据中对于同一Rowkey下同一列族同一列的值不可以重复。

生成HFile的MR任务的reduce个数确定

HFileOutputFormat.configureIncrementalLoad(job, table);

默认我们可以通过此方法来确定生成HFile的MR任务的reduce个数,这个方法会读取Hbase表的splitkey来确定生成HFile的MR任务的reduce个数。

Distcp

如果Hive数据源使用的HDFS集群和HBase表使用的HDFS集群不是同一个集群,我们在生成HFile后还需要 通过distcp 将HFile 传输到HBase表所在的HDFS集群。

用户权限和安全认证问题

我们必须保证我们使用的用户拥有相应的Hbase集群以及HBase集群依赖的HDFS集群的读写权限。

避免单个HFile跨region

如果HFile跨region,在BulkLoad的时候就会进行split,就会导致BulkLoad的速度比较慢。所以如果是我们自己确定splitkey的话一定要保证单个HFile不跨region

加速生成HFile的MR任务

有时候可能HBase表的region数很少,导致由上述方法确定的生成HFile的MR任务的reduce个数很少,进而导致MR任务时间很长。这时候我们也可以根据生成HFile的个数划分splitkey,增加生成HFile的MR任务的reduce个数,减少MR任务时间。

Hive 数据 bulkload 导入 Hbase 核心代码

公司代码不方面贴,就给大家展示一下Kylin里的代码

生成HFile的Mapper

    @Override
    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        outputKey.set(key.getBytes(), 0, key.getLength());
        KeyValue outputValue;

        int n = keyValueCreators.size();
        if (n == 1 && keyValueCreators.get(0).isFullCopy) { // shortcut for simple full copy

            outputValue = keyValueCreators.get(0).create(key, value.getBytes(), 0, value.getLength());
            context.write(outputKey, outputValue);

        } else { // normal (complex) case that distributes measures to multiple HBase columns

            inputCodec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), inputMeasures);

            for (int i = 0; i < n; i++) {
                outputValue = keyValueCreators.get(i).create(key, inputMeasures);
                context.write(outputKey, outputValue);
            }
        }
    }

主要就是生成KeyValue,keyValueCreators的create方法如下:

public KeyValue create(byte[] keyBytes, int keyOffset, int keyLength, byte[] value, int voffset, int vlen) {
        return new KeyValue(keyBytes, keyOffset, keyLength, //
                cfBytes, 0, cfBytes.length, //
                qBytes, 0, qBytes.length, //
                timestamp, KeyValue.Type.Put, //
                value, voffset, vlen);
    }

生成HFile的Reducer

public class KeyValueSortReducer extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
  protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<KeyValue> kvs,
      org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
  throws java.io.IOException, InterruptedException {
    TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
    for (KeyValue kv: kvs) {
      try {
        map.add(kv.clone());
      } catch (CloneNotSupportedException e) {
        throw new java.io.IOException(e);
      }
    }
    context.setStatus("Read " + map.getClass());
    int index = 0;
    for (KeyValue kv: map) {
      context.write(row, kv);
      if (++index % 100 == 0) context.setStatus("Wrote " + index);
    }
  }
}
主要就是利用 TreeSet 进行排序

《OLAP 性能优化指南》欢迎 Star&共建

《OLAP 性能优化指南》

欢迎关注微信公众号