diff --git a/server/pipeline/approve.go b/server/pipeline/approve.go index 6f38c2a2b..c700139f7 100644 --- a/server/pipeline/approve.go +++ b/server/pipeline/approve.go @@ -29,24 +29,47 @@ import ( // and start them afterward func Approve(ctx context.Context, store store.Store, currentPipeline *model.Pipeline, user *model.User, repo *model.Repo) (*model.Pipeline, error) { if currentPipeline.Status != model.StatusBlocked { - return nil, ErrBadRequest{Msg: fmt.Sprintf("cannot decline a pipeline with status %s", currentPipeline.Status)} + return nil, ErrBadRequest{Msg: fmt.Sprintf("cannot approve a pipeline with status %s", currentPipeline.Status)} } // fetch the pipeline file from the database configs, err := store.ConfigsForPipeline(currentPipeline.ID) if err != nil { - msg := fmt.Sprintf("failure to get pipeline config for %s. %s", repo.FullName, err) - log.Error().Msg(msg) + msg := fmt.Sprintf("failure to get pipeline config for %s", repo.FullName) + log.Error().Err(err).Msg(msg) return nil, ErrNotFound{Msg: msg} } + var yamls []*forge_types.FileMeta + for _, y := range configs { + yamls = append(yamls, &forge_types.FileMeta{Data: y.Data, Name: y.Name}) + } + + if currentPipeline.Workflows, err = store.WorkflowGetTree(currentPipeline); err != nil { + return nil, fmt.Errorf("error: loading workflows. %w", err) + } if currentPipeline, err = UpdateToStatusPending(store, *currentPipeline, user.Login); err != nil { return nil, fmt.Errorf("error updating pipeline. %w", err) } - var yamls []*forge_types.FileMeta - for _, y := range configs { - yamls = append(yamls, &forge_types.FileMeta{Data: y.Data, Name: y.Name}) + for _, wf := range currentPipeline.Workflows { + if wf.State != model.StatusBlocked { + continue + } + wf.State = model.StatusPending + if err := store.WorkflowUpdate(wf); err != nil { + return nil, fmt.Errorf("error updating workflow. %w", err) + } + + for _, step := range wf.Children { + if step.State != model.StatusBlocked { + continue + } + step.State = model.StatusPending + if err := store.StepUpdate(step); err != nil { + return nil, fmt.Errorf("error updating step. %w", err) + } + } } currentPipeline, pipelineItems, err := createPipelineItems(ctx, store, currentPipeline, user, repo, yamls, nil) @@ -56,6 +79,29 @@ func Approve(ctx context.Context, store store.Store, currentPipeline *model.Pipe return nil, fmt.Errorf(msg) } + // TODO improve this + for _, item := range pipelineItems { + for _, wf := range currentPipeline.Workflows { + if item.Workflow.Name == wf.Name { + item.Workflow = wf + for _, stage := range item.Config.Stages { + for _, step := range stage.Steps { + for _, storeStep := range wf.Children { + if storeStep.Name == step.Name { + step.UUID = storeStep.UUID + break + } + } + } + } + + break + } + } + } + + publishPipeline(ctx, currentPipeline, repo, user) + currentPipeline, err = start(ctx, store, currentPipeline, user, repo, pipelineItems) if err != nil { msg := fmt.Sprintf("failure to start pipeline for %s: %v", repo.FullName, err) diff --git a/server/pipeline/create.go b/server/pipeline/create.go index 3d2574447..c81881c0a 100644 --- a/server/pipeline/create.go +++ b/server/pipeline/create.go @@ -120,8 +120,12 @@ func Create(ctx context.Context, _store store.Store, repo *model.Repo, pipeline return nil, fmt.Errorf(msg) } + if err := prepareStart(ctx, _store, pipeline, repoUser, repo); err != nil { + log.Error().Err(err).Str("repo", repo.FullName).Msgf("error preparing pipeline for %s#%d", repo.FullName, pipeline.Number) + return nil, err + } + if pipeline.Status == model.StatusBlocked { - publishPipeline(ctx, pipeline, repo, repoUser) return pipeline, nil } diff --git a/server/pipeline/decline.go b/server/pipeline/decline.go index d2d78e988..9587bf4ba 100644 --- a/server/pipeline/decline.go +++ b/server/pipeline/decline.go @@ -39,6 +39,20 @@ func Decline(ctx context.Context, store store.Store, pipeline *model.Pipeline, u log.Error().Err(err).Msg("cannot build tree from step list") } + for _, wf := range pipeline.Workflows { + wf.State = model.StatusDeclined + if err := store.WorkflowUpdate(wf); err != nil { + return nil, fmt.Errorf("error updating workflow. %w", err) + } + + for _, step := range wf.Children { + step.State = model.StatusDeclined + if err := store.StepUpdate(step); err != nil { + return nil, fmt.Errorf("error updating step. %w", err) + } + } + } + updatePipelineStatus(ctx, pipeline, repo, user) publishToTopic(pipeline, repo) diff --git a/server/pipeline/items.go b/server/pipeline/items.go index 35206cd11..5ac9f1bcd 100644 --- a/server/pipeline/items.go +++ b/server/pipeline/items.go @@ -127,12 +127,8 @@ func setPipelineStepsOnPipeline(pipeline *model.Pipeline, pipelineItems []*stepb for _, item := range pipelineItems { for _, stage := range item.Config.Stages { - var gid int for _, step := range stage.Steps { pidSequence++ - if gid == 0 { - gid = pidSequence - } step := &model.Step{ Name: step.Name, UUID: step.UUID, @@ -146,9 +142,15 @@ func setPipelineStepsOnPipeline(pipeline *model.Pipeline, pipelineItems []*stepb if item.Workflow.State == model.StatusSkipped { step.State = model.StatusSkipped } + if pipeline.Status == model.StatusBlocked { + step.State = model.StatusBlocked + } item.Workflow.Children = append(item.Workflow.Children, step) } } + if pipeline.Status == model.StatusBlocked { + item.Workflow.State = model.StatusBlocked + } item.Workflow.PipelineID = pipeline.ID pipeline.Workflows = append(pipeline.Workflows, item.Workflow) } diff --git a/server/pipeline/restart.go b/server/pipeline/restart.go index 4cab9ae92..11e3c68fc 100644 --- a/server/pipeline/restart.go +++ b/server/pipeline/restart.go @@ -89,6 +89,12 @@ func Restart(ctx context.Context, store store.Store, lastPipeline *model.Pipelin return nil, fmt.Errorf(msg) } + if err := prepareStart(ctx, store, newPipeline, user, repo); err != nil { + msg := fmt.Sprintf("failure to prepare pipeline for %s", repo.FullName) + log.Error().Err(err).Msg(msg) + return nil, fmt.Errorf(msg) + } + newPipeline, err = start(ctx, store, newPipeline, user, repo, pipelineItems) if err != nil { msg := fmt.Sprintf("failure to start pipeline for %s", repo.FullName) diff --git a/server/pipeline/start.go b/server/pipeline/start.go index cb9d77939..5ca22807f 100644 --- a/server/pipeline/start.go +++ b/server/pipeline/start.go @@ -47,6 +47,16 @@ func start(ctx context.Context, store store.Store, activePipeline *model.Pipelin return activePipeline, nil } +func prepareStart(ctx context.Context, store store.Store, activePipeline *model.Pipeline, user *model.User, repo *model.Repo) error { + if err := store.WorkflowsCreate(activePipeline.Workflows); err != nil { + log.Error().Err(err).Str("repo", repo.FullName).Msgf("error persisting steps for %s#%d", repo.FullName, activePipeline.Number) + return err + } + + publishPipeline(ctx, activePipeline, repo, user) + return nil +} + func publishPipeline(ctx context.Context, pipeline *model.Pipeline, repo *model.Repo, repoUser *model.User) { publishToTopic(pipeline, repo) updatePipelineStatus(ctx, pipeline, repo, repoUser)