作者: 康凯森
日期: 2016-04-25
分类: HBase
主要工作就是将HBase的数据格式转化为Hive中的数据格式。
因为HBase中的存储格式都是字节数组,所以转化为Hive数据时需要知道HBase中的数据原本的数据类型是什么,并转化为相应的数据类型。
我们导入Hive时往往并不需要HBase整张表的数据,因此参考HBase源码中Export工具的实现来构造Scan,对数据进行过滤:
/**
 * Builds the {@link Scan} used to read the source table, adapted from HBase's Export tool.
 *
 * <p>Positional arguments read here: {@code args[2]} (optional) is the start of the time range
 * and {@code args[3]} (optional) is its end; both are parsed with {@link Long#parseLong}.
 * The remaining options (row range, raw scan, column family, batching) come from {@code conf}.
 *
 * @param conf job configuration supplying the optional scan settings
 * @param args command-line arguments; also passed to {@code getExportFilter}
 * @return the configured scan
 * @throws IOException declared for callers; not thrown directly by this body
 */
private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
    Scan scan = new Scan();

    // Only one version per cell is read.
    scan.setMaxVersions(1);

    // Optional time range; defaults cover the whole [0, Long.MAX_VALUE] span.
    long minStamp = 0L;
    if (args.length > 2) {
        minStamp = Long.parseLong(args[2]);
    }
    long maxStamp = Long.MAX_VALUE;
    if (args.length > 3) {
        maxStamp = Long.parseLong(args[3]);
    }
    scan.setTimeRange(minStamp, maxStamp);

    // Do not populate the block cache from this scan.
    scan.setCacheBlocks(false);

    // Optional row-key bounds.
    String startRow = conf.get(TableInputFormat.SCAN_ROW_START);
    if (startRow != null) {
        scan.setStartRow(Bytes.toBytes(startRow));
    }
    String stopRow = conf.get(TableInputFormat.SCAN_ROW_STOP);
    if (stopRow != null) {
        scan.setStopRow(Bytes.toBytes(stopRow));
    }

    // Raw scanning is opt-in through the RAW_SCAN setting.
    if (Boolean.parseBoolean(conf.get(RAW_SCAN))) {
        scan.setRaw(true);
    }

    // Optional restriction to a single column family.
    String family = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
    if (family != null) {
        scan.addFamily(Bytes.toBytes(family));
    }

    // Row / prefix filter derived from the arguments, if any.
    Filter exportFilter = getExportFilter(args);
    if (exportFilter != null) {
        LOG.info("Setting Scan Filter for Export.");
        scan.setFilter(exportFilter);
    }

    // Batching is applied after the filter because setBatch may reject it
    // (IncompatibleFilterException); in that case the error is logged and the
    // scan is returned without batching.
    int batchSize = conf.getInt(EXPORT_BATCHING, -1);
    if (batchSize == -1) {
        return scan;
    }
    try {
        scan.setBatch(batchSize);
    } catch (IncompatibleFilterException e) {
        LOG.error("Batching could not be set", e);
    }
    return scan;
}
/**
 * Created by kangkaisen on 15/10/27.
 *
 * <p>Table mapper that turns each HBase row into one tab-separated text line for loading into
 * Hive. The line starts with the decoded row key, followed by the value of every configured
 * column (in configuration order) from a single column family. Each field is followed by a tab.
 *
 * <p>Configuration read in {@link #setup}:
 * <ul>
 *   <li>{@code TableInputFormat.SCAN_COLUMN_FAMILY}: the column family to read;</li>
 *   <li>{@code hbase.rowkey.type}: type of the row key (defaults to {@code string});</li>
 *   <li>{@code TableInputFormat.SCAN_COLUMNS}: whitespace-separated {@code qualifier:type}
 *       entries (a missing type defaults to {@code string}).</li>
 * </ul>
 * Supported types (case-insensitive): string, int, long, float, double.
 */
public class Mapper extends TableMapper<Text, NullWritable> {
    /** Type assumed when the row key or a column has no configured type. */
    private static final String DEFAULT_TYPE = "string";

    public Mapper() {
    }

    String cfName;
    String rowkeyType;
    Text key = new Text();
    // Insertion-ordered so that output fields follow the configured column order,
    // which must line up positionally with the Hive table's columns.
    HashMap<String, String> columnMap = new java.util.LinkedHashMap<String, String>();
    /** UTF-8 bytes of {@link #cfName}, resolved once in setup(). */
    private byte[] cfBytes;

    /**
     * Configures {@code job} to run {@code mapper} over {@code table} with the given scan,
     * emitting {@link Text} keys and {@link NullWritable} values.
     */
    public static void initJob(String table, Scan scan, Class<? extends TableMapper> mapper, Job job) throws IOException {
        TableMapReduceUtil.initTableMapperJob(table, scan, mapper, Text.class, NullWritable.class, job);
    }

    /**
     * Reads the column family, row-key type and column specs from the job configuration.
     *
     * @throws IllegalStateException if columns are configured but no column family is set
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        cfName = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
        rowkeyType = conf.get("hbase.rowkey.type", DEFAULT_TYPE);
        getColumns(conf.get(TableInputFormat.SCAN_COLUMNS));
        if (cfName != null) {
            cfBytes = Bytes.toBytes(cfName);
        } else if (!columnMap.isEmpty()) {
            throw new IllegalStateException(
                    TableInputFormat.SCAN_COLUMN_FAMILY + " must be set when columns are configured");
        }
    }

    /**
     * Emits one line per row: decoded row key, then each configured column's decoded value.
     * A missing cell is written as a single space so field positions stay aligned.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException, InterruptedException {
        StringBuilder result = new StringBuilder();
        result.append(getString(values.getRow(), rowkeyType)).append('\t');
        for (Map.Entry<String, String> entry : columnMap.entrySet()) {
            byte[] cell = values.getValue(cfBytes, Bytes.toBytes(entry.getKey()));
            result.append(cell != null ? getString(cell, entry.getValue()) : " ").append('\t');
        }
        key.set(result.toString());
        context.write(key, NullWritable.get());
    }

    /**
     * Parses whitespace-separated {@code qualifier:type} specs into {@link #columnMap}.
     * The split happens at the last ':' so qualifiers containing ':' are supported; a spec
     * without a type (or with an empty one) maps to {@link #DEFAULT_TYPE}.
     */
    private void getColumns(String columns) {
        if (columns == null) {
            return;
        }
        for (String spec : columns.trim().split("\\s+")) {
            if (spec.isEmpty()) {
                continue; // only happens when columns is blank
            }
            int sep = spec.lastIndexOf(':');
            if (sep < 0 || sep == spec.length() - 1) {
                columnMap.put(sep < 0 ? spec : spec.substring(0, sep), DEFAULT_TYPE);
            } else {
                columnMap.put(spec.substring(0, sep), spec.substring(sep + 1));
            }
        }
    }

    /**
     * Decodes {@code cell} according to {@code type} and renders it as a string.
     * Strings are decoded as UTF-8 (matching {@code Bytes.toBytes(String)}); numeric types use
     * HBase's {@code Bytes} big-endian decoders.
     *
     * @throws IllegalArgumentException if {@code type} is not a supported type name
     */
    private String getString(byte[] cell, String type) {
        TYPE_ENUM currentType;
        try {
            // Locale.ROOT: locale-sensitive upper-casing (e.g. Turkish "i") would break valueOf.
            currentType = TYPE_ENUM.valueOf(type.toUpperCase(java.util.Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unsupported value type: " + type, e);
        }
        switch (currentType) {
            case STRING:
                return Bytes.toString(cell);
            case INT:
                return String.valueOf(Bytes.toInt(cell));
            case LONG:
                return String.valueOf(Bytes.toLong(cell));
            case FLOAT:
                return String.valueOf(Bytes.toFloat(cell));
            case DOUBLE:
                return String.valueOf(Bytes.toDouble(cell));
            default:
                throw new IllegalArgumentException("wrong type");
        }
    }

    /** Value types supported for the row key and columns. */
    private enum TYPE_ENUM {
        STRING,
        INT,
        LONG,
        FLOAT,
        DOUBLE
    }
}