MapReduce Import/Export with HBase 0.98: Code Examples


Author: 康凯森

Date: 2016-04-10

Category: HBase


1: Importing from HDFS into HBase 0.98 with MapReduce

Source file format (one access-log line):

175.44.30.93 - - [29/Sep/2013:00:10:15 +0800] "GET /structure/heap HTTP/1.1" 301 406 "-" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1;)"

Java code:

package hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class HbaseMr {

    public static class MapperClass extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the access-log line on spaces:
            // strs[0] = client IP, strs[3] = "[timestamp", strs[6] = requested path
            String[] strs = value.toString().split(" ");
            // Rowkey is "ip-timestamp"; substring(1) drops the leading '['
            String rowkey = strs[0] + "-" + strs[3].substring(1);

            byte[] row = Bytes.toBytes(rowkey);
            byte[] family = Bytes.toBytes("info");
            byte[] qualifier = Bytes.toBytes("url");
            byte[] values = Bytes.toBytes(strs[6]);

            Put put = new Put(row);
            put.add(family, qualifier, values);

            context.write(new ImmutableBytesWritable(row), put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Table that TableOutputFormat writes into
        conf.set(TableOutputFormat.OUTPUT_TABLE, "access-log");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        Job job = Job.getInstance(conf, "Hbase_Mr");
        job.setNumReduceTasks(0);                         // map-only job
        job.setJarByClass(HbaseMr.class);
        job.setMapperClass(MapperClass.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);
        job.setOutputFormatClass(TableOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  // input directory on HDFS
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
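
Note: TableOutputFormat writes into an existing table, so the "access-log" table with the column family "info" must be created before the job runs (for example in the HBase shell, or with the client API). Below is a minimal, purely illustrative sketch using the HBase 0.98 HBaseAdmin API; the class name is made up for this post:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateAccessLogTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists("access-log")) {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("access-log"));
            desc.addFamily(new HColumnDescriptor("info"));  // column family used by the mapper
            admin.createTable(desc);
        }
        admin.close();
    }
}

The same goes for the "total-access" table (family "url") written by the second example below.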

The import result looks like this: (screenshot omitted)

2: From one HBase table to another with MapReduce

This example counts the total number of paths accessed by each IP in the table shown above. The code is as follows:

package hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;

import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class HbaseMR {

    final static String inTable = "access-log";
    final static String inFamily = "info";
    final static String inColumn = "url";

    final static String outTable = "total-access";
    final static String outFamily = "url";
    final static String outColumn = "count";

    public static class Mapper extends TableMapper<Text, Text> {

        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context)
                throws IOException, InterruptedException {
            byte[] b = values.getValue(inFamily.getBytes(), inColumn.getBytes());
            if (b != null) {
                String v = new String(b);
                // The rowkey is "ip-timestamp"; the IP part becomes the new key
                String r = new String(values.getRow());
                String[] strs = r.split("-");
                String ip = strs[0];

                context.write(new Text(ip), new Text(v));
            }
        }
    }

    public static class Reducer extends TableReducer<Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Count how many URL values arrived for this IP
            int count = 0;
            for (Text val : values) {
                count++;
            }
            String sum = String.valueOf(count);

            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(outFamily.getBytes(), outColumn.getBytes(), sum.getBytes());
            context.write(key, put);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();

        Job job = Job.getInstance(conf, "HbaseMR");
        job.setJarByClass(HbaseMR.class);
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob(inTable, scan, Mapper.class,
                Text.class, Text.class, job);
        TableMapReduceUtil.initTableReducerJob(outTable, Reducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

The new table looks like this: (screenshot omitted)
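
To spot-check the counts without opening the HBase shell, a quick scan of the output table can be done with the 0.98 client API. This snippet is not part of the original post and is only illustrative; the class name is made up:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class PrintTotalAccess {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "total-access");
        ResultScanner scanner = table.getScanner(new Scan());
        for (Result r : scanner) {
            // Print each "ip -> count" pair stored by the reducer
            String ip = Bytes.toString(r.getRow());
            String count = Bytes.toString(
                    r.getValue(Bytes.toBytes("url"), Bytes.toBytes("count")));
            System.out.println(ip + " -> " + count);
        }
        scanner.close();
        table.close();
    }
}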

3: Deduplicating while importing into HBase with MapReduce

In the import above we did not account for the same IP accessing the same path more than once, so we should deduplicate.

The usual MapReduce deduplication approach:

After the shuffle, each key reaching a reduce call is unique, so the reducer only needs to emit the key.

Concretely, the map emits its input value as the output key with a null value. The reduce input is then exactly the map output; when the reducer receives a <key, value-list> pair (where the value-list is all nulls), it simply copies the key to the output key and writes a null value.

Map-side output:

context.write(value, NullWritable.get());

Reduce-side output:

context.write(key, NullWritable.get());
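
Put together, a standalone dedup job built around those two lines would look roughly like the sketch below. It is illustrative only: the class names are placeholders, and args[0]/args[1] are assumed to be the HDFS input and output paths. It reads text lines and writes back the distinct lines:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DedupJob {

    public static class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // The whole record becomes the key, so identical records
            // end up in the same reduce call after the shuffle.
            context.write(value, NullWritable.get());
        }
    }

    public static class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Each distinct record arrives exactly once as a key; emit it and drop the nulls.
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "dedup");
        job.setJarByClass(DedupJob.class);
        job.setMapperClass(DedupMapper.class);
        job.setReducerClass(DedupReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}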

When importing into HBase with MapReduce we could run a dedup job like the one above first and then do the import, but that would require two jobs.

Instead, we can deduplicate directly inside the reduce task of the previous example, using a HashSet (add import java.util.HashSet;). One caveat: Hadoop reuses the Text instance handed out by the value iterator, so each value must be copied (here converted to a String) before it goes into the set; adding the Text objects directly would just store many references to the same mutated object. The modified code is as follows:

public static class Reducer extends TableReducer<Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Collect the distinct URLs for this IP. Hadoop reuses the Text instance
        // across iterations, so copy the value (via toString) before adding it.
        HashSet<String> set = new HashSet<String>();
        for (Text val : values) {
            set.add(val.toString());
        }
        String sum = String.valueOf(set.size());

        Put put = new Put(Bytes.toBytes(key.toString()));
        put.add(outFamily.getBytes(), outColumn.getBytes(), sum.getBytes());
        context.write(key, put);
    }
}

With the same input data, the result after deduplication looks like this: (screenshot omitted) You can see that the counts drop noticeably.

