千家信息网

Hadoop Outline的示例分析

发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,小编给大家分享一下Hadoop Outline的示例分析,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!Hdfs Java API SampleRead by hadoop FsURL
千家信息网最后更新 2025年12月02日Hadoop Outline的示例分析

小编给大家分享一下Hadoop Outline的示例分析,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!

Hdfs Java API Sample

  • Read by hadoop FsURLStreamHandlerFactory

  • Read/Write by hadoop DistributeFileSystem

package com.jinbao.hadoop.hdfs;import java.io.IOException;import java.io.InputStream;import java.io.OutputStream;import java.net.URI;import java.net.URL;import org.apache.commons.io.IOUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;import org.apache.hadoop.fs.Path;/** *  *//** * @author cloudera * */public class HdfsClient {        static String sFileUrl = "hdfs://quickstart.cloudera/gis/gistool/README.md";        /**         * @param args         * @throws IOException          */        public static void main(String[] args) throws IOException {                if(args.length >= 2){                        String sUrl = sFileUrl;                        if(args[0].equalsIgnoreCase("-r-url")){                                sUrl = args[1];                                                                //test read by hadoop FsURLStreamHandlerFactory                                 readHdfsFileByDfsUrl(sUrl);                        }                        else if(args[0].equalsIgnoreCase("-r-file")){                                sUrl = args[1];                                //test read by hadoop dfsFile                                readHdfsFileByDfsFileApi(sUrl);                        }                        else if(args[0].equalsIgnoreCase("-w-file")){                                sUrl = args[1];                                //test read by hadoop dfsFile                                writeHdfsFileByDfsFileApi(sUrl);                        }                        else if(args[0].equalsIgnoreCase("-w-del")){                                sUrl = args[1];                                //test read by hadoop dfsFile                                deleteHdfsFileByDfsFileApi(sUrl);                        }                }                                        }        private static void deleteHdfsFileByDfsFileApi(String sUrl) {                Configuration conf = new Configuration();                try {                                                FileSystem fs = FileSystem.get(URI.create(sUrl),conf);                        Path path = new Path(sUrl);                        fs.delete(path,true);                                        } catch (IOException e) {                        e.printStackTrace();                }                finally{                }        }        private static void writeHdfsFileByDfsFileApi(String sUrl) {                                Configuration conf = new Configuration();                OutputStream out = null;                byte[] data = "Writing Test".getBytes();                 // Get a FSDataInputStream object                try {                        // Get a FSDataInputStream object, actually is HdfsDataInputSteam                        FileSystem fs = FileSystem.get(URI.create(sUrl),conf);                        Path path = new Path(sUrl);                        if(fs.exists(path)){                                out = fs.append(path);                                IOUtils.write(data, out);                        }                        else{                                out = fs.create(path);                                out.write(data);                                // flush buffer to OS                                out.flush();                                FSDataOutputStream fsout = FSDataOutputStream.class.cast(out);                                // Sync data to disk                                fsout.hsync();                                                                // call sync implicitly                                out.close();                        }                } catch (IOException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                }                finally{                        IOUtils.closeQuietly(out);                }                        }        public static void readHdfsFileByDfsUrl(String sUrl){                                URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());                                InputStream in = null;                try{                        URL url = new URL(sUrl);                        in = url.openStream();                        IOUtils.copy(in,System.out);                }                catch(IOException ioe){                        ioe.printStackTrace();                }                finally{                        IOUtils.closeQuietly(in);                }        }                private static void readHdfsFileByDfsFileApi(String sUrl) {                                Configuration conf = new Configuration();                                InputStream in = null;                                try{                        FileSystem fs = FileSystem.get(URI.create(sUrl),conf);                        //Get a FSDataInputStream object, actually HdfsDataInputStream                                         in = fs.open(new Path(sUrl));                        IOUtils.copy(in,System.out);                }                catch(IOException ioe){                        ioe.printStackTrace();                }                finally{                        IOUtils.closeQuietly(in);                }               }}

Data Flow on Read

1. FileSystem to get DistributeFileSystem

2. DistributeFileSystem通过和namenode调用,获得前面几个块位置,并返回datanode的地址,包括备用节点。

3. 客户端调用DistributeFileSystem获取FSDataInputStream,这个流对象连接距离最近的datanode,通过调用Read读取数据。

4. 如果当前块已经读完,则FSDataInputStream关闭这个块,并寻找下一个最佳的datanode并继续读取。

5. 读完最后一个块后,调用close方法关闭数据流。

容错处理

1. FSDataInputStream如果与datanode通信遇到错误,则尝试下一个最佳备用节点。另外,也会记住那个坏节点,以后不会在它上面读数据。并且,会通知namenode哪一个节点有问题。

Data Flow on Write

1. FileSystem to get DistributeFileSystem

2. DistributeFileSystem通过和namenode的RPC调用create()方法,创建文件原数据。如果已经存在,报出IOException.

3. 客户端调用DistributeFileSystem获取FSDataOutputStream,它封装了一个DFSOutputStream对象,来处理datanode和namenode通信。

4. 客户端开始写数据,则FSDataOutputStream将当前块数据分成一个个的数据包packet,并写入一个数据包队列(Data packet queue), 然后datastreamer来根据datanode列表,要求namenode分配合适的datanode,DataStreamer把这些datanode组成数据管线 (datanode pipeline),数目有dfs.replication决定。

5. 开始写数据,每写入一个包都将它备份到另一个确认队列(Data Ack queue),第一个被写入的节点,会把数据写入第二个节点,然后第三个。如果收到都写完的通知,则从确认队列中删除。

6. 写完一个块后,重复4-5,直到最后写完,调用close方法关闭数据流。

容错处理

FSDataOutputStream如果与datanode通信遇到错误

1. 关闭pipeline

2. 把确认队列的数据包,添加到数据包队列,以防止下游节点(downstream node)丢失数据。

3. 为存储正常的datanode的当前数据制定一个新的标示(identifier), 并把这个标示传递给namenode,以便namenode删除故障node的部分数据。

4. 把剩余的数据写入剩下的好的datanode。namenode会创建新的节点,来复制数据,以达到复本量。对于当前写过程,如果写入成功的节点达到dfs.replication.min就算成功,其他的由namenode进行复制。

复本的布局

照顾稳定性和负载均衡

Hadoop的默认布局策略是在运行客户端上放置第1个复本,如果客户端在cluster外,则在集群中随机选择一个节点.

第2个和第3个会随机选择另外一个相同Rack上的两个节点.

分布式复制 distcp

% hadoop distcp hdfs://namenode/foo hdfs://namenode2/foo

distcp使用map-reduce的作业来实现,非常适用于两个数据中心同步数据。

如果两个数据中心版本不一致,可以试用hftp协议,使得作业之运行在目标系统上

% hadoop distcp hftp://namenode:50070/foo hdfs://namenode2/foo

注:需要指定hftp端口 50070

为了使map平衡集群,可以参考N*20的设置:-m 20*N,N是节点总数.

balancer还没看。

归档工具Har

减小namenode的内存,适合管理小文件,它还是透明的,对map-reduce也是适用的。

%hadoop archive -archivename files.har /myfiles/ /my

不足:

har相当于tar功能,可以打包文件,不支持压缩。仅仅节省的namenode的内存。

一旦创建就不能修改,想添加和删除文件,必须重新建立har.

看完了这篇文章,相信你对"Hadoop Outline的示例分析"有了一定的了解,如果想了解更多相关知识,欢迎关注行业资讯频道,感谢各位的阅读!

0