mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2025-02-17 03:45:13 +00:00
repo_test.go conflict resolution
This commit is contained in:
commit
0579df7584
19 changed files with 697 additions and 83 deletions
|
@ -1,4 +1,4 @@
|
||||||
package pool
|
package cluster
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -9,14 +9,14 @@ import (
|
||||||
// TODO (bradrydzewski) ability to cancel work.
|
// TODO (bradrydzewski) ability to cancel work.
|
||||||
// TODO (bradrydzewski) ability to remove a worker.
|
// TODO (bradrydzewski) ability to remove a worker.
|
||||||
|
|
||||||
type Pool struct {
|
type Cluster struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
clients map[dockerclient.Client]bool
|
clients map[dockerclient.Client]bool
|
||||||
clientc chan dockerclient.Client
|
clientc chan dockerclient.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
func New() *Pool {
|
func New() *Cluster {
|
||||||
return &Pool{
|
return &Cluster{
|
||||||
clients: make(map[dockerclient.Client]bool),
|
clients: make(map[dockerclient.Client]bool),
|
||||||
clientc: make(chan dockerclient.Client, 999),
|
clientc: make(chan dockerclient.Client, 999),
|
||||||
}
|
}
|
||||||
|
@ -24,26 +24,26 @@ func New() *Pool {
|
||||||
|
|
||||||
// Allocate allocates a client to the pool to
|
// Allocate allocates a client to the pool to
|
||||||
// be available to accept work.
|
// be available to accept work.
|
||||||
func (p *Pool) Allocate(c dockerclient.Client) bool {
|
func (c *Cluster) Allocate(cli dockerclient.Client) bool {
|
||||||
if p.IsAllocated(c) {
|
if c.IsAllocated(cli) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
p.Lock()
|
c.Lock()
|
||||||
p.clients[c] = true
|
c.clients[cli] = true
|
||||||
p.Unlock()
|
c.Unlock()
|
||||||
|
|
||||||
p.clientc <- c
|
c.clientc <- cli
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsAllocated is a helper function that returns
|
// IsAllocated is a helper function that returns
|
||||||
// true if the client is currently allocated to
|
// true if the client is currently allocated to
|
||||||
// the Pool.
|
// the Pool.
|
||||||
func (p *Pool) IsAllocated(c dockerclient.Client) bool {
|
func (c *Cluster) IsAllocated(cli dockerclient.Client) bool {
|
||||||
p.Lock()
|
c.Lock()
|
||||||
defer p.Unlock()
|
defer c.Unlock()
|
||||||
_, ok := p.clients[c]
|
_, ok := c.clients[cli]
|
||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,21 +51,21 @@ func (p *Pool) IsAllocated(c dockerclient.Client) bool {
|
||||||
// available clients. If the client is currently
|
// available clients. If the client is currently
|
||||||
// reserved and performing work it will finish,
|
// reserved and performing work it will finish,
|
||||||
// but no longer be given new work.
|
// but no longer be given new work.
|
||||||
func (p *Pool) Deallocate(c dockerclient.Client) {
|
func (c *Cluster) Deallocate(cli dockerclient.Client) {
|
||||||
p.Lock()
|
c.Lock()
|
||||||
defer p.Unlock()
|
defer c.Unlock()
|
||||||
delete(p.clients, c)
|
delete(c.clients, cli)
|
||||||
}
|
}
|
||||||
|
|
||||||
// List returns a list of all Workers currently
|
// List returns a list of all Workers currently
|
||||||
// allocated to the Pool.
|
// allocated to the Pool.
|
||||||
func (p *Pool) List() []dockerclient.Client {
|
func (c *Cluster) List() []dockerclient.Client {
|
||||||
p.Lock()
|
c.Lock()
|
||||||
defer p.Unlock()
|
defer c.Unlock()
|
||||||
|
|
||||||
var clients []dockerclient.Client
|
var clients []dockerclient.Client
|
||||||
for c := range p.clients {
|
for cli := range c.clients {
|
||||||
clients = append(clients, c)
|
clients = append(clients, cli)
|
||||||
}
|
}
|
||||||
return clients
|
return clients
|
||||||
}
|
}
|
||||||
|
@ -73,17 +73,17 @@ func (p *Pool) List() []dockerclient.Client {
|
||||||
// Reserve reserves the next available worker to
|
// Reserve reserves the next available worker to
|
||||||
// start doing work. Once work is complete, the
|
// start doing work. Once work is complete, the
|
||||||
// worker should be released back to the pool.
|
// worker should be released back to the pool.
|
||||||
func (p *Pool) Reserve() <-chan dockerclient.Client {
|
func (p *Cluster) Reserve() <-chan dockerclient.Client {
|
||||||
return p.clientc
|
return p.clientc
|
||||||
}
|
}
|
||||||
|
|
||||||
// Release releases the worker back to the pool
|
// Release releases the worker back to the pool
|
||||||
// of available workers.
|
// of available workers.
|
||||||
func (p *Pool) Release(c dockerclient.Client) bool {
|
func (c *Cluster) Release(cli dockerclient.Client) bool {
|
||||||
if !p.IsAllocated(c) {
|
if !c.IsAllocated(cli) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
p.clientc <- c
|
c.clientc <- cli
|
||||||
return true
|
return true
|
||||||
}
|
}
|
|
@ -6,7 +6,7 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Token struct {
|
type Token struct {
|
||||||
Kind string `json:"-"`
|
Kind string `json:"kind"`
|
||||||
Login string `json:"-"`
|
Login string `json:"-"`
|
||||||
Label string `json:"label"`
|
Label string `json:"label"`
|
||||||
Repos []string `json:"repos,omitempty"`
|
Repos []string `json:"repos,omitempty"`
|
||||||
|
|
|
@ -24,7 +24,6 @@ var (
|
||||||
bucketRepoUsers = []byte("repo_users")
|
bucketRepoUsers = []byte("repo_users")
|
||||||
bucketBuild = []byte("build")
|
bucketBuild = []byte("build")
|
||||||
bucketBuildStatus = []byte("build_status")
|
bucketBuildStatus = []byte("build_status")
|
||||||
bucketBuildTasks = []byte("build_tasks")
|
|
||||||
bucketBuildLogs = []byte("build_logs")
|
bucketBuildLogs = []byte("build_logs")
|
||||||
bucketBuildSeq = []byte("build_seq")
|
bucketBuildSeq = []byte("build_seq")
|
||||||
)
|
)
|
||||||
|
@ -51,7 +50,6 @@ func New(path string) (*DB, error) {
|
||||||
tx.CreateBucketIfNotExists(bucketRepoUsers)
|
tx.CreateBucketIfNotExists(bucketRepoUsers)
|
||||||
tx.CreateBucketIfNotExists(bucketBuild)
|
tx.CreateBucketIfNotExists(bucketBuild)
|
||||||
tx.CreateBucketIfNotExists(bucketBuildStatus)
|
tx.CreateBucketIfNotExists(bucketBuildStatus)
|
||||||
tx.CreateBucketIfNotExists(bucketBuildTasks)
|
|
||||||
tx.CreateBucketIfNotExists(bucketBuildLogs)
|
tx.CreateBucketIfNotExists(bucketBuildLogs)
|
||||||
tx.CreateBucketIfNotExists(bucketBuildSeq)
|
tx.CreateBucketIfNotExists(bucketBuildSeq)
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -153,20 +153,25 @@ func (db *DB) DelRepo(repo *common.Repo) error {
|
||||||
|
|
||||||
// deleteTracesOfRepo cleans up build leftovers when a repo is removed
|
// deleteTracesOfRepo cleans up build leftovers when a repo is removed
|
||||||
func (db *DB) deleteTracesOfRepo(t *bolt.Tx, repoKey []byte) error {
|
func (db *DB) deleteTracesOfRepo(t *bolt.Tx, repoKey []byte) error {
|
||||||
err := error(nil)
|
|
||||||
|
|
||||||
// bucketBuildSeq uses the repoKey directly
|
// bucketBuildSeq uses the repoKey directly
|
||||||
t.Bucket(bucketBuildSeq).Delete(repoKey)
|
err := t.Bucket(bucketBuildSeq).Delete(repoKey)
|
||||||
|
if err != nil {
|
||||||
|
// only error here is if our Tx is read-only
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// the other buckets use repoKey with '/buildNumber', at least.
|
// the other buckets use repoKey with '/buildNumber', at least.
|
||||||
// validating that an additiona '/' is there ensures that we don't
|
// validating that an additiona '/' is there ensures that we don't
|
||||||
// match 'github.com/drone/droney' when we're cleaning up after
|
// match 'github.com/drone/droney' when we're cleaning up after
|
||||||
// 'github.com/drone/drone'.
|
// 'github.com/drone/drone'.
|
||||||
prefix := append(repoKey, '/')
|
prefix := append(repoKey, '/')
|
||||||
deleteWithPrefix(t, bucketBuildLogs, prefix, true)
|
buckets := [][]byte{bucketBuildStatus, bucketBuildLogs, bucketBuild}
|
||||||
deleteWithPrefix(t, bucketBuildStatus, prefix, true)
|
for _, b := range buckets {
|
||||||
deleteWithPrefix(t, bucketBuildTasks, prefix, true)
|
err = deleteWithPrefix(t, b, prefix)
|
||||||
deleteWithPrefix(t, bucketBuild, prefix, true)
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,12 +14,17 @@ func TestRepo(t *testing.T) {
|
||||||
testUser := "octocat"
|
testUser := "octocat"
|
||||||
testRepo := "github.com/octopod/hq"
|
testRepo := "github.com/octopod/hq"
|
||||||
testRepo2 := "github.com/octopod/avengers"
|
testRepo2 := "github.com/octopod/avengers"
|
||||||
|
commUser := &common.User{Login: "freya"}
|
||||||
var db *DB // Temp database
|
var db *DB // Temp database
|
||||||
|
|
||||||
// create a new database before each unit
|
// create a new database before each unit test and destroy afterwards.
|
||||||
// test and destroy afterwards.
|
|
||||||
g.BeforeEach(func() {
|
g.BeforeEach(func() {
|
||||||
db = Must("/tmp/drone.test.db")
|
file, err := ioutil.TempFile(os.TempDir(), "drone-bolt")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
db = Must(file.Name())
|
||||||
})
|
})
|
||||||
g.AfterEach(func() {
|
g.AfterEach(func() {
|
||||||
os.Remove(db.Path())
|
os.Remove(db.Path())
|
||||||
|
@ -42,7 +47,7 @@ func TestRepo(t *testing.T) {
|
||||||
g.Assert(repo.FullName).Equal(testRepo)
|
g.Assert(repo.FullName).Equal(testRepo)
|
||||||
})
|
})
|
||||||
|
|
||||||
g.It("Should del Repo", func() {
|
g.It("Should be deletable", func() {
|
||||||
db.SetRepo(&common.Repo{FullName: testRepo})
|
db.SetRepo(&common.Repo{FullName: testRepo})
|
||||||
|
|
||||||
db.Repo(testRepo)
|
db.Repo(testRepo)
|
||||||
|
@ -50,6 +55,33 @@ func TestRepo(t *testing.T) {
|
||||||
g.Assert(err_).Equal(nil)
|
g.Assert(err_).Equal(nil)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
g.It("Should cleanup builds when deleted", func() {
|
||||||
|
repo := &common.Repo{FullName: testRepo}
|
||||||
|
err := db.SetRepoNotExists(commUser, repo)
|
||||||
|
g.Assert(err).Equal(nil)
|
||||||
|
|
||||||
|
db.SetBuild(testRepo, &common.Build{State: "success"})
|
||||||
|
db.SetBuild(testRepo, &common.Build{State: "success"})
|
||||||
|
db.SetBuild(testRepo, &common.Build{State: "pending"})
|
||||||
|
|
||||||
|
db.SetBuildStatus(testRepo, 1, &common.Status{Context: "success"})
|
||||||
|
db.SetBuildStatus(testRepo, 2, &common.Status{Context: "success"})
|
||||||
|
db.SetBuildStatus(testRepo, 3, &common.Status{Context: "pending"})
|
||||||
|
|
||||||
|
// first a little sanity to validate our test conditions
|
||||||
|
_, err = db.BuildLast(testRepo)
|
||||||
|
g.Assert(err).Equal(nil)
|
||||||
|
|
||||||
|
// now run our specific test suite
|
||||||
|
// 1. ensure that we can delete the repo
|
||||||
|
err = db.DelRepo(repo)
|
||||||
|
g.Assert(err).Equal(nil)
|
||||||
|
|
||||||
|
// 2. ensure that deleting the repo cleans up other references
|
||||||
|
_, err = db.Build(testRepo, 1)
|
||||||
|
g.Assert(err).Equal(ErrKeyNotFound)
|
||||||
|
})
|
||||||
|
|
||||||
g.It("Should get RepoList", func() {
|
g.It("Should get RepoList", func() {
|
||||||
db.SetRepoNotExists(&common.User{Login: testUser}, &common.Repo{FullName: testRepo})
|
db.SetRepoNotExists(&common.User{Login: testUser}, &common.Repo{FullName: testRepo})
|
||||||
db.SetRepoNotExists(&common.User{Login: testUser}, &common.Repo{FullName: testRepo2})
|
db.SetRepoNotExists(&common.User{Login: testUser}, &common.Repo{FullName: testRepo2})
|
||||||
|
|
|
@ -85,16 +85,17 @@ func splice(t *bolt.Tx, bucket, index, value []byte) error {
|
||||||
return update(t, bucket, index, &keys)
|
return update(t, bucket, index, &keys)
|
||||||
}
|
}
|
||||||
|
|
||||||
func deleteWithPrefix(t *bolt.Tx, bucket, prefix []byte, ignoreErr bool) error {
|
func deleteWithPrefix(t *bolt.Tx, bucket, prefix []byte) error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
c := t.Bucket(bucket).Cursor()
|
c := t.Bucket(bucket).Cursor()
|
||||||
for k, _ := c.Seek(prefix); bytes.HasPrefix(k, prefix); k, _ = c.Next() {
|
for k, _ := c.Seek(prefix); bytes.HasPrefix(k, prefix); k, _ = c.Next() {
|
||||||
err = c.Delete()
|
err = c.Delete()
|
||||||
if !ignoreErr && err != nil {
|
if err != nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// only error here is if our Tx is read-only
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -101,11 +101,19 @@ type Datastore interface {
|
||||||
// named repository.
|
// named repository.
|
||||||
BuildLast(string) (*common.Build, error)
|
BuildLast(string) (*common.Build, error)
|
||||||
|
|
||||||
|
// BuildConf gets the build configuration file (yaml)
|
||||||
|
// for the named repository and build number.
|
||||||
|
// BuildConf(string, int) ([]byte, error)
|
||||||
|
|
||||||
// SetBuild inserts or updates a build for the named
|
// SetBuild inserts or updates a build for the named
|
||||||
// repository. The build number is incremented and
|
// repository. The build number is incremented and
|
||||||
// assigned to the provided build.
|
// assigned to the provided build.
|
||||||
SetBuild(string, *common.Build) error
|
SetBuild(string, *common.Build) error
|
||||||
|
|
||||||
|
// SetBuildConf persists the build configuration file (yaml)
|
||||||
|
// for the named repository and build number.
|
||||||
|
// SetBuildConf(string, int) ([]byte, error)
|
||||||
|
|
||||||
// Status returns the status for the given repository
|
// Status returns the status for the given repository
|
||||||
// and build number.
|
// and build number.
|
||||||
////Status(string, int, string) (*common.Status, error)
|
////Status(string, int, string) (*common.Status, error)
|
||||||
|
|
|
@ -16,7 +16,7 @@ type Opts struct {
|
||||||
Privileged bool
|
Privileged bool
|
||||||
}
|
}
|
||||||
|
|
||||||
var defaultOpts = &Opts{
|
var DefaultOpts = &Opts{
|
||||||
Volumes: false,
|
Volumes: false,
|
||||||
Network: false,
|
Network: false,
|
||||||
Privileged: false,
|
Privileged: false,
|
||||||
|
@ -26,42 +26,13 @@ var defaultOpts = &Opts{
|
||||||
// a list of build configurations for each axis
|
// a list of build configurations for each axis
|
||||||
// using the default parsing options.
|
// using the default parsing options.
|
||||||
func Parse(raw string) ([]*common.Config, error) {
|
func Parse(raw string) ([]*common.Config, error) {
|
||||||
return ParseOpts(raw, defaultOpts)
|
return ParseOpts(raw, DefaultOpts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ParseOpts parses a build matrix and returns
|
// ParseOpts parses a build matrix and returns
|
||||||
// a list of build configurations for each axis
|
// a list of build configurations for each axis
|
||||||
// using the provided parsing options.
|
// using the provided parsing options.
|
||||||
func ParseOpts(raw string, opts *Opts) ([]*common.Config, error) {
|
func ParseOpts(raw string, opts *Opts) ([]*common.Config, error) {
|
||||||
confs, err := parse(raw)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
for _, conf := range confs {
|
|
||||||
err := Lint(conf)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
transformSetup(conf)
|
|
||||||
transformClone(conf)
|
|
||||||
transformBuild(conf)
|
|
||||||
transformImages(conf)
|
|
||||||
transformDockerPlugin(conf)
|
|
||||||
if !opts.Network {
|
|
||||||
rmNetwork(conf)
|
|
||||||
}
|
|
||||||
if !opts.Volumes {
|
|
||||||
rmVolumes(conf)
|
|
||||||
}
|
|
||||||
if !opts.Privileged {
|
|
||||||
rmPrivileged(conf)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return confs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// helper function to parse a matrix configuraiton file.
|
|
||||||
func parse(raw string) ([]*common.Config, error) {
|
|
||||||
axis, err := matrix.Parse(raw)
|
axis, err := matrix.Parse(raw)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -71,7 +42,7 @@ func parse(raw string) ([]*common.Config, error) {
|
||||||
// when no matrix values exist we should return
|
// when no matrix values exist we should return
|
||||||
// a single config value with an empty label.
|
// a single config value with an empty label.
|
||||||
if len(axis) == 0 {
|
if len(axis) == 0 {
|
||||||
conf, err := parseYaml(raw)
|
conf, err := ParseSingle(raw, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -81,19 +52,43 @@ func parse(raw string) ([]*common.Config, error) {
|
||||||
for _, ax := range axis {
|
for _, ax := range axis {
|
||||||
// inject the matrix values into the raw script
|
// inject the matrix values into the raw script
|
||||||
injected := inject.Inject(raw, ax)
|
injected := inject.Inject(raw, ax)
|
||||||
conf, err := parseYaml(injected)
|
conf, err := ParseSingle(injected, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
conf.Axis = common.Axis(ax)
|
conf.Axis = common.Axis(ax)
|
||||||
confs = append(confs, conf)
|
confs = append(confs, conf)
|
||||||
}
|
}
|
||||||
|
|
||||||
return confs, nil
|
return confs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// helper funtion to parse a yaml configuration file.
|
// helper funtion to parse a yaml configuration file.
|
||||||
func parseYaml(raw string) (*common.Config, error) {
|
func ParseSingle(raw string, opts *Opts) (*common.Config, error) {
|
||||||
conf := &common.Config{}
|
conf := &common.Config{}
|
||||||
err := yaml.Unmarshal([]byte(raw), conf)
|
err := yaml.Unmarshal([]byte(raw), conf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// lint the yaml file
|
||||||
|
err = Lint(conf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// apply rules / transofms
|
||||||
|
transformSetup(conf)
|
||||||
|
transformClone(conf)
|
||||||
|
transformBuild(conf)
|
||||||
|
transformImages(conf)
|
||||||
|
transformDockerPlugin(conf)
|
||||||
|
if !opts.Network {
|
||||||
|
rmNetwork(conf)
|
||||||
|
}
|
||||||
|
if !opts.Volumes {
|
||||||
|
rmVolumes(conf)
|
||||||
|
}
|
||||||
|
if !opts.Privileged {
|
||||||
|
rmPrivileged(conf)
|
||||||
|
}
|
||||||
return conf, err
|
return conf, err
|
||||||
}
|
}
|
||||||
|
|
81
queue/builtin/queue.go
Normal file
81
queue/builtin/queue.go
Normal file
|
@ -0,0 +1,81 @@
|
||||||
|
package builtin
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/drone/drone/queue"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Queue struct {
|
||||||
|
sync.Mutex
|
||||||
|
|
||||||
|
acks map[*queue.Work]struct{}
|
||||||
|
items map[*queue.Work]struct{}
|
||||||
|
itemc chan *queue.Work
|
||||||
|
}
|
||||||
|
|
||||||
|
func New() *Queue {
|
||||||
|
return &Queue{
|
||||||
|
acks: make(map[*queue.Work]struct{}),
|
||||||
|
items: make(map[*queue.Work]struct{}),
|
||||||
|
itemc: make(chan *queue.Work, 999),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish inserts work at the tail of this queue, waiting for
|
||||||
|
// space to become available if the queue is full.
|
||||||
|
func (q *Queue) Publish(work *queue.Work) error {
|
||||||
|
q.Lock()
|
||||||
|
q.items[work] = struct{}{}
|
||||||
|
q.Unlock()
|
||||||
|
q.itemc <- work
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove removes the specified work item from this queue,
|
||||||
|
// if it is present.
|
||||||
|
func (q *Queue) Remove(work *queue.Work) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pull retrieves and removes the head of this queue, waiting
|
||||||
|
// if necessary until work becomes available.
|
||||||
|
func (q *Queue) Pull() *queue.Work {
|
||||||
|
work := <-q.itemc
|
||||||
|
q.Lock()
|
||||||
|
delete(q.items, work)
|
||||||
|
q.Unlock()
|
||||||
|
return work
|
||||||
|
}
|
||||||
|
|
||||||
|
// PullAct retrieves and removes the head of this queue, waiting
|
||||||
|
// if necessary until work becomes available. Items pull from the
|
||||||
|
// queue that aren't acknowledged will be pushed back to the queue
|
||||||
|
// again when the default acknowledgement deadline is reached.
|
||||||
|
func (q *Queue) PullAck() *queue.Work {
|
||||||
|
work := q.Pull()
|
||||||
|
q.Lock()
|
||||||
|
q.acks[work] = struct{}{}
|
||||||
|
q.Unlock()
|
||||||
|
return work
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ack acknowledges an item in the queue was processed.
|
||||||
|
func (q *Queue) Ack(work *queue.Work) error {
|
||||||
|
q.Lock()
|
||||||
|
delete(q.acks, work)
|
||||||
|
q.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Items returns a slice containing all of the work in this
|
||||||
|
// queue, in proper sequence.
|
||||||
|
func (q *Queue) Items() []*queue.Work {
|
||||||
|
q.Lock()
|
||||||
|
defer q.Unlock()
|
||||||
|
items := []*queue.Work{}
|
||||||
|
for work, _ := range q.items {
|
||||||
|
items = append(items, work)
|
||||||
|
}
|
||||||
|
return items
|
||||||
|
}
|
109
queue/plugin/client.go
Normal file
109
queue/plugin/client.go
Normal file
|
@ -0,0 +1,109 @@
|
||||||
|
package plugin
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
|
||||||
|
"github.com/drone/drone/queue"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Client struct {
|
||||||
|
url string
|
||||||
|
token string
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(url, token string) *Client {
|
||||||
|
return &Client{url, token}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish makes an http request to the remote queue
|
||||||
|
// to insert work at the tail.
|
||||||
|
func (c *Client) Publish(work *queue.Work) error {
|
||||||
|
return c.send("POST", "/queue", work, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove makes an http request to the remote queue to
|
||||||
|
// remove the specified work item.
|
||||||
|
func (c *Client) Remove(work *queue.Work) error {
|
||||||
|
return c.send("DELETE", "/queue", work, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pull makes an http request to the remote queue to
|
||||||
|
// retrieve work. This initiates a long poll and will
|
||||||
|
// block until complete.
|
||||||
|
func (c *Client) Pull() *queue.Work {
|
||||||
|
out := &queue.Work{}
|
||||||
|
err := c.send("POST", "/queue/pull", nil, out)
|
||||||
|
if err != nil {
|
||||||
|
// TODO handle error
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pull makes an http request to the remote queue to
|
||||||
|
// retrieve work, with an acknowldement required.
|
||||||
|
// This initiates a long poll and will block until
|
||||||
|
// complete.
|
||||||
|
func (c *Client) PullAck() *queue.Work {
|
||||||
|
out := &queue.Work{}
|
||||||
|
err := c.send("POST", "/queue/pull?ack=true", nil, out)
|
||||||
|
if err != nil {
|
||||||
|
// TODO handle error
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ack makes an http request to the remote queue
|
||||||
|
// to acknowledge an item in the queue was processed.
|
||||||
|
func (c *Client) Ack(work *queue.Work) error {
|
||||||
|
return c.send("POST", "/queue/ack", nil, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Items makes an http request to the remote queue
|
||||||
|
// to fetch a list of all work.
|
||||||
|
func (c *Client) Items() []*queue.Work {
|
||||||
|
out := []*queue.Work{}
|
||||||
|
err := c.send("GET", "/queue/items", nil, &out)
|
||||||
|
if err != nil {
|
||||||
|
// TODO handle error
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// send is a helper function that makes an authenticated
|
||||||
|
// request to the remote http plugin.
|
||||||
|
func (c *Client) send(method, path string, in interface{}, out interface{}) error {
|
||||||
|
url_, err := url.Parse(c.url + path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var buf io.ReadWriter
|
||||||
|
if in != nil {
|
||||||
|
buf = new(bytes.Buffer)
|
||||||
|
err := json.NewEncoder(buf).Encode(in)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequest(method, url_.String(), buf)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req.Header.Add("Authorization", "Bearer "+c.token)
|
||||||
|
req.Header.Add("Content-Type", "application/json")
|
||||||
|
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if out == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return json.NewDecoder(resp.Body).Decode(out)
|
||||||
|
}
|
116
queue/plugin/server.go
Normal file
116
queue/plugin/server.go
Normal file
|
@ -0,0 +1,116 @@
|
||||||
|
package plugin
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/drone/drone/queue"
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Handle returns an http.Handler that enables a remote
|
||||||
|
// client to interop with a Queue over http.
|
||||||
|
func Handle(queue queue.Queue, token string) http.Handler {
|
||||||
|
r := gin.New()
|
||||||
|
|
||||||
|
// middleware to validate the authorization token
|
||||||
|
// and to inject the queue into the http context.
|
||||||
|
bearer := "Bearer " + token
|
||||||
|
r.Use(func(c *gin.Context) {
|
||||||
|
if c.Request.Header.Get("Authorization") != bearer {
|
||||||
|
c.AbortWithStatus(403)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.Set("queue", queue)
|
||||||
|
c.Next()
|
||||||
|
})
|
||||||
|
|
||||||
|
r.POST("/queue", publish)
|
||||||
|
r.DELETE("/queue", remove)
|
||||||
|
r.POST("/queue/pull", pull)
|
||||||
|
r.POST("/queue/ack", ack)
|
||||||
|
r.POST("/queue/items", items)
|
||||||
|
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
// publish handles an http request to the queue
|
||||||
|
// to insert work at the tail.
|
||||||
|
func publish(c *gin.Context) {
|
||||||
|
q := fromContext(c)
|
||||||
|
work := &queue.Work{}
|
||||||
|
if !c.Bind(work) {
|
||||||
|
c.AbortWithStatus(400)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err := q.Publish(work)
|
||||||
|
if err != nil {
|
||||||
|
c.Fail(500, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.Writer.WriteHeader(200)
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove handles an http request to the queue
|
||||||
|
// to remove a work item.
|
||||||
|
func remove(c *gin.Context) {
|
||||||
|
q := fromContext(c)
|
||||||
|
work := &queue.Work{}
|
||||||
|
if !c.Bind(work) {
|
||||||
|
c.AbortWithStatus(400)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err := q.Remove(work)
|
||||||
|
if err != nil {
|
||||||
|
c.Fail(500, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.Writer.WriteHeader(200)
|
||||||
|
}
|
||||||
|
|
||||||
|
// pull handles an http request to the queue
|
||||||
|
// to retrieve work.
|
||||||
|
func pull(c *gin.Context) {
|
||||||
|
q := fromContext(c)
|
||||||
|
var work *queue.Work
|
||||||
|
if c.Request.FormValue("ack") != "" {
|
||||||
|
work = q.PullAck()
|
||||||
|
} else {
|
||||||
|
work = q.Pull()
|
||||||
|
}
|
||||||
|
if work == nil {
|
||||||
|
c.AbortWithStatus(500)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.JSON(200, work)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ack handles an http request to the queue
|
||||||
|
// to confirm an item was successfully pulled.
|
||||||
|
func ack(c *gin.Context) {
|
||||||
|
q := fromContext(c)
|
||||||
|
work := &queue.Work{}
|
||||||
|
if !c.Bind(work) {
|
||||||
|
c.AbortWithStatus(400)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err := q.Ack(work)
|
||||||
|
if err != nil {
|
||||||
|
c.Fail(500, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.Writer.WriteHeader(200)
|
||||||
|
}
|
||||||
|
|
||||||
|
// items handles an http request to the queue to
|
||||||
|
// return a list of all work items.
|
||||||
|
func items(c *gin.Context) {
|
||||||
|
q := fromContext(c)
|
||||||
|
items := q.Items()
|
||||||
|
c.JSON(200, items)
|
||||||
|
}
|
||||||
|
|
||||||
|
// helper function to retrieve the Queue from
|
||||||
|
// the context and cast appropriately.
|
||||||
|
func fromContext(c *gin.Context) queue.Queue {
|
||||||
|
return c.MustGet("queue").(queue.Queue)
|
||||||
|
}
|
55
queue/queue.go
Normal file
55
queue/queue.go
Normal file
|
@ -0,0 +1,55 @@
|
||||||
|
package queue
|
||||||
|
|
||||||
|
type Queue interface {
|
||||||
|
// Publish inserts work at the tail of this queue, waiting for
|
||||||
|
// space to become available if the queue is full.
|
||||||
|
Publish(*Work) error
|
||||||
|
|
||||||
|
// Remove removes the specified work item from this queue,
|
||||||
|
// if it is present.
|
||||||
|
Remove(*Work) error
|
||||||
|
|
||||||
|
// Pull retrieves and removes the head of this queue, waiting
|
||||||
|
// if necessary until work becomes available.
|
||||||
|
Pull() *Work
|
||||||
|
|
||||||
|
// PullAck retrieves and removes the head of this queue, waiting
|
||||||
|
// if necessary until work becomes available. Items pull from the
|
||||||
|
// queue that aren't acknowledged will be pushed back to the queue
|
||||||
|
// again when the default acknowledgement deadline is reached.
|
||||||
|
PullAck() *Work
|
||||||
|
|
||||||
|
// Ack acknowledges an item in the queue was processed.
|
||||||
|
Ack(*Work) error
|
||||||
|
|
||||||
|
// Items returns a slice containing all of the work in this
|
||||||
|
// queue, in proper sequence.
|
||||||
|
Items() []*Work
|
||||||
|
}
|
||||||
|
|
||||||
|
// type Manager interface {
|
||||||
|
// // Register registers a worker that has signed
|
||||||
|
// // up to accept work.
|
||||||
|
// Register(*Worker)
|
||||||
|
|
||||||
|
// // Unregister unregisters a worker that should no
|
||||||
|
// // longer be accepting work.
|
||||||
|
// Unregister(*Worker)
|
||||||
|
|
||||||
|
// // Assign assigns work to a worker.
|
||||||
|
// Assign(*Work, *Worker)
|
||||||
|
|
||||||
|
// // Unassign unassigns work from a worker.
|
||||||
|
// Unassign(*Work, *Worker)
|
||||||
|
|
||||||
|
// // Work returns a list of all work that is
|
||||||
|
// // currently in progress.
|
||||||
|
// Work() []*Work
|
||||||
|
|
||||||
|
// // Worker retrieves a worker by name.
|
||||||
|
// Worker(string) *Worker
|
||||||
|
|
||||||
|
// // Workers returns a slice containing all workers
|
||||||
|
// // registered with the manager.
|
||||||
|
// Workers() []*Worker
|
||||||
|
// }
|
73
queue/worker.go
Normal file
73
queue/worker.go
Normal file
|
@ -0,0 +1,73 @@
|
||||||
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/drone/drone/common"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Work represents an item for work to be
|
||||||
|
// processed by a worker.
|
||||||
|
type Work struct {
|
||||||
|
User *common.User `json:"user"`
|
||||||
|
Repo *common.Repo `json:"repo"`
|
||||||
|
Build *common.Build `json:"build"`
|
||||||
|
Keys *common.Keypair `json:"keypair"`
|
||||||
|
Netrc *common.Netrc `json:"netrc"`
|
||||||
|
Yaml []byte `json:"yaml"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// represents a worker that has connected
|
||||||
|
// to the system in order to perform work
|
||||||
|
type Worker struct {
|
||||||
|
Name string
|
||||||
|
Addr string
|
||||||
|
IsHealthy bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ping pings to worker to verify it is
|
||||||
|
// available and in good health.
|
||||||
|
func (w *Worker) Ping() (bool, error) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Logs fetches the logs for a work item.
|
||||||
|
func (w *Worker) Logs() (io.Reader, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cancel cancels a work item.
|
||||||
|
func (w *Worker) Cancel() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// type Monitor struct {
|
||||||
|
// manager *Manager
|
||||||
|
// }
|
||||||
|
|
||||||
|
// func NewMonitor(manager *Manager) *Monitor {
|
||||||
|
// return &Monitor{manager}
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // start is a helper function that is used to monitor
|
||||||
|
// // all registered workers and ensure they are in a
|
||||||
|
// // healthy state.
|
||||||
|
// func (m *Monitor) Start() {
|
||||||
|
// ticker := time.NewTicker(1 * time.Hour)
|
||||||
|
// go func() {
|
||||||
|
// for {
|
||||||
|
// select {
|
||||||
|
// case <-ticker.C:
|
||||||
|
// workers := m.manager.Workers()
|
||||||
|
// for _, worker := range workers {
|
||||||
|
// // ping the worker to make sure it is
|
||||||
|
// // available and still accepting builds.
|
||||||
|
// if _, err := worker.Ping(); err != nil {
|
||||||
|
// m.manager.SetHealth(worker, false)
|
||||||
|
// } else {
|
||||||
|
// m.manager.SetHealth(worker, true)
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
|
@ -12,16 +12,13 @@ type Remote interface {
|
||||||
Login(token, secret string) (*common.User, error)
|
Login(token, secret string) (*common.User, error)
|
||||||
|
|
||||||
// Orgs fetches the organizations for the given user.
|
// Orgs fetches the organizations for the given user.
|
||||||
//
|
|
||||||
// TODO(bradrydzewski) consider consolidating this to return
|
|
||||||
// the list of organizations along with
|
|
||||||
// the user Login info.
|
|
||||||
Orgs(u *common.User) ([]string, error)
|
Orgs(u *common.User) ([]string, error)
|
||||||
|
|
||||||
// Repo fetches the named repository from the remote system.
|
// Repo fetches the named repository from the remote system.
|
||||||
Repo(u *common.User, owner, repo string) (*common.Repo, error)
|
Repo(u *common.User, owner, repo string) (*common.Repo, error)
|
||||||
|
|
||||||
// Perm fetches the named repository from the remote system.
|
// Perm fetches the named repository permissions from
|
||||||
|
// the remote system for the specified user.
|
||||||
Perm(u *common.User, owner, repo string) (*common.Perm, error)
|
Perm(u *common.User, owner, repo string) (*common.Perm, error)
|
||||||
|
|
||||||
// Script fetches the build script (.drone.yml) from the remote
|
// Script fetches the build script (.drone.yml) from the remote
|
||||||
|
|
|
@ -138,6 +138,11 @@ func RunBuild(c *gin.Context) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// params, _ := store.RepoParams(repo.FullName)
|
||||||
|
// if params != nil && len(params) != 0 {
|
||||||
|
// raw = []byte(inject.InjectSafe(string(raw), params))
|
||||||
|
// }
|
||||||
|
|
||||||
// TODO push build to queue
|
// TODO push build to queue
|
||||||
|
|
||||||
c.JSON(202, build)
|
c.JSON(202, build)
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/drone/drone/common"
|
"github.com/drone/drone/common"
|
||||||
|
"github.com/drone/drone/parser/inject"
|
||||||
"github.com/drone/drone/parser/matrix"
|
"github.com/drone/drone/parser/matrix"
|
||||||
// "github.com/bradrydzewski/drone/worker"
|
// "github.com/bradrydzewski/drone/worker"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
@ -72,6 +73,8 @@ func PostHook(c *gin.Context) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
params, _ := store.RepoParams(repo.FullName)
|
||||||
|
|
||||||
build := &common.Build{}
|
build := &common.Build{}
|
||||||
build.State = common.StatePending
|
build.State = common.StatePending
|
||||||
build.Commit = hook.Commit
|
build.Commit = hook.Commit
|
||||||
|
@ -84,7 +87,10 @@ func PostHook(c *gin.Context) {
|
||||||
c.Fail(404, err)
|
c.Fail(404, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// inject any private parameters into the .drone.yml
|
||||||
|
if params != nil && len(params) != 0 {
|
||||||
|
raw = []byte(inject.InjectSafe(string(raw), params))
|
||||||
|
}
|
||||||
axes, err := matrix.Parse(string(raw))
|
axes, err := matrix.Parse(string(raw))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failure to calculate matrix for %s. %s", repo.FullName, err)
|
log.Errorf("failure to calculate matrix for %s. %s", repo.FullName, err)
|
||||||
|
|
108
server/queue.go
Normal file
108
server/queue.go
Normal file
|
@ -0,0 +1,108 @@
|
||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/gin-gonic/gin/binding"
|
||||||
|
|
||||||
|
"github.com/drone/drone/common"
|
||||||
|
"github.com/drone/drone/eventbus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO (bradrydzewski) the callback URL should be signed.
|
||||||
|
// TODO (bradrydzewski) we shouldn't need to fetch the Repo if specified in the URL path
|
||||||
|
// TODO (bradrydzewski) use SetRepoLast to update the last repository
|
||||||
|
|
||||||
|
// GET /queue/pull
|
||||||
|
func PollBuild(c *gin.Context) {
|
||||||
|
queue := ToQueue(c)
|
||||||
|
work := queue.PullAck()
|
||||||
|
c.JSON(200, work)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GET /queue/push/:owner/:repo
|
||||||
|
func PushBuild(c *gin.Context) {
|
||||||
|
store := ToDatastore(c)
|
||||||
|
repo := ToRepo(c)
|
||||||
|
bus := ToBus(c)
|
||||||
|
in := &common.Build{}
|
||||||
|
if !c.BindWith(in, binding.JSON) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
build, err := store.Build(repo.FullName, in.Number)
|
||||||
|
if err != nil {
|
||||||
|
c.Fail(404, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = store.SetBuildState(repo.FullName, build)
|
||||||
|
if err != nil {
|
||||||
|
c.Fail(500, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
bus.Send(&eventbus.Event{
|
||||||
|
Build: build,
|
||||||
|
Repo: repo,
|
||||||
|
})
|
||||||
|
if repo.Last != nil && repo.Last.Number > build.Number {
|
||||||
|
c.Writer.WriteHeader(200)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
repo.Last = build
|
||||||
|
store.SetRepo(repo)
|
||||||
|
c.Writer.WriteHeader(200)
|
||||||
|
}
|
||||||
|
|
||||||
|
// POST /queue/push/:owner/:repo/:build
|
||||||
|
func PushTask(c *gin.Context) {
|
||||||
|
store := ToDatastore(c)
|
||||||
|
repo := ToRepo(c)
|
||||||
|
bus := ToBus(c)
|
||||||
|
num, _ := strconv.Atoi(c.Params.ByName("build"))
|
||||||
|
in := &common.Task{}
|
||||||
|
if !c.BindWith(in, binding.JSON) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err := store.SetBuildTask(repo.FullName, num, in)
|
||||||
|
if err != nil {
|
||||||
|
c.Fail(404, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
build, err := store.Build(repo.FullName, num)
|
||||||
|
if err != nil {
|
||||||
|
c.Fail(404, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
bus.Send(&eventbus.Event{
|
||||||
|
Build: build,
|
||||||
|
Repo: repo,
|
||||||
|
})
|
||||||
|
c.Writer.WriteHeader(200)
|
||||||
|
}
|
||||||
|
|
||||||
|
// POST /queue/push/:owner/:repo/:build/:task/logs
|
||||||
|
func PushLogs(c *gin.Context) {
|
||||||
|
store := ToDatastore(c)
|
||||||
|
repo := ToRepo(c)
|
||||||
|
bnum, _ := strconv.Atoi(c.Params.ByName("build"))
|
||||||
|
tnum, _ := strconv.Atoi(c.Params.ByName("task"))
|
||||||
|
|
||||||
|
// TODO (bradrydzewski) change this interface to accept an io.Reader
|
||||||
|
// instead of a byte array so that we can buffer the write and so that
|
||||||
|
// we avoid unnecessary copies of the data in memory.
|
||||||
|
logs, err := ioutil.ReadAll(io.LimitReader(c.Request.Body, 5000000)) //5MB
|
||||||
|
defer c.Request.Body.Close()
|
||||||
|
if err != nil {
|
||||||
|
c.Fail(500, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = store.SetLogs(repo.FullName, bnum, tnum, logs)
|
||||||
|
if err != nil {
|
||||||
|
c.Fail(500, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.Writer.WriteHeader(200)
|
||||||
|
}
|
|
@ -9,11 +9,27 @@ import (
|
||||||
"github.com/drone/drone/common"
|
"github.com/drone/drone/common"
|
||||||
"github.com/drone/drone/datastore"
|
"github.com/drone/drone/datastore"
|
||||||
"github.com/drone/drone/eventbus"
|
"github.com/drone/drone/eventbus"
|
||||||
|
"github.com/drone/drone/queue"
|
||||||
"github.com/drone/drone/remote"
|
"github.com/drone/drone/remote"
|
||||||
"github.com/drone/drone/server/session"
|
"github.com/drone/drone/server/session"
|
||||||
"github.com/drone/drone/settings"
|
"github.com/drone/drone/settings"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func SetQueue(q queue.Queue) gin.HandlerFunc {
|
||||||
|
return func(c *gin.Context) {
|
||||||
|
c.Set("queue", q)
|
||||||
|
c.Next()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ToQueue(c *gin.Context) queue.Queue {
|
||||||
|
v, err := c.Get("queue")
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return v.(queue.Queue)
|
||||||
|
}
|
||||||
|
|
||||||
func SetBus(r eventbus.Bus) gin.HandlerFunc {
|
func SetBus(r eventbus.Bus) gin.HandlerFunc {
|
||||||
return func(c *gin.Context) {
|
return func(c *gin.Context) {
|
||||||
c.Set("eventbus", r)
|
c.Set("eventbus", r)
|
||||||
|
|
|
@ -98,6 +98,15 @@
|
||||||
$scope.error = err;
|
$scope.error = err;
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// var convert = new Filter({stream:true,newline:false});
|
||||||
|
// var term = document.getElementById("term")
|
||||||
|
// var stdout = document.getElementById("stdout").innerText.split("\n")
|
||||||
|
// stdout.forEach(function(line, i) {
|
||||||
|
// setTimeout(function () {
|
||||||
|
// term.innerHTML += convert.toHtml(line+"\n");
|
||||||
|
// }, i*i);
|
||||||
|
// });
|
||||||
}
|
}
|
||||||
|
|
||||||
angular
|
angular
|
||||||
|
|
Loading…
Reference in a new issue