From 7e3508876498516f3117bcf660a175c614135a45 Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Tue, 27 Jun 2017 22:02:46 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B7=AF=E7=94=B1=E7=AD=96=E7=95=A5=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=20"=E5=BF=99=E7=A2=8C=E8=BD=AC=E7=A7=BB"=20=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=EF=BC=9A=E6=8C=89=E7=85=A7=E9=A1=BA=E5=BA=8F=E4=BE=9D?= =?UTF-8?q?=E6=AC=A1=E8=BF=9B=E8=A1=8C=E7=A9=BA=E9=97=B2=E6=A3=80=E6=B5=8B?= =?UTF-8?q?=EF=BC=8C=E7=AC=AC=E4=B8=80=E4=B8=AA=E7=A9=BA=E9=97=B2=E6=A3=80?= =?UTF-8?q?=E6=B5=8B=E6=88=90=E5=8A=9F=E7=9A=84=E6=9C=BA=E5=99=A8=E9=80=89?= =?UTF-8?q?=E5=AE=9A=E4=B8=BA=E7=9B=AE=E6=A0=87=E6=89=A7=E8=A1=8C=E5=99=A8?= =?UTF-8?q?=E5=B9=B6=E5=8F=91=E8=B5=B7=E8=B0=83=E5=BA=A6=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 3 +- .../admin/core/jobbean/RemoteHttpJobBean.java | 130 +++++++++--------- .../core/route/ExecutorRouteStrategyEnum.java | 3 +- .../com/xxl/job/core/biz/ExecutorBiz.java | 8 ++ .../job/core/biz/impl/ExecutorBizImpl.java | 16 +++ .../core/thread/ExecutorRegistryThread.java | 3 +- 6 files changed, 98 insertions(+), 65 deletions(-) diff --git a/README.md b/README.md index bb5a9b55..b5fe19ff 100644 --- a/README.md +++ b/README.md @@ -857,6 +857,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 1、任务Cron更新逻辑优化,改为rescheduleJob,同时防止cron重复设置; - 2、优化:API回调服务失败状态码优化,方便问题排查; - 3、XxlJobLogger的日志多参数支持; +- 4、路由策略新增 "忙碌转移" 模式:按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度; #### TODO LIST - 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限; @@ -868,7 +869,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 7、调度任务优先级; - 8、移除quartz依赖,重写调度模块:新增或恢复任务时将下次执行记录插入delayqueue,调度中心集群竞争分布式锁,成功节点批量加载到期delayqueue数据,批量执行。 - 9、任务线程轮空30次后自动销毁,降低低频任务的无效线程消耗。 -- 10、路由策略新增 "忙碌转移" 模式,发现执行器运行中,主动转移下一个执行器调度任务; + ## 七、其他 diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java index fba3dd54..13be274f 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java @@ -114,78 +114,84 @@ public class RemoteHttpJobBean extends QuartzJobBean { return new ReturnT(ReturnT.FAIL_CODE, triggerSb.toString()); } + // executor route strategy + ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); + if (executorRouteStrategyEnum == null) { + triggerSb.append("
----------------------
").append("调度失败:").append("执行器路由策略为空"); + return new ReturnT(ReturnT.FAIL_CODE, triggerSb.toString()); + } + triggerSb.append("
路由策略:").append(executorRouteStrategyEnum.name() + "-" + executorRouteStrategyEnum.getTitle()); + // trigger remote executor - if (addressList.size() == 1) { - String address = addressList.get(0); + if (executorRouteStrategyEnum == ExecutorRouteStrategyEnum.FAILOVER) { + for (String address : addressList) { + // beat + ReturnT beatResult = null; + try { + ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject(); + beatResult = executorBiz.beat(); + } catch (Exception e) { + logger.error("", e); + beatResult = new ReturnT(ReturnT.FAIL_CODE, ""+e ); + } + triggerSb.append("
----------------------
") + .append("心跳检测:") + .append("
address:").append(address) + .append("
code:").append(beatResult.getCode()) + .append("
msg:").append(beatResult.getMsg()); + + // beat success + if (beatResult.getCode() == ReturnT.SUCCESS_CODE) { + jobLog.setExecutorAddress(address); + + ReturnT runResult = runExecutor(triggerParam, address); + triggerSb.append("
----------------------
").append(runResult.getMsg()); + + return new ReturnT(runResult.getCode(), triggerSb.toString()); + } + } + return new ReturnT(ReturnT.FAIL_CODE, triggerSb.toString()); + } else if (executorRouteStrategyEnum == ExecutorRouteStrategyEnum.BUSYOVER) { + for (String address : addressList) { + // beat + ReturnT idleBeatResult = null; + try { + ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject(); + idleBeatResult = executorBiz.idleBeat(triggerParam.getJobId()); + } catch (Exception e) { + logger.error("", e); + idleBeatResult = new ReturnT(ReturnT.FAIL_CODE, ""+e ); + } + triggerSb.append("
----------------------
") + .append("空闲检测:") + .append("
address:").append(address) + .append("
code:").append(idleBeatResult.getCode()) + .append("
msg:").append(idleBeatResult.getMsg()); + + // beat success + if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) { + jobLog.setExecutorAddress(address); + + ReturnT runResult = runExecutor(triggerParam, address); + triggerSb.append("
----------------------
").append(runResult.getMsg()); + + return new ReturnT(runResult.getCode(), triggerSb.toString()); + } + } + return new ReturnT(ReturnT.FAIL_CODE, triggerSb.toString()); + } else { + // get address + String address = executorRouteStrategyEnum.getRouter().route(jobInfo.getId(), addressList); jobLog.setExecutorAddress(address); + // run ReturnT runResult = runExecutor(triggerParam, address); triggerSb.append("
----------------------
").append(runResult.getMsg()); return new ReturnT(runResult.getCode(), triggerSb.toString()); - } else { - // executor route strategy - ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); - triggerSb.append("
路由策略:").append(executorRouteStrategyEnum!=null?(executorRouteStrategyEnum.name() + "-" + executorRouteStrategyEnum.getTitle()):null); - if (executorRouteStrategyEnum == null) { - triggerSb.append("
----------------------
").append("调度失败:").append("执行器路由策略为空"); - return new ReturnT(ReturnT.FAIL_CODE, triggerSb.toString()); - } - - if (executorRouteStrategyEnum != ExecutorRouteStrategyEnum.FAILOVER) { - // get address - String address = executorRouteStrategyEnum.getRouter().route(jobInfo.getId(), addressList); - jobLog.setExecutorAddress(address); - - // run - ReturnT runResult = runExecutor(triggerParam, address); - triggerSb.append("
----------------------
").append(runResult.getMsg()); - - return new ReturnT(runResult.getCode(), triggerSb.toString()); - } else { - for (String address : addressList) { - // beat - ReturnT beatResult = beatExecutor(address); - triggerSb.append("
----------------------
").append(beatResult.getMsg()); - - if (beatResult.getCode() == ReturnT.SUCCESS_CODE) { - jobLog.setExecutorAddress(address); - - ReturnT runResult = runExecutor(triggerParam, address); - triggerSb.append("
----------------------
").append(runResult.getMsg()); - - return new ReturnT(runResult.getCode(), triggerSb.toString()); - } - } - return new ReturnT(ReturnT.FAIL_CODE, triggerSb.toString()); - } } } - /** - * run executor - * @param address - * @return - */ - public ReturnT beatExecutor(String address){ - ReturnT beatResult = null; - try { - ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject(); - beatResult = executorBiz.beat(); - } catch (Exception e) { - logger.error("", e); - beatResult = new ReturnT(ReturnT.FAIL_CODE, ""+e ); - } - - StringBuffer sb = new StringBuffer("心跳检测:"); - sb.append("
address:").append(address); - sb.append("
code:").append(beatResult.getCode()); - sb.append("
msg:").append(beatResult.getMsg()); - beatResult.setMsg(sb.toString()); - - return beatResult; - } - /** * run executor * @param triggerParam diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java index 99244d2b..2efea6a0 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java @@ -14,7 +14,8 @@ public enum ExecutorRouteStrategyEnum { CONSISTENT_HASH("一致性HASH", new ExecutorRouteConsistentHash()), LEAST_FREQUENTLY_USED("最不经常使用", new ExecutorRouteLFU()), LEAST_RECENTLY_USED("最近最久未使用", new ExecutorRouteLRU()), - FAILOVER("故障转移", null); + FAILOVER("故障转移", null), + BUSYOVER("忙碌转移", null); ExecutorRouteStrategyEnum(String title, ExecutorRouter router) { this.title = title; diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/ExecutorBiz.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/ExecutorBiz.java index 40465725..6051e095 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/ExecutorBiz.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/ExecutorBiz.java @@ -15,6 +15,14 @@ public interface ExecutorBiz { */ public ReturnT beat(); + /** + * idle beat + * + * @param jobId + * @return + */ + public ReturnT idleBeat(int jobId); + /** * kill * @param jobId diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java index 0b408b29..57547676 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java @@ -29,6 +29,22 @@ public class ExecutorBizImpl implements ExecutorBiz { return ReturnT.SUCCESS; } + @Override + public ReturnT idleBeat(int jobId) { + + // isRunningOrHasQueue + boolean isRunningOrHasQueue = false; + JobThread jobThread = XxlJobExecutor.loadJobThread(jobId); + if (jobThread != null && jobThread.isRunningOrHasQueue()) { + isRunningOrHasQueue = true; + } + + if (isRunningOrHasQueue) { + return new ReturnT(ReturnT.FAIL_CODE, "job thread is running or has trigger queue."); + } + return ReturnT.SUCCESS; + } + @Override public ReturnT kill(int jobId) { // kill handlerThread, and create new one diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java index ac3ab039..c567b9bb 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java @@ -46,7 +46,8 @@ public class ExecutorRegistryThread extends Thread { try { RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, executorAddress); ReturnT registryResult = AdminApiUtil.callApiFailover(AdminApiUtil.REGISTRY, registryParam); - logger.info(">>>>>>>>>>> xxl-job registry, RegistryParam:{}, registryResult:{}", new Object[]{registryParam.toString(), registryResult.toString()}); + logger.info(">>>>>>>>>>> xxl-job Executor registry {}, RegistryParam:{}, registryResult:{}", + new Object[]{(registryResult.getCode()==ReturnT.SUCCESS_CODE?"success":"fail"), registryParam.toString(), registryResult.toString()}); } catch (Exception e) { logger.error(">>>>>>>>>>> xxl-job ExecutorRegistryThread Exception:", e); }