Rename grpc pipeline to workflow (#2173)

closes #1823

Co-authored-by: 6543 <6543@obermui.de>
This commit is contained in:
Anbraten 2023-08-21 18:30:19 +02:00 committed by GitHub
parent 81ead7cbf2
commit 4de8cbec76
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 37 additions and 37 deletions

View file

@ -27,7 +27,7 @@ import (
"github.com/woodpecker-ci/woodpecker/pipeline/rpc" "github.com/woodpecker-ci/woodpecker/pipeline/rpc"
) )
func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, work *rpc.Pipeline) pipeline.LogFunc { func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, workflow *rpc.Workflow) pipeline.LogFunc {
return func(step *backend.Step, rc multipart.Reader) error { return func(step *backend.Step, rc multipart.Reader) error {
loglogger := logger.With(). loglogger := logger.With().
Str("image", step.Image). Str("image", step.Image).
@ -41,7 +41,7 @@ func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, wo
uploads.Add(1) uploads.Add(1)
var secrets []string var secrets []string
for _, secret := range work.Config.Secrets { for _, secret := range workflow.Config.Secrets {
if secret.Mask { if secret.Mask {
secrets = append(secrets, secret.Value) secrets = append(secrets, secret.Value)
} }

View file

@ -70,8 +70,8 @@ func (c *client) Version(ctx context.Context) (*rpc.Version, error) {
}, nil }, nil
} }
// Next returns the next pipeline in the queue. // Next returns the next workflow in the queue.
func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Pipeline, error) { func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Workflow, error) {
var res *proto.NextResponse var res *proto.NextResponse
var err error var err error
retry := c.newBackOff() retry := c.newBackOff()
@ -115,17 +115,17 @@ func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Pipeline, error)
return nil, nil return nil, nil
} }
p := new(rpc.Pipeline) w := new(rpc.Workflow)
p.ID = res.GetPipeline().GetId() w.ID = res.GetPipeline().GetId()
p.Timeout = res.GetPipeline().GetTimeout() w.Timeout = res.GetPipeline().GetTimeout()
p.Config = new(backend.Config) w.Config = new(backend.Config)
if err := json.Unmarshal(res.GetPipeline().GetPayload(), p.Config); err != nil { if err := json.Unmarshal(res.GetPipeline().GetPayload(), w.Config); err != nil {
log.Error().Err(err).Msgf("could not unmarshal pipeline config of '%s'", p.ID) log.Error().Err(err).Msgf("could not unmarshal workflow config of '%s'", w.ID)
} }
return p, nil return w, nil
} }
// Wait blocks until the pipeline is complete. // Wait blocks until the workflow is complete.
func (c *client) Wait(ctx context.Context, id string) (err error) { func (c *client) Wait(ctx context.Context, id string) (err error) {
retry := c.newBackOff() retry := c.newBackOff()
req := new(proto.WaitRequest) req := new(proto.WaitRequest)
@ -159,7 +159,7 @@ func (c *client) Wait(ctx context.Context, id string) (err error) {
return nil return nil
} }
// Init signals the pipeline is initialized. // Init signals the workflow is initialized.
func (c *client) Init(ctx context.Context, id string, state rpc.State) (err error) { func (c *client) Init(ctx context.Context, id string, state rpc.State) (err error) {
retry := c.newBackOff() retry := c.newBackOff()
req := new(proto.InitRequest) req := new(proto.InitRequest)
@ -200,7 +200,7 @@ func (c *client) Init(ctx context.Context, id string, state rpc.State) (err erro
return nil return nil
} }
// Done signals the pipeline is complete. // Done signals the work is complete.
func (c *client) Done(ctx context.Context, id string, state rpc.State) (err error) { func (c *client) Done(ctx context.Context, id string, state rpc.State) (err error) {
retry := c.newBackOff() retry := c.newBackOff()
req := new(proto.DoneRequest) req := new(proto.DoneRequest)
@ -241,7 +241,7 @@ func (c *client) Done(ctx context.Context, id string, state rpc.State) (err erro
return nil return nil
} }
// Extend extends the pipeline deadline // Extend extends the workflow deadline
func (c *client) Extend(ctx context.Context, id string) (err error) { func (c *client) Extend(ctx context.Context, id string) (err error) {
retry := c.newBackOff() retry := c.newBackOff()
req := new(proto.ExtendRequest) req := new(proto.ExtendRequest)
@ -275,7 +275,7 @@ func (c *client) Extend(ctx context.Context, id string) (err error) {
return nil return nil
} }
// Update updates the pipeline state. // Update updates the workflow state.
func (c *client) Update(ctx context.Context, id string, state rpc.State) (err error) { func (c *client) Update(ctx context.Context, id string, state rpc.State) (err error) {
retry := c.newBackOff() retry := c.newBackOff()
req := new(proto.UpdateRequest) req := new(proto.UpdateRequest)
@ -316,7 +316,7 @@ func (c *client) Update(ctx context.Context, id string, state rpc.State) (err er
return nil return nil
} }
// Log writes the pipeline log entry. // Log writes the workflow log entry.
func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) { func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) {
retry := c.newBackOff() retry := c.newBackOff()
req := new(proto.LogRequest) req := new(proto.LogRequest)

View file

@ -26,7 +26,7 @@ import (
"github.com/woodpecker-ci/woodpecker/pipeline/rpc" "github.com/woodpecker-ci/woodpecker/pipeline/rpc"
) )
func (r *Runner) createTracer(ctxmeta context.Context, logger zerolog.Logger, work *rpc.Pipeline) pipeline.TraceFunc { func (r *Runner) createTracer(ctxmeta context.Context, logger zerolog.Logger, workflow *rpc.Workflow) pipeline.TraceFunc {
return func(state *pipeline.State) error { return func(state *pipeline.State) error {
steplogger := logger.With(). steplogger := logger.With().
Str("image", state.Pipeline.Step.Image). Str("image", state.Pipeline.Step.Image).
@ -50,7 +50,7 @@ func (r *Runner) createTracer(ctxmeta context.Context, logger zerolog.Logger, wo
defer func() { defer func() {
steplogger.Debug().Msg("update step status") steplogger.Debug().Msg("update step status")
if uerr := r.client.Update(ctxmeta, work.ID, stepState); uerr != nil { if uerr := r.client.Update(ctxmeta, workflow.ID, stepState); uerr != nil {
steplogger.Debug(). steplogger.Debug().
Err(uerr). Err(uerr).
Msg("update step status error") Msg("update step status error")

View file

@ -27,7 +27,7 @@ type (
Labels map[string]string `json:"labels"` Labels map[string]string `json:"labels"`
} }
// State defines the pipeline state. // State defines the workflow state.
State struct { State struct {
Step string `json:"step"` Step string `json:"step"`
Exited bool `json:"exited"` Exited bool `json:"exited"`
@ -37,8 +37,8 @@ type (
Error string `json:"error"` Error string `json:"error"`
} }
// Pipeline defines the pipeline execution details. // Workflow defines the workflow execution details.
Pipeline struct { Workflow struct {
ID string `json:"id"` ID string `json:"id"`
Config *backend.Config `json:"config"` Config *backend.Config `json:"config"`
Timeout int64 `json:"timeout"` Timeout int64 `json:"timeout"`
@ -55,25 +55,25 @@ type Peer interface {
// Version returns the server- & grpc-version // Version returns the server- & grpc-version
Version(c context.Context) (*Version, error) Version(c context.Context) (*Version, error)
// Next returns the next pipeline in the queue. // Next returns the next workflow in the queue
Next(c context.Context, f Filter) (*Pipeline, error) Next(c context.Context, f Filter) (*Workflow, error)
// Wait blocks until the pipeline is complete. // Wait blocks until the workflow is complete
Wait(c context.Context, id string) error Wait(c context.Context, id string) error
// Init signals the pipeline is initialized. // Init signals the workflow is initialized
Init(c context.Context, id string, state State) error Init(c context.Context, id string, state State) error
// Done signals the pipeline is complete. // Done signals the workflow is complete
Done(c context.Context, id string, state State) error Done(c context.Context, id string, state State) error
// Extend extends the pipeline deadline // Extend extends the workflow deadline
Extend(c context.Context, id string) error Extend(c context.Context, id string) error
// Update updates the pipeline state. // Update updates the workflow state
Update(c context.Context, id string, state State) error Update(c context.Context, id string, state State) error
// Log writes the pipeline log entry. // Log writes the workflow log entry
Log(c context.Context, logEntry *LogEntry) error Log(c context.Context, logEntry *LogEntry) error
// RegisterAgent register our agent to the server // RegisterAgent register our agent to the server

View file

@ -61,7 +61,7 @@ message Filter {
map<string, string> labels = 1; map<string, string> labels = 1;
} }
message Pipeline { message Workflow {
string id = 1; string id = 1;
int64 timeout = 2; int64 timeout = 2;
bytes payload = 3; bytes payload = 3;
@ -126,7 +126,7 @@ message VersionResponse {
} }
message NextResponse { message NextResponse {
Pipeline pipeline = 1; Workflow workflow = 1;
} }
message RegisterAgentResponse { message RegisterAgentResponse {

View file

@ -53,7 +53,7 @@ type RPC struct {
} }
// Next implements the rpc.Next function // Next implements the rpc.Next function
func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Pipeline, error) { func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, error) {
metadata, ok := grpcMetadata.FromIncomingContext(c) metadata, ok := grpcMetadata.FromIncomingContext(c)
if ok { if ok {
hostname, ok := metadata["hostname"] hostname, ok := metadata["hostname"]
@ -82,9 +82,9 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Pipeline, er
} }
if task.ShouldRun() { if task.ShouldRun() {
pipeline := new(rpc.Pipeline) workflow := new(rpc.Workflow)
err = json.Unmarshal(task.Data, pipeline) err = json.Unmarshal(task.Data, workflow)
return pipeline, err return workflow, err
} }
if err := s.Done(c, task.ID, rpc.State{}); err != nil { if err := s.Done(c, task.ID, rpc.State{}); err != nil {

View file

@ -42,7 +42,7 @@ func queuePipeline(repo *model.Repo, pipelineItems []*pipeline.Item) error {
task.RunOn = item.RunsOn task.RunOn = item.RunsOn
task.DepStatus = make(map[string]model.StatusValue) task.DepStatus = make(map[string]model.StatusValue)
task.Data, _ = json.Marshal(rpc.Pipeline{ task.Data, _ = json.Marshal(rpc.Workflow{
ID: fmt.Sprint(item.Workflow.ID), ID: fmt.Sprint(item.Workflow.ID),
Config: item.Config, Config: item.Config,
Timeout: repo.Timeout, Timeout: repo.Timeout,