作者: 康凯森
日期: 2016-04-26
分类: HBase
主要工作就是将Hive的数据格式转化为HBase中的数据格式。bulkload导入 HBase 绕过写memstore,直接写HFile,不影响线上业务,并且做到实时生效。
其实质就是一个mapreduce任务。
map 阶段 : 将Hive表中的数据转化为HBase中的KeyValue结构。
reduce 阶段: 由于HFile要求写入的数据是按照(rowkey,family,column,ts)有序排列,所有这一步主要利用TreeSet
对map阶段生成的keyvalue进行排序。
其实就是一个HDFS上的mv操作,将生成的HFile mv到HBASE表的相应分区内,及时生效,导入过程不影响线上数据读写操作。
主要是调用Hbase的LoadIncrementalHFiles
类。
一般在生成keyvalue的时候把Hive中的数据都变为了String类型。
因为HFile的 KeyValue 必须有序,所以要求Hive源数据中对于同一Rowkey下同一列族同一列的值不可以重复。
HFileOutputFormat.configureIncrementalLoad(job, table);
默认我们可以通过此方法来确定生成HFile的MR任务的reduce个数,这个方法会读取Hbase表的splitkey来确定生成HFile的MR任务的reduce个数。
如果Hive数据源使用的HDFS集群和HBase表使用的HDFS集群不是同一个集群,我们在生成HFile后还需要 通过distcp
将HFile 传输到HBase表所在的HDFS集群。
我们必须保证我们使用的用户拥有相应的Hbase集群以及HBase集群依赖的HDFS集群的读写权限。
如果HFile跨region,在BulkLoad的时候就会进行split,就会导致BulkLoad的速度比较慢。所以如果是我们自己确定splitkey的话一定要保证单个HFile不跨region
有时候可能HBase表的region数很少,导致由上述方法确定的生成HFile的MR任务的reduce个数很少,进而导致MR任务时间很长。这时候我们也可以根据生成HFile的个数划分splitkey,增加生成HFile的MR任务的reduce个数,减少MR任务时间。
公司代码不方面贴,就给大家展示一下Kylin里的代码
@Override
public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
outputKey.set(key.getBytes(), 0, key.getLength());
KeyValue outputValue;
int n = keyValueCreators.size();
if (n == 1 && keyValueCreators.get(0).isFullCopy) { // shortcut for simple full copy
outputValue = keyValueCreators.get(0).create(key, value.getBytes(), 0, value.getLength());
context.write(outputKey, outputValue);
} else { // normal (complex) case that distributes measures to multiple HBase columns
inputCodec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), inputMeasures);
for (int i = 0; i < n; i++) {
outputValue = keyValueCreators.get(i).create(key, inputMeasures);
context.write(outputKey, outputValue);
}
}
}
主要就是生成KeyValue,keyValueCreators的create方法如下:
public KeyValue create(byte[] keyBytes, int keyOffset, int keyLength, byte[] value, int voffset, int vlen) {
return new KeyValue(keyBytes, keyOffset, keyLength, //
cfBytes, 0, cfBytes.length, //
qBytes, 0, qBytes.length, //
timestamp, KeyValue.Type.Put, //
value, voffset, vlen);
}
public class KeyValueSortReducer extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<KeyValue> kvs,
org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
throws java.io.IOException, InterruptedException {
TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
for (KeyValue kv: kvs) {
try {
map.add(kv.clone());
} catch (CloneNotSupportedException e) {
throw new java.io.IOException(e);
}
}
context.setStatus("Read " + map.getClass());
int index = 0;
for (KeyValue kv: map) {
context.write(row, kv);
if (++index % 100 == 0) context.setStatus("Wrote " + index);
}
}
}
主要就是利用 TreeSet 进行排序