diff --git a/src/bin/pg_ctl/pg_build.h b/src/bin/pg_ctl/pg_build.h
index 2a025b47b6a41339bf6e3dd75fd5590353387bd5..2a8f254f16395bacc11bec1bd98381be84260a46 100755
--- a/src/bin/pg_ctl/pg_build.h
+++ b/src/bin/pg_ctl/pg_build.h
@@ -68,4 +68,23 @@ extern bool libpqRotateCbmFile(PGconn* connObj, XLogRecPtr lsn);
 extern int fsync_fname(const char *fname, bool isdir);
 extern void fsync_pgdata(const char *pg_data);
 
+/* Why the last incremental build failed; DoAutoBuild checks it before falling back to a full build. */
+typedef enum {
+    DEFAULT_REASON = 0,
+    CONN_PRIMARY_FAIL,
+    VERIFY_COMMIT_LSN_FAIL /* set by IncrementalVerifyCommit whenever it returns false */
+} BuildFailReason;
+
+extern BuildFailReason g_inc_fail_reason;
+
+/* State of the '--verify-commit' check done while searching for the common checkpoint. */
+typedef enum {
+    VERIFY_COMMIT_DISABLE = 0, /* initial value; '--verify-commit' was not given */
+    VERIFY_COMMIT_WORKING,     /* option given (or build info reset); no commit record checked yet */
+    VERIFY_COMMIT_SUCCESS,     /* IncrementalVerifyCommit returned true */
+    VERIFY_COMMIT_FAILED       /* IncrementalVerifyCommit returned false */
+} VerifyCommitStatus;
+
+extern VerifyCommitStatus verifyCommitStatus;
+
 #endif /* PG_BUILD_H */
diff --git a/src/bin/pg_ctl/pg_ctl.cpp b/src/bin/pg_ctl/pg_ctl.cpp
index d9e56ec312e08c4e37c3f1f266821829dec348f5..986dc5daf462b4d63bc4dcca2c3c98c0c6ef1cf3 100755
--- a/src/bin/pg_ctl/pg_ctl.cpp
+++ b/src/bin/pg_ctl/pg_ctl.cpp
@@ -137,11 +137,6 @@ typedef enum {
     CHANGE_OPERATION
 } MemberOperation;
 
-typedef enum {
-    DEFAULT_REASON = 0,
-    CONN_PRIMARY_FAIL
-} BuildFailReason;
-
 #define MAX_PERCENT 100
 #define FAIL_PERCENT -1
 
@@ -241,6 +236,7 @@ const int g_length_stop_char = 2;
 const int g_length_suffix = 3;
 const static int INC_BUILD_RETRY_TIMES = 3;
 BuildFailReason g_inc_fail_reason = DEFAULT_REASON;
+VerifyCommitStatus verifyCommitStatus = VERIFY_COMMIT_DISABLE;
 bool g_is_obsmode = false;
 
 
@@ -4254,6 +4250,10 @@ void ResetBuildInfo()
         connstr_source = NULL;
     }
     replication_type = RT_WITH_DUMMY_STANDBY;
+
+    if (verifyCommitStatus != VERIFY_COMMIT_DISABLE) {
+        verifyCommitStatus = VERIFY_COMMIT_WORKING;
+    }
 }
 
 static bool DoIncBuild(uint32 term)
@@ -4283,6 +4283,12 @@ static bool DoAutoBuild(uint32 term)
         pg_log(PG_WARNING, _("inc build failed due to primary connection failure, skip full build.\n"));
         return buildSuccess;
     }
+    if (g_inc_fail_reason == VERIFY_COMMIT_LSN_FAIL) {
+        pg_log(PG_WARNING,
+            _("inc build failed because some committed transactions differ from primary "
+            "and '--verify-commit' is specified, skip full build.\n"));
+        return buildSuccess;
+    }
     pg_log(PG_WARNING, _("inc build failed, change to full build.\n"));
     buildSuccess = do_actual_build(term);
 
@@ -6502,6 +6508,7 @@ int main(int argc, char** argv)
         {"socketpath", required_argument, NULL, 6},
         {"enable-dss", no_argument, NULL, 7},
         {"dms_url", required_argument, NULL, 8},
+        {"verify-commit", no_argument, NULL, 9},
         {NULL, 0, NULL, 0}};
 
     int option_index;
@@ -6995,6 +7002,10 @@ int main(int argc, char** argv)
                     break;
                 }
 #endif
+                case 9: {
+                    verifyCommitStatus = VERIFY_COMMIT_WORKING;
+                    break;
+                }
                 default:
                     /* getopt_long already issued a suitable error message */
                     do_advice();
diff --git a/src/bin/pg_rewind/parsexlog.cpp b/src/bin/pg_rewind/parsexlog.cpp
index cd44c20abeae5fabdf8ec8044cc98e2802156b9c..055482b3dec65a19fc0387cf875c49b54b9944dd 100644
--- a/src/bin/pg_rewind/parsexlog.cpp
+++ b/src/bin/pg_rewind/parsexlog.cpp
@@ -33,6 +33,20 @@
 #define CONFIG_NODENAME "pgxc_node_name"
 #define INVALID_LINES_IDX (int)(~0)
 
+/*
+ * same with access/xact.h
+ * XLOG allows to store some information in high 4 bits of log record xl_info
+ * field. We use 3 for the opcode, and one about an optional flag variable.
+ */
+#define XLOG_XACT_COMMIT 0x00
+#define XLOG_XACT_PREPARE 0x10
+#define XLOG_XACT_ABORT 0x20
+#define XLOG_XACT_COMMIT_PREPARED 0x30
+#define XLOG_XACT_ABORT_PREPARED 0x40
+#define XLOG_XACT_ASSIGNMENT 0x50
+#define XLOG_XACT_COMMIT_COMPACT 0x60
+#define XLOG_XACT_ABORT_WITH_XID 0x70
+
 static int xlogreadfd = -1;
 extern uint32 term;
 
@@ -45,6 +59,7 @@ typedef struct XLogPageReadPrivate {
 static void extractPageInfo(XLogReaderState* record);
 static TimestampTz localGetCurrentTimestamp(void);
 bool checkCommonAncestorByXlog(XLogRecPtr recptr, pg_crc32 standby_reccrc, uint32 term = 0);
+bool IncrementalVerifyCommit(XLogRecPtr recptr, pg_crc32 sCrc, uint32 term = 0);
 
 /*
  * Read WAL from the datadir/pg_xlog, starting from 'startpoint' on timeline
@@ -231,6 +246,21 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec
             return BUILD_FATAL;
         }
 
+        info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
+
+        /*
+         * Check if '--verify-commit' is specified and if so, verify the last commit record.
+         */
+        if (verifyCommitStatus == VERIFY_COMMIT_WORKING &&
+            XLogRecGetRmid(xlogreader) == RM_XACT_ID &&
+            (info == XLOG_XACT_COMMIT || info == XLOG_XACT_COMMIT_PREPARED || info == XLOG_XACT_COMMIT_COMPACT)) {
+            if (IncrementalVerifyCommit(xlogreader->ReadRecPtr, record->xl_crc, term)) {
+                verifyCommitStatus = VERIFY_COMMIT_SUCCESS;
+            } else {
+                verifyCommitStatus = VERIFY_COMMIT_FAILED;
+            }
+        }
+
         /*
          * Check if it is a checkpoint record. This checkpoint record needs to
          * be the latest checkpoint before WAL forked and not the checkpoint
@@ -238,7 +268,6 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec
          * Check the Common LSN between primary and standby using 'IDENTIFY_CONSISTENCE'
          * The checkpoint should be already finished according to control file of both.
          */
-        info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
         if (xlogreader->ReadRecPtr <= startrec && XLogRecGetRmid(xlogreader) == RM_XLOG_ID &&
             (info == XLOG_CHECKPOINT_SHUTDOWN || info == XLOG_CHECKPOINT_ONLINE)) {
             if (checkCommonAncestorByXlog(xlogreader->ReadRecPtr, record->xl_crc, term) == true) {
@@ -275,6 +304,8 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec
     if (XLogRecPtrIsInvalid(searchptr)) {
         pg_log(PG_FATAL, "could not find any common checkpoint, must to do full build\n");
         return BUILD_FATAL;
+    } else if (verifyCommitStatus == VERIFY_COMMIT_FAILED) {
+        return BUILD_FATAL;
     }
     return BUILD_SUCCESS;
 }
@@ -424,6 +455,101 @@ bool checkCommonAncestorByXlog(XLogRecPtr recptr, pg_crc32 standby_reccrc, uint3
     return false;
 }
 
+
+/*
+ * Compare the crc of the local commit record at 'recptr' with the crc the
+ * primary reports for the same lsn.
+ *
+ * Runs "IDENTIFY_CONSISTENCE <lsn>" on a connection obtained from check_and_conn()
+ * and parses the first result column as the primary crc. Returns true only when
+ * it equals 'sCrc'; every failure path returns false and sets g_inc_fail_reason
+ * to VERIFY_COMMIT_LSN_FAIL.
+ */
+bool IncrementalVerifyCommit(XLogRecPtr recptr, pg_crc32 sCrc, uint32 term)
+{
+    char sql[128];                // large enough for the command text
+    const int lsnHighShift = 32;  // lsn high 32 bits shift
+    PGconn* conn = NULL;
+    PGresult* res = NULL;
+    char* pCrcStr = NULL;
+    pg_crc32 pCrc = 0;
+    errno_t rc;
+    bool succ = true;
+
+    pg_log(PG_PROGRESS,
+        "Incremental build verify commit at lsn:%X/%X, crc:%u.\n",
+        (uint32)(recptr >> lsnHighShift),
+        (uint32)recptr,
+        sCrc);
+
+    /* find an available conn */
+    conn = check_and_conn(standby_connect_timeout, standby_recv_timeout, term);
+    if (conn == NULL) {
+        pg_fatal("Incremental build verify commit failed, could not connect to server.\n");
+        succ = false;
+        goto end;
+    }
+
+    /* quick exit when connection lost */
+    if (PQstatus(conn) != CONNECTION_OK) {
+        pg_fatal("Incremental build verify commit failed, server connection lost.\n");
+        succ = false;
+        goto end;
+    }
+
+    /* get primary node xlog crc at this lsn by IDENTIFY_CONSISTENCE */
+    rc = snprintf_s(sql, sizeof(sql), sizeof(sql) - 1,
+        "IDENTIFY_CONSISTENCE %X/%X", (uint32)(recptr >> lsnHighShift), (uint32)recptr);
+    securec_check_ss_c(rc, "\0", "\0");
+    res = PQexec(conn, sql);
+    if (PQresultStatus(res) != PGRES_TUPLES_OK) {
+        pg_log(PG_PROGRESS,
+            "Incremental build verify commit failed: %s\n",
+            PQerrorMessage(conn));
+        succ = false;
+        goto end;
+    }
+
+    /* due to gray upgrade, msg with 1 row of 2 or 3 columns are permitted. */
+    if ((PQnfields(res) != 3 && PQnfields(res) != 2) || PQntuples(res) != 1) {
+        pg_log(PG_PROGRESS,
+            "Incremental build verify commit failed: IDENTIFY_CONSISTENCE got %d rows and %d fields\n",
+            PQntuples(res),
+            PQnfields(res));
+        succ = false;
+        goto end;
+    }
+
+    /* parse primary crc; a missing value must fail instead of comparing the default pCrc of 0. */
+    pCrcStr = PQgetvalue(res, 0, 0);
+    if (pCrcStr == NULL || sscanf_s(pCrcStr, "%8X", &pCrc) != 1) {
+        pg_log(PG_PROGRESS,
+            "Incremental build verify commit failed: crc %s is invalid.\n",
+            (pCrcStr != NULL) ? pCrcStr : "(null)");
+        succ = false;
+        goto end;
+    }
+
+    if (pCrc != sCrc) {
+        pg_log(PG_PROGRESS,
+            "Incremental build verify commit failed: crc does not match, primary crc is %u.\n",
+            pCrc);
+        succ = false;
+        goto end;
+    }
+
+    pg_log(PG_PROGRESS, "Incremental build verify commit success.\n");
+
+end:
+    if (!succ) {
+        g_inc_fail_reason = VERIFY_COMMIT_LSN_FAIL;
+    }
+    PQclear(res);
+    PQfinish(conn);
+    return succ;
+}
+
+
 bool TransLsn2XlogFileName(XLogRecPtr lsn, TimeLineID lastcommontli, char* xlogName)
 {
     XLogSegNo segno;