diff --git a/drone/agent/agent.go b/drone/agent/agent.go index dc94b57df..3ea456b77 100644 --- a/drone/agent/agent.go +++ b/drone/agent/agent.go @@ -1,6 +1,8 @@ package agent import ( + "fmt" + "math" "os" "os/signal" "strings" @@ -60,7 +62,7 @@ var AgentCmd = cli.Command{ Value: "amd64", }, cli.StringFlag{ - EnvVar: "DRONE_SERVER", + EnvVar: "DRONE_SERVER,DRONE_ENDPOINT", Name: "drone-server", Usage: "drone server address", Value: "ws://localhost:8000/ws/broker", @@ -138,11 +140,55 @@ var AgentCmd = cli.Command{ Name: "extension", Usage: "custom plugin extension endpoint", }, + + // + // + // + + cli.BoolFlag{ + EnvVar: "DRONE_CANARY", + Name: "canary", + Usage: "enable experimental features at your own risk", + }, + + // cli.StringFlag{ + // Name: "endpoint", + // EnvVar: "DRONE_ENDPOINT,DRONE_SERVER", + // Value: "ws://localhost:9999/ws/rpc", + // }, + // cli.DurationFlag{ + // Name: "backoff", + // EnvVar: "DRONE_BACKOFF", + // Value: time.Second * 15, + // }, + cli.IntFlag{ + Name: "retry-limit", + EnvVar: "DRONE_RETRY_LIMIT", + Value: math.MaxInt32, + }, + cli.IntFlag{ + Name: "max-procs", + EnvVar: "DRONE_MAX_PROCS", + Value: 1, + }, + cli.StringFlag{ + Name: "platform", + EnvVar: "DRONE_PLATFORM", + Value: "linux/amd64", + }, }, } func start(c *cli.Context) { + if c.Bool("canary") { + if err := loop(c); err != nil { + fmt.Println(err) + os.Exit(1) + } + return + } + log := redlog.New(os.Stderr) log.SetLevel(0) logger.SetLogger(log) @@ -187,7 +233,7 @@ func start(c *cli.Context) { client.Ack(m.Ack) }() - r := pipeline{ + r := pipelinet{ drone: client, docker: docker, config: config{ diff --git a/drone/agent/exec.go b/drone/agent/exec.go index db486deca..268253420 100644 --- a/drone/agent/exec.go +++ b/drone/agent/exec.go @@ -22,13 +22,13 @@ type config struct { extension []string } -type pipeline struct { +type pipelinet struct { drone *stomp.Client docker dockerclient.Client config config } -func (r *pipeline) run(w *model.Work) { +func (r *pipelinet) run(w *model.Work) { // defer func() { // // r.drone.Ack(id, opts) diff --git a/drone/agent/exp.go b/drone/agent/exp.go new file mode 100644 index 000000000..57e252cc7 --- /dev/null +++ b/drone/agent/exp.go @@ -0,0 +1,191 @@ +package agent + +import ( + "context" + "io" + "log" + "net/url" + "sync" + "time" + + "github.com/cncd/pipeline/pipeline" + "github.com/cncd/pipeline/pipeline/backend" + "github.com/cncd/pipeline/pipeline/backend/docker" + "github.com/cncd/pipeline/pipeline/interrupt" + "github.com/cncd/pipeline/pipeline/multipart" + "github.com/cncd/pipeline/pipeline/rpc" + + "github.com/codegangsta/cli" + "github.com/tevino/abool" +) + +func loop(c *cli.Context) error { + endpoint, err := url.Parse( + c.String("drone-server"), + ) + if err != nil { + return err + } + + client, err := rpc.NewClient( + endpoint.String(), + rpc.WithRetryLimit( + c.Int("retry-limit"), + ), + rpc.WithBackoff( + c.Duration("backoff"), + ), + rpc.WithToken( + c.String("drone-secret"), + ), + ) + if err != nil { + return err + } + defer client.Close() + + sigterm := abool.New() + ctx := context.Background() + ctx = interrupt.WithContextFunc(ctx, func() { + println("ctrl+c received, terminating process") + sigterm.Set() + }) + + var wg sync.WaitGroup + parallel := c.Int("max-procs") + wg.Add(parallel) + + for i := 0; i < parallel; i++ { + go func() { + defer wg.Done() + for { + if sigterm.IsSet() { + return + } + if err := run(ctx, client); err != nil { + log.Printf("build runner encountered error: exiting: %s", err) + return + } + } + }() + } + + wg.Wait() + return nil +} + +func run(ctx context.Context, client rpc.Peer) error { + log.Println("pipeline: request next execution") + + // get the next job from the queue + work, err := client.Next(ctx) + if err != nil { + return err + } + if work == nil { + return nil + } + log.Printf("pipeline: received next execution: %s", work.ID) + + // new docker engine + engine, err := docker.NewEnv() + if err != nil { + return err + } + + timeout := time.Hour + if minutes := work.Timeout; minutes != 0 { + timeout = time.Duration(minutes) * time.Minute + } + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + cancelled := abool.New() + go func() { + ok, _ := client.Notify(ctx, work.ID) + if ok { + cancelled.SetTo(true) + log.Printf("pipeline: cancel signal received: %s", work.ID) + cancel() + } else { + log.Printf("pipeline: cancel channel closed: %s", work.ID) + } + }() + + go func() { + for { + select { + case <-ctx.Done(): + log.Printf("pipeline: cancel ping loop: %s", work.ID) + return + case <-time.After(time.Minute): + log.Printf("pipeline: ping queue: %s", work.ID) + client.Extend(ctx, work.ID) + } + } + }() + + state := rpc.State{} + state.Started = time.Now().Unix() + err = client.Update(context.Background(), work.ID, state) + if err != nil { + log.Printf("pipeline: error updating pipeline status: %s: %s", work.ID, err) + } + + defaultLogger := pipeline.LogFunc(func(proc *backend.Step, rc multipart.Reader) error { + part, rerr := rc.NextPart() + if rerr != nil { + return rerr + } + writer := rpc.NewLineWriter(client, work.ID, proc.Alias) + io.Copy(writer, part) + + defer func() { + log.Printf("pipeline: finish uploading logs: %s: step %s", work.ID, proc.Alias) + }() + + part, rerr = rc.NextPart() + if rerr != nil { + return nil + } + mime := part.Header().Get("Content-Type") + if serr := client.Save(context.Background(), work.ID, mime, part); serr != nil { + log.Printf("pipeline: cannot upload artifact: %s: %s: %s", work.ID, mime, serr) + } + return nil + }) + + err = pipeline.New(work.Config, + pipeline.WithContext(ctx), + pipeline.WithLogger(defaultLogger), + pipeline.WithTracer(pipeline.DefaultTracer), + pipeline.WithEngine(engine), + ).Run() + + state.Finished = time.Now().Unix() + state.Exited = true + if err != nil { + state.Error = err.Error() + if xerr, ok := err.(*pipeline.ExitError); ok { + state.ExitCode = xerr.Code + } + if xerr, ok := err.(*pipeline.OomError); ok { + state.ExitCode = xerr.Code + } + if cancelled.IsSet() { + state.ExitCode = 130 + } else if state.ExitCode == 0 { + state.ExitCode = 1 + } + } + + log.Printf("pipeline: execution complete: %s", work.ID) + + err = client.Update(context.Background(), work.ID, state) + if err != nil { + log.Printf("Pipeine: error updating pipeline status: %s: %s", work.ID, err) + } + + return nil +} diff --git a/router/router.go b/router/router.go index f5def78db..a8948faf0 100644 --- a/router/router.go +++ b/router/router.go @@ -2,6 +2,7 @@ package router import ( "net/http" + "os" "github.com/gin-gonic/gin" @@ -119,19 +120,46 @@ func Load(middleware ...gin.HandlerFunc) http.Handler { badges.GET("/cc.xml", server.GetCC) } - e.POST("/hook", server.PostHook) - e.POST("/api/hook", server.PostHook) + if os.Getenv("DRONE_CANARY") == "" { + e.POST("/hook", server.PostHook) + e.POST("/api/hook", server.PostHook) + } else { + e.POST("/hook", server.PostHook2) + e.POST("/api/hook", server.PostHook2) + } - ws := e.Group("/ws") - { - ws.GET("/broker", server.Broker) - ws.GET("/feed", server.EventStream) - ws.GET("/logs/:owner/:name/:build/:number", - session.SetRepo(), - session.SetPerm(), - session.MustPull, - server.LogStream, - ) + if os.Getenv("DRONE_CANARY") == "" { + ws := e.Group("/ws") + { + ws.GET("/broker", server.Broker) + ws.GET("/feed", server.EventStream) + ws.GET("/logs/:owner/:name/:build/:number", + session.SetRepo(), + session.SetPerm(), + session.MustPull, + server.LogStream, + ) + } + } else { + ws := e.Group("/ws") + { + ws.GET("/broker", server.RPCHandler) + ws.GET("/rpc", server.RPCHandler) + ws.GET("/feed", server.EventStream2) + ws.GET("/logs/:owner/:name/:build/:number", + session.SetRepo(), + session.SetPerm(), + session.MustPull, + server.LogStream2, + ) + } + info := e.Group("/api/info") + { + info.GET("/queue", + session.MustAdmin(), + server.GetQueueInfo, + ) + } } auth := e.Group("/authorize") diff --git a/server/hook.go b/server/hook.go index 9f87437e6..27b6d8d35 100644 --- a/server/hook.go +++ b/server/hook.go @@ -1,9 +1,13 @@ package server import ( + "context" + "encoding/json" "fmt" + "net/url" "regexp" "strconv" + "time" "github.com/gin-gonic/gin" "github.com/square/go-jose" @@ -15,7 +19,18 @@ import ( "github.com/drone/drone/shared/token" "github.com/drone/drone/store" "github.com/drone/drone/yaml" + "github.com/drone/envsubst" "github.com/drone/mq/stomp" + + "github.com/cncd/pipeline/pipeline/backend" + "github.com/cncd/pipeline/pipeline/frontend" + yaml2 "github.com/cncd/pipeline/pipeline/frontend/yaml" + "github.com/cncd/pipeline/pipeline/frontend/yaml/compiler" + "github.com/cncd/pipeline/pipeline/frontend/yaml/linter" + "github.com/cncd/pipeline/pipeline/frontend/yaml/matrix" + "github.com/cncd/pipeline/pipeline/rpc" + "github.com/cncd/pubsub" + "github.com/cncd/queue" ) var skipRe = regexp.MustCompile(`\[(?i:ci *skip|skip *ci)\]`) @@ -105,8 +120,8 @@ func PostHook(c *gin.Context) { // a small number of people will probably be upset by this, I'm not sure // it is actually that big of a deal. if len(build.Email) == 0 { - author, err := store.GetUserLogin(c, build.Author) - if err == nil { + author, uerr := store.GetUserLogin(c, build.Author) + if uerr == nil { build.Email = author.Email } } @@ -164,9 +179,9 @@ func PostHook(c *gin.Context) { log.Debugf("cannot parse .drone.yml.sig file. empty file") } else { build.Signed = true - output, err := signature.Verify([]byte(repo.Hash)) - if err != nil { - log.Debugf("cannot verify .drone.yml.sig file. %s", err) + output, verr := signature.Verify([]byte(repo.Hash)) + if verr != nil { + log.Debugf("cannot verify .drone.yml.sig file. %s", verr) } else if string(output) != string(raw) { log.Debugf("cannot verify .drone.yml.sig file. no match") } else { @@ -212,7 +227,7 @@ func PostHook(c *gin.Context) { } client := stomp.MustFromContext(c) - client.SendJSON("/topic/events", model.Event{ + client.SendJSON("topic/events", model.Event{ Type: model.Enqueued, Repo: *repo, Build: *build, @@ -247,3 +262,388 @@ func PostHook(c *gin.Context) { } } + +// +// CANARY IMPLEMENTATION +// +// This file is a complete disaster because I'm trying to wedge in some +// experimental code. Please pardon our appearance during renovations. +// + +func GetQueueInfo(c *gin.Context) { + c.IndentedJSON(200, + config.queue.Info(c), + ) +} + +func PostHook2(c *gin.Context) { + remote_ := remote.FromContext(c) + + tmprepo, build, err := remote_.Hook(c.Request) + if err != nil { + log.Errorf("failure to parse hook. %s", err) + c.AbortWithError(400, err) + return + } + if build == nil { + c.Writer.WriteHeader(200) + return + } + if tmprepo == nil { + log.Errorf("failure to ascertain repo from hook.") + c.Writer.WriteHeader(400) + return + } + + // skip the build if any case-insensitive combination of the words "skip" and "ci" + // wrapped in square brackets appear in the commit message + skipMatch := skipRe.FindString(build.Message) + if len(skipMatch) > 0 { + log.Infof("ignoring hook. %s found in %s", skipMatch, build.Commit) + c.Writer.WriteHeader(204) + return + } + + repo, err := store.GetRepoOwnerName(c, tmprepo.Owner, tmprepo.Name) + if err != nil { + log.Errorf("failure to find repo %s/%s from hook. %s", tmprepo.Owner, tmprepo.Name, err) + c.AbortWithError(404, err) + return + } + + // get the token and verify the hook is authorized + parsed, err := token.ParseRequest(c.Request, func(t *token.Token) (string, error) { + return repo.Hash, nil + }) + if err != nil { + log.Errorf("failure to parse token from hook for %s. %s", repo.FullName, err) + c.AbortWithError(400, err) + return + } + if parsed.Text != repo.FullName { + log.Errorf("failure to verify token from hook. Expected %s, got %s", repo.FullName, parsed.Text) + c.AbortWithStatus(403) + return + } + + if repo.UserID == 0 { + log.Warnf("ignoring hook. repo %s has no owner.", repo.FullName) + c.Writer.WriteHeader(204) + return + } + var skipped = true + if (build.Event == model.EventPush && repo.AllowPush) || + (build.Event == model.EventPull && repo.AllowPull) || + (build.Event == model.EventDeploy && repo.AllowDeploy) || + (build.Event == model.EventTag && repo.AllowTag) { + skipped = false + } + + if skipped { + log.Infof("ignoring hook. repo %s is disabled for %s events.", repo.FullName, build.Event) + c.Writer.WriteHeader(204) + return + } + + user, err := store.GetUser(c, repo.UserID) + if err != nil { + log.Errorf("failure to find repo owner %s. %s", repo.FullName, err) + c.AbortWithError(500, err) + return + } + + // if there is no email address associated with the pull request, + // we lookup the email address based on the authors github login. + // + // my initial hesitation with this code is that it has the ability + // to expose your email address. At the same time, your email address + // is already exposed in the public .git log. So while some people will + // a small number of people will probably be upset by this, I'm not sure + // it is actually that big of a deal. + if len(build.Email) == 0 { + author, uerr := store.GetUserLogin(c, build.Author) + if uerr == nil { + build.Email = author.Email + } + } + + // if the remote has a refresh token, the current access token + // may be stale. Therefore, we should refresh prior to dispatching + // the job. + if refresher, ok := remote_.(remote.Refresher); ok { + ok, _ := refresher.Refresh(user) + if ok { + store.UpdateUser(c, user) + } + } + + // fetch the build file from the database + cfg := ToConfig(c) + raw, err := remote_.File(user, repo, build, cfg.Yaml) + if err != nil { + log.Errorf("failure to get build config for %s. %s", repo.FullName, err) + c.AbortWithError(404, err) + return + } + sec, err := remote_.File(user, repo, build, cfg.Shasum) + if err != nil { + log.Debugf("cannot find build secrets for %s. %s", repo.FullName, err) + // NOTE we don't exit on failure. The sec file is optional + } + + axes, err := matrix.Parse(raw) + if err != nil { + c.String(500, "Failed to parse yaml file or calculate matrix. %s", err) + return + } + if len(axes) == 0 { + axes = append(axes, matrix.Axis{}) + } + + netrc, err := remote_.Netrc(user, repo) + if err != nil { + c.String(500, "Failed to generate netrc file. %s", err) + return + } + + // verify the branches can be built vs skipped + branches, err := yaml2.ParseBytes(raw) + if err != nil { + c.String(500, "Failed to parse yaml file. %s", err) + return + } + if !branches.Branches.Match(build.Branch) && build.Event != model.EventTag && build.Event != model.EventDeploy { + c.String(200, "Branch does not match restrictions defined in yaml") + return + } + + signature, err := jose.ParseSigned(string(sec)) + if err != nil { + log.Debugf("cannot parse .drone.yml.sig file. %s", err) + } else if len(sec) == 0 { + log.Debugf("cannot parse .drone.yml.sig file. empty file") + } else { + build.Signed = true + output, verr := signature.Verify([]byte(repo.Hash)) + if verr != nil { + log.Debugf("cannot verify .drone.yml.sig file. %s", verr) + } else if string(output) != string(raw) { + log.Debugf("cannot verify .drone.yml.sig file. no match") + } else { + build.Verified = true + } + } + + // update some build fields + build.Status = model.StatusPending + build.RepoID = repo.ID + + // and use a transaction + var jobs []*model.Job + for num, axis := range axes { + jobs = append(jobs, &model.Job{ + BuildID: build.ID, + Number: num + 1, + Status: model.StatusPending, + Environment: axis, + }) + } + err = store.CreateBuild(c, build, jobs...) + if err != nil { + log.Errorf("failure to save commit for %s. %s", repo.FullName, err) + c.AbortWithError(500, err) + return + } + + c.JSON(200, build) + + uri := fmt.Sprintf("%s/%s/%d", httputil.GetURL(c.Request), repo.FullName, build.Number) + err = remote_.Status(user, repo, build, uri) + if err != nil { + log.Errorf("error setting commit status for %s/%d", repo.FullName, build.Number) + } + + // get the previous build so that we can send + // on status change notifications + last, _ := store.GetBuildLastBefore(c, repo, build.Branch, build.ID) + secs, err := store.GetMergedSecretList(c, repo) + if err != nil { + log.Debugf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err) + } + + // + // new code here + // + + message := pubsub.Message{} + message.Data, _ = json.Marshal(model.Event{ + Type: model.Enqueued, + Repo: *repo, + Build: *build, + }) + message.Labels = map[string]string{ + "repo": repo.FullName, + "private": strconv.FormatBool(repo.IsPrivate), + } + // TODO remove global reference + config.pubsub.Publish(c, "topic/events", message) + + // + // workspace + // + + var ( + link, _ = url.Parse(repo.Link) + base = "/drone" + path = "src/" + link.Host + "/" + repo.FullName + ) + + for _, job := range jobs { + + metadata := metadataFromStruct(repo, build, last, job, "linux/amd64") + environ := metadata.Environ() + + secrets := map[string]string{} + for _, sec := range secs { + if !sec.MatchEvent(build.Event) { + continue + } + if build.Verified || sec.SkipVerify { + secrets[sec.Name] = sec.Value + } + } + sub := func(name string) string { + if v, ok := environ[name]; ok { + return v + } + return secrets[name] + } + if s, err := envsubst.Eval(string(raw), sub); err != nil { + raw = []byte(s) + } + parsed, err := yaml2.ParseBytes(raw) + if err != nil { + // TODO + } + + lerr := linter.New( + linter.WithTrusted(repo.IsTrusted), + ).Lint(parsed) + if lerr != nil { + // TODO + } + + ir := compiler.New( + compiler.WithEnviron(environ), + compiler.WithEscalated("plugins/docker", "plugins/gcr", "plugins/ecr"), + compiler.WithLocal(false), + compiler.WithNetrc(netrc.Login, netrc.Password, netrc.Machine), + compiler.WithPrefix( + fmt.Sprintf( + "%d_%d", + job.ID, + time.Now().Unix(), + ), + ), + compiler.WithProxy(), + compiler.WithVolumes(), // todo set global volumes + compiler.WithWorkspace(base, path), + ).Compile(parsed) + + task := new(queue.Task) + task.ID = fmt.Sprint(job.ID) + task.Labels = map[string]string{} + task.Labels["platform"] = "linux/amd64" + // TODO set proper platform + // TODO set proper labels + task.Data, _ = json.Marshal(rpc.Pipeline{ + ID: fmt.Sprint(job.ID), + Config: ir, + Timeout: repo.Timeout, + }) + + config.logger.Open(context.Background(), task.ID) + config.queue.Push(context.Background(), task) + } +} + +// use helper funciton to return ([]backend.Config, error) + +type builder struct { + secs []*model.Secret + repo *model.Repo + build *model.Build + last *model.Build + jobs []*model.Job + link string +} + +func (b *builder) Build() ([]*backend.Config, error) { + + return nil, nil +} + +// return the metadata from the cli context. +func metadataFromStruct(repo *model.Repo, build, last *model.Build, job *model.Job, link string) frontend.Metadata { + return frontend.Metadata{ + Repo: frontend.Repo{ + Name: repo.Name, + Link: repo.Link, + Remote: repo.Clone, + Private: repo.IsPrivate, + }, + Curr: frontend.Build{ + Number: build.Number, + Created: build.Created, + Started: build.Started, + Finished: build.Finished, + Status: build.Status, + Event: build.Event, + Link: build.Link, + Target: build.Deploy, + Commit: frontend.Commit{ + Sha: build.Commit, + Ref: build.Ref, + Refspec: build.Refspec, + Branch: build.Branch, + Message: build.Message, + Author: frontend.Author{ + Name: build.Author, + Email: build.Email, + Avatar: build.Avatar, + }, + }, + }, + Prev: frontend.Build{ + Number: last.Number, + Created: last.Created, + Started: last.Started, + Finished: last.Finished, + Status: last.Status, + Event: last.Event, + Link: last.Link, + Target: last.Deploy, + Commit: frontend.Commit{ + Sha: last.Commit, + Ref: last.Ref, + Refspec: last.Refspec, + Branch: last.Branch, + Message: last.Message, + Author: frontend.Author{ + Name: last.Author, + Email: last.Email, + Avatar: last.Avatar, + }, + }, + }, + Job: frontend.Job{ + Number: job.Number, + Matrix: job.Environment, + }, + Sys: frontend.System{ + Name: "drone", + Link: link, + Arch: "linux/amd64", + }, + } +} diff --git a/server/rpc.go b/server/rpc.go new file mode 100644 index 000000000..f15107d54 --- /dev/null +++ b/server/rpc.go @@ -0,0 +1,221 @@ +package server + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "log" + "os" + "strconv" + + "github.com/Sirupsen/logrus" + "github.com/cncd/logging" + "github.com/cncd/pipeline/pipeline/rpc" + "github.com/cncd/pubsub" + "github.com/cncd/queue" + "github.com/gin-gonic/gin" + + "github.com/drone/drone/model" + "github.com/drone/drone/remote" + "github.com/drone/drone/store" +) + +// This file is a complete disaster because I'm trying to wedge in some +// experimental code. Please pardon our appearance during renovations. + +var config = struct { + pubsub pubsub.Publisher + queue queue.Queue + logger logging.Log + secret string + host string +}{ + pubsub.New(), + queue.New(), + logging.New(), + os.Getenv("DRONE_SECRET"), + os.Getenv("DRONE_HOST"), +} + +func init() { + config.pubsub.Create(context.Background(), "topic/events") +} + +// func SetupRPC() gin.HandlerFunc { +// return func(c *gin.Context) { +// c.Next() +// } +// } + +func RPCHandler(c *gin.Context) { + + fmt.Println(c.Request.Header.Write(os.Stdout)) + if secret := c.Request.Header.Get("Authorization"); secret != "Bearer "+config.secret { + log.Printf("Unable to connect agent. Invalid authorization token %q does not match %q", secret, config.secret) + c.String(401, "Unable to connect agent. Invalid authorization token") + return + } + peer := RPC{ + remote: remote.FromContext(c), + store: store.FromContext(c), + queue: config.queue, + pubsub: config.pubsub, + logger: config.logger, + host: config.host, + } + rpc.NewServer(&peer).ServeHTTP(c.Writer, c.Request) +} + +type RPC struct { + remote remote.Remote + queue queue.Queue + pubsub pubsub.Publisher + logger logging.Log + store store.Store + host string +} + +// Next implements the rpc.Next function +func (s *RPC) Next(c context.Context) (*rpc.Pipeline, error) { + filter := func(*queue.Task) bool { return true } + task, err := s.queue.Poll(c, filter) + if err != nil { + return nil, err + } else if task == nil { + return nil, nil + } + pipeline := new(rpc.Pipeline) + err = json.Unmarshal(task.Data, pipeline) + return pipeline, err +} + +// Notify implements the rpc.Notify function +func (s *RPC) Notify(c context.Context, id string) (bool, error) { + err := s.queue.Wait(c, id) + return (err == queue.ErrCancel), nil +} + +// Extend implements the rpc.Extend function +func (s *RPC) Extend(c context.Context, id string) error { + return s.queue.Extend(c, id) +} + +// Update implements the rpc.Update function +func (s *RPC) Update(c context.Context, id string, state rpc.State) error { + jobID, err := strconv.ParseInt(id, 10, 64) + if err != nil { + return err + } + + job, err := s.store.GetJob(jobID) + if err != nil { + log.Printf("error: cannot find job with id %d: %s", jobID, err) + return err + } + + build, err := s.store.GetBuild(job.BuildID) + if err != nil { + log.Printf("error: cannot find build with id %d: %s", job.BuildID, err) + return err + } + + repo, err := s.store.GetRepo(build.RepoID) + if err != nil { + log.Printf("error: cannot find repo with id %d: %s", build.RepoID, err) + return err + } + + if build.Status != model.StatusRunning { + + } + + job.Started = state.Started + job.Finished = state.Finished + job.ExitCode = state.ExitCode + job.Status = model.StatusRunning + job.Error = state.Error + + if build.Status == model.StatusPending { + build.Started = job.Started + build.Status = model.StatusRunning + s.store.UpdateBuild(build) + } + + log.Printf("pipeline: update %s: exited=%v, exit_code=%d", id, state.Exited, state.ExitCode) + + if state.Exited { + + job.Status = model.StatusSuccess + if job.ExitCode != 0 || job.Error != "" { + job.Status = model.StatusFailure + } + + // save the logs + var buf bytes.Buffer + if serr := s.logger.Snapshot(context.Background(), id, &buf); serr != nil { + log.Printf("error: snapshotting logs: %s", serr) + } + if werr := s.store.WriteLog(job, &buf); werr != nil { + log.Printf("error: persisting logs: %s", werr) + } + + // close the logger + s.logger.Close(c, id) + s.queue.Done(c, id) + } + + // hackity hack + cc := context.WithValue(c, "store", s.store) + ok, uerr := store.UpdateBuildJob(cc, build, job) + if uerr != nil { + log.Printf("error: updating job: %s", uerr) + } + if ok { + // get the user because we transfer the user form the server to agent + // and back we lose the token which does not get serialized to json. + user, uerr := s.store.GetUser(repo.UserID) + if uerr != nil { + logrus.Errorf("Unable to find user. %s", err) + } else { + s.remote.Status(user, repo, build, + fmt.Sprintf("%s/%s/%d", s.host, repo.FullName, build.Number)) + } + } + + message := pubsub.Message{} + message.Data, _ = json.Marshal(model.Event{ + Type: func() model.EventType { + // HACK we don't even really care about the event type. + // so we should just simplify how events are triggered. + // WTF was this being used for????????????????????????? + if job.Status == model.StatusRunning { + return model.Started + } + return model.Finished + }(), + Repo: *repo, + Build: *build, + Job: *job, + }) + message.Labels = map[string]string{ + "repo": repo.FullName, + "private": strconv.FormatBool(repo.IsPrivate), + } + s.pubsub.Publish(c, "topic/events", message) + log.Println("finish rpc.update") + return nil +} + +// Save implements the rpc.Save function +func (s *RPC) Save(c context.Context, id, mime string, file io.Reader) error { return nil } + +// Log implements the rpc.Log function +func (s *RPC) Log(c context.Context, id string, line *rpc.Line) error { + entry := new(logging.Entry) + entry.Data, _ = json.Marshal(line) + fmt.Println(string(entry.Data)) + s.logger.Write(c, id, entry) + return nil +} diff --git a/server/stream.go b/server/stream.go index 1640a1b9a..dad1b46b8 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1,10 +1,13 @@ package server import ( + "context" "fmt" "strconv" "time" + "github.com/cncd/logging" + "github.com/cncd/pubsub" "github.com/drone/drone/cache" "github.com/drone/drone/model" "github.com/drone/drone/router/middleware/session" @@ -194,3 +197,158 @@ func reader(ws *websocket.Conn) { } } } + +// +// CANARY IMPLEMENTATION +// +// This file is a complete disaster because I'm trying to wedge in some +// experimental code. Please pardon our appearance during renovations. +// + +func LogStream2(c *gin.Context) { + repo := session.Repo(c) + buildn, _ := strconv.Atoi(c.Param("build")) + jobn, _ := strconv.Atoi(c.Param("number")) + + build, err := store.GetBuildNumber(c, repo, buildn) + if err != nil { + logrus.Debugln("stream cannot get build number.", err) + c.AbortWithError(404, err) + return + } + job, err := store.GetJobNumber(c, build, jobn) + if err != nil { + logrus.Debugln("stream cannot get job number.", err) + c.AbortWithError(404, err) + return + } + if job.Status != model.StatusRunning { + logrus.Debugln("stream not found.") + c.AbortWithStatus(404) + return + } + + ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + if _, ok := err.(websocket.HandshakeError); !ok { + logrus.Errorf("Cannot upgrade websocket. %s", err) + } + return + } + logrus.Debugf("Successfull upgraded websocket") + + ticker := time.NewTicker(pingPeriod) + logc := make(chan []byte, 10) + + ctx, cancel := context.WithCancel( + context.Background(), + ) + defer func() { + cancel() + ticker.Stop() + close(logc) + logrus.Debugf("Successfully closing websocket") + }() + + go func() { + // TODO remove global variable + config.logger.Tail(ctx, fmt.Sprint(job.ID), func(entries ...*logging.Entry) { + for _, entry := range entries { + select { + case <-ctx.Done(): + return + default: + logc <- entry.Data + } + } + }) + cancel() + }() + + go func() { + for { + select { + case <-ctx.Done(): + return + case buf, ok := <-logc: + if ok { + ws.SetWriteDeadline(time.Now().Add(writeWait)) + ws.WriteMessage(websocket.TextMessage, buf) + } + case <-ticker.C: + err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)) + if err != nil { + return + } + } + } + }() + reader(ws) +} + +func EventStream2(c *gin.Context) { + ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + if _, ok := err.(websocket.HandshakeError); !ok { + logrus.Errorf("Cannot upgrade websocket. %s", err) + } + return + } + logrus.Debugf("Successfull upgraded websocket") + + user := session.User(c) + repo := map[string]bool{} + if user != nil { + repo, _ = cache.GetRepoMap(c, user) + } + + ticker := time.NewTicker(pingPeriod) + eventc := make(chan []byte, 10) + + ctx, cancel := context.WithCancel( + context.Background(), + ) + defer func() { + cancel() + ticker.Stop() + close(eventc) + logrus.Debugf("Successfully closing websocket") + }() + + go func() { + // TODO remove this from global config + config.pubsub.Subscribe(c, "topic/events", func(m pubsub.Message) { + name := m.Labels["repo"] + priv := m.Labels["private"] + if repo[name] || priv == "false" { + select { + case <-ctx.Done(): + return + default: + eventc <- m.Data + } + } + }) + cancel() + }() + + go func() { + for { + select { + case <-ctx.Done(): + return + case buf, ok := <-eventc: + if ok { + ws.SetWriteDeadline(time.Now().Add(writeWait)) + ws.WriteMessage(websocket.TextMessage, buf) + } + case <-ticker.C: + err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)) + if err != nil { + return + } + } + } + }() + reader(ws) +} diff --git a/vendor/github.com/cncd/logging/LICENSE b/vendor/github.com/cncd/logging/LICENSE new file mode 100644 index 000000000..64e202179 --- /dev/null +++ b/vendor/github.com/cncd/logging/LICENSE @@ -0,0 +1,29 @@ +BSD 3-Clause License + +Copyright (c) 2017, Brad Rydzewski +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/cncd/logging/README b/vendor/github.com/cncd/logging/README new file mode 100644 index 000000000..d84cf1273 --- /dev/null +++ b/vendor/github.com/cncd/logging/README @@ -0,0 +1,6 @@ +Go package provides a common interface for storing and streaming logs. + +Documentation: + + http://godoc.org/github.com/cncd/logging + http://godoc.org/github.com/cncd/logging/gcp diff --git a/vendor/github.com/cncd/logging/log.go b/vendor/github.com/cncd/logging/log.go new file mode 100644 index 000000000..1bb53a2e4 --- /dev/null +++ b/vendor/github.com/cncd/logging/log.go @@ -0,0 +1,143 @@ +package logging + +import ( + "context" + "io" + "sync" +) + +// TODO (bradrydzewski) writing to subscribers is currently a blocking +// operation and does not protect against slow clients from locking +// the stream. This should be resolved. + +// TODO (bradrydzewski) implement a mux.Info to fetch information and +// statistics for the multiplexier. Streams, subscribers, etc +// mux.Info() + +// TODO (bradrydzewski) refactor code to place publisher and subscriber +// operations in separate files with more encapsulated logic. +// sub.push() +// sub.join() +// sub.start()... event loop + +type subscriber struct { + handler Handler +} + +type stream struct { + sync.Mutex + + path string + hist []*Entry + subs map[*subscriber]struct{} + done chan struct{} + wait sync.WaitGroup +} + +type log struct { + sync.Mutex + + streams map[string]*stream +} + +// New returns a new logger. +func New() Log { + return &log{ + streams: map[string]*stream{}, + } +} + +func (l *log) Open(c context.Context, path string) error { + l.Lock() + _, ok := l.streams[path] + if !ok { + l.streams[path] = &stream{ + path: path, + subs: make(map[*subscriber]struct{}), + done: make(chan struct{}), + } + } + l.Unlock() + return nil +} + +func (l *log) Write(c context.Context, path string, entry *Entry) error { + l.Lock() + s, ok := l.streams[path] + l.Unlock() + if !ok { + return ErrNotFound + } + s.Lock() + s.hist = append(s.hist, entry) + for sub := range s.subs { + go sub.handler(entry) + } + s.Unlock() + return nil +} + +func (l *log) Tail(c context.Context, path string, handler Handler) error { + l.Lock() + s, ok := l.streams[path] + l.Unlock() + if !ok { + return ErrNotFound + } + + sub := &subscriber{ + handler: handler, + } + s.Lock() + if len(s.hist) != 0 { + sub.handler(s.hist...) + } + s.subs[sub] = struct{}{} + s.Unlock() + + select { + case <-c.Done(): + case <-s.done: + } + + s.Lock() + delete(s.subs, sub) + s.Unlock() + return nil +} + +func (l *log) Close(c context.Context, path string) error { + l.Lock() + s, ok := l.streams[path] + l.Unlock() + if !ok { + return ErrNotFound + } + + s.Lock() + close(s.done) + s.Unlock() + + l.Lock() + delete(l.streams, path) + l.Unlock() + return nil +} + +func (l *log) Snapshot(c context.Context, path string, w io.Writer) error { + l.Lock() + s, ok := l.streams[path] + l.Unlock() + if !ok { + return ErrNotFound + } + s.Lock() + for _, entry := range s.hist { + w.Write(entry.Data) + w.Write(cr) + } + s.Unlock() + return nil +} + +var cr = []byte{'\n'} diff --git a/vendor/github.com/cncd/logging/logging.go b/vendor/github.com/cncd/logging/logging.go new file mode 100644 index 000000000..8fa1ff24e --- /dev/null +++ b/vendor/github.com/cncd/logging/logging.go @@ -0,0 +1,80 @@ +package logging + +import ( + "context" + "errors" + "io" +) + +// ErrNotFound is returned when the log does not exist. +var ErrNotFound = errors.New("stream: not found") + +// Entry defines a log entry. +type Entry struct { + // ID identifies this message. + ID string `json:"id,omitempty"` + + // Data is the actual data in the entry. + Data []byte `json:"data"` + + // Tags represents the key-value pairs the + // entry is tagged with. + Tags map[string]string `json:"tags,omitempty"` +} + +// Handler defines a callback function for handling log entries. +type Handler func(...*Entry) + +// Log defines a log multiplexer. +type Log interface { + // Open opens the log. + Open(c context.Context, path string) error + + // Write writes the entry to the log. + Write(c context.Context, path string, entry *Entry) error + + // Tail tails the log. + Tail(c context.Context, path string, handler Handler) error + + // Close closes the log. + Close(c context.Context, path string) error + + // Snapshot snapshots the stream to Writer w. + Snapshot(c context.Context, path string, w io.Writer) error + + // Info returns runtime information about the multiplexer. + // Info(c context.Context) (interface{}, error) +} + +// // global streamer +// var global = New() +// +// // Set sets a default global logger. +// func Set(log Log) { +// global = log +// } +// +// // Open opens the log stream. +// func Open(c context.Context, path string) error { +// return global.Open(c, path) +// } +// +// // Write writes the log entry to the stream. +// func Write(c context.Context, path string, entry *Entry) error { +// return global.Write(c, path, entry) +// } +// +// // Tail tails the log stream. +// func Tail(c context.Context, path string, handler Handler) error { +// return global.Tail(c, path, handler) +// } +// +// // Close closes the log stream. +// func Close(c context.Context, path string) error { +// return global.Close(c, path) +// } +// +// // Snapshot snapshots the stream to Writer w. +// func Snapshot(c context.Context, path string, w io.Writer) error { +// return global.Snapshot(c, path, w) +// } diff --git a/vendor/github.com/cncd/pipeline/LICENSE b/vendor/github.com/cncd/pipeline/LICENSE new file mode 100644 index 000000000..64e202179 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/LICENSE @@ -0,0 +1,29 @@ +BSD 3-Clause License + +Copyright (c) 2017, Brad Rydzewski +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/cncd/pipeline/pipeline/backend/backend.go b/vendor/github.com/cncd/pipeline/pipeline/backend/backend.go new file mode 100644 index 000000000..ef8331097 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/backend/backend.go @@ -0,0 +1,21 @@ +package backend + +import "io" + +// Engine defines a container orchestration backend and is used +// to create and manage container resources. +type Engine interface { + // Setup the pipeline environment. + Setup(*Config) error + // Start the pipeline step. + Exec(*Step) error + // Kill the pipeline step. + Kill(*Step) error + // Wait for the pipeline step to complete and returns + // the completion results. + Wait(*Step) (*State, error) + // Tail the pipeline step logs. + Tail(*Step) (io.ReadCloser, error) + // Destroy the pipeline environment. + Destroy(*Config) error +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/backend/docker/convert.go b/vendor/github.com/cncd/pipeline/pipeline/backend/docker/convert.go new file mode 100644 index 000000000..440ce965f --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/backend/docker/convert.go @@ -0,0 +1,130 @@ +package docker + +import ( + "encoding/base64" + "encoding/json" + "strings" + + "github.com/cncd/pipeline/pipeline/backend" + + "github.com/docker/docker/api/types/container" +) + +// returns a container configuration. +func toConfig(proc *backend.Step) *container.Config { + config := &container.Config{ + Image: proc.Image, + Labels: proc.Labels, + WorkingDir: proc.WorkingDir, + AttachStdout: true, + AttachStderr: true, + } + if len(proc.Environment) != 0 { + config.Env = toEnv(proc.Environment) + } + if len(proc.Command) != 0 { + config.Cmd = proc.Command + } + if len(proc.Entrypoint) != 0 { + config.Entrypoint = proc.Entrypoint + } + if len(proc.Volumes) != 0 { + config.Volumes = toVol(proc.Volumes) + } + return config +} + +// returns a container host configuration. +func toHostConfig(proc *backend.Step) *container.HostConfig { + config := &container.HostConfig{ + Resources: container.Resources{ + CPUQuota: proc.CPUQuota, + CPUShares: proc.CPUShares, + CpusetCpus: proc.CPUSet, + Memory: proc.MemLimit, + MemorySwap: proc.MemSwapLimit, + }, + Privileged: proc.Privileged, + ShmSize: proc.ShmSize, + } + // if len(proc.VolumesFrom) != 0 { + // config.VolumesFrom = proc.VolumesFrom + // } + // if len(proc.Network) != 0 { + // config.NetworkMode = container.NetworkMode( + // proc.Network, + // ) + // } + if len(proc.DNS) != 0 { + config.DNS = proc.DNS + } + if len(proc.DNSSearch) != 0 { + config.DNSSearch = proc.DNSSearch + } + if len(proc.ExtraHosts) != 0 { + config.ExtraHosts = proc.ExtraHosts + } + if len(proc.Devices) != 0 { + config.Devices = toDev(proc.Devices) + } + if len(proc.Volumes) != 0 { + config.Binds = proc.Volumes + } + // if proc.OomKillDisable { + // config.OomKillDisable = &proc.OomKillDisable + // } + + return config +} + +// helper function that converts a slice of volume paths to a set of +// unique volume names. +func toVol(paths []string) map[string]struct{} { + set := map[string]struct{}{} + for _, path := range paths { + parts := strings.Split(path, ":") + if len(parts) < 2 { + continue + } + set[parts[1]] = struct{}{} + } + return set +} + +// helper function that converts a key value map of environment variables to a +// string slice in key=value format. +func toEnv(env map[string]string) []string { + var envs []string + for k, v := range env { + envs = append(envs, k+"="+v) + } + return envs +} + +// helper function that converts a slice of device paths to a slice of +// container.DeviceMapping. +func toDev(paths []string) []container.DeviceMapping { + var devices []container.DeviceMapping + for _, path := range paths { + parts := strings.Split(path, ":") + if len(parts) < 2 { + continue + } + devices = append(devices, container.DeviceMapping{ + PathOnHost: parts[0], + PathInContainer: parts[1], + CgroupPermissions: "rwm", + }) + } + return devices +} + +// helper function that serializes the auth configuration as JSON +// base64 payload. +func encodeAuthToBase64(authConfig backend.Auth) (string, error) { + buf, err := json.Marshal(authConfig) + if err != nil { + return "", err + } + return base64.URLEncoding.EncodeToString(buf), nil +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/backend/docker/docker.go b/vendor/github.com/cncd/pipeline/pipeline/backend/docker/docker.go new file mode 100644 index 000000000..e77b99a41 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/backend/docker/docker.go @@ -0,0 +1,202 @@ +package docker + +import ( + "context" + "io" + "io/ioutil" + + "github.com/cncd/pipeline/pipeline/backend" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/network" + "github.com/docker/docker/api/types/volume" + "github.com/docker/docker/client" + "github.com/docker/docker/pkg/stdcopy" +) + +type engine struct { + client client.APIClient +} + +// New returns a new Docker Engine using the given client. +func New(cli client.APIClient) backend.Engine { + return &engine{ + client: cli, + } +} + +// NewEnv returns a new Docker Engine using the client connection +// environment variables. +func NewEnv() (backend.Engine, error) { + cli, err := client.NewEnvClient() + if err != nil { + return nil, err + } + return New(cli), nil +} + +func (e *engine) Setup(conf *backend.Config) error { + for _, vol := range conf.Volumes { + _, err := e.client.VolumeCreate(noContext, volume.VolumesCreateBody{ + Name: vol.Name, + Driver: vol.Driver, + DriverOpts: vol.DriverOpts, + // Labels: defaultLabels, + }) + if err != nil { + return err + } + } + for _, network := range conf.Networks { + _, err := e.client.NetworkCreate(noContext, network.Name, types.NetworkCreate{ + Driver: network.Driver, + Options: network.DriverOpts, + // Labels: defaultLabels, + }) + if err != nil { + return err + } + } + return nil +} + +func (e *engine) Exec(proc *backend.Step) error { + ctx := context.Background() + + config := toConfig(proc) + hostConfig := toHostConfig(proc) + + // create pull options with encoded authorization credentials. + pullopts := types.ImagePullOptions{} + if proc.AuthConfig.Username != "" && proc.AuthConfig.Password != "" { + pullopts.RegistryAuth, _ = encodeAuthToBase64(proc.AuthConfig) + } + + // automatically pull the latest version of the image if requested + // by the process configuration. + if proc.Pull { + rc, perr := e.client.ImagePull(ctx, config.Image, pullopts) + if perr == nil { + io.Copy(ioutil.Discard, rc) + rc.Close() + } + // fix for drone/drone#1917 + if perr != nil && proc.AuthConfig.Password != "" { + return perr + } + } + + _, err := e.client.ContainerCreate(ctx, config, hostConfig, nil, proc.Name) + if client.IsErrImageNotFound(err) { + // automatically pull and try to re-create the image if the + // failure is caused because the image does not exist. + rc, perr := e.client.ImagePull(ctx, config.Image, pullopts) + if perr != nil { + return perr + } + io.Copy(ioutil.Discard, rc) + rc.Close() + + _, err = e.client.ContainerCreate(ctx, config, hostConfig, nil, proc.Name) + } + if err != nil { + return err + } + + for _, net := range proc.Networks { + err = e.client.NetworkConnect(ctx, net.Name, proc.Name, &network.EndpointSettings{ + Aliases: net.Aliases, + }) + if err != nil { + return err + } + } + + // if proc.Network != "host" { // or bridge, overlay, none, internal, container: .... + // err = e.client.NetworkConnect(ctx, proc.Network, proc.Name, &network.EndpointSettings{ + // Aliases: proc.NetworkAliases, + // }) + // if err != nil { + // return err + // } + // } + + return e.client.ContainerStart(ctx, proc.Name, startOpts) +} + +func (e *engine) Kill(proc *backend.Step) error { + return e.client.ContainerKill(noContext, proc.Name, "9") +} + +func (e *engine) Wait(proc *backend.Step) (*backend.State, error) { + _, err := e.client.ContainerWait(noContext, proc.Name) + if err != nil { + // todo + } + + info, err := e.client.ContainerInspect(noContext, proc.Name) + if err != nil { + return nil, err + } + if info.State.Running { + // todo + } + + return &backend.State{ + Exited: true, + ExitCode: info.State.ExitCode, + OOMKilled: info.State.OOMKilled, + }, nil +} + +func (e *engine) Tail(proc *backend.Step) (io.ReadCloser, error) { + logs, err := e.client.ContainerLogs(noContext, proc.Name, logsOpts) + if err != nil { + return nil, err + } + rc, wc := io.Pipe() + + go func() { + stdcopy.StdCopy(wc, wc, logs) + logs.Close() + wc.Close() + rc.Close() + }() + return rc, nil +} + +func (e *engine) Destroy(conf *backend.Config) error { + for _, stage := range conf.Stages { + for _, step := range stage.Steps { + e.client.ContainerKill(noContext, step.Name, "9") + e.client.ContainerRemove(noContext, step.Name, removeOpts) + } + } + for _, volume := range conf.Volumes { + e.client.VolumeRemove(noContext, volume.Name, true) + } + for _, network := range conf.Networks { + e.client.NetworkRemove(noContext, network.Name) + } + return nil +} + +var ( + noContext = context.Background() + + startOpts = types.ContainerStartOptions{} + + removeOpts = types.ContainerRemoveOptions{ + RemoveVolumes: true, + RemoveLinks: false, + Force: false, + } + + logsOpts = types.ContainerLogsOptions{ + Follow: true, + ShowStdout: true, + ShowStderr: true, + Details: false, + Timestamps: false, + } +) diff --git a/vendor/github.com/cncd/pipeline/pipeline/backend/docker/pool.go b/vendor/github.com/cncd/pipeline/pipeline/backend/docker/pool.go new file mode 100644 index 000000000..37fd97afb --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/backend/docker/pool.go @@ -0,0 +1,44 @@ +package docker + +// import ( +// "context" +// +// "github.com/cncd/pipeline/pipeline/backend" +// ) +// +// // Pool manages a pool of Docker clients. +// type Pool struct { +// queue chan (backend.Engine) +// } +// +// // NewPool returns a Pool. +// func NewPool(engines ...backend.Engine) *Pool { +// return &Pool{ +// queue: make(chan backend.Engine, len(engines)), +// } +// } +// +// // Reserve requests the next available Docker client in the pool. +// func (p *Pool) Reserve(c context.Context) backend.Engine { +// select { +// case <-c.Done(): +// case engine := <-p.queue: +// return engine +// } +// return nil +// } +// +// // Release releases the Docker client back to the pool. +// func (p *Pool) Release(engine backend.Engine) { +// p.queue <- engine +// } + +// pool := docker.Pool( +// docker.FromEnvironmentMust(), +// docker.FromEnvironmentMust(), +// docker.FromEnvironmentMust(), +// docker.FromEnvironmentMust(), +// ) +// +// client := pool.Reserve() +// defer pool.Release(client) diff --git a/vendor/github.com/cncd/pipeline/pipeline/backend/types.go b/vendor/github.com/cncd/pipeline/pipeline/backend/types.go new file mode 100644 index 000000000..df0f79745 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/backend/types.go @@ -0,0 +1,103 @@ +package backend + +type ( + // Config defines the runtime configuration of a pipeline. + Config struct { + Stages []*Stage `json:"pipeline"` // pipeline stages + Networks []*Network `json:"networks"` // network definitions + Volumes []*Volume `json:"volumes"` // volume definitions + } + + // Stage denotes a collection of one or more steps. + Stage struct { + Name string `json:"name,omitempty"` + Alias string `json:"alias,omitempty"` + Steps []*Step `json:"steps,omitempty"` + } + + // Step defines a container process. + Step struct { + Name string `json:"name"` + Alias string `json:"alias,omitempty"` + Image string `json:"image,omitempty"` + Pull bool `json:"pull,omitempty"` + Detached bool `json:"detach,omitempty"` + Privileged bool `json:"privileged,omitempty"` + WorkingDir string `json:"working_dir,omitempty"` + Environment map[string]string `json:"environment,omitempty"` + Labels map[string]string `json:"labels,omitempty"` + Entrypoint []string `json:"entrypoint,omitempty"` + Command []string `json:"command,omitempty"` + ExtraHosts []string `json:"extra_hosts,omitempty"` + Volumes []string `json:"volumes,omitempty"` + Devices []string `json:"devices,omitempty"` + Networks []Conn `json:"networks,omitempty"` + DNS []string `json:"dns,omitempty"` + DNSSearch []string `json:"dns_search,omitempty"` + MemSwapLimit int64 `json:"memswap_limit,omitempty"` + MemLimit int64 `json:"mem_limit,omitempty"` + ShmSize int64 `json:"shm_size,omitempty"` + CPUQuota int64 `json:"cpu_quota,omitempty"` + CPUShares int64 `json:"cpu_shares,omitempty"` + CPUSet string `json:"cpu_set,omitempty"` + OnFailure bool `json:"on_failure,omitempty"` + OnSuccess bool `json:"on_success,omitempty"` + AuthConfig Auth `json:"auth_config,omitempty"` + } + + // Auth defines registry authentication credentials. + Auth struct { + Username string `json:"username,omitempty"` + Password string `json:"password,omitempty"` + Email string `json:"email,omitempty"` + } + + // Conn defines a container network connection. + Conn struct { + Name string `json:"name"` + Aliases []string `json:"aliases"` + } + + // Network defines a container network. + Network struct { + Name string `json:"name,omitempty"` + Driver string `json:"driver,omitempty"` + DriverOpts map[string]string `json:"driver_opts,omitempty"` + } + + // Volume defines a container volume. + Volume struct { + Name string `json:"name,omitempty"` + Driver string `json:"driver,omitempty"` + DriverOpts map[string]string `json:"driver_opts,omitempty"` + } + + // State defines a container state. + State struct { + // Container exit code + ExitCode int `json:"exit_code"` + // Container exited, true or false + Exited bool `json:"exited"` + // Container is oom killed, true or false + OOMKilled bool `json:"oom_killed"` + } + + // // State defines the pipeline and process state. + // State struct { + // Pipeline struct { + // // Current pipeline step + // Step *Step `json:"step"` + // // Current pipeline error state + // Error error `json:"error"` + // } + // + // Process struct { + // // Container exit code + // ExitCode int `json:"exit_code"` + // // Container exited, true or false + // Exited bool `json:"exited"` + // // Container is oom killed, true or false + // OOMKilled bool `json:"oom_killed"` + // } + // } +) diff --git a/vendor/github.com/cncd/pipeline/pipeline/error.go b/vendor/github.com/cncd/pipeline/pipeline/error.go new file mode 100644 index 000000000..bee2ac029 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/error.go @@ -0,0 +1,38 @@ +package pipeline + +import ( + "errors" + "fmt" +) + +var ( + // ErrSkip is used as a return value when container execution should be + // skipped at runtime. It is not returned as an error by any function. + ErrSkip = errors.New("Skipped") + + // ErrCancel is used as a return value when the container execution receives + // a cancellation signal from the context. + ErrCancel = errors.New("Cancelled") +) + +// An ExitError reports an unsuccessful exit. +type ExitError struct { + Name string + Code int +} + +// Error returns the error message in string format. +func (e *ExitError) Error() string { + return fmt.Sprintf("%s : exit code %d", e.Name, e.Code) +} + +// An OomError reports the process received an OOMKill from the kernel. +type OomError struct { + Name string + Code int +} + +// Error reteurns the error message in string format. +func (e *OomError) Error() string { + return fmt.Sprintf("%s : received oom kill", e.Name) +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/frontend/metadata.go b/vendor/github.com/cncd/pipeline/pipeline/frontend/metadata.go new file mode 100644 index 000000000..ec35206c2 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/frontend/metadata.go @@ -0,0 +1,121 @@ +package frontend + +import "strconv" + +type ( + // Metadata defines runtime m. + Metadata struct { + ID string `json:"id,omitempty"` + Repo Repo `json:"repo,omitempty"` + Curr Build `json:"curr,omitempty"` + Prev Build `json:"prev,omitempty"` + Job Job `json:"job,omitempty"` + Sys System `json:"sys,omitempty"` + } + + // Repo defines runtime metadata for a repository. + Repo struct { + Name string `json:"name,omitempty"` + Link string `json:"link,omitempty"` + Remote string `json:"remote,omitempty"` + Private bool `json:"private,omitempty"` + } + + // Build defines runtime metadata for a build. + Build struct { + Number int `json:"number,omitempty"` + Created int64 `json:"created,omitempty"` + Started int64 `json:"started,omitempty"` + Finished int64 `json:"finished,omitempty"` + Timeout int64 `json:"timeout,omitempty"` + Status string `json:"status,omitempty"` + Event string `json:"event,omitempty"` + Link string `json:"link,omitempty"` + Target string `json:"target,omitempty"` + Trusted bool `json:"trusted,omitempty"` + Commit Commit `json:"commit,omitempty"` + } + + // Commit defines runtime metadata for a commit. + Commit struct { + Sha string `json:"sha,omitempty"` + Ref string `json:"ref,omitempty"` + Refspec string `json:"refspec,omitempty"` + Branch string `json:"branch,omitempty"` + Message string `json:"message,omitempty"` + Author Author `json:"author,omitempty"` + } + + // Author defines runtime metadata for a commit author. + Author struct { + Name string `json:"name,omitempty"` + Email string `json:"email,omitempty"` + Avatar string `json:"avatar,omitempty"` + } + + // Job defines runtime metadata for a job. + Job struct { + Number int `json:"number,omitempty"` + Matrix map[string]string `json:"matrix,omitempty"` + } + + // System defines runtime metadata for a ci/cd system. + System struct { + Name string `json:"name,omitempty"` + Host string `json:"host,omitempty"` + Link string `json:"link,omitempty"` + Arch string `json:"arch,omitempty"` + } +) + +// Environ returns the metadata as a map of environment variables. +func (m *Metadata) Environ() map[string]string { + return map[string]string{ + "CI_REPO": m.Repo.Name, + "CI_REPO_NAME": m.Repo.Name, + "CI_REPO_LINK": m.Repo.Link, + "CI_REPO_REMOTE": m.Repo.Remote, + "CI_REMOTE_URL": m.Repo.Remote, + "CI_REPO_PRIVATE": strconv.FormatBool(m.Repo.Private), + "CI_BUILD_NUMBER": strconv.Itoa(m.Curr.Number), + "CI_BUILD_CREATED": strconv.FormatInt(m.Curr.Created, 10), + "CI_BUILD_STARTED": strconv.FormatInt(m.Curr.Started, 10), + "CI_BUILD_FINISHED": strconv.FormatInt(m.Curr.Finished, 10), + "CI_BUILD_STATUS": m.Curr.Status, + "CI_BUILD_EVENT": m.Curr.Event, + "CI_BUILD_LINK": m.Curr.Link, + "CI_BUILD_TARGET": m.Curr.Target, + "CI_COMMIT_SHA": m.Curr.Commit.Sha, + "CI_COMMIT_REF": m.Curr.Commit.Ref, + "CI_COMMIT_REFSPEC": m.Curr.Commit.Refspec, + "CI_COMMIT_BRANCH": m.Curr.Commit.Branch, + "CI_COMMIT_MESSAGE": m.Curr.Commit.Message, + "CI_COMMIT_AUTHOR": m.Curr.Commit.Author.Name, + "CI_COMMIT_AUTHOR_NAME": m.Curr.Commit.Author.Name, + "CI_COMMIT_AUTHOR_EMAIL": m.Curr.Commit.Author.Email, + "CI_COMMIT_AUTHOR_AVATAR": m.Curr.Commit.Author.Avatar, + "CI_PREV_BUILD_NUMBER": strconv.Itoa(m.Prev.Number), + "CI_PREV_BUILD_CREATED": strconv.FormatInt(m.Prev.Created, 10), + "CI_PREV_BUILD_STARTED": strconv.FormatInt(m.Prev.Started, 10), + "CI_PREV_BUILD_FINISHED": strconv.FormatInt(m.Prev.Finished, 10), + "CI_PREV_BUILD_STATUS": m.Prev.Status, + "CI_PREV_BUILD_EVENT": m.Prev.Event, + "CI_PREV_BUILD_LINK": m.Prev.Link, + "CI_PREV_COMMIT_SHA": m.Prev.Commit.Sha, + "CI_PREV_COMMIT_REF": m.Prev.Commit.Ref, + "CI_PREV_COMMIT_REFSPEC": m.Prev.Commit.Refspec, + "CI_PREV_COMMIT_BRANCH": m.Prev.Commit.Branch, + "CI_PREV_COMMIT_MESSAGE": m.Prev.Commit.Message, + "CI_PREV_COMMIT_AUTHOR": m.Prev.Commit.Author.Name, + "CI_PREV_COMMIT_AUTHOR_NAME": m.Prev.Commit.Author.Name, + "CI_PREV_COMMIT_AUTHOR_EMAIL": m.Prev.Commit.Author.Email, + "CI_PREV_COMMIT_AUTHOR_AVATAR": m.Prev.Commit.Author.Avatar, + "CI_JOB_NUMBER": strconv.Itoa(m.Job.Number), + "CI_SYSTEM": m.Sys.Name, + "CI_SYSTEM_NAME": m.Sys.Name, + "CI_SYSTEM_LINK": m.Sys.Link, + "CI_SYSTEM_HOST": m.Sys.Host, + "CI_SYSTEM_ARCH": m.Sys.Arch, + "CI": m.Sys.Name, + } +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/compiler.go b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/compiler.go new file mode 100644 index 000000000..a483a1826 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/compiler.go @@ -0,0 +1,167 @@ +package compiler + +import ( + "fmt" + + "github.com/cncd/pipeline/pipeline/backend" + "github.com/cncd/pipeline/pipeline/frontend" + "github.com/cncd/pipeline/pipeline/frontend/yaml" + // libcompose "github.com/docker/libcompose/yaml" +) + +// TODO(bradrydzewski) compiler should handle user-defined volumes from YAML +// TODO(bradrydzewski) compiler should handle user-defined networks from YAML + +// Compiler compiles the yaml +type Compiler struct { + local bool + escalated []string + prefix string + volumes []string + env map[string]string + base string + path string + metadata frontend.Metadata + aliases []string +} + +// New creates a new Compiler with options. +func New(opts ...Option) *Compiler { + compiler := new(Compiler) + compiler.env = map[string]string{} + for _, opt := range opts { + opt(compiler) + } + return compiler +} + +// Compile compiles the YAML configuration to the pipeline intermediate +// representation configuration format. +func (c *Compiler) Compile(conf *yaml.Config) *backend.Config { + config := new(backend.Config) + + // create a default volume + config.Volumes = append(config.Volumes, &backend.Volume{ + Name: fmt.Sprintf("%s_default", c.prefix), + Driver: "local", + }) + + // create a default network + config.Networks = append(config.Networks, &backend.Network{ + Name: fmt.Sprintf("%s_default", c.prefix), + Driver: "bridge", + }) + + // overrides the default workspace paths when specified + // in the YAML file. + if len(conf.Workspace.Base) != 0 { + c.base = conf.Workspace.Base + } + if len(conf.Workspace.Path) != 0 { + c.path = conf.Workspace.Path + } + + // add default clone step + if c.local == false && len(conf.Clone.Containers) == 0 { + container := &yaml.Container{ + Image: "plugins/git:latest", + Vargs: map[string]interface{}{"depth": "0"}, + } + name := fmt.Sprintf("%s_clone", c.prefix) + step := c.createProcess(name, container) + + stage := new(backend.Stage) + stage.Name = name + stage.Alias = "clone" + stage.Steps = append(stage.Steps, step) + + config.Stages = append(config.Stages, stage) + } else if c.local == false { + for i, container := range conf.Clone.Containers { + if !container.Constraints.Match(c.metadata) { + continue + } + stage := new(backend.Stage) + stage.Name = fmt.Sprintf("%s_clone_%v", c.prefix, i) + stage.Alias = container.Name + + name := fmt.Sprintf("%s_clone_%d", c.prefix, i) + step := c.createProcess(name, container) + stage.Steps = append(stage.Steps, step) + + config.Stages = append(config.Stages, stage) + } + } + + // add services steps + if len(conf.Services.Containers) != 0 { + stage := new(backend.Stage) + stage.Name = fmt.Sprintf("%s_services", c.prefix) + stage.Alias = "services" + + for _, container := range conf.Services.Containers { + c.aliases = append(c.aliases, container.Name) + } + + for i, container := range conf.Services.Containers { + name := fmt.Sprintf("%s_services_%d", c.prefix, i) + step := c.createProcess(name, container) + stage.Steps = append(stage.Steps, step) + + } + config.Stages = append(config.Stages, stage) + } + + // add pipeline steps. 1 pipeline step per stage, at the moment + var stage *backend.Stage + var group string + for i, container := range conf.Pipeline.Containers { + if !container.Constraints.Match(c.metadata) { + continue + } + + if stage == nil || group != container.Group || container.Group == "" { + group = container.Group + + stage = new(backend.Stage) + stage.Name = fmt.Sprintf("%s_stage_%v", c.prefix, i) + stage.Alias = container.Name + config.Stages = append(config.Stages, stage) + } + + name := fmt.Sprintf("%s_step_%d", c.prefix, i) + step := c.createProcess(name, container) + stage.Steps = append(stage.Steps, step) + } + + return config +} + +// func setupNetwork(step *backend.Step, network *libcompose.Network) { +// step.Networks = append(step.Networks, backend.Conn{ +// Name: network.Name, +// // Aliases: +// }) +// } +// +// func setupVolume(step *backend.Step, volume *libcompose.Volume) { +// step.Volumes = append(step.Volumes, volume.String()) +// } +// +// var ( +// // Default plugin used to clone the repository. +// defaultCloneImage = "plugins/git:latest" +// +// // Default plugin settings used to clone the repository. +// defaultCloneVargs = map[string]interface{}{ +// "depth": 0, +// } +// ) +// +// // defaultClone returns the default step for cloning an image. +// func defaultClone() *yaml.Container { +// return &yaml.Container{ +// Image: defaultCloneImage, +// Vargs: defaultCloneVargs, +// } +// } diff --git a/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/convert.go b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/convert.go new file mode 100644 index 000000000..d61de71e6 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/convert.go @@ -0,0 +1,152 @@ +package compiler + +import ( + "fmt" + "path" + "strings" + + "github.com/cncd/pipeline/pipeline/backend" + "github.com/cncd/pipeline/pipeline/frontend/yaml" +) + +func (c *Compiler) createProcess(name string, container *yaml.Container) *backend.Step { + var ( + detached bool + workingdir string + + workspace = fmt.Sprintf("%s_default:%s", c.prefix, c.base) + privileged = container.Privileged + entrypoint = container.Entrypoint + command = container.Command + image = expandImage(container.Image) + // network = container.Network + ) + + networks := []backend.Conn{ + backend.Conn{ + Name: fmt.Sprintf("%s_default", c.prefix), + Aliases: c.aliases, + }, + } + + volumes := []string{ + workspace, + } + for _, volume := range container.Volumes.Volumes { + volumes = append(volumes, volume.String()) + } + // if network == "" { + // network = fmt.Sprintf("%s_default", c.prefix) + // for _, alias := range c.aliases { + // // if alias != container.Name { + // aliases = append(aliases, alias) + // // } + // } + // } // host, bridge, none, container:, overlay + + // append default environment variables + environment := map[string]string{} + for k, v := range container.Environment { + environment[k] = v + } + for k, v := range c.env { + switch v { + case "", "0", "false": + continue + default: + environment[k] = v + + // legacy code for drone plugins + if strings.HasPrefix(k, "CI_") { + p := strings.Replace(k, "CI_", "DRONE_", 1) + environment[p] = v + } + } + } + + environment["CI_WORKSPACE"] = path.Join(c.base, c.path) + environment["DRONE_WORKSPACE"] = path.Join(c.base, c.path) + + if !isService(container) { + workingdir = path.Join(c.base, c.path) + } + + if isService(container) { + detached = true + } + + if isPlugin(container) { + paramsToEnv(container.Vargs, environment) + + if imageMatches(container.Image, c.escalated) { + privileged = true + entrypoint = []string{} + command = []string{} + } + } + + if isShell(container) { + entrypoint = []string{"/bin/sh", "-c"} + command = []string{"echo $CI_SCRIPT | base64 -d | /bin/sh -e"} + environment["CI_SCRIPT"] = generateScriptPosix(container.Commands) + environment["HOME"] = "/root" + environment["SHELL"] = "/bin/sh" + } + + return &backend.Step{ + Name: name, + Alias: container.Name, + Image: image, + Pull: container.Pull, + Detached: detached, + Privileged: privileged, + WorkingDir: workingdir, + Environment: environment, + Labels: container.Labels, + Entrypoint: entrypoint, + Command: command, + ExtraHosts: container.ExtraHosts, + Volumes: volumes, + Devices: container.Devices, + Networks: networks, + DNS: container.DNS, + DNSSearch: container.DNSSearch, + MemSwapLimit: int64(container.MemSwapLimit), + MemLimit: int64(container.MemLimit), + ShmSize: int64(container.ShmSize), + CPUQuota: int64(container.CPUQuota), + CPUShares: int64(container.CPUShares), + CPUSet: container.CPUSet, + AuthConfig: backend.Auth{ + Username: container.AuthConfig.Username, + Password: container.AuthConfig.Password, + Email: container.AuthConfig.Email, + }, + OnSuccess: container.Constraints.Status.Match("success"), + OnFailure: (len(container.Constraints.Status.Include)+ + len(container.Constraints.Status.Exclude) != 0) && + container.Constraints.Status.Match("failure"), + } +} + +func imageMatches(image string, to []string) bool { + image = trimImage(image) + for _, i := range to { + if image == i { + return true + } + } + return false +} + +func isPlugin(c *yaml.Container) bool { + return len(c.Vargs) != 0 +} + +func isShell(c *yaml.Container) bool { + return len(c.Commands) != 0 +} + +func isService(c *yaml.Container) bool { + return c.Detached || (isPlugin(c) == false && isShell(c) == false) +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/image.go b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/image.go new file mode 100644 index 000000000..d1492c446 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/image.go @@ -0,0 +1,36 @@ +package compiler + +import ( + "github.com/docker/docker/reference" +) + +// trimImage returns the short image name without tag. +func trimImage(name string) string { + ref, err := reference.ParseNamed(name) + if err != nil { + return name + } + return reference.TrimNamed(ref).String() +} + +// expandImage returns the fully qualified image name. +func expandImage(name string) string { + ref, err := reference.ParseNamed(name) + if err != nil { + return name + } + return reference.WithDefaultTag(ref).String() +} + +// matchImage returns true if the image name matches +// an image in the list. Note the image tag is not used +// in the matching logic. +func matchImage(from string, to ...string) bool { + from = trimImage(from) + for _, match := range to { + if from == match { + return true + } + } + return false +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/option.go b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/option.go new file mode 100644 index 000000000..e56e33997 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/option.go @@ -0,0 +1,145 @@ +package compiler + +import ( + "os" + "strings" + + "github.com/cncd/pipeline/pipeline/frontend" +) + +// Option configures a compiler option. +type Option func(*Compiler) + +// WithVolumes configutes the compiler with default volumes that +// are mounted to each container in the pipeline. +func WithVolumes(volumes ...string) Option { + return func(compiler *Compiler) { + compiler.volumes = volumes + } +} + +// WithMetadata configutes the compiler with the repostiory, build +// and system metadata. The metadata is used to remove steps from +// the compiled pipeline configuration that should be skipped. The +// metadata is also added to each container as environment variables. +func WithMetadata(metadata frontend.Metadata) Option { + return func(compiler *Compiler) { + compiler.metadata = metadata + + for k, v := range metadata.Environ() { + compiler.env[k] = v + } + } +} + +// WithNetrc configures the compiler with netrc authentication +// credentials added by default to every container in the pipeline. +func WithNetrc(username, password, machine string) Option { + return WithEnviron( + map[string]string{ + "CI_NETRC_USERNAME": username, + "CI_NETRC_PASSWORD": password, + "CI_NETRC_MACHINE": machine, + }, + ) +} + +// WithWorkspace configures the compiler with the workspace base +// and path. The workspace base is a volume created at runtime and +// mounted into all containers in the pipeline. The base and path +// are joined to provide the working directory for all build and +// plugin steps in the pipeline. +func WithWorkspace(base, path string) Option { + return func(compiler *Compiler) { + compiler.base = base + compiler.path = path + } +} + +// WithEscalated configures the compiler to automatically execute +// images as privileged containers if the match the given list. +func WithEscalated(images ...string) Option { + return func(compiler *Compiler) { + compiler.escalated = images + } +} + +// WithPrefix configures the compiler with the prefix. The prefix is +// used to prefix container, volume and network names to avoid +// collision at runtime. +func WithPrefix(prefix string) Option { + return func(compiler *Compiler) { + compiler.prefix = prefix + } +} + +// WithLocal configures the compiler with the local flag. The local +// flag indicates the pipeline execution is running in a local development +// environment with a mounted local working directory. +func WithLocal(local bool) Option { + return func(compiler *Compiler) { + compiler.local = local + } +} + +// WithEnviron configures the compiler with environment variables +// added by default to every container in the pipeline. +func WithEnviron(env map[string]string) Option { + return func(compiler *Compiler) { + for k, v := range env { + compiler.env[k] = v + } + } +} + +// WithProxy configures the compiler with HTTP_PROXY, HTTPS_PROXY, +// and NO_PROXY environment variables added by default to every +// container in the pipeline. +func WithProxy() Option { + return WithEnviron( + map[string]string{ + "no_proxy": noProxy, + "NO_PROXY": noProxy, + "http_proxy": httpProxy, + "HTTP_PROXY": httpProxy, + "HTTPS_PROXY": httpsProxy, + "https_proxy": httpsProxy, + }, + ) +} + +// TODO(bradrydzewski) consider an alternate approach to +// WithProxy where the proxy strings are passed directly +// to the function as named parameters. + +// func WithProxy2(http, https, none string) Option { +// return WithEnviron( +// map[string]string{ +// "no_proxy": none, +// "NO_PROXY": none, +// "http_proxy": http, +// "HTTP_PROXY": http, +// "HTTPS_PROXY": https, +// "https_proxy": https, +// }, +// ) +// } + +var ( + noProxy = getenv("no_proxy") + httpProxy = getenv("https_proxy") + httpsProxy = getenv("https_proxy") +) + +// getenv returns the named environment variable. +func getenv(name string) (value string) { + name = strings.ToUpper(name) + if value := os.Getenv(name); value != "" { + return value + } + name = strings.ToLower(name) + if value := os.Getenv(name); value != "" { + return value + } + return +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/params.go b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/params.go new file mode 100644 index 000000000..5fe239015 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/params.go @@ -0,0 +1,65 @@ +package compiler + +import ( + "fmt" + "reflect" + "strconv" + "strings" + + json "github.com/ghodss/yaml" + "gopkg.in/yaml.v2" +) + +// paramsToEnv uses reflection to convert a map[string]interface to a list +// of environment variables. +func paramsToEnv(from map[string]interface{}, to map[string]string) error { + for k, v := range from { + if v == nil { + continue + } + + t := reflect.TypeOf(v) + vv := reflect.ValueOf(v) + + k = "PLUGIN_" + strings.ToUpper(k) + + switch t.Kind() { + case reflect.Bool: + to[k] = strconv.FormatBool(vv.Bool()) + + case reflect.String: + to[k] = vv.String() + + case reflect.Int, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Int8: + to[k] = fmt.Sprintf("%v", vv.Int()) + + case reflect.Float32, reflect.Float64: + to[k] = fmt.Sprintf("%v", vv.Float()) + + case reflect.Map: + yml, _ := yaml.Marshal(vv.Interface()) + out, _ := json.YAMLToJSON(yml) + to[k] = string(out) + + case reflect.Slice: + out, err := yaml.Marshal(vv.Interface()) + if err != nil { + return err + } + + in := []string{} + err = yaml.Unmarshal(out, &in) + if err == nil { + to[k] = strings.Join(in, ",") + } else { + out, err = json.YAMLToJSON(out) + if err != nil { + return err + } + to[k] = string(out) + } + } + } + + return nil +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/script_posix.go b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/script_posix.go new file mode 100644 index 000000000..a4c6574a1 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/script_posix.go @@ -0,0 +1,52 @@ +package compiler + +import ( + "bytes" + "encoding/base64" + "fmt" + "strings" +) + +// generateScriptPosix is a helper function that generates a build script +// for a linux container using the given +func generateScriptPosix(commands []string) string { + var buf bytes.Buffer + for _, command := range commands { + escaped := fmt.Sprintf("%q", command) + escaped = strings.Replace(escaped, "$", `\$`, -1) + buf.WriteString(fmt.Sprintf( + traceScript, + escaped, + command, + )) + } + script := fmt.Sprintf( + setupScript, + buf.String(), + ) + return base64.StdEncoding.EncodeToString([]byte(script)) +} + +// setupScript is a helper script this is added to the build to ensure +// a minimum set of environment variables are set correctly. +const setupScript = ` +if [ -n "$CI_NETRC_MACHINE" ]; then +cat < $HOME/.netrc +machine $CI_NETRC_MACHINE +login $CI_NETRC_USERNAME +password $CI_NETRC_PASSWORD +EOF +chmod 0600 $HOME/.netrc +fi +unset CI_NETRC_USERNAME +unset CI_NETRC_PASSWORD +unset CI_SCRIPT +%s +` + +// traceScript is a helper script that is added to the build script +// to trace a command. +const traceScript = ` +echo + %s +%s +` diff --git a/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/script_win.go b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/script_win.go new file mode 100644 index 000000000..a20d4fea6 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/compiler/script_win.go @@ -0,0 +1 @@ +package compiler diff --git a/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/config.go b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/config.go new file mode 100644 index 000000000..53599a5b4 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/config.go @@ -0,0 +1,68 @@ +package yaml + +import ( + "io" + "io/ioutil" + "os" + + libcompose "github.com/docker/libcompose/yaml" + "gopkg.in/yaml.v2" +) + +type ( + // Config defines a pipeline configuration. + Config struct { + Platform string + Branches Constraint + Workspace Workspace + Clone Containers + Pipeline Containers + Services Containers + Networks Networks + Volumes Volumes + Labels libcompose.SliceorMap + } + + // Workspace defines a pipeline workspace. + Workspace struct { + Base string + Path string + } +) + +// Parse parses the configuration from bytes b. +func Parse(r io.Reader) (*Config, error) { + out, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + return ParseBytes(out) +} + +// ParseBytes parses the configuration from bytes b. +func ParseBytes(b []byte) (*Config, error) { + out := new(Config) + err := yaml.Unmarshal(b, out) + if err != nil { + return nil, err + } + + return out, nil +} + +// ParseString parses the configuration from string s. +func ParseString(s string) (*Config, error) { + return ParseBytes( + []byte(s), + ) +} + +// ParseFile parses the configuration from path p. +func ParseFile(p string) (*Config, error) { + f, err := os.Open(p) + if err != nil { + return nil, err + } + defer f.Close() + return Parse(f) +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/constraint.go b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/constraint.go new file mode 100644 index 000000000..d62c0f986 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/constraint.go @@ -0,0 +1,152 @@ +package yaml + +import ( + "path/filepath" + + "github.com/cncd/pipeline/pipeline/frontend" + libcompose "github.com/docker/libcompose/yaml" +) + +type ( + // Constraints defines a set of runtime constraints. + Constraints struct { + Repo Constraint + Instance Constraint + Platform Constraint + Environment Constraint + Event Constraint + Branch Constraint + Status Constraint + Matrix ConstraintMap + } + + // Constraint defines a runtime constraint. + Constraint struct { + Include []string + Exclude []string + } + + // ConstraintMap defines a runtime constraint map. + ConstraintMap struct { + Include map[string]string + Exclude map[string]string + } +) + +// Match returns true if all constraints match the given input. If a single +// constraint fails a false value is returned. +func (c *Constraints) Match(metadata frontend.Metadata) bool { + return c.Platform.Match(metadata.Sys.Arch) && + c.Environment.Match(metadata.Curr.Target) && + c.Event.Match(metadata.Curr.Event) && + c.Branch.Match(metadata.Curr.Commit.Branch) && + c.Repo.Match(metadata.Repo.Name) && + c.Matrix.Match(metadata.Job.Matrix) +} + +// Match returns true if the string matches the include patterns and does not +// match any of the exclude patterns. +func (c *Constraint) Match(v string) bool { + if c.Excludes(v) { + return false + } + if c.Includes(v) { + return true + } + if len(c.Include) == 0 { + return true + } + return false +} + +// Includes returns true if the string matches the include patterns. +func (c *Constraint) Includes(v string) bool { + for _, pattern := range c.Include { + if ok, _ := filepath.Match(pattern, v); ok { + return true + } + } + return false +} + +// Excludes returns true if the string matches the exclude patterns. +func (c *Constraint) Excludes(v string) bool { + for _, pattern := range c.Exclude { + if ok, _ := filepath.Match(pattern, v); ok { + return true + } + } + return false +} + +// UnmarshalYAML unmarshals the constraint. +func (c *Constraint) UnmarshalYAML(unmarshal func(interface{}) error) error { + var out1 = struct { + Include libcompose.Stringorslice + Exclude libcompose.Stringorslice + }{} + + var out2 libcompose.Stringorslice + + unmarshal(&out1) + unmarshal(&out2) + + c.Exclude = out1.Exclude + c.Include = append( + out1.Include, + out2..., + ) + return nil +} + +// Match returns true if the params matches the include key values and does not +// match any of the exclude key values. +func (c *ConstraintMap) Match(params map[string]string) bool { + // when no includes or excludes automatically match + if len(c.Include) == 0 && len(c.Exclude) == 0 { + return true + } + // exclusions are processed first. So we can include everything and then + // selectively include others. + if len(c.Exclude) != 0 { + var matches int + + for key, val := range c.Exclude { + if params[key] == val { + matches++ + } + } + if matches == len(c.Exclude) { + return false + } + } + for key, val := range c.Include { + if params[key] != val { + return false + } + } + return true +} + +// UnmarshalYAML unmarshals the constraint map. +func (c *ConstraintMap) UnmarshalYAML(unmarshal func(interface{}) error) error { + out1 := struct { + Include map[string]string + Exclude map[string]string + }{ + Include: map[string]string{}, + Exclude: map[string]string{}, + } + + out2 := map[string]string{} + + unmarshal(&out1) + unmarshal(&out2) + + c.Include = out1.Include + c.Exclude = out1.Exclude + for k, v := range out2 { + c.Include[k] = v + } + return nil +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/container.go b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/container.go new file mode 100644 index 000000000..650e21110 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/container.go @@ -0,0 +1,80 @@ +package yaml + +import ( + "fmt" + + libcompose "github.com/docker/libcompose/yaml" + "gopkg.in/yaml.v2" +) + +type ( + // AuthConfig defines registry authentication credentials. + AuthConfig struct { + Username string + Password string + Email string + } + + // Containers denotes an ordered collection of containers. + Containers struct { + Containers []*Container + } + + // Container defines a container. + Container struct { + AuthConfig AuthConfig `yaml:"auth_config,omitempty"` + CapAdd []string `yaml:"cap_add,omitempty"` + CapDrop []string `yaml:"cap_drop,omitempty"` + Command libcompose.Command `yaml:"command,omitempty"` + Commands libcompose.Stringorslice `yaml:"commands,omitempty"` + CPUQuota libcompose.StringorInt `yaml:"cpu_quota,omitempty"` + CPUSet string `yaml:"cpuset,omitempty"` + CPUShares libcompose.StringorInt `yaml:"cpu_shares,omitempty"` + Detached bool `yaml:"detach,omitempty"` + Devices []string `yaml:"devices,omitempty"` + DNS libcompose.Stringorslice `yaml:"dns,omitempty"` + DNSSearch libcompose.Stringorslice `yaml:"dns_search,omitempty"` + Entrypoint libcompose.Command `yaml:"entrypoint,omitempty"` + Environment libcompose.SliceorMap `yaml:"environment,omitempty"` + ExtraHosts []string `yaml:"extra_hosts,omitempty"` + Group string `yaml:"group,omitempty"` + Image string `yaml:"image,omitempty"` + Isolation string `yaml:"isolation,omitempty"` + Labels libcompose.SliceorMap `yaml:"labels,omitempty"` + MemLimit libcompose.MemStringorInt `yaml:"mem_limit,omitempty"` + MemSwapLimit libcompose.MemStringorInt `yaml:"memswap_limit,omitempty"` + MemSwappiness libcompose.MemStringorInt `yaml:"mem_swappiness,omitempty"` + Name string `yaml:"name,omitempty"` + NetworkMode string `yaml:"network_mode,omitempty"` + Networks libcompose.Networks `yaml:"networks,omitempty"` + Privileged bool `yaml:"privileged,omitempty"` + Pull bool `yaml:"pull,omitempty"` + ShmSize libcompose.MemStringorInt `yaml:"shm_size,omitempty"` + Ulimits libcompose.Ulimits `yaml:"ulimits,omitempty"` + Volumes libcompose.Volumes `yaml:"volumes,omitempty"` + Constraints Constraints `yaml:"when,omitempty"` + Vargs map[string]interface{} `yaml:",inline"` + } +) + +// UnmarshalYAML implements the Unmarshaller interface. +func (c *Containers) UnmarshalYAML(unmarshal func(interface{}) error) error { + slice := yaml.MapSlice{} + if err := unmarshal(&slice); err != nil { + return err + } + + for _, s := range slice { + container := Container{} + out, _ := yaml.Marshal(s.Value) + + if err := yaml.Unmarshal(out, &container); err != nil { + return err + } + if container.Name == "" { + container.Name = fmt.Sprintf("%v", s.Key) + } + c.Containers = append(c.Containers, &container) + } + return nil +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/linter/linter.go b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/linter/linter.go new file mode 100644 index 000000000..150184d2a --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/linter/linter.go @@ -0,0 +1,109 @@ +package linter + +import ( + "fmt" + + "github.com/cncd/pipeline/pipeline/frontend/yaml" +) + +// A Linter lints a pipeline configuration. +type Linter struct { + trusted bool +} + +// New creates a new Linter with options. +func New(opts ...Option) *Linter { + linter := new(Linter) + for _, opt := range opts { + opt(linter) + } + return linter +} + +// Lint lints the configuration. +func (l *Linter) Lint(c *yaml.Config) error { + var containers []*yaml.Container + containers = append(containers, c.Pipeline.Containers...) + containers = append(containers, c.Services.Containers...) + + for _, container := range containers { + if err := l.lintImage(container); err != nil { + return err + } + if l.trusted == false { + if err := l.lintTrusted(container); err != nil { + return err + } + } + if isService(container) == false { + if err := l.lintEntrypoint(container); err != nil { + return err + } + } + } + + if len(c.Pipeline.Containers) == 0 { + return fmt.Errorf("Invalid or missing pipeline section") + } + return nil +} + +func (l *Linter) lintImage(c *yaml.Container) error { + if len(c.Image) == 0 { + return fmt.Errorf("Invalid or missing image") + } + return nil +} + +func (l *Linter) lintEntrypoint(c *yaml.Container) error { + if len(c.Entrypoint) != 0 { + return fmt.Errorf("Cannot override container entrypoint") + } + if len(c.Command) != 0 { + return fmt.Errorf("Cannot override container command") + } + return nil +} + +func (l *Linter) lintTrusted(c *yaml.Container) error { + if c.Privileged { + return fmt.Errorf("Insufficient privileges to use privileged mode") + } + if c.ShmSize != 0 { + return fmt.Errorf("Insufficient privileges to override shm_size") + } + if len(c.DNS) != 0 { + return fmt.Errorf("Insufficient privileges to use custom dns") + } + if len(c.DNSSearch) != 0 { + return fmt.Errorf("Insufficient privileges to use dns_search") + } + if len(c.Devices) != 0 { + return fmt.Errorf("Insufficient privileges to use devices") + } + if len(c.ExtraHosts) != 0 { + return fmt.Errorf("Insufficient privileges to use extra_hosts") + } + if len(c.NetworkMode) != 0 { + return fmt.Errorf("Insufficient privileges to use network_mode") + } + if c.Networks.Networks != nil && len(c.Networks.Networks) != 0 { + return fmt.Errorf("Insufficient privileges to use networks") + } + if c.Volumes.Volumes != nil && len(c.Volumes.Volumes) != 0 { + return fmt.Errorf("Insufficient privileges to use volumes") + } + return nil +} + +func isService(c *yaml.Container) bool { + return !isScript(c) && !isPlugin(c) +} + +func isScript(c *yaml.Container) bool { + return len(c.Commands) != 0 +} + +func isPlugin(c *yaml.Container) bool { + return len(c.Vargs) != 0 +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/linter/option.go b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/linter/option.go new file mode 100644 index 000000000..3e1a67e97 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/linter/option.go @@ -0,0 +1,11 @@ +package linter + +// Option configures a linting option. +type Option func(*Linter) + +// WithTrusted adds the trusted option to the linter. +func WithTrusted(trusted bool) Option { + return func(linter *Linter) { + linter.trusted = trusted + } +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/matrix/matrix.go b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/matrix/matrix.go new file mode 100644 index 000000000..76a0f6bed --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/matrix/matrix.go @@ -0,0 +1,117 @@ +package matrix + +import ( + "strings" + + "gopkg.in/yaml.v2" +) + +const ( + limitTags = 10 + limitAxis = 25 +) + +// Matrix represents the build matrix. +type Matrix map[string][]string + +// Axis represents a single permutation of entries from the build matrix. +type Axis map[string]string + +// String returns a string representation of an Axis as a comma-separated list +// of environment variables. +func (a Axis) String() string { + var envs []string + for k, v := range a { + envs = append(envs, k+"="+v) + } + return strings.Join(envs, " ") +} + +// Parse parses the Yaml matrix definition. +func Parse(data []byte) ([]Axis, error) { + + axis, err := parseList(data) + if err == nil && len(axis) != 0 { + return axis, nil + } + + matrix, err := parse(data) + if err != nil { + return nil, err + } + + // if not a matrix build return an array with just the single axis. + if len(matrix) == 0 { + return nil, nil + } + + return calc(matrix), nil +} + +// ParseString parses the Yaml string matrix definition. +func ParseString(data string) ([]Axis, error) { + return Parse([]byte(data)) +} + +func calc(matrix Matrix) []Axis { + // calculate number of permutations and extract the list of tags + // (ie go_version, redis_version, etc) + var perm int + var tags []string + for k, v := range matrix { + perm *= len(v) + if perm == 0 { + perm = len(v) + } + tags = append(tags, k) + } + + // structure to hold the transformed result set + axisList := []Axis{} + + // for each axis calculate the uniqe set of values that should be used. + for p := 0; p < perm; p++ { + axis := map[string]string{} + decr := perm + for i, tag := range tags { + elems := matrix[tag] + decr = decr / len(elems) + elem := p / decr % len(elems) + axis[tag] = elems[elem] + + // enforce a maximum number of tags in the build matrix. + if i > limitTags { + break + } + } + + // append to the list of axis. + axisList = append(axisList, axis) + + // enforce a maximum number of axis that should be calculated. + if p > limitAxis { + break + } + } + + return axisList +} + +func parse(raw []byte) (Matrix, error) { + data := struct { + Matrix map[string][]string + }{} + err := yaml.Unmarshal(raw, &data) + return data.Matrix, err +} + +func parseList(raw []byte) ([]Axis, error) { + data := struct { + Matrix struct { + Include []Axis + } + }{} + + err := yaml.Unmarshal(raw, &data) + return data.Matrix.Include, err +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/network.go b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/network.go new file mode 100644 index 000000000..d6a2c2133 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/network.go @@ -0,0 +1,48 @@ +package yaml + +import ( + "fmt" + + "gopkg.in/yaml.v2" +) + +type ( + // Networks defines a collection of networks. + Networks struct { + Networks []*Network + } + + // Network defines a container network. + Network struct { + Name string `yaml:"name,omitempty"` + Driver string `yaml:"driver,omitempty"` + DriverOpts map[string]string `yaml:"driver_opts,omitempty"` + } +) + +// UnmarshalYAML implements the Unmarshaller interface. +func (n *Networks) UnmarshalYAML(unmarshal func(interface{}) error) error { + slice := yaml.MapSlice{} + err := unmarshal(&slice) + if err != nil { + return err + } + + for _, s := range slice { + nn := Network{} + out, _ := yaml.Marshal(s.Value) + + err = yaml.Unmarshal(out, &nn) + if err != nil { + return err + } + if nn.Name == "" { + nn.Name = fmt.Sprintf("%v", s.Key) + } + if nn.Driver == "" { + nn.Driver = "bridge" + } + n.Networks = append(n.Networks, &nn) + } + return err +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/volume.go b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/volume.go new file mode 100644 index 000000000..346848505 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/frontend/yaml/volume.go @@ -0,0 +1,48 @@ +package yaml + +import ( + "fmt" + + "gopkg.in/yaml.v2" +) + +type ( + // Volumes defines a collection of volumes. + Volumes struct { + Volumes []*Volume + } + + // Volume defines a container volume. + Volume struct { + Name string `yaml:"name,omitempty"` + Driver string `yaml:"driver,omitempty"` + DriverOpts map[string]string `yaml:"driver_opts,omitempty"` + } +) + +// UnmarshalYAML implements the Unmarshaller interface. +func (v *Volumes) UnmarshalYAML(unmarshal func(interface{}) error) error { + slice := yaml.MapSlice{} + err := unmarshal(&slice) + if err != nil { + return err + } + + for _, s := range slice { + vv := Volume{} + out, _ := yaml.Marshal(s.Value) + + err = yaml.Unmarshal(out, &vv) + if err != nil { + return err + } + if vv.Name == "" { + vv.Name = fmt.Sprintf("%v", s.Key) + } + if vv.Driver == "" { + vv.Driver = "local" + } + v.Volumes = append(v.Volumes, &vv) + } + return err +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/interrupt/interrupt.go b/vendor/github.com/cncd/pipeline/pipeline/interrupt/interrupt.go new file mode 100644 index 000000000..0296d7548 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/interrupt/interrupt.go @@ -0,0 +1,36 @@ +package interrupt + +import ( + "context" + "os" + "os/signal" +) + +// WithContext returns a copy of parent context whose Done channel is closed +// when an os interrupt signal is received. +func WithContext(ctx context.Context) context.Context { + return WithContextFunc(ctx, func() { + println("ctrl+c received, terminating process") + }) +} + +// WithContextFunc returns a copy of parent context that is cancelled when +// an os interrupt signal is received. The callback function f is invoked +// before cancellation. +func WithContextFunc(ctx context.Context, f func()) context.Context { + ctx, cancel := context.WithCancel(ctx) + go func() { + c := make(chan os.Signal) + signal.Notify(c, os.Interrupt) + defer signal.Stop(c) + + select { + case <-ctx.Done(): + case <-c: + f() + cancel() + } + }() + + return ctx +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/logger.go b/vendor/github.com/cncd/pipeline/pipeline/logger.go new file mode 100644 index 000000000..d55734ee7 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/logger.go @@ -0,0 +1,20 @@ +package pipeline + +import ( + "github.com/cncd/pipeline/pipeline/backend" + "github.com/cncd/pipeline/pipeline/multipart" +) + +// Logger handles the process logging. +type Logger interface { + Log(*backend.Step, multipart.Reader) error +} + +// LogFunc type is an adapter to allow the use of an ordinary +// function for process logging. +type LogFunc func(*backend.Step, multipart.Reader) error + +// Log calls f(proc, r). +func (f LogFunc) Log(step *backend.Step, r multipart.Reader) error { + return f(step, r) +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/multipart/doc.go b/vendor/github.com/cncd/pipeline/pipeline/multipart/doc.go new file mode 100644 index 000000000..56a9c264d --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/multipart/doc.go @@ -0,0 +1 @@ +package multipart diff --git a/vendor/github.com/cncd/pipeline/pipeline/multipart/reader.go b/vendor/github.com/cncd/pipeline/pipeline/multipart/reader.go new file mode 100644 index 000000000..47505f73d --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/multipart/reader.go @@ -0,0 +1,102 @@ +package multipart + +import ( + "bufio" + "bytes" + "io" + "mime/multipart" + "net/textproto" +) + +// Reader is an iterator over parts in a multipart log stream. +type Reader interface { + // NextPart returns the next part in the multipart or + // an error. When there are no more parts, the error + // io.EOF is returned. + NextPart() (Part, error) +} + +// A Part represents a single part in a multipart body. +type Part interface { + io.Reader + + // Header returns the headers of the body with the + // keys canonicalized. + Header() textproto.MIMEHeader + + // FileName returns the filename parameter of the + // Content-Disposition header. + FileName() string + + // FormName returns the name parameter if p has a + // Content-Disposition of type form-data. + FormName() string +} + +// New returns a new multipart Reader. +func New(r io.Reader) Reader { + buf := bufio.NewReader(r) + out, _ := buf.Peek(4) + + if bytes.Equal(out, []byte("MIME")) { + return &multipartReader{ + reader: multipart.NewReader(buf, "boundary"), + } + } + return &textReader{ + reader: r, + } +} + +// +// +// + +type multipartReader struct { + reader *multipart.Reader +} + +func (r *multipartReader) NextPart() (Part, error) { + next, err := r.reader.NextPart() + if err != nil { + return nil, err + } + part := new(part) + part.Reader = next + part.filename = next.FileName() + part.formname = next.FormName() + part.header = next.Header + return part, nil +} + +// +// +// + +type textReader struct { + reader io.Reader + done bool +} + +func (r *textReader) NextPart() (Part, error) { + if r.done { + return nil, io.EOF + } + r.done = true + p := new(part) + p.Reader = r.reader + p.filename = "terminal.log" + return p, nil +} + +type part struct { + io.Reader + + filename string + formname string + header textproto.MIMEHeader +} + +func (p *part) Header() textproto.MIMEHeader { return p.header } +func (p *part) FileName() string { return p.filename } +func (p *part) FormName() string { return p.filename } diff --git a/vendor/github.com/cncd/pipeline/pipeline/option.go b/vendor/github.com/cncd/pipeline/pipeline/option.go new file mode 100644 index 000000000..4b2332933 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/option.go @@ -0,0 +1,38 @@ +package pipeline + +import ( + "context" + + "github.com/cncd/pipeline/pipeline/backend" +) + +// Option configures a runtime option. +type Option func(*Runtime) + +// WithEngine returns an option configured with a runtime engine. +func WithEngine(engine backend.Engine) Option { + return func(r *Runtime) { + r.engine = engine + } +} + +// WithLogger returns an option configured with a runtime logger. +func WithLogger(logger Logger) Option { + return func(r *Runtime) { + r.logger = logger + } +} + +// WithTracer returns an option configured with a runtime tracer. +func WithTracer(tracer Tracer) Option { + return func(r *Runtime) { + r.tracer = tracer + } +} + +// WithContext returns an option configured with a context. +func WithContext(ctx context.Context) Option { + return func(r *Runtime) { + r.ctx = ctx + } +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/parse.go b/vendor/github.com/cncd/pipeline/pipeline/parse.go new file mode 100644 index 000000000..89d6417be --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/parse.go @@ -0,0 +1,37 @@ +package pipeline + +import ( + "encoding/json" + "io" + "os" + "strings" + + "github.com/cncd/pipeline/pipeline/backend" +) + +// Parse parses the pipeline config from an io.Reader. +func Parse(r io.Reader) (*backend.Config, error) { + cfg := new(backend.Config) + err := json.NewDecoder(r).Decode(cfg) + if err != nil { + return nil, err + } + return cfg, nil +} + +// ParseFile parses the pipeline config from a file. +func ParseFile(path string) (*backend.Config, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + defer f.Close() + return Parse(f) +} + +// ParseString parses the pipeline config from a string. +func ParseString(s string) (*backend.Config, error) { + return Parse( + strings.NewReader(s), + ) +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/pipeline.go b/vendor/github.com/cncd/pipeline/pipeline/pipeline.go new file mode 100644 index 000000000..be6970f83 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/pipeline.go @@ -0,0 +1,175 @@ +package pipeline + +import ( + "context" + "time" + + "golang.org/x/sync/errgroup" + + "github.com/cncd/pipeline/pipeline/backend" + "github.com/cncd/pipeline/pipeline/multipart" +) + +type ( + // State defines the pipeline and process state. + State struct { + // Global state of the pipeline. + Pipeline struct { + // Pipeline time started + Time int64 `json:"time"` + // Current pipeline step + Step *backend.Step `json:"step"` + // Current pipeline error state + Error error `json:"error"` + } + + // Current process state. + Process *backend.State + } +) + +// Runtime is a configuration runtime. +type Runtime struct { + err error + spec *backend.Config + engine backend.Engine + started int64 + + ctx context.Context + tracer Tracer + logger Logger +} + +// New returns a new runtime using the specified runtime +// configuration and runtime engine. +func New(spec *backend.Config, opts ...Option) *Runtime { + r := new(Runtime) + r.spec = spec + r.ctx = context.Background() + for _, opts := range opts { + opts(r) + } + return r +} + +// Run starts the runtime and waits for it to complete. +func (r *Runtime) Run() error { + defer func() { + r.engine.Destroy(r.spec) + }() + + r.started = time.Now().Unix() + if err := r.engine.Setup(r.spec); err != nil { + return err + } + + for _, stage := range r.spec.Stages { + select { + case <-r.ctx.Done(): + return ErrCancel + case err := <-r.execAll(stage.Steps): + if err != nil { + r.err = err + } + } + } + + return r.err +} + +// +// +// + +func (r *Runtime) execAll(procs []*backend.Step) <-chan error { + var g errgroup.Group + done := make(chan error) + + for _, proc := range procs { + proc := proc + g.Go(func() error { + return r.exec(proc) + }) + } + + go func() { + done <- g.Wait() + close(done) + }() + return done +} + +// +// +// + +func (r *Runtime) exec(proc *backend.Step) error { + switch { + case r.err != nil && proc.OnFailure == false: + return nil + case r.err == nil && proc.OnSuccess == false: + return nil + } + + if r.tracer != nil { + state := new(State) + state.Pipeline.Time = r.started + state.Pipeline.Error = r.err + state.Pipeline.Step = proc + state.Process = new(backend.State) // empty + if err := r.tracer.Trace(state); err == ErrSkip { + return nil + } else if err != nil { + return err + } + } + + if err := r.engine.Exec(proc); err != nil { + return err + } + + if r.logger != nil { + rc, err := r.engine.Tail(proc) + if err != nil { + return err + } + + go func() { + r.logger.Log(proc, multipart.New(rc)) + rc.Close() + }() + } + + if proc.Detached { + return nil + } + + wait, err := r.engine.Wait(proc) + if err != nil { + return err + } + + if r.tracer != nil { + state := new(State) + state.Pipeline.Time = r.started + state.Pipeline.Error = r.err + state.Pipeline.Step = proc + state.Process = wait + if err := r.tracer.Trace(state); err != nil { + return err + } + } + + if wait.OOMKilled { + return &OomError{ + Name: proc.Name, + Code: wait.ExitCode, + } + } else if wait.ExitCode != 0 { + return &ExitError{ + Name: proc.Name, + Code: wait.ExitCode, + } + } + return nil +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/client.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/client.go new file mode 100644 index 000000000..35fd1bda0 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/client.go @@ -0,0 +1,180 @@ +package rpc + +import ( + "context" + "io" + "io/ioutil" + "log" + "math" + "net/http" + "sync" + "time" + + "github.com/gorilla/websocket" + "github.com/sourcegraph/jsonrpc2" + websocketrpc "github.com/sourcegraph/jsonrpc2/websocket" +) + +const ( + methodNext = "next" + methodNotify = "notify" + methodExtend = "extend" + methodUpdate = "update" + methodLog = "log" + methodSave = "save" +) + +type ( + saveReq struct { + ID string `json:"id"` + Mime string `json:"mime"` + Data []byte `json:"data"` + } + + updateReq struct { + ID string `json:"id"` + State State `json:"state"` + } + + logReq struct { + ID string `json:"id"` + Line *Line `json:"line"` + } +) + +const ( + defaultRetryClount = math.MaxInt32 + defaultBackoff = 10 * time.Second +) + +// Client represents an rpc client. +type Client struct { + sync.Mutex + + conn *jsonrpc2.Conn + done bool + retry int + backoff time.Duration + endpoint string + token string +} + +// NewClient returns a new Client. +func NewClient(endpoint string, opts ...Option) (*Client, error) { + cli := &Client{ + endpoint: endpoint, + retry: defaultRetryClount, + backoff: defaultBackoff, + } + for _, opt := range opts { + opt(cli) + } + err := cli.openRetry() + return cli, err +} + +// Next returns the next pipeline in the queue. +func (t *Client) Next(c context.Context) (*Pipeline, error) { + res := new(Pipeline) + err := t.call(c, methodNext, nil, res) + return res, err +} + +// Notify returns true if the pipeline should be cancelled. +func (t *Client) Notify(c context.Context, id string) (bool, error) { + out := false + err := t.call(c, methodNotify, id, &out) + return out, err +} + +// Extend extends the pipeline deadline. +func (t *Client) Extend(c context.Context, id string) error { + return t.call(c, methodExtend, id, nil) +} + +// Update updates the pipeline state. +func (t *Client) Update(c context.Context, id string, state State) error { + params := updateReq{id, state} + return t.call(c, methodUpdate, ¶ms, nil) +} + +// Log writes the pipeline log entry. +func (t *Client) Log(c context.Context, id string, line *Line) error { + params := logReq{id, line} + return t.call(c, methodLog, ¶ms, nil) +} + +// Save saves the pipeline artifact. +func (t *Client) Save(c context.Context, id, mime string, file io.Reader) error { + data, err := ioutil.ReadAll(file) + if err != nil { + return err + } + params := saveReq{id, mime, data} + return t.call(c, methodSave, params, nil) +} + +// Close closes the client connection. +func (t *Client) Close() error { + t.Lock() + t.done = true + t.Unlock() + return t.conn.Close() +} + +// call makes the remote prodedure call. If the call fails due to connectivity +// issues the connection is re-establish and call re-attempted. +func (t *Client) call(ctx context.Context, name string, req, res interface{}) error { + if err := t.conn.Call(ctx, name, req, res); err == nil { + return nil + } else if err != jsonrpc2.ErrClosed && err != io.ErrUnexpectedEOF { + log.Printf("rpc: error making call: %s", err) + return err + } else { + log.Printf("rpc: error making call: connection closed: %s", err) + } + if err := t.openRetry(); err != nil { + return err + } + return t.conn.Call(ctx, name, req, res) +} + +// openRetry opens the connection and will retry on failure until +// the connection is successfully open, or the maximum retry count +// is exceeded. +func (t *Client) openRetry() error { + for i := 0; i < t.retry; i++ { + err := t.open() + if err == nil { + break + } + if err == io.EOF { + return err + } + + log.Printf("rpc: error re-connecting: %s", err) + <-time.After(t.backoff) + } + return nil +} + +// open creates a websocket connection to a peer and establishes a json +// rpc communication stream. +func (t *Client) open() error { + t.Lock() + defer t.Unlock() + if t.done { + return io.EOF + } + header := map[string][]string{ + "Content-Type": {"application/json-rpc"}, + "Authorization": {"Bearer " + t.token}, + } + conn, _, err := websocket.DefaultDialer.Dial(t.endpoint, http.Header(header)) + if err != nil { + return err + } + stream := websocketrpc.NewObjectStream(conn) + t.conn = jsonrpc2.NewConn(context.Background(), stream, nil) + return nil +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/line.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/line.go new file mode 100644 index 000000000..d3470e89d --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/line.go @@ -0,0 +1,70 @@ +package rpc + +import ( + "bytes" + "context" + "fmt" + "time" +) + +// Identifies the type of line in the logs. +const ( + LineStdout int = iota + LineStderr + LineExitCode + LineMetadata + LineProgress +) + +// Line is a line of console output. +type Line struct { + Proc string `json:"proc,omitempty"` + Time int64 `json:"time,omitempty"` + Type int `json:"type,omitempty"` + Pos int `json:"pos,omityempty"` + Out string `json:"out,omitempty"` +} + +func (l *Line) String() string { + switch l.Type { + case LineExitCode: + return fmt.Sprintf("[%s] exit code %s", l.Proc, l.Out) + default: + return fmt.Sprintf("[%s:L%v:%vs] %s", l.Proc, l.Pos, l.Time, l.Out) + } +} + +// LineWriter sends logs to the client. +type LineWriter struct { + peer Peer + id string + name string + num int + now time.Time +} + +// NewLineWriter returns a new line reader. +func NewLineWriter(peer Peer, id, name string) *LineWriter { + w := new(LineWriter) + w.peer = peer + w.id = id + w.name = name + w.num = 0 + w.now = time.Now().UTC() + return w +} + +func (w *LineWriter) Write(p []byte) (n int, err error) { + for _, part := range bytes.Split(p, []byte{'\n'}) { + line := &Line{ + Out: string(part), + Proc: w.name, + Pos: w.num, + Time: int64(time.Since(w.now).Seconds()), + Type: LineStdout, + } + w.peer.Log(context.Background(), w.id, line) + w.num++ + } + return len(p), nil +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/option.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/option.go new file mode 100644 index 000000000..aed0070e2 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/option.go @@ -0,0 +1,29 @@ +package rpc + +import "time" + +// Option configures a client option. +type Option func(*Client) + +// WithBackoff configures the backoff duration when attempting +// to re-connect to a server. +func WithBackoff(d time.Duration) Option { + return func(c *Client) { + c.backoff = d + } +} + +// WithRetryLimit configures the maximum number of retries when +// connecting to the server. +func WithRetryLimit(i int) Option { + return func(c *Client) { + c.retry = i + } +} + +// WithToken configures the client authorization token. +func WithToken(t string) Option { + return func(c *Client) { + c.token = t + } +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/peer.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/peer.go new file mode 100644 index 000000000..414591f81 --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/peer.go @@ -0,0 +1,55 @@ +package rpc + +import ( + "context" + "io" + + "github.com/cncd/pipeline/pipeline/backend" +) + +type ( + // Filter defines filters for fetching items from the queue. + Filter struct { + Platform string `json:"platform"` + } + + // State defines the pipeline state. + State struct { + Proc string `json:"proc"` + Exited bool `json:"exited"` + ExitCode int `json:"exit_code"` + Started int64 `json:"started"` + Finished int64 `json:"finished"` + Error string `json:"error"` + } + + // Pipeline defines the pipeline execution details. + Pipeline struct { + ID string `json:"id"` + Config *backend.Config `json:"config"` + Timeout int64 `json:"timeout"` + } +) + +// Peer defines a peer-to-peer connection. +type Peer interface { + // Next returns the next pipeline in the queue. + Next(c context.Context) (*Pipeline, error) + + // Notify returns true if the pipeline should be cancelled. + // TODO: rename to Done, Wait? + Notify(c context.Context, id string) (bool, error) + + // Extend extends the pipeline deadline + Extend(c context.Context, id string) error + + // Update updates the pipeline state. + Update(c context.Context, id string, state State) error + + // Save saves the pipeline artifact. + // TODO rename to Upload + Save(c context.Context, id, mime string, file io.Reader) error + + // Log writes the pipeline log entry. + Log(c context.Context, id string, line *Line) error +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/server.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/server.go new file mode 100644 index 000000000..242c4ee9a --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/server.go @@ -0,0 +1,126 @@ +package rpc + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "net/http" + + "github.com/gorilla/websocket" + "github.com/sourcegraph/jsonrpc2" + websocketrpc "github.com/sourcegraph/jsonrpc2/websocket" +) + +// errNoSuchMethod is returned when the name rpc method does not exist. +var errNoSuchMethod = errors.New("No such rpc method") + +// noContext is an empty context used when no context is required. +var noContext = context.Background() + +// Server represents an rpc server. +type Server struct { + peer Peer +} + +// NewServer returns an rpc Server. +func NewServer(peer Peer) *Server { + return &Server{peer} +} + +// ServeHTTP implements an http.Handler that answers rpc requests. +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + upgrader := websocket.Upgrader{} + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + ctx, cancel := context.WithCancel(context.Background()) + conn := jsonrpc2.NewConn(ctx, + websocketrpc.NewObjectStream(c), + jsonrpc2.HandlerWithError(s.router), + ) + defer func() { + cancel() + conn.Close() + }() + <-conn.DisconnectNotify() +} + +// router implements an jsonrpc2.Handler that answers RPC requests. +func (s *Server) router(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) (interface{}, error) { + switch req.Method { + case methodNext: + return s.next(ctx, req) + case methodNotify: + return s.notify(ctx, req) + case methodExtend: + return s.extend(ctx, req) + case methodUpdate: + return s.update(req) + case methodLog: + return s.log(req) + case methodSave: + return s.save(req) + default: + return nil, errNoSuchMethod + } +} + +// next unmarshals the rpc request parameters and invokes the peer.Next +// procedure. The results are retuned and written to the rpc response. +func (s *Server) next(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) { + return s.peer.Next(ctx) +} + +// notify unmarshals the rpc request parameters and invokes the peer.Notify +// procedure. The results are retuned and written to the rpc response. +func (s *Server) notify(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) { + var id string + err := json.Unmarshal([]byte(*req.Params), &id) + if err != nil { + return nil, err + } + return s.peer.Notify(ctx, id) +} + +// extend unmarshals the rpc request parameters and invokes the peer.Extend +// procedure. The results are retuned and written to the rpc response. +func (s *Server) extend(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) { + var id string + err := json.Unmarshal([]byte(*req.Params), &id) + if err != nil { + return nil, err + } + return nil, s.peer.Extend(ctx, id) +} + +// update unmarshals the rpc request parameters and invokes the peer.Update +// procedure. The results are retuned and written to the rpc response. +func (s *Server) update(req *jsonrpc2.Request) (interface{}, error) { + in := new(updateReq) + if err := json.Unmarshal([]byte(*req.Params), in); err != nil { + return nil, err + } + return nil, s.peer.Update(noContext, in.ID, in.State) +} + +// log unmarshals the rpc request parameters and invokes the peer.Log +// procedure. The results are retuned and written to the rpc response. +func (s *Server) log(req *jsonrpc2.Request) (interface{}, error) { + in := new(logReq) + if err := json.Unmarshal([]byte(*req.Params), in); err != nil { + return nil, err + } + return nil, s.peer.Log(noContext, in.ID, in.Line) +} + +// save unmarshals the rpc request parameters and invokes the peer.Save +// procedure. The results are retuned and written to the rpc response. +func (s *Server) save(req *jsonrpc2.Request) (interface{}, error) { + in := new(saveReq) + if err := json.Unmarshal([]byte(*req.Params), in); err != nil { + return nil, err + } + return nil, s.peer.Save(noContext, in.ID, in.Mime, bytes.NewBuffer(in.Data)) +} diff --git a/vendor/github.com/cncd/pipeline/pipeline/tracer.go b/vendor/github.com/cncd/pipeline/pipeline/tracer.go new file mode 100644 index 000000000..14e3a196f --- /dev/null +++ b/vendor/github.com/cncd/pipeline/pipeline/tracer.go @@ -0,0 +1,45 @@ +package pipeline + +import ( + "strconv" + "time" +) + +// Tracer handles process tracing. +type Tracer interface { + Trace(*State) error +} + +// TraceFunc type is an adapter to allow the use of ordinary +// functions as a Tracer. +type TraceFunc func(*State) error + +// Trace calls f(proc, state). +func (f TraceFunc) Trace(state *State) error { + return f(state) +} + +// DefaultTracer provides a tracer that updates the CI_ enviornment +// variables to include the correct timestamp and status. +// TODO(bradrydzewski) find either a new home or better name for this. +var DefaultTracer = TraceFunc(func(state *State) error { + if state.Process.Exited { + return nil + } + if state.Pipeline.Step.Environment == nil { + return nil + } + state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "success" + state.Pipeline.Step.Environment["CI_BUILD_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) + state.Pipeline.Step.Environment["CI_BUILD_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) + + state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "success" + state.Pipeline.Step.Environment["CI_JOB_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) + state.Pipeline.Step.Environment["CI_JOB_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) + + if state.Pipeline.Error != nil { + state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "failure" + state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "failure" + } + return nil +}) diff --git a/vendor/github.com/cncd/pubsub/LICENSE b/vendor/github.com/cncd/pubsub/LICENSE new file mode 100644 index 000000000..64e202179 --- /dev/null +++ b/vendor/github.com/cncd/pubsub/LICENSE @@ -0,0 +1,29 @@ +BSD 3-Clause License + +Copyright (c) 2017, Brad Rydzewski +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/cncd/pubsub/README b/vendor/github.com/cncd/pubsub/README new file mode 100644 index 000000000..c053f1ac7 --- /dev/null +++ b/vendor/github.com/cncd/pubsub/README @@ -0,0 +1,6 @@ +Go package provides a common interface for publish-subscriber messaging. + +Documentation: + + http://godoc.org/github.com/cncd/pubsub + http://godoc.org/github.com/cncd/pubsub/gcp diff --git a/vendor/github.com/cncd/pubsub/pub.go b/vendor/github.com/cncd/pubsub/pub.go new file mode 100644 index 000000000..aad40fa90 --- /dev/null +++ b/vendor/github.com/cncd/pubsub/pub.go @@ -0,0 +1,75 @@ +package pubsub + +import ( + "context" + "sync" +) + +type subscriber struct { + receiver Receiver +} + +type publisher struct { + sync.Mutex + + topics map[string]*topic +} + +// New creates an in-memory publisher. +func New() Publisher { + return &publisher{ + topics: make(map[string]*topic), + } +} + +func (p *publisher) Create(c context.Context, dest string) error { + p.Lock() + t, ok := p.topics[dest] + if !ok { + t = newTopic(dest) + p.topics[dest] = t + } + p.Unlock() + return nil +} + +func (p *publisher) Publish(c context.Context, dest string, message Message) error { + p.Lock() + t, ok := p.topics[dest] + p.Unlock() + if !ok { + return ErrNotFound + } + t.publish(message) + return nil +} + +func (p *publisher) Subscribe(c context.Context, dest string, receiver Receiver) error { + p.Lock() + t, ok := p.topics[dest] + p.Unlock() + if !ok { + return ErrNotFound + } + s := &subscriber{ + receiver: receiver, + } + t.subscribe(s) + select { + case <-c.Done(): + case <-t.done: + } + t.unsubscribe(s) + return nil +} + +func (p *publisher) Remove(c context.Context, dest string) error { + p.Lock() + t, ok := p.topics[dest] + if ok { + delete(p.topics, dest) + t.close() + } + p.Unlock() + return nil +} diff --git a/vendor/github.com/cncd/pubsub/pubsub.go b/vendor/github.com/cncd/pubsub/pubsub.go new file mode 100644 index 000000000..9caade640 --- /dev/null +++ b/vendor/github.com/cncd/pubsub/pubsub.go @@ -0,0 +1,71 @@ +// Package pubsub implements a publish-subscriber messaging system. +package pubsub + +import ( + "context" + "errors" +) + +// ErrNotFound is returned when the named topic does not exist. +var ErrNotFound = errors.New("topic not found") + +// Message defines a published message. +type Message struct { + // ID identifies this message. + ID string `json:"id,omitempty"` + + // Data is the actual data in the entry. + Data []byte `json:"data"` + + // Labels represents the key-value pairs the entry is lebeled with. + Labels map[string]string `json:"labels,omitempty"` +} + +// Receiver receives published messages. +type Receiver func(Message) + +// Publisher defines a mechanism for communicating messages from a group +// of senders, called producers, to a group of consumers. +type Publisher interface { + // Create creates the named topic. + Create(c context.Context, topic string) error + + // Publish publishes the message. + Publish(c context.Context, topic string, message Message) error + + // Subscribe subscribes to the topic. The Receiver function is a callback + // function that receives published messages. + Subscribe(c context.Context, topic string, receiver Receiver) error + + // Remove removes the named topic. + Remove(c context.Context, topic string) error +} + +// // global instance of the queue. +// var global = New() +// +// // Set sets the global queue. +// func Set(p Publisher) { +// global = p +// } +// +// // Create creates the named topic. +// func Create(c context.Context, topic string) error { +// return global.Create(c, topic) +// } +// +// // Publish publishes the message. +// func Publish(c context.Context, topic string, message Message) error { +// return global.Publish(c, topic, message) +// } +// +// // Subscribe subscribes to the topic. The Receiver function is a callback +// // function that receives published messages. +// func Subscribe(c context.Context, topic string, receiver Receiver) error { +// return global.Subscribe(c, topic, receiver) +// } +// +// // Remove removes the topic. +// func Remove(c context.Context, topic string) error { +// return global.Remove(c, topic) +// } diff --git a/vendor/github.com/cncd/pubsub/topic.go b/vendor/github.com/cncd/pubsub/topic.go new file mode 100644 index 000000000..b3b127113 --- /dev/null +++ b/vendor/github.com/cncd/pubsub/topic.go @@ -0,0 +1,45 @@ +package pubsub + +import "sync" + +type topic struct { + sync.Mutex + + name string + done chan bool + subs map[*subscriber]struct{} +} + +func newTopic(dest string) *topic { + return &topic{ + name: dest, + done: make(chan bool), + subs: make(map[*subscriber]struct{}), + } +} + +func (t *topic) subscribe(s *subscriber) { + t.Lock() + t.subs[s] = struct{}{} + t.Unlock() +} + +func (t *topic) unsubscribe(s *subscriber) { + t.Lock() + delete(t.subs, s) + t.Unlock() +} + +func (t *topic) publish(m Message) { + t.Lock() + for s := range t.subs { + go s.receiver(m) + } + t.Unlock() +} + +func (t *topic) close() { + t.Lock() + close(t.done) + t.Unlock() +} diff --git a/vendor/github.com/cncd/queue/LICENSE b/vendor/github.com/cncd/queue/LICENSE new file mode 100644 index 000000000..64e202179 --- /dev/null +++ b/vendor/github.com/cncd/queue/LICENSE @@ -0,0 +1,29 @@ +BSD 3-Clause License + +Copyright (c) 2017, Brad Rydzewski +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/cncd/queue/README b/vendor/github.com/cncd/queue/README new file mode 100644 index 000000000..85a992414 --- /dev/null +++ b/vendor/github.com/cncd/queue/README @@ -0,0 +1,6 @@ +Go package provides a common interface for working with task queues. + +Documentation: + + http://godoc.org/github.com/cncd/queue + http://godoc.org/github.com/cncd/queue/gcp diff --git a/vendor/github.com/cncd/queue/fifo.go b/vendor/github.com/cncd/queue/fifo.go new file mode 100644 index 000000000..59264f672 --- /dev/null +++ b/vendor/github.com/cncd/queue/fifo.go @@ -0,0 +1,190 @@ +package queue + +import ( + "container/list" + "context" + "log" + "runtime" + "sync" + "time" +) + +type entry struct { + item *Task + done chan bool + retry int + error error + deadline time.Time +} + +type worker struct { + filter Filter + channel chan *Task +} + +type fifo struct { + sync.Mutex + + workers map[*worker]struct{} + running map[string]*entry + pending *list.List + extension time.Duration +} + +// New returns a new fifo queue. +func New() Queue { + return &fifo{ + workers: map[*worker]struct{}{}, + running: map[string]*entry{}, + pending: list.New(), + extension: time.Minute * 10, + } +} + +// Push pushes an item to the tail of this queue. +func (q *fifo) Push(c context.Context, task *Task) error { + q.Lock() + q.pending.PushBack(task) + q.Unlock() + go q.process() + return nil +} + +// Poll retrieves and removes the head of this queue. +func (q *fifo) Poll(c context.Context, f Filter) (*Task, error) { + q.Lock() + w := &worker{ + channel: make(chan *Task, 1), + filter: f, + } + q.workers[w] = struct{}{} + q.Unlock() + go q.process() + + for { + select { + case <-c.Done(): + q.Lock() + delete(q.workers, w) + q.Unlock() + return nil, nil + case t := <-w.channel: + return t, nil + } + } +} + +// Done signals that the item is done executing. +func (q *fifo) Done(c context.Context, id string) error { + return q.Error(c, id, nil) +} + +// Error signals that the item is done executing with error. +func (q *fifo) Error(c context.Context, id string, err error) error { + q.Lock() + state, ok := q.running[id] + if ok { + state.error = err + close(state.done) + delete(q.running, id) + } + q.Unlock() + return nil +} + +// Wait waits until the item is done executing. +func (q *fifo) Wait(c context.Context, id string) error { + q.Lock() + state := q.running[id] + q.Unlock() + if state != nil { + select { + case <-c.Done(): + case <-state.done: + return state.error + } + } + return nil +} + +// Extend extends the task execution deadline. +func (q *fifo) Extend(c context.Context, id string) error { + q.Lock() + defer q.Unlock() + + state, ok := q.running[id] + if ok { + state.deadline = time.Now().Add(q.extension) + return nil + } + return ErrNotFound +} + +// Info returns internal queue information. +func (q *fifo) Info(c context.Context) InfoT { + q.Lock() + stats := InfoT{} + stats.Stats.Workers = len(q.workers) + stats.Stats.Pending = q.pending.Len() + stats.Stats.Running = len(q.running) + + for e := q.pending.Front(); e != nil; e = e.Next() { + stats.Pending = append(stats.Pending, e.Value.(*Task)) + } + for _, entry := range q.running { + stats.Running = append(stats.Running, entry.item) + } + + q.Unlock() + return stats +} + +// helper function that loops through the queue and attempts to +// match the item to a single subscriber. +func (q *fifo) process() { + defer func() { + // the risk of panic is low. This code can probably be removed + // once the code has been used in real world installs without issue. + if err := recover(); err != nil { + const size = 64 << 10 + buf := make([]byte, size) + buf = buf[:runtime.Stack(buf, false)] + log.Printf("queue: unexpected panic: %v\n%s", err, buf) + } + }() + + q.Lock() + defer q.Unlock() + + // TODO(bradrydzewski) move this to a helper function + // push items to the front of the queue if the item expires. + for id, state := range q.running { + if time.Now().After(state.deadline) { + q.pending.PushFront(state.item) + delete(q.running, id) + close(state.done) + } + } + + var next *list.Element +loop: + for e := q.pending.Front(); e != nil; e = next { + next = e.Next() + item := e.Value.(*Task) + for w := range q.workers { + if w.filter(item) { + delete(q.workers, w) + q.pending.Remove(e) + + q.running[item.ID] = &entry{ + item: item, + done: make(chan bool), + deadline: time.Now().Add(q.extension), + } + + w.channel <- item + break loop + } + } + } +} diff --git a/vendor/github.com/cncd/queue/queue.go b/vendor/github.com/cncd/queue/queue.go new file mode 100644 index 000000000..48d2f85e7 --- /dev/null +++ b/vendor/github.com/cncd/queue/queue.go @@ -0,0 +1,110 @@ +package queue + +import ( + "context" + "errors" +) + +var ( + // ErrCancel indicates the task was cancelled. + ErrCancel = errors.New("queue: task cancelled") + + // ErrNotFound indicates the task was not found in the queue. + ErrNotFound = errors.New("queue: task not found") +) + +// Task defines a unit of work in the queue. +type Task struct { + // ID identifies this task. + ID string `json:"id,omitempty"` + + // Data is the actual data in the entry. + Data []byte `json:"data"` + + // Labels represents the key-value pairs the entry is lebeled with. + Labels map[string]string `json:"labels,omitempty"` +} + +// InfoT provides runtime information. +type InfoT struct { + Pending []*Task `json:"pending"` + Running []*Task `json:"running"` + Stats struct { + Workers int `json:"worker_count"` + Pending int `json:"pending_count"` + Running int `json:"running_count"` + Complete int `json:"completed_count"` + } `json:"stats"` +} + +// Filter filters tasks in the queue. If the Filter returns false, +// the Task is skipped and not returned to the subscriber. +type Filter func(*Task) bool + +// Queue defines a task queue for scheduling tasks among +// a pool of workers. +type Queue interface { + // Push pushes an task to the tail of this queue. + Push(c context.Context, task *Task) error + + // Poll retrieves and removes a task head of this queue. + Poll(c context.Context, f Filter) (*Task, error) + + // Extend extends the deadline for a task. + Extend(c context.Context, id string) error + + // Done signals the task is complete. + Done(c context.Context, id string) error + + // Error signals the task is complete with errors. + Error(c context.Context, id string, err error) error + + // Wait waits until the task is complete. + Wait(c context.Context, id string) error + + // Info returns internal queue information. + Info(c context.Context) InfoT +} + +// // global instance of the queue. +// var global = New() +// +// // Set sets the global queue. +// func Set(queue Queue) { +// global = queue +// } +// +// // Push pushes an task to the tail of the global queue. +// func Push(c context.Context, task *Task) error { +// return global.Push(c, task) +// } +// +// // Poll retrieves and removes a task head of the global queue. +// func Poll(c context.Context, f Filter) (*Task, error) { +// return global.Poll(c, f) +// } +// +// // Extend extends the deadline for a task. +// func Extend(c context.Context, id string) error { +// return global.Extend(c, id) +// } +// +// // Done signals the task is complete. +// func Done(c context.Context, id string) error { +// return global.Done(c, id) +// } +// +// // Error signals the task is complete with errors. +// func Error(c context.Context, id string, err error) { +// global.Error(c, id, err) +// } +// +// // Wait waits until the task is complete. +// func Wait(c context.Context, id string) error { +// return global.Wait(c, id) +// } +// +// // Info returns internal queue information. +// func Info(c context.Context) InfoT { +// return global.Info(c) +// } diff --git a/vendor/github.com/cncd/queue/worker.go b/vendor/github.com/cncd/queue/worker.go new file mode 100644 index 000000000..c969bad24 --- /dev/null +++ b/vendor/github.com/cncd/queue/worker.go @@ -0,0 +1 @@ +package queue diff --git a/vendor/vendor.json b/vendor/vendor.json index 043cb16e7..f2d2c5b85 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -1,6 +1,6 @@ { "comment": "", - "ignore": "test", + "ignore": "test github.com/drone/mq/ github.com/tidwall/redlog/ google.golang.org/appengine/ github.com/syndtr/goleveldb/ github.com/drone/drone-ui/", "package": [ { "path": "code.google.com/p/go.crypto/ssh", @@ -17,6 +17,90 @@ "revision": "5cef21e2e4f0fd147973b558d4db7395176bcd95", "revisionTime": "2016-03-22T13:50:45-07:00" }, + { + "checksumSHA1": "HhHyclewPAZ7sZQCsl2nMSe1T5s=", + "path": "github.com/cncd/logging", + "revision": "03b6463409fecbd23f04587adf9bc71da13796a2", + "revisionTime": "2017-03-05T07:05:34Z" + }, + { + "checksumSHA1": "W3AuK8ocqHwlUajGmQLFvnRhTZE=", + "path": "github.com/cncd/pipeline/pipeline", + "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", + "revisionTime": "2017-03-04T04:47:59Z" + }, + { + "checksumSHA1": "PSzh0ix/rlMrS/Cl3aH6GHGrJuo=", + "path": "github.com/cncd/pipeline/pipeline/backend", + "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", + "revisionTime": "2017-03-04T04:47:59Z" + }, + { + "checksumSHA1": "0CGXRaYwZhJxGIrGhn8WGpkFqPo=", + "path": "github.com/cncd/pipeline/pipeline/backend/docker", + "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", + "revisionTime": "2017-03-04T04:47:59Z" + }, + { + "checksumSHA1": "uUagpzha5ah/a3RO6IImvzHYFlY=", + "path": "github.com/cncd/pipeline/pipeline/frontend", + "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", + "revisionTime": "2017-03-04T04:47:59Z" + }, + { + "checksumSHA1": "O0sulBQAHJeNLg3lO38Cq5uf/eg=", + "path": "github.com/cncd/pipeline/pipeline/frontend/yaml", + "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", + "revisionTime": "2017-03-04T04:47:59Z" + }, + { + "checksumSHA1": "e1lZWQdObXCKWqZOGlOeaeERQMc=", + "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/compiler", + "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", + "revisionTime": "2017-03-04T04:47:59Z" + }, + { + "checksumSHA1": "Q0GkNUFamVYIA1Fd8r0A5M6Gx54=", + "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/linter", + "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", + "revisionTime": "2017-03-04T04:47:59Z" + }, + { + "checksumSHA1": "kx2sPUIMozPC/g6E4w48h3FfH3k=", + "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/matrix", + "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", + "revisionTime": "2017-03-04T04:47:59Z" + }, + { + "checksumSHA1": "2/3f3oNmxXy5kcrRLCFa24Oc9O4=", + "path": "github.com/cncd/pipeline/pipeline/interrupt", + "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", + "revisionTime": "2017-03-04T04:47:59Z" + }, + { + "checksumSHA1": "8eTwXZPM/Kp9uE/mnhpWDTiX7nY=", + "path": "github.com/cncd/pipeline/pipeline/multipart", + "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", + "revisionTime": "2017-03-04T04:47:59Z" + }, + { + "checksumSHA1": "5axmtZsHaQ5uE/tuNQZygquNx8U=", + "path": "github.com/cncd/pipeline/pipeline/rpc", + "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", + "revisionTime": "2017-03-04T04:47:59Z" + }, + { + "checksumSHA1": "7Qj1DK0ceAXkYztW0l3+L6sn+V8=", + "path": "github.com/cncd/pubsub", + "revision": "0691529ab100a0f3c4e2087407d2788a0787ad9c", + "revisionTime": "2017-03-03T07:06:35Z" + }, + { + "checksumSHA1": "AG4M07wOZNTnSFHJIfdXT2ymnts=", + "path": "github.com/cncd/queue", + "revision": "1ce1ada7160f1eda015a16c1b7f9ea497fa36873", + "revisionTime": "2017-03-03T07:04:55Z" + }, { "path": "github.com/codegangsta/cli", "revision": "70e3fa51ebed95df8c0fbe1519c1c1f9bc16bb13",