// Copyright 2023 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logging

import (
	"context"
	"sync"

	"go.woodpecker-ci.org/woodpecker/server/model"
)

// TODO (bradrydzewski) writing to subscribers is currently a blocking
// operation and does not protect the stream against being locked up
// by slow clients. This should be resolved.

// TODO (bradrydzewski) implement a mux.Info to fetch information and
// statistics for the multiplexer. Streams, subscribers, etc
// mux.Info()

// TODO (bradrydzewski) refactor code to place publisher and subscriber
// operations in separate files with more encapsulated logic.
// sub.push()
// sub.join()
// sub.start()... event loop

// subscriber wraps the Handler registered by a single Tail call. Streams
// key their subscriber set by *subscriber, so each Tail call gets its own
// entry even when the same handler is passed more than once.
type subscriber struct {
	handler Handler
}

// stream is the in-memory log stream of a single step.
//
// The embedded mutex guards list and subs.
type stream struct {
	sync.Mutex

	// stepID is the ID of the step this stream belongs to.
	stepID int64
	// list holds every entry written to the stream so far; it is replayed
	// to a subscriber when it starts tailing.
	list []*model.LogEntry
	// subs is the set of subscribers currently tailing the stream.
	subs map[*subscriber]struct{}
	// done is closed when the stream is closed, which releases every
	// Tail call blocked on it.
	done chan struct{}
}

// log is the in-memory logger returned by New.
//
// The embedded mutex guards the streams map.
type log struct {
	sync.Mutex

	// streams maps a step ID to its open stream.
	streams map[int64]*stream
}

// New returns a new logger.
func New() Log { return &log{ streams: map[int64]*stream{}, } } func (l *log) Open(_ context.Context, stepID int64) error { l.Lock() _, ok := l.streams[stepID] if !ok { l.streams[stepID] = &stream{ stepID: stepID, subs: make(map[*subscriber]struct{}), done: make(chan struct{}), } } l.Unlock() return nil } func (l *log) Write(ctx context.Context, stepID int64, logEntry *model.LogEntry) error { l.Lock() s, ok := l.streams[stepID] l.Unlock() // auto open the stream if it does not exist if !ok { err := l.Open(ctx, stepID) if err != nil { return err } s = l.streams[stepID] } s.Lock() s.list = append(s.list, logEntry) for sub := range s.subs { go sub.handler(logEntry) } s.Unlock() return nil } func (l *log) Tail(c context.Context, stepID int64, handler Handler) error { l.Lock() s, ok := l.streams[stepID] l.Unlock() if !ok { return ErrNotFound } sub := &subscriber{ handler: handler, } s.Lock() if len(s.list) != 0 { sub.handler(s.list...) } s.subs[sub] = struct{}{} s.Unlock() select { case <-c.Done(): case <-s.done: } s.Lock() delete(s.subs, sub) s.Unlock() return nil } func (l *log) Close(_ context.Context, stepID int64) error { l.Lock() s, ok := l.streams[stepID] l.Unlock() if !ok { return ErrNotFound } s.Lock() close(s.done) s.Unlock() l.Lock() delete(l.streams, stepID) l.Unlock() return nil }