mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2025-03-13 23:32:41 +00:00
refactor(dump): archive in parallel to fix use after close
This commit is contained in:
parent
cc6d5e0367
commit
b36f692a2c
1 changed files with 213 additions and 197 deletions
410
cmd/dump.go
410
cmd/dump.go
|
@ -11,6 +11,7 @@ import (
|
|||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"code.gitea.io/gitea/models/db"
|
||||
|
@ -25,43 +26,39 @@ import (
|
|||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
func addObject(object fs.File, customName string, verbose bool) (archives.FileInfo, error) {
|
||||
func addObject(archiveJobs chan archives.ArchiveAsyncJob, object fs.File, customName string, verbose bool) error {
|
||||
if verbose {
|
||||
log.Info("Adding object %s", customName)
|
||||
log.Info("Adding file %s", customName)
|
||||
}
|
||||
|
||||
info, err := object.Stat()
|
||||
if err != nil {
|
||||
return archives.FileInfo{}, err
|
||||
return err
|
||||
}
|
||||
|
||||
return archives.FileInfo{
|
||||
FileInfo: info,
|
||||
NameInArchive: customName,
|
||||
Open: func() (fs.File, error) {
|
||||
return object, nil
|
||||
ch := make(chan error)
|
||||
|
||||
archiveJobs <- archives.ArchiveAsyncJob{
|
||||
File: archives.FileInfo{
|
||||
FileInfo: info,
|
||||
NameInArchive: customName,
|
||||
Open: func() (fs.File, error) {
|
||||
return object, nil
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
Result: ch,
|
||||
}
|
||||
|
||||
return <-ch
|
||||
}
|
||||
|
||||
func addFile(filePath, absPath string, verbose bool) (archives.FileInfo, error) {
|
||||
if verbose {
|
||||
log.Info("Adding file %s", filePath)
|
||||
}
|
||||
|
||||
info, err := os.Stat(absPath)
|
||||
func addFile(archiveJobs chan archives.ArchiveAsyncJob, filePath, absPath string, verbose bool) error {
|
||||
file, err := os.Open(absPath) // Closed by archiver
|
||||
if err != nil {
|
||||
return archives.FileInfo{}, err
|
||||
return err
|
||||
}
|
||||
|
||||
return archives.FileInfo{
|
||||
FileInfo: info,
|
||||
NameInArchive: filePath,
|
||||
Open: func() (fs.File, error) {
|
||||
// Only open the file when it's needed
|
||||
return os.Open(absPath)
|
||||
},
|
||||
}, nil
|
||||
return addObject(archiveJobs, file, filePath, verbose)
|
||||
}
|
||||
|
||||
func isSubdir(upper, lower string) (bool, error) {
|
||||
|
@ -106,8 +103,8 @@ var outputTypeEnum = &outputType{
|
|||
Default: "zip",
|
||||
}
|
||||
|
||||
func getArchiverByType(outType string) (archives.Archiver, error) {
|
||||
var archiver archives.Archiver
|
||||
func getArchiverByType(outType string) (archives.ArchiverAsync, error) {
|
||||
var archiver archives.ArchiverAsync
|
||||
switch outType {
|
||||
case "zip":
|
||||
archiver = archives.Zip{}
|
||||
|
@ -306,7 +303,8 @@ func runDump(ctx *cli.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
var files []archives.FileInfo
|
||||
archiveJobs := make(chan archives.ArchiveAsyncJob)
|
||||
wg := sync.WaitGroup{}
|
||||
archiver, err := getArchiverByType(outType)
|
||||
if err != nil {
|
||||
fatal("Failed to get archiver for extension: %v", err)
|
||||
|
@ -316,29 +314,173 @@ func runDump(ctx *cli.Context) error {
|
|||
log.Info("Skipping local repositories")
|
||||
} else {
|
||||
log.Info("Dumping local repositories... %s", setting.RepoRootPath)
|
||||
l, err := addRecursiveExclude("repos", setting.RepoRootPath, []string{absFileName}, verbose)
|
||||
if err != nil {
|
||||
fatal("Failed to include repositories: %v", err)
|
||||
}
|
||||
files = append(files, l...)
|
||||
wg.Add(1)
|
||||
go dumpRepos(ctx, archiveJobs, &wg, absFileName, verbose)
|
||||
}
|
||||
|
||||
if ctx.IsSet("skip-lfs-data") && ctx.Bool("skip-lfs-data") {
|
||||
log.Info("Skipping LFS data")
|
||||
} else if !setting.LFS.StartServer {
|
||||
log.Info("LFS not enabled - skipping")
|
||||
} else if err := storage.LFS.IterateObjects("", func(objPath string, object storage.Object) error {
|
||||
f, err := addObject(object, path.Join("data", "lfs", objPath), verbose)
|
||||
if err != nil {
|
||||
return err
|
||||
wg.Add(1)
|
||||
go dumpDatabase(ctx, archiveJobs, &wg, verbose)
|
||||
|
||||
if len(setting.CustomConf) > 0 {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
log.Info("Adding custom configuration file from %s", setting.CustomConf)
|
||||
if err := addFile(archiveJobs, "app.ini", setting.CustomConf, verbose); err != nil {
|
||||
fatal("Failed to include specified app.ini: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
files = append(files, f)
|
||||
return nil
|
||||
}); err != nil {
|
||||
fatal("Failed to dump LFS objects: %v", err)
|
||||
if ctx.IsSet("skip-custom-dir") && ctx.Bool("skip-custom-dir") {
|
||||
log.Info("Skipping custom directory")
|
||||
} else {
|
||||
wg.Add(1)
|
||||
go dumpCustom(archiveJobs, &wg, absFileName, verbose)
|
||||
}
|
||||
|
||||
isExist, err := util.IsExist(setting.AppDataPath)
|
||||
if err != nil {
|
||||
log.Error("Failed to check if %s exists: %v", setting.AppDataPath, err)
|
||||
}
|
||||
if isExist {
|
||||
log.Info("Packing data directory...%s", setting.AppDataPath)
|
||||
|
||||
wg.Add(1)
|
||||
go dumpData(ctx, archiveJobs, &wg, absFileName, verbose)
|
||||
}
|
||||
|
||||
if ctx.IsSet("skip-attachment-data") && ctx.Bool("skip-attachment-data") {
|
||||
log.Info("Skipping attachment data")
|
||||
} else {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if err := storage.Attachments.IterateObjects("", func(objPath string, object storage.Object) error {
|
||||
return addObject(archiveJobs, object, path.Join("data", "attachments", objPath), verbose)
|
||||
}); err != nil {
|
||||
fatal("Failed to dump attachments: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
if ctx.IsSet("skip-package-data") && ctx.Bool("skip-package-data") {
|
||||
log.Info("Skipping package data")
|
||||
} else if !setting.Packages.Enabled {
|
||||
log.Info("Package registry not enabled - skipping")
|
||||
} else {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if err := storage.Packages.IterateObjects("", func(objPath string, object storage.Object) error {
|
||||
return addObject(archiveJobs, object, path.Join("data", "packages", objPath), verbose)
|
||||
}); err != nil {
|
||||
fatal("Failed to dump packages: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Doesn't check if LogRootPath exists before processing --skip-log intentionally,
|
||||
// ensuring that it's clear the dump is skipped whether the directory's initialized
|
||||
// yet or not.
|
||||
if ctx.IsSet("skip-log") && ctx.Bool("skip-log") {
|
||||
log.Info("Skipping log files")
|
||||
} else {
|
||||
isExist, err := util.IsExist(setting.Log.RootPath)
|
||||
if err != nil {
|
||||
log.Error("Failed to check if %s exists: %v", setting.Log.RootPath, err)
|
||||
}
|
||||
if isExist {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if err := addRecursiveExclude(archiveJobs, "log", setting.Log.RootPath, []string{absFileName}, verbose); err != nil {
|
||||
fatal("Failed to include log: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for all jobs to finish before closing the channel
|
||||
// ArchiveAsync will only return after the channel is closed
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(archiveJobs)
|
||||
}()
|
||||
|
||||
if err := archiver.ArchiveAsync(ctx.Context, file, archiveJobs); err != nil {
|
||||
_ = util.Remove(fileName)
|
||||
|
||||
fatal("Archiving failed: %v", err)
|
||||
}
|
||||
|
||||
if fileName != "-" {
|
||||
if err := os.Chmod(fileName, 0o600); err != nil {
|
||||
log.Info("Can't change file access permissions mask to 0600: %v", err)
|
||||
}
|
||||
|
||||
log.Info("Finished dumping in file %s", fileName)
|
||||
} else {
|
||||
log.Info("Finished dumping to stdout")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func dumpData(ctx *cli.Context, archiveJobs chan archives.ArchiveAsyncJob, wg *sync.WaitGroup, absFileName string, verbose bool) {
|
||||
defer wg.Done()
|
||||
|
||||
var excludes []string
|
||||
if setting.SessionConfig.OriginalProvider == "file" {
|
||||
var opts session.Options
|
||||
if err := json.Unmarshal([]byte(setting.SessionConfig.ProviderConfig), &opts); err != nil {
|
||||
fatal("Failed to parse session config: %v", err)
|
||||
}
|
||||
excludes = append(excludes, opts.ProviderConfig)
|
||||
}
|
||||
|
||||
if ctx.IsSet("skip-index") && ctx.Bool("skip-index") {
|
||||
log.Info("Skipping bleve index data")
|
||||
excludes = append(excludes, setting.Indexer.RepoPath)
|
||||
excludes = append(excludes, setting.Indexer.IssuePath)
|
||||
}
|
||||
|
||||
if ctx.IsSet("skip-repo-archives") && ctx.Bool("skip-repo-archives") {
|
||||
log.Info("Skipping repository archives data")
|
||||
excludes = append(excludes, setting.RepoArchive.Storage.Path)
|
||||
}
|
||||
|
||||
excludes = append(excludes, setting.RepoRootPath)
|
||||
excludes = append(excludes, setting.LFS.Storage.Path)
|
||||
excludes = append(excludes, setting.Attachment.Storage.Path)
|
||||
excludes = append(excludes, setting.Packages.Storage.Path)
|
||||
excludes = append(excludes, setting.Log.RootPath)
|
||||
excludes = append(excludes, absFileName)
|
||||
if err := addRecursiveExclude(archiveJobs, "data", setting.AppDataPath, excludes, verbose); err != nil {
|
||||
fatal("Failed to include data directory: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func dumpCustom(archiveJobs chan archives.ArchiveAsyncJob, wg *sync.WaitGroup, absFileName string, verbose bool) {
|
||||
defer wg.Done()
|
||||
|
||||
customDir, err := os.Stat(setting.CustomPath)
|
||||
if err == nil && customDir.IsDir() {
|
||||
if is, _ := isSubdir(setting.AppDataPath, setting.CustomPath); !is {
|
||||
if err := addRecursiveExclude(archiveJobs, "custom", setting.CustomPath, []string{absFileName}, verbose); err != nil {
|
||||
fatal("Failed to include custom: %v", err)
|
||||
}
|
||||
} else {
|
||||
log.Info("Custom dir %s is inside data dir %s, skipping", setting.CustomPath, setting.AppDataPath)
|
||||
}
|
||||
} else {
|
||||
log.Info("Custom dir %s does not exist, skipping", setting.CustomPath)
|
||||
}
|
||||
}
|
||||
|
||||
func dumpDatabase(ctx *cli.Context, archiveJobs chan archives.ArchiveAsyncJob, wg *sync.WaitGroup, verbose bool) {
|
||||
defer wg.Done()
|
||||
|
||||
tmpDir := ctx.String("tempdir")
|
||||
if _, err := os.Stat(tmpDir); os.IsNotExist(err) {
|
||||
fatal("Path does not exist: %s", tmpDir)
|
||||
|
@ -366,165 +508,45 @@ func runDump(ctx *cli.Context) error {
|
|||
fatal("Failed to dump database: %v", err)
|
||||
}
|
||||
|
||||
f, err := addFile("forgejo-db.sql", dbDump.Name(), verbose)
|
||||
if err != nil {
|
||||
if err := addFile(archiveJobs, "forgejo-db.sql", dbDump.Name(), verbose); err != nil {
|
||||
fatal("Failed to include forgejo-db.sql: %v", err)
|
||||
}
|
||||
files = append(files, f)
|
||||
}
|
||||
|
||||
if len(setting.CustomConf) > 0 {
|
||||
log.Info("Adding custom configuration file from %s", setting.CustomConf)
|
||||
f, err := addFile("app.ini", setting.CustomConf, verbose)
|
||||
if err != nil {
|
||||
fatal("Failed to include specified app.ini: %v", err)
|
||||
}
|
||||
files = append(files, f)
|
||||
func dumpRepos(ctx *cli.Context, archiveJobs chan archives.ArchiveAsyncJob, wg *sync.WaitGroup, absFileName string, verbose bool) {
|
||||
defer wg.Done()
|
||||
|
||||
if err := addRecursiveExclude(archiveJobs, "repos", setting.RepoRootPath, []string{absFileName}, verbose); err != nil {
|
||||
fatal("Failed to include repositories: %v", err)
|
||||
}
|
||||
|
||||
if ctx.IsSet("skip-custom-dir") && ctx.Bool("skip-custom-dir") {
|
||||
log.Info("Skipping custom directory")
|
||||
} else {
|
||||
customDir, err := os.Stat(setting.CustomPath)
|
||||
if err == nil && customDir.IsDir() {
|
||||
if is, _ := isSubdir(setting.AppDataPath, setting.CustomPath); !is {
|
||||
l, err := addRecursiveExclude("custom", setting.CustomPath, []string{absFileName}, verbose)
|
||||
if err != nil {
|
||||
fatal("Failed to include custom: %v", err)
|
||||
}
|
||||
files = append(files, l...)
|
||||
} else {
|
||||
log.Info("Custom dir %s is inside data dir %s, skipping", setting.CustomPath, setting.AppDataPath)
|
||||
}
|
||||
} else {
|
||||
log.Info("Custom dir %s does not exist, skipping", setting.CustomPath)
|
||||
}
|
||||
}
|
||||
|
||||
isExist, err := util.IsExist(setting.AppDataPath)
|
||||
if err != nil {
|
||||
log.Error("Failed to check if %s exists: %v", setting.AppDataPath, err)
|
||||
}
|
||||
if isExist {
|
||||
log.Info("Packing data directory...%s", setting.AppDataPath)
|
||||
|
||||
var excludes []string
|
||||
if setting.SessionConfig.OriginalProvider == "file" {
|
||||
var opts session.Options
|
||||
if err = json.Unmarshal([]byte(setting.SessionConfig.ProviderConfig), &opts); err != nil {
|
||||
return err
|
||||
}
|
||||
excludes = append(excludes, opts.ProviderConfig)
|
||||
}
|
||||
|
||||
if ctx.IsSet("skip-index") && ctx.Bool("skip-index") {
|
||||
log.Info("Skipping bleve index data")
|
||||
excludes = append(excludes, setting.Indexer.RepoPath)
|
||||
excludes = append(excludes, setting.Indexer.IssuePath)
|
||||
}
|
||||
|
||||
if ctx.IsSet("skip-repo-archives") && ctx.Bool("skip-repo-archives") {
|
||||
log.Info("Skipping repository archives data")
|
||||
excludes = append(excludes, setting.RepoArchive.Storage.Path)
|
||||
}
|
||||
|
||||
excludes = append(excludes, setting.RepoRootPath)
|
||||
excludes = append(excludes, setting.LFS.Storage.Path)
|
||||
excludes = append(excludes, setting.Attachment.Storage.Path)
|
||||
excludes = append(excludes, setting.Packages.Storage.Path)
|
||||
excludes = append(excludes, setting.Log.RootPath)
|
||||
excludes = append(excludes, absFileName)
|
||||
l, err := addRecursiveExclude("data", setting.AppDataPath, excludes, verbose)
|
||||
if err != nil {
|
||||
fatal("Failed to include data directory: %v", err)
|
||||
}
|
||||
files = append(files, l...)
|
||||
}
|
||||
|
||||
if ctx.IsSet("skip-attachment-data") && ctx.Bool("skip-attachment-data") {
|
||||
log.Info("Skipping attachment data")
|
||||
} else if err := storage.Attachments.IterateObjects("", func(objPath string, object storage.Object) error {
|
||||
f, err := addObject(object, path.Join("data", "attachments", objPath), verbose)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
files = append(files, f)
|
||||
return nil
|
||||
if ctx.IsSet("skip-lfs-data") && ctx.Bool("skip-lfs-data") {
|
||||
log.Info("Skipping LFS data")
|
||||
} else if !setting.LFS.StartServer {
|
||||
log.Info("LFS not enabled - skipping")
|
||||
} else if err := storage.LFS.IterateObjects("", func(objPath string, object storage.Object) error {
|
||||
return addObject(archiveJobs, object, path.Join("data", "lfs", objPath), verbose)
|
||||
}); err != nil {
|
||||
fatal("Failed to dump attachments: %v", err)
|
||||
fatal("Failed to dump LFS objects: %v", err)
|
||||
}
|
||||
|
||||
if ctx.IsSet("skip-package-data") && ctx.Bool("skip-package-data") {
|
||||
log.Info("Skipping package data")
|
||||
} else if !setting.Packages.Enabled {
|
||||
log.Info("Package registry not enabled - skipping")
|
||||
} else if err := storage.Packages.IterateObjects("", func(objPath string, object storage.Object) error {
|
||||
f, err := addObject(object, path.Join("data", "packages", objPath), verbose)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
files = append(files, f)
|
||||
return nil
|
||||
}); err != nil {
|
||||
fatal("Failed to dump packages: %v", err)
|
||||
}
|
||||
|
||||
// Doesn't check if LogRootPath exists before processing --skip-log intentionally,
|
||||
// ensuring that it's clear the dump is skipped whether the directory's initialized
|
||||
// yet or not.
|
||||
if ctx.IsSet("skip-log") && ctx.Bool("skip-log") {
|
||||
log.Info("Skipping log files")
|
||||
} else {
|
||||
isExist, err := util.IsExist(setting.Log.RootPath)
|
||||
if err != nil {
|
||||
log.Error("Failed to check if %s exists: %v", setting.Log.RootPath, err)
|
||||
}
|
||||
if isExist {
|
||||
l, err := addRecursiveExclude("log", setting.Log.RootPath, []string{absFileName}, verbose)
|
||||
if err != nil {
|
||||
fatal("Failed to include log: %v", err)
|
||||
}
|
||||
files = append(files, l...)
|
||||
}
|
||||
}
|
||||
|
||||
if err := archiver.Archive(ctx.Context, file, files); err != nil {
|
||||
_ = util.Remove(fileName)
|
||||
|
||||
fatal("Archiving failed: %v", err)
|
||||
}
|
||||
|
||||
if fileName != "-" {
|
||||
if err := os.Chmod(fileName, 0o600); err != nil {
|
||||
log.Info("Can't change file access permissions mask to 0600: %v", err)
|
||||
}
|
||||
|
||||
log.Info("Finished dumping in file %s", fileName)
|
||||
} else {
|
||||
log.Info("Finished dumping to stdout")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// addRecursiveExclude zips absPath to specified insidePath inside writer excluding excludeAbsPath
|
||||
// archives.FilesFromDisk doesn't support excluding files, so we have to do it manually
|
||||
func addRecursiveExclude(insidePath, absPath string, excludeAbsPath []string, verbose bool) ([]archives.FileInfo, error) {
|
||||
var list []archives.FileInfo
|
||||
func addRecursiveExclude(archiveJobs chan archives.ArchiveAsyncJob, insidePath, absPath string, excludeAbsPath []string, verbose bool) error {
|
||||
absPath, err := filepath.Abs(absPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
dir, err := os.Open(absPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
defer dir.Close()
|
||||
|
||||
files, err := dir.Readdir(0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
for _, file := range files {
|
||||
currentAbsPath := filepath.Join(absPath, file.Name())
|
||||
|
@ -536,39 +558,33 @@ func addRecursiveExclude(insidePath, absPath string, excludeAbsPath []string, ve
|
|||
}
|
||||
|
||||
if file.IsDir() {
|
||||
f, err := addFile(currentInsidePath, currentAbsPath, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if err := addFile(archiveJobs, currentInsidePath, currentAbsPath, false); err != nil {
|
||||
return err
|
||||
}
|
||||
list = append(list, f)
|
||||
|
||||
l, err := addRecursiveExclude(currentInsidePath, currentAbsPath, excludeAbsPath, verbose)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if err := addRecursiveExclude(archiveJobs, currentInsidePath, currentAbsPath, excludeAbsPath, verbose); err != nil {
|
||||
return err
|
||||
}
|
||||
list = append(list, l...)
|
||||
} else {
|
||||
// only copy regular files and symlink regular files, skip non-regular files like socket/pipe/...
|
||||
shouldAdd := file.Mode().IsRegular()
|
||||
if !shouldAdd && file.Mode()&os.ModeSymlink == os.ModeSymlink {
|
||||
target, err := filepath.EvalSymlinks(currentAbsPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
targetStat, err := os.Stat(target)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
shouldAdd = targetStat.Mode().IsRegular()
|
||||
}
|
||||
if shouldAdd {
|
||||
f, err := addFile(currentInsidePath, currentAbsPath, verbose)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if err := addFile(archiveJobs, currentInsidePath, currentAbsPath, verbose); err != nil {
|
||||
return err
|
||||
}
|
||||
list = append(list, f)
|
||||
}
|
||||
}
|
||||
}
|
||||
return list, nil
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue