Factor into functions

This commit is contained in:
Laszlo Fogas 2019-06-16 10:54:31 +02:00
parent 782a59b6cc
commit 6a2c3f129c

View file

@ -383,6 +383,43 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error {
return err return err
} }
s.updateProcState(proc, state)
if err := s.queue.Done(c, id); err != nil {
log.Printf("error: done: cannot ack proc_id %d: %s", procID, err)
}
procs, _ := s.store.ProcList(build)
s.completeChildrenIfParentCompleted(procs, proc)
if !isThereRunningStage(procs) {
build.Status = buildStatus(procs)
build.Finished = proc.Stopped
if err := s.store.UpdateBuild(build); err != nil {
log.Printf("error: done: cannot update build_id %d final state: %s", build.ID, err)
}
s.updateRemoteStatus(repo, build)
}
if err := s.logger.Close(c, id); err != nil {
log.Printf("error: done: cannot close build_id %d logger: %s", proc.ID, err)
}
s.notify(c, repo, build, procs)
return nil
}
// Log implements the rpc.Log function
func (s *RPC) Log(c context.Context, id string, line *rpc.Line) error {
entry := new(logging.Entry)
entry.Data, _ = json.Marshal(line)
s.logger.Write(c, id, entry)
return nil
}
func (s *RPC) updateProcState(proc *model.Proc, state rpc.State) {
proc.Stopped = state.Finished proc.Stopped = state.Finished
proc.Error = state.Error proc.Error = state.Error
proc.ExitCode = state.ExitCode proc.ExitCode = state.ExitCode
@ -391,68 +428,68 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error {
proc.State = model.StatusFailure proc.State = model.StatusFailure
} }
if err := s.store.ProcUpdate(proc); err != nil { if err := s.store.ProcUpdate(proc); err != nil {
log.Printf("error: done: cannot update proc_id %d state: %s", procID, err) log.Printf("error: done: cannot update proc_id %d state: %s", proc.ID, err)
} }
}
if err := s.queue.Done(c, id); err != nil { func (s *RPC) completeChildrenIfParentCompleted(procs []*model.Proc, completedProc *model.Proc) {
log.Printf("error: done: cannot ack proc_id %d: %s", procID, err)
}
// TODO handle this error
procs, _ := s.store.ProcList(build)
for _, p := range procs { for _, p := range procs {
if p.Running() && p.PPID == proc.PID { if p.Running() && p.PPID == completedProc.PID {
p.State = model.StatusSkipped p.State = model.StatusSkipped
if p.Started != 0 { if p.Started != 0 {
p.State = model.StatusSuccess // for deamons that are killed p.State = model.StatusSuccess // for deamons that are killed
p.Stopped = proc.Stopped p.Stopped = completedProc.Stopped
} }
if err := s.store.ProcUpdate(p); err != nil { if err := s.store.ProcUpdate(p); err != nil {
log.Printf("error: done: cannot update proc_id %d child state: %s", p.ID, err) log.Printf("error: done: cannot update proc_id %d child state: %s", p.ID, err)
} }
} }
} }
}
running := false func isThereRunningStage(procs []*model.Proc) bool {
status := model.StatusSuccess
for _, p := range procs { for _, p := range procs {
if p.PPID == 0 { if p.PPID == 0 {
if p.Running() { if p.Running() {
running = true return true
} }
}
}
return false
}
func buildStatus(procs []*model.Proc) string {
status := model.StatusSuccess
for _, p := range procs {
if p.PPID == 0 {
if p.Failing() { if p.Failing() {
status = p.State status = p.State
} }
} }
} }
if !running {
build.Status = status
build.Finished = proc.Stopped
if err := s.store.UpdateBuild(build); err != nil {
log.Printf("error: done: cannot update build_id %d final state: %s", build.ID, err)
}
// update the status return status
user, err := s.store.GetUser(repo.UserID) }
if err == nil {
if refresher, ok := s.remote.(remote.Refresher); ok { func (s *RPC) updateRemoteStatus(repo *model.Repo, build *model.Build) {
ok, _ := refresher.Refresh(user) user, err := s.store.GetUser(repo.UserID)
if ok { if err == nil {
s.store.UpdateUser(user) if refresher, ok := s.remote.(remote.Refresher); ok {
} ok, _ := refresher.Refresh(user)
} if ok {
uri := fmt.Sprintf("%s/%s/%d", s.host, repo.FullName, build.Number) s.store.UpdateUser(user)
err = s.remote.Status(user, repo, build, uri)
if err != nil {
logrus.Errorf("error setting commit status for %s/%d: %v", repo.FullName, build.Number, err)
} }
} }
uri := fmt.Sprintf("%s/%s/%d", s.host, repo.FullName, build.Number)
err = s.remote.Status(user, repo, build, uri)
if err != nil {
logrus.Errorf("error setting commit status for %s/%d: %v", repo.FullName, build.Number, err)
}
} }
}
if err := s.logger.Close(c, id); err != nil { func (s *RPC) notify(c context.Context, repo *model.Repo, build *model.Build, procs []*model.Proc) {
log.Printf("error: done: cannot close build_id %d logger: %s", proc.ID, err)
}
build.Procs = model.Tree(procs) build.Procs = model.Tree(procs)
message := pubsub.Message{ message := pubsub.Message{
Labels: map[string]string{ Labels: map[string]string{
@ -465,16 +502,6 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error {
Build: *build, Build: *build,
}) })
s.pubsub.Publish(c, "topic/events", message) s.pubsub.Publish(c, "topic/events", message)
return nil
}
// Log implements the rpc.Log function
func (s *RPC) Log(c context.Context, id string, line *rpc.Line) error {
entry := new(logging.Entry)
entry.Data, _ = json.Marshal(line)
s.logger.Write(c, id, entry)
return nil
} }
func (s *RPC) checkCancelled(pipeline *rpc.Pipeline) (bool, error) { func (s *RPC) checkCancelled(pipeline *rpc.Pipeline) (bool, error) {