From b68761f3c1fe68f8ee77fbe9f02e23a382f194d6 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Fri, 29 Mar 2024 11:32:37 +0100 Subject: [PATCH] more todos --- server/grpc/auth_server.go | 3 +++ server/grpc/rpc.go | 42 ++++++++++++++++++++++++++++++++++++-- server/queue/fifo.go | 6 +++++- server/queue/queue.go | 5 ++++- 4 files changed, 52 insertions(+), 4 deletions(-) diff --git a/server/grpc/auth_server.go b/server/grpc/auth_server.go index f9652c85f..cc2036a25 100644 --- a/server/grpc/auth_server.go +++ b/server/grpc/auth_server.go @@ -89,5 +89,8 @@ func (s *WoodpeckerAuthServer) getAgent(agentID int64, agentToken string) (*mode if err != nil && errors.Is(err, types.RecordNotExist) { return nil, fmt.Errorf("individual agent not found by token: %w", err) } + + // TODO: check if an agent can still pretend to be an other one + return agent, err } diff --git a/server/grpc/rpc.go b/server/grpc/rpc.go index ef9533f02..598cd9103 100644 --- a/server/grpc/rpc.go +++ b/server/grpc/rpc.go @@ -105,11 +105,16 @@ func (s *RPC) Wait(c context.Context, id string) error { // Extend implements the rpc.Extend function func (s *RPC) Extend(c context.Context, id string) error { - return s.queue.Extend(c, id) + agent, err := s.getAgentFromContext(c) + if err != nil { + return err + } + + return s.queue.Extend(c, agent.ID, id) } // Update implements the rpc.Update function -func (s *RPC) Update(_ context.Context, id string, state rpc.State) error { +func (s *RPC) Update(c context.Context, id string, state rpc.State) error { workflowID, err := strconv.ParseInt(id, 10, 64) if err != nil { return err @@ -127,6 +132,15 @@ func (s *RPC) Update(_ context.Context, id string, state rpc.State) error { return err } + agent, err := s.getAgentFromContext(c) + if err != nil { + return err + } + + if !agent.IsSystemAgent() { + // TODO: check if agent is allowed to alter things + } + step, err := s.store.StepByUUID(state.StepUUID) if err != nil { log.Error().Err(err).Msgf("cannot find step with uuid %s", state.StepUUID) @@ -191,6 +205,11 @@ func (s *RPC) Init(c context.Context, id string, state rpc.State) error { if err != nil { return err } + + if !agent.IsSystemAgent() { + // TODO: check if agent is allowed to alter things + } + workflow.AgentID = agent.ID currentPipeline, err := s.store.GetPipeline(workflow.PipelineID) @@ -270,6 +289,15 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error { return err } + agent, err := s.getAgentFromContext(c) + if err != nil { + return err + } + + if !agent.IsSystemAgent() { + // TODO: check if agent is allowed to alter things + } + logger := log.With(). Str("repo_id", fmt.Sprint(repo.ID)). Str("pipeline_id", fmt.Sprint(currentPipeline.ID)). @@ -336,6 +364,16 @@ func (s *RPC) Log(c context.Context, _logEntry *rpc.LogEntry) error { if err != nil { return fmt.Errorf("could not find step with uuid %s in store: %w", _logEntry.StepUUID, err) } + + agent, err := s.getAgentFromContext(c) + if err != nil { + return err + } + + if !agent.IsSystemAgent() { + // TODO: check if agent is allowed to alter things + } + logEntry := &model.LogEntry{ StepID: step.ID, Time: _logEntry.Time, diff --git a/server/queue/fifo.go b/server/queue/fifo.go index 476df8eba..4e3361d1a 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -187,12 +187,16 @@ func (q *fifo) Wait(c context.Context, id string) error { } // Extend extends the task execution deadline. -func (q *fifo) Extend(_ context.Context, id string) error { +func (q *fifo) Extend(_ context.Context, agentID int64, id string) error { q.Lock() defer q.Unlock() state, ok := q.running[id] if ok { + if state.item.AgentID != agentID { + return ErrAgentMissmatch + } + state.deadline = time.Now().Add(q.extension) return nil } diff --git a/server/queue/queue.go b/server/queue/queue.go index 9cb695c24..fd3b474bf 100644 --- a/server/queue/queue.go +++ b/server/queue/queue.go @@ -28,6 +28,9 @@ var ( // ErrNotFound indicates the task was not found in the queue. ErrNotFound = errors.New("queue: task not found") + + // ErrAgentMissmatch indicates an agent does not mat to the task + ErrAgentMissmatch = errors.New("task has not expected agent id") ) // InfoT provides runtime information. @@ -79,7 +82,7 @@ type Queue interface { Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error) // Extend extends the deadline for a task. - Extend(c context.Context, id string) error + Extend(c context.Context, agentID int64, id string) error // Done signals the task is complete. Done(c context.Context, id string, exitStatus model.StatusValue) error