1. Importing files from HDFS into an HBase table
package example2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ImportFromFileExample {

    // Mapper: emit the byte offset of each input line as the key and the line itself as the value.
    static class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(new ImmutableBytesWritable(Bytes.toBytes(key.get())), value);
        }
    }

    // Reducer: concatenate all values for a key and write them into the configured column.
    static class Reducer1 extends TableReducer<ImmutableBytesWritable, Text, ImmutableBytesWritable> {
        private byte[] family;
        private byte[] qualifier;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // "import.column" is a placeholder key; the concrete key was elided in the original post.
            String column = context.getConfiguration().get("import.column");
            byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
            family = colkey[0];
            if (colkey.length > 1) {
                qualifier = colkey[1];
            }
        }

        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String valueCon = "";
            for (Text text : values) {
                valueCon += text.toString();
            }
            Put put = new Put(key.get());
            put.add(family, qualifier, Bytes.toBytes(valueCon));
            context.write(key, put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String[] argArray = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (argArray.length != 1) {
            System.exit(1);
        }
        // Placeholder "family:qualifier" column; the concrete value was elided in the original post.
        conf.set("import.column", "data:content");

        Job job = new Job(conf, "ImportFromFile");
        job.setJarByClass(ImportFromFileExample.class);
        job.setMapperClass(ImportMapper.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        // "import_table" is a placeholder for the elided target table name.
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "import_table");
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);
        TableMapReduceUtil.initTableReducerJob("import_table", Reducer1.class, job);
        FileInputFormat.addInputPaths(job, argArray[0]);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
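The target table referenced by TableOutputFormat must already exist before the import job is submitted. Below is a minimal sketch of pre-creating it with the pre-1.0 admin API, assuming the same placeholder table name import_table and column family data used above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateImportTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        // "import_table" and the "data" family are the placeholder names used in the job above.
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("import_table"));
        desc.addFamily(new HColumnDescriptor("data"));
        if (!admin.tableExists("import_table")) {
            admin.createTable(desc);
        }
        admin.close();
    }
}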
2. Loading data from HBase into HDFS
package example2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class FromHBaseToHDFSExample {

    // Mapper: read every cell of each row and emit its value as plain text.
    static class HBaseMapper extends TableMapper<IntWritable, Text> {
        // Constant output key; the concrete key constant was elided in the original post.
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            for (KeyValue kv : value.raw()) {
                context.write(ONE, new Text(Bytes.toString(kv.getValue())));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String[] argArray = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (argArray.length != 1) {
            System.exit(1);
        }
        Job job = new Job(conf, "FromHBaseToHDFS");
        job.setJarByClass(FromHBaseToHDFSExample.class);
        // "source_table" is a placeholder for the elided source table name.
        TableMapReduceUtil.initTableMapperJob("source_table", new Scan(),
                HBaseMapper.class, IntWritable.class, Text.class, job);
        FileOutputFormat.setOutputPath(job, new Path(argArray[0]));
        job.setNumReduceTasks(0);   // map-only job: mapper output goes straight to HDFS
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
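A bare new Scan() reads every column of every row with the default per-RPC caching, which tends to be slow for a full-table MapReduce job. Below is a sketch of tuning the scan before handing it to initTableMapperJob; the data family and source_table name are assumptions, and these lines would replace the new Scan() call in main above:

        // Tune the Scan handed to initTableMapperJob instead of using a bare new Scan().
        // The "data" family name is hypothetical.
        Scan scan = new Scan();
        scan.setCaching(500);                    // rows fetched per RPC from the region server
        scan.setCacheBlocks(false);              // avoid polluting the block cache with a full-table scan
        scan.addFamily(Bytes.toBytes("data"));   // restrict the scan to the columns the mapper needs

        TableMapReduceUtil.initTableMapperJob("source_table", scan,
                HBaseMapper.class, IntWritable.class, Text.class, job);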
3. HBase to HBase
package example2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class FromHBaseToHBaseExample {

    // Delimiter joining family, qualifier and value in the mapper output;
    // the concrete separator was elided in the original post.
    private static final String SEPARATOR = "\t";

    // Mapper: for every cell, emit "family \t qualifier \t value" keyed by the row key.
    static class HBaseMapper extends TableMapper<ImmutableBytesWritable, Text> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            for (Cell kv : value.rawCells()) {
                // Use CellUtil to copy only the family/qualifier/value bytes;
                // getFamilyArray() and friends return the whole backing array.
                Text out = new Text(Bytes.toString(CellUtil.cloneFamily(kv)) + SEPARATOR
                        + Bytes.toString(CellUtil.cloneQualifier(kv)) + SEPARATOR
                        + Bytes.toString(CellUtil.cloneValue(kv)));
                context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)), out);
            }
        }
    }

    // Reducer: split each record back into family/qualifier/value and write it to the target table.
    static class HBaseReducer extends TableReducer<ImmutableBytesWritable, Text, ImmutableBytesWritable> {
        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                String text = value.toString();
                String[] textArray = text.split(SEPARATOR);
                Put put = new Put(key.get());
                put.add(Bytes.toBytes(textArray[0]), Bytes.toBytes(textArray[1]),
                        Bytes.toBytes(textArray[2]));
                context.write(key, put);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // "source.table"/"target.table" are placeholder keys and table names;
        // the concrete values were elided in the original post.
        conf.set("source.table", "source_table");
        conf.set("target.table", "target_table");

        Job job = new Job(conf, "FromHBaseToHBase");
        job.setJarByClass(FromHBaseToHBaseExample.class);
        TableMapReduceUtil.initTableMapperJob(conf.get("source.table"), new Scan(),
                HBaseMapper.class, ImmutableBytesWritable.class, Text.class, job);
        TableMapReduceUtil.initTableReducerJob(conf.get("target.table"), HBaseReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
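After the copy job finishes, a single Get against the target table is a quick sanity check that the reducer wrote what the mapper read. A minimal sketch using the same pre-1.0 client API as the examples above; the table name target_table, row key row1, and column data:content are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyCopy {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // "target_table", "row1" and "data:content" are hypothetical names for illustration.
        HTable table = new HTable(conf, "target_table");
        Get get = new Get(Bytes.toBytes("row1"));
        Result result = table.get(get);
        byte[] value = result.getValue(Bytes.toBytes("data"), Bytes.toBytes("content"));
        System.out.println("copied value: " + (value == null ? "<missing>" : Bytes.toString(value)));
        table.close();
    }
}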