From f7fe9edd0b90dfe8414bd6b9c7c61c8a3304da6c Mon Sep 17 00:00:00 2001 From: Laszlo Fogas Date: Sat, 13 Apr 2019 20:16:30 +0200 Subject: [PATCH] Using Runner in server --- cmd/drone-agent/agent.go | 10 +- cmd/drone-server/server.go | 175 ++---------------- runner/runner.go | 356 +++++++++++++++++++++++++++++++++++++ runner/stats.go | 65 +++++++ 4 files changed, 445 insertions(+), 161 deletions(-) create mode 100644 runner/runner.go create mode 100644 runner/stats.go diff --git a/cmd/drone-agent/agent.go b/cmd/drone-agent/agent.go index a1bec24c5..c82e906e5 100644 --- a/cmd/drone-agent/agent.go +++ b/cmd/drone-agent/agent.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" + "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/backend/docker" "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/rpc" "github.com/laszlocph/drone-oss-08/runner" @@ -120,7 +121,14 @@ func loop(c *cli.Context) error { if sigterm.IsSet() { return } - r := runner.NewRunner(client, filter, hostname, counter) + + engine, err := docker.NewEnv() + if err != nil { + log.Error().Err(err).Msg("cannot create docker client") + return + } + + r := runner.NewRunner(client, filter, hostname, counter, &engine) if err := r.Run(ctx); err != nil { log.Error().Err(err).Msg("pipeline done with error") return diff --git a/cmd/drone-server/server.go b/cmd/drone-server/server.go index f072b4e4d..e811de2d2 100644 --- a/cmd/drone-server/server.go +++ b/cmd/drone-server/server.go @@ -24,13 +24,10 @@ import ( "net/url" "os" "path/filepath" - "strconv" "strings" "time" - "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline" - "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/backend" - kubernetes "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/backend/kubernetes" + "github.com/laszlocph/drone-oss-08/runner" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" @@ -39,6 +36,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/laszlocph/drone-oss-08/cncd/logging" + "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/backend/kubernetes" "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/interrupt" "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/rpc" "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/rpc/proto" @@ -58,14 +56,6 @@ import ( oldcontext "golang.org/x/net/context" ) -// NOTE we need to limit the size of the logs and files that we upload. -// The maximum grpc payload size is 4194304. So until we implement streaming -// for uploads, we need to set these limits below the maximum. -const ( - maxLogsUpload = 2000000 // this is per step - maxFileUpload = 1000000 -) - var flags = []cli.Flag{ cli.StringFlag{ EnvVar: "TEST_RUNE", @@ -582,6 +572,12 @@ func server(c *cli.Context) error { if c.Bool("kubernetes") { workEngine := droneserver.NewRPC(remote_, droneserver.Config.Services.Queue, droneserver.Config.Services.Pubsub, droneserver.Config.Services.Logs, store_) + var counter = &runner.State{ + Polling: 0, + Running: 0, + Metadata: map[string]runner.Info{}, + } + g.Go(func() error { logrus.Infoln("Starting Kubernetes backend") for { @@ -598,7 +594,6 @@ func server(c *cli.Context) error { } log.Print("pipeline: request next execution\n") - work, err := workEngine.Next(ctx, rpc.NoFilter) if err != nil { logrus.Error(err) @@ -606,144 +601,14 @@ func server(c *cli.Context) error { } log.Printf("pipeline: received next execution: %s", work.ID) - go func() { - logger := log.With(). - Str("repo", extractRepositoryName(work.Config)). // hack - Str("build", extractBuildNumber(work.Config)). // hack - Str("id", work.ID). - Logger() + engine, err := kubernetes.New() + if err != nil { + logrus.Error(err) + return err + } - engine, err := kubernetes.New() - if err != nil { - logrus.Error(err) - return - } - - timeout := time.Hour - if minutes := work.Timeout; minutes != 0 { - timeout = time.Duration(minutes) * time.Minute - } - - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - cancelled := abool.New() - go func() { - werr := workEngine.Wait(ctx, work.ID) - if werr != nil { - cancelled.SetTo(true) // TODO verify error is really an error - log.Printf("pipeline: cancel signal received: %s: %s", work.ID, werr) - cancel() - } else { - log.Printf("pipeline: cancel channel closed: %s", work.ID) - } - }() - - go func() { - for { - select { - case <-ctx.Done(): - log.Printf("pipeline: cancel ping loop: %s", work.ID) - return - case <-time.After(time.Minute): - log.Printf("pipeline: ping queue: %s", work.ID) - workEngine.Extend(ctx, work.ID) - } - } - }() - - state := rpc.State{} - state.Started = time.Now().Unix() - err = workEngine.Init(context.Background(), work.ID, state) - if err != nil { - log.Printf("pipeline: error signaling pipeline init: %s: %s", work.ID, err) - } - - uploads, defaultLogger := agent.DefaultLogger(logger, *work, r.client, ctxmeta) - - defaultTracer := pipeline.TraceFunc(func(state *pipeline.State) error { - procState := rpc.State{ - Proc: state.Pipeline.Step.Alias, - Exited: state.Process.Exited, - ExitCode: state.Process.ExitCode, - Started: time.Now().Unix(), // TODO do not do this - Finished: time.Now().Unix(), - } - defer func() { - if uerr := workEngine.Update(context.Background(), work.ID, procState); uerr != nil { - log.Printf("Pipeine: error updating pipeline step status: %s: %s: %s", work.ID, procState.Proc, uerr) - } - }() - if state.Process.Exited { - return nil - } - if state.Pipeline.Step.Environment == nil { - state.Pipeline.Step.Environment = map[string]string{} - } - state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "success" - state.Pipeline.Step.Environment["CI_BUILD_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) - state.Pipeline.Step.Environment["CI_BUILD_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) - - state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "success" - state.Pipeline.Step.Environment["CI_JOB_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) - state.Pipeline.Step.Environment["CI_JOB_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) - - if state.Pipeline.Error != nil { - state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "failure" - state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "failure" - } - return nil - }) - - err = pipeline.New(work.Config, - pipeline.WithContext(ctx), - pipeline.WithLogger(defaultLogger), - pipeline.WithTracer(defaultTracer), - pipeline.WithEngine(engine), - ).Run() - - state.Finished = time.Now().Unix() - state.Exited = true - if err != nil { - switch xerr := err.(type) { - case *pipeline.ExitError: - state.ExitCode = xerr.Code - default: - state.ExitCode = 1 - state.Error = err.Error() - } - if cancelled.IsSet() { - state.ExitCode = 137 - } - } - - logger.Debug(). - Str("error", state.Error). - Int("exit_code", state.ExitCode). - Msg("pipeline complete") - - logger.Debug(). - Msg("uploading logs") - - uploads.Wait() - - logger.Debug(). - Msg("uploading logs complete") - - logger.Debug(). - Str("error", state.Error). - Int("exit_code", state.ExitCode). - Msg("updating pipeline status") - - err = workEngine.Done(ctx, work.ID, state) - if err != nil { - logger.Error().Err(err). - Msg("updating pipeline status failed") - } else { - logger.Debug(). - Msg("updating pipeline status complete") - } - }() + r := runner.NewRunner(&workEngine, rpc.NoFilter, "", counter, &engine) + go r.ProcessWork(*work, ctx) } } }) @@ -946,13 +811,3 @@ func cacheDir() string { } return filepath.Join(os.Getenv("HOME"), ".cache", base) } - -// extract repository name from the configuration -func extractRepositoryName(config *backend.Config) string { - return config.Stages[0].Steps[0].Environment["DRONE_REPO"] -} - -// extract build number from the configuration -func extractBuildNumber(config *backend.Config) string { - return config.Stages[0].Steps[0].Environment["DRONE_BUILD_NUMBER"] -} diff --git a/runner/runner.go b/runner/runner.go new file mode 100644 index 000000000..d3fad8d87 --- /dev/null +++ b/runner/runner.go @@ -0,0 +1,356 @@ +package runner + +import ( + "context" + "encoding/json" + "io" + "io/ioutil" + "strconv" + "sync" + "time" + + "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline" + "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/backend" + "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/multipart" + "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/rpc" + "github.com/rs/zerolog/log" + "github.com/tevino/abool" + "google.golang.org/grpc/metadata" +) + +// NOTE we need to limit the size of the logs and files that we upload. +// The maximum grpc payload size is 4194304. So until we implement streaming +// for uploads, we need to set these limits below the maximum. +const ( + maxLogsUpload = 2000000 // this is per step + maxFileUpload = 1000000 +) + +type Runner struct { + client rpc.Peer + filter rpc.Filter + hostname string + counter *State + engine *backend.Engine +} + +func NewRunner(workEngine rpc.Peer, f rpc.Filter, h string, state *State, backend *backend.Engine) Runner { + return Runner{ + client: workEngine, + filter: f, + hostname: h, + counter: state, + engine: backend, + } +} + +func (r *Runner) Run(ctx context.Context) error { + log.Debug(). + Msg("request next execution") + + meta, _ := metadata.FromOutgoingContext(ctx) + ctxmeta := metadata.NewOutgoingContext(context.Background(), meta) + + // get the next job from the queue + work, err := r.client.Next(ctx, r.filter) + if err != nil { + return err + } + if work == nil { + return nil + } + + err = r.ProcessWork(*work, ctxmeta) + if err != nil { + return err + } + + return nil +} + +func (r *Runner) ProcessWork(work rpc.Pipeline, ctx context.Context) error { + timeout := time.Hour + if minutes := work.Timeout; minutes != 0 { + timeout = time.Duration(minutes) * time.Minute + } + + r.counter.Add( + work.ID, + timeout, + extractRepositoryName(work.Config), // hack + extractBuildNumber(work.Config), // hack + ) + defer r.counter.Done(work.ID) + + logger := log.With(). + Str("repo", extractRepositoryName(work.Config)). // hack + Str("build", extractBuildNumber(work.Config)). // hack + Str("id", work.ID). + Logger() + + logger.Debug(). + Msg("received execution") + + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + cancelled := abool.New() + go func() { + logger.Debug(). + Msg("listen for cancel signal") + + if werr := r.client.Wait(ctx, work.ID); werr != nil { + cancelled.SetTo(true) + logger.Warn(). + Err(werr). + Msg("cancel signal received") + + cancel() + } else { + logger.Debug(). + Msg("stop listening for cancel signal") + } + }() + + go func() { + for { + select { + case <-ctx.Done(): + logger.Debug(). + Msg("pipeline done") + + return + case <-time.After(time.Minute): + logger.Debug(). + Msg("pipeline lease renewed") + + r.client.Extend(ctx, work.ID) + } + } + }() + + state := rpc.State{} + state.Started = time.Now().Unix() + + err := r.client.Init(ctx, work.ID, state) + if err != nil { + logger.Error(). + Err(err). + Msg("pipeline initialization failed") + } + + var uploads sync.WaitGroup + defaultLogger := pipeline.LogFunc(func(proc *backend.Step, rc multipart.Reader) error { + + loglogger := logger.With(). + Str("image", proc.Image). + Str("stage", proc.Alias). + Logger() + + part, rerr := rc.NextPart() + if rerr != nil { + return rerr + } + uploads.Add(1) + + var secrets []string + for _, secret := range work.Config.Secrets { + if secret.Mask { + secrets = append(secrets, secret.Value) + } + } + + loglogger.Debug().Msg("log stream opened") + + limitedPart := io.LimitReader(part, maxLogsUpload) + logstream := rpc.NewLineWriter(r.client, work.ID, proc.Alias, secrets...) + io.Copy(logstream, limitedPart) + + loglogger.Debug().Msg("log stream copied") + + file := &rpc.File{} + file.Mime = "application/json+logs" + file.Proc = proc.Alias + file.Name = "logs.json" + file.Data, _ = json.Marshal(logstream.Lines()) + file.Size = len(file.Data) + file.Time = time.Now().Unix() + + loglogger.Debug(). + Msg("log stream uploading") + + if serr := r.client.Upload(ctx, work.ID, file); serr != nil { + loglogger.Error(). + Err(serr). + Msg("log stream upload error") + } + + loglogger.Debug(). + Msg("log stream upload complete") + + defer func() { + loglogger.Debug(). + Msg("log stream closed") + + uploads.Done() + }() + + part, rerr = rc.NextPart() + if rerr != nil { + return nil + } + // TODO should be configurable + limitedPart = io.LimitReader(part, maxFileUpload) + file = &rpc.File{} + file.Mime = part.Header().Get("Content-Type") + file.Proc = proc.Alias + file.Name = part.FileName() + file.Data, _ = ioutil.ReadAll(limitedPart) + file.Size = len(file.Data) + file.Time = time.Now().Unix() + file.Meta = map[string]string{} + + for key, value := range part.Header() { + file.Meta[key] = value[0] + } + + loglogger.Debug(). + Str("file", file.Name). + Str("mime", file.Mime). + Msg("file stream uploading") + + if serr := r.client.Upload(ctx, work.ID, file); serr != nil { + loglogger.Error(). + Err(serr). + Str("file", file.Name). + Str("mime", file.Mime). + Msg("file stream upload error") + } + + loglogger.Debug(). + Str("file", file.Name). + Str("mime", file.Mime). + Msg("file stream upload complete") + return nil + }) + + defaultTracer := pipeline.TraceFunc(func(state *pipeline.State) error { + proclogger := logger.With(). + Str("image", state.Pipeline.Step.Image). + Str("stage", state.Pipeline.Step.Alias). + Int("exit_code", state.Process.ExitCode). + Bool("exited", state.Process.Exited). + Logger() + + procState := rpc.State{ + Proc: state.Pipeline.Step.Alias, + Exited: state.Process.Exited, + ExitCode: state.Process.ExitCode, + Started: time.Now().Unix(), // TODO do not do this + Finished: time.Now().Unix(), + } + defer func() { + proclogger.Debug(). + Msg("update step status") + + if uerr := r.client.Update(ctx, work.ID, procState); uerr != nil { + proclogger.Debug(). + Err(uerr). + Msg("update step status error") + } + + proclogger.Debug(). + Msg("update step status complete") + }() + if state.Process.Exited { + return nil + } + if state.Pipeline.Step.Environment == nil { + state.Pipeline.Step.Environment = map[string]string{} + } + + state.Pipeline.Step.Environment["DRONE_MACHINE"] = r.hostname + state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "success" + state.Pipeline.Step.Environment["CI_BUILD_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) + state.Pipeline.Step.Environment["CI_BUILD_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) + state.Pipeline.Step.Environment["DRONE_BUILD_STATUS"] = "success" + state.Pipeline.Step.Environment["DRONE_BUILD_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) + state.Pipeline.Step.Environment["DRONE_BUILD_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) + + state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "success" + state.Pipeline.Step.Environment["CI_JOB_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) + state.Pipeline.Step.Environment["CI_JOB_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) + state.Pipeline.Step.Environment["DRONE_JOB_STATUS"] = "success" + state.Pipeline.Step.Environment["DRONE_JOB_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) + state.Pipeline.Step.Environment["DRONE_JOB_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) + + if state.Pipeline.Error != nil { + state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "failure" + state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "failure" + state.Pipeline.Step.Environment["DRONE_BUILD_STATUS"] = "failure" + state.Pipeline.Step.Environment["DRONE_JOB_STATUS"] = "failure" + } + return nil + }) + + err = pipeline.New(work.Config, + pipeline.WithContext(ctx), + pipeline.WithLogger(defaultLogger), + pipeline.WithTracer(defaultTracer), + pipeline.WithEngine(*r.engine), + ).Run() + + state.Finished = time.Now().Unix() + state.Exited = true + if err != nil { + switch xerr := err.(type) { + case *pipeline.ExitError: + state.ExitCode = xerr.Code + default: + state.ExitCode = 1 + state.Error = err.Error() + } + if cancelled.IsSet() { + state.ExitCode = 137 + } + } + + logger.Debug(). + Str("error", state.Error). + Int("exit_code", state.ExitCode). + Msg("pipeline complete") + + logger.Debug(). + Msg("uploading logs") + + uploads.Wait() + + logger.Debug(). + Msg("uploading logs complete") + + logger.Debug(). + Str("error", state.Error). + Int("exit_code", state.ExitCode). + Msg("updating pipeline status") + + err = r.client.Done(ctx, work.ID, state) + if err != nil { + logger.Error().Err(err). + Msg("updating pipeline status failed") + } else { + logger.Debug(). + Msg("updating pipeline status complete") + } + + return nil +} + +// extract repository name from the configuration +func extractRepositoryName(config *backend.Config) string { + return config.Stages[0].Steps[0].Environment["DRONE_REPO"] +} + +// extract build number from the configuration +func extractBuildNumber(config *backend.Config) string { + return config.Stages[0].Steps[0].Environment["DRONE_BUILD_NUMBER"] +} diff --git a/runner/stats.go b/runner/stats.go new file mode 100644 index 000000000..cd5fbcb4d --- /dev/null +++ b/runner/stats.go @@ -0,0 +1,65 @@ +package runner + +import ( + "encoding/json" + "io" + "sync" + "time" +) + +type State struct { + sync.Mutex `json:"-"` + Polling int `json:"polling_count"` + Running int `json:"running_count"` + Metadata map[string]Info `json:"running"` +} + +type Info struct { + ID string `json:"id"` + Repo string `json:"repository"` + Build string `json:"build_number"` + Started time.Time `json:"build_started"` + Timeout time.Duration `json:"build_timeout"` +} + +func (s *State) Add(id string, timeout time.Duration, repo, build string) { + s.Lock() + s.Polling-- + s.Running++ + s.Metadata[id] = Info{ + ID: id, + Repo: repo, + Build: build, + Timeout: timeout, + Started: time.Now().UTC(), + } + s.Unlock() +} + +func (s *State) Done(id string) { + s.Lock() + s.Polling++ + s.Running-- + delete(s.Metadata, id) + s.Unlock() +} + +func (s *State) Healthy() bool { + s.Lock() + defer s.Unlock() + now := time.Now() + buf := time.Hour // 1 hour buffer + for _, item := range s.Metadata { + if now.After(item.Started.Add(item.Timeout).Add(buf)) { + return false + } + } + return true +} + +func (s *State) WriteTo(w io.Writer) (int, error) { + s.Lock() + out, _ := json.Marshal(s) + s.Unlock() + return w.Write(out) +}