Refactor/simplify pubsub (#2554)

This commit is contained in:
qwerty287 2023-10-13 07:34:33 +02:00 committed by GitHub
parent b1cedecc42
commit 0e5defa807
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 45 additions and 335 deletions

View file

@ -15,7 +15,6 @@
package main
import (
"context"
"crypto/tls"
"net"
"net/http"
@ -264,9 +263,6 @@ func setupEvilGlobals(c *cli.Context, v store.Store, f forge.Forge) {
server.Config.Services.Queue = setupQueue(c, v)
server.Config.Services.Logs = logging.New()
server.Config.Services.Pubsub = pubsub.New()
if err := server.Config.Services.Pubsub.Create(context.Background(), "topic/events"); err != nil {
log.Error().Err(err).Msg("could not create pubsub service")
}
server.Config.Services.Registries = setupRegistryService(c, v)
// TODO(1544): fix encrypted store

View file

@ -79,7 +79,7 @@ func EventStreamSSE(c *gin.Context) {
}()
go func() {
err := server.Config.Services.Pubsub.Subscribe(ctx, "topic/events", func(m pubsub.Message) {
server.Config.Services.Pubsub.Subscribe(ctx, func(m pubsub.Message) {
defer func() {
obj := recover() // fix #2480 // TODO: check if it's still needed
log.Trace().Msgf("pubsub subscribe recover return: %v", obj)
@ -95,10 +95,7 @@ func EventStreamSSE(c *gin.Context) {
}
}
})
if err != nil {
log.Error().Err(err).Msg("Subscribe failed")
}
cancel(err)
cancel(nil)
}()
for {

View file

@ -32,7 +32,7 @@ import (
var Config = struct {
Services struct {
Pubsub pubsub.Publisher
Pubsub *pubsub.Publisher
Queue queue.Queue
Logs logging.Log
Secrets model.SecretService

View file

@ -43,7 +43,7 @@ import (
type RPC struct {
forge forge.Forge
queue queue.Queue
pubsub pubsub.Publisher
pubsub *pubsub.Publisher
logger logging.Log
store store.Store
pipelineTime *prometheus.GaugeVec
@ -102,7 +102,7 @@ func (s *RPC) Extend(c context.Context, id string) error {
}
// Update implements the rpc.Update function
func (s *RPC) Update(c context.Context, id string, state rpc.State) error {
func (s *RPC) Update(_ context.Context, id string, state rpc.State) error {
workflowID, err := strconv.ParseInt(id, 10, 64)
if err != nil {
return err
@ -150,9 +150,7 @@ func (s *RPC) Update(c context.Context, id string, state rpc.State) error {
Repo: *repo,
Pipeline: *currentPipeline,
})
if err := s.pubsub.Publish(c, "topic/events", message); err != nil {
log.Error().Err(err).Msg("can not publish step list to")
}
s.pubsub.Publish(message)
return nil
}
@ -208,9 +206,7 @@ func (s *RPC) Init(c context.Context, id string, state rpc.State) error {
Repo: *repo,
Pipeline: *currentPipeline,
})
if err := s.pubsub.Publish(c, "topic/events", message); err != nil {
log.Error().Err(err).Msg("can not publish step list to")
}
s.pubsub.Publish(message)
}()
workflow, err = pipeline.UpdateWorkflowToStatusStarted(s.store, *workflow, state)
@ -297,7 +293,7 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error {
}
}()
if err := s.notify(c, repo, currentPipeline); err != nil {
if err := s.notify(repo, currentPipeline); err != nil {
return err
}
@ -399,7 +395,7 @@ func (s *RPC) updateForgeStatus(ctx context.Context, repo *model.Repo, pipeline
}
}
func (s *RPC) notify(c context.Context, repo *model.Repo, pipeline *model.Pipeline) (err error) {
func (s *RPC) notify(repo *model.Repo, pipeline *model.Pipeline) (err error) {
message := pubsub.Message{
Labels: map[string]string{
"repo": repo.FullName,
@ -410,9 +406,7 @@ func (s *RPC) notify(c context.Context, repo *model.Repo, pipeline *model.Pipeli
Repo: *repo,
Pipeline: *pipeline,
})
if err := s.pubsub.Publish(c, "topic/events", message); err != nil {
log.Error().Err(err).Msgf("grpc could not notify event: '%v'", message)
}
s.pubsub.Publish(message)
return nil
}

View file

@ -37,7 +37,7 @@ type WoodpeckerServer struct {
peer RPC
}
func NewWoodpeckerServer(forge forge.Forge, queue queue.Queue, logger logging.Log, pubsub pubsub.Publisher, store store.Store) proto.WoodpeckerServer {
func NewWoodpeckerServer(forge forge.Forge, queue queue.Queue, logger logging.Log, pubsub *pubsub.Publisher, store store.Store) proto.WoodpeckerServer {
pipelineTime := promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "pipeline_time",

View file

@ -93,9 +93,7 @@ func Cancel(ctx context.Context, store store.Store, repo *model.Repo, user *mode
if killedPipeline.Workflows, err = store.WorkflowGetTree(killedPipeline); err != nil {
return err
}
if err := publishToTopic(ctx, killedPipeline, repo); err != nil {
log.Error().Err(err).Msg("publishToTopic")
}
publishToTopic(killedPipeline, repo)
return nil
}

View file

@ -24,7 +24,7 @@ import (
"github.com/woodpecker-ci/woodpecker/server/store"
)
// Decline update the status to declined for blocked pipeline because of a gated repo
// Decline updates the status to declined for blocked pipelines because of a gated repo
func Decline(ctx context.Context, store store.Store, pipeline *model.Pipeline, user *model.User, repo *model.Repo) (*model.Pipeline, error) {
if pipeline.Status != model.StatusBlocked {
return nil, fmt.Errorf("cannot decline a pipeline with status %s", pipeline.Status)
@ -41,9 +41,7 @@ func Decline(ctx context.Context, store store.Store, pipeline *model.Pipeline, u
updatePipelineStatus(ctx, pipeline, repo, user)
if err := publishToTopic(ctx, pipeline, repo); err != nil {
log.Error().Err(err).Msg("publishToTopic")
}
publishToTopic(pipeline, repo)
return pipeline, nil
}

View file

@ -61,9 +61,6 @@ func start(ctx context.Context, store store.Store, activePipeline *model.Pipelin
}
func publishPipeline(ctx context.Context, pipeline *model.Pipeline, repo *model.Repo, repoUser *model.User) {
if err := publishToTopic(ctx, pipeline, repo); err != nil {
log.Error().Err(err).Msg("publishToTopic")
}
publishToTopic(pipeline, repo)
updatePipelineStatus(ctx, pipeline, repo, repoUser)
}

View file

@ -15,7 +15,6 @@
package pipeline
import (
"context"
"encoding/json"
"strconv"
@ -25,7 +24,7 @@ import (
)
// publishToTopic publishes message to UI clients
func publishToTopic(c context.Context, pipeline *model.Pipeline, repo *model.Repo) (err error) {
func publishToTopic(pipeline *model.Pipeline, repo *model.Repo) {
message := pubsub.Message{
Labels: map[string]string{
"repo": repo.FullName,
@ -38,5 +37,5 @@ func publishToTopic(c context.Context, pipeline *model.Pipeline, repo *model.Rep
Repo: *repo,
Pipeline: pipelineCopy,
})
return server.Config.Services.Pubsub.Publish(c, "topic/events", message)
server.Config.Services.Pubsub.Publish(message)
}

View file

@ -1,12 +0,0 @@
# pubusb package
Go package provides a common interface for publish-subscriber messaging.
## History
This was originally published in: https://github.com/cncd/pubsub
Then it was included in: https://github.com/drone-ci/drone/cncd/pubsub
## Documentation:
https://godoc.org/github.com/woodpecker-ci/woodpecker/server/pubsub

View file

@ -19,71 +19,48 @@ import (
"sync"
)
type subscriber struct {
receiver Receiver
// Message defines a published message.
type Message struct {
// ID identifies this message.
ID string `json:"id,omitempty"`
// Data is the actual data in the entry.
Data []byte `json:"data"`
// Labels represents the key-value pairs the entry is labeled with.
Labels map[string]string `json:"labels,omitempty"`
}
type publisher struct {
// Receiver receives published messages.
type Receiver func(Message)
type Publisher struct {
sync.Mutex
topics map[string]*topic
subs map[*Receiver]struct{}
}
// New creates an in-memory publisher.
func New() Publisher {
return &publisher{
topics: make(map[string]*topic),
func New() *Publisher {
return &Publisher{
subs: make(map[*Receiver]struct{}),
}
}
func (p *publisher) Create(_ context.Context, dest string) error {
func (p *Publisher) Publish(message Message) {
p.Lock()
_, ok := p.topics[dest]
if !ok {
t := newTopic(dest)
p.topics[dest] = t
for s := range p.subs {
go (*s)(message)
}
p.Unlock()
return nil
}
func (p *publisher) Publish(_ context.Context, dest string, message Message) error {
func (p *Publisher) Subscribe(c context.Context, receiver Receiver) {
p.Lock()
t, ok := p.topics[dest]
p.subs[&receiver] = struct{}{}
p.Unlock()
if !ok {
return ErrNotFound
}
t.publish(message)
return nil
}
func (p *publisher) Subscribe(c context.Context, dest string, receiver Receiver) error {
<-c.Done()
p.Lock()
t, ok := p.topics[dest]
delete(p.subs, &receiver)
p.Unlock()
if !ok {
return ErrNotFound
}
s := &subscriber{
receiver: receiver,
}
t.subscribe(s)
select {
case <-c.Done():
case <-t.done:
}
t.unsubscribe(s)
return nil
}
func (p *publisher) Remove(_ context.Context, dest string) error {
p.Lock()
t, ok := p.topics[dest]
if ok {
delete(p.topics, dest)
t.close()
}
p.Unlock()
return nil
}

View file

@ -16,7 +16,6 @@ package pubsub
import (
"context"
"errors"
"sync"
"testing"
"time"
@ -28,7 +27,6 @@ func TestPubsub(t *testing.T) {
var (
wg sync.WaitGroup
testTopic = "test"
testMessage = Message{
Data: []byte("test"),
}
@ -39,81 +37,20 @@ func TestPubsub(t *testing.T) {
)
broker := New()
assert.NoError(t, broker.Create(ctx, testTopic))
go func() {
assert.NoError(t, broker.Subscribe(ctx, testTopic, func(message Message) { wg.Done() }))
broker.Subscribe(ctx, func(message Message) { assert.Equal(t, testMessage, message); wg.Done() })
}()
go func() {
assert.NoError(t, broker.Subscribe(ctx, testTopic, func(message Message) { wg.Done() }))
broker.Subscribe(ctx, func(message Message) { wg.Done() })
}()
<-time.After(500 * time.Millisecond)
if _, ok := broker.(*publisher).topics[testTopic]; !ok {
t.Errorf("Expect topic registered with publisher")
}
wg.Add(2)
go func() {
assert.NoError(t, broker.Publish(ctx, testTopic, testMessage))
broker.Publish(testMessage)
}()
wg.Wait()
cancel(nil)
}
func TestPublishNotFound(t *testing.T) {
var (
testTopic = "test"
testMessage = Message{
Data: []byte("test"),
}
)
broker := New()
err := broker.Publish(context.Background(), testTopic, testMessage)
if !errors.Is(err, ErrNotFound) {
t.Errorf("Expect Not Found error when topic does not exist")
}
}
func TestSubscribeNotFound(t *testing.T) {
var (
testTopic = "test"
testCallback = func(message Message) {}
)
broker := New()
err := broker.Subscribe(context.Background(), testTopic, testCallback)
if !errors.Is(err, ErrNotFound) {
t.Errorf("Expect Not Found error when topic does not exist")
}
}
func TestSubscriptionClosed(t *testing.T) {
var (
wg sync.WaitGroup
testTopic = "test"
testCallback = func(Message) {}
)
broker := New()
assert.NoError(t, broker.Create(context.Background(), testTopic))
go func() {
assert.NoError(t, broker.Subscribe(context.Background(), testTopic, testCallback))
wg.Done()
}()
<-time.After(500 * time.Millisecond)
if _, ok := broker.(*publisher).topics[testTopic]; !ok {
t.Errorf("Expect topic registered with publisher")
}
wg.Add(1)
assert.NoError(t, broker.Remove(context.Background(), testTopic))
wg.Wait()
if _, ok := broker.(*publisher).topics[testTopic]; ok {
t.Errorf("Expect topic removed from publisher")
}
}

View file

@ -1,56 +0,0 @@
// Copyright 2023 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package pubsub implements a publish-subscriber messaging system.
package pubsub
import (
"context"
"errors"
)
// ErrNotFound is returned when the named topic does not exist.
var ErrNotFound = errors.New("topic not found")
// Message defines a published message.
type Message struct {
// ID identifies this message.
ID string `json:"id,omitempty"`
// Data is the actual data in the entry.
Data []byte `json:"data"`
// Labels represents the key-value pairs the entry is labeled with.
Labels map[string]string `json:"labels,omitempty"`
}
// Receiver receives published messages.
type Receiver func(Message)
// Publisher defines a mechanism for communicating messages from a group
// of senders, called publishers, to a group of consumers.
type Publisher interface {
// Create creates the named topic.
Create(c context.Context, topic string) error
// Publish publishes the message.
Publish(c context.Context, topic string, message Message) error
// Subscribe subscribes to the topic. The Receiver function is a callback
// function that receives published messages.
Subscribe(c context.Context, topic string, receiver Receiver) error
// Remove removes the named topic.
Remove(c context.Context, topic string) error
}

View file

@ -1,59 +0,0 @@
// Copyright 2023 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import "sync"
type topic struct {
sync.Mutex
name string
done chan struct{}
subs map[*subscriber]struct{}
}
func newTopic(dest string) *topic {
return &topic{
name: dest,
done: make(chan struct{}),
subs: make(map[*subscriber]struct{}),
}
}
func (t *topic) subscribe(s *subscriber) {
t.Lock()
t.subs[s] = struct{}{}
t.Unlock()
}
func (t *topic) unsubscribe(s *subscriber) {
t.Lock()
delete(t.subs, s)
t.Unlock()
}
func (t *topic) publish(m Message) {
t.Lock()
for s := range t.subs {
go s.receiver(m)
}
t.Unlock()
}
func (t *topic) close() {
t.Lock()
close(t.done)
t.Unlock()
}

View file

@ -1,56 +0,0 @@
// Copyright 2023 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"testing"
"time"
)
func TestTopicSubscribe(t *testing.T) {
sub := new(subscriber)
top := newTopic("foo")
top.subscribe(sub)
if _, ok := top.subs[sub]; !ok {
t.Errorf("Expect subscription registered with topic on subscribe")
}
}
func TestTopicUnsubscribe(t *testing.T) {
sub := new(subscriber)
top := newTopic("foo")
top.subscribe(sub)
if _, ok := top.subs[sub]; !ok {
t.Errorf("Expect subscription registered with topic on subscribe")
}
top.unsubscribe(sub)
if _, ok := top.subs[sub]; ok {
t.Errorf("Expect subscription de-registered with topic on unsubscribe")
}
}
func TestTopicClose(t *testing.T) {
sub := new(subscriber)
top := newTopic("foo")
top.subscribe(sub)
go func() {
top.close()
}()
select {
case <-top.done:
case <-time.After(1 * time.Second):
t.Errorf("Expect subscription closed")
}
}