python如何实现查询bucket已用量脚本
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,小编给大家分享一下python如何实现查询bucket已用量脚本,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!目前仅支持c
千家信息网最后更新 2025年12月02日python如何实现查询bucket已用量脚本
小编给大家分享一下python如何实现查询bucket已用量脚本,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
目前仅支持ceph的s3方案,具体配置看说明
# -*- coding: utf-8 -*-import requestsimport jsonfrom email.utils import formatdateimport hmacpy3k = Falsefrom hashlib import sha1 as shatry: from urlparse import urlparse from base64 import encodestringexcept: py3k = True from urllib.parse import urlparse from base64 import encodebytes as encodestringclass AuthBase(object): """Base class that all auth implementations derive from""" def __call__(self, r): raise NotImplementedError('Auth hooks must be callable.')class S3Auth(AuthBase): """Attaches AWS Authentication to the given Request object.""" service_base_url = 's3.amazonaws.com' # List of Query String Arguments of Interest special_params = [ 'acl', 'location', 'logging', 'partNumber', 'policy', 'requestPayment', 'torrent', 'versioning', 'versionId', 'versions', 'website', 'uploads', 'uploadId', 'response-content-type', 'response-content-language', 'response-expires', 'response-cache-control', 'delete', 'lifecycle', 'response-content-disposition', 'response-content-encoding' ] def __init__(self, access_key, secret_key, service_url=None): if service_url: self.service_base_url = service_url self.access_key = str(access_key) self.secret_key = str(secret_key) self.au ="" def __call__(self, r): # Create date header if it is not created yet. if not 'date' in r.headers and not 'x-amz-date' in r.headers: r.headers['date'] = formatdate( timeval=None, localtime=False, usegmt=True) signature = self.get_signature(r) if py3k: signature = signature.decode('utf-8') r.headers['Authorization'] = 'AWS %s:%s' % (self.access_key, signature) self.au = r.headers # print self.au return r def get_signature(self, r): canonical_string = self.get_canonical_string( r.url, r.headers, r.method) if py3k: key = self.secret_key.encode('utf-8') msg = canonical_string.encode('utf-8') else: key = self.secret_key msg = canonical_string h = hmac.new(key, msg, digestmod=sha) return encodestring(h.digest()).strip() def get_canonical_string(self, url, headers, method): parsedurl = urlparse(url) objectkey = parsedurl.path[1:] query_args = sorted(parsedurl.query.split('&')) bucket = parsedurl.netloc[:-len(self.service_base_url)] if len(bucket) > 1: # remove last dot bucket = bucket[:-1] interesting_headers = { 'content-md5': '', 'content-type': '', 'date': ''} for key in headers: lk = key.lower() try: lk = lk.decode('utf-8') except: pass if headers[key] and (lk in interesting_headers.keys() or lk.startswith('x-amz-')): interesting_headers[lk] = headers[key].strip() # If x-amz-date is used it supersedes the date header. if not py3k: if 'x-amz-date' in interesting_headers: interesting_headers['date'] = '' else: if 'x-amz-date' in interesting_headers: interesting_headers['date'] = '' buf = '%s\n' % method for key in sorted(interesting_headers.keys()): val = interesting_headers[key] if key.startswith('x-amz-'): buf += '%s:%s\n' % (key, val) else: buf += '%s\n' % val # append the bucket if it exists if bucket != '': buf += '/%s' % bucket # add the objectkey. even if it doesn't exist, add the slash buf += '/%s' % objectkey params_found = False # handle special query string arguments for q in query_args: k = q.split('=')[0] if k in self.special_params: if params_found: buf += '&%s' % q else: buf += '?%s' % q params_found = True return bufclass S3Admin(): def __init__(self): self.access_key = '' #填access_key self.secret_key = '' #填secret_key self.endpoint = 's3.ceph.work' #填endpoint def get_bucket_usage(self,bucketname): #url = 'http://%s/%s/' % (self.endpoint, bucketname) #path style url = 'http://%s.%s/' % (bucketname,self.endpoint) #virtual hosted styple r = requests.head(url, auth=S3Auth(self.access_key, self.secret_key, self.endpoint)) print r.headers return r.headerss3client = S3Admin()bucket_name= 'xxx' #替换成相应的bucket名称result = s3client.get_bucket_usage(bucket_name)print 'objects_num= %s , total_Bytes_Used= %s ' % (result['X-RGW-Object-Count'],result['X-RGW-Bytes-Used'])#注意 objects_num 为当前bucket内的object数量,total_Bytes_Used为当前bucket内的已用容量(单位为Byte)以上是"python如何实现查询bucket已用量脚本"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
篇文章
用量
脚本
查询
内容
不怎么
单位
名称
大部分
容量
数量
方案
更多
知识
行业
资讯
资讯频道
频道
utf-8
参考
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
郑州app软件开发报价
江苏数据库空投箱代理商
电梯刷卡系统数据库建立
网络安全事件处置应急指南
数据库如何插入测试
数据库创建date字段
大兴区综合软件开发
建邺区方便软件开发创新服务
能打印目录页的数据库
exl导入erp数据库数据
帮别人租了服务器
天津特种网络技术工程
乾庄网络技术
软件开发人员职位
2020hcs网络安全大会
网络安全靠认民演讲稿
山东省专科院校网络技术
客户端如何管理他人数据库
泰拉瑞亚所有服务器地址
数据库同表查重语句
数据库创建登录的账号
虚拟机浏览器显示找不到服务器
ibmlotus服务器
北京约牛网络技术有限公司
管理员网络安全自查汇报
ntp服务器 精度
虹口区网络技术服务咨询
烟草数据库官网
信息网络技术专业就业前景如何
网络安全动画小视频