hdfs常用API和putMerge功能实现
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,所需jar包一、URL API操作方式import java.io.InputStream;import java.net.URL;import org.apache.hadoop.fs.FsUrlS
千家信息网最后更新 2025年12月03日hdfs常用API和putMerge功能实现
所需jar包
一、URL API操作方式
import java.io.InputStream;import java.net.URL;import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;import org.apache.hadoop.io.IOUtils;import org.junit.Test;public class HDFSUrlTest { /** * HDFS URL API操作方式 * 不需要读取core-site.xml和hdfs-site.xml配置文件 */ // 让JAVA程序识别HDFS的URL static { URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); } // 查看文件内容 @Test public void testRead() throws Exception { InputStream in = null; // 文件路径 String fileUrl = "hdfs://hadoop-master.dragon.org:9000/opt/data/test/01.data"; try { // 获取文件输入流 in = new URL(fileUrl).openStream(); // 将文件内容读取出来,打印控制台 IOUtils.copyBytes(in, System.out, 4096, false); } finally { IOUtils.closeStream(in); } }}二、通过FileSystem API操作HDFS
HDFS工具类
import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;public class HDFSUtils {/** * HDFS工具类 */ public static FileSystem getFileSystem() { //声明FileSystem FileSystem hdfs=null; try { //获取文件配置信息 Configuration conf =new Configuration(); //获取文件系统 hdfs=FileSystem.get(conf); } catch (IOException e) { e.printStackTrace(); } return hdfs; } }常用操作实现类
import org.apache.hadoop.fs.BlockLocation;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hdfs.DistributedFileSystem;import org.apache.hadoop.hdfs.protocol.DatanodeInfo;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.mapred.gethistory_jsp;import org.junit.Test;public class HDFSFsTest { /** * * 通过FileSystem API操作HDFS */ // 读取文件内容 @Test public void testRead() throws Exception { // 获取文件系统 FileSystem hdfs = HDFSUtils.getFileSystem(); // 文件名称 Path path = new Path("/opt/data/test/touch.data"); // 打开文件输入流 FSDataInputStream inStream = hdfs.open(path); // 读取文件到控制台显示 IOUtils.copyBytes(inStream, System.out, 4096, false); // 关闭流 IOUtils.closeStream(inStream); } // 查看目录 @Test public void testList() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); // 文件名称 Path path = new Path("/opt/data"); FileStatus[] fileStatus = hdfs.listStatus(path); for (FileStatus file : fileStatus) { Path p = file.getPath(); String info = file.isDir() ? "目录" : "文件"; System.out.println(info + ":" + p); } } // 创建目录 @Test public void testDirectory() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); // 要创建的目录 Path path = new Path("/opt/data/dir"); boolean isSuccessful = hdfs.mkdirs(path);// 相当于 linux下 mkdir -p // /opt/data/dir String info = isSuccessful ? "成功" : "失败"; System.out.println("创建目录【" + path + "】" + info); } // 上传文件-- put copyFromLocal @Test public void testPut() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); // 本地文件(目录+文件名称) Path srcPath = new Path("c:/0125.log"); // hdfs文件上传路径 Path dstPath = new Path("/opt/data/dir/"); hdfs.copyFromLocalFile(srcPath, dstPath); } // 创建hdfs文件并写入内容 @Test public void testCreate() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); Path path = new Path("/opt/data/dir/touch.data"); // 创建文件并获取输出流 FSDataOutputStream fSDataOutputStream = hdfs.create(path); // 通过输出流写入数据 fSDataOutputStream.write("你好".getBytes()); fSDataOutputStream.writeUTF("hello hadoop!"); IOUtils.closeStream(fSDataOutputStream); } // 文件重命名 @Test public void testRename() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); Path oldPath = new Path("/opt/data/dir/touch.data"); Path newPath = new Path("/opt/data/dir/rename.data"); boolean flag = hdfs.rename(oldPath, newPath); System.out.println(flag); } // 删除文件 public void testDelete() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); Path path = new Path("/opt/data/dir/touch.data"); boolean flag = hdfs.deleteOnExit(path); System.out.println(flag); } // 删除目录 public void testDeleteDir() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); Path path = new Path("/opt/data/dir"); boolean flag = hdfs.delete(path, true);// 如果是目录第二个参数必须为true System.out.println(flag); } // 查找某个文件在hdfs集群的位置 public void testLocation() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); Path path = new Path("/opt/data/test.file"); FileStatus fileStatus = hdfs.getFileStatus(path); BlockLocation[] blockLocations = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); for (BlockLocation blockLocation : blockLocations) { String[] hosts = blockLocation.getHosts(); for (String host : hosts) { System.out.print(host + " "); } System.out.println(); } } // 获取hdfs集群上所有节点名称信息 public void testCluster() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); DistributedFileSystem distributedFileSystem = (DistributedFileSystem) hdfs; DatanodeInfo[] datanodeInfos = distributedFileSystem.getDataNodeStats(); for (DatanodeInfo datanodeInfo : datanodeInfos) { String hostName = datanodeInfo.getHostName(); System.out.println(hostName); } }}三、上传合并小文件到hdfs
实现思想:循环遍历本地文件输入流
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;/** * * 向hdfs上传复制文件的过程中,进行合并文件 * */public class PutMerge { /** * * @param localDir * 本地要上传的文件目录 * @param hdfsFile * HDFS上的文件名称,包括路径 */ public static void put(String localDir, String hdfsFile) throws Exception { // 获取配置信息 Configuration conf = new Configuration(); Path localPath = new Path(localDir); Path hdfsPath = new Path(hdfsFile); // 获取本地文件系统 FileSystem localFs = FileSystem.getLocal(conf); // 获取HDFS FileSystem hdfs = FileSystem.get(conf); // 本地文件系统指定目录中的所有文件 FileStatus[] status = localFs.listStatus(localPath); // 打开hdfs上文件的输出流 FSDataOutputStream fSDataOutputStream = hdfs.create(hdfsPath); // 循环遍历本地文件 for (FileStatus fileStatus : status) { // 获取文件 Path path = fileStatus.getPath(); System.out.println("文件为:" + path.getName()); // 打开文件输入流 FSDataInputStream fSDataInputStream = localFs.open(path); // 进行流的读写操作 byte[] buff = new byte[1024]; int len = 0; while ((len = fSDataInputStream.read(buff)) > 0) { fSDataOutputStream.write(buff, 0, len); } fSDataInputStream.close(); } fSDataOutputStream.close(); } public static void main(String[] args) { String localDir="D:/logs"; String hdfsFile="hdfs://hadoop-master.dragon.org:9000/opt/data/logs.data"; try { put(localDir,hdfsFile); } catch (Exception e) { e.printStackTrace(); }} }
文件
目录
名称
内容
系统
输入
信息
路径
输出
配置
工具
控制台
方式
集群
循环
控制
常用
成功
位置
你好
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
jfinal搭建数据库
山西电商软件开发平台
学生网络安全应急演练方案
烟草行业网络安全三个不得
化解网络安全风险小结
数据库中查询某一行数据类型
上海软件开发培训哪个好
崇明区多功能软件开发设计标准
外键关联同时插数据库
hp 1001打印服务器
手机如何购买服务器
单元测试数据库连接池不够
新手怎么挑选备案型腾讯云服务器
网络技术是一把 双刃剑
同森网络技术 产品
华为路由h6打印服务器
php数据库时间
数据分析服务器如何使用
计算机网络技术专业学啥
服务器集虚拟管理技术
网络安全论文一千字
服务器无法处理请求
武汉财务软件开发
电脑自动被改代理服务器端口
数据库中如何增加服务器
网络技术员网站
校园网的服务器作用
软件开发时间节点确定
网络安全为人民的画
偶数mpp数据库安全