diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java index 40ea8127..dd6d3754 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java @@ -1,19 +1,6 @@ package com.xxl.job.admin.core.jobbean; -import com.xxl.job.admin.core.enums.ExecutorFailStrategyEnum; -import com.xxl.job.admin.core.model.XxlJobGroup; -import com.xxl.job.admin.core.model.XxlJobInfo; -import com.xxl.job.admin.core.model.XxlJobLog; -import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum; -import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler; -import com.xxl.job.admin.core.thread.JobFailMonitorHelper; -import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper; -import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.biz.model.TriggerParam; -import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; -import com.xxl.job.core.enums.RegistryConfig; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; +import com.xxl.job.admin.core.trigger.XxlJobTrigger; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.quartz.JobKey; @@ -21,10 +8,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.quartz.QuartzJobBean; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; - /** * http job bean * “@DisallowConcurrentExecution” diable concurrent, thread size can not be only one, better given more @@ -38,96 +21,12 @@ public class RemoteHttpJobBean extends QuartzJobBean { protected void executeInternal(JobExecutionContext context) throws JobExecutionException { - // load job + // load jobId JobKey jobKey = context.getTrigger().getJobKey(); Integer jobId = Integer.valueOf(jobKey.getName()); - XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId); - - // log part-1 - XxlJobLog jobLog = new XxlJobLog(); - jobLog.setJobGroup(jobInfo.getJobGroup()); - jobLog.setJobId(jobInfo.getId()); - XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog); - logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId()); - - // log part-2 param - //jobLog.setExecutorAddress(executorAddress); - jobLog.setGlueType(jobInfo.getGlueType()); - jobLog.setExecutorHandler(jobInfo.getExecutorHandler()); - jobLog.setExecutorParam(jobInfo.getExecutorParam()); - jobLog.setTriggerTime(new Date()); - - // trigger request - TriggerParam triggerParam = new TriggerParam(); - triggerParam.setJobId(jobInfo.getId()); - triggerParam.setExecutorHandler(jobInfo.getExecutorHandler()); - triggerParam.setExecutorParams(jobInfo.getExecutorParam()); - triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy()); - triggerParam.setGlueType(jobInfo.getGlueType()); - triggerParam.setGlueSource(jobInfo.getGlueSource()); - triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime()); - triggerParam.setLogId(jobLog.getId()); - triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime()); - - // do trigger - ReturnT triggerResult = doTrigger(triggerParam, jobInfo, jobLog); - - // fail retry - if (triggerResult.getCode()==ReturnT.FAIL_CODE && - ExecutorFailStrategyEnum.match(jobInfo.getExecutorFailStrategy(), null) == ExecutorFailStrategyEnum.FAIL_RETRY) { - ReturnT retryTriggerResult = doTrigger(triggerParam, jobInfo, jobLog); - - triggerResult.setCode(retryTriggerResult.getCode()); - triggerResult.setMsg(triggerResult.getMsg() + "

>>>>>>>>>>>失败重试<<<<<<<<<<<

" +retryTriggerResult.getMsg()); - } - - // log part-2 - jobLog.setTriggerCode(triggerResult.getCode()); - jobLog.setTriggerMsg(triggerResult.getMsg()); - XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog); - - // monitor triger - JobFailMonitorHelper.monitor(jobLog.getId()); - logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId()); - } - - public ReturnT doTrigger(TriggerParam triggerParam, XxlJobInfo jobInfo, XxlJobLog jobLog){ - StringBuffer triggerSb = new StringBuffer(); - - // exerutor address list - ArrayList addressList = null; - XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup()); - if (group.getAddressType() == 0) { - triggerSb.append("注册方式:自动注册"); - addressList = (ArrayList) JobRegistryMonitorHelper.discover(RegistryConfig.RegistType.EXECUTOR.name(), group.getAppName()); - } else { - triggerSb.append("注册方式:手动录入"); - if (StringUtils.isNotBlank(group.getAddressList())) { - addressList = new ArrayList(Arrays.asList(group.getAddressList().split(","))); - } - } - triggerSb.append("
阻塞处理策略:").append(ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION).getTitle()); - triggerSb.append("
失败处理策略:").append(ExecutorFailStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorFailStrategyEnum.FAIL_ALARM).getTitle()); - triggerSb.append("
地址列表:").append(addressList!=null?addressList.toString():""); - if (CollectionUtils.isEmpty(addressList)) { - triggerSb.append("
----------------------
").append("调度失败:").append("执行器地址为空"); - return new ReturnT(ReturnT.FAIL_CODE, triggerSb.toString()); - } - - // executor route strategy - ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); - if (executorRouteStrategyEnum == null) { - triggerSb.append("
----------------------
").append("调度失败:").append("执行器路由策略为空"); - return new ReturnT(ReturnT.FAIL_CODE, triggerSb.toString()); - } - triggerSb.append("
路由策略:").append(executorRouteStrategyEnum.name() + "-" + executorRouteStrategyEnum.getTitle()); - - - // route run / trigger remote executor - ReturnT routeRunResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList, jobLog); - triggerSb.append("
----------------------
").append(routeRunResult.getMsg()); - return new ReturnT(routeRunResult.getCode(), triggerSb.toString()); + // trigger + XxlJobTrigger.trigger(jobId); } } \ No newline at end of file diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java new file mode 100644 index 00000000..f50dc5e3 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java @@ -0,0 +1,128 @@ +package com.xxl.job.admin.core.trigger; + +import com.xxl.job.admin.core.enums.ExecutorFailStrategyEnum; +import com.xxl.job.admin.core.model.XxlJobGroup; +import com.xxl.job.admin.core.model.XxlJobInfo; +import com.xxl.job.admin.core.model.XxlJobLog; +import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum; +import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler; +import com.xxl.job.admin.core.thread.JobFailMonitorHelper; +import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.biz.model.TriggerParam; +import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; +import com.xxl.job.core.enums.RegistryConfig; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; + +/** + * xxl-job trigger + * Created by xuxueli on 17/7/13. + */ +public class XxlJobTrigger { + private static Logger logger = LoggerFactory.getLogger(XxlJobTrigger.class); + + /** + * trigger job + * + * @param jobId + */ + public static void trigger(int jobId) { + + // load job + XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId); + + // log part-1 + XxlJobLog jobLog = new XxlJobLog(); + jobLog.setJobGroup(jobInfo.getJobGroup()); + jobLog.setJobId(jobInfo.getId()); + XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog); + logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId()); + + // log part-2 param + //jobLog.setExecutorAddress(executorAddress); + jobLog.setGlueType(jobInfo.getGlueType()); + jobLog.setExecutorHandler(jobInfo.getExecutorHandler()); + jobLog.setExecutorParam(jobInfo.getExecutorParam()); + jobLog.setTriggerTime(new Date()); + + // trigger request + TriggerParam triggerParam = new TriggerParam(); + triggerParam.setJobId(jobInfo.getId()); + triggerParam.setExecutorHandler(jobInfo.getExecutorHandler()); + triggerParam.setExecutorParams(jobInfo.getExecutorParam()); + triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy()); + triggerParam.setGlueType(jobInfo.getGlueType()); + triggerParam.setGlueSource(jobInfo.getGlueSource()); + triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime()); + triggerParam.setLogId(jobLog.getId()); + triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime()); + + // do trigger + ReturnT triggerResult = doTrigger(triggerParam, jobInfo, jobLog); + + // fail retry + if (triggerResult.getCode()==ReturnT.FAIL_CODE && + ExecutorFailStrategyEnum.match(jobInfo.getExecutorFailStrategy(), null) == ExecutorFailStrategyEnum.FAIL_RETRY) { + ReturnT retryTriggerResult = doTrigger(triggerParam, jobInfo, jobLog); + + triggerResult.setCode(retryTriggerResult.getCode()); + triggerResult.setMsg(triggerResult.getMsg() + "

>>>>>>>>>>>失败重试<<<<<<<<<<<

" +retryTriggerResult.getMsg()); + } + + // log part-2 + jobLog.setTriggerCode(triggerResult.getCode()); + jobLog.setTriggerMsg(triggerResult.getMsg()); + XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog); + + // monitor triger + JobFailMonitorHelper.monitor(jobLog.getId()); + logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId()); + } + + private static ReturnT doTrigger(TriggerParam triggerParam, XxlJobInfo jobInfo, XxlJobLog jobLog){ + StringBuffer triggerSb = new StringBuffer(); + + // exerutor address list + ArrayList addressList = null; + XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup()); + if (group.getAddressType() == 0) { + triggerSb.append("注册方式:自动注册"); + addressList = (ArrayList) JobRegistryMonitorHelper.discover(RegistryConfig.RegistType.EXECUTOR.name(), group.getAppName()); + } else { + triggerSb.append("注册方式:手动录入"); + if (StringUtils.isNotBlank(group.getAddressList())) { + addressList = new ArrayList(Arrays.asList(group.getAddressList().split(","))); + } + } + triggerSb.append("
阻塞处理策略:").append(ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION).getTitle()); + triggerSb.append("
失败处理策略:").append(ExecutorFailStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorFailStrategyEnum.FAIL_ALARM).getTitle()); + triggerSb.append("
地址列表:").append(addressList!=null?addressList.toString():""); + if (CollectionUtils.isEmpty(addressList)) { + triggerSb.append("
----------------------
").append("调度失败:").append("执行器地址为空"); + return new ReturnT(ReturnT.FAIL_CODE, triggerSb.toString()); + } + + // executor route strategy + ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); + if (executorRouteStrategyEnum == null) { + triggerSb.append("
----------------------
").append("调度失败:").append("执行器路由策略为空"); + return new ReturnT(ReturnT.FAIL_CODE, triggerSb.toString()); + } + triggerSb.append("
路由策略:").append(executorRouteStrategyEnum.name() + "-" + executorRouteStrategyEnum.getTitle()); + + + // route run / trigger remote executor + ReturnT routeRunResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList, jobLog); + triggerSb.append("
----------------------
").append(routeRunResult.getMsg()); + return new ReturnT(routeRunResult.getCode(), triggerSb.toString()); + + } + +}