Author: 康凯森
Date: 2016-04-10
Category: HBase
Source file format:
175.44.30.93 - - [29/Sep/2013:00:10:15 +0800] "GET /structure/heap HTTP/1.1" 301 406 "-" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1;)"
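To make the field positions concrete before looking at the job, here is a small standalone sketch (not part of the job; the class name LogLineDemo is just illustrative) that splits the sample line the same way the mapper below does:
package hbase;

// A standalone sketch (illustrative class name) showing which whitespace-separated fields of a
// log line the import job below uses; it is not part of the MapReduce code.
public class LogLineDemo {
    public static void main(String[] args) {
        String line = "175.44.30.93 - - [29/Sep/2013:00:10:15 +0800] \"GET /structure/heap HTTP/1.1\" 301 406 \"-\" \"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1;)\"";
        String[] strs = line.split(" ");
        String ip = strs[0];                 // 175.44.30.93
        String time = strs[3].substring(1);  // 29/Sep/2013:00:10:15 (leading '[' stripped)
        String url = strs[6];                // /structure/heap
        System.out.println(ip + "-" + time); // the rowkey the import job builds
        System.out.println(url);             // the value stored in info:url
    }
}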
Java code for the import job:
package hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class HbaseMr {

    // Map-only job: each log line becomes one Put written straight to HBase.
    public static class MapperClass extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split(" ");
            // Rowkey = IP + "-" + access time (strs[3] with the leading '[' stripped).
            String rowkey = strs[0] + "-" + strs[3].substring(1);
            byte[] row = Bytes.toBytes(rowkey);
            byte[] family = Bytes.toBytes("info");
            byte[] qualifier = Bytes.toBytes("url");
            byte[] values = Bytes.toBytes(strs[6]); // the requested path
            Put put = new Put(row);
            put.add(family, qualifier, values);
            context.write(new ImmutableBytesWritable(row), put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Target HBase table for TableOutputFormat.
        conf.set(TableOutputFormat.OUTPUT_TABLE, "access-log");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        Job job = Job.getInstance(conf, "Hbase_Mr");
        job.setNumReduceTasks(0); // map-only: mapper output goes directly to HBase
        job.setJarByClass(HbaseMr.class);
        job.setMapperClass(MapperClass.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // input log file(s) on HDFS
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
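Before running the job, the access-log table with the info column family has to exist, since TableOutputFormat writes into an existing table. It can be created in the HBase shell (create 'access-log', 'info') or programmatically; below is a minimal sketch using the pre-2.0 HBaseAdmin client API, the same era as the put.add call above (the class name CreateAccessLogTable is just illustrative). The job itself is then submitted with hadoop jar, passing the HDFS path of the log file as its first argument.
package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

// Sketch: create the access-log table with the info family (illustrative class name).
public class CreateAccessLogTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            if (!admin.tableExists("access-log")) {
                HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("access-log"));
                desc.addFamily(new HColumnDescriptor("info")); // column family used by the import job
                admin.createTable(desc);
            }
        } finally {
            admin.close();
        }
    }
}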
The import result is as follows:
Next we count, for each IP in the table above, the total number of paths it accessed; the code is as follows:
package hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class HbaseMR {

    // Source table written by the import job above.
    final static String inTable = "access-log";
    final static String inFamily = "info";
    final static String inColumn = "url";
    // Destination table holding one row per IP with its access count.
    final static String outTable = "total-access";
    final static String outFamily = "url";
    final static String outColumn = "count";

    public static class Mapper extends TableMapper<Text, Text> {

        public Mapper() {}

        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context)
                throws IOException, InterruptedException {
            byte[] b = values.getValue(inFamily.getBytes(), inColumn.getBytes());
            if (b != null) {
                String v = new String(b);
                // The rowkey is "IP-timestamp"; take the IP part as the output key.
                String r = new String(values.getRow());
                String[] strs = r.split("-");
                String ip = strs[0];
                context.write(new Text(ip), new Text(v));
            }
        }
    }

    public static class Reducer extends TableReducer<Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Count how many URLs were recorded for this IP.
            int count = 0;
            for (Text val : values) {
                count++;
            }
            String sum = String.valueOf(count);
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(outFamily.getBytes(), outColumn.getBytes(), sum.getBytes());
            context.write(key, put);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "HbaseMR");
        job.setJarByClass(HbaseMR.class);
        Scan scan = new Scan();
        // Read from access-log, write the per-IP counts into total-access.
        TableMapReduceUtil.initTableMapperJob(inTable, scan, Mapper.class, Text.class, Text.class, job);
        TableMapReduceUtil.initTableReducerJob(outTable, Reducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
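Like access-log, the output table total-access with the url column family must exist before this job runs (it can be created the same way as in the sketch above). Once the job has finished, the per-IP counts can be spot-checked with a quick scan; below is a minimal sketch using the HBase 1.x Connection/Table client API, which is an assumption on my part, as is the class name CheckTotalAccess. The column layout matches what the reducer above writes.
package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Spot-check sketch: print each IP and its count from total-access (illustrative class name).
public class CheckTotalAccess {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("total-access"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result r : scanner) {
                String ip = Bytes.toString(r.getRow());
                String count = Bytes.toString(r.getValue(Bytes.toBytes("url"), Bytes.toBytes("count")));
                System.out.println(ip + " -> " + count);
            }
        }
    }
}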
The resulting new table looks like this:
In the HBase import above we did not account for the same IP accessing the same path more than once, so the counts include duplicates and we should deduplicate.
The classic trick is to rely on the fact that, after the shuffle, the keys arriving at reduce are unique, and simply output the key.
Concretely, the map output key is the map input value and the map output value is null; the reduce input is then exactly the map output, and when reduce receives a <key, value-list> pair (with value-list being null) it copies the key straight to the output key and sets the output value to null.
Output on the map side:
context.write(value, NullWritable.get());
Output on the reduce side:
context.write(key, NullWritable.get());
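Putting these two pieces together, a standalone dedup pass over the raw log could look like the following sketch; the class names are illustrative and the Job driver that wires them together is omitted.
package hbase;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Minimal sketch of a standalone dedup pass (illustrative names; the driver/main is omitted).
public class DedupJob {

    public static class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // The whole record becomes the key, so identical records land in the same reduce group.
            context.write(value, NullWritable.get());
        }
    }

    public static class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // After the shuffle each distinct record arrives exactly once as a key; emit it once.
            context.write(key, NullWritable.get());
        }
    }
}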
When importing into HBase with MapReduce we could deduplicate first in this way and then import, but that would require two jobs.
Instead, we can deduplicate directly inside the reduce task above, using a HashSet. The modified code is as follows:
public static class Reducer extends TableReducer<Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Collect the distinct URLs for this IP. Copy each value out with toString(),
        // because Hadoop reuses the same Text instance across iterations.
        HashSet<String> set = new HashSet<String>(); // requires: import java.util.HashSet;
        for (Text val : values) {
            set.add(val.toString());
        }
        String sum = String.valueOf(set.size());
        Put put = new Put(Bytes.toBytes(key.toString()));
        put.add(outFamily.getBytes(), outColumn.getBytes(), sum.getBytes());
        context.write(key, put);
    }
}
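One design note on this reducer: the set holds String copies of the values rather than the Text objects themselves, because Hadoop reuses the same Text instance for every value in the iterator; holding on to the instances directly would make the distinct count unreliable.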
With the same data source, the result after deduplication is as follows: you can see that the counts are noticeably smaller.