Merge remote-tracking branch 'upstream/master'

# Conflicts:
#	xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java
#	xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobScheduler.java
#	xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AdminBizImpl.java
This commit is contained in:
Zhouchuanwen 2019-11-20 14:59:10 +08:00
commit 8512a34469
44 changed files with 714 additions and 417 deletions

17
.github/workflows/maven.yml vendored Normal file
View File

@ -0,0 +1,17 @@
name: Java CI
on: [push]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: Set up JDK 1.8
uses: actions/setup-java@v1
with:
java-version: 1.8
- name: Build with Maven
run: mvn -B package --file pom.xml

View File

@ -1,6 +0,0 @@
language: java
jdk:
- openjdk8
install: mvn install -DskipTests=true -Dmaven.javadoc.skip=true
#script: mvn test
script: mvn -DskipTests=true clean package

View File

@ -7,8 +7,8 @@
<a href="https://www.xuxueli.com/xxl-job/"><strong>-- Home Page --</strong></a>
<br>
<br>
<a href="https://travis-ci.org/xuxueli/xxl-job">
<img src="https://travis-ci.org/xuxueli/xxl-job.svg?branch=master" >
<a href="https://github.com/xuxueli/xxl-job/actions">
<img src="https://github.com/xuxueli/xxl-job/workflows/Java%20CI/badge.svg" >
</a>
<a href="https://hub.docker.com/r/xuxueli/xxl-job-admin/">
<img src="https://img.shields.io/badge/docker-passing-brightgreen.svg" >
@ -378,6 +378,16 @@ XXL-JOB是一个轻量级分布式任务调度平台其核心设计目标是
- 259、爱乐优二手平台
- 260、猫眼电影私有化部署【猫眼电影】
- 261、美团大象私有化部署【美团大象】
- 262、作业帮教育科技北京有限公司【作业帮】
- 263、北京小年糕互联网技术有限公司
- 264、山东矩阵软件工程股份有限公司
- 265、陕西国驿软件科技有限公司
- 266、君开信息科技
- 267、村鸟网络科技有限责任公司
- 268、云南国际信托有限公司
- 269、金智教育
- 270、珠海市筑巢科技有限公司
- 271、上海百胜软件股份有限公司
- ……
> 更多接入的公司,欢迎在 [登记地址](https://github.com/xuxueli/xxl-job/issues/1 ) 登记,登记仅仅为了产品推广。

View File

@ -1,6 +1,6 @@
## 《Distributed task scheduling framework XXL-JOB》
[![Build Status](https://travis-ci.org/xuxueli/xxl-job.svg?branch=master)](https://travis-ci.org/xuxueli/xxl-job)
[![Actions Status](https://github.com/xuxueli/xxl-job/workflows/Java%20CI/badge.svg)](https://github.com/xuxueli/xxl-job/actions)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.xuxueli/xxl-job/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.xuxueli/xxl-job/)
[![GitHub release](https://img.shields.io/github/release/xuxueli/xxl-job.svg)](https://github.com/xuxueli/xxl-job/releases)
[![License](https://img.shields.io/badge/license-GPLv3-blue.svg)](http://www.gnu.org/licenses/gpl-3.0.html)

View File

@ -1,6 +1,6 @@
## 《分布式任务调度平台XXL-JOB》
[![Build Status](https://travis-ci.org/xuxueli/xxl-job.svg?branch=master)](https://travis-ci.org/xuxueli/xxl-job)
[![Actions Status](https://github.com/xuxueli/xxl-job/workflows/Java%20CI/badge.svg)](https://github.com/xuxueli/xxl-job/actions)
[![Docker Status](https://img.shields.io/badge/docker-passing-brightgreen.svg)](https://hub.docker.com/r/xuxueli/xxl-job-admin/)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.xuxueli/xxl-job/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.xuxueli/xxl-job/)
[![GitHub release](https://img.shields.io/github/release/xuxueli/xxl-job.svg)](https://github.com/xuxueli/xxl-job/releases)
@ -346,6 +346,16 @@ XXL-JOB是一个轻量级分布式任务调度平台其核心设计目标是
- 259、爱乐优二手平台
- 260、猫眼电影私有化部署【猫眼电影】
- 261、美团大象私有化部署【美团大象】
- 262、作业帮教育科技北京有限公司【作业帮】
- 263、北京小年糕互联网技术有限公司
- 264、山东矩阵软件工程股份有限公司
- 265、陕西国驿软件科技有限公司
- 266、君开信息科技
- 267、村鸟网络科技有限责任公司
- 268、云南国际信托有限公司
- 269、金智教育
- 270、珠海市筑巢科技有限公司
- 271、上海百胜软件股份有限公司
- ……
> 更多接入的公司,欢迎在 [登记地址](https://github.com/xuxueli/xxl-job/issues/1 ) 登记,登记仅仅为了产品推广。
@ -383,7 +393,7 @@ XXL-JOB是一个轻量级分布式任务调度平台其核心设计目标是
### 1.6 环境
- Maven3+
- Jdk1.7+
- Mysql5.6+
- Mysql5.7+
## 二、快速入门
@ -465,7 +475,6 @@ XXL-JOB是一个轻量级分布式任务调度平台其核心设计目标是
调度中心集群部署时,几点要求和建议:
- DB配置保持一致
- 登陆账号配置保持一致;
- 集群机器时钟保持一致(单机集群忽视);
- 建议推荐通过nginx为调度中心集群做负载均衡分配域名。调度中心访问、执行器回调配置、调用API服务等操作均通过该域名进行。
@ -922,8 +931,8 @@ XXL-JOB的每个调度任务虽然在调度模块是并行调度执行的
任务调度错过触发时间时的处理策略:
- 可能原因:服务重启;调度线程被阻塞,线程被耗尽;上次调度持续阻塞,下次调度被错过;
- 处理策略:
- 过期超5s忽略,当前时间开始计算下次触发时间
- 过期超过5s过期5s内立即触发一次当前时间开始计算下次触发时间
- 过期超5s忽略,当前时间开始计算下次触发时间
- 过期5s内立即触发一次当前时间开始计算下次触发时间
#### 5.4.7 日志回调服务
@ -990,6 +999,11 @@ xxl-job-admin#com.xxl.job.admin.controller.JobApiController.callback
如若需要支撑更多的任务量,可以通过 "调大调度线程数" 、"降低调度中心与执行器ping延迟" 和 "提升机器配置" 几种方式优化。
#### 5.4.12 均衡调度
调度中心在集群部署时会自动进行任务平均分配,触发组件每次获取与线程池数量(调度中心支持自定义调度线程池大小)相关数量的任务,避免大量任务集中在单个调度中心集群节点;
### 5.5 任务 "运行模式" 剖析
#### 5.5.1 "Bean模式" 任务
开发步骤:可参考 "章节三"
@ -1113,7 +1127,7 @@ API服务请求参考代码com.xxl.job.adminbiz.AdminBizTest.java
7、任务触发
API服务位置com.xxl.job.admin.controller.JobInfoController.java
API服务请求参考代码可参考任务界面操作的ajax请求。任何ajax接口均可配置成为API服务只需在待启用的API服务上添加 “@PermessionLimit(limit = false)” 注解取消登陆态拦截即可;
API服务请求参考代码可参考任务界面操作的ajax请求。任何ajax接口均可配置成为API服务只需在待启用的API服务上添加 “@PermissionLimit(limit = false)” 注解取消登陆态拦截即可;
### 5.12 执行器API服务
执行器提供了API服务供调度中心选择使用目前提供的API服务有
@ -1563,15 +1577,21 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
- 6、调度组件优化针对永远不会触发的Cron禁止配置和启动任务Cron最后一次触发后再也不会触发时比如一次性任务主动停止相关任务
- 7、任务列表交互优化支持查看任务所属执行器的注册节点
- 8、DB重连优化修复DB宕机重连后任务调度停止的问题重连后自动加入调度集群触发任务调度
- 9、任务触发组件加载顺序调整避免小概率情况下组件随机加载顺序导致的I18N的NPE问题
- 9、任务触发组件加载顺序调整避免小概率情况下组件随机加载顺序导致的I18N的NPE问题;
- 10、项目依赖升级至较新稳定版本如spring、spring-boot、mybatis、slf4j、groovy等等
- 11、JobThread自销毁优化避免并发触发导致triggerQueue中任务丢失问题
- 12、[ING交互兼容问题待处理]Cron在线生成工具任务新增、编辑框通过组件在线生成Cron表达式
- 12、Cron在线生成工具任务新增、编辑框通过组件在线生成Cron表达式
- 13、Cron下次执行时间查询支持通过界面在线查看后续连续5次执行时间
- 14、任务重试时参数丢失的问题修复
- 15、[ING]xxl-rpc服务端线程优化降低线程内存开销
- 16、[ING]调度日志优化:支持设置日志保留天数,过期日志天维度记录报表,并清理;调度报表汇总实时数据和报表;
- 17、[ING]父子任务参数传递;流程任务等,透传动态参数;
- 15、调度中心密码限制18位修复修改密码超过18位无法登陆的问题
- 16、任务告警组件分页参数无效问题修复
- 17、DB脚本默认编码改为utf8mb4修复字符乱码问题(建议Mysql版本5.7+)
- 18、调度中心任务平均分配触发组件每次获取与线程池数量相关数量的任务避免大量任务集中在单个调度中心集群节点
- 19、调度中心移除SQL中的 "now()" 函数集群部署时不再依赖DB时钟仅需要保证调度中心应用节点时钟一致即可
- 20、xxl-rpc服务端线程优化降低线程内存开销
- 21、调度中心回调API服务改为restful方式
- 22、[ING]调度日志优化:支持设置日志保留天数,过期日志天维度记录报表,并清理;调度报表汇总实时数据和报表;
- 23、[ING]调度中心日志删除改为分页获取ID根据ID删除的方式
@ -1589,7 +1609,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
- 11、失败重试间隔
- 12、SimpleTrigger除Cron外支持设置固定时间间隔触发
- 13、调度日志列表加上执行时长列并支持排序
- 14、DAG流程任务替换子任务支持参数传递配置并列的"a-b、b-c"路径列表构成串行、并行、dag任务流程"dagre-d3"绘图;任务依赖,流程图,子任务+会签任务,各节点日志;
- 14、DAG流程任务替换子任务支持参数传递配置并列的"a-b、b-c"路径列表构成串行、并行、dag任务流程"dagre-d3"绘图;任务依赖,流程图,子任务+会签任务,各节点日志;支持根据成功、失败选择分支;
- 15、日期过滤支持多个时间段排除
- 16、告警邮件内容支持自定义模板配置
- 17、暂停状态支持Cron 为空;
@ -1604,6 +1624,14 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
- 26、调度中心JDK版本调整为JDK8从而升级至最新版本SpringBoot
- 27、执行器服务端口与注册端口分离支持docker动态随机端口
- 28、执行器端口复用复用容器端口提供通讯服务
- 29、自定义失败重试时间间隔
- 30、分片任务全部成功后触发子任务
- 31、任务复制功能点击复制是弹出新建任务弹框并初始化被复制任务信息
- 32、AccessToken按照执行器维度设置控制调度、回调
- 33、任务执行一次的时候指定IP
- 34、通讯调整双向HTTP回调和其他API自定义AccessTokenRestful执行器复用容器端口
- 35、父子任务参数传递流程任务等透传动态参数
- 36、任务操作API服务调整为和回调服务一致降低接入成本
## 七、其他

View File

@ -2,7 +2,7 @@
# XXL-JOB v2.1.1-SNAPSHOT
# Copyright (c) 2015-present, xuxueli.
CREATE database if NOT EXISTS `xxl_job` default character set utf8 collate utf8_general_ci;
CREATE database if NOT EXISTS `xxl_job` default character set utf8mb4 collate utf8mb4_unicode_ci;
use `xxl_job`;
@ -30,7 +30,7 @@ CREATE TABLE `xxl_job_info` (
`trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '上次调度时间',
`trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '下次调度时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
@ -51,7 +51,7 @@ CREATE TABLE `xxl_job_log` (
PRIMARY KEY (`id`),
KEY `I_trigger_time` (`trigger_time`),
KEY `I_handle_code` (`handle_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_logglue` (
`id` int(11) NOT NULL AUTO_INCREMENT,
@ -59,20 +59,20 @@ CREATE TABLE `xxl_job_logglue` (
`glue_type` varchar(50) DEFAULT NULL COMMENT 'GLUE类型',
`glue_source` mediumtext COMMENT 'GLUE源代码',
`glue_remark` varchar(128) NOT NULL COMMENT 'GLUE备注',
`add_time` timestamp NULL DEFAULT NULL,
`update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
`add_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_registry` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`registry_group` varchar(255) NOT NULL,
`registry_group` varchar(50) NOT NULL,
`registry_key` varchar(255) NOT NULL,
`registry_value` varchar(255) NOT NULL,
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `i_g_k_v` (`registry_group`,`registry_key`,`registry_value`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_group` (
`id` int(11) NOT NULL AUTO_INCREMENT,
@ -82,7 +82,7 @@ CREATE TABLE `xxl_job_group` (
`address_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '执行器地址类型0=自动注册、1=手动录入',
`address_list` varchar(512) DEFAULT NULL COMMENT '执行器地址列表,多地址逗号分隔',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
@ -92,12 +92,12 @@ CREATE TABLE `xxl_job_user` (
`permission` varchar(255) DEFAULT NULL COMMENT '权限执行器ID列表多个逗号分割',
PRIMARY KEY (`id`),
UNIQUE KEY `i_username` (`username`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_lock` (
`lock_name` varchar(50) NOT NULL COMMENT '锁名称',
PRIMARY KEY (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `order`, `address_type`, `address_list`) VALUES (1, 'xxl-job-executor-sample', '示例执行器', 1, 0, NULL);

View File

@ -24,7 +24,7 @@
<maven.compiler.target>1.7</maven.compiler.target>
<maven.test.skip>true</maven.test.skip>
<xxl-rpc.version>1.4.1</xxl-rpc.version>
<xxl-rpc.version>1.4.2</xxl-rpc.version>
<spring.version>4.3.25.RELEASE</spring.version>
<spring-boot.version>1.5.22.RELEASE</spring-boot.version>

View File

@ -1,34 +1,97 @@
package com.xxl.job.admin.controller;
import com.xxl.job.admin.controller.annotation.PermissionLimit;
import com.xxl.job.admin.core.conf.XxlJobScheduler;
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.core.biz.AdminBiz;
import org.springframework.beans.factory.InitializingBean;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.RegistryParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.util.XxlJobRemotingUtil;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.servlet.ServletException;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.List;
/**
* Created by xuxueli on 17/5/10.
*/
@Controller
public class JobApiController implements InitializingBean {
@RequestMapping("/api")
public class JobApiController {
@Resource
private AdminBiz adminBiz;
@Override
public void afterPropertiesSet() {
// ---------------------- admin biz ----------------------
}
@RequestMapping(AdminBiz.MAPPING)
/**
* callback
*
* @param callbackParamList
* @return
*/
@RequestMapping("/callback")
@ResponseBody
@PermissionLimit(limit=false)
public void api(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
XxlJobScheduler.invokeAdminService(request, response);
public ReturnT<String> callback(HttpServletRequest request, @RequestBody List<HandleCallbackParam> callbackParamList) {
if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null
&& XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0
&& !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_RPC_ACCESS_TOKEN))) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}
return adminBiz.callback(callbackParamList);
}
/**
* registry
*
* @param registryParam
* @return
*/
@RequestMapping("/registry")
@ResponseBody
@PermissionLimit(limit=false)
public ReturnT<String> registry(HttpServletRequest request, @RequestBody RegistryParam registryParam) {
if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null
&& XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0
&& !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_RPC_ACCESS_TOKEN))) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}
return adminBiz.registry(registryParam);
}
/**
* registry remove
*
* @param registryParam
* @return
*/
@RequestMapping("/registryRemove")
@ResponseBody
@PermissionLimit(limit=false)
public ReturnT<String> registryRemove(HttpServletRequest request, @RequestBody RegistryParam registryParam) {
if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null
&& XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0
&& !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_RPC_ACCESS_TOKEN))) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}
return adminBiz.registryRemove(registryParam);
}
// ---------------------- job biz ----------------------
}

View File

@ -72,6 +72,8 @@ public class JobCodeController {
exists_jobInfo.setGlueSource(glueSource);
exists_jobInfo.setGlueRemark(glueRemark);
exists_jobInfo.setGlueUpdatetime(new Date());
exists_jobInfo.setUpdateTime(new Date());
xxlJobInfoDao.update(exists_jobInfo);
// log old code
@ -80,6 +82,9 @@ public class JobCodeController {
xxlJobLogGlue.setGlueType(exists_jobInfo.getGlueType());
xxlJobLogGlue.setGlueSource(glueSource);
xxlJobLogGlue.setGlueRemark(glueRemark);
xxlJobLogGlue.setAddTime(new Date());
xxlJobLogGlue.setUpdateTime(new Date());
xxlJobLogGlueDao.save(xxlJobLogGlue);
// remove code backup more than 30

View File

@ -14,10 +14,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.*;
/**
* job group controller
@ -119,7 +116,7 @@ public class JobGroupController {
private List<String> findRegistryByAppName(String appNameParam){
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
List<XxlJobRegistry> list = xxlJobRegistryDao.findAll(RegistryConfig.DEAD_TIMEOUT);
List<XxlJobRegistry> list = xxlJobRegistryDao.findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
for (XxlJobRegistry item: list) {
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {

View File

@ -1,6 +1,6 @@
package com.xxl.job.admin.controller;
import com.xxl.job.admin.core.conf.XxlJobScheduler;
import com.xxl.job.admin.core.scheduler.XxlJobScheduler;
import com.xxl.job.admin.core.exception.XxlJobException;
import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobInfo;

View File

@ -1,10 +1,11 @@
package com.xxl.job.admin.core.conf;
import com.xxl.job.admin.core.scheduler.XxlJobScheduler;
import com.xxl.job.admin.dao.XxlJobGroupDao;
import com.xxl.job.admin.dao.XxlJobInfoDao;
import com.xxl.job.admin.dao.XxlJobLogDao;
import com.xxl.job.admin.dao.XxlJobRegistryDao;
import com.xxl.job.core.biz.AdminBiz;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.mail.javamail.JavaMailSender;
@ -18,18 +19,36 @@ import javax.sql.DataSource;
*
* @author xuxueli 2017-04-28
*/
@Component
public class XxlJobAdminConfig implements InitializingBean{
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
private static XxlJobAdminConfig adminConfig = null;
public static XxlJobAdminConfig getAdminConfig() {
return adminConfig;
}
// ---------------------- XxlJobScheduler ----------------------
private XxlJobScheduler xxlJobScheduler;
@Override
public void afterPropertiesSet() {
adminConfig = this;
xxlJobScheduler = new XxlJobScheduler();
xxlJobScheduler.init();
}
@Override
public void destroy() throws Exception {
xxlJobScheduler.destroy();
}
// ---------------------- XxlJobScheduler ----------------------
// conf
@Value("${xxl.job.i18n}")
private String i18n;
@ -40,6 +59,12 @@ public class XxlJobAdminConfig implements InitializingBean{
@Value("${spring.mail.username}")
private String emailUserName;
@Value("${xxl.job.triggerpool.fast.max}")
private int triggerPoolFastMax;
@Value("${xxl.job.triggerpool.slow.max}")
private int triggerPoolSlowMax;
// dao, service
@Resource
@ -51,8 +76,6 @@ public class XxlJobAdminConfig implements InitializingBean{
@Resource
private XxlJobGroupDao xxlJobGroupDao;
@Resource
private AdminBiz adminBiz;
@Resource
private JavaMailSender mailSender;
@Resource
private DataSource dataSource;
@ -70,6 +93,20 @@ public class XxlJobAdminConfig implements InitializingBean{
return emailUserName;
}
public int getTriggerPoolFastMax() {
if (triggerPoolFastMax < 200) {
return 200;
}
return triggerPoolFastMax;
}
public int getTriggerPoolSlowMax() {
if (triggerPoolSlowMax < 100) {
return 100;
}
return triggerPoolSlowMax;
}
public XxlJobLogDao getXxlJobLogDao() {
return xxlJobLogDao;
}
@ -86,10 +123,6 @@ public class XxlJobAdminConfig implements InitializingBean{
return xxlJobGroupDao;
}
public AdminBiz getAdminBiz() {
return adminBiz;
}
public JavaMailSender getMailSender() {
return mailSender;
}

View File

@ -1,150 +0,0 @@
package com.xxl.job.admin.core.conf;
import com.xxl.job.admin.core.thread.JobFailMonitorHelper;
import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper;
import com.xxl.job.admin.core.thread.JobScheduleHelper;
import com.xxl.job.admin.core.thread.JobTriggerPoolHelper;
import com.xxl.job.admin.core.util.I18nUtil;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory;
import com.xxl.rpc.remoting.invoker.call.CallType;
import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean;
import com.xxl.rpc.remoting.invoker.route.LoadBalance;
import com.xxl.rpc.remoting.net.NetEnum;
import com.xxl.rpc.remoting.net.impl.servlet.server.ServletServerHandler;
import com.xxl.rpc.remoting.provider.XxlRpcProviderFactory;
import com.xxl.rpc.serialize.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* @author xuxueli 2018-10-28 00:18:17
*/
@Component
@DependsOn("xxlJobAdminConfig")
public class XxlJobScheduler implements InitializingBean, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);
@Override
public void afterPropertiesSet() {
// init i18n
initI18n();
// admin registry monitor run
JobRegistryMonitorHelper.getInstance().start();
// admin monitor run
JobFailMonitorHelper.getInstance().start();
// admin-server
initRpcProvider();
// start-schedule
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
@Override
public void destroy() throws Exception {
// stop-schedule
JobScheduleHelper.getInstance().toStop();
// admin trigger pool stop
JobTriggerPoolHelper.toStop();
// admin registry stop
JobRegistryMonitorHelper.getInstance().toStop();
// admin monitor stop
JobFailMonitorHelper.getInstance().toStop();
// admin-server
stopRpcProvider();
}
// ---------------------- I18n ----------------------
private void initI18n(){
for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {
item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));
}
}
// ---------------------- admin rpc provider (no server version) ----------------------
private static ServletServerHandler servletServerHandler;
private void initRpcProvider(){
// init
XxlRpcProviderFactory xxlRpcProviderFactory = new XxlRpcProviderFactory();
xxlRpcProviderFactory.initConfig(
NetEnum.NETTY_HTTP,
Serializer.SerializeEnum.HESSIAN.getSerializer(),
null,
0,
XxlJobAdminConfig.getAdminConfig().getAccessToken(),
null,
null);
// add services
xxlRpcProviderFactory.addService(AdminBiz.class.getName(), null, XxlJobAdminConfig.getAdminConfig().getAdminBiz());
// servlet handler
servletServerHandler = new ServletServerHandler(xxlRpcProviderFactory);
}
private void stopRpcProvider() throws Exception {
XxlRpcInvokerFactory.getInstance().stop();
}
public static void invokeAdminService(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
servletServerHandler.handle(null, request, response);
}
// ---------------------- executor-client ----------------------
private static ConcurrentMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<String, ExecutorBiz>();
public static ExecutorBiz getExecutorBiz(String address) {
// valid
if (address==null || address.trim().length()==0) {
return null;
}
// load-cache
address = address.trim();
ExecutorBiz executorBiz = executorBizRepository.get(address);
if (executorBiz != null) {
return executorBiz;
}
// set-cache
executorBiz = (ExecutorBiz) new XxlRpcReferenceBean(
NetEnum.NETTY_HTTP,
Serializer.SerializeEnum.HESSIAN.getSerializer(),
CallType.SYNC,
LoadBalance.ROUND,
ExecutorBiz.class,
null,
3000,
address,
XxlJobAdminConfig.getAdminConfig().getAccessToken(),
null,
null).getObject();
executorBizRepository.put(address, executorBiz);
return executorBiz;
}
}

View File

@ -1,5 +1,7 @@
package com.xxl.job.admin.core.model;
import java.util.Date;
/**
* xxl-job log for glue, used to track job code process
* @author xuxueli 2016-5-19 17:57:46
@ -11,8 +13,8 @@ public class XxlJobLogGlue {
private String glueType; // GLUE类型 #com.xxl.job.core.glue.GlueTypeEnum
private String glueSource;
private String glueRemark;
private String addTime;
private String updateTime;
private Date addTime;
private Date updateTime;
public int getId() {
return id;
@ -54,19 +56,19 @@ public class XxlJobLogGlue {
this.glueRemark = glueRemark;
}
public String getAddTime() {
public Date getAddTime() {
return addTime;
}
public void setAddTime(String addTime) {
public void setAddTime(Date addTime) {
this.addTime = addTime;
}
public String getUpdateTime() {
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(String updateTime) {
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}

View File

@ -1,6 +1,6 @@
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.conf.XxlJobScheduler;
import com.xxl.job.admin.core.scheduler.XxlJobScheduler;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.util.I18nUtil;
import com.xxl.job.core.biz.ExecutorBiz;

View File

@ -1,6 +1,6 @@
package com.xxl.job.admin.core.route.strategy;
import com.xxl.job.admin.core.conf.XxlJobScheduler;
import com.xxl.job.admin.core.scheduler.XxlJobScheduler;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.util.I18nUtil;
import com.xxl.job.core.biz.ExecutorBiz;

View File

@ -0,0 +1,109 @@
package com.xxl.job.admin.core.scheduler;
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.thread.JobFailMonitorHelper;
import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper;
import com.xxl.job.admin.core.thread.JobScheduleHelper;
import com.xxl.job.admin.core.thread.JobTriggerPoolHelper;
import com.xxl.job.admin.core.util.I18nUtil;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import com.xxl.rpc.remoting.invoker.call.CallType;
import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean;
import com.xxl.rpc.remoting.invoker.route.LoadBalance;
import com.xxl.rpc.remoting.net.impl.netty_http.client.NettyHttpClient;
import com.xxl.rpc.serialize.impl.HessianSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* @author xuxueli 2018-10-28 00:18:17
*/
public class XxlJobScheduler {
private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);
public void init() throws Exception {
// init i18n
initI18n();
// admin registry monitor run
JobRegistryMonitorHelper.getInstance().start();
// admin monitor run
JobFailMonitorHelper.getInstance().start();
// admin trigger pool start
JobTriggerPoolHelper.toStart();
// start-schedule
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
public void destroy() throws Exception {
// stop-schedule
JobScheduleHelper.getInstance().toStop();
// admin trigger pool stop
JobTriggerPoolHelper.toStop();
// admin monitor stop
JobFailMonitorHelper.getInstance().toStop();
// admin registry stop
JobRegistryMonitorHelper.getInstance().toStop();
}
// ---------------------- I18n ----------------------
private void initI18n(){
for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {
item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));
}
}
// ---------------------- executor-client ----------------------
private static ConcurrentMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<String, ExecutorBiz>();
public static ExecutorBiz getExecutorBiz(String address) throws Exception {
// valid
if (address==null || address.trim().length()==0) {
return null;
}
// load-cache
address = address.trim();
ExecutorBiz executorBiz = executorBizRepository.get(address);
if (executorBiz != null) {
return executorBiz;
}
// set-cache
XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean();
referenceBean.setClient(NettyHttpClient.class);
referenceBean.setSerializer(HessianSerializer.class);
referenceBean.setCallType(CallType.SYNC);
referenceBean.setLoadBalance(LoadBalance.ROUND);
referenceBean.setIface(ExecutorBiz.class);
referenceBean.setVersion(null);
referenceBean.setTimeout(3000);
referenceBean.setAddress(address);
referenceBean.setAccessToken(XxlJobAdminConfig.getAdminConfig().getAccessToken());
referenceBean.setInvokeCallback(null);
referenceBean.setInvokerFactory(null);
executorBiz = (ExecutorBiz) referenceBean.getObject();
executorBizRepository.put(address, executorBiz);
return executorBiz;
}
}

View File

@ -7,10 +7,7 @@ import com.xxl.job.core.enums.RegistryConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
@ -38,14 +35,14 @@ public class JobRegistryMonitorHelper {
if (groupList!=null && !groupList.isEmpty()) {
// remove dead address (admin/executor)
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT);
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}
// fresh online address (admin/executor)
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT);
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
for (XxlJobRegistry item: list) {
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {

View File

@ -50,6 +50,9 @@ public class JobScheduleHelper {
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
while (!scheduleThreadToStop) {
// Scan Job
@ -73,7 +76,7 @@ public class JobScheduleHelper {
// 1pre read
long nowTime = System.currentTimeMillis();
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS);
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
@ -88,20 +91,15 @@ public class JobScheduleHelper {
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2trigger-expire < 5sdirect-trigger && make next-trigger-time
CronExpression cronExpression = new CronExpression(jobInfo.getJobCron());
long nextTime = cronExpression.getNextValidTimeAfter(new Date()).getTime();
// 1trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2fresh next
jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
jobInfo.setTriggerNextTime(nextTime);
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again
if (jobInfo.getTriggerNextTime() - nowTime < PRE_READ_MS) {
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

View File

@ -1,5 +1,6 @@
package com.xxl.job.admin.core.thread;
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
import com.xxl.job.admin.core.trigger.XxlJobTrigger;
import org.slf4j.Logger;
@ -20,31 +21,44 @@ public class JobTriggerPoolHelper {
// ---------------------- trigger pool ----------------------
// fast/slow thread pool
private ThreadPoolExecutor fastTriggerPool = new ThreadPoolExecutor(
50,
200,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
}
});
private ThreadPoolExecutor fastTriggerPool = null;
private ThreadPoolExecutor slowTriggerPool = null;
private ThreadPoolExecutor slowTriggerPool = new ThreadPoolExecutor(
10,
100,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
}
});
public void start(){
fastTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
}
});
slowTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
}
});
}
public void stop() {
//triggerPool.shutdown();
fastTriggerPool.shutdownNow();
slowTriggerPool.shutdownNow();
logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
}
// job timeout count
@ -100,17 +114,19 @@ public class JobTriggerPoolHelper {
});
}
public void stop() {
//triggerPool.shutdown();
fastTriggerPool.shutdownNow();
slowTriggerPool.shutdownNow();
logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
}
// ---------------------- helper ----------------------
private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();
public static void toStart() {
helper.start();
}
public static void toStop() {
helper.stop();
}
/**
* @param jobId
* @param triggerType
@ -126,8 +142,4 @@ public class JobTriggerPoolHelper {
helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
}
public static void toStop() {
helper.stop();
}
}

View File

@ -1,7 +1,7 @@
package com.xxl.job.admin.core.trigger;
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import com.xxl.job.admin.core.conf.XxlJobScheduler;
import com.xxl.job.admin.core.scheduler.XxlJobScheduler;
import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog;
@ -116,7 +116,7 @@ public class XxlJobTrigger {
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());

View File

@ -41,7 +41,7 @@ public interface XxlJobInfoDao {
public int findAllCount();
public List<XxlJobInfo> scheduleJobQuery(@Param("maxNextTime") long maxNextTime);
public List<XxlJobInfo> scheduleJobQuery(@Param("maxNextTime") long maxNextTime, @Param("pagesize") int pagesize );
public int scheduleUpdate(XxlJobInfo xxlJobInfo);

View File

@ -4,6 +4,7 @@ import com.xxl.job.admin.core.model.XxlJobRegistry;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.Date;
import java.util.List;
/**
@ -12,19 +13,23 @@ import java.util.List;
@Mapper
public interface XxlJobRegistryDao {
public List<Integer> findDead(@Param("timeout") int timeout);
public List<Integer> findDead(@Param("timeout") int timeout,
@Param("nowTime") Date nowTime);
public int removeDead(@Param("ids") List<Integer> ids);
public List<XxlJobRegistry> findAll(@Param("timeout") int timeout);
public List<XxlJobRegistry> findAll(@Param("timeout") int timeout,
@Param("nowTime") Date nowTime);
public int registryUpdate(@Param("registryGroup") String registryGroup,
@Param("registryKey") String registryKey,
@Param("registryValue") String registryValue);
@Param("registryValue") String registryValue,
@Param("updateTime") Date updateTime);
public int registrySave(@Param("registryGroup") String registryGroup,
@Param("registryKey") String registryKey,
@Param("registryValue") String registryValue);
@Param("registryValue") String registryValue,
@Param("updateTime") Date updateTime);
public int registryDelete(@Param("registryGroup") String registryGroup,
@Param("registryKey") String registryKey,

View File

@ -17,6 +17,7 @@ import com.xxl.job.core.handler.IJobHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.text.MessageFormat;
@ -126,9 +127,17 @@ public class AdminBizImpl implements AdminBiz {
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
// valid
if (!StringUtils.hasText(registryParam.getRegistGroup())
|| !StringUtils.hasText(registryParam.getRegistryKey())
|| !StringUtils.hasText(registryParam.getRegistryValue())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
}
int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
if (ret < 1) {
xxlJobRegistryDao.registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
xxlJobRegistryDao.registrySave(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
// fresh
freshGroupRegistryInfo(registryParam);
@ -138,7 +147,15 @@ public class AdminBizImpl implements AdminBiz {
@Override
public ReturnT<String> registryRemove(RegistryParam registryParam) {
int ret = xxlJobRegistryDao.registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
// valid
if (!StringUtils.hasText(registryParam.getRegistGroup())
|| !StringUtils.hasText(registryParam.getRegistryKey())
|| !StringUtils.hasText(registryParam.getRegistryValue())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
}
int ret = xxlJobRegistryDao.registryDelete(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
if (ret > 0) {
// fresh

View File

@ -117,6 +117,9 @@ public class XxlJobServiceImpl implements XxlJobService {
}
// add in db
jobInfo.setAddTime(new Date());
jobInfo.setUpdateTime(new Date());
jobInfo.setGlueUpdatetime(new Date());
xxlJobInfoDao.save(jobInfo);
if (jobInfo.getId() < 1) {
return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("jobinfo_field_add")+I18nUtil.getString("system_fail")) );
@ -220,6 +223,8 @@ public class XxlJobServiceImpl implements XxlJobService {
exists_jobInfo.setExecutorFailRetryCount(jobInfo.getExecutorFailRetryCount());
exists_jobInfo.setChildJobId(jobInfo.getChildJobId());
exists_jobInfo.setTriggerNextTime(nextTriggerTime);
exists_jobInfo.setUpdateTime(new Date());
xxlJobInfoDao.update(exists_jobInfo);
@ -260,6 +265,7 @@ public class XxlJobServiceImpl implements XxlJobService {
xxlJobInfo.setTriggerLastTime(0);
xxlJobInfo.setTriggerNextTime(nextTriggerTime);
xxlJobInfo.setUpdateTime(new Date());
xxlJobInfoDao.update(xxlJobInfo);
return ReturnT.SUCCESS;
}
@ -272,6 +278,7 @@ public class XxlJobServiceImpl implements XxlJobService {
xxlJobInfo.setTriggerLastTime(0);
xxlJobInfo.setTriggerNextTime(0);
xxlJobInfo.setUpdateTime(new Date());
xxlJobInfoDao.update(xxlJobInfo);
return ReturnT.SUCCESS;
}

View File

@ -44,3 +44,7 @@ xxl.job.accessToken=
### xxl-job, i18n (default empty as chinese, "en" as english)
xxl.job.i18n=
## xxl-job, triggerpool max size
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100

View File

@ -133,8 +133,8 @@
#{jobGroup},
#{jobCron},
#{jobDesc},
NOW(),
NOW(),
#{addTime},
#{updateTime},
#{author},
#{alarmEmail},
#{executorRouteStrategy},
@ -146,7 +146,7 @@
#{glueType},
#{glueSource},
#{glueRemark},
NOW(),
#{glueUpdatetime},
#{childJobId},
#{triggerStatus},
#{triggerLastTime},
@ -170,7 +170,7 @@
job_group = #{jobGroup},
job_cron = #{jobCron},
job_desc = #{jobDesc},
update_time = NOW(),
update_time = #{updateTime},
author = #{author},
alarm_email = #{alarmEmail},
executor_route_strategy = #{executorRouteStrategy},
@ -212,7 +212,9 @@
SELECT <include refid="Base_Column_List" />
FROM xxl_job_info AS t
WHERE t.trigger_status = 1
and t.trigger_next_time<![CDATA[ <= ]]> #{maxNextTime}
and t.trigger_next_time <![CDATA[ <= ]]> #{maxNextTime}
ORDER BY id ASC
LIMIT #{pagesize}
</select>
<update id="scheduleUpdate" parameterType="com.xxl.job.admin.core.model.XxlJobInfo" >

View File

@ -36,8 +36,8 @@
#{glueType},
#{glueSource},
#{glueRemark},
now(),
now()
#{addTime},
#{updateTime}
);
<!--<selectKey resultType="java.lang.Integer" order="AFTER" keyProperty="id">
SELECT LAST_INSERT_ID()

View File

@ -227,6 +227,7 @@
)
AND `alarm_status` = 0
ORDER BY id ASC
LIMIT #{pagesize}
</select>
<update id="updateAlarmStatus" >

View File

@ -19,10 +19,10 @@
t.update_time
</sql>
<select id="findDead" parameterType="java.lang.Integer" resultType="java.lang.Integer" >
<select id="findDead" parameterType="java.util.HashMap" resultType="java.lang.Integer" >
SELECT t.id
FROM xxl_job_registry AS t
WHERE t.update_time <![CDATA[ < ]]> DATE_ADD(NOW(),INTERVAL -#{timeout} SECOND)
WHERE t.update_time <![CDATA[ < ]]> DATE_ADD(#{nowTime},INTERVAL -#{timeout} SECOND)
</select>
<delete id="removeDead" parameterType="java.lang.Integer" >
@ -33,15 +33,15 @@
</foreach>
</delete>
<select id="findAll" parameterType="java.lang.Integer" resultMap="XxlJobRegistry">
<select id="findAll" parameterType="java.util.HashMap" resultMap="XxlJobRegistry">
SELECT <include refid="Base_Column_List" />
FROM xxl_job_registry AS t
WHERE t.update_time <![CDATA[ > ]]> DATE_ADD(NOW(),INTERVAL -#{timeout} SECOND)
WHERE t.update_time <![CDATA[ > ]]> DATE_ADD(#{nowTime},INTERVAL -#{timeout} SECOND)
</select>
<update id="registryUpdate" >
UPDATE xxl_job_registry
SET `update_time` = NOW()
SET `update_time` = #{updateTime}
WHERE `registry_group` = #{registryGroup}
AND `registry_key` = #{registryKey}
AND `registry_value` = #{registryValue}
@ -49,7 +49,7 @@
<insert id="registrySave" >
INSERT INTO xxl_job_registry( `registry_group` , `registry_key` , `registry_value`, `update_time`)
VALUES( #{registryGroup} , #{registryKey} , #{registryValue}, NOW())
VALUES( #{registryGroup} , #{registryKey} , #{registryValue}, #{updateTime})
</insert>
<delete id="registryDelete" >

View File

@ -109,7 +109,7 @@
<form class="form-horizontal form" role="form" >
<div class="form-group">
<label for="lastname" class="col-sm-2 control-label">${I18n.change_pwd_field_newpwd}<font color="red">*</font></label>
<div class="col-sm-10"><input type="text" class="form-control" name="password" placeholder="${I18n.system_please_input} ${I18n.change_pwd_field_newpwd}" maxlength="100" ></div>
<div class="col-sm-10"><input type="text" class="form-control" name="password" placeholder="${I18n.system_please_input} ${I18n.change_pwd_field_newpwd}" maxlength="18" ></div>
</div>
<hr>
<div class="form-group">

View File

@ -15,11 +15,11 @@
<div class="login-box-body">
<p class="login-box-msg">${I18n.admin_name}</p>
<div class="form-group has-feedback">
<input type="text" name="userName" class="form-control" placeholder="${I18n.login_username_placeholder}" maxlength="20" >
<input type="text" name="userName" class="form-control" placeholder="${I18n.login_username_placeholder}" maxlength="18" >
<span class="glyphicon glyphicon-envelope form-control-feedback"></span>
</div>
<div class="form-group has-feedback">
<input type="password" name="password" class="form-control" placeholder="${I18n.login_password_placeholder}" maxlength="20" >
<input type="password" name="password" class="form-control" placeholder="${I18n.login_password_placeholder}" maxlength="18" >
<span class="glyphicon glyphicon-lock form-control-feedback"></span>
</div>
<div class="row">

View File

@ -4,8 +4,6 @@ import com.xxl.job.admin.core.model.XxlJobInfo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
@ -47,6 +45,10 @@ public class XxlJobInfoDaoTest {
info.setGlueRemark("setGlueRemark");
info.setChildJobId("1");
info.setAddTime(new Date());
info.setUpdateTime(new Date());
info.setGlueUpdatetime(new Date());
int count = xxlJobInfoDao.save(info);
XxlJobInfo info2 = xxlJobInfoDao.loadById(info.getId());
@ -64,6 +66,7 @@ public class XxlJobInfoDaoTest {
info2.setGlueUpdatetime(new Date());
info2.setChildJobId("1");
info2.setUpdateTime(new Date());
int item2 = xxlJobInfoDao.update(info2);
xxlJobInfoDao.delete(info2.getId());

View File

@ -1,15 +1,13 @@
package com.xxl.job.admin.dao;
import com.xxl.job.admin.core.model.XxlJobLogGlue;
import com.xxl.job.admin.dao.XxlJobLogGlueDao;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
@RunWith(SpringRunner.class)
@ -26,6 +24,9 @@ public class XxlJobLogGlueDaoTest {
logGlue.setGlueType("1");
logGlue.setGlueSource("1");
logGlue.setGlueRemark("1");
logGlue.setAddTime(new Date());
logGlue.setUpdateTime(new Date());
int ret = xxlJobLogGlueDao.save(logGlue);
List<XxlJobLogGlue> list = xxlJobLogGlueDao.findByJobId(1);

View File

@ -8,6 +8,7 @@ import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
@RunWith(SpringRunner.class)
@ -19,12 +20,12 @@ public class XxlJobRegistryDaoTest {
@Test
public void test(){
int ret = xxlJobRegistryDao.registryUpdate("g1", "k1", "v1");
int ret = xxlJobRegistryDao.registryUpdate("g1", "k1", "v1", new Date());
if (ret < 1) {
ret = xxlJobRegistryDao.registrySave("g1", "k1", "v1");
ret = xxlJobRegistryDao.registrySave("g1", "k1", "v1", new Date());
}
List<XxlJobRegistry> list = xxlJobRegistryDao.findAll(1);
List<XxlJobRegistry> list = xxlJobRegistryDao.findAll(1, new Date());
int ret2 = xxlJobRegistryDao.removeDead(Arrays.asList(1));
}

View File

@ -1,18 +1,17 @@
package com.xxl.job.adminbiz;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.client.AdminBizClient;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.RegistryParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory;
import com.xxl.rpc.remoting.invoker.call.CallType;
import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean;
import com.xxl.rpc.remoting.invoker.route.LoadBalance;
import com.xxl.rpc.remoting.net.NetEnum;
import com.xxl.rpc.serialize.Serializer;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
/**
* admin api test
*
@ -21,37 +20,38 @@ import org.junit.Test;
public class AdminBizTest {
// admin-client
private static String addressUrl = "http://127.0.0.1:8080/xxl-job-admin".concat(AdminBiz.MAPPING);
private static String addressUrl = "http://127.0.0.1:8080/xxl-job-admin/";
private static String accessToken = null;
@Test
public void callback() throws Exception {
AdminBiz adminBiz = new AdminBizClient(addressUrl, accessToken);
HandleCallbackParam param = new HandleCallbackParam();
param.setLogId(1);
param.setExecuteResult(ReturnT.SUCCESS);
List<HandleCallbackParam> callbackParamList = Arrays.asList(param);
ReturnT<String> returnT = adminBiz.callback(callbackParamList);
Assert.assertTrue(returnT.getCode() == ReturnT.SUCCESS_CODE);
}
/**
* registry executor
*
* @throws Exception
*/
@Test
public void registryTest() throws Exception {
addressUrl = addressUrl.replace("http://", "");
AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(
NetEnum.NETTY_HTTP,
Serializer.SerializeEnum.HESSIAN.getSerializer(),
CallType.SYNC,
LoadBalance.ROUND,
AdminBiz.class,
null,
3000,
addressUrl,
accessToken,
null,
null).getObject();
public void registry() throws Exception {
AdminBiz adminBiz = new AdminBizClient(addressUrl, accessToken);
// test executor registry
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "xxl-job-executor-example", "127.0.0.1:9999");
ReturnT<String> returnT = adminBiz.registry(registryParam);
Assert.assertTrue(returnT.getCode() == ReturnT.SUCCESS_CODE);
// stop invoker
XxlRpcInvokerFactory.getInstance().stop();
Assert.assertTrue(returnT.getCode() == ReturnT.SUCCESS_CODE);
}
/**
@ -61,27 +61,13 @@ public class AdminBizTest {
*/
@Test
public void registryRemove() throws Exception {
addressUrl = addressUrl.replace("http://", "");
AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(
NetEnum.NETTY_HTTP,
Serializer.SerializeEnum.HESSIAN.getSerializer(),
CallType.SYNC,
LoadBalance.ROUND,
AdminBiz.class,
null,
3000,
addressUrl,
accessToken,
null,
null).getObject();
AdminBiz adminBiz = new AdminBizClient(addressUrl, accessToken);
// test executor registry remove
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "xxl-job-executor-example", "127.0.0.1:9999");
ReturnT<String> returnT = adminBiz.registryRemove(registryParam);
Assert.assertTrue(returnT.getCode() == ReturnT.SUCCESS_CODE);
// stop invoker
XxlRpcInvokerFactory.getInstance().stop();
}
}

View File

@ -9,8 +9,8 @@ import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory;
import com.xxl.rpc.remoting.invoker.call.CallType;
import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean;
import com.xxl.rpc.remoting.invoker.route.LoadBalance;
import com.xxl.rpc.remoting.net.NetEnum;
import com.xxl.rpc.serialize.Serializer;
import com.xxl.rpc.remoting.net.impl.netty_http.client.NettyHttpClient;
import com.xxl.rpc.serialize.impl.HessianSerializer;
/**
* executor-api client, test
@ -45,22 +45,25 @@ public class ExecutorBizTest {
triggerParam.setGlueSource(null);
triggerParam.setGlueUpdatetime(System.currentTimeMillis());
triggerParam.setLogId(1);
triggerParam.setLogDateTim(System.currentTimeMillis());
triggerParam.setLogDateTime(System.currentTimeMillis());
// do remote trigger
String accessToken = null;
ExecutorBiz executorBiz = (ExecutorBiz) new XxlRpcReferenceBean(
NetEnum.NETTY_HTTP,
Serializer.SerializeEnum.HESSIAN.getSerializer(),
CallType.SYNC,
LoadBalance.ROUND,
ExecutorBiz.class,
null,
3000,
"127.0.0.1:9999",
null,
null,
null).getObject();
XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean();
referenceBean.setClient(NettyHttpClient.class);
referenceBean.setSerializer(HessianSerializer.class);
referenceBean.setCallType(CallType.SYNC);
referenceBean.setLoadBalance(LoadBalance.ROUND);
referenceBean.setIface(ExecutorBiz.class);
referenceBean.setVersion(null);
referenceBean.setTimeout(3000);
referenceBean.setAddress("127.0.0.1:9999");
referenceBean.setAccessToken(accessToken);
referenceBean.setInvokeCallback(null);
referenceBean.setInvokerFactory(null);
ExecutorBiz executorBiz = (ExecutorBiz) referenceBean.getObject();
ReturnT<String> runResult = executorBiz.run(triggerParam);

View File

@ -11,8 +11,6 @@ import java.util.List;
*/
public interface AdminBiz {
public static final String MAPPING = "/api";
// ---------------------- callback ----------------------

View File

@ -0,0 +1,48 @@
package com.xxl.job.core.biz.client;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.RegistryParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.util.XxlJobRemotingUtil;
import java.util.List;
/**
* admin api test
*
* @author xuxueli 2017-07-28 22:14:52
*/
public class AdminBizClient implements AdminBiz {
public AdminBizClient() {
}
public AdminBizClient(String addressUrl, String accessToken) {
this.addressUrl = addressUrl;
this.accessToken = accessToken;
// valid
if (!this.addressUrl.endsWith("/")) {
this.addressUrl = this.addressUrl + "/";
}
}
private String addressUrl ;
private String accessToken;
@Override
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, callbackParamList, 3);
}
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, registryParam, 3);
}
@Override
public ReturnT<String> registryRemove(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, registryParam, 3);
}
}

View File

@ -16,7 +16,7 @@ public class TriggerParam implements Serializable{
private int executorTimeout;
private long logId;
private long logDateTim;
private long logDateTime;
private String glueType;
private String glueSource;
@ -74,12 +74,12 @@ public class TriggerParam implements Serializable{
this.logId = logId;
}
public long getLogDateTim() {
return logDateTim;
public long getLogDateTime() {
return logDateTime;
}
public void setLogDateTim(long logDateTim) {
this.logDateTim = logDateTim;
public void setLogDateTime(long logDateTime) {
this.logDateTime = logDateTime;
}
public String getGlueType() {
@ -132,7 +132,7 @@ public class TriggerParam implements Serializable{
", executorBlockStrategy='" + executorBlockStrategy + '\'' +
", executorTimeout=" + executorTimeout +
", logId=" + logId +
", logDateTim=" + logDateTim +
", logDateTime=" + logDateTime +
", glueType='" + glueType + '\'' +
", glueSource='" + glueSource + '\'' +
", glueUpdatetime=" + glueUpdatetime +

View File

@ -2,6 +2,7 @@ package com.xxl.job.core.executor;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.client.AdminBizClient;
import com.xxl.job.core.biz.impl.ExecutorBizImpl;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
@ -10,13 +11,10 @@ import com.xxl.job.core.thread.JobLogFileCleanThread;
import com.xxl.job.core.thread.JobThread;
import com.xxl.job.core.thread.TriggerCallbackThread;
import com.xxl.rpc.registry.ServiceRegistry;
import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory;
import com.xxl.rpc.remoting.invoker.call.CallType;
import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean;
import com.xxl.rpc.remoting.invoker.route.LoadBalance;
import com.xxl.rpc.remoting.net.NetEnum;
import com.xxl.rpc.remoting.net.impl.netty_http.server.NettyHttpServer;
import com.xxl.rpc.remoting.provider.XxlRpcProviderFactory;
import com.xxl.rpc.serialize.Serializer;
import com.xxl.rpc.serialize.impl.HessianSerializer;
import com.xxl.rpc.util.IpUtil;
import com.xxl.rpc.util.NetUtil;
import org.slf4j.Logger;
@ -105,35 +103,18 @@ public class XxlJobExecutor {
// destory TriggerCallbackThread
TriggerCallbackThread.getInstance().toStop();
// destory invoker
stopInvokerFactory();
}
// ---------------------- admin-client (rpc invoker) ----------------------
private static List<AdminBiz> adminBizList;
private static Serializer serializer;
private static Serializer serializer = new HessianSerializer();
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
serializer = Serializer.SerializeEnum.HESSIAN.getSerializer();
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
for (String address: adminAddresses.trim().split(",")) {
if (address!=null && address.trim().length()>0) {
String addressUrl = address.concat(AdminBiz.MAPPING);
AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(
NetEnum.NETTY_HTTP,
serializer,
CallType.SYNC,
LoadBalance.ROUND,
AdminBiz.class,
null,
3000,
addressUrl,
accessToken,
null,
null
).getObject();
AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>();
@ -143,14 +124,6 @@ public class XxlJobExecutor {
}
}
}
private void stopInvokerFactory(){
// stop invoker factory
try {
XxlRpcInvokerFactory.getInstance().stop();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
public static List<AdminBiz> getAdminBizList(){
return adminBizList;
}
@ -171,7 +144,16 @@ public class XxlJobExecutor {
serviceRegistryParam.put("address", address);
xxlRpcProviderFactory = new XxlRpcProviderFactory();
xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);
xxlRpcProviderFactory.setServer(NettyHttpServer.class);
xxlRpcProviderFactory.setSerializer(HessianSerializer.class);
xxlRpcProviderFactory.setCorePoolSize(20);
xxlRpcProviderFactory.setMaxPoolSize(200);
xxlRpcProviderFactory.setIp(ip);
xxlRpcProviderFactory.setPort(port);
xxlRpcProviderFactory.setAccessToken(accessToken);
xxlRpcProviderFactory.setServiceRegistry(ExecutorServiceRegistry.class);
xxlRpcProviderFactory.setServiceRegistryParam(serviceRegistryParam);
// add services
xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());

View File

@ -116,7 +116,7 @@ public class JobThread extends Thread{
triggerLogIdSet.remove(triggerParam.getLogId());
// log filename, like "logPath/yyyy-MM-dd/9999.log"
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
XxlJobFileAppender.contextHolder.set(logFileName);
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
@ -186,11 +186,11 @@ public class JobThread extends Thread{
// callback handler info
if (!toStop) {
// commonm
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), executeResult));
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
} else {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job runningkilled]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
}
}
}
@ -202,7 +202,7 @@ public class JobThread extends Thread{
if (triggerParam!=null) {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
}
}

View File

@ -0,0 +1,122 @@
package com.xxl.job.core.util;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.registry.client.util.json.BasicJson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Map;
/**
* @author xuxueli 2018-11-25 00:55:31
*/
public class XxlJobRemotingUtil {
private static Logger logger = LoggerFactory.getLogger(XxlJobRemotingUtil.class);
public static String XXL_RPC_ACCESS_TOKEN = "XXL_RPC_ACCESS_TOKEN";
/**
* post
*
* @param url
* @param accessToken
* @param requestObj
* @return
*/
public static ReturnT<String> postBody(String url, String accessToken, Object requestObj, int timeout) {
HttpURLConnection connection = null;
BufferedReader bufferedReader = null;
try {
// connection
URL realUrl = new URL(url);
connection = (HttpURLConnection) realUrl.openConnection();
// connection setting
connection.setRequestMethod("POST");
connection.setDoOutput(true);
connection.setDoInput(true);
connection.setUseCaches(false);
connection.setReadTimeout(timeout * 1000);
connection.setConnectTimeout(3 * 1000);
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
if(accessToken!=null && accessToken.trim().length()>0){
connection.setRequestProperty(XXL_RPC_ACCESS_TOKEN, accessToken);
}
// do connection
connection.connect();
// write requestBody
String requestBody = BasicJson.toJson(requestObj);
DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
dataOutputStream.writeBytes(requestBody);
dataOutputStream.flush();
dataOutputStream.close();
/*byte[] requestBodyBytes = requestBody.getBytes("UTF-8");
connection.setRequestProperty("Content-Length", String.valueOf(requestBodyBytes.length));
OutputStream outwritestream = connection.getOutputStream();
outwritestream.write(requestBodyBytes);
outwritestream.flush();
outwritestream.close();*/
// valid StatusCode
int statusCode = connection.getResponseCode();
if (statusCode != 200) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting fail, StatusCode("+ statusCode +") invalid. for url : " + url);
}
// result
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
StringBuilder result = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null) {
result.append(line);
}
String resultJson = result.toString();
// parse returnT
try {
Map<String, Object> resultMap = BasicJson.parseMap(resultJson);
ReturnT<String> returnT = new ReturnT<String>();
if (resultMap==null) {
returnT.setCode(ReturnT.FAIL_CODE);
returnT.setMsg("AdminBizClient Remoting call fail.");
} else {
returnT.setCode(Integer.valueOf(String.valueOf(resultMap.get("code"))));
returnT.setMsg(String.valueOf(resultMap.get("msg")));
returnT.setContent(String.valueOf(resultMap.get("content")));
}
return returnT;
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting response content invalid("+ resultJson +"), for url : " + url);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting error("+ e.getMessage() +"), for url : " + url);
} finally {
try {
if (bufferedReader != null) {
bufferedReader.close();
}
if (connection != null) {
connection.disconnect();
}
} catch (Exception e2) {
logger.error(e2.getMessage(), e2);
}
}
}
}

View File

@ -10,8 +10,8 @@ import com.xxl.job.core.glue.GlueTypeEnum;
import com.xxl.rpc.remoting.invoker.call.CallType;
import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean;
import com.xxl.rpc.remoting.invoker.route.LoadBalance;
import com.xxl.rpc.remoting.net.NetEnum;
import com.xxl.rpc.serialize.Serializer;
import com.xxl.rpc.remoting.net.impl.netty_http.client.NettyHttpClient;
import com.xxl.rpc.serialize.impl.HessianSerializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -44,18 +44,20 @@ public class ExecutorBizImplTest {
TimeUnit.SECONDS.sleep(3);
// init executor biz proxy
executorBiz = (ExecutorBiz) new XxlRpcReferenceBean(
NetEnum.NETTY_HTTP,
Serializer.SerializeEnum.HESSIAN.getSerializer(),
CallType.SYNC,
LoadBalance.ROUND,
ExecutorBiz.class,
null,
3000,
"127.0.0.1:9999",
null,
null,
null).getObject();
XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean();
referenceBean.setClient(NettyHttpClient.class);
referenceBean.setSerializer(HessianSerializer.class);
referenceBean.setCallType(CallType.SYNC);
referenceBean.setLoadBalance(LoadBalance.ROUND);
referenceBean.setIface(ExecutorBiz.class);
referenceBean.setVersion(null);
referenceBean.setTimeout(3000);
referenceBean.setAddress("127.0.0.1:9999");
referenceBean.setAccessToken(null);
referenceBean.setInvokeCallback(null);
referenceBean.setInvokerFactory(null);
executorBiz = (ExecutorBiz) referenceBean.getObject();
}
@After
@ -131,7 +133,7 @@ public class ExecutorBizImplTest {
triggerParam.setGlueSource(null);
triggerParam.setGlueUpdatetime(System.currentTimeMillis());
triggerParam.setLogId(1);
triggerParam.setLogDateTim(System.currentTimeMillis());
triggerParam.setLogDateTime(System.currentTimeMillis());
// Act
final ReturnT<String> retval = executorBiz.run(triggerParam);