Hadoop 2.6.0 Study Notes (7): MapReduce Partitioning
Lu Chunli's work notes. Who says programmers can't have a literary streak?
In MapReduce, the number of map tasks is determined by the input splits. What, then, decides which reduce task each map output record is sent to? That is the subject of this note: MapReduce partitioning. By default, MapReduce uses the HashPartitioner.
/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

In HashPartitioner, the getPartition() method takes three parameters: key and value are the Mapper output, and numReduceTasks is the configured number of Reducer tasks, which defaults to 1. The key's hashCode is ANDed with Integer.MAX_VALUE to turn it into a non-negative integer, which is then taken modulo numReduceTasks. Any integer modulo 1 is 0, so with the default single reducer getPartition(...) always returns 0: all Mapper output goes to one Reducer task and ends up in a single output file.
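To make that arithmetic concrete, here is a small standalone sketch (my own, not from the original notes; the class name HashPartitionDemo is made up for illustration) that evaluates the same expression outside of Hadoop:

public class HashPartitionDemo {
    public static void main(String[] args) {
        String[] keys = {"FTP", "HTTP", "HTTPS"};
        for (String key : keys) {
            // Default case: a single reducer, so every key lands in partition 0.
            int withOneReducer = (key.hashCode() & Integer.MAX_VALUE) % 1;
            // With three reducers the hash spreads keys over partitions 0..2.
            int withThreeReducers = (key.hashCode() & Integer.MAX_VALUE) % 3;
            System.out.println(key + ": " + withOneReducer + " / " + withThreeReducers);
        }
    }
}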
Example: produce per-protocol statistics for the URLs that were accessed (five-tuple style access log). Each record has eight tab-separated fields; the seventh is the protocol and the eighth the requested URL.
Raw data
[hadoop@nnode code]$ hdfs dfs -text /http_interceptor_20130913.txt
2013-09-13 16:04:08  www.subnetc1.com  192.168.1.7  80  192.168.1.139  18863  FTP    www.subnetc1.com/index.html
2013-09-13 16:04:08  www.subnetc2.com  192.168.1.7  80  192.168.1.159  14100  HTTP   www.subnetc2.com/index.html
2013-09-13 16:04:08  www.subnetc3.com  192.168.1.7  80  192.168.1.130  4927   HTTPS  www.subnetc3.com/index.html
2013-09-13 16:04:08  www.subnetc4.com  192.168.1.7  80  192.168.1.154  39044  HTTP   www.subnetc4.com/index.html
[hadoop@nnode code]$
Implement the Mapper
package com.lucl.hadoop.mapreduce.part;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author luchunli
 * @description Mapper implementation: emits (protocol, URL) pairs.
 */
public class ProtocolMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] values = value.toString().split("\t");
        if (null == values || values.length != 8) {
            return;
        }
        Text newKey = new Text();
        Text newValue = new Text();
        newKey.set(values[6].trim());    // protocol, e.g. FTP/HTTP/HTTPS
        newValue.set(values[7].trim());  // requested URL
        context.write(newKey, newValue);
    }
}
Implement the Reducer
package com.lucl.hadoop.mapreduce.part;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author luchunli
 * @description Reducer implementation: concatenates all URLs of one protocol.
 */
public class ProtocolReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuffer sbf = new StringBuffer();
        for (Text text : values) {
            sbf.append(text.toString());
            sbf.append(";");
        }
        context.write(key, new Text(sbf.toString()));
    }
}
Implement the Partitioner
package com.lucl.hadoop.mapreduce.part;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author luchunli
 * @description Custom partitioner: routes each protocol to its own partition.
 */
public class ProtocolPartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        if (key.toString().equals("FTP")) {
            return 0;
        }
        if (key.toString().equals("HTTP")) {
            return 1;
        }
        if (key.toString().equals("HTTPS")) {
            return 2;
        }
        return 0;
    }
}
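As a side note, the same mapping can also be written in a table-driven way. The variant below is a sketch of my own (the class name MapBasedProtocolPartitioner is hypothetical), with the same behavior as ProtocolPartitioner above:

package com.lucl.hadoop.mapreduce.part;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/** Hypothetical table-driven variant, equivalent to ProtocolPartitioner. */
public class MapBasedProtocolPartitioner extends Partitioner<Text, Text> {
    private static final Map<String, Integer> PARTITIONS = new HashMap<String, Integer>();
    static {
        PARTITIONS.put("FTP", 0);
        PARTITIONS.put("HTTP", 1);
        PARTITIONS.put("HTTPS", 2);
    }

    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        Integer partition = PARTITIONS.get(key.toString());
        // Unknown protocols fall back to partition 0, as in the original class.
        return partition == null ? 0 : partition;
    }
}

Keeping the protocol-to-partition table in one place makes it easier to keep the number of entries in sync with the reducer count set in the driver.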
Implement the driver class
package com.lucl.hadoop.mapreduce.part;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ProtocolDriver extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new ProtocolDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(ProtocolDriver.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(ProtocolMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Set the number of reduce tasks
        job.setNumReduceTasks(3);
        job.setPartitionerClass(ProtocolPartitioner.class);

        job.setReducerClass(ProtocolReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // job.setOutputFormatClass(ProtocolOutputFormat.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

Run the job
[hadoop@nnode code]$ hadoop jar PartMR.jar /http_interceptor_20130913.txt /2015120500018
15/12/05 21:41:12 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/12/05 21:41:13 INFO input.FileInputFormat: Total input paths to process : 1
15/12/05 21:41:13 INFO mapreduce.JobSubmitter: number of splits:1
15/12/05 21:41:13 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1449302623953_0008
15/12/05 21:41:13 INFO impl.YarnClientImpl: Submitted application application_1449302623953_0008
15/12/05 21:41:14 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1449302623953_0008/
15/12/05 21:41:14 INFO mapreduce.Job: Running job: job_1449302623953_0008
15/12/05 21:41:43 INFO mapreduce.Job: Job job_1449302623953_0008 running in uber mode : false
15/12/05 21:41:43 INFO mapreduce.Job:  map 0% reduce 0%
15/12/05 21:42:12 INFO mapreduce.Job:  map 100% reduce 0%
15/12/05 21:42:32 INFO mapreduce.Job:  map 100% reduce 33%
15/12/05 21:42:52 INFO mapreduce.Job:  map 100% reduce 100%
15/12/05 21:42:55 INFO mapreduce.Job: Job job_1449302623953_0008 completed successfully
15/12/05 21:42:55 INFO mapreduce.Job: Counters: 50
	File System Counters
		FILE: Number of bytes read=158
		FILE: Number of bytes written=431827
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=532
		HDFS: Number of bytes written=130
		HDFS: Number of read operations=12
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=6
	Job Counters
		Killed reduce tasks=1
		Launched map tasks=1
		Launched reduce tasks=4
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=26277
		Total time spent by all reduces in occupied slots (ms)=105054
		Total time spent by all map tasks (ms)=26277
		Total time spent by all reduce tasks (ms)=105054
		Total vcore-seconds taken by all map tasks=26277
		Total vcore-seconds taken by all reduce tasks=105054
		Total megabyte-seconds taken by all map tasks=26907648
		Total megabyte-seconds taken by all reduce tasks=107575296
	Map-Reduce Framework
		Map input records=4
		Map output records=4
		Map output bytes=132
		Map output materialized bytes=158
		Input split bytes=109
		Combine input records=0
		Combine output records=0
		Reduce input groups=3
		Reduce shuffle bytes=158
		Reduce input records=4
		Reduce output records=3
		Spilled Records=8
		Shuffled Maps =3
		Failed Shuffles=0
		Merged Map outputs=3
		GC time elapsed (ms)=410
		CPU time spent (ms)=4360
		Physical memory (bytes) snapshot=515862528
		Virtual memory (bytes) snapshot=3399213056
		Total committed heap usage (bytes)=167907328
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=423
	File Output Format Counters
		Bytes Written=130
[hadoop@nnode code]$
View the results
[hadoop@nnode code]$ hdfs dfs -ls /2015120500018
Found 4 items
-rw-r--r--   2 hadoop hadoop          0 2015-12-05 21:42 /2015120500018/_SUCCESS
-rw-r--r--   2 hadoop hadoop         33 2015-12-05 21:42 /2015120500018/part-r-00000
-rw-r--r--   2 hadoop hadoop         62 2015-12-05 21:42 /2015120500018/part-r-00001
-rw-r--r--   2 hadoop hadoop         35 2015-12-05 21:42 /2015120500018/part-r-00002
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00000
FTP     www.subnetc1.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00001
HTTP    www.subnetc4.com/index.html;www.subnetc2.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00002
HTTPS   www.subnetc3.com/index.html;
[hadoop@nnode code]$
The file names above (part-r-00000 and so on) are generated automatically by MapReduce for each reduce task. By providing a custom OutputFormat we can choose the output file names ourselves.
The custom OutputFormat is shown below. Unlike the earlier MultipleWorkCount example, which delegated to LineRecordWriter, this implementation writes records directly through an FSDataOutputStream.
package com.lucl.hadoop.mapreduce.part;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author luchunli
 * @description Custom OutputFormat: writes each protocol's records to a file named after the key.
 */
public class ProtocolOutputFormat extends TextOutputFormat<Text, Text> {
    protected static class ProtocolRecordWriter extends RecordWriter<Text, Text> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
            try {
                newline = "\n".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected TaskAttemptContext context = null;
        protected HashMap<Text, DataOutputStream> recordStream = null;
        protected Path workPath = null;

        public ProtocolRecordWriter() {}

        public ProtocolRecordWriter(TaskAttemptContext context, Path workPath) {
            this.context = context;
            this.workPath = workPath;
            recordStream = new HashMap<Text, DataOutputStream>();
        }

        @Override
        public void write(Text key, Text value) throws IOException, InterruptedException {
            boolean nullKey = key == null;
            boolean nullValue = value == null;
            if (nullKey && nullValue) {
                return;
            }
            DataOutputStream out = recordStream.get(key);
            if (null == out) {
                // One output file per key, e.g. FTP.txt, HTTP.txt, HTTPS.txt
                Path file = new Path(workPath, key + ".txt");
                out = file.getFileSystem(this.context.getConfiguration()).create(file, false);
                recordStream.put(key, out);
            }
            if (!nullKey) {
                out.write(key.getBytes(), 0, key.getLength());
            }
            if (!(nullKey || nullValue)) {
                out.write("\t".getBytes());
            }
            if (!nullValue) {
                out.write(value.getBytes(), 0, value.getLength());
            }
            out.write(newline);
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            for (DataOutputStream out : recordStream.values()) {
                out.close();
            }
            recordStream.clear();
            recordStream = null;
        }
    }

    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        Path workPath = this.getTaskOutputPath(context);
        return new ProtocolRecordWriter(context, workPath);
    }

    private Path getTaskOutputPath(TaskAttemptContext context) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(context);
        if (committer instanceof FileOutputCommitter) {
            // Get the directory that the task should write results into.
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            // Get the {@link Path} to the output directory for the map-reduce job.
            // context.getConfiguration().get(FileOutputFormat.OUTDIR);
            Path outputPath = super.getOutputPath(context);
            if (null == outputPath) {
                throw new IOException("Undefined job output-path.");
            }
            workPath = outputPath;
        }
        return workPath;
    }
}
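For the second run below, the job must be told to use this class instead of the default TextOutputFormat. That amounts to enabling the line that is commented out in ProtocolDriver above, roughly:

// In ProtocolDriver.run(), before FileOutputFormat.setOutputPath(job, ...):
job.setOutputFormatClass(ProtocolOutputFormat.class);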
Run it again
[hadoop@nnode code]$ hadoop jar PartMR.jar /http_interceptor_20130913.txt /2015120500020
15/12/05 21:59:28 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/12/05 21:59:30 INFO input.FileInputFormat: Total input paths to process : 1
15/12/05 21:59:30 INFO mapreduce.JobSubmitter: number of splits:1
15/12/05 21:59:30 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1449302623953_0010
15/12/05 21:59:30 INFO impl.YarnClientImpl: Submitted application application_1449302623953_0010
15/12/05 21:59:31 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1449302623953_0010/
15/12/05 21:59:31 INFO mapreduce.Job: Running job: job_1449302623953_0010
15/12/05 22:00:00 INFO mapreduce.Job: Job job_1449302623953_0010 running in uber mode : false
15/12/05 22:00:00 INFO mapreduce.Job:  map 0% reduce 0%
15/12/05 22:00:29 INFO mapreduce.Job:  map 100% reduce 0%
15/12/05 22:00:48 INFO mapreduce.Job:  map 100% reduce 33%
15/12/05 22:01:07 INFO mapreduce.Job:  map 100% reduce 100%
15/12/05 22:01:07 INFO mapreduce.Job: Job job_1449302623953_0010 completed successfully
15/12/05 22:01:07 INFO mapreduce.Job: Counters: 50
	File System Counters
		FILE: Number of bytes read=158
		FILE: Number of bytes written=432595
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=532
		HDFS: Number of bytes written=130
		HDFS: Number of read operations=12
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=6
	Job Counters
		Killed reduce tasks=1
		Launched map tasks=1
		Launched reduce tasks=4
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=26075
		Total time spent by all reduces in occupied slots (ms)=92427
		Total time spent by all map tasks (ms)=26075
		Total time spent by all reduce tasks (ms)=92427
		Total vcore-seconds taken by all map tasks=26075
		Total vcore-seconds taken by all reduce tasks=92427
		Total megabyte-seconds taken by all map tasks=26700800
		Total megabyte-seconds taken by all reduce tasks=94645248
	Map-Reduce Framework
		Map input records=4
		Map output records=4
		Map output bytes=132
		Map output materialized bytes=158
		Input split bytes=109
		Combine input records=0
		Combine output records=0
		Reduce input groups=3
		Reduce shuffle bytes=158
		Reduce input records=4
		Reduce output records=3
		Spilled Records=8
		Shuffled Maps =3
		Failed Shuffles=0
		Merged Map outputs=3
		GC time elapsed (ms)=339
		CPU time spent (ms)=4690
		Physical memory (bytes) snapshot=513667072
		Virtual memory (bytes) snapshot=3405312000
		Total committed heap usage (bytes)=167907328
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=423
	File Output Format Counters
		Bytes Written=130
[hadoop@nnode code]$
View the results
[hadoop@nnode code]$ hdfs dfs -ls /2015120500020
Found 4 items
-rw-r--r--   2 hadoop hadoop         33 2015-12-05 22:01 /2015120500020/FTP.txt
-rw-r--r--   2 hadoop hadoop         62 2015-12-05 22:00 /2015120500020/HTTP.txt
-rw-r--r--   2 hadoop hadoop         35 2015-12-05 22:01 /2015120500020/HTTPS.txt
-rw-r--r--   2 hadoop hadoop          0 2015-12-05 22:01 /2015120500020/_SUCCESS
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/FTP.txt
FTP     www.subnetc1.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/HTTP.txt
HTTP    www.subnetc4.com/index.html;www.subnetc2.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/HTTPS.txt
HTTPS   www.subnetc3.com/index.html;
[hadoop@nnode code]$