diff --git a/server/grpc/auth_server.go b/server/grpc/auth_server.go index cc2036a25..7e5e45c6a 100644 --- a/server/grpc/auth_server.go +++ b/server/grpc/auth_server.go @@ -62,7 +62,7 @@ func (s *WoodpeckerAuthServer) getAgent(agentID int64, agentToken string) (*mode if agentToken == s.agentMasterToken && agentID == -1 { agent := new(model.Agent) agent.Name = "" - agent.OwnerID = -1 // system agent + agent.OrgID = -1 // system agent agent.Token = s.agentMasterToken agent.Backend = "" agent.Platform = "" diff --git a/server/grpc/rpc.go b/server/grpc/rpc.go index 598cd9103..b84a150d8 100644 --- a/server/grpc/rpc.go +++ b/server/grpc/rpc.go @@ -61,6 +61,11 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er return nil, err } + if agent.NoSchedule { + time.Sleep(1 * time.Second) + return nil, nil + } + // enforce server set agent filters agentFilters, err := agent.GetFilters() if err != nil { @@ -73,11 +78,6 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er filterFn := createFilterFunc(agentFilter) - if agent.NoSchedule { - time.Sleep(1 * time.Second) - return nil, nil - } - for { // poll blocks until a task is available or the context is canceled / worker is kicked task, err := s.queue.Poll(c, agent.ID, filterFn) @@ -110,6 +110,37 @@ func (s *RPC) Extend(c context.Context, id string) error { return err } + workflowID, err := strconv.ParseInt(id, 10, 64) + if err != nil { + return err + } + + workflow, err := s.store.WorkflowLoad(workflowID) + if err != nil { + log.Error().Err(err).Msgf("rpc.update: cannot find workflow with id %d", workflowID) + return err + } + + currentPipeline, err := s.store.GetPipeline(workflow.PipelineID) + if err != nil { + log.Error().Err(err).Msgf("cannot find pipeline with id %d", workflow.PipelineID) + return err + } + + repo, err := s.store.GetRepo(currentPipeline.RepoID) + if err != nil { + log.Error().Err(err).Msgf("cannot find repo with id %d", currentPipeline.RepoID) + return err + } + + if !agent.CanAccessRepo(repo) { + msg := fmt.Sprintf("agent '%d' is not allowed to interact with repo '%s'", agent.ID, repo.FullName) + log.Error(). + Int64("repoId", repo.ID). + Msg(msg) + return fmt.Errorf(msg) + } + return s.queue.Extend(c, agent.ID, id) } @@ -137,10 +168,6 @@ func (s *RPC) Update(c context.Context, id string, state rpc.State) error { return err } - if !agent.IsSystemAgent() { - // TODO: check if agent is allowed to alter things - } - step, err := s.store.StepByUUID(state.StepUUID) if err != nil { log.Error().Err(err).Msgf("cannot find step with uuid %s", state.StepUUID) @@ -162,6 +189,14 @@ func (s *RPC) Update(c context.Context, id string, state rpc.State) error { return err } + if !agent.CanAccessRepo(repo) { + msg := fmt.Sprintf("agent '%d' is not allowed to interact with repo '%s'", agent.ID, repo.FullName) + log.Error(). + Int64("repoId", repo.ID). + Msg(msg) + return fmt.Errorf(msg) + } + if err := pipeline.UpdateStepStatus(s.store, step, state); err != nil { log.Error().Err(err).Msg("rpc.update: cannot update step") } @@ -206,10 +241,6 @@ func (s *RPC) Init(c context.Context, id string, state rpc.State) error { return err } - if !agent.IsSystemAgent() { - // TODO: check if agent is allowed to alter things - } - workflow.AgentID = agent.ID currentPipeline, err := s.store.GetPipeline(workflow.PipelineID) @@ -224,6 +255,14 @@ func (s *RPC) Init(c context.Context, id string, state rpc.State) error { return err } + if !agent.CanAccessRepo(repo) { + msg := fmt.Sprintf("agent '%d' is not allowed to interact with repo '%s'", agent.ID, repo.FullName) + log.Error(). + Int64("repoId", repo.ID). + Msg(msg) + return fmt.Errorf(msg) + } + if currentPipeline.Status == model.StatusPending { if currentPipeline, err = pipeline.UpdateToStatusRunning(s.store, *currentPipeline, state.Started); err != nil { log.Error().Err(err).Msgf("init: cannot update build_id %d state", currentPipeline.ID) @@ -294,8 +333,12 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error { return err } - if !agent.IsSystemAgent() { - // TODO: check if agent is allowed to alter things + if !agent.CanAccessRepo(repo) { + msg := fmt.Sprintf("agent '%d' is not allowed to interact with repo '%s'", agent.ID, repo.FullName) + log.Error(). + Int64("repoId", repo.ID). + Msg(msg) + return fmt.Errorf(msg) } logger := log.With(). @@ -370,8 +413,24 @@ func (s *RPC) Log(c context.Context, _logEntry *rpc.LogEntry) error { return err } - if !agent.IsSystemAgent() { - // TODO: check if agent is allowed to alter things + currentPipeline, err := s.store.GetPipeline(step.PipelineID) + if err != nil { + log.Error().Err(err).Msgf("cannot find pipeline with id %d", step.PipelineID) + return err + } + + repo, err := s.store.GetRepo(currentPipeline.RepoID) + if err != nil { + log.Error().Err(err).Msgf("cannot find repo with id %d", currentPipeline.RepoID) + return err + } + + if !agent.CanAccessRepo(repo) { + msg := fmt.Sprintf("agent '%d' is not allowed to interact with repo '%s'", agent.ID, repo.FullName) + log.Error(). + Int64("repoId", repo.ID). + Msg(msg) + return fmt.Errorf(msg) } logEntry := &model.LogEntry{ diff --git a/server/grpc/rpc_test.go b/server/grpc/rpc_test.go index 1fd850b8a..6bccac1b7 100644 --- a/server/grpc/rpc_test.go +++ b/server/grpc/rpc_test.go @@ -38,7 +38,7 @@ func TestRegisterAgent(t *testing.T) { Created: 0, Updated: 0, Name: "hostname", - OwnerID: 0, + OrgID: 0, Token: "", LastContact: 0, Platform: "platform", @@ -78,7 +78,7 @@ func TestRegisterAgent(t *testing.T) { Created: 0, Updated: 0, Name: "originalHostname", - OwnerID: 0, + OrgID: 0, Token: "", LastContact: 0, Platform: "platform", diff --git a/server/model/agent.go b/server/model/agent.go index fd3937b53..d3df14d78 100644 --- a/server/model/agent.go +++ b/server/model/agent.go @@ -14,14 +14,17 @@ package model -import "errors" +import ( + "errors" + "fmt" +) type Agent struct { ID int64 `json:"id" xorm:"pk autoincr 'id'"` Created int64 `json:"created" xorm:"created"` Updated int64 `json:"updated" xorm:"updated"` Name string `json:"name" xorm:"name"` - OwnerID int64 `json:"owner_id" xorm:"'owner_id'"` + OrgID int64 `json:"owner_id" xorm:"'owner_id'"` // TODO: rename to org_id Token string `json:"token" xorm:"token"` LastContact int64 `json:"last_contact" xorm:"last_contact"` Platform string `json:"platform" xorm:"VARCHAR(100) 'platform'"` @@ -39,7 +42,7 @@ func (Agent) TableName() string { } func (a *Agent) IsSystemAgent() bool { - return a.OwnerID == -1 + return a.OrgID == -1 } var ErrFiltersBroken = errors.New("while creating filters map error ocured") @@ -52,15 +55,24 @@ func (a *Agent) GetFilters() (map[string]string, error) { // enforce filters for user and organization agents if a.IsSystemAgent() { - filters["repo"] = "*" // allow all repos by default - filters["owner"] = "*" // allow all owners by default - } else { - filters["owner"] = "*" // we don't have org agents implemented yet - // we expect this filter to be set else we fail - if _, ok := filters["repo"]; !ok { - return nil, ErrFiltersBroken - } + filters["org-id"] = "*" // allow all orgs + filters["repo-id"] = "*" // allow all repos + } else if a.OrgID > 0 { + filters["org-id"] = fmt.Sprintf("%d", a.OrgID) + filters["repo-id"] = "*" // allow all repos of the org } return filters, nil } + +func (a *Agent) CanAccessRepo(repo *Repo) bool { + if a.IsSystemAgent() { + return true + } + + if a.OrgID == repo.OrgID { + return true + } + + return false +} diff --git a/server/pipeline/queue.go b/server/pipeline/queue.go index afb640d58..413370d7d 100644 --- a/server/pipeline/queue.go +++ b/server/pipeline/queue.go @@ -60,6 +60,8 @@ func queuePipeline(ctx context.Context, repo *model.Repo, pipelineItems []*stepb func enforcedLabels(task *model.Task, repo *model.Repo) { task.Labels["repo"] = repo.FullName task.Labels["owner"] = repo.Owner + task.Labels["repo-id"] = fmt.Sprint(repo.ID) + task.Labels["org-id"] = fmt.Sprint(repo.OrgID) } func taskIDs(dependsOn []string, pipelineItems []*stepbuilder.Item) (taskIDs []string) {