心跳注册和结果回调,重试逻辑优化

This commit is contained in:
xuxueli 2017-07-27 23:38:28 +08:00
parent ff8e43af54
commit 8e07d501e2
2 changed files with 33 additions and 27 deletions

View File

@ -49,25 +49,31 @@ public class ExecutorRegistryThread extends Thread {
@Override @Override
public void run() { public void run() {
while (!toStop) { while (!toStop) {
try { try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, executorAddress); RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, executorAddress);
ReturnT<String> registryResult = null;
for (String addressUrl: XxlJobExecutor.adminAddresses.split(",")) { for (String addressUrl: XxlJobExecutor.adminAddresses.split(",")) {
String apiUrl = addressUrl.concat("/api"); String apiUrl = addressUrl.concat("/api");
try {
AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, apiUrl).getObject(); AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, apiUrl).getObject();
registryResult = adminBiz.registry(registryParam); ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) { if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS; registryResult = ReturnT.SUCCESS;
logger.info(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break; break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
} }
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
} }
logger.info(">>>>>>>>>>> xxl-job Executor registry {}, RegistryParam:{}, registryResult:{}",
new Object[]{(registryResult.getCode()==ReturnT.SUCCESS_CODE?"success":"fail"), registryParam.toString(), registryResult.toString()});
} catch (Exception e) { } catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job ExecutorRegistryThread Exception:", e); logger.error(e.getMessage(), e);
} }
try { try {

View File

@ -37,40 +37,40 @@ public class TriggerCallbackThread {
HandleCallbackParam callback = getInstance().callBackQueue.take(); HandleCallbackParam callback = getInstance().callBackQueue.take();
if (callback != null) { if (callback != null) {
// valid
if (XxlJobExecutor.adminAddresses==null || XxlJobExecutor.adminAddresses.trim().length()==0) {
logger.warn(">>>>>>>>>>>> xxl-job callback fail, adminAddresses is null.");
continue;
}
// callback list param // callback list param
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>(); List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
callbackParamList.add(callback); callbackParamList.add(callback);
// callback, will retry if error // valid
try { if (XxlJobExecutor.adminAddresses==null || XxlJobExecutor.adminAddresses.trim().length()==0) {
logger.warn(">>>>>>>>>>>> xxl-job callback fail, adminAddresses is null, callbackParamList{}", callbackParamList);
continue;
}
ReturnT<String> callbackResult = null; // callback, will retry if error
for (String addressUrl: XxlJobExecutor.adminAddresses.split(",")) { for (String addressUrl: XxlJobExecutor.adminAddresses.split(",")) {
String apiUrl = addressUrl.concat("/api"); String apiUrl = addressUrl.concat("/api");
try {
AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, apiUrl).getObject(); AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, apiUrl).getObject();
callbackResult = adminBiz.callback(callbackParamList); ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) { if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
callbackResult = ReturnT.SUCCESS; callbackResult = ReturnT.SUCCESS;
logger.info(">>>>>>>>>>> xxl-job callback success, callbackParamList:{}, callbackResult:{}", new Object[]{callbackParamList, callbackResult});
break; break;
} else {
logger.info(">>>>>>>>>>> xxl-job callback fail, callbackParamList:{}, callbackResult:{}", new Object[]{callbackParamList, callbackResult});
} }
}
logger.info(">>>>>>>>>>> xxl-job callback, callbackParamList:{}, callbackResult:{}", new Object[]{callbackParamList, callbackResult});
} catch (Exception e) { } catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job TriggerCallbackThread Exception:", e); logger.error(">>>>>>>>>>> xxl-job callback error, callbackParamList{}", callbackParamList, e);
//getInstance().callBackQueue.addAll(callbackParamList); //getInstance().callBackQueue.addAll(callbackParamList);
} }
} }
}
} catch (Exception e) { } catch (Exception e) {
logger.error("", e); logger.error(e.getMessage(), e);
} }
} }
} }