任务调度模块,底层代码重构

This commit is contained in:
xuxueli 2018-09-23 01:22:50 +08:00
parent a75d82f983
commit e43791d4a1
12 changed files with 109 additions and 165 deletions

View File

@ -5,7 +5,7 @@ import com.xxl.job.core.biz.model.TriggerParam;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.List;
/** /**
* Created by xuxueli on 17/3/10. * Created by xuxueli on 17/3/10.
@ -19,6 +19,6 @@ public abstract class ExecutorRouter {
* @param addressList * @param addressList
* @return ReturnT.content=address * @return ReturnT.content=address
*/ */
public abstract ReturnT<String> route(TriggerParam triggerParam, ArrayList<String> addressList); public abstract ReturnT<String> route(TriggerParam triggerParam, List<String> addressList);
} }

View File

@ -2,13 +2,12 @@ package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter; import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler; import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.admin.core.util.I18nUtil;
import com.xxl.job.core.biz.ExecutorBiz; import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.biz.model.TriggerParam;
import java.util.ArrayList; import java.util.List;
/** /**
* Created by xuxueli on 17/3/10. * Created by xuxueli on 17/3/10.
@ -16,7 +15,7 @@ import java.util.ArrayList;
public class ExecutorRouteBusyover extends ExecutorRouter { public class ExecutorRouteBusyover extends ExecutorRouter {
@Override @Override
public ReturnT<String> route(TriggerParam triggerParam, ArrayList<String> addressList) { public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
StringBuffer idleBeatResultSB = new StringBuffer(); StringBuffer idleBeatResultSB = new StringBuffer();
for (String address : addressList) { for (String address : addressList) {
// beat // beat

View File

@ -1,14 +1,13 @@
package com.xxl.job.admin.core.route.strategy; package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter; import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.biz.model.TriggerParam;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.util.ArrayList; import java.util.List;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
@ -57,7 +56,7 @@ public class ExecutorRouteConsistentHash extends ExecutorRouter {
return truncateHashCode; return truncateHashCode;
} }
public String hashJob(int jobId, ArrayList<String> addressList) { public String hashJob(int jobId, List<String> addressList) {
// ------A1------A2-------A3------ // ------A1------A2-------A3------
// -----------J1------------------ // -----------J1------------------
@ -78,7 +77,7 @@ public class ExecutorRouteConsistentHash extends ExecutorRouter {
} }
@Override @Override
public ReturnT<String> route(TriggerParam triggerParam, ArrayList<String> addressList) { public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = hashJob(triggerParam.getJobId(), addressList); String address = hashJob(triggerParam.getJobId(), addressList);
return new ReturnT<String>(address); return new ReturnT<String>(address);
} }

View File

@ -2,13 +2,12 @@ package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter; import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler; import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.admin.core.util.I18nUtil;
import com.xxl.job.core.biz.ExecutorBiz; import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.biz.model.TriggerParam;
import java.util.ArrayList; import java.util.List;
/** /**
* Created by xuxueli on 17/3/10. * Created by xuxueli on 17/3/10.
@ -16,7 +15,7 @@ import java.util.ArrayList;
public class ExecutorRouteFailover extends ExecutorRouter { public class ExecutorRouteFailover extends ExecutorRouter {
@Override @Override
public ReturnT<String> route(TriggerParam triggerParam, ArrayList<String> addressList) { public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
StringBuffer beatResultSB = new StringBuffer(); StringBuffer beatResultSB = new StringBuffer();
for (String address : addressList) { for (String address : addressList) {

View File

@ -1,11 +1,10 @@
package com.xxl.job.admin.core.route.strategy; package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter; import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.biz.model.TriggerParam;
import java.util.ArrayList; import java.util.List;
/** /**
* Created by xuxueli on 17/3/10. * Created by xuxueli on 17/3/10.
@ -13,7 +12,7 @@ import java.util.ArrayList;
public class ExecutorRouteFirst extends ExecutorRouter { public class ExecutorRouteFirst extends ExecutorRouter {
@Override @Override
public ReturnT<String> route(TriggerParam triggerParam, ArrayList<String> addressList){ public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList){
return new ReturnT<String>(addressList.get(0)); return new ReturnT<String>(addressList.get(0));
} }

View File

@ -1,7 +1,6 @@
package com.xxl.job.admin.core.route.strategy; package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter; import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.biz.model.TriggerParam;
@ -20,7 +19,7 @@ public class ExecutorRouteLFU extends ExecutorRouter {
private static ConcurrentHashMap<Integer, HashMap<String, Integer>> jobLfuMap = new ConcurrentHashMap<Integer, HashMap<String, Integer>>(); private static ConcurrentHashMap<Integer, HashMap<String, Integer>> jobLfuMap = new ConcurrentHashMap<Integer, HashMap<String, Integer>>();
private static long CACHE_VALID_TIME = 0; private static long CACHE_VALID_TIME = 0;
public String route(int jobId, ArrayList<String> addressList) { public String route(int jobId, List<String> addressList) {
// cache clear // cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) { if (System.currentTimeMillis() > CACHE_VALID_TIME) {
@ -57,7 +56,7 @@ public class ExecutorRouteLFU extends ExecutorRouter {
} }
@Override @Override
public ReturnT<String> route(TriggerParam triggerParam, ArrayList<String> addressList) { public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = route(triggerParam.getJobId(), addressList); String address = route(triggerParam.getJobId(), addressList);
return new ReturnT<String>(address); return new ReturnT<String>(address);
} }

View File

@ -1,12 +1,11 @@
package com.xxl.job.admin.core.route.strategy; package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter; import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.biz.model.TriggerParam;
import java.util.ArrayList;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
@ -21,7 +20,7 @@ public class ExecutorRouteLRU extends ExecutorRouter {
private static ConcurrentHashMap<Integer, LinkedHashMap<String, String>> jobLRUMap = new ConcurrentHashMap<Integer, LinkedHashMap<String, String>>(); private static ConcurrentHashMap<Integer, LinkedHashMap<String, String>> jobLRUMap = new ConcurrentHashMap<Integer, LinkedHashMap<String, String>>();
private static long CACHE_VALID_TIME = 0; private static long CACHE_VALID_TIME = 0;
public String route(int jobId, ArrayList<String> addressList) { public String route(int jobId, List<String> addressList) {
// cache clear // cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) { if (System.currentTimeMillis() > CACHE_VALID_TIME) {
@ -55,7 +54,7 @@ public class ExecutorRouteLRU extends ExecutorRouter {
} }
@Override @Override
public ReturnT<String> route(TriggerParam triggerParam, ArrayList<String> addressList) { public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = route(triggerParam.getJobId(), addressList); String address = route(triggerParam.getJobId(), addressList);
return new ReturnT<String>(address); return new ReturnT<String>(address);
} }

View File

@ -1,11 +1,10 @@
package com.xxl.job.admin.core.route.strategy; package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter; import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.biz.model.TriggerParam;
import java.util.ArrayList; import java.util.List;
/** /**
* Created by xuxueli on 17/3/10. * Created by xuxueli on 17/3/10.
@ -13,7 +12,7 @@ import java.util.ArrayList;
public class ExecutorRouteLast extends ExecutorRouter { public class ExecutorRouteLast extends ExecutorRouter {
@Override @Override
public ReturnT<String> route(TriggerParam triggerParam, ArrayList<String> addressList) { public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
return new ReturnT<String>(addressList.get(addressList.size()-1)); return new ReturnT<String>(addressList.get(addressList.size()-1));
} }

View File

@ -1,11 +1,10 @@
package com.xxl.job.admin.core.route.strategy; package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter; import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.biz.model.TriggerParam;
import java.util.ArrayList; import java.util.List;
import java.util.Random; import java.util.Random;
/** /**
@ -16,7 +15,7 @@ public class ExecutorRouteRandom extends ExecutorRouter {
private static Random localRandom = new Random(); private static Random localRandom = new Random();
@Override @Override
public ReturnT<String> route(TriggerParam triggerParam, ArrayList<String> addressList) { public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = addressList.get(localRandom.nextInt(addressList.size())); String address = addressList.get(localRandom.nextInt(addressList.size()));
return new ReturnT<String>(address); return new ReturnT<String>(address);
} }

View File

@ -1,11 +1,10 @@
package com.xxl.job.admin.core.route.strategy; package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.route.ExecutorRouter; import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.biz.model.TriggerParam;
import java.util.ArrayList; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -31,7 +30,7 @@ public class ExecutorRouteRound extends ExecutorRouter {
} }
@Override @Override
public ReturnT<String> route(TriggerParam triggerParam, ArrayList<String> addressList) { public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = addressList.get(count(triggerParam.getJobId())%addressList.size()); String address = addressList.get(count(triggerParam.getJobId())%addressList.size());
return new ReturnT<String>(address); return new ReturnT<String>(address);
} }

View File

@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List;
/** /**
* xxl-job trigger * xxl-job trigger
@ -36,57 +37,40 @@ public class XxlJobTrigger {
* *
*/ */
public static void trigger(int jobId, int failRetryCount, TriggerTypeEnum triggerType) { public static void trigger(int jobId, int failRetryCount, TriggerTypeEnum triggerType) {
// load data // load data
XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId); // job info XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId); // job info
if (jobInfo == null) { if (jobInfo == null) {
logger.warn(">>>>>>>>>>>> trigger fail, jobId invalidjobId={}", jobId); logger.warn(">>>>>>>>>>>> trigger fail, jobId invalidjobId={}", jobId);
return; return;
} }
int finalFailRetryCount = jobInfo.getExecutorFailRetryCount(); int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
if (failRetryCount >= 0) {
finalFailRetryCount = failRetryCount;
}
XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup()); // group info XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup()); // group info
// process trigger
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) && CollectionUtils.isNotEmpty(group.getRegistryList())) {
for (int i = 0; i < group.getRegistryList().size(); i++) {
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i);
}
} else {
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, 0);
}
}
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index){
// param
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();
// broadcast
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum && CollectionUtils.isNotEmpty(addressList)) {
for (int i = 0; i < addressList.size(); i++) {
String address = addressList.get(i);
// 1save log-id // 1save log-id
XxlJobLog jobLog = new XxlJobLog(); XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup()); jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId()); jobLog.setJobId(jobInfo.getId());
jobLog.setTriggerTime(new Date());
XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog); XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId()); logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// 2prepare trigger-info // 2init trigger-param
//jobLog.setExecutorAddress(executorAddress);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setExecutorFailRetryCount(finalFailRetryCount);
jobLog.setTriggerTime(new Date());
ReturnT<String> triggerResult = new ReturnT<String>(null);
StringBuffer triggerMsgSb = new StringBuffer();
triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append("").append(triggerType.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append("").append(IpUtil.getIp());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append("")
.append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append("").append(group.getRegistryList());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append("").append(executorRouteStrategyEnum.getTitle()).append("("+i+"/"+addressList.size()+")"); // update01
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append("").append(blockStrategy.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append("").append(jobInfo.getExecutorTimeout());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append("").append(finalFailRetryCount);
// 3.1trigger-param
TriggerParam triggerParam = new TriggerParam(); TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId()); triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler()); triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
@ -98,41 +82,34 @@ public class XxlJobTrigger {
triggerParam.setGlueType(jobInfo.getGlueType()); triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource()); triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime()); triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setBroadcastIndex(i); triggerParam.setBroadcastIndex(index);
triggerParam.setBroadcastTotal(addressList.size()); // update02 triggerParam.setBroadcastTotal(group.getRegistryList()!=null?group.getRegistryList().size():0);
// 3.2trigger-run (route run / trigger remote executor)
triggerResult = runExecutor(triggerParam, address); // update03
triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
// 4save trigger-info
jobLog.setExecutorAddress(triggerResult.getContent());
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog);
// 5monitor trigger
JobFailMonitorHelper.monitor(jobLog.getId());
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
// 3init address
String address = null;
ReturnT<String> routeAddressResult = null;
if (CollectionUtils.isNotEmpty(group.getRegistryList())) {
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
address = group.getRegistryList().get(index);
} else {
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
}
}
} else {
routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, "<br>----------------------<br>" + I18nUtil.getString("jobconf_trigger_address_empty"));
} }
// 4trigger remote executor
ReturnT<String> triggerResult = null;
if (address != null) {
triggerResult = runExecutor(triggerParam, address);
} else { } else {
// 1save log-id triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
XxlJobLog jobLog = new XxlJobLog(); }
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// 2prepare trigger-info // 5collection trigger info
//jobLog.setExecutorAddress(executorAddress);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setExecutorFailRetryCount(finalFailRetryCount);
jobLog.setTriggerTime(new Date());
ReturnT<String> triggerResult = new ReturnT<String>(null);
StringBuffer triggerMsgSb = new StringBuffer(); StringBuffer triggerMsgSb = new StringBuffer();
triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append("").append(triggerType.getTitle()); triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append("").append(triggerType.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append("").append(IpUtil.getIp()); triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append("").append(IpUtil.getIp());
@ -140,61 +117,36 @@ public class XxlJobTrigger {
.append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") ); .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append("").append(group.getRegistryList()); triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append("").append(group.getRegistryList());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append("").append(executorRouteStrategyEnum.getTitle()); triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append("").append(executorRouteStrategyEnum.getTitle());
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)) {
triggerMsgSb.append("("+index+"/"+(group.getRegistryList()!=null?group.getRegistryList().size():0)+")");
}
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append("").append(blockStrategy.getTitle()); triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append("").append(blockStrategy.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append("").append(jobInfo.getExecutorTimeout()); triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append("").append(jobInfo.getExecutorTimeout());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append("").append(finalFailRetryCount); triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append("").append(finalFailRetryCount);
// 3.0trigger-valid
String address = null;
if (CollectionUtils.isEmpty(addressList)) {
triggerResult.setCode(ReturnT.FAIL_CODE);
triggerMsgSb.append("<br>----------------------<br>").append(I18nUtil.getString("jobconf_trigger_address_empty"));
} else {
// 3.1trigger-param
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setBroadcastIndex(0);
triggerParam.setBroadcastTotal(1);
// 3.2trigger-run (route run / trigger remote executor)
//triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList);
ReturnT<String> routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, addressList);
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
triggerResult = runExecutor(triggerParam, address);
}
triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>") triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
.append(routeAddressResult.getMsg()!=null?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():""); .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");
} // 6save log trigger-info
// 4save trigger-info
jobLog.setExecutorAddress(address); jobLog.setExecutorAddress(address);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setExecutorFailRetryCount(finalFailRetryCount);
//jobLog.setTriggerTime();
jobLog.setTriggerCode(triggerResult.getCode()); jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString()); jobLog.setTriggerMsg(triggerMsgSb.toString());
XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog); XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog);
// 5monitor trigger // 7monitor trigger
JobFailMonitorHelper.monitor(jobLog.getId()); JobFailMonitorHelper.monitor(jobLog.getId());
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId()); logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
} }
}
/** /**
* run executor * run executor
* @param triggerParam * @param triggerParam
* @param address * @param address
* @return ReturnT.content: final address * @return
*/ */
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){ public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null; ReturnT<String> runResult = null;
@ -212,7 +164,6 @@ public class XxlJobTrigger {
runResultSB.append("<br>msg").append(runResult.getMsg()); runResultSB.append("<br>msg").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString()); runResult.setMsg(runResultSB.toString());
runResult.setContent(address);
return runResult; return runResult;
} }

View File

@ -117,11 +117,13 @@
INSERT INTO XXL_JOB_QRTZ_TRIGGER_LOG ( INSERT INTO XXL_JOB_QRTZ_TRIGGER_LOG (
`job_group`, `job_group`,
`job_id`, `job_id`,
`trigger_time`,
`trigger_code`, `trigger_code`,
`handle_code` `handle_code`
) VALUES ( ) VALUES (
#{jobGroup}, #{jobGroup},
#{jobId}, #{jobId},
#{triggerTime},
#{triggerCode}, #{triggerCode},
#{handleCode} #{handleCode}
); );