RadosClient OSDC怎么使用
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,本篇内容介绍了"RadosClient OSDC怎么使用"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成
千家信息网最后更新 2025年12月01日RadosClient OSDC怎么使用
本篇内容介绍了"RadosClient OSDC怎么使用"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
RadosClient.h
class librados::RadosClient : public Dispatcher //继承自Dispatcher(消息分发类){public: using Dispatcher::cct; md_config_t *conf; //配置文件private: enum { DISCONNECTED, CONNECTING, CONNECTED, } state; //网络连接状态 MonClient monclient; // monc Messenger *messenger; //网络消息接口 uint64_t instance_id; //相关消息分发 Dispatcher类的函数重写 bool _dispatch(Message *m); bool ms_dispatch(Message *m); bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new); void ms_handle_connect(Connection *con); bool ms_handle_reset(Connection *con); void ms_handle_remote_reset(Connection *con); Objecter *objecter; // Osdc模块中的 用于发送封装好的OP消息 Mutex lock; Cond cond; SafeTimer timer; //定时器 int refcnt; version_t log_last_version; rados_log_callback_t log_cb; void *log_cb_arg; string log_watch; int wait_for_osdmap();public: Finisher finisher; // 回调函数的类 explicit RadosClient(CephContext *cct_); ~RadosClient(); int ping_monitor(string mon_id, string *result); int connect(); // 连接 void shutdown(); int watch_flush(); int async_watch_flush(AioCompletionImpl *c); uint64_t get_instance_id(); int wait_for_latest_osdmap(); // 根据pool名字或id创建ioctx int create_ioctx(const char *name, IoCtxImpl **io); int create_ioctx(int64_t, IoCtxImpl **io); int get_fsid(std::string *s); // pool相关操作 int64_t lookup_pool(const char *name); bool pool_requires_alignment(int64_t pool_id); int pool_requires_alignment2(int64_t pool_id, bool *requires); uint64_t pool_required_alignment(int64_t pool_id); int pool_required_alignment2(int64_t pool_id, uint64_t *alignment); int pool_get_auid(uint64_t pool_id, unsigned long long *auid); int pool_get_name(uint64_t pool_id, std::string *auid); int pool_list(std::list >& ls); int get_pool_stats(std::list& ls, map& result); int get_fs_stats(ceph_statfs& result); /* -1 was set as the default value and monitor will pickup the right crush rule with below order: a) osd pool default crush replicated ruleset b) the first ruleset in crush ruleset c) error out if no value find */ // 同步创建pool 和 异步创建pool 
int pool_create(string& name, unsigned long long auid=0, int16_t crush_rule=-1); int pool_create_async(string& name, PoolAsyncCompletionImpl *c, unsigned long long auid=0, int16_t crush_rule=-1); int pool_get_base_tier(int64_t pool_id, int64_t* base_tier); //同步删除和异步删除 int pool_delete(const char *name); int pool_delete_async(const char *name, PoolAsyncCompletionImpl *c); int blacklist_add(const string& client_address, uint32_t expire_seconds); //Mon命令处理,调用monclient.start_mon_command 把命令发送给Mon处理 int mon_command(const vector& cmd, const bufferlist &inbl, bufferlist *outbl, string *outs); int mon_command(int rank, const vector& cmd, const bufferlist &inbl, bufferlist *outbl, string *outs); int mon_command(string name, const vector& cmd, const bufferlist &inbl, bufferlist *outbl, string *outs); //OSD命令处理,objector->osd_command 把命令发送给OSD处理 int osd_command(int osd, vector& cmd, const bufferlist& inbl, bufferlist *poutbl, string *prs); //PG命令处理,objector->pg_command 把命令发送给PG处理 int pg_command(pg_t pgid, vector& cmd, const bufferlist& inbl, bufferlist *poutbl, string *prs); void handle_log(MLog *m); int monitor_log(const string& level, rados_log_callback_t cb, void *arg); void get(); bool put(); void blacklist_self(bool set);}; connect() 连接
int librados::RadosClient::connect(){ common_init_finish(cct); int err; // already connected? if (state == CONNECTING) return -EINPROGRESS; if (state == CONNECTED) return -EISCONN; state = CONNECTING; // get monmap err = monclient.build_initial_monmap(); //通过配置文件获取初始化的Monitor if (err < 0) goto out; err = -ENOMEM; messenger = Messenger::create_client_messenger(cct, "radosclient"); //创建通信模块 if (!messenger) goto out; // require OSDREPLYMUX feature. this means we will fail to talk to // old servers. this is necessary because otherwise we won't know // how to decompose the reply data into its consituent pieces. messenger->set_default_policy(Messenger::Policy::lossy_client(0, CEPH_FEATURE_OSDREPLYMUX)); ldout(cct, 1) << "starting msgr at " << messenger->get_myaddr() << dendl; ldout(cct, 1) << "starting objecter" << dendl; //创建objecter objecter = new (std::nothrow) Objecter(cct, messenger, &monclient, &finisher, cct->_conf->rados_mon_op_timeout, cct->_conf->rados_osd_op_timeout); if (!objecter) goto out; objecter->set_balanced_budget(); // mc添加 messenger monclient.set_messenger(messenger); // objecter 初始化 objecter->init(); // messenger添加 dispather messenger->add_dispatcher_tail(objecter); messenger->add_dispatcher_tail(this); // messenger启动 messenger->start(); ldout(cct, 1) << "setting wanted keys" << dendl; monclient.set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD); ldout(cct, 1) << "calling monclient init" << dendl; // mc 初始化 err = monclient.init(); if (err) { ldout(cct, 0) << conf->name << " initialization error " << cpp_strerror(-err) << dendl; shutdown(); goto out; } err = monclient.authenticate(conf->client_mount_timeout); if (err) { ldout(cct, 0) << conf->name << " authentication error " << cpp_strerror(-err) << dendl; shutdown(); goto out; } messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id())); objecter->set_client_incarnation(0); // objecter 启动 objecter->start(); lock.Lock(); // 定时器初始化 timer.init(); monclient.renew_subs(); 
//执行回调的完成类start finisher.start(); // 更新 状态为已连接 state = CONNECTED; instance_id = monclient.get_global_id(); ...}create_ioctx 根据pool创建ioctx
int librados::RadosClient::create_ioctx(const char *name, IoCtxImpl **io){ // 获取 poolid int64_t poolid = lookup_pool(name); ... // 创建 IoCtxImpl *io = new librados::IoCtxImpl(this, objecter, poolid, CEPH_NOSNAP); return 0;}Mon OSD pg 命令操作
int librados::RadosClient::mon_command(const vector& cmd, const bufferlist &inbl, bufferlist *outbl, string *outs){ // mc start_mon_command 发送到monitor monclient.start_mon_command(cmd, inbl, outbl, outs, new C_SafeCond(&mylock, &cond, &done, &rval));}int librados::RadosClient::osd_command(int osd, vector & cmd, const bufferlist& inbl, bufferlist *poutbl, string *prs){ // 发送到osd int r = objecter->osd_command(osd, cmd, inbl, &tid, poutbl, prs, new C_SafeCond(&mylock, &cond, &done, &ret));}int librados::RadosClient::pg_command(pg_t pgid, vector & cmd, const bufferlist& inbl, bufferlist *poutbl, string *prs){ // 发送到pg int r = objecter->pg_command(pgid, cmd, inbl, &tid, poutbl, prs, new C_SafeCond(&mylock, &cond, &done, &ret));}
IoCtxImpl
IoCtxImpl 是 librados::IoCtx 的实现,其处理流程如下:
把请求封装成 ObjectOperation 类(osdc 中的)
把相关的 pool 信息添加到里面,封装成 Objecter::Op 对象
调用相应的函数 objecter->op_submit 发送给相应的 OSD
操作完成后,调用相应的回调函数。
以 rados_write 为例:
// C API entry point: writes `len` bytes from `buf` to object `o` at offset
// `off` through the IoCtxImpl behind `io`, and returns IoCtxImpl::write's
// result.
extern "C" int rados_write(rados_ioctx_t io, const char *o, const char *buf, size_t len, uint64_t off)
{
  librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
  object_t oid(o);
  bufferlist bl;
  bl.append(buf, len);
  int retval = ctx->write(oid, bl, len, off);
  // The excerpt fell off the end of a non-void function; hand the result
  // back to the caller.
  return retval;
}
调用IoCtxImpl::write
int librados::IoCtxImpl::write(const object_t& oid, bufferlist& bl, size_t len, uint64_t off){ ::ObjectOperation op; prepare_assert_ops(&op); // assert ops bufferlist mybl; mybl.substr_of(bl, 0, len); op.write(off, mybl); // 封装到op.write Objecter.h ObjectOperation write return operate(oid, &op, NULL); // IoCtxImpl::operate}int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o, ceph::real_time *pmtime, int flags){ int op = o->ops[0].op.op; Objecter::Op *objecter_op = objecter->prepare_mutate_op(oid, oloc, *o, snapc, ut, flags, NULL, oncommit, &ver); objecter->op_submit(objecter_op);}AioCompletionImpl
OSDC
ObjectOperation
struct ObjectOperation { vector ops; // ops集合 int flags; int priority; vector out_bl; // 输出bufferlist vector out_handler; // 回调函数 vector out_rval; // 返回码集合 size_t size() { // op个数 return ops.size(); } /** * This is a more limited form of C_Contexts, but that requires * a ceph_context which we don't have here. */ // 用户添加回调函数 class C_TwoContexts : public Context { Context *first; Context *second; }; /** * Add a callback to run when this operation completes, * after any other callbacks for it. */ // 添加回调函数 void add_handler(Context *extra) { size_t last = out_handler.size() - 1; Context *orig = out_handler[last]; if (orig) { Context *wrapper = new C_TwoContexts(orig, extra); out_handler[last] = wrapper; } else { out_handler[last] = extra; } } // 添加操作 OSDOp& add_op(int op) { int s = ops.size(); ops.resize(s+1); ops[s].op.op = op; out_bl.resize(s+1); out_bl[s] = NULL; out_handler.resize(s+1); out_handler[s] = NULL; out_rval.resize(s+1); out_rval[s] = NULL; return ops[s]; } // 添加data void add_data(int op, uint64_t off, uint64_t len, bufferlist& bl) { OSDOp& osd_op = add_op(op); osd_op.op.extent.offset = off; osd_op.op.extent.length = len; osd_op.indata.claim_append(bl); } void add_clone_range(int op, uint64_t off, uint64_t len, const object_t& srcoid, uint64_t srcoff, snapid_t srcsnapid) {} void add_xattr(int op, const char *name, const bufferlist& data) {} void add_xattr_cmp(int op, const char *name, uint8_t cmp_op, uint8_t cmp_mode, const bufferlist& data) {} // 添加call method void add_call(int op, const char *cname, const char *method, bufferlist &indata, bufferlist *outbl, Context *ctx, int *prval) { OSDOp& osd_op = add_op(op); unsigned p = ops.size() - 1; out_handler[p] = ctx; out_bl[p] = outbl; out_rval[p] = prval; osd_op.op.cls.class_len = strlen(cname); osd_op.op.cls.method_len = strlen(method); osd_op.op.cls.indata_len = indata.length(); osd_op.indata.append(cname, osd_op.op.cls.class_len); osd_op.indata.append(method, osd_op.op.cls.method_len); 
osd_op.indata.append(indata); } void add_pgls(int op, uint64_t count, collection_list_handle_t cookie, epoch_t start_epoch) {} void add_pgls_filter(int op, uint64_t count, const bufferlist& filter, collection_list_handle_t cookie, epoch_t start_epoch) {} void add_alloc_hint(int op, uint64_t expected_object_size, uint64_t expected_write_size) {} // ------ // pg 操作 void pg_ls(uint64_t count, bufferlist& filter, collection_list_handle_t cookie, epoch_t start_epoch) {} void pg_nls(uint64_t count, const bufferlist& filter, collection_list_handle_t cookie, epoch_t start_epoch) {} // 创建 操作 void create(bool excl) { OSDOp& o = add_op(CEPH_OSD_OP_CREATE); o.op.flags = (excl ? CEPH_OSD_OP_FLAG_EXCL : 0); } // 状态 struct C_ObjectOperation_stat : public Context { bufferlist bl; uint64_t *psize; ceph::real_time *pmtime; time_t *ptime; struct timespec *pts; int *prval; // 完成大小,时间等 void finish(int r) {} } }; // 查看状态,获取C_ObjectOperation_stat void stat(uint64_t *psize, ceph::real_time *pmtime, int *prval) {} void stat(uint64_t *psize, time_t *ptime, int *prval) {} void stat(uint64_t *psize, struct timespec *pts, int *prval) {} // object data // 读操作 void read(uint64_t off, uint64_t len, bufferlist *pbl, int *prval, Context* ctx) { bufferlist bl; add_data(CEPH_OSD_OP_READ, off, len, bl); unsigned p = ops.size() - 1; out_bl[p] = pbl; out_rval[p] = prval; out_handler[p] = ctx; } void sparse_read(uint64_t off, uint64_t len, std::map *m, bufferlist *data_bl, int *prval) {} // 写操作 void write(uint64_t off, bufferlist& bl, uint64_t truncate_size, uint32_t truncate_seq) { add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl); // 添加data, 将WRITE存入ops,将数据放在op中 OSDOp& o = *ops.rbegin(); o.op.extent.truncate_size = truncate_size; o.op.extent.truncate_seq = truncate_seq; } void write(uint64_t off, bufferlist& bl) {} void write_full(bufferlist& bl) {} void append(bufferlist& bl) {} void zero(uint64_t off, uint64_t len) {} void truncate(uint64_t off) {} void remove() {} void mapext(uint64_t off, uint64_t 
len) {} void sparse_read(uint64_t off, uint64_t len) {} void clone_range(const object_t& src_oid, uint64_t src_offset, uint64_t len, uint64_t dst_offset) {} // object attrs // 属性操作 void getxattr(const char *name, bufferlist *pbl, int *prval) {} void getxattrs(std::map *pattrs, int *prval) {} void setxattr(const char *name, const bufferlist& bl) {} void setxattr(const char *name, const string& s) {} void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode, const bufferlist& bl) {} void rmxattr(const char *name) {} void setxattrs(map& attrs) {} void resetxattrs(const char *prefix, map& attrs) {} // trivialmap void tmap_update(bufferlist& bl) {} void tmap_put(bufferlist& bl) {} void tmap_get(bufferlist *pbl, int *prval) {} void tmap_get() {} void tmap_to_omap(bool nullok=false) {} // objectmap void omap_get_keys(const string &start_after, uint64_t max_to_get, std::set *out_set, int *prval) { OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS); bufferlist bl; ::encode(start_after, bl); ::encode(max_to_get, bl); op.op.extent.offset = 0; op.op.extent.length = bl.length(); op.indata.claim_append(bl); if (prval || out_set) { unsigned p = ops.size() - 1; C_ObjectOperation_decodekeys *h = new C_ObjectOperation_decodekeys(out_set, prval); out_handler[p] = h; out_bl[p] = &h->bl; out_rval[p] = prval; } } void omap_get_vals(const string &start_after, const string &filter_prefix, uint64_t max_to_get, std::map *out_set, int *prval) {} void omap_get_vals_by_keys(const std::set &to_get, std::map *out_set, int *prval) {} void omap_cmp(const std::map > &assertions, int *prval) {} void copy_get(object_copy_cursor_t *cursor, uint64_t max, uint64_t *out_size, ceph::real_time *out_mtime, std::map *out_attrs, bufferlist *out_data, bufferlist *out_omap_header, bufferlist *out_omap_data, vector *out_snaps, snapid_t *out_snap_seq, uint32_t *out_flags, uint32_t *out_data_digest, uint32_t *out_omap_digest, vector > *out_reqids, uint64_t *truncate_seq, uint64_t *truncate_size, int *prval) {} 
void undirty() {} struct C_ObjectOperation_isdirty : public Context {}; void is_dirty(bool *pisdirty, int *prval) {} void omap_get_header(bufferlist *bl, int *prval) {} void omap_set(const map &map) {} void omap_set_header(bufferlist &bl) {} void omap_clear() {} void omap_rm_keys(const std::set &to_remove) {} // object classes void call(const char *cname, const char *method, bufferlist &indata) {} void call(const char *cname, const char *method, bufferlist &indata, bufferlist *outdata, Context *ctx, int *prval) {} void rollback(uint64_t snapid) {} void copy_from(object_t src, snapid_t snapid, object_locator_t src_oloc, version_t src_version, unsigned flags, unsigned src_fadvise_flags) {}}; OSDOp osd_types.h
struct OSDOp { ceph_osd_op op; // 操作 sobject_t soid; // oid bufferlist indata, outdata; // 输入输出data int32_t rval; // 返回码};Objecter
class Objecter : public md_config_obs_t, public Dispatcher {public: Messenger *messenger; // 消息 MonClient *monc; // mc Finisher *finisher;private: OSDMap *osdmap; // osdmappublic: using Dispatcher::cct; std::multimap crush_location; atomic_t initialized;private: atomic64_t last_tid; atomic_t inflight_ops; atomic_t client_inc; uint64_t max_linger_id; atomic_t num_unacked; atomic_t num_uncommitted; atomic_t global_op_flags; // flags which are applied to each IO op bool keep_balanced_budget; bool honor_osdmap_full;public: void maybe_request_map();private: void _maybe_request_map(); version_t last_seen_osdmap_version; version_t last_seen_pgmap_version; mutable boost::shared_mutex rwlock; using lock_guard = std::unique_lock; using unique_lock = std::unique_lock; using shared_lock = boost::shared_lock; using shunique_lock = ceph::shunique_lock; ceph::timer timer; PerfCounters *logger; uint64_t tick_event; void start_tick(); void tick(); void update_crush_location();public: /*** track pending operations ***/ // readpublic: struct OSDSession; struct op_target_t {} // 操作目标 struct Op : public RefCountedObject {}; // 操作}; op_target_t
操作目标,封装pg信息,osd信息
struct op_target_t { int flags; object_t base_oid; object_locator_t base_oloc; object_t target_oid; // 目标oid object_locator_t target_oloc; // 位置 // 是否 base_pgid bool precalc_pgid; ///< true if we are directed at base_pgid, not base_oid // 直接的 pgid pg_t base_pgid; ///< explciti pg target, if any pg_t pgid; ///< last pg we mapped to unsigned pg_num; ///< last pg_num we mapped to unsigned pg_num_mask; ///< last pg_num_mask we mapped to // 启动的osd vector up; ///< set of up osds for last pg we mapped to // acting osd vector acting; ///< set of acting osds for last pg we mapped to // primary int up_primary; ///< primary for last pg we mapped to based on the up set int acting_primary; ///< primary for last pg we mapped to based on the /// acting set // pool 大小 int size; ///< the size of the pool when were were last mapped // pool 最小size int min_size; ///< the min size of the pool when were were last mapped // 是否按位排序 bool sort_bitwise; ///< whether the hobject_t sort order is bitwise // 是否副本 bool used_replica; bool paused; int osd; ///< the final target osd, or -1 }; 操作
struct Op : public RefCountedObject { OSDSession *session; // session 连接 int incarnation; op_target_t target; // 操作目标 ConnectionRef con; // for rx buffer only uint64_t features; // explicitly specified op features vector ops; // 操作集合 snapid_t snapid; SnapContext snapc; ceph::real_time mtime; bufferlist *outbl; vector out_bl; vector out_handler; vector out_rval; int priority; Context *onack, *oncommit; uint64_t ontimeout; Context *oncommit_sync; // used internally by watch/notify ceph_tid_t tid; eversion_t replay_version; // for op replay int attempts; version_t *objver; epoch_t *reply_epoch; ceph::mono_time stamp; epoch_t map_dne_bound; bool budgeted; /// true if we should resend this message on failure bool should_resend; /// true if the throttle budget is get/put on a series of OPs, /// instead of per OP basis, when this flag is set, the budget is /// acquired before sending the very first OP of the series and /// released upon receiving the last OP reply. bool ctx_budgeted; int *data_offset; epoch_t last_force_resend; osd_reqid_t reqid; // explicitly setting reqid }; 分片 Striper
扩展 ObjectExtent
记录分片信息
class ObjectExtent { public: object_t oid; // object id uint64_t objectno; // 序号 uint64_t offset; // in object object内偏移 uint64_t length; // in object 分片长度 uint64_t truncate_size; // in object object_locator_t oloc; // object locator (pool etc) pool位置 vector > buffer_extents; // off -> len. extents in buffer being mapped (may be fragmented bc of striping!) }; "RadosClient OSDC怎么使用"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
函数
命令
处理
消息
封装
信息
状态
目标
输出
位置
内容
大小
定时器
文件
更多
知识
网络
同步
配置
实用
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
服务器硬盘亮红灯报警丢失
回连c2服务器
虚谷数据库和oracle
导致服务器会有短时间的卡顿
缺乏网络技术支撑
原生sql连接数据库
行政单位软件开发管理
商业密码保护网络安全吗
盈盈网络技术有限公司
服务器设地址设置什么比较好
影院服务器主板价格
天津网络技术咨询口碑推荐
寄递业务四大数据库指的什么
服务器关闭后怎么没有属性
生态船服务器
关于网络安全政策
网络安全注册审批
网络服务器硬件的要求
超市数据库系统图片
关于网络安全的标语怎么写
vue打包到服务器上组件报错
如何注意网络安全问题文字
软件开发价款支付依据
2018南阳网络安全
网络安全对保险行业的影响
工业互联网软件开发与应用
数据库用户中指定登录名
邢台燃气集团网络安全测评
山东网络技术
如何加强乡镇网络安全