woodpecker/pipeline/rpc/log_entry.go
Fernando Barbosa 01699eaaab fix: k8s agent fails to tail logs starving the cpu
Proposal to fix https://github.com/woodpecker-ci/woodpecker/issues/2253

We have observed several possibly related issues on a Kubernetes
backend:

1. Agents behave erratically when dealing with certain log payloads. A common
   observation is that steps that produce a large volume of logs will cause
   some steps to be stuck "pending" forever.

2. Agents use far more CPU than should be expected; we often see 200-300
   millicores of CPU per workflow per agent (as reported in #2253).

3. We commonly see agents emitting thousands of error lines about
   parsing logs, often with very close timestamps, which may explain issues 1
   and 2 (as reported in #2253); see the sample below and the short UTF-8
   sketch after this list:

```
{"level":"error","error":"rpc error: code = Internal desc = grpc: error while marshaling: string field contains invalid UTF-8","time":"2024-04-05T21:32:25Z","caller":"/src/agent/rpc/client_grpc.go:335","message":"grpc error: log(): code: Internal"}
{"level":"error","error":"rpc error: code = Internal desc = grpc: error while marshaling: string field contains invalid UTF-8","time":"2024-04-05T21:32:25Z","caller":"/src/agent/rpc/client_grpc.go:335","message":"grpc error: log(): code: Internal"}
{"level":"error","error":"rpc error: code = Internal desc = grpc: error while marshaling: string field contains invalid UTF-8","time":"2024-04-05T21:32:25Z","caller":"/src/agent/rpc/client_grpc.go:335","message":"grpc error: log(): code: Internal"}
```

4. We've also observed that agents will sometimes drop out of the worker
   queue, as also reported in #2253.
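
For illustration only (this is not part of the change itself): a minimal,
hypothetical sketch of the kind of payload that triggers the "string field
contains invalid UTF-8" marshaling error above, using only the standard
library.

```go
package main

import (
	"fmt"
	"strings"
	"unicode/utf8"
)

func main() {
	// A log chunk cut off in the middle of a multi-byte rune is not valid
	// UTF-8, and protobuf refuses to marshal it into a string field.
	payload := string([]byte{'o', 'k', ' ', 0xE2, 0x9C}) // "✓" is 0xE2 0x9C 0x93; the last byte is missing

	fmt.Println(utf8.ValidString(payload)) // false

	// One possible (hypothetical) mitigation: replace invalid sequences
	// before handing the line to the gRPC client.
	clean := strings.ToValidUTF8(payload, "\uFFFD")
	fmt.Println(utf8.ValidString(clean)) // true
}
```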

Since the logs point to `client_grpc.go:335`, this pull request
fixes the issue by:

1. Removing codes.Internal from the set of retryable gRPC status codes. Agent
gRPC calls that fail with codes.Internal will no longer be retried. There is no
general agreement on which gRPC codes should be retried, but Internal does not
seem to be a common one to retry -- if ever.

2. Adding a 30-second timeout to any retries. Currently, the exponential
retries have a maximum timeout of _15 minutes_. I assume this might be
required by some other functions so agents resume their operation in
case the webserver restarts. Still, this is likely the cause behind the
large CPU increase, as agents can be stuck retrying thousands of requests over
a large window of time. The previous change alone should be enough to
solve this issue, but I think this is a good idea to prevent
similar problems from arising in the future. A rough sketch of both changes
follows below.
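
As a rough sketch of both changes (the helper names below are hypothetical and
are not Woodpecker's actual retry code; only the standard gRPC codes/status
packages are assumed): codes.Internal is treated as non-retryable, and the
whole retry loop is bounded by a 30-second deadline.

```go
package rpcretry

import (
	"context"
	"errors"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// retryable reports whether a failed gRPC call is worth retrying.
// codes.Internal is deliberately absent: marshaling errors such as
// "string field contains invalid UTF-8" fail the same way every time.
func retryable(err error) bool {
	switch status.Code(err) {
	case codes.Unavailable, codes.ResourceExhausted, codes.Aborted:
		return true
	default:
		return false
	}
}

// callWithRetry retries a call with exponential backoff, but gives up
// after 30 seconds instead of backing off for up to 15 minutes.
func callWithRetry(ctx context.Context, call func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	backoff := 100 * time.Millisecond
	for {
		err := call(ctx)
		if err == nil || !retryable(err) {
			return err
		}
		select {
		case <-ctx.Done():
			return errors.Join(err, ctx.Err())
		case <-time.After(backoff):
			backoff *= 2
		}
	}
}
```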
2024-04-08 16:32:29 -03:00

// Copyright 2022 Woodpecker Authors
// Copyright 2011 Drone.IO Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rpc

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/rs/zerolog/log"

	"go.woodpecker-ci.org/woodpecker/v2/pipeline/shared"
)

// Identifies the type of line in the logs.
const (
	LogEntryStdout int = iota
	LogEntryStderr
	LogEntryExitCode
	LogEntryMetadata
	LogEntryProgress
)

// LogEntry is a line of console output.
type LogEntry struct {
	StepUUID string `json:"step_uuid,omitempty"`
	Time     int64  `json:"time,omitempty"`
	Type     int    `json:"type,omitempty"`
	Line     int    `json:"line,omitempty"`
	Data     string `json:"data,omitempty"`
}

// String returns a human-readable representation of the log entry.
func (l *LogEntry) String() string {
	switch l.Type {
	case LogEntryExitCode:
		return fmt.Sprintf("[%s] exit code %s", l.StepUUID, l.Data)
	default:
		return fmt.Sprintf("[%s:L%v:%vs] %s", l.StepUUID, l.Line, l.Time, l.Data)
	}
}

// LineWriter sends logs to the client.
type LineWriter struct {
	peer     Peer
	stepUUID string
	num      int
	now      time.Time
	rep      *strings.Replacer
	lines    []*LogEntry
}

// NewLineWriter returns a new line writer that masks the given secrets in
// every line before it is sent to the peer.
func NewLineWriter(peer Peer, stepUUID string, secret ...string) *LineWriter {
	return &LineWriter{
		peer:     peer,
		stepUUID: stepUUID,
		now:      time.Now().UTC(),
		rep:      shared.NewSecretsReplacer(secret),
		lines:    nil,
	}
}

// Write masks secrets in p, sends it as a single log line to the peer and
// appends it to the line history. Send failures are logged but not returned,
// so Write always reports success.
func (w *LineWriter) Write(p []byte) (n int, err error) {
	data := string(p)
	if w.rep != nil {
		data = w.rep.Replace(data)
	}
	log.Trace().Str("step-uuid", w.stepUUID).Msgf("grpc write line: %s", data)

	line := &LogEntry{
		Data:     data,
		StepUUID: w.stepUUID,
		Time:     int64(time.Since(w.now).Seconds()),
		Type:     LogEntryStdout,
		Line:     w.num,
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := w.peer.Log(ctx, line); err != nil {
		log.Error().Err(err).Str("step-uuid", w.stepUUID).Msg("fail to write pipeline log to peer")
	}
	w.num++
	w.lines = append(w.lines, line)

	return len(p), nil
}

// Lines returns the line history.
func (w *LineWriter) Lines() []*LogEntry {
	return w.lines
}

// Clear clears the line history.
func (w *LineWriter) Clear() {
	w.lines = w.lines[:0]
}
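
For reference (not part of the file above), a minimal example of the output
produced by LogEntry.String; the step UUID and values are made up:

```go
package main

import (
	"fmt"

	"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
)

func main() {
	stdout := &rpc.LogEntry{
		StepUUID: "step-1",
		Line:     3,
		Time:     12,
		Type:     rpc.LogEntryStdout,
		Data:     "go build ./...",
	}
	exit := &rpc.LogEntry{
		StepUUID: "step-1",
		Type:     rpc.LogEntryExitCode,
		Data:     "0",
	}

	fmt.Println(stdout.String()) // [step-1:L3:12s] go build ./...
	fmt.Println(exit.String())   // [step-1] exit code 0
}
```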