mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2025-01-24 16:18:42 +00:00
134 lines
2.5 KiB
Go
134 lines
2.5 KiB
Go
|
package broadcast
|
||
|
|
||
|
type taggedObservation struct {
|
||
|
sub *subObserver
|
||
|
ob interface{}
|
||
|
}
|
||
|
|
||
|
// Kinds of request that can be carried in a taggedRegReq.
const (
	// register adds a channel to a sub-observer's set of listeners.
	register = iota
	// unregister removes a single channel from a sub-observer's listeners.
	unregister
	// purge drops a sub-observer together with all of its listeners.
	purge
)
|
||
|
|
||
|
type taggedRegReq struct {
|
||
|
sub *subObserver
|
||
|
ch chan<- interface{}
|
||
|
regType int
|
||
|
}
|
||
|
|
||
|
// A MuxObserver multiplexes several streams of observations onto a
|
||
|
// single delivery goroutine.
|
||
|
type MuxObserver struct {
|
||
|
subs map[*subObserver]map[chan<- interface{}]bool
|
||
|
reg chan taggedRegReq
|
||
|
input chan taggedObservation
|
||
|
}
|
||
|
|
||
|
// NewMuxObserver constructs a new MuxObserver.
|
||
|
//
|
||
|
// qlen is the size of the channel buffer for observations sent into
|
||
|
// the mux observer and reglen is the size of the channel buffer for
|
||
|
// registration/unregistration events.
|
||
|
func NewMuxObserver(qlen, reglen int) *MuxObserver {
|
||
|
rv := &MuxObserver{
|
||
|
subs: map[*subObserver]map[chan<- interface{}]bool{},
|
||
|
reg: make(chan taggedRegReq, reglen),
|
||
|
input: make(chan taggedObservation, qlen),
|
||
|
}
|
||
|
go rv.run()
|
||
|
return rv
|
||
|
}
|
||
|
|
||
|
// Close shuts down this mux observer.
|
||
|
func (m *MuxObserver) Close() error {
|
||
|
close(m.reg)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (m *MuxObserver) broadcast(to taggedObservation) {
|
||
|
for ch := range m.subs[to.sub] {
|
||
|
ch <- to.ob
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (m *MuxObserver) doReg(tr taggedRegReq) {
|
||
|
mm, exists := m.subs[tr.sub]
|
||
|
if !exists {
|
||
|
mm = map[chan<- interface{}]bool{}
|
||
|
m.subs[tr.sub] = mm
|
||
|
}
|
||
|
mm[tr.ch] = true
|
||
|
}
|
||
|
|
||
|
func (m *MuxObserver) doUnreg(tr taggedRegReq) {
|
||
|
mm, exists := m.subs[tr.sub]
|
||
|
if exists {
|
||
|
delete(mm, tr.ch)
|
||
|
if len(mm) == 0 {
|
||
|
delete(m.subs, tr.sub)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (m *MuxObserver) handleReg(tr taggedRegReq) {
|
||
|
switch tr.regType {
|
||
|
case register:
|
||
|
m.doReg(tr)
|
||
|
case unregister:
|
||
|
m.doUnreg(tr)
|
||
|
case purge:
|
||
|
delete(m.subs, tr.sub)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (m *MuxObserver) run() {
|
||
|
for {
|
||
|
select {
|
||
|
case tr, ok := <-m.reg:
|
||
|
if ok {
|
||
|
m.handleReg(tr)
|
||
|
} else {
|
||
|
return
|
||
|
}
|
||
|
default:
|
||
|
select {
|
||
|
case to := <-m.input:
|
||
|
m.broadcast(to)
|
||
|
case tr, ok := <-m.reg:
|
||
|
if ok {
|
||
|
m.handleReg(tr)
|
||
|
} else {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Sub creates a new sub-broadcaster from this MuxObserver.
|
||
|
func (m *MuxObserver) Sub() Broadcaster {
|
||
|
return &subObserver{m}
|
||
|
}
|
||
|
|
||
|
type subObserver struct {
|
||
|
mo *MuxObserver
|
||
|
}
|
||
|
|
||
|
func (s *subObserver) Register(ch chan<- interface{}) {
|
||
|
s.mo.reg <- taggedRegReq{s, ch, register}
|
||
|
}
|
||
|
|
||
|
func (s *subObserver) Unregister(ch chan<- interface{}) {
|
||
|
s.mo.reg <- taggedRegReq{s, ch, unregister}
|
||
|
}
|
||
|
|
||
|
func (s *subObserver) Close() error {
|
||
|
s.mo.reg <- taggedRegReq{s, nil, purge}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *subObserver) Submit(ob interface{}) {
|
||
|
s.mo.input <- taggedObservation{s, ob}
|
||
|
}
|