可以使用golang中的goroutine和channel来实现从数据库读取定时任务数据,并异步执行。下面是一个简单的示例代码:
package main
import (
"database/sql"
"fmt"
"time"
_ "github.com/go-sql-driver/mysql"
)
type Task struct {
ID int
Name string
Interval time.Duration
LastRun time.Time
NextRun time.Time
Command string
Args string
CreatedAt time.Time
UpdatedAt time.Time
}
func main() {
db, err := sql.Open("mysql", "root:password@tcp(127.0.0.1:3306)/taskdb")
if err != nil {
panic(err.Error())
}
defer db.Close()
tasks := make(chan Task)
go func() {
for {
rows, err := db.Query("SELECT * FROM tasks WHERE next_run <= NOW()")
if err != nil {
fmt.Println(err.Error())
time.Sleep(1 * time.Minute)
continue
}
for rows.Next() {
var task Task
err := rows.Scan(&task.ID, &task.Name, &task.Interval, &task.LastRun, &task.NextRun, &task.Command, &task.Args, &task.CreatedAt, &task.UpdatedAt)
if err != nil {
fmt.Println(err.Error())
continue
}
tasks <- task
}
rows.Close()
time.Sleep(1 * time.Minute)
}
}()
for task := range tasks {
go func(task Task) {
fmt.Printf("Executing task %s with args %s\n", task.Command, task.Args)
//执行任务代码
task.LastRun = time.Now()
task.NextRun = time.Now().Add(task.Interval)
_, err := db.Exec("UPDATE tasks SET last_run=?, next_run=? WHERE id=?", task.LastRun, task.NextRun, task.ID)
if err != nil {
fmt.Println(err.Error())
}
}(task)
}
}
该示例代码启动一个goroutine来定期从数据库中查询到期的任务,并将其发送到一个channel中。然后,另一个goroutine从channel中接收任务并执行它们。注意,执行任务的goroutine需要在一个新的goroutine中运行,以避免阻塞channel。同时,也需要在执行任务完成后更新任务的最后执行时间和下一次执行时间。