Make pipeline runtime log with description (#970)

* introduce runtime descriptors to pipeline runtime

* nit return orig error at traceStep()

* more logging

* refactor

Co-authored-by: Zav Shotan <zshotan@bloomberg.net>
This commit is contained in:
6543 2022-06-15 18:11:20 +02:00 committed by GitHub
parent 9f7b72f9a9
commit 068063655b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 83 additions and 12 deletions

View file

@ -81,17 +81,20 @@ func (r *Runner) Run(ctx context.Context) error {
timeout = time.Duration(minutes) * time.Minute
}
repoName := extractRepositoryName(work.Config) // hack
buildNumber := extractBuildNumber(work.Config) // hack
r.counter.Add(
work.ID,
timeout,
extractRepositoryName(work.Config), // hack
extractBuildNumber(work.Config), // hack
repoName,
buildNumber,
)
defer r.counter.Done(work.ID)
logger := log.With().
Str("repo", extractRepositoryName(work.Config)). // hack
Str("build", extractBuildNumber(work.Config)). // hack
Str("repo", repoName).
Str("build", buildNumber).
Str("id", work.ID).
Logger()
@ -308,6 +311,11 @@ func (r *Runner) Run(ctx context.Context) error {
pipeline.WithLogger(defaultLogger),
pipeline.WithTracer(defaultTracer),
pipeline.WithEngine(*r.engine),
pipeline.WithDescription(map[string]string{
"ID": work.ID,
"Repo": repoName,
"Build": buildNumber,
}),
).Run()
state.Finished = time.Now().Unix()

View file

@ -201,6 +201,9 @@ func execWithAxis(c *cli.Context, file, repoPath string, axis matrix.Axis) error
pipeline.WithTracer(pipeline.DefaultTracer),
pipeline.WithLogger(defaultLogger),
pipeline.WithEngine(engine),
pipeline.WithDescription(map[string]string{
"CLI": "exec",
}),
).Run()
}

View file

@ -36,3 +36,9 @@ func WithContext(ctx context.Context) Option {
r.ctx = ctx
}
}
func WithDescription(desc map[string]string) Option {
return func(r *Runtime) {
r.Description = desc
}
}

View file

@ -5,6 +5,7 @@ import (
"strings"
"time"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"
@ -40,12 +41,15 @@ type Runtime struct {
ctx context.Context
tracer Tracer
logger Logger
Description map[string]string // The runtime descriptors.
}
// New returns a new runtime using the specified runtime
// configuration and runtime engine.
func New(spec *backend.Config, opts ...Option) *Runtime {
r := new(Runtime)
r.Description = map[string]string{}
r.spec = spec
r.ctx = context.Background()
for _, opts := range opts {
@ -54,11 +58,33 @@ func New(spec *backend.Config, opts ...Option) *Runtime {
return r
}
func (r *Runtime) MakeLogger() zerolog.Logger {
logCtx := log.With()
for key, val := range r.Description {
logCtx = logCtx.Str(key, val)
}
return logCtx.Logger()
}
// Starts the execution of the pipeline and waits for it to complete
func (r *Runtime) Run() error {
logger := r.MakeLogger()
logger.Debug().Msgf("Executing %d stages, in order of:", len(r.spec.Stages))
for _, stage := range r.spec.Stages {
steps := []string{}
for _, step := range stage.Steps {
steps = append(steps, step.Name)
}
logger.Debug().
Str("Stage", stage.Name).
Str("Steps", strings.Join(steps, ",")).
Msg("stage")
}
defer func() {
if err := r.engine.Destroy(r.ctx, r.spec); err != nil {
log.Error().Err(err).Msg("could not destroy pipeline")
logger.Error().Err(err).Msg("could not destroy engine")
}
}()
@ -104,13 +130,17 @@ func (r *Runtime) traceStep(processState *backend.State, err error, step *backen
state.Process = processState // empty
state.Pipeline.Error = r.err
return r.tracer.Trace(state)
if traceErr := r.tracer.Trace(state); traceErr != nil {
return traceErr
}
return err
}
// Executes a set of parallel steps
func (r *Runtime) execAll(steps []*backend.Step) <-chan error {
var g errgroup.Group
done := make(chan error)
logger := r.MakeLogger()
for _, step := range steps {
// required since otherwise the loop variable
@ -119,10 +149,21 @@ func (r *Runtime) execAll(steps []*backend.Step) <-chan error {
step := step
g.Go(func() error {
// Case the pipeline was already complete.
logger.Debug().
Str("Step", step.Name).
Msg("Prepare")
switch {
case r.err != nil && !step.OnFailure:
logger.Debug().
Str("Step", step.Name).
Err(r.err).
Msgf("Skipped due to OnFailure=%t", step.OnFailure)
return nil
case r.err == nil && !step.OnSuccess:
logger.Debug().
Str("Step", step.Name).
Msgf("Skipped due to OnSuccess=%t", step.OnSuccess)
return nil
}
@ -132,15 +173,26 @@ func (r *Runtime) execAll(steps []*backend.Step) <-chan error {
return err
}
logger.Debug().
Str("Step", step.Name).
Msg("Executing")
processState, err := r.exec(step)
// Return the error after tracing it.
traceErr := r.traceStep(processState, err, step)
if traceErr != nil {
return traceErr
logger.Debug().
Str("Step", step.Name).
Msg("Complete")
// if we got a nil process but an error state
// then we need to log the internal error to the step.
if r.logger != nil && err != nil && processState == nil {
_ = r.logger.Log(step, multipart.New(strings.NewReader(
"Backend engine error while running step: "+err.Error(),
)))
}
return err
// Return the error after tracing it.
return r.traceStep(processState, err, step)
})
}
@ -171,8 +223,10 @@ func (r *Runtime) exec(step *backend.Step) (*backend.State, error) {
}
go func() {
logger := r.MakeLogger()
if err := r.logger.Log(step, multipart.New(rc)); err != nil {
log.Error().Err(err).Msg("process logging failed")
logger.Error().Err(err).Msg("process logging failed")
}
_ = rc.Close()
}()