From 1812fc0ecfb186ed9b584d3608640b1e96e09330 Mon Sep 17 00:00:00 2001 From: zhang_xubo <2578876417@qq.com> Date: Mon, 26 Aug 2024 21:35:47 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=8D=87=E7=BA=A7=E6=97=B6=E5=80=99cm?= =?UTF-8?q?=E5=92=8Cdb=E5=88=86=E5=BC=80=E6=89=A7=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- script/gs_om | 19 +- script/gspylib/common/OMCommand.py | 4 +- script/gspylib/common/ParameterParsecheck.py | 10 +- script/impl/om/OLAP/OmImplOLAP.py | 53 ++++- script/impl/upgrade/UpgradeImpl.py | 34 ++- script/local/LocalCmOpt.py | 211 +++++++++++++++++++ script/local/UpgradeUtility.py | 41 +--- 7 files changed, 318 insertions(+), 54 deletions(-) create mode 100644 script/local/LocalCmOpt.py diff --git a/script/gs_om b/script/gs_om index 06139dd3..a87614ca 100644 --- a/script/gs_om +++ b/script/gs_om @@ -115,6 +115,9 @@ class CmdOptions(): # generate_xml self.add_hostips = "" self.add_hostnames = "" + + # operate cm + self.component = "" ########################################### @@ -139,11 +142,12 @@ Usage: gs_om -V | --version OLAP scene: gs_om -t start [-h HOSTNAME] [-D dataDir] [--time-out=SECS] - [--security-mode=MODE] [--cluster-number=None] [-l LOGFILE] + [--security-mode=MODE] [--cluster-number=None] + [--component=CM] [-l LOGFILE] gs_om -t stop [-h HOSTNAME] [-D dataDir] [--time-out=SECS] [-m MODE] - [-l LOGFILE] + [-l LOGFILE] [--component=CM] gs_om -t restart [-h HOSTNAME] [-D dataDir] [--time-out=SECS] - [--security-mode=MODE] [-l LOGFILE] [-m MODE] + [--security-mode=MODE] [--component=CM] [-l LOGFILE] [-m MODE] gs_om -t status [-h HOSTNAME] [-o OUTPUT] [--detail] [--all] [--az=AZ] [-l LOGFILE] [--time-out=SECS] gs_om -t generateconf -X XMLFILE [--distribute] [-l LOGFILE] @@ -472,6 +476,13 @@ Install options: self.g_opts.add_hostnames = ParaDict.get("add_hostname") if (ParaDict.__contains__("add_hostip")): self.g_opts.add_hostips = ParaDict.get("add_hostip") + + def parse_component(self, ParaDict): + support_comp_list = [ "CM", "DN" ] + if (ParaDict.__contains__("component")): + self.g_opts.component = ParaDict.get("component") + if self.g_opts.component not in support_comp_list: + GaussLog.exitWithError(f"--component only support {support_comp_list}") def parseCommandLine(self): """ @@ -524,6 +535,8 @@ Install options: self.parseAZ(ParaDict) # Parse generate xml parameter self.parse_generate_xml(ParaDict) + # Parse operation component + self.parse_component(ParaDict) ########################################################################### # Check parameters for all operations diff --git a/script/gspylib/common/OMCommand.py b/script/gspylib/common/OMCommand.py index 050d4dbe..fd98dc9e 100644 --- a/script/gspylib/common/OMCommand.py +++ b/script/gspylib/common/OMCommand.py @@ -135,7 +135,9 @@ class OMCommand(): "Local_Upgrade_Utility": os.path.normpath( Current_Path + "/../../local/UpgradeUtility.py"), "Local_Upgrade_CM": os.path.normpath( - Current_Path + "/../../local/upgrade_cm_utility.py") + Current_Path + "/../../local/upgrade_cm_utility.py"), + "Local_Operate_CM": os.path.normpath( + Current_Path + "/../../local/LocalCmOpt.py") } return "python3 '%s'" % LocalScript[script] diff --git a/script/gspylib/common/ParameterParsecheck.py b/script/gspylib/common/ParameterParsecheck.py index 8d97b96d..0e98d873 100644 --- a/script/gspylib/common/ParameterParsecheck.py +++ b/script/gspylib/common/ParameterParsecheck.py @@ -108,12 +108,13 @@ gs_dropnode = ["-?", "--help", "-V", "--version", "-U:", "-G:", # gs_om child branch gs_om_start = ["-t:", "-?", "--help", "-V", "--version", "-h:", "-I:", "--time-out=", "--az=", "-l:", "--nodeId=", "-D:", - "--security-mode=", "--cluster-number="] + "--security-mode=", "--cluster-number=", "--component="] gs_om_stop = ["-t:", "-?", "--help", "-V", "--version", "-h:", "-I:", "-m:", - "--az=", "-l:", "--mode=", "--nodeId=", "--time-out=", "-D:"] + "--az=", "-l:", "--mode=", "--nodeId=", "--time-out=", "-D:", + "--component="] gs_om_restart = ["-t:", "-?", "--help", "-V", "--version", "-h:", "-I:", "--time-out=", "--az=", "-l:", "--nodeId=", "-D:", - "--security-mode=", "--mode=", "-m:"] + "--security-mode=", "--mode=", "-m:", "--component="] gs_om_view = ["-t:", "-?", "--help", "-V", "--version", "-o:", "-l:", "--dynamic"] gs_om_query = ["-t:", "-?", "--help", "-V", "--version", "-o:", "-l:", "--time-out="] gs_om_status = ["-t:", "-?", "--help", "-V", "--version", "-h:", "-o:", @@ -356,7 +357,8 @@ class Parameter(): "--add-hostname": "add_hostname", "--add-hostip": "add_hostip", "--security-mode": "security_mode", - "--cluster-number": "cluster_number" + "--cluster-number": "cluster_number", + "--component": "component" } parameterNeedValue_keys = parameterNeedValue.keys() diff --git a/script/impl/om/OLAP/OmImplOLAP.py b/script/impl/om/OLAP/OmImplOLAP.py index 19f8136e..c0e89cf0 100644 --- a/script/impl/om/OLAP/OmImplOLAP.py +++ b/script/impl/om/OLAP/OmImplOLAP.py @@ -202,8 +202,18 @@ class OmImplOLAP(OmImpl): output: NA """ self.logger.debug("Operating: Starting.") - # if has cm, will start cluster by cm_ctl command - if ((not self.context.clusterInfo.hasNoCm()) + + # only stop cm components(cm_server cm_agent om_monitor) + print(self.context.g_opts.component) + if self.context.g_opts.component == "CM": + if self.context.clusterInfo.hasNoCm(): + self.logger.log("No CM components to start.") + else: + self.do_opt_cm_components('start') + return + + # if has cm and param --component!=DN, will start cluster by cm_ctl command + if ((not self.context.clusterInfo.hasNoCm() and self.context.g_opts.component != "DN") and DefaultValue.isgreyUpgradeNodeSpecify(self.context.user, DefaultValue.GREY_UPGRADE_STEP_UPGRADE_PROCESS, None, self.context.logger)): self.context.logger.debug("Have CM configuration, upgrade all" @@ -267,6 +277,9 @@ class OmImplOLAP(OmImpl): cluster_state = "" cmd = "source %s; gs_om -t status|grep cluster_state" \ % self.context.g_opts.mpprcFile + if self.context.g_opts.component == "DN": + cmd = "source %s; gs_om -t query|grep cluster_state" \ + % self.context.g_opts.mpprcFile while time.time() <= 30 + starttime: status, output = subprocess.getstatusoutput(cmd) if status != 0: @@ -310,7 +323,29 @@ class OmImplOLAP(OmImpl): self.dataDir, self.context.g_opts.azName)) self.logger.debug("Operation succeeded: Stop by cm.") + + def do_opt_cm_components(self, action): + if self.context.g_opts.nodeName == "": + host_list = self.clusterInfo.getClusterNodeNames() + else: + host_list = [] + host_list.append(self.context.g_opts.nodeName) + self.sshTool = SshTool(self.clusterInfo.getClusterNodeNames(), None, + DefaultValue.TIMEOUT_CLUSTER_START) + + stopcmd = "crontab -l | sed '/om_monitor/s/^/#/' | crontab -;" \ + + f"pkill -9 om_monitor -U {self.context.user}" \ + + f"pkill -9 cm_agent -U {self.context.user};" \ + + f"pkill -9 cm_server -U {self.context.user}" + + cmd = "source %s; %s -U %s -R %s --action %s" % ( + self.context.g_opts.mpprcFile, + OMCommand.getLocalScript("Local_Operate_CM"), + self.context.user, self.context.clusterInfo.appPath, + action) + (statusMap, output) = self.sshTool.getSshStatusOutput(cmd, host_list) + def doStopCluster(self): """ function: do stop cluster @@ -318,8 +353,18 @@ class OmImplOLAP(OmImpl): output: NA """ self.logger.debug("Operating: Stopping.") - # if has cm, will start cluster by cm_ctl command - if not self.context.clusterInfo.hasNoCm(): + + # only stop cm components(cm_server cm_agent om_monitor) + print(self.context.g_opts.component) + if self.context.g_opts.component == "CM": + if self.context.clusterInfo.hasNoCm(): + self.logger.log("No CM components to stop.") + else: + self.do_opt_cm_components('stop') + return + + # if has cm and param --component!=DN, will start cluster by cm_ctl command + if not self.context.clusterInfo.hasNoCm() and self.context.g_opts.component != "DN": self.doStopClusterByCm() return # Specifies the stop node diff --git a/script/impl/upgrade/UpgradeImpl.py b/script/impl/upgrade/UpgradeImpl.py index d11afae8..d109fbe7 100644 --- a/script/impl/upgrade/UpgradeImpl.py +++ b/script/impl/upgrade/UpgradeImpl.py @@ -1838,6 +1838,7 @@ class UpgradeImpl: """ self.context.logger.log("Switching all db processes.", "addStep") self._check_and_start_cluster() + if DefaultValue.get_cm_server_num_from_static(self.context.oldClusterInfo) > 0: self.setUpgradeFromParam(self.context.oldClusterNumber) self.reloadCmAgent() @@ -1959,6 +1960,22 @@ class UpgradeImpl: self.waif_for_om_monitor_start(is_rollback=isRollback) + # Under cm, restart the cm component and database component separately. + # ddes mode and specified node upgrade are not supported. + upgrade_sep_comps = False + if DefaultValue.get_cm_server_num_from_static(self.context.oldClusterInfo) > 0 and \ + not EnvUtil.is_dss_mode(getpass.getuser()) and \ + len(self.context.nodeNames) == len(self.context.clusterNodes): + self.context.logger.log("Upgrade with seperating CM and Dn components.") + upgrade_sep_comps = True + + self.context.logger.log("Stop CM components.") + cmd = "gs_om -t stop --component=CM" + (status, output) = subprocess.getstatusoutput(cmd) + if status != 0: + raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % + "Command:%s. Error:\n%s" % (cmd, output)) + self.refresh_dynamic_config_file(); self.context.logger.log("Switching DN processes.") is_rolling = False start_time = timeit.default_timer() @@ -2000,15 +2017,24 @@ class UpgradeImpl: hostList = copy.deepcopy(self.context.nodeNames) self.context.sshTool.executeCommand(cmd, hostList=hostList) start_cluster_time = timeit.default_timer() - self.greyStartCluster() + self.greyStartCluster(upgrade_sep_comps) end_cluster_time = timeit.default_timer() - start_cluster_time self.context.logger.debug("Time to start cluster is %s" % self.getTimeFormat(end_cluster_time)) elapsed = timeit.default_timer() - start_time self.context.logger.debug("Time to switch DN process version: %s" % self.getTimeFormat(elapsed)) - - def greyStartCluster(self): + + if upgrade_sep_comps: + self.context.logger.log("Start CM components.") + cmd = "gs_om -t start --component=CM" + (status, output) = subprocess.getstatusoutput(cmd) + if status != 0: + raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % + "Command:%s. Error:\n%s" % (cmd, output)) + + + def greyStartCluster(self, only_dn=False): """ start cluster in grey upgrade :return: @@ -2021,6 +2047,8 @@ class UpgradeImpl: cmd = "gs_om -t start --cluster-number='%s' --time-out=600" % (number) else: cmd = "gs_om -t start" + if only_dn: + cmd += " --component=DN" (status, output) = subprocess.getstatusoutput(cmd) if status != 0: raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % diff --git a/script/local/LocalCmOpt.py b/script/local/LocalCmOpt.py new file mode 100644 index 00000000..bce06b64 --- /dev/null +++ b/script/local/LocalCmOpt.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# Copyright (c) 2024 Huawei Technologies Co.,Ltd. +# +# openGauss is licensed under Mulan PSL v2. +# You can use this software according to the terms +# and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# +# http://license.coscl.org.cn/MulanPSL2 +# +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, +# WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. +# ---------------------------------------------------------------------------- + +from pickle import STOP +import subprocess +from re import sub +import sys +import getopt + +sys.path.append(sys.path[0] + "/../") +from gspylib.common.GaussLog import GaussLog +from gspylib.common.ErrorCode import ErrorCode +from gspylib.common.LocalBaseOM import LocalBaseOM +from gspylib.common.ParameterParsecheck import Parameter +from domain_utils.cluster_file.cluster_log import ClusterLog +from domain_utils.domain_common.cluster_constants import ClusterConstants +from base_utils.os.env_util import EnvUtil +from gspylib.component.DSS.dss_checker import DssConfig +from base_utils.os.crontab_util import CrontabUtil +from domain_utils.cluster_file.cluster_dir import ClusterDir +from gspylib.common.Common import DefaultValue + +class CMOptConst: + START = "start" + STOP = "stop" + + +class LocalCmOpt(LocalBaseOM): + """ + The class is used to do perform start + """ + + def __init__(self): + """ + function: initialize the parameters + input: NA + output: NA + """ + super(LocalCmOpt, self).__init__() + self.user = "" + self.dataDir = "" + self.time_out = 300 + self.logFile = "" + self.logger = None + self.installPath = "" + self.security_mode = "" + self.cluster_number = None + self.action = "" + + def usage(self): + """ +gs_start is a utility to start the database + +Uasge: + gs_start -? | --help + gs_start -U USER [-D DATADIR][-t SECS][-l LOGFILE] + +General options: + -U USER the database program and cluster owner") + -D DATADIR data directory of instance + -t SECS seconds to wait + -l LOGFILE log file + -?, --help show this help, then exit + """ + print(self.usage.__doc__) + + def parseCommandLine(self): + """ + function: Check input parameters + input : NA + output: NA + """ + try: + opts, args = getopt.getopt(sys.argv[1:], "U:D:R:l:h?", + ["help", "action="]) + except getopt.GetoptError as e: + GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50000"] % str(e)) + + if (len(args) > 0): + GaussLog.exitWithError( + ErrorCode.GAUSS_500["GAUSS_50000"] % str(args[0])) + + for key, value in opts: + if key == "-U": + self.user = value + elif key == "-D": + self.dataDir = value + elif key == "-t": + self.time_out = int(value) + elif key == "-l": + self.logFile = value + elif key == "-R": + self.installPath = value + elif key == "--action": + self.action = value + elif key == "--help" or key == "-h" or key == "-?": + self.usage() + sys.exit(0) + else: + GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50000"] + % key) + Parameter.checkParaVaild(key, value) + + if self.user == "": + GaussLog.exitWithError(ErrorCode.GAUSS_500["GAUSS_50001"] + % 'U' + ".") + if self.logFile == "": + self.logFile = ClusterLog.getOMLogPath( + ClusterConstants.LOCAL_LOG_FILE, self.user, self.installPath) + + def __initLogger(self): + """ + function: Init logger + input : NA + output: NA + """ + self.logger = GaussLog(self.logFile, "LocalCMOperation") + + def init(self): + """ + function: constructor + """ + self.__initLogger() + self.readConfigInfo() + + def do_stop_components(self): + cmd = "" + if CrontabUtil.check_user_crontab_permission(): + crondesc = subprocess.getoutput("crontab -l | grep om_monitor") + if not crondesc.startswith("#"): + cmd += "crontab -l | sed '/om_monitor/s/^/#/' | crontab -;" + cmd += f"pkill -9 om_monitor -U {self.user};" + cmd += f"pkill -9 cm_agent -U {self.user};" + cmd += f"pkill -9 cm_server -U {self.user};" + + self.logger.log(f"stop cm components: {cmd}") + status, output = subprocess.getstatusoutput(cmd) + self.logger.log(status, output) + + + def do_start_components(self): + mpprc_file = EnvUtil.getEnv(DefaultValue.MPPRC_FILE_ENV) + app_path = ClusterDir.getInstallDir(self.user) + log_path = ClusterLog.getOMLogPath(DefaultValue.OM_MONITOR_DIR_FILE, + self.user, + app_path) + cmd = "" + if CrontabUtil.check_user_crontab_permission(): + crondesc = subprocess.getoutput("crontab -l | grep om_monitor") + if crondesc.startswith("#"): + cmd = "crontab -l | sed '/om_monitor/s/^#//' | crontab -;" + if mpprc_file != "" and mpprc_file is not None: + cmd += "source ~/.bashrc;source %s; nohup %s/bin/om_monitor -L %s " \ + ">>/dev/null 2>&1 &" % (mpprc_file, app_path, log_path) + else: + cmd += "source ~/.bashrc; nohup %s/bin/om_monitor -L %s >>" \ + "/dev/null 2>&1 &" % (app_path, log_path) + + self.logger.log(f"start cm components: {cmd}") + status, output = subprocess.getstatusoutput(cmd) + self.logger.log(status, output) + + + def do_operate(self): + """ + function: do start database + input : NA + output : NA + """ + print(self.action) + if self.action == CMOptConst.START: + self.do_start_components() + elif self.action == CMOptConst.STOP: + self.do_stop_components() + else: + self.logger.warn(f"action [{self.action}] is unknown, Do nothing.") + + +def main(): + """ + main function + """ + try: + opt = LocalCmOpt() + opt.parseCommandLine() + opt.init() + except Exception as e: + GaussLog.exitWithError(ErrorCode.GAUSS_536["GAUSS_53608"] % str(e)) + try: + opt.do_operate() + except Exception as e: + GaussLog.exitWithError(str(e)) + + +if __name__ == "__main__": + main() diff --git a/script/local/UpgradeUtility.py b/script/local/UpgradeUtility.py index c4736b8e..a0e88432 100644 --- a/script/local/UpgradeUtility.py +++ b/script/local/UpgradeUtility.py @@ -4362,45 +4362,8 @@ def isKillDn(): pattern = re.compile(r'[(](.*?)[)]') versionInBrackets = re.findall(pattern, output) curCommitid = versionInBrackets[0].split(" ")[-1] - # get the dn and cn name - dnInst = None - clusterNodes = g_clusterInfo.dbNodes - with_cm = True if g_clusterInfo.cmscount > 0 else False - for dbNode in clusterNodes: - if len(dbNode.datanodes) == 0: - continue - dnInst = dbNode.datanodes[0] - primaryDnNode, _ = DefaultValue.getPrimaryNode(g_opts.userProfile, with_cm=with_cm) - if dnInst.hostname not in primaryDnNode: - continue - break - localHost = NetUtil.GetHostIpOrName() - if int(g_opts.oldVersion) >= 92069: - sql = "select node_name, node_type from pg_catalog.pgxc_node " \ - "where node_host = '%s';" % localHost - else: - if g_dbNode.name != dnInst.hostname: - sql = "select node_name, node_type from pg_catalog.pgxc_node " \ - "where node_host = '%s';" % localHost - else: - sql = "select node_name, node_type from pg_catalog.pgxc_node" \ - " where node_host = 'localhost';" - g_logger.debug("Sql to query node name: %s" % sql) - (status, output) = ClusterCommand.remoteSQLCommand( - sql, g_opts.user, - dnInst.hostname, dnInst.port, False, - DefaultValue.DEFAULT_DB_NAME, IsInplaceUpgrade=True) - if status != 0 or SqlResult.findErrorInSql(output): - raise Exception(ErrorCode.GAUSS_513["GAUSS_51300"] % sql + - " Error: \n%s" % str(output)) - resList = output.split('\n') - dnNames = [] - for record in resList: - record = record.split('|') - nodeName = record[0].strip() - dnNames.append(nodeName) - g_logger.debug("isKillDn dnName:{0} " - "commitid:{1}".format(dnNames, curCommitid)) + + g_logger.debug("isKillDn commitid:{0}".format(curCommitid)) # execute on the dn and cn to get the exists process version if g_opts.rolling: current_user = pwd.getpwuid(os.getuid()).pw_name -- Gitee From 55e27c48102d1e0a6226a92eea0b2d1d78d38843 Mon Sep 17 00:00:00 2001 From: zhang_xubo <2578876417@qq.com> Date: Sat, 7 Sep 2024 18:13:45 +0800 Subject: [PATCH 2/2] =?UTF-8?q?CM=E4=B8=8B=E5=8D=87=E7=BA=A7=E6=97=B6?= =?UTF-8?q?=E5=80=99=EF=BC=8Ccm=E5=92=8Cdn=E7=BB=84=E4=BB=B6=E5=88=86?= =?UTF-8?q?=E5=BC=80=E9=87=8D=E5=90=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- script/local/LocalCmOpt.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/script/local/LocalCmOpt.py b/script/local/LocalCmOpt.py index bce06b64..de3676de 100644 --- a/script/local/LocalCmOpt.py +++ b/script/local/LocalCmOpt.py @@ -64,18 +64,13 @@ class LocalCmOpt(LocalBaseOM): def usage(self): """ -gs_start is a utility to start the database - -Uasge: - gs_start -? | --help - gs_start -U USER [-D DATADIR][-t SECS][-l LOGFILE] - General options: -U USER the database program and cluster owner") -D DATADIR data directory of instance -t SECS seconds to wait -l LOGFILE log file -?, --help show this help, then exit + --action start or stop """ print(self.usage.__doc__) -- Gitee