最近项目经历,需要自定义增加定时发送任务,于是学习了下定时任务的知识, 是基于成熟的开源产品Quartz和Zookeeper及其客户端Curator进行二次开发。那么我要实现一个和 一样的自定义任务也应该基于Quartz
于是学习了下Quartz结合springboot的自定义任务。
1、任务记录入库,启动服务后读取该任务,并加入或删除或更新job计划。
2、定时任务执行指定任务。
指定任务是这类定时任务指定的是同样的方法或者一类方法
具体代码思路如下
依赖的java包
org.quartz-scheduler quartz 1.8.0 org.springframework spring-context-support 3.2.17.RELEASE
a 配置的xml
b 这个是关键代码,
从数据库读任务
增加到job计划
删除过时或者以取消的任务
更新任务
package com.ql.vessels.schemajob;import com.fqgj.log.factory.LogFactory;import com.fqgj.log.interfaces.Log;import com.ql.vessels.common.util.DateUtils;import com.ql.vessels.domain.services.MsgSendSchemaService;import com.ql.vessels.repo.entity.MsgSendSchemaEntity;import com.ql.vessels.repo.entity.PushSchemaEntity;import com.ql.vessels.repo.vo.TaskVo;import org.quartz.JobDataMap;import org.quartz.JobDetail;import org.quartz.Scheduler;import org.quartz.SchedulerException;import org.springframework.beans.BeansException;import org.springframework.beans.factory.BeanFactory;import org.springframework.beans.factory.BeanFactoryAware;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.quartz.CronTriggerBean;import java.util.Date;import java.util.HashMap;import java.util.List;import java.util.Map;/** * @author vampire * @date 2019/5/16 10:08 AM */public class QuartzManager implements BeanFactoryAware { private final static Log log = LogFactory.getLog(QuartzManager.class); private Scheduler scheduler; private static BeanFactory beanFactory; @Autowired MsgSendSchemaService msgSendSchemaService; /** * 定时要执行的方法类。 */ public void reScheduleJob() { // 1.读取数据库中的任务列表。 Date now = new Date(); Map map = new HashMap(4); map.put("sendType", 2); //获取短信定时任务 Listlist = msgSendSchemaService.selectByParams(map); for (MsgSendSchemaEntity sendSchema : list ) { TaskVo taskVo = new TaskVo(); taskVo.setTaskId(sendSchema.getId()); taskVo.setTaskType(1); taskVo.setCronExpression(DateUtils.getCron(sendSchema.getSendTime())); taskVo.setState(sendSchema.getStatus()); if (sendSchema.getStatus() != 0 || now.after(sendSchema.getSendTime())) { //去掉过时的任务 removeExpireTasks(taskVo); } else { configSchedul(taskVo); } } } /** * 移除过期任务 * * @param bo */ private void removeExpireTasks(TaskVo bo) { try { CronTriggerBean trigger = (CronTriggerBean) scheduler.getTrigger(bo.getTaskId() + "name" + bo.getTaskType(), Scheduler.DEFAULT_GROUP); if (trigger != null) { log.info("==移除任务==" + bo.getTaskId() + "name" + bo.getTaskType()); scheduler.pauseTrigger(trigger.getName(), trigger.getGroup());// 停止触发器 scheduler.unscheduleJob(trigger.getName(), trigger.getGroup());// 移除触发器 scheduler.deleteJob(trigger.getJobName(), trigger.getJobGroup());// 删除任务 } } catch (SchedulerException e) { log.error("移除任务失败..."); e.printStackTrace(); } } /** * 配置任务列表 * * @param bo */ private void configSchedul(TaskVo bo) { try { CronTriggerBean trigger = (CronTriggerBean) scheduler.getTrigger(bo.getTaskId() + "name" + bo.getTaskType(), Scheduler.DEFAULT_GROUP); if (trigger == null) { //说明schedule中不存在该定时任务 createTriggerTask(bo); } else { updateTriggerTask(bo, trigger); } } catch (SchedulerException e) { log.error("获取触发器trigger失败..."); e.printStackTrace(); } } /** * 更新任务列表 * * @param bo */ private void updateTriggerTask(TaskVo bo, CronTriggerBean trigger) { if (bo.getState() == 0) { try { // 判断从DB中取得的任务时间和现在的quartz线程中的任务时间是否相等 // 如果相等,则表示用户并没有重新设定数据库中的任务时间,这种情况不需要重新rescheduleJob if (trigger.getCronExpression() != null && !trigger.getCronExpression().equalsIgnoreCase(bo.getCronExpression())) { log.info("=真正更新方法:=" + bo.getTaskId() + "name" + bo.getTaskType()); trigger.setCronExpression(bo.getCronExpression()); scheduler.rescheduleJob(bo.getTaskId() + "name" + bo.getTaskType(), Scheduler.DEFAULT_GROUP, trigger); log.info("更新任务时间失败..."); } } catch (Exception e) { log.error("更新任务时间失败..."); e.printStackTrace(); } } else { this.removeExpireTasks(bo); } } /** * 创建任务列表 * * @param bo */ private void createTriggerTask(TaskVo bo) { if (bo.getState() == 0) { log.info("=创建:=" + bo.getTaskId() + "name" + bo.getTaskType()); try { Class clazz = QuartzJobFactory.class;//执行计划任务的类 JobDetail jobDetail = new JobDetail(bo.getTaskId() + "", clazz); Map map = new HashMap(); map.put("task", bo); jobDetail.setJobDataMap(new JobDataMap(map)); jobDetail.setName(bo.getTaskId() + "name" + bo.getTaskType()); scheduler.addJob(jobDetail, true); // 将Job添加到管理类 // 新一个基于Spring的时间类 CronTriggerBean c = new CronTriggerBean(); c.setCronExpression(bo.getCronExpression());// 设置时间表达式 c.setName(bo.getTaskId() + "name" + bo.getTaskType());// 设置名称 c.setJobDetail(jobDetail);// 注入Job c.setJobName(bo.getTaskId() + "name" + bo.getTaskType());// 设置Job名称 scheduler.scheduleJob(c);// 注入到管理类 scheduler.rescheduleJob(bo.getTaskId() + "name" + bo.getTaskType(), Scheduler.DEFAULT_GROUP, c);// 刷新管理类 } catch (Exception e) { log.error("创建" + bo.getTaskId() + "name" + bo.getTaskType() + "任务失败..."); e.printStackTrace(); } } else { this.removeExpireTasks(bo); } } @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; } public Scheduler getScheduler() { return scheduler; } public QuartzManager setScheduler(Scheduler scheduler) { this.scheduler = scheduler; return this; } public static BeanFactory getBeanFactory() { return beanFactory; }}
c 任务添加后那每一个任务真正执行的方法是什么?需要我们来写任务具体做的事
所有的定时任务都会触发到这个 execute 方法中,根据定时任务创建时传入的参数来调用这个执行中的具体业务方法,
注意 :@Autowired 注入服务时空指针还需要添加下边一个类,刚刚在xml中也提到过,背景色标注的那两行就是为解决此问题的
package com.ql.vessels.schemajob;import com.alibaba.fastjson.JSON;import com.fqgj.log.factory.LogFactory;import com.fqgj.log.interfaces.Log;import com.ql.vessels.domain.services.MsgSendSchemaService;import com.ql.vessels.domain.services.PushSchemaService;import com.ql.vessels.repo.vo.TaskVo;import org.quartz.Job;import org.quartz.JobDataMap;import org.quartz.JobExecutionContext;import org.quartz.JobExecutionException;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;/** * @author vampire * @date 2019/5/16 11:56 AM */@Componentpublic class QuartzJobFactory implements Job { private final static Log log = LogFactory.getLog(QuartzJobFactory.class); @Autowired MsgSendSchemaService msgSendSchemaService; @Autowired PushSchemaService pushSchemaService; @Override public void execute(JobExecutionContext context) throws JobExecutionException { JobDataMap jobDataMap = context.getJobDetail().getJobDataMap(); TaskVo taskVo = (TaskVo) jobDataMap.get("task"); log.info("定时任务开始执行。。。。。。。。。。。" + JSON.toJSONString(taskVo)); if (taskVo.getTaskType() == 1) { log.info("message job=======" + taskVo.getTaskId()); msgSendSchemaService.sendSchemaMsg(taskVo.getTaskId()); } else if (taskVo.getTaskType() == 2) { //推送任务执行 log.info("push job=======" + taskVo.getTaskId()); pushSchemaService.sendSchemaPush(taskVo.getTaskId()); } }}
d 解决服务注入为空
package com.ql.vessels.schemajob;import org.quartz.spi.TriggerFiredBundle;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.config.AutowireCapableBeanFactory;import org.springframework.scheduling.quartz.AdaptableJobFactory;/** * @author vampire * @date 2019/5/16 4:12 PM * 作用是解决QuartzJobFactory 类中不能注入服务类 */public class JobAdapter extends AdaptableJobFactory { @Autowired private AutowireCapableBeanFactory capableBeanFactory; @Override protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception { //调用父类的方法 Object jobInstance = super.createJobInstance(bundle); //进行注入 capableBeanFactory.autowireBean(jobInstance); return jobInstance; }}
到此就完美解决了自定义任务,当然服务器是单台的没有考虑到集群,若集群的需要添加zk,跑任务有且只能有一个服务器去跑,负责重复重复后果很严重。