feat: copy annexed files on pull request merge

This commit is contained in:
Matthias Riße 2025-02-05 15:44:02 +01:00
parent 1298a315bd
commit cff9cdb2d8
5 changed files with 249 additions and 34 deletions

View file

@ -10,12 +10,16 @@
package annex
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"
"code.gitea.io/gitea/modules/git"
@ -29,6 +33,16 @@ import (
// ErrBlobIsNotAnnexed occurs if a blob does not contain a valid annex key
var ErrBlobIsNotAnnexed = errors.New("not a git-annex pointer")
// PrivateInit prepares the repository at repoPath for git-annex use.
//
// It first sets the git config value annex.private to "true" and then runs
// "git annex init", both with repoPath as the working directory. The error
// of the first command that fails is returned unchanged.
func PrivateInit(ctx context.Context, repoPath string) error {
	// annex.private is configured before the init command is run.
	_, _, configErr := git.NewCommand(ctx, "config", "annex.private", "true").RunStdString(&git.RunOpts{Dir: repoPath})
	if configErr != nil {
		return configErr
	}

	_, _, initErr := git.NewCommand(ctx, "annex", "init").RunStdString(&git.RunOpts{Dir: repoPath})
	if initErr != nil {
		return initErr
	}

	return nil
}
func LookupKey(blob *git.Blob) (string, error) {
stdout, _, err := git.NewCommand(git.DefaultContext, "annex", "lookupkey", "--ref").AddDynamicArguments(blob.ID.String()).RunStdString(&git.RunOpts{Dir: blob.Repo().Path})
if err != nil {
@ -38,6 +52,42 @@ func LookupKey(blob *git.Blob) (string, error) {
return key, nil
}
// LookupKeyBatch runs git annex lookupkey --batch --ref
func LookupKeyBatch(ctx context.Context, shasToBatchReader *io.PipeReader, lookupKeyBatchWriter *io.PipeWriter, wg *sync.WaitGroup, repoPath string) {
defer wg.Done()
defer shasToBatchReader.Close()
defer lookupKeyBatchWriter.Close()
stderr := new(bytes.Buffer)
var errbuf strings.Builder
if err := git.NewCommand(ctx, "annex", "lookupkey", "--batch", "--ref").Run(&git.RunOpts{
Dir: repoPath,
Stdout: lookupKeyBatchWriter,
Stdin: shasToBatchReader,
Stderr: stderr,
}); err != nil {
_ = lookupKeyBatchWriter.CloseWithError(fmt.Errorf("git annex lookupkey --batch --ref [%s]: %w - %s", repoPath, err, errbuf.String()))
}
}
// CopyFromToBatch runs git -c annex.hardlink=true annex copy --batch-keys --from <remote> --to <remote>
//
// The command is executed in repoPath and reads the keys to copy from
// keysToCopyReader. from and to are passed as the --from and --to remotes.
// It is meant to be started as the last stage of a pipeline: when it returns
// it closes keysToCopyReader and calls wg.Done().
//
// If the command fails, keysToCopyReader is closed with an error that wraps the
// command error and includes whatever the command wrote to stderr.
func CopyFromToBatch(ctx context.Context, from, to string, keysToCopyReader *io.PipeReader, wg *sync.WaitGroup, repoPath string) {
	defer wg.Done()
	defer keysToCopyReader.Close()

	// The command's stdout is collected but not used afterwards.
	stdout := new(bytes.Buffer)
	// Collect stderr in the same builder that is used for the error message;
	// previously stderr went to a separate buffer and the message was always empty.
	var errbuf strings.Builder
	if err := git.NewCommand(ctx, "-c", "annex.hardlink=true", "annex", "copy", "--batch-keys", "--from").AddDynamicArguments(from).AddArguments("--to").AddDynamicArguments(to).Run(&git.RunOpts{
		Dir:    repoPath,
		Stdout: stdout,
		Stdin:  keysToCopyReader,
		Stderr: &errbuf,
	}); err != nil {
		_ = keysToCopyReader.CloseWithError(fmt.Errorf("git annex copy --batch-keys --from <remote> --to <remote> [%s]: %w - %s", repoPath, err, errbuf.String()))
	}
}
func ContentLocationFromKey(repoPath, key string) (string, error) {
contentLocation, _, err := git.NewCommandContextNoGlobals(git.DefaultContext, "annex", "contentlocation").AddDynamicArguments(key).RunStdString(&git.RunOpts{Dir: repoPath})
if err != nil {
@ -90,6 +140,12 @@ func IsAnnexed(blob *git.Blob) (bool, error) {
return true, nil
}
// PathIsAnnexRepo determines if repoPath is a git-annex enabled repository
//
// It reports true exactly when "git config annex.uuid", run in repoPath,
// finishes without an error.
func PathIsAnnexRepo(repoPath string) bool {
	cmd := git.NewCommand(git.DefaultContext, "config", "annex.uuid")
	if _, _, err := cmd.RunStdString(&git.RunOpts{Dir: repoPath}); err != nil {
		return false
	}
	return true
}
// IsAnnexRepo determines if repo is a git-annex enabled repository
func IsAnnexRepo(repo *git.Repository) bool {
_, _, err := git.NewCommand(repo.Ctx, "config", "annex.uuid").RunStdString(&git.RunOpts{Dir: repo.Path})

View file

@ -106,3 +106,36 @@ func BlobsLessThan1024FromCatFileBatchCheck(catFileCheckReader *io.PipeReader, s
}
}
}
// BlobsLessThanOrEqual32KiBFromCatFileBatchCheck reads a pipeline from cat-file --batch-check and returns the blobs <=32KiB in size
//
// Each input line is split on single spaces into "<id> <type> <size>". The id
// of every line whose type is "blob" and whose size is at most 32KiB is written,
// followed by a newline, to shasToBatchWriter. Empty lines, lines with fewer
// than three fields, non-blob objects, and lines whose size is not a valid
// integer are skipped.
//
// It is meant to be started as one stage of a pipeline: when it returns it
// closes catFileCheckReader, closes shasToBatchWriter (with the scanner or
// write error, if any) and calls wg.Done().
func BlobsLessThanOrEqual32KiBFromCatFileBatchCheck(catFileCheckReader *io.PipeReader, shasToBatchWriter *io.PipeWriter, wg *sync.WaitGroup) {
	const maxBlobSize = 32 * 1024

	defer wg.Done()
	defer catFileCheckReader.Close()

	scanner := bufio.NewScanner(catFileCheckReader)
	var writeErr error
	defer func() {
		// Prefer the read-side error; fall back to the write error that
		// made this stage stop early.
		err := scanner.Err()
		if err == nil {
			err = writeErr
		}
		_ = shasToBatchWriter.CloseWithError(err)
	}()

	for scanner.Scan() {
		fields := strings.Split(scanner.Text(), " ")
		if len(fields) < 3 || fields[1] != "blob" {
			continue
		}

		// A size that cannot be parsed must not be treated as 0, otherwise
		// the object would slip through the size filter.
		size, err := strconv.ParseInt(fields[2], 10, 64)
		if err != nil || size > maxBlobSize {
			continue
		}

		if _, err := io.WriteString(shasToBatchWriter, fields[0]+"\n"); err != nil {
			// The downstream stage is gone: tell the upstream stage why and
			// stop instead of retrying for every remaining buffered line.
			writeErr = err
			_ = catFileCheckReader.CloseWithError(err)
			return
		}
	}
}

61
services/pull/annex.go Normal file
View file

@ -0,0 +1,61 @@
// Copyright 2025 The Forgejo Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package pull
import (
"context"
"io"
"sync"
"code.gitea.io/gitea/modules/annex"
"code.gitea.io/gitea/modules/git/pipeline"
)
// AnnexPush copies all annexed files referenced in new commits from the head repository to the base repository
//
// It runs annex.PrivateInit on tmpBasePath and then wires six stages together
// with in-memory pipes: rev-list objects for mergeHeadSHA/mergeBaseSHA, blob
// selection, cat-file batch-check, the <=32KiB blob filter, the git-annex key
// lookup, and finally a git-annex copy from the remote "head_repo" to the
// remote "origin".
//
// Only the error channel handed to the rev-list stage is inspected after all
// stages have finished; the other stages are not given an error channel here.
func AnnexPush(ctx context.Context, tmpBasePath, mergeHeadSHA, mergeBaseSHA string) error {
	// The temporary repository has to be set up for git-annex first.
	if initErr := annex.PrivateInit(ctx, tmpBasePath); initErr != nil {
		return initErr
	}

	// One pipe per hand-over between two neighbouring stages.
	revListOut, revListIn := io.Pipe()
	blobShasOut, blobShasIn := io.Pipe()
	batchCheckOut, batchCheckIn := io.Pipe()
	smallBlobsOut, smallBlobsIn := io.Pipe()
	keysOut, keysIn := io.Pipe()

	revListErrs := make(chan error, 1)

	var stages sync.WaitGroup
	stages.Add(6)

	// The stages are started consumer-first, i.e. in reverse pipeline order.
	// Stage 6: copy the data of every key from the head repository to the base repository.
	go annex.CopyFromToBatch(ctx, "head_repo", "origin", keysOut, &stages, tmpBasePath)
	// Stage 5: resolve blob shas to annex keys; git-annex should filter out
	// anything that doesn't reference a key.
	go annex.LookupKeyBatch(ctx, smallBlobsOut, keysIn, &stages, tmpBasePath)
	// Stage 4: keep only blobs of at most 32KiB.
	go pipeline.BlobsLessThanOrEqual32KiBFromCatFileBatchCheck(batchCheckOut, smallBlobsIn, &stages)
	// Stage 3: run batch-check on the objects coming from rev-list.
	go pipeline.CatFileBatchCheck(ctx, blobShasOut, batchCheckIn, &stages, tmpBasePath)
	// Stage 2: reject objects without names, as they will be commits or trees.
	go pipeline.BlobsFromRevListObjects(revListOut, blobShasIn, &stages)
	// Stage 1: list the objects from mergeHead to mergeBase.
	go pipeline.RevListObjects(ctx, revListIn, &stages, tmpBasePath, mergeHeadSHA, mergeBaseSHA, revListErrs)

	stages.Wait()

	// Non-blocking check: report an error only if one is already queued.
	select {
	case queuedErr, received := <-revListErrs:
		if !received {
			return nil
		}
		return queuedErr
	default:
		return nil
	}
}

View file

@ -22,6 +22,7 @@ import (
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/models/unit"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/annex"
"code.gitea.io/gitea/modules/cache"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/log"
@ -314,6 +315,12 @@ func doMergeAndPush(ctx context.Context, pr *issues_model.PullRequest, doer *use
}
}
if setting.Annex.Enabled && annex.PathIsAnnexRepo(pr.BaseRepo.RepoPath()) && annex.PathIsAnnexRepo(pr.HeadRepo.RepoPath()) {
if err := AnnexPush(ctx, mergeCtx.tmpBasePath, mergeHeadSHA, mergeBaseSHA); err != nil {
return "", err
}
}
var headUser *user_model.User
err = pr.HeadRepo.LoadOwner(ctx)
if err != nil {

View file

@ -28,9 +28,11 @@ import (
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/setting"
api "code.gitea.io/gitea/modules/structs"
"code.gitea.io/gitea/modules/test"
"code.gitea.io/gitea/modules/util"
"code.gitea.io/gitea/tests"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -62,6 +64,95 @@ func doCreateRemoteAnnexRepository(t *testing.T, u *url.URL, ctx APITestContext,
return nil
}
// TestGitAnnexPullRequest checks that the content of an annexed file added in a
// fork is available in the target repository after a pull request is merged.
//
// For every object format handed in by forEachObjectFormat it creates an
// annex-enabled repository for user2, forks it as user1, uploads a random file
// to the fork through the web interface, opens and merges a pull request, and
// then verifies on disk that the file in the repository at ctx.GitPath() is
// annexed and has exactly the uploaded content.
// The test is skipped when annex support is disabled in the settings.
func TestGitAnnexPullRequest(t *testing.T) {
if !setting.Annex.Enabled {
t.Skip("Skipping since annex support is disabled.")
}
defer tests.PrepareTestEnv(t)()
onGiteaRun(t, func(t *testing.T, u *url.URL) {
forEachObjectFormat(t, func(t *testing.T, objectFormat git.ObjectFormat) {
// The repository name carries the object format name so each run uses its own repository.
upstreamRepoName := "annex-pull-request-test-" + objectFormat.Name()
// The fork reuses the upstream name; presumably it is kept apart by its owner (user1 vs. user2).
forkRepoName := upstreamRepoName
ctx := NewAPITestContext(t, "user2", upstreamRepoName, auth_model.AccessTokenScopeWriteRepository)
require.NoError(t, doCreateRemoteAnnexRepository(t, u, ctx, false, objectFormat))
session := loginUser(t, "user1")
testRepoFork(t, session, "user2", upstreamRepoName, "user1", forkRepoName)
// Generate random file
// NOTE(review): 1024*1024/4 is presumably the size in bytes (256KiB) - confirm against generateRandomFile.
tmpFile := path.Join(t.TempDir(), "somefile")
require.NoError(t, generateRandomFile(1024*1024/4, tmpFile))
expectedContent, err := os.ReadFile(tmpFile)
require.NoError(t, err)
// Upload the file to the fork's default branch and open a pull request from it.
testUploadFile(t, session, "user1", forkRepoName, setting.Repository.DefaultBranch, filepath.Base(tmpFile), tmpFile)
resp := testPullCreate(t, session, "user1", forkRepoName, false, setting.Repository.DefaultBranch, setting.Repository.DefaultBranch, "Testing git-annex content in a pull request")
// The redirect URL is split on "/": element 3 must be "pulls", elements 1, 2 and 4
// are handed to testPullMerge (presumably owner, repository and pull request index).
elem := strings.Split(test.RedirectURL(resp), "/")
assert.EqualValues(t, "pulls", elem[3])
testPullMerge(t, session, elem[1], elem[2], elem[4], repo_model.MergeStyleMerge, false)
// Get some handles on the target repository and file
remoteRepoPath := path.Join(setting.RepoRootPath, ctx.GitPath())
repo, err := git.OpenRepository(git.DefaultContext, remoteRepoPath)
require.NoError(t, err)
defer repo.Close()
tree, err := repo.GetTree(setting.Repository.DefaultBranch)
require.NoError(t, err)
treeEntry, err := tree.GetTreeEntryByPath(filepath.Base(tmpFile))
require.NoError(t, err)
blob := treeEntry.Blob()
// Check that the pull request file is annexed
isAnnexed, err := annex.IsAnnexed(blob)
require.NoError(t, err)
require.True(t, isAnnexed)
// Check that the pull request file has the correct content
annexedFile, err := annex.Content(blob)
require.NoError(t, err)
actualContent, err := io.ReadAll(annexedFile)
require.NoError(t, err)
require.Equal(t, expectedContent, actualContent)
})
})
}
// testUploadFile uploads the local file at path to the repository
// username/reponame through the web interface and commits it directly to
// branch under the name filename.
//
// It works in two steps: the file content is posted as multipart form data to
// the repository's upload-file endpoint (expected status 200, JSON response
// with a "uuid" entry), and that uuid is then posted to the _upload/<branch>
// form with commit_choice "direct" (expected status 303).
func testUploadFile(t *testing.T, session *TestSession, username, reponame, branch, filename, path string) {
	t.Helper()

	// Step 1: build the multipart body holding the CSRF token and the file.
	body := &bytes.Buffer{}
	mpForm := multipart.NewWriter(body)
	err := mpForm.WriteField("_csrf", GetCSRF(t, session, username+"/"+reponame+"/_upload/"+branch))
	require.NoError(t, err)

	file, err := mpForm.CreateFormFile("file", filename)
	require.NoError(t, err)

	srcFile, err := os.Open(path)
	require.NoError(t, err)
	// Release the file handle once the helper is done.
	defer srcFile.Close()

	// A failed copy must fail the test instead of uploading truncated content.
	_, err = io.Copy(file, srcFile)
	require.NoError(t, err)
	require.NoError(t, mpForm.Close())

	req := NewRequestWithBody(t, "POST", "/"+username+"/"+reponame+"/upload-file", body)
	req.Header.Add("Content-Type", mpForm.FormDataContentType())
	resp := session.MakeRequest(t, req, http.StatusOK)

	respMap := map[string]string{}
	DecodeJSON(t, resp, &respMap)
	fileUUID := respMap["uuid"]

	// Step 2: commit the uploaded file directly to the branch.
	req = NewRequestWithValues(t, "POST", username+"/"+reponame+"/_upload/"+branch, map[string]string{
		"commit_choice":  "direct",
		"files":          fileUUID,
		"_csrf":          GetCSRF(t, session, username+"/"+reponame+"/_upload/"+branch),
		"commit_mail_id": "-1",
	})
	session.MakeRequest(t, req, http.StatusSeeOther)
}
func TestGitAnnexWebUpload(t *testing.T) {
if !setting.Annex.Enabled {
t.Skip("Skipping since annex support is disabled.")
@ -72,32 +163,6 @@ func TestGitAnnexWebUpload(t *testing.T) {
ctx := NewAPITestContext(t, "user2", "annex-web-upload-test"+objectFormat.Name(), auth_model.AccessTokenScopeWriteRepository)
require.NoError(t, doCreateRemoteAnnexRepository(t, u, ctx, false, objectFormat))
uploadFile := func(t *testing.T, path string) string {
t.Helper()
body := &bytes.Buffer{}
mpForm := multipart.NewWriter(body)
err := mpForm.WriteField("_csrf", GetCSRF(t, ctx.Session, ctx.Username+"/"+ctx.Reponame+"/_upload/"+setting.Repository.DefaultBranch))
require.NoError(t, err)
file, err := mpForm.CreateFormFile("file", filepath.Base(path))
require.NoError(t, err)
srcFile, err := os.Open(path)
require.NoError(t, err)
io.Copy(file, srcFile)
require.NoError(t, mpForm.Close())
req := NewRequestWithBody(t, "POST", "/"+ctx.Username+"/"+ctx.Reponame+"/upload-file", body)
req.Header.Add("Content-Type", mpForm.FormDataContentType())
resp := ctx.Session.MakeRequest(t, req, http.StatusOK)
respMap := map[string]string{}
DecodeJSON(t, resp, &respMap)
return respMap["uuid"]
}
// Generate random file
tmpFile := path.Join(t.TempDir(), "web-upload-test-file.bin")
require.NoError(t, generateRandomFile(1024*1024/4, tmpFile))
@ -105,14 +170,7 @@ func TestGitAnnexWebUpload(t *testing.T) {
require.NoError(t, err)
// Upload generated file
fileUUID := uploadFile(t, tmpFile)
req := NewRequestWithValues(t, "POST", ctx.Username+"/"+ctx.Reponame+"/_upload/"+setting.Repository.DefaultBranch, map[string]string{
"commit_choice": "direct",
"files": fileUUID,
"_csrf": GetCSRF(t, ctx.Session, ctx.Username+"/"+ctx.Reponame+"/_upload/"+setting.Repository.DefaultBranch),
"commit_mail_id": "-1",
})
ctx.Session.MakeRequest(t, req, http.StatusSeeOther)
testUploadFile(t, ctx.Session, ctx.Username, ctx.Reponame, setting.Repository.DefaultBranch, filepath.Base(tmpFile), tmpFile)
// Get some handles on the target repository and file
remoteRepoPath := path.Join(setting.RepoRootPath, ctx.GitPath())