feat: 定时报告使用优先级线程池

This commit is contained in:
fit2cloud-chenyw 2022-01-13 17:28:25 +08:00
parent b6a011c904
commit 9cd891556f
10 changed files with 269 additions and 46 deletions

View File

@ -0,0 +1,9 @@
package io.dataease.base.mapper.ext;
public interface ExtTaskMapper {
int runningCount(Long taskId);
void resetRunnings(Long taskId);
}

View File

@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="io.dataease.base.mapper.ext.ExtTaskMapper">
<select id="runningCount" resultType="java.lang.Integer">
select count(*) as count from sys_task_instance where task_id = #{taskId} and status = 0
</select>
<update id="resetRunnings">
update sys_task_instance set status = -1, info = 'System Interrupt Error' where task_id = #{taskId} and status = 0
</update>
</mapper>

View File

@ -0,0 +1,113 @@
package io.dataease.commons.pool;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import io.dataease.commons.utils.LogUtil;
public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
public static AtomicInteger globaInteger = new AtomicInteger(1);
private ThreadLocal<Integer> local = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 1;
}
};
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue());
}
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), threadFactory);
}
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), handler);
}
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), threadFactory, handler);
}
protected static PriorityBlockingQueue getWorkQueue() {
return new PriorityBlockingQueue();
}
@Override
public void execute(Runnable command) {
int andIncrement = globaInteger.getAndIncrement();
Integer theadInteger = local.get();
try {
if (theadInteger == 0) {
this.execute(command, 0);
} else {
this.execute(command, andIncrement);
}
} finally {
local.set(1);
}
}
public void execute(Runnable command, int priority) {
super.execute(new PriorityRunnable(command, priority));
}
public <T> Future<T> submit(Callable<T> task, int priority) {
local.set(priority);
return super.submit(task);
}
protected static class PriorityRunnable<E extends Comparable<? super E>>
implements Runnable, Comparable<PriorityRunnable<E>> {
private final static AtomicLong seq = new AtomicLong();
private final long seqNum;
Runnable run;
private int priority;
public PriorityRunnable(Runnable run, int priority) {
seqNum = seq.getAndIncrement();
this.run = run;
this.priority = priority;
}
public int getPriority() {
return priority;
}
public void setPriority(int priority) {
this.priority = priority;
}
public Runnable getRun() {
return run;
}
@Override
public void run() {
LogUtil.info("number " + priority + " is starting...");
this.run.run();
}
@Override
public int compareTo(PriorityRunnable<E> other) {
int res = 0;
if (this.priority == other.priority) {
if (other.run != this.run) {// ASC
res = (seqNum < other.seqNum ? -1 : 1);
}
} else {// DESC
res = this.priority > other.priority ? 1 : -1;
}
return res;
}
}
}

View File

@ -0,0 +1,17 @@
package io.dataease.commons.pool;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;
@ConfigurationProperties(prefix = "detask", ignoreInvalidFields = true)
@Data
@Component
public class PriorityThreadPoolProperties {
private int corePoolSize = 2;
private int maximumPoolSize = 100;
private int keepAliveTime = 60;
}

View File

@ -1,15 +1,25 @@
package io.dataease.config;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import io.dataease.commons.pool.PriorityThreadPoolExecutor;
import io.dataease.commons.pool.PriorityThreadPoolProperties;
@EnableAsync(proxyTargetClass = true)
@Configuration
public class AsyncConfig {
@Resource
private PriorityThreadPoolProperties priorityThreadPoolProperties;
@Bean
public AsyncTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
@ -18,4 +28,18 @@ public class AsyncConfig {
executor.setMaxPoolSize(10);
return executor;
}
@Bean
public PriorityThreadPoolExecutor priorityExecutor() {
int corePoolSize = priorityThreadPoolProperties.getCorePoolSize();
int maximumPoolSize = priorityThreadPoolProperties.getMaximumPoolSize();
int keepAliveTime = priorityThreadPoolProperties.getKeepAliveTime();
PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, TimeUnit.SECONDS);
return executor;
}
}

View File

@ -28,7 +28,7 @@ public class ScheduleManager {
* @throws SchedulerException
*/
public void addSimpleJob(JobKey jobKey, TriggerKey triggerKey, Class<? extends Job> cls, int repeatIntervalTime,
JobDataMap jobDataMap) throws SchedulerException {
JobDataMap jobDataMap) throws SchedulerException {
JobBuilder jobBuilder = JobBuilder.newJob(cls).withIdentity(jobKey);
@ -46,7 +46,8 @@ public class ScheduleManager {
scheduler.scheduleJob(jd, trigger);
}
public void addSimpleJob(JobKey jobKey, TriggerKey triggerKey, Class<? extends Job> cls, int repeatIntervalTime) throws SchedulerException {
public void addSimpleJob(JobKey jobKey, TriggerKey triggerKey, Class<? extends Job> cls, int repeatIntervalTime)
throws SchedulerException {
addSimpleJob(jobKey, triggerKey, cls, repeatIntervalTime);
}
@ -59,7 +60,8 @@ public class ScheduleManager {
* @param cron
* @param jobDataMap
*/
public void addCronJob(JobKey jobKey, TriggerKey triggerKey, Class jobClass, String cron, Date startTime, Date endTime, JobDataMap jobDataMap) {
public void addCronJob(JobKey jobKey, TriggerKey triggerKey, Class jobClass, String cron, Date startTime,
Date endTime, JobDataMap jobDataMap) {
try {
LogUtil.info("addCronJob: " + triggerKey.getName() + "," + triggerKey.getGroup());
@ -99,7 +101,8 @@ public class ScheduleManager {
}
}
public void addCronJob(JobKey jobKey, TriggerKey triggerKey, Class jobClass, String cron, Date startTime, Date endTime) {
public void addCronJob(JobKey jobKey, TriggerKey triggerKey, Class jobClass, String cron, Date startTime,
Date endTime) {
addCronJob(jobKey, triggerKey, jobClass, cron, startTime, endTime, null);
}
@ -140,7 +143,8 @@ public class ScheduleManager {
* @param cron
* @throws SchedulerException
*/
public void modifyCronJobTime(TriggerKey triggerKey, String cron, Date startTime, Date endTime) throws SchedulerException {
public void modifyCronJobTime(TriggerKey triggerKey, String cron, Date startTime, Date endTime)
throws SchedulerException {
LogUtil.info("modifyCronJobTime: " + triggerKey.getName() + "," + triggerKey.getGroup());
@ -151,7 +155,6 @@ public class ScheduleManager {
return;
}
/** 方式一 :调用 rescheduleJob 开始 */
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();// 触发器
@ -279,7 +282,6 @@ public class ScheduleManager {
}
}
public static void startJobs(Scheduler sched) {
try {
sched.start();
@ -289,7 +291,6 @@ public class ScheduleManager {
}
}
public void shutdownJobs(Scheduler sched) {
try {
if (!sched.isShutdown()) {
@ -312,7 +313,7 @@ public class ScheduleManager {
* @throws SchedulerException
*/
public void addOrUpdateSimpleJob(JobKey jobKey, TriggerKey triggerKey, Class clz,
int intervalTime, JobDataMap jobDataMap) throws SchedulerException {
int intervalTime, JobDataMap jobDataMap) throws SchedulerException {
if (scheduler.checkExists(triggerKey)) {
modifySimpleJobTime(triggerKey, intervalTime);
@ -323,7 +324,7 @@ public class ScheduleManager {
}
public void addOrUpdateSingleJob(JobKey jobKey, TriggerKey triggerKey, Class clz,
Date date, JobDataMap jobDataMap) throws SchedulerException {
Date date, JobDataMap jobDataMap) throws SchedulerException {
if (scheduler.checkExists(triggerKey)) {
modifySingleJobTime(triggerKey, date);
} else {
@ -333,15 +334,15 @@ public class ScheduleManager {
}
public void addOrUpdateSingleJob(JobKey jobKey, TriggerKey triggerKey, Class clz,
Date date) throws SchedulerException {
Date date) throws SchedulerException {
addOrUpdateSingleJob(jobKey, triggerKey, clz, date, null);
}
public void addOrUpdateSimpleJob(JobKey jobKey, TriggerKey triggerKey, Class clz, int intervalTime) throws SchedulerException {
public void addOrUpdateSimpleJob(JobKey jobKey, TriggerKey triggerKey, Class clz, int intervalTime)
throws SchedulerException {
addOrUpdateSimpleJob(jobKey, triggerKey, clz, intervalTime, null);
}
/**
* 添加或修改 cronJob
*
@ -352,7 +353,8 @@ public class ScheduleManager {
* @param jobDataMap
* @throws SchedulerException
*/
public void addOrUpdateCronJob(JobKey jobKey, TriggerKey triggerKey, Class jobClass, String cron, Date startTime, Date endTime, JobDataMap jobDataMap) throws SchedulerException {
public void addOrUpdateCronJob(JobKey jobKey, TriggerKey triggerKey, Class jobClass, String cron, Date startTime,
Date endTime, JobDataMap jobDataMap) throws SchedulerException {
LogUtil.info("AddOrUpdateCronJob: " + jobKey.getName() + "," + triggerKey.getGroup());
@ -363,7 +365,8 @@ public class ScheduleManager {
}
}
public void addOrUpdateCronJob(JobKey jobKey, TriggerKey triggerKey, Class jobClass, String cron, Date startTime, Date endTime) throws SchedulerException {
public void addOrUpdateCronJob(JobKey jobKey, TriggerKey triggerKey, Class jobClass, String cron, Date startTime,
Date endTime) throws SchedulerException {
addOrUpdateCronJob(jobKey, triggerKey, jobClass, cron, startTime, endTime, null);
}
@ -398,7 +401,8 @@ public class ScheduleManager {
if (!CronExpression.isValidExpression(cron)) {
DataEaseException.throwException("cron :" + cron + " error");
}
return TriggerBuilder.newTrigger().withIdentity("Calculate Date").withSchedule(CronScheduleBuilder.cronSchedule(cron)).build();
return TriggerBuilder.newTrigger().withIdentity("Calculate Date")
.withSchedule(CronScheduleBuilder.cronSchedule(cron)).build();
}

View File

@ -13,9 +13,6 @@ import java.util.Date;
public abstract class TaskHandler implements InitializingBean {
private static final String[] week = {"SUNDAY", "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY"};
public void addTask(ScheduleManager scheduleManager, GlobalTaskEntity taskEntity) throws Exception {
// 1首先看看是否过期
Long endTime = taskEntity.getEndTime();
@ -30,12 +27,11 @@ public abstract class TaskHandler implements InitializingBean {
if (ObjectUtils.isNotEmpty(taskEntity.getEndTime())) {
new Date(taskEntity.getEndTime());
}
Class executor = this.getClass();
Class<? extends TaskHandler> executor = this.getClass();
String cron = cron(taskEntity);
scheduleManager.addOrUpdateCronJob(jobKey, triggerKey, executor, cron, start, end, jobDataMap(taskEntity));
}
protected abstract JobDataMap jobDataMap(GlobalTaskEntity taskEntity);
private String cron(GlobalTaskEntity taskEntity) {
@ -54,36 +50,34 @@ public abstract class TaskHandler implements InitializingBean {
instance.setTime(date);
if (taskEntity.getRateType() == 0) {
return
instance.get(Calendar.SECOND) + " " +
instance.get(Calendar.MINUTE) + " " +
instance.get(Calendar.HOUR_OF_DAY) + " * * ?";
return instance.get(Calendar.SECOND) + " " +
instance.get(Calendar.MINUTE) + " " +
instance.get(Calendar.HOUR_OF_DAY) + " * * ?";
}
if (taskEntity.getRateType() == 1) {
return
instance.get(Calendar.SECOND) + " " +
instance.get(Calendar.MINUTE) + " " +
instance.get(Calendar.HOUR_OF_DAY) + " ? * " +
getDayOfWeek(instance);
return instance.get(Calendar.SECOND) + " " +
instance.get(Calendar.MINUTE) + " " +
instance.get(Calendar.HOUR_OF_DAY) + " ? * " +
getDayOfWeek(instance);
}
if (taskEntity.getRateType() == 2) {
return
instance.get(Calendar.SECOND) + " " +
instance.get(Calendar.MINUTE) + " " +
instance.get(Calendar.HOUR_OF_DAY) + " " +
instance.get(Calendar.DATE) + " * ?";
return instance.get(Calendar.SECOND) + " " +
instance.get(Calendar.MINUTE) + " " +
instance.get(Calendar.HOUR_OF_DAY) + " " +
instance.get(Calendar.DATE) + " * ?";
}
return null;
}
public abstract void resetRunningInstance(Long taskId);
private String getDayOfWeek(Calendar instance) {
int index = instance.get(Calendar.DAY_OF_WEEK);
index = (index + 1) % 7;
index = (index + 1) % 7;
return String.valueOf(index);
}
public void removeTask(ScheduleManager scheduleManager, GlobalTaskEntity taskEntity) {
JobKey jobKey = new JobKey(taskEntity.getTaskId().toString());
TriggerKey triggerKey = new TriggerKey(taskEntity.getTaskId().toString());
@ -95,14 +89,16 @@ public abstract class TaskHandler implements InitializingBean {
scheduleManager.fireNow(jobKey);
}
//判断任务是否过期
// 判断任务是否过期
public Boolean taskExpire(Long endTime) {
if (ObjectUtils.isEmpty(endTime)) return false;
if (ObjectUtils.isEmpty(endTime))
return false;
Long now = System.currentTimeMillis();
return now > endTime;
}
protected abstract Boolean taskIsRunning(Long taskId);
@Override
public void afterPropertiesSet() throws Exception {
String beanName = null;

View File

@ -5,6 +5,7 @@ import io.dataease.auth.entity.TokenInfo;
import io.dataease.auth.service.AuthUserService;
import io.dataease.auth.service.impl.AuthUserServiceImpl;
import io.dataease.auth.util.JWTUtils;
import io.dataease.base.mapper.ext.ExtTaskMapper;
import io.dataease.commons.utils.CommonBeanFactory;
import io.dataease.commons.utils.LogUtil;
import io.dataease.commons.utils.ServletUtils;
@ -20,6 +21,7 @@ import io.dataease.service.system.EmailService;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.quartz.*;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@ -46,6 +48,16 @@ public class EmailTaskHandler extends TaskHandler implements Job {
return jobDataMap;
}
public EmailTaskHandler proxy() {
return CommonBeanFactory.getBean(EmailTaskHandler.class);
}
@Override
protected Boolean taskIsRunning(Long taskId) {
ExtTaskMapper extTaskMapper = CommonBeanFactory.getBean(ExtTaskMapper.class);
return extTaskMapper.runningCount(taskId) > 0;
}
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
// 插件没有加载 空转
@ -54,11 +66,16 @@ public class EmailTaskHandler extends TaskHandler implements Job {
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
GlobalTaskEntity taskEntity = (GlobalTaskEntity) jobDataMap.get("taskEntity");
ScheduleManager scheduleManager = SpringContextUtil.getBean(ScheduleManager.class);
if (taskExpire(taskEntity.getEndTime())) {
ScheduleManager scheduleManager = SpringContextUtil.getBean(ScheduleManager.class);
removeTask(scheduleManager, taskEntity);
return;
}
if (taskIsRunning(taskEntity.getTaskId())) {
LogUtil.info("Skip synchronization task: {} ,due to task status is {}",
taskEntity.getTaskId(), "running");
return;
}
GlobalTaskInstance taskInstance = buildInstance(taskEntity);
Long instanceId = saveInstance(taskInstance);
@ -67,10 +84,15 @@ public class EmailTaskHandler extends TaskHandler implements Job {
XpackEmailTemplateDTO emailTemplate = (XpackEmailTemplateDTO) jobDataMap.get("emailTemplate");
SysUserEntity creator = (SysUserEntity) jobDataMap.get("creator");
LogUtil.info("start execute send panel report task...");
sendReport(taskInstance, emailTemplate, creator);
proxy().sendReport(taskInstance, emailTemplate, creator);
}
@Override
public void resetRunningInstance(Long taskId) {
ExtTaskMapper extTaskMapper = CommonBeanFactory.getBean(ExtTaskMapper.class);
extTaskMapper.resetRunnings(taskId);
}
public Long saveInstance(GlobalTaskInstance taskInstance) {
EmailXpackService emailXpackService = SpringContextUtil.getBean(EmailXpackService.class);
@ -99,11 +121,12 @@ public class EmailTaskHandler extends TaskHandler implements Job {
emailXpackService.saveInstance(taskInstance);
}
@Async("priorityExecutor")
public void sendReport(GlobalTaskInstance taskInstance, XpackEmailTemplateDTO emailTemplateDTO,
SysUserEntity user) {
SysUserEntity user) {
EmailXpackService emailXpackService = SpringContextUtil.getBean(EmailXpackService.class);
try {
String panelId = emailTemplateDTO.getPanelId();
String url = panelUrl(panelId);
String token = tokenByUser(user);
@ -116,11 +139,15 @@ public class EmailTaskHandler extends TaskHandler implements Job {
String recipients = emailTemplateDTO.getRecipients();
byte[] content = emailTemplateDTO.getContent();
EmailService emailService = SpringContextUtil.getBean(EmailService.class);
String contentStr = "";
if (ObjectUtils.isNotEmpty(content)) {
contentStr = new String(content, "UTF-8");
}
emailService.sendWithImage(recipients, emailTemplateDTO.getTitle(), contentStr, bytes);
emailService.sendWithImage(recipients, emailTemplateDTO.getTitle(),
contentStr, bytes);
Thread.sleep(10000);
success(taskInstance);
} catch (Exception e) {
error(taskInstance, e);

View File

@ -32,6 +32,7 @@ public class GlobalTaskStartListener implements ApplicationListener<ApplicationR
tasks.stream().forEach(task -> {
TaskHandler taskHandler = TaskStrategyFactory.getInvokeStrategy(task.getTaskType());
try {
taskHandler.resetRunningInstance(task.getTaskId());
taskHandler.addTask(scheduleManager, task);
} catch (Exception e) {
e.printStackTrace();

View File

@ -3,6 +3,7 @@ package io.dataease.plugins.server;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import io.dataease.commons.exception.DEException;
import io.dataease.commons.pool.PriorityThreadPoolExecutor;
import io.dataease.commons.utils.*;
import io.dataease.plugins.common.entity.GlobalTaskEntity;
import io.dataease.plugins.common.entity.GlobalTaskInstance;
@ -23,6 +24,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.concurrent.Future;
import javax.annotation.Resource;
@Api(tags = "xpack定时报告")
@RequestMapping("/plugin/task")
@ -32,6 +36,9 @@ public class XEmailTaskServer {
@Autowired
private ScheduleService scheduleService;
@Resource
private PriorityThreadPoolExecutor priorityExecutor;
@PostMapping("/queryTasks/{goPage}/{pageSize}")
public Pager<List<XpackTaskGridDTO>> queryTask(@PathVariable int goPage, @PathVariable int pageSize,
@RequestBody XpackGridRequest request) {
@ -85,7 +92,19 @@ public class XEmailTaskServer {
String token = ServletUtils.getToken();
String fileId = null;
try {
fileId = emailXpackService.print(url, token, buildPixel(request.getPixel()));
Future<?> future = priorityExecutor.submit(() -> {
try {
return emailXpackService.print(url, token, buildPixel(request.getPixel()));
} catch (Exception e) {
LogUtil.error(e.getMessage(), e);
DEException.throwException("预览失败,请联系管理员");
}
return null;
}, 0);
Object object = future.get();
if (ObjectUtils.isNotEmpty(object)) {
fileId = object.toString();
}
} catch (Exception e) {
LogUtil.error(e.getMessage(), e);
DEException.throwException("预览失败,请联系管理员");