woodpecker/server/pipeline/create.go
qwerty287 849b02a433
Fix pipeline-related environment (#2876)
closes https://github.com/woodpecker-ci/woodpecker/issues/2672

pipeline model must be persisted first to have some fields like `Number`
and `ID`
2023-11-26 19:59:36 +01:00

137 lines
5 KiB
Go

// Copyright 2022 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pipeline
import (
"context"
"fmt"
"regexp"
"time"
"github.com/rs/zerolog/log"
"go.woodpecker-ci.org/woodpecker/pipeline/errors"
"go.woodpecker-ci.org/woodpecker/server"
"go.woodpecker-ci.org/woodpecker/server/forge"
"go.woodpecker-ci.org/woodpecker/server/model"
"go.woodpecker-ci.org/woodpecker/server/store"
)
var skipPipelineRegex = regexp.MustCompile(`\[(?i:ci *skip|skip *ci)\]`)
// Create a new pipeline and start it
func Create(ctx context.Context, _store store.Store, repo *model.Repo, pipeline *model.Pipeline) (*model.Pipeline, error) {
repoUser, err := _store.GetUser(repo.UserID)
if err != nil {
msg := fmt.Sprintf("failure to find repo owner via id '%d'", repo.UserID)
log.Error().Err(err).Str("repo", repo.FullName).Msg(msg)
return nil, fmt.Errorf(msg)
}
skipMatch := skipPipelineRegex.FindString(pipeline.Message)
if len(skipMatch) > 0 {
log.Debug().Str("repo", repo.FullName).Msgf("ignoring pipeline as skip-ci was found in the commit (%s) message '%s'", pipeline.Commit, pipeline.Message)
return nil, ErrFiltered
}
// If the forge has a refresh token, the current access token
// may be stale. Therefore, we should refresh prior to dispatching
// the pipeline.
forge.Refresh(ctx, server.Config.Services.Forge, _store, repoUser)
// update some pipeline fields
pipeline.RepoID = repo.ID
pipeline.Status = model.StatusPending
setGatedState(repo, pipeline)
err = _store.CreatePipeline(pipeline)
if err != nil {
msg := fmt.Errorf("failed to save pipeline for %s", repo.FullName)
log.Error().Str("repo", repo.FullName).Err(err).Msg(msg.Error())
return nil, msg
}
// fetch the pipeline file from the forge
configFetcher := forge.NewConfigFetcher(server.Config.Services.Forge, server.Config.Services.Timeout, server.Config.Services.ConfigService, repoUser, repo, pipeline)
forgeYamlConfigs, configFetchErr := configFetcher.Fetch(ctx)
if configFetchErr != nil {
log.Debug().Str("repo", repo.FullName).Err(configFetchErr).Msgf("cannot find config '%s' in '%s' with user: '%s'", repo.Config, pipeline.Ref, repoUser.Login)
return nil, updatePipelineWithErr(ctx, _store, pipeline, repo, repoUser, fmt.Errorf("pipeline definition not found in %s", repo.FullName))
}
pipelineItems, parseErr := parsePipeline(_store, pipeline, repoUser, repo, forgeYamlConfigs, nil)
if errors.HasBlockingErrors(parseErr) {
log.Debug().Str("repo", repo.FullName).Err(parseErr).Msg("failed to parse yaml")
return nil, updatePipelineWithErr(ctx, _store, pipeline, repo, repoUser, parseErr)
} else if parseErr != nil {
pipeline.Errors = errors.GetPipelineErrors(parseErr)
}
if len(pipelineItems) == 0 {
log.Debug().Str("repo", repo.FullName).Msg(ErrFiltered.Error())
return nil, ErrFiltered
}
pipeline = setPipelineStepsOnPipeline(pipeline, pipelineItems)
// persist the pipeline config for historical correctness, restarts, etc
var configs []*model.Config
for _, forgeYamlConfig := range forgeYamlConfigs {
config, err := findOrPersistPipelineConfig(_store, pipeline, forgeYamlConfig)
if err != nil {
msg := fmt.Sprintf("failed to find or persist pipeline config for %s", repo.FullName)
log.Error().Err(err).Msg(msg)
return nil, fmt.Errorf(msg)
}
configs = append(configs, config)
}
// link pipeline to persisted configs
if err := linkPipelineConfigs(_store, configs, pipeline.ID); err != nil {
msg := fmt.Sprintf("failed to find or persist pipeline config for %s", repo.FullName)
log.Error().Err(err).Msg(msg)
return nil, fmt.Errorf(msg)
}
if pipeline.Status == model.StatusBlocked {
publishPipeline(ctx, pipeline, repo, repoUser)
return pipeline, nil
}
pipeline, err = start(ctx, _store, pipeline, repoUser, repo, pipelineItems)
if err != nil {
msg := fmt.Sprintf("failed to start pipeline for %s", repo.FullName)
log.Error().Err(err).Msg(msg)
return nil, fmt.Errorf(msg)
}
return pipeline, nil
}
func updatePipelineWithErr(ctx context.Context, _store store.Store, pipeline *model.Pipeline, repo *model.Repo, repoUser *model.User, err error) error {
pipeline.Started = time.Now().Unix()
pipeline.Finished = pipeline.Started
pipeline.Status = model.StatusError
pipeline.Errors = errors.GetPipelineErrors(err)
dbErr := _store.UpdatePipeline(pipeline)
if dbErr != nil {
msg := fmt.Errorf("failed to save pipeline for %s", repo.FullName)
log.Error().Err(dbErr).Msg(msg.Error())
return msg
}
publishPipeline(ctx, pipeline, repo, repoUser)
return nil
}