diff --git a/agent/rpc/client_grpc.go b/agent/rpc/client_grpc.go index e1dc0d8ef..21d6af30f 100644 --- a/agent/rpc/client_grpc.go +++ b/agent/rpc/client_grpc.go @@ -72,13 +72,13 @@ func (c *client) Version(ctx context.Context) (*rpc.Version, error) { } // Next returns the next workflow in the queue. -func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Workflow, error) { +func (c *client) Next(ctx context.Context, filter rpc.Filter) (*rpc.Workflow, error) { var res *proto.NextResponse var err error retry := c.newBackOff() req := new(proto.NextRequest) req.Filter = new(proto.Filter) - req.Filter.Labels = f.Labels + req.Filter.Labels = filter.Labels for { res, err = c.client.Next(ctx, req) if err == nil { @@ -135,10 +135,10 @@ func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Workflow, error) } // Wait blocks until the workflow is complete. -func (c *client) Wait(ctx context.Context, id string) (err error) { +func (c *client) Wait(ctx context.Context, workflowID string) (err error) { retry := c.newBackOff() req := new(proto.WaitRequest) - req.Id = id + req.Id = workflowID for { _, err = c.client.Wait(ctx, req) if err == nil { @@ -273,10 +273,10 @@ func (c *client) Done(ctx context.Context, workflowID string, state rpc.Workflow } // Extend extends the workflow deadline. -func (c *client) Extend(ctx context.Context, id string) (err error) { +func (c *client) Extend(ctx context.Context, workflowID string) (err error) { retry := c.newBackOff() req := new(proto.ExtendRequest) - req.Id = id + req.Id = workflowID for { _, err = c.client.Extend(ctx, req) if err == nil { @@ -317,10 +317,10 @@ func (c *client) Extend(ctx context.Context, id string) (err error) { } // Update updates the workflow state. -func (c *client) Update(ctx context.Context, id string, state rpc.StepState) (err error) { +func (c *client) Update(ctx context.Context, workflowID string, state rpc.StepState) (err error) { retry := c.newBackOff() req := new(proto.UpdateRequest) - req.Id = id + req.Id = workflowID req.State = new(proto.StepState) req.State.StepUuid = state.StepUUID req.State.Started = state.Started @@ -367,7 +367,7 @@ func (c *client) Update(ctx context.Context, id string, state rpc.StepState) (er return nil } -// Log writes the workflow log entry. +// Log writes the step log entry. func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) { retry := c.newBackOff() req := new(proto.LogRequest) diff --git a/agent/runner.go b/agent/runner.go index ea3016e36..0024b7888 100644 --- a/agent/runner.go +++ b/agent/runner.go @@ -105,7 +105,6 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co if err := r.client.Wait(workflowCtx, workflow.ID); err != nil { canceled = true logger.Warn().Err(err).Msg("cancel signal received") - cancel() } else { logger.Debug().Msg("done listening for cancel signal") @@ -117,11 +116,10 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co select { case <-workflowCtx.Done(): logger.Debug().Msg("pipeline done") - return + case <-time.After(time.Minute): logger.Debug().Msg("pipeline lease renewed") - if err := r.client.Extend(workflowCtx, workflow.ID); err != nil { log.Error().Err(err).Msg("extending pipeline deadline failed") } @@ -144,7 +142,7 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co pipeline.WithContext(workflowCtx), pipeline.WithTaskUUID(fmt.Sprint(workflow.ID)), pipeline.WithLogger(r.createLogger(logger, &uploads, workflow)), - pipeline.WithTracer(r.createTracer(ctxMeta, logger, workflow)), + pipeline.WithTracer(r.createTracer(ctxMeta, &uploads, logger, workflow)), pipeline.WithBackend(*r.backend), pipeline.WithDescription(map[string]string{ "workflow_id": workflow.ID, @@ -170,9 +168,9 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co Bool("canceled", canceled). Msg("workflow finished") - logger.Debug().Msg("uploading logs ...") + logger.Debug().Msg("uploading logs and traces / states ...") uploads.Wait() - logger.Debug().Msg("uploaded logs") + logger.Debug().Msg("uploaded logs and traces / states") logger.Debug(). Str("error", state.Error). diff --git a/agent/tracer.go b/agent/tracer.go index 9f3588126..8f6e2bf53 100644 --- a/agent/tracer.go +++ b/agent/tracer.go @@ -18,6 +18,7 @@ import ( "context" "runtime" "strconv" + "sync" "time" "github.com/rs/zerolog" @@ -26,11 +27,13 @@ import ( "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc" ) -func (r *Runner) createTracer(ctxMeta context.Context, logger zerolog.Logger, workflow *rpc.Workflow) pipeline.TraceFunc { +func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup, logger zerolog.Logger, workflow *rpc.Workflow) pipeline.TraceFunc { return func(state *pipeline.State) error { + uploads.Add(1) + stepLogger := logger.With(). Str("image", state.Pipeline.Step.Image). - Str("workflowID", workflow.ID). + Str("workflow_id", workflow.ID). Err(state.Process.Error). Int("exit_code", state.Process.ExitCode). Bool("exited", state.Process.Exited). @@ -57,6 +60,7 @@ func (r *Runner) createTracer(ctxMeta context.Context, logger zerolog.Logger, wo } stepLogger.Debug().Msg("update step status complete") + uploads.Done() }() if state.Process.Exited { return nil diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 6e9d9f181..2b732b444 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -258,9 +258,9 @@ func (r *Runtime) exec(step *backend.Step) (*backend.State, error) { return nil, nil } - // Some pipeline backends, such as local, will close the pipe from Tail on Wait, - // so first make sure all reading has finished. + // We wait until all data was logged. (Needed for some backends like local as WaitStep kills the log stream) wg.Wait() + waitState, err := r.engine.WaitStep(r.ctx, step, r.taskUUID) if err != nil { if errors.Is(err, context.Canceled) {