Revert "avoid superfluous synchronized pull_request run when opening a PR"

The fix against the race incorrectly assumes the sha of the commit being
pushed belongs to the base repository. It finds the highest possible
pull request ID from the head repository instead of looking it up in
the base repository.

Figuring out if a PR was created in the future based on the highest
index of the base repository would require collecting all of them
because there is no way to know in advance which repository may be
involved in the race.

Fixing this race can be done either by:

* Introducing a new field in the pull_request table https://codeberg.org/forgejo/forgejo/pulls/2842
  which feels more like a hack than a real solution
* Refactoring the logic
  which would be a significant undertaking

The race has been in the codebase for a very long time and manifests
itself in the CI, when events happen in quick succession. The only
concrete manifestation was however fixed by https://codeberg.org/forgejo/forgejo/issues/2009

Since this race now only exists in theory and not in practice, let's
revert this bugous commit until a proper solution is implemented.

Fixes: https://codeberg.org/forgejo/forgejo/issues/2817

This reverts commit 036f1eddc5.

Conflicts:
	services/pull/pull.go
This commit is contained in:
Earl Warren 2024-03-26 13:40:12 +01:00
parent 57e7650d70
commit ceea9c4334
No known key found for this signature in database
GPG key ID: 0579CB2928A78A00
9 changed files with 94 additions and 336 deletions

View file

@ -9,14 +9,6 @@ import (
"code.gitea.io/gitea/models/db"
)
func GetMaxIssueIndexForRepo(ctx context.Context, repoID int64) (int64, error) {
var max int64
if _, err := db.GetEngine(ctx).Select("MAX(`index`)").Table("issue").Where("repo_id=?", repoID).Get(&max); err != nil {
return 0, err
}
return max, nil
}
// RecalculateIssueIndexForRepo create issue_index for repo if not exist and
// update it based on highest index of existing issues assigned to a repo
func RecalculateIssueIndexForRepo(ctx context.Context, repoID int64) error {
@ -26,8 +18,8 @@ func RecalculateIssueIndexForRepo(ctx context.Context, repoID int64) error {
}
defer committer.Close()
max, err := GetMaxIssueIndexForRepo(ctx, repoID)
if err != nil {
var max int64
if _, err = db.GetEngine(ctx).Select(" MAX(`index`)").Table("issue").Where("repo_id=?", repoID).Get(&max); err != nil {
return err
}

View file

@ -1,38 +0,0 @@
// Copyright 2024 The Forgejo Authors
// SPDX-License-Identifier: MIT
package issues_test
import (
"testing"
"code.gitea.io/gitea/models/db"
issues_model "code.gitea.io/gitea/models/issues"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/models/unittest"
"github.com/stretchr/testify/assert"
)
func TestGetMaxIssueIndexForRepo(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 1})
maxPR, err := issues_model.GetMaxIssueIndexForRepo(db.DefaultContext, repo.ID)
assert.NoError(t, err)
issue := testCreateIssue(t, repo.ID, repo.OwnerID, "title1", "content1", false)
assert.Greater(t, issue.Index, maxPR)
maxPR, err = issues_model.GetMaxIssueIndexForRepo(db.DefaultContext, repo.ID)
assert.NoError(t, err)
pull := testCreateIssue(t, repo.ID, repo.OwnerID, "title2", "content2", true)
assert.Greater(t, pull.Index, maxPR)
maxPR, err = issues_model.GetMaxIssueIndexForRepo(db.DefaultContext, repo.ID)
assert.NoError(t, err)
assert.Equal(t, maxPR, pull.Index)
}

View file

@ -47,14 +47,6 @@ func listPullRequestStatement(ctx context.Context, baseRepoID int64, opts *PullR
return sess, nil
}
func GetUnmergedPullRequestsByHeadInfoMax(ctx context.Context, repoID, maxIndex int64, branch string) ([]*PullRequest, error) {
prs := make([]*PullRequest, 0, 2)
sess := db.GetEngine(ctx).
Join("INNER", "issue", "issue.id = `pull_request`.issue_id").
Where("`pull_request`.head_repo_id = ? AND `pull_request`.head_branch = ? AND `pull_request`.has_merged = ? AND `issue`.is_closed = ? AND `pull_request`.flow = ? AND `issue`.`index` <= ?", repoID, branch, false, false, PullRequestFlowGithub, maxIndex)
return prs, sess.Find(&prs)
}
// GetUnmergedPullRequestsByHeadInfo returns all pull requests that are open and has not been merged
func GetUnmergedPullRequestsByHeadInfo(ctx context.Context, repoID int64, branch string) ([]*PullRequest, error) {
prs := make([]*PullRequest, 0, 2)

View file

@ -4,7 +4,6 @@
package issues_test
import (
"fmt"
"testing"
"code.gitea.io/gitea/models/db"
@ -157,91 +156,6 @@ func TestGetUnmergedPullRequestsByHeadInfo(t *testing.T) {
}
}
func TestGetUnmergedPullRequestsByHeadInfoMax(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
repoID := int64(1)
maxPR := int64(0)
prs, err := issues_model.GetUnmergedPullRequestsByHeadInfoMax(db.DefaultContext, repoID, maxPR, "branch2")
assert.NoError(t, err)
assert.Len(t, prs, 0)
maxPR, err = issues_model.GetMaxIssueIndexForRepo(db.DefaultContext, repoID)
assert.NoError(t, err)
prs, err = issues_model.GetUnmergedPullRequestsByHeadInfoMax(db.DefaultContext, repoID, maxPR, "branch2")
assert.NoError(t, err)
assert.Len(t, prs, 1)
for _, pr := range prs {
assert.Equal(t, int64(1), pr.HeadRepoID)
assert.Equal(t, "branch2", pr.HeadBranch)
}
pr := prs[0]
for _, testCase := range []struct {
table string
field string
id int64
match any
nomatch any
}{
{
table: "issue",
field: "is_closed",
id: pr.IssueID,
match: false,
nomatch: true,
},
{
table: "pull_request",
field: "flow",
id: pr.ID,
match: issues_model.PullRequestFlowGithub,
nomatch: issues_model.PullRequestFlowAGit,
},
{
table: "pull_request",
field: "head_repo_id",
id: pr.ID,
match: pr.HeadRepoID,
nomatch: 0,
},
{
table: "pull_request",
field: "head_branch",
id: pr.ID,
match: pr.HeadBranch,
nomatch: "something else",
},
{
table: "pull_request",
field: "has_merged",
id: pr.ID,
match: false,
nomatch: true,
},
} {
t.Run(testCase.field, func(t *testing.T) {
update := fmt.Sprintf("UPDATE `%s` SET `%s` = ? WHERE `id` = ?", testCase.table, testCase.field)
// expect no match
_, err = db.GetEngine(db.DefaultContext).Exec(update, testCase.nomatch, testCase.id)
assert.NoError(t, err)
prs, err = issues_model.GetUnmergedPullRequestsByHeadInfoMax(db.DefaultContext, repoID, maxPR, "branch2")
assert.NoError(t, err)
assert.Len(t, prs, 0)
// expect one match
_, err = db.GetEngine(db.DefaultContext).Exec(update, testCase.match, testCase.id)
assert.NoError(t, err)
prs, err = issues_model.GetUnmergedPullRequestsByHeadInfoMax(db.DefaultContext, repoID, maxPR, "branch2")
assert.NoError(t, err)
assert.Len(t, prs, 1)
// identical to the known PR
assert.Equal(t, pr.ID, prs[0].ID)
})
}
}
func TestGetUnmergedPullRequestsByBaseInfo(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
prs, err := issues_model.GetUnmergedPullRequestsByBaseInfo(db.DefaultContext, 1, "master")

View file

@ -187,7 +187,7 @@ func Merge(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.U
}
defer func() {
AddTestPullRequestTask(ctx, doer, pr.BaseRepo.ID, pr.BaseBranch, false, "", "")
go AddTestPullRequestTask(doer, pr.BaseRepo.ID, pr.BaseBranch, false, "", "")
}()
pr.MergedCommitID, err = doMergeAndPush(ctx, pr, doer, mergeStyle, expectedHeadCommitID, message)

View file

@ -292,129 +292,113 @@ func checkForInvalidation(ctx context.Context, requests issues_model.PullRequest
// AddTestPullRequestTask adds new test tasks by given head/base repository and head/base branch,
// and generate new patch for testing as needed.
func AddTestPullRequestTask(ctx context.Context, doer *user_model.User, repoID int64, branch string, isSync bool, oldCommitID, newCommitID string) {
// When TestPullRequest runs it must ignore any PR with an index > maxPR because they
// would have been created after the goroutine started. They are in the future.
// This guards the following race:
// * commit A is pushed
// * goroutine starts but does not run TestPullRequest yet
// * a pull request with commit A as the head is created
// * goroutine continues and runs TestPullRequest
maxPR, err := issues_model.GetMaxIssueIndexForRepo(ctx, repoID)
if err != nil {
log.Error("AddTestPullRequestTask GetMaxIssueIndexForRepo(%d): %v", repoID, err)
return
}
log.Trace("AddTestPullRequestTask [head_repo_id: %d, head_branch: %s]: only pull requests with index <= %d will be considered", repoID, branch, maxPR)
go graceful.GetManager().RunWithShutdownContext(func(ctx context.Context) {
func AddTestPullRequestTask(doer *user_model.User, repoID int64, branch string, isSync bool, oldCommitID, newCommitID string) {
log.Trace("AddTestPullRequestTask [head_repo_id: %d, head_branch: %s]: finding pull requests", repoID, branch)
graceful.GetManager().RunWithShutdownContext(func(ctx context.Context) {
// There is no sensible way to shut this down ":-("
// If you don't let it run all the way then you will lose data
// TODO: graceful: TestPullRequest needs to become a queue!
// TODO: graceful: AddTestPullRequestTask needs to become a queue!
TestPullRequest(ctx, doer, repoID, maxPR, branch, isSync, oldCommitID, newCommitID)
})
}
// GetUnmergedPullRequestsByHeadInfo() only return open and unmerged PR.
prs, err := issues_model.GetUnmergedPullRequestsByHeadInfo(ctx, repoID, branch)
if err != nil {
log.Error("Find pull requests [head_repo_id: %d, head_branch: %s]: %v", repoID, branch, err)
return
}
func TestPullRequest(ctx context.Context, doer *user_model.User, repoID, maxPR int64, branch string, isSync bool, oldCommitID, newCommitID string) {
// GetUnmergedPullRequestsByHeadInfo() only return open and unmerged PR.
prs, err := issues_model.GetUnmergedPullRequestsByHeadInfoMax(ctx, repoID, maxPR, branch)
if err != nil {
log.Error("Find pull requests [head_repo_id: %d, head_branch: %s]: %v", repoID, branch, err)
return
}
for _, pr := range prs {
log.Trace("Updating PR[id=%d,index=%d]: composing new test task", pr.ID, pr.Index)
if pr.Flow == issues_model.PullRequestFlowGithub {
if err := PushToBaseRepo(ctx, pr); err != nil {
log.Error("PushToBaseRepo: %v", err)
for _, pr := range prs {
log.Trace("Updating PR[%d]: composing new test task", pr.ID)
if pr.Flow == issues_model.PullRequestFlowGithub {
if err := PushToBaseRepo(ctx, pr); err != nil {
log.Error("PushToBaseRepo: %v", err)
continue
}
} else {
continue
}
} else {
continue
AddToTaskQueue(ctx, pr)
comment, err := CreatePushPullComment(ctx, doer, pr, oldCommitID, newCommitID)
if err == nil && comment != nil {
notify_service.PullRequestPushCommits(ctx, doer, pr, comment)
}
}
AddToTaskQueue(ctx, pr)
comment, err := CreatePushPullComment(ctx, doer, pr, oldCommitID, newCommitID)
if err == nil && comment != nil {
notify_service.PullRequestPushCommits(ctx, doer, pr, comment)
}
}
if isSync {
requests := issues_model.PullRequestList(prs)
if err = requests.LoadAttributes(ctx); err != nil {
log.Error("PullRequestList.LoadAttributes: %v", err)
}
if invalidationErr := checkForInvalidation(ctx, requests, repoID, doer, branch); invalidationErr != nil {
log.Error("checkForInvalidation: %v", invalidationErr)
}
if err == nil {
for _, pr := range prs {
objectFormat := git.ObjectFormatFromName(pr.BaseRepo.ObjectFormatName)
if newCommitID != "" && newCommitID != objectFormat.EmptyObjectID().String() {
changed, err := checkIfPRContentChanged(ctx, pr, oldCommitID, newCommitID)
if err != nil {
log.Error("checkIfPRContentChanged: %v", err)
}
if changed {
// Mark old reviews as stale if diff to mergebase has changed
if err := issues_model.MarkReviewsAsStale(ctx, pr.IssueID); err != nil {
log.Error("MarkReviewsAsStale: %v", err)
}
// dismiss all approval reviews if protected branch rule item enabled.
pb, err := git_model.GetFirstMatchProtectedBranchRule(ctx, pr.BaseRepoID, pr.BaseBranch)
if isSync {
requests := issues_model.PullRequestList(prs)
if err = requests.LoadAttributes(ctx); err != nil {
log.Error("PullRequestList.LoadAttributes: %v", err)
}
if invalidationErr := checkForInvalidation(ctx, requests, repoID, doer, branch); invalidationErr != nil {
log.Error("checkForInvalidation: %v", invalidationErr)
}
if err == nil {
for _, pr := range prs {
objectFormat := git.ObjectFormatFromName(pr.BaseRepo.ObjectFormatName)
if newCommitID != "" && newCommitID != objectFormat.EmptyObjectID().String() {
changed, err := checkIfPRContentChanged(ctx, pr, oldCommitID, newCommitID)
if err != nil {
log.Error("GetFirstMatchProtectedBranchRule: %v", err)
log.Error("checkIfPRContentChanged: %v", err)
}
if pb != nil && pb.DismissStaleApprovals {
if err := DismissApprovalReviews(ctx, doer, pr); err != nil {
log.Error("DismissApprovalReviews: %v", err)
if changed {
// Mark old reviews as stale if diff to mergebase has changed
if err := issues_model.MarkReviewsAsStale(ctx, pr.IssueID); err != nil {
log.Error("MarkReviewsAsStale: %v", err)
}
// dismiss all approval reviews if protected branch rule item enabled.
pb, err := git_model.GetFirstMatchProtectedBranchRule(ctx, pr.BaseRepoID, pr.BaseBranch)
if err != nil {
log.Error("GetFirstMatchProtectedBranchRule: %v", err)
}
if pb != nil && pb.DismissStaleApprovals {
if err := DismissApprovalReviews(ctx, doer, pr); err != nil {
log.Error("DismissApprovalReviews: %v", err)
}
}
}
if err := issues_model.MarkReviewsAsNotStale(ctx, pr.IssueID, newCommitID); err != nil {
log.Error("MarkReviewsAsNotStale: %v", err)
}
divergence, err := GetDiverging(ctx, pr)
if err != nil {
log.Error("GetDiverging: %v", err)
} else {
err = pr.UpdateCommitDivergence(ctx, divergence.Ahead, divergence.Behind)
if err != nil {
log.Error("UpdateCommitDivergence: %v", err)
}
}
}
if err := issues_model.MarkReviewsAsNotStale(ctx, pr.IssueID, newCommitID); err != nil {
log.Error("MarkReviewsAsNotStale: %v", err)
}
divergence, err := GetDiverging(ctx, pr)
if err != nil {
log.Error("GetDiverging: %v", err)
} else {
err = pr.UpdateCommitDivergence(ctx, divergence.Ahead, divergence.Behind)
if err != nil {
log.Error("UpdateCommitDivergence: %v", err)
}
}
notify_service.PullRequestSynchronized(ctx, doer, pr)
}
notify_service.PullRequestSynchronized(ctx, doer, pr)
}
}
}
log.Trace("TestPullRequest [base_repo_id: %d, base_branch: %s]: finding pull requests", repoID, branch)
prs, err = issues_model.GetUnmergedPullRequestsByBaseInfo(ctx, repoID, branch)
if err != nil {
log.Error("Find pull requests [base_repo_id: %d, base_branch: %s]: %v", repoID, branch, err)
return
}
for _, pr := range prs {
divergence, err := GetDiverging(ctx, pr)
log.Trace("AddTestPullRequestTask [base_repo_id: %d, base_branch: %s]: finding pull requests", repoID, branch)
prs, err = issues_model.GetUnmergedPullRequestsByBaseInfo(ctx, repoID, branch)
if err != nil {
if git_model.IsErrBranchNotExist(err) && !git.IsBranchExist(ctx, pr.HeadRepo.RepoPath(), pr.HeadBranch) {
log.Warn("Cannot test PR %s/%d: head_branch %s no longer exists", pr.BaseRepo.Name, pr.IssueID, pr.HeadBranch)
} else {
log.Error("GetDiverging: %v", err)
}
} else {
err = pr.UpdateCommitDivergence(ctx, divergence.Ahead, divergence.Behind)
if err != nil {
log.Error("UpdateCommitDivergence: %v", err)
}
log.Error("Find pull requests [base_repo_id: %d, base_branch: %s]: %v", repoID, branch, err)
return
}
AddToTaskQueue(ctx, pr)
}
for _, pr := range prs {
divergence, err := GetDiverging(ctx, pr)
if err != nil {
if git_model.IsErrBranchNotExist(err) && !git.IsBranchExist(ctx, pr.HeadRepo.RepoPath(), pr.HeadBranch) {
log.Warn("Cannot test PR %s/%d: head_branch %s no longer exists", pr.BaseRepo.Name, pr.IssueID, pr.HeadBranch)
} else {
log.Error("GetDiverging: %v", err)
}
} else {
err = pr.UpdateCommitDivergence(ctx, divergence.Ahead, divergence.Behind)
if err != nil {
log.Error("UpdateCommitDivergence: %v", err)
}
}
AddToTaskQueue(ctx, pr)
}
})
}
// checkIfPRContentChanged checks if diff to target branch has changed by push

View file

@ -36,7 +36,7 @@ func Update(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.
if rebase {
defer func() {
AddTestPullRequestTask(ctx, doer, pr.BaseRepo.ID, pr.BaseBranch, false, "", "")
go AddTestPullRequestTask(doer, pr.BaseRepo.ID, pr.BaseBranch, false, "", "")
}()
return updateHeadByRebaseOnToBase(ctx, pr, doer, message)
@ -75,7 +75,7 @@ func Update(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.
_, err = doMergeAndPush(ctx, reversePR, doer, repo_model.MergeStyleMerge, "", message)
defer func() {
AddTestPullRequestTask(ctx, doer, reversePR.HeadRepo.ID, reversePR.HeadBranch, false, "", "")
go AddTestPullRequestTask(doer, reversePR.HeadRepo.ID, reversePR.HeadBranch, false, "", "")
}()
return err

View file

@ -166,7 +166,7 @@ func pushUpdates(optsList []*repo_module.PushUpdateOptions) error {
branch := opts.RefFullName.BranchName()
if !opts.IsDelRef() {
log.Trace("TriggerTask '%s/%s' by %s", repo.Name, branch, pusher.Name)
pull_service.AddTestPullRequestTask(ctx, pusher, repo.ID, branch, true, opts.OldCommitID, opts.NewCommitID)
go pull_service.AddTestPullRequestTask(pusher, repo.ID, branch, true, opts.OldCommitID, opts.NewCommitID)
newCommit, err := gitRepo.GetCommit(opts.NewCommitID)
if err != nil {

View file

@ -1,86 +0,0 @@
// Copyright 2024 The Forgejo Authors
// SPDX-License-Identifier: MIT
package integration
import (
"context"
"testing"
"time"
issues_model "code.gitea.io/gitea/models/issues"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/models/unittest"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/log"
repo_module "code.gitea.io/gitea/modules/repository"
"code.gitea.io/gitea/modules/test"
pull_service "code.gitea.io/gitea/services/pull"
repo_service "code.gitea.io/gitea/services/repository"
"code.gitea.io/gitea/tests"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPullRequestSynchronized(t *testing.T) {
defer tests.PrepareTestEnv(t)()
// unmerged pull request of user2/repo1 from branch2 to master
pull := unittest.AssertExistsAndLoadBean(t, &issues_model.PullRequest{ID: 2})
// tip of tests/gitea-repositories-meta/user2/repo1 branch2
pull.HeadCommitID = "985f0301dba5e7b34be866819cd15ad3d8f508ee"
require.Equal(t, pull.HeadRepoID, pull.BaseRepoID)
repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: pull.HeadRepoID})
owner := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: repo.OwnerID})
t.Run("AddTestPullRequestTask", func(t *testing.T) {
logChecker, cleanup := test.NewLogChecker(log.DEFAULT, log.TRACE)
logChecker.Filter("Updating PR").StopMark("TestPullRequest ")
defer cleanup()
opt := &repo_module.PushUpdateOptions{
PusherID: owner.ID,
PusherName: owner.Name,
RepoUserName: owner.Name,
RepoName: repo.Name,
RefFullName: git.RefName("refs/heads/branch2"),
OldCommitID: pull.HeadCommitID,
NewCommitID: pull.HeadCommitID,
}
require.NoError(t, repo_service.PushUpdate(opt))
logFiltered, logStopped := logChecker.Check(5 * time.Second)
assert.True(t, logStopped)
assert.True(t, logFiltered[0])
})
for _, testCase := range []struct {
name string
maxPR int64
expected bool
}{
{
name: "TestPullRequest process PR",
maxPR: pull.Index,
expected: true,
},
{
name: "TestPullRequest skip PR",
maxPR: pull.Index - 1,
expected: false,
},
} {
t.Run(testCase.name, func(t *testing.T) {
logChecker, cleanup := test.NewLogChecker(log.DEFAULT, log.TRACE)
logChecker.Filter("Updating PR").StopMark("TestPullRequest ")
defer cleanup()
pull_service.TestPullRequest(context.Background(), owner, repo.ID, testCase.maxPR, "branch2", true, pull.HeadCommitID, pull.HeadCommitID)
logFiltered, logStopped := logChecker.Check(5 * time.Second)
assert.True(t, logStopped)
assert.Equal(t, testCase.expected, logFiltered[0])
})
}
}