Importing HBase Data into Hive


Author: 康凯森

Date: 2016-04-25

Category: HBase


How Importing HBase Data into Hive Works

The core of the work is converting data from the HBase storage format into the Hive storage format.

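Basic Steps for Importing HBase Data into Hive

  • Run a MapReduce job. In the map phase, read the rows of the HBase table, convert the HBase KV format into the row format of the Hive table, and write the result to HDFS directly from the map phase.
  • Create the table in Hive and load the data from HDFS into it.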
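The second step can be sketched as a pair of HiveQL statements. The minimal sketch below only builds the statement strings. It assumes the map phase wrote tab-delimited text files, and the table name, column names, and HDFS path are hypothetical placeholders rather than values from the original job.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch: builds the HiveQL for step 2 (all names are hypothetical). */
final class HiveLoadSketch {

    /** Builds a CREATE TABLE statement for tab-delimited text files. */
    static String createTable(String table, Map<String, String> columns) {
        StringBuilder sql = new StringBuilder("CREATE TABLE " + table + " (");
        boolean first = true;
        for (Map.Entry<String, String> column : columns.entrySet()) {
            if (!first) {
                sql.append(", ");
            }
            sql.append(column.getKey()).append(' ').append(column.getValue());
            first = false;
        }
        // The field delimiter must match the separator written by the mapper.
        sql.append(") ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' STORED AS TEXTFILE");
        return sql.toString();
    }

    /** Builds a LOAD DATA statement that moves the job output into the table. */
    static String loadData(String hdfsDir, String table) {
        return "LOAD DATA INPATH '" + hdfsDir + "' INTO TABLE " + table;
    }

    public static void main(String[] args) {
        // Insertion order defines the Hive column order.
        Map<String, String> columns = new LinkedHashMap<String, String>();
        columns.put("rowkey", "STRING");
        columns.put("name", "STRING");
        columns.put("age", "INT");
        System.out.println(createTable("user_info", columns));
        System.out.println(loadData("/tmp/hbase_export/user_info", "user_info"));
    }
}
```

The column order in the Hive table has to match the field order of the lines the mapper emits, which is why the sketch uses an insertion-ordered map.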

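Caveats When Importing HBase Data into Hive

Data types

HBase stores every value as a byte array, so when converting to Hive data you need to know the original type of each HBase value and decode it as that type.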
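To make the point concrete, the minimal sketch below decodes the same four bytes under three different assumed types. It uses `java.nio.ByteBuffer` as a stand-in for HBase's `Bytes` utility, assuming the values were written in big-endian order, which is what `Bytes.toBytes(int)` produces. The class and method names are illustrative only.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustrative sketch: one byte array, three interpretations. */
final class CellDecodeSketch {

    /** Reads a big-endian 4-byte integer. */
    static int toInt(byte[] cell) {
        return ByteBuffer.wrap(cell).getInt();
    }

    /** Reads a big-endian IEEE 754 single-precision float. */
    static float toFloat(byte[] cell) {
        return ByteBuffer.wrap(cell).getFloat();
    }

    /** Reads the bytes as UTF-8 text. */
    static String toText(byte[] cell) {
        return new String(cell, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] cell = {0x41, 0x42, 0x43, 0x44};
        System.out.println("as INT:    " + toInt(cell));
        System.out.println("as FLOAT:  " + toFloat(cell));
        System.out.println("as STRING: " + toText(cell));
    }
}
```

Nothing in the bytes themselves says which interpretation is right, so the job has to be told the type of the row key and of every column it exports.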

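Data filtering

An import into Hive often does not need the whole HBase table. The Scan can be restricted by version count, time range, row range, column family, and filter, following the way the HBase source code (the Export tool) configures its Scan: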

  private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
    Scan s = new Scan();
    // Optional arguments.
    // Set Scan Versions
    s.setMaxVersions(1);
    // Set Scan Range
    long startTime = args.length > 2 ? Long.parseLong(args[2]) : 0L;
    long endTime = args.length > 3 ? Long.parseLong(args[3]) : Long.MAX_VALUE;
    s.setTimeRange(startTime, endTime);
    // Set cache blocks
    s.setCacheBlocks(false);
    // set Start and Stop row
    if (conf.get(TableInputFormat.SCAN_ROW_START) != null) {
      s.setStartRow(Bytes.toBytes(conf.get(TableInputFormat.SCAN_ROW_START)));
    }
    if (conf.get(TableInputFormat.SCAN_ROW_STOP) != null) {
      s.setStopRow(Bytes.toBytes(conf.get(TableInputFormat.SCAN_ROW_STOP)));
    }
    // Set Scan Column Family
    boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN));
    if (raw) {
      s.setRaw(raw);
    }
    if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
      s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
    }
    // Set RowFilter or Prefix Filter if applicable.
    Filter exportFilter = getExportFilter(args);
    if (exportFilter != null) {
      LOG.info("Setting Scan Filter for Export.");
      s.setFilter(exportFilter);
    }
    int batching = conf.getInt(EXPORT_BATCHING, -1);
    if (batching != -1) {
      try {
        s.setBatch(batching);
      } catch (IncompatibleFilterException e) {
        LOG.error("Batching could not be set", e);
      }
    }
    return s;
  }
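The range settings above are half-open: `setTimeRange` keeps timestamps in [startTime, endTime), and the start row is inclusive while the stop row is exclusive, with row keys compared as unsigned bytes in lexicographic order. The stand-alone sketch below mimics those semantics without any HBase dependency. The class and method names are hypothetical, and it only models the row and time bounds, not versions, families, or filters.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative sketch of the half-open row and time ranges used by a Scan. */
final class ScanRangeSketch {

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    /** Unsigned lexicographic comparison of two row keys. */
    static int compareRows(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = a[i] & 0xff;
            int y = b[i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return a.length - b.length;
    }

    /**
     * True if row is in [startRow, stopRow) and ts is in [startTime, endTime).
     * An empty startRow or stopRow means that side is unbounded.
     */
    static boolean accepts(byte[] row, long ts, byte[] startRow, byte[] stopRow,
                           long startTime, long endTime) {
        boolean afterStart = startRow.length == 0 || compareRows(row, startRow) >= 0;
        boolean beforeStop = stopRow.length == 0 || compareRows(row, stopRow) < 0;
        boolean inTimeRange = ts >= startTime && ts < endTime;
        return afterStart && beforeStop && inTimeRange;
    }

    public static void main(String[] args) {
        byte[] start = utf8("row1");
        byte[] stop = utf8("row3");
        System.out.println(accepts(utf8("row2"), 150L, start, stop, 100L, 200L));
        System.out.println(accepts(utf8("row3"), 150L, start, stop, 100L, 200L));
    }
}
```

One practical consequence: to export everything up to and including a given timestamp, pass that timestamp plus one as the end time.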

Core Code for Importing HBase Data into Hive

/**
 * Created by kangkaisen on 15/10/27.
 */
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class Mapper extends TableMapper<Text, NullWritable> {
  public Mapper() {
  }

  String cfName;
  String rowkeyType;
  Text key = new Text();
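  // LinkedHashMap keeps the columns in the order they were configured, so the
  // output fields line up with the column order of the Hive table.
  Map<String, String> columnMap = new LinkedHashMap<String, String>();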

  public static void initJob(String table, Scan scan, Class<? extends TableMapper> mapper, Job job) throws IOException {
    TableMapReduceUtil.initTableMapperJob(table, scan, mapper, Text.class, NullWritable.class, job);
  }

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    cfName = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
    rowkeyType = conf.get("hbase.rowkey.type");
    getColumns(conf.get(TableInputFormat.SCAN_COLUMNS));
  }

  @Override
  public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException, InterruptedException {
    StringBuilder result = new StringBuilder();
    result.append(getString(values.getRow(), rowkeyType));
    result.append("\t");
    for (Map.Entry<String, String> entry : columnMap.entrySet()) {
      // Bytes.toBytes encodes as UTF-8 regardless of the platform default charset.
      byte[] cell = values.getValue(Bytes.toBytes(cfName), Bytes.toBytes(entry.getKey()));
      if (cell != null) {
        result.append(getString(cell, entry.getValue()));
        result.append("\t");
      } else {
        result.append(" ");
        result.append("\t");
      }
    }
    key.set(result.toString());
    context.write(key, NullWritable.get());
  }

  private void getColumns(String columns) {
    if (columns != null) {
      String[] cols = columns.split(" ");
      for (int i = 0; i < cols.length; i++) {
        String[] kv = cols[i].split(":");
        columnMap.put(kv[0], kv[1]);
      }
    }
  }

  private String getString(byte[] cell, String type) {
    TYPE_ENUM currentType = TYPE_ENUM.valueOf(type.toUpperCase());
    switch (currentType) {
      case STRING:
        // Bytes.toString decodes as UTF-8 regardless of the platform default charset.
        return Bytes.toString(cell);
      case INT:
        Integer i = Bytes.toInt(cell);
        return i.toString();
      case LONG:
        Long l = Bytes.toLong(cell);
        return l.toString();
      case FLOAT:
        Float f = Bytes.toFloat(cell);
        return f.toString();
      case DOUBLE:
        Double d = Bytes.toDouble(cell);
        return d.toString();
      default:
        throw new IllegalArgumentException("wrong type");
    }
  }

  private enum TYPE_ENUM {
    STRING,
    INT,
    LONG,
    FLOAT,
    DOUBLE
  }
}
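The mapper reads its column list from a space-separated `qualifier:type` string and emits one tab-separated line per row. The sketch below reproduces just that parsing and line-building logic with plain Java collections so it can be tried without a cluster. It is a simplified stand-in, not the mapper itself: cell values are assumed to be already decoded to strings, no trailing tab is written, and the class and method names are hypothetical. A `HashMap` does not guarantee iteration order, so the sketch uses a `LinkedHashMap` to keep the fields in the configured order.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch of the column-spec parsing and line building. */
final class RowLineSketch {

    /** Parses "name:string age:int" into an insertion-ordered qualifier-to-type map. */
    static Map<String, String> parseColumns(String spec) {
        Map<String, String> columns = new LinkedHashMap<String, String>();
        for (String item : spec.trim().split("\\s+")) {
            String[] kv = item.split(":", 2);
            if (kv.length != 2 || kv[0].isEmpty() || kv[1].isEmpty()) {
                throw new IllegalArgumentException("expected qualifier:type, got '" + item + "'");
            }
            columns.put(kv[0], kv[1]);
        }
        return columns;
    }

    /** Joins the row key and cell values with tabs; a missing cell becomes a single space. */
    static String toLine(String rowKey, Map<String, String> columns, Map<String, String> cells) {
        StringBuilder line = new StringBuilder(rowKey);
        for (String qualifier : columns.keySet()) {
            String value = cells.get(qualifier);
            line.append('\t').append(value != null ? value : " ");
        }
        return line.toString();
    }

    public static void main(String[] args) {
        Map<String, String> columns = parseColumns("name:string age:int city:string");
        Map<String, String> cells = new HashMap<String, String>();
        cells.put("name", "Tom");
        cells.put("city", "Paris");
        // The age cell is absent, so its field is a single space.
        System.out.println(toLine("u1", columns, cells));
    }
}
```

Validating each `qualifier:type` item up front turns a malformed column list into a clear error at job setup instead of an array index failure.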
