// Copyright 2022 Woodpecker Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package queue import ( "container/list" "context" "fmt" "sync" "time" "github.com/rs/zerolog/log" "go.woodpecker-ci.org/woodpecker/v2/server/model" "go.woodpecker-ci.org/woodpecker/v2/shared/constant" ) type entry struct { item *model.Task done chan bool error error deadline time.Time } type worker struct { agentID int64 filter FilterFn channel chan *model.Task stop context.CancelCauseFunc } type fifo struct { sync.Mutex ctx context.Context workers map[*worker]struct{} running map[string]*entry pending *list.List waitingOnDeps *list.List extension time.Duration paused bool } // processTimeInterval is the time till the queue rearranges things, // as the agent pull in 10 milliseconds we should also give them work asap. const processTimeInterval = 100 * time.Millisecond var ErrWorkerKicked = fmt.Errorf("worker was kicked") // NewMemoryQueue returns a new fifo queue. func NewMemoryQueue(ctx context.Context) Queue { q := &fifo{ ctx: ctx, workers: map[*worker]struct{}{}, running: map[string]*entry{}, pending: list.New(), waitingOnDeps: list.New(), extension: constant.TaskTimeout, paused: false, } go q.process() return q } // Push pushes a task to the tail of this queue. func (q *fifo) Push(_ context.Context, task *model.Task) error { q.Lock() q.pending.PushBack(task) q.Unlock() return nil } // PushAtOnce pushes multiple tasks to the tail of this queue. func (q *fifo) PushAtOnce(_ context.Context, tasks []*model.Task) error { q.Lock() for _, task := range tasks { q.pending.PushBack(task) } q.Unlock() return nil } // Poll retrieves and removes a task head of this queue. func (q *fifo) Poll(c context.Context, agentID int64, filter FilterFn) (*model.Task, error) { q.Lock() ctx, stop := context.WithCancelCause(c) _worker := &worker{ agentID: agentID, channel: make(chan *model.Task, 1), filter: filter, stop: stop, } q.workers[_worker] = struct{}{} q.Unlock() for { select { case <-ctx.Done(): q.Lock() delete(q.workers, _worker) q.Unlock() return nil, ctx.Err() case t := <-_worker.channel: return t, nil } } } // Done signals the task is complete. func (q *fifo) Done(_ context.Context, id string, exitStatus model.StatusValue) error { return q.finished([]string{id}, exitStatus, nil) } // Error signals the task is done with an error. func (q *fifo) Error(_ context.Context, id string, err error) error { return q.finished([]string{id}, model.StatusFailure, err) } // ErrorAtOnce signals multiple done are complete with an error. func (q *fifo) ErrorAtOnce(_ context.Context, ids []string, err error) error { return q.finished(ids, model.StatusFailure, err) } func (q *fifo) finished(ids []string, exitStatus model.StatusValue, err error) error { q.Lock() for _, id := range ids { taskEntry, ok := q.running[id] if ok { taskEntry.error = err close(taskEntry.done) delete(q.running, id) } else { q.removeFromPending(id) } q.updateDepStatusInQueue(id, exitStatus) } q.Unlock() return nil } // Evict removes a pending task from the queue. func (q *fifo) Evict(ctx context.Context, taskID string) error { return q.EvictAtOnce(ctx, []string{taskID}) } // EvictAtOnce removes multiple pending tasks from the queue. func (q *fifo) EvictAtOnce(_ context.Context, taskIDs []string) error { q.Lock() defer q.Unlock() for _, id := range taskIDs { var next *list.Element for element := q.pending.Front(); element != nil; element = next { next = element.Next() task, ok := element.Value.(*model.Task) if ok && task.ID == id { q.pending.Remove(element) return nil } } } return ErrNotFound } // Wait waits until the item is done executing. func (q *fifo) Wait(ctx context.Context, taskID string) error { q.Lock() state := q.running[taskID] q.Unlock() if state != nil { select { case <-ctx.Done(): case <-state.done: return state.error } } return nil } // Extend extends the task execution deadline. func (q *fifo) Extend(_ context.Context, agentID int64, taskID string) error { q.Lock() defer q.Unlock() state, ok := q.running[taskID] if ok { if state.item.AgentID != agentID { return ErrAgentMissMatch } state.deadline = time.Now().Add(q.extension) return nil } return ErrNotFound } // Info returns internal queue information. func (q *fifo) Info(_ context.Context) InfoT { q.Lock() stats := InfoT{} stats.Stats.Workers = len(q.workers) stats.Stats.Pending = q.pending.Len() stats.Stats.WaitingOnDeps = q.waitingOnDeps.Len() stats.Stats.Running = len(q.running) for element := q.pending.Front(); element != nil; element = element.Next() { task, _ := element.Value.(*model.Task) stats.Pending = append(stats.Pending, task) } for element := q.waitingOnDeps.Front(); element != nil; element = element.Next() { task, _ := element.Value.(*model.Task) stats.WaitingOnDeps = append(stats.WaitingOnDeps, task) } for _, entry := range q.running { stats.Running = append(stats.Running, entry.item) } stats.Paused = q.paused q.Unlock() return stats } // Pause stops the queue from handing out new work items in Poll. func (q *fifo) Pause() { q.Lock() q.paused = true q.Unlock() } // Resume starts the queue again. func (q *fifo) Resume() { q.Lock() q.paused = false q.Unlock() } // KickAgentWorkers kicks all workers for a given agent. func (q *fifo) KickAgentWorkers(agentID int64) { q.Lock() defer q.Unlock() for worker := range q.workers { if worker.agentID == agentID { worker.stop(ErrWorkerKicked) delete(q.workers, worker) } } } // helper function that loops through the queue and attempts to // match the item to a single subscriber until context got cancel. func (q *fifo) process() { for { select { case <-time.After(processTimeInterval): case <-q.ctx.Done(): return } q.Lock() if q.paused { q.Unlock() continue } q.resubmitExpiredPipelines() q.filterWaiting() for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() { task, _ := pending.Value.(*model.Task) task.AgentID = worker.agentID delete(q.workers, worker) q.pending.Remove(pending) q.running[task.ID] = &entry{ item: task, done: make(chan bool), deadline: time.Now().Add(q.extension), } worker.channel <- task } q.Unlock() } } func (q *fifo) filterWaiting() { // resubmits all waiting tasks to pending, deps may have cleared var nextWaiting *list.Element for e := q.waitingOnDeps.Front(); e != nil; e = nextWaiting { nextWaiting = e.Next() task, _ := e.Value.(*model.Task) q.pending.PushBack(task) } // rebuild waitingDeps q.waitingOnDeps = list.New() var filtered []*list.Element var nextPending *list.Element for element := q.pending.Front(); element != nil; element = nextPending { nextPending = element.Next() task, _ := element.Value.(*model.Task) if q.depsInQueue(task) { log.Debug().Msgf("queue: waiting due to unmet dependencies %v", task.ID) q.waitingOnDeps.PushBack(task) filtered = append(filtered, element) } } // filter waiting tasks for _, f := range filtered { q.pending.Remove(f) } } func (q *fifo) assignToWorker() (*list.Element, *worker) { var next *list.Element var bestWorker *worker var bestScore int for element := q.pending.Front(); element != nil; element = next { next = element.Next() task, _ := element.Value.(*model.Task) log.Debug().Msgf("queue: trying to assign task: %v with deps %v", task.ID, task.Dependencies) for worker := range q.workers { matched, score := worker.filter(task) if matched && score > bestScore { bestWorker = worker bestScore = score } } if bestWorker != nil { log.Debug().Msgf("queue: assigned task: %v with deps %v to worker with score %d", task.ID, task.Dependencies, bestScore) return element, bestWorker } } return nil, nil } func (q *fifo) resubmitExpiredPipelines() { for taskID, taskState := range q.running { if time.Now().After(taskState.deadline) { q.pending.PushFront(taskState.item) delete(q.running, taskID) close(taskState.done) } } } func (q *fifo) depsInQueue(task *model.Task) bool { var next *list.Element for element := q.pending.Front(); element != nil; element = next { next = element.Next() possibleDep, ok := element.Value.(*model.Task) log.Debug().Msgf("queue: pending right now: %v", possibleDep.ID) for _, dep := range task.Dependencies { if ok && possibleDep.ID == dep { return true } } } for possibleDepID := range q.running { log.Debug().Msgf("queue: running right now: %v", possibleDepID) for _, dep := range task.Dependencies { if possibleDepID == dep { return true } } } return false } func (q *fifo) updateDepStatusInQueue(taskID string, status model.StatusValue) { var next *list.Element for element := q.pending.Front(); element != nil; element = next { next = element.Next() pending, ok := element.Value.(*model.Task) for _, dep := range pending.Dependencies { if ok && taskID == dep { pending.DepStatus[dep] = status } } } for _, running := range q.running { for _, dep := range running.item.Dependencies { if taskID == dep { running.item.DepStatus[dep] = status } } } for element := q.waitingOnDeps.Front(); element != nil; element = next { next = element.Next() waiting, ok := element.Value.(*model.Task) for _, dep := range waiting.Dependencies { if ok && taskID == dep { waiting.DepStatus[dep] = status } } } } func (q *fifo) removeFromPending(taskID string) { log.Debug().Msgf("queue: trying to remove %s", taskID) var next *list.Element for element := q.pending.Front(); element != nil; element = next { next = element.Next() task, _ := element.Value.(*model.Task) if task.ID == taskID { log.Debug().Msgf("queue: %s is removed from pending", taskID) q.pending.Remove(element) return } } }