diff --git a/src/bin/gs_guc/cluster_guc.conf b/src/bin/gs_guc/cluster_guc.conf index 3041827732ea57ff021964c7addbd897356bc33b..b49b2d12908af1913c96087524251f121ec60878 100755 --- a/src/bin/gs_guc/cluster_guc.conf +++ b/src/bin/gs_guc/cluster_guc.conf @@ -661,6 +661,7 @@ pagewriter_thread_num|int|1,16|NULL|NULL| audit_thread_num|int|1,48|NULL|NULL| dw_file_num|int|1,16|NULL|NULL| dw_file_size|int|32,256|NULL|NULL| +parallel_recovery_dispatch_algorithm|int|1,2|NULL|NULL| incremental_checkpoint_timeout|int|1,3600|s|NULL| enable_incremental_checkpoint|bool|0,0|NULL|NULL| enable_double_write|bool|0,0|NULL|NULL| diff --git a/src/bin/pg_ctl/pg_ctl.cpp b/src/bin/pg_ctl/pg_ctl.cpp index 30e65e996e0c976b39c9d4655deda179f8ad0dc6..418c60e2c4ba98f38b258f3e013f483b215aec6d 100755 --- a/src/bin/pg_ctl/pg_ctl.cpp +++ b/src/bin/pg_ctl/pg_ctl.cpp @@ -116,7 +116,8 @@ typedef enum { CHANGE_ROLE_COMMAND, MINORITY_START_COMMAND, COPY_COMMAND, - GS_STACK_COMMAND + GS_STACK_COMMAND, + START_WALRCV_COMMAND } CtlCommand; typedef enum { @@ -379,6 +380,9 @@ extern int GetLengthAndCheckReplConn(const char* ConnInfoList); extern BuildErrorCode gs_increment_build(const char* pgdata, const char* connstr, char* sysidentifier, uint32 timeline, uint32 term); const char *BuildModeToString(BuildMode mode); static char* get_gausshome(); +static ServerMode get_mode_by_string(char *str_mode); +static DbState get_db_state_by_string(char *str_state); +#define MAXFIELDLEN 64 void check_input_for_security(char* input_env_value) { @@ -706,6 +710,7 @@ static ServerMode get_runmode(void) char* val = NULL; char run_mode[MAXRUNMODE] = {0}; GaussState state; + ServerMode return_mode; errno_t tnRet = EOK; conn = get_connectionex(); @@ -759,22 +764,9 @@ static ServerMode get_runmode(void) close_connection(); conn = NULL; - if (!strncmp(run_mode, "Normal", MAXRUNMODE)) - return NORMAL_MODE; - if (!strncmp(run_mode, "Primary", MAXRUNMODE)) - return PRIMARY_MODE; - if (!strncmp(run_mode, "Standby", MAXRUNMODE)) - 
return STANDBY_MODE; - if (!strncmp(run_mode, "Cascade Standby", MAXRUNMODE)) - return CASCADE_STANDBY_MODE; - if (!strncmp(run_mode, "Main Standby", MAXRUNMODE)) - return MAIN_STANDBY_MODE; - if (!strncmp(run_mode, "Pending", MAXRUNMODE)) - return PENDING_MODE; - if (!strncmp(run_mode, "Unknown", MAXRUNMODE)) - return UNKNOWN_MODE; + return_mode = get_mode_by_string(run_mode); - return UNKNOWN_MODE; + return return_mode; } static void freefile(char** lines) @@ -3754,6 +3746,7 @@ static void do_help(void) (void)printf(_(" %s hotpatch [-D DATADIR] [-a ACTION] [-n NAME]\n"), progname); #endif (void)printf(_(" %s stack [-D DATADIR] [-I lwtid]\n"), progname); + (void)printf(_(" %s startwalrcv [-w] [-t SECS] [-D DATADIR]\n"), progname); #ifndef ENABLE_MULTIPLE_NODES #ifndef ENABLE_LITE_MODE doDCFAddCmdHelp(); @@ -5863,6 +5856,236 @@ void do_gs_stack(void) return; } +ServerMode get_mode_by_string(char *str_mode) +{ + if (!strncmp(str_mode, "Normal", MAXRUNMODE)) + return NORMAL_MODE; + if (!strncmp(str_mode, "Primary", MAXRUNMODE)) + return PRIMARY_MODE; + if (!strncmp(str_mode, "Standby", MAXRUNMODE)) + return STANDBY_MODE; + if (!strncmp(str_mode, "Cascade Standby", MAXRUNMODE)) + return CASCADE_STANDBY_MODE; + if (!strncmp(str_mode, "Main Standby", MAXRUNMODE)) + return MAIN_STANDBY_MODE; + if (!strncmp(str_mode, "Pending", MAXRUNMODE)) + return PENDING_MODE; + if (!strncmp(str_mode, "Unknown", MAXRUNMODE)) + return UNKNOWN_MODE; + + return UNKNOWN_MODE; +} + +static DbState get_db_state_by_string(char *str_state) +{ + if (!strcmp(str_state, "Normal")) + return NORMAL_STATE; + if (!strcmp(str_state, "Unknown")) + return UNKNOWN_STATE; + if (!strcmp(str_state, "Need repair")) + return NEEDREPAIR_STATE; + if (!strcmp(str_state, "Starting")) + return STARTING_STATE; + if (!strcmp(str_state, "Wait promoting")) + return WAITING_STATE; + if (!strcmp(str_state, "Demoting")) + return DEMOTING_STATE; + if (!strcmp(str_state, "Promoting")) + return PROMOTING_STATE; + if 
(!strcmp(str_state, "Building")) + return BUILDING_STATE; + if (!strcmp(str_state, "Catchup")) + return CATCHUP_STATE; + if (!strcmp(str_state, "Coredump")) + return COREDUMP_STATE; + + return UNKNOWN_STATE; +} + +static void convert_pg_result(PGresult *res, ServerMode *run_mode, DbState *db_state, char *detail_info) +{ + char str_mode[MAXFIELDLEN] = {0}; + char str_state[MAXFIELDLEN] = {0}; + char* val = NULL; + errno_t nret = EOK; + + if ((val = PQgetvalue(res, 0, 0)) != NULL) { + nret = strncpy_s(str_mode, MAXFIELDLEN, val, strlen(val)); + securec_check_c(nret, "\0", "\0"); + str_mode[MAXFIELDLEN - 1] = '\0'; + *run_mode = get_mode_by_string(str_mode); + } + if ((val = PQgetvalue(res, 0, 1)) != NULL) { + nret = strncpy_s(str_state, MAXFIELDLEN, val, strlen(val)); + securec_check_c(nret, "\0", "\0"); + str_state[MAXFIELDLEN - 1] = '\0'; + *db_state = get_db_state_by_string(str_state); + } + if ((val = PQgetvalue(res, 0, 2)) != NULL) { + nret = strncpy_s(detail_info, MAXFIELDLEN, val, strlen(val)); + securec_check_c(nret, "\0", "\0"); + detail_info[MAXFIELDLEN - 1] = '\0'; + } +} + +static void get_mode_and_state(ServerMode *run_mode, DbState *db_state, char *detail_info) +{ +#define NFIELDS 3 +#define NTUPLES 1 + PGconn* pgconn = NULL; + PGresult* res = NULL; + const char* sql = "select local_role, db_state, detail_information from " + "pg_catalog.pg_stat_get_stream_replications();"; + GaussState state; + errno_t nret = EOK; + + pgconn = get_connectionex(); + if (PQstatus(pgconn) != CONNECTION_OK) { + nret = memset_s(&state, sizeof(state), 0, sizeof(state)); + securec_check_c(nret, "\0", "\0"); + pg_log(PG_PROGRESS, _("Getting state from gaussdb.state!\n")); + ReadDBStateFile(&state); + *run_mode = state.mode; + *db_state = state.state; + close_connection(); + pgconn = NULL; + return; + } + res = PQexec(pgconn, sql); + if (PQresultStatus(res) != PGRES_TUPLES_OK) { + PQclear(res); + pg_log(PG_WARNING, _("could not get state from the local server: %s"), 
PQerrorMessage(pgconn)); + close_connection(); + pgconn = NULL; + return; + } + if (PQnfields(res) != NFIELDS || PQntuples(res) != NTUPLES) { + int ntuples = PQntuples(res); + int nfields = PQnfields(res); + + PQclear(res); + pg_log(PG_WARNING, _("invalid response from standby server: " + "expected %d tuples with %d fields, got %d tuples with %d fields."), + NTUPLES, NFIELDS, ntuples, nfields); + close_connection(); + pgconn = NULL; + return; + } + convert_pg_result(res, run_mode, db_state, detail_info); + + PQclear(res); + close_connection(); + pgconn = NULL; + return; +} + +static void check_before_start_walrcv(pgpid_t pid) +{ + ServerMode run_mode = UNKNOWN_MODE; /* get_mode_and_state() returns without writing this when the query fails */ + DbState db_state = UNKNOWN_STATE; /* same: keep a defined value for the checks below */ + char detail_info[MAXFIELDLEN] = {0}; + + if (pid == 0) { /* no pid file */ + pg_log(PG_WARNING, _(" PID file \"%s\" does not exist\n"), pid_file); + pg_log(PG_WARNING, _(" Is server running?\n")); + exit(1); + } else if (pid < 0) { /* standalone backend, not postmaster */ + pid = -pid; + pg_log(PG_WARNING, _(" cannot start walrcv; " + "single-user server is running (PID: %ld)\n"), pid); + exit(1); + } + + get_mode_and_state(&run_mode, &db_state, detail_info); + if (run_mode == UNKNOWN_MODE) { + pg_log(PG_WARNING, _(" cannot start walrcv: server mode is unknown\n")); + exit(1); + } else if (run_mode != STANDBY_MODE && run_mode != CASCADE_STANDBY_MODE && run_mode != MAIN_STANDBY_MODE) { + pg_log(PG_WARNING, _(" cannot start walrcv: server is not in standby or cascade standby mode\n")); + exit(1); + } + + if (db_state == STARTING_STATE) { + pg_log(PG_PROGRESS, _("try to start walrcv when standby is still starting\n")); + } else if (db_state == NEEDREPAIR_STATE && + (strcmp(detail_info, "Disconnected") == 0 || strcmp(detail_info, "Connecting...") == 0)) { + pg_log(PG_PROGRESS, _("try to start walrcv when standby is still redoing\n")); + } else if (db_state == PROMOTING_STATE && + (strcmp(detail_info, "Disconnected") == 0 || strcmp(detail_info, "Connecting...") == 0)) { + pg_log(PG_PROGRESS, _("try to 
start walrcv when main standby is still redoing\n")); + } else { + pg_log(PG_WARNING, _(" cannot start walrcv: server is already trying to start walrcv\n")); + exit(1); + } + + if (!do_wait) { + pg_log(PG_WARNING, _(" server starting walrcv\n")); + } else { + pg_log(PG_WARNING, _(" waiting for server to start walrcv\n")); + } +} + +void do_start_walrcv() +{ + int ret; + char flag_path[MAXPGPATH] = {0}; + FILE* fofile = NULL; + pgpid_t pid; + ServerMode run_mode = UNKNOWN_MODE; /* defined even if get_mode_and_state() never writes it */ + DbState db_state = UNKNOWN_STATE; /* read after the loop, which can break before the first poll */ + char detail_info[MAXFIELDLEN] = {0}; + + pid = get_pgpid(); + check_before_start_walrcv(pid); + + ret = snprintf_s(flag_path, MAXPGPATH, MAXPGPATH - 1, "%s/preparse", pg_data); + securec_check_ss_c(ret, "\0", "\0"); + canonicalize_path(flag_path); + if ((fofile = fopen(flag_path, "w")) == NULL) { + pg_log(PG_WARNING, _(" could not open file successfully\n")); + exit(1); + } + if (fwrite(&wait_seconds, sizeof(int), 1, fofile) != 1) { + pg_log(PG_WARNING, _(" could not write file successfully\n")); + } + if (fclose(fofile)) { + pg_log(PG_WARNING, _(" file is closed\n")); + } + + fofile = NULL; + + for (int cnt = 0; ; cnt++) { + if (kill((pid_t)pid, SIGUSR1) != 0) { + pg_log(PG_WARNING, _(" could not send SIGUSR1 signal (PID: %ld): %s\n"), pid, strerror(errno)); + if (unlink(flag_path) != 0) { + pg_log(PG_WARNING, _(" could not remove file \"%s\": %s\n"), flag_path, strerror(errno)); + } + exit(1); + } + if (!do_wait) { + return; + } else if (cnt >= wait_seconds) { + break; + } + pg_log(PG_PRINT, "."); + pg_usleep(1000000); /* 1 sec */ + + get_mode_and_state(&run_mode, &db_state, detail_info); + if (db_state == NORMAL_STATE && strcmp(detail_info, "Normal") == 0) { + break; + } + } + if (db_state == NORMAL_STATE && strcmp(detail_info, "Normal") == 0) { + pg_log(PG_WARNING, _(" start walrcv completed (%s)\n"), pg_data); + } else if (db_state == STARTING_STATE) { + pg_log(PG_WARNING, _(" server is still starting (%s), refer to gs_log for more info\n"), pg_data); + } else { + 
pg_log(PG_WARNING, _(" start walrcv failed (%s), detail info: %s\n"), pg_data, detail_info); + } + + return; +} + static void Help(int argc, const char** argv) { /* support --help and --version even if invoked as root */ @@ -6952,6 +7175,8 @@ int main(int argc, char** argv) ctl_command = COPY_COMMAND; else if (strcmp(argv[optind], "stack") == 0) ctl_command = GS_STACK_COMMAND; + else if (strcmp(argv[optind], "startwalrcv") == 0) + ctl_command = START_WALRCV_COMMAND; else { pg_log(PG_WARNING, _(" unrecognized operation mode \"%s\"\n"), argv[optind]); do_advice(); @@ -7025,6 +7250,7 @@ int main(int argc, char** argv) case FAILOVER_COMMAND: case BUILD_COMMAND: case NOTIFY_COMMAND: + case START_WALRCV_COMMAND: do_wait = true; break; default: @@ -7300,6 +7526,13 @@ int main(int argc, char** argv) pg_log(PG_PROGRESS, _("Another gs_ctl command is still running, copy failed.\n")); } break; + case START_WALRCV_COMMAND: + if (-1 != pg_ctl_lock(pg_ctl_lockfile, &lockfile)) { + do_start_walrcv(); + (void)pg_ctl_unlock(lockfile); + } else { + pg_log(PG_PROGRESS, _("Another gs_ctl command is still running, start walrcv failed.\n")); + } default: break; } diff --git a/src/common/backend/catalog/builtin_funcs.ini b/src/common/backend/catalog/builtin_funcs.ini index 3b8a187d12d7af4aa59b29e7c7d75702e1713849..1368203829c191d3e117eb4df0301782a77aa231 100755 --- a/src/common/backend/catalog/builtin_funcs.ini +++ b/src/common/backend/catalog/builtin_funcs.ini @@ -2412,6 +2412,10 @@ "disable_sql_patch", 1, AddBuiltinFunc(_0(9062), _1("disable_sql_patch"), _2(1), _3(false), _4(false), _5(disable_sql_patch), _6(16), _7(DBE_SQL_UTIL_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(0), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('v'), _19(0), _20(1, 19), _21(NULL), _22(NULL), _23(NULL), _24(NULL), _25("disable_sql_patch"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(false), _32(false), _33(NULL), _34('f'), _35(NULL), _36(0), 
_37(false), _38(NULL), _39(NULL), _40(0)) ), + AddFuncGroup( + "dispatch_stat_detail", 1, + AddBuiltinFunc(_0(4395), _1("dispatch_stat_detail"), _2(0), _3(false), _4(true), _5(dispatch_stat_detail), _6(2249), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(1000), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('s'), _19(0), _20(0), _21(5, 25, 20, 23, 700, 25), _22(5, 'o', 'o', 'o', 'o', 'o'), _23(5, "thread_name", "pid", "pending_count", "ratio", "detail"), _24(NULL), _25("dispatch_stat_detail"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(false), _32(false), _33(NULL), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) + ), AddFuncGroup( "dispell_init", 1, AddBuiltinFunc(_0(3731), _1("dispell_init"), _2(1), _3(true), _4(false), _5(dispell_init), _6(2281), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(0), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('i'), _19(0), _20(1, 2281), _21(NULL), _22(NULL), _23(NULL), _24(NULL), _25("dispell_init"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33("(internal)"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) @@ -3743,6 +3747,10 @@ "gs_is_recycle_obj", 1, AddBuiltinFunc(_0(4896), _1("gs_is_recycle_obj"), _2(3), _3(false), _4(false), _5(gs_is_recycle_obj), _6(16), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(0), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('s'), _19(0), _20(3, 26, 26, 19), _21(4, 26, 26, 19, 16), _22(4, 'i', 'i', 'i', 'o'), _23(4, "classid", "objid", "objname", "output_result"), _24(NULL), _25("gs_is_recycle_obj"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33(NULL), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) ), + AddFuncGroup( + "gs_lwlock_status", 1, + 
AddBuiltinFunc(_0(8888), _1("gs_lwlock_status"), _2(0), _3(false), _4(true), _5(gs_lwlock_status), _6(2249), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(1000), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('s'), _19(0), _20(0), _21(9, 25, 25, 20, 20, 20, 25, 25, 16, 1184), _22(9, 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o', 'o'), _23(9, "node_name", "lock_name", "lock_unique_id", "pid", "sessionid", "global_sessionid","mode", "granted", "start_time"), _24(NULL), _25("gs_lwlock_status"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(false), _32(false), _33("View system lwlock information"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) + ), AddFuncGroup( "gs_parse_page_bypath", 1, AddBuiltinFunc(_0(2620), _1("gs_parse_page_bypath"), _2(4), _3(true), _4(false), _5(gs_parse_page_bypath), _6(25), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(0), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('s'), _19(0), _20(4, 25, 20, 25, 16), _21(5, 25, 20, 25, 16, 25), _22(5, 'i', 'i', 'i', 'i', 'o'), _23(5, "path", "blocknum", "relation_type", "read_memory", "output_filepath"), _24(NULL), _25("gs_parse_page_bypath"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33("parse data page to output file based on given filepath"), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) diff --git a/src/common/backend/parser/parse_coerce.cpp b/src/common/backend/parser/parse_coerce.cpp index 32866fc420aefdb0f7d60b8fe234d75e95d8ba80..337554d357f24fba1b17746fe52d59bcf07b2033 100644 --- a/src/common/backend/parser/parse_coerce.cpp +++ b/src/common/backend/parser/parse_coerce.cpp @@ -223,10 +223,10 @@ Node* coerce_to_target_charset(Node* expr, int target_charset, Oid target_type, /* construct a convert FuncExpr */ const char* expr_charset_name = pg_encoding_to_char(exprcharset); const 
char* target_charset_name = pg_encoding_to_char(target_charset); - cons = makeConst(NAMEOID, -1, InvalidOid, sizeof(const char*), NameGetDatum(expr_charset_name), false, true); + cons = makeConst(NAMEOID, -1, InvalidOid, -2, NameGetDatum(expr_charset_name), false, false); args = lappend(args, cons); - cons = makeConst(NAMEOID, -1, InvalidOid, sizeof(const char*), NameGetDatum(target_charset_name), false, true); + cons = makeConst(NAMEOID, -1, InvalidOid, -2, NameGetDatum(target_charset_name), false, false); args = lappend(args, cons); fexpr = makeFuncExpr(CONVERTFUNCOID, TEXTOID, args, target_collation, InvalidOid, COERCE_IMPLICIT_CAST); diff --git a/src/common/backend/utils/adt/pgstatfuncs.cpp b/src/common/backend/utils/adt/pgstatfuncs.cpp index 4893c2777c1eb6a6f21cdbb7d715bcefac58cfce..5d3e7728894352c09c5d8d25ac1cf3e949572edc 100644 --- a/src/common/backend/utils/adt/pgstatfuncs.cpp +++ b/src/common/backend/utils/adt/pgstatfuncs.cpp @@ -78,6 +78,15 @@ #include "storage/lock/lock.h" #include "nodes/makefuncs.h" #include "ddes/dms/ss_dms_bufmgr.h" +#include "storage/file/fio_device.h" +#include "ddes/dms/ss_dms_recovery.h" +#include "utils/json.h" +#include "utils/jsonapi.h" +#include "access/ondemand_extreme_rto/page_redo.h" +#ifdef ENABLE_HTAP +#include "access/htap/imcucache_mgr.h" +#endif +#include "access/parallel_recovery/dispatcher.h" #define UINT32_ACCESS_ONCE(var) ((uint32)(*((volatile uint32*)&(var)))) #define NUM_PG_LOCKTAG_ID 12 @@ -13817,6 +13826,46 @@ Datum remote_double_write_stat(PG_FUNCTION_ARGS) #endif } +Datum dispatch_stat_detail(PG_FUNCTION_ARGS) +{ + TupleDesc tupdesc; + Tuplestorestate *tupstore = BuildTupleResult(fcinfo, &tupdesc); + const uint32 dispatch_stat_detail_cols = 5; + Datum values[dispatch_stat_detail_cols] = {0}; + bool nulls[dispatch_stat_detail_cols] = {0}; + parallel_recovery::DispatchStat *stat = NULL; + uint32 realNum = 0; + + if (IsParallelRedo() && RecoveryInProgress()) { + 
parallel_recovery::get_dispatch_stat_detail(&stat, &realNum); + } + + for (uint32 i=0; i 0) { if (*++s == '\0') { @@ -1447,6 +1469,11 @@ static int pg_gbk_verifier(const unsigned char* s, int len) return -1; } + // when mbl is 2 + if (l == 2 && s[0] == NONUTF8_INVALID_BYTE0 && s[1] == NONUTF8_INVALID_BYTE1) { + return -1; + } + while (--l > 0) { if (*++s == '\0') { return -1; @@ -1488,6 +1515,11 @@ static int pg_uhc_verifier(const unsigned char* s, int len) return -1; } + // when mbl is 2 + if (l == 2 && s[0] == NONUTF8_INVALID_BYTE0 && s[1] == NONUTF8_INVALID_BYTE1) { + return -1; + } + while (--l > 0) { if (*++s == '\0') { return -1; @@ -1507,6 +1539,11 @@ static int pg_gb18030_verifier(const unsigned char* s, int len) return -1; } + // when mbl is 2 + if (l == 2 && s[0] == NONUTF8_INVALID_BYTE0 && s[1] == NONUTF8_INVALID_BYTE1) { + return -1; + } + while (--l > 0) { if (*++s == '\0') { return -1; @@ -1608,6 +1645,18 @@ bool pg_utf8_islegal(const unsigned char* source, int length) return true; } +/* + * Fills the provided buffer with two bytes such that: + * pg_encoding_mblen(dst) == 2 && pg_encoding_verifymbstr(dst) == 0 + */ +void pg_encoding_set_invalid(int encoding, char *dst) +{ + Assert(pg_encoding_max_length(encoding) > 1); + + dst[0] = (encoding == PG_UTF8) ? 
0xc0 : NONUTF8_INVALID_BYTE0; + dst[1] = NONUTF8_INVALID_BYTE1; +} + #ifndef FRONTEND /* @@ -2041,14 +2090,70 @@ int pg_encoding_verifymb(int encoding, const char* mbstr, int len) : ((*pg_wchar_table[PG_SQL_ASCII].mbverify)((const unsigned char*)mbstr, len))); } +int pg_encoding_verifymbchar(int encoding, const char* mbstr, int len) +{ + int ok_bytes = pg_encoding_verifymb(encoding, mbstr, len); + if (ok_bytes == 0) { + return -1; + } + + return ok_bytes; +} + +int pg_encoding_verifymbstr(int encoding, const char* mbstr, int len) +{ + mbverifier mbverify; + int ok_bytes; + + Assert(PG_VALID_ENCODING(encoding)); + + if (pg_encoding_max_length(encoding) <= 1) { + const char* nullpos = (const char*)memchr(mbstr, 0, len); + + if (nullpos == NULL) { + return len; + } + + return nullpos - mbstr; + } + + mbverify = pg_wchar_table[encoding].mbverify; + ok_bytes = 0; + + while (len > 0) { + int l; + + if (!IS_HIGHBIT_SET(*mbstr)) { + if (*mbstr != '\0') { + ok_bytes++; + mbstr++; + len--; + continue; + } + + return ok_bytes; + } + + l = (*mbverify)((const unsigned char*)mbstr, len); + + if (l < 0) { + return ok_bytes; + } + + mbstr += l; + len -= l; + ok_bytes += l; + } + + return ok_bytes; +} + /* * fetch maximum length of a given encoding */ int pg_encoding_max_length(int encoding) { - Assert(PG_VALID_ENCODING(encoding)); - - return pg_wchar_table[encoding].maxmblen; + return PG_VALID_ENCODING(encoding) ? 
pg_wchar_table[encoding].maxmblen : pg_wchar_table[PG_SQL_ASCII].maxmblen; } #ifdef WIN32 diff --git a/src/common/backend/utils/misc/guc/guc_storage.cpp b/src/common/backend/utils/misc/guc/guc_storage.cpp index 2171bf9931b5de510b2b0cd5122aa968556bcdbf..35a3a526166a0963ef7be56f6ae3bbd7660bce09 100755 --- a/src/common/backend/utils/misc/guc/guc_storage.cpp +++ b/src/common/backend/utils/misc/guc/guc_storage.cpp @@ -1470,6 +1470,19 @@ static void InitStorageConfigureNamesInt() NULL, NULL, NULL}, + {{"parallel_recovery_dispatch_algorithm", + PGC_POSTMASTER, + NODE_SINGLENODE, + RESOURCES_RECOVERY, + gettext_noop("wal dispatch algorithm. It valid only when enable_batch_dispatch=on"), + NULL}, + &g_instance.attr.attr_storage.parallel_recovery_dispatch_algorithm, + 2, + 1, + 2, + NULL, + NULL, + NULL}, {{"basebackup_timeout", PGC_USERSET, NODE_SINGLENODE, diff --git a/src/common/interfaces/libpq/fe-exec.cpp b/src/common/interfaces/libpq/fe-exec.cpp index 6e75ff1f88c0f89ef312011639368dfb6b5b7268..6f29f7ea9727d53d8e4006359184abf9913c1cfd 100644 --- a/src/common/interfaces/libpq/fe-exec.cpp +++ b/src/common/interfaces/libpq/fe-exec.cpp @@ -3452,15 +3452,16 @@ static size_t PQescapeStringInternal( { const char* source = from; char* target = to; - size_t remaining = length; + size_t remaining = strnlen(from, length); + bool already_complained = false; if (error != NULL) { *error = 0; } - while (remaining > 0 && *source != '\0') { + while (remaining > 0) { char c = *source; - int len; + int charlen; int i; /* Fast path for plain ASCII */ @@ -3476,39 +3477,63 @@ static size_t PQescapeStringInternal( } /* Slow path for possible multibyte characters */ - len = pg_encoding_mblen(encoding, source); - - /* Copy the character */ - for (i = 0; i < len; i++) { - if (remaining == 0 || *source == '\0') { - break; - } - *target++ = *source++; - remaining--; - } - - /* - * If we hit premature end of string (ie, incomplete multibyte - * character), try to pad out to the correct length with 
spaces. We - * may not be able to pad completely, but we will always be able to - * insert at least one pad space (since we'd not have quoted a - * multibyte character). This should be enough to make a string that - * the server will error out on. - */ - if (i < len) { - if (error != NULL) { + charlen = pg_encoding_mblen(encoding, source); + if (remaining < size_t(charlen) || pg_encoding_verifymbchar(encoding, source, charlen) == -1) { + /* + * Multibyte character is invalid. It's important to verify that + * as invalid multibyte characters could e.g. be used to "skip" + * over quote characters, e.g. when parsing + * character-by-character. + * + * Report an error if possible, and replace the character's first + * byte with an invalid sequence. The invalid sequence ensures + * that the escaped string will trigger an error on the + * server-side, even if we can't directly report an error here. + * + * This isn't *that* crucial when we can report an error to the + * caller; but if we can't or the caller ignores it, the caller + * will use this string unmodified and it needs to be safe for + * parsing. + * + * We know there's enough space for the invalid sequence because + * the "to" buffer needs to be at least 2 * length + 1 long, and + * at worst we're replacing a single input byte with two invalid + * bytes. + * + * It would be a bit faster to verify the whole string the first + * time we encounter a set highbit, but this way we can replace + * just the invalid data, which probably makes it easier for users + * to find the invalidly encoded portion of a larger string. 
+ */ + if (error) { *error = 1; } - if (conn != NULL) { - printfPQExpBuffer(&conn->errorMessage, libpq_gettext("incomplete multibyte character\n")); - } - for (; i < len; i++) { - if (((size_t)(target - to)) / 2 >= length) { - break; + if (conn && !already_complained) { + if (remaining < (size_t)charlen) { + printfPQExpBuffer(&conn->errorMessage, libpq_gettext("incomplete multibyte character\n")); + } else { + printfPQExpBuffer(&conn->errorMessage, libpq_gettext("invalid multibyte character\n")); } - *target++ = ' '; + /* Issue a complaint only once per string */ + already_complained = true; + } + + pg_encoding_set_invalid(encoding, target); + target += 2; + + /* + * Handle the following bytes as if this byte didn't exist. That's + * safer in case the subsequent bytes contain important characters + * for the caller (e.g. '>' in html). + */ + source++; + remaining--; + } else { + /* Copy the character */ + for (i = 0; i < charlen; i++) { + *target++ = *source++; + remaining--; } - break; } } @@ -3550,17 +3575,22 @@ static char* PQescapeInternal(PGconn* conn, const char* str, size_t len, bool as int num_quotes = 0; /* single or double, depending on as_ident */ int num_backslashes = 0; int rcs = 0; - int input_len; - int result_size; + size_t input_len = strnlen(str, len); + size_t result_size; char quote_char = as_ident ? '"' : '\''; + bool validated_mb = false; /* We must have a connection, else fail immediately. */ if (conn == NULL) { return NULL; } - /* Scan the string for characters that must be escaped. */ - for (s = str; (size_t)(s - str) < len && *s != '\0'; ++s) { + /* + * Scan the string for characters that must be escaped and for invalidly + * encoded data. 
+ */ + s = str; + for (size_t remaining = input_len; remaining > 0; remaining--, s++) { if (*s == quote_char) { ++num_quotes; } else if (*s == '\\') { @@ -3572,13 +3602,32 @@ static char* PQescapeInternal(PGconn* conn, const char* str, size_t len, bool as charlen = pg_encoding_mblen(conn->client_encoding, s); /* Multibyte character overruns allowable length. */ - if ((size_t)(s - str) + charlen > len || memchr(s, 0, charlen) != NULL) { + if ((size_t)charlen > remaining) { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("incomplete multibyte character\n")); return NULL; } + /* + * If we haven't already, check that multibyte characters are + * valid. It's important to verify that as invalid multi-byte + * characters could e.g. be used to "skip" over quote characters, + * e.g. when parsing character-by-character. + * + * We check validity once, for the whole remainder of the string, + * when we first encounter any multi-byte character. Some + * encodings have optimized implementations for longer strings. + */ + if (!validated_mb) { + if ((size_t)(pg_encoding_verifymbstr(conn->client_encoding, s, remaining)) != remaining) { + printfPQExpBuffer(&conn->errorMessage, libpq_gettext("invalid multibyte character\n")); + return NULL; + } + validated_mb = true; + } + /* Adjust s, bearing in mind that for loop will increment it. */ s += charlen - 1; + remaining -= charlen - 1; } } @@ -3621,11 +3670,12 @@ static char* PQescapeInternal(PGconn* conn, const char* str, size_t len, bool as * individually. 
*/ if (num_quotes == 0 && (num_backslashes == 0 || as_ident) && (input_len > 0)) { - rcs = memcpy_s(rp, input_len, str, input_len); + rcs = memcpy_s(rp, result_size, str, input_len); securec_check_c(rcs, "\0", "\0"); rp += input_len; } else { - for (s = str; s - str < input_len; ++s) { + s = str; + for (size_t remaining = input_len; remaining > 0; remaining--, s++) { if (*s == quote_char || (!as_ident && *s == '\\')) { *rp++ = *s; *rp++ = *s; @@ -3639,6 +3689,7 @@ static char* PQescapeInternal(PGconn* conn, const char* str, size_t len, bool as if (--i == 0) { break; } + remaining--; ++s; /* for loop will provide the final increment */ } } diff --git a/src/gausskernel/process/postmaster/barrier_preparse.cpp b/src/gausskernel/process/postmaster/barrier_preparse.cpp index c3e86df3ab27bf6986af4c8f972ea7e38bab2628..4597d3db552cc603bfc20ab3e6aad3e22948f2d9 100644 --- a/src/gausskernel/process/postmaster/barrier_preparse.cpp +++ b/src/gausskernel/process/postmaster/barrier_preparse.cpp @@ -45,7 +45,7 @@ typedef struct XLogPageReadPrivate { } XLogPageReadPrivate; #define NEED_INSERT_INTO_HASH \ - ((record->xl_rmid == RM_BARRIER_ID) && ((info == XLOG_BARRIER_SWITCHOVER) || \ + (IS_MULTI_DISASTER_RECOVER_MODE && (record->xl_rmid == RM_BARRIER_ID) && ((info == XLOG_BARRIER_SWITCHOVER) || \ (IS_PGXC_COORDINATOR && info == XLOG_BARRIER_COMMIT) || (IS_PGXC_DATANODE && info == XLOG_BARRIER_CREATE))) static void InitBarrierHash() @@ -95,6 +95,65 @@ static void SetBarrieID(const char *barrierId, XLogRecPtr lsn) } } +static void request_wal_stream_for_preparse(XLogRecPtr recptr) +{ + if (t_thrd.barrier_preparse_cxt.shutdown_requested) { + return; + } + if (!WalRcvInProgress() && g_instance.pid_cxt.WalReceiverPID == 0) { + volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv; + SpinLockAcquire(&walrcv->mutex); + walrcv->receivedUpto = 0; + SpinLockRelease(&walrcv->mutex); + if (t_thrd.xlog_cxt.readFile >= 0) { + (void)close(t_thrd.xlog_cxt.readFile); + 
t_thrd.xlog_cxt.readFile = -1; + } + + ereport(LOG, (errmsg("[BarrierPreParse] preparse thread request xlog streaming"))); + if (t_thrd.xlog_cxt.is_cascade_standby) { + RequestXLogStreaming(&recptr, NULL, REPCONNTARGET_STANDBY, NULL, true); + } else { + RequestXLogStreaming(&recptr, NULL, REPCONNTARGET_PRIMARY, NULL, true); + } + } +} + +static XLogRecPtr get_preparse_start_lsn() +{ + XLogRecPtr start_lsn = InvalidXLogRecPtr; + const uint32 shift_size = 32; + + while (XLogRecPtrIsInvalid(start_lsn)) { + GetXLogReplayRecPtr(NULL, &start_lsn); + if (t_thrd.barrier_preparse_cxt.shutdown_requested) { + return start_lsn; + } + } + ereport(LOG, (errmsg("[BarrierPreParse] preparse thread start at %08X/%08X", + (uint32)(start_lsn >> shift_size), (uint32)start_lsn))); + return start_lsn; +} + +static bool check_preparse_run_time(TimestampTz *start_time) +{ + long secs; + int usecs; + + if (!IS_MULTI_DISASTER_RECOVER_MODE && g_instance.csn_barrier_cxt.max_run_time > 0) { + if (*start_time < 0) { + *start_time = GetCurrentTimestamp(); + } + TimestampDifference(*start_time, GetCurrentTimestamp(), &secs, &usecs); + if (secs >= g_instance.csn_barrier_cxt.max_run_time) { + g_instance.csn_barrier_cxt.max_run_time = 0; + return true; + } + } + + return false; +} + static void BarrierPreParseSigHupHandler(SIGNAL_ARGS) { int save_errno = errno; @@ -168,9 +227,80 @@ void SetBarrierPreParseLsn(XLogRecPtr startptr) SpinLockRelease(&walrcv->mutex); } +bool check_preparse_result(XLogRecPtr *recptr) +{ + if (t_thrd.barrier_preparse_cxt.shutdown_requested) { + return false; + } + if (XLogRecPtrIsInvalid(g_instance.csn_barrier_cxt.latest_valid_record)) { + XLogRecPtr lastReplayRecPtr = InvalidXLogRecPtr; + (void)GetXLogReplayRecPtr(NULL, &lastReplayRecPtr); + if (XLogRecPtrIsInvalid(lastReplayRecPtr)) { + *recptr = lastReplayRecPtr; + } + return false; + } + if (!WalRcvInProgress() && g_instance.pid_cxt.WalReceiverPID == 0) { + return true; + } + return true; +} + +void 
RequestXLogStreamForBarrier() +{ + XLogRecPtr replayEndPtr = GetXLogReplayRecPtr(NULL); + if (t_thrd.xlog_cxt.is_cascade_standby && (CheckForSwitchoverTrigger() || CheckForFailoverTrigger())) { + HandleCascadeStandbyPromote(&replayEndPtr); + return; + } + if (!WalRcvInProgress() && g_instance.pid_cxt.WalReceiverPID == 0) { + volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv; + SpinLockAcquire(&walrcv->mutex); + walrcv->receivedUpto = 0; + SpinLockRelease(&walrcv->mutex); + if (t_thrd.xlog_cxt.readFile >= 0) { + (void)close(t_thrd.xlog_cxt.readFile); + t_thrd.xlog_cxt.readFile = -1; + } + + RequestXLogStreaming(&replayEndPtr, t_thrd.xlog_cxt.PrimaryConnInfo, REPCONNTARGET_PRIMARY, + u_sess->attr.attr_storage.PrimarySlotName); + } +} + +void check_exit_preparse_conditions(XLogReaderState *xlogreader, TimestampTz *start_time, + XLogRecPtr *start_lsn, int try_time) +{ + int rc; + const int max_try_times = 20; + + if (check_preparse_result(start_lsn)) { + request_wal_stream_for_preparse(*start_lsn); + } + + if (!IS_MULTI_DISASTER_RECOVER_MODE && g_instance.csn_barrier_cxt.max_run_time == 0 && try_time > max_try_times) { + ereport(LOG, (errmsg("[BarrierPreParse] preparse thread shut down"))); + XLogReaderFree(xlogreader); + proc_exit(0); + } + + if (check_preparse_run_time(start_time)) { + ereport(LOG, (errmsg("[BarrierPreParse] preparse thread shut down"))); + XLogReaderFree(xlogreader); + proc_exit(0); + } + + const long sleepTime = 1000; + rc = WaitLatch(&t_thrd.proc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, sleepTime); + if (((unsigned int)rc) & WL_POSTMASTER_DEATH) { + XLogReaderFree(xlogreader); + ereport(LOG, (errmsg("[BarrierPreParse] preparse thread shut down with code 1"))); + gs_thread_exit(1); + } +} + void BarrierPreParseMain(void) { - volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv; MemoryContext preParseContext; XLogRecord *record = NULL; XLogReaderState *xlogreader = NULL; @@ -182,7 +312,11 @@ void 
BarrierPreParseMain(void) XLogRecPtr barrierLSN = InvalidXLogRecPtr; char *xLogBarrierId = NULL; char barrierId[MAX_BARRIER_ID_LENGTH] = {0}; - const uint32 shiftSize = 32; + TimestampTz start_time = -1; + int try_time = 0; + XLogRecPtr latest_valid_record; + pg_crc32 latest_record_crc; + uint32 latest_record_len; int rc; ereport(LOG, (errmsg("[BarrierPreParse] barrier preparse thread started"))); @@ -226,11 +360,10 @@ void BarrierPreParseMain(void) g_instance.proc_base->BarrierPreParseLatch = &t_thrd.proc->procLatch; - startLSN = walrcv->lastReceivedBarrierLSN; - ereport(LOG, (errmsg("[BarrierPreParse] preparse thread start at %08X/%08X", (uint32)(startLSN >> shiftSize), - (uint32)startLSN))); + startLSN = get_preparse_start_lsn(); + g_instance.csn_barrier_cxt.preparseStartLocation = startLSN; - if (g_instance.csn_barrier_cxt.barrier_hash_table == NULL) { + if (IS_MULTI_DISASTER_RECOVER_MODE && g_instance.csn_barrier_cxt.barrier_hash_table == NULL) { InitBarrierHash(); } @@ -260,13 +393,7 @@ void BarrierPreParseMain(void) proc_exit(0); /* done */ } - if (XLogRecPtrIsInvalid(startLSN)) { - ereport(ERROR, (errmsg("[BarrierPreParse] startLSN is invalid"))); - } - preStartLSN = startLSN; - ereport(DEBUG1, (errmsg("[BarrierPreParse] start to preparse at: %08X/%08X", - (uint32)(startLSN >> shiftSize), (uint32)startLSN))); startLSN = XLogFindNextRecord(xlogreader, startLSN); if (XLogRecPtrIsInvalid(startLSN)) { startLSN = preStartLSN; @@ -287,6 +414,9 @@ void BarrierPreParseMain(void) break; } lastReadLSN = xlogreader->ReadRecPtr; + latest_valid_record = lastReadLSN; + latest_record_crc = record->xl_crc; + latest_record_len = record->xl_tot_len; uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK; if (NEED_INSERT_INTO_HASH) { xLogBarrierId = XLogRecGetData(xlogreader); @@ -311,18 +441,19 @@ void BarrierPreParseMain(void) CloseXlogFile(); XLogReaderInvalReadState(xlogreader); + g_instance.csn_barrier_cxt.latest_valid_record = latest_valid_record; + 
g_instance.csn_barrier_cxt.latest_record_crc = latest_record_crc; + g_instance.csn_barrier_cxt.latest_record_len = latest_record_len; + startLSN = XLogRecPtrIsInvalid(lastReadLSN) ? preStartLSN : lastReadLSN; if (XLogRecPtrIsInvalid(xlogreader->ReadRecPtr) && errormsg) { ereport(LOG, (errmsg("[BarrierPreParse] preparse thread get an error info %s", errormsg))); } - const long sleepTime = 1000; - rc = WaitLatch(&t_thrd.proc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, sleepTime); - if (((unsigned int)rc) & WL_POSTMASTER_DEATH) { - XLogReaderFree(xlogreader); - ereport(LOG, (errmsg("[BarrierPreParse] preparse thread shut down with code 1"))); - gs_thread_exit(1); - } + g_instance.csn_barrier_cxt.preparseEndLocation = startLSN; + + try_time++; + check_exit_preparse_conditions(xlogreader, &start_time, &startLSN, try_time); } } diff --git a/src/gausskernel/process/postmaster/pgstat.cpp b/src/gausskernel/process/postmaster/pgstat.cpp index 1933054d189e71b70e63de50634b12be32f52f36..ca1e8866ffd8c839f28ab9889bf44af3e2785503 100644 --- a/src/gausskernel/process/postmaster/pgstat.cpp +++ b/src/gausskernel/process/postmaster/pgstat.cpp @@ -3169,8 +3169,10 @@ void pgstat_bestart(void) beentry->lw_count++; } while (CHANGECOUNT_IS_EVEN(beentry->lw_count)); beentry->lw_want_lock = NULL; + beentry->lw_want_start_time = (TimestampTz)0; beentry->lw_held_num = get_held_lwlocks_num(); beentry->lw_held_locks = get_held_lwlocks(); + beentry->lw_held_times = get_lwlock_held_times(); beentry->st_lw_access_flag = false; beentry->st_lw_is_cleanning_flag = false; @@ -3294,6 +3296,7 @@ void pgstat_couple_decouple_session(bool is_couple) beentry->st_tid = is_couple ? gettid() : 0; beentry->lw_held_num = is_couple ? get_held_lwlocks_num() : NULL; beentry->lw_held_locks = is_couple ? get_held_lwlocks() : NULL; + beentry->lw_held_times = is_couple ? 
get_lwlock_held_times() : NULL; /* make this count be odd */ do { beentry->lw_count++; @@ -7163,10 +7166,12 @@ static void pgstat_recv_vacuum(PgStat_MsgVacuum* msg, int len) tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true, msg->m_statFlag); /* Resetting dead_tuples ... use negtive number to verify Cstore */ - if (msg->m_tuples < 0) + if (msg->m_tuples < 0) { tabentry->n_dead_tuples = 0; - else + } else { tabentry->n_dead_tuples = Max(0, tabentry->n_dead_tuples - msg->m_tuples); + tabentry->changes_since_analyze += msg->m_tuples; + } if (msg->m_autovacuum) { tabentry->autovac_vacuum_timestamp = msg->m_vacuumtime; diff --git a/src/gausskernel/process/postmaster/postmaster.cpp b/src/gausskernel/process/postmaster/postmaster.cpp index a63cc15ac8f8b2f6893378926e8ce1caaf2da8ea..7ac4315ef117667c6ea129ceba6722ef38665f8e 100644 --- a/src/gausskernel/process/postmaster/postmaster.cpp +++ b/src/gausskernel/process/postmaster/postmaster.cpp @@ -3950,15 +3950,17 @@ static int ServerLoop(void) g_instance.pid_cxt.sharedStorageXlogCopyThreadPID = initialize_util_thread(SHARE_STORAGE_XLOG_COPYER); } -#ifdef ENABLE_MULTIPLE_NODES /* when execuating xlog redo in standby cluster, * pmState is PM_HOT_STANDBY, neither PM_RECOVERY nor PM_RUN */ - if (pmState == PM_HOT_STANDBY && g_instance.pid_cxt.BarrierPreParsePID == 0 && - !dummyStandbyMode && IS_MULTI_DISASTER_RECOVER_MODE) { + if (t_thrd.postmaster_cxt.HaShmData->current_mode == STANDBY_MODE && + (pmState == PM_HOT_STANDBY || pmState == PM_RECOVERY) && g_instance.pid_cxt.BarrierPreParsePID == 0 && + !dummyStandbyMode && !g_instance.csn_barrier_cxt.pre_parse_started && + u_sess->attr.attr_storage.recovery_min_apply_delay > 0) { + g_instance.csn_barrier_cxt.pre_parse_started = true; g_instance.pid_cxt.BarrierPreParsePID = initialize_util_thread(BARRIER_PREPARSE); } -#endif + /* * If no background writer process is running, and we are not in a * state that prevents it, start one. 
It doesn't matter if this @@ -4170,6 +4172,11 @@ static int ServerLoop(void) if (IS_PGXC_COORDINATOR && g_instance.attr.attr_sql.max_resource_package && (g_instance.pid_cxt.CPMonitorPID == 0) && (pmState == PM_RUN) && !dummyStandbyMode) g_instance.pid_cxt.CPMonitorPID = initialize_util_thread(WLM_CPMONITOR); + + if (g_instance.pid_cxt.BarrierPreParsePID == 0 && + u_sess->attr.attr_storage.recovery_min_apply_delay > 0) { + g_instance.pid_cxt.BarrierPreParsePID = initialize_util_thread(BARRIER_PREPARSE); + } #ifndef ENABLE_LITE_MODE /* If we have lost the twophase cleaner, try to start a new one */ @@ -5782,11 +5789,11 @@ static void SIGHUP_handler(SIGNAL_ARGS) signal_child(g_instance.pid_cxt.sharedStorageXlogCopyThreadPID, SIGHUP); } -#ifdef ENABLE_MULTIPLE_NODES if (g_instance.pid_cxt.BarrierPreParsePID != 0) { signal_child(g_instance.pid_cxt.BarrierPreParsePID, SIGHUP); } +#ifdef ENABLE_MULTIPLE_NODES if (g_instance.pid_cxt.TsCompactionPID != 0) { signal_child(g_instance.pid_cxt.TsCompactionPID, SIGHUP); } @@ -5922,11 +5929,9 @@ static void pmdie(SIGNAL_ARGS) signal_child(g_instance.pid_cxt.StartupPID, SIGTERM); } -#ifdef ENABLE_MULTIPLE_NODES if (g_instance.pid_cxt.BarrierPreParsePID != 0) { signal_child(g_instance.pid_cxt.BarrierPreParsePID, SIGTERM); } -#endif if (g_instance.pid_cxt.PageRepairPID != 0) { signal_child(g_instance.pid_cxt.PageRepairPID, SIGTERM); @@ -7672,7 +7677,6 @@ static void reaper(SIGNAL_ARGS) continue; } -#ifdef ENABLE_MULTIPLE_NODES if (pid == g_instance.pid_cxt.BarrierPreParsePID) { g_instance.pid_cxt.BarrierPreParsePID = 0; write_stderr("%s LOG: barrier pre parse thread exit\n", GetReaperLogPrefix(logBuf, ReaperLogBufSize)); @@ -7681,6 +7685,7 @@ static void reaper(SIGNAL_ARGS) continue; } +#ifdef ENABLE_MULTIPLE_NODES if (pid == g_instance.pid_cxt.TsCompactionPID) { g_instance.pid_cxt.TsCompactionPID = 0; if (!EXIT_STATUS_0(exitstatus)) @@ -8137,10 +8142,10 @@ static void PostmasterStateMachineReadOnly(void) 
csnminsync_thread_shutdown(); signal_child(g_instance.pid_cxt.CsnminSyncPID, SIGTERM); } +#endif /* ENABLE_MULTIPLE_NODES */ if (g_instance.pid_cxt.BarrierPreParsePID != 0) signal_child(g_instance.pid_cxt.BarrierPreParsePID, SIGTERM); -#endif /* ENABLE_MULTIPLE_NODES */ if (g_instance.pid_cxt.UndoLauncherPID != 0) signal_child(g_instance.pid_cxt.UndoLauncherPID, SIGTERM); @@ -8291,8 +8296,8 @@ static void PostmasterStateMachine(void) g_instance.pid_cxt.StackPerfPID == 0 && g_instance.pid_cxt.CfsShrinkerPID == 0 && g_instance.pid_cxt.BarrierCreatorPID == 0 && g_instance.pid_cxt.PageRepairPID == 0 && -#ifdef ENABLE_MULTIPLE_NODES g_instance.pid_cxt.BarrierPreParsePID == 0 && +#ifdef ENABLE_MULTIPLE_NODES g_instance.pid_cxt.CommPoolerCleanPID == 0 && streaming_backend_manager(STREAMING_BACKEND_SHUTDOWN) && g_instance.pid_cxt.TsCompactionPID == 0 && g_instance.pid_cxt.TsCompactionAuxiliaryPID == 0 && g_instance.pid_cxt.CommPoolerCleanPID == 0 && @@ -9392,11 +9397,6 @@ static void handle_begin_hot_standby() wal_get_role_string(get_cur_mode())))); ereport(LOG, (errmsg("database system is ready to accept read only connections"))); -#ifdef ENABLE_MULTIPLE_NODES - if (IS_MULTI_DISASTER_RECOVER_MODE && g_instance.pid_cxt.BarrierPreParsePID == 0) { - g_instance.pid_cxt.BarrierPreParsePID = initialize_util_thread(BARRIER_PREPARSE); - } -#endif pmState = PM_HOT_STANDBY; } } @@ -9818,6 +9818,11 @@ static void PaxosPromoteLeader(void) #endif } +static bool check_start_preparse_signal() { + return CheckSignalByFile(WAL_PREPARSER_SIGNAL_FILE, + &g_instance.csn_barrier_cxt.max_run_time, sizeof(g_instance.csn_barrier_cxt.max_run_time)); +} + /* * sigusr1_handler - handle signal conditions from child processes */ @@ -10329,6 +10334,11 @@ static void sigusr1_handler(SIGNAL_ARGS) } } + if (g_instance.pid_cxt.BarrierPreParsePID == 0 && check_start_preparse_signal() && + u_sess->attr.attr_storage.recovery_min_apply_delay > 0) { + g_instance.pid_cxt.BarrierPreParsePID = 
initialize_util_thread(BARRIER_PREPARSE); + } + #ifndef ENABLE_MULTIPLE_NODES uint32 nodeID = 0; NewNodeInfo nodeinfo; diff --git a/src/gausskernel/process/stream/streamCore.cpp b/src/gausskernel/process/stream/streamCore.cpp index 9736c8b7d1c120e5a65eed0f38eb950764be2a4c..263a22ca365e33cb1aaa4c4c775328e343dfc3b2 100755 --- a/src/gausskernel/process/stream/streamCore.cpp +++ b/src/gausskernel/process/stream/streamCore.cpp @@ -338,6 +338,7 @@ StreamNodeGroup::StreamNodeGroup() m_streamNum(1), m_createThreadNum(0), m_streamEnter(0), + m_streamEnterCount(0), m_canceled(false), m_needClean(false), m_errorStop(false), @@ -602,6 +603,8 @@ void StreamNodeGroup::cancelStreamThread() */ void StreamNodeGroup::quitSyncPoint() { + int timeout = 30; + if (StreamThreadAmI() == true) { StreamPair* pair = NULL; AutoMutexLock streamLock(&m_mutex); @@ -609,6 +612,7 @@ void StreamNodeGroup::quitSyncPoint() /* signal the top consumer if i am the last stream thread. */ streamLock.lock(); m_streamEnter++; + m_streamEnterCount++; Assert(u_sess->stream_cxt.producer_obj != NULL); pair = (u_sess->stream_cxt.producer_obj)->getPair(); @@ -621,11 +625,24 @@ void StreamNodeGroup::quitSyncPoint() Assert(m_quitWaitCond >= 0); Assert(pair->expectThreadNum >= pair->createThreadNum); - if (m_quitWaitCond == 0) + if (m_quitWaitCond < 0) { + ereport(WARNING, (errmsg("Stream sub thread m_quitWaitCond invalid: %d. 
" + "To get backtrace detail, set backtrace_min_messages=warning.", m_quitWaitCond))); + ereport(LOG, (errmsg("Stream info, smp id: %u, m_streamEnter: %d, m_streamEnterCount: %d, " + "ThreadId: %u, m_createThreadNum: %d, m_size: %d", + u_sess->stream_cxt.smp_id, m_streamEnter, m_streamEnterCount, + (u_sess->stream_cxt.producer_obj)->getThreadId(), m_createThreadNum, m_size))); + } + if (m_quitWaitCond <= 0) pthread_cond_broadcast(&m_cond); else { - while (m_quitWaitCond != 0) - pthread_cond_wait(&m_cond, &m_mutex); + struct timespec ts; + while (m_quitWaitCond > 0) { + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += timeout; + ts.tv_nsec = 0; + pthread_cond_timedwait(&m_cond, &m_mutex, &ts); + } } streamLock.unLock(); } else if (StreamTopConsumerAmI() == true) { @@ -640,7 +657,13 @@ void StreamNodeGroup::quitSyncPoint() Assert(m_quitWaitCond >= 0); Assert(m_size >= m_createThreadNum); - if (m_quitWaitCond == 0) + if (m_quitWaitCond < 0) { + ereport(WARNING, (errmsg("Stream top consumer thread m_quitWaitCond invalid: %d. 
" + "To get backtrace detail, set backtrace_min_messages=warning.", m_quitWaitCond))); + ereport(LOG, (errmsg("Stream info, m_streamEnter: %d, m_streamEnterCount: %d, m_createThreadNum: %d, m_size: %d", + m_streamEnter, m_streamEnterCount, m_createThreadNum, m_size))); + } + if (m_quitWaitCond <= 0) pthread_cond_broadcast(&m_cond); else { /* @@ -668,13 +691,24 @@ void StreamNodeGroup::quitSyncPoint() */ CHECK_FOR_INTERRUPTS(); - while (m_quitWaitCond != 0) - pthread_cond_wait(&m_cond, &m_mutex); + struct timespec ts; + while (m_quitWaitCond > 0) { + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += timeout; + ts.tv_nsec = 0; + pthread_cond_timedwait(&m_cond, &m_mutex, &ts); + } t_thrd.int_cxt.ImmediateInterruptOK = false; u_sess->stream_cxt.in_waiting_quit = false; } + if (m_streamEnterCount < m_createThreadNum) { + ereport(WARNING, (errmsg("Stream top consumer thread m_streamEnterCount invalid: %d", m_streamEnterCount))); + ereport(LOG, (errmsg("Stream info, m_streamEnter: %d, m_streamEnterCount: %d, m_createThreadNum: %d, m_size: %d", + m_streamEnter, m_streamEnterCount, m_createThreadNum, m_size))); + } + streamLock.unLock(); } } else @@ -1055,7 +1089,7 @@ void StreamNodeGroup::deInit(StreamObjStatus status) do { streamLock1.lock(); - if (m_streamEnter == 0) + if (m_streamEnter <= 0 && m_streamEnterCount >= m_createThreadNum) saveQuit = true; streamLock1.unLock(); diff --git a/src/gausskernel/process/threadpool/knl_instance.cpp b/src/gausskernel/process/threadpool/knl_instance.cpp index f1f91d2386878eb8b0280e19f11cf83eead00549..36a836b77944e2da9fb68dcc9c0da2184d20f4be 100755 --- a/src/gausskernel/process/threadpool/knl_instance.cpp +++ b/src/gausskernel/process/threadpool/knl_instance.cpp @@ -876,6 +876,8 @@ static void knl_g_csn_barrier_init(knl_g_csn_barrier_context* csn_barrier_cxt) { csn_barrier_cxt->barrier_hash_table = NULL; csn_barrier_cxt->barrier_hashtbl_lock = NULL; + csn_barrier_cxt->pre_parse_started = false; + csn_barrier_cxt->max_run_time = 0; 
csn_barrier_cxt->barrier_context = NULL; errno_t rc = memset_s(csn_barrier_cxt->stopBarrierId, MAX_BARRIER_ID_LENGTH, 0, sizeof(csn_barrier_cxt->stopBarrierId)); diff --git a/src/gausskernel/process/threadpool/knl_thread.cpp b/src/gausskernel/process/threadpool/knl_thread.cpp index 298e0935bc1848d7ecd653945c20e4040f739551..70b827f07846c750a8d68840c5d8e81c51948dc9 100755 --- a/src/gausskernel/process/threadpool/knl_thread.cpp +++ b/src/gausskernel/process/threadpool/knl_thread.cpp @@ -1406,6 +1406,8 @@ static void knl_t_storage_init(knl_t_storage_context* storage_cxt) storage_cxt->isSwitchoverLockHolder = false; storage_cxt->num_held_lwlocks = 0; storage_cxt->held_lwlocks = (LWLockHandle*)palloc0(MAX_SIMUL_LWLOCKS * sizeof(LWLockHandle)); + storage_cxt->lwlock_held_times = (TimestampTz*)palloc0(MAX_SIMUL_LWLOCKS * sizeof(TimestampTz)); + storage_cxt->trace_lwlock_time = false; storage_cxt->lock_addin_request = 0; storage_cxt->lock_addin_request_allowed = true; storage_cxt->counts_for_pid = 0; diff --git a/src/gausskernel/storage/access/transam/parallel_recovery/dispatcher.cpp b/src/gausskernel/storage/access/transam/parallel_recovery/dispatcher.cpp index 56968dfefd2f410a171bd501c9ea00875d61a2cc..769d764dbc57fdf776f0fabf0a9d3c54269fe14c 100755 --- a/src/gausskernel/storage/access/transam/parallel_recovery/dispatcher.cpp +++ b/src/gausskernel/storage/access/transam/parallel_recovery/dispatcher.cpp @@ -83,6 +83,20 @@ extern THR_LOCAL bool redo_oldversion_xlog; namespace parallel_recovery { + +#define DISPATCH_ALGORITHM_HASH 1 +#define DISPATCH_ALGORITHM_ALL_DIRECTION 2 +#define IS_DISPATCH_ALGORITHM_ALL_DIRECTION_ON (g_instance.attr.attr_storage.enable_batch_dispatch && (g_instance.attr.attr_storage.parallel_recovery_dispatch_algorithm == DISPATCH_ALGORITHM_ALL_DIRECTION)) + +#define WAL_SAMPLE_PERIOD 20 +#define MAX_RNODE_SAMPLE_LIST_SIZE 512 +#define REASSIGN_SCORE_THRESHOLD 3.0 + +#define TEMP_VAR_LEN 32 + +static const XLogRecPtr FIRST_INIT_REASSGINED_WORKER_PERIOD 
= (XLogRecPtr) 1024 * 1024 * 200; +static const XLogRecPtr REASSGINED_WORKER_PERIOD = (XLogRecPtr)1024 * 1024 * 1024 * 1; + typedef struct RmgrDispatchData { bool (*rm_dispatch)(XLogReaderState *record, List *expectedTLIs, TimestampTz recordXTime); bool (*rm_loginfovalid)(XLogReaderState *record, uint8 minInfo, uint8 maxInfo); @@ -394,6 +408,35 @@ static LogDispatcher *CreateDispatcher() newDispatcher->dispatchReadRecPtr = 0; newDispatcher->dispatchEndRecPtr = 0; newDispatcher->startupTimeCost = t_thrd.xlog_cxt.timeCost; + + if (IS_DISPATCH_ALGORITHM_ALL_DIRECTION_ON) { + newDispatcher->rbVar.begin_worker_idx = 0; + newDispatcher->rbVar.first_init_reassigned_worker = true; + newDispatcher->rbVar.last_lsn = 0; + newDispatcher->rbVar.re_assigned_times_step1 = 0; + newDispatcher->rbVar.re_assigned_times_step2 = 0; + newDispatcher->rbVar.wal_sample_loop = 0; + SpinLockInit(&newDispatcher->rbVar.dispatch_dyhash_lock); + + HASHCTL *wal_recovery_sample_hashctl = (HASHCTL *)MemoryContextAllocZero(ctx, sizeof(HASHCTL)); + wal_recovery_sample_hashctl->keysize = sizeof(RelFileNode); + wal_recovery_sample_hashctl->entrysize = sizeof(WalSampleStats); + wal_recovery_sample_hashctl->hash = tag_hash; + wal_recovery_sample_hashctl->hcxt = ctx; + newDispatcher->rbVar.wal_recovery_sample_hashtbl = hash_create("wal recovery sample hash", 512, + wal_recovery_sample_hashctl, HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); + + HASHCTL *wal_recovery_dispatch_hashctl = (HASHCTL *)MemoryContextAllocZero(ctx, sizeof(HASHCTL)); + wal_recovery_dispatch_hashctl->keysize = sizeof(RelFileNode); + wal_recovery_dispatch_hashctl->entrysize = sizeof(Rnode2WorkerEntry); + wal_recovery_dispatch_hashctl->hash = tag_hash; + wal_recovery_dispatch_hashctl->hcxt = ctx; + newDispatcher->rbVar.wal_recovery_dispatch_hashtbl = hash_create("wal recovery dispatch hash", 512, + wal_recovery_dispatch_hashctl, HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); + + newDispatcher->rbVar.rnode_sample_list = (RnodeInfo 
*)MemoryContextAllocZero(ctx, sizeof(RnodeInfo) * MAX_RNODE_SAMPLE_LIST_SIZE); + } + return newDispatcher; } @@ -515,6 +558,20 @@ static void DestroyRecoveryWorkers() pfree(g_dispatcher->chosedWorkerIds); g_dispatcher->chosedWorkerIds = NULL; } + if (IS_DISPATCH_ALGORITHM_ALL_DIRECTION_ON) { + if (g_dispatcher->rbVar.rnode_sample_list != NULL) { + pfree(g_dispatcher->rbVar.rnode_sample_list); + g_dispatcher->rbVar.rnode_sample_list = NULL; + } + if (g_dispatcher->rbVar.wal_recovery_sample_hashtbl != NULL) { + hash_destroy(g_dispatcher->rbVar.wal_recovery_sample_hashtbl); + g_dispatcher->rbVar.wal_recovery_sample_hashtbl = NULL; + } + if (g_dispatcher->rbVar.wal_recovery_dispatch_hashtbl != NULL) { + hash_destroy(g_dispatcher->rbVar.wal_recovery_dispatch_hashtbl); + g_dispatcher->rbVar.wal_recovery_dispatch_hashtbl = NULL; + } + } if (get_real_recovery_parallelism() > 1) { MemoryContextSwitchTo(g_dispatcher->oldCtx); MemoryContextDelete(g_instance.comm_cxt.predo_cxt.parallelRedoCtx); @@ -592,6 +649,150 @@ void CheckDispatchCount(XLogRecPtr lastCheckLsn) } } +inline int rnode_compare(const void *a, const void *b) { + return (((RnodeInfo*)b)->inc - ((RnodeInfo*)a)->inc); +} + +uint32 get_most_idle_worker(uint32 arr[], uint32 n) { + uint32 min = UINT32_MAX; + uint32 workerId = 0; + + for (uint32 i=0; irbVar.rnode_sample_list; + uint32 workloads[MOST_FAST_RECOVERY_LIMIT] = {0}; + + HASH_SEQ_STATUS status; + WalSampleStats* sampleStats = NULL; + Rnode2WorkerEntry* rnode2Worker = NULL; + errno_t rc; + + float keepStdDeviation = 0.0; + float bestStdDeviation = 0.0; + float score = 1.0; + + // 1. 
sort sampling data in reverse order + hash_seq_init(&status, g_dispatcher->rbVar.wal_recovery_sample_hashtbl); + while ((sampleStats = (WalSampleStats *)hash_seq_search(&status)) != NULL) { + uint32 inc = sampleStats->walSampleVal.totalCnt - sampleStats->walSampleVal.lastTotalCnt; + if (inc == 0) { + continue; + } + sampleStats->walSampleVal.lastTotalCnt = sampleStats->walSampleVal.totalCnt; + sortedList[rnodeNum] = {sampleStats->rnode, inc, false, 0}; + rnodeNum++; + if (rnodeNum == MAX_RNODE_SAMPLE_LIST_SIZE) { + hash_seq_term(&status); + break; + } + } + + qsort(sortedList, rnodeNum, sizeof(RnodeInfo), rnode_compare); + g_dispatcher->rbVar.re_assigned_times_step1++; + hash_seq_init(&status, g_dispatcher->rbVar.wal_recovery_sample_hashtbl); + while ((sampleStats = (WalSampleStats *)hash_seq_search(&status)) != NULL) { + hash_search(g_dispatcher->rbVar.wal_recovery_sample_hashtbl, &sampleStats->rnode, HASH_REMOVE, NULL); + } + + // 2. calculate current and best load balancing degree + rc = memset_s(workloads, sizeof(workloads), 0, sizeof(workloads)); + securec_check(rc, "", ""); + for (uint32 i=0; irbVar.wal_recovery_dispatch_hashtbl, &sortedList[i].rnode, HASH_FIND, NULL); + if (entry == NULL) { + elog(WARNING, "entry is null, rnode=%u/%u/%u", sortedList[i].rnode.spcNode, sortedList[i].rnode.dbNode, sortedList[i].rnode.relNode); + continue; + } + uint32 workerId = entry->workerId; + workloads[workerId] += sortedList[i].inc; + } + keepStdDeviation = cal_standard_deviation(workloads, workerNum); + + rc = memset_s(workloads, sizeof(workloads), 0, sizeof(workloads)); + securec_check(rc, "", ""); + for (uint32 i=0; i threshold, then remap + score = keepStdDeviation / bestStdDeviation; + if (!(force || score > REASSIGN_SCORE_THRESHOLD)) { + return; + } + + // 4. 
remap + g_dispatcher->rbVar.re_assigned_times_step2++; + elog(LOG, "reassign details: step1=%u, step2=%u", g_dispatcher->rbVar.re_assigned_times_step1, g_dispatcher->rbVar.re_assigned_times_step2); + SpinLockAcquire(&g_dispatcher->rbVar.dispatch_dyhash_lock); + hash_seq_init(&status, g_dispatcher->rbVar.wal_recovery_dispatch_hashtbl); + while ((rnode2Worker = (Rnode2WorkerEntry *)hash_seq_search(&status)) != NULL) { + elog(LOG, "before: rnode: %u, worker_id: %u", rnode2Worker->rnode.relNode, rnode2Worker->workerId); + hash_search(g_dispatcher->rbVar.wal_recovery_dispatch_hashtbl, &rnode2Worker->rnode, HASH_REMOVE, NULL); + } + + for (uint32 i=0; irbVar.wal_recovery_dispatch_hashtbl, &sortedList[i].rnode, HASH_ENTER, NULL); + rnode2Worker->workerId = sortedList[i].preWorkerId; + elog(LOG, "now: rnode: %u, worker_id: %u", rnode2Worker->rnode.relNode, rnode2Worker->workerId); + } + SpinLockRelease(&g_dispatcher->rbVar.dispatch_dyhash_lock); + + // 5. wait all worker consuming assigned xlogrecord + ApplyReadyTxnLogRecords(g_dispatcher->txnWorker, true); +} + +void ReAutoAssignWorker() { + XLogRecPtr period = g_dispatcher->dispatchEndRecPtr - g_dispatcher->rbVar.last_lsn; + if (g_dispatcher->rbVar.first_init_reassigned_worker && (period > FIRST_INIT_REASSGINED_WORKER_PERIOD)) { + g_dispatcher->rbVar.first_init_reassigned_worker = false; + g_dispatcher->rbVar.last_lsn = g_dispatcher->dispatchEndRecPtr; + reAssignAllWorker(true); + return; + } + + if (period > REASSGINED_WORKER_PERIOD) { + g_dispatcher->rbVar.last_lsn = g_dispatcher->dispatchEndRecPtr; + reAssignAllWorker(false); + } +} + /* Run from the dispatcher thread. 
*/ void DispatchRedoRecordToFile(XLogReaderState *record, List *expectedTLIs, TimestampTz recordXTime) { @@ -633,6 +834,12 @@ void DispatchRedoRecordToFile(XLogReaderState *record, List *expectedTLIs, Times ProcessPendingRecords(true); else if (++g_dispatcher->pendingCount >= g_dispatcher->pendingMax) ProcessPendingRecords(); + if (IS_DISPATCH_ALGORITHM_ALL_DIRECTION_ON) { + ReAutoAssignWorker(); + } else if ((g_dispatcher->dispatchEndRecPtr - g_dispatcher->dispatchFix.lastCheckLsn) > DISPATCH_FIX_SIZE) { + CheckDispatchCount(g_dispatcher->dispatchEndRecPtr); + } + } if (fatalerror == true) { /* output panic error info */ @@ -1413,7 +1620,7 @@ static void GetWorkerIds(XLogReaderState *record, uint32 designatedWorker, bool /* * * count worker id by hash */ -uint32 GetWorkerId(const RelFileNode &node, BlockNumber block, ForkNumber forkNum) +uint32 GetWorkerIdByHash(const RelFileNode &node, BlockNumber block, ForkNumber forkNum) { uint32 workerCount = GetPageWorkerCount(); uint32 undoZidWorkersNum = get_recovery_undozidworkers_num(); @@ -1429,6 +1636,84 @@ uint32 GetWorkerId(const RelFileNode &node, BlockNumber block, ForkNumber forkNu return tag_hash(&tag, sizeof(tag)) % workerCount; } +uint32 get_most_idle_worker() { + uint32 workerId = 0; + uint32 min = UINT32_MAX; + uint32 workerNum = g_dispatcher->pageWorkerCount-1; + g_dispatcher->rbVar.begin_worker_idx++; + g_dispatcher->rbVar.begin_worker_idx %= workerNum; + + for (uint32 i=g_dispatcher->rbVar.begin_worker_idx;;) { + uint32 count = SPSCGetQueueCount(g_dispatcher->pageWorkers[i]->queue); + if (count < min) { + min = count; + workerId = i; + } + + i = (i + 1) % workerNum; + if (i == g_dispatcher->rbVar.begin_worker_idx) { + break; + } + } + + return workerId; +} + +uint32 GetWorkerIdByAllAssigned(const RelFileNode &node, BlockNumber block, ForkNumber forkNum) +{ + uint32 workerCount = GetPageWorkerCount(); + uint32 undoZidWorkersNum = get_recovery_undozidworkers_num(); + WalSampleStats* rnode_redo_stats = NULL; 
+ bool found = false; + + if (SUPPORT_USTORE_UNDO_WORKER) + workerCount = workerCount - undoZidWorkersNum; + + if (workerCount == 0) + return ANY_WORKER; + + if (workerCount == 1) { + return workerCount - 1; + } + + // 1.sample + g_dispatcher->rbVar.wal_sample_loop++; + g_dispatcher->rbVar.wal_sample_loop %= WAL_SAMPLE_PERIOD; + if (g_dispatcher->rbVar.wal_sample_loop == 0) { + rnode_redo_stats = (WalSampleStats*)hash_search(g_dispatcher->rbVar.wal_recovery_sample_hashtbl, &node, HASH_ENTER, &found); + if (found) { + rnode_redo_stats->walSampleVal.totalCnt++; + } else { + rnode_redo_stats->walSampleVal.totalCnt = 1; + rnode_redo_stats->walSampleVal.lastTotalCnt = 0; + } + } + + // 2.directional select + found = false; + SpinLockAcquire(&g_dispatcher->rbVar.dispatch_dyhash_lock); + Rnode2WorkerEntry *entry = (Rnode2WorkerEntry*)hash_search(g_dispatcher->rbVar.wal_recovery_dispatch_hashtbl, &node, HASH_ENTER, &found); + if (found) { + entry->count++; + SpinLockRelease(&g_dispatcher->rbVar.dispatch_dyhash_lock); + return entry->workerId; + } + + // 3.select the most idle worker + entry->workerId = get_most_idle_worker(); + entry->count = 1; + SpinLockRelease(&g_dispatcher->rbVar.dispatch_dyhash_lock); + return entry->workerId; +} + +uint32 GetWorkerId(const RelFileNode &node, BlockNumber block, ForkNumber forkNum) +{ + if (IS_DISPATCH_ALGORITHM_ALL_DIRECTION_ON) { + return GetWorkerIdByAllAssigned(node, block, forkNum); + } + return GetWorkerIdByHash(node, block, forkNum); +} + static void AddWorkerToSet(uint32 id) { if (id >= g_dispatcher->pageWorkerCount) { @@ -2501,4 +2786,115 @@ static void HandleStartupProcInterruptsForParallelRedo(void) if (IsUnderPostmaster && !PostmasterIsAlive()) gs_thread_exit(1); } + +bool in_full_sync_dispatch(void) +{ + if (!g_dispatcher || !g_instance.attr.attr_storage.enable_batch_dispatch) + return true; + return g_dispatcher->full_sync_dispatch; +} + +void get_dispatch_stat_detail(DispatchStat **dispatch_stat, uint32 *realNum) +{ + if 
(!IS_DISPATCH_ALGORITHM_ALL_DIRECTION_ON) { + return; + } + + uint32 workerNum = g_dispatcher->pageWorkerCount; + *realNum = workerNum + 1; + HASH_SEQ_STATUS status; + Rnode2WorkerEntry* entry = NULL; + uint32 rnodeNum = 0; + uint32 totalNum = 0; + RnodeInfo *sortedList = NULL; + uint32 *workloads = (uint32*)palloc0(workerNum * sizeof(uint32)); + char **details = (char**)palloc0(workerNum * sizeof(char*)); + size_t *capacity = (size_t*)palloc0(workerNum * sizeof(size_t)); + size_t *usedCapacity = (size_t*)palloc0(workerNum * sizeof(size_t)); + errno_t rc; + for (uint32 i=0; irbVar.dispatch_dyhash_lock); + long num = hash_get_num_entries(g_dispatcher->rbVar.wal_recovery_dispatch_hashtbl); + sortedList = (RnodeInfo*)palloc0(num * sizeof(RnodeInfo)); + hash_seq_init(&status, g_dispatcher->rbVar.wal_recovery_dispatch_hashtbl); + while ((entry = (Rnode2WorkerEntry *)hash_seq_search(&status)) != NULL) { + totalNum += entry->count; + workloads[entry->workerId] += entry->count; + sortedList[rnodeNum] = {entry->rnode, entry->count, false, entry->workerId}; + rnodeNum++; + } + SpinLockRelease(&g_dispatcher->rbVar.dispatch_dyhash_lock); + qsort(sortedList, rnodeNum, sizeof(RnodeInfo), rnode_compare); + char temp[TEMP_VAR_LEN] = {0}; + + for (uint32 i=0; ipageWorkers[i]->tid.thid; + stat[i].entry_num = SPSCGetQueueCount(g_dispatcher->pageWorkers[i]->queue); + stat[i].percent = (float4)(workloads[i]) / (float4)(totalNum); + stat[i].detail = (char*)palloc(strlen(details[i]) + 1); + rc = sprintf_s(stat[i].detail, strlen(details[i]) + 1, "%s", details[i]); + securec_check_ss(rc, "\0", "\0"); + } + + stat[startup_idx].worker_name = (char*)palloc(strlen(startupName) + 1); + rc = sprintf_s(stat[startup_idx].worker_name, strlen(startupName) + 1, "%s", startupName); + securec_check_ss(rc, "\0", "\0"); + stat[startup_idx].pid = g_instance.pid_cxt.StartupPID; + stat[startup_idx].entry_num = getPendingCount(g_dispatcher->txnWorker); + stat[startup_idx].percent = 1.0; + + 
stat[startup_idx].detail = (char*)palloc(TEMP_VAR_LEN); + rc = memset_s(stat[startup_idx].detail, TEMP_VAR_LEN, 0, TEMP_VAR_LEN); + securec_check_ss(rc, "\0", "\0"); + rc = sprintf_s(stat[startup_idx].detail, TEMP_VAR_LEN, "%u/%u", g_dispatcher->rbVar.re_assigned_times_step1, + g_dispatcher->rbVar.re_assigned_times_step2); + securec_check_ss(rc, "\0", "\0"); + + pfree(workloads); + pfree(capacity); + pfree(usedCapacity); + pfree(sortedList); + for (uint32 i=0; itransed_txn_lsn); +} + +XLogRecPtr getTryingTxnLsn(TxnRedoWorker *worker) +{ + return (XLogRecPtr)pg_atomic_read_u64((volatile uint64*)&worker->txn_trying_lsn); +} + +uint32 getPendingCount(TxnRedoWorker *worker) +{ + return worker->pendingCount; +} + TxnRedoWorker *StartTxnRedoWorker() { TxnRedoWorker *worker = (TxnRedoWorker *)palloc(sizeof(TxnRedoWorker)); @@ -71,6 +90,7 @@ TxnRedoWorker *StartTxnRedoWorker() worker->procHead = NULL; worker->procTail = NULL; + worker->pendingCount = 0; return worker; } @@ -97,6 +117,7 @@ void AddTxnRedoItem(TxnRedoWorker *worker, RedoItem *item) } item->nextByWorker[0] = NULL; worker->pendingTail = item; + worker->pendingCount++; } void ApplyReadyTxnShareLogRecords(RedoItem *item) @@ -285,6 +306,7 @@ void ApplyReadyTxnLogRecords(TxnRedoWorker *worker, bool forceAll) */ if (XLByteLE(record->EndRecPtr, curRead)) { item = ProcTxnItem(item); + worker->pendingCount--; } else { break; } diff --git a/src/gausskernel/storage/access/transam/xact.cpp b/src/gausskernel/storage/access/transam/xact.cpp index 6670768714f5684e784295d407f40c7bd7096466..cf2c3f77e53cc72c9e69f390691991d910d1d6da 100755 --- a/src/gausskernel/storage/access/transam/xact.cpp +++ b/src/gausskernel/storage/access/transam/xact.cpp @@ -2545,6 +2545,7 @@ static void StartTransaction(bool begin_on_gtm) CallXactCallbacks(XACT_EVENT_START); #endif + t_thrd.storage_cxt.trace_lwlock_time = module_logging_is_on(MOD_LWLOCK); if (module_logging_is_on(MOD_TRANS_XACT)) { ereport(LOG, (errmodule(MOD_TRANS_XACT), errmsg("start 
transaction succ. In Node %s, trans state: %s -> %s.", @@ -3151,6 +3152,7 @@ static void CommitTransaction(bool STP_commit) oldstate = s->state; s->state = TRANS_DEFAULT; + t_thrd.storage_cxt.trace_lwlock_time = false; if (module_logging_is_on(MOD_TRANS_XACT)) { ereport(LOG, (errmodule(MOD_TRANS_XACT), errmsg("Local Node %s: local commit process completed, trans state : %s -> %s", @@ -3585,6 +3587,7 @@ static void PrepareTransaction(bool STP_commit) * back to default */ s->state = TRANS_DEFAULT; + t_thrd.storage_cxt.trace_lwlock_time = false; RESUME_INTERRUPTS(); @@ -4027,6 +4030,7 @@ static void AbortTransaction(bool PerfectRollback, bool STP_rollback) t_thrd.xact_cxt.callPrint = false; u_sess->catalog_cxt.myLobTempToastNamespace = InvalidOid; u_sess->plsql_cxt.ActiveLobToastOid = InvalidOid; + t_thrd.storage_cxt.trace_lwlock_time = false; } static void CleanupTransaction(void) diff --git a/src/gausskernel/storage/access/transam/xlog.cpp b/src/gausskernel/storage/access/transam/xlog.cpp index 935c168e2e19098e9db82fefe2285a29f8494943..2895e5641752706a96c42673e03dc93e3b844208 100755 --- a/src/gausskernel/storage/access/transam/xlog.cpp +++ b/src/gausskernel/storage/access/transam/xlog.cpp @@ -7747,16 +7747,31 @@ static bool CheckApplyDelayReady(void) return false; } - /* the walreceiver process is started behind here, - * ensure that the walreceiver process has been started, - * otherwise, the stream replication will be disconnected */ - if (g_instance.pid_cxt.WalReceiverPID == 0) { - return false; - } - return true; } +static void KeepWalrecvAliveWhenRecoveryDelay() +{ + static TimestampTz lastTestWalrecvTime = (TimestampTz)0; + + TimestampTz now = GetCurrentTimestamp(); + if (!TimestampDifferenceExceeds(lastTestWalrecvTime, now, 1000)) { // 1s + return; + } + lastTestWalrecvTime = now; + + if (WalRcvInProgress()) { + return; + } + + if (g_instance.pid_cxt.BarrierPreParsePID != 0) { + return; + } + + /* wake up walrecv by pre-parse thread */ + 
g_instance.csn_barrier_cxt.pre_parse_started = false; +} + /* * When recovery_min_apply_delay is set, we wait long enough to make sure * certain record types are applied at least that interval behind the master. @@ -7782,6 +7797,8 @@ static bool RecoveryApplyDelay(const XLogReaderState *record) return false; } + KeepWalrecvAliveWhenRecoveryDelay(); + /* * Is it a COMMIT record? * We deliberately choose not to delay aborts since they have no effect on @@ -7817,6 +7834,8 @@ static bool RecoveryApplyDelay(const XLogReaderState *record) /* might change the trigger file's location */ RedoInterruptCallBack(); + KeepWalrecvAliveWhenRecoveryDelay(); + if (CheckForFailoverTrigger() || CheckForSwitchoverTrigger() || CheckForStandbyTrigger()) { break; } @@ -10841,15 +10860,6 @@ static void sendPMBeginHotStby() pg_atomic_write_u32(&(g_instance.comm_cxt.predo_cxt.hotStdby), ATOMIC_TRUE); ereport(LOG, (errmsg("send signal to be hot standby at %X/%X", (uint32)(lastReplayedEndRecPtr >> 32), (uint32)lastReplayedEndRecPtr))); -#ifdef ENABLE_MULTIPLE_NODES - /* - * If we are in cluster-standby-mode, we need launch barreir preparse - * thread from the minrecoverypoint point. 
- */ - if (IS_MULTI_DISASTER_RECOVER_MODE && g_instance.pid_cxt.BarrierPreParsePID == 0) { - SetBarrierPreParseLsn(t_thrd.xlog_cxt.minRecoveryPoint); - } -#endif SendPostmasterSignal(PMSIGNAL_BEGIN_HOT_STANDBY); } } @@ -11959,9 +11969,11 @@ void CreateCheckPoint(int flags) if (TransactionIdIsNormal(globalXmin) && TransactionIdPrecedes(globalXmin, cutoff_xid)) { cutoff_xid = globalXmin; } - TransactionId globalRecycleXid = pg_atomic_read_u64(&g_instance.undo_cxt.globalRecycleXid); - if (TransactionIdIsNormal(globalRecycleXid) && TransactionIdPrecedes(globalRecycleXid, cutoff_xid)) { - cutoff_xid = globalRecycleXid; + if (g_instance.attr.attr_storage.enable_ustore) { + TransactionId globalRecycleXid = pg_atomic_read_u64(&g_instance.undo_cxt.globalRecycleXid); + if (TransactionIdIsNormal(globalRecycleXid) && TransactionIdPrecedes(globalRecycleXid, cutoff_xid)) { + cutoff_xid = globalRecycleXid; + } } TruncateCSNLOG(cutoff_xid); t_thrd.checkpoint_cxt.last_truncate_log_time = now; @@ -12768,9 +12780,11 @@ bool CreateRestartPoint(int flags) if (TransactionIdIsNormal(globalXmin) && TransactionIdPrecedes(globalXmin, cutoffXid)) { cutoffXid = globalXmin; } - TransactionId globalRecycleXid = pg_atomic_read_u64(&g_instance.undo_cxt.globalRecycleXid); - if (TransactionIdIsNormal(globalRecycleXid) && TransactionIdPrecedes(globalRecycleXid, cutoffXid)) { - cutoffXid = globalRecycleXid; + if (g_instance.attr.attr_storage.enable_ustore) { + TransactionId globalRecycleXid = pg_atomic_read_u64(&g_instance.undo_cxt.globalRecycleXid); + if (TransactionIdIsNormal(globalRecycleXid) && TransactionIdPrecedes(globalRecycleXid, cutoffXid)) { + cutoffXid = globalRecycleXid; + } } TruncateCSNLOG(cutoffXid); t_thrd.checkpoint_cxt.last_truncate_log_time = now; diff --git a/src/gausskernel/storage/lmgr/lwlock.cpp b/src/gausskernel/storage/lmgr/lwlock.cpp index 31c7c2c4a3c89e1d91551f7e508e00b1ea5d0b62..178eee5b6dd74d3df4c8ba77e4cb216baf5a1660 100644 --- a/src/gausskernel/storage/lmgr/lwlock.cpp 
+++ b/src/gausskernel/storage/lmgr/lwlock.cpp @@ -73,9 +73,11 @@ #include "access/csnlog.h" #include "access/multixact.h" #include "access/subtrans.h" +#include "access/tableam.h" #include "access/ustore/undo/knl_uundoapi.h" #include "commands/async.h" #include "commands/copy.h" +#include "funcapi.h" #include "lib/ilist.h" #include "miscadmin.h" #include "pg_trace.h" @@ -86,6 +88,7 @@ #include "storage/lock/lwlock_be.h" #include "storage/predicate.h" #include "storage/proc.h" +#include "storage/procarray.h" #include "storage/lock/s_lock.h" #include "storage/spin.h" #include "storage/cucache_mgr.h" @@ -1303,7 +1306,7 @@ bool LWLockAcquire(LWLock *lock, LWLockMode mode, bool need_update_lockid) ereport(ERROR, (errcode(ERRCODE_LOCK_NOT_AVAILABLE), errmsg("too many LWLocks taken"))); } - remember_lwlock_acquire(lock); + remember_lwlock_acquire(lock, mode); /* * Lock out cancel/die interrupts until we exit the code section protected @@ -1430,7 +1433,10 @@ bool LWLockAcquire(LWLock *lock, LWLockMode mode, bool need_update_lockid) /* Add lock to list of locks held by this backend */ t_thrd.storage_cxt.held_lwlocks[t_thrd.storage_cxt.num_held_lwlocks].lock = lock; - t_thrd.storage_cxt.held_lwlocks[t_thrd.storage_cxt.num_held_lwlocks++].mode = mode; + t_thrd.storage_cxt.held_lwlocks[t_thrd.storage_cxt.num_held_lwlocks].mode = mode; + t_thrd.storage_cxt.lwlock_held_times[t_thrd.storage_cxt.num_held_lwlocks] = + t_thrd.storage_cxt.trace_lwlock_time ? GetCurrentTimestamp() : 0; + t_thrd.storage_cxt.num_held_lwlocks++; /* * Fix the process wait semaphore's count for any absorbed wakeups. 
@@ -1479,7 +1485,10 @@ bool LWLockConditionalAcquire(LWLock *lock, LWLockMode mode) } else { /* Add lock to list of locks held by this backend */ t_thrd.storage_cxt.held_lwlocks[t_thrd.storage_cxt.num_held_lwlocks].lock = lock; - t_thrd.storage_cxt.held_lwlocks[t_thrd.storage_cxt.num_held_lwlocks++].mode = mode; + t_thrd.storage_cxt.held_lwlocks[t_thrd.storage_cxt.num_held_lwlocks].mode = mode; + t_thrd.storage_cxt.lwlock_held_times[t_thrd.storage_cxt.num_held_lwlocks] = + t_thrd.storage_cxt.trace_lwlock_time ? GetCurrentTimestamp() : 0; + t_thrd.storage_cxt.num_held_lwlocks++; TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), mode); } return !mustwait; @@ -1543,7 +1552,7 @@ bool LWLockAcquireOrWait(LWLock *lock, LWLockMode mode) #ifdef LWLOCK_STATS lwstats->block_count++; #endif - remember_lwlock_acquire(lock); + remember_lwlock_acquire(lock, mode); for (;;) { /* "false" means cannot accept cancel/die interrupt here. */ @@ -1599,7 +1608,10 @@ bool LWLockAcquireOrWait(LWLock *lock, LWLockMode mode) LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded"); /* Add lock to list of locks held by this backend */ t_thrd.storage_cxt.held_lwlocks[t_thrd.storage_cxt.num_held_lwlocks].lock = lock; - t_thrd.storage_cxt.held_lwlocks[t_thrd.storage_cxt.num_held_lwlocks++].mode = mode; + t_thrd.storage_cxt.held_lwlocks[t_thrd.storage_cxt.num_held_lwlocks].mode = mode; + t_thrd.storage_cxt.lwlock_held_times[t_thrd.storage_cxt.num_held_lwlocks] = + t_thrd.storage_cxt.trace_lwlock_time ? GetCurrentTimestamp() : 0; + t_thrd.storage_cxt.num_held_lwlocks++; TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(T_NAME(lock), mode); } @@ -1809,6 +1821,7 @@ void LWLockRelease(LWLock *lock) uint64 oldstate; bool check_waiters = false; int i; + TimestampTz now = t_thrd.storage_cxt.trace_lwlock_time ? GetCurrentTimestamp() : (TimestampTz)0; /* Remove lock from list of locks held. Usually, but not always, it will * be the latest-acquired lock; so search array backwards. 
*/ @@ -1821,9 +1834,22 @@ void LWLockRelease(LWLock *lock) if (i < 0) { ereport(ERROR, (errcode(ERRCODE_LOCK_NOT_AVAILABLE), errmsg("lock %s is not held", T_NAME(lock)))); } + /* if lwlock is held longer than 1min, ereport the detail and backtrace */ + if (t_thrd.storage_cxt.trace_lwlock_time && + TimestampDifferenceExceeds(t_thrd.storage_cxt.lwlock_held_times[i], now, MSECS_PER_MIN)) { + force_backtrace_messages = true; + int old_backtrace_min_messages = u_sess->attr.attr_common.backtrace_min_messages; + u_sess->attr.attr_common.backtrace_min_messages = LOG; + ereport(LOG, (errmodule(MOD_LWLOCK), (errmsg("lwlock %s mode %d is held " + "for %ld ms longer than 1 min", T_NAME(lock), (int)(t_thrd.storage_cxt.held_lwlocks[i].mode), + now - t_thrd.storage_cxt.lwlock_held_times[i])))); + u_sess->attr.attr_common.backtrace_min_messages = old_backtrace_min_messages; + } + t_thrd.storage_cxt.num_held_lwlocks--; for (; i < t_thrd.storage_cxt.num_held_lwlocks; i++) { t_thrd.storage_cxt.held_lwlocks[i] = t_thrd.storage_cxt.held_lwlocks[i + 1]; + t_thrd.storage_cxt.lwlock_held_times[i] = t_thrd.storage_cxt.lwlock_held_times[i + 1]; } PRINT_LWDEBUG("LWLockRelease", lock, mode); @@ -1980,7 +2006,11 @@ void LWLockOwn(LWLock *lock) ereport(ERROR, (errcode(ERRCODE_LOCK_NOT_AVAILABLE), errmsg("lock %s is not held", T_NAME(lock)))); } - t_thrd.storage_cxt.held_lwlocks[t_thrd.storage_cxt.num_held_lwlocks++].lock = lock; + t_thrd.storage_cxt.held_lwlocks[t_thrd.storage_cxt.num_held_lwlocks].lock = lock; + t_thrd.storage_cxt.lwlock_held_times[t_thrd.storage_cxt.num_held_lwlocks] = + t_thrd.storage_cxt.trace_lwlock_time ? GetCurrentTimestamp() : 0; + + t_thrd.storage_cxt.num_held_lwlocks++; HOLD_INTERRUPTS(); } @@ -2000,6 +2030,7 @@ void LWLockDisown(LWLock *lock) { uint64 expected_state; int i; + TimestampTz now = t_thrd.storage_cxt.trace_lwlock_time ? 
GetCurrentTimestamp() : (TimestampTz)0; /* Ensure that lock is held */ expected_state = pg_atomic_read_u64(&lock->state); @@ -2017,9 +2048,22 @@ void LWLockDisown(LWLock *lock) ereport(ERROR, (errcode(ERRCODE_LOCK_NOT_AVAILABLE), errmsg("lock %s is not held", T_NAME(lock)))); } + /* if lwlock is held longer than 1min, ereport the detail and backtrace */ + if (t_thrd.storage_cxt.trace_lwlock_time && + TimestampDifferenceExceeds(t_thrd.storage_cxt.lwlock_held_times[i], now, MSECS_PER_MIN)) { + force_backtrace_messages = true; + int old_backtrace_min_messages = u_sess->attr.attr_common.backtrace_min_messages; + u_sess->attr.attr_common.backtrace_min_messages = LOG; + ereport(LOG, (errmodule(MOD_LWLOCK), (errmsg("lwlock %s mode %d is held " + "for %ld ms longer than 1 min", T_NAME(lock), (int)(t_thrd.storage_cxt.held_lwlocks[i].mode), + now - t_thrd.storage_cxt.lwlock_held_times[i])))); + u_sess->attr.attr_common.backtrace_min_messages = old_backtrace_min_messages; + } + t_thrd.storage_cxt.num_held_lwlocks--; for (; i < t_thrd.storage_cxt.num_held_lwlocks; i++) { t_thrd.storage_cxt.held_lwlocks[i] = t_thrd.storage_cxt.held_lwlocks[i + 1]; + t_thrd.storage_cxt.lwlock_held_times[i] = t_thrd.storage_cxt.lwlock_held_times[i + 1]; } RESUME_INTERRUPTS(); @@ -2043,6 +2087,12 @@ void *get_held_lwlocks(void) return (void *)t_thrd.storage_cxt.held_lwlocks; } +/* get lwlock held times */ +void *get_lwlock_held_times(void) +{ + return (void *)t_thrd.storage_cxt.lwlock_held_times; +} + #define COPY_LWLOCK_HANDLE(src, dst) do { \ (dst)->lock_addr.lock = (src)->lock; \ (dst)->lock_sx = (src)->mode; \ @@ -2216,3 +2266,261 @@ void CheckLWLockPartNumRange(void) } } +const int GS_LWLOCK_STATUS_COL_NUM = 9; +typedef struct LWLockStatInfo { + lwlock_id_mode lwlock; + TimestampTz start_time; + bool granted; +} LWLockStatInfo; + +typedef struct LWLockInstanceData { + int lwlocks_num; /* number of locks held */ + LWLockStatInfo* lwlocks; /* lwlocks */ + ThreadId pid; /* pid of this PGPROC */ 
+ uint64 sessionid; /* session id of this PGPROC */ + GlobalSessionId globalSessionId; /* global session id */ +} LWLockInstanceData; + +typedef struct LWLockData { + int nelements; /* The length of the array */ + LWLockInstanceData* lwlocks; +} LWLockData; + +typedef struct GsLWLockStatus { + LWLockData *lwLockData; + int currInstanceIdx; + int currLockInfoIdx; + TableDistributionInfo *remoteStatus; +} GsLWLockStatus; + +/* Get the head row */ +static TupleDesc GetGsLWLockStatusFuncTupleDesc() +{ + TupleDesc tupdesc = NULL; + tupdesc = CreateTemplateTupleDesc(GS_LWLOCK_STATUS_COL_NUM, false); + AttrNumber attrIdx = 1; + TupleDescInitEntry(tupdesc, attrIdx, "node_name", TEXTOID, -1, 0); + attrIdx++; + TupleDescInitEntry(tupdesc, attrIdx, "lock_name", TEXTOID, -1, 0); + attrIdx++; + TupleDescInitEntry(tupdesc, attrIdx, "lock_unique_id", INT8OID, -1, 0); + attrIdx++; + TupleDescInitEntry(tupdesc, attrIdx, "pid", INT8OID, -1, 0); + attrIdx++; + TupleDescInitEntry(tupdesc, attrIdx, "sessionid", INT8OID, -1, 0); + attrIdx++; + TupleDescInitEntry(tupdesc, attrIdx, "global_sessionid", TEXTOID, -1, 0); + attrIdx++; + TupleDescInitEntry(tupdesc, attrIdx, "mode", TEXTOID, -1, 0); + attrIdx++; + TupleDescInitEntry(tupdesc, attrIdx, "granted", BOOLOID, -1, 0); + attrIdx++; + TupleDescInitEntry(tupdesc, attrIdx, "start_time", TIMESTAMPTZOID, -1, 0); + return BlessTupleDesc(tupdesc); +} + +static void copy_lwlock_infos(void *held_locks, void *held_times, LWLockStatInfo *dst, int num_locks) +{ + LWLockHandle *src_locks = (LWLockHandle *)held_locks; + TimestampTz *src_times = (TimestampTz *)held_times; + for (int i = 0; i < num_locks; i++) { + dst[i].lwlock.lock_addr.lock = src_locks[i].lock; + dst[i].lwlock.lock_sx = src_locks[i].mode; + dst[i].start_time = src_times[i]; + dst[i].granted = true; + } +} + +static bool IsVaildBeentry(volatile PgBackendStatus *beentry) +{ + return (beentry->st_procpid > 0 || beentry->st_sessionid > 0); +} + +static void 
GetBeentryLWLockInfo(LWLockInstanceData *localEntry, volatile PgBackendStatus *beentry) +{ + for (;;) { + localEntry->lwlocks_num = 0; + localEntry->pid = 0; + localEntry->sessionid = 0; + int beforeChangeCount; + int afterChangeCount; + pgstat_save_changecount_before(beentry, beforeChangeCount); + /* the only Prerequisites is that thread is valid. */ + if (IsVaildBeentry(beentry)) { + int *lw_held_num = beentry->lw_held_num; + void *lw_held_locks = beentry->lw_held_locks; + void *lw_held_times = beentry->lw_held_times; + LWLock *lw_want_lock = beentry->lw_want_lock; + LWLockMode lw_want_mode = beentry->lw_want_mode; + TimestampTz lw_want_start_time = beentry->lw_want_start_time; + if (lw_held_num != NULL && lw_held_locks != NULL && lw_held_times != NULL) { + localEntry->lwlocks_num = *lw_held_num; + if ((uint32)localEntry->lwlocks_num > get_held_lwlocks_maxnum()) { + localEntry->lwlocks_num = 0; + break; + } + copy_lwlock_infos(lw_held_locks, lw_held_times, localEntry->lwlocks, localEntry->lwlocks_num); + } + if (lw_want_lock != NULL) { + localEntry->lwlocks[localEntry->lwlocks_num].lwlock.lock_addr.lock = lw_want_lock; + localEntry->lwlocks[localEntry->lwlocks_num].lwlock.lock_sx = lw_want_mode; + localEntry->lwlocks[localEntry->lwlocks_num].start_time = lw_want_start_time; + localEntry->lwlocks[localEntry->lwlocks_num].granted = false; + localEntry->lwlocks_num++; + } + localEntry->pid = beentry->st_procpid; + localEntry->sessionid = beentry->st_sessionid; + localEntry->globalSessionId.sessionId = beentry->globalSessionId.sessionId; + localEntry->globalSessionId.nodeId = beentry->globalSessionId.nodeId; + localEntry->globalSessionId.seq = beentry->globalSessionId.seq; + } + pgstat_save_changecount_after(beentry, afterChangeCount); + if (beforeChangeCount == afterChangeCount && (beforeChangeCount & 1) == 0) { + break; + } + /* Make sure we can break out of loop if stuck... 
*/ + CHECK_FOR_INTERRUPTS(); + } +} + +static LWLockData* GetLWLockStatusData(void) +{ + volatile PgBackendStatus *beentry = t_thrd.shemem_ptr_cxt.BackendStatusArray + BackendStatusArray_size - 1; + LWLockData *data = (LWLockData *)palloc0(sizeof(LWLockData)); + if (!u_sess->attr.attr_common.pgstat_track_activities) { + ereport(INFO, (errmsg("The collection of information is disabled because track_activities is off."))); + data->nelements = 0; + return data; + } + LWLockInstanceData *localEntry = (LWLockInstanceData *)palloc0(sizeof(LWLockInstanceData)); + localEntry->lwlocks = (LWLockStatInfo *)palloc0((get_held_lwlocks_maxnum() + 1) * sizeof(LWLockStatInfo)); + Size totalLockInfoSize = sizeof(LWLockInstanceData) * BackendStatusArray_size; + data->lwlocks = (LWLockInstanceData *)palloc0(totalLockInfoSize); + for (int i = 1; i <= BackendStatusArray_size; ++i) { + GetBeentryLWLockInfo(localEntry, beentry); + /* Only valid entries get included into the local array */ + if (localEntry->lwlocks_num > 0 && (localEntry->pid > 0 || localEntry->sessionid > 0)) { + data->lwlocks[data->nelements].lwlocks_num = localEntry->lwlocks_num; + Size lockSize = localEntry->lwlocks_num * sizeof(LWLockStatInfo); + data->lwlocks[data->nelements].lwlocks = (LWLockStatInfo *)palloc(lockSize); + errno_t errorno = memcpy_s(data->lwlocks[data->nelements].lwlocks, lockSize, localEntry->lwlocks, lockSize); + securec_check(errorno, "", ""); + data->lwlocks[data->nelements].pid = localEntry->pid; + data->lwlocks[data->nelements].sessionid= localEntry->sessionid; + data->lwlocks[data->nelements].globalSessionId = localEntry->globalSessionId; + data->nelements++; + } + beentry--; + } + pfree(localEntry->lwlocks); + pfree(localEntry); + return data; +} + +static const char* GetLWLockModeType(LWLockMode mode) +{ + if (mode == LW_EXCLUSIVE) { + return "Exclusive"; + } else if (mode == LW_SHARED) { + return "Shared"; + } else if (mode == LW_WAIT_UNTIL_FREE) { + return "Wait until free"; + } else { + 
return "Unknown"; + } +} + +static uint64 GetLockIndex(LWLock* lockAddr) +{ + return (uint64)((char*)lockAddr - (char*)t_thrd.shemem_ptr_cxt.mainLWLockArray)/sizeof(LWLockPadded); +} + +Datum gs_lwlock_status(PG_FUNCTION_ARGS) +{ + FuncCallContext* funcctx = NULL; + GsLWLockStatus* mystatus = NULL; + LWLockData* lwLockData = NULL; + LWLockInstanceData* instance = NULL; + HeapTuple tuple = NULL; + Datum result; + Datum values[GS_LWLOCK_STATUS_COL_NUM]; + bool nulls[GS_LWLOCK_STATUS_COL_NUM]; + errno_t rc; + int i = 0; + + rc = memset_s(values, sizeof(values), 0, sizeof(values)); + securec_check(rc, "\0", "\0"); + rc = memset_s(nulls, sizeof(nulls), 0, sizeof(nulls)); + securec_check(rc, "\0", "\0"); + + if (SRF_IS_FIRSTCALL()) { + MemoryContext oldcontext; + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + funcctx->tuple_desc = GetGsLWLockStatusFuncTupleDesc(); + mystatus = (GsLWLockStatus*)palloc0(sizeof(GsLWLockStatus)); + funcctx->user_fctx = (void*)mystatus; + mystatus->lwLockData = GetLWLockStatusData(); + (void)MemoryContextSwitchTo(oldcontext); + } + funcctx = SRF_PERCALL_SETUP(); + mystatus = (GsLWLockStatus*)funcctx->user_fctx; + lwLockData = mystatus->lwLockData; + + while (mystatus->currInstanceIdx < lwLockData->nelements) { + instance = &lwLockData->lwlocks[mystatus->currInstanceIdx]; + Assert(instance->lwlocks_num > 0); + int curIdx = mystatus->currLockInfoIdx; + int attrIdx = 0; + bool skip = (instance->lwlocks[curIdx].lwlock.lock_addr.lock == NULL); + if (!skip) { + values[attrIdx++] = CStringGetTextDatum(g_instance.attr.attr_common.PGXCNodeName); + values[attrIdx++] = CStringGetTextDatum(T_NAME(instance->lwlocks[curIdx].lwlock.lock_addr.lock)); + values[attrIdx++] = Int64GetDatum(GetLockIndex(instance->lwlocks[curIdx].lwlock.lock_addr.lock)); + values[attrIdx++] = Int64GetDatum(instance->pid); + values[attrIdx++] = 
Int64GetDatum(instance->sessionid); + char* gId = GetGlobalSessionStr(instance->globalSessionId); + values[attrIdx++] = CStringGetTextDatum(gId); + pfree(gId); + values[attrIdx++] = CStringGetTextDatum(GetLWLockModeType(instance->lwlocks[curIdx].lwlock.lock_sx)); + values[attrIdx++] = BoolGetDatum(instance->lwlocks[curIdx].granted); + values[attrIdx] = TimestampTzGetDatum(instance->lwlocks[curIdx].start_time); + nulls[attrIdx] = (values[attrIdx] == 0); + } + mystatus->currLockInfoIdx++; + /* till the end, continue next instance */ + if (mystatus->currLockInfoIdx == instance->lwlocks_num) { + mystatus->currLockInfoIdx = 0; + mystatus->currInstanceIdx++; + pfree_ext(instance->lwlocks); + } + if (skip) { + continue; + } + tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); + result = HeapTupleGetDatum(tuple); + SRF_RETURN_NEXT(funcctx, result); + } + + if (IS_PGXC_COORDINATOR && !IsConnFromCoord() && mystatus->remoteStatus != NULL) { + Tuplestorestate *tupstore = (mystatus->remoteStatus)->state->tupstore; + TupleTableSlot *slot = (mystatus->remoteStatus)->slot; + + if (!tuplestore_gettupleslot(tupstore, true, false, slot)) { + FreeParallelFunctionState((mystatus->remoteStatus)->state); + ExecDropSingleTupleTableSlot(slot); + pfree_ext(mystatus->remoteStatus); + SRF_RETURN_DONE(funcctx); + } + for (i = 0; i < GS_LWLOCK_STATUS_COL_NUM; i++) { + values[i] = tableam_tslot_getattr(slot, (i + 1), &nulls[i]); + } + tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); + (void)ExecClearTuple(slot); + + SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple)); + } + SRF_RETURN_DONE(funcctx); +} + diff --git a/src/gausskernel/storage/lmgr/lwlock_be.cpp b/src/gausskernel/storage/lmgr/lwlock_be.cpp index c7a61315c4cef2687e45bf326e46dd4fcaf867d5..d8017c1dd0df7b17a1909f0a8e82cbe66a414c75 100644 --- a/src/gausskernel/storage/lmgr/lwlock_be.cpp +++ b/src/gausskernel/storage/lmgr/lwlock_be.cpp @@ -28,7 +28,7 @@ * remember lwlock to require before entering to lwlock * waiting 
loop. */ -void remember_lwlock_acquire(LWLock *lock) +void remember_lwlock_acquire(LWLock *lock, LWLockMode mode) { if (t_thrd.shemem_ptr_cxt.MyBEEntry) { volatile PgBackendStatus *beentry = t_thrd.shemem_ptr_cxt.MyBEEntry; @@ -38,6 +38,8 @@ void remember_lwlock_acquire(LWLock *lock) * because this function maybe called before pgstat_bestart() function. */ beentry->lw_want_lock = lock; + beentry->lw_want_mode = mode; + beentry->lw_want_start_time = t_thrd.storage_cxt.trace_lwlock_time ? GetCurrentTimestamp() : (TimestampTz)0; } } @@ -55,5 +57,6 @@ void forget_lwlock_acquire(void) * because this function may be called before pgstat_bestart() function. */ beentry->lw_want_lock = NULL; + beentry->lw_want_start_time = (TimestampTz)0; } } diff --git a/src/gausskernel/storage/replication/walreceiverfuncs.cpp b/src/gausskernel/storage/replication/walreceiverfuncs.cpp index 9e47555a509cd2408ae738c338e92efdb0b9c5ca..34cee1850b56e10c1b25c46663d74994d9722a5f 100755 --- a/src/gausskernel/storage/replication/walreceiverfuncs.cpp +++ b/src/gausskernel/storage/replication/walreceiverfuncs.cpp @@ -526,6 +526,21 @@ void KillWalRcvWriter(void) } } +/* Set last valid record if walreceiver is requested by preparse thread. */ +void wal_rcv_set_last_record_by_preparse(bool for_preparse) +{ + if (!for_preparse || XLogRecPtrIsInvalid(g_instance.csn_barrier_cxt.latest_valid_record)) { + return; + } + + volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv; + SpinLockAcquire(&walrcv->mutex); + walrcv->latestValidRecord = g_instance.csn_barrier_cxt.latest_valid_record; + walrcv->latestRecordCrc = g_instance.csn_barrier_cxt.latest_record_crc; + walrcv->latestRecordLen = g_instance.csn_barrier_cxt.latest_record_len; + SpinLockRelease(&walrcv->mutex); +} + /* * Stop walreceiver (if running) and wait for it to die. * Executed by the Startup process. 
@@ -602,7 +617,8 @@ void ShutdownWalRcv(void) * is a libpq connection string to use, and slotname is, optionally, the name * of a replication slot to acquire. */ -void RequestXLogStreaming(XLogRecPtr *recptr, const char *conninfo, ReplConnTarget conn_target, const char *slotname) +void RequestXLogStreaming(XLogRecPtr *recptr, const char *conninfo, ReplConnTarget conn_target, const char *slotname, + bool for_preparse) { if (IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE) { ShareStorageXLogCtl *ctlInfo = g_instance.xlog_cxt.shareStorageXLogCtl; @@ -701,6 +717,7 @@ void RequestXLogStreaming(XLogRecPtr *recptr, const char *conninfo, ReplConnTarg walrcv->latestRecordCrc = latestRecordCrc; walrcv->latestRecordLen = latestRecordLen; SpinLockRelease(&walrcv->mutex); + wal_rcv_set_last_record_by_preparse(for_preparse); WalRcvSetPercentCountStartLsn(walrcv->latestValidRecord); if (XLByteLT(latestValidRecord, Lcrecptr)) ereport(LOG, (errmsg("latest valid record at %X/%X, wal receiver start point at %X/%X", diff --git a/src/include/access/parallel_recovery/dispatcher.h b/src/include/access/parallel_recovery/dispatcher.h index 11b01424c3c7a741967c58ea164c8d99539d7a7c..e0d7412dae092b4c75809f2b4fdb6d25976b457e 100644 --- a/src/include/access/parallel_recovery/dispatcher.h +++ b/src/include/access/parallel_recovery/dispatcher.h @@ -47,6 +47,52 @@ typedef struct _DispatchFix { XLogRecPtr lastCheckLsn; }DispatchFix; +typedef struct WalSampleVal { + uint32 totalCnt; + uint32 lastTotalCnt; +} WalSampleVal; + +typedef struct WalSampleStats { + RelFileNode rnode; + WalSampleVal walSampleVal; +} WalSampleStats; + +typedef struct Rnode2WorkerEntry { + RelFileNode rnode; + uint32 workerId; + uint32 count; +} Rnode2WorkerEntry; + +typedef struct RnodeInfo { + RelFileNode rnode; + uint32 inc; + bool isAssigned; + uint32 preWorkerId; +} RnodeInfo; + +typedef struct WalRebalanceVar { + HTAB* wal_recovery_sample_hashtbl; + HTAB* wal_recovery_dispatch_hashtbl; + slock_t dispatch_dyhash_lock; + 
RnodeInfo* rnode_sample_list; + + uint32 begin_worker_idx; + bool first_init_reassigned_worker; + XLogRecPtr last_lsn; + uint32 wal_sample_loop; + + uint32 re_assigned_times_step1; + uint32 re_assigned_times_step2; +} WalRebalanceVar; + +typedef struct DispatchStat { + char* worker_name; + ThreadId pid; + uint32 entry_num; + float4 percent; + char* detail; +} DispatchStat; + typedef struct LogDispatcher { MemoryContext oldCtx; PageRedoWorker** pageWorkers; /* Array of page redo workers. */ @@ -75,6 +121,8 @@ typedef struct LogDispatcher { XLogRedoNumStatics xlogStatics[RM_NEXT_ID][MAX_XLOG_INFO_NUM]; RedoTimeCost *startupTimeCost; DispatchFix dispatchFix; + + WalRebalanceVar rbVar; /* used by algorithm of dispatching wal to redo workers*/ } LogDispatcher; extern LogDispatcher* g_dispatcher; @@ -140,7 +188,7 @@ extern void CopyDataFromOldReader(XLogReaderState *newReaderState, XLogReaderSta bool TxnQueueIsEmpty(TxnRedoWorker* worker); void redo_get_worker_time_count(RedoWorkerTimeCountsInfo **workerCountInfoList, uint32 *realNum); - +void get_dispatch_stat_detail(DispatchStat **dispatch_stat, uint32 *realNum); } #endif diff --git a/src/include/access/parallel_recovery/txn_redo.h b/src/include/access/parallel_recovery/txn_redo.h index bfbd04b1ba3410500e7b64a041ef8f210d672d6f..4f83e0922fce9bd4cfb9d81c998927b689392933 100644 --- a/src/include/access/parallel_recovery/txn_redo.h +++ b/src/include/access/parallel_recovery/txn_redo.h @@ -38,5 +38,6 @@ void ApplyReadyTxnLogRecords(TxnRedoWorker* worker, bool forceAll); void MoveTxnItemToApplyQueue(TxnRedoWorker* worker); void DumpTxnWorker(TxnRedoWorker* txnWorker); bool IsTxnWorkerIdle(TxnRedoWorker* worker); +uint32 getPendingCount(TxnRedoWorker *worker); } #endif diff --git a/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback-post_catalog_maindb_92_618.sql b/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback-post_catalog_maindb_92_618.sql new file mode 100644 index 
0000000000000000000000000000000000000000..aae0f9f6f26eb0967fa094b2fff61c12d6bf2a6c --- /dev/null +++ b/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback-post_catalog_maindb_92_618.sql @@ -0,0 +1 @@ +DROP FUNCTION IF EXISTS pg_catalog.dispatch_stat_detail(); \ No newline at end of file diff --git a/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback-post_catalog_maindb_92_865.sql b/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback-post_catalog_maindb_92_865.sql new file mode 100644 index 0000000000000000000000000000000000000000..648e35fb63f18a75755cf96bd32ed28ca6b27bc6 --- /dev/null +++ b/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback-post_catalog_maindb_92_865.sql @@ -0,0 +1,3 @@ +DROP FUNCTION IF EXISTS pg_catalog.dispatch_stat_detail(); + +DROP FUNCTION IF EXISTS pg_catalog.gs_lwlock_status() CASCADE; \ No newline at end of file diff --git a/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback-post_catalog_otherdb_92_618.sql b/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback-post_catalog_otherdb_92_618.sql new file mode 100644 index 0000000000000000000000000000000000000000..aae0f9f6f26eb0967fa094b2fff61c12d6bf2a6c --- /dev/null +++ b/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback-post_catalog_otherdb_92_618.sql @@ -0,0 +1 @@ +DROP FUNCTION IF EXISTS pg_catalog.dispatch_stat_detail(); \ No newline at end of file diff --git a/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback-post_catalog_otherdb_92_865.sql b/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback-post_catalog_otherdb_92_865.sql new file mode 100644 index 0000000000000000000000000000000000000000..648e35fb63f18a75755cf96bd32ed28ca6b27bc6 --- /dev/null +++ b/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback-post_catalog_otherdb_92_865.sql @@ -0,0 +1,3 @@ +DROP FUNCTION IF EXISTS pg_catalog.dispatch_stat_detail(); + +DROP FUNCTION IF EXISTS 
pg_catalog.gs_lwlock_status() CASCADE; \ No newline at end of file diff --git a/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_92_618.sql b/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_92_618.sql new file mode 100644 index 0000000000000000000000000000000000000000..a91c760f26aca44d81abbebaf1e49ca992a60e43 --- /dev/null +++ b/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_92_618.sql @@ -0,0 +1,10 @@ +DROP FUNCTION IF EXISTS pg_catalog.dispatch_stat_detail(); +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 4395; +CREATE FUNCTION pg_catalog.dispatch_stat_detail +( + OUT thread_name pg_catalog.text, + OUT pid pg_catalog.int8, + OUT pending_count pg_catalog.int4, + OUT ratio pg_catalog.float4, + OUT detail pg_catalog.text +) RETURNS SETOF record LANGUAGE INTERNAL STABLE as 'dispatch_stat_detail'; \ No newline at end of file diff --git a/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_92_865.sql b/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_92_865.sql new file mode 100644 index 0000000000000000000000000000000000000000..49fb9e970e36d65b552367c156d256b7851dab99 --- /dev/null +++ b/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_92_865.sql @@ -0,0 +1,26 @@ +DROP FUNCTION IF EXISTS pg_catalog.dispatch_stat_detail(); +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 4395; +CREATE FUNCTION pg_catalog.dispatch_stat_detail +( + OUT thread_name pg_catalog.text, + OUT pid pg_catalog.int8, + OUT pending_count pg_catalog.int4, + OUT ratio pg_catalog.float4, + OUT detail pg_catalog.text +) RETURNS SETOF record LANGUAGE INTERNAL STABLE as 'dispatch_stat_detail'; + +DROP FUNCTION IF EXISTS pg_catalog.gs_lwlock_status() CASCADE; +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 8888; +CREATE FUNCTION pg_catalog.gs_lwlock_status +( + OUT 
node_name pg_catalog.text, + OUT lock_name pg_catalog.text, + OUT lock_unique_id pg_catalog.int8, + OUT pid pg_catalog.int8, + OUT sessionid pg_catalog.int8, + OUT global_sessionid pg_catalog.text, + OUT mode pg_catalog.text, + OUT granted pg_catalog.bool, + OUT start_time pg_catalog.timestamptz +) RETURNS SETOF record LANGUAGE INTERNAL STABLE as 'gs_lwlock_status'; +comment on function pg_catalog.gs_lwlock_status() is 'View system lwlock information'; diff --git a/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_92_618.sql b/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_92_618.sql new file mode 100644 index 0000000000000000000000000000000000000000..a91c760f26aca44d81abbebaf1e49ca992a60e43 --- /dev/null +++ b/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_92_618.sql @@ -0,0 +1,10 @@ +DROP FUNCTION IF EXISTS pg_catalog.dispatch_stat_detail(); +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 4395; +CREATE FUNCTION pg_catalog.dispatch_stat_detail +( + OUT thread_name pg_catalog.text, + OUT pid pg_catalog.int8, + OUT pending_count pg_catalog.int4, + OUT ratio pg_catalog.float4, + OUT detail pg_catalog.text +) RETURNS SETOF record LANGUAGE INTERNAL STABLE as 'dispatch_stat_detail'; \ No newline at end of file diff --git a/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_92_865.sql b/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_92_865.sql new file mode 100644 index 0000000000000000000000000000000000000000..53c5e096706167b6463834a480c4a7eb1defac5e --- /dev/null +++ b/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_92_865.sql @@ -0,0 +1,26 @@ +DROP FUNCTION IF EXISTS pg_catalog.dispatch_stat_detail(); +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 4395; +CREATE FUNCTION pg_catalog.dispatch_stat_detail +( + OUT thread_name 
pg_catalog.text, + OUT pid pg_catalog.int8, + OUT pending_count pg_catalog.int4, + OUT ratio pg_catalog.float4, + OUT detail pg_catalog.text +) RETURNS SETOF record LANGUAGE INTERNAL STABLE as 'dispatch_stat_detail'; + +DROP FUNCTION IF EXISTS pg_catalog.gs_lwlock_status() CASCADE; +SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 8888; +CREATE FUNCTION pg_catalog.gs_lwlock_status +( + OUT node_name pg_catalog.text, + OUT lock_name pg_catalog.text, + OUT lock_unique_id pg_catalog.int8, + OUT pid pg_catalog.int8, + OUT sessionid pg_catalog.int8, + OUT global_sessionid pg_catalog.text, + OUT mode pg_catalog.text, + OUT granted pg_catalog.bool, + OUT start_time pg_catalog.timestamptz +) RETURNS SETOF record LANGUAGE INTERNAL STABLE as 'gs_lwlock_status'; +comment on function pg_catalog.gs_lwlock_status() is 'View system lwlock information'; \ No newline at end of file diff --git a/src/include/distributelayer/streamCore.h b/src/include/distributelayer/streamCore.h index f356afd3c3603ac6af3b96f5cc98e87663b64e84..d899cbf01c0a347a9bf3bc30f2d6ff81093f9b7e 100755 --- a/src/include/distributelayer/streamCore.h +++ b/src/include/distributelayer/streamCore.h @@ -471,6 +471,9 @@ private: /* A flag to indicate stream enter the quit point. */ volatile int m_streamEnter; + /* A counter to indicate stream enter the quit point. */ + volatile int m_streamEnterCount; + /* Mutex and condition waiting for all thread in the node group is complete. 
*/ pthread_mutex_t m_mutex; diff --git a/src/include/knl/knl_guc/knl_instance_attr_storage.h b/src/include/knl/knl_guc/knl_instance_attr_storage.h index 003a59bf9b63bd2467e266767cbdd8237ba44efc..d1e30bdc4963309da22deb448f0b389f8f555061 100755 --- a/src/include/knl/knl_guc/knl_instance_attr_storage.h +++ b/src/include/knl/knl_guc/knl_instance_attr_storage.h @@ -211,7 +211,8 @@ typedef struct knl_instance_attr_storage { #endif bool enable_huge_pages; int huge_page_size; + + int parallel_recovery_dispatch_algorithm; } knl_instance_attr_storage; #endif /* SRC_INCLUDE_KNL_KNL_INSTANCE_ATTR_STORAGE_H_ */ - diff --git a/src/include/knl/knl_instance.h b/src/include/knl/knl_instance.h index 1b93dd8660c30ca8aa17fe4c314b1cabcf5a2d9e..7db01e8cc09673ca340dd24b50a540b78fa64a1f 100755 --- a/src/include/knl/knl_instance.h +++ b/src/include/knl/knl_instance.h @@ -767,6 +767,8 @@ typedef struct knl_g_csn_barrier_context { struct HTAB* barrier_hash_table; LWLock* barrier_hashtbl_lock; char stopBarrierId[MAX_BARRIER_ID_LENGTH]; + bool pre_parse_started; + int max_run_time; MemoryContext barrier_context; } knl_g_csn_barrier_context; diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index c772e10d178f7c096dc51db007d45f9630212417..13a7e8c657b48abdf75ae1deee89133a15191220 100755 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -2763,6 +2763,9 @@ typedef struct knl_t_storage_context { bool isSwitchoverLockHolder; int num_held_lwlocks; struct LWLockHandle* held_lwlocks; + TimestampTz* lwlock_held_times; + bool trace_lwlock_time; + int lock_addin_request; bool lock_addin_request_allowed; int counts_for_pid; diff --git a/src/include/mb/pg_wchar.h b/src/include/mb/pg_wchar.h index dadc6b8f9fb51d2ffd678e783b24eccfa51ac1c8..3103f559759ab37ae31c20d50f228387dc26830d 100644 --- a/src/include/mb/pg_wchar.h +++ b/src/include/mb/pg_wchar.h @@ -292,7 +292,7 @@ typedef struct pg_enc2name { #endif } pg_enc2name; -extern pg_enc2name pg_enc2name_tbl[]; +extern 
PGDLLIMPORT pg_enc2name pg_enc2name_tbl[]; /* * Encoding names for gettext @@ -441,9 +441,12 @@ extern int pg_char_and_wchar_strncmp(const char* s1, const pg_wchar* s2, size_t extern size_t pg_wchar_strlen(const pg_wchar* wstr); extern int pg_mblen(const char* mbstr); extern int pg_dsplen(const char* mbstr); +extern void pg_encoding_set_invalid(int encoding, char* dst); extern int pg_encoding_mblen(int encoding, const char* mbstr); extern int pg_encoding_dsplen(int encoding, const char* mbstr); extern int pg_encoding_verifymb(int encoding, const char* mbstr, int len); +extern int pg_encoding_verifymbstr(int encoding, const char* mbstr, int len); +extern int pg_encoding_verifymbchar(int encoding, const char* mbstr, int len); extern int pg_mule_mblen(const unsigned char* mbstr); extern int pg_mic_mblen(const unsigned char* mbstr); extern int pg_mbstrlen(const char* mbstr); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 4eb62d40accf2e706735e46c8d949fa4fb0f4849..84da7103b1074c600390e6e2f9b6e8e9f0e1d972 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -1645,10 +1645,14 @@ typedef struct PgBackendStatus { int lw_count; /* lwlock object now requiring */ LWLock* lw_want_lock; + LWLockMode lw_want_mode; + TimestampTz lw_want_start_time; /* all lwlocks held by this thread */ int* lw_held_num; /* point to num_held_lwlocks */ void* lw_held_locks; /* point to held_lwlocks[] */ + void* lw_held_times; /* point to lwlock_held_times[] */ + volatile bool st_lw_access_flag; /* valid flag */ volatile bool st_lw_is_cleanning_flag; /* is cleanning lw ptr */ diff --git a/src/include/postmaster/barrier_preparse.h b/src/include/postmaster/barrier_preparse.h index 4dd96af156b6bcf54ef8ed1aa81f2655faf5c010..73a55fbe87a385d8598f580f0406ca2970ab5769 100644 --- a/src/include/postmaster/barrier_preparse.h +++ b/src/include/postmaster/barrier_preparse.h @@ -27,6 +27,7 @@ #define BARRIER_PREPARSE_H #define INIBARRIERCACHESIZE 100 +#define WAL_PREPARSER_SIGNAL_FILE 
"preparse" #define IS_BARRIER_HASH_INIT \ (g_instance.csn_barrier_cxt.barrier_hash_table != NULL && \ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 26d889f5d82a2018eef516dcddaff926109266ac..ff09d8df249fabf0bf0c40dbc14aed92af2ea6ab 100755 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -282,8 +282,8 @@ extern void ShutdownWalRcv(void); extern bool WalRcvInProgress(void); extern bool WalRcvIsRunning(void); extern void connect_dn_str(char* conninfo, int replIndex); -extern void RequestXLogStreaming( - XLogRecPtr* recptr, const char* conninfo, ReplConnTarget conn_target, const char* slotname); +extern void RequestXLogStreaming(XLogRecPtr* recptr, const char* conninfo, ReplConnTarget conn_target, + const char* slotname, bool for_preparse = false); extern StringInfo get_rcv_slot_name(void); extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr* latestChunkStart); extern XLogRecPtr GetWalStartPtr(); diff --git a/src/include/storage/lock/lwlock.h b/src/include/storage/lock/lwlock.h index 86dcf187a87614fcbe946304f4b19b9816f0169d..650d213be57e22994c78fd85ad10d597bc4a3d48 100644 --- a/src/include/storage/lock/lwlock.h +++ b/src/include/storage/lock/lwlock.h @@ -431,6 +431,7 @@ extern void wakeup_victim(LWLock *lock, ThreadId victim_tid); extern int *get_held_lwlocks_num(void); extern uint32 get_held_lwlocks_maxnum(void); extern void* get_held_lwlocks(void); +extern void* get_lwlock_held_times(void); extern void copy_held_lwlocks(void* heldlocks, lwlock_id_mode* dst, int num_heldlocks); extern const char* GetLWLockIdentifier(uint32 classId, uint16 eventId); extern LWLockMode GetHeldLWLockMode(LWLock* lock); diff --git a/src/include/storage/lock/lwlock_be.h b/src/include/storage/lock/lwlock_be.h index 31b18e9b207ddf530b1f415c5fda0cc5c0b6877a..80e4bcb7fa44de19721399dfb1518d35bc2e419c 100644 --- a/src/include/storage/lock/lwlock_be.h +++ b/src/include/storage/lock/lwlock_be.h @@ -29,7 +29,7 
@@ #include "knl/knl_variable.h" #include "storage/lock/lwlock.h" -extern void remember_lwlock_acquire(LWLock* lockid); +extern void remember_lwlock_acquire(LWLock* lockid, LWLockMode mode); extern void forget_lwlock_acquire(void); #endif // SRC_INCLUDE_STORAGE_LWLOCK_BE_H \ No newline at end of file diff --git a/src/include/utils/be_module.h b/src/include/utils/be_module.h index a2a0f3fee891c38e3ced91b5e8134e6833306cbd..bea5d60e743e9b9421d6e2e7675511ae3fede74b 100755 --- a/src/include/utils/be_module.h +++ b/src/include/utils/be_module.h @@ -149,6 +149,7 @@ enum ModuleId { MOD_GPI, /* debug info for global partition index */ MOD_PARTITION, MOD_SRF, /* debug info for SRF */ + MOD_LWLOCK, /* debug info for lwlock held longer than a threshold*/ /* * Add your module id above. diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h index 7eb7d52c3c3b835b563c668e9bdc2d9ce5886b17..abd24e030daa72ebda5f147eded6031a4c7f39e3 100644 --- a/src/include/utils/builtins.h +++ b/src/include/utils/builtins.h @@ -1480,6 +1480,7 @@ extern Datum pg_try_advisory_xact_lock_shared_int4(PG_FUNCTION_ARGS); extern Datum pg_advisory_unlock_int4(PG_FUNCTION_ARGS); extern Datum pg_advisory_unlock_shared_int4(PG_FUNCTION_ARGS); extern Datum pg_advisory_unlock_all(PG_FUNCTION_ARGS); +extern Datum gs_lwlock_status(PG_FUNCTION_ARGS); /* pgstatfuncs.cpp */ extern Datum gs_stack(PG_FUNCTION_ARGS); diff --git a/src/test/ha/ha_schedule_multi_cascade b/src/test/ha/ha_schedule_multi_cascade index 3c890c3588572d655487cef20a7c579d2af14cda..f014bd7d4941c4ede8e2e26f904d3ce412347ab7 100644 --- a/src/test/ha/ha_schedule_multi_cascade +++ b/src/test/ha/ha_schedule_multi_cascade @@ -3,3 +3,4 @@ cascade/failover_with_data cascade/switchover cascade/switchover_timeout cascade/inc_build_failover +cascade/startwalrcv \ No newline at end of file diff --git a/src/test/ha/standby_env.sh b/src/test/ha/standby_env.sh index 6ff7654382c08c637a483c2431333a1fcc17e297..48d4ee8de51664f33022b9acd80f79c91f82bd1c 
100644 --- a/src/test/ha/standby_env.sh +++ b/src/test/ha/standby_env.sh @@ -2,7 +2,11 @@ #some enviroment vars export g_base_port=8888 -export prefix=${GAUSSHOME} +# Remove the trailing slash from the path: +while [ "${GAUSSHOME: -1}" == "/" ]; do + GAUSSHOME="${GAUSSHOME%/}" +done +export prefix=${PREFIX_HOME} export g_pooler_base_port=`expr $g_base_port \+ 410` export g_base_standby_port=`expr $g_base_port \+ 400` export install_path="$prefix" diff --git a/src/test/ha/testcase/cascade/startwalrcv.sh b/src/test/ha/testcase/cascade/startwalrcv.sh new file mode 100644 index 0000000000000000000000000000000000000000..208a00e5579fbf3e2315452a3689b0f10d82f5e0 --- /dev/null +++ b/src/test/ha/testcase/cascade/startwalrcv.sh @@ -0,0 +1,49 @@ +#!/bin/sh + +source ./util.sh + +function test_1() +{ + set_cascade_default + kill_cascade_cluster + start_cascade_cluster + + receiver_keyword="receiver_replay_location" + echo "query primary" + if [ $(query_primary | grep $receiver_keyword | wc -l) -ne 1 ]; then + echo "$failed_keyword when query primary" + fi + + gsql -d $db -p $dn1_primary_port -c "DROP TABLE if exists testapplydelay;" + gs_guc reload -D $standby_data_dir -c "recovery_min_apply_delay=900s" + + gsql -d $db -p $dn1_primary_port -c "start transaction; + create table testapplydelay(id integer); + insert into testapplydelay values(1); + commit;" + + kill_primary + echo "primary killed" + sleep 60 + start_primary + echo "primary started" + if [ $(query_primary | grep $receiver_keyword | wc -l) -ne 0 ]; then + echo "$failed_keyword when query primary" + fi + + echo "startwalrcv standby" + startwalrcv_standby + if [ $(query_primary | grep $receiver_keyword | wc -l) -ne 1 ]; then + echo "$failed_keyword when query primary" + fi +} + +function tear_down() +{ + sleep 1 + gs_guc reload -D $standby_data_dir -c "recovery_min_apply_delay=0" + gsql -d $db -p $dn1_primary_port -c "DROP TABLE if exists testapplydelay;" +} + +test_1 +tear_down diff --git a/src/test/ha/util.sh 
b/src/test/ha/util.sh index 3f832dbd71ec81ebcb5bbf3e1bc7a441f1014d79..a4cd1e52ae36d9609afb969c227df5a8df71d2fa 100644 --- a/src/test/ha/util.sh +++ b/src/test/ha/util.sh @@ -571,3 +571,8 @@ function stop_streaming_cluster(){ stop_standby3 stop_standby4 } + +function startwalrcv_standby() { + echo "start walrcv" + gs_ctl startwalrcv -D $data_dir/datanode1_standby +} \ No newline at end of file diff --git a/src/test/regress/expected/test_b_format_collate.out b/src/test/regress/expected/test_b_format_collate.out index dc67f4406804b1c206f9be9711e917c94be245ff..1e18b61c5d86dbc87ee328b19a1b3cfb6bb7d3ca 100644 --- a/src/test/regress/expected/test_b_format_collate.out +++ b/src/test/regress/expected/test_b_format_collate.out @@ -2556,3 +2556,18 @@ clean connection to all force for database test_collate_A; clean connection to all force for database test_collate_B; DROP DATABASE IF EXISTS test_collate_A; DROP DATABASE IF EXISTS test_collate_B; +create database test_collate_B dbcompatibility 'B' encoding 'SQL_ASCII' LC_COLLATE='C' LC_CTYPE='C'; +\c test_collate_B +set b_format_behavior_compat_options = 'enable_multi_charset'; +CREATE TABLE db_proc_invoke_log ( + id character varying(36) CHARACTER SET "UTF8" COLLATE utf8mb4_general_ci NOT NULL, + trace_id character varying(36) CHARACTER SET "UTF8" COLLATE utf8mb4_general_ci DEFAULT ''::character varying NOT NULL, + log_content text CHARACTER SET "UTF8" COLLATE utf8mb4_general_ci, + create_time timestamp(0) with time zone DEFAULT (pg_systimestamp())::timestamp(0) with time zone NOT NULL +) +CHARACTER SET = "UTF8" COLLATE = "utf8mb4_general_ci" +WITH (orientation=row, compression=no); +insert into db_proc_invoke_log ("id","log_content") values ('dsadassdadas00000sadasda', 'dsadwqedwqedsada'); +\c regression +clean connection to all force for database test_collate_B; +DROP DATABASE IF EXISTS test_collate_B; diff --git a/src/test/regress/output/recovery_2pc_tools.source b/src/test/regress/output/recovery_2pc_tools.source index 
a8ded9c66e6e8c3d88b45ec041a5642cee67d471..8bf79757cdc97186462a3d11713b18598c5e3525 100644 --- a/src/test/regress/output/recovery_2pc_tools.source +++ b/src/test/regress/output/recovery_2pc_tools.source @@ -508,6 +508,7 @@ select name,vartype,unit,min_val,max_val from pg_settings where name <> 'qunit_c opfusion_debug_mode | enum | | | pagewriter_sleep | integer | ms | 0 | 3600000 pagewriter_thread_num | integer | | 1 | 16 + parallel_recovery_dispatch_algorithm | integer | | 1 | 2 partition_iterator_elimination | bool | | | partition_lock_upgrade_timeout | integer | | -1 | 3000 partition_max_cache_size | integer | kB | 4096 | 1073741823 diff --git a/src/test/regress/sql/test_b_format_collate.sql b/src/test/regress/sql/test_b_format_collate.sql index 3c30dfcabec1ab3586f42d195c2fb60462c432bc..66085fefb343772c8cb4eb0b1fc11a1fb2fe938e 100644 --- a/src/test/regress/sql/test_b_format_collate.sql +++ b/src/test/regress/sql/test_b_format_collate.sql @@ -617,4 +617,21 @@ select substr('中文中文', 2); clean connection to all force for database test_collate_A; clean connection to all force for database test_collate_B; DROP DATABASE IF EXISTS test_collate_A; -DROP DATABASE IF EXISTS test_collate_B; \ No newline at end of file +DROP DATABASE IF EXISTS test_collate_B; + +create database test_collate_B dbcompatibility 'B' encoding 'SQL_ASCII' LC_COLLATE='C' LC_CTYPE='C'; +\c test_collate_B +set b_format_behavior_compat_options = 'enable_multi_charset'; +CREATE TABLE db_proc_invoke_log ( + id character varying(36) CHARACTER SET "UTF8" COLLATE utf8mb4_general_ci NOT NULL, + trace_id character varying(36) CHARACTER SET "UTF8" COLLATE utf8mb4_general_ci DEFAULT ''::character varying NOT NULL, + log_content text CHARACTER SET "UTF8" COLLATE utf8mb4_general_ci, + create_time timestamp(0) with time zone DEFAULT (pg_systimestamp())::timestamp(0) with time zone NOT NULL +) +CHARACTER SET = "UTF8" COLLATE = "utf8mb4_general_ci" +WITH (orientation=row, compression=no); + +insert into 
db_proc_invoke_log ("id","log_content") values ('dsadassdadas00000sadasda', 'dsadwqedwqedsada'); +\c regression +clean connection to all force for database test_collate_B; +DROP DATABASE IF EXISTS test_collate_B;