code re-enables build agents. needs to be more thoroughly tested

This commit is contained in:
Brad Rydzewski 2015-05-18 15:47:13 -07:00
parent d55d28436b
commit 88d0cdbaf0
20 changed files with 637 additions and 303 deletions

View file

@ -1,13 +1,9 @@
package main package main
import ( import (
"bytes"
"encoding/json"
"flag" "flag"
"fmt" "fmt"
"io" "io"
"net/http"
"net/url"
"os" "os"
"time" "time"
@ -54,7 +50,7 @@ func main() {
time.Sleep(30 * time.Second) time.Sleep(30 * time.Second)
continue continue
} }
runner_ := runner.Runner{&updater{addr, token}} runner_ := runner.Runner{&updater{}}
runner_.Run(w) runner_.Run(w)
} }
}() }()
@ -66,6 +62,12 @@ func main() {
s.Run(":1999") s.Run(":1999")
} }
func pull() (*queue.Work, error) {
out := &queue.Work{}
err := send("POST", "/api/queue/pull", nil, out)
return out, err
}
// ping handler returns a simple response to the // ping handler returns a simple response to the
// caller indicating the server is running. This // caller indicating the server is running. This
// can be used for heartbeats. // can be used for heartbeats.
@ -86,6 +88,11 @@ func about(c *gin.Context) {
// stream handler is a proxy that streams the Docker // stream handler is a proxy that streams the Docker
// stdout and stderr for a running build to the caller. // stdout and stderr for a running build to the caller.
func stream(c *gin.Context) { func stream(c *gin.Context) {
if c.Request.FormValue("token") != token {
c.AbortWithStatus(401)
return
}
client, err := dockerclient.NewDockerClient(DockerHost, nil) client, err := dockerclient.NewDockerClient(DockerHost, nil)
if err != nil { if err != nil {
c.Fail(500, err) c.Fail(500, err)
@ -135,17 +142,3 @@ func stream(c *gin.Context) {
} }
io.Copy(c.Writer, rc) io.Copy(c.Writer, rc)
} }
func pull() (*queue.Work, error) {
url_, _ := url.Parse(addr)
url_.Path = "/api/queue/pull"
var body bytes.Buffer
resp, err := http.Post(url_.String(), "application/json", &body)
if err != nil {
return nil, err
}
defer resp.Body.Close()
work := &queue.Work{}
err = json.NewDecoder(resp.Body).Decode(work)
return work, err
}

View file

@ -5,71 +5,113 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
"strconv"
"time"
//logs "github.com/Sirupsen/logrus" //logs "github.com/Sirupsen/logrus"
common "github.com/drone/drone/pkg/types" common "github.com/drone/drone/pkg/types"
) )
type updater struct { type updater struct{}
addr string
token string
}
func (u *updater) SetCommit(user *common.User, r *common.Repo, c *common.Commit) error { func (u *updater) SetCommit(user *common.User, r *common.Repo, c *common.Commit) error {
url_, err := url.Parse(addr) path := fmt.Sprintf("/api/queue/push/%s/%v", r.FullName, c.Sequence)
if err != nil { return sendBackoff("POST", path, c, nil)
return err
}
url_.Path = fmt.Sprintf("/api/queue/push/%s/%v", r.FullName, c.Sequence)
var body bytes.Buffer
json.NewEncoder(&body).Encode(c)
resp, err := http.Post(url_.String(), "application/json", &body)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("Error pushing task state. Code %d", resp.StatusCode)
}
return nil
} }
func (u *updater) SetBuild(r *common.Repo, c *common.Commit, b *common.Build) error { func (u *updater) SetBuild(r *common.Repo, c *common.Commit, b *common.Build) error {
url_, err := url.Parse(u.addr) path := fmt.Sprintf("/api/queue/push/%s", r.FullName)
if err != nil { return sendBackoff("POST", path, c, nil)
return err
}
url_.Path = fmt.Sprintf("/api/queue/push/%s", r.FullName)
var body bytes.Buffer
json.NewEncoder(&body).Encode(b)
resp, err := http.Post(url_.String(), "application/json", &body)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("Error pushing build state. Code %d", resp.StatusCode)
}
return nil
} }
func (u *updater) SetLogs(r *common.Repo, c *common.Commit, b *common.Build, rc io.ReadCloser) error { func (u *updater) SetLogs(r *common.Repo, c *common.Commit, b *common.Build, rc io.ReadCloser) error {
url_, err := url.Parse(u.addr) path := fmt.Sprintf("/api/queue/push/%s/%v/%v", r.FullName, c.Sequence, b.Sequence)
return sendBackoff("POST", path, rc, nil)
}
func sendBackoff(method, path string, in, out interface{}) error {
var err error
var attempts int
for {
err = send(method, path, in, out)
if err == nil {
break
}
if attempts > 30 {
break
}
attempts++
time.Sleep(time.Second * 30)
}
return err
}
// do makes an http.Request and returns the response
func send(method, path string, in, out interface{}) error {
// create the URI
uri, err := url.Parse(addr + path)
if err != nil { if err != nil {
return err return err
} }
url_.Path = fmt.Sprintf("/api/queue/push/%s/%v/%v/logs", r.FullName, c.Sequence, b.Sequence) if len(uri.Scheme) == 0 {
resp, err := http.Post(url_.String(), "application/json", rc) uri.Scheme = "http"
}
params := uri.Query()
params.Add("token", token)
uri.RawQuery = params.Encode()
// create the request
req, err := http.NewRequest(method, uri.String(), nil)
if err != nil {
return err
}
req.ProtoAtLeast(1, 1)
req.Close = true
req.ContentLength = 0
// If the data is a readCloser we can attach directly
// to the request body.
//
// Else we serialize the data input as JSON.
if rc, ok := in.(io.ReadCloser); ok {
req.Body = rc
} else if in != nil {
inJson, err := json.Marshal(in)
if err != nil {
return err
}
buf := bytes.NewBuffer(inJson)
req.Body = ioutil.NopCloser(buf)
req.ContentLength = int64(len(inJson))
req.Header.Set("Content-Length", strconv.Itoa(len(inJson)))
req.Header.Set("Content-Type", "application/json")
}
// make the request using the default http client
resp, err := http.DefaultClient.Do(req)
if err != nil { if err != nil {
return err return err
} }
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("Error pushing build logs. Code %d", resp.StatusCode) // Check for an http error status (ie not 200 StatusOK)
if resp.StatusCode > 300 {
return fmt.Errorf(resp.Status)
} }
// Decode the JSON response
if out != nil {
return json.NewDecoder(resp.Body).Decode(out)
}
return nil return nil
} }

View file

@ -89,7 +89,9 @@ func main() {
agents := api.Group("/agents") agents := api.Group("/agents")
{ {
agents.Use(server.MustAdmin()) agents.Use(server.MustAdmin())
agents.GET("/token", server.GetAgentToken) agents.GET("", server.GetAgents)
agents.POST("", server.PostAgent)
agents.DELETE("/:id", server.DeleteAgent)
} }
repos := api.Group("/repos/:owner/:name") repos := api.Group("/repos/:owner/:name")
@ -131,20 +133,20 @@ func main() {
hooks.POST("", server.PostHook) hooks.POST("", server.PostHook)
} }
// queue := api.Group("/queue") queue := api.Group("/queue")
// { {
// queue.Use(server.MustAgent()) queue.Use(server.MustAgent())
// queue.GET("", server.GetQueue) queue.Use(server.SetUpdater(updater))
// queue.POST("/pull", server.PollBuild) queue.POST("/pull", server.PollBuild)
// push := queue.Group("/push/:owner/:name") push := queue.Group("/push/:owner/:name")
// { {
// push.Use(server.SetRepo()) push.Use(server.SetRepo())
// push.POST("", server.PushBuild) push.POST("", server.PushCommit)
// push.POST("/:build", server.PushTask) push.POST("/:commit", server.PushBuild)
// push.POST("/:build/:task/logs", server.PushLogs) push.POST("/:commit/:build/logs", server.PushLogs)
// } }
// } }
stream := api.Group("/stream") stream := api.Group("/stream")
{ {

View file

@ -1,19 +1,34 @@
(function () { (function () {
function AgentsCtrl($scope, $window, users, agents) { function AgentsCtrl($scope, $window, users, agents) {
// this is the address that agents should connect with.
$scope.addr = $window.location.origin;
// this is the address that agents should connect with. // Gets the currently authenticated user
$scope.addr = $window.location.origin;
// Gets the currently authenticated user
users.getCached().then(function(payload){ users.getCached().then(function(payload){
$scope.user = payload.data; $scope.user = payload.data;
}); });
// Generages a remote token. // Generages a remote agents.
agents.getToken().then(function(payload){ agents.getAgents().then(function(payload){
$scope.token = payload.data; $scope.agents = payload.data;
}); });
$scope.onDelete = function(agent) {
console.log("delete agent", agent)
agents.deleteAgent(agent).then(function(payload){
var index = $scope.agents.indexOf(agent);
$scope.agents.splice(index, 1);
});
}
$scope.newAgent={address: ""};
$scope.onAdd = function(agent) {
agents.postAgent(agent).then(function(payload){
$scope.agents.push(payload.data);
$scope.newAgent={address: ""};
});
}
} }
angular angular

View file

@ -9,11 +9,20 @@
function AgentService($http) { function AgentService($http) {
/** /**
* Gets an agent token. * Gets an agent list.
*/ */
this.getToken = function() { this.getAgents = function() {
return $http.get('/api/agents/token'); return $http.get('/api/agents');
}; };
this.deleteAgent = function(agent) {
return $http.delete('/api/agents/'+agent.id);
};
this.postAgent = function(agent) {
return $http.post('/api/agents', agent);
};
} }
angular angular

View file

@ -2,10 +2,26 @@
<a href="/">Back</a> <a href="/">Back</a>
<dl>
<dt>Token</dt> <input type="text" ng-model="newAgent.address" />
<dd>{{ token }}</dd> <button ng-click="onAdd(newAgent)">Add</button>
</dl>
<table border="1">
<thead>
<tr>
<th>Address</th>
<th>Token</th>
<th></th>
</tr>
</thead>
<tbody>
<tr ng-repeat="agent in agents | orderBy:'-address'">
<td>{{ agent.address }}</td>
<td>{{ agent.token }}</td>
<td><button ng-click="onDelete(agent)">Delete</button>
</tr>
</tbody>
</table>
<pre> <pre>
docker run -d drone/drone-agent --addr={{ addr }} --token={{ token }} docker run -d drone/drone-agent --addr={{ addr }} --token={{ token }}

1
doc/setup-nginx.md Normal file
View file

@ -0,0 +1 @@
setup-nginx.md

View file

@ -1,20 +1,94 @@
package server package server
import ( import (
"strconv"
"github.com/drone/drone/pkg/types" "github.com/drone/drone/pkg/types"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
common "github.com/drone/drone/pkg/types"
) )
// GET /api/agents/token // GetAgents accepts a request to retrieve all build
func GetAgentToken(c *gin.Context) { // agents from the datastore and return encoded in JSON
sess := ToSession(c) // format.
token := &types.Token{} //
token.Kind = types.TokenAgent // GET /api/agents
token.Label = "drone-agent" //
tokenstr, err := sess.GenerateToken(token) func GetAgents(c *gin.Context) {
store := ToDatastore(c)
agents, err := store.AgentList()
if err != nil { if err != nil {
c.Fail(500, err) c.Fail(400, err)
} else { } else {
c.JSON(200, tokenstr) c.JSON(200, agents)
} }
} }
// PostAgent accepts a request to register a new build
// agent with the system. The registered agent is returned
// from the datastore and return encoded in JSON format.
//
// POST /api/agents
//
func PostAgent(c *gin.Context) {
store := ToDatastore(c)
in := &common.Agent{}
if !c.BindWith(in, binding.JSON) {
return
}
// attept to fetch the agent from the
// datastore. If the agent already exists we
// should re-activate
agent, err := store.AgentAddr(in.Addr)
if err != nil {
agent = &common.Agent{}
agent.Addr = in.Addr
agent.Token = types.GenerateToken()
agent.Active = true
agent.IsHealthy = true
err = store.AddAgent(agent)
if err != nil {
c.Fail(400, err)
} else {
c.JSON(200, agent)
}
return
}
agent.Active = true
err = store.SetAgent(agent)
if err != nil {
c.Fail(400, err)
} else {
c.JSON(200, agent)
}
}
// DeleteAgent accepts a request to delete a build agent
// from the system.
//
// DELETE /api/agents/:id
//
func DeleteAgent(c *gin.Context) {
store := ToDatastore(c)
idstr := c.Params.ByName("id")
id, _ := strconv.Atoi(idstr)
agent, err := store.Agent(int64(id))
if err != nil {
c.Fail(404, err)
return
}
agent.Active = false
err = store.SetAgent(agent)
if err != nil {
c.Fail(400, err)
return
}
c.Writer.WriteHeader(200)
}

View file

@ -1,182 +1,144 @@
package server package server
// import ( import (
// "encoding/json" "strconv"
// "io"
// "net"
// "strconv"
// log "github.com/Sirupsen/logrus" "github.com/gin-gonic/gin"
// "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding"
// "github.com/gin-gonic/gin/binding"
// "github.com/drone/drone/common" log "github.com/Sirupsen/logrus"
// "github.com/drone/drone/eventbus" common "github.com/drone/drone/pkg/types"
// ) )
// // TODO (bradrydzewski) the callback URL should be signed. // GET /queue/pull
// // TODO (bradrydzewski) we shouldn't need to fetch the Repo if specified in the URL path func PollBuild(c *gin.Context) {
// // TODO (bradrydzewski) use SetRepoLast to update the last repository queue := ToQueue(c)
store := ToDatastore(c)
agent := ToAgent(c)
// // GET /queue/pull log.Infof("agent connected and polling builds at %s", agent.Addr)
// func PollBuild(c *gin.Context) {
// queue := ToQueue(c)
// store := ToDatastore(c)
// agent := &common.Agent{
// Addr: c.Request.RemoteAddr,
// }
// // extact the host port and name and // pull an item from the queue
// // replace with the default agent port (1999) work := queue.PullClose(c.Writer)
// host, _, err := net.SplitHostPort(agent.Addr) if work == nil {
// if err == nil { c.AbortWithStatus(500)
// agent.Addr = host return
// } }
// agent.Addr = net.JoinHostPort(agent.Addr, "1999")
// log.Infof("agent connected and polling builds at %s", agent.Addr) // store the agent details with the commit
work.Commit.AgentID = agent.ID
err := store.SetCommit(work.Commit)
if err != nil {
log.Errorf("unable to associate agent with commit. %s", err)
// IMPORTANT: this should never happen, and even if it does
// it is an error scenario that will only impact live streaming
// so we choose it log and ignore.
}
// work := queue.PullClose(c.Writer) c.JSON(200, work)
// if work == nil {
// c.AbortWithStatus(500)
// return
// }
// // TODO (bradrydzewski) decide how we want to handle a failure here // acknowledge work received by the client
// // still not sure exact behavior we want ... queue.Ack(work)
// err = store.SetBuildAgent(work.Repo.FullName, work.Build.Number, agent) }
// if err != nil {
// log.Errorf("error persisting build agent. %s", err)
// }
// c.JSON(200, work) // POST /queue/push/:owner/:repo
func PushCommit(c *gin.Context) {
store := ToDatastore(c)
repo := ToRepo(c)
// // acknowledge work received by the client in := &common.Commit{}
// queue.Ack(work) if !c.BindWith(in, binding.JSON) {
// } return
}
user, err := store.User(repo.UserID)
if err != nil {
c.Fail(404, err)
return
}
commit, err := store.CommitSeq(repo, in.Sequence)
if err != nil {
c.Fail(404, err)
return
}
// // GET /queue/push/:owner/:repo commit.Started = in.Started
// func PushBuild(c *gin.Context) { commit.Finished = in.Finished
// store := ToDatastore(c) commit.State = in.State
// repo := ToRepo(c)
// bus := ToBus(c)
// in := &common.Build{}
// if !c.BindWith(in, binding.JSON) {
// return
// }
// build, err := store.Build(repo.FullName, in.Number)
// if err != nil {
// c.Fail(404, err)
// return
// }
// if in.State != common.StatePending && in.State != common.StateRunning { updater := ToUpdater(c)
// store.DelBuildAgent(repo.FullName, build.Number) err = updater.SetCommit(user, repo, commit)
// } if err != nil {
c.Fail(500, err)
return
}
c.Writer.WriteHeader(200)
}
// build.Duration = in.Duration // POST /queue/push/:owner/:repo/:commit
// build.Started = in.Started func PushBuild(c *gin.Context) {
// build.Finished = in.Finished store := ToDatastore(c)
// build.State = in.State repo := ToRepo(c)
// err = store.SetBuildState(repo.FullName, build) cnum, _ := strconv.Atoi(c.Params.ByName("commit"))
// if err != nil {
// c.Fail(500, err)
// return
// }
// if build.State != common.StatePending && build.State != common.StateRunning { in := &common.Build{}
// if repo.Last == nil || build.Number >= repo.Last.Number { if !c.BindWith(in, binding.JSON) {
// repo.Last = build return
// store.SetRepo(repo) }
// }
// }
// // <-- FIXME commit, err := store.CommitSeq(repo, cnum)
// // for some reason the Repo and Build fail to marshal to JSON. if err != nil {
// // It has something to do with memory / pointers. So it goes away c.Fail(404, err)
// // if I just refetch these items. Needs to be fixed in the future, return
// // but for now should be ok }
// repo, err = store.Repo(repo.FullName) build, err := store.BuildSeq(commit, in.Sequence)
// if err != nil { if err != nil {
// c.Fail(500, err) c.Fail(404, err)
// return return
// } }
// build, err = store.Build(repo.FullName, in.Number)
// if err != nil {
// c.Fail(404, err)
// return
// }
// // END FIXME -->
// msg, err := json.Marshal(build) build.Duration = in.Duration
// if err == nil { build.Started = in.Started
// c.String(200, err.Error()) // we can ignore this error build.Finished = in.Finished
// return build.ExitCode = in.ExitCode
// } build.State = in.State
// bus.Send(&eventbus.Event{ updater := ToUpdater(c)
// Name: repo.FullName, err = updater.SetBuild(repo, commit, build)
// Kind: eventbus.EventRepo, if err != nil {
// Msg: msg, c.Fail(500, err)
// }) return
}
c.Writer.WriteHeader(200)
}
// c.Writer.WriteHeader(200) // POST /queue/push/:owner/:repo/:comimt/:build/logs
// } func PushLogs(c *gin.Context) {
store := ToDatastore(c)
repo := ToRepo(c)
cnum, _ := strconv.Atoi(c.Params.ByName("commit"))
bnum, _ := strconv.Atoi(c.Params.ByName("build"))
// // POST /queue/push/:owner/:repo/:build commit, err := store.CommitSeq(repo, cnum)
// func PushTask(c *gin.Context) { if err != nil {
// store := ToDatastore(c) c.Fail(404, err)
// repo := ToRepo(c) return
// bus := ToBus(c) }
// num, _ := strconv.Atoi(c.Params.ByName("build")) build, err := store.BuildSeq(commit, bnum)
// in := &common.Task{} if err != nil {
// if !c.BindWith(in, binding.JSON) { c.Fail(404, err)
// return return
// } }
// err := store.SetBuildTask(repo.FullName, num, in) updater := ToUpdater(c)
// if err != nil { err = updater.SetLogs(repo, commit, build, c.Request.Body)
// c.Fail(404, err) if err != nil {
// return c.Fail(500, err)
// } return
// build, err := store.Build(repo.FullName, num) }
// if err != nil { c.Writer.WriteHeader(200)
// c.Fail(404, err) }
// return
// }
// msg, err := json.Marshal(build) func GetQueue(c *gin.Context) {
// if err == nil { queue := ToQueue(c)
// c.String(200, err.Error()) // we can ignore this error items := queue.Items()
// return c.JSON(200, items)
// } }
// bus.Send(&eventbus.Event{
// Name: repo.FullName,
// Kind: eventbus.EventRepo,
// Msg: msg,
// })
// c.Writer.WriteHeader(200)
// }
// // POST /queue/push/:owner/:repo/:build/:task/logs
// func PushLogs(c *gin.Context) {
// store := ToDatastore(c)
// repo := ToRepo(c)
// bnum, _ := strconv.Atoi(c.Params.ByName("build"))
// tnum, _ := strconv.Atoi(c.Params.ByName("task"))
// const maxBuffToRead int64 = 5000000 // 5MB.
// err := store.SetLogs(repo.FullName, bnum, tnum, io.LimitReader(c.Request.Body, maxBuffToRead))
// if err != nil {
// c.Fail(500, err)
// return
// }
// c.Writer.WriteHeader(200)
// }
// func GetQueue(c *gin.Context) {
// queue := ToQueue(c)
// items := queue.Items()
// c.JSON(200, items)
// }

View file

@ -76,6 +76,21 @@ func SetRunner(r runner.Runner) gin.HandlerFunc {
} }
} }
func ToUpdater(c *gin.Context) runner.Updater {
v, ok := c.Get("updater")
if !ok {
return nil
}
return v.(runner.Updater)
}
func SetUpdater(u runner.Updater) gin.HandlerFunc {
return func(c *gin.Context) {
c.Set("updater", u)
c.Next()
}
}
func ToSettings(c *gin.Context) *settings.Settings { func ToSettings(c *gin.Context) *settings.Settings {
v, ok := c.Get("settings") v, ok := c.Get("settings")
if !ok { if !ok {
@ -115,6 +130,14 @@ func ToRepo(c *gin.Context) *common.Repo {
return v.(*common.Repo) return v.(*common.Repo)
} }
func ToAgent(c *gin.Context) *common.Agent {
v, ok := c.Get("agent")
if !ok {
return nil
}
return v.(*common.Agent)
}
func ToDatastore(c *gin.Context) store.Store { func ToDatastore(c *gin.Context) store.Store {
return c.MustGet("datastore").(store.Store) return c.MustGet("datastore").(store.Store)
} }
@ -231,15 +254,22 @@ func MustAdmin() gin.HandlerFunc {
func MustAgent() gin.HandlerFunc { func MustAgent() gin.HandlerFunc {
return func(c *gin.Context) { return func(c *gin.Context) {
sess := ToSession(c) store := ToDatastore(c)
token := sess.GetLogin(c.Request) token := c.Request.FormValue("token")
if token == nil { if len(token) == 0 {
c.AbortWithStatus(401) c.AbortWithStatus(401)
return return
} else if token.Kind != common.TokenAgent { }
c.AbortWithStatus(500) agent, err := store.AgentToken(token)
if err != nil {
c.Fail(401, err)
return return
} }
if agent.Active == false {
c.AbortWithStatus(403)
return
}
c.Set("agent", agent)
c.Next() c.Next()
} }
} }

View file

@ -2,6 +2,7 @@ package server
import ( import (
"io" "io"
"net/http"
"strconv" "strconv"
"github.com/drone/drone/pkg/bus" "github.com/drone/drone/pkg/bus"
@ -68,11 +69,38 @@ func GetStream(c *gin.Context) {
return return
} }
rc, err := runner.Logs(build) var rc io.ReadCloser
if err != nil {
c.Fail(404, err) // if no agent is assigned to the build we
return // should stream the local logs
if commit.AgentID == 0 {
rc, err = runner.Logs(build)
if err != nil {
c.Fail(404, err)
return
}
} else {
agent, err := store.Agent(commit.AgentID)
if err != nil {
c.Fail(404, err)
return
}
resp, err := http.Get("http://" + agent.Addr)
if err != nil {
c.Fail(500, err)
return
} else if resp.StatusCode != 200 {
resp.Body.Close()
c.AbortWithStatus(resp.StatusCode)
return
}
rc = resp.Body
} }
defer func() {
rc.Close()
}()
go func() { go func() {
<-c.Writer.CloseNotify() <-c.Writer.CloseNotify()
rc.Close() rc.Close()
@ -80,11 +108,6 @@ func GetStream(c *gin.Context) {
rw := &StreamWriter{c.Writer, 0} rw := &StreamWriter{c.Writer, 0}
defer func() {
log.Infof("closed log stream")
rc.Close()
}()
docker.StdCopy(rw, rw, rc) docker.StdCopy(rw, rw, rc)
} }

View file

@ -1 +1,77 @@
package builtin package builtin
import (
"database/sql"
common "github.com/drone/drone/pkg/types"
"github.com/russross/meddler"
)
type Agentstore struct {
*sql.DB
}
func NewAgentstore(db *sql.DB) *Agentstore {
return &Agentstore{db}
}
// Agent returns an agent by ID.
func (db *Agentstore) Agent(id int64) (*common.Agent, error) {
var agent = new(common.Agent)
var err = meddler.Load(db, agentTable, agent, id)
return agent, err
}
// AgentAddr returns an agent by address.
func (db *Agentstore) AgentAddr(addr string) (*common.Agent, error) {
var agent = new(common.Agent)
var err = meddler.QueryRow(db, agent, rebind(agentAddrQuery), addr)
return agent, err
}
// AgentToken returns an agent by token.
func (db *Agentstore) AgentToken(token string) (*common.Agent, error) {
var agent = new(common.Agent)
var err = meddler.QueryRow(db, agent, rebind(agentTokenQuery), token)
return agent, err
}
// AgentList returns a list of all build agents.
func (db *Agentstore) AgentList() ([]*common.Agent, error) {
var agents []*common.Agent
var err = meddler.QueryAll(db, &agents, rebind(agentListQuery), true)
return agents, err
}
// AddAgent inserts an agent in the datastore.
func (db *Agentstore) AddAgent(agent *common.Agent) error {
return meddler.Insert(db, agentTable, agent)
}
// SetAgent updates an agent in the datastore.
func (db *Agentstore) SetAgent(agent *common.Agent) error {
return meddler.Update(db, agentTable, agent)
}
// Agent table name in database.
const agentTable = "agents"
const agentTokenQuery = `
SELECT *
FROM agents
WHERE agent_token = ?
LIMIT 1;
`
const agentAddrQuery = `
SELECT *
FROM agents
WHERE agent_addr = ?
LIMIT 1;
`
const agentListQuery = `
SELECT *
FROM agents
WHERE agent_active = ?;
`

View file

@ -24,6 +24,7 @@ func Setup(tx migration.LimitedTx) error {
statusTable, statusTable,
statusCommitIndex, statusCommitIndex,
blobTable, blobTable,
agentTable,
} }
for _, stmt := range stmts { for _, stmt := range stmts {
_, err := tx.Exec(transform(stmt)) _, err := tx.Exec(transform(stmt))
@ -124,6 +125,7 @@ var commitTable = `
CREATE TABLE IF NOT EXISTS commits ( CREATE TABLE IF NOT EXISTS commits (
commit_id INTEGER PRIMARY KEY AUTOINCREMENT commit_id INTEGER PRIMARY KEY AUTOINCREMENT
,repo_id INTEGER ,repo_id INTEGER
,agent_id INTEGER
,commit_seq INTEGER ,commit_seq INTEGER
,commit_state VARCHAR(255) ,commit_state VARCHAR(255)
,commit_started INTEGER ,commit_started INTEGER
@ -150,6 +152,10 @@ var commitRepoIndex = `
CREATE INDEX commits_repo_idx ON commits (repo_id); CREATE INDEX commits_repo_idx ON commits (repo_id);
` `
var agentRepoIndex = `
CREATE INDEX commits_agent_idx ON commits (agent_id);
`
var tokenTable = ` var tokenTable = `
CREATE TABLE IF NOT EXISTS tokens ( CREATE TABLE IF NOT EXISTS tokens (
token_id INTEGER PRIMARY KEY AUTOINCREMENT token_id INTEGER PRIMARY KEY AUTOINCREMENT
@ -212,3 +218,17 @@ CREATE TABLE IF NOT EXISTS blobs (
,UNIQUE(blob_path) ,UNIQUE(blob_path)
); );
` `
var agentTable = `
CREATE TABLE IF NOT EXISTS agents (
agent_id INTEGER PRIMARY KEY AUTOINCREMENT
,agent_kind VARCHAR(255)
,agent_addr VARCHAR(2000)
,agent_token VARCHAR(2000)
,agent_active BOOL
,agent_cert BLOB
,agent_key BLOB
,UNIQUE(agent_addr)
,UNIQUE(agent_token)
);
`

View file

@ -40,22 +40,6 @@ func (db *Repostore) RepoList(user *common.User) ([]*common.Repo, error) {
return repos, err return repos, err
} }
// // RepoKeys retrieves a set of repository keys from
// // the datastore for the specified name.
// func (db *Repostore) RepoKeypair(repo *common.Repo) (*common.Keypair, error) {
// var keypair = new(common.Keypair)
// var err = meddler.QueryRow(db, keypair, rebind(repoKeysQuery), repo.ID)
// return keypair, err
// }
// // RepoParams retrieves a set of repository params from
// // the datastore for the specified name.
// func (db *Repostore) RepoParams(repo *common.Repo) (*common.Params, error) {
// var params = new(common.Params)
// var err = meddler.QueryRow(db, params, rebind(repoParamsQuery), repo.ID)
// return params, err
// }
// AddRepo inserts a repo in the datastore. // AddRepo inserts a repo in the datastore.
func (db *Repostore) AddRepo(repo *common.Repo) error { func (db *Repostore) AddRepo(repo *common.Repo) error {
repo.Created = time.Now().UTC().Unix() repo.Created = time.Now().UTC().Unix()
@ -69,16 +53,6 @@ func (db *Repostore) SetRepo(repo *common.Repo) error {
return meddler.Update(db, repoTable, repo) return meddler.Update(db, repoTable, repo)
} }
// // SetRepoKeypair upserts a keypair in the datastore.
// func (db *Repostore) SetRepoKeypair(keys *common.Keypair) error {
// return meddler.Save(db, repoKeyTable, keys)
// }
// // SetRepoKeypair upserts a param set in the datastore.
// func (db *Repostore) SetRepoParams(params *common.Params) error {
// return meddler.Save(db, repoParamTable, params)
// }
// DelRepo removes the repo from the datastore. // DelRepo removes the repo from the datastore.
func (db *Repostore) DelRepo(repo *common.Repo) error { func (db *Repostore) DelRepo(repo *common.Repo) error {
var _, err = db.Exec(rebind(repoDeleteStmt), repo.ID) var _, err = db.Exec(rebind(repoDeleteStmt), repo.ID)

View file

@ -80,6 +80,7 @@ func New(db *sql.DB) store.Store {
*Blobstore *Blobstore
*Starstore *Starstore
*Tokenstore *Tokenstore
*Agentstore
}{ }{
NewUserstore(db), NewUserstore(db),
NewRepostore(db), NewRepostore(db),
@ -88,5 +89,6 @@ func New(db *sql.DB) store.Store {
NewBlobstore(db), NewBlobstore(db),
NewStarstore(db), NewStarstore(db),
NewTokenstore(db), NewTokenstore(db),
NewAgentstore(db),
} }
} }

View file

@ -34,6 +34,8 @@ type Store interface {
// DelUser removes the user from the datastore. // DelUser removes the user from the datastore.
DelUser(*common.User) error DelUser(*common.User) error
//
// Token returns a token by ID. // Token returns a token by ID.
Token(int64) (*common.Token, error) Token(int64) (*common.Token, error)
@ -146,4 +148,24 @@ type Store interface {
// Del removes an object from the blobstore. // Del removes an object from the blobstore.
DelBlob(path string) error DelBlob(path string) error
//
// Agent returns an agent by ID.
Agent(int64) (*common.Agent, error)
// AgentAddr returns an agent by address.
AgentAddr(string) (*common.Agent, error)
// AgentToken returns an agent by token.
AgentToken(string) (*common.Agent, error)
// AgentList returns a list of all build agents.
AgentList() ([]*common.Agent, error)
// AddAgent inserts an agent in the datastore.
AddAgent(*common.Agent) error
// SetAgent updates an agent in the datastore.
SetAgent(*common.Agent) error
} }

View file

@ -3,7 +3,12 @@ package types
// Agent represents a worker that has connected // Agent represents a worker that has connected
// to the system in order to perform work // to the system in order to perform work
type Agent struct { type Agent struct {
Name string `json:"name"` ID int64 `meddler:"agent_id,pk" json:"id,omitempty"`
Addr string `json:"addr"` Kind string `meddler:"agent_kind" json:"kind,omitempty"`
IsHealthy bool `json:"is_healthy"` Addr string `meddler:"agent_addr" json:"address"`
Token string `meddler:"agent_token" json:"token"`
Cert string `meddler:"agent_cert" json:"-"`
Key string `meddler:"agent_key" json:"-"`
Active bool `meddler:"agent_active" json:"is_active"`
IsHealthy bool `meddler:"-" json:"is_healthy,omitempty"`
} }

View file

@ -12,6 +12,7 @@ const (
type Commit struct { type Commit struct {
ID int64 `meddler:"commit_id,pk" json:"-"` ID int64 `meddler:"commit_id,pk" json:"-"`
RepoID int64 `meddler:"repo_id" json:"-"` RepoID int64 `meddler:"repo_id" json:"-"`
AgentID int64 `meddler:"agent_id" json:"-"`
Sequence int `meddler:"commit_seq" json:"sequence"` Sequence int `meddler:"commit_seq" json:"sequence"`
State string `meddler:"commit_state" json:"state"` State string `meddler:"commit_state" json:"state"`
Started int64 `meddler:"commit_started" json:"started_at"` Started int64 `meddler:"commit_started" json:"started_at"`

48
pkg/types/util.go Normal file
View file

@ -0,0 +1,48 @@
package types
import (
"crypto/md5"
"crypto/rand"
"fmt"
"io"
"strings"
)
// standard characters allowed in token string.
var chars = []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789")
// default token length
var length = 40
// GenerateToken generates random strings good for use in URIs to
// identify unique objects.
func GenerateToken() string {
b := make([]byte, length)
r := make([]byte, length+(length/4)) // storage for random bytes.
clen := byte(len(chars))
maxrb := byte(256 - (256 % len(chars)))
i := 0
for {
io.ReadFull(rand.Reader, r)
for _, c := range r {
if c >= maxrb {
// Skip this number to avoid modulo bias.
continue
}
b[i] = chars[c%clen]
i++
if i == length {
return string(b)
}
}
}
}
// helper function to create a Gravatar Hash
// for the given Email address.
func CreateGravatar(email string) string {
email = strings.ToLower(strings.TrimSpace(email))
hash := md5.New()
hash.Write([]byte(email))
return fmt.Sprintf("%x", hash.Sum(nil))
}

19
pkg/types/util_test.go Normal file
View file

@ -0,0 +1,19 @@
package types
import (
"testing"
)
func Test_CreateGravatar(t *testing.T) {
var got, want = CreateGravatar("dr_cooper@caltech.edu"), "2b77ba83e2216ddcd11fe8c24b70c2a3"
if got != want {
t.Errorf("Got gravatar hash %s, want %s", got, want)
}
}
func Test_GenerateToken(t *testing.T) {
token := GenerateToken()
if len(token) != length {
t.Errorf("Want token length %d, got %d", length, len(token))
}
}