From 6a2c3f129c233e0e87d53c23791e7869652de2f9 Mon Sep 17 00:00:00 2001 From: Laszlo Fogas Date: Sun, 16 Jun 2019 10:54:31 +0200 Subject: [PATCH] Factor into functions --- server/rpc.go | 117 +++++++++++++++++++++++++++++++------------------- 1 file changed, 72 insertions(+), 45 deletions(-) diff --git a/server/rpc.go b/server/rpc.go index 61ff8648a..8e20fdb17 100644 --- a/server/rpc.go +++ b/server/rpc.go @@ -383,6 +383,43 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error { return err } + s.updateProcState(proc, state) + + if err := s.queue.Done(c, id); err != nil { + log.Printf("error: done: cannot ack proc_id %d: %s", procID, err) + } + + procs, _ := s.store.ProcList(build) + s.completeChildrenIfParentCompleted(procs, proc) + + if !isThereRunningStage(procs) { + build.Status = buildStatus(procs) + build.Finished = proc.Stopped + if err := s.store.UpdateBuild(build); err != nil { + log.Printf("error: done: cannot update build_id %d final state: %s", build.ID, err) + } + + s.updateRemoteStatus(repo, build) + } + + if err := s.logger.Close(c, id); err != nil { + log.Printf("error: done: cannot close build_id %d logger: %s", proc.ID, err) + } + + s.notify(c, repo, build, procs) + + return nil +} + +// Log implements the rpc.Log function +func (s *RPC) Log(c context.Context, id string, line *rpc.Line) error { + entry := new(logging.Entry) + entry.Data, _ = json.Marshal(line) + s.logger.Write(c, id, entry) + return nil +} + +func (s *RPC) updateProcState(proc *model.Proc, state rpc.State) { proc.Stopped = state.Finished proc.Error = state.Error proc.ExitCode = state.ExitCode @@ -391,68 +428,68 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error { proc.State = model.StatusFailure } if err := s.store.ProcUpdate(proc); err != nil { - log.Printf("error: done: cannot update proc_id %d state: %s", procID, err) + log.Printf("error: done: cannot update proc_id %d state: %s", proc.ID, err) } +} - if err := s.queue.Done(c, id); err != nil { - log.Printf("error: done: cannot ack proc_id %d: %s", procID, err) - } - - // TODO handle this error - procs, _ := s.store.ProcList(build) +func (s *RPC) completeChildrenIfParentCompleted(procs []*model.Proc, completedProc *model.Proc) { for _, p := range procs { - if p.Running() && p.PPID == proc.PID { + if p.Running() && p.PPID == completedProc.PID { p.State = model.StatusSkipped if p.Started != 0 { p.State = model.StatusSuccess // for deamons that are killed - p.Stopped = proc.Stopped + p.Stopped = completedProc.Stopped } if err := s.store.ProcUpdate(p); err != nil { log.Printf("error: done: cannot update proc_id %d child state: %s", p.ID, err) } } } +} - running := false - status := model.StatusSuccess +func isThereRunningStage(procs []*model.Proc) bool { for _, p := range procs { if p.PPID == 0 { if p.Running() { - running = true + return true } + } + } + return false +} + +func buildStatus(procs []*model.Proc) string { + status := model.StatusSuccess + + for _, p := range procs { + if p.PPID == 0 { if p.Failing() { status = p.State } } } - if !running { - build.Status = status - build.Finished = proc.Stopped - if err := s.store.UpdateBuild(build); err != nil { - log.Printf("error: done: cannot update build_id %d final state: %s", build.ID, err) - } - // update the status - user, err := s.store.GetUser(repo.UserID) - if err == nil { - if refresher, ok := s.remote.(remote.Refresher); ok { - ok, _ := refresher.Refresh(user) - if ok { - s.store.UpdateUser(user) - } - } - uri := fmt.Sprintf("%s/%s/%d", s.host, repo.FullName, build.Number) - err = s.remote.Status(user, repo, build, uri) - if err != nil { - logrus.Errorf("error setting commit status for %s/%d: %v", repo.FullName, build.Number, err) + return status +} + +func (s *RPC) updateRemoteStatus(repo *model.Repo, build *model.Build) { + user, err := s.store.GetUser(repo.UserID) + if err == nil { + if refresher, ok := s.remote.(remote.Refresher); ok { + ok, _ := refresher.Refresh(user) + if ok { + s.store.UpdateUser(user) } } + uri := fmt.Sprintf("%s/%s/%d", s.host, repo.FullName, build.Number) + err = s.remote.Status(user, repo, build, uri) + if err != nil { + logrus.Errorf("error setting commit status for %s/%d: %v", repo.FullName, build.Number, err) + } } +} - if err := s.logger.Close(c, id); err != nil { - log.Printf("error: done: cannot close build_id %d logger: %s", proc.ID, err) - } - +func (s *RPC) notify(c context.Context, repo *model.Repo, build *model.Build, procs []*model.Proc) { build.Procs = model.Tree(procs) message := pubsub.Message{ Labels: map[string]string{ @@ -465,16 +502,6 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error { Build: *build, }) s.pubsub.Publish(c, "topic/events", message) - - return nil -} - -// Log implements the rpc.Log function -func (s *RPC) Log(c context.Context, id string, line *rpc.Line) error { - entry := new(logging.Entry) - entry.Data, _ = json.Marshal(line) - s.logger.Write(c, id, entry) - return nil } func (s *RPC) checkCancelled(pipeline *rpc.Pipeline) (bool, error) {