千家信息网

PostgreSQL中ReceiveXlogStream有什么作用

发表于:2025-11-12 作者:千家信息网编辑
千家信息网最后更新 2025年11月12日,这篇文章主要介绍"PostgreSQL中ReceiveXlogStream有什么作用",在日常操作中,相信很多人在PostgreSQL中ReceiveXlogStream有什么作用问题上存在疑惑,小编
千家信息网最后更新 2025年11月12日PostgreSQL中ReceiveXlogStream有什么作用

这篇文章主要介绍"PostgreSQL中ReceiveXlogStream有什么作用",在日常操作中,相信很多人在PostgreSQL中ReceiveXlogStream有什么作用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"PostgreSQL中ReceiveXlogStream有什么作用"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

本节简单介绍了PostgreSQL的备份工具pg_basebackup源码中实际执行备份逻辑的BaseBackup中对WAL数据进行备份的实现函数StartLogStreamer->LogStreamerMain及其主要的实现函数ReceiveXlogStream.

一、数据结构

logstreamer_param
WAL data streamer参数.

typedef struct{     ////后台连接    PGconn     *bgconn;    //开始位置    XLogRecPtr  startptr;    //目录或者tar文件,依赖于使用的模式    char        xlog[MAXPGPATH];    /* directory or tarfile depending on mode */    //系统标识符    char       *sysidentifier;    //时间线    int         timeline;} logstreamer_param;

StreamCtl
接收xlog流数据时的全局参数

/* * Global parameters when receiving xlog stream. For details about the individual fields, * see the function comment for ReceiveXlogStream(). * 接收xlog流数据时的全局参数. * 每个域字段的详细解释,参见ReceiveXlogStream()函数注释. */typedef struct StreamCtl{    //streaming的开始位置    XLogRecPtr  startpos;       /* Start position for streaming */    //时间线    TimeLineID  timeline;       /* Timeline to stream data from */    //系统标识符    char       *sysidentifier;  /* Validate this system identifier and                                 * timeline */    //standby超时信息    int         standby_message_timeout;    /* Send status messages this often */    //是否同步(写入时是否马上Flush WAL data)    bool        synchronous;    /* Flush immediately WAL data on write */    //在已归档的数据中标记segment为已完成    bool        mark_done;      /* Mark segment as done in generated archive */    //刷新到磁盘上以确保数据的一致性状态(是否已刷新到磁盘上)    bool        do_sync;        /* Flush to disk to ensure consistent state of                                 * data */    //在返回T时停止streaming    stream_stop_callback stream_stop;   /* Stop streaming when returns true */    //如有效,监测该socket中的输入并检查stream_stop()的返回    pgsocket    stop_socket;    /* if valid, watch for input on this socket                                 * and check stream_stop() when there is any */    //如何写WAL    WalWriteMethod *walmethod;  /* How to write the WAL */    //附加到部分接受文件的后缀    char       *partial_suffix; /* Suffix appended to partially received files */    //使用的replication slot,如无则为NULL    char       *replication_slot;   /* Replication slot to use, or NULL */} StreamCtl;

二、源码解读

LogStreamerMain
WAL流复制主函数,用于fork后的子进程调用

static intLogStreamerMain(logstreamer_param *param){    StreamCtl   stream;//接收xlog流数据时的全局参数    in_log_streamer = true;    //初始化StreamCtl结构体    MemSet(&stream, 0, sizeof(stream));    stream.startpos = param->startptr;    stream.timeline = param->timeline;    stream.sysidentifier = param->sysidentifier;    stream.stream_stop = reached_end_position;#ifndef WIN32    stream.stop_socket = bgpipe[0];#else    stream.stop_socket = PGINVALID_SOCKET;#endif    stream.standby_message_timeout = standby_message_timeout;    stream.synchronous = false;    stream.do_sync = do_sync;    stream.mark_done = true;    stream.partial_suffix = NULL;    stream.replication_slot = replication_slot;    if (format == 'p')        stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);    else        stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);    //接收数据    if (!ReceiveXlogStream(param->bgconn, &stream))        /*         * Any errors will already have been reported in the function process,         * but we need to tell the parent that we didn't shutdown in a nice         * way.         * 在函数执行过程中出现的错误已通过警告的方式发出,         * 但仍需要告知父进程不能优雅的关闭本进程.         */        return 1;    if (!stream.walmethod->finish())    {        fprintf(stderr,                _("%s: could not finish writing WAL files: %s\n"),                progname, strerror(errno));        return 1;    }    //结束连接    PQfinish(param->bgconn);    //普通文件格式    if (format == 'p')        FreeWalDirectoryMethod();    else        FreeWalTarMethod();    //是否内存    pg_free(stream.walmethod);    return 0;}

ReceiveXlogStream
在指定的开始位置接收log stream

/* * Receive a log stream starting at the specified position. * 在指定的开始位置接收log stream * * Individual parameters are passed through the StreamCtl structure. * 通过StreamCtl结构体传递参数. * * If sysidentifier is specified, validate that both the system * identifier and the timeline matches the specified ones * (by sending an extra IDENTIFY_SYSTEM command) * 如指定了系统标识符,验证系统标识符和timeline是否匹配指定的信息. * (通过发送额外的IDENTIFY_SYSTEM命令) * * All received segments will be written to the directory * specified by basedir. This will also fetch any missing timeline history * files. * 所有接收到的segments会写入到basedir中. * 这同时会提前所有缺失的timeline history文件. * * The stream_stop callback will be called every time data * is received, and whenever a segment is completed. If it returns * true, the streaming will stop and the function * return. As long as it returns false, streaming will continue * indefinitely. * stream_stop回调函数在每次接收到数据以及segment完成传输后调用. * 如返回T,streaming会停止,函数返回. * 如返回F,streaming会一直继续. * * If stream_stop() checks for external input, stop_socket should be set to * the FD it checks.  This will allow such input to be detected promptly * rather than after standby_message_timeout (which might be indefinite). * Note that signals will interrupt waits for input as well, but that is * race-y since a signal received while busy won't interrupt the wait. * 如stream_stop()用于检测额外的输入,stop_socket变量应设置为该函数需检查的FD. * 这会允许立即检测此类输入,而不是在standby_message_timeout之后(可能会无限循环). * 注意信号也会中断输入等待,但这是存在竞争的,因为在忙时接收到信号不会中断等待. * * standby_message_timeout controls how often we send a message * back to the master letting it know our progress, in milliseconds. * Zero means no messages are sent. * This message will only contain the write location, and never * flush or replay. * standby_message_timeout控制发送进度消息回master的频度,单位为ms. * 0意味着没有消息会发送. * 该消息只保存写入位置,永远不会flush或replay. * * If 'partial_suffix' is not NULL, files are initially created with the * given suffix, and the suffix is removed once the file is finished. That * allows you to tell the difference between partial and completed files, * so that you can continue later where you left. * 如'partial_suffix'不为NULL,文件已通过给定的suffix创建, *   一旦文件完成传输,则suffix会被清除. * 这是部分和完整完成文件的异同,以便在离开后可以继续. * * If 'synchronous' is true, the received WAL is flushed as soon as written, * otherwise only when the WAL file is closed. * 如'synchronous'为T,接收到的WAL会刷新为写入,否则的话只会在WAL file关闭时才写入. * * Note: The WAL location *must* be at a log segment start! * 注意:WAL位置必须是log segment的起始位置. */boolReceiveXlogStream(PGconn *conn, StreamCtl *stream){    char        query[128];    char        slotcmd[128];    PGresult   *res;    XLogRecPtr  stoppos;    /*     * The caller should've checked the server version already, but doesn't do     * any harm to check it here too.     * 调用者已完成版本校验,但这里重复校验并没有什么问题.     */    if (!CheckServerVersionForStreaming(conn))        return false;    /*     * Decide whether we want to report the flush position. If we report the     * flush position, the primary will know what WAL we'll possibly     * re-request, and it can then remove older WAL safely. We must always do     * that when we are using slots.     * 确定是否需要报告flush位置.     * 如果我们报告了flush位置,主服务器将会知道可能重复请求的WAL file,     *   这样可以安全的移除更老的WAL.     * 如使用slots,应经常执行该操作.     *     * Reporting the flush position makes one eligible as a synchronous     * replica. People shouldn't include generic names in     * synchronous_standby_names, but we've protected them against it so far,     * so let's continue to do so unless specifically requested.     * 报告flush位置使其符合同步副本的条件.     * DBA不应该在synchronous_standby_names中包含常规的名称,但我们截止目前位置已很好的保护了它们,     *   因此可以继续这样执行除非特别请求.     */    if (stream->replication_slot != NULL)    {        //存在slot        reportFlushPosition = true;        sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);    }    else    {        if (stream->synchronous)            reportFlushPosition = true;//同步        else            reportFlushPosition = false;//异步        slotcmd[0] = 0;//ASCII 0    }    if (stream->sysidentifier != NULL)    {        //系统标识符不为NULL        /* Validate system identifier hasn't changed */        //验证系统标识符没有改变        //发送IDENTIFY_SYSTEM命令        res = PQexec(conn, "IDENTIFY_SYSTEM");        if (PQresultStatus(res) != PGRES_TUPLES_OK)        {            fprintf(stderr,                    _("%s: could not send replication command \"%s\": %s"),                    progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));            PQclear(res);            return false;        }        if (PQntuples(res) != 1 || PQnfields(res) < 3)        {            fprintf(stderr,                    _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d r more fields\n"),                    progname, PQntuples(res), PQnfields(res), 1, 3);            PQclear(res);            return false;        }        if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)        {            fprintf(stderr,                    _("%s: system identifier does not match between base backup and streaming onnection\n"),                    progname);            PQclear(res);            return false;        }        if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))        {            fprintf(stderr,                    _("%s: starting timeline %u is not present in the server\n"),                    progname, stream->timeline);            PQclear(res);            return false;        }        PQclear(res);    }    /*     * initialize flush position to starting point, it's the caller's     * responsibility that that's sane.     * 初始化flush位置为开始点,这是调用者的责任.     */    lastFlushPosition = stream->startpos;    while (1)    {        /*         * Fetch the timeline history file for this timeline, if we don't have         * it already. When streaming log to tar, this will always return         * false, as we are never streaming into an existing file and         * therefore there can be no pre-existing timeline history file.         * 为该timeline提前timeline history,如我们已不需要.         * 如streaming日志为tar格式,这通常会返回F,这如同从来没有streaming到已存在的文件中,         *   因此没有已存在的timeline history文件.         */        if (!existsTimeLineHistoryFile(stream))        {            //如不存在history文件            snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);            //发送TIMELINE_HISTORY命令            res = PQexec(conn, query);            if (PQresultStatus(res) != PGRES_TUPLES_OK)            {                /* FIXME: we might send it ok, but get an error */                fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),                        progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));                PQclear(res);                return false;            }            /*             * The response to TIMELINE_HISTORY is a single row result set             * with two fields: filename and content             * TIMELINE_HISTORY的响应是一个单行结果集,有两个字段:filename和content             */            if (PQnfields(res) != 2 || PQntuples(res) != 1)            {                fprintf(stderr,                        _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d ields, expected %d rows and %d fields\n"),                        progname, PQntuples(res), PQnfields(res), 1, 2);            }            /* Write the history file to disk */            //写入history文件到磁盘上            writeTimeLineHistoryFile(stream,                                     PQgetvalue(res, 0, 0),                                     PQgetvalue(res, 0, 1));            PQclear(res);        }        /*         * Before we start streaming from the requested location, check if the         * callback tells us to stop here.         * 从请求的位置开始streaming前,检查回调函数告诉我们在哪停止         */        if (stream->stream_stop(stream->startpos, stream->timeline, false))            return true;        /* Initiate the replication stream at specified location */        //在指定的位置初始化复制流        snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",                 slotcmd,                 (uint32) (stream->startpos >> 32), (uint32) stream->startpos,                 stream->timeline);        //发送START_REPLICATION命令        res = PQexec(conn, query);        if (PQresultStatus(res) != PGRES_COPY_BOTH)        {            fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),                    progname, "START_REPLICATION", PQresultErrorMessage(res));            PQclear(res);            return false;        }        PQclear(res);        /* Stream the WAL */        //流化WAL        res = HandleCopyStream(conn, stream, &stoppos);        if (res == NULL)            goto error;        /*         * Streaming finished.         *         * There are two possible reasons for that: a controlled shutdown, or         * we reached the end of the current timeline. In case of         * end-of-timeline, the server sends a result set after Copy has         * finished, containing information about the next timeline. Read         * that, and restart streaming from the next timeline. In case of         * controlled shutdown, stop here.         * Streaming完成.         * 这里有两个可能的原因:可控的shutdown或者到达了当前时间线的末尾.         * 在end-of-timeline这种情况下,服务器在Copy完成后发送结果集,         *   含有关于下一个时间线的相关信息.         * 读取这些信息,在下一个时间线开始重新启动streaming.         * 如为可控的关闭,可以停止了.         */        if (PQresultStatus(res) == PGRES_TUPLES_OK)        {            /*             * End-of-timeline. Read the next timeline's ID and starting             * position. Usually, the starting position will match the end of             * the previous timeline, but there are corner cases like if the             * server had sent us half of a WAL record, when it was promoted.             * The new timeline will begin at the end of the last complete             * record in that case, overlapping the partial WAL record on the             * old timeline.             * 这是End-of-timeline的情况.             * 读取下一个时间线ID和开始位置.通常来说,开始位置将匹配先前时间线的末尾,             *   但会存在特殊的情况比如服务器已经传输了WAL Record的一部分.             * 这种情况下,新的时间线会在上次已完成的记录末尾开始,与旧时间线的部分WAL Record重叠.             */            uint32      newtimeline;//新的时间线            bool        parsed;//是否解析            //读取结果集的末尾            parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);            PQclear(res);            if (!parsed)                goto error;            /* Sanity check the values the server gave us */            //执行校验和坚持            if (newtimeline <= stream->timeline)            {                //新的时间线不可能小于等于stream中的时间线                fprintf(stderr,                        _("%s: server reported unexpected next timeline %u, following timeline %u\n"),                        progname, newtimeline, stream->timeline);                goto error;            }            if (stream->startpos > stoppos)            {                //开始位置大于结束位置                fprintf(stderr,                        _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline u to begin at %X/%X\n"),                        progname,                        stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,                        newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);                goto error;            }            /* Read the final result, which should be CommandComplete. */            //读取最后的结果,应为命令结束            res = PQgetResult(conn);            if (PQresultStatus(res) != PGRES_COMMAND_OK)            {                fprintf(stderr,                        _("%s: unexpected termination of replication stream: %s"),                        progname, PQresultErrorMessage(res));                PQclear(res);                goto error;            }            PQclear(res);            /*             * Loop back to start streaming from the new timeline. Always             * start streaming at the beginning of a segment.             * 从新时间线开始循环,通常会在segment的开始出开始streaming             */            stream->timeline = newtimeline;            stream->startpos = stream->startpos -                XLogSegmentOffset(stream->startpos, WalSegSz);            continue;//继续循环        }        else if (PQresultStatus(res) == PGRES_COMMAND_OK)        {            PQclear(res);            /*             * End of replication (ie. controlled shut down of the server).             * replication完成(比如服务器关闭了复制)             *             * Check if the callback thinks it's OK to stop here. If not,             * complain.             * 检查是否回调函数认为在这里停止就OK了,如果不是,则报警.             */            if (stream->stream_stop(stoppos, stream->timeline, false))                return true;            else            {                fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),                        progname);                goto error;            }        }        else        {            /* Server returned an error. */            //返回错误            fprintf(stderr,                    _("%s: unexpected termination of replication stream: %s"),                    progname, PQresultErrorMessage(res));            PQclear(res);            goto error;        }    }error:    if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)        fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),                progname, current_walfile_name, stream->walmethod->getlasterror());    walfile = NULL;    return false;}/* * The main loop of ReceiveXlogStream. Handles the COPY stream after * initiating streaming with the START_REPLICATION command. * ReceiveXlogStream中的主循环实现函数. * 在使用START_REPLICATION命令初始化streaming后处理COPY stream. * * If the COPY ends (not necessarily successfully) due a message from the * server, returns a PGresult and sets *stoppos to the last byte written. * On any other sort of error, returns NULL. * 如COPY由于服务器端的原因终止,返回PGresult并设置*stoppos为最后写入的字节. * 如出现错误,则返回NULL. */static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,                 XLogRecPtr *stoppos){    char       *copybuf = NULL;    TimestampTz last_status = -1;    XLogRecPtr  blockpos = stream->startpos;    still_sending = true;    while (1)    {        //循环处理        int         r;        TimestampTz now;//时间戳        long        sleeptime;        /*         * Check if we should continue streaming, or abort at this point.         * 检查我们是否应该继续streaming,或者在当前就退出         */        if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))            goto error;        now = feGetCurrentTimestamp();        /*         * If synchronous option is true, issue sync command as soon as there         * are WAL data which has not been flushed yet.         * 如同步选项为T,只要存在未flushed的WAL data,马上执行sync命令.         */        if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)        {            if (stream->walmethod->sync(walfile) != 0)            {                fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),                        progname, current_walfile_name, stream->walmethod->getlasterror());                goto error;            }            lastFlushPosition = blockpos;            /*             * Send feedback so that the server sees the latest WAL locations             * immediately.             * 发送反馈以便服务器马上可看到最后的WAL位置.             */            if (!sendFeedback(conn, blockpos, now, false))                goto error;            last_status = now;        }        /*         * Potentially send a status message to the master         * 可能向主服务器发送状态消息         */        if (still_sending && stream->standby_message_timeout > 0 &&            feTimestampDifferenceExceeds(last_status, now,                                         stream->standby_message_timeout))        {            /* Time to send feedback! */            //是时候发送反馈了.            if (!sendFeedback(conn, blockpos, now, false))                goto error;            last_status = now;        }        /*         * Calculate how long send/receive loops should sleep         * 计算send/receive循环应该睡眠多长时间         */        sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,                                                 last_status);        //拷贝stream中接收到的内容        r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, ©buf);        while (r != 0)        {            if (r == -1)                goto error;//出错            if (r == -2)            {                //已完结或出错                PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);                if (res == NULL)                    goto error;                else                    return res;            }            /* Check the message type. */            //检查消息类型            if (copybuf[0] == 'k')            {                if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,                                         &last_status))                    goto error;            }            else if (copybuf[0] == 'w')            {                if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))                    goto error;                /*                 * Check if we should continue streaming, or abort at this                 * point.                 * 检查我们是否应该继续streaming或者在此就停止                 */                if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))                    goto error;            }            else            {                fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),                        progname, copybuf[0]);                goto error;            }            /*             * Process the received data, and any subsequent data we can read             * without blocking.             * 处理接收到的数据,后续的数据可以无阻塞的读取.             */            r = CopyStreamReceive(conn, 0, stream->stop_socket, ©buf);        }    }error:    if (copybuf != NULL)        PQfreemem(copybuf);    return NULL;}/* * Check if we should continue streaming, or abort at this point. */static boolCheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,                    XLogRecPtr *stoppos){    if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))    {        if (!close_walfile(stream, blockpos))        {            /* Potential error message is written by close_walfile */            return false;        }        if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))        {            fprintf(stderr, _("%s: could not send copy-end packet: %s"),                    progname, PQerrorMessage(conn));            return false;        }        still_sending = false;    }    return true;}/* * Receive CopyData message available from XLOG stream, blocking for * maximum of 'timeout' ms. * 接收从XLOG stream中可用的CopyData消息,如超出最大的'timeout'毫秒,需要阻塞. * * If data was received, returns the length of the data. *buffer is set to * point to a buffer holding the received message. The buffer is only valid * until the next CopyStreamReceive call. * 如接收到数据,则返回数据的大小. * 变量*buffer设置为指向含有接收到消息的buffer.buffer在下一个CopyStreamReceive调用才会生效. * * Returns 0 if no data was available within timeout, or if wait was * interrupted by signal or stop_socket input. * -1 on error. -2 if the server ended the COPY. * 如在timeout时间内没有数据返回,或者如果因为信号等待/stop_socket输入中断,则返回0. * -1:表示出现错误.-2表示服务器完成了COPY */static intCopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,                  char **buffer){    char       *copybuf = NULL;    int         rawlen;    if (*buffer != NULL)        PQfreemem(*buffer);    *buffer = NULL;    /* Try to receive a CopyData message */    rawlen = PQgetCopyData(conn, ©buf, 1);    if (rawlen == 0)    {        int         ret;        /*         * No data available.  Wait for some to appear, but not longer than         * the specified timeout, so that we can ping the server.  Also stop         * waiting if input appears on stop_socket.         */        ret = CopyStreamPoll(conn, timeout, stop_socket);        if (ret <= 0)            return ret;        /* Now there is actually data on the socket */        if (PQconsumeInput(conn) == 0)        {            fprintf(stderr,                    _("%s: could not receive data from WAL stream: %s"),                    progname, PQerrorMessage(conn));            return -1;        }        /* Now that we've consumed some input, try again */        rawlen = PQgetCopyData(conn, ©buf, 1);        if (rawlen == 0)            return 0;    }    if (rawlen == -1)           /* end-of-streaming or error */        return -2;    if (rawlen == -2)    {        fprintf(stderr, _("%s: could not read COPY data: %s"),                progname, PQerrorMessage(conn));        return -1;    }    /* Return received messages to caller */    *buffer = copybuf;    return rawlen;}

三、跟踪分析

备份命令

pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v

启动gdb跟踪(跟踪fork的子进程)

[xdb@localhost ~]$ gdb pg_basebackupGNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7Copyright (C) 2013 Free Software Foundation, Inc.License GPLv3+: GNU GPL version 3 or later This is free software: you are free to change and redistribute it.There is NO WARRANTY, to the extent permitted by law.  Type "show copying"and "show warranty" for details.This GDB was configured as "x86_64-redhat-linux-gnu".For bug reporting instructions, please see:...Reading symbols from /appdb/xdb/pg11.2/bin/pg_basebackup...done.(gdb) set args -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v(gdb) set follow-fork-mode child(gdb) b LogStreamerMainBreakpoint 1 at 0x403c51: file pg_basebackup.c, line 490.(gdb) rStarting program: /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v[Thread debugging using libthread_db enabled]Using host libthread_db library "/lib64/libthread_db.so.1".Password: pg_basebackup: initiating base backup, waiting for checkpoint to completepg_basebackup: checkpoint completedpg_basebackup: write-ahead log start point: 0/5A000028 on timeline 16pg_basebackup: starting background WAL receiverpg_basebackup: created temporary replication slot "pg_basebackup_1604"[New process 2036][Thread debugging using libthread_db enabled]backup/backup_label          )Using host libthread_db library "/lib64/libthread_db.so.1".[Switching to Thread 0x7ffff7fe7840 (LWP 2036)]Breakpoint 1, LogStreamerMain (param=0x629db0) at pg_basebackup.c:490490     in_log_streamer = true;305153/305153 kB (100%), 1/1 tablespace                                          )pg_basebackup: write-ahead log end point: 0/5A0000F8pg_basebackup: waiting for background process to finish streaming ...(gdb)

输入参数

(gdb) n492     MemSet(&stream, 0, sizeof(stream));(gdb) p *param$1 = {bgconn = 0x62a280, startptr = 1509949440, xlog = "/data/backup/pg_wal", '\000' ,   sysidentifier = 0x61f1a0 "6666964067616600474", timeline = 16}(gdb)

设置StreamCtl结构体

(gdb) n493     stream.startpos = param->startptr;(gdb) 494     stream.timeline = param->timeline;(gdb) 495     stream.sysidentifier = param->sysidentifier;(gdb) 496     stream.stream_stop = reached_end_position;(gdb) 498     stream.stop_socket = bgpipe[0];(gdb) 502     stream.standby_message_timeout = standby_message_timeout;(gdb) 503     stream.synchronous = false;(gdb) 504     stream.do_sync = do_sync;(gdb) 505     stream.mark_done = true;(gdb) 506     stream.partial_suffix = NULL;(gdb) 507     stream.replication_slot = replication_slot;(gdb) 509     if (format == 'p')(gdb) 510         stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);(gdb)

进入ReceiveXlogStream函数

(gdb) 514     if (!ReceiveXlogStream(param->bgconn, &stream))(gdb) stepReceiveXlogStream (conn=0x62a280, stream=0x7fffffffda30) at receivelog.c:458458     if (!CheckServerVersionForStreaming(conn))(gdb) (gdb) n472     if (stream->replication_slot != NULL)(gdb) p *stream$2 = {startpos = 1509949440, timeline = 16, sysidentifier = 0x61f1a0 "6666964067616600474",   standby_message_timeout = 10000, synchronous = false, mark_done = true, do_sync = true,   stream_stop = 0x403953 , stop_socket = 8, walmethod = 0x632b10, partial_suffix = 0x0,   replication_slot = 0x62a1e0 "pg_basebackup_1604"}(gdb)

判断系统标识符和时间线

(gdb) n474         reportFlushPosition = true;(gdb) 475         sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);(gdb) 486     if (stream->sysidentifier != NULL)(gdb) 489         res = PQexec(conn, "IDENTIFY_SYSTEM");(gdb) 490         if (PQresultStatus(res) != PGRES_TUPLES_OK)(gdb) 498         if (PQntuples(res) != 1 || PQnfields(res) < 3)(gdb) 506         if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)(gdb) p PQgetvalue(res, 0, 0)$3 = 0x633500 "6666964067616600474"(gdb) n514         if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))(gdb) 522         PQclear(res);(gdb) p PQgetvalue(res, 0, 1)$4 = 0x633514 "16"(gdb)

不存在时间线history文件,生成history文件

(gdb) n529     lastFlushPosition = stream->startpos;(gdb) 539         if (!existsTimeLineHistoryFile(stream))(gdb) 541             snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);(gdb) 542             res = PQexec(conn, query);(gdb) 543             if (PQresultStatus(res) != PGRES_TUPLES_OK)(gdb) 556             if (PQnfields(res) != 2 || PQntuples(res) != 1)(gdb) 564             writeTimeLineHistoryFile(stream,(gdb) 568             PQclear(res);(gdb)

调用START_REPLICATION命令初始化

(gdb) 575         if (stream->stream_stop(stream->startpos, stream->timeline, false))(gdb) n579         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",(gdb) 581                  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,(gdb) 579         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",(gdb) 581                  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,(gdb) 579         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",(gdb) 583         res = PQexec(conn, query);(gdb) 584         if (PQresultStatus(res) != PGRES_COPY_BOTH)(gdb) 591         PQclear(res);(gdb)

执行命令,处理stream WAL,完成调用

595         if (res == NULL)(gdb) p *res$5 = {ntups = 0, numAttributes = 0, attDescs = 0x0, tuples = 0x0, tupArrSize = 0, numParameters = 0, paramDescs = 0x0,   resultStatus = PGRES_COMMAND_OK,   cmdStatus = "START_STREAMING\000\000\000\000\000\270\027u\367\377\177\000\000P/c\000\000\000\000\000CT\000\000\001", '\000' , "\200\000\000", binary = 0, noticeHooks = {noticeRec = 0x7ffff7b9eaa4 ,     noticeRecArg = 0x0, noticeProc = 0x7ffff7b9eaf9 , noticeProcArg = 0x0}, events = 0x0,   nEvents = 0, client_encoding = 0, errMsg = 0x0, errFields = 0x0, errQuery = 0x0, null_field = "", curBlock = 0x0,   curOffset = 0, spaceLeft = 0}(gdb) n608         if (PQresultStatus(res) == PGRES_TUPLES_OK)(gdb) 666         else if (PQresultStatus(res) == PGRES_COMMAND_OK)(gdb) 668             PQclear(res);(gdb) 676             if (stream->stream_stop(stoppos, stream->timeline, false))(gdb) 677                 return true;(gdb) 702 }(gdb) LogStreamerMain (param=0x629db0) at pg_basebackup.c:523523     if (!stream.walmethod->finish())(gdb)

到此,关于"PostgreSQL中ReceiveXlogStream有什么作用"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

0