From 293ffca14d32c0a25f7771b6e6b9eb3ab784a956 Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Thu, 20 Jun 2019 16:59:18 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E5=BA=A6=E7=BB=84=E4=BB=B6=E9=94=80?= =?UTF-8?q?=E6=AF=81=E6=B5=81=E7=A8=8B=E4=BC=98=E5=8C=96=EF=BC=8C=E5=85=88?= =?UTF-8?q?=E5=81=9C=E6=AD=A2=E8=B0=83=E5=BA=A6=E7=BA=BF=E7=A8=8B=EF=BC=8C?= =?UTF-8?q?=E7=84=B6=E5=90=8E=E7=AD=89=E5=BE=85=E6=97=B6=E9=97=B4=E8=BD=AE?= =?UTF-8?q?=E5=86=85=E5=AD=98=E9=87=8F=E4=BB=BB=E5=8A=A1=E5=A4=84=E7=90=86?= =?UTF-8?q?=E5=AE=8C=E6=88=90=EF=BC=8C=E6=9C=80=E7=BB=88=E9=94=80=E6=AF=81?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E8=BD=AE=E7=BA=BF=E7=A8=8B=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/XXL-JOB官方文档.md | 1 + .../admin/core/thread/JobScheduleHelper.java | 71 ++++++++++++++----- 2 files changed, 56 insertions(+), 16 deletions(-) diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index 54eed9eb..ea404743 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -1477,6 +1477,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 11、升级xxl-rpc至较新版本,修复代理服务初始化时远程服务不可用导致长连冗余创建的问题; - 12、首页调度报表的日期排序在TIDB下乱序问题修复; - 13、调度中心与执行器双向通讯超时时间调整为3s; +- 14、调度组件销毁流程优化,先停止调度线程,然后等待时间轮内存量任务处理完成,最终销毁时间轮线程; ### 6.26 版本 v2.1.1 Release Notes[规划中] diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java index e759c931..b9f54446 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java @@ -27,7 +27,8 @@ public class JobScheduleHelper { private Thread scheduleThread; private Thread ringThread; - private volatile boolean toStop = false; + private volatile boolean scheduleThreadToStop = false; + private volatile boolean ringThreadToStop = false; private volatile static Map> ringData = new ConcurrentHashMap<>(); public void start(){ @@ -40,13 +41,13 @@ public class JobScheduleHelper { try { TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 ); } catch (InterruptedException e) { - if (!toStop) { + if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>> init xxl-job admin scheduler success."); - while (!toStop) { + while (!scheduleThreadToStop) { // 扫描任务 long start = System.currentTimeMillis(); @@ -127,7 +128,7 @@ public class JobScheduleHelper { conn.commit(); } catch (Exception e) { - if (!toStop) { + if (!scheduleThreadToStop) { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e); } } finally { @@ -152,7 +153,7 @@ public class JobScheduleHelper { TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000); } } catch (InterruptedException e) { - if (!toStop) { + if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } @@ -175,13 +176,13 @@ public class JobScheduleHelper { try { TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 ); } catch (InterruptedException e) { - if (!toStop) { + if (!ringThreadToStop) { logger.error(e.getMessage(), e); } } int lastSecond = -1; - while (!toStop) { + while (!ringThreadToStop) { try { // second data @@ -216,7 +217,7 @@ public class JobScheduleHelper { ringItemData.clear(); } } catch (Exception e) { - if (!toStop) { + if (!ringThreadToStop) { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e); } } @@ -225,7 +226,7 @@ public class JobScheduleHelper { try { TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000); } catch (InterruptedException e) { - if (!toStop) { + if (!ringThreadToStop) { logger.error(e.getMessage(), e); } } @@ -239,23 +240,61 @@ public class JobScheduleHelper { } public void toStop(){ - toStop = true; - // interrupt and wait - scheduleThread.interrupt(); + // 1、stop schedule + scheduleThreadToStop = true; try { - scheduleThread.join(); + TimeUnit.SECONDS.sleep(1); // wait } catch (InterruptedException e) { logger.error(e.getMessage(), e); } + if (scheduleThread.getState() != Thread.State.TERMINATED){ + // interrupt and wait + scheduleThread.interrupt(); + try { + scheduleThread.join(); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } - // interrupt and wait - ringThread.interrupt(); + // if has ring data + boolean hasRingData = false; + if (!ringData.isEmpty()) { + for (int second : ringData.keySet()) { + List tmpData = ringData.get(second); + if (tmpData!=null && tmpData.size()>0) { + hasRingData = true; + break; + } + } + } + if (hasRingData) { + try { + TimeUnit.SECONDS.sleep(8); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + + // stop ring (wait job-in-memory stop) + ringThreadToStop = true; try { - ringThread.join(); + TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } + if (ringThread.getState() != Thread.State.TERMINATED){ + // interrupt and wait + ringThread.interrupt(); + try { + ringThread.join(); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + + logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop"); } }