From 2253ca66dbfa1998667c5aefd53a003296a6fcfa Mon Sep 17 00:00:00 2001 From: Laszlo Fogas Date: Sun, 16 Jun 2019 15:56:32 +0200 Subject: [PATCH] Tasks should not run on error, unless specified --- server/rpc.go | 42 ++++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/server/rpc.go b/server/rpc.go index 8e20fdb17..6bd288459 100644 --- a/server/rpc.go +++ b/server/rpc.go @@ -113,26 +113,22 @@ func (s *RPC) Next(c context.Context, filter rpc.Filter) (*rpc.Pipeline, error) if err != nil { return nil, err } - task, err := s.queue.Poll(c, fn) - if err != nil { - return nil, err - } else if task == nil { - return nil, nil + for { + task, err := s.queue.Poll(c, fn) + if err != nil { + return nil, err + } else if task == nil { + return nil, nil + } + + if task.ShouldRun() { + pipeline := new(rpc.Pipeline) + err = json.Unmarshal(task.Data, pipeline) + return pipeline, err + } else { + s.Done(c, task.ID, rpc.State{}) + } } - pipeline := new(rpc.Pipeline) - - // check if the process was previously cancelled - // cancelled, _ := s.checkCancelled(pipeline) - // if cancelled { - // logrus.Debugf("ignore pid %v: cancelled by user", pipeline.ID) - // if derr := s.queue.Done(c, pipeline.ID); derr != nil { - // logrus.Errorf("error: done: cannot ack proc_id %v: %s", pipeline.ID, err) - // } - // return nil, nil - // } - - err = json.Unmarshal(task.Data, pipeline) - return pipeline, err } // Wait implements the rpc.Wait function @@ -385,7 +381,13 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error { s.updateProcState(proc, state) - if err := s.queue.Done(c, id); err != nil { + var queueErr error + if proc.Failing() { + queueErr = s.queue.Error(c, id, fmt.Errorf("Proc finished with exitcode %d, %s", state.ExitCode, state.Error)) + } else { + queueErr = s.queue.Done(c, id) + } + if queueErr != nil { log.Printf("error: done: cannot ack proc_id %d: %s", procID, err) }