chunk the yaml into sections

This commit is contained in:
Brad Rydzewski 2016-04-19 18:37:53 -07:00
parent fd63d6e03d
commit b5823d20ff
19 changed files with 966 additions and 177 deletions

View file

@ -2,13 +2,22 @@ clone:
path: github.com/drone/drone
build:
image: drone/golang:1.5
environment:
- GO15VENDOREXPERIMENT=1
commands:
- make deps gen
- make test test_postgres test_mysql
- make build build_static deb docs
test:
image: drone/golang:1.5
environment:
- GO15VENDOREXPERIMENT=1
commands:
- make deps gen
- make test test_postgres test_mysql
- make build
dist:
image: drone/golang:1.5
environment:
- GO15VENDOREXPERIMENT=1
commands:
- make build
when:
event: push
compose:
postgres:
@ -27,10 +36,7 @@ publish:
password: $$DOCKER_PASS
email: $$DOCKER_EMAIL
repo: drone/drone
tag:
- "latest"
- "0.4.1"
- "0.4"
tag: [ "latest" ]
when:
repo: drone/drone
branch: master

1
.gitignore vendored
View file

@ -1,5 +1,4 @@
drone/drone
drone_*
*.sqlite
*_gen.go
*.html

View file

@ -19,11 +19,12 @@ ADD contrib/docker/etc/nsswitch.conf /etc/
ENV DATABASE_DRIVER=sqlite3
ENV DATABASE_CONFIG=/var/lib/drone/drone.sqlite
ADD drone_static /drone_static
ADD drone/drone /drone
# Alpine Linux doesn't use pam, which means that there is no /etc/nsswitch.conf,
# but Go and CGO rely on /etc/nsswitch.conf to check the order of DNS resolving.
# To fix this we just create /etc/nsswitch.conf and add the following line:
#RUN echo 'hosts: files mdns4_minimal [NOTFOUND=return] dns mdns4' >> /etc/nsswitch.conf
ENTRYPOINT ["/drone_static"]
ENTRYPOINT ["/drone"]
CMD ["serve"]

View file

@ -24,11 +24,10 @@ gen_template:
gen_migrations:
go generate github.com/drone/drone/store/datastore/ddl
build:
go build
build: build_static
build_static:
go build --ldflags '-extldflags "-static" -X github.com/drone/drone/version.VersionDev=$(CI_BUILD_NUMBER)' -o drone_static
cd drone && go build --ldflags '-extldflags "-static" -X github.com/drone/drone/version.VersionDev=$(CI_BUILD_NUMBER)' -o drone
test:
go test -cover $(PACKAGES)

155
api/queue.go Normal file
View file

@ -0,0 +1,155 @@
package api
import (
"fmt"
"io"
"strconv"
"github.com/Sirupsen/logrus"
"github.com/drone/drone/bus"
"github.com/drone/drone/model"
"github.com/drone/drone/queue"
"github.com/drone/drone/remote"
"github.com/drone/drone/store"
"github.com/drone/drone/stream"
"github.com/gin-gonic/gin"
)
// Pull is a long request that polls and attemts to pull work off the queue stack.
func Pull(c *gin.Context) {
logrus.Debugf("Agent %s connected.", c.ClientIP())
w := queue.PullClose(c, c.Writer)
if w == nil {
logrus.Debugf("Agent %s could not pull work.", c.ClientIP())
} else {
c.JSON(202, w)
logrus.Debugf("Agent %s assigned work. %s/%s#%d.%d",
c.ClientIP(),
w.Repo.Owner,
w.Repo.Name,
w.Build.Number,
w.Job.Number,
)
}
}
// Wait is a long request that polls and waits for cancelled build requests.
func Wait(c *gin.Context) {
id, err := strconv.ParseInt(c.Param("id"), 10, 64)
if err != nil {
c.String(500, "Invalid input. %s", err)
return
}
eventc := make(chan *bus.Event, 1)
bus.Subscribe(c, eventc)
defer bus.Unsubscribe(c, eventc)
for {
select {
case event := <-eventc:
if event.Job.ID == id &&
event.Job.Status != model.StatusPending &&
event.Job.Status != model.StatusRunning {
c.JSON(200, event.Job)
return
}
case <-c.Writer.CloseNotify():
return
}
}
}
// Update handles build updates from the agent and persists to the database.
func Update(c *gin.Context) {
work := &queue.Work{}
if err := c.BindJSON(work); err != nil {
logrus.Errorf("Invalid input. %s", err)
return
}
// TODO(bradrydzewski) it is really annoying that we have to do this lookup
// and I'd prefer not to. The reason we do this is because the Build and Job
// have fields that aren't serialized to json and would be reset to their
// empty values if we just saved what was coming in the http.Request body.
build, err := store.GetBuild(c, work.Build.ID)
if err != nil {
c.String(404, "Unable to find build. %s", err)
return
}
job, err := store.GetJob(c, work.Job.ID)
if err != nil {
c.String(404, "Unable to find job. %s", err)
return
}
build.Started = work.Build.Started
build.Finished = work.Build.Finished
build.Status = work.Build.Status
job.Started = work.Job.Started
job.Finished = work.Job.Finished
job.Status = work.Job.Status
job.ExitCode = work.Job.ExitCode
ok, err := store.UpdateBuildJob(c, build, job)
if err != nil {
c.String(500, "Unable to update job. %s", err)
return
}
if ok {
// get the user because we transfer the user form the server to agent
// and back we lose the token which does not get serialized to json.
user, err := store.GetUser(c, work.User.ID)
if err != nil {
c.String(500, "Unable to find user. %s", err)
return
}
bus.Publish(c, &bus.Event{})
remote.Status(c, user, work.Repo, build,
fmt.Sprintf("%s/%s/%d", work.System.Link, work.Repo.FullName, work.Build.Number))
}
c.JSON(200, work)
}
// Stream streams the logs to disk or memory for broadcasing to listeners. Once
// the stream is closed it is moved to permanent storage in the database.
func Stream(c *gin.Context) {
id, err := strconv.ParseInt(c.Param("id"), 10, 64)
if err != nil {
c.String(500, "Invalid input. %s", err)
return
}
key := stream.ToKey(id)
rc, wc, err := stream.Create(c, key)
if err != nil {
logrus.Errorf("Agent %s failed to create stream. %s.", c.ClientIP(), err)
return
}
defer func() {
wc.Close()
rc.Close()
stream.Remove(c, key)
}()
io.Copy(wc, c.Request.Body)
wc.Close()
rcc, _, err := stream.Open(c, key)
if err != nil {
logrus.Errorf("Agent %s failed to read cache. %s.", c.ClientIP(), err)
return
}
defer func() {
rcc.Close()
}()
store.WriteLog(c, &model.Job{ID: id}, rcc)
c.String(200, "")
logrus.Debugf("Agent %s wrote stream to database", c.ClientIP())
}

235
client/client.go Normal file
View file

@ -0,0 +1,235 @@
package client
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"github.com/drone/drone/model"
"github.com/drone/drone/queue"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
"golang.org/x/oauth2"
)
const (
pathPull = "%s/api/queue/pull"
pathWait = "%s/api/queue/wait/%d"
pathStream = "%s/api/queue/stream/%d"
pathPush = "%s/api/queue/status/%d"
)
type client struct {
client *http.Client
base string // base url
}
// NewClient returns a client at the specified url.
func NewClient(uri string) Client {
return &client{http.DefaultClient, uri}
}
// NewClientToken returns a client at the specified url that
// authenticates all outbound requests with the given token.
func NewClientToken(uri, token string) Client {
config := new(oauth2.Config)
auther := config.Client(oauth2.NoContext, &oauth2.Token{AccessToken: token})
return &client{auther, uri}
}
// Pull pulls work from the server queue.
func (c *client) Pull() (*queue.Work, error) {
out := new(queue.Work)
uri := fmt.Sprintf(pathPull, c.base)
err := c.post(uri, nil, out)
return out, err
}
// Push pushes an update to the server.
func (c *client) Push(p *queue.Work) error {
uri := fmt.Sprintf(pathPush, c.base, p.Job.ID)
err := c.post(uri, p, nil)
return err
}
// Stream streams the build logs to the server.
func (c *client) Stream(id int64, rc io.ReadCloser) error {
uri := fmt.Sprintf(pathStream, c.base, id)
err := c.post(uri, rc, nil)
return err
}
// Wait watches and waits for the build to cancel or finish.
func (c *client) Wait(id int64) *Wait {
ctx, cancel := context.WithCancel(context.Background())
return &Wait{id, c, ctx, cancel}
}
////////
type CancelNotifier interface {
Canecel()
CancelNotify() bool
IsCancelled() bool
}
////////
type Wait struct {
id int64
client *client
ctx context.Context
cancel context.CancelFunc
}
func (w *Wait) Done() (*model.Job, error) {
uri := fmt.Sprintf(pathWait, w.client.base, w.id)
req, err := w.client.createRequest(uri, "POST", nil)
if err != nil {
return nil, err
}
res, err := ctxhttp.Do(w.ctx, w.client.client, req)
if err != nil {
return nil, err
}
defer res.Body.Close()
job := &model.Job{}
err = json.NewDecoder(res.Body).Decode(&job)
if err != nil {
return nil, err
}
return job, nil
}
func (w *Wait) Cancel() {
w.cancel()
}
//
// http request helper functions
//
// helper function for making an http GET request.
func (c *client) get(rawurl string, out interface{}) error {
return c.do(rawurl, "GET", nil, out)
}
// helper function for making an http POST request.
func (c *client) post(rawurl string, in, out interface{}) error {
return c.do(rawurl, "POST", in, out)
}
// helper function for making an http PUT request.
func (c *client) put(rawurl string, in, out interface{}) error {
return c.do(rawurl, "PUT", in, out)
}
// helper function for making an http PATCH request.
func (c *client) patch(rawurl string, in, out interface{}) error {
return c.do(rawurl, "PATCH", in, out)
}
// helper function for making an http DELETE request.
func (c *client) delete(rawurl string) error {
return c.do(rawurl, "DELETE", nil, nil)
}
// helper function to make an http request
func (c *client) do(rawurl, method string, in, out interface{}) error {
// executes the http request and returns the body as
// and io.ReadCloser
body, err := c.open(rawurl, method, in, out)
if err != nil {
return err
}
defer body.Close()
// if a json response is expected, parse and return
// the json response.
if out != nil {
return json.NewDecoder(body).Decode(out)
}
return nil
}
// helper function to open an http request
func (c *client) open(rawurl, method string, in, out interface{}) (io.ReadCloser, error) {
uri, err := url.Parse(rawurl)
if err != nil {
return nil, err
}
// creates a new http request to bitbucket.
req, err := http.NewRequest(method, uri.String(), nil)
if err != nil {
return nil, err
}
// if we are posting or putting data, we need to
// write it to the body of the request.
if in != nil {
rc, ok := in.(io.ReadCloser)
if ok {
req.Body = rc
req.Header.Set("Content-Type", "plain/text")
} else {
inJson, err := json.Marshal(in)
if err != nil {
return nil, err
}
buf := bytes.NewBuffer(inJson)
req.Body = ioutil.NopCloser(buf)
req.ContentLength = int64(len(inJson))
req.Header.Set("Content-Length", strconv.Itoa(len(inJson)))
req.Header.Set("Content-Type", "application/json")
}
}
resp, err := c.client.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode > http.StatusPartialContent {
defer resp.Body.Close()
out, _ := ioutil.ReadAll(resp.Body)
return nil, fmt.Errorf(string(out))
}
return resp.Body, nil
}
// createRequest is a helper function that builds an http.Request.
func (c *client) createRequest(rawurl, method string, in interface{}) (*http.Request, error) {
uri, err := url.Parse(rawurl)
if err != nil {
return nil, err
}
// if we are posting or putting data, we need to
// write it to the body of the request.
var buf io.ReadWriter
if in != nil {
buf = new(bytes.Buffer)
err := json.NewEncoder(buf).Encode(in)
if err != nil {
return nil, err
}
}
// creates a new http request to bitbucket.
req, err := http.NewRequest(method, uri.String(), buf)
if err != nil {
return nil, err
}
if in != nil {
req.Header.Set("Content-Type", "application/json")
}
return req, nil
}

22
client/interface.go Normal file
View file

@ -0,0 +1,22 @@
package client
import (
"io"
"github.com/drone/drone/queue"
)
// Client is used to communicate with a Drone server.
type Client interface {
// Pull pulls work from the server queue.
Pull() (*queue.Work, error)
// Push pushes an update to the server.
Push(*queue.Work) error
// Stream streams the build logs to the server.
Stream(int64, io.ReadCloser) error
// Wait waits for the job to the complete.
Wait(int64) *Wait
}

110
drone/agent/agent.go Normal file
View file

@ -0,0 +1,110 @@
package agent
import (
"sync"
"time"
"github.com/drone/drone/client"
"github.com/samalba/dockerclient"
"github.com/Sirupsen/logrus"
"github.com/codegangsta/cli"
)
// AgentCmd is the exported command for starting the drone agent.
var AgentCmd = cli.Command{
Name: "agent",
Usage: "starts the drone agent",
Action: start,
Flags: []cli.Flag{
cli.StringFlag{
EnvVar: "DOCKER_HOST",
Name: "docker-host",
Usage: "docker deamon address",
Value: "unix:///var/run/docker.sock",
},
cli.BoolFlag{
EnvVar: "DOCKER_TLS_VERIFY",
Name: "docker-tls-verify",
Usage: "docker daemon supports tlsverify",
},
cli.StringFlag{
EnvVar: "DOCKER_CERT_PATH",
Name: "docker-cert-path",
Usage: "docker certificate directory",
Value: "",
},
cli.IntFlag{
EnvVar: "DOCKER_MAX_PROCS",
Name: "docker-max-procs",
Usage: "limit number of running docker processes",
Value: 2,
},
cli.StringFlag{
EnvVar: "DRONE_SERVER",
Name: "drone-server",
Usage: "drone server address",
Value: "http://localhost:8000",
},
cli.StringFlag{
EnvVar: "DRONE_TOKEN",
Name: "drone-token",
Usage: "drone authorization token",
},
cli.DurationFlag{
EnvVar: "BACKOFF",
Name: "drone-backoff",
Usage: "drone server backoff interval",
Value: time.Second * 15,
},
cli.BoolFlag{
EnvVar: "DEBUG",
Name: "debug",
Usage: "start the agent in debug mode",
},
cli.BoolFlag{
EnvVar: "EXPERIMENTAL",
Name: "experimental",
Usage: "start the agent with experimental features",
},
},
}
func start(c *cli.Context) {
// debug level if requested by user
if c.Bool("debug") {
logrus.SetLevel(logrus.DebugLevel)
} else {
logrus.SetLevel(logrus.WarnLevel)
}
client := client.NewClientToken(
c.String("drone-server"),
c.String("drone-token"),
)
tls, _ := dockerclient.TLSConfigFromCertPath(c.String("docker-cert-path"))
if c.Bool("docker-host") {
tls.InsecureSkipVerify = true
}
docker, err := dockerclient.NewDockerClient(c.String("docker-host"), tls)
if err != nil {
logrus.Fatal(err)
}
var wg sync.WaitGroup
for i := 0; i < c.Int("docker-max-procs"); i++ {
wg.Add(1)
go func() {
for {
if err := recoverExec(client, docker); err != nil {
dur := c.Duration("drone-backoff")
logrus.Debugf("Attempting to reconnect in %v", dur)
time.Sleep(dur)
}
}
}()
}
wg.Wait()
}

186
drone/agent/exec.go Normal file
View file

@ -0,0 +1,186 @@
package agent
import (
"encoding/json"
"fmt"
"io"
"time"
"github.com/Sirupsen/logrus"
"github.com/dchest/uniuri"
"github.com/drone/drone/client"
"github.com/drone/drone/engine/compiler"
"github.com/drone/drone/engine/compiler/builtin"
"github.com/drone/drone/engine/runner"
engine "github.com/drone/drone/engine/runner/docker"
"github.com/drone/drone/model"
"github.com/samalba/dockerclient"
"golang.org/x/net/context"
)
func recoverExec(client client.Client, docker dockerclient.Client) error {
defer func() {
recover()
}()
return exec(client, docker)
}
func exec(client client.Client, docker dockerclient.Client) error {
w, err := client.Pull()
if err != nil {
return err
}
logrus.Infof("Starting build %s/%s#%d.%d",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
w.Job.Status = model.StatusRunning
w.Job.Started = time.Now().Unix()
prefix := fmt.Sprintf("drone_%s", uniuri.New())
trans := []compiler.Transform{
builtin.NewCloneOp("plugins/git:latest", true),
builtin.NewCacheOp(
"plugins/cache:latest",
"/var/lib/drone/cache/"+w.Repo.FullName,
false,
),
builtin.NewNormalizeOp("plugins"),
builtin.NewWorkspaceOp("/drone", "drone/src/github.com/"+w.Repo.FullName),
builtin.NewEnvOp(map[string]string{
"CI": "drone",
"CI_REPO": w.Repo.FullName,
"CI_REPO_OWNER": w.Repo.Owner,
"CI_REPO_NAME": w.Repo.Name,
"CI_REPO_LINK": w.Repo.Link,
"CI_REPO_AVATAR": w.Repo.Avatar,
"CI_REPO_BRANCH": w.Repo.Branch,
"CI_REPO_PRIVATE": fmt.Sprintf("%v", w.Repo.IsPrivate),
"CI_REMOTE_URL": w.Repo.Clone,
"CI_COMMIT_SHA": w.Build.Commit,
"CI_COMMIT_REF": w.Build.Ref,
"CI_COMMIT_BRANCH": w.Build.Branch,
"CI_COMMIT_LINK": w.Build.Link,
"CI_COMMIT_MESSAGE": w.Build.Message,
"CI_AUTHOR": w.Build.Author,
"CI_AUTHOR_EMAIL": w.Build.Email,
"CI_AUTHOR_AVATAR": w.Build.Avatar,
"CI_BUILD_NUMBER": fmt.Sprintf("%v", w.Build.Number),
"CI_BUILD_EVENT": w.Build.Event,
// "CI_NETRC_USERNAME": w.Netrc.Login,
// "CI_NETRC_PASSWORD": w.Netrc.Password,
// "CI_NETRC_MACHINE": w.Netrc.Machine,
// "CI_PREV_BUILD_STATUS": w.BuildLast.Status,
// "CI_PREV_BUILD_NUMBER": fmt.Sprintf("%v", w.BuildLast.Number),
// "CI_PREV_COMMIT_SHA": w.BuildLast.Commit,
}),
builtin.NewValidateOp(
w.Repo.IsTrusted,
[]string{"plugins/*"},
),
builtin.NewShellOp(builtin.Linux_adm64),
builtin.NewArgsOp(),
builtin.NewPodOp(prefix),
builtin.NewAliasOp(prefix),
builtin.NewPullOp(false),
builtin.NewFilterOp(
model.StatusSuccess, // w.BuildLast.Status,
w.Build.Branch,
w.Build.Event,
w.Build.Deploy,
map[string]string{},
),
}
compile := compiler.New()
compile.Transforms(trans)
spec, err := compile.CompileString(w.Yaml)
if err != nil {
// TODO handle error
logrus.Infof("Error compiling Yaml %s/%s#%d %s",
w.Repo.Owner, w.Repo.Name, w.Build.Number, err.Error())
return err
}
if err := client.Push(w); err != nil {
logrus.Errorf("Error persisting update %s/%s#%d.%d. %s",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err)
return err
}
conf := runner.Config{
Engine: engine.New(docker),
}
ctx := context.TODO()
ctx, cancel := context.WithCancel(ctx)
run := conf.Runner(ctx, spec)
run.Run()
defer cancel()
wait := client.Wait(w.Job.ID)
if err != nil {
return err
}
go func() {
_, werr := wait.Done()
if werr == nil {
logrus.Infof("Cancel build %s/%s#%d.%d",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
cancel()
}
}()
defer wait.Cancel()
rc, wc := io.Pipe()
go func() {
err := client.Stream(w.Job.ID, rc)
if err != nil && err != io.ErrClosedPipe {
logrus.Errorf("Error streaming build logs. %s", err)
}
}()
pipe := run.Pipe()
for {
line := pipe.Next()
if line == nil {
break
}
linejson, _ := json.Marshal(line)
wc.Write(linejson)
wc.Write([]byte{'\n'})
}
err = run.Wait()
pipe.Close()
wc.Close()
rc.Close()
// catch the build result
if err != nil {
w.Job.ExitCode = 255
}
if exitErr, ok := err.(*runner.ExitError); ok {
w.Job.ExitCode = exitErr.Code
}
w.Job.Finished = time.Now().Unix()
switch w.Job.ExitCode {
case 128, 130:
w.Job.Status = model.StatusKilled
case 0:
w.Job.Status = model.StatusSuccess
default:
w.Job.Status = model.StatusFailure
}
logrus.Infof("Finished build %s/%s#%d.%d",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
return client.Push(w)
}

29
drone/drone.go Normal file
View file

@ -0,0 +1,29 @@
package main
import (
"os"
"github.com/drone/drone/drone/agent"
"github.com/drone/drone/drone/server"
"github.com/drone/drone/version"
"github.com/codegangsta/cli"
"github.com/ianschenck/envflag"
_ "github.com/joho/godotenv/autoload"
)
func main2() {
envflag.Parse()
app := cli.NewApp()
app.Name = "drone"
app.Version = version.Version
app.Usage = "command line utility"
app.Commands = []cli.Command{
agent.AgentCmd,
server.ServeCmd,
}
app.Run(os.Args)
}

View file

@ -2,6 +2,7 @@ package main
import (
"net/http"
"os"
"time"
"github.com/drone/drone/router"
@ -22,6 +23,11 @@ var (
)
func main() {
if os.Getenv("CANARY") == "true" {
main2()
return
}
envflag.Parse()
// debug level if requested by user
@ -35,6 +41,9 @@ func main() {
handler := router.Load(
ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true),
middleware.Version,
middleware.Queue(),
middleware.Stream(),
middleware.Bus(),
middleware.Cache(),
middleware.Store(),
middleware.Remote(),

89
drone/server/server.go Normal file
View file

@ -0,0 +1,89 @@
package server
import (
"net/http"
"time"
"github.com/drone/drone/router"
"github.com/drone/drone/router/middleware"
"github.com/Sirupsen/logrus"
"github.com/codegangsta/cli"
"github.com/gin-gonic/contrib/ginrus"
)
// ServeCmd is the exported command for starting the drone server.
var ServeCmd = cli.Command{
Name: "serve",
Usage: "starts the drone server",
Action: func(c *cli.Context) {
if err := start(c); err != nil {
logrus.Fatal(err)
}
},
Flags: []cli.Flag{
cli.StringFlag{
EnvVar: "SERVER_ADDR",
Name: "server-addr",
Usage: "server address",
Value: ":8000",
},
cli.StringFlag{
EnvVar: "SERVER_CERT",
Name: "server-cert",
Usage: "server ssl cert",
},
cli.StringFlag{
EnvVar: "SERVER_KEY",
Name: "server-key",
Usage: "server ssl key",
},
cli.BoolFlag{
EnvVar: "DEBUG",
Name: "debug",
Usage: "start the server in debug mode",
},
cli.BoolFlag{
EnvVar: "EXPERIMENTAL",
Name: "experimental",
Usage: "start the server with experimental features",
},
},
}
func start(c *cli.Context) error {
// debug level if requested by user
if c.Bool("debug") {
logrus.SetLevel(logrus.DebugLevel)
} else {
logrus.SetLevel(logrus.WarnLevel)
}
// setup the server and start the listener
handler := router.Load(
ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true),
middleware.Version,
middleware.Queue(),
middleware.Stream(),
middleware.Bus(),
middleware.Cache(),
middleware.Store(),
middleware.Remote(),
middleware.Engine(),
)
if c.String("server-cert") != "" {
return http.ListenAndServeTLS(
c.String("server-addr"),
c.String("server-cert"),
c.String("server-key"),
handler,
)
}
return http.ListenAndServe(
c.String("server-addr"),
handler,
)
}

View file

@ -1,158 +0,0 @@
package engine
import (
"fmt"
"time"
"github.com/Sirupsen/logrus"
"github.com/drone/drone/bus"
"github.com/drone/drone/engine/compiler"
"github.com/drone/drone/engine/runner"
"github.com/drone/drone/engine/runner/docker"
"github.com/drone/drone/model"
"github.com/drone/drone/queue"
"github.com/drone/drone/store"
"github.com/drone/drone/stream"
"golang.org/x/net/context"
)
// Poll polls the build queue for build jobs.
func Poll(c context.Context) {
for {
pollRecover(c)
}
}
func pollRecover(c context.Context) {
defer recover()
poll(c)
}
func poll(c context.Context) {
w := queue.Pull(c)
logrus.Infof("Starting build %s/%s#%d.%d",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
rc, wc, err := stream.Create(c, stream.ToKey(w.Job.ID))
if err != nil {
logrus.Errorf("Error opening build stream %s/%s#%d.%d. %s",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err)
}
defer func() {
wc.Close()
rc.Close()
stream.Remove(c, stream.ToKey(w.Job.ID))
}()
w.Job.Status = model.StatusRunning
w.Job.Started = time.Now().Unix()
quitc := make(chan bool, 1)
eventc := make(chan *bus.Event, 1)
bus.Subscribe(c, eventc)
compile := compiler.New()
compile.Transforms(nil)
spec, err := compile.CompileString(w.Yaml)
if err != nil {
// TODO handle error
logrus.Infof("Error compiling Yaml %s/%s#%d %s",
w.Repo.Owner, w.Repo.Name, w.Build.Number, err.Error())
return
}
defer func() {
bus.Unsubscribe(c, eventc)
quitc <- true
}()
ctx := context.TODO()
ctx, cancel := context.WithCancel(ctx)
// TODO store the started build in the database
// TODO publish the started build
store.UpdateJob(c, w.Job)
//store.Write(c, w.Job, rc)
bus.Publish(c, bus.NewEvent(bus.Started, w.Repo, w.Build, w.Job))
conf := runner.Config{
Engine: docker.FromContext(c),
}
run := conf.Runner(ctx, spec)
run.Run()
defer cancel()
go func() {
for {
select {
case event := <-eventc:
if event.Type == bus.Cancelled && event.Job.ID == w.Job.ID {
logrus.Infof("Cancel build %s/%s#%d.%d",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
cancel()
}
case <-quitc:
return
}
}
}()
pipe := run.Pipe()
for {
line := pipe.Next()
if line == nil {
break
}
fmt.Println(line)
}
err = run.Wait()
// catch the build result
if err != nil {
w.Job.ExitCode = 255
}
if exitErr, ok := err.(*runner.ExitError); ok {
w.Job.ExitCode = exitErr.Code
}
w.Job.Finished = time.Now().Unix()
switch w.Job.ExitCode {
case 128, 130:
w.Job.Status = model.StatusKilled
case 0:
w.Job.Status = model.StatusSuccess
default:
w.Job.Status = model.StatusFailure
}
// store the finished build in the database
logs, _, err := stream.Open(c, stream.ToKey(w.Job.ID))
if err != nil {
logrus.Errorf("Error reading build stream %s/%s#%d.%d",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
}
defer func() {
if logs != nil {
logs.Close()
}
}()
if err := store.WriteLog(c, w.Job, logs); err != nil {
logrus.Errorf("Error persisting build stream %s/%s#%d.%d",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
}
if logs != nil {
logs.Close()
}
// TODO publish the finished build
store.UpdateJob(c, w.Job)
bus.Publish(c, bus.NewEvent(bus.Finished, w.Repo, w.Build, w.Job))
logrus.Infof("Finished build %s/%s#%d.%d",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
}

View file

@ -1,5 +1,7 @@
package model
import "strconv"
type RepoLite struct {
Owner string `json:"owner"`
Name string `json:"name"`
@ -31,3 +33,17 @@ type Repo struct {
AllowTag bool `json:"allow_tags" meddler:"repo_allow_tags"`
Hash string `json:"-" meddler:"repo_hash"`
}
// ToEnv returns environment variable valus for the repository.
func (r *Repo) ToEnv(to map[string]string) {
to["CI_VCS"] = r.Kind
to["CI_REPO"] = r.FullName
to["CI_REPO_OWNER"] = r.Owner
to["CI_REPO_NAME"] = r.Name
to["CI_REPO_LINK"] = r.Link
to["CI_REPO_AVATAR"] = r.Avatar
to["CI_REPO_BRANCH"] = r.Branch
to["CI_REPO_PRIVATE"] = strconv.FormatBool(r.IsPrivate)
to["CI_REPO_TRUSTED"] = strconv.FormatBool(r.IsTrusted)
to["CI_REMOTE_URL"] = r.Clone
}

14
router/middleware/bus.go Normal file
View file

@ -0,0 +1,14 @@
package middleware
import (
"github.com/drone/drone/bus"
"github.com/gin-gonic/gin"
)
func Bus() gin.HandlerFunc {
bus_ := bus.New()
return func(c *gin.Context) {
bus.ToContext(c, bus_)
c.Next()
}
}

View file

@ -0,0 +1,14 @@
package middleware
import (
"github.com/drone/drone/queue"
"github.com/gin-gonic/gin"
)
func Queue() gin.HandlerFunc {
queue_ := queue.New()
return func(c *gin.Context) {
queue.ToContext(c, queue_)
c.Next()
}
}

View file

@ -0,0 +1,14 @@
package middleware
import (
"github.com/drone/drone/stream"
"github.com/gin-gonic/gin"
)
func Stream() gin.HandlerFunc {
stream_ := stream.New()
return func(c *gin.Context) {
stream.ToContext(c, stream_)
c.Next()
}
}

View file

@ -2,6 +2,7 @@ package router
import (
"net/http"
"os"
"strings"
"github.com/gin-gonic/gin"
@ -136,8 +137,14 @@ func Load(middleware ...gin.HandlerFunc) http.Handler {
stream.Use(session.SetRepo())
stream.Use(session.SetPerm())
stream.Use(session.MustPull)
stream.GET("/:owner/:name", web.GetRepoEvents)
stream.GET("/:owner/:name/:build/:number", web.GetStream)
if os.Getenv("CANARY") == "true" {
stream.GET("/:owner/:name", web.GetRepoEvents2)
stream.GET("/:owner/:name/:build/:number", web.GetStream2)
} else {
stream.GET("/:owner/:name", web.GetRepoEvents)
stream.GET("/:owner/:name/:build/:number", web.GetStream)
}
}
bots := e.Group("/bots")
@ -154,6 +161,14 @@ func Load(middleware ...gin.HandlerFunc) http.Handler {
auth.POST("/token", web.GetLoginToken)
}
queue := e.Group("/api/queue")
{
queue.POST("/pull", api.Pull)
queue.POST("/wait/:id", api.Wait)
queue.POST("/stream/:id", api.Stream)
queue.POST("/status/:id", api.Update)
}
gitlab := e.Group("/gitlab/:owner/:name")
{
gitlab.Use(session.SetRepo())

View file

@ -275,6 +275,40 @@ func UpdateBuild(c context.Context, build *model.Build) error {
return FromContext(c).UpdateBuild(build)
}
func UpdateBuildJob(c context.Context, build *model.Build, job *model.Job) (bool, error) {
if err := UpdateJob(c, job); err != nil {
return false, err
}
jobs, err := GetJobList(c, build)
if err != nil {
return false, err
}
// check to see if all jobs are finished for this build. If yes, we need to
// calcualte the overall build status and finish time.
status := model.StatusSuccess
finish := job.Finished
for _, job := range jobs {
if job.Finished > finish {
finish = job.Finished
}
switch job.Status {
case model.StatusSuccess:
// no-op
case model.StatusRunning, model.StatusPending:
return false, nil
default:
status = job.Status
}
}
build.Status = status
build.Finished = finish
if err := FromContext(c).UpdateBuild(build); err != nil {
return false, err
}
return true, nil
}
func GetJob(c context.Context, id int64) (*model.Job, error) {
return FromContext(c).GetJob(id)
}