Python3 操作 HDFS
发表于:2025-12-04 作者:千家信息网编辑
千家信息网最后更新 2025年12月04日,【第三方包】pyhdfs(pypi,github,支持HA)【功能】重命名 hdfs 文件或目录# encoding: utf-8# author: walker# date: 2018-03-17
千家信息网最后更新 2025年12月04日Python3 操作 HDFS
【第三方包】
pyhdfs(pypi,github,支持HA)
【功能】
重命名 hdfs 文件或目录
# encoding: utf-8# author: walker# date: 2018-03-17 # summary: 利用 pyhdfs 重命名 hdfs 文件或目录import os, sys, timefrom pyhdfs import HdfsClientSrcPath = '/test/xxx'DstPath = '/test/yyy'NameNode = 'nn1.example.com:50070,nn2.example.com:50070'# 将 SrcPath 改名为 DstPathdef Rename(SrcPath, DstPath): fs = HdfsClient(hosts=NameNode) if not fs.exists(SrcPath): print('Error: not found %s' % SrcPath) sys.exit(-1) print('Reanme ... \n%s\n -> \n%s \n' % (SrcPath, DstPath)) fs.rename(SrcPath, DstPath) if __name__ == '__main__': Rename(SrcPath, DstPath)上传文件
# encoding: utf-8# author: walker# date: 2018-01-23# summary: 上传本地文件到 hdfs 目录import os, sys, timefrom pyhdfs import HdfsClientfrom configparser import ConfigParsercur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))StartTime = time.time()FileSize = 0 #文件总大小LocalDir = ''HdfsDir = ''NameNode = ''UserName = ''#读取配置文件 def ReadConfig(): global LocalDir, HdfsDir, NameNode, UserName cfg = ConfigParser() cfgFile = os.path.join(cur_dir_fullpath, 'config.ini') if not os.path.exists(cfgFile): input(cfgFile + ' not found') sys.exit(-1) cfgLst = cfg.read(cfgFile) if len(cfgLst) < 1: input('Read config.ini failed...') sys.exit(-1) LocalDir = cfg.get('config', 'LocalDir').strip() if not os.path.exists(LocalDir): input(LocalDir + ' not found') sys.exit(-1) print('LocalDir:' + LocalDir) HdfsDir = cfg.get('config', 'HdfsDir').strip() print('HdfsDir:' + HdfsDir) NameNode = cfg.get('config', 'NameNode').strip() print('NameNode:' + NameNode) UserName = cfg.get('config', 'UserName').strip() print('UserName:' + UserName) print('Read config.ini successed!') #处理一个def ProcOne(client, srcFile, dstFile): global FileSize print('ProcOne \n%s\n -> \n%s ' % (srcFile, dstFile)) #目标文件已经存在且大小相同 if client.exists(dstFile) and \ (os.path.getsize(srcFile) == client.list_status(dstFile)[0].length): print('file exists: %s ' % dstFile) return True #注意,如果已存在会被覆盖 client.copy_from_local(srcFile, dstFile, overwrite=True) #校验文件大小 if os.path.getsize(srcFile) == client.list_status(dstFile)[0].length: FileSize += os.path.getsize(srcFile) return True return False #处理所有def ProcAll(): client = HdfsClient(hosts=NameNode, user_name=UserName) if not client.exists(HdfsDir): print(HdfsDir + ' not found') sys.exit(-1) total = len(os.listdir(LocalDir)) processed = 0 failedList = list() for filename in os.listdir(LocalDir): srcFile = os.path.join(LocalDir, filename) dstFile = HdfsDir + '/' + filename if not ProcOne(client, srcFile, dstFile): failedList.append(srcFile) processed += 1 print('%d/%d/%d, time cost: %.2f s' % (total, processed, len(failedList), time.time()-StartTime)) print('%d B, %.2f MB/s \n' % (FileSize, FileSize/1024/1024/(time.time()-StartTime))) if failedList: print('failedList: %s' % repr(failedList)) else: print('Good! No Error!') print('%d B, %.2f MB, %.2f GB, %.2f MB/s' % \ (FileSize, FileSize/1024/1024, FileSize/1024/1024/1024, FileSize/1024/1024/(time.time()-StartTime))) if __name__ == '__main__': ReadConfig() ProcAll() print('Time total: %.2f s' % (time.time()-StartTime)) print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()))下载 HDFS 文件到本地
# encoding: utf-8# author: walker# date: 2018-06-07# summary: 下载 HDFS 文件(或目录)到本地import os, sys, timefrom pyhdfs import HdfsClientfrom configparser import ConfigParsercur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))StartTime = time.time()FileSize = 0 #文件总大小LocalDir = ''HdfsDir = ''NameNode = ''UserName = ''#读取配置文件 def ReadConfig(): global LocalDir, HdfsDir, NameNode, UserName cfg = ConfigParser() cfgFile = os.path.join(cur_dir_fullpath, 'config.ini') if not os.path.exists(cfgFile): input(cfgFile + ' not found') sys.exit(-1) cfgLst = cfg.read(cfgFile) if len(cfgLst) < 1: input('Read config.ini failed...') sys.exit(-1) LocalDir = cfg.get('config', 'LocalDir').strip() if not os.path.exists(LocalDir): input(LocalDir + ' not found') sys.exit(-1) print('LocalDir:' + LocalDir) HdfsDir = cfg.get('config', 'HdfsDir').strip().rstrip('/') print('HdfsDir:' + HdfsDir) NameNode = cfg.get('config', 'NameNode').strip() print('NameNode:' + NameNode) UserName = cfg.get('config', 'UserName').strip() print('UserName:' + UserName) print('Read config.ini successed!') #处理一个def ProcOne(client, srcFile, dstFile): global FileSize print('ProcOne \n%s\n -> \n%s ' % (srcFile, dstFile)) dstDir = os.path.dirname(dstFile) if not os.path.exists(dstDir): os.makedirs(dstDir) # 目标文件已经存在且大小相同 if os.path.exists(dstFile) and \ (os.path.getsize(dstFile) == client.list_status(srcFile)[0].length): print('file exists: %s ' % dstFile) return True # 注意,如果已存在会被覆盖 client.copy_to_local(srcFile, dstFile, overwrite=True) if os.path.getsize(dstFile) != client.list_status(srcFile)[0].length: #校验文件大小 return False FileSize += os.path.getsize(dstFile) return True #处理所有def ProcAll(): client = HdfsClient(hosts=NameNode, user_name=UserName) if not client.exists(HdfsDir): print(HdfsDir + ' not found') sys.exit(-1) total = 0 # 先遍历一遍,得到总文件个数 for parent, dirnames, filenames in client.walk(HdfsDir): for filename in filenames: total += 1 processed = 0 failedList = list() for parent, dirnames, filenames in client.walk(HdfsDir): for filename in filenames: srcFile = '%s/%s' % (parent, filename) relPath = srcFile[len(HdfsDir)+1:].replace('/', '\\') # 相对于根目录的路径 dstFile = os.path.join(LocalDir, relPath) if not ProcOne(client, srcFile, dstFile): failedList.append(srcFile) processed += 1 print('%d/%d/%d, time cost: %.2f s' % (total, processed, len(failedList), time.time()-StartTime)) print('%d B, %.2f MB/s \n' % (FileSize, FileSize/1024/1024/(time.time()-StartTime))) if failedList: print('failedList: %s' % repr(failedList)) else: print('Good! No Error!') print('%d B, %.2f MB, %.2f GB, %.2f MB/s' % \(FileSize, FileSize/1024/1024, FileSize/1024/1024/1024, FileSize/1024/1024/(time.time()-StartTime))) if __name__ == '__main__': ReadConfig() ProcAll() print('Time total: %.2f s' % (time.time()-StartTime)) print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()))*** walker ***
文件
大小
目录
相同
目标
处理
配置
个数
功能
根目录
第三方
路径
支持
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库安全市场份额
湖北工控软件开发如何收费
银行软件开发考什么条件
服务器机盖怎么打开 hp
对于网络安全的认识正确的是
幼儿园网络安全培训美篇
服务器不显示
K公司是一家软件开发公司
数据库连接 方式
数据库课程设计运动系统
设备管理 软件开发
一梦江湖的服务器经验
网格员开展网络安全周宣传
中国知网是否为索引数据库
网络安全在我心小学五年级作文
CENTOS下载软件开发
软件开发通常包含的步骤
企业管理软件开发杭州
融媒体网络安全应急预案
数据库 硬盘
删除企业级地理数据库
java本地服务器搭建
爬虫爬取文章并保存数据库
湛江网络安全实战特训营要怎么学
学生网络安全教育教育平台
浦东新区本地网络技术收费
网络安全排查方法
软件开发怎样找客户
数据库应用系统等生命周期
手机上搭建网站服务器