From 168050dcda7cea22bd0760f850ae47d997189758 Mon Sep 17 00:00:00 2001 From: "xueli.xue" Date: Fri, 3 Jun 2016 13:40:29 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=9F=E8=83=BD=E5=AE=8C=E5=96=84=EF=BC=9A?= =?UTF-8?q?=201=E3=80=81jetty=E5=85=B3=E9=97=AD=E4=BC=98=E5=8C=96=EF=BC=88?= =?UTF-8?q?=E6=9D=A5=E8=87=AAosc=E4=B8=8A=E5=A5=BD=E5=8F=8BQQ2575029833?= =?UTF-8?q?=E7=9A=84pr=EF=BC=89=EF=BC=9B=202=E3=80=81=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E7=BB=88=E6=AD=A2=E6=97=B6=E5=9B=9E=E8=B0=83=E4=BC=98=E5=8C=96?= =?UTF-8?q?=EF=BC=8C=E6=89=A7=E8=A1=8C=E9=98=9F=E5=88=97=E4=B8=AD=E7=9A=84?= =?UTF-8?q?=E8=B0=83=E5=BA=A6=E8=BF=9B=E8=A1=8C=E5=9B=9E=E8=B0=83=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 8 +++++++ .../callback/XxlJobLogCallbackServer.java | 13 ++++++++++- .../admin/core/util/DynamicSchedulerUtil.java | 11 +++++++++- .../resources/applicationcontext-xxl-job.xml | 2 +- .../core/executor/jetty/XxlJobExecutor.java | 15 ++++++++++++- .../xxl/job/core/handler/HandlerThread.java | 22 +++++++++++++++++++ .../resources/applicationcontext-xxl-job.xml | 2 +- 7 files changed, 68 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 70c8c2e7..41c19d3a 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,13 @@ git.osc地址:http://git.oschina.net/xuxueli0323/xxl-job 2、执行器异步回调执行日志; 3、【重要】在 “调度中心” 支持HA的基础上,扩展执行器的Failover支持,支持配置多执行期地址; +# 规划中 + 1、任务终止时,任务队列中调度回调通过被终止的接口; + 2、任务执行规则自定义:假如前一个任务正在执行,后续调度执行规则支持自定义; + 串行(默认,当前逻辑):后续调度入调度队列; + 并行:后续调度并行执行; + Pass:后续调度被Pass; + # 源码目录说明 /xxl-job-admin 【调度中心】:负责管理调度信息,按照调度配置发出调度请求; /xxl-job-core 公共依赖 @@ -87,3 +94,4 @@ git.osc地址:http://git.oschina.net/xuxueli0323/xxl-job 4、人人聚财金服; 5、…… 更多接入公司,欢迎在https://github.com/xuxueli/xxl-job/issues/1 登记。 + diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServer.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServer.java index 5ad3c418..f884bc87 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServer.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/callback/XxlJobLogCallbackServer.java @@ -22,6 +22,7 @@ public class XxlJobLogCallbackServer { return trigger_log_address; } + Server server = null; public void start(int callBackPort) throws Exception { // init address @@ -32,7 +33,7 @@ public class XxlJobLogCallbackServer { new Thread(new Runnable() { @Override public void run() { - Server server = new Server(); + server = new Server(); server.setThreadPool(new ExecutorThreadPool(200, 200, 30000)); // 非阻塞 // connector @@ -59,4 +60,14 @@ public class XxlJobLogCallbackServer { } + public void destroy() { + if (server!=null) { + try { + server.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/DynamicSchedulerUtil.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/DynamicSchedulerUtil.java index aed4f79a..a200b6d3 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/DynamicSchedulerUtil.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/DynamicSchedulerUtil.java @@ -55,15 +55,24 @@ public final class DynamicSchedulerUtil implements ApplicationContextAware, Init } // init + XxlJobLogCallbackServer xxlJobLogCallbackServer = null; public void init(){ try { // start callback server - new XxlJobLogCallbackServer().start(callBackPort); + xxlJobLogCallbackServer = new XxlJobLogCallbackServer(); + xxlJobLogCallbackServer.start(callBackPort); } catch (Exception e) { e.printStackTrace(); } } + // destroy + public void destroy(){ + if (xxlJobLogCallbackServer!=null) { + xxlJobLogCallbackServer.destroy(); + } + } + // xxlJobLogDao、xxlJobInfoDao public static IXxlJobLogDao xxlJobLogDao; public static IXxlJobInfoDao xxlJobInfoDao; diff --git a/xxl-job-admin/src/main/resources/applicationcontext-xxl-job.xml b/xxl-job-admin/src/main/resources/applicationcontext-xxl-job.xml index e01d121e..29c9c77e 100644 --- a/xxl-job-admin/src/main/resources/applicationcontext-xxl-job.xml +++ b/xxl-job-admin/src/main/resources/applicationcontext-xxl-job.xml @@ -18,7 +18,7 @@ - + diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutor.java index 073f21cc..96f89a20 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/jetty/XxlJobExecutor.java @@ -29,12 +29,14 @@ public class XxlJobExecutor implements ApplicationContextAware { this.port = port; } + // ---------------------------------- job server ------------------------------------ + Server server = null; public void start() throws Exception { new Thread(new Runnable() { @Override public void run() { - Server server = new Server(); + server = new Server(); server.setThreadPool(new ExecutorThreadPool(200, 200, 30000)); // 非阻塞 // connector @@ -60,7 +62,18 @@ public class XxlJobExecutor implements ApplicationContextAware { }).start(); } + + public void destroy(){ + if (server!=null) { + try { + server.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + // ---------------------------------- init job handler ------------------------------------ public static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/handler/HandlerThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/handler/HandlerThread.java index 67a4e5d8..8896a5a9 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/handler/HandlerThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/handler/HandlerThread.java @@ -96,6 +96,12 @@ public class HandlerThread extends Thread{ params.put("status", _status.name()); params.put("msg", _msg); HandlerRepository.pushCallBack(HttpUtil.addressToUrl(log_address), params); + } else { + HashMap params = new HashMap(); + params.put("log_id", log_id); + params.put("status", JobHandleStatus.FAIL.name()); + params.put("msg", "人工手动终止[业务运行中,被强制终止]"); + HandlerRepository.pushCallBack(HttpUtil.addressToUrl(log_address), params); } } else { i++; @@ -113,6 +119,22 @@ public class HandlerThread extends Thread{ logger.info("HandlerThread Exception:", e); } } + + // callback trigger request in queue + while(handlerDataQueue!=null && handlerDataQueue.size()>0){ + Map handlerData = handlerDataQueue.poll(); + if (handlerData!=null) { + String log_address = handlerData.get(HandlerParamEnum.LOG_ADDRESS.name()); + String log_id = handlerData.get(HandlerParamEnum.LOG_ID.name()); + + HashMap params = new HashMap(); + params.put("log_id", log_id); + params.put("status", JobHandleStatus.FAIL.name()); + params.put("msg", "人工手动终止[任务尚未执行,在调度队列中被终止]"); + HandlerRepository.pushCallBack(HttpUtil.addressToUrl(log_address), params); + } + } + logger.info(">>>>>>>>>>>> xxl-job handlerThrad stoped, hashCode:{}", Thread.currentThread()); } } diff --git a/xxl-job-executor-example/src/main/resources/applicationcontext-xxl-job.xml b/xxl-job-executor-example/src/main/resources/applicationcontext-xxl-job.xml index 79bdb8e1..cef7668a 100644 --- a/xxl-job-executor-example/src/main/resources/applicationcontext-xxl-job.xml +++ b/xxl-job-executor-example/src/main/resources/applicationcontext-xxl-job.xml @@ -12,7 +12,7 @@ - +