woodpecker/server/rpc.go

737 lines
18 KiB
Go
Raw Normal View History

2018-02-19 22:24:10 +00:00
// Copyright 2018 Drone.IO Inc.
2018-03-21 13:02:17 +00:00
//
2018-02-19 22:24:10 +00:00
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
2018-03-21 13:02:17 +00:00
//
2018-02-19 22:24:10 +00:00
// http://www.apache.org/licenses/LICENSE-2.0
2018-03-21 13:02:17 +00:00
//
2018-02-19 22:24:10 +00:00
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2017-03-05 07:56:08 +00:00
package server
import (
"bytes"
"context"
"encoding/json"
2017-04-12 17:56:30 +00:00
"fmt"
2017-03-05 07:56:08 +00:00
"log"
"strconv"
"time"
2017-03-05 07:56:08 +00:00
2017-06-28 17:21:22 +00:00
oldcontext "golang.org/x/net/context"
"google.golang.org/grpc/metadata"
"github.com/Sirupsen/logrus"
2019-04-06 13:44:04 +00:00
"github.com/laszlocph/drone-oss-08/cncd/logging"
"github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/rpc"
"github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/rpc/proto"
"github.com/laszlocph/drone-oss-08/cncd/pubsub"
"github.com/laszlocph/drone-oss-08/cncd/queue"
2017-03-05 07:56:08 +00:00
2019-04-04 18:51:20 +00:00
"github.com/laszlocph/drone-oss-08/model"
"github.com/laszlocph/drone-oss-08/remote"
"github.com/laszlocph/drone-oss-08/store"
"github.com/drone/expr"
2017-03-05 07:56:08 +00:00
)
// This file is a complete disaster because I'm trying to wedge in some
// experimental code. Please pardon our appearance during renovations.
// Config is an evil global configuration that will be used as we transition /
// refactor the codebase to move away from storing these values in the Context.
var Config = struct {
Services struct {
Pubsub pubsub.Publisher
Queue queue.Queue
Logs logging.Log
Senders model.SenderService
Secrets model.SecretService
Registries model.RegistryService
2017-06-26 19:27:53 +00:00
Environ model.EnvironService
}
Storage struct {
// Users model.UserStore
// Repos model.RepoStore
// Builds model.BuildStore
// Logs model.LogStore
2017-05-05 16:59:37 +00:00
Config model.ConfigStore
Files model.FileStore
Procs model.ProcStore
// Registries model.RegistryStore
// Secrets model.SecretStore
}
Server struct {
Key string
Cert string
Host string
Port string
Pass string
RepoConfig string
SessionExpires time.Duration
// Open bool
// Orgs map[string]struct{}
// Admins map[string]struct{}
}
Prometheus struct {
AuthToken string
}
Pipeline struct {
Limits model.ResourceLimit
Volumes []string
Networks []string
Privileged []string
}
}{}
2017-03-05 07:56:08 +00:00
type RPC struct {
remote remote.Remote
queue queue.Queue
pubsub pubsub.Publisher
logger logging.Log
store store.Store
host string
}
// Next implements the rpc.Next function
func (s *RPC) Next(c context.Context, filter rpc.Filter) (*rpc.Pipeline, error) {
2017-07-20 16:21:15 +00:00
metadata, ok := metadata.FromContext(c)
if ok {
hostname, ok := metadata["hostname"]
if ok && len(hostname) != 0 {
logrus.Debugf("agent connected: %s: polling", hostname[0])
}
}
fn, err := createFilterFunc(filter)
if err != nil {
return nil, err
}
for {
task, err := s.queue.Poll(c, fn)
if err != nil {
return nil, err
} else if task == nil {
return nil, nil
}
if task.ShouldRun() {
pipeline := new(rpc.Pipeline)
err = json.Unmarshal(task.Data, pipeline)
return pipeline, err
} else {
s.Done(c, task.ID, rpc.State{})
}
}
2017-03-05 07:56:08 +00:00
}
2017-03-05 11:05:16 +00:00
// Wait implements the rpc.Wait function
func (s *RPC) Wait(c context.Context, id string) error {
return s.queue.Wait(c, id)
2017-03-05 07:56:08 +00:00
}
// Extend implements the rpc.Extend function
func (s *RPC) Extend(c context.Context, id string) error {
return s.queue.Extend(c, id)
}
// Update implements the rpc.Update function
func (s *RPC) Update(c context.Context, id string, state rpc.State) error {
2017-04-01 11:17:04 +00:00
procID, err := strconv.ParseInt(id, 10, 64)
2017-03-05 07:56:08 +00:00
if err != nil {
return err
}
2017-04-03 09:34:37 +00:00
pproc, err := s.store.ProcLoad(procID)
2017-03-05 07:56:08 +00:00
if err != nil {
2017-04-03 09:34:37 +00:00
log.Printf("error: rpc.update: cannot find pproc with id %d: %s", procID, err)
2017-03-05 07:56:08 +00:00
return err
}
2017-04-03 09:34:37 +00:00
build, err := s.store.GetBuild(pproc.BuildID)
2017-03-05 07:56:08 +00:00
if err != nil {
2017-04-03 09:34:37 +00:00
log.Printf("error: cannot find build with id %d: %s", pproc.BuildID, err)
return err
}
proc, err := s.store.ProcChild(build, pproc.PID, state.Proc)
if err != nil {
log.Printf("error: cannot find proc with name %s: %s", state.Proc, err)
2017-03-05 07:56:08 +00:00
return err
}
2017-07-20 03:07:29 +00:00
metadata, ok := metadata.FromContext(c)
if ok {
hostname, ok := metadata["hostname"]
if ok && len(hostname) != 0 {
proc.Machine = hostname[0]
}
}
2017-03-05 07:56:08 +00:00
repo, err := s.store.GetRepo(build.RepoID)
if err != nil {
log.Printf("error: cannot find repo with id %d: %s", build.RepoID, err)
return err
}
2017-04-01 11:17:04 +00:00
if state.Exited {
proc.Stopped = state.Finished
proc.ExitCode = state.ExitCode
proc.Error = state.Error
2017-04-03 09:34:37 +00:00
proc.State = model.StatusSuccess
if state.ExitCode != 0 || state.Error != "" {
proc.State = model.StatusFailure
}
2017-09-15 16:04:21 +00:00
if state.ExitCode == 137 {
proc.State = model.StatusKilled
}
2017-04-01 11:17:04 +00:00
} else {
proc.Started = state.Started
proc.State = model.StatusRunning
}
if proc.Started == 0 && proc.Stopped != 0 {
proc.Started = build.Started
}
2017-04-01 11:17:04 +00:00
if err := s.store.ProcUpdate(proc); err != nil {
log.Printf("error: rpc.update: cannot update proc: %s", err)
}
build.Procs, _ = s.store.ProcList(build)
2017-04-04 09:30:06 +00:00
build.Procs = model.Tree(build.Procs)
2017-04-01 11:17:04 +00:00
message := pubsub.Message{
Labels: map[string]string{
"repo": repo.FullName,
"private": strconv.FormatBool(repo.IsPrivate),
},
}
message.Data, _ = json.Marshal(model.Event{
Repo: *repo,
Build: *build,
})
s.pubsub.Publish(c, "topic/events", message)
2017-03-05 07:56:08 +00:00
2017-04-01 11:17:04 +00:00
return nil
}
// Upload implements the rpc.Upload function
func (s *RPC) Upload(c context.Context, id string, file *rpc.File) error {
procID, err := strconv.ParseInt(id, 10, 64)
if err != nil {
return err
2017-03-05 07:56:08 +00:00
}
2017-04-03 09:34:37 +00:00
pproc, err := s.store.ProcLoad(procID)
2017-04-01 11:17:04 +00:00
if err != nil {
2017-04-03 09:34:37 +00:00
log.Printf("error: cannot find parent proc with id %d: %s", procID, err)
return err
}
build, err := s.store.GetBuild(pproc.BuildID)
if err != nil {
log.Printf("error: cannot find build with id %d: %s", pproc.BuildID, err)
return err
}
proc, err := s.store.ProcChild(build, pproc.PID, file.Proc)
if err != nil {
log.Printf("error: cannot find child proc with name %s: %s", file.Proc, err)
2017-04-01 11:17:04 +00:00
return err
}
2017-04-03 09:34:37 +00:00
if file.Mime == "application/json+logs" {
return s.store.LogSave(
proc,
bytes.NewBuffer(file.Data),
)
}
2017-08-02 20:04:00 +00:00
report := &model.File{
2017-04-01 11:17:04 +00:00
BuildID: proc.BuildID,
ProcID: proc.ID,
2017-07-27 17:06:24 +00:00
PID: proc.PID,
2017-04-01 11:17:04 +00:00
Mime: file.Mime,
Name: file.Name,
Size: file.Size,
Time: file.Time,
2017-08-02 20:04:00 +00:00
}
if d, ok := file.Meta["X-Tests-Passed"]; ok {
report.Passed, _ = strconv.Atoi(d)
}
if d, ok := file.Meta["X-Tests-Failed"]; ok {
report.Failed, _ = strconv.Atoi(d)
}
if d, ok := file.Meta["X-Tests-Skipped"]; ok {
report.Skipped, _ = strconv.Atoi(d)
}
if d, ok := file.Meta["X-Checks-Passed"]; ok {
report.Passed, _ = strconv.Atoi(d)
}
if d, ok := file.Meta["X-Checks-Failed"]; ok {
report.Failed, _ = strconv.Atoi(d)
}
if d, ok := file.Meta["X-Coverage-Lines"]; ok {
report.Passed, _ = strconv.Atoi(d)
}
if d, ok := file.Meta["X-Coverage-Total"]; ok {
if total, _ := strconv.Atoi(d); total != 0 {
report.Failed = total - report.Passed
}
}
return Config.Storage.Files.FileCreate(
report,
2017-04-01 11:17:04 +00:00
bytes.NewBuffer(file.Data),
)
}
// Init implements the rpc.Init function
func (s *RPC) Init(c context.Context, id string, state rpc.State) error {
procID, err := strconv.ParseInt(id, 10, 64)
if err != nil {
return err
}
proc, err := s.store.ProcLoad(procID)
if err != nil {
log.Printf("error: cannot find proc with id %d: %s", procID, err)
return err
}
metadata, ok := metadata.FromContext(c)
if ok {
hostname, ok := metadata["hostname"]
if ok && len(hostname) != 0 {
proc.Machine = hostname[0]
}
}
2017-04-01 11:17:04 +00:00
build, err := s.store.GetBuild(proc.BuildID)
if err != nil {
log.Printf("error: cannot find build with id %d: %s", proc.BuildID, err)
return err
}
repo, err := s.store.GetRepo(build.RepoID)
if err != nil {
log.Printf("error: cannot find repo with id %d: %s", build.RepoID, err)
return err
}
2017-03-05 07:56:08 +00:00
if build.Status == model.StatusPending {
build.Status = model.StatusRunning
2017-04-01 11:17:04 +00:00
build.Started = state.Started
if err := s.store.UpdateBuild(build); err != nil {
log.Printf("error: init: cannot update build_id %d state: %s", build.ID, err)
}
2017-03-05 07:56:08 +00:00
}
2017-04-01 11:17:04 +00:00
defer func() {
build.Procs, _ = s.store.ProcList(build)
message := pubsub.Message{
Labels: map[string]string{
"repo": repo.FullName,
"private": strconv.FormatBool(repo.IsPrivate),
},
}
message.Data, _ = json.Marshal(model.Event{
Repo: *repo,
Build: *build,
})
s.pubsub.Publish(c, "topic/events", message)
}()
2017-03-05 07:56:08 +00:00
2017-04-01 11:17:04 +00:00
proc.Started = state.Started
proc.State = model.StatusRunning
return s.store.ProcUpdate(proc)
}
// Done implements the rpc.Done function
func (s *RPC) Done(c context.Context, id string, state rpc.State) error {
procID, err := strconv.ParseInt(id, 10, 64)
if err != nil {
return err
}
2017-03-05 07:56:08 +00:00
2017-04-01 11:17:04 +00:00
proc, err := s.store.ProcLoad(procID)
if err != nil {
log.Printf("error: cannot find proc with id %d: %s", procID, err)
return err
}
build, err := s.store.GetBuild(proc.BuildID)
if err != nil {
log.Printf("error: cannot find build with id %d: %s", proc.BuildID, err)
return err
}
repo, err := s.store.GetRepo(build.RepoID)
if err != nil {
log.Printf("error: cannot find repo with id %d: %s", build.RepoID, err)
return err
}
2019-06-16 08:54:31 +00:00
s.updateProcState(proc, state)
var queueErr error
if proc.Failing() {
queueErr = s.queue.Error(c, id, fmt.Errorf("Proc finished with exitcode %d, %s", state.ExitCode, state.Error))
} else {
queueErr = s.queue.Done(c, id)
}
if queueErr != nil {
2019-06-16 08:54:31 +00:00
log.Printf("error: done: cannot ack proc_id %d: %s", procID, err)
}
procs, _ := s.store.ProcList(build)
s.completeChildrenIfParentCompleted(procs, proc)
if !isThereRunningStage(procs) {
build.Status = buildStatus(procs)
build.Finished = proc.Stopped
if err := s.store.UpdateBuild(build); err != nil {
log.Printf("error: done: cannot update build_id %d final state: %s", build.ID, err)
}
if !isMultiPipeline(procs) {
s.updateRemoteStatus(repo, build, nil)
}
}
if isMultiPipeline(procs) {
s.updateRemoteStatus(repo, build, proc)
2019-06-16 08:54:31 +00:00
}
if err := s.logger.Close(c, id); err != nil {
log.Printf("error: done: cannot close build_id %d logger: %s", proc.ID, err)
}
s.notify(c, repo, build, procs)
return nil
}
func isMultiPipeline(procs []*model.Proc) bool {
countPPIDZero := 0
for _, proc := range procs {
if proc.PPID == 0 {
countPPIDZero++
}
}
return countPPIDZero > 1
}
2019-06-16 08:54:31 +00:00
// Log implements the rpc.Log function
func (s *RPC) Log(c context.Context, id string, line *rpc.Line) error {
entry := new(logging.Entry)
entry.Data, _ = json.Marshal(line)
s.logger.Write(c, id, entry)
return nil
}
func (s *RPC) updateProcState(proc *model.Proc, state rpc.State) {
2017-04-01 11:17:04 +00:00
proc.Stopped = state.Finished
proc.Error = state.Error
proc.ExitCode = state.ExitCode
2019-06-19 06:36:13 +00:00
if state.Started == 0 {
proc.State = model.StatusSkipped
} else {
proc.State = model.StatusSuccess
}
2017-04-03 09:34:37 +00:00
if proc.ExitCode != 0 || proc.Error != "" {
proc.State = model.StatusFailure
}
2017-04-01 11:17:04 +00:00
if err := s.store.ProcUpdate(proc); err != nil {
2019-06-16 08:54:31 +00:00
log.Printf("error: done: cannot update proc_id %d state: %s", proc.ID, err)
2017-04-01 11:17:04 +00:00
}
2019-06-16 08:54:31 +00:00
}
2017-03-05 07:56:08 +00:00
2019-06-16 08:54:31 +00:00
func (s *RPC) completeChildrenIfParentCompleted(procs []*model.Proc, completedProc *model.Proc) {
2017-04-01 11:17:04 +00:00
for _, p := range procs {
2019-06-16 08:54:31 +00:00
if p.Running() && p.PPID == completedProc.PID {
2017-04-01 11:17:04 +00:00
p.State = model.StatusSkipped
if p.Started != 0 {
2017-04-04 09:30:06 +00:00
p.State = model.StatusSuccess // for deamons that are killed
2019-06-16 08:54:31 +00:00
p.Stopped = completedProc.Stopped
2017-04-01 11:17:04 +00:00
}
if err := s.store.ProcUpdate(p); err != nil {
log.Printf("error: done: cannot update proc_id %d child state: %s", p.ID, err)
}
2017-03-05 07:56:08 +00:00
}
2017-04-04 09:30:06 +00:00
}
2019-06-16 08:54:31 +00:00
}
2017-04-04 09:30:06 +00:00
2019-06-16 08:54:31 +00:00
func isThereRunningStage(procs []*model.Proc) bool {
2017-04-04 09:30:06 +00:00
for _, p := range procs {
if p.PPID == 0 {
if p.Running() {
2019-06-16 08:54:31 +00:00
return true
2017-04-04 09:30:06 +00:00
}
2019-06-16 08:54:31 +00:00
}
}
return false
}
func buildStatus(procs []*model.Proc) string {
status := model.StatusSuccess
for _, p := range procs {
if p.PPID == 0 {
2017-04-01 11:17:04 +00:00
if p.Failing() {
2017-04-04 09:30:06 +00:00
status = p.State
2017-04-01 11:17:04 +00:00
}
2017-03-05 07:56:08 +00:00
}
2017-04-01 11:17:04 +00:00
}
2017-04-12 17:56:30 +00:00
2019-06-16 08:54:31 +00:00
return status
}
func (s *RPC) updateRemoteStatus(repo *model.Repo, build *model.Build, proc *model.Proc) {
2019-06-16 08:54:31 +00:00
user, err := s.store.GetUser(repo.UserID)
if err == nil {
if refresher, ok := s.remote.(remote.Refresher); ok {
ok, _ := refresher.Refresh(user)
if ok {
s.store.UpdateUser(user)
2017-04-12 17:56:30 +00:00
}
}
2019-06-16 08:54:31 +00:00
uri := fmt.Sprintf("%s/%s/%d", s.host, repo.FullName, build.Number)
err = s.remote.Status(user, repo, build, uri, proc)
2019-06-16 08:54:31 +00:00
if err != nil {
logrus.Errorf("error setting commit status for %s/%d: %v", repo.FullName, build.Number, err)
}
2017-03-05 07:56:08 +00:00
}
2019-06-16 08:54:31 +00:00
}
2017-03-05 07:56:08 +00:00
2019-06-16 08:54:31 +00:00
func (s *RPC) notify(c context.Context, repo *model.Repo, build *model.Build, procs []*model.Proc) {
2017-04-04 09:30:06 +00:00
build.Procs = model.Tree(procs)
2017-04-01 11:17:04 +00:00
message := pubsub.Message{
Labels: map[string]string{
"repo": repo.FullName,
"private": strconv.FormatBool(repo.IsPrivate),
},
}
2017-03-05 07:56:08 +00:00
message.Data, _ = json.Marshal(model.Event{
Repo: *repo,
Build: *build,
})
s.pubsub.Publish(c, "topic/events", message)
}
func createFilterFunc(filter rpc.Filter) (queue.Filter, error) {
var st *expr.Selector
var err error
if filter.Expr != "" {
st, err = expr.ParseString(filter.Expr)
if err != nil {
return nil, err
}
}
return func(task *queue.Task) bool {
if st != nil {
match, _ := st.Eval(expr.NewRow(task.Labels))
return match
}
for k, v := range filter.Labels {
if task.Labels[k] != v {
return false
}
}
return true
}, nil
}
2017-06-28 17:21:22 +00:00
//
//
//
// DroneServer is a grpc server implementation.
type DroneServer struct {
Remote remote.Remote
Queue queue.Queue
Pubsub pubsub.Publisher
Logger logging.Log
Store store.Store
Host string
}
func (s *DroneServer) Next(c oldcontext.Context, req *proto.NextRequest) (*proto.NextReply, error) {
peer := RPC{
remote: s.Remote,
store: s.Store,
queue: s.Queue,
pubsub: s.Pubsub,
logger: s.Logger,
host: s.Host,
}
filter := rpc.Filter{
Labels: req.GetFilter().GetLabels(),
Expr: req.GetFilter().GetExpr(),
2017-06-28 17:21:22 +00:00
}
res := new(proto.NextReply)
pipeline, err := peer.Next(c, filter)
if err != nil {
return res, err
}
if pipeline == nil {
return res, err
}
res.Pipeline = new(proto.Pipeline)
res.Pipeline.Id = pipeline.ID
res.Pipeline.Timeout = pipeline.Timeout
res.Pipeline.Payload, _ = json.Marshal(pipeline.Config)
return res, err
}
func (s *DroneServer) Init(c oldcontext.Context, req *proto.InitRequest) (*proto.Empty, error) {
peer := RPC{
remote: s.Remote,
store: s.Store,
queue: s.Queue,
pubsub: s.Pubsub,
logger: s.Logger,
host: s.Host,
}
state := rpc.State{
Error: req.GetState().GetError(),
ExitCode: int(req.GetState().GetExitCode()),
Finished: req.GetState().GetFinished(),
Started: req.GetState().GetStarted(),
Proc: req.GetState().GetName(),
Exited: req.GetState().GetExited(),
}
res := new(proto.Empty)
err := peer.Init(c, req.GetId(), state)
return res, err
}
func (s *DroneServer) Update(c oldcontext.Context, req *proto.UpdateRequest) (*proto.Empty, error) {
peer := RPC{
remote: s.Remote,
store: s.Store,
queue: s.Queue,
pubsub: s.Pubsub,
logger: s.Logger,
host: s.Host,
}
state := rpc.State{
Error: req.GetState().GetError(),
ExitCode: int(req.GetState().GetExitCode()),
Finished: req.GetState().GetFinished(),
Started: req.GetState().GetStarted(),
Proc: req.GetState().GetName(),
Exited: req.GetState().GetExited(),
}
res := new(proto.Empty)
err := peer.Update(c, req.GetId(), state)
return res, err
}
func (s *DroneServer) Upload(c oldcontext.Context, req *proto.UploadRequest) (*proto.Empty, error) {
peer := RPC{
remote: s.Remote,
store: s.Store,
queue: s.Queue,
pubsub: s.Pubsub,
logger: s.Logger,
host: s.Host,
}
file := &rpc.File{
Data: req.GetFile().GetData(),
Mime: req.GetFile().GetMime(),
Name: req.GetFile().GetName(),
Proc: req.GetFile().GetProc(),
Size: int(req.GetFile().GetSize()),
Time: req.GetFile().GetTime(),
2017-08-02 20:04:00 +00:00
Meta: req.GetFile().GetMeta(),
2017-06-28 17:21:22 +00:00
}
2017-08-02 20:04:00 +00:00
2017-06-28 17:21:22 +00:00
res := new(proto.Empty)
err := peer.Upload(c, req.GetId(), file)
return res, err
}
func (s *DroneServer) Done(c oldcontext.Context, req *proto.DoneRequest) (*proto.Empty, error) {
peer := RPC{
remote: s.Remote,
store: s.Store,
queue: s.Queue,
pubsub: s.Pubsub,
logger: s.Logger,
host: s.Host,
}
state := rpc.State{
Error: req.GetState().GetError(),
ExitCode: int(req.GetState().GetExitCode()),
Finished: req.GetState().GetFinished(),
Started: req.GetState().GetStarted(),
Proc: req.GetState().GetName(),
Exited: req.GetState().GetExited(),
}
res := new(proto.Empty)
err := peer.Done(c, req.GetId(), state)
return res, err
}
func (s *DroneServer) Wait(c oldcontext.Context, req *proto.WaitRequest) (*proto.Empty, error) {
peer := RPC{
remote: s.Remote,
store: s.Store,
queue: s.Queue,
pubsub: s.Pubsub,
logger: s.Logger,
host: s.Host,
}
res := new(proto.Empty)
err := peer.Wait(c, req.GetId())
return res, err
}
func (s *DroneServer) Extend(c oldcontext.Context, req *proto.ExtendRequest) (*proto.Empty, error) {
peer := RPC{
remote: s.Remote,
store: s.Store,
queue: s.Queue,
pubsub: s.Pubsub,
logger: s.Logger,
host: s.Host,
}
res := new(proto.Empty)
err := peer.Extend(c, req.GetId())
return res, err
}
func (s *DroneServer) Log(c oldcontext.Context, req *proto.LogRequest) (*proto.Empty, error) {
peer := RPC{
remote: s.Remote,
store: s.Store,
queue: s.Queue,
pubsub: s.Pubsub,
logger: s.Logger,
host: s.Host,
}
line := &rpc.Line{
Out: req.GetLine().GetOut(),
Pos: int(req.GetLine().GetPos()),
Time: req.GetLine().GetTime(),
Proc: req.GetLine().GetProc(),
}
res := new(proto.Empty)
err := peer.Log(c, req.GetId(), line)
return res, err
}