Store an agents list and add agent heartbeats (#1189)

Co-authored-by: 6543 <6543@obermui.de>
This commit is contained in:
Anbraten 2023-01-28 14:13:04 +01:00 committed by GitHub
parent 222ff11fd9
commit d96032349a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 1855 additions and 585 deletions

View file

@ -0,0 +1,45 @@
package rpc
import (
"context"
"time"
"github.com/woodpecker-ci/woodpecker/pipeline/rpc/proto"
"google.golang.org/grpc"
)
type AuthClient struct {
client proto.WoodpeckerAuthClient
conn *grpc.ClientConn
agentToken string
agentID int64
}
func NewAuthGrpcClient(conn *grpc.ClientConn, agentToken string, agentID int64) *AuthClient {
client := new(AuthClient)
client.client = proto.NewWoodpeckerAuthClient(conn)
client.conn = conn
client.agentToken = agentToken
client.agentID = agentID
return client
}
func (c *AuthClient) Auth() (string, int64, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
req := &proto.AuthRequest{
AgentToken: c.agentToken,
AgentId: c.agentID,
}
res, err := c.client.Auth(ctx, req)
if err != nil {
return "", -1, err
}
c.agentID = res.GetAgentId()
return res.GetAccessToken(), c.agentID, nil
}

View file

@ -0,0 +1,99 @@
package rpc
import (
"context"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// AuthInterceptor is a client interceptor for authentication
type AuthInterceptor struct {
authClient *AuthClient
accessToken string
}
// NewAuthInterceptor returns a new auth interceptor
func NewAuthInterceptor(
authClient *AuthClient,
refreshDuration time.Duration,
) (*AuthInterceptor, error) {
interceptor := &AuthInterceptor{
authClient: authClient,
}
err := interceptor.scheduleRefreshToken(refreshDuration)
if err != nil {
return nil, err
}
return interceptor, nil
}
// Unary returns a client interceptor to authenticate unary RPC
func (interceptor *AuthInterceptor) Unary() grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
return invoker(interceptor.attachToken(ctx), method, req, reply, cc, opts...)
}
}
// Stream returns a client interceptor to authenticate stream RPC
func (interceptor *AuthInterceptor) Stream() grpc.StreamClientInterceptor {
return func(
ctx context.Context,
desc *grpc.StreamDesc,
cc *grpc.ClientConn,
method string,
streamer grpc.Streamer,
opts ...grpc.CallOption,
) (grpc.ClientStream, error) {
return streamer(interceptor.attachToken(ctx), desc, cc, method, opts...)
}
}
func (interceptor *AuthInterceptor) attachToken(ctx context.Context) context.Context {
return metadata.AppendToOutgoingContext(ctx, "token", interceptor.accessToken)
}
func (interceptor *AuthInterceptor) scheduleRefreshToken(refreshDuration time.Duration) error {
err := interceptor.refreshToken()
if err != nil {
return err
}
go func() {
wait := refreshDuration
for {
time.Sleep(wait)
err := interceptor.refreshToken()
if err != nil {
wait = time.Second
} else {
wait = refreshDuration
}
}
}()
return nil
}
func (interceptor *AuthInterceptor) refreshToken() error {
accessToken, _, err := interceptor.authClient.Auth()
if err != nil {
return err
}
interceptor.accessToken = accessToken
log.Printf("Token refreshed: %v", accessToken)
return nil
}

View file

@ -12,6 +12,7 @@ import (
"google.golang.org/grpc/status"
backend "github.com/woodpecker-ci/woodpecker/pipeline/backend/types"
"github.com/woodpecker-ci/woodpecker/pipeline/rpc"
"github.com/woodpecker-ci/woodpecker/pipeline/rpc/proto"
)
@ -23,7 +24,7 @@ type client struct {
}
// NewGrpcClient returns a new grpc Client.
func NewGrpcClient(conn *grpc.ClientConn) Peer {
func NewGrpcClient(conn *grpc.ClientConn) rpc.Peer {
client := new(client)
client.client = proto.NewWoodpeckerClient(conn)
client.conn = conn
@ -35,7 +36,7 @@ func (c *client) Close() error {
}
// Next returns the next pipeline in the queue.
func (c *client) Next(ctx context.Context, f Filter) (*Pipeline, error) {
func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Pipeline, error) {
var res *proto.NextReply
var err error
req := new(proto.NextRequest)
@ -75,7 +76,7 @@ func (c *client) Next(ctx context.Context, f Filter) (*Pipeline, error) {
return nil, nil
}
p := new(Pipeline)
p := new(rpc.Pipeline)
p.ID = res.GetPipeline().GetId()
p.Timeout = res.GetPipeline().GetTimeout()
p.Config = new(backend.Config)
@ -113,7 +114,7 @@ func (c *client) Wait(ctx context.Context, id string) (err error) {
}
// Init signals the pipeline is initialized.
func (c *client) Init(ctx context.Context, id string, state State) (err error) {
func (c *client) Init(ctx context.Context, id string, state rpc.State) (err error) {
req := new(proto.InitRequest)
req.Id = id
req.State = new(proto.State)
@ -147,7 +148,7 @@ func (c *client) Init(ctx context.Context, id string, state State) (err error) {
}
// Done signals the pipeline is complete.
func (c *client) Done(ctx context.Context, id string, state State) (err error) {
func (c *client) Done(ctx context.Context, id string, state rpc.State) (err error) {
req := new(proto.DoneRequest)
req.Id = id
req.State = new(proto.State)
@ -208,7 +209,7 @@ func (c *client) Extend(ctx context.Context, id string) (err error) {
}
// Update updates the pipeline state.
func (c *client) Update(ctx context.Context, id string, state State) (err error) {
func (c *client) Update(ctx context.Context, id string, state rpc.State) (err error) {
req := new(proto.UpdateRequest)
req.Id = id
req.State = new(proto.State)
@ -242,7 +243,7 @@ func (c *client) Update(ctx context.Context, id string, state State) (err error)
}
// Upload uploads the pipeline artifact.
func (c *client) Upload(ctx context.Context, id string, file *File) (err error) {
func (c *client) Upload(ctx context.Context, id string, file *rpc.File) (err error) {
req := new(proto.UploadRequest)
req.Id = id
req.File = new(proto.File)
@ -277,7 +278,7 @@ func (c *client) Upload(ctx context.Context, id string, file *File) (err error)
}
// Log writes the pipeline log entry.
func (c *client) Log(ctx context.Context, id string, line *Line) (err error) {
func (c *client) Log(ctx context.Context, id string, line *rpc.Line) (err error) {
req := new(proto.LogRequest)
req.Id = id
req.Line = new(proto.Line)
@ -307,3 +308,38 @@ func (c *client) Log(ctx context.Context, id string, line *Line) (err error) {
}
return nil
}
func (c *client) RegisterAgent(ctx context.Context, platform, backend, version string, capacity int) (int64, error) {
req := new(proto.RegisterAgentRequest)
req.Platform = platform
req.Backend = backend
req.Version = version
req.Capacity = int32(capacity)
res, err := c.client.RegisterAgent(ctx, req)
return res.GetAgentId(), err
}
func (c *client) ReportHealth(ctx context.Context) (err error) {
req := new(proto.ReportHealthRequest)
req.Status = "I am alive!"
for {
_, err = c.client.ReportHealth(ctx, req)
if err == nil {
return nil
}
switch status.Code(err) {
case
codes.Aborted,
codes.DataLoss,
codes.DeadlineExceeded,
codes.Internal,
codes.Unavailable:
// non-fatal errors
default:
return err
}
<-time.After(backoff)
}
}

View file

@ -22,6 +22,7 @@ import (
"runtime"
"strings"
"sync"
"time"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
@ -34,6 +35,7 @@ import (
"google.golang.org/grpc/metadata"
"github.com/woodpecker-ci/woodpecker/agent"
agentRpc "github.com/woodpecker-ci/woodpecker/agent/rpc"
"github.com/woodpecker-ci/woodpecker/pipeline/backend"
"github.com/woodpecker-ci/woodpecker/pipeline/backend/types"
"github.com/woodpecker-ci/woodpecker/pipeline/rpc"
@ -47,9 +49,11 @@ func loop(c *cli.Context) error {
hostname, _ = os.Hostname()
}
platform := runtime.GOOS + "/" + runtime.GOARCH
labels := map[string]string{
"hostname": hostname,
"platform": runtime.GOOS + "/" + runtime.GOARCH,
"platform": platform,
"repo": "*", // allow all repos by default
}
@ -95,10 +99,6 @@ func loop(c *cli.Context) error {
}()
}
// TODO pass version information to grpc server
// TODO authenticate to grpc server
// grpc.Dial(target, ))
var transport grpc.DialOption
if c.Bool("grpc-secure") {
transport = grpc.WithTransportCredentials(grpccredentials.NewTLS(&tls.Config{InsecureSkipVerify: c.Bool("skip-insecure-grpc")}))
@ -106,13 +106,9 @@ func loop(c *cli.Context) error {
transport = grpc.WithTransportCredentials(insecure.NewCredentials())
}
conn, err := grpc.Dial(
authConn, err := grpc.Dial(
c.String("server"),
transport,
grpc.WithPerRPCCredentials(&credentials{
username: c.String("grpc-username"),
password: c.String("grpc-password"),
}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: c.Duration("grpc-keepalive-time"),
Timeout: c.Duration("grpc-keepalive-timeout"),
@ -121,9 +117,32 @@ func loop(c *cli.Context) error {
if err != nil {
return err
}
defer authConn.Close()
agentID := int64(-1) // TODO: store agent id in a file
agentToken := c.String("grpc-token")
authClient := agentRpc.NewAuthGrpcClient(authConn, agentToken, agentID)
authInterceptor, err := agentRpc.NewAuthInterceptor(authClient, 30*time.Minute)
if err != nil {
return err
}
conn, err := grpc.Dial(
c.String("server"),
transport,
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: c.Duration("grpc-keepalive-time"),
Timeout: c.Duration("grpc-keepalive-timeout"),
}),
grpc.WithUnaryInterceptor(authInterceptor.Unary()),
grpc.WithStreamInterceptor(authInterceptor.Stream()),
)
if err != nil {
return err
}
defer conn.Close()
client := rpc.NewGrpcClient(conn)
client := agentRpc.NewGrpcClient(conn)
sigterm := abool.New()
ctx := metadata.NewOutgoingContext(
@ -148,6 +167,29 @@ func loop(c *cli.Context) error {
return err
}
agentID, err = client.RegisterAgent(ctx, platform, engine.Name(), version.String(), parallel)
if err != nil {
return err
}
log.Debug().Msgf("Agent registered with ID %d", agentID)
go func() {
for {
if sigterm.IsSet() {
return
}
err := client.ReportHealth(ctx)
if err != nil {
log.Err(err).Msgf("Failed to report health")
return
}
<-time.After(time.Second * 10)
}
}()
for i := 0; i < parallel; i++ {
go func() {
defer wg.Done()
@ -178,25 +220,9 @@ func loop(c *cli.Context) error {
}
log.Info().Msgf(
"Starting Woodpecker agent with version '%s' and backend '%s' running up to %d pipelines in parallel",
version.String(), engine.Name(), parallel)
"Starting Woodpecker agent with version '%s' and backend '%s' using platform '%s' running up to %d pipelines in parallel",
version.String(), engine.Name(), platform, parallel)
wg.Wait()
return nil
}
type credentials struct {
username string
password string
}
func (c *credentials) GetRequestMetadata(context.Context, ...string) (map[string]string, error) {
return map[string]string{
"username": c.username,
"password": c.password,
}, nil
}
func (c *credentials) RequireTransportSecurity() bool {
return false
}

View file

@ -29,16 +29,10 @@ var flags = []cli.Flag{
Usage: "server address",
Value: "localhost:9000",
},
&cli.StringFlag{
EnvVars: []string{"WOODPECKER_USERNAME"},
Name: "grpc-username",
Usage: "auth username",
Value: "x-oauth-basic",
},
&cli.StringFlag{
EnvVars: []string{"WOODPECKER_AGENT_SECRET"},
Name: "grpc-password",
Usage: "server-agent shared password",
Name: "grpc-token",
Usage: "server-agent shared token",
FilePath: os.Getenv("WOODPECKER_AGENT_SECRET_FILE"),
},
&cli.BoolFlag{

View file

@ -17,7 +17,6 @@ package main
import (
"context"
"crypto/tls"
"errors"
"net"
"net/http"
"net/http/httputil"
@ -34,7 +33,6 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"github.com/woodpecker-ci/woodpecker/pipeline/rpc/proto"
"github.com/woodpecker-ci/woodpecker/server"
@ -135,16 +133,19 @@ func run(c *cli.Context) error {
log.Err(err).Msg("")
return err
}
authorizer := &authorizer{
password: c.String("agent-secret"),
}
jwtSecret := "secret" // TODO: make configurable
jwtManager := woodpeckerGrpcServer.NewJWTManager(jwtSecret)
authorizer := woodpeckerGrpcServer.NewAuthorizer(jwtManager)
grpcServer := grpc.NewServer(
grpc.StreamInterceptor(authorizer.streamInterceptor),
grpc.UnaryInterceptor(authorizer.unaryInterceptor),
grpc.StreamInterceptor(authorizer.StreamInterceptor),
grpc.UnaryInterceptor(authorizer.UnaryInterceptor),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: c.Duration("keepalive-min-time"),
}),
)
woodpeckerServer := woodpeckerGrpcServer.NewWoodpeckerServer(
_forge,
server.Config.Services.Queue,
@ -155,6 +156,13 @@ func run(c *cli.Context) error {
)
proto.RegisterWoodpeckerServer(grpcServer, woodpeckerServer)
woodpeckerAuthServer := woodpeckerGrpcServer.NewWoodpeckerAuthServer(
jwtManager,
server.Config.Server.AgentToken,
_store,
)
proto.RegisterWoodpeckerAuthServer(grpcServer, woodpeckerAuthServer)
err = grpcServer.Serve(lis)
if err != nil {
log.Err(err).Msg("")
@ -315,7 +323,7 @@ func setupEvilGlobals(c *cli.Context, v store.Store, f forge.Forge) {
// server configuration
server.Config.Server.Cert = c.String("server-cert")
server.Config.Server.Key = c.String("server-key")
server.Config.Server.Pass = c.String("agent-secret")
server.Config.Server.AgentToken = c.String("agent-secret")
server.Config.Server.Host = c.String("server-host")
if c.IsSet("server-dev-oauth-host") {
server.Config.Server.OAuthHost = c.String("server-dev-oauth-host")
@ -337,31 +345,3 @@ func setupEvilGlobals(c *cli.Context, v store.Store, f forge.Forge) {
// TODO(485) temporary workaround to not hit api rate limits
server.Config.FlatPermissions = c.Bool("flat-permissions")
}
type authorizer struct {
password string
}
func (a *authorizer) streamInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if err := a.authorize(stream.Context()); err != nil {
return err
}
return handler(srv, stream)
}
func (a *authorizer) unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
if err := a.authorize(ctx); err != nil {
return nil, err
}
return handler(ctx, req)
}
func (a *authorizer) authorize(ctx context.Context) error {
if md, ok := metadata.FromIncomingContext(ctx); ok {
if len(md["password"]) > 0 && md["password"][0] == a.password {
return nil
}
return errors.New("invalid agent token")
}
return errors.New("missing agent token")
}

View file

@ -1,60 +0,0 @@
package rpc
import (
"context"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/woodpecker-ci/woodpecker/pipeline/rpc/proto"
)
// generate protobuffs
// protoc --go_out=plugins=grpc,import_path=proto:. *.proto
type healthClient struct {
client proto.HealthClient
conn *grpc.ClientConn
}
// NewGrpcHealthClient returns a new grpc Client.
func NewGrpcHealthClient(conn *grpc.ClientConn) Health {
client := new(healthClient)
client.client = proto.NewHealthClient(conn)
client.conn = conn
return client
}
func (c *healthClient) Close() error {
return c.conn.Close()
}
func (c *healthClient) Check(ctx context.Context) (bool, error) {
var res *proto.HealthCheckResponse
var err error
req := new(proto.HealthCheckRequest)
for {
res, err = c.client.Check(ctx, req)
if err == nil {
if res.GetStatus() == proto.HealthCheckResponse_SERVING {
return true, nil
}
return false, nil
}
switch status.Code(err) {
case
codes.Aborted,
codes.DataLoss,
codes.DeadlineExceeded,
codes.Internal,
codes.Unavailable:
// non-fatal errors
default:
return false, err
}
<-time.After(backoff)
}
}

View file

@ -1,11 +0,0 @@
package rpc
import (
"context"
)
// Health defines a health-check connection.
type Health interface {
// Check returns if server is healthy or not
Check(c context.Context) (bool, error)
}

View file

@ -66,4 +66,10 @@ type Peer interface {
// Log writes the pipeline log entry.
Log(c context.Context, id string, line *Line) error
// RegisterAgent register our agent to the server
RegisterAgent(ctx context.Context, platform, backend, version string, capacity int) (int64, error)
// ReportHealth reports health status of the agent to the server
ReportHealth(c context.Context) error
}

File diff suppressed because it is too large Load diff

View file

@ -3,6 +3,19 @@ syntax = "proto3";
option go_package = "github.com/woodpecker-ci/woodpecker/pipeline/rpc/proto";
package proto;
service Woodpecker {
rpc Next (NextRequest) returns (NextReply) {}
rpc Init (InitRequest) returns (Empty) {}
rpc Wait (WaitRequest) returns (Empty) {}
rpc Done (DoneRequest) returns (Empty) {}
rpc Extend (ExtendRequest) returns (Empty) {}
rpc Update (UpdateRequest) returns (Empty) {}
rpc Upload (UploadRequest) returns (Empty) {}
rpc Log (LogRequest) returns (Empty) {}
rpc RegisterAgent (RegisterAgentRequest) returns (RegisterAgentResponse) {}
rpc ReportHealth (ReportHealthRequest) returns (Empty) {}
}
message File {
string name = 1;
string step = 2;
@ -39,38 +52,6 @@ message Pipeline {
bytes payload = 3;
}
message HealthCheckRequest {
string service = 1;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
}
ServingStatus status = 1;
}
service Woodpecker {
rpc Next (NextRequest) returns (NextReply) {}
rpc Init (InitRequest) returns (Empty) {}
rpc Wait (WaitRequest) returns (Empty) {}
rpc Done (DoneRequest) returns (Empty) {}
rpc Extend (ExtendRequest) returns (Empty) {}
rpc Update (UpdateRequest) returns (Empty) {}
rpc Upload (UploadRequest) returns (Empty) {}
rpc Log (LogRequest) returns (Empty) {}
}
service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
}
//
// next
//
message NextRequest {
Filter filter = 1;
}
@ -113,5 +94,36 @@ message LogRequest {
}
message Empty {
}
message ReportHealthRequest {
string status = 1;
}
message RegisterAgentRequest {
string platform = 1;
int32 capacity = 2;
string backend = 3;
string version = 4;
}
message RegisterAgentResponse {
int64 agent_id = 1;
}
// Woodpecker auth service is a simple service to authenticate agents and aquire a token
service WoodpeckerAuth {
rpc Auth (AuthRequest) returns (AuthReply) {}
}
message AuthRequest {
string agent_token = 1;
int64 agent_id = 2;
}
message AuthReply {
string status = 1;
int64 agent_id = 2;
string access_token = 3;
}

View file

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.21.7
// - protoc v3.12.4
// source: woodpecker.proto
package proto
@ -30,6 +30,8 @@ type WoodpeckerClient interface {
Update(ctx context.Context, in *UpdateRequest, opts ...grpc.CallOption) (*Empty, error)
Upload(ctx context.Context, in *UploadRequest, opts ...grpc.CallOption) (*Empty, error)
Log(ctx context.Context, in *LogRequest, opts ...grpc.CallOption) (*Empty, error)
RegisterAgent(ctx context.Context, in *RegisterAgentRequest, opts ...grpc.CallOption) (*RegisterAgentResponse, error)
ReportHealth(ctx context.Context, in *ReportHealthRequest, opts ...grpc.CallOption) (*Empty, error)
}
type woodpeckerClient struct {
@ -112,6 +114,24 @@ func (c *woodpeckerClient) Log(ctx context.Context, in *LogRequest, opts ...grpc
return out, nil
}
func (c *woodpeckerClient) RegisterAgent(ctx context.Context, in *RegisterAgentRequest, opts ...grpc.CallOption) (*RegisterAgentResponse, error) {
out := new(RegisterAgentResponse)
err := c.cc.Invoke(ctx, "/proto.Woodpecker/RegisterAgent", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *woodpeckerClient) ReportHealth(ctx context.Context, in *ReportHealthRequest, opts ...grpc.CallOption) (*Empty, error) {
out := new(Empty)
err := c.cc.Invoke(ctx, "/proto.Woodpecker/ReportHealth", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// WoodpeckerServer is the server API for Woodpecker service.
// All implementations must embed UnimplementedWoodpeckerServer
// for forward compatibility
@ -124,6 +144,8 @@ type WoodpeckerServer interface {
Update(context.Context, *UpdateRequest) (*Empty, error)
Upload(context.Context, *UploadRequest) (*Empty, error)
Log(context.Context, *LogRequest) (*Empty, error)
RegisterAgent(context.Context, *RegisterAgentRequest) (*RegisterAgentResponse, error)
ReportHealth(context.Context, *ReportHealthRequest) (*Empty, error)
mustEmbedUnimplementedWoodpeckerServer()
}
@ -155,6 +177,12 @@ func (UnimplementedWoodpeckerServer) Upload(context.Context, *UploadRequest) (*E
func (UnimplementedWoodpeckerServer) Log(context.Context, *LogRequest) (*Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method Log not implemented")
}
func (UnimplementedWoodpeckerServer) RegisterAgent(context.Context, *RegisterAgentRequest) (*RegisterAgentResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RegisterAgent not implemented")
}
func (UnimplementedWoodpeckerServer) ReportHealth(context.Context, *ReportHealthRequest) (*Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method ReportHealth not implemented")
}
func (UnimplementedWoodpeckerServer) mustEmbedUnimplementedWoodpeckerServer() {}
// UnsafeWoodpeckerServer may be embedded to opt out of forward compatibility for this service.
@ -312,6 +340,42 @@ func _Woodpecker_Log_Handler(srv interface{}, ctx context.Context, dec func(inte
return interceptor(ctx, in, info, handler)
}
func _Woodpecker_RegisterAgent_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RegisterAgentRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(WoodpeckerServer).RegisterAgent(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.Woodpecker/RegisterAgent",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(WoodpeckerServer).RegisterAgent(ctx, req.(*RegisterAgentRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Woodpecker_ReportHealth_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ReportHealthRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(WoodpeckerServer).ReportHealth(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.Woodpecker/ReportHealth",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(WoodpeckerServer).ReportHealth(ctx, req.(*ReportHealthRequest))
}
return interceptor(ctx, in, info, handler)
}
// Woodpecker_ServiceDesc is the grpc.ServiceDesc for Woodpecker service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -351,91 +415,99 @@ var Woodpecker_ServiceDesc = grpc.ServiceDesc{
MethodName: "Log",
Handler: _Woodpecker_Log_Handler,
},
{
MethodName: "RegisterAgent",
Handler: _Woodpecker_RegisterAgent_Handler,
},
{
MethodName: "ReportHealth",
Handler: _Woodpecker_ReportHealth_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "woodpecker.proto",
}
// HealthClient is the client API for Health service.
// WoodpeckerAuthClient is the client API for WoodpeckerAuth service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type HealthClient interface {
Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error)
type WoodpeckerAuthClient interface {
Auth(ctx context.Context, in *AuthRequest, opts ...grpc.CallOption) (*AuthReply, error)
}
type healthClient struct {
type woodpeckerAuthClient struct {
cc grpc.ClientConnInterface
}
func NewHealthClient(cc grpc.ClientConnInterface) HealthClient {
return &healthClient{cc}
func NewWoodpeckerAuthClient(cc grpc.ClientConnInterface) WoodpeckerAuthClient {
return &woodpeckerAuthClient{cc}
}
func (c *healthClient) Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) {
out := new(HealthCheckResponse)
err := c.cc.Invoke(ctx, "/proto.Health/Check", in, out, opts...)
func (c *woodpeckerAuthClient) Auth(ctx context.Context, in *AuthRequest, opts ...grpc.CallOption) (*AuthReply, error) {
out := new(AuthReply)
err := c.cc.Invoke(ctx, "/proto.WoodpeckerAuth/Auth", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// HealthServer is the server API for Health service.
// All implementations must embed UnimplementedHealthServer
// WoodpeckerAuthServer is the server API for WoodpeckerAuth service.
// All implementations must embed UnimplementedWoodpeckerAuthServer
// for forward compatibility
type HealthServer interface {
Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
mustEmbedUnimplementedHealthServer()
type WoodpeckerAuthServer interface {
Auth(context.Context, *AuthRequest) (*AuthReply, error)
mustEmbedUnimplementedWoodpeckerAuthServer()
}
// UnimplementedHealthServer must be embedded to have forward compatible implementations.
type UnimplementedHealthServer struct {
// UnimplementedWoodpeckerAuthServer must be embedded to have forward compatible implementations.
type UnimplementedWoodpeckerAuthServer struct {
}
func (UnimplementedHealthServer) Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Check not implemented")
func (UnimplementedWoodpeckerAuthServer) Auth(context.Context, *AuthRequest) (*AuthReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method Auth not implemented")
}
func (UnimplementedHealthServer) mustEmbedUnimplementedHealthServer() {}
func (UnimplementedWoodpeckerAuthServer) mustEmbedUnimplementedWoodpeckerAuthServer() {}
// UnsafeHealthServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to HealthServer will
// UnsafeWoodpeckerAuthServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to WoodpeckerAuthServer will
// result in compilation errors.
type UnsafeHealthServer interface {
mustEmbedUnimplementedHealthServer()
type UnsafeWoodpeckerAuthServer interface {
mustEmbedUnimplementedWoodpeckerAuthServer()
}
func RegisterHealthServer(s grpc.ServiceRegistrar, srv HealthServer) {
s.RegisterService(&Health_ServiceDesc, srv)
func RegisterWoodpeckerAuthServer(s grpc.ServiceRegistrar, srv WoodpeckerAuthServer) {
s.RegisterService(&WoodpeckerAuth_ServiceDesc, srv)
}
func _Health_Check_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HealthCheckRequest)
func _WoodpeckerAuth_Auth_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AuthRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(HealthServer).Check(ctx, in)
return srv.(WoodpeckerAuthServer).Auth(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.Health/Check",
FullMethod: "/proto.WoodpeckerAuth/Auth",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(HealthServer).Check(ctx, req.(*HealthCheckRequest))
return srv.(WoodpeckerAuthServer).Auth(ctx, req.(*AuthRequest))
}
return interceptor(ctx, in, info, handler)
}
// Health_ServiceDesc is the grpc.ServiceDesc for Health service.
// WoodpeckerAuth_ServiceDesc is the grpc.ServiceDesc for WoodpeckerAuth service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Health_ServiceDesc = grpc.ServiceDesc{
ServiceName: "proto.Health",
HandlerType: (*HealthServer)(nil),
var WoodpeckerAuth_ServiceDesc = grpc.ServiceDesc{
ServiceName: "proto.WoodpeckerAuth",
HandlerType: (*WoodpeckerAuthServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Check",
Handler: _Health_Check_Handler,
MethodName: "Auth",
Handler: _WoodpeckerAuth_Auth_Handler,
},
},
Streams: []grpc.StreamDesc{},

130
server/api/agent.go Normal file
View file

@ -0,0 +1,130 @@
// Copyright 2022 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
"encoding/base32"
"net/http"
"strconv"
"github.com/gin-gonic/gin"
"github.com/gorilla/securecookie"
"github.com/woodpecker-ci/woodpecker/server/model"
"github.com/woodpecker-ci/woodpecker/server/router/middleware/session"
"github.com/woodpecker-ci/woodpecker/server/store"
)
func GetAgents(c *gin.Context) {
agents, err := store.FromContext(c).AgentList()
if err != nil {
c.String(500, "Error getting agent list. %s", err)
return
}
c.JSON(http.StatusOK, agents)
}
func GetAgent(c *gin.Context) {
agentID, err := strconv.ParseInt(c.Param("agent"), 10, 64)
if err != nil {
_ = c.AbortWithError(http.StatusBadRequest, err)
return
}
agent, err := store.FromContext(c).AgentFind(agentID)
if err != nil {
c.String(http.StatusNotFound, "Cannot find agent. %s", err)
return
}
c.JSON(http.StatusOK, agent)
}
func PatchAgent(c *gin.Context) {
_store := store.FromContext(c)
in := &model.Agent{}
err := c.Bind(in)
if err != nil {
c.AbortWithStatus(http.StatusBadRequest)
return
}
agentID, err := strconv.ParseInt(c.Param("agent"), 10, 64)
if err != nil {
_ = c.AbortWithError(http.StatusBadRequest, err)
return
}
agent, err := _store.AgentFind(agentID)
if err != nil {
c.AbortWithStatus(http.StatusNotFound)
return
}
agent.Name = in.Name
err = _store.AgentUpdate(agent)
if err != nil {
c.AbortWithStatus(http.StatusConflict)
return
}
c.JSON(http.StatusOK, agent)
}
// PostAgent create a new agent with a random token so a new agent can connect to the server
func PostAgent(c *gin.Context) {
in := &model.Agent{}
err := c.Bind(in)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
user := session.User(c)
agent := &model.Agent{
Name: in.Name,
OwnerID: user.ID,
Token: base32.StdEncoding.EncodeToString(
securecookie.GenerateRandomKey(32),
),
}
if err = store.FromContext(c).AgentCreate(agent); err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.JSON(http.StatusOK, agent)
}
func DeleteAgent(c *gin.Context) {
_store := store.FromContext(c)
agentID, err := strconv.ParseInt(c.Param("agent"), 10, 64)
if err != nil {
_ = c.AbortWithError(http.StatusBadRequest, err)
return
}
agent, err := _store.AgentFind(agentID)
if err != nil {
c.String(http.StatusNotFound, "Cannot find user. %s", err)
return
}
if err = _store.AgentDelete(agent); err != nil {
c.String(http.StatusInternalServerError, "Error deleting user. %s", err)
return
}
c.String(http.StatusOK, "")
}

View file

@ -60,7 +60,7 @@ var Config = struct {
OAuthHost string
Host string
Port string
Pass string
AgentToken string
Docs string
StatusContext string
StatusContextFormat string

View file

@ -0,0 +1,65 @@
package grpc
import (
"context"
"github.com/rs/zerolog/log"
"github.com/woodpecker-ci/woodpecker/pipeline/rpc/proto"
"github.com/woodpecker-ci/woodpecker/server"
"github.com/woodpecker-ci/woodpecker/server/model"
"github.com/woodpecker-ci/woodpecker/server/store"
)
type WoodpeckerAuthServer struct {
proto.UnimplementedWoodpeckerAuthServer
jwtManager *JWTManager
agentMasterToken string
store store.Store
}
func NewWoodpeckerAuthServer(jwtManager *JWTManager, agentMasterToken string, store store.Store) *WoodpeckerAuthServer {
return &WoodpeckerAuthServer{jwtManager: jwtManager, agentMasterToken: agentMasterToken, store: store}
}
func (s *WoodpeckerAuthServer) Auth(c context.Context, req *proto.AuthRequest) (*proto.AuthReply, error) {
agent, err := s.getAgent(c, req.AgentId, req.AgentToken)
if err != nil {
return nil, err
}
accessToken, err := s.jwtManager.Generate(agent.ID)
if err != nil {
return nil, err
}
return &proto.AuthReply{
Status: "ok",
AgentId: agent.ID,
AccessToken: accessToken,
}, nil
}
func (s *WoodpeckerAuthServer) getAgent(c context.Context, agentID int64, agentToken string) (*model.Agent, error) {
if agentToken == s.agentMasterToken && agentID == -1 {
agent := new(model.Agent)
agent.Name = ""
agent.OwnerID = -1 // system agent
agent.Token = server.Config.Server.AgentToken
agent.Backend = ""
agent.Platform = ""
agent.Capacity = -1
err := s.store.AgentCreate(agent)
if err != nil {
log.Err(err).Msgf("Error creating system agent: %s", err)
return nil, err
}
return agent, nil
}
if agentToken == s.agentMasterToken {
return s.store.AgentFind(agentID)
}
return s.store.AgentFindByToken(agentToken)
}

93
server/grpc/authorizer.go Normal file
View file

@ -0,0 +1,93 @@
package grpc
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
type StreamContextWrapper interface {
grpc.ServerStream
SetContext(context.Context)
}
type wrapper struct {
grpc.ServerStream
ctx context.Context
}
func (w *wrapper) Context() context.Context {
return w.ctx
}
func (w *wrapper) SetContext(ctx context.Context) {
w.ctx = ctx
}
func newStreamContextWrapper(inner grpc.ServerStream) StreamContextWrapper {
ctx := inner.Context()
return &wrapper{
inner,
ctx,
}
}
type Authorizer struct {
jwtManager *JWTManager
}
func NewAuthorizer(jwtManager *JWTManager) *Authorizer {
return &Authorizer{jwtManager: jwtManager}
}
func (a *Authorizer) StreamInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
_stream := newStreamContextWrapper(stream)
newCtx, err := a.authorize(stream.Context(), info.FullMethod)
if err != nil {
return err
}
_stream.SetContext(newCtx)
return handler(srv, _stream)
}
func (a *Authorizer) UnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
newCtx, err := a.authorize(ctx, info.FullMethod)
if err != nil {
return nil, err
}
return handler(newCtx, req)
}
func (a *Authorizer) authorize(ctx context.Context, fullMethod string) (context.Context, error) {
// bypass auth for token endpoint
if fullMethod == "/proto.WoodpeckerAuth/Auth" {
return ctx, nil
}
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return ctx, status.Errorf(codes.Unauthenticated, "metadata is not provided")
}
values := md["token"]
if len(values) == 0 {
return ctx, status.Errorf(codes.Unauthenticated, "token is not provided")
}
accessToken := values[0]
claims, err := a.jwtManager.Verify(accessToken)
if err != nil {
return ctx, status.Errorf(codes.Unauthenticated, "access token is invalid: %v", err)
}
md.Append("agent_id", fmt.Sprintf("%d", claims.AgentID))
return metadata.NewIncomingContext(ctx, md), nil
}

View file

@ -0,0 +1,73 @@
package grpc
import (
"errors"
"fmt"
"time"
"github.com/golang-jwt/jwt/v4"
)
// JWTManager is a JSON web token manager
type JWTManager struct {
secretKey string
tokenDuration time.Duration
}
// UserClaims is a custom JWT claims that contains some user's information
type AgentTokenClaims struct {
jwt.RegisteredClaims
AgentID int64 `json:"agent_id"`
}
const jwtTokenDuration = 1 * time.Hour
// NewJWTManager returns a new JWT manager
func NewJWTManager(secretKey string) *JWTManager {
return &JWTManager{secretKey, jwtTokenDuration}
}
// Generate generates and signs a new token for a user
func (manager *JWTManager) Generate(agentID int64) (string, error) {
claims := AgentTokenClaims{
RegisteredClaims: jwt.RegisteredClaims{
Issuer: "woodpecker",
Subject: fmt.Sprintf("%d", agentID),
Audience: jwt.ClaimStrings{},
NotBefore: jwt.NewNumericDate(time.Now()),
IssuedAt: jwt.NewNumericDate(time.Now()),
ID: fmt.Sprintf("%d", agentID),
ExpiresAt: jwt.NewNumericDate(time.Now().Add(manager.tokenDuration)),
},
AgentID: agentID,
}
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
return token.SignedString([]byte(manager.secretKey))
}
// Verify verifies the access token string and return a user claim if the token is valid
func (manager *JWTManager) Verify(accessToken string) (*AgentTokenClaims, error) {
token, err := jwt.ParseWithClaims(
accessToken,
&AgentTokenClaims{},
func(token *jwt.Token) (interface{}, error) {
_, ok := token.Method.(*jwt.SigningMethodHMAC)
if !ok {
return nil, errors.New("unexpected token signing method")
}
return []byte(manager.secretKey), nil
},
)
if err != nil {
return nil, fmt.Errorf("invalid token: %w", err)
}
claims, ok := token.Claims.(*AgentTokenClaims)
if !ok {
return nil, errors.New("invalid token claims")
}
return claims, nil
}

View file

@ -22,12 +22,15 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"time"
"github.com/rs/zerolog/log"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/metadata"
grpcMetadata "google.golang.org/grpc/metadata"
"github.com/woodpecker-ci/woodpecker/pipeline/rpc"
@ -382,6 +385,40 @@ func (s *RPC) Log(c context.Context, id string, line *rpc.Line) error {
return nil
}
func (s *RPC) RegisterAgent(ctx context.Context, platform, backend, version string, capacity int32) (int64, error) {
agent, err := s.getAgentFromContext(ctx)
if err != nil {
return -1, err
}
agent.Backend = backend
agent.Platform = platform
agent.Capacity = capacity
agent.Version = version
err = s.store.AgentUpdate(agent)
if err != nil {
return -1, err
}
return agent.ID, nil
}
func (s *RPC) ReportHealth(ctx context.Context, status string) error {
agent, err := s.getAgentFromContext(ctx)
if err != nil {
return err
}
if status != "I am alive!" {
return errors.New("Are you alive?")
}
agent.LastContact = time.Now().Unix()
return s.store.AgentUpdate(agent)
}
func (s *RPC) completeChildrenIfParentCompleted(steps []*model.Step, completedWorkflow *model.Step) {
for _, p := range steps {
if p.Running() && p.PPID == completedWorkflow.PID {
@ -438,3 +475,23 @@ func (s *RPC) notify(c context.Context, repo *model.Repo, pipeline *model.Pipeli
}
return nil
}
func (s *RPC) getAgentFromContext(ctx context.Context) (*model.Agent, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, errors.New("metadata is not provided")
}
values := md["agent_id"]
if len(values) == 0 {
return nil, errors.New("agent_id is not provided")
}
_agentID := values[0]
agentID, err := strconv.ParseInt(_agentID, 10, 64)
if err != nil {
return nil, errors.New("agent_id is not a valid integer")
}
return s.store.AgentFind(agentID)
}

View file

@ -163,3 +163,16 @@ func (s *WoodpeckerServer) Log(c context.Context, req *proto.LogRequest) (*proto
err := s.peer.Log(c, req.GetId(), line)
return res, err
}
func (s *WoodpeckerServer) RegisterAgent(c context.Context, req *proto.RegisterAgentRequest) (*proto.RegisterAgentResponse, error) {
res := new(proto.RegisterAgentResponse)
agentID, err := s.peer.RegisterAgent(c, req.GetPlatform(), req.GetBackend(), req.GetVersion(), req.GetCapacity())
res.AgentId = agentID
return res, err
}
func (s *WoodpeckerServer) ReportHealth(c context.Context, req *proto.ReportHealthRequest) (*proto.Empty, error) {
res := new(proto.Empty)
err := s.peer.ReportHealth(c, req.GetStatus())
return res, err
}

View file

@ -14,18 +14,25 @@
package model
// TODO: check if it is actually used or just some relict from the past
type Agent struct {
ID int64 `xorm:"pk autoincr 'agent_id'"`
Addr string `xorm:"UNIQUE VARCHAR(250) 'agent_addr'"`
Platform string `xorm:"VARCHAR(500) 'agent_platform'"`
Capacity int64 `xorm:"agent_capacity"`
Created int64 `xorm:"created 'agent_created'"`
Updated int64 `xorm:"updated 'agent_updated'"`
ID int64 `json:"id" xorm:"pk autoincr 'id'"`
Created int64 `json:"created" xorm:"created"`
Updated int64 `json:"updated" xorm:"updated"`
Name string `json:"name"`
OwnerID int64 `json:"owner_id" xorm:"'owner_id'"`
Token string `json:"token"`
LastContact int64 `json:"last_contact"`
Platform string `json:"platform" xorm:"VARCHAR(100)"`
Backend string `json:"backend" xorm:"VARCHAR(100)"`
Capacity int32 `json:"capacity"`
Version string `json:"version"`
}
// TableName return database table name for xorm
func (Agent) TableName() string {
return "agents"
}
func (a *Agent) IsSystemAgent() bool {
return a.OwnerID == -1
}

View file

@ -166,6 +166,16 @@ func apiRoutes(e *gin.Engine) {
logLevel.POST("", api.SetLogLevel)
}
agentBase := apiBase.Group("/agents")
{
agentBase.Use(session.MustAdmin())
agentBase.GET("", api.GetAgents)
agentBase.POST("", api.PostAgent)
agentBase.GET("/:agent", api.GetAgent)
agentBase.PATCH("/:agent", api.PatchAgent)
agentBase.DELETE("/:agent", api.DeleteAgent)
}
apiBase.GET("/signature/public-key", session.MustUser(), api.GetSignaturePublicKey)
apiBase.POST("/hook", api.PostHook)

View file

@ -0,0 +1,52 @@
// Copyright 2021 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datastore
import (
"github.com/woodpecker-ci/woodpecker/server/model"
)
func (s storage) AgentList() ([]*model.Agent, error) {
agents := make([]*model.Agent, 0, 10)
return agents, s.engine.Find(&agents)
}
func (s storage) AgentFind(id int64) (*model.Agent, error) {
agent := new(model.Agent)
return agent, wrapGet(s.engine.ID(id).Get(agent))
}
func (s storage) AgentFindByToken(token string) (*model.Agent, error) {
agent := &model.Agent{
Token: token,
}
return agent, wrapGet(s.engine.Get(agent))
}
func (s storage) AgentCreate(agent *model.Agent) error {
// only Insert set auto created ID back to object
_, err := s.engine.Insert(agent)
return err
}
func (s storage) AgentUpdate(agent *model.Agent) error {
_, err := s.engine.ID(agent.ID).AllCols().Update(agent)
return err
}
func (s storage) AgentDelete(agent *model.Agent) error {
_, err := s.engine.ID(agent.ID).Delete(new(model.Agent))
return err
}

View file

@ -0,0 +1,31 @@
// Copyright 2022 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migration
import (
"xorm.io/xorm"
"github.com/woodpecker-ci/woodpecker/server/model"
)
var recreateAgentsTable = task{
name: "recreate-agents-table",
fn: func(sess *xorm.Session) error {
if err := dropTable(sess, "agents"); err != nil {
return err
}
return sess.Sync2(new(model.Agent))
},
}

View file

@ -23,6 +23,17 @@ import (
"xorm.io/xorm/schemas"
)
func dropTable(sess *xorm.Session, table string) error {
dialect := sess.Engine().Dialect().URI().DBType
switch dialect {
case schemas.MYSQL, schemas.POSTGRES, schemas.SQLITE:
_, err := sess.Exec(fmt.Sprintf("DROP TABLE `%s`;", table))
return err
default:
return fmt.Errorf("dialect '%s' not supported", dialect)
}
}
func renameTable(sess *xorm.Session, old, new string) error {
dialect := sess.Engine().Dialect().URI().DBType
switch dialect {

View file

@ -35,6 +35,7 @@ var migrationTasks = []*task{
&dropSenders,
&alterTableLogUpdateColumnLogDataType,
&alterTableSecretsAddUserCol,
&recreateAgentsTable,
&lowercaseSecretNames,
&renameBuildsToPipeline,
&renameColumnsBuildsToPipeline,

View file

@ -15,6 +15,117 @@ type Store struct {
mock.Mock
}
// AgentCreate provides a mock function with given fields: _a0
func (_m *Store) AgentCreate(_a0 *model.Agent) error {
ret := _m.Called(_a0)
var r0 error
if rf, ok := ret.Get(0).(func(*model.Agent) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
return r0
}
// AgentDelete provides a mock function with given fields: _a0
func (_m *Store) AgentDelete(_a0 *model.Agent) error {
ret := _m.Called(_a0)
var r0 error
if rf, ok := ret.Get(0).(func(*model.Agent) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
return r0
}
// AgentFind provides a mock function with given fields: _a0
func (_m *Store) AgentFind(_a0 int64) (*model.Agent, error) {
ret := _m.Called(_a0)
var r0 *model.Agent
if rf, ok := ret.Get(0).(func(int64) *model.Agent); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*model.Agent)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(int64) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// AgentFindByToken provides a mock function with given fields: _a0
func (_m *Store) AgentFindByToken(_a0 string) (*model.Agent, error) {
ret := _m.Called(_a0)
var r0 *model.Agent
if rf, ok := ret.Get(0).(func(string) *model.Agent); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*model.Agent)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// AgentList provides a mock function with given fields:
func (_m *Store) AgentList() ([]*model.Agent, error) {
ret := _m.Called()
var r0 []*model.Agent
if rf, ok := ret.Get(0).(func() []*model.Agent); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*model.Agent)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// AgentUpdate provides a mock function with given fields: _a0
func (_m *Store) AgentUpdate(_a0 *model.Agent) error {
ret := _m.Called(_a0)
var r0 error
if rf, ok := ret.Get(0).(func(*model.Agent) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
return r0
}
// Close provides a mock function with given fields:
func (_m *Store) Close() error {
ret := _m.Called()

View file

@ -179,6 +179,14 @@ type Store interface {
CronListNextExecute(int64, int64) ([]*model.Cron, error)
CronGetLock(*model.Cron, int64) (bool, error)
// Agent
AgentCreate(*model.Agent) error
AgentFind(int64) (*model.Agent, error)
AgentFindByToken(string) (*model.Agent, error)
AgentList() ([]*model.Agent, error)
AgentUpdate(*model.Agent) error
AgentDelete(*model.Agent) error
// Store operations
Ping() error
Close() error

1
web/components.d.ts vendored
View file

@ -9,6 +9,7 @@ declare module '@vue/runtime-core' {
export interface GlobalComponents {
ActionsTab: typeof import('./src/components/repo/settings/ActionsTab.vue')['default']
ActivePipelines: typeof import('./src/components/layout/header/ActivePipelines.vue')['default']
AdminAgentsTab: typeof import('./src/components/admin/settings/AdminAgentsTab.vue')['default']
AdminSecretsTab: typeof import('./src/components/admin/settings/AdminSecretsTab.vue')['default']
BadgeTab: typeof import('./src/components/repo/settings/BadgeTab.vue')['default']
Button: typeof import('./src/components/atomic/Button.vue')['default']

View file

@ -321,6 +321,29 @@
"events": "Available at following events",
"pr_warning": "Please be careful with this option as a bad actor can submit a malicious pull request that exposes your secrets."
}
},
"agents": {
"agents": "Agents",
"desc": "Agents registered for this server",
"none": "There are no agents yet.",
"add": "Add agent",
"save": "Save agent",
"show": "Show agents",
"created": "Agent created",
"saved": "Agent saved",
"deleted": "Agent deleted",
"name": {
"name": "Name",
"placeholder": "Name of the agent"
},
"token": "Token",
"platform": "Platform",
"backend": "Backend",
"capacity": "Capacity",
"version": "Version",
"last_contact": "Last contact",
"never": "Never",
"delete_confirm": "Do you really want to delete this agent? It wont be able to connected to the server anymore."
}
}
},

View file

@ -0,0 +1,161 @@
<template>
<Panel>
<div class="flex flex-row border-b mb-4 pb-4 items-center dark:border-gray-600">
<div class="ml-2">
<h1 class="text-xl text-color">{{ $t('admin.settings.agents.agents') }}</h1>
<p class="text-sm text-color-alt">{{ $t('admin.settings.agents.desc') }}</p>
</div>
<Button
v-if="selectedAgent"
class="ml-auto"
:text="$t('admin.settings.agents.show')"
start-icon="back"
@click="selectedAgent = undefined"
/>
<template v-else>
<Button class="ml-auto" :text="$t('admin.settings.agents.add')" start-icon="plus" @click="showAddAgent" />
<Button class="ml-2" start-icon="refresh" @click="loadAgents" />
</template>
</div>
<div v-if="!selectedAgent" class="space-y-4 text-color">
<ListItem v-for="agent in agents" :key="agent.id" class="items-center">
<span>{{ agent.name || `Agent ${agent.id}` }}</span>
<span class="ml-auto">{{ agent.last_contact ? timeAgo.format(agent.last_contact * 1000) : 'never' }}</span>
<IconButton icon="edit" class="ml-2 w-8 h-8" @click="editAgent(agent)" />
<IconButton
icon="trash"
class="ml-2 w-8 h-8 hover:text-red-400 hover:dark:text-red-500"
:is-loading="isDeleting"
@click="deleteAgent(agent)"
/>
</ListItem>
<div v-if="agents?.length === 0" class="ml-2">{{ $t('admin.settings.agents.none') }}</div>
</div>
<div v-else>
<form @submit.prevent="saveAgent">
<InputField :label="$t('admin.settings.agents.name.name')">
<TextField
v-model="selectedAgent.name"
:placeholder="$t('admin.settings.agents.name.placeholder')"
required
/>
</InputField>
<template v-if="isEditingAgent">
<InputField :label="$t('admin.settings.agents.token')">
<TextField v-model="selectedAgent.token" :placeholder="$t('admin.settings.agents.token')" disabled />
</InputField>
<InputField :label="$t('admin.settings.agents.backend')" docs-url="docs/next/administration/backends/docker">
<TextField v-model="selectedAgent.backend" disabled />
</InputField>
<InputField :label="$t('admin.settings.agents.platform')">
<TextField v-model="selectedAgent.platform" disabled />
</InputField>
<InputField
:label="$t('admin.settings.agents.capacity')"
docs-url="docs/next/administration/agent-config#woodpecker_max_procs"
>
<span class="text-color-alt">The max amount of parallel pipelines executed by this agent.</span>
<TextField :model-value="selectedAgent.capacity?.toString()" disabled />
</InputField>
<InputField :label="$t('admin.settings.agents.version')">
<TextField :model-value="selectedAgent.version" disabled />
</InputField>
<InputField :label="$t('admin.settings.agents.last_contact')">
<TextField
:model-value="
selectedAgent.last_contact
? timeAgo.format(selectedAgent.last_contact * 1000)
: $t('admin.settings.agents.never')
"
disabled
/>
</InputField>
</template>
<Button
:is-loading="isSaving"
type="submit"
:text="isEditingAgent ? $t('admin.settings.agents.save') : $t('admin.settings.agents.add')"
/>
</form>
</div>
</Panel>
</template>
<script lang="ts" setup>
import { cloneDeep } from 'lodash';
import { computed, onMounted, ref } from 'vue';
import { useI18n } from 'vue-i18n';
import Button from '~/components/atomic/Button.vue';
import ListItem from '~/components/atomic/ListItem.vue';
import InputField from '~/components/form/InputField.vue';
import TextField from '~/components/form/TextField.vue';
import Panel from '~/components/layout/Panel.vue';
import useApiClient from '~/compositions/useApiClient';
import { useAsyncAction } from '~/compositions/useAsyncAction';
import useNotifications from '~/compositions/useNotifications';
import { Agent } from '~/lib/api/types';
import timeAgo from '~/utils/timeAgo';
const apiClient = useApiClient();
const notifications = useNotifications();
const i18n = useI18n();
const agents = ref<Agent[]>([]);
const selectedAgent = ref<Partial<Agent>>();
const isEditingAgent = computed(() => !!selectedAgent.value?.id);
async function loadAgents() {
agents.value = await apiClient.getAgents();
}
const { doSubmit: saveAgent, isLoading: isSaving } = useAsyncAction(async () => {
if (!selectedAgent.value) {
throw new Error("Unexpected: Can't get agent");
}
if (isEditingAgent.value) {
await apiClient.updateAgent(selectedAgent.value);
selectedAgent.value = undefined;
} else {
selectedAgent.value = await apiClient.createAgent(selectedAgent.value);
}
notifications.notify({
title: i18n.t(isEditingAgent.value ? 'admin.settings.agents.saved' : 'admin.settings.agents.created'),
type: 'success',
});
await loadAgents();
});
const { doSubmit: deleteAgent, isLoading: isDeleting } = useAsyncAction(async (_agent: Agent) => {
// eslint-disable-next-line no-restricted-globals, no-alert
if (!confirm(i18n.t('admin.settings.agents.delete_confirm'))) {
return;
}
await apiClient.deleteAgent(_agent);
notifications.notify({ title: i18n.t('admin.settings.agents.deleted'), type: 'success' });
await loadAgents();
});
function editAgent(agent: Agent) {
selectedAgent.value = cloneDeep(agent);
}
function showAddAgent() {
selectedAgent.value = cloneDeep({ name: '' });
}
onMounted(async () => {
await loadAgents();
});
</script>

View file

@ -41,13 +41,12 @@
<i-icon-park-outline-alarm-clock v-else-if="name === 'stopwatch'" class="h-6 w-6" />
<i-ic-baseline-file-download v-else-if="name === 'auto-scroll'" class="h-6 w-6" />
<i-ic-baseline-file-download-off v-else-if="name === 'auto-scroll-off'" class="h-6 w-6" />
<i-teenyicons-refresh-outline v-else-if="name === 'refresh'" class="h-6 w-6" />
<i-ic-baseline-play-arrow v-else-if="name === 'play'" class="h-6 w-6" />
<div v-else-if="name === 'blank'" class="h-6 w-6" />
</template>
<script lang="ts">
import { defineComponent, PropType } from 'vue';
<script lang="ts" setup>
export type IconNames =
| 'duration'
| 'since'
@ -92,16 +91,10 @@ export type IconNames =
| 'download'
| 'auto-scroll'
| 'auto-scroll-off'
| 'refresh'
| 'play';
export default defineComponent({
name: 'Icon',
props: {
name: {
type: String as PropType<IconNames>,
required: true,
},
},
});
defineProps<{
name: IconNames;
}>();
</script>

View file

@ -1,5 +1,7 @@
import ApiClient, { encodeQueryString } from './client';
import {
Agent,
Cron,
OrgPermissions,
Pipeline,
PipelineConfig,
@ -12,7 +14,6 @@ import {
RepoSettings,
Secret,
} from './types';
import { Cron } from './types/cron';
type RepoListOptions = {
all?: boolean;
@ -229,6 +230,26 @@ export default class WoodpeckerClient extends ApiClient {
return this._post('/api/user/token') as Promise<string>;
}
getAgents(): Promise<Agent[]> {
return this._get('/api/agents') as Promise<Agent[]>;
}
getAgent(agentId: Agent['id']): Promise<Agent> {
return this._get(`/api/agents/${agentId}`) as Promise<Agent>;
}
createAgent(agent: Partial<Agent>): Promise<Agent> {
return this._post('/api/agents', agent) as Promise<Agent>;
}
updateAgent(agent: Partial<Agent>): Promise<unknown> {
return this._patch(`/api/agents/${agent.id}`, agent);
}
deleteAgent(agent: Agent): Promise<unknown> {
return this._delete(`/api/agents/${agent.id}`);
}
// eslint-disable-next-line promise/prefer-await-to-callbacks
on(callback: (data: { pipeline?: Pipeline; repo?: Repo; step?: PipelineStep }) => void): EventSource {
return this._subscribe('/stream/events', callback, {

View file

@ -0,0 +1,12 @@
export type Agent = {
id: number;
name: string;
token: string;
created: number;
updated: number;
last_contact: number;
platform: string;
backend: string;
capacity: number;
version: string;
};

View file

@ -1,3 +1,4 @@
export * from './agent';
export * from './cron';
export * from './org';
export * from './pipeline';

View file

@ -6,41 +6,38 @@
<Tab id="secrets" :title="$t('admin.settings.secrets.secrets')">
<AdminSecretsTab />
</Tab>
<Tab id="agents" :title="$t('admin.settings.agents.agents')">
<AdminAgentsTab />
</Tab>
</Scaffold>
</template>
<script lang="ts">
import { defineComponent, onMounted } from 'vue';
<script lang="ts" setup>
import { onMounted } from 'vue';
import { useI18n } from 'vue-i18n';
import { useRouter } from 'vue-router';
import AdminAgentsTab from '~/components/admin/settings/AdminAgentsTab.vue';
import AdminSecretsTab from '~/components/admin/settings/AdminSecretsTab.vue';
import Scaffold from '~/components/layout/scaffold/Scaffold.vue';
import Tab from '~/components/layout/scaffold/Tab.vue';
import useAuthentication from '~/compositions/useAuthentication';
import useNotifications from '~/compositions/useNotifications';
export default defineComponent({
name: 'AdminSettings',
const notifications = useNotifications();
const router = useRouter();
const i18n = useI18n();
const { user } = useAuthentication();
components: {
Tab,
AdminSecretsTab,
Scaffold,
},
onMounted(async () => {
if (!user?.admin) {
notifications.notify({ type: 'error', title: i18n.t('admin.settings.not_allowed') });
await router.replace({ name: 'home' });
}
setup() {
const notifications = useNotifications();
const router = useRouter();
const i18n = useI18n();
const { user } = useAuthentication();
onMounted(async () => {
if (!user?.admin) {
notifications.notify({ type: 'error', title: i18n.t('admin.settings.not_allowed') });
await router.replace({ name: 'home' });
}
});
},
if (!user?.admin) {
notifications.notify({ type: 'error', title: i18n.t('admin.settings.not_allowed') });
await router.replace({ name: 'home' });
}
});
</script>