From d252a416d0aa378612791e06f8ac43107d60f901 Mon Sep 17 00:00:00 2001 From: WithoutPants <53250216+WithoutPants@users.noreply.github.com> Date: Tue, 27 Jan 2026 17:42:15 +1100 Subject: [PATCH] Refactor file scanning and handling logic (#6498) - Moved directory walking and queuing functionality into scan task code --- internal/manager/manager_tasks.go | 6 + internal/manager/task_scan.go | 289 ++++++++++++++- pkg/file/file.go | 24 ++ pkg/file/folder_rename_detect.go | 10 +- pkg/file/scan.go | 594 ++++++------------------------ pkg/file/walk.go | 4 +- pkg/file/zip.go | 4 +- 7 files changed, 431 insertions(+), 500 deletions(-) diff --git a/internal/manager/manager_tasks.go b/internal/manager/manager_tasks.go index 1e66433be..bac726c1b 100644 --- a/internal/manager/manager_tasks.go +++ b/internal/manager/manager_tasks.go @@ -100,6 +100,8 @@ func (s *Manager) Scan(ctx context.Context, input ScanMetadataInput) (int, error return 0, err } + cfg := config.GetInstance() + scanner := &file.Scanner{ Repository: file.NewRepository(s.Repository), FileDecorators: []file.Decorator{ @@ -118,6 +120,10 @@ func (s *Manager) Scan(ctx context.Context, input ScanMetadataInput) (int, error }, FingerprintCalculator: &fingerprintCalculator{s.Config}, FS: &file.OsFS{}, + ZipFileExtensions: cfg.GetGalleryExtensions(), + // ScanFilters is set in ScanJob.Execute + // HandlerRequiredFilters is set in ScanJob.Execute + Rescan: input.Rescan, } scanJob := ScanJob{ diff --git a/internal/manager/task_scan.go b/internal/manager/task_scan.go index fc1a4770f..d09765577 100644 --- a/internal/manager/task_scan.go +++ b/internal/manager/task_scan.go @@ -2,13 +2,17 @@ package manager import ( "context" + "errors" "fmt" "io/fs" "path/filepath" "regexp" + "runtime/debug" + "sync" "time" "github.com/99designs/gqlgen/graphql/handler/lru" + "github.com/remeh/sizedwaitgroup" "github.com/stashapp/stash/internal/manager/config" "github.com/stashapp/stash/pkg/file" "github.com/stashapp/stash/pkg/file/video" @@ 
-24,14 +28,13 @@ import ( "github.com/stashapp/stash/pkg/txn" ) -type scanner interface { - Scan(ctx context.Context, handlers []file.Handler, options file.ScanOptions, progressReporter file.ProgressReporter) -} - type ScanJob struct { - scanner scanner + scanner *file.Scanner input ScanMetadataInput subscriptions *subscriptionManager + + fileQueue chan file.ScannedFile + count int } func (j *ScanJob) Execute(ctx context.Context, progress *job.Progress) error { @@ -55,22 +58,22 @@ func (j *ScanJob) Execute(ctx context.Context, progress *job.Progress) error { start := time.Now() + nTasks := cfg.GetParallelTasksWithAutoDetection() + const taskQueueSize = 200000 - taskQueue := job.NewTaskQueue(ctx, progress, taskQueueSize, cfg.GetParallelTasksWithAutoDetection()) + taskQueue := job.NewTaskQueue(ctx, progress, taskQueueSize, nTasks) var minModTime time.Time if j.input.Filter != nil && j.input.Filter.MinModTime != nil { minModTime = *j.input.Filter.MinModTime } - j.scanner.Scan(ctx, getScanHandlers(j.input, taskQueue, progress), file.ScanOptions{ - Paths: paths, - ScanFilters: []file.PathFilter{newScanFilter(c, repo, minModTime)}, - ZipFileExtensions: cfg.GetGalleryExtensions(), - ParallelTasks: cfg.GetParallelTasksWithAutoDetection(), - HandlerRequiredFilters: []file.Filter{newHandlerRequiredFilter(cfg, repo)}, - Rescan: j.input.Rescan, - }, progress) + // HACK - these should really be set in the scanner initialization + j.scanner.FileHandlers = getScanHandlers(j.input, taskQueue, progress) + j.scanner.ScanFilters = []file.PathFilter{newScanFilter(c, repo, minModTime)} + j.scanner.HandlerRequiredFilters = []file.Filter{newHandlerRequiredFilter(cfg, repo)} + + j.runJob(ctx, paths, nTasks, progress) taskQueue.Close() @@ -86,6 +89,264 @@ func (j *ScanJob) Execute(ctx context.Context, progress *job.Progress) error { return nil } +func (j *ScanJob) runJob(ctx context.Context, paths []string, nTasks int, progress *job.Progress) { + var wg sync.WaitGroup + wg.Add(1) + + 
j.fileQueue = make(chan file.ScannedFile, scanQueueSize) + + go func() { + defer func() { + wg.Done() + + // handle panics in goroutine + if p := recover(); p != nil { + logger.Errorf("panic while queuing files for scan: %v", p) + logger.Errorf(string(debug.Stack())) + } + }() + + if err := j.queueFiles(ctx, paths, progress); err != nil { + if errors.Is(err, context.Canceled) { + return + } + + logger.Errorf("error queuing files for scan: %v", err) + return + } + + logger.Infof("Finished adding files to queue. %d files queued", j.count) + }() + + defer wg.Wait() + + j.processQueue(ctx, nTasks, progress) +} + +const scanQueueSize = 200000 + +func (j *ScanJob) queueFiles(ctx context.Context, paths []string, progress *job.Progress) error { + fs := &file.OsFS{} + + defer func() { + close(j.fileQueue) + + progress.AddTotal(j.count) + progress.Definite() + }() + + var err error + progress.ExecuteTask("Walking directory tree", func() { + for _, p := range paths { + err = file.SymWalk(fs, p, j.queueFileFunc(ctx, fs, nil, progress)) + if err != nil { + return + } + } + }) + + return err +} + +func (j *ScanJob) queueFileFunc(ctx context.Context, f models.FS, zipFile *file.ScannedFile, progress *job.Progress) fs.WalkDirFunc { + return func(path string, d fs.DirEntry, err error) error { + if err != nil { + // don't let errors prevent scanning + logger.Errorf("error scanning %s: %v", path, err) + return nil + } + + if err = ctx.Err(); err != nil { + return err + } + + info, err := d.Info() + if err != nil { + logger.Errorf("reading info for %q: %v", path, err) + return nil + } + + if !j.scanner.AcceptEntry(ctx, path, info) { + if info.IsDir() { + logger.Debugf("Skipping directory %s", path) + return fs.SkipDir + } + + logger.Debugf("Skipping file %s", path) + return nil + } + + size, err := file.GetFileSize(f, path, info) + if err != nil { + return err + } + + ff := file.ScannedFile{ + BaseFile: &models.BaseFile{ + DirEntry: models.DirEntry{ + ModTime: file.ModTime(info), + }, 
+ Path: path, + Basename: filepath.Base(path), + Size: size, + }, + FS: f, + Info: info, + } + + if zipFile != nil { + ff.ZipFileID = &zipFile.ID + ff.ZipFile = zipFile + } + + if info.IsDir() { + // handle folders immediately + if err := j.handleFolder(ctx, ff, progress); err != nil { + if !errors.Is(err, context.Canceled) { + logger.Errorf("error processing %q: %v", path, err) + } + + // skip the directory since we won't be able to process the files anyway + return fs.SkipDir + } + + return nil + } + + // if zip file is present, we handle immediately + if zipFile != nil { + progress.ExecuteTask("Scanning "+path, func() { + // don't increment progress in zip files + if err := j.handleFile(ctx, ff, nil); err != nil { + if !errors.Is(err, context.Canceled) { + logger.Errorf("error processing %q: %v", path, err) + } + // don't return an error, just skip the file + } + }) + + return nil + } + + logger.Tracef("Queueing file %s for scanning", path) + j.fileQueue <- ff + + j.count++ + + return nil + } +} + +func (j *ScanJob) processQueue(ctx context.Context, parallelTasks int, progress *job.Progress) { + if parallelTasks < 1 { + parallelTasks = 1 + } + + wg := sizedwaitgroup.New(parallelTasks) + + func() { + defer func() { + wg.Wait() + + // handle panics in goroutine + if p := recover(); p != nil { + logger.Errorf("panic while scanning files: %v", p) + logger.Errorf(string(debug.Stack())) + } + }() + + for f := range j.fileQueue { + logger.Tracef("Processing queued file %s", f.Path) + if err := ctx.Err(); err != nil { + return + } + + wg.Add() + ff := f + go func() { + defer wg.Done() + j.processQueueItem(ctx, ff, progress) + }() + } + }() +} + +func (j *ScanJob) processQueueItem(ctx context.Context, f file.ScannedFile, progress *job.Progress) { + progress.ExecuteTask("Scanning "+f.Path, func() { + var err error + if f.Info.IsDir() { + err = j.handleFolder(ctx, f, progress) + } else { + err = j.handleFile(ctx, f, progress) + } + + if err != nil && !errors.Is(err, 
context.Canceled) { + logger.Errorf("error processing %q: %v", f.Path, err) + } + }) +} + +func (j *ScanJob) handleFolder(ctx context.Context, f file.ScannedFile, progress *job.Progress) error { + if progress != nil { + defer progress.Increment() + } + + _, err := j.scanner.ScanFolder(ctx, f) + if err != nil { + return err + } + + return nil +} + +func (j *ScanJob) handleFile(ctx context.Context, f file.ScannedFile, progress *job.Progress) error { + if progress != nil { + defer progress.Increment() + } + + r, err := j.scanner.ScanFile(ctx, f) + if err != nil { + return err + } + + // handle rename should have already handled the contents of the zip file + // so shouldn't need to scan it again + + if (r.New || r.Updated) && j.scanner.IsZipFile(f.Info.Name()) { + ff := r.File + f.BaseFile = ff.Base() + + // scan zip files with a different context that is not cancellable + // cancelling while scanning zip file contents results in the scan + // contents being partially completed + zipCtx := context.WithoutCancel(ctx) + + if err := j.scanZipFile(zipCtx, f, progress); err != nil { + logger.Errorf("Error scanning zip file %q: %v", f.Path, err) + } + } + + return nil +} + +func (j *ScanJob) scanZipFile(ctx context.Context, f file.ScannedFile, progress *job.Progress) error { + zipFS, err := f.FS.OpenZip(f.Path, f.Size) + if err != nil { + if errors.Is(err, file.ErrNotReaderAt) { + // can't walk the zip file + // just return + logger.Debugf("Skipping zip file %q as it cannot be opened for walking", f.Path) + return nil + } + + return err + } + + defer zipFS.Close() + + return file.SymWalk(zipFS, f.Path, j.queueFileFunc(ctx, zipFS, &f, progress)) +} + type extensionConfig struct { vidExt []string imgExt []string diff --git a/pkg/file/file.go b/pkg/file/file.go index 407949ba1..b93083b35 100644 --- a/pkg/file/file.go +++ b/pkg/file/file.go @@ -3,6 +3,10 @@ package file import ( "context" + "fmt" + "io/fs" + "os" + "time" "github.com/stashapp/stash/pkg/models" 
"github.com/stashapp/stash/pkg/txn" @@ -35,3 +39,23 @@ func (r *Repository) WithReadTxn(ctx context.Context, fn txn.TxnFunc) error { func (r *Repository) WithDB(ctx context.Context, fn txn.TxnFunc) error { return txn.WithDatabase(ctx, r.TxnManager, fn) } + +// ModTime returns the modification time truncated to seconds. +func ModTime(info fs.FileInfo) time.Time { + // truncate to seconds, since we don't store beyond that in the database + return info.ModTime().Truncate(time.Second) +} + +// GetFileSize gets the size of the file, taking into account symlinks. +func GetFileSize(f models.FS, path string, info fs.FileInfo) (int64, error) { + // #2196/#3042 - replace size with target size if file is a symlink + if info.Mode()&os.ModeSymlink == os.ModeSymlink { + targetInfo, err := f.Stat(path) + if err != nil { + return 0, fmt.Errorf("reading info for symlink %q: %w", path, err) + } + return targetInfo.Size(), nil + } + + return info.Size(), nil +} diff --git a/pkg/file/folder_rename_detect.go b/pkg/file/folder_rename_detect.go index 4c057461b..cfae7e4fb 100644 --- a/pkg/file/folder_rename_detect.go +++ b/pkg/file/folder_rename_detect.go @@ -75,7 +75,7 @@ func (d *folderRenameDetector) bestCandidate() *models.Folder { return best.folder } -func (s *scanJob) detectFolderMove(ctx context.Context, file scanFile) (*models.Folder, error) { +func (s *Scanner) detectFolderMove(ctx context.Context, file ScannedFile) (*models.Folder, error) { // in order for a folder to be considered moved, the existing folder must be // missing, and the majority of the old folder's files must be present, unchanged, // in the new folder. @@ -88,7 +88,7 @@ func (s *scanJob) detectFolderMove(ctx context.Context, file scanFile) (*models. 
r := s.Repository - if err := symWalk(file.fs, file.Path, func(path string, d fs.DirEntry, err error) error { + if err := SymWalk(file.FS, file.Path, func(path string, d fs.DirEntry, err error) error { if err != nil { // don't let errors prevent scanning logger.Errorf("error scanning %s: %v", path, err) @@ -111,11 +111,11 @@ func (s *scanJob) detectFolderMove(ctx context.Context, file scanFile) (*models. return nil } - if !s.acceptEntry(ctx, path, info) { + if !s.AcceptEntry(ctx, path, info) { return nil } - size, err := getFileSize(file.fs, path, info) + size, err := GetFileSize(file.FS, path, info) if err != nil { return fmt.Errorf("getting file size for %q: %w", path, err) } @@ -154,7 +154,7 @@ func (s *scanJob) detectFolderMove(ctx context.Context, file scanFile) (*models. } // parent folder must be missing - _, err = file.fs.Lstat(pf.Path) + _, err = file.FS.Lstat(pf.Path) if err == nil { // parent folder exists, not a candidate detector.reject(parentFolderID) diff --git a/pkg/file/scan.go b/pkg/file/scan.go index 36b409c89..d9a58ad44 100644 --- a/pkg/file/scan.go +++ b/pkg/file/scan.go @@ -2,29 +2,18 @@ package file import ( "context" - "errors" "fmt" "io/fs" - "os" "path/filepath" - "runtime/debug" "strings" "sync" "time" - "github.com/remeh/sizedwaitgroup" "github.com/stashapp/stash/pkg/logger" "github.com/stashapp/stash/pkg/models" "github.com/stashapp/stash/pkg/txn" ) -const ( - scanQueueSize = 200000 - // maximum number of times to retry in the event of a locked database - // use -1 to retry forever - maxRetries = -1 -) - // Scanner scans files into the database. // // The scan process works using two goroutines. The first walks through the provided paths @@ -55,8 +44,26 @@ type Scanner struct { Repository Repository FingerprintCalculator FingerprintCalculator + // ZipFileExtensions is a list of file extensions that are considered zip files. + // Extension does not include the . character. 
+ ZipFileExtensions []string + + // ScanFilters are used to determine if a file should be scanned. + ScanFilters []PathFilter + + // HandlerRequiredFilters are used to determine if an unchanged file needs to be handled + HandlerRequiredFilters []Filter + // FileDecorators are applied to files as they are scanned. FileDecorators []Decorator + + // FileHandlers are called after a file has been scanned. + FileHandlers []Handler + + // Rescan indicates whether files should be rescanned even if they haven't changed. + Rescan bool + + folderPathToID sync.Map } // FingerprintCalculator calculates a fingerprint for the provided file. @@ -91,257 +98,18 @@ func (d *FilteredDecorator) IsMissingMetadata(ctx context.Context, fs models.FS, return false } -// ProgressReporter is used to report progress of the scan. -type ProgressReporter interface { - AddTotal(total int) - Increment() - Definite() - ExecuteTask(description string, fn func()) -} - -type scanJob struct { - *Scanner - - // handlers are called after a file has been scanned. - handlers []Handler - - ProgressReports ProgressReporter - options ScanOptions - - startTime time.Time - fileQueue chan scanFile - retryList []scanFile - retrying bool - folderPathToID sync.Map - zipPathToID sync.Map - count int - - txnRetryer txn.Retryer -} - -// ScanOptions provides options for scanning files. -type ScanOptions struct { - Paths []string - - // ZipFileExtensions is a list of file extensions that are considered zip files. - // Extension does not include the . character. - ZipFileExtensions []string - - // ScanFilters are used to determine if a file should be scanned. - ScanFilters []PathFilter - - // HandlerRequiredFilters are used to determine if an unchanged file needs to be handled - HandlerRequiredFilters []Filter - - ParallelTasks int - - // When true files in path will be rescanned even if they haven't changed - Rescan bool -} - -// Scan starts the scanning process. 
-func (s *Scanner) Scan(ctx context.Context, handlers []Handler, options ScanOptions, progressReporter ProgressReporter) { - job := &scanJob{ - Scanner: s, - handlers: handlers, - ProgressReports: progressReporter, - options: options, - txnRetryer: txn.Retryer{ - Manager: s.Repository.TxnManager, - Retries: maxRetries, - }, - } - - job.execute(ctx) -} - -type scanFile struct { +// ScannedFile represents a file being scanned. +type ScannedFile struct { *models.BaseFile - fs models.FS - info fs.FileInfo + FS models.FS + Info fs.FileInfo } -func (s *scanJob) withTxn(ctx context.Context, fn func(ctx context.Context) error) error { - return s.txnRetryer.WithTxn(ctx, fn) -} - -func (s *scanJob) withDB(ctx context.Context, fn func(ctx context.Context) error) error { - return s.Repository.WithDB(ctx, fn) -} - -func (s *scanJob) execute(ctx context.Context) { - paths := s.options.Paths - logger.Infof("scanning %d paths", len(paths)) - s.startTime = time.Now() - - s.fileQueue = make(chan scanFile, scanQueueSize) - var wg sync.WaitGroup - wg.Add(1) - - go func() { - defer func() { - wg.Done() - - // handle panics in goroutine - if p := recover(); p != nil { - logger.Errorf("panic while queuing files for scan: %v", p) - logger.Errorf(string(debug.Stack())) - } - }() - - if err := s.queueFiles(ctx, paths); err != nil { - if errors.Is(err, context.Canceled) { - return - } - - logger.Errorf("error queuing files for scan: %v", err) - return - } - - logger.Infof("Finished adding files to queue. 
%d files queued", s.count) - }() - - defer wg.Wait() - - if err := s.processQueue(ctx); err != nil { - if errors.Is(err, context.Canceled) { - return - } - - logger.Errorf("error scanning files: %v", err) - return - } -} - -func (s *scanJob) queueFiles(ctx context.Context, paths []string) error { - defer func() { - close(s.fileQueue) - - if s.ProgressReports != nil { - s.ProgressReports.AddTotal(s.count) - s.ProgressReports.Definite() - } - }() - - var err error - s.ProgressReports.ExecuteTask("Walking directory tree", func() { - for _, p := range paths { - err = symWalk(s.FS, p, s.queueFileFunc(ctx, s.FS, nil)) - if err != nil { - return - } - } - }) - - return err -} - -func (s *scanJob) queueFileFunc(ctx context.Context, f models.FS, zipFile *scanFile) fs.WalkDirFunc { - return func(path string, d fs.DirEntry, err error) error { - if err != nil { - // don't let errors prevent scanning - logger.Errorf("error scanning %s: %v", path, err) - return nil - } - - if err = ctx.Err(); err != nil { - return err - } - - info, err := d.Info() - if err != nil { - logger.Errorf("reading info for %q: %v", path, err) - return nil - } - - if !s.acceptEntry(ctx, path, info) { - if info.IsDir() { - return fs.SkipDir - } - - return nil - } - - size, err := getFileSize(f, path, info) - if err != nil { - return err - } - - ff := scanFile{ - BaseFile: &models.BaseFile{ - DirEntry: models.DirEntry{ - ModTime: modTime(info), - }, - Path: path, - Basename: filepath.Base(path), - Size: size, - }, - fs: f, - info: info, - } - - if zipFile != nil { - zipFileID, err := s.getZipFileID(ctx, zipFile) - if err != nil { - return err - } - ff.ZipFileID = zipFileID - ff.ZipFile = zipFile - } - - if info.IsDir() { - // handle folders immediately - if err := s.handleFolder(ctx, ff); err != nil { - if !errors.Is(err, context.Canceled) { - logger.Errorf("error processing %q: %v", path, err) - } - - // skip the directory since we won't be able to process the files anyway - return fs.SkipDir - } - - 
return nil - } - - // if zip file is present, we handle immediately - if zipFile != nil { - s.ProgressReports.ExecuteTask("Scanning "+path, func() { - if err := s.handleFile(ctx, ff); err != nil { - if !errors.Is(err, context.Canceled) { - logger.Errorf("error processing %q: %v", path, err) - } - // don't return an error, just skip the file - } - }) - - return nil - } - - s.fileQueue <- ff - - s.count++ - - return nil - } -} - -func getFileSize(f models.FS, path string, info fs.FileInfo) (int64, error) { - // #2196/#3042 - replace size with target size if file is a symlink - if info.Mode()&os.ModeSymlink == os.ModeSymlink { - targetInfo, err := f.Stat(path) - if err != nil { - return 0, fmt.Errorf("reading info for symlink %q: %w", path, err) - } - return targetInfo.Size(), nil - } - - return info.Size(), nil -} - -func (s *scanJob) acceptEntry(ctx context.Context, path string, info fs.FileInfo) bool { +// AcceptEntry determines if the file entry should be accepted for scanning +func (s *Scanner) AcceptEntry(ctx context.Context, path string, info fs.FileInfo) bool { // always accept if there's no filters - accept := len(s.options.ScanFilters) == 0 - for _, filter := range s.options.ScanFilters { + accept := len(s.ScanFilters) == 0 + for _, filter := range s.ScanFilters { // accept if any filter accepts the file if filter.Accept(ctx, path, info) { accept = true @@ -352,102 +120,7 @@ func (s *scanJob) acceptEntry(ctx context.Context, path string, info fs.FileInfo return accept } -func (s *scanJob) scanZipFile(ctx context.Context, f scanFile) error { - zipFS, err := f.fs.OpenZip(f.Path, f.Size) - if err != nil { - if errors.Is(err, errNotReaderAt) { - // can't walk the zip file - // just return - return nil - } - - return err - } - - defer zipFS.Close() - - return symWalk(zipFS, f.Path, s.queueFileFunc(ctx, zipFS, &f)) -} - -func (s *scanJob) processQueue(ctx context.Context) error { - parallelTasks := s.options.ParallelTasks - if parallelTasks < 1 { - parallelTasks = 
1 - } - - wg := sizedwaitgroup.New(parallelTasks) - - if err := func() error { - defer wg.Wait() - - for f := range s.fileQueue { - if err := ctx.Err(); err != nil { - return err - } - - wg.Add() - ff := f - go func() { - defer wg.Done() - s.processQueueItem(ctx, ff) - }() - } - - return nil - }(); err != nil { - return err - } - - s.retrying = true - - if err := func() error { - defer wg.Wait() - - for _, f := range s.retryList { - if err := ctx.Err(); err != nil { - return err - } - - wg.Add() - ff := f - go func() { - defer wg.Done() - s.processQueueItem(ctx, ff) - }() - } - - return nil - }(); err != nil { - return err - } - - return nil -} - -func (s *scanJob) incrementProgress(f scanFile) { - // don't increment for files inside zip files since these aren't - // counted during the initial walking - if s.ProgressReports != nil && f.ZipFile == nil { - s.ProgressReports.Increment() - } -} - -func (s *scanJob) processQueueItem(ctx context.Context, f scanFile) { - s.ProgressReports.ExecuteTask("Scanning "+f.Path, func() { - var err error - if f.info.IsDir() { - err = s.handleFolder(ctx, f) - } else { - err = s.handleFile(ctx, f) - } - - if err != nil && !errors.Is(err, context.Canceled) { - logger.Errorf("error processing %q: %v", f.Path, err) - } - }) -} - -func (s *scanJob) getFolderID(ctx context.Context, path string) (*models.FolderID, error) { +func (s *Scanner) getFolderID(ctx context.Context, path string) (*models.FolderID, error) { // check the folder cache first if f, ok := s.folderPathToID.Load(path); ok { v := f.(models.FolderID) @@ -470,48 +143,17 @@ func (s *scanJob) getFolderID(ctx context.Context, path string) (*models.FolderI return &ret.ID, nil } -func (s *scanJob) getZipFileID(ctx context.Context, zipFile *scanFile) (*models.FileID, error) { - if zipFile == nil { - return nil, nil - } - - if zipFile.ID != 0 { - return &zipFile.ID, nil - } - - path := zipFile.Path - - // check the folder cache first - if f, ok := s.zipPathToID.Load(path); ok { - v 
:= f.(models.FileID) - return &v, nil - } - - // assume case sensitive when searching for the zip file - const caseSensitive = true - - ret, err := s.Repository.File.FindByPath(ctx, path, caseSensitive) - if err != nil { - return nil, fmt.Errorf("getting zip file ID for %q: %w", path, err) - } - - if ret == nil { - return nil, fmt.Errorf("zip file %q doesn't exist in database", zipFile.Path) - } - - s.zipPathToID.Store(path, ret.Base().ID) - return &ret.Base().ID, nil -} - -func (s *scanJob) handleFolder(ctx context.Context, file scanFile) error { +// ScanFolder scans the provided folder into the database, returning the folder entry. +// If the folder already exists, it is updated if necessary. +func (s *Scanner) ScanFolder(ctx context.Context, file ScannedFile) (*models.Folder, error) { + var f *models.Folder + var err error path := file.Path - return s.withTxn(ctx, func(ctx context.Context) error { - defer s.incrementProgress(file) - + err = s.Repository.WithTxn(ctx, func(ctx context.Context) error { // determine if folder already exists in data store (by path) // assume case sensitive by default - f, err := s.Repository.Folder.FindByPath(ctx, path, true) + f, err = s.Repository.Folder.FindByPath(ctx, path, true) if err != nil { return fmt.Errorf("checking for existing folder %q: %w", path, err) } @@ -520,7 +162,7 @@ func (s *scanJob) handleFolder(ctx context.Context, file scanFile) error { // case insensitive searching // assume case sensitive if in zip if f == nil && file.ZipFileID == nil { - caseSensitive, _ := file.fs.IsPathCaseSensitive(file.Path) + caseSensitive, _ := file.FS.IsPathCaseSensitive(file.Path) if !caseSensitive { f, err = s.Repository.Folder.FindByPath(ctx, path, false) @@ -547,9 +189,11 @@ func (s *scanJob) handleFolder(ctx context.Context, file scanFile) error { return nil }) + + return f, err } -func (s *scanJob) onNewFolder(ctx context.Context, file scanFile) (*models.Folder, error) { +func (s *Scanner) onNewFolder(ctx context.Context, file 
ScannedFile) (*models.Folder, error) { renamed, err := s.handleFolderRename(ctx, file) if err != nil { return nil, err @@ -596,7 +240,7 @@ func (s *scanJob) onNewFolder(ctx context.Context, file scanFile) (*models.Folde return toCreate, nil } -func (s *scanJob) handleFolderRename(ctx context.Context, file scanFile) (*models.Folder, error) { +func (s *Scanner) handleFolderRename(ctx context.Context, file ScannedFile) (*models.Folder, error) { // ignore folders in zip files if file.ZipFileID != nil { return nil, nil @@ -637,7 +281,7 @@ func (s *scanJob) handleFolderRename(ctx context.Context, file scanFile) (*model return renamedFrom, nil } -func (s *scanJob) onExistingFolder(ctx context.Context, f scanFile, existing *models.Folder) (*models.Folder, error) { +func (s *Scanner) onExistingFolder(ctx context.Context, f ScannedFile, existing *models.Folder) (*models.Folder, error) { update := false // update if mod time is changed @@ -678,22 +322,22 @@ func (s *scanJob) onExistingFolder(ctx context.Context, f scanFile, existing *mo return existing, nil } -func modTime(info fs.FileInfo) time.Time { - // truncate to seconds, since we don't store beyond that in the database - return info.ModTime().Truncate(time.Second) +type ScanFileResult struct { + File models.File + New bool + Renamed bool + Updated bool } -func (s *scanJob) handleFile(ctx context.Context, f scanFile) error { - defer s.incrementProgress(f) - - var ff models.File +// ScanFile scans the provided file into the database, returning the scan result. 
+func (s *Scanner) ScanFile(ctx context.Context, f ScannedFile) (*ScanFileResult, error) { + var r *ScanFileResult // don't use a transaction to check if new or existing - if err := s.withDB(ctx, func(ctx context.Context) error { + if err := s.Repository.WithDB(ctx, func(ctx context.Context) error { // determine if file already exists in data store // assume case sensitive when searching for the file to begin with - var err error - ff, err = s.Repository.File.FindByPath(ctx, f.Path, true) + ff, err := s.Repository.File.FindByPath(ctx, f.Path, true) if err != nil { return fmt.Errorf("checking for existing file %q: %w", f.Path, err) } @@ -702,7 +346,7 @@ func (s *scanJob) handleFile(ctx context.Context, f scanFile) error { // case insensitive search // assume case sensitive if in zip if ff == nil && f.ZipFileID != nil { - caseSensitive, _ := f.fs.IsPathCaseSensitive(f.Path) + caseSensitive, _ := f.FS.IsPathCaseSensitive(f.Path) if !caseSensitive { ff, err = s.Repository.File.FindByPath(ctx, f.Path, false) @@ -714,35 +358,23 @@ func (s *scanJob) handleFile(ctx context.Context, f scanFile) error { if ff == nil { // returns a file only if it is actually new - ff, err = s.onNewFile(ctx, f) + r, err = s.onNewFile(ctx, f) return err } - ff, err = s.onExistingFile(ctx, f, ff) + r, err = s.onExistingFile(ctx, f, ff) return err }); err != nil { - return err + return nil, err } - if ff != nil && s.isZipFile(f.info.Name()) { - f.BaseFile = ff.Base() - - // scan zip files with a different context that is not cancellable - // cancelling while scanning zip file contents results in the scan - // contents being partially completed - zipCtx := context.WithoutCancel(ctx) - - if err := s.scanZipFile(zipCtx, f); err != nil { - logger.Errorf("Error scanning zip file %q: %v", f.Path, err) - } - } - - return nil + return r, nil } -func (s *scanJob) isZipFile(path string) bool { +// IsZipFile determines if the provided path is a zip file based on its extension. 
+func (s *Scanner) IsZipFile(path string) bool { fExt := filepath.Ext(path) - for _, ext := range s.options.ZipFileExtensions { + for _, ext := range s.ZipFileExtensions { if strings.EqualFold(fExt, "."+ext) { return true } @@ -751,7 +383,7 @@ func (s *scanJob) isZipFile(path string) bool { return false } -func (s *scanJob) onNewFile(ctx context.Context, f scanFile) (models.File, error) { +func (s *Scanner) onNewFile(ctx context.Context, f ScannedFile) (*ScanFileResult, error) { now := time.Now() baseFile := f.BaseFile @@ -767,28 +399,20 @@ func (s *scanJob) onNewFile(ctx context.Context, f scanFile) (models.File, error } if parentFolderID == nil { - // if parent folder doesn't exist, assume it's not yet created - // add this file to the queue to be created later - if s.retrying { - // if we're retrying and the folder still doesn't exist, then it's a problem - return nil, fmt.Errorf("parent folder for %q doesn't exist", path) - } - - s.retryList = append(s.retryList, f) - return nil, nil + return nil, fmt.Errorf("parent folder for %q doesn't exist", path) } baseFile.ParentFolderID = *parentFolderID const useExisting = false - fp, err := s.calculateFingerprints(f.fs, baseFile, path, useExisting) + fp, err := s.calculateFingerprints(f.FS, baseFile, path, useExisting) if err != nil { return nil, err } baseFile.SetFingerprints(fp) - file, err := s.fireDecorators(ctx, f.fs, baseFile) + file, err := s.fireDecorators(ctx, f.FS, baseFile) if err != nil { return nil, err } @@ -801,14 +425,17 @@ func (s *scanJob) onNewFile(ctx context.Context, f scanFile) (models.File, error } if renamed != nil { + return &ScanFileResult{ + File: renamed, + Renamed: true, + }, nil // handle rename should have already handled the contents of the zip file // so shouldn't need to scan it again // return nil so it doesn't - return nil, nil } // if not renamed, queue file for creation - if err := s.withTxn(ctx, func(ctx context.Context) error { + if err := s.Repository.WithTxn(ctx, func(ctx 
context.Context) error { if err := s.Repository.File.Create(ctx, file); err != nil { return fmt.Errorf("creating file %q: %w", path, err) } @@ -822,10 +449,13 @@ func (s *scanJob) onNewFile(ctx context.Context, f scanFile) (models.File, error return nil, err } - return file, nil + return &ScanFileResult{ + File: file, + New: true, + }, nil } -func (s *scanJob) fireDecorators(ctx context.Context, fs models.FS, f models.File) (models.File, error) { +func (s *Scanner) fireDecorators(ctx context.Context, fs models.FS, f models.File) (models.File, error) { for _, h := range s.FileDecorators { var err error f, err = h.Decorate(ctx, fs, f) @@ -837,8 +467,8 @@ func (s *scanJob) fireDecorators(ctx context.Context, fs models.FS, f models.Fil return f, nil } -func (s *scanJob) fireHandlers(ctx context.Context, f models.File, oldFile models.File) error { - for _, h := range s.handlers { +func (s *Scanner) fireHandlers(ctx context.Context, f models.File, oldFile models.File) error { + for _, h := range s.FileHandlers { if err := h.Handle(ctx, f, oldFile); err != nil { return err } @@ -847,7 +477,7 @@ func (s *scanJob) fireHandlers(ctx context.Context, f models.File, oldFile model return nil } -func (s *scanJob) calculateFingerprints(fs models.FS, f *models.BaseFile, path string, useExisting bool) (models.Fingerprints, error) { +func (s *Scanner) calculateFingerprints(fs models.FS, f *models.BaseFile, path string, useExisting bool) (models.Fingerprints, error) { // only log if we're (re)calculating fingerprints if !useExisting { logger.Infof("Calculating fingerprints for %s ...", path) @@ -884,7 +514,7 @@ func appendFileUnique(v []models.File, toAdd []models.File) []models.File { return v } -func (s *scanJob) getFileFS(f *models.BaseFile) (models.FS, error) { +func (s *Scanner) getFileFS(f *models.BaseFile) (models.FS, error) { if f.ZipFile == nil { return s.FS, nil } @@ -899,7 +529,7 @@ func (s *scanJob) getFileFS(f *models.BaseFile) (models.FS, error) { return 
fs.OpenZip(zipPath, zipSize) } -func (s *scanJob) handleRename(ctx context.Context, f models.File, fp []models.Fingerprint) (models.File, error) { +func (s *Scanner) handleRename(ctx context.Context, f models.File, fp []models.Fingerprint) (models.File, error) { var others []models.File for _, tfp := range fp { @@ -941,7 +571,7 @@ func (s *scanJob) handleRename(ctx context.Context, f models.File, fp []models.F // treat as a move missing = append(missing, other) } - case !s.acceptEntry(ctx, other.Base().Path, info): + case !s.AcceptEntry(ctx, other.Base().Path, info): // #4393 - if the file is no longer in the configured library paths, treat it as a move logger.Debugf("File %q no longer in library paths. Treating as a move.", other.Base().Path) missing = append(missing, other) @@ -974,12 +604,12 @@ func (s *scanJob) handleRename(ctx context.Context, f models.File, fp []models.F fBaseCopy.Fingerprints = updatedBase.Fingerprints *updatedBase = fBaseCopy - if err := s.withTxn(ctx, func(ctx context.Context) error { + if err := s.Repository.WithTxn(ctx, func(ctx context.Context) error { if err := s.Repository.File.Update(ctx, updated); err != nil { return fmt.Errorf("updating file for rename %q: %w", newPath, err) } - if s.isZipFile(updatedBase.Basename) { + if s.IsZipFile(updatedBase.Basename) { if err := transferZipHierarchy(ctx, s.Repository.Folder, s.Repository.File, updatedBase.ID, oldPath, newPath); err != nil { return fmt.Errorf("moving zip hierarchy for renamed zip file %q: %w", newPath, err) } @@ -997,9 +627,9 @@ func (s *scanJob) handleRename(ctx context.Context, f models.File, fp []models.F return updated, nil } -func (s *scanJob) isHandlerRequired(ctx context.Context, f models.File) bool { - accept := len(s.options.HandlerRequiredFilters) == 0 - for _, filter := range s.options.HandlerRequiredFilters { +func (s *Scanner) isHandlerRequired(ctx context.Context, f models.File) bool { + accept := len(s.HandlerRequiredFilters) == 0 + for _, filter := range 
s.HandlerRequiredFilters { // accept if any filter accepts the file if filter.Accept(ctx, f) { accept = true @@ -1018,9 +648,9 @@ func (s *scanJob) isHandlerRequired(ctx context.Context, f models.File) bool { // - file size // - image format, width or height // - video codec, audio codec, format, width, height, framerate or bitrate -func (s *scanJob) isMissingMetadata(ctx context.Context, f scanFile, existing models.File) bool { +func (s *Scanner) isMissingMetadata(ctx context.Context, f ScannedFile, existing models.File) bool { for _, h := range s.FileDecorators { - if h.IsMissingMetadata(ctx, f.fs, existing) { + if h.IsMissingMetadata(ctx, f.FS, existing) { return true } } @@ -1028,20 +658,20 @@ func (s *scanJob) isMissingMetadata(ctx context.Context, f scanFile, existing mo return false } -func (s *scanJob) setMissingMetadata(ctx context.Context, f scanFile, existing models.File) (models.File, error) { +func (s *Scanner) setMissingMetadata(ctx context.Context, f ScannedFile, existing models.File) (models.File, error) { path := existing.Base().Path logger.Infof("Updating metadata for %s", path) existing.Base().Size = f.Size var err error - existing, err = s.fireDecorators(ctx, f.fs, existing) + existing, err = s.fireDecorators(ctx, f.FS, existing) if err != nil { return nil, err } // queue file for update - if err := s.withTxn(ctx, func(ctx context.Context) error { + if err := s.Repository.WithTxn(ctx, func(ctx context.Context) error { if err := s.Repository.File.Update(ctx, existing); err != nil { return fmt.Errorf("updating file %q: %w", path, err) } @@ -1054,9 +684,9 @@ func (s *scanJob) setMissingMetadata(ctx context.Context, f scanFile, existing m return existing, nil } -func (s *scanJob) setMissingFingerprints(ctx context.Context, f scanFile, existing models.File) (models.File, error) { +func (s *Scanner) setMissingFingerprints(ctx context.Context, f ScannedFile, existing models.File) (models.File, error) { const useExisting = true - fp, err := 
s.calculateFingerprints(f.fs, existing.Base(), f.Path, useExisting) + fp, err := s.calculateFingerprints(f.FS, existing.Base(), f.Path, useExisting) if err != nil { return nil, err } @@ -1064,7 +694,7 @@ func (s *scanJob) setMissingFingerprints(ctx context.Context, f scanFile, existi if fp.ContentsChanged(existing.Base().Fingerprints) { existing.SetFingerprints(fp) - if err := s.withTxn(ctx, func(ctx context.Context) error { + if err := s.Repository.WithTxn(ctx, func(ctx context.Context) error { if err := s.Repository.File.Update(ctx, existing); err != nil { return fmt.Errorf("updating file %q: %w", f.Path, err) } @@ -1079,14 +709,14 @@ func (s *scanJob) setMissingFingerprints(ctx context.Context, f scanFile, existi } // returns a file only if it was updated -func (s *scanJob) onExistingFile(ctx context.Context, f scanFile, existing models.File) (models.File, error) { +func (s *Scanner) onExistingFile(ctx context.Context, f ScannedFile, existing models.File) (*ScanFileResult, error) { base := existing.Base() path := base.Path fileModTime := f.ModTime // #6326 - also force a rescan if the basename changed updated := !fileModTime.Equal(base.ModTime) || base.Basename != f.Basename - forceRescan := s.options.Rescan + forceRescan := s.Rescan if !updated && !forceRescan { return s.onUnchangedFile(ctx, f, existing) @@ -1108,7 +738,7 @@ func (s *scanJob) onExistingFile(ctx context.Context, f scanFile, existing model // calculate and update fingerprints for the file const useExisting = false - fp, err := s.calculateFingerprints(f.fs, base, path, useExisting) + fp, err := s.calculateFingerprints(f.FS, base, path, useExisting) if err != nil { return nil, err } @@ -1116,13 +746,13 @@ func (s *scanJob) onExistingFile(ctx context.Context, f scanFile, existing model s.removeOutdatedFingerprints(existing, fp) existing.SetFingerprints(fp) - existing, err = s.fireDecorators(ctx, f.fs, existing) + existing, err = s.fireDecorators(ctx, f.FS, existing) if err != nil { return nil, err } 
// queue file for update - if err := s.withTxn(ctx, func(ctx context.Context) error { + if err := s.Repository.WithTxn(ctx, func(ctx context.Context) error { if err := s.Repository.File.Update(ctx, existing); err != nil { return fmt.Errorf("updating file %q: %w", path, err) } @@ -1135,11 +765,13 @@ func (s *scanJob) onExistingFile(ctx context.Context, f scanFile, existing model }); err != nil { return nil, err } - - return existing, nil + return &ScanFileResult{ + File: existing, + Updated: true, + }, nil } -func (s *scanJob) removeOutdatedFingerprints(existing models.File, fp models.Fingerprints) { +func (s *Scanner) removeOutdatedFingerprints(existing models.File, fp models.Fingerprints) { // HACK - if no MD5 fingerprint was returned, and the oshash is changed // then remove the MD5 fingerprint oshash := fp.For(models.FingerprintTypeOshash) @@ -1167,7 +799,7 @@ func (s *scanJob) removeOutdatedFingerprints(existing models.File, fp models.Fin } // returns a file only if it was updated -func (s *scanJob) onUnchangedFile(ctx context.Context, f scanFile, existing models.File) (models.File, error) { +func (s *Scanner) onUnchangedFile(ctx context.Context, f ScannedFile, existing models.File) (*ScanFileResult, error) { var err error isMissingMetdata := s.isMissingMetadata(ctx, f, existing) @@ -1186,7 +818,7 @@ func (s *scanJob) onUnchangedFile(ctx context.Context, f scanFile, existing mode } handlerRequired := false - if err := s.withDB(ctx, func(ctx context.Context) error { + if err := s.Repository.WithDB(ctx, func(ctx context.Context) error { // check if the handler needs to be run handlerRequired = s.isHandlerRequired(ctx, existing) return nil @@ -1196,15 +828,20 @@ func (s *scanJob) onUnchangedFile(ctx context.Context, f scanFile, existing mode if !handlerRequired { // if this file is a zip file, then we need to rescan the contents - // as well. We do this by returning the file, instead of nil. + // as well. We do this by indicating that the file is updated. 
if isMissingMetdata { - return existing, nil + return &ScanFileResult{ + File: existing, + Updated: true, + }, nil } - return nil, nil + return &ScanFileResult{ + File: existing, + }, nil } - if err := s.withTxn(ctx, func(ctx context.Context) error { + if err := s.Repository.WithTxn(ctx, func(ctx context.Context) error { if err := s.fireHandlers(ctx, existing, nil); err != nil { return err } @@ -1215,6 +852,9 @@ func (s *scanJob) onUnchangedFile(ctx context.Context, f scanFile, existing mode } // if this file is a zip file, then we need to rescan the contents - // as well. We do this by returning the file, instead of nil. - return existing, nil + // as well. We do this by indicating that the file is updated. + return &ScanFileResult{ + File: existing, + Updated: true, + }, nil } diff --git a/pkg/file/walk.go b/pkg/file/walk.go index 3c6a157b7..bd33f42c3 100644 --- a/pkg/file/walk.go +++ b/pkg/file/walk.go @@ -81,8 +81,8 @@ func walkSym(f models.FS, filename string, linkDirname string, walkFn fs.WalkDir return fsWalk(f, filename, symWalkFunc) } -// symWalk extends filepath.Walk to also follow symlinks -func symWalk(fs models.FS, path string, walkFn fs.WalkDirFunc) error { +// SymWalk extends filepath.Walk to also follow symlinks +func SymWalk(fs models.FS, path string, walkFn fs.WalkDirFunc) error { return walkSym(fs, path, path, walkFn) } diff --git a/pkg/file/zip.go b/pkg/file/zip.go index 4df2453dc..5afcd5329 100644 --- a/pkg/file/zip.go +++ b/pkg/file/zip.go @@ -18,7 +18,7 @@ import ( ) var ( - errNotReaderAt = errors.New("not a ReaderAt") + ErrNotReaderAt = errors.New("invalid reader: does not implement io.ReaderAt") errZipFSOpenZip = errors.New("cannot open zip file inside zip file") ) @@ -38,7 +38,7 @@ func newZipFS(fs models.FS, path string, size int64) (*zipFS, error) { asReaderAt, _ := reader.(io.ReaderAt) if asReaderAt == nil { reader.Close() - return nil, errNotReaderAt + return nil, ErrNotReaderAt } zipReader, err := zip.NewReader(asReaderAt, size)