repo subscriptions functioning in datastore w/ index

This commit is contained in:
Brad Rydzewski 2015-04-13 21:39:05 -07:00
parent 5e7c4e2573
commit 1ad5e3f597
7 changed files with 109 additions and 103 deletions

View file

@ -10,8 +10,4 @@ type User struct {
Admin bool `json:"admin,omitempty"` Admin bool `json:"admin,omitempty"`
Created int64 `json:"created_at,omitempty"` Created int64 `json:"created_at,omitempty"`
Updated int64 `json:"updated_at,omitempty"` Updated int64 `json:"updated_at,omitempty"`
// Repos contains a list of subscriptions
// to repositories the user is watching.
Repos map[string]struct{} `json:"-"`
} }

View file

@ -1,10 +1,11 @@
package bolt package bolt
import ( import (
"bytes"
"time" "time"
"github.com/drone/drone/common"
"github.com/boltdb/bolt" "github.com/boltdb/bolt"
"github.com/drone/drone/common"
) )
// GetRepo gets the repository by name. // GetRepo gets the repository by name.
@ -59,14 +60,21 @@ func (db *DB) UpdateRepo(repo *common.Repo) error {
// InsertRepo inserts a repository in the datastore and // InsertRepo inserts a repository in the datastore and
// subscribes the user to that repository. // subscribes the user to that repository.
func (db *DB) InsertRepo(user *common.User, repo *common.Repo) error { func (db *DB) InsertRepo(user *common.User, repo *common.Repo) error {
key := []byte(repo.FullName) repokey := []byte(repo.FullName)
repo.Created = time.Now().UTC().Unix() repo.Created = time.Now().UTC().Unix()
repo.Updated = time.Now().UTC().Unix() repo.Updated = time.Now().UTC().Unix()
return db.Update(func(t *bolt.Tx) error { return db.Update(func(t *bolt.Tx) error {
// TODO(bradrydzewski) add repo to user index userkey := []byte(user.Login)
// TODO(bradrydzewski) add user to repo index err := push(t, bucketUserRepos, userkey, repokey)
return insert(t, bucketRepo, key, repo) if err != nil {
return err
}
// err = push(t, bucketRepoUsers, repokey, userkey)
// if err != nil {
// return err
// }
return insert(t, bucketRepo, repokey, repo)
}) })
} }
@ -111,25 +119,48 @@ func (db *DB) DeleteRepo(repo *common.Repo) error {
return t.Commit() return t.Commit()
} }
// // GetSubscriber gets the subscriber by login for the // GetSubscriber gets the subscriber by login for the
// // named repository. // named repository.
// func (db *DB) GetSubscriber(repo string, login string) (*common.Subscriber, error) { func (db *DB) GetSubscriber(login, repo string) (*common.Subscriber, error) {
// sub := &common.Subscriber{} sub := &common.Subscriber{}
// key := []byte(login + "/" + repo) err := db.View(func(t *bolt.Tx) error {
// err := get(db, bucketUserRepos, key, sub) repokey := []byte(repo)
// return sub, err
// }
// // InsertSubscriber inserts a subscriber for the named // get the index of user tokens and unmarshal
// // repository. // to a string array.
// func (db *DB) InsertSubscriber(repo string, sub *common.Subscriber) error { var keys [][]byte
// key := []byte(sub.Login + "/" + repo) err := get(t, bucketUserRepos, []byte(login), &keys)
// return insert(db, bucketUserRepos, key, sub) if err != nil && err != ErrKeyNotFound {
// } return err
}
// // DeleteSubscriber removes the subscriber by login for the for _, key := range keys {
// // named repository. if bytes.Equal(repokey, key) {
// func (db *DB) DeleteSubscriber(repo string, sub *common.Subscriber) error { sub.Subscribed = true
// key := []byte(sub.Login + "/" + repo) return nil
// return delete(db, bucketUserRepos, key) }
// } }
return nil
})
return sub, err
}
// InsertSubscriber inserts a subscriber for the named
// repository.
func (db *DB) InsertSubscriber(login, repo string) error {
return db.Update(func(t *bolt.Tx) error {
userkey := []byte(login)
repokey := []byte(repo)
return push(t, bucketUserRepos, userkey, repokey)
})
}
// DeleteSubscriber removes the subscriber by login for the
// named repository.
func (db *DB) DeleteSubscriber(login, repo string) error {
return db.Update(func(t *bolt.Tx) error {
userkey := []byte(login)
repokey := []byte(repo)
return splice(t, bucketUserRepos, userkey, repokey)
})
}

View file

@ -54,39 +54,33 @@ func (db *DB) GetUserTokens(login string) ([]*common.Token, error) {
// GetUserRepos gets a list of repositories for the // GetUserRepos gets a list of repositories for the
// given user account. // given user account.
func (db *DB) GetUserRepos(login string) ([]*common.Repo, error) { func (db *DB) GetUserRepos(login string) ([]*common.Repo, error) {
t, err := db.Begin(false)
if err != nil {
return nil, err
}
defer t.Rollback()
repos := []*common.Repo{} repos := []*common.Repo{}
user := &common.User{} err := db.View(func(t *bolt.Tx) error {
// get the index of user tokens and unmarshal
// get the user struct from the database // to a string array.
key := []byte(login) var keys [][]byte
raw := t.Bucket(bucketUser).Get(key) err := get(t, bucketUserRepos, []byte(login), &keys)
err = decode(raw, user) if err != nil && err != ErrKeyNotFound {
if err != nil { return err
return nil, err
} }
// for each item in the index, get the repository // for each item in the index, get the repository
// and append to the array // and append to the array
for repoName, _ := range user.Repos { for _, key := range keys {
repo := &common.Repo{} repo := &common.Repo{}
key = []byte(repoName) err := get(t, bucketRepo, key, repo)
raw = t.Bucket(bucketRepo).Get(key) if err == ErrKeyNotFound {
if raw == nil { // TODO if we come across ErrKeyNotFound it means
// this will happen when the repository has been deleted // we need to re-build the index
// TODO we should probably upate the index in this case.
continue continue
} } else if err != nil {
err = decode(raw, repo) return err
if err != nil {
break
} }
repos = append(repos, repo) repos = append(repos, repo)
} }
return nil
})
return repos, err return repos, err
} }

View file

@ -74,11 +74,13 @@ func splice(t *bolt.Tx, bucket, index, value []byte) error {
if err != nil && err != ErrKeyNotFound { if err != nil && err != ErrKeyNotFound {
return err return err
} }
for i, key := range keys { for i, key := range keys {
if bytes.Equal(key, value) { if bytes.Equal(key, value) {
keys = keys[:i+copy(keys[i:], keys[i+1:])] keys = keys[:i+copy(keys[i:], keys[i+1:])]
break break
} }
} }
return update(t, bucket, index, &keys) return update(t, bucket, index, &keys)
} }

View file

@ -46,17 +46,17 @@ type Datastore interface {
// DeleteUser deletes the token. // DeleteUser deletes the token.
DeleteToken(*common.Token) error DeleteToken(*common.Token) error
// // GetSubscriber gets the subscriber by login for the // GetSubscriber gets the subscriber by login for the
// // named repository. // named repository.
// GetSubscriber(string, string) (*common.Subscriber, error) GetSubscriber(string, string) (*common.Subscriber, error)
// // InsertSubscriber inserts a subscriber for the named // InsertSubscriber inserts a subscriber for the named
// // repository. // repository.
// InsertSubscriber(string, *common.Subscriber) error InsertSubscriber(string, string) error
// // DeleteSubscriber removes the subscriber by login for the // DeleteSubscriber removes the subscriber by login for the
// // named repository. // named repository.
// DeleteSubscriber(string, *common.Subscriber) error DeleteSubscriber(string, string) error
// GetRepo gets the repository by name. // GetRepo gets the repository by name.
GetRepo(string) (*common.Repo, error) GetRepo(string) (*common.Repo, error)

View file

@ -66,8 +66,7 @@ func GetRepo(c *gin.Context) {
} }
// check to see if the user is subscribing to the repo // check to see if the user is subscribing to the repo
_, ok := user.Repos[repo.FullName] data.Watch, _ = store.GetSubscriber(user.Login, repo.FullName)
data.Watch = &common.Subscriber{Subscribed: ok}
c.JSON(200, data) c.JSON(200, data)
} }
@ -122,8 +121,7 @@ func PutRepo(c *gin.Context) {
data.Params, _ = store.GetRepoParams(repo.FullName) data.Params, _ = store.GetRepoParams(repo.FullName)
// check to see if the user is subscribing to the repo // check to see if the user is subscribing to the repo
_, ok := user.Repos[repo.FullName] data.Watch, _ = store.GetSubscriber(user.Login, repo.FullName)
data.Watch = &common.Subscriber{Subscribed: ok}
c.JSON(200, data) c.JSON(200, data)
} }
@ -207,11 +205,11 @@ func PostRepo(c *gin.Context) {
// activate the repository before we make any // activate the repository before we make any
// local changes to the database. // local changes to the database.
err = remote.Activate(user, r, keypair, link) // err = remote.Activate(user, r, keypair, link)
if err != nil { // if err != nil {
c.Fail(500, err) // c.Fail(500, err)
return // return
} // }
println(link) println(link)
// persist the repository // persist the repository
@ -228,16 +226,6 @@ func PostRepo(c *gin.Context) {
return return
} }
// subscribe the user to the repository
// if this fails we'll ignore, since the user
// can just go click the "watch" button in the
// user interface.
if user.Repos == nil {
user.Repos = map[string]struct{}{}
}
user.Repos[r.FullName] = struct{}{}
store.UpdateUser(user)
c.JSON(200, r) c.JSON(200, r)
} }

View file

@ -1,9 +1,8 @@
package server package server
import ( import (
"github.com/gin-gonic/gin"
"github.com/drone/drone/common" "github.com/drone/drone/common"
"github.com/gin-gonic/gin"
) )
// Unubscribe accapets a request to unsubscribe the // Unubscribe accapets a request to unsubscribe the
@ -16,8 +15,7 @@ func Unsubscribe(c *gin.Context) {
repo := ToRepo(c) repo := ToRepo(c)
user := ToUser(c) user := ToUser(c)
delete(user.Repos, repo.FullName) err := store.DeleteSubscriber(user.Login, repo.FullName)
err := store.UpdateUser(user)
if err != nil { if err != nil {
c.Fail(400, err) c.Fail(400, err)
} else { } else {
@ -34,11 +32,8 @@ func Subscribe(c *gin.Context) {
store := ToDatastore(c) store := ToDatastore(c)
repo := ToRepo(c) repo := ToRepo(c)
user := ToUser(c) user := ToUser(c)
if user.Repos == nil {
user.Repos = map[string]struct{}{} err := store.InsertSubscriber(user.Login, repo.FullName)
}
user.Repos[repo.FullName] = struct{}{}
err := store.UpdateUser(user)
if err != nil { if err != nil {
c.Fail(400, err) c.Fail(400, err)
} else { } else {