From bc7d8c9bb5714860aa56eca49a794737c36e3fbb Mon Sep 17 00:00:00 2001 From: Laszlo Fogas Date: Sat, 13 Apr 2019 16:30:56 +0200 Subject: [PATCH] Kubernetes backend integrated to the server --- cmd/drone-server/server.go | 270 ++++++++++++++++++++++++++++++++----- server/rpc.go | 10 ++ 2 files changed, 248 insertions(+), 32 deletions(-) diff --git a/cmd/drone-server/server.go b/cmd/drone-server/server.go index 9c3673368..f072b4e4d 100644 --- a/cmd/drone-server/server.go +++ b/cmd/drone-server/server.go @@ -18,14 +18,19 @@ import ( "context" "crypto/tls" "errors" + "net" "net/http" "net/url" "os" "path/filepath" + "strconv" "strings" "time" + "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline" + "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/backend" + kubernetes "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/backend/kubernetes" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" @@ -34,6 +39,8 @@ import ( "golang.org/x/sync/errgroup" "github.com/laszlocph/drone-oss-08/cncd/logging" + "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/interrupt" + "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/rpc" "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/rpc/proto" "github.com/laszlocph/drone-oss-08/cncd/pubsub" "github.com/laszlocph/drone-oss-08/plugins/sender" @@ -42,6 +49,8 @@ import ( "github.com/laszlocph/drone-oss-08/router/middleware" droneserver "github.com/laszlocph/drone-oss-08/server" "github.com/laszlocph/drone-oss-08/store" + "github.com/rs/zerolog/log" + "github.com/tevino/abool" "github.com/Sirupsen/logrus" "github.com/gin-gonic/contrib/ginrus" @@ -49,7 +58,20 @@ import ( oldcontext "golang.org/x/net/context" ) +// NOTE we need to limit the size of the logs and files that we upload. +// The maximum grpc payload size is 4194304. So until we implement streaming +// for uploads, we need to set these limits below the maximum. +const ( + maxLogsUpload = 2000000 // this is per step + maxFileUpload = 1000000 +) + var flags = []cli.Flag{ + cli.StringFlag{ + EnvVar: "TEST_RUNE", + Name: "test.run", + Usage: "VSCode sets this flag", + }, cli.BoolFlag{ EnvVar: "DRONE_DEBUG", Name: "debug", @@ -500,6 +522,11 @@ var flags = []cli.Flag{ Name: "keepalive-min-time", Usage: "server-side enforcement policy on the minimum amount of time a client should wait before sending a keepalive ping.", }, + cli.BoolFlag{ + EnvVar: "DRONE_KUBERNETES", + Name: "kubernetes", + Usage: "Kubernetes backend is enabled", + }, } func server(c *cli.Context) error { @@ -552,40 +579,209 @@ func server(c *cli.Context) error { var g errgroup.Group - // start the grpc server - g.Go(func() error { + if c.Bool("kubernetes") { + workEngine := droneserver.NewRPC(remote_, droneserver.Config.Services.Queue, droneserver.Config.Services.Pubsub, droneserver.Config.Services.Logs, store_) - lis, err := net.Listen("tcp", ":9000") - if err != nil { - logrus.Error(err) - return err - } - auther := &authorizer{ - password: c.String("agent-secret"), - } - s := grpc.NewServer( - grpc.StreamInterceptor(auther.streamInterceptor), - grpc.UnaryInterceptor(auther.unaryIntercaptor), - grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ - MinTime: c.Duration("keepalive-min-time"), - }), - ) - ss := new(droneserver.DroneServer) - ss.Queue = droneserver.Config.Services.Queue - ss.Logger = droneserver.Config.Services.Logs - ss.Pubsub = droneserver.Config.Services.Pubsub - ss.Remote = remote_ - ss.Store = store_ - ss.Host = droneserver.Config.Server.Host - proto.RegisterDroneServer(s, ss) + g.Go(func() error { + logrus.Infoln("Starting Kubernetes backend") + for { + sigterm := abool.New() + ctx := context.Background() + ctx = interrupt.WithContextFunc(ctx, func() { + println("ctrl+c received, terminating process") + sigterm.Set() + }) - err = s.Serve(lis) - if err != nil { - logrus.Error(err) - return err - } - return nil - }) + for { + if sigterm.IsSet() { + return nil + } + + log.Print("pipeline: request next execution\n") + + work, err := workEngine.Next(ctx, rpc.NoFilter) + if err != nil { + logrus.Error(err) + return err + } + log.Printf("pipeline: received next execution: %s", work.ID) + + go func() { + logger := log.With(). + Str("repo", extractRepositoryName(work.Config)). // hack + Str("build", extractBuildNumber(work.Config)). // hack + Str("id", work.ID). + Logger() + + engine, err := kubernetes.New() + if err != nil { + logrus.Error(err) + return + } + + timeout := time.Hour + if minutes := work.Timeout; minutes != 0 { + timeout = time.Duration(minutes) * time.Minute + } + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + cancelled := abool.New() + go func() { + werr := workEngine.Wait(ctx, work.ID) + if werr != nil { + cancelled.SetTo(true) // TODO verify error is really an error + log.Printf("pipeline: cancel signal received: %s: %s", work.ID, werr) + cancel() + } else { + log.Printf("pipeline: cancel channel closed: %s", work.ID) + } + }() + + go func() { + for { + select { + case <-ctx.Done(): + log.Printf("pipeline: cancel ping loop: %s", work.ID) + return + case <-time.After(time.Minute): + log.Printf("pipeline: ping queue: %s", work.ID) + workEngine.Extend(ctx, work.ID) + } + } + }() + + state := rpc.State{} + state.Started = time.Now().Unix() + err = workEngine.Init(context.Background(), work.ID, state) + if err != nil { + log.Printf("pipeline: error signaling pipeline init: %s: %s", work.ID, err) + } + + uploads, defaultLogger := agent.DefaultLogger(logger, *work, r.client, ctxmeta) + + defaultTracer := pipeline.TraceFunc(func(state *pipeline.State) error { + procState := rpc.State{ + Proc: state.Pipeline.Step.Alias, + Exited: state.Process.Exited, + ExitCode: state.Process.ExitCode, + Started: time.Now().Unix(), // TODO do not do this + Finished: time.Now().Unix(), + } + defer func() { + if uerr := workEngine.Update(context.Background(), work.ID, procState); uerr != nil { + log.Printf("Pipeine: error updating pipeline step status: %s: %s: %s", work.ID, procState.Proc, uerr) + } + }() + if state.Process.Exited { + return nil + } + if state.Pipeline.Step.Environment == nil { + state.Pipeline.Step.Environment = map[string]string{} + } + state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "success" + state.Pipeline.Step.Environment["CI_BUILD_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) + state.Pipeline.Step.Environment["CI_BUILD_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) + + state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "success" + state.Pipeline.Step.Environment["CI_JOB_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) + state.Pipeline.Step.Environment["CI_JOB_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) + + if state.Pipeline.Error != nil { + state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "failure" + state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "failure" + } + return nil + }) + + err = pipeline.New(work.Config, + pipeline.WithContext(ctx), + pipeline.WithLogger(defaultLogger), + pipeline.WithTracer(defaultTracer), + pipeline.WithEngine(engine), + ).Run() + + state.Finished = time.Now().Unix() + state.Exited = true + if err != nil { + switch xerr := err.(type) { + case *pipeline.ExitError: + state.ExitCode = xerr.Code + default: + state.ExitCode = 1 + state.Error = err.Error() + } + if cancelled.IsSet() { + state.ExitCode = 137 + } + } + + logger.Debug(). + Str("error", state.Error). + Int("exit_code", state.ExitCode). + Msg("pipeline complete") + + logger.Debug(). + Msg("uploading logs") + + uploads.Wait() + + logger.Debug(). + Msg("uploading logs complete") + + logger.Debug(). + Str("error", state.Error). + Int("exit_code", state.ExitCode). + Msg("updating pipeline status") + + err = workEngine.Done(ctx, work.ID, state) + if err != nil { + logger.Error().Err(err). + Msg("updating pipeline status failed") + } else { + logger.Debug(). + Msg("updating pipeline status complete") + } + }() + } + } + }) + } else { + g.Go(func() error { + logrus.Infoln("Starting GRPC Agent backend") + lis, err := net.Listen("tcp", ":9000") + if err != nil { + logrus.Error(err) + return err + } + auther := &authorizer{ + password: c.String("agent-secret"), + } + s := grpc.NewServer( + grpc.StreamInterceptor(auther.streamInterceptor), + grpc.UnaryInterceptor(auther.unaryIntercaptor), + grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: c.Duration("keepalive-min-time"), + }), + ) + ss := new(droneserver.DroneServer) + ss.Queue = droneserver.Config.Services.Queue + ss.Logger = droneserver.Config.Services.Logs + ss.Pubsub = droneserver.Config.Services.Pubsub + ss.Remote = remote_ + ss.Store = store_ + ss.Host = droneserver.Config.Server.Host + proto.RegisterDroneServer(s, ss) + + err = s.Serve(lis) + if err != nil { + logrus.Error(err) + return err + } + return nil + }) + } // start the server with tls enabled if c.String("server-cert") != "" { @@ -750,3 +946,13 @@ func cacheDir() string { } return filepath.Join(os.Getenv("HOME"), ".cache", base) } + +// extract repository name from the configuration +func extractRepositoryName(config *backend.Config) string { + return config.Stages[0].Steps[0].Environment["DRONE_REPO"] +} + +// extract build number from the configuration +func extractBuildNumber(config *backend.Config) string { + return config.Stages[0].Steps[0].Environment["DRONE_BUILD_NUMBER"] +} diff --git a/server/rpc.go b/server/rpc.go index 383e61639..c5b0fb22b 100644 --- a/server/rpc.go +++ b/server/rpc.go @@ -100,6 +100,16 @@ type RPC struct { host string } +func NewRPC(remote_ remote.Remote, queue_ queue.Queue, pubsub_ pubsub.Publisher, logger_ logging.Log, store_ store.Store) RPC { + return RPC{ + remote: remote_, + store: store_, + queue: queue_, + pubsub: pubsub_, + logger: logger_, + } +} + // Next implements the rpc.Next function func (s *RPC) Next(c context.Context, filter rpc.Filter) (*rpc.Pipeline, error) { metadata, ok := metadata.FromContext(c)