Importing Text Data into HBase with MapReduce

  1. Overview: clean up the data in a local file and import it into HBase.

  2. Create the table in HBase, as sketched below.
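    A minimal way to create the table from the HBase shell, using the table name (userInfo3) and column families (address, sex, age) that the reducer and driver below expect:

    create 'userInfo3', 'address', 'sex', 'age'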


  3. Data format, illustrated below.
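    Each input line is expected to look like rowKey@address#sex#age: the mapper splits on '@', and the reducer splits the remainder on '#'. A hypothetical sample file (the names and values are illustrative only):

    zhangsan@Beijing#male#25
    lisi@Shanghai#female#30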


  4. The MapReduce program


    The Mapper


    package com.hadoop.mapreduce.test.map;

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class WordCountHBaseMapper extends Mapper<Object, Text, Text, Text> {

        // Each input line has the form: rowKey@address#sex#age.
        // Split on '@' and emit (rowKey, "address#sex#age"); the reducer
        // splits the value on '#' and writes the fields into HBase.
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            if (lineValue != null) {
                String[] valuesArray = lineValue.split("@");
                context.write(new Text(valuesArray[0]), new Text(valuesArray[1]));
            }
        }
    }

    The Reducer


    package com.hadoop.mapreduce.test.reduce;

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;

    public class WordCountHBaseReduce extends TableReducer<Text, Text, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<Text> value, Context out)
                throws IOException, InterruptedException {
            String keyValue = key.toString();
            Iterator<Text> valueIterator = value.iterator();
            while (valueIterator.hasNext()) {
                Text valueV = valueIterator.next();
                // The value carries "address#sex#age".
                String[] valueArray = valueV.toString().split("#");

                // One Put per value, keyed by the rowKey emitted by the mapper.
                // Put.add(family, qualifier, value) is the pre-1.0 HBase API;
                // HBase 1.x and later use Put.addColumn with the same arguments.
                Put putRow = new Put(keyValue.getBytes());
                putRow.add("address".getBytes(), "baseAddress".getBytes(),
                        valueArray[0].getBytes());
                putRow.add("sex".getBytes(), "baseSex".getBytes(),
                        valueArray[1].getBytes());
                putRow.add("age".getBytes(), "baseAge".getBytes(),
                        valueArray[2].getBytes());

                out.write(NullWritable.get(), putRow);
            }
        }
    }

    The driver



    package com.hadoop.mapreduce.test;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import com.hadoop.mapreduce.test.map.WordCountHBaseMapper;
    import com.hadoop.mapreduce.test.reduce.WordCountHBaseReduce;

    /**
     * Reads the text data from HDFS and inserts it into the HBase table userInfo3.
     */
    public class WordCountHBase {

        public static void main(String args[]) throws IOException,
                InterruptedException, ClassNotFoundException {

            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "192.168.192.137");
            Job job = Job.getInstance(conf, "MapReduceHbaseJob");

            // Wire up the job: the jar, the mapper, and the HBase table reducer.
            job.setJarByClass(WordCountHBase.class);
            job.setMapperClass(WordCountHBaseMapper.class);
            TableMapReduceUtil.initTableReducerJob("userInfo3",
                    WordCountHBaseReduce.class, job);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
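    A typical way to submit the job, assuming the classes above are packaged into a jar (the jar name and HDFS input path here are placeholders):

    hadoop jar mapreduce-hbase-import.jar com.hadoop.mapreduce.test.WordCountHBase /user/hadoop/userInfo.txt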

    Result:
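    To check the import, scan the table from the HBase shell; each input line should appear as one row with cells under address:baseAddress, sex:baseSex, and age:baseAge:

    scan 'userInfo3'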


  5. Note: if the client that submits the job does not have HBase installed, the HBase jars must be added to Hadoop's lib directory, as sketched below.
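    A sketch of two common ways to do this (the source path is hypothetical; dependency jars such as ZooKeeper may also be needed):

    # Option 1: copy the HBase jars into Hadoop's lib directory
    cp /opt/hbase/lib/hbase-*.jar $HADOOP_HOME/lib/

    # Option 2: only if the client does have an HBase installation
    export HADOOP_CLASSPATH=$(hbase classpath)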
