千家信息网

使用BulkLoad从HDFS批量导入数据到HBase

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,在向Hbase中写入数据时,常见的写入方法有使用HBase API,Mapreduce批量导入数据,使用这些方式带入数据时,一条数据写入到HBase数据库中的大致流程如图。数据发出后首先写入到雨鞋日志
千家信息网最后更新 2025年12月03日使用BulkLoad从HDFS批量导入数据到HBase

在向Hbase中写入数据时,常见的写入方法有使用HBase API,Mapreduce批量导入数据,使用这些方式带入数据时,一条数据写入到HBase数据库中的大致流程如图。

数据发出后首先写入到雨鞋日志WAl中,写入到预写日志中之后,随后写入到内存MemStore中,最后在Flush到Hfile中。这样写数据的方式不会导致数据的丢失,并且道正数据的有序性,但是当遇到大量的数据写入时,写入的速度就难以保证。所以,介绍一种性能更高的写入方式BulkLoad。

使用BulkLoad批量写入数据主要分为两部分:
一、使用HFileOutputFormat2通过自己编写的MapReduce作业将HFile写入到HDFS目录,由于写入到HBase中的数据是按照顺序排序的,HFileOutputFormat2中的configureIncrementalLoad()可以完成所需的配置。
二、将Hfile从HDFS移动到HBase表中,大致过程如图

实例代码pom依赖:

            org.apache.hbase            hbase-server            1.4.0                            org.apache.hadoop            hadoop-client            2.6.4                            org.apache.hbase            hbase-client            0.99.2        
package com.yangshou;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class BulkLoadMapper extends Mapper {    @Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        //读取文件中的每一条数据,以序号作为行键        String line = value.toString();        //将数据进行切分        //切分后数组中的元素分别为:序号,用户id,商品id,用户行为,商品分类,时间,地址        String[] str = line.split(" ");        String id = str[0];        String user_id = str[1];        String item_id = str[2];        String behavior = str[3];        String item_type = str[4];        String time = str[5];        String address = "156";        //拼接rowkey和put        ImmutableBytesWritable rowkry = new ImmutableBytesWritable(id.getBytes());        Put put = new Put(id.getBytes());        put.add("info".getBytes(),"user_id".getBytes(),user_id.getBytes());        put.add("info".getBytes(),"item_id".getBytes(),item_id.getBytes());        put.add("info".getBytes(),"behavior".getBytes(),behavior.getBytes());        put.add("info".getBytes(),"item_type".getBytes(),item_type.getBytes());        put.add("info".getBytes(),"time".getBytes(),time.getBytes());        put.add("info".getBytes(),"address".getBytes(),address.getBytes());        //将数据写出        context.write(rowkry,put);    }}
package com.yangshou;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.*;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class BulkLoadDriver  {    public static void main(String[] args) throws Exception {        //获取Hbase配置        Configuration conf = HBaseConfiguration.create();        Connection conn = ConnectionFactory.createConnection(conf);        Table table = conn.getTable(TableName.valueOf("BulkLoadDemo"));        Admin admin = conn.getAdmin();        //设置job        Job job = Job.getInstance(conf,"BulkLoad");        job.setJarByClass(BulkLoadDriver.class);        job.setMapperClass(BulkLoadMapper.class);        job.setMapOutputKeyClass(ImmutableBytesWritable.class);        job.setMapOutputValueClass(Put.class);        //设置文件的输入输出路径        job.setInputFormatClass(TextInputFormat.class);        job.setOutputFormatClass(HFileOutputFormat2.class);        FileInputFormat.setInputPaths(job,new Path("hdfs://hadoopalone:9000/tmp/000000_0"));        FileOutputFormat.setOutputPath(job,new Path("hdfs://hadoopalone:9000/demo1"));        //将数据加载到Hbase表中        HFileOutputFormat2.configureIncrementalLoad(job,table,conn.getRegionLocator(TableName.valueOf("BulkLoadDemo")));        if(job.waitForCompletion(true)){            LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);            load.doBulkLoad(new Path("hdfs://hadoopalone:9000/demo1"),admin,table,conn.getRegionLocator(TableName.valueOf("BulkLoadDemo")));        }    }}

实例数据

44979   100640791   134060896   1   5271    2014-12-09  天津市44980   100640791   96243605    1   13729   2014-12-02  新疆

在Hbase shell 中创建表

create 'BulkLoadDemo','info'

打包后执行
```hadoop jar BulkLoadDemo-1.0-SNAPSHOT.jar com.yangshou.BulkLoadDriver

注意:在执行hadoop jar之前应该先将Hbase中的相关包加载过来

export HADOOP_CLASSPATH=$HBASE_HOME/lib/*

数据 方式 商品 实例 序号 文件 日志 用户 如图 配置 有序 代码 元素 内存 地址 常见 性能 数据库 数组 方法 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 网络安全竞赛试题及答案解析 面试网络安全你怎么看 内存数据库核心技术问题 linux怎么看远程服务器端口 安康软件开发方案 虚拟机挂载虚拟服务器报错 数据库的sa账号是什么权限 进口企业实时数据库批发 软件开发每天的报价是多少 中美网络技术工作室 上海联通软件开发待遇 阜阳系统软件开发哪家好 广东萤火虫科技互联网有限公司 选择菜鸟打印服务器失败 小程序软件开发多少钱一个 南京亿赢网络技术 江苏数据网络技术创新服务 张家港知名服务器生产商 青浦区互联网软件开发怎么样 软件开发什么程度可以找工作 巨杉数据库和tidb哪个好 php调用数据库图片 政治网络安全辨析 让软件开发的加我微信 云erp软件是用什么软件开发的 服务器系统垃圾清理 2021网络安全专题会议 科技发展与互联网的关系 3g无线网络技术特点 锦江区小兵软件开发工作室
0