执行器手动设置IP时将会绑定Host;

执行器回调线程优化,线程销毁前批量回调队列中所有数据;
This commit is contained in:
xuxueli 2017-08-26 12:20:04 +08:00
parent 3fbe489067
commit 0b4849bb61
5 changed files with 70 additions and 31 deletions

View File

@ -238,7 +238,7 @@ XXL-JOB是一个轻量级分布式任务调度框架其核心设计目标是
### xxl-job admin address list调度中心部署跟地址如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调"。 ### xxl-job admin address list调度中心部署跟地址如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调"。
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### xxl-job executor address执行器"AppName"和地址信息配置AppName执行器心跳注册分组依据地址信息用于"调度中心请求并触发任务"和"执行器注册"。执行器默认端口为9999执行器IP默认为空表示自动获取IP多网卡时可手动设置指定IP。单机部署多个执行器时注意要配置不同执行器端口 ### xxl-job executor address执行器"AppName"和地址信息配置AppName执行器心跳注册分组依据地址信息用于"调度中心请求并触发任务"和"执行器注册"。执行器默认端口为9999执行器IP默认为空表示自动获取IP多网卡时可手动设置指定IP手动设置IP时将会绑定Host。单机部署多个执行器时,注意要配置不同执行器端口;
xxl.job.executor.appname=xxl-job-executor-sample xxl.job.executor.appname=xxl-job-executor-sample
xxl.job.executor.ip= xxl.job.executor.ip=
xxl.job.executor.port=9999 xxl.job.executor.port=9999
@ -984,6 +984,9 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
- 1、解决执行器回调URL不支持配置HTTPS时问题 - 1、解决执行器回调URL不支持配置HTTPS时问题
- 2、规范项目目录方便扩展多执行器 - 2、规范项目目录方便扩展多执行器
- 3、新增JFinal类型执行器sample示例项目 - 3、新增JFinal类型执行器sample示例项目
- 4、执行器手动设置IP时将会绑定Host
- 5、项目主页搭建提供中英文文档
- 6、执行器回调线程优化线程销毁前批量回调队列中所有数据
### TODO LIST ### TODO LIST
- 1、任务权限管理执行器为粒度分配权限核心操作校验权限 - 1、任务权限管理执行器为粒度分配权限核心操作校验权限
@ -994,11 +997,10 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
- 6、调度任务优先级 - 6、调度任务优先级
- 7、移除quartz依赖重写调度模块新增或恢复任务时将下次执行记录插入delayqueue调度中心集群竞争分布式锁成功节点批量加载到期delayqueue数据批量执行。 - 7、移除quartz依赖重写调度模块新增或恢复任务时将下次执行记录插入delayqueue调度中心集群竞争分布式锁成功节点批量加载到期delayqueue数据批量执行。
- 8、springboot 和 docker镜像并且推送docker镜像到中央仓库更进一步实现产品开箱即用 - 8、springboot 和 docker镜像并且推送docker镜像到中央仓库更进一步实现产品开箱即用
- 9、国际化调度中心界面 + 官方文档,新增英文版本 - 9、国际化调度中心界面。
- 10、执行器摘除执行器销毁时主动通知调度中心并摘除对应执行器节点提高执行器状态感知的时效性。 - 10、执行器摘除执行器销毁时主动通知调度中心并摘除对应执行器节点提高执行器状态感知的时效性。
- 11、调度中心API服务支持API方式触发任务执行 - 11、调度中心API服务支持API方式触发任务执行
- 12、执行器启动设定IP的时候主动绑定IP避免攻击 - 12、任务参数类型改为string进一步方便参数传递
- 13、任务参数类型改为string进一步方便参数传递
## 七、其他 ## 七、其他

View File

@ -122,7 +122,7 @@ $(function () {
}, },
series : [ series : [
{ {
name: '访问来源', name: '分布比例',
type: 'pie', type: 'pie',
radius : '55%', radius : '55%',
center: ['50%', '60%'], center: ['50%', '60%'],

View File

@ -30,6 +30,9 @@ public class JettyServer {
// HTTP connector // HTTP connector
ServerConnector connector = new ServerConnector(server); ServerConnector connector = new ServerConnector(server);
if (ip!=null && ip.trim().length()>0) {
connector.setHost(ip); // The network interface this connector binds to as an IP address or a hostname. If null or 0.0.0.0, then bind to all interfaces.
}
connector.setPort(port); connector.setPort(port);
server.setConnectors(new Connector[]{connector}); server.setConnectors(new Connector[]{connector});

View File

@ -23,7 +23,7 @@ public class ExecutorRegistryThread extends Thread {
} }
private Thread registryThread; private Thread registryThread;
private boolean toStop = false; private volatile boolean toStop = false;
public void start(final int port, final String ip, final String appName){ public void start(final int port, final String ip, final String appName){
// valid // valid
@ -75,6 +75,10 @@ public class ExecutorRegistryThread extends Thread {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
} }
} }
// registry remove
} }
}); });
registryThread.setDaemon(true); registryThread.setDaemon(true);

View File

@ -22,15 +22,34 @@ public class TriggerCallbackThread {
return instance; return instance;
} }
/**
* job results callback queue
*/
private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>(); private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();
public static void pushCallBack(HandleCallbackParam callback){
getInstance().callBackQueue.add(callback);
logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());
}
/**
* callback thread
*/
private Thread triggerCallbackThread; private Thread triggerCallbackThread;
private boolean toStop = false; private volatile boolean toStop = false;
public void start() { public void start() {
// valid
if (XxlJobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
return;
}
triggerCallbackThread = new Thread(new Runnable() { triggerCallbackThread = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
// normal callback
while(!toStop){ while(!toStop){
try { try {
HandleCallbackParam callback = getInstance().callBackQueue.take(); HandleCallbackParam callback = getInstance().callBackQueue.take();
@ -41,34 +60,27 @@ public class TriggerCallbackThread {
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
callbackParamList.add(callback); callbackParamList.add(callback);
// valid
if (XxlJobExecutor.getAdminBizList()==null) {
logger.warn(">>>>>>>>>>>> xxl-job callback fail, adminAddresses is null, callbackParamList{}", callbackParamList);
continue;
}
// callback, will retry if error // callback, will retry if error
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { if (callbackParamList!=null && callbackParamList.size()>0) {
try { doCallback(callbackParamList);
ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
callbackResult = ReturnT.SUCCESS;
logger.info(">>>>>>>>>>> xxl-job callback success, callbackParamList:{}, callbackResult:{}", new Object[]{callbackParamList, callbackResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job callback fail, callbackParamList:{}, callbackResult:{}", new Object[]{callbackParamList, callbackResult});
}
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job callback error, callbackParamList{}", callbackParamList, e);
//getInstance().callBackQueue.addAll(callbackParamList);
}
} }
} }
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
} }
} }
// last callback
try {
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
if (callbackParamList!=null && callbackParamList.size()>0) {
doCallback(callbackParamList);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
} }
}); });
triggerCallbackThread.setDaemon(true); triggerCallbackThread.setDaemon(true);
@ -78,9 +90,27 @@ public class TriggerCallbackThread {
toStop = true; toStop = true;
} }
public static void pushCallBack(HandleCallbackParam callback){ /**
getInstance().callBackQueue.add(callback); * do callback, will retry if error
logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId()); * @param callbackParamList
*/
private void doCallback(List<HandleCallbackParam> callbackParamList){
// callback, will retry if error
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
callbackResult = ReturnT.SUCCESS;
logger.info(">>>>>>>>>>> xxl-job callback success, callbackParamList:{}, callbackResult:{}", new Object[]{callbackParamList, callbackResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job callback fail, callbackParamList:{}, callbackResult:{}", new Object[]{callbackParamList, callbackResult});
}
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job callback error, callbackParamList{}", callbackParamList, e);
//getInstance().callBackQueue.addAll(callbackParamList);
}
}
} }
} }