This commit is contained in:
Anbraten 2024-06-28 00:48:40 +02:00
parent 3ed09c9e52
commit 655b89bfcc
11 changed files with 463 additions and 383 deletions

View file

@ -165,7 +165,7 @@ func (c *client) Init(ctx context.Context, workflowID string, state rpc.Workflow
retry := c.newBackOff() retry := c.newBackOff()
req := new(proto.InitRequest) req := new(proto.InitRequest)
req.Id = workflowID req.Id = workflowID
req.State = new(proto.State) req.State = new(proto.WorkflowState)
req.State.Error = state.Error req.State.Error = state.Error
req.State.Finished = state.Finished req.State.Finished = state.Finished
req.State.Started = state.Started req.State.Started = state.Started
@ -203,7 +203,7 @@ func (c *client) Done(ctx context.Context, workflowID string, state rpc.Workflow
retry := c.newBackOff() retry := c.newBackOff()
req := new(proto.DoneRequest) req := new(proto.DoneRequest)
req.Id = workflowID req.Id = workflowID
req.State = new(proto.State) req.State = new(proto.WorkflowState)
req.State.Error = state.Error req.State.Error = state.Error
req.State.Finished = state.Finished req.State.Finished = state.Finished
req.State.Started = state.Started req.State.Started = state.Started
@ -270,12 +270,12 @@ func (c *client) Extend(ctx context.Context, workflowID string) (err error) {
return nil return nil
} }
// Update updates the workflow state. // Update updates the state of a step.
func (c *client) Update(ctx context.Context, stepID string, state rpc.StepState) (err error) { func (c *client) Update(ctx context.Context, stepID string, state rpc.StepState) (err error) {
retry := c.newBackOff() retry := c.newBackOff()
req := new(proto.UpdateRequest) req := new(proto.UpdateRequest)
req.Id = stepID req.Id = stepID
req.State = new(proto.State) req.State = new(proto.StepState)
req.State.Error = state.Error req.State.Error = state.Error
req.State.ExitCode = int32(state.ExitCode) req.State.ExitCode = int32(state.ExitCode)
req.State.Finished = state.Finished req.State.Finished = state.Finished

View file

@ -56,34 +56,34 @@ func (r *Runner) Run(runnerCtx context.Context) error { //nolint:contextcheck
ctxMeta := metadata.NewOutgoingContext(context.Background(), meta) ctxMeta := metadata.NewOutgoingContext(context.Background(), meta)
// get the next workflow from the queue // get the next workflow from the queue
work, err := r.client.Next(runnerCtx, r.filter) workflow, err := r.client.Next(runnerCtx, r.filter)
if err != nil { if err != nil {
return err return err
} }
if work == nil { if workflow == nil {
return nil return nil
} }
timeout := time.Hour timeout := time.Hour
if minutes := work.Timeout; minutes != 0 { if minutes := workflow.Timeout; minutes != 0 {
timeout = time.Duration(minutes) * time.Minute timeout = time.Duration(minutes) * time.Minute
} }
repoName := extractRepositoryName(work.Config) // hack repoName := extractRepositoryName(workflow.Config) // hack
pipelineNumber := extractPipelineNumber(work.Config) // hack pipelineNumber := extractPipelineNumber(workflow.Config) // hack
r.counter.Add( r.counter.Add(
work.ID, workflow.ID,
timeout, timeout,
repoName, repoName,
pipelineNumber, pipelineNumber,
) )
defer r.counter.Done(work.ID) defer r.counter.Done(workflow.ID)
logger := log.With(). logger := log.With().
Str("repo", repoName). Str("repo", repoName).
Str("pipeline", pipelineNumber). Str("pipeline", pipelineNumber).
Str("id", work.ID). Str("workflow_id", workflow.ID).
Logger() Logger()
logger.Debug().Msg("received execution") logger.Debug().Msg("received execution")
@ -102,12 +102,12 @@ func (r *Runner) Run(runnerCtx context.Context) error { //nolint:contextcheck
go func() { go func() {
logger.Debug().Msg("listen for cancel signal") logger.Debug().Msg("listen for cancel signal")
if err := r.client.Wait(workflowCtx, work.ID); err != nil { if err := r.client.Wait(workflowCtx, workflow.ID); err != nil {
canceled = true canceled = true
logger.Warn().Err(err).Msg("cancel signal received") logger.Warn().Err(err).Msg("cancel signal received")
cancel() cancel()
} else { } else {
logger.Debug().Msg("stop listening for cancel signal") logger.Debug().Msg("done listening for cancel signal")
} }
}() }()
@ -116,12 +116,11 @@ func (r *Runner) Run(runnerCtx context.Context) error { //nolint:contextcheck
select { select {
case <-workflowCtx.Done(): case <-workflowCtx.Done():
logger.Debug().Msg("pipeline done") logger.Debug().Msg("pipeline done")
return return
case <-time.After(time.Minute): case <-time.After(time.Minute):
logger.Debug().Msg("pipeline lease renewed") logger.Debug().Msg("pipeline lease renewed")
if err := r.client.Extend(workflowCtx, workflow.ID); err != nil {
if err := r.client.Extend(workflowCtx, work.ID); err != nil {
log.Error().Err(err).Msg("extending pipeline deadline failed") log.Error().Err(err).Msg("extending pipeline deadline failed")
} }
} }
@ -131,21 +130,21 @@ func (r *Runner) Run(runnerCtx context.Context) error { //nolint:contextcheck
state := rpc.WorkflowState{} state := rpc.WorkflowState{}
state.Started = time.Now().Unix() state.Started = time.Now().Unix()
err = r.client.Init(runnerCtx, work.ID, state) err = r.client.Init(runnerCtx, workflow.ID, state)
if err != nil { if err != nil {
logger.Error().Err(err).Msg("pipeline initialization failed") logger.Error().Err(err).Msg("pipeline initialization failed")
} }
var uploads sync.WaitGroup var uploads sync.WaitGroup
//nolint:contextcheck //nolint:contextcheck
err = pipeline.New(work.Config, err = pipeline.New(workflow.Config,
pipeline.WithContext(workflowCtx), pipeline.WithContext(workflowCtx),
pipeline.WithTaskUUID(fmt.Sprint(work.ID)), pipeline.WithTaskUUID(fmt.Sprint(workflow.ID)),
pipeline.WithLogger(r.createLogger(logger, &uploads, work)), pipeline.WithLogger(r.createLogger(logger, &uploads, workflow)),
pipeline.WithTracer(r.createTracer(ctxMeta, logger, work)), pipeline.WithTracer(r.createTracer(ctxMeta, logger, workflow)),
pipeline.WithBackend(*r.backend), pipeline.WithBackend(*r.backend),
pipeline.WithDescription(map[string]string{ pipeline.WithDescription(map[string]string{
"ID": work.ID, "ID": workflow.ID,
"Repo": repoName, "Repo": repoName,
"Pipeline": pipelineNumber, "Pipeline": pipelineNumber,
}), }),
@ -153,12 +152,10 @@ func (r *Runner) Run(runnerCtx context.Context) error { //nolint:contextcheck
state.Finished = time.Now().Unix() state.Finished = time.Now().Unix()
if canceled {
err = errors.Join(err, pipeline.ErrCancel)
}
if errors.Is(err, pipeline.ErrCancel) { if errors.Is(err, pipeline.ErrCancel) {
canceled = true canceled = true
} else if canceled {
err = errors.Join(err, pipeline.ErrCancel)
} }
if err != nil { if err != nil {
@ -168,20 +165,20 @@ func (r *Runner) Run(runnerCtx context.Context) error { //nolint:contextcheck
logger.Debug(). logger.Debug().
Str("error", state.Error). Str("error", state.Error).
Bool("canceled", canceled). Bool("canceled", canceled).
Msg("pipeline complete") Msg("workflow finished")
logger.Debug().Msg("uploading logs") logger.Debug().Msg("uploading logs")
uploads.Wait() uploads.Wait()
logger.Debug().Msg("uploading logs complete") logger.Debug().Msg("uploaded logs")
logger.Debug(). logger.Debug().
Str("error", state.Error). Str("error", state.Error).
Msg("updating pipeline status") Msg("updating workflow status")
if err := r.client.Done(runnerCtx, work.ID, state); err != nil { if err := r.client.Done(runnerCtx, workflow.ID, state); err != nil {
logger.Error().Err(err).Msg("updating pipeline status failed") logger.Error().Err(err).Msg("updating workflow status failed")
} else { } else {
logger.Debug().Msg("updating pipeline status complete") logger.Debug().Msg("updating workflow status complete")
} }
return nil return nil

View file

@ -30,7 +30,7 @@ func (r *Runner) createTracer(ctxMeta context.Context, logger zerolog.Logger, wo
return func(state *pipeline.State) error { return func(state *pipeline.State) error {
stepLogger := logger.With(). stepLogger := logger.With().
Str("image", state.Pipeline.Step.Image). Str("image", state.Pipeline.Step.Image).
Str("workflowID", workflow.ID). Str("workflow_id", workflow.ID).
Err(state.Process.Error). Err(state.Process.Error).
Int("exit_code", state.Process.ExitCode). Int("exit_code", state.Process.ExitCode).
Bool("exited", state.Process.Exited). Bool("exited", state.Process.Exited).

View file

@ -17,17 +17,17 @@ type Peer struct {
mock.Mock mock.Mock
} }
// Done provides a mock function with given fields: c, id, state // Done provides a mock function with given fields: c, workflowID, state
func (_m *Peer) Done(c context.Context, id string, state rpc.State) error { func (_m *Peer) Done(c context.Context, workflowID string, state rpc.WorkflowState) error {
ret := _m.Called(c, id, state) ret := _m.Called(c, workflowID, state)
if len(ret) == 0 { if len(ret) == 0 {
panic("no return value specified for Done") panic("no return value specified for Done")
} }
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, rpc.State) error); ok { if rf, ok := ret.Get(0).(func(context.Context, string, rpc.WorkflowState) error); ok {
r0 = rf(c, id, state) r0 = rf(c, workflowID, state)
} else { } else {
r0 = ret.Error(0) r0 = ret.Error(0)
} }
@ -35,9 +35,9 @@ func (_m *Peer) Done(c context.Context, id string, state rpc.State) error {
return r0 return r0
} }
// Extend provides a mock function with given fields: c, id // Extend provides a mock function with given fields: c, workflowID
func (_m *Peer) Extend(c context.Context, id string) error { func (_m *Peer) Extend(c context.Context, workflowID string) error {
ret := _m.Called(c, id) ret := _m.Called(c, workflowID)
if len(ret) == 0 { if len(ret) == 0 {
panic("no return value specified for Extend") panic("no return value specified for Extend")
@ -45,7 +45,7 @@ func (_m *Peer) Extend(c context.Context, id string) error {
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
r0 = rf(c, id) r0 = rf(c, workflowID)
} else { } else {
r0 = ret.Error(0) r0 = ret.Error(0)
} }
@ -53,17 +53,17 @@ func (_m *Peer) Extend(c context.Context, id string) error {
return r0 return r0
} }
// Init provides a mock function with given fields: c, id, state // Init provides a mock function with given fields: c, workflowID, state
func (_m *Peer) Init(c context.Context, id string, state rpc.State) error { func (_m *Peer) Init(c context.Context, workflowID string, state rpc.WorkflowState) error {
ret := _m.Called(c, id, state) ret := _m.Called(c, workflowID, state)
if len(ret) == 0 { if len(ret) == 0 {
panic("no return value specified for Init") panic("no return value specified for Init")
} }
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, rpc.State) error); ok { if rf, ok := ret.Get(0).(func(context.Context, string, rpc.WorkflowState) error); ok {
r0 = rf(c, id, state) r0 = rf(c, workflowID, state)
} else { } else {
r0 = ret.Error(0) r0 = ret.Error(0)
} }
@ -183,17 +183,17 @@ func (_m *Peer) UnregisterAgent(ctx context.Context) error {
return r0 return r0
} }
// Update provides a mock function with given fields: c, id, state // Update provides a mock function with given fields: c, stepID, state
func (_m *Peer) Update(c context.Context, id string, state rpc.State) error { func (_m *Peer) Update(c context.Context, stepID string, state rpc.StepState) error {
ret := _m.Called(c, id, state) ret := _m.Called(c, stepID, state)
if len(ret) == 0 { if len(ret) == 0 {
panic("no return value specified for Update") panic("no return value specified for Update")
} }
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, rpc.State) error); ok { if rf, ok := ret.Get(0).(func(context.Context, string, rpc.StepState) error); ok {
r0 = rf(c, id, state) r0 = rf(c, stepID, state)
} else { } else {
r0 = ret.Error(0) r0 = ret.Error(0)
} }
@ -231,9 +231,9 @@ func (_m *Peer) Version(c context.Context) (*rpc.Version, error) {
return r0, r1 return r0, r1
} }
// Wait provides a mock function with given fields: c, id // Wait provides a mock function with given fields: c, workflowID
func (_m *Peer) Wait(c context.Context, id string) error { func (_m *Peer) Wait(c context.Context, workflowID string) error {
ret := _m.Called(c, id) ret := _m.Called(c, workflowID)
if len(ret) == 0 { if len(ret) == 0 {
panic("no return value specified for Wait") panic("no return value specified for Wait")
@ -241,7 +241,7 @@ func (_m *Peer) Wait(c context.Context, id string) error {
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
r0 = rf(c, id) r0 = rf(c, workflowID)
} else { } else {
r0 = ret.Error(0) r0 = ret.Error(0)
} }

View file

@ -78,7 +78,7 @@ type Peer interface {
// Extend extends the workflow deadline // Extend extends the workflow deadline
Extend(c context.Context, workflowID string) error Extend(c context.Context, workflowID string) error
// Update updates the step state // Update updates the state of a step
Update(c context.Context, stepID string, state StepState) error Update(c context.Context, stepID string, state StepState) error
// Log writes the step log entry // Log writes the step log entry

View file

@ -16,4 +16,4 @@ package proto
// Version is the version of the woodpecker.proto file, // Version is the version of the woodpecker.proto file,
// IMPORTANT: increased by 1 each time it get changed. // IMPORTANT: increased by 1 each time it get changed.
const Version int32 = 8 const Version int32 = 9

File diff suppressed because it is too large Load diff

View file

@ -41,10 +41,15 @@ service Woodpecker {
// Basic Types // Basic Types
// //
message State { message StepState {
string step_uuid = 1; string step_uuid = 1;
bool exited = 2; int32 exit_code = 2;
int32 exit_code = 3; int64 started = 3;
int64 finished = 4;
string error = 5;
}
message WorkflowState {
int64 started = 4; int64 started = 4;
int64 finished = 5; int64 finished = 5;
string error = 6; string error = 6;
@ -78,7 +83,7 @@ message NextRequest {
message InitRequest { message InitRequest {
string id = 1; string id = 1;
State state = 2; WorkflowState state = 2;
} }
message WaitRequest { message WaitRequest {
@ -87,7 +92,7 @@ message WaitRequest {
message DoneRequest { message DoneRequest {
string id = 1; string id = 1;
State state = 2; WorkflowState state = 2;
} }
message ExtendRequest { message ExtendRequest {
@ -96,7 +101,7 @@ message ExtendRequest {
message UpdateRequest { message UpdateRequest {
string id = 1; string id = 1;
State state = 2; StepState state = 2;
} }
message LogRequest { message LogRequest {

View file

@ -48,7 +48,7 @@ type RPC struct {
pipelineCount *prometheus.CounterVec pipelineCount *prometheus.CounterVec
} }
// Next implements the rpc.Next function. // Next returns the next workflow to execute from the queue and blocks until one is available.
func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, error) { func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, error) {
if hostname, err := s.getHostnameFromContext(c); err == nil { if hostname, err := s.getHostnameFromContext(c); err == nil {
log.Debug().Msgf("agent connected: %s: polling", hostname) log.Debug().Msgf("agent connected: %s: polling", hostname)
@ -86,19 +86,19 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er
} }
} }
// Wait implements the rpc.Wait function. // Wait blocks until the workflow with the given ID is done.
func (s *RPC) Wait(c context.Context, id string) error { func (s *RPC) Wait(c context.Context, id string) error {
return s.queue.Wait(c, id) return s.queue.Wait(c, id)
} }
// Extend implements the rpc.Extend function. // Extend extends the timeout of the workflow with the given ID.
func (s *RPC) Extend(c context.Context, id string) error { func (s *RPC) Extend(c context.Context, id string) error {
return s.queue.Extend(c, id) return s.queue.Extend(c, id)
} }
// Update implements the rpc.Update function. // Update updates the state of a step
func (s *RPC) Update(_ context.Context, id string, state rpc.StepState) error { func (s *RPC) Update(_ context.Context, _workflowID string, state rpc.StepState) error {
workflowID, err := strconv.ParseInt(id, 10, 64) workflowID, err := strconv.ParseInt(_workflowID, 10, 64)
if err != nil { if err != nil {
return err return err
} }
@ -162,7 +162,7 @@ func (s *RPC) Update(_ context.Context, id string, state rpc.StepState) error {
return nil return nil
} }
// Init implements the rpc.Init function. // Init initializes the workflow with the given ID.
func (s *RPC) Init(c context.Context, _workflowID string, state rpc.WorkflowState) error { func (s *RPC) Init(c context.Context, _workflowID string, state rpc.WorkflowState) error {
workflowID, err := strconv.ParseInt(_workflowID, 10, 64) workflowID, err := strconv.ParseInt(_workflowID, 10, 64)
if err != nil { if err != nil {
@ -193,10 +193,19 @@ func (s *RPC) Init(c context.Context, _workflowID string, state rpc.WorkflowStat
return err return err
} }
if currentPipeline.Status == model.StatusPending { // Init should only be called on pending pipelines
if currentPipeline, err = pipeline.UpdateToStatusRunning(s.store, *currentPipeline, state.Started); err != nil { if currentPipeline.Status != model.StatusPending {
log.Error().Err(err).Msgf("init: cannot update build_id %d state", currentPipeline.ID) log.Error().Msgf("pipeline %d is not pending", currentPipeline.ID)
} return errors.New("pipeline is not pending")
}
if currentPipeline, err = pipeline.UpdateToStatusRunning(s.store, *currentPipeline, state.Started); err != nil {
log.Error().Err(err).Msgf("init: cannot update build_id %d state", currentPipeline.ID)
}
workflow, err = pipeline.UpdateWorkflowToStatusRunning(s.store, *workflow, state)
if err != nil {
return err
} }
s.updateForgeStatus(c, repo, currentPipeline, workflow) s.updateForgeStatus(c, repo, currentPipeline, workflow)
@ -220,17 +229,12 @@ func (s *RPC) Init(c context.Context, _workflowID string, state rpc.WorkflowStat
s.pubsub.Publish(message) s.pubsub.Publish(message)
}() }()
workflow, err = pipeline.UpdateWorkflowToStatusStarted(s.store, *workflow, state)
if err != nil {
return err
}
s.updateForgeStatus(c, repo, currentPipeline, workflow)
return nil return nil
} }
// Done implements the rpc.Done function. // Done marks the workflow with the given ID as done.
func (s *RPC) Done(c context.Context, id string, state rpc.WorkflowState) error { func (s *RPC) Done(c context.Context, _workflowID string, state rpc.WorkflowState) error {
workflowID, err := strconv.ParseInt(id, 10, 64) workflowID, err := strconv.ParseInt(_workflowID, 10, 64)
if err != nil { if err != nil {
return err return err
} }
@ -261,7 +265,7 @@ func (s *RPC) Done(c context.Context, id string, state rpc.WorkflowState) error
logger := log.With(). logger := log.With().
Str("repo_id", fmt.Sprint(repo.ID)). Str("repo_id", fmt.Sprint(repo.ID)).
Str("pipeline_id", fmt.Sprint(currentPipeline.ID)). Str("pipeline_id", fmt.Sprint(currentPipeline.ID)).
Str("workflow_id", id).Logger() Str("workflow_id", _workflowID).Logger()
logger.Trace().Msgf("gRPC Done with state: %#v", state) logger.Trace().Msgf("gRPC Done with state: %#v", state)
@ -271,9 +275,9 @@ func (s *RPC) Done(c context.Context, id string, state rpc.WorkflowState) error
var queueErr error var queueErr error
if workflow.Failing() { if workflow.Failing() {
queueErr = s.queue.Error(c, id, fmt.Errorf("workflow finished with error %s", state.Error)) queueErr = s.queue.Error(c, _workflowID, fmt.Errorf("workflow finished with error %s", state.Error))
} else { } else {
queueErr = s.queue.Done(c, id, workflow.State) queueErr = s.queue.Done(c, _workflowID, workflow.State)
} }
if queueErr != nil { if queueErr != nil {
logger.Error().Err(queueErr).Msg("queue.Done: cannot ack workflow") logger.Error().Err(queueErr).Msg("queue.Done: cannot ack workflow")

View file

@ -97,11 +97,11 @@ func (s *WoodpeckerServer) Init(c context.Context, req *proto.InitRequest) (*pro
func (s *WoodpeckerServer) Update(c context.Context, req *proto.UpdateRequest) (*proto.Empty, error) { func (s *WoodpeckerServer) Update(c context.Context, req *proto.UpdateRequest) (*proto.Empty, error) {
state := rpc.StepState{ state := rpc.StepState{
Error: req.GetState().GetError(),
ExitCode: int(req.GetState().GetExitCode()),
Finished: req.GetState().GetFinished(),
Started: req.GetState().GetStarted(),
StepUUID: req.GetState().GetStepUuid(), StepUUID: req.GetState().GetStepUuid(),
ExitCode: int(req.GetState().GetExitCode()),
Started: req.GetState().GetStarted(),
Finished: req.GetState().GetFinished(),
Error: req.GetState().GetError(),
} }
res := new(proto.Empty) res := new(proto.Empty)
err := s.peer.Update(c, req.GetId(), state) err := s.peer.Update(c, req.GetId(), state)

View file

@ -20,7 +20,7 @@ import (
"go.woodpecker-ci.org/woodpecker/v2/server/store" "go.woodpecker-ci.org/woodpecker/v2/server/store"
) )
func UpdateWorkflowToStatusStarted(store store.Store, workflow model.Workflow, state rpc.WorkflowState) (*model.Workflow, error) { func UpdateWorkflowToStatusRunning(store store.Store, workflow model.Workflow, state rpc.WorkflowState) (*model.Workflow, error) {
workflow.Started = state.Started workflow.Started = state.Started
workflow.State = model.StatusRunning workflow.State = model.StatusRunning
return &workflow, store.WorkflowUpdate(&workflow) return &workflow, store.WorkflowUpdate(&workflow)