From 0439e1dba9b769f71d83a87e417ba8704bed1398 Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Wed, 9 Apr 2025 19:31:27 +0800 Subject: [PATCH 01/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=E5=88=9B=E5=BB=BA?= =?UTF-8?q?=E4=BD=9C=E4=B8=9A=E5=A2=9E=E5=8A=A0=E6=95=B0=E9=87=8F=E9=99=90?= =?UTF-8?q?=E5=88=B6=E6=8E=92=E9=98=9F=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../autoexecrunner/api/job/JobExecApi.java | 6 +- .../asynchronization/NeatLogicThread.java | 190 ++++++++++++++++++ .../queue/NeatLogicUniqueBlockingQueue.java | 102 ++++++++++ .../threadlocal/UserContext.java | 24 ++- .../threadpool/CachedThreadPool.java | 144 +++++++++++++ .../autoexec/ProcessWaitTask.java | 69 +++++++ .../autoexecrunner/common/config/Config.java | 13 ++ .../core/ExecProcessCommand.java | 1 + .../autoexecrunner/dto/ThreadPoolVo.java | 124 ++++++++++++ .../autoexecrunner/dto/ThreadTaskVo.java | 84 ++++++++ .../autoexecrunner/dto/ThreadVo.java | 62 ++++++ .../startup/handler/AutoexecQueueThread.java | 157 +++++++++++++++ 12 files changed, 962 insertions(+), 14 deletions(-) create mode 100644 src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java create mode 100644 src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java create mode 100644 src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadpool/CachedThreadPool.java create mode 100644 src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java create mode 100644 src/main/java/com/neatlogic/autoexecrunner/dto/ThreadPoolVo.java create mode 100644 src/main/java/com/neatlogic/autoexecrunner/dto/ThreadTaskVo.java create mode 100644 src/main/java/com/neatlogic/autoexecrunner/dto/ThreadVo.java create mode 100644 src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java index 7622d2e..6ed4b7a 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java @@ -17,10 +17,9 @@ package com.neatlogic.autoexecrunner.api.job; import com.alibaba.fastjson.JSONObject; import com.neatlogic.autoexecrunner.asynchronization.threadlocal.UserContext; import com.neatlogic.autoexecrunner.constvalue.JobAction; -import com.neatlogic.autoexecrunner.core.ExecProcessCommand; import com.neatlogic.autoexecrunner.dto.CommandVo; import com.neatlogic.autoexecrunner.restful.core.privateapi.PrivateApiComponentBase; -import com.neatlogic.autoexecrunner.threadpool.CommonThreadPool; +import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; import com.neatlogic.autoexecrunner.util.FileUtil; import org.apache.commons.collections4.CollectionUtils; import org.springframework.stereotype.Component; @@ -83,8 +82,7 @@ public class JobExecApi extends PrivateApiComponentBase { } commandList.add("--reuseconslog"); commandVo.setCommandList(commandList); - ExecProcessCommand processCommand = new ExecProcessCommand(commandVo); - CommonThreadPool.execute(processCommand); + AutoexecQueueThread.addUpdateTagent(commandVo); return null; } diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java new file mode 100644 index 0000000..d37b747 --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java @@ -0,0 +1,190 @@ +/*Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see .*/ + +package com.neatlogic.autoexecrunner.asynchronization; + + +import com.neatlogic.autoexecrunner.asynchronization.threadlocal.TenantContext; +import com.neatlogic.autoexecrunner.asynchronization.threadlocal.UserContext; +import com.neatlogic.autoexecrunner.exception.core.ApiRuntimeException; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; + +public abstract class NeatLogicThread implements Runnable, Comparable { + private static final Logger logger = LoggerFactory.getLogger(NeatLogicThread.class); + protected UserContext userContext; + private final String tenantUuid; + private String threadName; + private boolean isUnique = false; + /* For generating thread ID */ + private long id; + private int priority = 3;//默认优先级是3,数字越低优先级越高 + private Semaphore lock;//用于hold住其他异步线程,控制两个异步线程的先后顺序 + private CountDownLatch countDownLatch;//用于hold住主线程,这里只需要countdown,不需要等待 + private boolean needAwaitAdvance = true;// 是否需要等待所有模块加载完成后再任务 + + @Override + public int compareTo(NeatLogicThread other) { + return Integer.compare(this.priority, other.priority); // 优先级高的先出队,priority越小代表优先级越高 + } + + public Semaphore getLock() { + return lock; + } + + public void setLock(Semaphore lock) { + this.lock = lock; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public int getPriority() { + if (priority <= 1) { + priority = 1; + } + return priority; + } + + public void setPriority(int priority) { + this.priority = priority; + } + + + + /*public NeatLogicThread() { + userContext = UserContext.get(); + tenantContext = TenantContext.get(); + inputFromContext = InputFromContext.get(); + }*/ + + /*public NeatLogicThread(UserContext _userContext, TenantContext _tenantContext) { + if (_userContext != null) { + userContext = _userContext.copy(); + } + tenantUuid = _tenantContext.getTenantUuid(); + activeModuleList = _tenantContext.getActiveModuleList(); + inputFromContext = InputFromContext.get(); + }*/ + + + public NeatLogicThread(String _threadName) { + UserContext tmp = UserContext.get(); + if (tmp != null) { + userContext = tmp.copy(); + } + tenantUuid = TenantContext.get().getTenantUuid(); + this.threadName = _threadName; + } + + public NeatLogicThread(String _threadName, int priority) { + UserContext tmp = UserContext.get(); + if (tmp != null) { + userContext = tmp.copy(); + } + tenantUuid = TenantContext.get().getTenantUuid(); + this.threadName = _threadName; + this.priority = priority; + } + + public NeatLogicThread(String _threadName, boolean _isUnique) { + userContext = UserContext.get(); + tenantUuid = TenantContext.get().getTenantUuid(); + this.threadName = _threadName; + this.isUnique = _isUnique; + } + + @Override + public final void run() { + TenantContext tenantContext = TenantContext.init(tenantUuid); + UserContext.init(userContext); + try { + String oldThreadName = Thread.currentThread().getName(); + if (StringUtils.isNotBlank(threadName)) { + Thread.currentThread().setName(threadName); + } + + if (this.lock != null) { + //System.out.println(this.getThreadName() + "尝试获取锁" + this.lock); + lock.acquire(); + //System.out.println(this.getThreadName() + "成功获取锁" + this.lock); + } + execute(); + Thread.currentThread().setName(oldThreadName); + } catch (ApiRuntimeException ex) { + logger.warn(ex.getMessage(), ex); + } catch (Exception ex) { + logger.error(ex.getMessage(), ex); + } finally { + if (this.lock != null) { + lock.release(); + //System.out.println(this.getThreadName() + "成功释放锁" + this.lock); + } + if (countDownLatch != null) { + countDownLatch.countDown(); + } + // 清除所有threadlocal + if (TenantContext.get() != null) { + TenantContext.get().release(); + } + if (UserContext.get() != null) { + UserContext.get().release(); + } + } + } + + protected abstract void execute(); + + public void setUnique(boolean unique) { + isUnique = unique; + } + + public boolean isUnique() { + return isUnique; + } + + public void setThreadName(String threadName) { + this.threadName = threadName; + } + + public String getThreadName() { + return threadName; + } + + public CountDownLatch getCountDownLatch() { + return countDownLatch; + } + + public void setCountDownLatch(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + public boolean isNeedAwaitAdvance() { + return needAwaitAdvance; + } + + public void setNeedAwaitAdvance(boolean needAwaitAdvance) { + this.needAwaitAdvance = needAwaitAdvance; + } +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java new file mode 100644 index 0000000..ae9deaf --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.neatlogic.autoexecrunner.asynchronization.queue; + +import com.alibaba.fastjson.JSON; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + +public class NeatLogicUniqueBlockingQueue { + private static final Logger logger = LoggerFactory.getLogger(NeatLogicUniqueBlockingQueue.class); + private final BlockingQueue> blockingQueue; + private final ConcurrentHashMap taskMap; // 用于去重 + + public NeatLogicUniqueBlockingQueue(int capacity) { + this.blockingQueue = new LinkedBlockingQueue<>(capacity); + this.taskMap = new ConcurrentHashMap<>(); + } + + public boolean offer(T t) { + Task task = new Task<>(t); + // 保证任务唯一性 + if (taskMap.putIfAbsent(task.getUniqueKey(), Boolean.TRUE) == null) { + logger.debug("====TagentUpdateInfo-addQueue:" + JSON.toJSONString(task)); + // 如果任务是新任务,放入队列 + boolean added = blockingQueue.offer(task); + if (!added) { + // 如果队列已满,移除任务标记 + taskMap.remove(task.getUniqueKey()); + logger.error("Queue is full!"); + } + return added; + } else { + if (t != null) { + logger.debug("NeatLogicUniqueBlockingQueue repeat: {}", JSON.toJSONString(t)); + } + } + return false; // 已存在任务,直接返回 false + } + + public T take() throws InterruptedException { + Task task = blockingQueue.take(); // 阻塞式获取任务 + taskMap.remove(task.getUniqueKey()); // 移除已处理任务的唯一标记 + return task.getT(); + } + + private static class Task { + private final T t; + + public Task(T t) { + this.t = t; + } + + public T getT() { + return t; + } + + public String getUniqueKey() { + return String.valueOf(t.hashCode()); + } + } + + public int size(){ + return blockingQueue.size(); + } + +// public static void main(String[] args) throws InterruptedException { +// NeatLogicUniqueBlockingQueue queue = new NeatLogicUniqueBlockingQueue<>(1); +// +// // 模拟任务插入 +// UserSessionVo a = new UserSessionVo(); +// a.setToken("1111"); +// System.out.println(queue.offer(a)); // 返回 true,任务插入成功 +// UserSessionVo b = new UserSessionVo(); +// b.setToken("222"); +// System.out.println(queue.offer(b)); // 返回 false,任务已存在 +// +// // 模拟任务消费 +// UserSessionVo task = queue.take(); // 消费 "task1" +// UserSessionVo task2 = queue.take(); // 消费 "task1" +// UserSessionVo task3 = queue.take(); // 消费 "task1" +// } +} + diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/UserContext.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/UserContext.java index 89bf0a2..f63f71a 100755 --- a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/UserContext.java +++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/UserContext.java @@ -27,19 +27,23 @@ public class UserContext implements Serializable { private String timezone = "+8:00"; private String token; private List roleUuidList = new ArrayList<>(); - + + public UserContext copy() { + UserContext userContext = new UserContext(); + userContext.setRequest(request); + userContext.setToken(token); + userContext.setTenant(tenant); + userContext.setUserName(userName); + userContext.setUserId(userId); + userContext.setUserUuid(userUuid); + userContext.setTimezone(timezone); + return userContext; + } + public static UserContext init(UserContext _userContext) { UserContext context = new UserContext(); if (_userContext != null) { - context.setUserId(_userContext.getUserId()); - context.setUserUuid(_userContext.getUserUuid()); - context.setUserName(_userContext.getUserName()); - context.setTenant(_userContext.getTenant()); - context.setTimezone(_userContext.getTimezone()); - context.setToken(_userContext.getToken()); - // context.setRequest(_userContext.getRequest()); - // context.setResponse(_userContext.getResponse()); - context.setRoleUuidList(_userContext.getRoleUuidList()); + context = _userContext.copy(); } instance.set(context); return context; diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadpool/CachedThreadPool.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadpool/CachedThreadPool.java new file mode 100644 index 0000000..b1f50a2 --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadpool/CachedThreadPool.java @@ -0,0 +1,144 @@ +/*Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see .*/ + +package com.neatlogic.autoexecrunner.asynchronization.threadpool; + + +import com.neatlogic.autoexecrunner.asynchronization.NeatLogicThread; +import com.neatlogic.autoexecrunner.dto.ThreadPoolVo; +import com.neatlogic.autoexecrunner.dto.ThreadTaskVo; +import com.neatlogic.autoexecrunner.dto.ThreadVo; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.*; +import java.util.concurrent.*; + +public class CachedThreadPool { + static int rank = 15; + static int cpu = Runtime.getRuntime().availableProcessors(); + private static final Log logger = LogFactory.getLog(CachedThreadPool.class); + private static final Map threadTaskMap = new ConcurrentHashMap<>(); + private static final Map threadMap = new ConcurrentHashMap<>(); + private static final Set threadSet = new HashSet<>(); + private static final PriorityBlockingQueue threadQueue = new PriorityBlockingQueue<>(); + + static class NeatLogicThreadFactory implements ThreadFactory { + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r) { + @Override + public void run() { + try { + super.run(); + } finally { + threadMap.remove(this.getId()); + } + } + }; + threadMap.put(thread.getId(), new ThreadVo(thread.getId(), thread.getName())); + return thread; + } + } + + private static final ThreadPoolExecutor mainThreadPool = new ThreadPoolExecutor(0, cpu * rank, + 60L, TimeUnit.SECONDS, + new SynchronousQueue<>(), new NeatLogicThreadFactory(), new NeatLogicRejectHandler()) { + @Override + protected void beforeExecute(Thread t, Runnable r) { + super.beforeExecute(t, r); + if (r instanceof NeatLogicThread) { + NeatLogicThread nt = (NeatLogicThread) r; + nt.setId(t.getId()); + ThreadTaskVo threadVo = new ThreadTaskVo(); + threadVo.setId(t.getId()); + threadVo.setName(nt.getThreadName()); + threadVo.setPoolName("main"); + threadVo.setStartTime(new Date()); + threadVo.setPriority(nt.getPriority()); + threadTaskMap.put(nt.getId(), threadVo); + threadSet.add(nt.getThreadName()); + } + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + + // 任务完成后从 activeTasks 中移除 + if (r instanceof NeatLogicThread) { + NeatLogicThread task = (NeatLogicThread) r; + threadTaskMap.remove(task.getId()); + threadSet.remove(task.getThreadName()); + } + //尝试从队列中拿出任务处理 + Runnable task = threadQueue.poll(); + if (task != null) { + mainThreadPool.execute(task); + } + } + }; + + public static void execute(NeatLogicThread command, CountDownLatch countDownLatch) { + command.setCountDownLatch(countDownLatch); + execute(command); + } + + public static void execute(NeatLogicThread command, Semaphore lock) { + command.setLock(lock); + execute(command); + } + + public static void execute(NeatLogicThread command) { + try { + boolean isExists = command.isUnique() && StringUtils.isNotBlank(command.getThreadName()) && threadSet.contains(command.getThreadName()); + if (!isExists) { + mainThreadPool.execute(command); + } else { + logger.warn(command.getThreadName() + " is running"); + } + } catch (RejectedExecutionException ex) { + logger.error(ex.getMessage(), ex); + } + } + + + static class NeatLogicRejectHandler implements RejectedExecutionHandler { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + //进入等待队列 + if (r instanceof NeatLogicThread) { + threadQueue.offer((NeatLogicThread) r); + } else { + logger.error("线程池已满,非NeatLogicThread子类线程将被抛弃"); + } + } + } + + + public static ThreadPoolVo getStatus() { + ThreadPoolVo threadPoolVo = new ThreadPoolVo(); + List threadTasks = new ArrayList<>(threadTaskMap.values()); + List threads = new ArrayList<>(threadMap.values()); + threadPoolVo.setThreadTaskList(threadTasks); + threadPoolVo.setThreadList(threads); + threadPoolVo.setMaxThreadCount(cpu * rank); + threadPoolVo.setMainPoolSize(mainThreadPool.getPoolSize()); + threadPoolVo.setMainActiveCount(mainThreadPool.getActiveCount()); + threadPoolVo.setMainQueueSize(threadQueue.size()); + return threadPoolVo; + } +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java b/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java new file mode 100644 index 0000000..8068c16 --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java @@ -0,0 +1,69 @@ +package com.neatlogic.autoexecrunner.autoexec; + +import com.alibaba.fastjson.JSONObject; +import com.neatlogic.autoexecrunner.asynchronization.NeatLogicThread; +import com.neatlogic.autoexecrunner.dto.CommandVo; +import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Field; + +public class ProcessWaitTask extends NeatLogicThread { + private static final Logger logger = LoggerFactory.getLogger(ProcessWaitTask.class); + private final Process process; + private final CommandVo commandVo; + private final JSONObject payload; + + public ProcessWaitTask(Process process, CommandVo commandVo, JSONObject payload) { + super("THREAD-AUTOEXEC-WAIT-" + commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY)); + this.process = process; + this.commandVo = commandVo; + this.payload = payload; + } + + + @Override + protected void execute() { + + try { + int exitCode = process.waitFor(); + int exitStatus = process.exitValue(); + commandVo.setExitValue(exitStatus); + logger.debug("进程[{}] 退出,状态码: {}", getPid(process), exitCode); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error(String.format("等待被中断: 入参:%s, errorMsg:%s", payload, e.getMessage()), e); + } finally { + // 确保关闭流 + closeQuietly(process.getInputStream()); + closeQuietly(process.getErrorStream()); + closeQuietly(process.getOutputStream()); + AutoexecQueueThread.removeProcess(process); + } + } + + // 兼容不同JDK版本的PID获取 + private long getPid(Process p) { + try { + if (p.getClass().getName().contains("UNIXProcess")) { + Field pidField = p.getClass().getDeclaredField("pid"); + pidField.setAccessible(true); + return pidField.getLong(p); + } + } catch (Exception e) { + // 忽略异常 + } + return -1; // 未知PID + } + + private void closeQuietly(java.io.Closeable c) { + try { + if (c != null) c.close(); + } catch (IOException ignore) { + } + } +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java b/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java index 7a5eb5e..bac3c8c 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java +++ b/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java @@ -37,6 +37,8 @@ public class Config { private static String DATA_HOME;//文件根目录 private static String DEPLOY_HOME;//发布目录 private static String GITLAB_PASSWORD;// gitlab private_token + private static Integer MAX_PROCESS_QUEUE_SIZE;//最大自动化作业队列数,多余的则丢弃 + private static Integer MAX_PROCESS_EXECUTE_COUNT;//最大执行作业数,超过的则进入队列 //neatlogic private static String NEATLOGIC_ROOT; @@ -168,6 +170,13 @@ public class Config { return AUTOEXEC_TOKEN; } + public static Integer MAX_PROCESS_QUEUE_SIZE() { + return MAX_PROCESS_QUEUE_SIZE; + } + public static Integer MAX_PROCESS_EXECUTE_COUNT() { + return MAX_PROCESS_EXECUTE_COUNT; + } + @PostConstruct public void init() { try { @@ -238,6 +247,10 @@ public class Config { AUTOEXEC_TOKEN = prop.getProperty("autoexec.token", "499922b4317c251c2ce525f7b83e3d94"); UPDATE_RUNNER_STATUS_PERIOD = Integer.parseInt(prop.getProperty("update.runner.status.period", "1800000")); + + MAX_PROCESS_QUEUE_SIZE = Integer.parseInt(prop.getProperty("max.process.queue.size", "2000")); + + MAX_PROCESS_EXECUTE_COUNT = Integer.parseInt(prop.getProperty("max.process.execute.size", "10")); } catch (IOException e) { logger.error(e.getMessage(), e); } diff --git a/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java b/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java index 14acccf..f4b3350 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java +++ b/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; * @author lvzk * @since 2021/4/21 17:12 **/ +@Deprecated public class ExecProcessCommand implements Runnable { private static final Logger logger = LoggerFactory.getLogger(ExecProcessCommand.class); private final ProcessBuilder builder; diff --git a/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadPoolVo.java b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadPoolVo.java new file mode 100644 index 0000000..a12af67 --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadPoolVo.java @@ -0,0 +1,124 @@ +/* + * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.neatlogic.autoexecrunner.dto; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.ArrayList; +import java.util.List; + +public class ThreadPoolVo { + private int mainQueueSize; + private int mainPoolSize; + private int backupQueueSize; + private int backupPoolSize; + private int mainActiveCount; + private int backupActiveCount; + private int maxThreadCount; + private List threadTaskList = new ArrayList<>(); + private List threadList = new ArrayList<>(); + + public int getMaxThreadCount() { + return maxThreadCount; + } + + public void setMaxThreadCount(int maxThreadCount) { + this.maxThreadCount = maxThreadCount; + } + + + public int getMainQueueSize() { + return mainQueueSize; + } + + public void setMainQueueSize(int mainQueueSize) { + this.mainQueueSize = mainQueueSize; + } + + public int getMainPoolSize() { + return mainPoolSize; + } + + public int getMainActiveCount() { + return mainActiveCount; + } + + public void setMainActiveCount(int mainActiveCount) { + this.mainActiveCount = mainActiveCount; + } + + public int getBackupActiveCount() { + return backupActiveCount; + } + + public void setBackupActiveCount(int backupActiveCount) { + this.backupActiveCount = backupActiveCount; + } + + public void setMainPoolSize(int mainPoolSize) { + this.mainPoolSize = mainPoolSize; + } + + public int getBackupQueueSize() { + return backupQueueSize; + } + + public void setBackupQueueSize(int backupQueueSize) { + this.backupQueueSize = backupQueueSize; + } + + public int getBackupPoolSize() { + return backupPoolSize; + } + + public void setBackupPoolSize(int backupPoolSize) { + this.backupPoolSize = backupPoolSize; + } + + boolean isSorted = false; + + public List getThreadTaskList() { + if (CollectionUtils.isNotEmpty(threadTaskList)) { + threadTaskList.sort((o1, o2) -> { + long s = o1.getStartTime().getTime(); + long e = o2.getStartTime().getTime(); + return Long.compare(s, e); + }); + } + return threadTaskList; + } + + public void setThreadTaskList(List threadTaskList) { + this.threadTaskList = threadTaskList; + } + + public List getThreadList() { + if (CollectionUtils.isNotEmpty(threadList)) { + threadList.sort((o1, o2) -> { + long s = o1.getStartTime().getTime(); + long e = o2.getStartTime().getTime(); + return Long.compare(s, e); + }); + } + return threadList; + } + + public void setThreadList(List threadList) { + this.threadList = threadList; + } +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadTaskVo.java b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadTaskVo.java new file mode 100644 index 0000000..0c95c6c --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadTaskVo.java @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.neatlogic.autoexecrunner.dto; + +import java.util.Date; + +public class ThreadTaskVo { + private Long id; + private String name; + private Date startTime; + private long timeCost; + private String poolName; + private String status; + private int priority; + + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public int getPriority() { + return priority; + } + + public void setPriority(int priority) { + this.priority = priority; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + + public Date getStartTime() { + return startTime; + } + + public void setStartTime(Date startTime) { + this.startTime = startTime; + } + + public long getTimeCost() { + return System.currentTimeMillis() - this.startTime.getTime(); + } + + + public String getPoolName() { + return poolName; + } + + public void setPoolName(String poolName) { + this.poolName = poolName; + } +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadVo.java b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadVo.java new file mode 100644 index 0000000..3b4f2c0 --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadVo.java @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.neatlogic.autoexecrunner.dto; + +import java.util.Date; + +public class ThreadVo { + private long id; + private String name; + private Date startTime; + private long timeCost; + + public ThreadVo(long id, String name) { + this.id = id; + this.name = name; + this.startTime = new Date(); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public Date getStartTime() { + return startTime; + } + + public void setStartTime(Date startTime) { + this.startTime = startTime; + } + + public long getTimeCost() { + return System.currentTimeMillis() - this.startTime.getTime(); + } + +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java new file mode 100644 index 0000000..141674c --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java @@ -0,0 +1,157 @@ +package com.neatlogic.autoexecrunner.startup.handler; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.neatlogic.autoexecrunner.asynchronization.queue.NeatLogicUniqueBlockingQueue; +import com.neatlogic.autoexecrunner.asynchronization.threadpool.CachedThreadPool; +import com.neatlogic.autoexecrunner.autoexec.ProcessWaitTask; +import com.neatlogic.autoexecrunner.common.config.Config; +import com.neatlogic.autoexecrunner.dto.CommandVo; +import com.neatlogic.autoexecrunner.startup.IStartUp; +import com.neatlogic.autoexecrunner.util.FileUtil; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +public class AutoexecQueueThread implements IStartUp { + private static final BlockingQueue processQueue = new LinkedBlockingQueue<>(Config.MAX_PROCESS_QUEUE_SIZE() + 5); + private static final Logger logger = LoggerFactory.getLogger(AutoexecQueueThread.class); + private static final NeatLogicUniqueBlockingQueue blockingQueue = new NeatLogicUniqueBlockingQueue<>(50000); + private volatile boolean running = true; + + @Override + public String getName() { + return "创建自动化作业线程"; + } + + @Override + public String getDescription() { + return null; + } + + @Override + public void doService() { + // 启动监控线程 + Thread watchdog = new Thread(() -> { + Thread workerThread = null; + + while (running) { + if (workerThread == null || !workerThread.isAlive()) { + System.out.println("工作线程未运行,准备启动..."); + logger.debug("工作线程未运行,准备启动..."); + workerThread = new Thread(() -> { + Thread.currentThread().setName(""); + while (running) { + CommandVo commandVo = null; + try { + // 你的业务逻辑 + if (processQueue.size() <= Config.MAX_PROCESS_QUEUE_SIZE()) { + commandVo = blockingQueue.take(); + logger.debug("作业{} 即将运行...", commandVo.getTenant() + "-" + commandVo.getJobId()); + createSubProcessAndStart(commandVo); + } else { + logger.debug("作业进程最大数量:{}, 需等待运行中的进程结束后,才继续创建队列内的作业进程!", Config.MAX_PROCESS_QUEUE_SIZE()); + } + Thread.sleep(2000); + } catch (InterruptedException e) { + System.out.printf("创建自动化作业子进程的线程被中断...入参:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()); + logger.error(String.format("创建自动化作业子进程的线程被中断...入参:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()), e); + break; + } catch (Exception e) { + System.out.printf("创建自动化作业子进程失败:入参:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()); + logger.error(String.format("创建自动化作业子进程失败:入参:%s ,errorMsg:%s", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()), e); + break; // 退出由外层 watchdog 负责重启 + } + } + }); + + workerThread.setDaemon(true); + workerThread.start(); + } + + try { + Thread.sleep(2000); // 每2秒检查一次 + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + }); + + watchdog.setDaemon(true); + watchdog.start(); + } + + private void createSubProcessAndStart(CommandVo commandVo) { + File NULL_FILE = new File("/dev/null"); + ProcessBuilder builder = new ProcessBuilder(commandVo.getCommandList()); + builder.redirectOutput(NULL_FILE); + File consoleLog = FileUtil.createFile(commandVo.getConsoleLogPath()); + builder.redirectError(ProcessBuilder.Redirect.appendTo(consoleLog)); + Map env = builder.environment(); + JSONObject environment = commandVo.getEnvironment(); + if (MapUtils.isNotEmpty(environment)) { + for (Map.Entry entry : environment.entrySet()) { + env.put(entry.getKey(), entry.getValue().toString()); + } + } + env.put("tenant", commandVo.getTenant()); + JSONObject payload = new JSONObject(); + Process process = null; + try { + payload.put("jobId", commandVo.getJobId()); + payload.put("status", 1); + payload.put("command", commandVo); + payload.put("passThroughEnv", commandVo.getPassThroughEnv().toJSONString()); + process = builder.start(); + addProcess(process); + CachedThreadPool.execute(new ProcessWaitTask(process, commandVo, payload)); + } catch (IOException e) { + logger.error(String.format("进程启动失败: %s ,error: %s", payload, e.getMessage()), e); + System.err.println("进程启动失败: " + payload + ",error:" + e.getMessage()); + } finally { + // 确保关闭流 + if (process != null) { + closeQuietly(process.getInputStream()); + closeQuietly(process.getErrorStream()); + closeQuietly(process.getOutputStream()); + } + } + } + + private void closeQuietly(java.io.Closeable c) { + try { + if (c != null) c.close(); + } catch (IOException ignore) { + } + } + + public void stop() { + running = false; + } + + public static void addUpdateTagent(CommandVo commandVo) { + blockingQueue.offer(commandVo); + } + + public static void addProcess(Process process) { + boolean result = processQueue.offer(process); + if (!result) { + logger.error("processQueue offer failed!current queue size is :{}", processQueue.size()); + } + } + + public static void removeProcess(Process process) { + boolean result = processQueue.remove(process); + if (!result) { + logger.error("processQueue remove failed!current queue size is :{}", processQueue.size()); + } + } +} -- Gitee From 6e2f08b7b2a55c236e944eeb8632533df043d3a3 Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Thu, 10 Apr 2025 19:08:26 +0800 Subject: [PATCH 02/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=E5=88=9B=E5=BB=BA?= =?UTF-8?q?=E4=BD=9C=E4=B8=9A=E5=A2=9E=E5=8A=A0=E6=95=B0=E9=87=8F=E9=99=90?= =?UTF-8?q?=E5=88=B6=E6=8E=92=E9=98=9F=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../asynchronization/NeatLogicThread.java | 5 + .../queue/NeatLogicUniqueBlockingQueue.java | 17 ++- .../threadlocal/RequestContext.java | 134 ++++++++++++++++++ .../autoexec/ProcessWaitTask.java | 17 +-- .../autoexecrunner/common/config/Config.java | 4 +- .../core/ExecProcessCommand.java | 1 - .../filter/JsonWebTokenValidFilter.java | 4 +- .../logback/RequestUrlConverter.java | 37 +++++ .../logback/TenantConverter.java | 38 +++++ .../startup/handler/AutoexecQueueThread.java | 59 ++++---- src/main/resources/logback-spring.xml | 47 ++++-- 11 files changed, 315 insertions(+), 48 deletions(-) create mode 100644 src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/RequestContext.java create mode 100644 src/main/java/com/neatlogic/autoexecrunner/logback/RequestUrlConverter.java create mode 100644 src/main/java/com/neatlogic/autoexecrunner/logback/TenantConverter.java diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java index d37b747..87e8f72 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java +++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java @@ -16,6 +16,7 @@ along with this program. If not, see .*/ package com.neatlogic.autoexecrunner.asynchronization; +import com.neatlogic.autoexecrunner.asynchronization.threadlocal.RequestContext; import com.neatlogic.autoexecrunner.asynchronization.threadlocal.TenantContext; import com.neatlogic.autoexecrunner.asynchronization.threadlocal.UserContext; import com.neatlogic.autoexecrunner.exception.core.ApiRuntimeException; @@ -29,6 +30,7 @@ import java.util.concurrent.Semaphore; public abstract class NeatLogicThread implements Runnable, Comparable { private static final Logger logger = LoggerFactory.getLogger(NeatLogicThread.class); protected UserContext userContext; + protected RequestContext requestContext; private final String tenantUuid; private String threadName; private boolean isUnique = false; @@ -95,6 +97,7 @@ public abstract class NeatLogicThread implements Runnable, Comparable { public T take() throws InterruptedException { Task task = blockingQueue.take(); // 阻塞式获取任务 taskMap.remove(task.getUniqueKey()); // 移除已处理任务的唯一标记 + TenantContext tenantContext = TenantContext.get(); + if (tenantContext != null) { + tenantContext.switchTenant(task.getTenantUuid()); + } else { + TenantContext.init(task.getTenantUuid()); + } return task.getT(); } private static class Task { private final T t; + private final String tenantUuid; public Task(T t) { this.t = t; + this.tenantUuid = TenantContext.get().getTenantUuid(); } public T getT() { return t; } + public String getTenantUuid() { + return tenantUuid; + } + public String getUniqueKey() { - return String.valueOf(t.hashCode()); + // 唯一标识任务的 key,可根据需求定义,例如 `tenantUuid-t.hashCode` + //System.out.println(tenantUuid + "-" + t.hashCode()); + return tenantUuid + "-" + t.hashCode(); } } diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/RequestContext.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/RequestContext.java new file mode 100644 index 0000000..efcb667 --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/RequestContext.java @@ -0,0 +1,134 @@ +/*Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see .*/ + +package com.neatlogic.autoexecrunner.asynchronization.threadlocal; + +import javax.servlet.http.Cookie; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Locale; +import java.util.Objects; +import java.util.Optional; + +/** + * 保存请求信息 + */ +public class RequestContext implements Serializable { + private static final ThreadLocal instance = new ThreadLocal<>(); + private static final long serialVersionUID = -5420998728515359626L; + private String url; + private HttpServletRequest request; + private HttpServletResponse response; + //接口访问速率 + private Double apiRate; + //租户接口访问总速率 + private Double tenantRate; + //语言 + Locale locale; + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public HttpServletRequest getRequest() { + return request; + } + + public void setRequest(HttpServletRequest request) { + this.request = request; + } + + public HttpServletResponse getResponse() { + return response; + } + + public void setResponse(HttpServletResponse response) { + this.response = response; + } + + public Double getApiRate() { + return apiRate; + } + + public void setApiRate(Double apiRate) { + this.apiRate = apiRate; + } + + + public Double getTenantRate() { + return tenantRate; + } + + public void setTenantRate(Double tenantRate) { + this.tenantRate = tenantRate; + } + + public Locale getLocale() { + return locale; + } + + public void setLocale(Locale locale) { + this.locale = locale; + } + + public static RequestContext init(RequestContext _requestContext) { + RequestContext context = new RequestContext(); + if (_requestContext != null) { + context.setUrl(_requestContext.getUrl()); + context.setLocale(_requestContext.getLocale()); + } + instance.set(context); + return context; + } + + public static RequestContext init(HttpServletRequest request, String url, HttpServletResponse response) { + RequestContext context = new RequestContext(request, url); + context.setResponse(response); + instance.set(context); + if (request.getCookies() != null && request.getCookies().length > 0) { + Optional languageCookie = Arrays.stream(request.getCookies()).filter(o -> Objects.equals(o.getName(), "neatlogic_language")).findFirst(); + if (languageCookie.isPresent()) { + context.setLocale(new Locale(languageCookie.get().getValue())); + } else { + context.setLocale(Locale.getDefault()); + } + } + return context; + } + + private RequestContext() { + + } + + private RequestContext(HttpServletRequest request, String url) { + this.url = url; + this.request = request; + } + + public static RequestContext get() { + return instance.get(); + } + + public void release() { + instance.remove(); + } + +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java b/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java index 8068c16..c2b0f81 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java +++ b/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java @@ -4,8 +4,6 @@ import com.alibaba.fastjson.JSONObject; import com.neatlogic.autoexecrunner.asynchronization.NeatLogicThread; import com.neatlogic.autoexecrunner.dto.CommandVo; import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; -import org.apache.commons.collections4.MapUtils; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,32 +15,35 @@ public class ProcessWaitTask extends NeatLogicThread { private final Process process; private final CommandVo commandVo; private final JSONObject payload; + private final String jobName; - public ProcessWaitTask(Process process, CommandVo commandVo, JSONObject payload) { - super("THREAD-AUTOEXEC-WAIT-" + commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY)); + public ProcessWaitTask(Process process, CommandVo commandVo, JSONObject payload, String jobName) { + super("THREAD-AUTOEXEC-WAIT-" + jobName); this.process = process; this.commandVo = commandVo; this.payload = payload; + this.jobName = jobName; } @Override protected void execute() { - + Long pid = null; try { int exitCode = process.waitFor(); int exitStatus = process.exitValue(); commandVo.setExitValue(exitStatus); - logger.debug("进程[{}] 退出,状态码: {}", getPid(process), exitCode); + pid = getPid(process); + logger.debug("process[{}] finished,exitCode: {}", pid, exitCode); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - logger.error(String.format("等待被中断: 入参:%s, errorMsg:%s", payload, e.getMessage()), e); + logger.error(String.format("thread interrupt: param:%s, errorMsg:%s", payload, e.getMessage()), e); } finally { // 确保关闭流 closeQuietly(process.getInputStream()); closeQuietly(process.getErrorStream()); closeQuietly(process.getOutputStream()); - AutoexecQueueThread.removeProcess(process); + AutoexecQueueThread.removeProcess(process, jobName, pid); } } diff --git a/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java b/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java index bac3c8c..dd660c9 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java +++ b/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java @@ -248,9 +248,9 @@ public class Config { UPDATE_RUNNER_STATUS_PERIOD = Integer.parseInt(prop.getProperty("update.runner.status.period", "1800000")); - MAX_PROCESS_QUEUE_SIZE = Integer.parseInt(prop.getProperty("max.process.queue.size", "2000")); + MAX_PROCESS_QUEUE_SIZE = Integer.parseInt(prop.getProperty("max.process.queue.size", "1000")); - MAX_PROCESS_EXECUTE_COUNT = Integer.parseInt(prop.getProperty("max.process.execute.size", "10")); + MAX_PROCESS_EXECUTE_COUNT = Integer.parseInt(prop.getProperty("max.process.execute.count", "20")); } catch (IOException e) { logger.error(e.getMessage(), e); } diff --git a/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java b/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java index f4b3350..14acccf 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java +++ b/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java @@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit; * @author lvzk * @since 2021/4/21 17:12 **/ -@Deprecated public class ExecProcessCommand implements Runnable { private static final Logger logger = LoggerFactory.getLogger(ExecProcessCommand.class); private final ProcessBuilder builder; diff --git a/src/main/java/com/neatlogic/autoexecrunner/filter/JsonWebTokenValidFilter.java b/src/main/java/com/neatlogic/autoexecrunner/filter/JsonWebTokenValidFilter.java index d942f06..917777b 100755 --- a/src/main/java/com/neatlogic/autoexecrunner/filter/JsonWebTokenValidFilter.java +++ b/src/main/java/com/neatlogic/autoexecrunner/filter/JsonWebTokenValidFilter.java @@ -2,6 +2,7 @@ package com.neatlogic.autoexecrunner.filter; import com.alibaba.fastjson.JSONObject; +import com.neatlogic.autoexecrunner.asynchronization.threadlocal.RequestContext; import com.neatlogic.autoexecrunner.asynchronization.threadlocal.TenantContext; import com.neatlogic.autoexecrunner.asynchronization.threadlocal.UserContext; import com.neatlogic.autoexecrunner.common.config.Config; @@ -41,7 +42,8 @@ public class JsonWebTokenValidFilter extends OncePerRequestFilter { UserVo userVo = null; JSONObject redirectObj = new JSONObject(); String authType = null; - + //初始化request上下文 + RequestContext.init(request, request.getRequestURI(), response); //判断租户 try { String tenant = request.getHeader("Tenant"); diff --git a/src/main/java/com/neatlogic/autoexecrunner/logback/RequestUrlConverter.java b/src/main/java/com/neatlogic/autoexecrunner/logback/RequestUrlConverter.java new file mode 100644 index 0000000..cb3cfb1 --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/logback/RequestUrlConverter.java @@ -0,0 +1,37 @@ +/*Copyright (C) $today.year 深圳极向量科技有限公司 All Rights Reserved. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see .*/ + +package com.neatlogic.autoexecrunner.logback; + +import ch.qos.logback.classic.pattern.ClassicConverter; +import ch.qos.logback.classic.spi.ILoggingEvent; +import com.neatlogic.autoexecrunner.asynchronization.threadlocal.RequestContext; + +public class RequestUrlConverter extends ClassicConverter { + /** + * The convert method is responsible for extracting data from the event and + * storing it for later use by the write method. + * + * @param event + */ + @Override + public String convert(ILoggingEvent event) { + RequestContext requestContext = RequestContext.get(); + if (requestContext != null) { + return requestContext.getUrl(); + } + return ""; + } +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/logback/TenantConverter.java b/src/main/java/com/neatlogic/autoexecrunner/logback/TenantConverter.java new file mode 100644 index 0000000..b20ceb4 --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/logback/TenantConverter.java @@ -0,0 +1,38 @@ +/*Copyright (C) $today.year 深圳极向量科技有限公司 All Rights Reserved. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see .*/ + +package com.neatlogic.autoexecrunner.logback; + + +import ch.qos.logback.classic.pattern.ClassicConverter; +import ch.qos.logback.classic.spi.ILoggingEvent; +import com.neatlogic.autoexecrunner.asynchronization.threadlocal.TenantContext; + +public class TenantConverter extends ClassicConverter { + /** + * The convert method is responsible for extracting data from the event and + * storing it for later use by the write method. + * + * @param event + */ + @Override + public String convert(ILoggingEvent event) { + TenantContext tenantContext = TenantContext.get(); + if (tenantContext != null) { + return tenantContext.getTenantUuid(); + } + return ""; + } +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java index 141674c..a98bfc2 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java +++ b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java @@ -16,6 +16,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -44,29 +45,28 @@ public class AutoexecQueueThread implements IStartUp { while (running) { if (workerThread == null || !workerThread.isAlive()) { - System.out.println("工作线程未运行,准备启动..."); - logger.debug("工作线程未运行,准备启动..."); + System.out.println("autoexec job thread is down,ready to start..."); + logger.debug("autoexec job thread is down,ready to start..."); workerThread = new Thread(() -> { - Thread.currentThread().setName(""); + Thread.currentThread().setName("AutoexecQueueThread"); + System.out.println("autoexec job thread start succeed!"); while (running) { CommandVo commandVo = null; try { // 你的业务逻辑 - if (processQueue.size() <= Config.MAX_PROCESS_QUEUE_SIZE()) { + if (processQueue.size() <= Config.MAX_PROCESS_EXECUTE_COUNT()) { commandVo = blockingQueue.take(); - logger.debug("作业{} 即将运行...", commandVo.getTenant() + "-" + commandVo.getJobId()); + logger.debug("current autoexec sub process count:{} <= {},autoexec job:{} will create...", processQueue.size(), Config.MAX_PROCESS_EXECUTE_COUNT(), (commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY))); createSubProcessAndStart(commandVo); } else { - logger.debug("作业进程最大数量:{}, 需等待运行中的进程结束后,才继续创建队列内的作业进程!", Config.MAX_PROCESS_QUEUE_SIZE()); + logger.debug("autoexec sub process limit count :{}, need to wait process finish,then keep on creating sub process!", Config.MAX_PROCESS_EXECUTE_COUNT()); } Thread.sleep(2000); } catch (InterruptedException e) { - System.out.printf("创建自动化作业子进程的线程被中断...入参:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()); - logger.error(String.format("创建自动化作业子进程的线程被中断...入参:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()), e); + logger.error(String.format("autoexec job thread is interrupted...params:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()), e); break; } catch (Exception e) { - System.out.printf("创建自动化作业子进程失败:入参:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()); - logger.error(String.format("创建自动化作业子进程失败:入参:%s ,errorMsg:%s", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()), e); + logger.error(String.format("create sub process failed:params:%s ,errorMsg:%s", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()), e); break; // 退出由外层 watchdog 负责重启 } } @@ -104,33 +104,34 @@ public class AutoexecQueueThread implements IStartUp { } env.put("tenant", commandVo.getTenant()); JSONObject payload = new JSONObject(); - Process process = null; + Process process; try { payload.put("jobId", commandVo.getJobId()); payload.put("status", 1); payload.put("command", commandVo); payload.put("passThroughEnv", commandVo.getPassThroughEnv().toJSONString()); process = builder.start(); + String jobName = (commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY)); addProcess(process); - CachedThreadPool.execute(new ProcessWaitTask(process, commandVo, payload)); + logger.debug("autoexec job sub process {} is running, pid {},added to processQueue,now processQueue size is {}", jobName, getPid(process), processQueue.size()); + CachedThreadPool.execute(new ProcessWaitTask(process, commandVo, payload, jobName)); } catch (IOException e) { - logger.error(String.format("进程启动失败: %s ,error: %s", payload, e.getMessage()), e); - System.err.println("进程启动失败: " + payload + ",error:" + e.getMessage()); - } finally { - // 确保关闭流 - if (process != null) { - closeQuietly(process.getInputStream()); - closeQuietly(process.getErrorStream()); - closeQuietly(process.getOutputStream()); - } + logger.error(String.format("autoexec job sub process start failed: %s ,error: %s", payload, e.getMessage()), e); } } - private void closeQuietly(java.io.Closeable c) { + // 兼容不同JDK版本的PID获取 + private long getPid(Process p) { try { - if (c != null) c.close(); - } catch (IOException ignore) { + if (p.getClass().getName().contains("UNIXProcess")) { + Field pidField = p.getClass().getDeclaredField("pid"); + pidField.setAccessible(true); + return pidField.getLong(p); + } + } catch (Exception e) { + // 忽略异常 } + return -1; // 未知PID } public void stop() { @@ -148,10 +149,16 @@ public class AutoexecQueueThread implements IStartUp { } } - public static void removeProcess(Process process) { + public static void removeProcess(Process process, String jobName, Long pid) { boolean result = processQueue.remove(process); if (!result) { - logger.error("processQueue remove failed!current queue size is :{}", processQueue.size()); + logger.error("autoexec process:{} (pid:{})finished. processQueue remove failed!current queue size is :{}", jobName, pid, blockingQueue.size()); + } else { + logger.debug("autoexec process:{} (pid:{})finished. processQueue remove succeed!current queue size is :{}", jobName, pid, blockingQueue.size()); } } + + public static Integer getProcessQueueSize() { + return processQueue.size(); + } } diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml index e18ecc8..aefed5d 100755 --- a/src/main/resources/logback-spring.xml +++ b/src/main/resources/logback-spring.xml @@ -1,10 +1,12 @@ + + ${LOG_HOME}/debug.log - debug.log.%i + ${LOG_HOME}/debug.log.%i 1 5 @@ -18,14 +20,14 @@ 100MB - [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] - %msg%n + [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n ${LOG_HOME}/info.log - info.log.%i + ${LOG_HOME}/info.log.%i 1 5 @@ -39,7 +41,7 @@ 100MB - [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] - %msg%n + [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n @@ -47,7 +49,7 @@ class="ch.qos.logback.core.rolling.RollingFileAppender"> ${LOG_HOME}/warn.log - warn.log.%i + ${LOG_HOME}/warn.log.%i 1 5 @@ -61,14 +63,14 @@ 100MB - [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] - %msg%n + [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n ${LOG_HOME}/error.log - error.log.%i + ${LOG_HOME}/error.log.%i 1 5 @@ -82,7 +84,27 @@ 100MB - [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line]- %msg%n + [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n + + + + + ${LOG_HOME}/process.log + + ${LOG_HOME}/process.%i + 1 + 5 + + + DEBUG + ACCEPT + DENY + + + 100MB + + + [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n @@ -110,7 +132,7 @@ - [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] - %msg%n + [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n UTF-8 false @@ -137,5 +159,12 @@ + + + + + + + \ No newline at end of file -- Gitee From dc40d3a8746ccef8b642c8a27cf0a3439c33382a Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Fri, 11 Apr 2025 20:57:23 +0800 Subject: [PATCH 03/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=E5=88=9B=E5=BB=BA?= =?UTF-8?q?=E4=BD=9C=E4=B8=9A=E5=A2=9E=E5=8A=A0=E6=95=B0=E9=87=8F=E9=99=90?= =?UTF-8?q?=E5=88=B6=E6=8E=92=E9=98=9F=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/job/GetJobQueueStatusApi.java | 68 +++++++++++++++++++ .../autoexecrunner/api/job/JobExecApi.java | 2 +- .../queue/NeatLogicUniqueBlockingQueue.java | 17 ++++- .../autoexecrunner/dto/CommandVo.java | 14 ++-- .../startup/handler/AutoexecQueueThread.java | 8 ++- 5 files changed, 101 insertions(+), 8 deletions(-) create mode 100755 src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java new file mode 100755 index 0000000..9898c0a --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java @@ -0,0 +1,68 @@ +/*Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see .*/ +package com.neatlogic.autoexecrunner.api.job; + +import com.alibaba.fastjson.JSONObject; +import com.neatlogic.autoexecrunner.asynchronization.queue.NeatLogicUniqueBlockingQueue; +import com.neatlogic.autoexecrunner.constvalue.ApiParamType; +import com.neatlogic.autoexecrunner.dto.CommandVo; +import com.neatlogic.autoexecrunner.restful.annotation.Input; +import com.neatlogic.autoexecrunner.restful.annotation.Param; +import com.neatlogic.autoexecrunner.restful.core.privateapi.PrivateApiComponentBase; +import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + + +@Component +public class GetJobQueueStatusApi extends PrivateApiComponentBase { + @Override + public String getName() { + return "获取作业排队状态"; + } + + @Input({ + @Param(name = "jobId", type = ApiParamType.LONG, isRequired = true, desc = "作业id"), + @Param(name = "groupSort", type = ApiParamType.LONG, desc = "阶段组id"), + @Param(name = "phaseName", type = ApiParamType.STRING, desc = "阶段名"), + }) + @Override + public Object myDoService(JSONObject jsonObj) throws Exception { + String jobId = jsonObj.getString("jobId"); + Integer groupSort = jsonObj.getInteger("groupSort"); + NeatLogicUniqueBlockingQueue blockingQueue = AutoexecQueueThread.getBlockingQueue(); + List list = blockingQueue.getQueue(); + JSONObject result = new JSONObject(); + for (int i = 0; i < list.size(); i++) { + CommandVo commandVo = list.get(i); + JSONObject commandJson = new JSONObject(); + commandJson.put("fcd",commandVo.getFcd().getTime()); + commandJson.put("command",commandVo.getCommandList().stream().map(Object::toString).collect(Collectors.joining("','"))); + if (Objects.equals(commandVo.getJobId(), jobId) && (groupSort == null || commandVo.getJobGroupIdList().contains(groupSort))) { + result.put(String.valueOf(i + 1), commandJson); + } + } + result.put("count", list.size()); + return result; + } + + @Override + public String getToken() { + return "/job/queue/status/get"; + } +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java index 6ed4b7a..ad2ee79 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java @@ -82,7 +82,7 @@ public class JobExecApi extends PrivateApiComponentBase { } commandList.add("--reuseconslog"); commandVo.setCommandList(commandList); - AutoexecQueueThread.addUpdateTagent(commandVo); + AutoexecQueueThread.addCommand(commandVo); return null; } diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java index dea5fb3..e34029b 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java +++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java @@ -19,12 +19,16 @@ package com.neatlogic.autoexecrunner.asynchronization.queue; import com.alibaba.fastjson.JSON; import com.neatlogic.autoexecrunner.asynchronization.threadlocal.TenantContext; +import org.apache.commons.collections4.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; public class NeatLogicUniqueBlockingQueue { private static final Logger logger = LoggerFactory.getLogger(NeatLogicUniqueBlockingQueue.class); @@ -69,7 +73,7 @@ public class NeatLogicUniqueBlockingQueue { return task.getT(); } - private static class Task { + public static class Task { private final T t; private final String tenantUuid; @@ -93,10 +97,19 @@ public class NeatLogicUniqueBlockingQueue { } } - public int size(){ + public int size() { return blockingQueue.size(); } + public List getQueue() { + List list = new ArrayList<>(); + List> taskList = new ArrayList<>(blockingQueue); + if (CollectionUtils.isNotEmpty(taskList)) { + list = taskList.stream().map(Task::getT).collect(Collectors.toList()); + } + return list; + } + // public static void main(String[] args) throws InterruptedException { // NeatLogicUniqueBlockingQueue queue = new NeatLogicUniqueBlockingQueue<>(1); // diff --git a/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java b/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java index 34d6c91..d3e3cb7 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java +++ b/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java @@ -9,10 +9,7 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import java.io.File; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; /** * @author lvzk @@ -33,6 +30,7 @@ public class CommandVo { private List jobGroupIdList;//需要执行的组 private JSONArray jobPhaseNodeSqlList; private JSONObject environment;//设置环境变量 + private Date fcd; private String consoleLogPath; @@ -219,4 +217,12 @@ public class CommandVo { this.consoleLogPath = Config.AUTOEXEC_HOME() + File.separator + JobUtil.getJobPath(getJobId(), new StringBuilder()) + File.separator + "log" + File.separator + "console.txt"; return consoleLogPath; } + + public Date getFcd() { + return fcd; + } + + public void setFcd(Date fcd) { + this.fcd = fcd; + } } diff --git a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java index a98bfc2..1685a99 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java +++ b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java @@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.lang.reflect.Field; +import java.util.Date; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -138,7 +139,8 @@ public class AutoexecQueueThread implements IStartUp { running = false; } - public static void addUpdateTagent(CommandVo commandVo) { + public static void addCommand(CommandVo commandVo) { + commandVo.setFcd(new Date()); blockingQueue.offer(commandVo); } @@ -161,4 +163,8 @@ public class AutoexecQueueThread implements IStartUp { public static Integer getProcessQueueSize() { return processQueue.size(); } + + public static NeatLogicUniqueBlockingQueue getBlockingQueue(){ + return blockingQueue; + } } -- Gitee From 791eb29c2f77d86db67eebdcfe7902182af246f0 Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Tue, 15 Apr 2025 17:15:49 +0800 Subject: [PATCH 04/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=E5=88=9B=E5=BB=BA?= =?UTF-8?q?=E4=BD=9C=E4=B8=9A=E5=A2=9E=E5=8A=A0=E6=95=B0=E9=87=8F=E9=99=90?= =?UTF-8?q?=E5=88=B6=E6=8E=92=E9=98=9F=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...usApi.java => GetJobWaitingDetailApi.java} | 13 +++------- .../autoexecrunner/api/job/JobExecApi.java | 5 +++- .../exception/job/JobQueueFullException.java | 14 ++++++++++ .../startup/handler/AutoexecQueueThread.java | 26 +++++++++++++------ 4 files changed, 40 insertions(+), 18 deletions(-) rename src/main/java/com/neatlogic/autoexecrunner/api/job/{GetJobQueueStatusApi.java => GetJobWaitingDetailApi.java} (80%) create mode 100755 src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java similarity index 80% rename from src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java rename to src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java index 9898c0a..498fe1e 100755 --- a/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java @@ -15,7 +15,6 @@ along with this program. If not, see .*/ package com.neatlogic.autoexecrunner.api.job; import com.alibaba.fastjson.JSONObject; -import com.neatlogic.autoexecrunner.asynchronization.queue.NeatLogicUniqueBlockingQueue; import com.neatlogic.autoexecrunner.constvalue.ApiParamType; import com.neatlogic.autoexecrunner.dto.CommandVo; import com.neatlogic.autoexecrunner.restful.annotation.Input; @@ -25,12 +24,11 @@ import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; import org.springframework.stereotype.Component; import java.util.List; -import java.util.Objects; import java.util.stream.Collectors; @Component -public class GetJobQueueStatusApi extends PrivateApiComponentBase { +public class GetJobWaitingDetailApi extends PrivateApiComponentBase { @Override public String getName() { return "获取作业排队状态"; @@ -45,17 +43,14 @@ public class GetJobQueueStatusApi extends PrivateApiComponentBase { public Object myDoService(JSONObject jsonObj) throws Exception { String jobId = jsonObj.getString("jobId"); Integer groupSort = jsonObj.getInteger("groupSort"); - NeatLogicUniqueBlockingQueue blockingQueue = AutoexecQueueThread.getBlockingQueue(); - List list = blockingQueue.getQueue(); + List list = AutoexecQueueThread.getBlockingQueueByJobIdAndGroupSort(jobId,groupSort); JSONObject result = new JSONObject(); for (int i = 0; i < list.size(); i++) { CommandVo commandVo = list.get(i); JSONObject commandJson = new JSONObject(); commandJson.put("fcd",commandVo.getFcd().getTime()); commandJson.put("command",commandVo.getCommandList().stream().map(Object::toString).collect(Collectors.joining("','"))); - if (Objects.equals(commandVo.getJobId(), jobId) && (groupSort == null || commandVo.getJobGroupIdList().contains(groupSort))) { - result.put(String.valueOf(i + 1), commandJson); - } + result.put(String.valueOf(i + 1), commandJson); } result.put("count", list.size()); return result; @@ -63,6 +58,6 @@ public class GetJobQueueStatusApi extends PrivateApiComponentBase { @Override public String getToken() { - return "/job/queue/status/get"; + return "/job/waiting/detail/get"; } } diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java index ad2ee79..c6596f5 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java @@ -18,6 +18,7 @@ import com.alibaba.fastjson.JSONObject; import com.neatlogic.autoexecrunner.asynchronization.threadlocal.UserContext; import com.neatlogic.autoexecrunner.constvalue.JobAction; import com.neatlogic.autoexecrunner.dto.CommandVo; +import com.neatlogic.autoexecrunner.exception.job.JobQueueFullException; import com.neatlogic.autoexecrunner.restful.core.privateapi.PrivateApiComponentBase; import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; import com.neatlogic.autoexecrunner.util.FileUtil; @@ -82,7 +83,9 @@ public class JobExecApi extends PrivateApiComponentBase { } commandList.add("--reuseconslog"); commandVo.setCommandList(commandList); - AutoexecQueueThread.addCommand(commandVo); + if(!AutoexecQueueThread.addCommand(commandVo)){ + throw new JobQueueFullException(); + } return null; } diff --git a/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java b/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java new file mode 100755 index 0000000..8b25d14 --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java @@ -0,0 +1,14 @@ +package com.neatlogic.autoexecrunner.exception.job; + + +import com.neatlogic.autoexecrunner.common.config.Config; +import com.neatlogic.autoexecrunner.exception.core.ApiRuntimeException; +import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; + +public class JobQueueFullException extends ApiRuntimeException { + + public JobQueueFullException() { + super("作业队列已满(当前队列数" + AutoexecQueueThread.getBlockingQueueSize() + ">= 配置队列最大数" + Config.MAX_PROCESS_QUEUE_SIZE() + "),无法执行"); + } + +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java index 1685a99..3b84ec8 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java +++ b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java @@ -17,15 +17,14 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.lang.reflect.Field; -import java.util.Date; -import java.util.Map; +import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class AutoexecQueueThread implements IStartUp { - private static final BlockingQueue processQueue = new LinkedBlockingQueue<>(Config.MAX_PROCESS_QUEUE_SIZE() + 5); + private static final BlockingQueue processQueue = new LinkedBlockingQueue<>(Config.MAX_PROCESS_EXECUTE_COUNT() + 5); private static final Logger logger = LoggerFactory.getLogger(AutoexecQueueThread.class); - private static final NeatLogicUniqueBlockingQueue blockingQueue = new NeatLogicUniqueBlockingQueue<>(50000); + private static final NeatLogicUniqueBlockingQueue blockingQueue = new NeatLogicUniqueBlockingQueue<>(Config.MAX_PROCESS_QUEUE_SIZE()); private volatile boolean running = true; @Override @@ -139,9 +138,9 @@ public class AutoexecQueueThread implements IStartUp { running = false; } - public static void addCommand(CommandVo commandVo) { + public static boolean addCommand(CommandVo commandVo) { commandVo.setFcd(new Date()); - blockingQueue.offer(commandVo); + return blockingQueue.offer(commandVo); } public static void addProcess(Process process) { @@ -164,7 +163,18 @@ public class AutoexecQueueThread implements IStartUp { return processQueue.size(); } - public static NeatLogicUniqueBlockingQueue getBlockingQueue(){ - return blockingQueue; + public static Integer getBlockingQueueSize() { + return blockingQueue.size(); + } + + public static List getBlockingQueueByJobIdAndGroupSort(String jobId, Integer groupSort) { + List list = blockingQueue.getQueue(); + List jobCommandList = new ArrayList<>(); + for (CommandVo commandVo : list) { + if (Objects.equals(commandVo.getJobId(), jobId) && (groupSort == null || commandVo.getJobGroupIdList().contains(groupSort))) { + jobCommandList.add(commandVo); + } + } + return jobCommandList; } } -- Gitee From 86e147ec294eaf30014e0ad59442cb45e676e03b Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Wed, 16 Apr 2025 19:40:50 +0800 Subject: [PATCH 05/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=E5=88=9B=E5=BB=BA?= =?UTF-8?q?=E4=BD=9C=E4=B8=9A=E5=A2=9E=E5=8A=A0=E6=95=B0=E9=87=8F=E9=99=90?= =?UTF-8?q?=E5=88=B6=E6=8E=92=E9=98=9F=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/job/GetJobWaitingDetailApi.java | 13 +++++-- .../autoexecrunner/api/job/JobAbortApi.java | 10 +++-- .../autoexecrunner/api/job/JobExecApi.java | 12 ++++-- .../queue/NeatLogicUniqueBlockingQueue.java | 26 +++++++++++-- .../autoexecrunner/dto/CommandVo.java | 39 ++++++++++++++++--- .../startup/handler/AutoexecQueueThread.java | 24 +++++++----- 6 files changed, 97 insertions(+), 27 deletions(-) diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java index 498fe1e..ab7ee80 100755 --- a/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java @@ -24,6 +24,7 @@ import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; import org.springframework.stereotype.Component; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; @@ -43,16 +44,22 @@ public class GetJobWaitingDetailApi extends PrivateApiComponentBase { public Object myDoService(JSONObject jsonObj) throws Exception { String jobId = jsonObj.getString("jobId"); Integer groupSort = jsonObj.getInteger("groupSort"); - List list = AutoexecQueueThread.getBlockingQueueByJobIdAndGroupSort(jobId,groupSort); + List list = AutoexecQueueThread.getBlockingQueueByJobIdAndGroupSort(); JSONObject result = new JSONObject(); for (int i = 0; i < list.size(); i++) { CommandVo commandVo = list.get(i); JSONObject commandJson = new JSONObject(); + commandJson.put("groupSortList",commandVo.getJobGroupSortList()); + commandJson.put("nodeSqlList",commandVo.getJobPhaseNodeSqlList()); + commandJson.put("phaseNameList",commandVo.getJobPhaseNameList()); + commandJson.put("resourceIdList",commandVo.getJobPhaseResourceIdList()); commandJson.put("fcd",commandVo.getFcd().getTime()); commandJson.put("command",commandVo.getCommandList().stream().map(Object::toString).collect(Collectors.joining("','"))); - result.put(String.valueOf(i + 1), commandJson); + if (Objects.equals(commandVo.getJobId(), jobId) && (groupSort == null || commandVo.getJobGroupSortList().contains(groupSort))) { + result.put(String.valueOf(i + 1), commandJson); + } } - result.put("count", list.size()); + result.put("count", AutoexecQueueThread.getBlockingQueueSize()); return result; } diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobAbortApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobAbortApi.java index 8c46885..b31930f 100755 --- a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobAbortApi.java +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobAbortApi.java @@ -20,6 +20,7 @@ import com.neatlogic.autoexecrunner.constvalue.JobAction; import com.neatlogic.autoexecrunner.core.ExecProcessCommand; import com.neatlogic.autoexecrunner.dto.CommandVo; import com.neatlogic.autoexecrunner.restful.core.privateapi.PrivateApiComponentBase; +import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; import com.neatlogic.autoexecrunner.threadpool.CommonThreadPool; import org.springframework.stereotype.Component; @@ -45,13 +46,16 @@ public class JobAbortApi extends PrivateApiComponentBase { //set command List commandList = Arrays.asList("autoexec", "--jobid", commandVo.getJobId(), "--execuser", UserContext.get().getUserUuid(), "--abort"); commandList = new ArrayList<>(commandList); - if(commandVo.getPassThroughEnv() != null){ + if (commandVo.getPassThroughEnv() != null) { commandList.add("--passthroughenv"); commandList.add(commandVo.getPassThroughEnv().toString()); } commandVo.setCommandList(commandList); - ExecProcessCommand processCommand = new ExecProcessCommand(commandVo); - CommonThreadPool.execute(processCommand); + //队列里不存在才执行abort命令 + if (!AutoexecQueueThread.removeCommand(commandVo)) { + ExecProcessCommand processCommand = new ExecProcessCommand(commandVo); + CommonThreadPool.execute(processCommand); + } return null; } diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java index c6596f5..d9c534f 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java @@ -23,6 +23,8 @@ import com.neatlogic.autoexecrunner.restful.core.privateapi.PrivateApiComponentB import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; import com.neatlogic.autoexecrunner.util.FileUtil; import org.apache.commons.collections4.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.util.ArrayList; @@ -36,6 +38,8 @@ import java.util.stream.Collectors; **/ @Component public class JobExecApi extends PrivateApiComponentBase { + private static final Logger logger = LoggerFactory.getLogger(JobExecApi.class); + @Override public String getName() { return "创建执行作业剧本进程"; @@ -65,9 +69,9 @@ public class JobExecApi extends PrivateApiComponentBase { commandList.add("--passthroughenv"); commandList.add(commandVo.getPassThroughEnv().toString()); } - if (CollectionUtils.isNotEmpty(commandVo.getJobGroupIdList())) { + if (CollectionUtils.isNotEmpty(commandVo.getJobGroupSortList())) { commandList.add("--phasegroups"); - commandList.add(commandVo.getJobGroupIdList().stream().map(Object::toString).collect(Collectors.joining("','"))); + commandList.add(commandVo.getJobGroupSortList().stream().map(Object::toString).collect(Collectors.joining("','"))); } if (CollectionUtils.isNotEmpty(commandVo.getJobPhaseNameList())) { commandList.add("--phases"); @@ -83,8 +87,10 @@ public class JobExecApi extends PrivateApiComponentBase { } commandList.add("--reuseconslog"); commandVo.setCommandList(commandList); - if(!AutoexecQueueThread.addCommand(commandVo)){ + if (AutoexecQueueThread.addCommand(commandVo) == -1) { throw new JobQueueFullException(); + } else if (AutoexecQueueThread.addCommand(commandVo) == 0) { + logger.debug("队列里已存在相同的执行命令:{}", String.join(",", commandList)); } return null; } diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java index e34029b..1af28a5 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java +++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Predicate; import java.util.stream.Collectors; public class NeatLogicUniqueBlockingQueue { @@ -40,7 +41,11 @@ public class NeatLogicUniqueBlockingQueue { this.taskMap = new ConcurrentHashMap<>(); } - public boolean offer(T t) { + /** + * 添加队列成员 + * 返回-1:队列已满,1:添加成功,0:重复添加 + */ + public int offer(T t) { Task task = new Task<>(t); // 保证任务唯一性 if (taskMap.putIfAbsent(task.getUniqueKey(), Boolean.TRUE) == null) { @@ -51,14 +56,29 @@ public class NeatLogicUniqueBlockingQueue { // 如果队列已满,移除任务标记 taskMap.remove(task.getUniqueKey()); logger.error("Queue is full!"); + return -1; } - return added; + return 1; } else { if (t != null) { logger.debug("NeatLogicUniqueBlockingQueue repeat: {}", JSON.toJSONString(t)); } + return 0; } - return false; // 已存在任务,直接返回 false + } + + public boolean remove(Predicate> condition) { + for (Task task : blockingQueue) { + if (condition.test(task)) { + boolean removed = blockingQueue.remove(task); + if (removed) { + taskMap.remove(task.getUniqueKey()); + logger.debug("Removed task: {}", JSON.toJSONString(task)); + return true; + } + } + } + return false; } public T take() throws InterruptedException { diff --git a/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java b/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java index d3e3cb7..db7a3ca 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java +++ b/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java @@ -27,7 +27,7 @@ public class CommandVo { private JSONObject passThroughEnv;//web端传到runner贯穿autoexec 回调web端会携带该变量 private List jobPhaseNameList;//需要执行的phaseNameList private List jobPhaseResourceIdList;//需要执行的resourceIdList - private List jobGroupIdList;//需要执行的组 + private List jobGroupSortList;//需要执行的组 private JSONArray jobPhaseNodeSqlList; private JSONObject environment;//设置环境变量 private Date fcd; @@ -71,9 +71,9 @@ public class CommandVo { if (CollectionUtils.isNotEmpty(jobPhaseResourceIdArray)) { this.jobPhaseResourceIdList = jobPhaseResourceIdArray.toJavaList(Long.class); } - JSONArray jobGroupIdArray = jsonObj.getJSONArray("jobGroupIdList"); + JSONArray jobGroupIdArray = jsonObj.getJSONArray("jobGroupSortList"); if (CollectionUtils.isNotEmpty(jobGroupIdArray)) { - this.jobGroupIdList = jobGroupIdArray.toJavaList(Integer.class); + this.jobGroupSortList = jobGroupIdArray.toJavaList(Integer.class); } JSONArray jobPhaseNodeSqlList = jsonObj.getJSONArray("jobPhaseNodeSqlList"); @@ -187,8 +187,8 @@ public class CommandVo { return jobPhaseResourceIdList; } - public List getJobGroupIdList() { - return jobGroupIdList; + public List getJobGroupSortList() { + return jobGroupSortList; } public JSONArray getJobPhaseNodeSqlList() { @@ -225,4 +225,33 @@ public class CommandVo { public void setFcd(Date fcd) { this.fcd = fcd; } + + private String getFilteredCommandString() { + if (commandList == null) return ""; + List filtered = new ArrayList<>(); + Iterator iterator = commandList.iterator(); + while (iterator.hasNext()) { + String item = iterator.next(); + if ("--execid".equals(item)) { + // 跳过 "--execid" 和它后面的那个参数 + if (iterator.hasNext()) iterator.next(); + continue; + } + filtered.add(item); + } + return String.join(",", filtered); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof CommandVo)) return false; + CommandVo that = (CommandVo) o; + return Objects.equals(getFilteredCommandString(), that.getFilteredCommandString()); + } + + @Override + public int hashCode() { + return getFilteredCommandString().hashCode(); + } } diff --git a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java index 3b84ec8..7c34974 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java +++ b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java @@ -138,11 +138,22 @@ public class AutoexecQueueThread implements IStartUp { running = false; } - public static boolean addCommand(CommandVo commandVo) { + /** + * 进入队列 + * 返回-1:队列已满,1:添加成功,0:重复添加 + */ + public static int addCommand(CommandVo commandVo) { commandVo.setFcd(new Date()); return blockingQueue.offer(commandVo); } + /** + * 删除命令 + */ + public static boolean removeCommand(CommandVo commandVo) { + return blockingQueue.remove(task -> task.getT().getJobId().equals(commandVo.getJobId())); + } + public static void addProcess(Process process) { boolean result = processQueue.offer(process); if (!result) { @@ -167,14 +178,7 @@ public class AutoexecQueueThread implements IStartUp { return blockingQueue.size(); } - public static List getBlockingQueueByJobIdAndGroupSort(String jobId, Integer groupSort) { - List list = blockingQueue.getQueue(); - List jobCommandList = new ArrayList<>(); - for (CommandVo commandVo : list) { - if (Objects.equals(commandVo.getJobId(), jobId) && (groupSort == null || commandVo.getJobGroupIdList().contains(groupSort))) { - jobCommandList.add(commandVo); - } - } - return jobCommandList; + public static List getBlockingQueueByJobIdAndGroupSort() { + return blockingQueue.getQueue(); } } -- Gitee From 82830847eeb1eb3f6d30fac791124cfd2393308f Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Thu, 17 Apr 2025 11:48:05 +0800 Subject: [PATCH 06/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=E5=88=9B=E5=BB=BA?= =?UTF-8?q?=E4=BD=9C=E4=B8=9A=E5=A2=9E=E5=8A=A0=E6=95=B0=E9=87=8F=E9=99=90?= =?UTF-8?q?=E5=88=B6=E6=8E=92=E9=98=9F=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../autoexecrunner/common/config/Config.java | 16 ++++++++-------- .../exception/job/JobQueueFullException.java | 2 +- .../startup/handler/AutoexecQueueThread.java | 10 +++++----- src/main/resources/application.properties | 10 +++++++++- 4 files changed, 23 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java b/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java index dd660c9..77cfc7b 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java +++ b/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java @@ -37,8 +37,8 @@ public class Config { private static String DATA_HOME;//文件根目录 private static String DEPLOY_HOME;//发布目录 private static String GITLAB_PASSWORD;// gitlab private_token - private static Integer MAX_PROCESS_QUEUE_SIZE;//最大自动化作业队列数,多余的则丢弃 - private static Integer MAX_PROCESS_EXECUTE_COUNT;//最大执行作业数,超过的则进入队列 + private static Integer SUBPROCESS_COMMAND_QUEUE_MAX_SIZE;//自动化作业命令等待队列的最大容量,超过此数量的新命令将被拒绝或丢弃 + private static Integer SUBPROCESS_EXECUTION_MAX_CONCURRENT;//自动化作业最大并发子进程数,超过此数量的命令将进入等待队列 //neatlogic private static String NEATLOGIC_ROOT; @@ -170,11 +170,11 @@ public class Config { return AUTOEXEC_TOKEN; } - public static Integer MAX_PROCESS_QUEUE_SIZE() { - return MAX_PROCESS_QUEUE_SIZE; + public static Integer SUBPROCESS_COMMAND_QUEUE_MAX_SIZE() { + return SUBPROCESS_COMMAND_QUEUE_MAX_SIZE; } - public static Integer MAX_PROCESS_EXECUTE_COUNT() { - return MAX_PROCESS_EXECUTE_COUNT; + public static Integer SUBPROCESS_EXECUTION_MAX_CONCURRENT() { + return SUBPROCESS_EXECUTION_MAX_CONCURRENT; } @PostConstruct @@ -248,9 +248,9 @@ public class Config { UPDATE_RUNNER_STATUS_PERIOD = Integer.parseInt(prop.getProperty("update.runner.status.period", "1800000")); - MAX_PROCESS_QUEUE_SIZE = Integer.parseInt(prop.getProperty("max.process.queue.size", "1000")); + SUBPROCESS_COMMAND_QUEUE_MAX_SIZE = Integer.parseInt(prop.getProperty("subprocess.command.queue.max-size", "1000")); - MAX_PROCESS_EXECUTE_COUNT = Integer.parseInt(prop.getProperty("max.process.execute.count", "20")); + SUBPROCESS_EXECUTION_MAX_CONCURRENT = Integer.parseInt(prop.getProperty("subprocess.execution.max-concurrent", "20")); } catch (IOException e) { logger.error(e.getMessage(), e); } diff --git a/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java b/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java index 8b25d14..a72dae6 100755 --- a/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java +++ b/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java @@ -8,7 +8,7 @@ import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; public class JobQueueFullException extends ApiRuntimeException { public JobQueueFullException() { - super("作业队列已满(当前队列数" + AutoexecQueueThread.getBlockingQueueSize() + ">= 配置队列最大数" + Config.MAX_PROCESS_QUEUE_SIZE() + "),无法执行"); + super("作业队列已满(当前队列数" + AutoexecQueueThread.getBlockingQueueSize() + ">= 配置队列最大数" + Config.SUBPROCESS_COMMAND_QUEUE_MAX_SIZE() + "),无法执行"); } } diff --git a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java index 7c34974..9f1a4e0 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java +++ b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java @@ -22,9 +22,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class AutoexecQueueThread implements IStartUp { - private static final BlockingQueue processQueue = new LinkedBlockingQueue<>(Config.MAX_PROCESS_EXECUTE_COUNT() + 5); + private static final BlockingQueue processQueue = new LinkedBlockingQueue<>(Config.SUBPROCESS_EXECUTION_MAX_CONCURRENT() + 5); private static final Logger logger = LoggerFactory.getLogger(AutoexecQueueThread.class); - private static final NeatLogicUniqueBlockingQueue blockingQueue = new NeatLogicUniqueBlockingQueue<>(Config.MAX_PROCESS_QUEUE_SIZE()); + private static final NeatLogicUniqueBlockingQueue blockingQueue = new NeatLogicUniqueBlockingQueue<>(Config.SUBPROCESS_COMMAND_QUEUE_MAX_SIZE()); private volatile boolean running = true; @Override @@ -54,12 +54,12 @@ public class AutoexecQueueThread implements IStartUp { CommandVo commandVo = null; try { // 你的业务逻辑 - if (processQueue.size() <= Config.MAX_PROCESS_EXECUTE_COUNT()) { + if (processQueue.size() <= Config.SUBPROCESS_EXECUTION_MAX_CONCURRENT()) { commandVo = blockingQueue.take(); - logger.debug("current autoexec sub process count:{} <= {},autoexec job:{} will create...", processQueue.size(), Config.MAX_PROCESS_EXECUTE_COUNT(), (commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY))); + logger.debug("current autoexec sub process count:{} <= {},autoexec job:{} will create...", processQueue.size(), Config.SUBPROCESS_EXECUTION_MAX_CONCURRENT(), (commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY))); createSubProcessAndStart(commandVo); } else { - logger.debug("autoexec sub process limit count :{}, need to wait process finish,then keep on creating sub process!", Config.MAX_PROCESS_EXECUTE_COUNT()); + logger.debug("autoexec sub process limit count :{}, need to wait process finish,then keep on creating sub process!", Config.SUBPROCESS_EXECUTION_MAX_CONCURRENT()); } Thread.sleep(2000); } catch (InterruptedException e) { diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 2606178..81f7bfe 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -14,7 +14,6 @@ spring.servlet.multipart.max-file-size=100MB #NEATLOGIC WEB neatlogic.root=http://127.0.0.1:8080/neatlogic -#认证 连接时校验 #RUNNER @@ -32,3 +31,12 @@ autoexec.home=/Users/cocokong/IdeaProjects/autoexec/data/job deploy.home=/app/autoexec/data/verdata data.home=${runner.home}/data file.mimetype.text.plain=sql text c cc c++ cpp h pl py txt java el gitignore js css properties jsp yml json md vue sh config htm html xml classpath project pm less scss + + +# Maximum number of concurrent subprocesses for autoexec jobs. +# Commands exceeding this limit will be placed in the waiting queue. +subprocess.execution.max-concurrent=20 + +# Maximum capacity of the command waiting queue for autoexec jobs. +# New commands exceeding this limit will be rejected or discarded. +subprocess.command.queue.max-size=1000 -- Gitee From 110d9eed2f723197b545f86accd2c8d839739b9d Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Mon, 21 Apr 2025 12:37:53 +0800 Subject: [PATCH 07/15] =?UTF-8?q?[=E4=BF=AE=E5=A4=8D]=E5=91=BD=E4=BB=A4?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E5=A4=9A=E9=81=8D=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../autoexecrunner/api/job/JobExecApi.java | 6 +++-- .../queue/NeatLogicUniqueBlockingQueue.java | 26 ++++++++++++------- .../startup/handler/AutoexecQueueThread.java | 3 ++- 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java index d9c534f..9249918 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java @@ -87,9 +87,11 @@ public class JobExecApi extends PrivateApiComponentBase { } commandList.add("--reuseconslog"); commandVo.setCommandList(commandList); - if (AutoexecQueueThread.addCommand(commandVo) == -1) { + + int addResult = AutoexecQueueThread.addCommand(commandVo); + if (addResult == -1) { throw new JobQueueFullException(); - } else if (AutoexecQueueThread.addCommand(commandVo) == 0) { + } else if (addResult == 0) { logger.debug("队列里已存在相同的执行命令:{}", String.join(",", commandList)); } return null; diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java index 1af28a5..a6a03b2 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java +++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java @@ -49,7 +49,7 @@ public class NeatLogicUniqueBlockingQueue { Task task = new Task<>(t); // 保证任务唯一性 if (taskMap.putIfAbsent(task.getUniqueKey(), Boolean.TRUE) == null) { - logger.debug("====TagentUpdateInfo-addQueue:" + JSON.toJSONString(task)); + logger.debug("====addQueue:" + JSON.toJSONString(task)); // 如果任务是新任务,放入队列 boolean added = blockingQueue.offer(task); if (!added) { @@ -131,20 +131,26 @@ public class NeatLogicUniqueBlockingQueue { } // public static void main(String[] args) throws InterruptedException { -// NeatLogicUniqueBlockingQueue queue = new NeatLogicUniqueBlockingQueue<>(1); +// NeatLogicUniqueBlockingQueue queue = new NeatLogicUniqueBlockingQueue<>(1); // // // 模拟任务插入 -// UserSessionVo a = new UserSessionVo(); -// a.setToken("1111"); +// CommandVo a = new CommandVo(); +// a.setCommandList(Arrays.asList("--nodes")); // System.out.println(queue.offer(a)); // 返回 true,任务插入成功 -// UserSessionVo b = new UserSessionVo(); -// b.setToken("222"); -// System.out.println(queue.offer(b)); // 返回 false,任务已存在 +// System.out.println(queue.size()); +// Thread.sleep(2000); +// CommandVo b = new CommandVo(); +// b.setCommandList(Arrays.asList("--nodes1")); +// System.out.println(queue.offer(b)); // 返回 true,任务插入成功 +// System.out.println(queue.size()); +// // // // 模拟任务消费 -// UserSessionVo task = queue.take(); // 消费 "task1" -// UserSessionVo task2 = queue.take(); // 消费 "task1" -// UserSessionVo task3 = queue.take(); // 消费 "task1" +// queue.take(); // 消费 "task1" +// System.out.println(queue.size()); +// Thread.sleep(2000); +// System.out.println(queue.size()); +// queue.take(); // 消费 "task1" // } } diff --git a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java index 9f1a4e0..c9a4ff3 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java +++ b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java @@ -56,7 +56,7 @@ public class AutoexecQueueThread implements IStartUp { // 你的业务逻辑 if (processQueue.size() <= Config.SUBPROCESS_EXECUTION_MAX_CONCURRENT()) { commandVo = blockingQueue.take(); - logger.debug("current autoexec sub process count:{} <= {},autoexec job:{} will create...", processQueue.size(), Config.SUBPROCESS_EXECUTION_MAX_CONCURRENT(), (commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY))); + logger.debug("current autoexec sub process count:{} <= {},autoexec job:{} ,{} will create...", processQueue.size(), Config.SUBPROCESS_EXECUTION_MAX_CONCURRENT(), (commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY)), JSON.toJSONString(commandVo.getCommandList())); createSubProcessAndStart(commandVo); } else { logger.debug("autoexec sub process limit count :{}, need to wait process finish,then keep on creating sub process!", Config.SUBPROCESS_EXECUTION_MAX_CONCURRENT()); @@ -144,6 +144,7 @@ public class AutoexecQueueThread implements IStartUp { */ public static int addCommand(CommandVo commandVo) { commandVo.setFcd(new Date()); + logger.debug("autoexec offer command:{}", JSON.toJSONString(commandVo.getCommandList())); return blockingQueue.offer(commandVo); } -- Gitee From e7843ab70e2a5a2eb5a54fe391ea5faf79e52a25 Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Sun, 27 Apr 2025 14:33:42 +0800 Subject: [PATCH 08/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]runner=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E2=80=9C=E4=B8=AD=E6=AD=A2=E2=80=9D=E5=92=8C=E2=80=9C?= =?UTF-8?q?=E6=9A=82=E5=81=9C=E2=80=9D=E5=91=BD=E4=BB=A4=E5=90=8E=EF=BC=8C?= =?UTF-8?q?=E5=BC=82=E5=B8=B8=E6=83=85=E5=86=B5=E4=B8=8D=E5=9B=9E=E8=B0=83?= =?UTF-8?q?=E6=9B=B4=E6=96=B0=E7=8A=B6=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/ExecProcessCommand.java | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java b/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java index 14acccf..ce2a8bd 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java +++ b/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java @@ -1,15 +1,10 @@ package com.neatlogic.autoexecrunner.core; -import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONObject; import com.neatlogic.autoexecrunner.asynchronization.threadlocal.TenantContext; import com.neatlogic.autoexecrunner.asynchronization.threadlocal.UserContext; -import com.neatlogic.autoexecrunner.common.config.Config; -import com.neatlogic.autoexecrunner.constvalue.AuthenticateType; import com.neatlogic.autoexecrunner.dto.CommandVo; -import com.neatlogic.autoexecrunner.dto.RestVo; import com.neatlogic.autoexecrunner.util.FileUtil; -import com.neatlogic.autoexecrunner.util.RestUtil; import com.neatlogic.autoexecrunner.util.TimeUtil; import org.apache.commons.collections4.MapUtils; import org.slf4j.Logger; @@ -92,18 +87,19 @@ public class ExecProcessCommand implements Runnable { } catch (Exception ex) { logger.error(e.getMessage(), e); } - } finally { - if (commandVo != null && Objects.equals(commandVo.getExitValue(), 2)) { - String CALLBACK_PROCESS_UPDATE_URL = "autoexec/job/process/status/update"; - String url = String.format("%s/api/rest/%s", Config.NEATLOGIC_ROOT(), CALLBACK_PROCESS_UPDATE_URL); - try { - result = RestUtil.sendRequest(new RestVo(url, payload, AuthenticateType.HMAC.getValue(), commandVo.getTenant())); - JSONObject.parseObject(result); - } catch (JSONException e) { - logger.error("do RESTFul api failed,url: #{},result: #{}", url, result); - } - } } +// finally { +// if (commandVo != null && Objects.equals(commandVo.getExitValue(), 2)) { +// String CALLBACK_PROCESS_UPDATE_URL = "autoexec/job/process/status/update"; +// String url = String.format("%s/api/rest/%s", Config.NEATLOGIC_ROOT(), CALLBACK_PROCESS_UPDATE_URL); +// try { +// result = RestUtil.sendRequest(new RestVo(url, payload, AuthenticateType.HMAC.getValue(), commandVo.getTenant())); +// JSONObject.parseObject(result); +// } catch (JSONException e) { +// logger.error("do RESTFul api failed,url: #{},result: #{}", url, result); +// } +// } +// } } } -- Gitee From 8b0cc1063900651ee1f05e78bd56395efe40323d Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Thu, 15 May 2025 15:32:20 +0800 Subject: [PATCH 09/15] =?UTF-8?q?[=E4=BF=AE=E5=A4=8D]=E9=87=8D=E7=BD=AE?= =?UTF-8?q?=E5=8D=95=E4=B8=AAsql=EF=BC=8C=E6=97=A0=E6=B3=95=E5=8D=95?= =?UTF-8?q?=E4=B8=AA=E9=87=8D=E6=96=B0=E6=89=A7=E8=A1=8Csql?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../autoexecrunner/api/job/node/JobPhaseNodeStatusResetApi.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/node/JobPhaseNodeStatusResetApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/node/JobPhaseNodeStatusResetApi.java index d5e782a..dd029c8 100755 --- a/src/main/java/com/neatlogic/autoexecrunner/api/job/node/JobPhaseNodeStatusResetApi.java +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/node/JobPhaseNodeStatusResetApi.java @@ -103,6 +103,8 @@ public class JobPhaseNodeStatusResetApi extends PrivateApiComponentBase { String host = node.getString("host"); Integer port = node.getInteger("port"); nodeStatusPath.append(host).append("-").append(port == null ? StringUtils.EMPTY : port).append("-").append(node.getString("resourceId")).append(File.separator).append(node.getString("sqlFile")).append(".txt"); + //删除对应status文件记录 + FileUtil.deleteDirectoryOrFile(nodeStatusPath.toString()); } } else { //重置整个phase -- Gitee From 4deaaa85c0460af12b0fbf5fe27aa8903232e92e Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Wed, 21 May 2025 11:59:47 +0800 Subject: [PATCH 10/15] =?UTF-8?q?[=E4=BF=AE=E5=A4=8D]=E5=AE=9A=E6=97=B6?= =?UTF-8?q?=E6=B8=85=E7=90=86=E8=87=AA=E5=8A=A8=E5=8C=96=E4=BD=9C=E4=B8=9A?= =?UTF-8?q?=EF=BC=8C=E6=B2=A1=E6=9C=89=E6=B8=85=E9=99=A4runner=E7=AB=AFaut?= =?UTF-8?q?oexec=E7=9A=84=E5=8E=86=E5=8F=B2=E4=BD=9C=E4=B8=9A=E6=96=87?= =?UTF-8?q?=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/job/JobDataPurgeApi.java | 34 +++++++++++++++---- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobDataPurgeApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobDataPurgeApi.java index b481b35..1369dcf 100755 --- a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobDataPurgeApi.java +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobDataPurgeApi.java @@ -18,10 +18,18 @@ import com.alibaba.fastjson.JSONObject; import com.neatlogic.autoexecrunner.constvalue.JobAction; import com.neatlogic.autoexecrunner.core.ExecProcessCommand; import com.neatlogic.autoexecrunner.dto.CommandVo; +import com.neatlogic.autoexecrunner.exception.core.ApiRuntimeException; import com.neatlogic.autoexecrunner.restful.core.privateapi.PrivateApiComponentBase; -import com.neatlogic.autoexecrunner.threadpool.CommonThreadPool; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import java.io.DataInputStream; +import java.io.InputStreamReader; +import java.io.StringWriter; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -32,6 +40,8 @@ import java.util.List; **/ @Component public class JobDataPurgeApi extends PrivateApiComponentBase { + private static final Logger logger = LoggerFactory.getLogger(ExecProcessCommand.class); + @Override public String getName() { return "清除历史作业"; @@ -44,13 +54,23 @@ public class JobDataPurgeApi extends PrivateApiComponentBase { //set command List commandList = Arrays.asList("autoexec", "--purgejobdata", jsonObj.getString("expiredDays")); commandList = new ArrayList<>(commandList); - if(commandVo.getPassThroughEnv() != null){ - commandList.add("--passthroughenv"); - commandList.add(commandVo.getPassThroughEnv().toString()); +// if (commandVo.getPassThroughEnv() != null) { +// commandList.add("--passthroughenv"); +// commandList.add(commandVo.getPassThroughEnv().toString()); +// } + + ProcessBuilder builder = new ProcessBuilder(commandList); + Process proc = builder.start(); + proc.waitFor(); + DataInputStream input = new DataInputStream(proc.getErrorStream()); + StringWriter writer = new StringWriter(); + InputStreamReader reader = new InputStreamReader(input, StandardCharsets.UTF_8); + IOUtils.copy(reader, writer); + if (StringUtils.isNotBlank(writer.toString())) { + logger.error(writer.toString()); + throw new ApiRuntimeException(writer.toString()); } - commandVo.setCommandList(commandList); - ExecProcessCommand processCommand = new ExecProcessCommand(commandVo); - CommonThreadPool.execute(processCommand); + return null; } -- Gitee From 36d555b95ab711dc5c0717a3862113e8a0317efb Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Fri, 6 Jun 2025 15:10:55 +0800 Subject: [PATCH 11/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=E6=8E=92=E9=98=9F?= =?UTF-8?q?=E6=97=B6=E6=94=AF=E6=8C=81=E6=9A=82=E5=81=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/neatlogic/autoexecrunner/api/job/JobPauseApi.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobPauseApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobPauseApi.java index c65d06a..366b728 100755 --- a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobPauseApi.java +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobPauseApi.java @@ -20,6 +20,7 @@ import com.neatlogic.autoexecrunner.constvalue.JobAction; import com.neatlogic.autoexecrunner.core.ExecProcessCommand; import com.neatlogic.autoexecrunner.dto.CommandVo; import com.neatlogic.autoexecrunner.restful.core.privateapi.PrivateApiComponentBase; +import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; import com.neatlogic.autoexecrunner.threadpool.CommonThreadPool; import org.springframework.stereotype.Component; @@ -50,8 +51,11 @@ public class JobPauseApi extends PrivateApiComponentBase { commandList.add(commandVo.getPassThroughEnv().toString()); } commandVo.setCommandList(commandList); - ExecProcessCommand processCommand = new ExecProcessCommand(commandVo); - CommonThreadPool.execute(processCommand); + //队列里不存在才执行pause命令 + if (!AutoexecQueueThread.removeCommand(commandVo)) { + ExecProcessCommand processCommand = new ExecProcessCommand(commandVo); + CommonThreadPool.execute(processCommand); + } return null; } -- Gitee From 3d2bfa639560153819b1147aa599f96307289e75 Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Mon, 9 Jun 2025 11:35:18 +0800 Subject: [PATCH 12/15] =?UTF-8?q?[=E4=BF=AE=E5=A4=8D]=E5=8B=BE=E9=80=89?= =?UTF-8?q?=E5=A4=9A=E8=8A=82=E7=82=B9=E9=87=8D=E7=BD=AE=E6=B2=A1=E6=9C=89?= =?UTF-8?q?=E5=88=A0=E9=99=A4=E5=AF=B9=E5=BA=94=E8=8A=82=E7=82=B9=E7=9A=84?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E6=96=87=E4=BB=B6=EF=BC=8C=E5=AF=BC=E8=87=B4?= =?UTF-8?q?=E9=87=8D=E7=BD=AE=E6=97=A0=E6=95=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/job/node/JobPhaseNodeStatusResetApi.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/node/JobPhaseNodeStatusResetApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/node/JobPhaseNodeStatusResetApi.java index dd029c8..033709d 100755 --- a/src/main/java/com/neatlogic/autoexecrunner/api/job/node/JobPhaseNodeStatusResetApi.java +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/node/JobPhaseNodeStatusResetApi.java @@ -70,10 +70,11 @@ public class JobPhaseNodeStatusResetApi extends PrivateApiComponentBase { String execMode = jsonObj.getString("execMode"); JSONArray phaseNodeList = jsonObj.getJSONArray("phaseNodeList"); JSONArray jobPhaseNodeSqlList = jsonObj.getJSONArray("jobPhaseNodeSqlList"); - StringBuilder nodeStatusPath = new StringBuilder(Config.AUTOEXEC_HOME() + File.separator + JobUtil.getJobPath(jobId.toString(), new StringBuilder()) + File.separator + "status" + File.separator + phase + File.separator); + //重置单个或多个节点 if (CollectionUtils.isNotEmpty(phaseNodeList)) { for (int i = 0; i < phaseNodeList.size(); i++) { + StringBuilder nodeStatusPath = new StringBuilder(Config.AUTOEXEC_HOME() + File.separator + JobUtil.getJobPath(jobId.toString(), new StringBuilder()) + File.separator + "status" + File.separator + phase + File.separator); //删除db对应的status记录 JSONObject node = phaseNodeList.getJSONObject(i); String host = node.getString("host"); @@ -99,6 +100,7 @@ public class JobPhaseNodeStatusResetApi extends PrivateApiComponentBase { } if (CollectionUtils.isNotEmpty(jobPhaseNodeSqlList)) { for (int i = 0; i < jobPhaseNodeSqlList.size(); i++) { + StringBuilder nodeStatusPath = new StringBuilder(Config.AUTOEXEC_HOME() + File.separator + JobUtil.getJobPath(jobId.toString(), new StringBuilder()) + File.separator + "status" + File.separator + phase + File.separator); JSONObject node = jobPhaseNodeSqlList.getJSONObject(i); String host = node.getString("host"); Integer port = node.getInteger("port"); @@ -108,6 +110,7 @@ public class JobPhaseNodeStatusResetApi extends PrivateApiComponentBase { } } else { //重置整个phase + String nodeStatusPath = Config.AUTOEXEC_HOME() + File.separator + JobUtil.getJobPath(jobId.toString(), new StringBuilder()) + File.separator + "status" + File.separator + phase + File.separator; Document document = new Document(); document.put("jobId", jobId.toString()); document.put("phase", phase); @@ -118,7 +121,7 @@ public class JobPhaseNodeStatusResetApi extends PrivateApiComponentBase { throw new MongodbException(); } //删除对应status文件记录 - FileUtil.deleteDirectoryOrFile(nodeStatusPath.toString()); + FileUtil.deleteDirectoryOrFile(nodeStatusPath); } return null; } -- Gitee From 1ce5bcdfc35fc226ee8633b6979622e4a218a6f9 Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Mon, 9 Jun 2025 19:21:42 +0800 Subject: [PATCH 13/15] =?UTF-8?q?[=E4=BF=AE=E5=A4=8D]=E5=8B=BE=E9=80=89?= =?UTF-8?q?=E5=A4=9A=E8=8A=82=E7=82=B9=E9=87=8D=E7=BD=AE=E6=B2=A1=E6=9C=89?= =?UTF-8?q?=E5=88=A0=E9=99=A4=E5=AF=B9=E5=BA=94=E8=8A=82=E7=82=B9=E7=9A=84?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E6=96=87=E4=BB=B6=EF=BC=8C=E5=AF=BC=E8=87=B4?= =?UTF-8?q?=E9=87=8D=E7=BD=AE=E6=97=A0=E6=95=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/job/node/JobPhaseNodeStatusResetApi.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/node/JobPhaseNodeStatusResetApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/node/JobPhaseNodeStatusResetApi.java index 033709d..46aa63c 100755 --- a/src/main/java/com/neatlogic/autoexecrunner/api/job/node/JobPhaseNodeStatusResetApi.java +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/node/JobPhaseNodeStatusResetApi.java @@ -97,8 +97,7 @@ public class JobPhaseNodeStatusResetApi extends PrivateApiComponentBase { } FileUtil.deleteDirectoryOrFile(nodeStatusPath.toString()); } - } - if (CollectionUtils.isNotEmpty(jobPhaseNodeSqlList)) { + } else if (CollectionUtils.isNotEmpty(jobPhaseNodeSqlList)) { for (int i = 0; i < jobPhaseNodeSqlList.size(); i++) { StringBuilder nodeStatusPath = new StringBuilder(Config.AUTOEXEC_HOME() + File.separator + JobUtil.getJobPath(jobId.toString(), new StringBuilder()) + File.separator + "status" + File.separator + phase + File.separator); JSONObject node = jobPhaseNodeSqlList.getJSONObject(i); -- Gitee From 650ff7efa9fd365607ac855b0f926e3345edb95d Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Wed, 11 Jun 2025 18:46:53 +0800 Subject: [PATCH 14/15] =?UTF-8?q?[=E4=BF=AE=E5=A4=8D]spring=E6=BC=8F?= =?UTF-8?q?=E6=B4=9E=20=E5=BD=93=E5=BA=94=E7=94=A8=E7=A8=8B=E5=BA=8F?= =?UTF-8?q?=E4=BD=BF=E7=94=A8RouterFunctions=20=E6=9D=A5=E5=A4=84=E7=90=86?= =?UTF-8?q?=E9=9D=99=E6=80=81=E8=B5=84=E6=BA=90=E7=9B=AE=E8=B5=84=E6=BA=90?= =?UTF-8?q?=E5=A4=84=E7=90=86=E9=80=9A=E8=BF=87FileSystemResource=E8=BF=9B?= =?UTF-8?q?=E8=A1=8C=E9=85=8D=E7=BD=AE=E6=97=B6=EF=BC=8C=E6=94=BB=E5=87=BB?= =?UTF-8?q?=E8=80=85=E5=8F=AF=E4=BB=A5=E9=80=9A=E8=BF=87=E6=9E=84=E9=80=A0?= =?UTF-8?q?=E6=81=B6=E6=84=8FHTTP=E8=AF=B7=E6=B1=82=EF=BC=8C=E5=88=A9?= =?UTF-8?q?=E7=94=A8=E8=B7=AF=E5=BE=84=E9=81=8D=E5=8E=86=E6=BC=8F=E6=B4=9E?= =?UTF-8?q?=E8=8E=B7=E5=8F=96=E6=96=87=E4=BB=B6=E7=B3=BB=E7=BB=9F=E4=B8=AD?= =?UTF-8?q?=E7=9A=84=E4=BB=BB=E6=84=8F=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../filter/PathTraversalFilter.java | 58 +++++++++++++++++++ .../filter/ResourcesConfig.java | 51 ++++++++++++++++ 2 files changed, 109 insertions(+) create mode 100644 src/main/java/com/neatlogic/autoexecrunner/filter/PathTraversalFilter.java create mode 100644 src/main/java/com/neatlogic/autoexecrunner/filter/ResourcesConfig.java diff --git a/src/main/java/com/neatlogic/autoexecrunner/filter/PathTraversalFilter.java b/src/main/java/com/neatlogic/autoexecrunner/filter/PathTraversalFilter.java new file mode 100644 index 0000000..d911ace --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/filter/PathTraversalFilter.java @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2025 深圳极向量科技有限公司 All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.neatlogic.autoexecrunner.filter; + + +import javax.servlet.*; +import javax.servlet.annotation.WebFilter; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +@WebFilter(filterName = "pathTraversalFilter",urlPatterns = {"/*"}) +public class PathTraversalFilter implements Filter { + + // 需要检查的请求参数类型 + private static final String[] CHECKED_PARAMS = { + "fileName", "filePath", "path", "download" + }; + + @Override + public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain) throws IOException, ServletException { + HttpServletRequest httpRequest = (HttpServletRequest) request; + HttpServletResponse httpResponse = (HttpServletResponse) response; + // 检测所有参数值 + for (String paramName : CHECKED_PARAMS) { + String paramValue = httpRequest.getParameter(paramName); + if (paramValue != null && isUnsafePath(paramValue)) { + httpResponse.sendError(HttpServletResponse.SC_FORBIDDEN, "Invalid path detected"); + return; + } + } + filterChain.doFilter(request, response); + } + + private boolean isUnsafePath(String value) { + // 检测多种路径遍历模式(包含URL编码形式) + return value.contains("../") + || value.contains("..\\") + || value.contains("%2e%2e/") + || value.contains("%2e%2e%2f") + || value.contains("..%2f") + || value.matches(".*\\b(?:absolute|true)path\\b.*"); + } +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/filter/ResourcesConfig.java b/src/main/java/com/neatlogic/autoexecrunner/filter/ResourcesConfig.java new file mode 100644 index 0000000..8023b3b --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/filter/ResourcesConfig.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2025 深圳极向量科技有限公司 All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.neatlogic.autoexecrunner.filter; + +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.Resource; +import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; +import org.springframework.web.servlet.resource.PathResourceResolver; + +import java.io.IOException; + +@Configuration +public class ResourcesConfig implements WebMvcConfigurer { + + @Override + public void addResourceHandlers(ResourceHandlerRegistry registry) { + System.out.println("ResourcesConfig addResourceHandlers registry = " + registry); + registry.addResourceHandler("/static/**") + .addResourceLocations("classpath:/static/") + .setCachePeriod(3600) + .resourceChain(true) + .addResolver(new PathResourceResolver() { + @Override + protected Resource getResource(String resourcePath, Resource location) throws IOException { + System.out.println("ResourcesConfig addResourceHandlers resourcePath = " + resourcePath); + System.out.println("ResourcesConfig addResourceHandlers location = " + location); + // 检查路径是否合法,避免路径遍历 + if (resourcePath.contains("./") || resourcePath.contains("..")) { + return null; // 返回 null 表示不允许访问 + } + return super.getResource(resourcePath, location); + } + }); + } +} -- Gitee From f87bddb765acc345fd0df37f61e73c82c56e70cf Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Mon, 16 Jun 2025 16:10:07 +0800 Subject: [PATCH 15/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=E5=88=A0=E9=99=A4?= =?UTF-8?q?=E6=97=A0=E7=94=A8=E4=BE=9D=E8=B5=96snakeyaml?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pom.xml b/pom.xml index bcde623..10c5d52 100755 --- a/pom.xml +++ b/pom.xml @@ -85,6 +85,10 @@ slf4j-api org.slf4j + + snakeyaml + org.yaml + -- Gitee