Wait for tracer to be done before finishing workflow (#4068)

This commit is contained in:
Anbraten 2024-08-30 11:44:56 +02:00 committed by GitHub
parent e2a43e8467
commit 599dd97d1b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 21 additions and 19 deletions

View file

@ -72,13 +72,13 @@ func (c *client) Version(ctx context.Context) (*rpc.Version, error) {
}
// Next returns the next workflow in the queue.
func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Workflow, error) {
func (c *client) Next(ctx context.Context, filter rpc.Filter) (*rpc.Workflow, error) {
var res *proto.NextResponse
var err error
retry := c.newBackOff()
req := new(proto.NextRequest)
req.Filter = new(proto.Filter)
req.Filter.Labels = f.Labels
req.Filter.Labels = filter.Labels
for {
res, err = c.client.Next(ctx, req)
if err == nil {
@ -135,10 +135,10 @@ func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Workflow, error)
}
// Wait blocks until the workflow is complete.
func (c *client) Wait(ctx context.Context, id string) (err error) {
func (c *client) Wait(ctx context.Context, workflowID string) (err error) {
retry := c.newBackOff()
req := new(proto.WaitRequest)
req.Id = id
req.Id = workflowID
for {
_, err = c.client.Wait(ctx, req)
if err == nil {
@ -273,10 +273,10 @@ func (c *client) Done(ctx context.Context, workflowID string, state rpc.Workflow
}
// Extend extends the workflow deadline.
func (c *client) Extend(ctx context.Context, id string) (err error) {
func (c *client) Extend(ctx context.Context, workflowID string) (err error) {
retry := c.newBackOff()
req := new(proto.ExtendRequest)
req.Id = id
req.Id = workflowID
for {
_, err = c.client.Extend(ctx, req)
if err == nil {
@ -317,10 +317,10 @@ func (c *client) Extend(ctx context.Context, id string) (err error) {
}
// Update updates the workflow state.
func (c *client) Update(ctx context.Context, id string, state rpc.StepState) (err error) {
func (c *client) Update(ctx context.Context, workflowID string, state rpc.StepState) (err error) {
retry := c.newBackOff()
req := new(proto.UpdateRequest)
req.Id = id
req.Id = workflowID
req.State = new(proto.StepState)
req.State.StepUuid = state.StepUUID
req.State.Started = state.Started
@ -367,7 +367,7 @@ func (c *client) Update(ctx context.Context, id string, state rpc.StepState) (er
return nil
}
// Log writes the workflow log entry.
// Log writes the step log entry.
func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) {
retry := c.newBackOff()
req := new(proto.LogRequest)

View file

@ -105,7 +105,6 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co
if err := r.client.Wait(workflowCtx, workflow.ID); err != nil {
canceled = true
logger.Warn().Err(err).Msg("cancel signal received")
cancel()
} else {
logger.Debug().Msg("done listening for cancel signal")
@ -117,11 +116,10 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co
select {
case <-workflowCtx.Done():
logger.Debug().Msg("pipeline done")
return
case <-time.After(time.Minute):
logger.Debug().Msg("pipeline lease renewed")
if err := r.client.Extend(workflowCtx, workflow.ID); err != nil {
log.Error().Err(err).Msg("extending pipeline deadline failed")
}
@ -144,7 +142,7 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co
pipeline.WithContext(workflowCtx),
pipeline.WithTaskUUID(fmt.Sprint(workflow.ID)),
pipeline.WithLogger(r.createLogger(logger, &uploads, workflow)),
pipeline.WithTracer(r.createTracer(ctxMeta, logger, workflow)),
pipeline.WithTracer(r.createTracer(ctxMeta, &uploads, logger, workflow)),
pipeline.WithBackend(*r.backend),
pipeline.WithDescription(map[string]string{
"workflow_id": workflow.ID,
@ -170,9 +168,9 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co
Bool("canceled", canceled).
Msg("workflow finished")
logger.Debug().Msg("uploading logs ...")
logger.Debug().Msg("uploading logs and traces / states ...")
uploads.Wait()
logger.Debug().Msg("uploaded logs")
logger.Debug().Msg("uploaded logs and traces / states")
logger.Debug().
Str("error", state.Error).

View file

@ -18,6 +18,7 @@ import (
"context"
"runtime"
"strconv"
"sync"
"time"
"github.com/rs/zerolog"
@ -26,11 +27,13 @@ import (
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
)
func (r *Runner) createTracer(ctxMeta context.Context, logger zerolog.Logger, workflow *rpc.Workflow) pipeline.TraceFunc {
func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup, logger zerolog.Logger, workflow *rpc.Workflow) pipeline.TraceFunc {
return func(state *pipeline.State) error {
uploads.Add(1)
stepLogger := logger.With().
Str("image", state.Pipeline.Step.Image).
Str("workflowID", workflow.ID).
Str("workflow_id", workflow.ID).
Err(state.Process.Error).
Int("exit_code", state.Process.ExitCode).
Bool("exited", state.Process.Exited).
@ -57,6 +60,7 @@ func (r *Runner) createTracer(ctxMeta context.Context, logger zerolog.Logger, wo
}
stepLogger.Debug().Msg("update step status complete")
uploads.Done()
}()
if state.Process.Exited {
return nil

View file

@ -258,9 +258,9 @@ func (r *Runtime) exec(step *backend.Step) (*backend.State, error) {
return nil, nil
}
// Some pipeline backends, such as local, will close the pipe from Tail on Wait,
// so first make sure all reading has finished.
// We wait until all data was logged. (Needed for some backends like local as WaitStep kills the log stream)
wg.Wait()
waitState, err := r.engine.WaitStep(r.ctx, step, r.taskUUID)
if err != nil {
if errors.Is(err, context.Canceled) {