Rename queue.Description to queue.ManagedQueue as per @guillep2k

This commit is contained in:
Andrew Thornton 2019-12-31 17:17:07 +00:00
parent a10129f74c
commit 6306cd42bc
No known key found for this signature in database
GPG key ID: 3CDE74631F13A748
3 changed files with 69 additions and 69 deletions

View file

@ -23,11 +23,11 @@ type Manager struct {
mutex sync.Mutex mutex sync.Mutex
counter int64 counter int64
Queues map[int64]*Description Queues map[int64]*ManagedQueue
} }
// Description represents a working queue inheriting from Gitea. // ManagedQueue represents a working queue inheriting from Gitea.
type Description struct { type ManagedQueue struct {
mutex sync.Mutex mutex sync.Mutex
QID int64 QID int64
Queue Queue Queue Queue
@ -35,13 +35,13 @@ type Description struct {
Name string Name string
Configuration interface{} Configuration interface{}
ExemplarType string ExemplarType string
Pool PoolManager Pool ManagedPool
counter int64 counter int64
PoolWorkers map[int64]*PoolWorkers PoolWorkers map[int64]*PoolWorkers
} }
// PoolManager is a simple interface to get certain details from a worker pool // ManagedPool is a simple interface to get certain details from a worker pool
type PoolManager interface { type ManagedPool interface {
AddWorkers(number int, timeout time.Duration) context.CancelFunc AddWorkers(number int, timeout time.Duration) context.CancelFunc
NumberOfWorkers() int NumberOfWorkers() int
MaxNumberOfWorkers() int MaxNumberOfWorkers() int
@ -52,8 +52,8 @@ type PoolManager interface {
SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
} }
// DescriptionList implements the sort.Interface // ManagedQueueList implements the sort.Interface
type DescriptionList []*Description type ManagedQueueList []*ManagedQueue
// PoolWorkers represents a working queue inheriting from Gitea. // PoolWorkers represents a working queue inheriting from Gitea.
type PoolWorkers struct { type PoolWorkers struct {
@ -76,7 +76,7 @@ func init() {
func GetManager() *Manager { func GetManager() *Manager {
if manager == nil { if manager == nil {
manager = &Manager{ manager = &Manager{
Queues: make(map[int64]*Description), Queues: make(map[int64]*ManagedQueue),
} }
} }
return manager return manager
@ -87,10 +87,10 @@ func (m *Manager) Add(queue Queue,
t Type, t Type,
configuration, configuration,
exemplar interface{}, exemplar interface{},
pool PoolManager) int64 { pool ManagedPool) int64 {
cfg, _ := json.Marshal(configuration) cfg, _ := json.Marshal(configuration)
desc := &Description{ mq := &ManagedQueue{
Queue: queue, Queue: queue,
Type: t, Type: t,
Configuration: string(cfg), Configuration: string(cfg),
@ -100,15 +100,15 @@ func (m *Manager) Add(queue Queue,
} }
m.mutex.Lock() m.mutex.Lock()
m.counter++ m.counter++
desc.QID = m.counter mq.QID = m.counter
desc.Name = fmt.Sprintf("queue-%d", desc.QID) mq.Name = fmt.Sprintf("queue-%d", mq.QID)
if named, ok := queue.(Named); ok { if named, ok := queue.(Named); ok {
desc.Name = named.Name() mq.Name = named.Name()
} }
m.Queues[desc.QID] = desc m.Queues[mq.QID] = mq
m.mutex.Unlock() m.mutex.Unlock()
log.Trace("Queue Manager registered: %s (QID: %d)", desc.Name, desc.QID) log.Trace("Queue Manager registered: %s (QID: %d)", mq.Name, mq.QID)
return desc.QID return mq.QID
} }
// Remove a queue from the Manager // Remove a queue from the Manager
@ -120,27 +120,27 @@ func (m *Manager) Remove(qid int64) {
} }
// GetDescription by qid // GetManagedQueue by qid
func (m *Manager) GetDescription(qid int64) *Description { func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
return m.Queues[qid] return m.Queues[qid]
} }
// Descriptions returns the queue descriptions // ManagedQueues returns the managed queues
func (m *Manager) Descriptions() []*Description { func (m *Manager) ManagedQueues() []*ManagedQueue {
m.mutex.Lock() m.mutex.Lock()
descs := make([]*Description, 0, len(m.Queues)) mqs := make([]*ManagedQueue, 0, len(m.Queues))
for _, desc := range m.Queues { for _, mq := range m.Queues {
descs = append(descs, desc) mqs = append(mqs, mq)
} }
m.mutex.Unlock() m.mutex.Unlock()
sort.Sort(DescriptionList(descs)) sort.Sort(ManagedQueueList(mqs))
return descs return mqs
} }
// Workers returns the poolworkers // Workers returns the poolworkers
func (q *Description) Workers() []*PoolWorkers { func (q *ManagedQueue) Workers() []*PoolWorkers {
q.mutex.Lock() q.mutex.Lock()
workers := make([]*PoolWorkers, 0, len(q.PoolWorkers)) workers := make([]*PoolWorkers, 0, len(q.PoolWorkers))
for _, worker := range q.PoolWorkers { for _, worker := range q.PoolWorkers {
@ -152,7 +152,7 @@ func (q *Description) Workers() []*PoolWorkers {
} }
// RegisterWorkers registers workers to this queue // RegisterWorkers registers workers to this queue
func (q *Description) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc) int64 { func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc) int64 {
q.mutex.Lock() q.mutex.Lock()
defer q.mutex.Unlock() defer q.mutex.Unlock()
q.counter++ q.counter++
@ -168,7 +168,7 @@ func (q *Description) RegisterWorkers(number int, start time.Time, hasTimeout bo
} }
// CancelWorkers cancels pooled workers with pid // CancelWorkers cancels pooled workers with pid
func (q *Description) CancelWorkers(pid int64) { func (q *ManagedQueue) CancelWorkers(pid int64) {
q.mutex.Lock() q.mutex.Lock()
pw, ok := q.PoolWorkers[pid] pw, ok := q.PoolWorkers[pid]
q.mutex.Unlock() q.mutex.Unlock()
@ -179,14 +179,14 @@ func (q *Description) CancelWorkers(pid int64) {
} }
// RemoveWorkers deletes pooled workers with pid // RemoveWorkers deletes pooled workers with pid
func (q *Description) RemoveWorkers(pid int64) { func (q *ManagedQueue) RemoveWorkers(pid int64) {
q.mutex.Lock() q.mutex.Lock()
delete(q.PoolWorkers, pid) delete(q.PoolWorkers, pid)
q.mutex.Unlock() q.mutex.Unlock()
} }
// AddWorkers adds workers to the queue if it has registered an add worker function // AddWorkers adds workers to the queue if it has registered an add worker function
func (q *Description) AddWorkers(number int, timeout time.Duration) context.CancelFunc { func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
if q.Pool != nil { if q.Pool != nil {
// the cancel will be added to the pool workers description above // the cancel will be added to the pool workers description above
return q.Pool.AddWorkers(number, timeout) return q.Pool.AddWorkers(number, timeout)
@ -195,7 +195,7 @@ func (q *Description) AddWorkers(number int, timeout time.Duration) context.Canc
} }
// NumberOfWorkers returns the number of workers in the queue // NumberOfWorkers returns the number of workers in the queue
func (q *Description) NumberOfWorkers() int { func (q *ManagedQueue) NumberOfWorkers() int {
if q.Pool != nil { if q.Pool != nil {
return q.Pool.NumberOfWorkers() return q.Pool.NumberOfWorkers()
} }
@ -203,7 +203,7 @@ func (q *Description) NumberOfWorkers() int {
} }
// MaxNumberOfWorkers returns the maximum number of workers for the pool // MaxNumberOfWorkers returns the maximum number of workers for the pool
func (q *Description) MaxNumberOfWorkers() int { func (q *ManagedQueue) MaxNumberOfWorkers() int {
if q.Pool != nil { if q.Pool != nil {
return q.Pool.MaxNumberOfWorkers() return q.Pool.MaxNumberOfWorkers()
} }
@ -211,7 +211,7 @@ func (q *Description) MaxNumberOfWorkers() int {
} }
// BoostWorkers returns the number of workers for a boost // BoostWorkers returns the number of workers for a boost
func (q *Description) BoostWorkers() int { func (q *ManagedQueue) BoostWorkers() int {
if q.Pool != nil { if q.Pool != nil {
return q.Pool.BoostWorkers() return q.Pool.BoostWorkers()
} }
@ -219,7 +219,7 @@ func (q *Description) BoostWorkers() int {
} }
// BoostTimeout returns the timeout of the next boost // BoostTimeout returns the timeout of the next boost
func (q *Description) BoostTimeout() time.Duration { func (q *ManagedQueue) BoostTimeout() time.Duration {
if q.Pool != nil { if q.Pool != nil {
return q.Pool.BoostTimeout() return q.Pool.BoostTimeout()
} }
@ -227,7 +227,7 @@ func (q *Description) BoostTimeout() time.Duration {
} }
// BlockTimeout returns the timeout til the next boost // BlockTimeout returns the timeout til the next boost
func (q *Description) BlockTimeout() time.Duration { func (q *ManagedQueue) BlockTimeout() time.Duration {
if q.Pool != nil { if q.Pool != nil {
return q.Pool.BlockTimeout() return q.Pool.BlockTimeout()
} }
@ -235,21 +235,21 @@ func (q *Description) BlockTimeout() time.Duration {
} }
// SetSettings sets the setable boost values // SetSettings sets the setable boost values
func (q *Description) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) { func (q *ManagedQueue) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
if q.Pool != nil { if q.Pool != nil {
q.Pool.SetSettings(maxNumberOfWorkers, boostWorkers, timeout) q.Pool.SetSettings(maxNumberOfWorkers, boostWorkers, timeout)
} }
} }
func (l DescriptionList) Len() int { func (l ManagedQueueList) Len() int {
return len(l) return len(l)
} }
func (l DescriptionList) Less(i, j int) bool { func (l ManagedQueueList) Less(i, j int) bool {
return l[i].Name < l[j].Name return l[i].Name < l[j].Name
} }
func (l DescriptionList) Swap(i, j int) { func (l ManagedQueueList) Swap(i, j int) {
l[i], l[j] = l[j], l[i] l[i], l[j] = l[j], l[i]
} }

View file

@ -71,19 +71,19 @@ func (p *WorkerPool) pushBoost(data Data) {
} }
p.blockTimeout *= 2 p.blockTimeout *= 2
ctx, cancel := context.WithCancel(p.baseCtx) ctx, cancel := context.WithCancel(p.baseCtx)
desc := GetManager().GetDescription(p.qid) mq := GetManager().GetManagedQueue(p.qid)
boost := p.boostWorkers boost := p.boostWorkers
if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 { if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
boost = p.maxNumberOfWorkers - p.numberOfWorkers boost = p.maxNumberOfWorkers - p.numberOfWorkers
} }
if desc != nil { if mq != nil {
log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, desc.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout) log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout)
start := time.Now() start := time.Now()
pid := desc.RegisterWorkers(boost, start, false, start, cancel) pid := mq.RegisterWorkers(boost, start, false, start, cancel)
go func() { go func() {
<-ctx.Done() <-ctx.Done()
desc.RemoveWorkers(pid) mq.RemoveWorkers(pid)
cancel() cancel()
}() }()
} else { } else {
@ -171,15 +171,15 @@ func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.Cance
ctx, cancel = context.WithCancel(p.baseCtx) ctx, cancel = context.WithCancel(p.baseCtx)
} }
desc := GetManager().GetDescription(p.qid) mq := GetManager().GetManagedQueue(p.qid)
if desc != nil { if mq != nil {
pid := desc.RegisterWorkers(number, start, hasTimeout, end, cancel) pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel)
go func() { go func() {
<-ctx.Done() <-ctx.Done()
desc.RemoveWorkers(pid) mq.RemoveWorkers(pid)
cancel() cancel()
}() }()
log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, desc.Name, number, pid) log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, mq.Name, number, pid)
} else { } else {
log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number) log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number)

View file

@ -358,7 +358,7 @@ func Monitor(ctx *context.Context) {
ctx.Data["PageIsAdminMonitor"] = true ctx.Data["PageIsAdminMonitor"] = true
ctx.Data["Processes"] = process.GetManager().Processes() ctx.Data["Processes"] = process.GetManager().Processes()
ctx.Data["Entries"] = cron.ListTasks() ctx.Data["Entries"] = cron.ListTasks()
ctx.Data["Queues"] = queue.GetManager().Descriptions() ctx.Data["Queues"] = queue.GetManager().ManagedQueues()
ctx.HTML(200, tplMonitor) ctx.HTML(200, tplMonitor)
} }
@ -374,28 +374,28 @@ func MonitorCancel(ctx *context.Context) {
// Queue shows details for a specific queue // Queue shows details for a specific queue
func Queue(ctx *context.Context) { func Queue(ctx *context.Context) {
qid := ctx.ParamsInt64("qid") qid := ctx.ParamsInt64("qid")
desc := queue.GetManager().GetDescription(qid) mq := queue.GetManager().GetManagedQueue(qid)
if desc == nil { if mq == nil {
ctx.Status(404) ctx.Status(404)
return return
} }
ctx.Data["Title"] = ctx.Tr("admin.monitor.queue", desc.Name) ctx.Data["Title"] = ctx.Tr("admin.monitor.queue", mq.Name)
ctx.Data["PageIsAdmin"] = true ctx.Data["PageIsAdmin"] = true
ctx.Data["PageIsAdminMonitor"] = true ctx.Data["PageIsAdminMonitor"] = true
ctx.Data["Queue"] = desc ctx.Data["Queue"] = mq
ctx.HTML(200, tplQueue) ctx.HTML(200, tplQueue)
} }
// WorkerCancel cancels a worker group // WorkerCancel cancels a worker group
func WorkerCancel(ctx *context.Context) { func WorkerCancel(ctx *context.Context) {
qid := ctx.ParamsInt64("qid") qid := ctx.ParamsInt64("qid")
desc := queue.GetManager().GetDescription(qid) mq := queue.GetManager().GetManagedQueue(qid)
if desc == nil { if mq == nil {
ctx.Status(404) ctx.Status(404)
return return
} }
pid := ctx.ParamsInt64("pid") pid := ctx.ParamsInt64("pid")
desc.CancelWorkers(pid) mq.CancelWorkers(pid)
ctx.Flash.Info(ctx.Tr("admin.monitor.queue.pool.cancelling")) ctx.Flash.Info(ctx.Tr("admin.monitor.queue.pool.cancelling"))
ctx.JSON(200, map[string]interface{}{ ctx.JSON(200, map[string]interface{}{
"redirect": setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid), "redirect": setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid),
@ -405,8 +405,8 @@ func WorkerCancel(ctx *context.Context) {
// AddWorkers adds workers to a worker group // AddWorkers adds workers to a worker group
func AddWorkers(ctx *context.Context) { func AddWorkers(ctx *context.Context) {
qid := ctx.ParamsInt64("qid") qid := ctx.ParamsInt64("qid")
desc := queue.GetManager().GetDescription(qid) mq := queue.GetManager().GetManagedQueue(qid)
if desc == nil { if mq == nil {
ctx.Status(404) ctx.Status(404)
return return
} }
@ -422,12 +422,12 @@ func AddWorkers(ctx *context.Context) {
ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
return return
} }
if desc.Pool == nil { if mq.Pool == nil {
ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.none")) ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.none"))
ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
return return
} }
desc.AddWorkers(number, timeout) mq.AddWorkers(number, timeout)
ctx.Flash.Success(ctx.Tr("admin.monitor.queue.pool.added")) ctx.Flash.Success(ctx.Tr("admin.monitor.queue.pool.added"))
ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
} }
@ -435,12 +435,12 @@ func AddWorkers(ctx *context.Context) {
// SetQueueSettings sets the maximum number of workers for this queue // SetQueueSettings sets the maximum number of workers for this queue
func SetQueueSettings(ctx *context.Context) { func SetQueueSettings(ctx *context.Context) {
qid := ctx.ParamsInt64("qid") qid := ctx.ParamsInt64("qid")
desc := queue.GetManager().GetDescription(qid) mq := queue.GetManager().GetManagedQueue(qid)
if desc == nil { if mq == nil {
ctx.Status(404) ctx.Status(404)
return return
} }
if desc.Pool == nil { if mq.Pool == nil {
ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.none")) ctx.Flash.Error(ctx.Tr("admin.monitor.queue.pool.none"))
ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
return return
@ -464,7 +464,7 @@ func SetQueueSettings(ctx *context.Context) {
maxNumber = -1 maxNumber = -1
} }
} else { } else {
maxNumber = desc.MaxNumberOfWorkers() maxNumber = mq.MaxNumberOfWorkers()
} }
if len(numberStr) > 0 { if len(numberStr) > 0 {
@ -475,7 +475,7 @@ func SetQueueSettings(ctx *context.Context) {
return return
} }
} else { } else {
number = desc.BoostWorkers() number = mq.BoostWorkers()
} }
if len(timeoutStr) > 0 { if len(timeoutStr) > 0 {
@ -486,10 +486,10 @@ func SetQueueSettings(ctx *context.Context) {
return return
} }
} else { } else {
timeout = desc.Pool.BoostTimeout() timeout = mq.Pool.BoostTimeout()
} }
desc.SetSettings(maxNumber, number, timeout) mq.SetSettings(maxNumber, number, timeout)
ctx.Flash.Success(ctx.Tr("admin.monitor.queue.settings.changed")) ctx.Flash.Success(ctx.Tr("admin.monitor.queue.settings.changed"))
ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid)) ctx.Redirect(setting.AppSubURL + fmt.Sprintf("/admin/monitor/queue/%d", qid))
} }