mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2024-12-28 03:10:29 +00:00
d494b6a959
This is the first step towards support for multiple forges (#138). It inserts a forge using the currently existing env varaibles into db and uses this forge from db later on in all places of the code. closes #621 addresses #138 # TODO - [x] add forges table - [x] add id of forge to repo - [x] use forge of repo - [x] add forge from env vars to db if not exists - [x] migrate repo.ForgeID to the newly generated forge - [x] support cache with forge from repo - [x] maybe add forge loading cache? (use LRU cache for forges, I expect users to have less than 10 forges normally) --------- Co-authored-by: qwerty287 <80460567+qwerty287@users.noreply.github.com>
149 lines
3.8 KiB
Go
149 lines
3.8 KiB
Go
// Copyright 2022 Woodpecker Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package cron
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/robfig/cron"
|
|
"github.com/rs/zerolog/log"
|
|
|
|
"go.woodpecker-ci.org/woodpecker/v2/server"
|
|
"go.woodpecker-ci.org/woodpecker/v2/server/forge"
|
|
"go.woodpecker-ci.org/woodpecker/v2/server/model"
|
|
"go.woodpecker-ci.org/woodpecker/v2/server/pipeline"
|
|
"go.woodpecker-ci.org/woodpecker/v2/server/store"
|
|
)
|
|
|
|
const (
|
|
// checkTime specifies the interval woodpecker checks for new crons to exec
|
|
checkTime = 10 * time.Second
|
|
|
|
// checkItems specifies the batch size of crons to retrieve per check from database
|
|
checkItems = 10
|
|
)
|
|
|
|
// Start starts the cron scheduler loop
|
|
func Start(ctx context.Context, store store.Store) error {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-time.After(checkTime):
|
|
go func() {
|
|
now := time.Now()
|
|
log.Trace().Msg("cron: fetch next crons")
|
|
|
|
crons, err := store.CronListNextExecute(now.Unix(), checkItems)
|
|
if err != nil {
|
|
log.Error().Err(err).Int64("now", now.Unix()).Msg("obtain cron list")
|
|
return
|
|
}
|
|
|
|
for _, cron := range crons {
|
|
if err := runCron(ctx, store, cron, now); err != nil {
|
|
log.Error().Err(err).Int64("cronID", cron.ID).Msg("run cron failed")
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
}
|
|
|
|
// CalcNewNext parses a cron string and calculates the next exec time based on it
|
|
func CalcNewNext(schedule string, now time.Time) (time.Time, error) {
|
|
// remove local timezone
|
|
now = now.UTC()
|
|
|
|
// TODO: allow the users / the admin to set a specific timezone
|
|
|
|
c, err := cron.Parse(schedule)
|
|
if err != nil {
|
|
return time.Time{}, fmt.Errorf("cron parse schedule: %w", err)
|
|
}
|
|
return c.Next(now), nil
|
|
}
|
|
|
|
func runCron(ctx context.Context, store store.Store, cron *model.Cron, now time.Time) error {
|
|
log.Trace().Msgf("cron: run id[%d]", cron.ID)
|
|
|
|
newNext, err := CalcNewNext(cron.Schedule, now)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// try to get lock on cron
|
|
gotLock, err := store.CronGetLock(cron, newNext.Unix())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !gotLock {
|
|
// another go routine caught it
|
|
return nil
|
|
}
|
|
|
|
repo, newPipeline, err := CreatePipeline(ctx, store, cron)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = pipeline.Create(ctx, store, repo, newPipeline)
|
|
return err
|
|
}
|
|
|
|
func CreatePipeline(ctx context.Context, store store.Store, cron *model.Cron) (*model.Repo, *model.Pipeline, error) {
|
|
repo, err := store.GetRepo(cron.RepoID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
_forge, err := server.Config.Services.Manager.ForgeFromRepo(repo)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
if cron.Branch == "" {
|
|
// fallback to the repos default branch
|
|
cron.Branch = repo.Branch
|
|
}
|
|
|
|
creator, err := store.GetUser(cron.CreatorID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// If the forge has a refresh token, the current access token
|
|
// may be stale. Therefore, we should refresh prior to dispatching
|
|
// the pipeline.
|
|
forge.Refresh(ctx, _forge, store, creator)
|
|
|
|
commit, err := _forge.BranchHead(ctx, creator, repo, cron.Branch)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
return repo, &model.Pipeline{
|
|
Event: model.EventCron,
|
|
Commit: commit.SHA,
|
|
Ref: "refs/heads/" + cron.Branch,
|
|
Branch: cron.Branch,
|
|
Message: cron.Name,
|
|
Timestamp: cron.NextExec,
|
|
Sender: cron.Name,
|
|
ForgeURL: commit.ForgeURL,
|
|
}, nil
|
|
}
|