mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2025-01-23 15:48:44 +00:00
119 lines
2.3 KiB
Go
119 lines
2.3 KiB
Go
|
package pubsub
|
||
|
|
||
|
import (
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
type Channel struct {
|
||
|
record bool
|
||
|
history []interface{}
|
||
|
timeout time.Duration
|
||
|
closed chan bool
|
||
|
broadcast chan interface{}
|
||
|
subscribe chan *Subscription
|
||
|
unsubscribe chan *Subscription
|
||
|
subscriptions map[*Subscription]bool
|
||
|
}
|
||
|
|
||
|
func NewChannel(opts *Opts) *Channel {
|
||
|
return &Channel{
|
||
|
timeout: opts.Timeout,
|
||
|
record: opts.Record,
|
||
|
history: make([]interface{}, 0),
|
||
|
closed: make(chan bool),
|
||
|
broadcast: make(chan interface{}),
|
||
|
subscribe: make(chan *Subscription),
|
||
|
unsubscribe: make(chan *Subscription),
|
||
|
subscriptions: make(map[*Subscription]bool),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *Channel) Publish(data interface{}) {
|
||
|
c.broadcast <- data
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func (c *Channel) Subscribe() *Subscription {
|
||
|
s := NewSubscription(c)
|
||
|
c.subscribe <- s
|
||
|
return s
|
||
|
}
|
||
|
|
||
|
func (c *Channel) Close() {
|
||
|
go func() { c.closed <- true }()
|
||
|
}
|
||
|
|
||
|
func (c *Channel) start() {
|
||
|
// make sure we don't bring down the application
|
||
|
// if somehow we encounter a nil pointer or some
|
||
|
// other unexpected behavior.
|
||
|
defer func() {
|
||
|
recover()
|
||
|
}()
|
||
|
|
||
|
// timeout the channel after N duration
|
||
|
// ignore the timeout if set to 0
|
||
|
var timeout <-chan time.Time
|
||
|
if c.timeout > 0 {
|
||
|
timeout = time.After(c.timeout)
|
||
|
}
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
|
||
|
case sub := <-c.unsubscribe:
|
||
|
delete(c.subscriptions, sub)
|
||
|
close(sub.send)
|
||
|
|
||
|
case sub := <-c.subscribe:
|
||
|
c.subscriptions[sub] = true
|
||
|
|
||
|
// if we are recording the output
|
||
|
// we should send it to the subscriber
|
||
|
// upon first connecting.
|
||
|
if c.record && len(c.history) > 0 {
|
||
|
history := make([]interface{}, len(c.history))
|
||
|
copy(history, c.history)
|
||
|
go replay(sub, history)
|
||
|
}
|
||
|
|
||
|
case msg := <-c.broadcast:
|
||
|
// if we are recording the output, append
|
||
|
// the message to the history
|
||
|
if c.record {
|
||
|
c.history = append(c.history, msg)
|
||
|
}
|
||
|
|
||
|
// loop through each subscription and
|
||
|
// send the message.
|
||
|
for sub := range c.subscriptions {
|
||
|
select {
|
||
|
case sub.send <- msg:
|
||
|
// do nothing
|
||
|
default:
|
||
|
sub.Close()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
case <-timeout:
|
||
|
c.Close()
|
||
|
|
||
|
case <-c.closed:
|
||
|
c.stop()
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func replay(s *Subscription, history []interface{}) {
|
||
|
for _, msg := range history {
|
||
|
s.send <- msg
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *Channel) stop() {
|
||
|
for sub := range c.subscriptions {
|
||
|
sub.Close()
|
||
|
}
|
||
|
}
|