diff --git a/modules/queue/manager.go b/modules/queue/manager.go index a2deb8ff7c..d26836e7c8 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -23,11 +23,11 @@ type Manager struct { mutex sync.Mutex counter int64 - Queues map[int64]*Description + Queues map[int64]*ManagedQueue } -// Description represents a working queue inheriting from Gitea. -type Description struct { +// ManagedQueue represents a working queue inheriting from Gitea. +type ManagedQueue struct { mutex sync.Mutex QID int64 Queue Queue @@ -35,13 +35,13 @@ type Description struct { Name string Configuration interface{} ExemplarType string - Pool PoolManager + Pool ManagedPool counter int64 PoolWorkers map[int64]*PoolWorkers } -// PoolManager is a simple interface to get certain details from a worker pool -type PoolManager interface { +// ManagedPool is a simple interface to get certain details from a worker pool +type ManagedPool interface { AddWorkers(number int, timeout time.Duration) context.CancelFunc NumberOfWorkers() int MaxNumberOfWorkers() int @@ -52,8 +52,8 @@ type PoolManager interface { SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) } -// DescriptionList implements the sort.Interface -type DescriptionList []*Description +// ManagedQueueList implements the sort.Interface +type ManagedQueueList []*ManagedQueue // PoolWorkers represents a working queue inheriting from Gitea. type PoolWorkers struct { @@ -76,7 +76,7 @@ func init() { func GetManager() *Manager { if manager == nil { manager = &Manager{ - Queues: make(map[int64]*Description), + Queues: make(map[int64]*ManagedQueue), } } return manager @@ -87,10 +87,10 @@ func (m *Manager) Add(queue Queue, t Type, configuration, exemplar interface{}, - pool PoolManager) int64 { + pool ManagedPool) int64 { cfg, _ := json.Marshal(configuration) - desc := &Description{ + mq := &ManagedQueue{ Queue: queue, Type: t, Configuration: string(cfg), @@ -100,15 +100,15 @@ func (m *Manager) Add(queue Queue, } m.mutex.Lock() m.counter++ - desc.QID = m.counter - desc.Name = fmt.Sprintf("queue-%d", desc.QID) + mq.QID = m.counter + mq.Name = fmt.Sprintf("queue-%d", mq.QID) if named, ok := queue.(Named); ok { - desc.Name = named.Name() + mq.Name = named.Name() } - m.Queues[desc.QID] = desc + m.Queues[mq.QID] = mq m.mutex.Unlock() - log.Trace("Queue Manager registered: %s (QID: %d)", desc.Name, desc.QID) - return desc.QID + log.Trace("Queue Manager registered: %s (QID: %d)", mq.Name, mq.QID) + return mq.QID } // Remove a queue from the Manager @@ -120,27 +120,27 @@ func (m *Manager) Remove(qid int64) { } -// GetDescription by qid -func (m *Manager) GetDescription(qid int64) *Description { +// GetManagedQueue by qid +func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue { m.mutex.Lock() defer m.mutex.Unlock() return m.Queues[qid] } -// Descriptions returns the queue descriptions -func (m *Manager) Descriptions() []*Description { +// ManagedQueues returns the managed queues +func (m *Manager) ManagedQueues() []*ManagedQueue { m.mutex.Lock() - descs := make([]*Description, 0, len(m.Queues)) - for _, desc := range m.Queues { - descs = append(descs, desc) + mqs := make([]*ManagedQueue, 0, len(m.Queues)) + for _, mq := range m.Queues { + mqs = append(mqs, mq) } m.mutex.Unlock() - sort.Sort(DescriptionList(descs)) - return descs + sort.Sort(ManagedQueueList(mqs)) + return mqs } // Workers returns the poolworkers -func (q *Description) Workers() []*PoolWorkers { +func (q *ManagedQueue) Workers() []*PoolWorkers { q.mutex.Lock() workers := make([]*PoolWorkers, 0, len(q.PoolWorkers)) for _, worker := range q.PoolWorkers { @@ -152,7 +152,7 @@ func (q *Description) Workers() []*PoolWorkers { } // RegisterWorkers registers workers to this queue -func (q *Description) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc) int64 { +func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc) int64 { q.mutex.Lock() defer q.mutex.Unlock() q.counter++ @@ -168,7 +168,7 @@ func (q *Description) RegisterWorkers(number int, start time.Time, hasTimeout bo } // CancelWorkers cancels pooled workers with pid -func (q *Description) CancelWorkers(pid int64) { +func (q *ManagedQueue) CancelWorkers(pid int64) { q.mutex.Lock() pw, ok := q.PoolWorkers[pid] q.mutex.Unlock() @@ -179,14 +179,14 @@ func (q *Description) CancelWorkers(pid int64) { } // RemoveWorkers deletes pooled workers with pid -func (q *Description) RemoveWorkers(pid int64) { +func (q *ManagedQueue) RemoveWorkers(pid int64) { q.mutex.Lock() delete(q.PoolWorkers, pid) q.mutex.Unlock() } // AddWorkers adds workers to the queue if it has registered an add worker function -func (q *Description) AddWorkers(number int, timeout time.Duration) context.CancelFunc { +func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc { if q.Pool != nil { // the cancel will be added to the pool workers description above return q.Pool.AddWorkers(number, timeout) @@ -195,7 +195,7 @@ func (q *Description) AddWorkers(number int, timeout time.Duration) context.Canc } // NumberOfWorkers returns the number of workers in the queue -func (q *Description) NumberOfWorkers() int { +func (q *ManagedQueue) NumberOfWorkers() int { if q.Pool != nil { return q.Pool.NumberOfWorkers() } @@ -203,7 +203,7 @@ func (q *Description) NumberOfWorkers() int { } // MaxNumberOfWorkers returns the maximum number of workers for the pool -func (q *Description) MaxNumberOfWorkers() int { +func (q *ManagedQueue) MaxNumberOfWorkers() int { if q.Pool != nil { return q.Pool.MaxNumberOfWorkers() } @@ -211,7 +211,7 @@ func (q *Description) MaxNumberOfWorkers() int { } // BoostWorkers returns the number of workers for a boost -func (q *Description) BoostWorkers() int { +func (q *ManagedQueue) BoostWorkers() int { if q.Pool != nil { return q.Pool.BoostWorkers() } @@ -219,7 +219,7 @@ func (q *Description) BoostWorkers() int { } // BoostTimeout returns the timeout of the next boost -func (q *Description) BoostTimeout() time.Duration { +func (q *ManagedQueue) BoostTimeout() time.Duration { if q.Pool != nil { return q.Pool.BoostTimeout() } @@ -227,7 +227,7 @@ func (q *Description) BoostTimeout() time.Duration { } // BlockTimeout returns the timeout til the next boost -func (q *Description) BlockTimeout() time.Duration { +func (q *ManagedQueue) BlockTimeout() time.Duration { if q.Pool != nil { return q.Pool.BlockTimeout() } @@ -235,21 +235,21 @@ func (q *Description) BlockTimeout() time.Duration { } // SetSettings sets the setable boost values -func (q *Description) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) { +func (q *ManagedQueue) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) { if q.Pool != nil { q.Pool.SetSettings(maxNumberOfWorkers, boostWorkers, timeout) } } -func (l DescriptionList) Len() int { +func (l ManagedQueueList) Len() int { return len(l) } -func (l DescriptionList) Less(i, j int) bool { +func (l ManagedQueueList) Less(i, j int) bool { return l[i].Name < l[j].Name } -func (l DescriptionList) Swap(i, j int) { +func (l ManagedQueueList) Swap(i, j int) { l[i], l[j] = l[j], l[i] } diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 98a68cd041..e92c1ec315 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -71,19 +71,19 @@ func (p *WorkerPool) pushBoost(data Data) { } p.blockTimeout *= 2 ctx, cancel := context.WithCancel(p.baseCtx) - desc := GetManager().GetDescription(p.qid) + mq := GetManager().GetManagedQueue(p.qid) boost := p.boostWorkers if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 { boost = p.maxNumberOfWorkers - p.numberOfWorkers } - if desc != nil { - log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, desc.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout) + if mq != nil { + log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout) start := time.Now() - pid := desc.RegisterWorkers(boost, start, false, start, cancel) + pid := mq.RegisterWorkers(boost, start, false, start, cancel) go func() { <-ctx.Done() - desc.RemoveWorkers(pid) + mq.RemoveWorkers(pid) cancel() }() } else { @@ -171,15 +171,15 @@ func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.Cance ctx, cancel = context.WithCancel(p.baseCtx) } - desc := GetManager().GetDescription(p.qid) - if desc != nil { - pid := desc.RegisterWorkers(number, start, hasTimeout, end, cancel) + mq := GetManager().GetManagedQueue(p.qid) + if mq != nil { + pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel) go func() { <-ctx.Done() - desc.RemoveWorkers(pid) + mq.RemoveWorkers(pid) cancel() }() - log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, desc.Name, number, pid) + log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, mq.Name, number, pid) } else { log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number) diff --git a/routers/admin/admin.go b/routers/admin/admin.go index 299da8b46c..5e8e0b7467 100644 --- a/routers/admin/admin.go +++ b/routers/admin/admin.go @@ -358,7 +358,7 @@ func Monitor(ctx *context.Context) { ctx.Data["PageIsAdminMonitor"] = true ctx.Data["Processes"] = process.GetManager().Processes() ctx.Data["Entries"] = cron.ListTasks() - ctx.Data["Queues"] = queue.GetManager().Descriptions() + ctx.Data["Queues"] = queue.GetManager().ManagedQueues() ctx.HTML(200, tplMonitor) } @@ -374,28 +374,28 @@ func MonitorCancel(ctx *context.Context) { // Queue shows details for a specific queue func Queue(ctx *context.Context) { qid := ctx.ParamsInt64("qid") - desc := queue.GetManager().GetDescription(qid) - if desc == nil { + mq := queue.GetManager().GetManagedQueue(qid) + if mq == nil { ctx.Status(404) return } - ctx.Data["Title"] = ctx.Tr("admin.monitor.queue", desc.Name) + ctx.Data["Title"] = ctx.Tr("admin.monitor.queue", mq.Name) ctx.Data["PageIsAdmin"] = true ctx.Data["PageIsAdminMonitor"] = true - ctx.Data["Queue"] = desc + ctx.Data["Queue"] = mq ctx.HTML(200, tplQueue) } // WorkerCancel cancels a worker group func WorkerCancel(ctx *context.Context) { qid := ctx.ParamsInt64("qid") - desc := queue.GetManager().GetDescription(qid) - if desc == nil { + mq := queue.GetManager().GetManagedQueue(qid) + if mq == nil { ctx.Status(404) return } pid := ctx.ParamsInt64("pid") - desc.CancelWorkers(pid) + mq.CancelWorkers(pid) ctx.Flash.Info(ctx.Tr("admin.monitor.queue.pool.cancelling")) ctx.JSON(200, map[string]interface{}{ "redirect": setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid), @@ -405,8 +405,8 @@ func WorkerCancel(ctx *context.Context) { // AddWorkers adds workers to a worker group func AddWorkers(ctx *context.Context) { qid := ctx.ParamsInt64("qid") - desc := queue.GetManager().GetDescription(qid) - if desc == nil { + mq := queue.GetManager().GetManagedQueue(qid) + if mq == nil { ctx.Status(404) return } @@ -422,12 +422,12 @@ func AddWorkers(ctx *context.Context) { ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) return } - if desc.Pool == nil { + if mq.Pool == nil { ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.none")) ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) return } - desc.AddWorkers(number, timeout) + mq.AddWorkers(number, timeout) ctx.Flash.Success(ctx.Tr("admin.monitor.queue.pool.added")) ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) } @@ -435,12 +435,12 @@ func AddWorkers(ctx *context.Context) { // SetQueueSettings sets the maximum number of workers for this queue func SetQueueSettings(ctx *context.Context) { qid := ctx.ParamsInt64("qid") - desc := queue.GetManager().GetDescription(qid) - if desc == nil { + mq := queue.GetManager().GetManagedQueue(qid) + if mq == nil { ctx.Status(404) return } - if desc.Pool == nil { + if mq.Pool == nil { ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.none")) ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) return @@ -464,7 +464,7 @@ func SetQueueSettings(ctx *context.Context) { maxNumber = -1 } } else { - maxNumber = desc.MaxNumberOfWorkers() + maxNumber = mq.MaxNumberOfWorkers() } if len(numberStr) > 0 { @@ -475,7 +475,7 @@ func SetQueueSettings(ctx *context.Context) { return } } else { - number = desc.BoostWorkers() + number = mq.BoostWorkers() } if len(timeoutStr) > 0 { @@ -486,10 +486,10 @@ func SetQueueSettings(ctx *context.Context) { return } } else { - timeout = desc.Pool.BoostTimeout() + timeout = mq.Pool.BoostTimeout() } - desc.SetSettings(maxNumber, number, timeout) + mq.SetSettings(maxNumber, number, timeout) ctx.Flash.Success(ctx.Tr("admin.monitor.queue.settings.changed")) ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) }