diff --git a/cli/pipeline/ps.go b/cli/pipeline/ps.go index f77fd2ccd..d161b7a9f 100644 --- a/cli/pipeline/ps.go +++ b/cli/pipeline/ps.go @@ -74,7 +74,7 @@ func pipelinePs(c *cli.Context) error { return err } - for _, step := range pipeline.Steps { + for _, step := range pipeline.Workflows { for _, child := range step.Children { if err := tmpl.Execute(os.Stdout, child); err != nil { return err diff --git a/cmd/server/docs/docs.go b/cmd/server/docs/docs.go index 876de38b9..36faf48a4 100644 --- a/cmd/server/docs/docs.go +++ b/cmd/server/docs/docs.go @@ -3717,12 +3717,6 @@ const docTemplate = `{ "status": { "$ref": "#/definitions/StatusValue" }, - "steps": { - "type": "array", - "items": { - "$ref": "#/definitions/Step" - } - }, "timestamp": { "type": "integer" }, @@ -3737,6 +3731,12 @@ const docTemplate = `{ "additionalProperties": { "type": "string" } + }, + "workflows": { + "type": "array", + "items": { + "$ref": "#/definitions/model.Workflow" + } } } }, @@ -3974,24 +3974,9 @@ const docTemplate = `{ "Step": { "type": "object", "properties": { - "agent_id": { - "type": "integer" - }, - "children": { - "type": "array", - "items": { - "$ref": "#/definitions/Step" - } - }, "end_time": { "type": "integer" }, - "environ": { - "type": "object", - "additionalProperties": { - "type": "string" - } - }, "error": { "type": "string" }, @@ -4010,9 +3995,6 @@ const docTemplate = `{ "pipeline_id": { "type": "integer" }, - "platform": { - "type": "string" - }, "ppid": { "type": "integer" }, @@ -4128,6 +4110,53 @@ const docTemplate = `{ "LogEntryMetadata", "LogEntryProgress" ] + }, + "model.Workflow": { + "type": "object", + "properties": { + "agent_id": { + "type": "integer" + }, + "children": { + "type": "array", + "items": { + "$ref": "#/definitions/Step" + } + }, + "end_time": { + "type": "integer" + }, + "environ": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "error": { + "type": "string" + }, + "id": { + "type": "integer" + }, + "name": { + "type": "string" + }, + "pid": { + "type": "integer" + }, + "pipeline_id": { + "type": "integer" + }, + "platform": { + "type": "string" + }, + "start_time": { + "type": "integer" + }, + "state": { + "$ref": "#/definitions/StatusValue" + } + } } } }` diff --git a/docs/docs/30-administration/10-server-config.md b/docs/docs/30-administration/10-server-config.md index 7456b2576..6e8a64c35 100644 --- a/docs/docs/30-administration/10-server-config.md +++ b/docs/docs/30-administration/10-server-config.md @@ -364,9 +364,10 @@ Context prefix Woodpecker will use to publish status messages to SCM. You probab Template for the status messages published to forges, uses [Go templates](https://pkg.go.dev/text/template) as template language. Supported variables: + - `context`: Woodpecker's context (see `WOODPECKER_STATUS_CONTEXT`) - `event`: the event which started the pipeline -- `pipeline`: the pipeline's name +- `workflow`: the workflow's name - `owner`: the repo's owner - `repo`: the repo's name diff --git a/pipeline/frontend/metadata.go b/pipeline/frontend/metadata.go index 412005ff2..b790bfddb 100644 --- a/pipeline/frontend/metadata.go +++ b/pipeline/frontend/metadata.go @@ -37,7 +37,7 @@ func EnvVarSubst(yaml string, environ map[string]string) (string, error) { } // MetadataFromStruct return the metadata from a pipeline will run with. -func MetadataFromStruct(forge metadata.ServerForge, repo *model.Repo, pipeline, last *model.Pipeline, workflow *model.Step, link string) metadata.Metadata { +func MetadataFromStruct(forge metadata.ServerForge, repo *model.Repo, pipeline, last *model.Pipeline, workflow *model.Workflow, link string) metadata.Metadata { host := link uri, err := url.Parse(link) if err == nil { diff --git a/pipeline/frontend/metadata_test.go b/pipeline/frontend/metadata_test.go index 3def1c344..9574e67f5 100644 --- a/pipeline/frontend/metadata_test.go +++ b/pipeline/frontend/metadata_test.go @@ -61,7 +61,7 @@ func TestMetadataFromStruct(t *testing.T) { forge metadata.ServerForge repo *model.Repo pipeline, last *model.Pipeline - workflow *model.Step + workflow *model.Workflow link string expectedMetadata metadata.Metadata expectedEnviron map[string]string @@ -92,7 +92,7 @@ func TestMetadataFromStruct(t *testing.T) { repo: &model.Repo{FullName: "testUser/testRepo", Link: "https://gitea.com/testUser/testRepo", Clone: "https://gitea.com/testUser/testRepo.git", Branch: "main", IsSCMPrivate: true}, pipeline: &model.Pipeline{Number: 3}, last: &model.Pipeline{Number: 2}, - workflow: &model.Step{Name: "hello"}, + workflow: &model.Workflow{Name: "hello"}, link: "https://example.com", expectedMetadata: metadata.Metadata{ Forge: metadata.Forge{Type: "gitea", URL: "https://gitea.com"}, diff --git a/pipeline/stepBuilder.go b/pipeline/stepBuilder.go index 6772470a0..620ada31a 100644 --- a/pipeline/stepBuilder.go +++ b/pipeline/stepBuilder.go @@ -20,7 +20,6 @@ import ( "path/filepath" "strings" - "github.com/google/uuid" "github.com/oklog/ulid/v2" "github.com/rs/zerolog/log" @@ -53,7 +52,7 @@ type StepBuilder struct { } type Item struct { - Workflow *model.Step + Workflow *model.Workflow Platform string Labels map[string]string DependsOn []string @@ -79,8 +78,7 @@ func (b *StepBuilder) Build() ([]*Item, error) { } for _, axis := range axes { - workflow := &model.Step{ - UUID: uuid.New().String(), // TODO(#1784): Remove once workflows are a separate entity in database + workflow := &model.Workflow{ PipelineID: b.Curr.ID, PID: pidSequence, State: model.StatusPending, @@ -284,7 +282,6 @@ func (b *StepBuilder) toInternalRepresentation(parsed *yaml_types.Workflow, envi func SetPipelineStepsOnPipeline(pipeline *model.Pipeline, pipelineItems []*Item) *model.Pipeline { var pidSequence int for _, item := range pipelineItems { - pipeline.Steps = append(pipeline.Steps, item.Workflow) if pidSequence < item.Workflow.PID { pidSequence = item.Workflow.PID } @@ -309,9 +306,10 @@ func SetPipelineStepsOnPipeline(pipeline *model.Pipeline, pipelineItems []*Item) if item.Workflow.State == model.StatusSkipped { step.State = model.StatusSkipped } - pipeline.Steps = append(pipeline.Steps, step) + item.Workflow.Children = append(item.Workflow.Children, step) } } + pipeline.Workflows = append(pipeline.Workflows, item.Workflow) } return pipeline diff --git a/pipeline/stepBuilder_test.go b/pipeline/stepBuilder_test.go index dc61e82c1..4c374536f 100644 --- a/pipeline/stepBuilder_test.go +++ b/pipeline/stepBuilder_test.go @@ -545,14 +545,11 @@ steps: if err != nil { t.Fatal(err) } - if len(pipeline.Steps) != 3 { + if len(pipeline.Workflows) != 1 { t.Fatal("Should generate three in total") } - if pipeline.Steps[1].PPID != 1 { - t.Fatal("Clone step should be a children of the stage") - } - if pipeline.Steps[2].PPID != 1 { - t.Fatal("Pipeline step should be a children of the stage") + if len(pipeline.Workflows[0].Children) != 2 { + t.Fatal("Workflow should have two children") } } diff --git a/server/api/pipeline.go b/server/api/pipeline.go index c4dbcb0a3..68477b073 100644 --- a/server/api/pipeline.go +++ b/server/api/pipeline.go @@ -152,8 +152,7 @@ func GetPipeline(c *gin.Context) { _ = c.AbortWithError(http.StatusInternalServerError, err) return } - steps, _ := _store.StepList(pl) - if pl.Steps, err = model.Tree(steps); err != nil { + if pl.Workflows, err = _store.WorkflowGetTree(pl); err != nil { _ = c.AbortWithError(http.StatusInternalServerError, err) return } @@ -172,12 +171,7 @@ func GetPipelineLast(c *gin.Context) { return } - steps, err := _store.StepList(pl) - if err != nil { - _ = c.AbortWithError(http.StatusInternalServerError, err) - return - } - if pl.Steps, err = model.Tree(steps); err != nil { + if pl.Workflows, err = _store.WorkflowGetTree(pl); err != nil { _ = c.AbortWithError(http.StatusInternalServerError, err) return } diff --git a/server/forge/bitbucket/bitbucket.go b/server/forge/bitbucket/bitbucket.go index b81ffbcf4..62a76cb1e 100644 --- a/server/forge/bitbucket/bitbucket.go +++ b/server/forge/bitbucket/bitbucket.go @@ -21,9 +21,10 @@ import ( "net/http" "net/url" - shared_utils "github.com/woodpecker-ci/woodpecker/shared/utils" "golang.org/x/oauth2" + shared_utils "github.com/woodpecker-ci/woodpecker/shared/utils" + "github.com/woodpecker-ci/woodpecker/server" "github.com/woodpecker-ci/woodpecker/server/forge" "github.com/woodpecker-ci/woodpecker/server/forge/bitbucket/internal" @@ -227,7 +228,7 @@ func (c *config) Dir(_ context.Context, _ *model.User, _ *model.Repo, _ *model.P } // Status creates a pipeline status for the Bitbucket commit. -func (c *config) Status(ctx context.Context, user *model.User, repo *model.Repo, pipeline *model.Pipeline, _ *model.Step) error { +func (c *config) Status(ctx context.Context, user *model.User, repo *model.Repo, pipeline *model.Pipeline, _ *model.Workflow) error { status := internal.PipelineStatus{ State: convertStatus(pipeline.Status), Desc: common.GetPipelineStatusDescription(pipeline.Status), diff --git a/server/forge/bitbucket/bitbucket_test.go b/server/forge/bitbucket/bitbucket_test.go index ad90acc7a..c5a6d59e2 100644 --- a/server/forge/bitbucket/bitbucket_test.go +++ b/server/forge/bitbucket/bitbucket_test.go @@ -227,7 +227,7 @@ func Test_bitbucket(t *testing.T) { }) g.It("Should update the status", func() { - err := c.Status(ctx, fakeUser, fakeRepo, fakePipeline, fakeStep) + err := c.Status(ctx, fakeUser, fakeRepo, fakePipeline, fakeWorkflow) g.Assert(err).IsNil() }) @@ -309,7 +309,7 @@ var ( Commit: "9ecad50", } - fakeStep = &model.Step{ + fakeWorkflow = &model.Workflow{ Name: "test", State: model.StatusSuccess, } diff --git a/server/forge/bitbucketserver/bitbucketserver.go b/server/forge/bitbucketserver/bitbucketserver.go index 80569bdf6..003f25a06 100644 --- a/server/forge/bitbucketserver/bitbucketserver.go +++ b/server/forge/bitbucketserver/bitbucketserver.go @@ -200,7 +200,7 @@ func (c *Config) Dir(_ context.Context, _ *model.User, _ *model.Repo, _ *model.P } // Status is not supported by the bitbucketserver driver. -func (c *Config) Status(ctx context.Context, user *model.User, repo *model.Repo, pipeline *model.Pipeline, _ *model.Step) error { +func (c *Config) Status(ctx context.Context, user *model.User, repo *model.Repo, pipeline *model.Pipeline, _ *model.Workflow) error { status := internal.PipelineStatus{ State: convertStatus(pipeline.Status), Desc: common.GetPipelineStatusDescription(pipeline.Status), diff --git a/server/forge/common/status.go b/server/forge/common/status.go index 952eaa864..cae222fbc 100644 --- a/server/forge/common/status.go +++ b/server/forge/common/status.go @@ -23,7 +23,7 @@ import ( "github.com/woodpecker-ci/woodpecker/server/model" ) -func GetPipelineStatusContext(repo *model.Repo, pipeline *model.Pipeline, step *model.Step) string { +func GetPipelineStatusContext(repo *model.Repo, pipeline *model.Pipeline, workflow *model.Workflow) string { event := string(pipeline.Event) switch pipeline.Event { case model.EventPull: @@ -38,7 +38,7 @@ func GetPipelineStatusContext(repo *model.Repo, pipeline *model.Pipeline, step * err = tmpl.Execute(&ctx, map[string]interface{}{ "context": server.Config.Server.StatusContext, "event": event, - "pipeline": step.Name, + "workflow": workflow.Name, "owner": repo.Owner, "repo": repo.Name, }) @@ -72,10 +72,10 @@ func GetPipelineStatusDescription(status model.StatusValue) string { } } -func GetPipelineStatusLink(repo *model.Repo, pipeline *model.Pipeline, step *model.Step) string { - if step == nil { +func GetPipelineStatusLink(repo *model.Repo, pipeline *model.Pipeline, workflow *model.Workflow) string { + if workflow == nil { return fmt.Sprintf("%s/repos/%d/pipeline/%d", server.Config.Server.Host, repo.ID, pipeline.Number) } - return fmt.Sprintf("%s/repos/%d/pipeline/%d/%d", server.Config.Server.Host, repo.ID, pipeline.Number, step.PID) + return fmt.Sprintf("%s/repos/%d/pipeline/%d/%d", server.Config.Server.Host, repo.ID, pipeline.Number, workflow.PID) } diff --git a/server/forge/common/status_test.go b/server/forge/common/status_test.go index 6bc467a7d..c3766159d 100644 --- a/server/forge/common/status_test.go +++ b/server/forge/common/status_test.go @@ -33,17 +33,17 @@ func TestGetPipelineStatusContext(t *testing.T) { repo := &model.Repo{Owner: "user1", Name: "repo1"} pipeline := &model.Pipeline{Event: model.EventPull} - step := &model.Step{Name: "lint"} + workflow := &model.Workflow{Name: "lint"} - assert.EqualValues(t, "", GetPipelineStatusContext(repo, pipeline, step)) + assert.EqualValues(t, "", GetPipelineStatusContext(repo, pipeline, workflow)) server.Config.Server.StatusContext = "ci/woodpecker" - server.Config.Server.StatusContextFormat = "{{ .context }}/{{ .event }}/{{ .pipeline }}" - assert.EqualValues(t, "ci/woodpecker/pr/lint", GetPipelineStatusContext(repo, pipeline, step)) + server.Config.Server.StatusContextFormat = "{{ .context }}/{{ .event }}/{{ .workflow }}" + assert.EqualValues(t, "ci/woodpecker/pr/lint", GetPipelineStatusContext(repo, pipeline, workflow)) pipeline.Event = model.EventPush - assert.EqualValues(t, "ci/woodpecker/push/lint", GetPipelineStatusContext(repo, pipeline, step)) + assert.EqualValues(t, "ci/woodpecker/push/lint", GetPipelineStatusContext(repo, pipeline, workflow)) server.Config.Server.StatusContext = "ci" - server.Config.Server.StatusContextFormat = "{{ .context }}:{{ .owner }}/{{ .repo }}:{{ .event }}:{{ .pipeline }}" - assert.EqualValues(t, "ci:user1/repo1:push:lint", GetPipelineStatusContext(repo, pipeline, step)) + server.Config.Server.StatusContextFormat = "{{ .context }}:{{ .owner }}/{{ .repo }}:{{ .event }}:{{ .workflow }}" + assert.EqualValues(t, "ci:user1/repo1:push:lint", GetPipelineStatusContext(repo, pipeline, workflow)) } diff --git a/server/forge/forge.go b/server/forge/forge.go index 70d262d33..6f4fcd1eb 100644 --- a/server/forge/forge.go +++ b/server/forge/forge.go @@ -61,7 +61,7 @@ type Forge interface { // Status sends the commit status to the forge. // An example would be the GitHub pull request status. - Status(ctx context.Context, u *model.User, r *model.Repo, b *model.Pipeline, p *model.Step) error + Status(ctx context.Context, u *model.User, r *model.Repo, b *model.Pipeline, p *model.Workflow) error // Netrc returns a .netrc file that can be used to clone // private repositories from a forge. diff --git a/server/forge/gitea/gitea.go b/server/forge/gitea/gitea.go index e0e4a6bcd..640869500 100644 --- a/server/forge/gitea/gitea.go +++ b/server/forge/gitea/gitea.go @@ -321,7 +321,7 @@ func (c *Gitea) Dir(ctx context.Context, u *model.User, r *model.Repo, b *model. } // Status is supported by the Gitea driver. -func (c *Gitea) Status(ctx context.Context, user *model.User, repo *model.Repo, pipeline *model.Pipeline, step *model.Step) error { +func (c *Gitea) Status(ctx context.Context, user *model.User, repo *model.Repo, pipeline *model.Pipeline, workflow *model.Workflow) error { client, err := c.newClientToken(ctx, user.Token) if err != nil { return err @@ -332,10 +332,10 @@ func (c *Gitea) Status(ctx context.Context, user *model.User, repo *model.Repo, repo.Name, pipeline.Commit, gitea.CreateStatusOption{ - State: getStatus(step.State), - TargetURL: common.GetPipelineStatusLink(repo, pipeline, step), - Description: common.GetPipelineStatusDescription(step.State), - Context: common.GetPipelineStatusContext(repo, pipeline, step), + State: getStatus(workflow.State), + TargetURL: common.GetPipelineStatusLink(repo, pipeline, workflow), + Description: common.GetPipelineStatusDescription(workflow.State), + Context: common.GetPipelineStatusContext(repo, pipeline, workflow), }, ) return err diff --git a/server/forge/gitea/gitea_test.go b/server/forge/gitea/gitea_test.go index e2e104220..66d73da50 100644 --- a/server/forge/gitea/gitea_test.go +++ b/server/forge/gitea/gitea_test.go @@ -25,12 +25,12 @@ import ( "github.com/franela/goblin" "github.com/gin-gonic/gin" "github.com/stretchr/testify/mock" - "github.com/woodpecker-ci/woodpecker/shared/utils" "github.com/woodpecker-ci/woodpecker/server/forge/gitea/fixtures" "github.com/woodpecker-ci/woodpecker/server/model" "github.com/woodpecker-ci/woodpecker/server/store" mocks_store "github.com/woodpecker-ci/woodpecker/server/store/mocks" + "github.com/woodpecker-ci/woodpecker/shared/utils" ) func Test_gitea(t *testing.T) { @@ -132,7 +132,7 @@ func Test_gitea(t *testing.T) { }) g.It("Should return nil from send pipeline status", func() { - err := c.Status(ctx, fakeUser, fakeRepo, fakePipeline, fakeStep) + err := c.Status(ctx, fakeUser, fakeRepo, fakePipeline, fakeWorkflow) g.Assert(err).IsNil() }) @@ -195,7 +195,7 @@ var ( Commit: "9ecad50", } - fakeStep = &model.Step{ + fakeWorkflow = &model.Workflow{ Name: "test", State: model.StatusSuccess, } diff --git a/server/forge/github/github.go b/server/forge/github/github.go index f4f742170..4c7f6034f 100644 --- a/server/forge/github/github.go +++ b/server/forge/github/github.go @@ -456,7 +456,7 @@ var reDeploy = regexp.MustCompile(`.+/deployments/(\d+)`) // Status sends the commit status to the forge. // An example would be the GitHub pull request status. -func (c *client) Status(ctx context.Context, user *model.User, repo *model.Repo, pipeline *model.Pipeline, step *model.Step) error { +func (c *client) Status(ctx context.Context, user *model.User, repo *model.Repo, pipeline *model.Pipeline, workflow *model.Workflow) error { client := c.newClientToken(ctx, user.Token) if pipeline.Event == model.EventDeploy { @@ -475,10 +475,10 @@ func (c *client) Status(ctx context.Context, user *model.User, repo *model.Repo, } _, _, err := client.Repositories.CreateStatus(ctx, repo.Owner, repo.Name, pipeline.Commit, &github.RepoStatus{ - Context: github.String(common.GetPipelineStatusContext(repo, pipeline, step)), - State: github.String(convertStatus(step.State)), - Description: github.String(common.GetPipelineStatusDescription(step.State)), - TargetURL: github.String(common.GetPipelineStatusLink(repo, pipeline, step)), + Context: github.String(common.GetPipelineStatusContext(repo, pipeline, workflow)), + State: github.String(convertStatus(workflow.State)), + Description: github.String(common.GetPipelineStatusDescription(workflow.State)), + TargetURL: github.String(common.GetPipelineStatusLink(repo, pipeline, workflow)), }) return err } diff --git a/server/forge/gitlab/gitlab.go b/server/forge/gitlab/gitlab.go index cdfdf15e0..a7d302e68 100644 --- a/server/forge/gitlab/gitlab.go +++ b/server/forge/gitlab/gitlab.go @@ -395,7 +395,7 @@ func (g *GitLab) Dir(ctx context.Context, user *model.User, repo *model.Repo, pi } // Status sends the commit status back to gitlab. -func (g *GitLab) Status(ctx context.Context, user *model.User, repo *model.Repo, pipeline *model.Pipeline, step *model.Step) error { +func (g *GitLab) Status(ctx context.Context, user *model.User, repo *model.Repo, pipeline *model.Pipeline, workflow *model.Workflow) error { client, err := newClient(g.url, user.Token, g.SkipVerify) if err != nil { return err @@ -407,10 +407,10 @@ func (g *GitLab) Status(ctx context.Context, user *model.User, repo *model.Repo, } _, _, err = client.Commits.SetCommitStatus(_repo.ID, pipeline.Commit, &gitlab.SetCommitStatusOptions{ - State: getStatus(step.State), - Description: gitlab.String(common.GetPipelineStatusDescription(step.State)), - TargetURL: gitlab.String(common.GetPipelineStatusLink(repo, pipeline, step)), - Context: gitlab.String(common.GetPipelineStatusContext(repo, pipeline, step)), + State: getStatus(workflow.State), + Description: gitlab.String(common.GetPipelineStatusDescription(workflow.State)), + TargetURL: gitlab.String(common.GetPipelineStatusLink(repo, pipeline, workflow)), + Context: gitlab.String(common.GetPipelineStatusContext(repo, pipeline, workflow)), }, gitlab.WithContext(ctx)) return err diff --git a/server/forge/mocks/forge.go b/server/forge/mocks/forge.go index ecb9cc961..d39649740 100644 --- a/server/forge/mocks/forge.go +++ b/server/forge/mocks/forge.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.28.2. DO NOT EDIT. +// Code generated by mockery v2.29.0. DO NOT EDIT. package mocks @@ -379,11 +379,11 @@ func (_m *Forge) Repos(ctx context.Context, u *model.User) ([]*model.Repo, error } // Status provides a mock function with given fields: ctx, u, r, b, p -func (_m *Forge) Status(ctx context.Context, u *model.User, r *model.Repo, b *model.Pipeline, p *model.Step) error { +func (_m *Forge) Status(ctx context.Context, u *model.User, r *model.Repo, b *model.Pipeline, p *model.Workflow) error { ret := _m.Called(ctx, u, r, b, p) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *model.User, *model.Repo, *model.Pipeline, *model.Step) error); ok { + if rf, ok := ret.Get(0).(func(context.Context, *model.User, *model.Repo, *model.Pipeline, *model.Workflow) error); ok { r0 = rf(ctx, u, r, b, p) } else { r0 = ret.Error(0) diff --git a/server/grpc/rpc.go b/server/grpc/rpc.go index 3d750309f..69bae9b47 100644 --- a/server/grpc/rpc.go +++ b/server/grpc/rpc.go @@ -105,24 +105,24 @@ func (s *RPC) Extend(c context.Context, id string) error { // Update implements the rpc.Update function func (s *RPC) Update(c context.Context, id string, state rpc.State) error { - stepID, err := strconv.ParseInt(id, 10, 64) + workflowID, err := strconv.ParseInt(id, 10, 64) if err != nil { return err } - pstep, err := s.store.StepLoad(stepID) + workflow, err := s.store.WorkflowLoad(workflowID) if err != nil { - log.Error().Msgf("error: rpc.update: cannot find step with id %d: %s", stepID, err) + log.Error().Msgf("error: rpc.update: cannot find workflow with id %d: %s", workflowID, err) return err } - currentPipeline, err := s.store.GetPipeline(pstep.PipelineID) + currentPipeline, err := s.store.GetPipeline(workflow.PipelineID) if err != nil { - log.Error().Msgf("error: cannot find pipeline with id %d: %s", pstep.PipelineID, err) + log.Error().Msgf("error: cannot find pipeline with id %d: %s", workflow.PipelineID, err) return err } - step, err := s.store.StepChild(currentPipeline, pstep.PID, state.Step) + step, err := s.store.StepChild(currentPipeline, workflow.PID, state.Step) if err != nil { log.Error().Msgf("error: cannot find step with name %s: %s", state.Step, err) return err @@ -134,17 +134,11 @@ func (s *RPC) Update(c context.Context, id string, state rpc.State) error { return err } - if step, err = pipeline.UpdateStepStatus(s.store, *step, state, currentPipeline.Started); err != nil { + if err := pipeline.UpdateStepStatus(s.store, step, state, currentPipeline.Started); err != nil { log.Error().Err(err).Msg("rpc.update: cannot update step") } - s.updateForgeStatus(c, repo, currentPipeline, step) - - currentPipeline.Steps, err = s.store.StepList(currentPipeline) - if err != nil { - log.Error().Err(err).Msg("can not get step list from store") - } - if currentPipeline.Steps, err = model.Tree(currentPipeline.Steps); err != nil { + if currentPipeline.Workflows, err = s.store.WorkflowGetTree(currentPipeline); err != nil { log.Error().Err(err).Msg("can not build tree from step list") return err } @@ -172,7 +166,7 @@ func (s *RPC) Init(c context.Context, id string, state rpc.State) error { return err } - step, err := s.store.StepLoad(stepID) + workflow, err := s.store.WorkflowLoad(stepID) if err != nil { log.Error().Msgf("error: cannot find step with id %d: %s", stepID, err) return err @@ -182,11 +176,11 @@ func (s *RPC) Init(c context.Context, id string, state rpc.State) error { if err != nil { return err } - step.AgentID = agent.ID + workflow.AgentID = agent.ID - currentPipeline, err := s.store.GetPipeline(step.PipelineID) + currentPipeline, err := s.store.GetPipeline(workflow.PipelineID) if err != nil { - log.Error().Msgf("error: cannot find pipeline with id %d: %s", step.PipelineID, err) + log.Error().Msgf("error: cannot find pipeline with id %d: %s", workflow.PipelineID, err) return err } @@ -202,10 +196,10 @@ func (s *RPC) Init(c context.Context, id string, state rpc.State) error { } } - s.updateForgeStatus(c, repo, currentPipeline, step) + s.updateForgeStatus(c, repo, currentPipeline, workflow) defer func() { - currentPipeline.Steps, _ = s.store.StepList(currentPipeline) + currentPipeline.Workflows, _ = s.store.WorkflowGetTree(currentPipeline) message := pubsub.Message{ Labels: map[string]string{ "repo": repo.FullName, @@ -221,11 +215,11 @@ func (s *RPC) Init(c context.Context, id string, state rpc.State) error { } }() - step, err = pipeline.UpdateStepToStatusStarted(s.store, *step, state) + workflow, err = pipeline.UpdateWorkflowToStatusStarted(s.store, *workflow, state) if err != nil { return err } - s.updateForgeStatus(c, repo, currentPipeline, step) + s.updateForgeStatus(c, repo, currentPipeline, workflow) return nil } @@ -236,12 +230,17 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error { return err } - workflow, err := s.store.StepLoad(workflowID) + workflow, err := s.store.WorkflowLoad(workflowID) if err != nil { log.Error().Err(err).Msgf("cannot find step with id %d", workflowID) return err } + workflow.Children, err = s.store.StepListFromWorkflowFind(workflow) + if err != nil { + return err + } + currentPipeline, err := s.store.GetPipeline(workflow.PipelineID) if err != nil { log.Error().Err(err).Msgf("cannot find pipeline with id %d", workflow.PipelineID) @@ -261,7 +260,7 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error { logger.Trace().Msgf("gRPC Done with state: %#v", state) - if workflow, err = pipeline.UpdateStepStatusToDone(s.store, *workflow, state); err != nil { + if workflow, err = pipeline.UpdateWorkflowStatusToDone(s.store, *workflow, state); err != nil { logger.Error().Err(err).Msgf("pipeline.UpdateStepStatusToDone: cannot update workflow state: %s", err) } @@ -275,14 +274,14 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error { logger.Error().Err(queueErr).Msg("queue.Done: cannot ack workflow") } - steps, err := s.store.StepList(currentPipeline) + currentPipeline.Workflows, err = s.store.WorkflowGetTree(currentPipeline) if err != nil { return err } - s.completeChildrenIfParentCompleted(steps, workflow) + s.completeChildrenIfParentCompleted(workflow) - if !model.IsThereRunningStage(steps) { - if currentPipeline, err = pipeline.UpdateStatusToDone(s.store, *currentPipeline, model.PipelineStatus(steps), workflow.Stopped); err != nil { + if !model.IsThereRunningStage(currentPipeline.Workflows) { + if currentPipeline, err = pipeline.UpdateStatusToDone(s.store, *currentPipeline, model.PipelineStatus(currentPipeline.Workflows), workflow.Stopped); err != nil { logger.Error().Err(err).Msgf("pipeline.UpdateStatusToDone: cannot update workflow final state") } } @@ -291,14 +290,16 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error { // make sure writes to pubsub are non blocking (https://github.com/woodpecker-ci/woodpecker/blob/c919f32e0b6432a95e1a6d3d0ad662f591adf73f/server/logging/log.go#L9) go func() { - for _, step := range steps { - if err := s.logger.Close(c, step.ID); err != nil { - logger.Error().Err(err).Msgf("done: cannot close log stream for step %d", step.ID) + for _, wf := range currentPipeline.Workflows { + for _, step := range wf.Children { + if err := s.logger.Close(c, step.ID); err != nil { + logger.Error().Err(err).Msgf("done: cannot close log stream for step %d", step.ID) + } } } }() - if err := s.notify(c, repo, currentPipeline, steps); err != nil { + if err := s.notify(c, repo, currentPipeline); err != nil { return err } @@ -306,7 +307,7 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error { s.pipelineCount.WithLabelValues(repo.FullName, currentPipeline.Branch, string(currentPipeline.Status), "total").Inc() s.pipelineTime.WithLabelValues(repo.FullName, currentPipeline.Branch, string(currentPipeline.Status), "total").Set(float64(currentPipeline.Finished - currentPipeline.Started)) } - if model.IsMultiPipeline(steps) { + if currentPipeline.IsMultiPipeline() { s.pipelineTime.WithLabelValues(repo.FullName, currentPipeline.Branch, string(workflow.State), workflow.Name).Set(float64(workflow.Stopped - workflow.Started)) } @@ -372,17 +373,17 @@ func (s *RPC) ReportHealth(ctx context.Context, status string) error { return s.store.AgentUpdate(agent) } -func (s *RPC) completeChildrenIfParentCompleted(steps []*model.Step, completedWorkflow *model.Step) { - for _, p := range steps { - if p.Running() && p.PPID == completedWorkflow.PID { - if _, err := pipeline.UpdateStepToStatusSkipped(s.store, *p, completedWorkflow.Stopped); err != nil { - log.Error().Msgf("error: done: cannot update step_id %d child state: %s", p.ID, err) +func (s *RPC) completeChildrenIfParentCompleted(completedWorkflow *model.Workflow) { + for _, c := range completedWorkflow.Children { + if c.Running() { + if _, err := pipeline.UpdateStepToStatusSkipped(s.store, *c, completedWorkflow.Stopped); err != nil { + log.Error().Msgf("error: done: cannot update step_id %d child state: %s", c.ID, err) } } } } -func (s *RPC) updateForgeStatus(ctx context.Context, repo *model.Repo, pipeline *model.Pipeline, step *model.Step) { +func (s *RPC) updateForgeStatus(ctx context.Context, repo *model.Repo, pipeline *model.Pipeline, workflow *model.Workflow) { user, err := s.store.GetUser(repo.UserID) if err != nil { log.Error().Err(err).Msgf("can not get user with id '%d'", repo.UserID) @@ -401,18 +402,15 @@ func (s *RPC) updateForgeStatus(ctx context.Context, repo *model.Repo, pipeline } // only do status updates for parent steps - if step != nil && step.IsParent() { - err = s.forge.Status(ctx, user, repo, pipeline, step) + if workflow != nil { + err = s.forge.Status(ctx, user, repo, pipeline, workflow) if err != nil { log.Error().Err(err).Msgf("error setting commit status for %s/%d", repo.FullName, pipeline.Number) } } } -func (s *RPC) notify(c context.Context, repo *model.Repo, pipeline *model.Pipeline, steps []*model.Step) (err error) { - if pipeline.Steps, err = model.Tree(steps); err != nil { - return err - } +func (s *RPC) notify(c context.Context, repo *model.Repo, pipeline *model.Pipeline) (err error) { message := pubsub.Message{ Labels: map[string]string{ "repo": repo.FullName, diff --git a/server/model/pipeline.go b/server/model/pipeline.go index 0ef4da99c..77b39ab49 100644 --- a/server/model/pipeline.go +++ b/server/model/pipeline.go @@ -45,7 +45,7 @@ type Pipeline struct { Link string `json:"link_url" xorm:"pipeline_link"` Reviewer string `json:"reviewed_by" xorm:"pipeline_reviewer"` Reviewed int64 `json:"reviewed_at" xorm:"pipeline_reviewed"` - Steps []*Step `json:"steps,omitempty" xorm:"-"` + Workflows []*Workflow `json:"workflows,omitempty" xorm:"-"` ChangedFiles []string `json:"changed_files,omitempty" xorm:"json 'changed_files'"` AdditionalVariables map[string]string `json:"variables,omitempty" xorm:"json 'additional_variables'"` PullRequestLabels []string `json:"pr_labels,omitempty" xorm:"json 'pr_labels'"` @@ -56,6 +56,11 @@ func (Pipeline) TableName() string { return "pipelines" } +// IsMultiPipeline checks if step list contain more than one parent step +func (p Pipeline) IsMultiPipeline() bool { + return len(p.Workflows) > 1 +} + type UpdatePipelineStore interface { UpdatePipeline(*Pipeline) error } diff --git a/server/model/step.go b/server/model/step.go index 41b93a51b..c751b5b63 100644 --- a/server/model/step.go +++ b/server/model/step.go @@ -15,8 +15,6 @@ package model -import "fmt" - // StepStore persists process information to storage. type StepStore interface { StepLoad(int64) (*Step, error) @@ -30,21 +28,17 @@ type StepStore interface { // Step represents a process in the pipeline. type Step struct { - ID int64 `json:"id" xorm:"pk autoincr 'step_id'"` - UUID string `json:"uuid" xorm:"UNIQUE INDEX 'step_uuid'"` - PipelineID int64 `json:"pipeline_id" xorm:"UNIQUE(s) INDEX 'step_pipeline_id'"` - PID int `json:"pid" xorm:"UNIQUE(s) 'step_pid'"` - PPID int `json:"ppid" xorm:"step_ppid"` - Name string `json:"name" xorm:"step_name"` - State StatusValue `json:"state" xorm:"step_state"` - Error string `json:"error,omitempty" xorm:"VARCHAR(500) step_error"` - ExitCode int `json:"exit_code" xorm:"step_exit_code"` - Started int64 `json:"start_time,omitempty" xorm:"step_started"` - Stopped int64 `json:"end_time,omitempty" xorm:"step_stopped"` - AgentID int64 `json:"agent_id,omitempty" xorm:"step_agent_id"` - Platform string `json:"platform,omitempty" xorm:"step_platform"` - Environ map[string]string `json:"environ,omitempty" xorm:"json 'step_environ'"` - Children []*Step `json:"children,omitempty" xorm:"-"` + ID int64 `json:"id" xorm:"pk autoincr 'step_id'"` + UUID string `json:"uuid" xorm:"UNIQUE INDEX 'step_uuid'"` + PipelineID int64 `json:"pipeline_id" xorm:"UNIQUE(s) INDEX 'step_pipeline_id'"` + PID int `json:"pid" xorm:"UNIQUE(s) 'step_pid'"` + PPID int `json:"ppid" xorm:"step_ppid"` + Name string `json:"name" xorm:"step_name"` + State StatusValue `json:"state" xorm:"step_state"` + Error string `json:"error,omitempty" xorm:"VARCHAR(500) step_error"` + ExitCode int `json:"exit_code" xorm:"step_exit_code"` + Started int64 `json:"start_time,omitempty" xorm:"step_started"` + Stopped int64 `json:"end_time,omitempty" xorm:"step_stopped"` } // @name Step type UpdateStepStore interface { @@ -65,83 +59,3 @@ func (p *Step) Running() bool { func (p *Step) Failing() bool { return p.State == StatusError || p.State == StatusKilled || p.State == StatusFailure } - -// IsParent returns true if the process is a parent process. -func (p *Step) IsParent() bool { - return p.PPID == 0 -} - -// IsMultiPipeline checks if step list contain more than one parent step -func IsMultiPipeline(steps []*Step) bool { - c := 0 - for _, step := range steps { - if step.IsParent() { - c++ - } - if c > 1 { - return true - } - } - return false -} - -// Tree creates a process tree from a flat process list. -func Tree(steps []*Step) ([]*Step, error) { - var nodes []*Step - - // init parent nodes - for i := range steps { - if steps[i].IsParent() { - nodes = append(nodes, steps[i]) - } - } - - // assign children to parents - for i := range steps { - if !steps[i].IsParent() { - parent, err := findNode(nodes, steps[i].PPID) - if err != nil { - return nil, err - } - parent.Children = append(parent.Children, steps[i]) - } - } - - return nodes, nil -} - -// PipelineStatus determine pipeline status based on corresponding step list -func PipelineStatus(steps []*Step) StatusValue { - status := StatusSuccess - - for _, p := range steps { - if p.IsParent() && p.Failing() { - status = p.State - } - } - - return status -} - -// IsThereRunningStage determine if it contains steps running or pending to run -// TODO: return false based on depends_on (https://github.com/woodpecker-ci/woodpecker/pull/730#discussion_r795681697) -func IsThereRunningStage(steps []*Step) bool { - for _, p := range steps { - if p.IsParent() { - if p.Running() { - return true - } - } - } - return false -} - -func findNode(nodes []*Step, pid int) (*Step, error) { - for _, node := range nodes { - if node.PID == pid { - return node, nil - } - } - - return nil, fmt.Errorf("Corrupt step structure") -} diff --git a/server/model/step_test.go b/server/model/step_test.go deleted file mode 100644 index 21d6df69e..000000000 --- a/server/model/step_test.go +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright 2021 Woodpecker Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package model - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestTree(t *testing.T) { - steps := []*Step{{ - ID: 25, - UUID: "f80df0bb-77a7-4964-9412-2e1049872d57", - PID: 2, - PipelineID: 6, - PPID: 1, - Name: "clone", - State: StatusSuccess, - Error: "0", - }, { - ID: 24, - UUID: "c19b49c5-990d-4722-ba9c-1c4fe9db1f91", - PipelineID: 6, - PID: 1, - PPID: 0, - Name: "lint", - State: StatusFailure, - Error: "1", - }, { - ID: 26, - UUID: "4380146f-c0ff-4482-8107-c90937d1faba", - PipelineID: 6, - PID: 3, - PPID: 1, - Name: "lint", - State: StatusFailure, - Error: "1", - }} - steps, err := Tree(steps) - assert.NoError(t, err) - assert.Len(t, steps, 1) - assert.Len(t, steps[0].Children, 2) - - steps = []*Step{{ - ID: 25, - UUID: "f80df0bb-77a7-4964-9412-2e1049872d57", - PID: 2, - PipelineID: 6, - PPID: 1, - Name: "clone", - State: StatusSuccess, - Error: "0", - }} - _, err = Tree(steps) - assert.Error(t, err) -} diff --git a/server/model/workflow.go b/server/model/workflow.go new file mode 100644 index 000000000..067348492 --- /dev/null +++ b/server/model/workflow.go @@ -0,0 +1,75 @@ +// Copyright 2021 Woodpecker Authors +// Copyright 2018 Drone.IO Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package model + +// Workflow represents a workflow in the pipeline. +type Workflow struct { + ID int64 `json:"id" xorm:"pk autoincr 'workflow_id'"` + PipelineID int64 `json:"pipeline_id" xorm:"UNIQUE(s) INDEX 'workflow_pipeline_id'"` + PID int `json:"pid" xorm:"UNIQUE(s) 'workflow_pid'"` + Name string `json:"name" xorm:"workflow_name"` + State StatusValue `json:"state" xorm:"workflow_state"` + Error string `json:"error,omitempty" xorm:"VARCHAR(500) workflow_error"` + Started int64 `json:"start_time,omitempty" xorm:"workflow_started"` + Stopped int64 `json:"end_time,omitempty" xorm:"workflow_stopped"` + AgentID int64 `json:"agent_id,omitempty" xorm:"workflow_agent_id"` + Platform string `json:"platform,omitempty" xorm:"workflow_platform"` + Environ map[string]string `json:"environ,omitempty" xorm:"json 'workflow_environ'"` + Children []*Step `json:"children,omitempty" xorm:"-"` +} + +type UpdateWorkflowStore interface { + WorkflowUpdate(*Workflow) error +} + +// TableName return database table name for xorm +func (Workflow) TableName() string { + return "workflows" +} + +// Running returns true if the process state is pending or running. +func (p *Workflow) Running() bool { + return p.State == StatusPending || p.State == StatusRunning +} + +// Failing returns true if the process state is failed, killed or error. +func (p *Workflow) Failing() bool { + return p.State == StatusError || p.State == StatusKilled || p.State == StatusFailure +} + +// IsThereRunningStage determine if it contains workflows running or pending to run +// TODO: return false based on depends_on (https://github.com/woodpecker-ci/woodpecker/pull/730#discussion_r795681697) +func IsThereRunningStage(workflows []*Workflow) bool { + for _, p := range workflows { + if p.Running() { + return true + } + } + return false +} + +// PipelineStatus determine pipeline status based on corresponding step list +func PipelineStatus(workflows []*Workflow) StatusValue { + status := StatusSuccess + + for _, p := range workflows { + if p.Failing() { + status = p.State + } + } + + return status +} diff --git a/server/pipeline/cancel.go b/server/pipeline/cancel.go index b0b353ebe..bd45933b1 100644 --- a/server/pipeline/cancel.go +++ b/server/pipeline/cancel.go @@ -32,7 +32,7 @@ func Cancel(ctx context.Context, store store.Store, repo *model.Repo, user *mode return &ErrBadRequest{Msg: "Cannot cancel a non-running or non-pending or non-blocked pipeline"} } - steps, err := store.StepList(pipeline) + workflows, err := store.WorkflowGetTree(pipeline) if err != nil { return &ErrNotFound{Msg: err.Error()} } @@ -42,15 +42,12 @@ func Cancel(ctx context.Context, store store.Store, repo *model.Repo, user *mode stepsToCancel []string stepsToEvict []string ) - for _, step := range steps { - if step.PPID != 0 { - continue + for _, workflow := range workflows { + if workflow.State == model.StatusRunning { + stepsToCancel = append(stepsToCancel, fmt.Sprint(workflow.ID)) } - if step.State == model.StatusRunning { - stepsToCancel = append(stepsToCancel, fmt.Sprint(step.ID)) - } - if step.State == model.StatusPending { - stepsToEvict = append(stepsToEvict, fmt.Sprint(step.ID)) + if workflow.State == model.StatusPending { + stepsToEvict = append(stepsToEvict, fmt.Sprint(workflow.ID)) } } @@ -70,15 +67,16 @@ func Cancel(ctx context.Context, store store.Store, repo *model.Repo, user *mode // Then update the DB status for pending pipelines // Running ones will be set when the agents stop on the cancel signal - for _, step := range steps { - if step.State == model.StatusPending { - if step.PPID != 0 { + for _, workflow := range workflows { + if workflow.State == model.StatusPending { + if _, err = UpdateWorkflowToStatusSkipped(store, *workflow); err != nil { + log.Error().Err(err).Msgf("cannot update workflow with id %d state", workflow.ID) + } + } + for _, step := range workflow.Children { + if step.State == model.StatusPending { if _, err = UpdateStepToStatusSkipped(store, *step, 0); err != nil { - log.Error().Msgf("error: done: cannot update step_id %d state: %s", step.ID, err) - } - } else { - if _, err = UpdateStepToStatusKilled(store, *step); err != nil { - log.Error().Msgf("error: done: cannot update step_id %d state: %s", step.ID, err) + log.Error().Err(err).Msgf("cannot update workflow with id %d state", workflow.ID) } } } @@ -92,11 +90,7 @@ func Cancel(ctx context.Context, store store.Store, repo *model.Repo, user *mode updatePipelineStatus(ctx, killedPipeline, repo, user) - steps, err = store.StepList(killedPipeline) - if err != nil { - return &ErrNotFound{Msg: err.Error()} - } - if killedPipeline.Steps, err = model.Tree(steps); err != nil { + if killedPipeline.Workflows, err = store.WorkflowGetTree(killedPipeline); err != nil { return err } if err := publishToTopic(ctx, killedPipeline, repo); err != nil { diff --git a/server/pipeline/create.go b/server/pipeline/create.go index 12c4f21b9..acad2afc5 100644 --- a/server/pipeline/create.go +++ b/server/pipeline/create.go @@ -100,7 +100,7 @@ func Create(ctx context.Context, _store store.Store, repo *model.Repo, pipeline pipeline.Status = model.StatusBlocked } - err = _store.CreatePipeline(pipeline, pipeline.Steps...) + err = _store.CreatePipeline(pipeline) if err != nil { msg := fmt.Sprintf("failure to save pipeline for %s", repo.FullName) log.Error().Err(err).Msg(msg) diff --git a/server/pipeline/decline.go b/server/pipeline/decline.go index b90466586..71a48dfd6 100644 --- a/server/pipeline/decline.go +++ b/server/pipeline/decline.go @@ -35,11 +35,7 @@ func Decline(ctx context.Context, store store.Store, pipeline *model.Pipeline, u return nil, fmt.Errorf("error updating pipeline. %w", err) } - pipeline.Steps, err = store.StepList(pipeline) - if err != nil { - log.Error().Err(err).Msg("can not get step list from store") - } - if pipeline.Steps, err = model.Tree(pipeline.Steps); err != nil { + if pipeline.Workflows, err = store.WorkflowGetTree(pipeline); err != nil { log.Error().Err(err).Msg("can not build tree from step list") } diff --git a/server/pipeline/helper.go b/server/pipeline/helper.go index 00492fd24..238448287 100644 --- a/server/pipeline/helper.go +++ b/server/pipeline/helper.go @@ -24,13 +24,8 @@ import ( ) func updatePipelineStatus(ctx context.Context, pipeline *model.Pipeline, repo *model.Repo, user *model.User) { - for _, step := range pipeline.Steps { - // skip child steps - if !step.IsParent() { - continue - } - - err := server.Config.Services.Forge.Status(ctx, user, repo, pipeline, step) + for _, workflow := range pipeline.Workflows { + err := server.Config.Services.Forge.Status(ctx, user, repo, pipeline, workflow) if err != nil { log.Error().Err(err).Msgf("error setting commit status for %s/%d", repo.FullName, pipeline.Number) return diff --git a/server/pipeline/start.go b/server/pipeline/start.go index 64f8d7b24..71c3c13c6 100644 --- a/server/pipeline/start.go +++ b/server/pipeline/start.go @@ -33,7 +33,7 @@ func start(ctx context.Context, store store.Store, activePipeline *model.Pipelin log.Error().Err(err).Msg("Failed to cancel previous pipelines") } - if err := store.StepCreate(activePipeline.Steps); err != nil { + if err := store.WorkflowsCreate(activePipeline.Workflows); err != nil { log.Error().Err(err).Str("repo", repo.FullName).Msgf("error persisting steps for %s#%d", repo.FullName, activePipeline.Number) return nil, err } @@ -49,10 +49,11 @@ func start(ctx context.Context, store store.Store, activePipeline *model.Pipelin // open logs streamer for each step go func() { - steps := activePipeline.Steps - for _, step := range steps { - if err := server.Config.Services.Logs.Open(context.Background(), step.ID); err != nil { - log.Error().Err(err).Msgf("could not open log stream for step %d", step.ID) + for _, wf := range activePipeline.Workflows { + for _, step := range wf.Children { + if err := server.Config.Services.Logs.Open(context.Background(), step.ID); err != nil { + log.Error().Err(err).Msgf("could not open log stream for step %d", step.ID) + } } } }() diff --git a/server/pipeline/stepStatus.go b/server/pipeline/stepStatus.go index 7f1f4dd29..81f2d94b8 100644 --- a/server/pipeline/stepStatus.go +++ b/server/pipeline/stepStatus.go @@ -22,7 +22,7 @@ import ( "github.com/woodpecker-ci/woodpecker/server/model" ) -func UpdateStepStatus(store model.UpdateStepStore, step model.Step, state rpc.State, started int64) (*model.Step, error) { +func UpdateStepStatus(store model.UpdateStepStore, step *model.Step, state rpc.State, started int64) error { if state.Exited { step.Stopped = state.Finished step.ExitCode = state.ExitCode @@ -42,7 +42,7 @@ func UpdateStepStatus(store model.UpdateStepStore, step model.Step, state rpc.St if step.Started == 0 && step.Stopped != 0 { step.Started = started } - return &step, store.StepUpdate(&step) + return store.StepUpdate(step) } func UpdateStepToStatusStarted(store model.UpdateStepStore, step model.Step, state rpc.State) (*model.Step, error) { diff --git a/server/pipeline/stepStatus_test.go b/server/pipeline/stepStatus_test.go index 4a6664191..e09f00e94 100644 --- a/server/pipeline/stepStatus_test.go +++ b/server/pipeline/stepStatus_test.go @@ -19,6 +19,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/woodpecker-ci/woodpecker/pipeline/rpc" "github.com/woodpecker-ci/woodpecker/server/model" ) @@ -40,7 +42,9 @@ func TestUpdateStepStatusNotExited(t *testing.T) { ExitCode: 137, Error: "not an error", } - step, _ := UpdateStepStatus(&mockUpdateStepStore{}, model.Step{}, state, int64(1)) + step := &model.Step{} + err := UpdateStepStatus(&mockUpdateStepStore{}, step, state, int64(1)) + assert.NoError(t, err) if step.State != model.StatusRunning { t.Errorf("Step status not equals '%s' != '%s'", model.StatusRunning, step.State) @@ -67,7 +71,8 @@ func TestUpdateStepStatusNotExitedButStopped(t *testing.T) { ExitCode: 137, Error: "not an error", } - step, _ = UpdateStepStatus(&mockUpdateStepStore{}, *step, state, int64(42)) + err := UpdateStepStatus(&mockUpdateStepStore{}, step, state, int64(42)) + assert.NoError(t, err) if step.State != model.StatusRunning { t.Errorf("Step status not equals '%s' != '%s'", model.StatusRunning, step.State) @@ -92,7 +97,10 @@ func TestUpdateStepStatusExited(t *testing.T) { ExitCode: 137, Error: "an error", } - step, _ := UpdateStepStatus(&mockUpdateStepStore{}, model.Step{}, state, int64(42)) + + step := &model.Step{} + err := UpdateStepStatus(&mockUpdateStepStore{}, step, state, int64(42)) + assert.NoError(t, err) if step.State != model.StatusKilled { t.Errorf("Step status not equals '%s' != '%s'", model.StatusKilled, step.State) @@ -116,7 +124,9 @@ func TestUpdateStepStatusExitedButNot137(t *testing.T) { Finished: int64(34), Error: "an error", } - step, _ := UpdateStepStatus(&mockUpdateStepStore{}, model.Step{}, state, int64(42)) + step := &model.Step{} + err := UpdateStepStatus(&mockUpdateStepStore{}, step, state, int64(42)) + assert.NoError(t, err) if step.State != model.StatusFailure { t.Errorf("Step status not equals '%s' != '%s'", model.StatusFailure, step.State) @@ -141,7 +151,9 @@ func TestUpdateStepStatusExitedWithCode(t *testing.T) { ExitCode: 1, Error: "an error", } - step, _ := UpdateStepStatus(&mockUpdateStepStore{}, model.Step{}, state, int64(42)) + step := &model.Step{} + err := UpdateStepStatus(&mockUpdateStepStore{}, step, state, int64(42)) + assert.NoError(t, err) if step.State != model.StatusFailure { t.Errorf("Step status not equals '%s' != '%s'", model.StatusFailure, step.State) diff --git a/server/pipeline/topic.go b/server/pipeline/topic.go index cbeb641eb..02bf3ea5d 100644 --- a/server/pipeline/topic.go +++ b/server/pipeline/topic.go @@ -33,9 +33,6 @@ func publishToTopic(c context.Context, pipeline *model.Pipeline, repo *model.Rep }, } pipelineCopy := *pipeline - if pipelineCopy.Steps, err = model.Tree(pipelineCopy.Steps); err != nil { - return err - } message.Data, _ = json.Marshal(model.Event{ Repo: *repo, diff --git a/server/pipeline/workflowStatus.go b/server/pipeline/workflowStatus.go new file mode 100644 index 000000000..7bf473f84 --- /dev/null +++ b/server/pipeline/workflowStatus.go @@ -0,0 +1,45 @@ +// Copyright 2023 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +import ( + "github.com/woodpecker-ci/woodpecker/pipeline/rpc" + "github.com/woodpecker-ci/woodpecker/server/model" +) + +func UpdateWorkflowToStatusStarted(store model.UpdateWorkflowStore, workflow model.Workflow, state rpc.State) (*model.Workflow, error) { + workflow.Started = state.Started + workflow.State = model.StatusRunning + return &workflow, store.WorkflowUpdate(&workflow) +} + +func UpdateWorkflowToStatusSkipped(store model.UpdateWorkflowStore, workflow model.Workflow) (*model.Workflow, error) { + workflow.State = model.StatusSkipped + return &workflow, store.WorkflowUpdate(&workflow) +} + +func UpdateWorkflowStatusToDone(store model.UpdateWorkflowStore, workflow model.Workflow, state rpc.State) (*model.Workflow, error) { + workflow.Stopped = state.Finished + workflow.Error = state.Error + if state.Started == 0 { + workflow.State = model.StatusSkipped + } else { + workflow.State = model.StatusSuccess + } + if workflow.Error != "" { + workflow.State = model.StatusFailure + } + return &workflow, store.WorkflowUpdate(&workflow) +} diff --git a/server/queue/fifo.go b/server/queue/fifo.go index e79173d1a..8fd13fab0 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -117,7 +117,7 @@ func (q *fifo) Error(_ context.Context, id string, err error) error { return q.finished([]string{id}, model.StatusFailure, err) } -// Error signals that the item is done executing with error. +// ErrorAtOnce signals that the item is done executing with error. func (q *fifo) ErrorAtOnce(_ context.Context, id []string, err error) error { return q.finished(id, model.StatusFailure, err) } diff --git a/server/store/datastore/migration/020_parent_steps_to_workflows.go b/server/store/datastore/migration/020_parent_steps_to_workflows.go new file mode 100644 index 000000000..19589a3f8 --- /dev/null +++ b/server/store/datastore/migration/020_parent_steps_to_workflows.go @@ -0,0 +1,87 @@ +// Copyright 2022 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package migration + +import ( + "xorm.io/xorm" + + "github.com/woodpecker-ci/woodpecker/server/model" +) + +type oldStep018 struct { + ID int64 `xorm:"pk autoincr 'step_id'"` + PipelineID int64 `xorm:"UNIQUE(s) INDEX 'step_pipeline_id'"` + PID int `xorm:"UNIQUE(s) 'step_pid'"` + PPID int `xorm:"step_ppid"` + Name string `xorm:"step_name"` + State model.StatusValue `xorm:"step_state"` + Error string `xorm:"VARCHAR(500) step_error"` + Started int64 `xorm:"step_started"` + Stopped int64 `xorm:"step_stopped"` + AgentID int64 `xorm:"step_agent_id"` + Platform string `xorm:"step_platform"` + Environ map[string]string `xorm:"json 'step_environ'"` +} + +func (oldStep018) TableName() string { + return "steps" +} + +var parentStepsToWorkflows = task{ + name: "parent-steps-to-workflows", + required: true, + fn: func(sess *xorm.Session) error { + if err := sess.Sync(new(model.Workflow)); err != nil { + return err + } + // make sure the columns exist before removing them + if err := sess.Sync(new(oldStep018)); err != nil { + return err + } + + var parentSteps []*oldStep018 + err := sess.Where("step_ppid = ?", 0).Find(&parentSteps) + if err != nil { + return err + } + + for _, p := range parentSteps { + asWorkflow := &model.Workflow{ + PipelineID: p.PipelineID, + PID: p.PID, + Name: p.Name, + State: p.State, + Error: p.Error, + Started: p.Started, + Stopped: p.Stopped, + AgentID: p.AgentID, + Platform: p.Platform, + Environ: p.Environ, + } + + _, err = sess.Insert(asWorkflow) + if err != nil { + return err + } + + _, err = sess.Delete(&oldStep018{ID: p.ID}) + if err != nil { + return err + } + } + + return dropTableColumns(sess, "steps", "step_agent_id", "step_platform", "step_environ") + }, +} diff --git a/server/store/datastore/migration/migration.go b/server/store/datastore/migration/migration.go index b98cc44c3..d0267ed7f 100644 --- a/server/store/datastore/migration/migration.go +++ b/server/store/datastore/migration/migration.go @@ -52,6 +52,7 @@ var migrationTasks = []*task{ &dropOldCols, &initLogsEntriesTable, &migrateLogs2LogEntries, + &parentStepsToWorkflows, } var allBeans = []interface{}{ @@ -70,6 +71,7 @@ var allBeans = []interface{}{ new(model.ServerConfig), new(model.Cron), new(model.Redirection), + new(model.Workflow), } type migrations struct { diff --git a/server/store/datastore/step.go b/server/store/datastore/step.go index 4df262e03..44fbefa7b 100644 --- a/server/store/datastore/step.go +++ b/server/store/datastore/step.go @@ -55,21 +55,27 @@ func (s storage) StepList(pipeline *model.Pipeline) ([]*model.Step, error) { Find(&stepList) } -func (s storage) StepCreate(steps []*model.Step) error { - sess := s.engine.NewSession() - defer sess.Close() - if err := sess.Begin(); err != nil { - return err - } +func (s storage) StepListFromWorkflowFind(workflow *model.Workflow) ([]*model.Step, error) { + return s.stepListWorkflow(s.engine.NewSession(), workflow) +} +func (s storage) stepListWorkflow(sess *xorm.Session, workflow *model.Workflow) ([]*model.Step, error) { + stepList := make([]*model.Step, 0) + return stepList, sess. + Where("step_pipeline_id = ?", workflow.PipelineID). + Where("step_ppid = ?", workflow.PID). + OrderBy("step_pid"). + Find(&stepList) +} + +func (s storage) stepCreate(sess *xorm.Session, steps []*model.Step) error { for i := range steps { // only Insert on single object ref set auto created ID back to object if _, err := sess.Insert(steps[i]); err != nil { return err } } - - return sess.Commit() + return nil } func (s storage) StepUpdate(step *model.Step) error { @@ -88,6 +94,10 @@ func (s storage) StepClear(pipeline *model.Pipeline) error { return err } + if _, err := sess.Where("workflow_pipeline_id = ?", pipeline.ID).Delete(new(model.Workflow)); err != nil { + return err + } + return sess.Commit() } diff --git a/server/store/datastore/step_test.go b/server/store/datastore/step_test.go index 9aa8e4853..8238baa07 100644 --- a/server/store/datastore/step_test.go +++ b/server/store/datastore/step_test.go @@ -26,6 +26,8 @@ import ( func TestStepFind(t *testing.T) { store, closer := newTestStore(t, new(model.Step), new(model.Pipeline)) + sess := store.engine.NewSession() + defer closer() steps := []*model.Step{ @@ -38,14 +40,12 @@ func TestStepFind(t *testing.T) { State: model.StatusSuccess, Error: "pc load letter", ExitCode: 255, - AgentID: 1, - Platform: "linux/amd64", - Environ: map[string]string{"GOLANG": "tip"}, }, } - assert.NoError(t, store.StepCreate(steps)) + assert.NoError(t, store.stepCreate(sess, steps)) assert.EqualValues(t, 1, steps[0].ID) - assert.Error(t, store.StepCreate(steps)) + assert.Error(t, store.stepCreate(sess, steps)) + assert.NoError(t, sess.Close()) step, err := store.StepFind(&model.Pipeline{ID: 1000}, 1) if !assert.NoError(t, err) { @@ -58,7 +58,8 @@ func TestStepChild(t *testing.T) { store, closer := newTestStore(t, new(model.Step), new(model.Pipeline)) defer closer() - err := store.StepCreate([]*model.Step{ + sess := store.engine.NewSession() + err := store.stepCreate(sess, []*model.Step{ { UUID: "ea6d4008-8ace-4f8a-ad03-53f1756465d9", PipelineID: 1, @@ -79,6 +80,7 @@ func TestStepChild(t *testing.T) { t.Errorf("Unexpected error: insert steps: %s", err) return } + _ = sess.Commit() step, err := store.StepChild(&model.Pipeline{ID: 1}, 1, "build") if err != nil { t.Error(err) @@ -97,7 +99,8 @@ func TestStepList(t *testing.T) { store, closer := newTestStore(t, new(model.Step), new(model.Pipeline)) defer closer() - err := store.StepCreate([]*model.Step{ + sess := store.engine.NewSession() + err := store.stepCreate(sess, []*model.Step{ { UUID: "2bf387f7-2913-4907-814c-c9ada88707c0", PipelineID: 2, @@ -125,6 +128,7 @@ func TestStepList(t *testing.T) { t.Errorf("Unexpected error: insert steps: %s", err) return } + _ = sess.Commit() steps, err := store.StepList(&model.Pipeline{ID: 1}) if err != nil { t.Error(err) @@ -148,14 +152,13 @@ func TestStepUpdate(t *testing.T) { State: "pending", Error: "pc load letter", ExitCode: 255, - AgentID: 1, - Platform: "linux/amd64", - Environ: map[string]string{"GOLANG": "tip"}, } - if err := store.StepCreate([]*model.Step{step}); err != nil { + sess := store.engine.NewSession() + if err := store.stepCreate(sess, []*model.Step{step}); err != nil { t.Errorf("Unexpected error: insert step: %s", err) return } + _ = sess.Commit() step.State = "running" if err := store.StepUpdate(step); err != nil { t.Errorf("Unexpected error: update step: %s", err) @@ -175,7 +178,8 @@ func TestStepIndexes(t *testing.T) { store, closer := newTestStore(t, new(model.Step), new(model.Pipeline)) defer closer() - if err := store.StepCreate([]*model.Step{ + sess := store.engine.NewSession() + if err := store.stepCreate(sess, []*model.Step{ { UUID: "4db7e5fc-5312-4d02-9e14-b51b9e3242cc", PipelineID: 1, @@ -190,7 +194,7 @@ func TestStepIndexes(t *testing.T) { } // fail due to duplicate pid - if err := store.StepCreate([]*model.Step{ + if err := store.stepCreate(sess, []*model.Step{ { UUID: "c1f33a9e-2a02-4579-95ec-90255d785a12", PipelineID: 1, @@ -204,7 +208,7 @@ func TestStepIndexes(t *testing.T) { } // fail due to duplicate uuid - if err := store.StepCreate([]*model.Step{ + if err := store.stepCreate(sess, []*model.Step{ { UUID: "4db7e5fc-5312-4d02-9e14-b51b9e3242cc", PipelineID: 5, @@ -216,13 +220,15 @@ func TestStepIndexes(t *testing.T) { }); err == nil { t.Errorf("Unexpected error: duplicate pid") } + _ = sess.Close() } func TestStepByUUID(t *testing.T) { store, closer := newTestStore(t, new(model.Step), new(model.Pipeline)) defer closer() - assert.NoError(t, store.StepCreate([]*model.Step{ + sess := store.engine.NewSession() + assert.NoError(t, store.stepCreate(sess, []*model.Step{ { UUID: "4db7e5fc-5312-4d02-9e14-b51b9e3242cc", PipelineID: 1, @@ -240,11 +246,9 @@ func TestStepByUUID(t *testing.T) { State: "pending", Error: "pc load letter", ExitCode: 255, - AgentID: 1, - Platform: "linux/amd64", - Environ: map[string]string{"GOLANG": "tip"}, }, })) + _ = sess.Close() step, err := store.StepByUUID("4db7e5fc-5312-4d02-9e14-b51b9e3242cc") assert.NoError(t, err) diff --git a/server/store/datastore/workflow.go b/server/store/datastore/workflow.go new file mode 100644 index 000000000..f0e546405 --- /dev/null +++ b/server/store/datastore/workflow.go @@ -0,0 +1,71 @@ +package datastore + +import ( + "xorm.io/xorm" + + "github.com/woodpecker-ci/woodpecker/server/model" +) + +func (s storage) WorkflowGetTree(pipeline *model.Pipeline) ([]*model.Workflow, error) { + sess := s.engine.NewSession() + wfList, err := s.workflowList(sess, pipeline) + if err != nil { + return nil, err + } + + for _, wf := range wfList { + wf.Children, err = s.stepListWorkflow(sess, wf) + if err != nil { + return nil, err + } + } + + return wfList, sess.Commit() +} + +func (s storage) WorkflowsCreate(workflows []*model.Workflow) error { + sess := s.engine.NewSession() + defer sess.Close() + if err := sess.Begin(); err != nil { + return err + } + + for i := range workflows { + // only Insert on single object ref set auto created ID back to object + if err := s.stepCreate(sess, workflows[i].Children); err != nil { + return err + } + if _, err := sess.Insert(workflows[i]); err != nil { + return err + } + } + + return sess.Commit() +} + +func (s storage) WorkflowList(pipeline *model.Pipeline) ([]*model.Workflow, error) { + return s.workflowList(s.engine.NewSession(), pipeline) +} + +// workflowList lists workflows without child steps +func (s storage) workflowList(sess *xorm.Session, pipeline *model.Pipeline) ([]*model.Workflow, error) { + var wfList []*model.Workflow + err := sess.Where("workflow_pipeline_id = ?", pipeline.ID). + OrderBy("workflow_pid"). + Find(&wfList) + if err != nil { + return nil, err + } + + return wfList, nil +} + +func (s storage) WorkflowLoad(id int64) (*model.Workflow, error) { + workflow := new(model.Workflow) + return workflow, wrapGet(s.engine.ID(id).Get(workflow)) +} + +func (s storage) WorkflowUpdate(workflow *model.Workflow) error { + _, err := s.engine.ID(workflow.ID).AllCols().Update(workflow) + return err +} diff --git a/server/store/datastore/workflow_test.go b/server/store/datastore/workflow_test.go new file mode 100644 index 000000000..e0eca76a7 --- /dev/null +++ b/server/store/datastore/workflow_test.go @@ -0,0 +1,171 @@ +// Copyright 2022 Woodpecker Authors +// Copyright 2018 Drone.IO Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datastore + +import ( + "testing" + + "github.com/woodpecker-ci/woodpecker/server/model" +) + +func TestWorkflowLoad(t *testing.T) { + store, closer := newTestStore(t, new(model.Step), new(model.Pipeline), new(model.Workflow)) + defer closer() + + wf := &model.Workflow{ + PipelineID: 1, + PID: 1, + Name: "woodpecker", + Children: []*model.Step{ + { + UUID: "ea6d4008-8ace-4f8a-ad03-53f1756465d9", + PipelineID: 1, + PID: 2, + PPID: 1, + State: "success", + }, + { + UUID: "2bf387f7-2913-4907-814c-c9ada88707c0", + PipelineID: 1, + PID: 3, + PPID: 1, + Name: "build", + State: "success", + }, + }, + } + err := store.WorkflowsCreate([]*model.Workflow{wf}) + if err != nil { + t.Errorf("Unexpected error: insert steps: %s", err) + return + } + workflowGet, err := store.WorkflowLoad(1) + if err != nil { + t.Errorf("Unexpected error: insert steps: %s", err) + return + } + + if got, want := workflowGet.PipelineID, int64(1); got != want { + t.Errorf("Want pipeline id %d, got %d", want, got) + } + if got, want := workflowGet.PID, 1; got != want { + t.Errorf("Want workflow pid %d, got %d", want, got) + } + // children are not loaded + if got, want := len(workflowGet.Children), 0; got != want { + t.Errorf("Want children len %d, got %d", want, got) + } +} + +func TestWorkflowGetTree(t *testing.T) { + store, closer := newTestStore(t, new(model.Step), new(model.Pipeline), new(model.Workflow)) + defer closer() + + wf := &model.Workflow{ + PipelineID: 1, + PID: 1, + Name: "woodpecker", + Children: []*model.Step{ + { + UUID: "ea6d4008-8ace-4f8a-ad03-53f1756465d9", + PipelineID: 1, + PID: 2, + PPID: 1, + State: "success", + }, + { + UUID: "2bf387f7-2913-4907-814c-c9ada88707c0", + PipelineID: 1, + PID: 3, + PPID: 1, + Name: "build", + State: "success", + }, + }, + } + err := store.WorkflowsCreate([]*model.Workflow{wf}) + if err != nil { + t.Errorf("Unexpected error: insert steps: %s", err) + return + } + + workflowsGet, err := store.WorkflowGetTree(&model.Pipeline{ID: 1}) + if err != nil { + t.Error(err) + return + } + + if got, want := len(workflowsGet), 1; got != want { + t.Errorf("Want workflow len %d, got %d", want, got) + return + } + workflowGet := workflowsGet[0] + if got, want := workflowGet.Name, "woodpecker"; got != want { + t.Errorf("Want workflow name %s, got %s", want, got) + } + if got, want := len(workflowGet.Children), 2; got != want { + t.Errorf("Want children len %d, got %d", want, got) + return + } + if got, want := workflowGet.Children[0].PID, 2; got != want { + t.Errorf("Want children len %d, got %d", want, got) + } + if got, want := workflowGet.Children[1].PID, 3; got != want { + t.Errorf("Want children len %d, got %d", want, got) + } +} + +func TestWorkflowUpdate(t *testing.T) { + store, closer := newTestStore(t, new(model.Step), new(model.Pipeline), new(model.Workflow)) + defer closer() + + wf := &model.Workflow{ + PipelineID: 1, + PID: 1, + Name: "woodpecker", + State: "pending", + } + err := store.WorkflowsCreate([]*model.Workflow{wf}) + if err != nil { + t.Errorf("Unexpected error: insert steps: %s", err) + return + } + workflowGet, err := store.WorkflowLoad(1) + if err != nil { + t.Errorf("Unexpected error: insert steps: %s", err) + return + } + + if got, want := workflowGet.State, model.StatusValue("pending"); got != want { + t.Errorf("Want workflow state %s, got %s", want, got) + } + + wf.State = "success" + + err = store.WorkflowUpdate(wf) + if err != nil { + t.Errorf("Unexpected error: insert steps: %s", err) + return + } + workflowGet, err = store.WorkflowLoad(1) + if err != nil { + t.Errorf("Unexpected error: insert steps: %s", err) + return + } + if got, want := workflowGet.State, model.StatusValue("success"); got != want { + t.Errorf("Want workflow state %s, got %s", want, got) + } +} diff --git a/server/store/mocks/store.go b/server/store/mocks/store.go index 4300f418e..7ead13c1c 100644 --- a/server/store/mocks/store.go +++ b/server/store/mocks/store.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.28.2. DO NOT EDIT. +// Code generated by mockery v2.29.0. DO NOT EDIT. package mocks @@ -1701,20 +1701,6 @@ func (_m *Store) StepClear(_a0 *model.Pipeline) error { return r0 } -// StepCreate provides a mock function with given fields: _a0 -func (_m *Store) StepCreate(_a0 []*model.Step) error { - ret := _m.Called(_a0) - - var r0 error - if rf, ok := ret.Get(0).(func([]*model.Step) error); ok { - r0 = rf(_a0) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // StepFind provides a mock function with given fields: _a0, _a1 func (_m *Store) StepFind(_a0 *model.Pipeline, _a1 int) (*model.Step, error) { ret := _m.Called(_a0, _a1) @@ -1767,6 +1753,32 @@ func (_m *Store) StepList(_a0 *model.Pipeline) ([]*model.Step, error) { return r0, r1 } +// StepListFromWorkflowFind provides a mock function with given fields: _a0 +func (_m *Store) StepListFromWorkflowFind(_a0 *model.Workflow) ([]*model.Step, error) { + ret := _m.Called(_a0) + + var r0 []*model.Step + var r1 error + if rf, ok := ret.Get(0).(func(*model.Workflow) ([]*model.Step, error)); ok { + return rf(_a0) + } + if rf, ok := ret.Get(0).(func(*model.Workflow) []*model.Step); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*model.Step) + } + } + + if rf, ok := ret.Get(1).(func(*model.Workflow) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // StepLoad provides a mock function with given fields: _a0 func (_m *Store) StepLoad(_a0 int64) (*model.Step, error) { ret := _m.Called(_a0) @@ -1929,6 +1941,86 @@ func (_m *Store) UserFeed(_a0 *model.User) ([]*model.Feed, error) { return r0, r1 } +// WorkflowGetTree provides a mock function with given fields: _a0 +func (_m *Store) WorkflowGetTree(_a0 *model.Pipeline) ([]*model.Workflow, error) { + ret := _m.Called(_a0) + + var r0 []*model.Workflow + var r1 error + if rf, ok := ret.Get(0).(func(*model.Pipeline) ([]*model.Workflow, error)); ok { + return rf(_a0) + } + if rf, ok := ret.Get(0).(func(*model.Pipeline) []*model.Workflow); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*model.Workflow) + } + } + + if rf, ok := ret.Get(1).(func(*model.Pipeline) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// WorkflowLoad provides a mock function with given fields: _a0 +func (_m *Store) WorkflowLoad(_a0 int64) (*model.Workflow, error) { + ret := _m.Called(_a0) + + var r0 *model.Workflow + var r1 error + if rf, ok := ret.Get(0).(func(int64) (*model.Workflow, error)); ok { + return rf(_a0) + } + if rf, ok := ret.Get(0).(func(int64) *model.Workflow); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*model.Workflow) + } + } + + if rf, ok := ret.Get(1).(func(int64) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// WorkflowUpdate provides a mock function with given fields: _a0 +func (_m *Store) WorkflowUpdate(_a0 *model.Workflow) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(*model.Workflow) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// WorkflowsCreate provides a mock function with given fields: _a0 +func (_m *Store) WorkflowsCreate(_a0 []*model.Workflow) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func([]*model.Workflow) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + type mockConstructorTestingTNewStore interface { mock.TestingT Cleanup(func()) diff --git a/server/store/store.go b/server/store/store.go index b76060d45..6b97227ee 100644 --- a/server/store/store.go +++ b/server/store/store.go @@ -139,9 +139,9 @@ type Store interface { StepByUUID(string) (*model.Step, error) StepChild(*model.Pipeline, int, string) (*model.Step, error) StepList(*model.Pipeline) ([]*model.Step, error) - StepCreate([]*model.Step) error StepUpdate(*model.Step) error StepClear(*model.Pipeline) error + StepListFromWorkflowFind(*model.Workflow) ([]*model.Step, error) // Logs LogFind(*model.Step) ([]*model.LogEntry, error) @@ -177,6 +177,12 @@ type Store interface { AgentUpdate(*model.Agent) error AgentDelete(*model.Agent) error + // Workflow + WorkflowGetTree(*model.Pipeline) ([]*model.Workflow, error) + WorkflowsCreate([]*model.Workflow) error + WorkflowLoad(int64) (*model.Workflow, error) + WorkflowUpdate(*model.Workflow) error + // Store operations Ping() error Close() error diff --git a/web/src/components/repo/pipeline/PipelineLog.vue b/web/src/components/repo/pipeline/PipelineLog.vue index 546c9955b..1deea6e8a 100644 --- a/web/src/components/repo/pipeline/PipelineLog.vue +++ b/web/src/components/repo/pipeline/PipelineLog.vue @@ -104,7 +104,7 @@ const apiClient = useApiClient(); const loadedStepSlug = ref(); const stepSlug = computed(() => `${repo?.value.owner} - ${repo?.value.name} - ${pipeline.value.id} - ${stepId.value}`); -const step = computed(() => pipeline.value && findStep(pipeline.value.steps || [], stepId.value)); +const step = computed(() => pipeline.value && findStep(pipeline.value.workflows || [], stepId.value)); const stream = ref(); const log = ref(); const consoleElement = ref(); diff --git a/web/src/components/repo/pipeline/PipelineStepDuration.vue b/web/src/components/repo/pipeline/PipelineStepDuration.vue index 5d336eed9..51b599c7e 100644 --- a/web/src/components/repo/pipeline/PipelineStepDuration.vue +++ b/web/src/components/repo/pipeline/PipelineStepDuration.vue @@ -1,12 +1,12 @@ diff --git a/web/src/components/repo/pipeline/PipelineStepList.vue b/web/src/components/repo/pipeline/PipelineStepList.vue index dbdb2b30c..a16273389 100644 --- a/web/src/components/repo/pipeline/PipelineStepList.vue +++ b/web/src/components/repo/pipeline/PipelineStepList.vue @@ -38,14 +38,14 @@ -
+
{{ $t('repo.pipeline.no_pipeline_steps') }}
@@ -56,7 +56,7 @@
@@ -81,7 +81,7 @@ :class="{ 'max-h-screen': !workflowsCollapsed[workflow.id], 'max-h-0': workflowsCollapsed[workflow.id], - 'ml-6': pipeline.steps && pipeline.steps.length > 1, + 'ml-6': pipeline.workflows && pipeline.workflows.length > 1, }" >