diff --git a/common/container/__init__.py b/common/container/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/common/container/container.py b/common/container/container.py new file mode 100644 index 0000000000000000000000000000000000000000..e4181dc3ad8d8fa7db363f028102c7390809cfcd --- /dev/null +++ b/common/container/container.py @@ -0,0 +1,46 @@ +import abc +import typing + + +class Container(abc.ABC): + def __init__(self, container_id): + self.container_id = container_id + + @abc.abstractmethod + def create_dir_in_container(self, target_dir, mode=755): + """ + 在docker中创建目录 + """ + + @abc.abstractmethod + def copy_to_container(self, origin_file, target_dir, mode=755): + """ + 复制宿主机文件到容器内 + """ + + @abc.abstractmethod + def copy_to_host(self, origin_file, target_dir, mode=755): + """ + 复制容器中的文件到宿主机 + """ + + @abc.abstractmethod + def delete_in_container(self, target_dir): + """ + 删除文件夹 + """ + + +class ContainerFactory(abc.ABC): + + @abc.abstractmethod + def check_in_machine(self): + """ + 检测当前服务器中是否存在该类型容器 + """ + + @abc.abstractmethod + def get_container(self) -> typing.Dict[str, Container]: + """ + 获取当前运行的容器 + """ diff --git a/common/container/crictl.py b/common/container/crictl.py new file mode 100644 index 0000000000000000000000000000000000000000..3e53d5f1859754b22b79b7c63047a5f2a1f7da3c --- /dev/null +++ b/common/container/crictl.py @@ -0,0 +1,90 @@ +import logging +import os.path +import re +import typing + +from container.container import Container, ContainerFactory +from devkit_utils import shell_tools + + +class CrictlContainer(Container): + + def __init__(self, container_id, name, pod_id, pod): + super().__init__(container_id) + self.name = name + self.pod_id = pod_id + self.pod = pod + + def create_dir_in_container(self, target_dir, mode=755): + logging.info("create dir:%s", target_dir) + outcome = shell_tools.exec_shell(f"kubectl exec {self.pod} -c {self.name} -- mkdir -p {target_dir}", + is_shell=True) + logging.info(outcome) + outcome = shell_tools.exec_shell(f"kubectl exec {self.pod} -c {self.name} -- chmod {mode} {target_dir}", + is_shell=True) + logging.info(outcome) + + def copy_to_container(self, origin_file, target_dir, mode=755): + logging.info("origin:%s target:%s", origin_file, target_dir) + file_name = os.path.basename(origin_file) + outcome = shell_tools.exec_shell(f"kubectl cp -c {self.name} {origin_file} {self.pod}:{target_dir}", + is_shell=True) + logging.info(outcome) + outcome = shell_tools.exec_shell( + f"kubectl exec {self.pod} -c {self.name} -- chmod {mode} {target_dir}/{file_name}", + is_shell=True) + logging.info(outcome) + + def copy_to_host(self, origin_file, target_dir, mode=755): + logging.info("origin:%s target:%s", origin_file, target_dir) + outcome = shell_tools.exec_shell(f"kubectl cp -c {self.name} {self.pod}:{origin_file} {target_dir}", + is_shell=True) + logging.info(outcome) + + def delete_in_container(self, target_dir): + logging.info("target:%s", target_dir) + outcome = shell_tools.exec_shell(f"kubectl exec {self.pod} -c {self.name} -- rm -rf {target_dir}", + is_shell=True) + logging.info(outcome) + + +class CrictlContainerFactory(ContainerFactory): + + def __init__(self): + self.container_id_index = 0 + self.name_index = 0 + self.pod_id_index = 0 + self.pod_index = 0 + + def check_in_machine(self): + outcome = shell_tools.exec_shell("crictl ps", is_shell=True) + logging.info(outcome) + if outcome.return_code == 0: + outcome = shell_tools.exec_shell("kubectl get pods", is_shell=True) + if outcome.return_code == 0: + return True + return False + + def get_container(self) -> typing.List[Container]: + containers = list() + outcome = shell_tools.exec_shell("crictl ps", is_shell=True) + lines = outcome.out.strip().split("\n") + self.__parse(lines[0]) + for line in lines[1:]: + fields = re.split(r"\s{2,}", line) + containers.append(CrictlContainer(fields[self.container_id_index], fields[self.name_index], + fields[self.pod_id_index], fields[self.pod_index])) + return containers + + def __parse(self, header: str): + fields = re.split(r"\s{2,}", header) + for i in range(0, len(fields)): + field = fields[i].lower() + if field == "container": + self.container_id_index = i + if field == "name": + self.name_index = i + if field == "pod id": + self.pod_id_index = i + if field == "pod": + self.pod_index = i diff --git a/common/container/docker.py b/common/container/docker.py new file mode 100644 index 0000000000000000000000000000000000000000..42cb7aa4caebf98430bea1b1ef2d5dc098f23842 --- /dev/null +++ b/common/container/docker.py @@ -0,0 +1,58 @@ +import logging +import os.path +import typing + +from container.container import Container, ContainerFactory +from devkit_utils import shell_tools + + +class DockerContainer(Container): + + def __init__(self, container_id): + super().__init__(container_id) + + def create_dir_in_container(self, target_dir, mode=755): + outcome = shell_tools.exec_shell(f"docker exec {self.container_id} mkdir -p {target_dir}", is_shell=True) + logging.info(outcome) + outcome = shell_tools.exec_shell(f"docker exec {self.container_id} chmod {mode} {target_dir}", is_shell=True) + logging.info(outcome) + + def copy_to_container(self, origin_file, target_dir, mode=755): + file_name = os.path.basename(origin_file) + outcome = shell_tools.exec_shell(f"docker cp {origin_file} {self.container_id}:{target_dir}", is_shell=True) + logging.info(outcome) + outcome = shell_tools.exec_shell(f"docker exec {self.container_id} chmod {mode} {target_dir}/{file_name}", + is_shell=True) + logging.info(outcome) + + def copy_to_host(self, origin_file, target_dir, mode=755): + outcome = shell_tools.exec_shell(f"docker cp {self.container_id}:{origin_file} {target_dir}", + is_shell=True) + logging.info(outcome) + + def delete_in_container(self, target_dir): + outcome = shell_tools.exec_shell(f"docker exec {self.container_id} rm -rf {target_dir}", + is_shell=True) + logging.info(outcome) + + +class DockerContainerFactory(ContainerFactory): + def __init__(self): + self.container_id_index = 0 + + def check_in_machine(self): + outcome = shell_tools.exec_shell("docker ps", is_shell=True) + logging.info(outcome) + if outcome.return_code == 0: + return True + else: + return False + + def get_container(self) -> typing.List[Container]: + containers = list() + outcome = shell_tools.exec_shell("docker ps", is_shell=True) + lines = outcome.out.strip().split("\n") + for line in lines[1:]: + fields = line.split() + containers.append(DockerContainer(fields[self.container_id_index])) + return containers diff --git a/common/devkit_utils/container_utils.py b/common/devkit_utils/container_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..f4b917df605d42e1aa07fd08b1f6104d6adba782 --- /dev/null +++ b/common/devkit_utils/container_utils.py @@ -0,0 +1,18 @@ +import typing + +from container.container import Container +from container.crictl import CrictlContainerFactory +from container.docker import DockerContainerFactory + + +def get_containers() -> typing.List[Container]: + """ + 获取容器信息,当存在docker命令时,使用docker命令获取,不存在docker命令时,尝试使用crictl(必须存在kubectl命令)获取 + """ + docker_factory = DockerContainerFactory() + if docker_factory.check_in_machine(): + return docker_factory.get_container() + crictl_factory = CrictlContainerFactory() + if crictl_factory.check_in_machine(): + return crictl_factory.get_container() + return list() diff --git a/common/devkit_utils/docker_utils.py b/common/devkit_utils/docker_utils.py deleted file mode 100644 index 2c217c777bba1b9f31227c0fc3c04c3f28cfdd59..0000000000000000000000000000000000000000 --- a/common/devkit_utils/docker_utils.py +++ /dev/null @@ -1,40 +0,0 @@ -import logging -import os.path - -from devkit_utils import shell_tools - - -def get_docker_id(docker_path: str): - paths_name = docker_path.strip().split("/") - if len(paths_name) >= 3: - return paths_name[2] - else: - raise Exception("can not found docker id") - - -def is_docker_process(pid): - cgroup_file = f'/proc/{pid}/cgroup' - if not os.path.exists(cgroup_file): - return False, None - with open(cgroup_file, "r", encoding="utf-8") as file: - cgroup_infos = file.readlines() - for line in cgroup_infos: - fields = line.split(":") - if len(fields) >= 3 and str(fields[1]) == "devices" and str(fields[2]).startswith("/docker"): - return True, get_docker_id(fields[2]) - return False, None - - -def create_dir_in_docker(docker_id, target_dir, mode=755): - outcome = shell_tools.exec_shell(f"docker exec {docker_id} mkdir -p {target_dir}", is_shell=True) - logging.info(outcome) - outcome = shell_tools.exec_shell(f"docker exec {docker_id} chmod {mode} {target_dir}", is_shell=True) - logging.info(outcome) - - -def copy_to_docker(docker_id, origin_file, target_dir): - file_name = os.path.basename(origin_file) - outcome = shell_tools.exec_shell(f"docker cp {origin_file} {docker_id}:{target_dir}", is_shell=True) - logging.info(outcome) - outcome = shell_tools.exec_shell(f"docker exec {docker_id} chmod 755 {target_dir}/{file_name}", is_shell=True) - logging.info(outcome) diff --git a/common/devkit_utils/proc_utils.py b/common/devkit_utils/proc_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..ac21ab18335034a5f6b835c4c3dc7d38a8d0f909 --- /dev/null +++ b/common/devkit_utils/proc_utils.py @@ -0,0 +1,51 @@ +import os +import typing + +from container.container import Container + + +def is_java_process(pid) -> bool: + """ + 判断是否是一个java进程 + """ + comm_file = f'/proc/{pid}/comm' + if not os.path.exists(comm_file): + return False + with open(comm_file, "r", encoding="utf-8") as file: + comm = file.readline() + if "java" in comm: + return True + return False + + +def is_container_process(pid, containers: typing.List[Container]) -> tuple[bool, typing.Optional[Container]]: + """ + 判断是否是一个容器进程 + """ + cgroup_file = f'/proc/{pid}/cgroup' + if not os.path.exists(cgroup_file): + return False, None + with open(cgroup_file, "r", encoding="utf-8") as file: + cgroup_infos = file.readlines() + for line in cgroup_infos: + # 三层结构不支持 宿主机-containerd-containerd + # 当前只支持两层结构 宿主机-containerd,并且宿主机 有 docker 或者(crictl和kubectl)命令 + if "devices" in line and "/kubepods/" in line: + return True, __get_container(line, containers, False) + if "devices" in line and "/docker/" in line: + return True, __get_container(line, containers, True) + return False, None + + +def __get_container(device_line: str, containers: typing.List[Container], is_docker: bool): + paths_name = device_line.strip().split("/") + length = len(paths_name) + if length >= 3 and is_docker: + for container in containers: + if container.container_id in paths_name[2]: + return container + elif length >= 3: + for container in containers: + if container.container_id in paths_name[length - 1]: + return container + raise Exception("can not found docker id") diff --git a/component/DevKitTester/devkit_tester/bin/entrance.py b/component/DevKitTester/devkit_tester/bin/entrance.py index c2f997f9b244052fe4384002b2fbc3c273c70946..dad4dfa9611931a2ff63b51446f6072ddcb64c31 100644 --- a/component/DevKitTester/devkit_tester/bin/entrance.py +++ b/component/DevKitTester/devkit_tester/bin/entrance.py @@ -343,12 +343,12 @@ class Distributor: logging.info("ip:%s start devkit pipeline agent", ip) if self.enable_jmeter_command: start_command = ( - f"bash {task_id}/devkit_tester_agent/bin/devkit_agent_start.sh -a {self.apps} " - f"-d {self.duration} -t {task_id} -w") + f"bash --login -c 'bash {task_id}/devkit_tester_agent/bin/devkit_agent_start.sh -a {self.apps} " + f"-d {self.duration} -t {task_id} -w'") else: start_command = ( - f"bash {task_id}/devkit_tester_agent/bin/devkit_agent_start.sh -a {self.apps} " - f"-d {self.duration} -t {task_id}") + f"bash --login -c 'bash {task_id}/devkit_tester_agent/bin/devkit_agent_start.sh -a {self.apps} " + f"-d {self.duration} -t {task_id}'") stdin, stdout, stderr = ssh_client.exec_command(start_command) logging.info("start the sampling process on server %s:%s", ip, stderr.readlines()) self.__close_pipeline(stdin, stdout, stderr) diff --git a/component/DevKitTester/devkit_tester_agent/bin/flight_records_sample.py b/component/DevKitTester/devkit_tester_agent/bin/flight_records_sample.py index 40ee1d735df05ae3a986c68eb27e58c0ea5fab16..5f451ea03c579b5fbab40f22018b9b87128bbf6f 100644 --- a/component/DevKitTester/devkit_tester_agent/bin/flight_records_sample.py +++ b/component/DevKitTester/devkit_tester_agent/bin/flight_records_sample.py @@ -4,10 +4,12 @@ import logging.config import os.path import shutil import time +import typing import psutil -from devkit_utils import shell_tools, file_utils, docker_utils +from container.container import Container +from devkit_utils import shell_tools, file_utils, container_utils, proc_utils from devkit_utils.error_coce import ErrorCodeEnum from devkit_utils.log_config import config_log_ini from devkit_utils.pyinstaller_utils import PyInstallerUtils @@ -16,13 +18,13 @@ ROOT_PATH = os.path.dirname(os.path.dirname(__file__)) class TargetProcess: - def __init__(self, pid, name, is_docker=False, docker_id=None): + def __init__(self, pid, name, container: typing.Optional[Container] = None): self.pid = pid self.name = name self.jfr_name = None - self.is_docker = is_docker - self.docker_id = docker_id + self.container: typing.Optional[Container] = container self.jfr_path = None + self.username = None class FlightRecordsFactory: @@ -35,7 +37,7 @@ class FlightRecordsFactory: self.root_path = root_path self.wait_for_jmeter_stop = wait_for_jmeter_stop self.pids: list[TargetProcess] = list() - self.pids_to_start_recording = list() + self.pids_to_start_recording: list[TargetProcess] = list() self.pids_to_stop_recording = list() self.jfr_paths: list[str] = [] self.return_code = ErrorCodeEnum.FINISHED @@ -50,7 +52,6 @@ class FlightRecordsFactory: file_utils.create_dir(self.dir_to_storage_jfr) self.jcmd_path = None self.user_is_root = False - self.is_docker = False def start_sample(self): try: @@ -126,15 +127,18 @@ class FlightRecordsFactory: os.chmod(self.temporary_settings_path, mode=0o644) def __init_pids(self): + containers = container_utils.get_containers() for app in self.apps.split(","): commander_to_view_pid = "ps -ef|grep java|grep -v grep|grep {}|awk '{{print $2}}'".format(app) outcome = shell_tools.exec_shell(commander_to_view_pid, is_shell=True) logging.info("app:%s to pid %s", app, outcome) pids = outcome.out.split() for pid in pids: - is_docker, docker_id = docker_utils.is_docker_process(pid) - if is_docker: - self.pids.append(TargetProcess(pid, app, is_docker, docker_id)) + if not proc_utils.is_java_process(pid): + continue + is_in_container, container = proc_utils.is_container_process(pid, containers) + if is_in_container: + self.pids.append(TargetProcess(pid, app, container)) else: self.pids.append(TargetProcess(pid, app)) @@ -155,16 +159,15 @@ class FlightRecordsFactory: self.user_is_root = False def __start_recorder_by_root(self): - logging.info(PyInstallerUtils.get_env()) for target in self.pids: - if target.is_docker: - docker_utils.create_dir_in_docker(target.docker_id, self.tmp_dir, mode=777) - docker_utils.create_dir_in_docker(target.docker_id, self.tmp_config_dir, mode=777) - docker_utils.create_dir_in_docker(target.docker_id, self.tmp_data_dir, mode=777) - docker_utils.copy_to_docker(target.docker_id, self.temporary_settings_path, - self.tmp_config_dir) + if target.container: + target.container.create_dir_in_container(self.tmp_dir, mode=777) + target.container.create_dir_in_container(self.tmp_config_dir, mode=777) + target.container.create_dir_in_container(self.tmp_data_dir, mode=777) + target.container.copy_to_container(self.temporary_settings_path, self.tmp_config_dir) jfr_path = self.__jfr_name(target.name, target.pid) username = psutil.Process(int(target.pid)).username() + target.username = username command = (f"su - {username} -c '" f"{self.jcmd_path} {target.pid} JFR.start settings={self.temporary_settings_path} " f"duration={self.duration}s name={self.RECORD_NAME} filename={jfr_path}'") @@ -175,18 +178,20 @@ class FlightRecordsFactory: target.jfr_path = jfr_path self.jfr_paths.append(jfr_path) self.pids_to_start_recording.append(target) + else: + if target.container: + target.container.delete_in_container(self.tmp_dir) # 移动到data目录下 with open(file=os.path.join(self.root_path, "config/upload_sample.ini"), mode="w", encoding="utf-8") as file: file.write(os.linesep.join(self.jfr_paths)) def __start_recorder(self): for target in self.pids: - if target.is_docker: - docker_utils.create_dir_in_docker(target.docker_id, self.tmp_dir, mode=777) - docker_utils.create_dir_in_docker(target.docker_id, self.tmp_config_dir, mode=777) - docker_utils.create_dir_in_docker(target.docker_id, self.tmp_data_dir, mode=777) - docker_utils.copy_to_docker(target.docker_id, self.temporary_settings_path, - self.tmp_config_dir) + if target.container: + target.container.create_dir_in_container(self.tmp_dir, mode=777) + target.container.create_dir_in_container(self.tmp_config_dir, mode=777) + target.container.create_dir_in_container(self.tmp_data_dir, mode=777) + target.container.copy_to_container(self.temporary_settings_path, self.tmp_config_dir) jfr_path = self.__jfr_name(target.name, target.pid) command = (f"jcmd {target.pid} JFR.start settings={self.temporary_settings_path} duration={self.duration}s" f" name={self.RECORD_NAME} filename={jfr_path}") @@ -197,27 +202,35 @@ class FlightRecordsFactory: target.jfr_path = jfr_path self.jfr_paths.append(jfr_path) self.pids_to_start_recording.append(target) + else: + if target.container: + target.container.delete_in_container(self.tmp_dir) # 移动到data目录下 with open(file=os.path.join(self.root_path, "config/upload_sample.ini"), mode="w", encoding="utf-8") as file: file.write(os.linesep.join(self.jfr_paths)) def __stop_recorder_by_root(self): for target in self.pids_to_start_recording: - username = psutil.Process(int(target.pid)).username() - command = f"su - {username} -c '{self.jcmd_path} {target.pid} JFR.stop name={self.RECORD_NAME}'" + if not os.path.exists(os.path.join("/proc", target.pid)): + continue + command = f"su - {target.username} -c '{self.jcmd_path} {target.pid} JFR.stop name={self.RECORD_NAME}'" outcome = shell_tools.exec_shell(command, is_shell=True) logging.info(outcome) def __stop_recorder(self): for target in self.pids_to_start_recording: + if not os.path.exists(os.path.join("/proc", target.pid)): + continue outcome = shell_tools.exec_shell("jcmd {} JFR.stop name={}".format(target.pid, self.RECORD_NAME), is_shell=True) logging.info(outcome) def __check_has_stopped_recorder_by_root(self): for target in self.pids_to_start_recording: - username = psutil.Process(int(target.pid)).username() - command = f"su - {username} -c '{self.jcmd_path} {target.pid} JFR.check name={self.RECORD_NAME}'" + if not os.path.exists(os.path.join("/proc", target.pid)): + self.pids_to_stop_recording.append(target) + continue + command = f"su - {target.username} -c '{self.jcmd_path} {target.pid} JFR.check name={self.RECORD_NAME}'" outcome = shell_tools.exec_shell(command, is_shell=True) logging.info(outcome) if outcome.out.find("Could not find"): @@ -230,6 +243,9 @@ class FlightRecordsFactory: def __check_has_stopped_recorder(self): for target in self.pids_to_start_recording: + if not os.path.exists(os.path.join("/proc", target.pid)): + self.pids_to_stop_recording.append(target) + continue outcome = shell_tools.exec_shell("jcmd {} JFR.check name={}".format(target.pid, self.RECORD_NAME), is_shell=True) logging.info(outcome) @@ -243,13 +259,9 @@ class FlightRecordsFactory: def __copy_to_host(self): for target in self.pids_to_start_recording: - if target.is_docker: - outcome = shell_tools.exec_shell(f"docker cp {target.docker_id}:{target.jfr_path} {target.jfr_path}", - is_shell=True) - logging.info(outcome) - outcome = shell_tools.exec_shell(f"docker exec {target.docker_id} rm -rf {self.tmp_dir}", - is_shell=True) - logging.info(outcome) + if target.container: + target.container.copy_to_host(target.jfr_path, target.jfr_path) + target.container.delete_in_container(self.tmp_dir) def __jfr_name(self, app, pid): return os.path.join(self.tmp_data_dir, f"{app}_PID_{pid}_Time_{self.now_date}.jfr") @@ -283,6 +295,7 @@ def main(): parser.set_defaults(root_path=PyInstallerUtils.obtain_root_path(ROOT_PATH)) args = parser.parse_args() config_log_ini(args.root_path, "devkit_tester_agent") + logging.info(PyInstallerUtils.get_env()) logging.info("start") factory = FlightRecordsFactory(args.applications, args.duration, args.root_path, args.waiting, args.task_id) factory.start_sample()