From 443c946e5576be1ba7c33bf15839c33aaddfc8ce Mon Sep 17 00:00:00 2001
From: xuxueli <931591021@qq.com>
Date: Sun, 28 Oct 2018 04:09:16 +0800
Subject: [PATCH] =?UTF-8?q?-=201=E3=80=81=E8=B0=83=E5=BA=A6=E4=B8=AD?=
=?UTF-8?q?=E5=BF=83=E8=BF=81=E7=A7=BB=E5=88=B0=20springboot=EF=BC=9B=20-?=
=?UTF-8?q?=202=E3=80=81=E5=BA=95=E5=B1=82=E9=80=9A=E8=AE=AF=E7=BB=84?=
=?UTF-8?q?=E4=BB=B6=E8=BF=81=E7=A7=BB=E8=87=B3=20xxl-rpc=EF=BC=9B?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
doc/XXL-JOB官方文档.md | 24 ++--
xxl-job-admin/pom.xml | 28 ++++
.../admin/controller/JobApiController.java | 53 ++-----
.../interceptor/PermissionInterceptor.java | 6 +-
.../core/schedule/XxlJobDynamicScheduler.java | 83 +++++++----
.../job/admin/core/trigger/XxlJobTrigger.java | 2 +-
.../src/main/resources/application.properties | 2 +-
.../com/xxl/job/adminbiz/AdminBizTest.java | 13 +-
xxl-job-core/pom.xml | 29 +---
.../xxl/job/core/executor/XxlJobExecutor.java | 121 ++++++++++++----
.../xxl/job/core/rpc/codec/RpcRequest.java | 92 ------------
.../xxl/job/core/rpc/codec/RpcResponse.java | 41 ------
.../core/rpc/netcom/NetComClientProxy.java | 81 -----------
.../core/rpc/netcom/NetComServerFactory.java | 84 -----------
.../rpc/netcom/jetty/client/JettyClient.java | 48 -------
.../rpc/netcom/jetty/server/JettyServer.java | 92 ------------
.../jetty/server/JettyServerHandler.java | 68 ---------
.../core/rpc/serialize/HessianSerializer.java | 62 --------
.../core/thread/ExecutorRegistryThread.java | 15 +-
.../com/xxl/job/core/util/HttpClientUtil.java | 110 --------------
.../java/com/xxl/job/core/util/IpUtil.java | 135 ------------------
.../java/com/xxl/job/core/util/NetUtil.java | 70 ---------
.../jfinal/jobhandler/HttpJobHandler.java | 69 +++------
.../nutz/jobhandler/HttpJobHandler.java | 66 +++------
.../service/jobhandler/HttpJobHandler.java | 66 +++------
.../xxl/executor/test/DemoJobHandlerTest.java | 9 +-
.../pom.xml | 5 +
.../service/jobhandler/HttpJobHandler.java | 66 +++------
28 files changed, 322 insertions(+), 1218 deletions(-)
delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/rpc/codec/RpcRequest.java
delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/rpc/codec/RpcResponse.java
delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/NetComClientProxy.java
delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/NetComServerFactory.java
delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/client/JettyClient.java
delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java
delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServerHandler.java
delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/rpc/serialize/HessianSerializer.java
delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/util/HttpClientUtil.java
delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/util/IpUtil.java
delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/util/NetUtil.java
diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md
index c1c0d867..ffa21005 100644
--- a/doc/XXL-JOB官方文档.md
+++ b/doc/XXL-JOB官方文档.md
@@ -1324,17 +1324,19 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
- 41、docker支持:调度中心提供 Dockerfile 方便快速构建docker镜像;
### 6.23 版本 V2.0.0 特性[迭代中]
-- 1、[迭代中]调度中心迁移到springboot;
-- 2、[迭代中]SimpleTrigger 支持;
-- 3、[迭代中]任务状态与quartz解耦,降低quartz调度压力,仅NORMAL状态任务绑定quartz;
-- 4、[迭代中]新增任务默认运行状态,任务更新时运行状态保持不变;
-- 5、[迭代中]任务权限管理:执行器为粒度分配权限,核心操作校验权限;
-- 6、[迭代中]Release发布时,一同发布调度中心安装包,真正实现开箱即用;
-- 7、[迭代中]docker镜像,并且推送docker镜像到中央仓库,更进一步实现产品开箱即用;
-- 8、[迭代中]cron在线生成工具,如 "cronboot/cron.qqe2";
-- 9、[迭代中]原生提供通用命令行任务Handler(Bean任务,"CommandJobHandler");业务方只需要提供命令行即可,可执行任意命令;
-- 10、IP获取逻辑优化,优先遍历网卡来获取可用IP;
-- 11、任务新增的API服务接口返回任务ID,方便调用方实用;
+- 1、调度中心迁移到 springboot;
+- 2、底层通讯组件迁移至 xxl-rpc;
+- 3、IP获取逻辑优化,优先遍历网卡来获取可用IP;
+- 4、任务新增的API服务接口返回任务ID,方便调用方实用;
+- 5、[迭代中]SimpleTrigger 支持;
+- 6、[迭代中]任务状态与quartz解耦,降低quartz调度压力,仅NORMAL状态任务绑定quartz;
+- 7、[迭代中]新增任务默认运行状态,任务更新时运行状态保持不变;
+- 8、[迭代中]任务权限管理:执行器为粒度分配权限,核心操作校验权限;
+- 9、[迭代中]Release发布时,一同发布调度中心安装包,真正实现开箱即用;
+- 10、[迭代中]docker镜像,并且推送docker镜像到中央仓库,更进一步实现产品开箱即用;
+- 11、[迭代中]cron在线生成工具,如 "cronboot/cron.qqe2";
+- 12、[迭代中]原生提供通用命令行任务Handler(Bean任务,"CommandJobHandler");业务方只需要提供命令行即可,可执行任意命令;
+
### TODO LIST
- 1、任务分片路由:分片采用一致性Hash算法计算出尽量稳定的分片顺序,即使注册机器存在波动也不会引起分批分片顺序大的波动;目前采用IP自然排序,可以满足需求,待定;
diff --git a/xxl-job-admin/pom.xml b/xxl-job-admin/pom.xml
index 68c2c15e..bd346c45 100644
--- a/xxl-job-admin/pom.xml
+++ b/xxl-job-admin/pom.xml
@@ -18,6 +18,34 @@
pom
import
+
+
+
+ org.eclipse.jetty
+ jetty-server
+ ${jetty-server.version}
+
+
+ org.eclipse.jetty
+ jetty-util
+ ${jetty-server.version}
+
+
+ org.eclipse.jetty
+ jetty-http
+ ${jetty-server.version}
+
+
+ org.eclipse.jetty
+ jetty-io
+ ${jetty-server.version}
+
+
+ org.eclipse.jetty
+ jetty-client
+ ${jetty-server.version}
+
+
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java
index a31bb293..0b39a4e1 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java
@@ -1,69 +1,34 @@
package com.xxl.job.admin.controller;
import com.xxl.job.admin.controller.annotation.PermessionLimit;
+import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.core.biz.AdminBiz;
-import com.xxl.job.core.rpc.codec.RpcRequest;
-import com.xxl.job.core.rpc.codec.RpcResponse;
-import com.xxl.job.core.rpc.netcom.NetComServerFactory;
-import com.xxl.job.core.rpc.serialize.HessianSerializer;
-import com.xxl.job.core.util.HttpClientUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
+import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
-import java.io.OutputStream;
/**
* Created by xuxueli on 17/5/10.
*/
@Controller
-public class JobApiController {
- private static Logger logger = LoggerFactory.getLogger(JobApiController.class);
+public class JobApiController implements InitializingBean {
- private RpcResponse doInvoke(HttpServletRequest request) {
- try {
- // deserialize request
- byte[] requestBytes = HttpClientUtil.readBytes(request);
- if (requestBytes == null || requestBytes.length==0) {
- RpcResponse rpcResponse = new RpcResponse();
- rpcResponse.setError("RpcRequest byte[] is null");
- return rpcResponse;
- }
- RpcRequest rpcRequest = (RpcRequest) HessianSerializer.deserialize(requestBytes, RpcRequest.class);
- // invoke
- RpcResponse rpcResponse = NetComServerFactory.invokeService(rpcRequest, null);
- return rpcResponse;
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
+ @Override
+ public void afterPropertiesSet() throws Exception {
- RpcResponse rpcResponse = new RpcResponse();
- rpcResponse.setError("Server-error:" + e.getMessage());
- return rpcResponse;
- }
}
@RequestMapping(AdminBiz.MAPPING)
@PermessionLimit(limit=false)
- public void api(HttpServletRequest request, HttpServletResponse response) throws IOException {
-
- // invoke
- RpcResponse rpcResponse = doInvoke(request);
-
- // serialize response
- byte[] responseBytes = HessianSerializer.serialize(rpcResponse);
-
- response.setContentType("text/html;charset=utf-8");
- response.setStatus(HttpServletResponse.SC_OK);
- //baseRequest.setHandled(true);
-
- OutputStream out = response.getOutputStream();
- out.write(responseBytes);
- out.flush();
+ public void api(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
+ XxlJobDynamicScheduler.invokeAdminService(request, response);
}
+
}
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/interceptor/PermissionInterceptor.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/interceptor/PermissionInterceptor.java
index 9205ed59..8f0334bd 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/interceptor/PermissionInterceptor.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/interceptor/PermissionInterceptor.java
@@ -3,8 +3,8 @@ package com.xxl.job.admin.controller.interceptor;
import com.xxl.job.admin.controller.annotation.PermessionLimit;
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.util.CookieUtil;
-import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.stereotype.Component;
+import org.springframework.util.DigestUtils;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;
@@ -29,7 +29,7 @@ public class PermissionInterceptor extends HandlerInterceptorAdapter {
String password = XxlJobAdminConfig.getAdminConfig().getLoginPassword();
// login token
- String tokenTmp = DigestUtils.md5Hex(username + "_" + password);
+ String tokenTmp = DigestUtils.md5DigestAsHex(String.valueOf(username + "_" + password).getBytes()); //.getBytes("UTF-8")
tokenTmp = new BigInteger(1, tokenTmp.getBytes()).toString(16);
LOGIN_IDENTITY_TOKEN = tokenTmp;
@@ -40,7 +40,7 @@ public class PermissionInterceptor extends HandlerInterceptorAdapter {
public static boolean login(HttpServletResponse response, String username, String password, boolean ifRemember){
// login token
- String tokenTmp = DigestUtils.md5Hex(username + "_" + password);
+ String tokenTmp = DigestUtils.md5DigestAsHex(String.valueOf(username + "_" + password).getBytes());
tokenTmp = new BigInteger(1, tokenTmp.getBytes()).toString(16);
if (!getLoginIdentityToken().equals(tokenTmp)){
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java
index 79124dc1..6ba08ed7 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java
@@ -7,27 +7,28 @@ import com.xxl.job.admin.core.thread.JobFailMonitorHelper;
import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper;
import com.xxl.job.admin.core.thread.JobTriggerPoolHelper;
import com.xxl.job.admin.core.util.I18nUtil;
-import com.xxl.job.admin.dao.XxlJobGroupDao;
-import com.xxl.job.admin.dao.XxlJobInfoDao;
-import com.xxl.job.admin.dao.XxlJobLogDao;
-import com.xxl.job.admin.dao.XxlJobRegistryDao;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
-import com.xxl.job.core.rpc.netcom.NetComClientProxy;
-import com.xxl.job.core.rpc.netcom.NetComServerFactory;
+import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory;
+import com.xxl.rpc.remoting.invoker.call.CallType;
+import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean;
+import com.xxl.rpc.remoting.net.NetEnum;
+import com.xxl.rpc.remoting.net.impl.jetty.server.JettyServerHandler;
+import com.xxl.rpc.remoting.provider.XxlRpcProviderFactory;
+import com.xxl.rpc.serialize.Serializer;
+import org.eclipse.jetty.server.Request;
import org.quartz.*;
import org.quartz.Trigger.TriggerState;
import org.quartz.impl.triggers.CronTriggerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.BeansException;
-import org.springframework.beans.factory.DisposableBean;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
import org.springframework.util.Assert;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
@@ -50,32 +51,26 @@ public final class XxlJobDynamicScheduler {
// ---------------------- init + destroy ----------------------
public void start() throws Exception {
+ // valid
+ Assert.notNull(scheduler, "quartz scheduler is null");
+
+ // init i18n
+ initI18n();
+
// admin registry monitor run
JobRegistryMonitorHelper.getInstance().start();
// admin monitor run
JobFailMonitorHelper.getInstance().start();
- // admin-server(spring-mvc)
- NetComServerFactory.putService(AdminBiz.class, XxlJobAdminConfig.getAdminConfig().getAdminBiz());
- NetComServerFactory.setAccessToken(XxlJobAdminConfig.getAdminConfig().getAccessToken());
+ // admin-server
+ initRpcProvider();
- // init i18n
- initI18n();
-
- // valid
- Assert.notNull(scheduler, "quartz scheduler is null");
logger.info(">>>>>>>>> init xxl-job admin success.");
}
- private void initI18n(){
- for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {
- item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));
- }
- }
-
- public void destroy(){
+ public void destroy() throws Exception {
// admin trigger pool stop
JobTriggerPoolHelper.toStop();
@@ -84,6 +79,38 @@ public final class XxlJobDynamicScheduler {
// admin monitor stop
JobFailMonitorHelper.getInstance().toStop();
+
+ // admin-server
+ stopRpcProvider();
+ }
+
+
+ // ---------------------- I18n ----------------------
+
+ private void initI18n(){
+ for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {
+ item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));
+ }
+ }
+
+ // ---------------------- admin rpc provider (no server version) ----------------------
+ private static JettyServerHandler jettyServerHandler;
+ private void initRpcProvider(){
+ // init
+ XxlRpcProviderFactory xxlRpcProviderFactory = new XxlRpcProviderFactory();
+ xxlRpcProviderFactory.initConfig(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), null, 0, XxlJobAdminConfig.getAdminConfig().getAccessToken(), null, null);
+
+ // add services
+ xxlRpcProviderFactory.addService(AdminBiz.class.getName(), null, XxlJobAdminConfig.getAdminConfig().getAdminBiz());
+
+ // jetty handler
+ jettyServerHandler = new JettyServerHandler(xxlRpcProviderFactory);
+ }
+ private void stopRpcProvider() throws Exception {
+ new XxlRpcInvokerFactory().stop();
+ }
+ public static void invokeAdminService(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
+ jettyServerHandler.handle(null, new Request(null, null), request, response);
}
@@ -103,7 +130,9 @@ public final class XxlJobDynamicScheduler {
}
// set-cache
- executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address, XxlJobAdminConfig.getAdminConfig().getAccessToken()).getObject();
+ executorBiz = (ExecutorBiz) new XxlRpcReferenceBean(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), CallType.SYNC,
+ ExecutorBiz.class, null, 10000, address, XxlJobAdminConfig.getAdminConfig().getAccessToken(), null).getObject();
+
executorBizRepository.put(address, executorBiz);
return executorBiz;
}
diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java
index 34d8b3c0..5afacd79 100644
--- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java
+++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java
@@ -12,7 +12,7 @@ import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
-import com.xxl.job.core.util.IpUtil;
+import com.xxl.rpc.util.IpUtil;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
diff --git a/xxl-job-admin/src/main/resources/application.properties b/xxl-job-admin/src/main/resources/application.properties
index 4467f3df..b84c87ee 100644
--- a/xxl-job-admin/src/main/resources/application.properties
+++ b/xxl-job-admin/src/main/resources/application.properties
@@ -16,7 +16,7 @@ spring.freemarker.request-context-attribute=request
mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml
### xxl-job, datasource
-spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-job?Unicode=true&characterEncoding=UTF-8
+spring.datasource.url=jdbc:mysql://192.168.99.100:3306/xxl-job?Unicode=true&characterEncoding=UTF-8
spring.datasource.username=root
spring.datasource.password=root_pwd
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
diff --git a/xxl-job-admin/src/test/java/com/xxl/job/adminbiz/AdminBizTest.java b/xxl-job-admin/src/test/java/com/xxl/job/adminbiz/AdminBizTest.java
index 1f1b0742..7883f6fa 100644
--- a/xxl-job-admin/src/test/java/com/xxl/job/adminbiz/AdminBizTest.java
+++ b/xxl-job-admin/src/test/java/com/xxl/job/adminbiz/AdminBizTest.java
@@ -4,7 +4,10 @@ import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.RegistryParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.enums.RegistryConfig;
-import com.xxl.job.core.rpc.netcom.NetComClientProxy;
+import com.xxl.rpc.remoting.invoker.call.CallType;
+import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean;
+import com.xxl.rpc.remoting.net.NetEnum;
+import com.xxl.rpc.serialize.Serializer;
import org.junit.Assert;
import org.junit.Test;
@@ -26,7 +29,9 @@ public class AdminBizTest {
*/
@Test
public void registryTest() throws Exception {
- AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, addressUrl, accessToken).getObject();
+ addressUrl = addressUrl.replace("http://", "");
+ AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), CallType.SYNC,
+ AdminBiz.class, null, 10000, addressUrl, accessToken, null).getObject();
// test executor registry
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "xxl-job-executor-example", "127.0.0.1:9999");
@@ -41,7 +46,9 @@ public class AdminBizTest {
*/
@Test
public void registryRemove() throws Exception {
- AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, addressUrl, accessToken).getObject();
+ addressUrl = addressUrl.replace("http://", "");
+ AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), CallType.SYNC,
+ AdminBiz.class, null, 10000, addressUrl, accessToken, null).getObject();
// test executor registry remove
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "xxl-job-executor-example", "127.0.0.1:9999");
diff --git a/xxl-job-core/pom.xml b/xxl-job-core/pom.xml
index 60c7ec18..f5306a84 100644
--- a/xxl-job-core/pom.xml
+++ b/xxl-job-core/pom.xml
@@ -15,32 +15,11 @@
-
+
- org.slf4j
- slf4j-api
- ${slf4j-api.version}
-
-
-
-
- org.eclipse.jetty
- jetty-server
- ${jetty-server.version}
-
-
-
-
- org.apache.httpcomponents
- httpclient
- ${httpclient.version}
-
-
-
-
- com.caucho
- hessian
- ${hessian.version}
+ com.xuxueli
+ xxl-rpc-core
+ 1.2.0
diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java
index 74734728..bba0ee62 100644
--- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java
+++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java
@@ -6,11 +6,19 @@ import com.xxl.job.core.biz.impl.ExecutorBizImpl;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.JobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
-import com.xxl.job.core.rpc.netcom.NetComClientProxy;
-import com.xxl.job.core.rpc.netcom.NetComServerFactory;
+import com.xxl.job.core.thread.ExecutorRegistryThread;
import com.xxl.job.core.thread.JobLogFileCleanThread;
import com.xxl.job.core.thread.JobThread;
-import com.xxl.job.core.util.NetUtil;
+import com.xxl.job.core.thread.TriggerCallbackThread;
+import com.xxl.rpc.registry.impl.LocalServiceRegistry;
+import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory;
+import com.xxl.rpc.remoting.invoker.call.CallType;
+import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean;
+import com.xxl.rpc.remoting.net.NetEnum;
+import com.xxl.rpc.remoting.provider.XxlRpcProviderFactory;
+import com.xxl.rpc.serialize.Serializer;
+import com.xxl.rpc.util.IpUtil;
+import com.xxl.rpc.util.NetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
@@ -30,7 +38,7 @@ public class XxlJobExecutor implements ApplicationContextAware {
// ---------------------- param ----------------------
private String adminAddresses;
- private String appName;
+ private static String appName;
private String ip;
private int port;
private String accessToken;
@@ -72,20 +80,26 @@ public class XxlJobExecutor implements ApplicationContextAware {
// ---------------------- start + stop ----------------------
public void start() throws Exception {
- // init admin-client
- initAdminBizList(adminAddresses, accessToken);
-
- // init executor-jobHandlerRepository
- initJobHandlerRepository(applicationContext);
// init logpath
XxlJobFileAppender.initLogPath(logPath);
- // init executor-server
- initExecutorServer(port, ip, appName, accessToken);
+ // init JobHandler Repository
+ initJobHandlerRepository(applicationContext);
+
+ // init admin-client
+ initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread
JobLogFileCleanThread.getInstance().start(logRetentionDays);
+
+ // init TriggerCallbackThread
+ TriggerCallbackThread.getInstance().start();
+
+ // init executor-server
+ port = port>0?port: NetUtil.findAvailablePort(9999);
+ ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
+ initRpcProvider(ip, port, appName, accessToken);
}
public void destroy(){
// destory jobThreadRepository
@@ -96,22 +110,35 @@ public class XxlJobExecutor implements ApplicationContextAware {
jobThreadRepository.clear();
}
- // destory executor-server
- stopExecutorServer();
-
// destory JobLogFileCleanThread
JobLogFileCleanThread.getInstance().toStop();
+
+ // destory TriggerCallbackThread
+ TriggerCallbackThread.getInstance().toStop();
+
+ // destory executor-server
+ stopRpcProvider();
}
- // ---------------------- admin-client ----------------------
+ // ---------------------- admin-client (rpc invoker) ----------------------
private static List adminBizList;
private static void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
for (String address: adminAddresses.trim().split(",")) {
if (address!=null && address.trim().length()>0) {
+
String addressUrl = address.concat(AdminBiz.MAPPING);
- AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, addressUrl, accessToken).getObject();
+ if (addressUrl.startsWith("http://")) {
+ addressUrl = addressUrl.replace("http://", "");
+ }
+ if (addressUrl.startsWith("https://")) {
+ addressUrl = addressUrl.replace("https://", "");
+ }
+
+ AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), CallType.SYNC,
+ AdminBiz.class, null, 10000, addressUrl, accessToken, null).getObject();
+
if (adminBizList == null) {
adminBizList = new ArrayList();
}
@@ -125,19 +152,59 @@ public class XxlJobExecutor implements ApplicationContextAware {
}
- // ---------------------- executor-server(jetty) ----------------------
- private NetComServerFactory serverFactory = new NetComServerFactory();
- private void initExecutorServer(int port, String ip, String appName, String accessToken) throws Exception {
- // valid param
- port = port>0?port: NetUtil.findAvailablePort(9999);
+ // ---------------------- executor-server (rpc provider) ----------------------
+ private XxlRpcInvokerFactory xxlRpcInvokerFactory = null;
+ private XxlRpcProviderFactory xxlRpcProviderFactory = null;
+ private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {
+ // init invoker factory
+ xxlRpcInvokerFactory = new XxlRpcInvokerFactory();
+
+ // init, provider factory
+ xxlRpcProviderFactory = new XxlRpcProviderFactory();
+ xxlRpcProviderFactory.initConfig(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, null);
+
+ // add services
+ xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());
+
+ // start
+ xxlRpcProviderFactory.start();
- // start server
- NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl()); // rpc-service, base on jetty
- NetComServerFactory.setAccessToken(accessToken);
- serverFactory.start(port, ip, appName); // jetty + registry
}
- private void stopExecutorServer() {
- serverFactory.destroy(); // jetty + registry + callback
+
+ public static class ExecutorServiceRegistry extends LocalServiceRegistry {
+ @Override
+ public boolean registry(String key, String value) {
+
+ // start registry
+ if (ExecutorBiz.class.getName().equalsIgnoreCase(key)) {
+ ExecutorRegistryThread.getInstance().start(appName, value);
+ }
+
+ return super.registry(key, value);
+ }
+
+ @Override
+ public void stop() {
+ // stop registry
+ ExecutorRegistryThread.getInstance().toStop();
+
+ super.stop();
+ }
+ }
+
+ private void stopRpcProvider() {
+ // stop invoker factory
+ try {
+ xxlRpcInvokerFactory.stop();
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ }
+ // stop provider factory
+ try {
+ xxlRpcProviderFactory.stop();
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ }
}
diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/codec/RpcRequest.java b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/codec/RpcRequest.java
deleted file mode 100644
index 5b598659..00000000
--- a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/codec/RpcRequest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package com.xxl.job.core.rpc.codec;
-
-import java.io.Serializable;
-import java.util.Arrays;
-
-/**
- * request
- * @author xuxueli 2015-10-29 19:39:12
- */
-public class RpcRequest implements Serializable{
- private static final long serialVersionUID = 1L;
-
- private String serverAddress;
- private long createMillisTime;
- private String accessToken;
-
- private String className;
- private String methodName;
- private Class>[] parameterTypes;
- private Object[] parameters;
-
-
- public String getServerAddress() {
- return serverAddress;
- }
-
- public void setServerAddress(String serverAddress) {
- this.serverAddress = serverAddress;
- }
-
- public long getCreateMillisTime() {
- return createMillisTime;
- }
-
- public void setCreateMillisTime(long createMillisTime) {
- this.createMillisTime = createMillisTime;
- }
-
- public String getAccessToken() {
- return accessToken;
- }
-
- public void setAccessToken(String accessToken) {
- this.accessToken = accessToken;
- }
-
- public String getClassName() {
- return className;
- }
-
- public void setClassName(String className) {
- this.className = className;
- }
-
- public String getMethodName() {
- return methodName;
- }
-
- public void setMethodName(String methodName) {
- this.methodName = methodName;
- }
-
- public Class>[] getParameterTypes() {
- return parameterTypes;
- }
-
- public void setParameterTypes(Class>[] parameterTypes) {
- this.parameterTypes = parameterTypes;
- }
-
- public Object[] getParameters() {
- return parameters;
- }
-
- public void setParameters(Object[] parameters) {
- this.parameters = parameters;
- }
-
- @Override
- public String toString() {
- return "RpcRequest{" +
- "serverAddress='" + serverAddress + '\'' +
- ", createMillisTime=" + createMillisTime +
- ", accessToken='" + accessToken + '\'' +
- ", className='" + className + '\'' +
- ", methodName='" + methodName + '\'' +
- ", parameterTypes=" + Arrays.toString(parameterTypes) +
- ", parameters=" + Arrays.toString(parameters) +
- '}';
- }
-
-}
diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/codec/RpcResponse.java b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/codec/RpcResponse.java
deleted file mode 100644
index bb95be32..00000000
--- a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/codec/RpcResponse.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package com.xxl.job.core.rpc.codec;
-
-import java.io.Serializable;
-
-/**
- * response
- * @author xuxueli 2015-10-29 19:39:54
- */
-public class RpcResponse implements Serializable{
- private static final long serialVersionUID = 1L;
-
- private String error;
- private Object result;
-
- public boolean isError() {
- return error != null;
- }
-
- public String getError() {
- return error;
- }
-
- public void setError(String error) {
- this.error = error;
- }
-
- public Object getResult() {
- return result;
- }
-
- public void setResult(Object result) {
- this.result = result;
- }
-
- @Override
- public String toString() {
- return "NettyResponse [error=" + error
- + ", result=" + result + "]";
- }
-
-}
diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/NetComClientProxy.java b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/NetComClientProxy.java
deleted file mode 100644
index 3d2f9bb2..00000000
--- a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/NetComClientProxy.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package com.xxl.job.core.rpc.netcom;
-
-import com.xxl.job.core.rpc.codec.RpcRequest;
-import com.xxl.job.core.rpc.codec.RpcResponse;
-import com.xxl.job.core.rpc.netcom.jetty.client.JettyClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.FactoryBean;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-
-/**
- * rpc proxy
- * @author xuxueli 2015-10-29 20:18:32
- */
-public class NetComClientProxy implements FactoryBean
diff --git a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/HttpJobHandler.java b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/HttpJobHandler.java
index 400e138b..ea68d749 100644
--- a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/HttpJobHandler.java
+++ b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/HttpJobHandler.java
@@ -4,16 +4,14 @@ import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.JobHandler;
import com.xxl.job.core.log.XxlJobLogger;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.http.HttpMethod;
+import org.eclipse.jetty.http.HttpStatus;
import org.springframework.stereotype.Component;
-import java.io.IOException;
+import java.util.concurrent.TimeUnit;
/**
* 跨平台Http任务
@@ -33,46 +31,26 @@ public class HttpJobHandler extends IJobHandler {
return FAIL;
}
- // httpGet config
- HttpGet httpGet = new HttpGet(param);
- RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(5000).setConnectTimeout(5000).build();
- httpGet.setConfig(requestConfig);
+ // httpclient
+ HttpClient httpClient = new HttpClient();
+ httpClient.setFollowRedirects(false); // Configure HttpClient, for example:
+ httpClient.start(); // Start HttpClient
- CloseableHttpClient httpClient = null;
- try{
- httpClient = HttpClients.custom().disableAutomaticRetries().build();
+ // request
+ Request request = httpClient.newRequest(param);
+ request.method(HttpMethod.GET);
+ request.timeout(5000, TimeUnit.MILLISECONDS);
- // parse response
- HttpResponse response = httpClient.execute(httpGet);
- HttpEntity entity = response.getEntity();
- if (response.getStatusLine().getStatusCode() != 200) {
- XxlJobLogger.log("Http StatusCode({}) Invalid.", response.getStatusLine().getStatusCode());
- return FAIL;
- }
- if (null == entity) {
- XxlJobLogger.log("Http Entity Empty.");
- return FAIL;
- }
-
- String responseMsg = EntityUtils.toString(entity, "UTF-8");
- XxlJobLogger.log(responseMsg);
- EntityUtils.consume(entity);
- return SUCCESS;
- } catch (Exception e) {
- XxlJobLogger.log(e);
+ // invoke
+ ContentResponse response = request.send();
+ if (response.getStatus() != HttpStatus.OK_200) {
+ XxlJobLogger.log("Http StatusCode({}) Invalid.", response.getStatus());
return FAIL;
- } finally{
- if (httpGet!=null) {
- httpGet.releaseConnection();
- }
- if (httpClient!=null) {
- try {
- httpClient.close();
- } catch (IOException e) {
- XxlJobLogger.log(e);
- }
- }
}
+
+ String responseMsg = response.getContentAsString();
+ XxlJobLogger.log(responseMsg);
+ return SUCCESS;
}
}