diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index 56f33b3f..b763f20c 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -2063,10 +2063,9 @@ data: post-data - 17、执行器注册线程优化,修复极端情况下初始化失败时导致NPE问题; - 18、执行器Commandhandler示例任务优化,修复极端情况下脚本进程挂起问题; - 19、调度中心页面交互优化:用户管理模块密码列取消;多处表达autocomplete取消;执行器管理模块XSS拦截校验等; -- 19、[ING]任务触发参数优化:支持选择 "Cron触发"、"固定间隔时间触发"、"指定时间点触发"、"不选择" 等; -- 20、[ING]任务 misfire 策略:忽略、补偿一次、补偿最近10次……等; -- 21、[规划中]执行器注册,异步写入; -- 22、[规划中]默认开启访问令牌鉴权; +- 20、执行器注册组件优化:注册逻辑调整为异步方式,提高注册性能; +- 21、[ING]任务触发参数优化:支持选择 "Cron触发"、"固定间隔时间触发"、"指定时间点触发"、"不选择" 等; +- 22、[ING]任务 misfire 策略:忽略、补偿一次等; ### 7.32 版本 v2.3.0 Release Notes[规划中] diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java index aae99283..c05575ac 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java @@ -28,7 +28,7 @@ public class XxlJobScheduler { JobTriggerPoolHelper.toStart(); // admin registry monitor run - JobRegistryMonitorHelper.getInstance().start(); + JobRegistryHelper.getInstance().start(); // admin fail-monitor run JobFailMonitorHelper.getInstance().start(); @@ -61,7 +61,7 @@ public class XxlJobScheduler { JobFailMonitorHelper.getInstance().toStop(); // admin registry stop - JobRegistryMonitorHelper.getInstance().toStop(); + JobRegistryHelper.getInstance().toStop(); // admin trigger pool stop JobTriggerPoolHelper.toStop(); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java new file mode 100644 index 00000000..37edfd98 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java @@ -0,0 +1,204 @@ +package com.xxl.job.admin.core.thread; + +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.model.XxlJobGroup; +import com.xxl.job.admin.core.model.XxlJobRegistry; +import com.xxl.job.core.biz.model.RegistryParam; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.enums.RegistryConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.StringUtils; + +import java.util.*; +import java.util.concurrent.*; + +/** + * job registry instance + * @author xuxueli 2016-10-02 19:10:24 + */ +public class JobRegistryHelper { + private static Logger logger = LoggerFactory.getLogger(JobRegistryHelper.class); + + private static JobRegistryHelper instance = new JobRegistryHelper(); + public static JobRegistryHelper getInstance(){ + return instance; + } + + private ThreadPoolExecutor registryOrRemoveThreadPool = null; + private Thread registryMonitorThread; + private volatile boolean toStop = false; + + public void start(){ + + // for registry or remove + registryOrRemoveThreadPool = new ThreadPoolExecutor( + 2, + 10, + 30L, + TimeUnit.SECONDS, + new LinkedBlockingQueue(2000), + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode()); + } + }, + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + r.run(); + logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now)."); + } + }); + + // for monitor + registryMonitorThread = new Thread(new Runnable() { + @Override + public void run() { + while (!toStop) { + try { + // auto registry group + List groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0); + if (groupList!=null && !groupList.isEmpty()) { + + // remove dead address (admin/executor) + List ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date()); + if (ids!=null && ids.size()>0) { + XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids); + } + + // fresh online address (admin/executor) + HashMap> appAddressMap = new HashMap>(); + List list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date()); + if (list != null) { + for (XxlJobRegistry item: list) { + if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) { + String appname = item.getRegistryKey(); + List registryList = appAddressMap.get(appname); + if (registryList == null) { + registryList = new ArrayList(); + } + + if (!registryList.contains(item.getRegistryValue())) { + registryList.add(item.getRegistryValue()); + } + appAddressMap.put(appname, registryList); + } + } + } + + // fresh group address + for (XxlJobGroup group: groupList) { + List registryList = appAddressMap.get(group.getAppname()); + String addressListStr = null; + if (registryList!=null && !registryList.isEmpty()) { + Collections.sort(registryList); + StringBuilder addressListSB = new StringBuilder(); + for (String item:registryList) { + addressListSB.append(item).append(","); + } + addressListStr = addressListSB.toString(); + addressListStr = addressListStr.substring(0, addressListStr.length()-1); + } + group.setAddressList(addressListStr); + group.setUpdateTime(new Date()); + + XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group); + } + } + } catch (Exception e) { + if (!toStop) { + logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e); + } + } + try { + TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); + } catch (InterruptedException e) { + if (!toStop) { + logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e); + } + } + } + logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop"); + } + }); + registryMonitorThread.setDaemon(true); + registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread"); + registryMonitorThread.start(); + } + + public void toStop(){ + toStop = true; + + // stop registryOrRemoveThreadPool + registryOrRemoveThreadPool.shutdownNow(); + + // stop monitir (interrupt and wait) + registryMonitorThread.interrupt(); + try { + registryMonitorThread.join(); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + + + // ---------------------- helper ---------------------- + + public ReturnT registry(RegistryParam registryParam) { + + // valid + if (!StringUtils.hasText(registryParam.getRegistryGroup()) + || !StringUtils.hasText(registryParam.getRegistryKey()) + || !StringUtils.hasText(registryParam.getRegistryValue())) { + return new ReturnT(ReturnT.FAIL_CODE, "Illegal Argument."); + } + + // async execute + registryOrRemoveThreadPool.execute(new Runnable() { + @Override + public void run() { + int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date()); + if (ret < 1) { + XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date()); + + // fresh + freshGroupRegistryInfo(registryParam); + } + } + }); + + return ReturnT.SUCCESS; + } + + public ReturnT registryRemove(RegistryParam registryParam) { + + // valid + if (!StringUtils.hasText(registryParam.getRegistryGroup()) + || !StringUtils.hasText(registryParam.getRegistryKey()) + || !StringUtils.hasText(registryParam.getRegistryValue())) { + return new ReturnT(ReturnT.FAIL_CODE, "Illegal Argument."); + } + + // async execute + registryOrRemoveThreadPool.execute(new Runnable() { + @Override + public void run() { + int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue()); + if (ret > 0) { + // fresh + freshGroupRegistryInfo(registryParam); + } + } + }); + + return ReturnT.SUCCESS; + } + + private void freshGroupRegistryInfo(RegistryParam registryParam){ + // Under consideration, prevent affecting core tables + } + + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryMonitorHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryMonitorHelper.java deleted file mode 100644 index 11d85c52..00000000 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryMonitorHelper.java +++ /dev/null @@ -1,114 +0,0 @@ -package com.xxl.job.admin.core.thread; - -import com.xxl.job.admin.core.conf.XxlJobAdminConfig; -import com.xxl.job.admin.core.model.XxlJobGroup; -import com.xxl.job.admin.core.model.XxlJobRegistry; -import com.xxl.job.core.enums.RegistryConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; -import java.util.concurrent.TimeUnit; - -/** - * job registry instance - * @author xuxueli 2016-10-02 19:10:24 - */ -public class JobRegistryMonitorHelper { - private static Logger logger = LoggerFactory.getLogger(JobRegistryMonitorHelper.class); - - private static JobRegistryMonitorHelper instance = new JobRegistryMonitorHelper(); - public static JobRegistryMonitorHelper getInstance(){ - return instance; - } - - private Thread registryThread; - private volatile boolean toStop = false; - public void start(){ - registryThread = new Thread(new Runnable() { - @Override - public void run() { - while (!toStop) { - try { - // auto registry group - List groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0); - if (groupList!=null && !groupList.isEmpty()) { - - // remove dead address (admin/executor) - List ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date()); - if (ids!=null && ids.size()>0) { - XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids); - } - - // fresh online address (admin/executor) - HashMap> appAddressMap = new HashMap>(); - List list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date()); - if (list != null) { - for (XxlJobRegistry item: list) { - if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) { - String appname = item.getRegistryKey(); - List registryList = appAddressMap.get(appname); - if (registryList == null) { - registryList = new ArrayList(); - } - - if (!registryList.contains(item.getRegistryValue())) { - registryList.add(item.getRegistryValue()); - } - appAddressMap.put(appname, registryList); - } - } - } - - // fresh group address - for (XxlJobGroup group: groupList) { - List registryList = appAddressMap.get(group.getAppname()); - String addressListStr = null; - if (registryList!=null && !registryList.isEmpty()) { - Collections.sort(registryList); - StringBuilder addressListSB = new StringBuilder(); - for (String item:registryList) { - addressListSB.append(item).append(","); - } - addressListStr = addressListSB.toString(); - addressListStr = addressListStr.substring(0, addressListStr.length()-1); - } - group.setAddressList(addressListStr); - group.setUpdateTime(new Date()); - - XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group); - } - } - } catch (Exception e) { - if (!toStop) { - logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e); - } - } - try { - TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); - } catch (InterruptedException e) { - if (!toStop) { - logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e); - } - } - } - logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop"); - } - }); - registryThread.setDaemon(true); - registryThread.setName("xxl-job, admin JobRegistryMonitorHelper"); - registryThread.start(); - } - - public void toStop(){ - toStop = true; - // interrupt and wait - registryThread.interrupt(); - try { - registryThread.join(); - } catch (InterruptedException e) { - logger.error(e.getMessage(), e); - } - } - -} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java index 51fbff92..670897c8 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java @@ -2,6 +2,7 @@ package com.xxl.job.admin.service.impl; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; +import com.xxl.job.admin.core.thread.JobRegistryHelper; import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; import com.xxl.job.admin.core.trigger.TriggerTypeEnum; import com.xxl.job.admin.core.util.I18nUtil; @@ -17,7 +18,6 @@ import com.xxl.job.core.handler.IJobHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; -import org.springframework.util.StringUtils; import javax.annotation.Resource; import java.text.MessageFormat; @@ -131,45 +131,12 @@ public class AdminBizImpl implements AdminBiz { @Override public ReturnT registry(RegistryParam registryParam) { - - // valid - if (!StringUtils.hasText(registryParam.getRegistryGroup()) - || !StringUtils.hasText(registryParam.getRegistryKey()) - || !StringUtils.hasText(registryParam.getRegistryValue())) { - return new ReturnT(ReturnT.FAIL_CODE, "Illegal Argument."); - } - - int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date()); - if (ret < 1) { - xxlJobRegistryDao.registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date()); - - // fresh - freshGroupRegistryInfo(registryParam); - } - return ReturnT.SUCCESS; + return JobRegistryHelper.getInstance().registry(registryParam); } @Override public ReturnT registryRemove(RegistryParam registryParam) { - - // valid - if (!StringUtils.hasText(registryParam.getRegistryGroup()) - || !StringUtils.hasText(registryParam.getRegistryKey()) - || !StringUtils.hasText(registryParam.getRegistryValue())) { - return new ReturnT(ReturnT.FAIL_CODE, "Illegal Argument."); - } - - int ret = xxlJobRegistryDao.registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue()); - if (ret > 0) { - - // fresh - freshGroupRegistryInfo(registryParam); - } - return ReturnT.SUCCESS; - } - - private void freshGroupRegistryInfo(RegistryParam registryParam){ - // Under consideration, prevent affecting core tables + return JobRegistryHelper.getInstance().registryRemove(registryParam); } }