
How to Implement DBInputFormat in Hadoop


This article introduces how to use DBInputFormat in Hadoop to read map input records directly from a relational database rather than from files in HDFS. Interested readers may find it a useful reference.

The code below has not been tested; it is recorded here for reference.

package com.test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * To run this example:
 * 1. Put the MySQL JDBC driver into the lib directory of each TaskTracker and restart the cluster.
 */
public class WordCountDB extends Configured implements Tool {

  private String OUT_PATH = "hdfs://grid131:9000/output";

  // DBInputFormat supplies a LongWritable record index as the key and a MyUser instance as the value
  public static class Map extends Mapper<LongWritable, MyUser, LongWritable, Text> {
    public void map(LongWritable key, MyUser value, Context context)
        throws IOException, InterruptedException {
      context.write(key, new Text(value.toString()));
    }
  }

  public int run(String[] args) throws Exception {
    Configuration conf = this.getConf();
    DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
        "jdbc:mysql://grid131:3306/test", "root", "admin");

    // Delete the output path if it already exists
    FileSystem fs = FileSystem.get(new URI(OUT_PATH), conf);
    fs.delete(new Path(OUT_PATH), true);

    Job job = new Job(conf, WordCountDB.class.getSimpleName());
    job.setJarByClass(WordCountDB.class);

    FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

    // No reduce phase: write the map output directly to HDFS
    job.setNumReduceTasks(0);
    job.setInputFormatClass(DBInputFormat.class);

    // Specify the table and fields
    // DBInputFormat.setInput(job, inputClass, tableName, conditions, orderBy, fieldNames)
    DBInputFormat.setInput(job, MyUser.class, "myuser", null, null, "id", "name");
    job.setMapperClass(Map.class);

    // Map-only job: the map output types are the job output types
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);

    job.waitForCompletion(true);

    return job.isSuccessful() ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exit = ToolRunner.run(new WordCountDB(), args);
    System.exit(exit);
  }
}

class MyUser implements Writable, DBWritable {
  private Long id;
  private String name;

  public Long getId() { return id; }
  public void setId(Long id) { this.id = id; }
  public String getName() { return name; }
  public void setName(String name) { this.name = name; }

  // Hadoop serialization, used when records move between tasks or to disk
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeLong(this.id);
    Text.writeString(out, this.name);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    this.id = in.readLong();
    this.name = Text.readString(in);
  }

  // JDBC serialization, used when records move to and from the database
  @Override
  public void write(PreparedStatement statement) throws SQLException {
    statement.setLong(1, this.id);
    statement.setString(2, this.name);
  }

  @Override
  public void readFields(ResultSet resultSet) throws SQLException {
    this.id = resultSet.getLong(1);
    this.name = resultSet.getString(2);
  }

  @Override
  public String toString() {
    return this.id + "\t" + this.name;
  }
}
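For completeness, here is a minimal, hypothetical sketch of how the myuser table the job reads from might be created and seeded over plain JDBC. The connection settings mirror those passed to DBConfiguration.configureDB above; the column types (BIGINT and VARCHAR) and the sample rows are assumptions inferred from MyUser's Long id and String name fields, not taken from the original article.

package com.test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/**
 * Hypothetical helper: creates and seeds the "myuser" table that
 * WordCountDB expects. Schema and sample data are assumptions.
 */
public class PrepareMyUserTable {
  public static void main(String[] args) throws Exception {
    // Pre-JDBC4 MySQL drivers must be loaded explicitly
    Class.forName("com.mysql.jdbc.Driver");
    Connection conn = DriverManager.getConnection(
        "jdbc:mysql://grid131:3306/test", "root", "admin");
    try {
      Statement st = conn.createStatement();
      // Column names must match the fieldNames passed to DBInputFormat.setInput
      st.executeUpdate("CREATE TABLE IF NOT EXISTS myuser ("
          + "id BIGINT PRIMARY KEY, name VARCHAR(64))");
      st.executeUpdate("INSERT INTO myuser (id, name) VALUES (1, 'tom')");
      st.executeUpdate("INSERT INTO myuser (id, name) VALUES (2, 'jerry')");
      st.close();
    } finally {
      conn.close();
    }
  }
}

With the table in place, DBInputFormat hands each row to the mapper as a MyUser instance (populated via readFields(ResultSet)), and the map-only job writes one line per table row to the output directory.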

Thanks for reading; hopefully this walkthrough of DBInputFormat in Hadoop proves helpful.
