博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
自定义定时任务(从数据库读定时任务)
阅读量:6153 次
发布时间:2019-06-21

本文共 9834 字,大约阅读时间需要 32 分钟。

  最近项目经历,需要自定义增加定时发送任务,于是学习了下定时任务的知识, 是基于成熟的开源产品Quartz和Zookeeper及其客户端Curator进行二次开发。那么我要实现一个和 一样的自定义任务也应该基于Quartz

于是学习了下Quartz结合springboot的自定义任务。

1、任务记录入库,启动服务后读取该任务,并加入或删除或更新job计划。

2、定时任务执行指定任务。

指定任务是这类定时任务指定的是同样的方法或者一类方法 

具体代码思路如下

依赖的java包

org.quartz-scheduler
quartz
1.8.0
org.springframework
spring-context-support
3.2.17.RELEASE

 

a 配置的xml

b 这个是关键代码,

  从数据库读任务

  增加到job计划

  删除过时或者以取消的任务

  更新任务

package com.ql.vessels.schemajob;import com.fqgj.log.factory.LogFactory;import com.fqgj.log.interfaces.Log;import com.ql.vessels.common.util.DateUtils;import com.ql.vessels.domain.services.MsgSendSchemaService;import com.ql.vessels.repo.entity.MsgSendSchemaEntity;import com.ql.vessels.repo.entity.PushSchemaEntity;import com.ql.vessels.repo.vo.TaskVo;import org.quartz.JobDataMap;import org.quartz.JobDetail;import org.quartz.Scheduler;import org.quartz.SchedulerException;import org.springframework.beans.BeansException;import org.springframework.beans.factory.BeanFactory;import org.springframework.beans.factory.BeanFactoryAware;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.quartz.CronTriggerBean;import java.util.Date;import java.util.HashMap;import java.util.List;import java.util.Map;/** * @author vampire * @date 2019/5/16 10:08 AM */public class QuartzManager implements BeanFactoryAware {    private final static Log log = LogFactory.getLog(QuartzManager.class);    private Scheduler scheduler;    private static BeanFactory beanFactory;    @Autowired    MsgSendSchemaService msgSendSchemaService;    /**     * 定时要执行的方法类。     */    public void reScheduleJob() {        // 1.读取数据库中的任务列表。        Date now = new Date();        Map map = new HashMap(4);        map.put("sendType", 2);        //获取短信定时任务        List
list = msgSendSchemaService.selectByParams(map); for (MsgSendSchemaEntity sendSchema : list ) { TaskVo taskVo = new TaskVo(); taskVo.setTaskId(sendSchema.getId()); taskVo.setTaskType(1); taskVo.setCronExpression(DateUtils.getCron(sendSchema.getSendTime())); taskVo.setState(sendSchema.getStatus()); if (sendSchema.getStatus() != 0 || now.after(sendSchema.getSendTime())) { //去掉过时的任务 removeExpireTasks(taskVo); } else { configSchedul(taskVo); } } } /** * 移除过期任务 * * @param bo */ private void removeExpireTasks(TaskVo bo) { try { CronTriggerBean trigger = (CronTriggerBean) scheduler.getTrigger(bo.getTaskId() + "name" + bo.getTaskType(), Scheduler.DEFAULT_GROUP); if (trigger != null) { log.info("==移除任务==" + bo.getTaskId() + "name" + bo.getTaskType()); scheduler.pauseTrigger(trigger.getName(), trigger.getGroup());// 停止触发器 scheduler.unscheduleJob(trigger.getName(), trigger.getGroup());// 移除触发器 scheduler.deleteJob(trigger.getJobName(), trigger.getJobGroup());// 删除任务 } } catch (SchedulerException e) { log.error("移除任务失败..."); e.printStackTrace(); } } /** * 配置任务列表 * * @param bo */ private void configSchedul(TaskVo bo) { try { CronTriggerBean trigger = (CronTriggerBean) scheduler.getTrigger(bo.getTaskId() + "name" + bo.getTaskType(), Scheduler.DEFAULT_GROUP); if (trigger == null) {
//说明schedule中不存在该定时任务 createTriggerTask(bo); } else { updateTriggerTask(bo, trigger); } } catch (SchedulerException e) { log.error("获取触发器trigger失败..."); e.printStackTrace(); } } /** * 更新任务列表 * * @param bo */ private void updateTriggerTask(TaskVo bo, CronTriggerBean trigger) { if (bo.getState() == 0) { try { // 判断从DB中取得的任务时间和现在的quartz线程中的任务时间是否相等 // 如果相等,则表示用户并没有重新设定数据库中的任务时间,这种情况不需要重新rescheduleJob if (trigger.getCronExpression() != null && !trigger.getCronExpression().equalsIgnoreCase(bo.getCronExpression())) { log.info("=真正更新方法:=" + bo.getTaskId() + "name" + bo.getTaskType()); trigger.setCronExpression(bo.getCronExpression()); scheduler.rescheduleJob(bo.getTaskId() + "name" + bo.getTaskType(), Scheduler.DEFAULT_GROUP, trigger); log.info("更新任务时间失败..."); } } catch (Exception e) { log.error("更新任务时间失败..."); e.printStackTrace(); } } else { this.removeExpireTasks(bo); } } /** * 创建任务列表 * * @param bo */ private void createTriggerTask(TaskVo bo) { if (bo.getState() == 0) { log.info("=创建:=" + bo.getTaskId() + "name" + bo.getTaskType()); try { Class clazz = QuartzJobFactory.class;//执行计划任务的类 JobDetail jobDetail = new JobDetail(bo.getTaskId() + "", clazz); Map map = new HashMap(); map.put("task", bo); jobDetail.setJobDataMap(new JobDataMap(map)); jobDetail.setName(bo.getTaskId() + "name" + bo.getTaskType()); scheduler.addJob(jobDetail, true); // 将Job添加到管理类 // 新一个基于Spring的时间类 CronTriggerBean c = new CronTriggerBean(); c.setCronExpression(bo.getCronExpression());// 设置时间表达式 c.setName(bo.getTaskId() + "name" + bo.getTaskType());// 设置名称 c.setJobDetail(jobDetail);// 注入Job c.setJobName(bo.getTaskId() + "name" + bo.getTaskType());// 设置Job名称 scheduler.scheduleJob(c);// 注入到管理类 scheduler.rescheduleJob(bo.getTaskId() + "name" + bo.getTaskType(), Scheduler.DEFAULT_GROUP, c);// 刷新管理类 } catch (Exception e) { log.error("创建" + bo.getTaskId() + "name" + bo.getTaskType() + "任务失败..."); e.printStackTrace(); } } else { this.removeExpireTasks(bo); } } @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; } public Scheduler getScheduler() { return scheduler; } public QuartzManager setScheduler(Scheduler scheduler) { this.scheduler = scheduler; return this; } public static BeanFactory getBeanFactory() { return beanFactory; }}

c 任务添加后那每一个任务真正执行的方法是什么?需要我们来写任务具体做的事

所有的定时任务都会触发到这个 execute 方法中,根据定时任务创建时传入的参数来调用这个执行中的具体业务方法,

注意 :@Autowired 注入服务时空指针还需要添加下边一个类,刚刚在xml中也提到过,背景色标注的那两行就是为解决此问题的

package com.ql.vessels.schemajob;import com.alibaba.fastjson.JSON;import com.fqgj.log.factory.LogFactory;import com.fqgj.log.interfaces.Log;import com.ql.vessels.domain.services.MsgSendSchemaService;import com.ql.vessels.domain.services.PushSchemaService;import com.ql.vessels.repo.vo.TaskVo;import org.quartz.Job;import org.quartz.JobDataMap;import org.quartz.JobExecutionContext;import org.quartz.JobExecutionException;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;/** * @author vampire * @date 2019/5/16 11:56 AM */@Componentpublic class QuartzJobFactory implements Job {    private final static Log log = LogFactory.getLog(QuartzJobFactory.class);    @Autowired    MsgSendSchemaService msgSendSchemaService;    @Autowired    PushSchemaService pushSchemaService;    @Override    public void execute(JobExecutionContext context) throws JobExecutionException {        JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();        TaskVo taskVo = (TaskVo) jobDataMap.get("task");        log.info("定时任务开始执行。。。。。。。。。。。" + JSON.toJSONString(taskVo));        if (taskVo.getTaskType() == 1) {            log.info("message job=======" + taskVo.getTaskId());            msgSendSchemaService.sendSchemaMsg(taskVo.getTaskId());        } else if (taskVo.getTaskType() == 2) {            //推送任务执行            log.info("push job=======" + taskVo.getTaskId());            pushSchemaService.sendSchemaPush(taskVo.getTaskId());    }    }}

 

d 解决服务注入为空

package com.ql.vessels.schemajob;import org.quartz.spi.TriggerFiredBundle;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.config.AutowireCapableBeanFactory;import org.springframework.scheduling.quartz.AdaptableJobFactory;/** * @author vampire * @date 2019/5/16 4:12 PM * 作用是解决QuartzJobFactory 类中不能注入服务类 */public class JobAdapter extends AdaptableJobFactory {    @Autowired    private AutowireCapableBeanFactory capableBeanFactory;    @Override    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {        //调用父类的方法        Object jobInstance = super.createJobInstance(bundle);        //进行注入        capableBeanFactory.autowireBean(jobInstance);        return jobInstance;    }}

 

 

到此就完美解决了自定义任务,当然服务器是单台的没有考虑到集群,若集群的需要添加zk,跑任务有且只能有一个服务器去跑,负责重复重复后果很严重。

 

转载于:https://www.cnblogs.com/likun10579/p/10896831.html

你可能感兴趣的文章
发布和逸出-构造过程中使this引用逸出
查看>>
Oracle执行计划发生过变化的SQL语句脚本
查看>>
使用SanLock建立简单的HA服务
查看>>
发现一个叫阿尔法城的小站(以后此贴为我记录日常常用网址的帖子了)
查看>>
Subversion使用Redmine帐户验证简单应用、高级应用以及优化
查看>>
Javascript Ajax 异步请求
查看>>
DBCP连接池
查看>>
cannot run programing "db2"
查看>>
mysql做主从relay-log问题
查看>>
Docker镜像与容器命令
查看>>
批量删除oracle中以相同类型字母开头的表
查看>>
Java基础学习总结(4)——对象转型
查看>>
BZOJ3239Discrete Logging——BSGS
查看>>
SpringMVC权限管理
查看>>
spring 整合 redis 配置
查看>>
redhat6.1下chrome的安装
查看>>
cacti分组发飞信模块开发
查看>>
浅析LUA中游戏脚本语言之魔兽世界
查看>>
飞翔的秘密
查看>>
Red Hat 安装源包出错 Package xxx.rpm is not signed
查看>>