woodpecker/server/cron/cron.go
2023-09-16 10:53:37 +02:00

156 lines
4.1 KiB
Go

// Copyright 2022 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cron
import (
"context"
"fmt"
"time"
"github.com/robfig/cron"
"github.com/rs/zerolog/log"
"github.com/woodpecker-ci/woodpecker/server/forge"
"github.com/woodpecker-ci/woodpecker/server/model"
"github.com/woodpecker-ci/woodpecker/server/pipeline"
"github.com/woodpecker-ci/woodpecker/server/store"
)
const (
// checkTime is the interval between two scheduler checks; Start waits this
// long before each lookup of crons to exec
checkTime = 10 * time.Second
// checkItems is the batch size passed to the store's CronListNextExecute,
// i.e. how many crons are retrieved from the database per check
checkItems = 10
)
// Start runs the cron scheduler loop and blocks until ctx is done, at which
// point it returns nil.
//
// Every checkTime a goroutine is spawned that asks the store for a batch of at
// most checkItems crons to execute (CronListNextExecute with the current unix
// time) and hands each of them to runCron. Errors from the store or from a
// single cron are only logged; they never stop the loop.
func Start(ctx context.Context, store store.Store, forge forge.Forge) error {
	// One ticker is reused for the whole lifetime of the loop instead of
	// allocating a new timer with time.After on every iteration.
	ticker := time.NewTicker(checkTime)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			// The check runs in its own goroutine so the loop goes straight
			// back to waiting on ctx and the ticker.
			go func() {
				now := time.Now()
				log.Trace().Msg("Cron: fetch next crons")

				crons, err := store.CronListNextExecute(now.Unix(), checkItems)
				if err != nil {
					log.Error().Err(err).Int64("now", now.Unix()).Msg("obtain cron list")
					return
				}

				for _, cron := range crons {
					if err := runCron(store, forge, cron, now); err != nil {
						log.Error().Err(err).Int64("cronID", cron.ID).Msg("run cron failed")
					}
				}
			}()
		}
	}
}
// CalcNewNext parses the cron expression in schedule and returns the next exec
// time relative to now, evaluated in UTC.
//
// If schedule cannot be parsed, the zero time.Time is returned together with an
// error wrapping the parser's error.
func CalcNewNext(schedule string, now time.Time) (time.Time, error) {
	parsed, parseErr := cron.Parse(schedule)
	if parseErr != nil {
		return time.Time{}, fmt.Errorf("cron parse schedule: %w", parseErr)
	}

	// Drop the local timezone so the schedule is always evaluated in UTC.
	// TODO: allow the users / the admin to set a specific timezone
	utcNow := now.UTC()

	return parsed.Next(utcNow), nil
}
// runCron executes a single cron: it computes the cron's next exec time from
// its schedule and now, tries to take the lock on the cron with that time, and
// - only if the lock was obtained - builds the pipeline via CreatePipeline and
// hands it to pipeline.Create.
//
// It returns nil when another go routine already holds the lock, and the first
// error encountered otherwise.
func runCron(store store.Store, forge forge.Forge, cron *model.Cron, now time.Time) error {
	log.Trace().Msgf("Cron: run id[%d]", cron.ID)

	nextExec, err := CalcNewNext(cron.Schedule, now)
	if err != nil {
		return err
	}

	// Taking the lock decides which go routine is allowed to run this cron.
	acquired, err := store.CronGetLock(cron, nextExec.Unix())
	switch {
	case err != nil:
		return err
	case !acquired:
		// somebody else was faster, nothing left to do
		return nil
	}

	ctx := context.Background()

	repo, cronPipeline, err := CreatePipeline(ctx, store, forge, cron)
	if err != nil {
		return err
	}

	if _, err := pipeline.Create(ctx, store, repo, cronPipeline); err != nil {
		return err
	}
	return nil
}
// CreatePipeline assembles the pipeline that a run of the given cron triggers
// and returns it together with the cron's repo. The pipeline is only built
// here, not passed on to anything else.
//
// The repo and the creator of the cron are loaded from the store. If the cron
// has no branch set, cron.Branch is overwritten with the repo's branch. When
// the forge implements forge.Refresher the creator's token is refreshed first;
// a failing refresh or user update is logged and does not abort the call. The
// commit is taken from the forge's BranchHead for that branch.
//
// An error from GetRepo, GetUser or BranchHead is returned as is, with nil
// repo and pipeline.
func CreatePipeline(ctx context.Context, store store.Store, f forge.Forge, cron *model.Cron) (*model.Repo, *model.Pipeline, error) {
	repo, err := store.GetRepo(cron.RepoID)
	if err != nil {
		return nil, nil, err
	}

	// no branch configured: use the default branch of the repo
	if len(cron.Branch) == 0 {
		cron.Branch = repo.Branch
	}

	creator, err := store.GetUser(cron.CreatorID)
	if err != nil {
		return nil, nil, err
	}

	// A forge that works with refresh tokens may have left us with a stale
	// access token, so renew it before the pipeline is dispatched.
	if refresher, canRefresh := f.(forge.Refresher); canRefresh {
		refreshed, refreshErr := refresher.Refresh(ctx, creator)
		switch {
		case refreshErr != nil:
			log.Error().Err(refreshErr).Msgf("failed to refresh oauth2 token for creator: %s", creator.Login)
		case refreshed:
			if updateErr := store.UpdateUser(creator); updateErr != nil {
				// not fatal, move forward
				log.Error().Err(updateErr).Msgf("error while updating creator: %s", creator.Login)
			} else {
				log.Debug().Msgf("token refreshed for creator: %s", creator.Login)
			}
		}
	}

	headCommit, err := f.BranchHead(ctx, creator, repo, cron.Branch)
	if err != nil {
		return nil, nil, err
	}

	cronPipeline := &model.Pipeline{
		Event:     model.EventCron,
		Commit:    headCommit,
		Ref:       "refs/heads/" + cron.Branch,
		Branch:    cron.Branch,
		Message:   cron.Name,
		Timestamp: cron.NextExec,
		Sender:    cron.Name,
		Link:      repo.Link,
	}
	return repo, cronPipeline, nil
}