如何实现ceph SimpleMessenger模块消息的接收
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,小编给大家分享一下如何实现ceph SimpleMessenger模块消息的接收,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!OSD服务端消息的接收起始于OSD::init()中的m
千家信息网最后更新 2025年12月02日如何实现ceph SimpleMessenger模块消息的接收
小编给大家分享一下如何实现ceph SimpleMessenger模块消息的接收,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!
OSD服务端消息的接收起始于OSD::init()中的messenger::add_dispatcher_head(Dispatcher *d)函数
|- 358 void add_dispatcher_head(Dispatcher *d) {|| 359 bool first = dispatchers.empty();|| 360 dispatchers.push_front(d);|| 361 if (d->ms_can_fast_dispatch_any())|| 362 fast_dispatchers.push_front(d);|| 363 if (first)|| 364 ready(); //如果dispatcher list空,启动SimpleMessenger::ready,不为空证明SimpleMessenger已经启动了|| 365 }在SimpleMessenger::ready()中,启动DispatchQueue等待mqueue,如果绑定了端口就启动 accepter接收线程
76 void SimpleMessenger::ready()- 77 {| 78 ldout(cct,10) << "ready " << get_myaddr() << dendl;| 79 dispatch_queue.start(); //启动DispatchQueue,等待mqueue| 80 | 81 lock.Lock();| 82 if (did_bind)| 83 accepter.start();| 84 lock.Unlock();| 85 }Accepter是Thread的继承类,Accepter::start()最终调用Accepter::entry(),在entry中 accept并把接收到的sd加入到Pipe类中
void *Accepter::entry(){ ... struct pollfd pfd; pfd.fd = listen_sd; pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP; while (!done) { int r = poll(&pfd, 1, -1); if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP)) break; // accept entity_addr_t addr; socklen_t slen = sizeof(addr.ss_addr()); int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen); if (sd >= 0) { errors = 0; ldout(msgr->cct,10) << "accepted incoming on sd " << sd << dendl; msgr->add_accept_pipe(sd); //注册一个pipe,启动读线程,从该sd中读取数据 } else { ldout(msgr->cct,0) << "accepter no incoming connection? sd = " << sd << " errno " << errno << " " << cpp_strerror(errno) << dendl; if (++errors > 4) break; } } ... return 0;在SimpleMessenger::add_accept_pipe(int sd)中,申请一个Pipe类并把sd加入到Pipe中,开始Pipe::start_reader()
340 Pipe *SimpleMessenger::add_accept_pipe(int sd)- 341 { | 342 lock.Lock();| 343 Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL);| 344 p->sd = sd;| 345 p->pipe_lock.Lock();| 346 p->start_reader();| 347 p->pipe_lock.Unlock();| 348 pipes.insert(p);| 349 accepting_pipes.insert(p);| 350 lock.Unlock();| 351 return p;| 352 }Pipe类内部有一个Reader和Writer线程类,Pipe::start_reader()启动Pipe::Reader::entry(),最终启动Pipe::reader函数
134 void Pipe::start_reader()- 135 {| 136 assert(pipe_lock.is_locked());| 137 assert(!reader_running);|- 138 if (reader_needs_join) {|| 139 reader_thread.join();|| 140 reader_needs_join = false;|| 141 }| 142 reader_running = true;| 143 reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes);| 144 }|- 48 class Reader : public Thread {|| 49 Pipe *pipe;|| 50 public:|| 51 explicit Reader(Pipe *p) : pipe(p) {}|| 52 void *entry() { pipe->reader(); return 0; }|| 53 } reader_thread;在Pipe::reader函数中根据tag接收不同类型的消息,如果是CEPH_MSGR_TAG_MSG类型消息调用read_message接收消息,并把消息加入到mqueue中
void Pipe::reader(){ pipe_lock.Lock(); if (state == STATE_ACCEPTING) { accept(); //第一次进入此函数处理 assert(pipe_lock.is_locked()); } // loop. while (state != STATE_CLOSED && state != STATE_CONNECTING) { assert(pipe_lock.is_locked()); ...... ...... else if (tag == CEPH_MSGR_TAG_MSG) { ldout(msgr->cct,20) << "reader got MSG" << dendl; Message *m = 0; int r = read_message(&m, auth_handler.get()); pipe_lock.Lock(); if (!m) { if (r < 0) fault(true); continue; } ...... ...... ...... // note last received message. in_seq = m->get_seq(); cond.Signal(); // wake up writer, to ack this ldout(msgr->cct,10) << "reader got message " << m->get_seq() << " " << m << " " << *m << dendl; in_q->fast_preprocess(m); //mds 、mon不会进入此函数,预处理 if (delay_thread) { utime_t release; if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) { release = m->get_recv_stamp(); release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl; } delay_thread->queue(release, m); } else { if (in_q->can_fast_dispatch(m)) { reader_dispatching = true; pipe_lock.Unlock(); in_q->fast_dispatch(m); pipe_lock.Lock(); reader_dispatching = false; if (state == STATE_CLOSED || notify_on_dispatch_done) { // there might be somebody waiting notify_on_dispatch_done = false; cond.Signal(); } } else { //mds进入此else in_q->enqueue(m, m->get_priority(), conn_id); //把接收到的messenger加入到mqueue中 } } } ...... ...... } // reap? reader_running = false; reader_needs_join = true; unlock_maybe_reap(); ldout(msgr->cct,10) << "reader done" << dendl;}在Pipe::DispatchQueue::enqueue函数中加入到mqueue中
void DispatchQueue::enqueue(Message *m, int priority, uint64_t id){ Mutex::Locker l(lock); ldout(cct,20) << "queue " << m << " prio " << priority << dendl; add_arrival(m); if (priority >= CEPH_MSG_PRIO_LOW) { mqueue.enqueue_strict( id, priority, QueueItem(m)); } else { mqueue.enqueue( id, priority, m->get_cost(), QueueItem(m)); } cond.Signal(); //唤醒dispatch_queue.start() 启动的dispatchThread,进入entry进行处理}看完了这篇文章,相信你对"如何实现ceph SimpleMessenger模块消息的接收"有了一定的了解,如果想了解更多相关知识,欢迎关注行业资讯频道,感谢各位的阅读!
消息
函数
线程
模块
篇文章
类型
处理
不同
完了
数据
更多
知识
端口
第一次
行业
资讯
资讯频道
频道
中加
服务
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发技术技巧性
完整的仓库管理数据库
2018国家网络安全答题
宏城互联网科技
服务器如何加下载的材质包
远程网络技术培训内容
专科生能学网络安全专业吗
数据库以后能干什么
铁路网络安全杂志
广州网络安全有限公司
软件工程金融软件开发方向
矿山利用统计数据库管理系统下载
深圳市匹配网络技术有限公司
网络安全法的四大亮点
公司未备案服务器
应聘网络技术员的问题和答案
软件开发类公司业务模式
云柜网络技术
服务器最大连接数
北图网络技术怎么样
重庆网络安全攻防竞赛
医院的网络安全建设
软件开发公司增值税率
web服务器文字编码
mongodb后台数据库
部队网络安全防微杜渐心得体会
3204服务器cpu怎样
医保局网络安全宣传工作总结
成绩差女孩子学软件开发怎么样
上亿简历大数据库