diff --git a/script/gs_expansion b/script/gs_expansion index 635b680747808cf04807bd334c1af8acf80b2e53..f256e94ff574458d8ef9cbd54c87e3a3aaf00db7 100644 --- a/script/gs_expansion +++ b/script/gs_expansion @@ -260,17 +260,21 @@ General options: """ if hostList == None: hostList = self.nodeNameList + gpHome = DefaultValue.getEnv("GPHOME") + psshPath = "python3 %s/script/gspylib/pssh/bin/pssh" % gpHome rootSSHExceptionHosts = [] individualSSHExceptionHosts = [] for host in hostList: # check root's trust - checkRootTrustCmd = "ssh %s -o ConnectTimeout=10 \"pwd\"" % host + checkRootTrustCmd = "%s -s -H %s 'pwd'" % (psshPath, host) + # (status, output) = subprocess.getstatusoutput(checkRootTrustCmd) (status, output) = subprocess.getstatusoutput(checkRootTrustCmd) if status != 0: rootSSHExceptionHosts.append(host) # check individual user's trust - checkUserTrustCmd = "su - %s -c 'ssh %s -o " \ - "ConnectTimeout=10 \"pwd\"'" % (self.user, host) + checkUserTrustCmd = "su - %s -c '%s -s -H %s pwd'" % ( + self.user, psshPath, host) + # (status, output) = subprocess.getstatusoutput(checkUserTrustCmd) (status, output) = subprocess.getstatusoutput(checkUserTrustCmd) if status != 0: individualSSHExceptionHosts.append(host) diff --git a/script/impl/expansion/ExpansionImpl.py b/script/impl/expansion/ExpansionImpl.py index 379b571ae15ddc5877182fe3107ece5753f88a29..331c0ac88a468aae5a33b0e2f7d18d74c6c3db66 100644 --- a/script/impl/expansion/ExpansionImpl.py +++ b/script/impl/expansion/ExpansionImpl.py @@ -87,9 +87,6 @@ class ExpansionImpl(): for newHost in self.context.newHostList: self.expansionSuccess[newHost] = False self.logger = self.context.logger - self.walKeepSegments = -1 - self.walKeepSegmentsChanged = False - self.needRollback = False envFile = DefaultValue.getEnv("MPPDB_ENV_SEPARATE_PATH") if envFile: @@ -105,33 +102,57 @@ class ExpansionImpl(): self.commonGsCtl = GsCtlCommon(expansion) self.tempFileDir = "/tmp/gs_expansion_%s" % (currentTime) self.logger.debug("tmp expansion dir is %s ." % self.tempFileDir) + # primary's wal_keep_segments value + self.walKeepSegments = -1 self._finalizer = weakref.finalize(self, self.final) globals()["paramiko"] = __import__("paramiko") + def queryPrimaryWalKeepSegments(self): + """ + query primary's wal_keep_segments, when current user is root + """ + primaryHostName = self.getPrimaryHostName() + primaryHostIp = self.context.clusterInfoDict[primaryHostName]["backIp"] + primaryDataNode = self.context.clusterInfoDict[primaryHostName]["dataNode"] + status, walKeepSegments = self.commonGsCtl.queryGucParaValue( + primaryHostIp, self.envFile, primaryDataNode, "wal_keep_segments", self.user) + if status != DefaultValue.SUCCESS: + GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50021"] % "wal_keep_segments") + return walKeepSegments + + def rollbackPrimaryWalKeepSegments(self): + """ + rollback primary's wal_keep_segments, when current user is root + """ + self.logger.debug("Start to rollback primary's wal_keep_segments") + primary = self.getPrimaryHostName() + primaryDataNode = self.context.clusterInfoDict[primary]["dataNode"] + status = self.commonGsCtl.setGucPara(primary, self.envFile, primaryDataNode, + "wal_keep_segments", self.walKeepSegments, self.user) + if status != DefaultValue.SUCCESS: + self.logger.log("Failed to rollback wal_keep_segments, please manually " + "set it to original value %s." % self.walKeepSegments) + else: + self.reloadPrimaryConf(self.user) + def final(self): """ function: - 1. Make sure primary's wal_keep_segments is restored to its original value, + 1. Make sure primary's wal_keep_segments is restored to its + original value if it has been changed, 2. rollback, 3. clear temp file input : NA output: NA """ - if self.walKeepSegmentsChanged: - self.logger.debug("Start to rollback primary's wal_keep_segments") - primary = self.getPrimaryHostName() - primaryDataNode = self.context.clusterInfoDict[primary]["dataNode"] - status = self.commonGsCtl.setGucPara(primary, self.envFile, primaryDataNode, - "wal_keep_segments", self.walKeepSegments) - if status != DefaultValue.SUCCESS: - self.logger.log("Failed to rollback wal_keep_segments, please manually " - "set it to original value %s." % self.walKeepSegments) - else: - self.reloadPrimaryConf() - if self.needRollback: - self.rollback() + if self.walKeepSegments != -1: + currentWalKeepSegments = self.queryPrimaryWalKeepSegments() + if currentWalKeepSegments != "NULL" \ + and self.walKeepSegments != int(currentWalKeepSegments): + self.rollbackPrimaryWalKeepSegments() + self.rollback() self.clearTmpFile() def sendSoftToHosts(self): @@ -144,7 +165,7 @@ class ExpansionImpl(): for host in self.context.newHostList: sshTool = SshTool([host], timeout = 300) # mkdir package dir and send package to remote nodes. - sshTool.executeCommand("mkdir -p %s" % srcFile , "", + sshTool.executeCommand("umask 0022;mkdir -p %s" % srcFile , "", DefaultValue.SUCCESS, [host]) sshTool.scpFiles(srcFile, targetDir, [host]) self.cleanSshToolFile(sshTool) @@ -438,7 +459,7 @@ class ExpansionImpl(): self.buildStandbyHosts() self.generateClusterStaticFile() - def getExistingHosts(self): + def getExistingHosts(self, isRootUser=True): """ get the exiting hosts """ @@ -446,11 +467,12 @@ class ExpansionImpl(): primaryHost = self.getPrimaryHostName() command = "" if DefaultValue.getEnv("MPPDB_ENV_SEPARATE_PATH"): - command = "su - %s -c 'source %s;gs_om -t status --detail'" % \ - (self.user, self.envFile) + command = "source %s;gs_om -t status --detail" % self.envFile else: - command = "su - %s -c 'source /etc/profile;source %s;"\ - "gs_om -t status --detail'" % (self.user, self.envFile) + command = "source /etc/profile;source %s;"\ + "gs_om -t status --detail" % self.envFile + if isRootUser: + command = "su - %s -c '%s'" % (self.user, command) self.logger.debug(command) sshTool = SshTool([primaryHost]) resultMap, outputCollect = sshTool.getSshStatusOutput(command, @@ -566,12 +588,17 @@ gs_guc set -D {dn} -c "available_zone='{azName}'" self.context.createGrpcCa(needGRPCHosts) self.logger.debug("End to generate GRPC cert.") - def reloadPrimaryConf(self): + def reloadPrimaryConf(self, user=""): """ """ primaryHost = self.getPrimaryHostName() dataNode = self.context.clusterInfoDict[primaryHost]["dataNode"] - command = "gs_ctl reload -D %s " % dataNode + command = "" + if user: + command = "su - %s -c 'source %s;gs_ctl reload -D %s'" % \ + (user, self.envFile, dataNode) + else: + command = "gs_ctl reload -D %s " % dataNode sshTool = SshTool([primaryHost]) self.logger.debug(command) resultMap, outputCollect = sshTool.getSshStatusOutput(command, @@ -596,33 +623,23 @@ gs_guc set -D {dn} -c "available_zone='{azName}'" stop the new standby host`s database and build it as standby mode """ self.logger.debug("Start to build new nodes.") - self.needRollback = True standbyHosts = self.context.newHostList hostAzNameMap = self.context.hostAzNameMap primaryHostName = self.getPrimaryHostName() primaryHost = self.context.clusterInfoDict[primaryHostName]["backIp"] existingStandbys = list(set(self.existingHosts).difference(set([primaryHost]))) primaryDataNode = self.context.clusterInfoDict[primaryHostName]["dataNode"] - status, self.walKeepSegments = self.commonGsCtl.queryGucParaValue( - primaryHost, self.envFile, primaryDataNode, "wal_keep_segments") - if status != DefaultValue.SUCCESS: - for host in standbyHosts: - self.expansionSuccess[host] = False - GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50021"] % "wal_keep_segments") + walKeepSegmentsChanged = False status, synchronous_commit = self.commonGsCtl.queryGucParaValue( primaryHost, self.envFile, primaryDataNode, "synchronous_commit") if status != DefaultValue.SUCCESS: - for host in standbyHosts: - self.expansionSuccess[host] = False GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50021"] % "synchronous_commit") - if synchronous_commit != "on" and int(self.walKeepSegments) < 1024: + if synchronous_commit == "off" and self.walKeepSegments < 1024: status = self.commonGsCtl.setGucPara(primaryHost, self.envFile, primaryDataNode, "wal_keep_segments", 1024) if status != DefaultValue.SUCCESS: - for host in standbyHosts: - self.expansionSuccess[host] = False GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50007"] % "wal_keep_segments") - self.walKeepSegmentsChanged = True + walKeepSegmentsChanged = True self.reloadPrimaryConf() time.sleep(10) insType, dbState = self.commonGsCtl.queryInstanceStatus( @@ -635,8 +652,6 @@ gs_guc set -D {dn} -c "available_zone='{azName}'" primaryExceptionInfo = ErrorCode.GAUSS_357["GAUSS_35709"] % \ ("db_state", "primary", "Normal") if primaryExceptionInfo != "": - for host in standbyHosts: - self.expansionSuccess[host] = False GaussLog.exitWithError(primaryExceptionInfo) waitChars = ["\\", "|", "/", "-"] for host in standbyHosts: @@ -746,14 +761,12 @@ gs_guc set -D {dn} -c "available_zone='{azName}'" else: self.expansionSuccess[host] = False self.logger.log("\rBuild %s %s failed." % (hostRole, host)) - if self.walKeepSegmentsChanged: + if walKeepSegmentsChanged: self.logger.debug("Start to rollback primary's wal_keep_segments") status = self.commonGsCtl.setGucPara(primaryHost, self.envFile, primaryDataNode, "wal_keep_segments", self.walKeepSegments) if status != DefaultValue.SUCCESS: self.logger.debug(ErrorCode.GAUSS_500["GAUSS_50007"] % "wal_keep_segments") - else: - self.walKeepSegmentsChanged = False self.reloadPrimaryConf() if self._isAllFailed(): GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35706"] % "build") @@ -814,8 +827,6 @@ gs_guc set -D {dn} -c "available_zone='{azName}'" self.context.clusterInfo.saveToStaticConfig(staticConfigPath, dbNode.id) srcFile = staticConfigPath if not os.path.exists(srcFile): - for host in self.context.newHostList: - self.expansionSuccess[host] = False GaussLog.exitWithError(ErrorCode.GAUSS_357["GAUSS_35710"] % srcFile) hostSsh = SshTool([hostName]) targetFile = "%s/bin/cluster_static_config" % appPath @@ -828,8 +839,9 @@ gs_guc set -D {dn} -c "available_zone='{azName}'" self.logger.log("End to generate and send cluster static file.\n") self.logger.log("Expansion results:") + self.getExistingHosts(False) for newHost in self.context.newHostList: - if self.expansionSuccess[newHost]: + if newHost in self.existingHosts: self.logger.log("%s:\tSuccess" % newHost) else: self.logger.log("%s:\tFailed" % newHost) @@ -1130,7 +1142,11 @@ remoteservice={remoteservice}'" GaussLog.exitWithError(ErrorCode.GAUSS_516["GAUSS_51600"]) self.logger.debug("The primary database is normal.\n") - + currentWalKeepSegments = self.queryPrimaryWalKeepSegments() + if currentWalKeepSegments != "NULL": + self.walKeepSegments = int(currentWalKeepSegments) + else: + self.walKeepSegments = 16 def _adjustOrderOfNewHostList(self): """ @@ -1283,13 +1299,8 @@ remoteservice={remoteservice}'" """ rollback all hosts' replconninfo about failed hosts """ - existingHosts = self.existingHosts - failedHosts = [] - for host in self.expansionSuccess: - if self.expansionSuccess[host]: - existingHosts.append(host) - else: - failedHosts.append(host) + self.getExistingHosts() + failedHosts = list(set(self.context.newHostList) - set(self.existingHosts)) clusterInfoDict = self.context.clusterInfoDict for failedHost in failedHosts: # rollback GRPC cert on failed hosts @@ -1306,7 +1317,7 @@ remoteservice={remoteservice}'" sshTool.getSshStatusOutput(removeGRPCCertCmd, [host], self.envFile) self.cleanSshToolFile(sshTool) self.logger.debug("Start to rollback replconninfo about %s" % failedHost) - for host in existingHosts: + for host in self.existingHosts: hostName = self.context.backIpNameMap[host] dataNode = clusterInfoDict[hostName]["dataNode"] confFile = os.path.join(dataNode, "postgresql.conf") @@ -1322,10 +1333,12 @@ remoteservice={remoteservice}'" self.logger.debug("[%s] rollbackPg_hbaCmd:%s" % (host, rollbackPg_hbaCmd)) sshTool.getSshStatusOutput(rollbackPg_hbaCmd, [host]) - reloadGUCCommand = "source %s ; gs_ctl reload -D %s " % \ - (self.envFile, dataNode) + reloadGUCCommand = "su - %s -c 'source %s; gs_ctl reload " \ + "-D %s'" % (self.user, self.envFile, dataNode) + self.logger.debug(reloadGUCCommand) resultMap, outputCollect = sshTool.getSshStatusOutput( reloadGUCCommand, [host], self.envFile) + self.logger.debug(resultMap) self.logger.debug(outputCollect) self.cleanSshToolFile(sshTool) @@ -1450,37 +1463,45 @@ class GsCtlCommon: self.cleanSshToolTmpFile(sshTool) return outputCollect - def queryGucParaValue(self, host, env, datanode, para): + def queryGucParaValue(self, host, env, datanode, para, user=""): """ query guc parameter value """ - status = DefaultValue.FAILURE value = "" - command = "source %s; gs_guc check -D %s -c \"%s\"" % \ - (env, datanode, para) + command = "" + if user: + command = "su - %s -c 'source %s; gs_guc check -D %s -c \"%s\"'" % \ + (user, env, datanode, para) + else: + command = "source %s; gs_guc check -D %s -c \"%s\"" % \ + (env, datanode, para) sshTool = SshTool([host]) resultMap, outputCollect = sshTool.getSshStatusOutput( command, [host], env) self.logger.debug(host) self.logger.debug(outputCollect) if resultMap[host] == STATUS_FAIL: - return status, "" + return resultMap[host], "" self.cleanSshToolTmpFile(sshTool) paraPattern = re.compile(" %s=(.+)" % para) value = paraPattern.findall(outputCollect) if len(value) != 0: - status = resultMap[host] value = value[0] else: - return status, "" - return status, value + value = "NULL" + return resultMap[host], value - def setGucPara(self, host, env, datanode, para, value): + def setGucPara(self, host, env, datanode, para, value, user=""): """ set guc parameter """ - command = "source %s; gs_guc set -D %s -c \"%s=%s\"" % \ - (env, datanode, para, value) + command = "" + if not user: + command = "source %s; gs_guc set -D %s -c \"%s=%s\"" % \ + (env, datanode, para, value) + else: + command = "su - %s -c 'source %s; gs_guc set -D %s -c \"%s=%s\"'" % \ + (user, env, datanode, para, value) sshTool = SshTool([host]) resultMap, outputCollect = sshTool.getSshStatusOutput( command, [host], env)