瀏覽代碼

添加定时任务处理框架

liyan 3 年之前
父節點
當前提交
b9cbc0868d

+ 12 - 0
apps/svs-init/src/main/resources/sql/mysql/schema-svs.sql

@@ -1,4 +1,5 @@
 drop table IF EXISTS SVS_TASK_LOG $$
+drop table IF EXISTS SVS_SCHEDULE_TASK $$
 
 create table SVS_TASK_LOG
 (
@@ -17,3 +18,14 @@ create table SVS_TASK_LOG
     CREATED_AT datetime(0),
     UPDATED_AT datetime(0)
 ) $$
+
+create table SVS_SCHEDULE_TASK
+(
+    TASK_ID    varchar(48) not null primary key,
+    TASK_CLASS_NAME  varchar(500),
+    CRON_EXPRESSION  varchar(500),
+    STATUS     varchar(48),
+    REMARK     varchar(255),
+    CREATED_AT datetime(0),
+    UPDATED_AT datetime(0)
+) $$

+ 16 - 0
services/service-schedule/src/main/kotlin/llh/svs/core/services/schedule/ScheduleApplication.kt

@@ -0,0 +1,16 @@
+package llh.svs.core.services.schedule
+
+import org.springframework.boot.autoconfigure.SpringBootApplication
+import org.springframework.boot.autoconfigure.domain.EntityScan
+import org.springframework.boot.runApplication
+import org.springframework.data.jpa.repository.config.EnableJpaRepositories
+
+@SpringBootApplication
+@EntityScan(basePackages = ["llh.svs.core.services"])
+@EnableJpaRepositories(basePackages = ["llh.svs.core.services"])
+class ScheduleApplication {
+}
+
+fun main(args: Array<String>) {
+    runApplication<ScheduleApplication>(*args)
+}

+ 25 - 0
services/service-schedule/src/main/kotlin/llh/svs/core/services/schedule/ScheduleConfigure.kt

@@ -0,0 +1,25 @@
+package llh.svs.core.services.schedule
+
+import llh.svs.core.services.schedule.dao.SvsScheduleTaskDAO
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.Configuration
+import org.springframework.scheduling.annotation.EnableScheduling
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler
+
+@Configuration
+@EnableScheduling
+class ScheduleConfigure {
+
+    @Autowired
+    lateinit var dao: SvsScheduleTaskDAO
+
+    @Bean
+    fun threadPoolTaskScheduler(): ThreadPoolTaskScheduler? {
+        val threadPoolTaskScheduler = ThreadPoolTaskScheduler()
+        val taskCount = dao.count().toInt()
+        threadPoolTaskScheduler.poolSize = if (taskCount > 1) taskCount else 1
+        threadPoolTaskScheduler.isRemoveOnCancelPolicy = true
+        return threadPoolTaskScheduler
+    }
+}

+ 55 - 0
services/service-schedule/src/main/kotlin/llh/svs/core/services/schedule/ScheduleTaskController.kt

@@ -0,0 +1,55 @@
+package llh.svs.core.services.schedule
+
+import gaf3.core.data.ErrorResult
+import gaf3.core.data.PageParam
+import gaf3.core.data.PagedData
+import llh.svs.core.services.schedule.domain.ScheduleTaskForm
+import llh.svs.core.services.schedule.entity.SvsScheduleTask
+import llh.svs.core.services.schedule.service.SvsScheduleService
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.context.ApplicationContext
+import org.springframework.web.bind.annotation.*
+
+@RestController
+@RequestMapping(path = ["/svs/schedule"])
+class ScheduleTaskController {
+
+    @Autowired
+    lateinit var service: SvsScheduleService
+
+    @Autowired
+    lateinit var ctx: ApplicationContext
+
+    @PostMapping(path = ["/item"])
+    fun create(@RequestBody form: ScheduleTaskForm): SvsScheduleTask {
+        return service.create(form)
+    }
+
+    @PostMapping(path = ["/item/{id}"])
+    fun update(@PathVariable("id") id: String, @RequestBody form: ScheduleTaskForm): SvsScheduleTask {
+        return service.update(id, form)
+    }
+
+    @GetMapping(path = ["/items"])
+    fun find(filter: SvsScheduleTask, pageParam: PageParam, exact: Boolean?): PagedData<SvsScheduleTask> {
+        return service.find(filter, pageParam, exact ?: true)
+    }
+
+    @GetMapping(path = ["/item/{id}"])
+    fun fetch(@PathVariable("id") id: String): SvsScheduleTask {
+        return service.fetch(id)
+    }
+
+    @GetMapping(path = ["/task/cancel/{id}"])
+    fun stopTask(@PathVariable("id") id: String): ErrorResult {
+        service.cancel(id)
+        return ErrorResult(0, "ok")
+    }
+
+    @GetMapping(path = ["/task/execute/{id}"])
+    fun onceExecute(@PathVariable("id") id: String): ErrorResult {
+        service.onceExecute(id)
+        return ErrorResult(0, "ok")
+    }
+
+}

+ 12 - 0
services/service-schedule/src/main/kotlin/llh/svs/core/services/schedule/dao/SvsScheduleTaskDAO.kt

@@ -0,0 +1,12 @@
+package llh.svs.core.services.schedule.dao
+
+import llh.svs.core.services.schedule.entity.SvsScheduleTask
+import org.springframework.data.jpa.repository.JpaRepository
+
+interface SvsScheduleTaskDAO : JpaRepository<SvsScheduleTask, String> {
+
+    fun findByTaskClassName(className: String): SvsScheduleTask
+
+    fun findByStatus(status: String): List<SvsScheduleTask>
+
+}

+ 19 - 0
services/service-schedule/src/main/kotlin/llh/svs/core/services/schedule/domain/ScheduleTaskForm.kt

@@ -0,0 +1,19 @@
+package llh.svs.core.services.schedule.domain
+
+import org.springframework.validation.annotation.Validated
+import javax.validation.constraints.NotEmpty
+
+@Validated
+class ScheduleTaskForm {
+
+    @NotEmpty
+    var taskClassName: String? = null
+
+    @NotEmpty
+    var cronExpression: String? = null
+
+    @NotEmpty
+    var status: String? = null
+
+    var remark: String? = null
+}

+ 44 - 0
services/service-schedule/src/main/kotlin/llh/svs/core/services/schedule/entity/SvsScheduleTask.kt

@@ -0,0 +1,44 @@
+package llh.svs.core.services.schedule.entity
+
+import gaf3.core.jpa.GafTimestamp
+import javax.persistence.Column
+import javax.persistence.Entity
+import javax.persistence.Id
+import javax.persistence.Table
+
+@Entity
+@Table(name = "SVS_SCHEDULE_TASK")
+class SvsScheduleTask : GafTimestamp() {
+
+    /**
+     * 定时任务ID
+     */
+    @Id
+    @Column(name = "TASK_ID")
+    var taskId: String? = null
+
+    /**
+     * 定时任务完整类名
+     */
+    @Column(name = "TASK_CLASS_NAME")
+    var taskClassName: String? = null
+
+    /**
+     * 定时任务cron表达式
+     */
+    @Column(name = "CRON_EXPRESSION")
+    var cronExpression: String? = null
+
+    /**
+     * 定时任务状态 0正常 1停用
+     */
+    @Column(name = "STATUS")
+    var status: String? = null
+
+    /**
+     * 备注
+     */
+    @Column(name = "REMARK")
+    var remark: String? = null
+
+}

+ 101 - 0
services/service-schedule/src/main/kotlin/llh/svs/core/services/schedule/service/SvsScheduleService.kt

@@ -0,0 +1,101 @@
+package llh.svs.core.services.schedule.service
+
+import gaf3.core.data.PageParam
+import gaf3.core.data.PagedData
+import gaf3.core.exception.BusinessError
+import gaf3.core.jpa.extension.findAll
+import gaf3.core.util.DataBeanHelper
+import llh.svs.core.services.schedule.dao.SvsScheduleTaskDAO
+import llh.svs.core.services.schedule.domain.ScheduleTaskForm
+import llh.svs.core.services.schedule.entity.SvsScheduleTask
+import llh.svs.core.services.schedule.support.ScheduleTaskStatus
+import llh.svs.core.services.schedule.support.ScheduleTaskWorker
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.data.repository.findByIdOrNull
+import org.springframework.stereotype.Service
+import org.springframework.util.Assert
+import java.util.*
+import javax.annotation.PostConstruct
+
+
+@Service
+class SvsScheduleService(val dao: SvsScheduleTaskDAO) {
+
+    @Autowired
+    lateinit var taskWorker: ScheduleTaskWorker
+
+    /**
+     * 创建定时任务
+     * @param form 定时任务
+     */
+    fun create(form: ScheduleTaskForm): SvsScheduleTask {
+        val entity = SvsScheduleTask()
+        DataBeanHelper.Bean2Obj(form, entity)
+        entity.taskId = UUID.randomUUID().toString()
+        if (entity.status == ScheduleTaskStatus.ENABLED.status) {
+            taskWorker.startTask(entity)
+        }
+        return dao.save(entity)
+    }
+
+    /**
+     * 修改定时任务
+     * @param id 任务ID
+     * @param form 定时任务
+     */
+    fun update(id: String?, form: ScheduleTaskForm): SvsScheduleTask {
+        val entity = fetch(id)
+        DataBeanHelper.Bean2Obj(form, entity, "taskId", "createdAt", "updatedAt")
+        taskWorker.stopTask(entity)
+        if (entity.status == ScheduleTaskStatus.ENABLED.status) {
+            taskWorker.startTask(entity)
+        }
+        return dao.save(entity)
+    }
+
+    /**
+     * 查询定时任务列表
+     * @param filter 过滤条件
+     * @param pageParam 分页参数
+     */
+    fun find(filter: SvsScheduleTask, pageParam: PageParam, exact: Boolean = true): PagedData<SvsScheduleTask> {
+        val paged = dao.findAll(filter, pageParam.toPageRequest(), exact)
+        return PagedData(data = paged.content, total = paged.totalElements.toInt())
+    }
+
+    /**
+     * 按照ID查询定时任务
+     * @param id 定时任务id
+     */
+    fun fetch(id: String?): SvsScheduleTask {
+        Assert.hasText(id, "任务ID不可为空")
+        return dao.findByIdOrNull(id) ?: throw BusinessError(BusinessError.ERR_DATA_NOTEXIST, "指定任务不存在")
+    }
+
+    /**
+     * 取消定时任务
+     */
+    fun cancel(id: String?) {
+        val entity = dao.findByIdOrNull(id) ?: throw BusinessError(BusinessError.ERR_DATA_NOTEXIST, "指定任务不存在")
+        taskWorker.stopTask(entity)
+    }
+
+    /**
+     * 手动执行一次定时任务
+     */
+    fun onceExecute(id: String?) {
+        val entity = dao.findByIdOrNull(id) ?: throw BusinessError(BusinessError.ERR_DATA_NOTEXIST, "指定任务不存在")
+        taskWorker.onceStart(entity)
+    }
+
+    /**
+     * 初始化所有可执行定时任务
+     */
+    @PostConstruct
+    fun initScheduleTask() {
+        dao.findByStatus(ScheduleTaskStatus.ENABLED.status).forEach {
+            taskWorker.startTask(it)
+        }
+    }
+
+}

+ 9 - 0
services/service-schedule/src/main/kotlin/llh/svs/core/services/schedule/support/ScheduleTaskStatus.kt

@@ -0,0 +1,9 @@
+package llh.svs.core.services.schedule.support
+
+/**
+ * 定时任务状态枚举
+ */
+enum class ScheduleTaskStatus(val status: String) {
+    ENABLED(status = "0"),
+    DISABLED(status = "1")
+}

+ 88 - 0
services/service-schedule/src/main/kotlin/llh/svs/core/services/schedule/support/ScheduleTaskWorker.kt

@@ -0,0 +1,88 @@
+package llh.svs.core.services.schedule.support
+
+import gaf3.core.exception.BusinessError
+import llh.svs.core.services.schedule.entity.SvsScheduleTask
+import org.slf4j.LoggerFactory
+import org.springframework.beans.BeansException
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.context.ApplicationContext
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler
+import org.springframework.scheduling.support.CronTrigger
+import org.springframework.stereotype.Component
+import org.springframework.util.Assert
+import java.util.concurrent.ScheduledFuture
+
+/**
+ * 定时任务创建辅助类
+ */
+@Component
+class ScheduleTaskWorker {
+
+    @Autowired
+    lateinit var ctx: ApplicationContext
+
+    @Autowired
+    lateinit var threadPoolTaskScheduler: ThreadPoolTaskScheduler
+
+    /**
+     * 创建新定时任务
+     */
+    fun startTask(entity: SvsScheduleTask) {
+        log.debug("创建定时任务,className=${entity.taskClassName},cron=${entity.cronExpression}")
+        // 获取任务执行类
+        val task = runCatching {
+            ctx.getBean(Class.forName(entity.taskClassName))
+        }.getOrElse {
+            if (it is ClassNotFoundException) {
+                log.error("${entity.taskClassName}类不存在")
+            }
+            if (it is BeansException) {
+                log.error("${entity.taskClassName}未纳入bean管理")
+            }
+            log.error("配置${entity.taskClassName}任务异常,${it.localizedMessage}")
+            throw BusinessError(BusinessError.ERR_SERVICE_FAULT, "任务启动失败")
+        }
+        Assert.isAssignable(ScheduledTask::class.java, task.javaClass, "定时任务类必须实现ScheduledOfTask接口")
+        // 如果该任务正在执行,则取消
+        scheduleMap[entity.taskClassName!!]?.cancel(true)
+        // 添加定时任务
+        val scheduleFuture =
+            threadPoolTaskScheduler.schedule(task as Runnable, CronTrigger(entity.cronExpression!!))
+        scheduleMap[entity.taskClassName!!] = scheduleFuture
+    }
+
+    /**
+     * 停止定时任务
+     */
+    fun stopTask(entity: SvsScheduleTask) {
+        log.debug("取消定时任务,className=${entity.taskClassName}")
+        scheduleMap[entity.taskClassName!!]?.cancel(true)
+    }
+
+    /**
+     * 手动执行一次任务
+     */
+    fun onceStart(entity: SvsScheduleTask) {
+        log.debug("手动执行一次定时任务,className=${entity.taskClassName}")
+        val task = runCatching {
+            ctx.getBean(Class.forName(entity.taskClassName))
+        }.getOrElse {
+            if (it is ClassNotFoundException) {
+                log.error("${entity.taskClassName}类不存在")
+            }
+            if (it is BeansException) {
+                log.error("${entity.taskClassName}未纳入bean管理")
+            }
+            log.error("配置${entity.taskClassName}任务异常,${it.localizedMessage}")
+            throw BusinessError(BusinessError.ERR_SERVICE_FAULT, "任务启动失败")
+        }
+        Assert.isAssignable(ScheduledTask::class.java, task.javaClass, "定时任务类必须实现ScheduledOfTask接口")
+        val scheduleTask = task as ScheduledTask
+        scheduleTask.execute()
+    }
+
+    companion object {
+        private val log = LoggerFactory.getLogger(ScheduleTaskWorker::class.java)
+        private val scheduleMap = mutableMapOf<String, ScheduledFuture<*>?>()
+    }
+}

+ 19 - 0
services/service-schedule/src/main/kotlin/llh/svs/core/services/schedule/support/ScheduledTask.kt

@@ -0,0 +1,19 @@
+package llh.svs.core.services.schedule.support
+
+/**
+ * 定时任务接口
+ */
+interface ScheduledTask : Runnable {
+
+    /**
+     * 定时任务方法
+     */
+    fun execute()
+
+    /**
+     * 实现控制定时任务启用或禁用的功能
+     */
+    override fun run() {
+        execute()
+    }
+}

+ 17 - 0
services/service-schedule/src/main/resources/application-mysql.yml

@@ -0,0 +1,17 @@
+# 数据库配置
+---
+spring:
+  datasource:
+    username: root
+    password: 123456
+    url: jdbc:mysql://127.0.0.1:3306/svs?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC
+    driver-class-name: com.mysql.cj.jdbc.Driver
+    platform: mysql
+  jpa:
+    database-platform: org.hibernate.dialect.MySQL8Dialect
+    show-sql: true
+    hibernate:
+      naming:
+        implicit-strategy: org.hibernate.boot.model.naming.ImplicitNamingStrategyJpaCompliantImpl
+        physical-strategy: org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
+      ddl-auto: none

+ 20 - 0
services/service-schedule/src/main/resources/application.yml

@@ -0,0 +1,20 @@
+server:
+  port: 9090
+  servlet:
+    encoding:
+      charset: utf-8
+spring:
+  profiles:
+    include: mysql
+    active: local
+  main:
+    allow-bean-definition-overriding: true
+
+---
+spring:
+  config:
+    activate:
+      on-profile: local
+
+logging.level.gaf3.core.*: DEBUG
+logging.level.llh.svs.*: DEBUG

+ 1 - 0
settings.gradle.kts

@@ -3,3 +3,4 @@ include("apps:svs-init", "apps:svs-all-in-one")
 include("services", "shared", "platform")
 include("services:service-system")
 include("services:service-task")
+include("services:service-schedule")