千家信息网

MapReduce怎么使用

发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,本篇内容主要讲解"MapReduce怎么使用",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"MapReduce怎么使用"吧!什么是MRMR是一种分布计算模型
千家信息网最后更新 2025年12月01日MapReduce怎么使用

本篇内容主要讲解"MapReduce怎么使用",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"MapReduce怎么使用"吧!

  1. 什么是MR

    MR是一种分布计算模型,主要用来解决海量数据的计算问题的。它包含了两种计算函数,一个是Mapping,另外一个是Reducing。Mapping对集合内的每个目标做同一个操作,Reduceing则是遍历集合中的元素返回一个综合的结果。我们操作代码时,只需要重写map和reduce方法就行,十分简单。这两个函数的形参都是k,v对,当数据量到达10PB以上时,则会速度变慢。

  2. MR执行过程

    MR程序启动时,会把输入文件转化成键值对传给map函数,有几个键值对就执行几次map函数,但不是说有几个键值对就有几个Mapper进程,这是不对的。经过map函数处理,变成键值对。由转变成reduce函数的输入的过程被称之为shuffle。shuffle并不是象map和reduce这样的某个函数,不是需要单独拿出节点运行的,它仅仅只是一个过程。进过reduce函数处理,变成了最后的输出。在到达reduce函数之前,键值对的数目是不变的。

    Map阶段

    (1).根据输入文件解析成对,每一对调用一次map函数

    (2).根据自己编写的map函数,将键值对处理,变成新的键值对输出

    (3).对输出的键值对进行分区,不同分区对应着不同的Reducer进程

    (4).每个分区中的键值对,根据key进行排序,分组。然后把相同key的val放到同一个集合中。

    (5).进行规约(可选)

Reduce阶段

(1).多个map函数输出的kv对,按照不同分区,传输到不同的reduce节点上。

(2).将多个map函数输出的kv对合并,排序。根据reduce函数逻辑,处理,转换成新的键值对输出

(3).输出保存文件

3.简单例子

Wordcount

public class WordCount { public static class  MyMapper extends Mapper{  Text k2=new Text();  LongWritable v2=new LongWritable();  @Override  protected void map(LongWritable k1, Text v1,Context context)    throws IOException, InterruptedException {    String[] words=v1.toString().split("\t");    for (String string : words) {     k2.set(string);     v2.set(1L);    context.write(k2, v2);   }  } } public static class MyReduce extends Reducer{  LongWritable v3=new LongWritable();  @Override  protected void reduce(Text k2, Iterable v2s,Context context) throws IOException, InterruptedException {   long sum=0;    for (LongWritable longWritable : v2s) {    sum=sum+longWritable.get();   }   v3.set(sum);   context.write(k2, v3);  }   } public static void main(String[] args) throws Exception {  Configuration conf=new Configuration();  Job job=Job.getInstance(conf, WordCount.class.getSimpleName());  job.setJarByClass(WordCount.class);  job.setMapperClass(MyMapper.class);  job.setReducerClass(MyReduce.class);    job.setMapOutputKeyClass(Text.class);  job.setMapOutputValueClass(LongWritable.class);  job.setOutputKeyClass(Text.class);  job.setOutputValueClass(LongWritable.class);    FileInputFormat.setInputPaths(job, new Path("hdfs://115.28.138.100:9000/a.txt"));  FileOutputFormat.setOutputPath(job, new Path("hdfs://115.28.138.100:9000/out4"));    job.waitForCompletion(true); }  }

4.MR的序列化

序列化就是把结构化的对象转换为字节流,在MR中,他没有用java自己的序列化,而是自己实现了一套序列化。因为相比较而言,hadoop的序列化有着诸多优点。在mr程序中,我们的参数和输出的键值对全都是实现了序列化的对象,当我们需要自订一个序列化对象,该如何操作呢?只需要实现Writable接口即可,当然key需要实现WritableComparable接口,因为需要根据key来排序和分组。

接着有个小例子来展示序列化。就是电信流量的处理例子。

public class LiuLiang { public static class MyMapper extends Mapper{  Text k2=new Text();  MyArrayWritable v2=new MyArrayWritable();  LongWritable v21=new LongWritable();  LongWritable v22=new LongWritable();  LongWritable v23=new LongWritable();  LongWritable v24=new LongWritable();  LongWritable[] values=new LongWritable[4];  @Override  protected void map(LongWritable k1, Text v1, Context context)      throws IOException, InterruptedException {    String[] words=v1.toString().split("\t");    k2.set(words[1]);    v21.set(Long.parseLong(words[6]));    v22.set(Long.parseLong(words[7]));    v23.set(Long.parseLong(words[8]));    v24.set(Long.parseLong(words[9]));    values[0]=v21;    values[1]=v22;    values[2]=v23;    values[3]=v24;    v2.set(values);    context.write(k2, v2);  } } public static class MyReduce extends Reducer{  Text v3=new Text();  @Override  protected void reduce(Text k2, Iterable v2s, Context context)      throws IOException, InterruptedException {    long sum1=0;    long sum2=0;    long sum3=0;    long sum4=0;    for (MyArrayWritable myArrayWritable : v2s) {     Writable[] values= myArrayWritable.get();     sum1=sum1+((LongWritable)values[0]).get();     sum2=sum2+((LongWritable)values[1]).get();     sum3=sum3+((LongWritable)values[2]).get();     sum4=sum4+((LongWritable)values[3]).get();   }    v3.set("\t"+sum1+"\t"+sum2+"\t"+sum3+"\t"+sum4);    context.write(k2, v3);  } } public static void main(String[] args) throws Exception {  Configuration conf=new Configuration();  Job job=Job.getInstance(conf, LiuLiang.class.getSimpleName());  job.setJarByClass(LiuLiang.class);  job.setMapperClass(MyMapper.class);  job.setReducerClass(MyReduce.class);    job.setMapOutputKeyClass(Text.class);  job.setMapOutputValueClass(MyArrayWritable.class);  job.setOutputKeyClass(Text.class);  job.setOutputValueClass(Text.class);    FileInputFormat.setInputPaths(job, new Path("hdfs://115.28.138.100:9000/HTTP_20130313143750.dat"));  FileOutputFormat.setOutputPath(job, new Path("hdfs://115.28.138.100:9000/ceshi3"));    job.waitForCompletion(true); } }class MyArrayWritable extends ArrayWritable{ public MyArrayWritable(){  super(LongWritable.class); } public MyArrayWritable(String[] arg0) {  super(arg0); } }

5.SequenceFile

在HDFS的学习中,提到了小文件的解决方案,其中一个便是这个SequenceFile。他是一种无序存储,将kv对序列化到文件中,从而合并许多小文件并且支持压缩。缺点是必须遍历才能查看里面各个小文件。

public class SequenceFileTest { public static void main(String[] args) throws Exception{  Configuration conf = new Configuration();  FileSystem fileSystem = FileSystem.get(new URI("hdfs://115.28.138.100:9000"), conf, "hadoop");  //Write(conf, fileSystem);  Read(conf, fileSystem); }   private static void Read(Configuration conf, FileSystem fileSystem) throws IOException {  Reader reader=new SequenceFile.Reader(fileSystem, new Path("/sqtest"), conf);  Text key=new Text();  Text val=new Text();  while(reader.next(key, val)){   System.out.println(key.toString()+"----"+val.toString());  }  IOUtils.closeStream(reader); }   private static void Write(Configuration conf, FileSystem fileSystem) throws IOException {  Writer writer = SequenceFile.createWriter(fileSystem, conf, new Path("/sqtest"), Text.class, Text.class);  Collection files = FileUtils.listFiles(new File("F:\\ceshi1"), new String[] { "txt" }, false);  for (File file : files) {   Text text = new Text();   text.set(FileUtils.readFileToString(file));   writer.append(new Text(file.getName()), text);  }  IOUtils.closeStream(writer); }}

到此,相信大家对"MapReduce怎么使用"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

函数 序列 输出 文件 处理 不同 例子 对象 过程 学习 排序 输入 内容 多个 就是 接口 数据 方法 程序 节点 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 江西恩网络技术有限公司 制作我的世界服务器 山西服务器虚拟化哪家便宜 湖北双路机架服务器购买 辽宁信息化软件开发服务五星服务 防网络安全心得体会1000字 百万条数据如何导入数据库 敏捷软件开发中外案例 福州有哪些软件开发公司吗 数据库安全管理的主要机制 沈阳禹辰软件开发有限公司 软件开发的实际基本流程 网络技术学院可以考什么证 管理的服务器 杨浦区微型软件开发服务 滨州服装库存软件开发 随着互联网科技信息 思维软件开发厦门 武汉有网络安全培训机构吗 在数据库中一对一 东微服务器 网络安全编程与实践微盘 软件开发设备多少钱 数据库不存在数据却查到了数据 阿里云服务器创建用户 网络服务器在哪几个国家 万方数据库用什么分类法 教育软件开发与应用成果 重庆服务器机柜厂家虚拟主机 网络安全教育宣传工作总结报告
0