Kubernetes backend integrated to the server

This commit is contained in:
Laszlo Fogas 2019-04-13 16:30:56 +02:00
parent 7691b2b897
commit bc7d8c9bb5
2 changed files with 248 additions and 32 deletions

View file

@ -18,14 +18,19 @@ import (
"context" "context"
"crypto/tls" "crypto/tls"
"errors" "errors"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
"path/filepath" "path/filepath"
"strconv"
"strings" "strings"
"time" "time"
"github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline"
"github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/backend"
kubernetes "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/backend/kubernetes"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
@ -34,6 +39,8 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"github.com/laszlocph/drone-oss-08/cncd/logging" "github.com/laszlocph/drone-oss-08/cncd/logging"
"github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/interrupt"
"github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/rpc"
"github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/rpc/proto" "github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/rpc/proto"
"github.com/laszlocph/drone-oss-08/cncd/pubsub" "github.com/laszlocph/drone-oss-08/cncd/pubsub"
"github.com/laszlocph/drone-oss-08/plugins/sender" "github.com/laszlocph/drone-oss-08/plugins/sender"
@ -42,6 +49,8 @@ import (
"github.com/laszlocph/drone-oss-08/router/middleware" "github.com/laszlocph/drone-oss-08/router/middleware"
droneserver "github.com/laszlocph/drone-oss-08/server" droneserver "github.com/laszlocph/drone-oss-08/server"
"github.com/laszlocph/drone-oss-08/store" "github.com/laszlocph/drone-oss-08/store"
"github.com/rs/zerolog/log"
"github.com/tevino/abool"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/gin-gonic/contrib/ginrus" "github.com/gin-gonic/contrib/ginrus"
@ -49,7 +58,20 @@ import (
oldcontext "golang.org/x/net/context" oldcontext "golang.org/x/net/context"
) )
// NOTE we need to limit the size of the logs and files that we upload.
// The maximum grpc payload size is 4194304. So until we implement streaming
// for uploads, we need to set these limits below the maximum.
const (
maxLogsUpload = 2000000 // this is per step
maxFileUpload = 1000000
)
var flags = []cli.Flag{ var flags = []cli.Flag{
cli.StringFlag{
EnvVar: "TEST_RUNE",
Name: "test.run",
Usage: "VSCode sets this flag",
},
cli.BoolFlag{ cli.BoolFlag{
EnvVar: "DRONE_DEBUG", EnvVar: "DRONE_DEBUG",
Name: "debug", Name: "debug",
@ -500,6 +522,11 @@ var flags = []cli.Flag{
Name: "keepalive-min-time", Name: "keepalive-min-time",
Usage: "server-side enforcement policy on the minimum amount of time a client should wait before sending a keepalive ping.", Usage: "server-side enforcement policy on the minimum amount of time a client should wait before sending a keepalive ping.",
}, },
cli.BoolFlag{
EnvVar: "DRONE_KUBERNETES",
Name: "kubernetes",
Usage: "Kubernetes backend is enabled",
},
} }
func server(c *cli.Context) error { func server(c *cli.Context) error {
@ -552,9 +579,177 @@ func server(c *cli.Context) error {
var g errgroup.Group var g errgroup.Group
// start the grpc server if c.Bool("kubernetes") {
g.Go(func() error { workEngine := droneserver.NewRPC(remote_, droneserver.Config.Services.Queue, droneserver.Config.Services.Pubsub, droneserver.Config.Services.Logs, store_)
g.Go(func() error {
logrus.Infoln("Starting Kubernetes backend")
for {
sigterm := abool.New()
ctx := context.Background()
ctx = interrupt.WithContextFunc(ctx, func() {
println("ctrl+c received, terminating process")
sigterm.Set()
})
for {
if sigterm.IsSet() {
return nil
}
log.Print("pipeline: request next execution\n")
work, err := workEngine.Next(ctx, rpc.NoFilter)
if err != nil {
logrus.Error(err)
return err
}
log.Printf("pipeline: received next execution: %s", work.ID)
go func() {
logger := log.With().
Str("repo", extractRepositoryName(work.Config)). // hack
Str("build", extractBuildNumber(work.Config)). // hack
Str("id", work.ID).
Logger()
engine, err := kubernetes.New()
if err != nil {
logrus.Error(err)
return
}
timeout := time.Hour
if minutes := work.Timeout; minutes != 0 {
timeout = time.Duration(minutes) * time.Minute
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
cancelled := abool.New()
go func() {
werr := workEngine.Wait(ctx, work.ID)
if werr != nil {
cancelled.SetTo(true) // TODO verify error is really an error
log.Printf("pipeline: cancel signal received: %s: %s", work.ID, werr)
cancel()
} else {
log.Printf("pipeline: cancel channel closed: %s", work.ID)
}
}()
go func() {
for {
select {
case <-ctx.Done():
log.Printf("pipeline: cancel ping loop: %s", work.ID)
return
case <-time.After(time.Minute):
log.Printf("pipeline: ping queue: %s", work.ID)
workEngine.Extend(ctx, work.ID)
}
}
}()
state := rpc.State{}
state.Started = time.Now().Unix()
err = workEngine.Init(context.Background(), work.ID, state)
if err != nil {
log.Printf("pipeline: error signaling pipeline init: %s: %s", work.ID, err)
}
uploads, defaultLogger := agent.DefaultLogger(logger, *work, r.client, ctxmeta)
defaultTracer := pipeline.TraceFunc(func(state *pipeline.State) error {
procState := rpc.State{
Proc: state.Pipeline.Step.Alias,
Exited: state.Process.Exited,
ExitCode: state.Process.ExitCode,
Started: time.Now().Unix(), // TODO do not do this
Finished: time.Now().Unix(),
}
defer func() {
if uerr := workEngine.Update(context.Background(), work.ID, procState); uerr != nil {
log.Printf("Pipeine: error updating pipeline step status: %s: %s: %s", work.ID, procState.Proc, uerr)
}
}()
if state.Process.Exited {
return nil
}
if state.Pipeline.Step.Environment == nil {
state.Pipeline.Step.Environment = map[string]string{}
}
state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "success"
state.Pipeline.Step.Environment["CI_BUILD_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10)
state.Pipeline.Step.Environment["CI_BUILD_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10)
state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "success"
state.Pipeline.Step.Environment["CI_JOB_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10)
state.Pipeline.Step.Environment["CI_JOB_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10)
if state.Pipeline.Error != nil {
state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "failure"
state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "failure"
}
return nil
})
err = pipeline.New(work.Config,
pipeline.WithContext(ctx),
pipeline.WithLogger(defaultLogger),
pipeline.WithTracer(defaultTracer),
pipeline.WithEngine(engine),
).Run()
state.Finished = time.Now().Unix()
state.Exited = true
if err != nil {
switch xerr := err.(type) {
case *pipeline.ExitError:
state.ExitCode = xerr.Code
default:
state.ExitCode = 1
state.Error = err.Error()
}
if cancelled.IsSet() {
state.ExitCode = 137
}
}
logger.Debug().
Str("error", state.Error).
Int("exit_code", state.ExitCode).
Msg("pipeline complete")
logger.Debug().
Msg("uploading logs")
uploads.Wait()
logger.Debug().
Msg("uploading logs complete")
logger.Debug().
Str("error", state.Error).
Int("exit_code", state.ExitCode).
Msg("updating pipeline status")
err = workEngine.Done(ctx, work.ID, state)
if err != nil {
logger.Error().Err(err).
Msg("updating pipeline status failed")
} else {
logger.Debug().
Msg("updating pipeline status complete")
}
}()
}
}
})
} else {
g.Go(func() error {
logrus.Infoln("Starting GRPC Agent backend")
lis, err := net.Listen("tcp", ":9000") lis, err := net.Listen("tcp", ":9000")
if err != nil { if err != nil {
logrus.Error(err) logrus.Error(err)
@ -586,6 +781,7 @@ func server(c *cli.Context) error {
} }
return nil return nil
}) })
}
// start the server with tls enabled // start the server with tls enabled
if c.String("server-cert") != "" { if c.String("server-cert") != "" {
@ -750,3 +946,13 @@ func cacheDir() string {
} }
return filepath.Join(os.Getenv("HOME"), ".cache", base) return filepath.Join(os.Getenv("HOME"), ".cache", base)
} }
// extract repository name from the configuration
func extractRepositoryName(config *backend.Config) string {
return config.Stages[0].Steps[0].Environment["DRONE_REPO"]
}
// extract build number from the configuration
func extractBuildNumber(config *backend.Config) string {
return config.Stages[0].Steps[0].Environment["DRONE_BUILD_NUMBER"]
}

View file

@ -100,6 +100,16 @@ type RPC struct {
host string host string
} }
func NewRPC(remote_ remote.Remote, queue_ queue.Queue, pubsub_ pubsub.Publisher, logger_ logging.Log, store_ store.Store) RPC {
return RPC{
remote: remote_,
store: store_,
queue: queue_,
pubsub: pubsub_,
logger: logger_,
}
}
// Next implements the rpc.Next function // Next implements the rpc.Next function
func (s *RPC) Next(c context.Context, filter rpc.Filter) (*rpc.Pipeline, error) { func (s *RPC) Next(c context.Context, filter rpc.Filter) (*rpc.Pipeline, error) {
metadata, ok := metadata.FromContext(c) metadata, ok := metadata.FromContext(c)