广播分片任务
This commit is contained in:
parent
f5f98bc042
commit
0c03f6348c
|
@ -306,7 +306,7 @@ XXL-JOB是一个轻量级分布式任务调度框架,其核心设计目标是
|
||||||
|
|
||||||
#### 步骤一:执行器项目中,开发JobHandler:
|
#### 步骤一:执行器项目中,开发JobHandler:
|
||||||
- 1、 新建一个继承com.xxl.job.core.handler.IJobHandler的Java类;
|
- 1、 新建一个继承com.xxl.job.core.handler.IJobHandler的Java类;
|
||||||
- 2、 该类被Spring容器扫描为Bean实例,如加“@Service注解”;
|
- 2、 该类被Spring容器扫描为Bean实例,如加“@Component”注解;
|
||||||
- 3、 添加 “@JobHander(value="自定义jobhandler名称")”注解,注解的value值为自定义的JobHandler名称,该名称对应的是调度中心新建任务的JobHandler属性的值。
|
- 3、 添加 “@JobHander(value="自定义jobhandler名称")”注解,注解的value值为自定义的JobHandler名称,该名称对应的是调度中心新建任务的JobHandler属性的值。
|
||||||
(可参考xxl-job-executor-example项目中的DemoJobHandler,见下图)
|
(可参考xxl-job-executor-example项目中的DemoJobHandler,见下图)
|
||||||
|
|
||||||
|
@ -889,6 +889,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
|
||||||
|
|
||||||
#### 6.18 版本 V1.8.1 特性[快照版本]
|
#### 6.18 版本 V1.8.1 特性[快照版本]
|
||||||
- 1、任务分片:一个任务被拆分成N个独立的任务单元,然后由分布式部署的执行器分别执行某一个或几个分片单元;
|
- 1、任务分片:一个任务被拆分成N个独立的任务单元,然后由分布式部署的执行器分别执行某一个或几个分片单元;
|
||||||
|
- 2、执行器JobHandler禁止命名冲突;
|
||||||
|
|
||||||
#### TODO LIST
|
#### TODO LIST
|
||||||
- 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限;
|
- 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限;
|
||||||
|
|
|
@ -15,7 +15,8 @@ public enum ExecutorRouteStrategyEnum {
|
||||||
LEAST_FREQUENTLY_USED("最不经常使用", new ExecutorRouteLFU()),
|
LEAST_FREQUENTLY_USED("最不经常使用", new ExecutorRouteLFU()),
|
||||||
LEAST_RECENTLY_USED("最近最久未使用", new ExecutorRouteLRU()),
|
LEAST_RECENTLY_USED("最近最久未使用", new ExecutorRouteLRU()),
|
||||||
FAILOVER("故障转移", new ExecutorRouteFailover()),
|
FAILOVER("故障转移", new ExecutorRouteFailover()),
|
||||||
BUSYOVER("忙碌转移", new ExecutorRouteBusyover());
|
BUSYOVER("忙碌转移", new ExecutorRouteBusyover()),
|
||||||
|
BROADCAST("广播", null);
|
||||||
|
|
||||||
ExecutorRouteStrategyEnum(String title, ExecutorRouter router) {
|
ExecutorRouteStrategyEnum(String title, ExecutorRouter router) {
|
||||||
this.title = title;
|
this.title = title;
|
||||||
|
|
|
@ -28,7 +28,7 @@ public abstract class ExecutorRouter {
|
||||||
* run executor
|
* run executor
|
||||||
* @param triggerParam
|
* @param triggerParam
|
||||||
* @param address
|
* @param address
|
||||||
* @return
|
* @return ReturnT.content: final address
|
||||||
*/
|
*/
|
||||||
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
|
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
|
||||||
ReturnT<String> runResult = null;
|
ReturnT<String> runResult = null;
|
||||||
|
@ -46,6 +46,7 @@ public abstract class ExecutorRouter {
|
||||||
runResultSB.append("<br>msg:").append(runResult.getMsg());
|
runResultSB.append("<br>msg:").append(runResult.getMsg());
|
||||||
|
|
||||||
runResult.setMsg(runResultSB.toString());
|
runResult.setMsg(runResultSB.toString());
|
||||||
|
runResult.setContent(address);
|
||||||
return runResult;
|
return runResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ import com.xxl.job.admin.core.model.XxlJobGroup;
|
||||||
import com.xxl.job.admin.core.model.XxlJobInfo;
|
import com.xxl.job.admin.core.model.XxlJobInfo;
|
||||||
import com.xxl.job.admin.core.model.XxlJobLog;
|
import com.xxl.job.admin.core.model.XxlJobLog;
|
||||||
import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
|
import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
|
||||||
|
import com.xxl.job.admin.core.route.ExecutorRouter;
|
||||||
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
|
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
|
||||||
import com.xxl.job.admin.core.thread.JobFailMonitorHelper;
|
import com.xxl.job.admin.core.thread.JobFailMonitorHelper;
|
||||||
import com.xxl.job.core.biz.model.ReturnT;
|
import com.xxl.job.core.biz.model.ReturnT;
|
||||||
|
@ -40,6 +41,79 @@ public class XxlJobTrigger {
|
||||||
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
|
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
|
||||||
ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();
|
ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();
|
||||||
|
|
||||||
|
// broadcast
|
||||||
|
if (ExecutorRouteStrategyEnum.BROADCAST == executorRouteStrategyEnum && CollectionUtils.isNotEmpty(addressList)) {
|
||||||
|
for (int i = 0; i < addressList.size(); i++) {
|
||||||
|
String address = addressList.get(i);
|
||||||
|
|
||||||
|
// 1、save log-id
|
||||||
|
XxlJobLog jobLog = new XxlJobLog();
|
||||||
|
jobLog.setJobGroup(jobInfo.getJobGroup());
|
||||||
|
jobLog.setJobId(jobInfo.getId());
|
||||||
|
XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog);
|
||||||
|
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
|
||||||
|
|
||||||
|
// 2、prepare trigger-info
|
||||||
|
//jobLog.setExecutorAddress(executorAddress);
|
||||||
|
jobLog.setGlueType(jobInfo.getGlueType());
|
||||||
|
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
|
||||||
|
jobLog.setExecutorParam(jobInfo.getExecutorParam());
|
||||||
|
jobLog.setTriggerTime(new Date());
|
||||||
|
|
||||||
|
ReturnT<String> triggerResult = new ReturnT<String>(null);
|
||||||
|
StringBuffer triggerMsgSb = new StringBuffer();
|
||||||
|
triggerMsgSb.append("注册方式:").append( (group.getAddressType() == 0)?"自动注册":"手动录入" );
|
||||||
|
triggerMsgSb.append("<br>阻塞处理策略:").append(blockStrategy.getTitle());
|
||||||
|
triggerMsgSb.append("<br>失败处理策略:").append(failStrategy.getTitle());
|
||||||
|
triggerMsgSb.append("<br>地址列表:").append(group.getRegistryList());
|
||||||
|
triggerMsgSb.append("<br>路由策略:").append(executorRouteStrategyEnum.getTitle()).append("("+i+"/"+addressList.size()+")"); // update01
|
||||||
|
|
||||||
|
// 3、trigger-valid
|
||||||
|
if (triggerResult.getCode()==ReturnT.SUCCESS_CODE && CollectionUtils.isEmpty(addressList)) {
|
||||||
|
triggerResult.setCode(ReturnT.FAIL_CODE);
|
||||||
|
triggerMsgSb.append("<br>----------------------<br>").append("调度失败:").append("执行器地址为空");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
|
||||||
|
// 4.1、trigger-param
|
||||||
|
TriggerParam triggerParam = new TriggerParam();
|
||||||
|
triggerParam.setJobId(jobInfo.getId());
|
||||||
|
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
|
||||||
|
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
|
||||||
|
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
|
||||||
|
triggerParam.setLogId(jobLog.getId());
|
||||||
|
triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
|
||||||
|
triggerParam.setGlueType(jobInfo.getGlueType());
|
||||||
|
triggerParam.setGlueSource(jobInfo.getGlueSource());
|
||||||
|
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
|
||||||
|
triggerParam.setBroadcastIndex(i);
|
||||||
|
triggerParam.setBroadcastTotal(addressList.size()); // update02
|
||||||
|
|
||||||
|
// 4.2、trigger-run (route run / trigger remote executor)
|
||||||
|
triggerResult = ExecutorRouter.runExecutor(triggerParam, address); // update03
|
||||||
|
triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>触发调度<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
|
||||||
|
|
||||||
|
// 4.3、trigger (fail retry)
|
||||||
|
if (triggerResult.getCode()!=ReturnT.SUCCESS_CODE && failStrategy == ExecutorFailStrategyEnum.FAIL_RETRY) {
|
||||||
|
triggerResult = ExecutorRouter.runExecutor(triggerParam, address); // update04
|
||||||
|
triggerMsgSb.append("<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>失败重试<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5、save trigger-info
|
||||||
|
jobLog.setExecutorAddress(triggerResult.getContent());
|
||||||
|
jobLog.setTriggerCode(triggerResult.getCode());
|
||||||
|
jobLog.setTriggerMsg(triggerMsgSb.toString());
|
||||||
|
XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog);
|
||||||
|
|
||||||
|
// 6、monitor triger
|
||||||
|
JobFailMonitorHelper.monitor(jobLog.getId());
|
||||||
|
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
|
||||||
|
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// 1、save log-id
|
// 1、save log-id
|
||||||
XxlJobLog jobLog = new XxlJobLog();
|
XxlJobLog jobLog = new XxlJobLog();
|
||||||
jobLog.setJobGroup(jobInfo.getJobGroup());
|
jobLog.setJobGroup(jobInfo.getJobGroup());
|
||||||
|
@ -67,10 +141,6 @@ public class XxlJobTrigger {
|
||||||
triggerResult.setCode(ReturnT.FAIL_CODE);
|
triggerResult.setCode(ReturnT.FAIL_CODE);
|
||||||
triggerMsgSb.append("<br>----------------------<br>").append("调度失败:").append("执行器地址为空");
|
triggerMsgSb.append("<br>----------------------<br>").append("调度失败:").append("执行器地址为空");
|
||||||
}
|
}
|
||||||
if (triggerResult.getCode() == ReturnT.SUCCESS_CODE && executorRouteStrategyEnum == null) {
|
|
||||||
triggerResult.setCode(ReturnT.FAIL_CODE);
|
|
||||||
triggerMsgSb.append("<br>----------------------<br>").append("调度失败:").append("执行器路由策略为空");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
|
if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
|
||||||
// 4.1、trigger-param
|
// 4.1、trigger-param
|
||||||
|
@ -79,11 +149,13 @@ public class XxlJobTrigger {
|
||||||
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
|
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
|
||||||
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
|
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
|
||||||
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
|
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
|
||||||
|
triggerParam.setLogId(jobLog.getId());
|
||||||
|
triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
|
||||||
triggerParam.setGlueType(jobInfo.getGlueType());
|
triggerParam.setGlueType(jobInfo.getGlueType());
|
||||||
triggerParam.setGlueSource(jobInfo.getGlueSource());
|
triggerParam.setGlueSource(jobInfo.getGlueSource());
|
||||||
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
|
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
|
||||||
triggerParam.setLogId(jobLog.getId());
|
triggerParam.setBroadcastIndex(0);
|
||||||
triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
|
triggerParam.setBroadcastTotal(1);
|
||||||
|
|
||||||
// 4.2、trigger-run (route run / trigger remote executor)
|
// 4.2、trigger-run (route run / trigger remote executor)
|
||||||
triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList);
|
triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList);
|
||||||
|
|
|
@ -76,10 +76,13 @@ public class ExecutorBizImpl implements ExecutorBiz {
|
||||||
// valid:jobHandler + jobThread
|
// valid:jobHandler + jobThread
|
||||||
if (GlueTypeEnum.BEAN==GlueTypeEnum.match(triggerParam.getGlueType())) {
|
if (GlueTypeEnum.BEAN==GlueTypeEnum.match(triggerParam.getGlueType())) {
|
||||||
|
|
||||||
|
// new jobhandler
|
||||||
|
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
|
||||||
|
|
||||||
// valid old jobThread
|
// valid old jobThread
|
||||||
if (jobThread != null && jobHandler!=null && jobThread.getHandler() != jobHandler) {
|
if (jobThread!=null && jobHandler != newJobHandler) {
|
||||||
// change handler, need kill old thread
|
// change handler, need kill old thread
|
||||||
removeOldReason = "更新JobHandler或更换任务模式,终止旧任务线程";
|
removeOldReason = "更换JobHandler或更换任务模式,终止旧任务线程";
|
||||||
|
|
||||||
jobThread = null;
|
jobThread = null;
|
||||||
jobHandler = null;
|
jobHandler = null;
|
||||||
|
@ -87,7 +90,7 @@ public class ExecutorBizImpl implements ExecutorBiz {
|
||||||
|
|
||||||
// valid handler
|
// valid handler
|
||||||
if (jobHandler == null) {
|
if (jobHandler == null) {
|
||||||
jobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
|
jobHandler = newJobHandler;
|
||||||
if (jobHandler == null) {
|
if (jobHandler == null) {
|
||||||
return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
|
return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,12 +14,15 @@ public class TriggerParam implements Serializable{
|
||||||
private String executorParams;
|
private String executorParams;
|
||||||
private String executorBlockStrategy;
|
private String executorBlockStrategy;
|
||||||
|
|
||||||
|
private int logId;
|
||||||
|
private long logDateTim;
|
||||||
|
|
||||||
private String glueType;
|
private String glueType;
|
||||||
private String glueSource;
|
private String glueSource;
|
||||||
private long glueUpdatetime;
|
private long glueUpdatetime;
|
||||||
|
|
||||||
private int logId;
|
private int broadcastIndex;
|
||||||
private long logDateTim;
|
private int broadcastTotal;
|
||||||
|
|
||||||
public int getJobId() {
|
public int getJobId() {
|
||||||
return jobId;
|
return jobId;
|
||||||
|
@ -53,6 +56,22 @@ public class TriggerParam implements Serializable{
|
||||||
this.executorBlockStrategy = executorBlockStrategy;
|
this.executorBlockStrategy = executorBlockStrategy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getLogId() {
|
||||||
|
return logId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLogId(int logId) {
|
||||||
|
this.logId = logId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getLogDateTim() {
|
||||||
|
return logDateTim;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLogDateTim(long logDateTim) {
|
||||||
|
this.logDateTim = logDateTim;
|
||||||
|
}
|
||||||
|
|
||||||
public String getGlueType() {
|
public String getGlueType() {
|
||||||
return glueType;
|
return glueType;
|
||||||
}
|
}
|
||||||
|
@ -77,20 +96,20 @@ public class TriggerParam implements Serializable{
|
||||||
this.glueUpdatetime = glueUpdatetime;
|
this.glueUpdatetime = glueUpdatetime;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getLogId() {
|
public int getBroadcastIndex() {
|
||||||
return logId;
|
return broadcastIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setLogId(int logId) {
|
public void setBroadcastIndex(int broadcastIndex) {
|
||||||
this.logId = logId;
|
this.broadcastIndex = broadcastIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getLogDateTim() {
|
public int getBroadcastTotal() {
|
||||||
return logDateTim;
|
return broadcastTotal;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setLogDateTim(long logDateTim) {
|
public void setBroadcastTotal(int broadcastTotal) {
|
||||||
this.logDateTim = logDateTim;
|
this.broadcastTotal = broadcastTotal;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -100,11 +119,14 @@ public class TriggerParam implements Serializable{
|
||||||
", executorHandler='" + executorHandler + '\'' +
|
", executorHandler='" + executorHandler + '\'' +
|
||||||
", executorParams='" + executorParams + '\'' +
|
", executorParams='" + executorParams + '\'' +
|
||||||
", executorBlockStrategy='" + executorBlockStrategy + '\'' +
|
", executorBlockStrategy='" + executorBlockStrategy + '\'' +
|
||||||
|
", logId=" + logId +
|
||||||
|
", logDateTim=" + logDateTim +
|
||||||
", glueType='" + glueType + '\'' +
|
", glueType='" + glueType + '\'' +
|
||||||
", glueSource='" + glueSource + '\'' +
|
", glueSource='" + glueSource + '\'' +
|
||||||
", glueUpdatetime=" + glueUpdatetime +
|
", glueUpdatetime=" + glueUpdatetime +
|
||||||
", logId=" + logId +
|
", broadcastIndex=" + broadcastIndex +
|
||||||
", logDateTim=" + logDateTim +
|
", broadcastTotal=" + broadcastTotal +
|
||||||
'}';
|
'}';
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,6 +98,9 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe
|
||||||
if (serviceBean instanceof IJobHandler){
|
if (serviceBean instanceof IJobHandler){
|
||||||
String name = serviceBean.getClass().getAnnotation(JobHander.class).value();
|
String name = serviceBean.getClass().getAnnotation(JobHander.class).value();
|
||||||
IJobHandler handler = (IJobHandler) serviceBean;
|
IJobHandler handler = (IJobHandler) serviceBean;
|
||||||
|
if (loadJobHandler(name) != null) {
|
||||||
|
throw new RuntimeException("xxl-job jobhandler naming conflicts.");
|
||||||
|
}
|
||||||
registJobHandler(name, handler);
|
registJobHandler(name, handler);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,8 +18,8 @@ public class XxlJobFileAppender {
|
||||||
|
|
||||||
// for JobThread (support log for child thread of job handler)
|
// for JobThread (support log for child thread of job handler)
|
||||||
//public static ThreadLocal<String> contextHolder = new ThreadLocal<String>();
|
//public static ThreadLocal<String> contextHolder = new ThreadLocal<String>();
|
||||||
public static InheritableThreadLocal<String> contextHolder = new InheritableThreadLocal<String>();
|
public static final InheritableThreadLocal<String> contextHolder = new InheritableThreadLocal<String>();
|
||||||
public static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
|
public static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* log filename: yyyy-MM-dd/9999.log
|
* log filename: yyyy-MM-dd/9999.log
|
||||||
|
|
|
@ -7,6 +7,7 @@ import com.xxl.job.core.executor.XxlJobExecutor;
|
||||||
import com.xxl.job.core.handler.IJobHandler;
|
import com.xxl.job.core.handler.IJobHandler;
|
||||||
import com.xxl.job.core.log.XxlJobFileAppender;
|
import com.xxl.job.core.log.XxlJobFileAppender;
|
||||||
import com.xxl.job.core.log.XxlJobLogger;
|
import com.xxl.job.core.log.XxlJobLogger;
|
||||||
|
import com.xxl.job.core.util.ShardingUtil;
|
||||||
import org.eclipse.jetty.util.ConcurrentHashSet;
|
import org.eclipse.jetty.util.ConcurrentHashSet;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -112,6 +113,7 @@ public class JobThread extends Thread{
|
||||||
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
|
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
|
||||||
|
|
||||||
XxlJobFileAppender.contextHolder.set(logFileName);
|
XxlJobFileAppender.contextHolder.set(logFileName);
|
||||||
|
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
|
||||||
XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Params:" + Arrays.toString(handlerParams));
|
XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Params:" + Arrays.toString(handlerParams));
|
||||||
|
|
||||||
executeResult = handler.execute(handlerParams);
|
executeResult = handler.execute(handlerParams);
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
package com.xxl.job.core.util;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* sharding vo
|
||||||
|
* @author xuxueli 2017-07-25 21:26:38
|
||||||
|
*/
|
||||||
|
public class ShardingUtil {
|
||||||
|
|
||||||
|
private static InheritableThreadLocal<ShardingVO> contextHolder = new InheritableThreadLocal<ShardingVO>();
|
||||||
|
|
||||||
|
public static class ShardingVO {
|
||||||
|
|
||||||
|
private int index; // sharding index
|
||||||
|
private int total; // sharding total
|
||||||
|
|
||||||
|
public ShardingVO(int index, int total) {
|
||||||
|
this.index = index;
|
||||||
|
this.total = total;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getIndex() {
|
||||||
|
return index;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setIndex(int index) {
|
||||||
|
this.index = index;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getTotal() {
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTotal(int total) {
|
||||||
|
this.total = total;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void setShardingVo(ShardingVO shardingVo){
|
||||||
|
contextHolder.set(shardingVo);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ShardingVO getShardingVo(){
|
||||||
|
return contextHolder.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -4,7 +4,7 @@ import com.xxl.job.core.biz.model.ReturnT;
|
||||||
import com.xxl.job.core.handler.IJobHandler;
|
import com.xxl.job.core.handler.IJobHandler;
|
||||||
import com.xxl.job.core.handler.annotation.JobHander;
|
import com.xxl.job.core.handler.annotation.JobHander;
|
||||||
import com.xxl.job.core.log.XxlJobLogger;
|
import com.xxl.job.core.log.XxlJobLogger;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@ -13,15 +13,15 @@ import java.util.concurrent.TimeUnit;
|
||||||
* 任务Handler的一个Demo(Bean模式)
|
* 任务Handler的一个Demo(Bean模式)
|
||||||
*
|
*
|
||||||
* 开发步骤:
|
* 开发步骤:
|
||||||
* 1、继承 “IJobHandler” ;
|
* 1、新建一个继承com.xxl.job.core.handler.IJobHandler的Java类;
|
||||||
* 2、装配到Spring,例如加 “@Service” 注解;
|
* 2、该类被Spring容器扫描为Bean实例,如加“@Component”注解;
|
||||||
* 3、加 “@JobHander” 注解,注解value值为新增任务生成的JobKey的值;多个JobKey用逗号分割;
|
* 3、添加 “@JobHander(value="自定义jobhandler名称")”注解,注解的value值为自定义的JobHandler名称,该名称对应的是调度中心新建任务的JobHandler属性的值。
|
||||||
* 4、执行日志:需要通过 "XxlJobLogger.log" 打印执行日志;
|
* 4、执行日志:需要通过 "XxlJobLogger.log" 打印执行日志;
|
||||||
*
|
*
|
||||||
* @author xuxueli 2015-12-19 19:43:36
|
* @author xuxueli 2015-12-19 19:43:36
|
||||||
*/
|
*/
|
||||||
@JobHander(value="demoJobHandler")
|
@JobHander(value="demoJobHandler")
|
||||||
@Service
|
@Component
|
||||||
public class DemoJobHandler extends IJobHandler {
|
public class DemoJobHandler extends IJobHandler {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,39 @@
|
||||||
|
package com.xxl.job.executor.service.jobhandler;
|
||||||
|
|
||||||
|
import com.xxl.job.core.biz.model.ReturnT;
|
||||||
|
import com.xxl.job.core.handler.IJobHandler;
|
||||||
|
import com.xxl.job.core.handler.annotation.JobHander;
|
||||||
|
import com.xxl.job.core.log.XxlJobLogger;
|
||||||
|
import com.xxl.job.core.util.ShardingUtil;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 广播分片任务
|
||||||
|
*
|
||||||
|
* @author xuxueli 2017-07-25 20:56:50
|
||||||
|
*/
|
||||||
|
@JobHander(value="shardingJobHandler")
|
||||||
|
@Service
|
||||||
|
public class ShardingJobHandler extends IJobHandler {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReturnT<String> execute(String... params) throws Exception {
|
||||||
|
|
||||||
|
// 分片参数
|
||||||
|
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
|
||||||
|
XxlJobLogger.log("分片参数:当前分片序号 = {0}, 总分片数 = {1}", shardingVO.getIndex(), shardingVO.getTotal());
|
||||||
|
|
||||||
|
// 业务逻辑
|
||||||
|
for (int i = 0; i < shardingVO.getTotal(); i++) {
|
||||||
|
if (i == shardingVO.getIndex()) {
|
||||||
|
XxlJobLogger.log("第 {0} 片, 命中分片开始处理", i);
|
||||||
|
} else {
|
||||||
|
XxlJobLogger.log("第 {0} 片, 忽略", i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ReturnT.SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,39 @@
|
||||||
|
package com.xxl.job.executor.service.jobhandler;
|
||||||
|
|
||||||
|
import com.xxl.job.core.biz.model.ReturnT;
|
||||||
|
import com.xxl.job.core.handler.IJobHandler;
|
||||||
|
import com.xxl.job.core.handler.annotation.JobHander;
|
||||||
|
import com.xxl.job.core.log.XxlJobLogger;
|
||||||
|
import com.xxl.job.core.util.ShardingUtil;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 广播分片任务
|
||||||
|
*
|
||||||
|
* @author xuxueli 2017-07-25 20:56:50
|
||||||
|
*/
|
||||||
|
@JobHander(value="shardingJobHandler")
|
||||||
|
@Service
|
||||||
|
public class ShardingJobHandler extends IJobHandler {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReturnT<String> execute(String... params) throws Exception {
|
||||||
|
|
||||||
|
// 分片参数
|
||||||
|
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
|
||||||
|
XxlJobLogger.log("分片参数:当前分片序号 = {0}, 总分片数 = {1}", shardingVO.getIndex(), shardingVO.getTotal());
|
||||||
|
|
||||||
|
// 业务逻辑
|
||||||
|
for (int i = 0; i < shardingVO.getTotal(); i++) {
|
||||||
|
if (i == shardingVO.getIndex()) {
|
||||||
|
XxlJobLogger.log("第 {0} 片, 命中分片开始处理", i);
|
||||||
|
} else {
|
||||||
|
XxlJobLogger.log("第 {0} 片, 忽略", i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ReturnT.SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue