woodpecker/pipeline/pipeline.go
Anbraten c1a8884d62
Add backend selection for agent (#463)
- add backend selection option
- by default it will auto-detect a backend
2021-11-26 03:34:48 +01:00

189 lines
3.4 KiB
Go

package pipeline
import (
"context"
"strings"
"time"
"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"
backend "github.com/woodpecker-ci/woodpecker/pipeline/backend/types"
"github.com/woodpecker-ci/woodpecker/pipeline/multipart"
)
// State bundles the pipeline-wide state with the state of the process
// that is being reported on.
type State struct {
	// Pipeline holds the global state of the pipeline.
	Pipeline struct {
		// Time is the time the pipeline started.
		Time int64 `json:"time"`
		// Step is the current pipeline step.
		Step *backend.Step `json:"step"`
		// Error is the current pipeline error state.
		Error error `json:"error"`
	}

	// Process is the current process state.
	Process *backend.State
}
// Runtime is a configuration runtime. It holds the pipeline configuration
// together with the engine that executes it and the optional hooks that
// are invoked while the pipeline runs.
type Runtime struct {
// err is the error recorded by Run when a stage fails; exec consults it
// to decide whether a step should still be executed.
err error
// spec is the pipeline configuration passed to New.
spec *backend.Config
// engine sets up, executes, tails, waits on and destroys the pipeline.
// NOTE(review): New does not assign it; presumably it is supplied
// through an Option — confirm.
engine backend.Engine
// started is the Unix timestamp (seconds) taken when Run begins.
started int64
// ctx is checked for cancellation by Run and is passed to every engine
// call; New initialises it to context.Background().
ctx context.Context
// tracer, when non-nil, is called by exec with the pipeline State before
// a step starts and again once a non-detached step has been waited on.
tracer Tracer
// logger, when non-nil, receives the output stream of each started step.
logger Logger
}
// New builds a Runtime for the given configuration. The runtime starts
// out with a background context; every supplied option is then applied,
// in the order given, before the runtime is returned.
func New(spec *backend.Config, opts ...Option) *Runtime {
	rt := &Runtime{
		spec: spec,
		ctx:  context.Background(),
	}
	for _, apply := range opts {
		apply(rt)
	}
	return rt
}
// Run sets up the engine, executes the configured stages one after
// another and blocks until they have finished or the runtime context is
// cancelled.
//
// The engine is destroyed when Run returns, including when setup failed;
// a destroy failure is only logged. A setup error is returned as is.
// When a stage reports an error it is recorded and the following stages
// are still handed to execAll; the most recently recorded error is the
// return value. If the context is cancelled while a stage is in flight,
// Run returns ErrCancel.
func (r *Runtime) Run() error {
	defer func() {
		destroyErr := r.engine.Destroy(r.ctx, r.spec)
		if destroyErr != nil {
			log.Error().Err(destroyErr).Msg("could not destroy engine")
		}
	}()

	r.started = time.Now().Unix()

	setupErr := r.engine.Setup(r.ctx, r.spec)
	if setupErr != nil {
		return setupErr
	}

	for _, stage := range r.spec.Stages {
		// Start the stage first, then wait for either its result or
		// cancellation of the runtime context.
		finished := r.execAll(stage.Steps)
		select {
		case <-r.ctx.Done():
			return ErrCancel
		case stageErr := <-finished:
			if stageErr != nil {
				r.err = stageErr
			}
		}
	}

	return r.err
}
// execAll starts every step of a stage in its own goroutine and returns
// a channel that delivers the stage result: the value of
// errgroup.Group.Wait, i.e. the first non-nil error returned by any step,
// or nil when all steps succeeded. Exactly one value is sent, after which
// the channel is closed.
func (r *Runtime) execAll(procs []*backend.Step) <-chan error {
	var g errgroup.Group

	// The channel is buffered so the result can always be delivered. Run
	// stops receiving once the runtime context is cancelled; with an
	// unbuffered channel the waiter goroutine below would then block on
	// the send forever and leak.
	done := make(chan error, 1)

	for _, proc := range procs {
		proc := proc // per-iteration copy for the closure (required before Go 1.22)
		g.Go(func() error {
			return r.exec(proc)
		})
	}

	go func() {
		defer close(done)
		done <- g.Wait()
	}()

	return done
}
// exec runs a single step and reports how it ended.
//
// The step is skipped (nil is returned) when the pipeline has already
// recorded an error and the step is not marked OnFailure, or when no
// error is recorded and the step is not marked OnSuccess. If a tracer is
// configured it is called before the step starts; ErrSkip from the tracer
// skips the step, any other tracer error is returned.
//
// After the step is started its output is handed to the logger, if one is
// configured, in a background goroutine. Detached steps return right
// after being started. Otherwise exec waits for the step, reports the
// final state to the tracer and returns an *OomError when the process was
// OOM-killed, an *ExitError when it exited with a non-zero code, and nil
// otherwise.
func (r *Runtime) exec(proc *backend.Step) error {
	// Honour the step's run conditions relative to the pipeline outcome
	// recorded so far.
	failed := r.err != nil
	if failed && !proc.OnFailure {
		return nil
	}
	if !failed && !proc.OnSuccess {
		return nil
	}

	if r.tracer != nil {
		pre := new(State)
		pre.Process = new(backend.State) // nothing has run yet
		pre.Pipeline.Step = proc
		pre.Pipeline.Error = r.err
		pre.Pipeline.Time = r.started

		switch traceErr := r.tracer.Trace(pre); {
		case traceErr == ErrSkip:
			return nil
		case traceErr != nil:
			return traceErr
		}
	}

	// TODO: using DRONE_ will be deprecated with 0.15.0. remove fallback with following release
	// Every CI_* variable is mirrored under the legacy DRONE_* name.
	for name, val := range proc.Environment {
		if !strings.HasPrefix(name, "CI_") {
			continue
		}
		proc.Environment["DRONE_"+strings.TrimPrefix(name, "CI_")] = val
	}

	if execErr := r.engine.Exec(r.ctx, proc); execErr != nil {
		return execErr
	}

	if r.logger != nil {
		stream, tailErr := r.engine.Tail(r.ctx, proc)
		if tailErr != nil {
			return tailErr
		}
		// Logging runs in the background; the stream is closed once the
		// logger returns.
		go func() {
			logErr := r.logger.Log(proc, multipart.New(stream))
			if logErr != nil {
				log.Error().Err(logErr).Msg("process logging failed")
			}
			_ = stream.Close()
		}()
	}

	// Detached steps are started but never waited on.
	if proc.Detached {
		return nil
	}

	result, waitErr := r.engine.Wait(r.ctx, proc)
	if waitErr != nil {
		return waitErr
	}

	if r.tracer != nil {
		post := new(State)
		post.Process = result
		post.Pipeline.Step = proc
		post.Pipeline.Error = r.err
		post.Pipeline.Time = r.started

		if traceErr := r.tracer.Trace(post); traceErr != nil {
			return traceErr
		}
	}

	switch {
	case result.OOMKilled:
		return &OomError{Name: proc.Name, Code: result.ExitCode}
	case result.ExitCode != 0:
		return &ExitError{Name: proc.Name, Code: result.ExitCode}
	default:
		return nil
	}
}