mirror of
https://github.com/stashapp/stash.git
synced 2026-02-03 17:31:04 -06:00
Refactor file scanning and handling logic (#6498)
- Moved directory walking and queuing functionality into scan task code
This commit is contained in:
parent
244d70e20e
commit
d252a416d0
@ -100,6 +100,8 @@ func (s *Manager) Scan(ctx context.Context, input ScanMetadataInput) (int, error
|
||||
return 0, err
|
||||
}
|
||||
|
||||
cfg := config.GetInstance()
|
||||
|
||||
scanner := &file.Scanner{
|
||||
Repository: file.NewRepository(s.Repository),
|
||||
FileDecorators: []file.Decorator{
|
||||
@ -118,6 +120,10 @@ func (s *Manager) Scan(ctx context.Context, input ScanMetadataInput) (int, error
|
||||
},
|
||||
FingerprintCalculator: &fingerprintCalculator{s.Config},
|
||||
FS: &file.OsFS{},
|
||||
ZipFileExtensions: cfg.GetGalleryExtensions(),
|
||||
// ScanFilters is set in ScanJob.Execute
|
||||
// HandlerRequiredFilters is set in ScanJob.Execute
|
||||
Rescan: input.Rescan,
|
||||
}
|
||||
|
||||
scanJob := ScanJob{
|
||||
|
||||
@ -2,13 +2,17 @@ package manager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/99designs/gqlgen/graphql/handler/lru"
|
||||
"github.com/remeh/sizedwaitgroup"
|
||||
"github.com/stashapp/stash/internal/manager/config"
|
||||
"github.com/stashapp/stash/pkg/file"
|
||||
"github.com/stashapp/stash/pkg/file/video"
|
||||
@ -24,14 +28,13 @@ import (
|
||||
"github.com/stashapp/stash/pkg/txn"
|
||||
)
|
||||
|
||||
type scanner interface {
|
||||
Scan(ctx context.Context, handlers []file.Handler, options file.ScanOptions, progressReporter file.ProgressReporter)
|
||||
}
|
||||
|
||||
type ScanJob struct {
|
||||
scanner scanner
|
||||
scanner *file.Scanner
|
||||
input ScanMetadataInput
|
||||
subscriptions *subscriptionManager
|
||||
|
||||
fileQueue chan file.ScannedFile
|
||||
count int
|
||||
}
|
||||
|
||||
func (j *ScanJob) Execute(ctx context.Context, progress *job.Progress) error {
|
||||
@ -55,22 +58,22 @@ func (j *ScanJob) Execute(ctx context.Context, progress *job.Progress) error {
|
||||
|
||||
start := time.Now()
|
||||
|
||||
nTasks := cfg.GetParallelTasksWithAutoDetection()
|
||||
|
||||
const taskQueueSize = 200000
|
||||
taskQueue := job.NewTaskQueue(ctx, progress, taskQueueSize, cfg.GetParallelTasksWithAutoDetection())
|
||||
taskQueue := job.NewTaskQueue(ctx, progress, taskQueueSize, nTasks)
|
||||
|
||||
var minModTime time.Time
|
||||
if j.input.Filter != nil && j.input.Filter.MinModTime != nil {
|
||||
minModTime = *j.input.Filter.MinModTime
|
||||
}
|
||||
|
||||
j.scanner.Scan(ctx, getScanHandlers(j.input, taskQueue, progress), file.ScanOptions{
|
||||
Paths: paths,
|
||||
ScanFilters: []file.PathFilter{newScanFilter(c, repo, minModTime)},
|
||||
ZipFileExtensions: cfg.GetGalleryExtensions(),
|
||||
ParallelTasks: cfg.GetParallelTasksWithAutoDetection(),
|
||||
HandlerRequiredFilters: []file.Filter{newHandlerRequiredFilter(cfg, repo)},
|
||||
Rescan: j.input.Rescan,
|
||||
}, progress)
|
||||
// HACK - these should really be set in the scanner initialization
|
||||
j.scanner.FileHandlers = getScanHandlers(j.input, taskQueue, progress)
|
||||
j.scanner.ScanFilters = []file.PathFilter{newScanFilter(c, repo, minModTime)}
|
||||
j.scanner.HandlerRequiredFilters = []file.Filter{newHandlerRequiredFilter(cfg, repo)}
|
||||
|
||||
j.runJob(ctx, paths, nTasks, progress)
|
||||
|
||||
taskQueue.Close()
|
||||
|
||||
@ -86,6 +89,264 @@ func (j *ScanJob) Execute(ctx context.Context, progress *job.Progress) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j *ScanJob) runJob(ctx context.Context, paths []string, nTasks int, progress *job.Progress) {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
|
||||
j.fileQueue = make(chan file.ScannedFile, scanQueueSize)
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
wg.Done()
|
||||
|
||||
// handle panics in goroutine
|
||||
if p := recover(); p != nil {
|
||||
logger.Errorf("panic while queuing files for scan: %v", p)
|
||||
logger.Errorf(string(debug.Stack()))
|
||||
}
|
||||
}()
|
||||
|
||||
if err := j.queueFiles(ctx, paths, progress); err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return
|
||||
}
|
||||
|
||||
logger.Errorf("error queuing files for scan: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Infof("Finished adding files to queue. %d files queued", j.count)
|
||||
}()
|
||||
|
||||
defer wg.Wait()
|
||||
|
||||
j.processQueue(ctx, nTasks, progress)
|
||||
}
|
||||
|
||||
const scanQueueSize = 200000
|
||||
|
||||
func (j *ScanJob) queueFiles(ctx context.Context, paths []string, progress *job.Progress) error {
|
||||
fs := &file.OsFS{}
|
||||
|
||||
defer func() {
|
||||
close(j.fileQueue)
|
||||
|
||||
progress.AddTotal(j.count)
|
||||
progress.Definite()
|
||||
}()
|
||||
|
||||
var err error
|
||||
progress.ExecuteTask("Walking directory tree", func() {
|
||||
for _, p := range paths {
|
||||
err = file.SymWalk(fs, p, j.queueFileFunc(ctx, fs, nil, progress))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (j *ScanJob) queueFileFunc(ctx context.Context, f models.FS, zipFile *file.ScannedFile, progress *job.Progress) fs.WalkDirFunc {
|
||||
return func(path string, d fs.DirEntry, err error) error {
|
||||
if err != nil {
|
||||
// don't let errors prevent scanning
|
||||
logger.Errorf("error scanning %s: %v", path, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if err = ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
info, err := d.Info()
|
||||
if err != nil {
|
||||
logger.Errorf("reading info for %q: %v", path, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if !j.scanner.AcceptEntry(ctx, path, info) {
|
||||
if info.IsDir() {
|
||||
logger.Debugf("Skipping directory %s", path)
|
||||
return fs.SkipDir
|
||||
}
|
||||
|
||||
logger.Debugf("Skipping file %s", path)
|
||||
return nil
|
||||
}
|
||||
|
||||
size, err := file.GetFileSize(f, path, info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ff := file.ScannedFile{
|
||||
BaseFile: &models.BaseFile{
|
||||
DirEntry: models.DirEntry{
|
||||
ModTime: file.ModTime(info),
|
||||
},
|
||||
Path: path,
|
||||
Basename: filepath.Base(path),
|
||||
Size: size,
|
||||
},
|
||||
FS: f,
|
||||
Info: info,
|
||||
}
|
||||
|
||||
if zipFile != nil {
|
||||
ff.ZipFileID = &zipFile.ID
|
||||
ff.ZipFile = zipFile
|
||||
}
|
||||
|
||||
if info.IsDir() {
|
||||
// handle folders immediately
|
||||
if err := j.handleFolder(ctx, ff, progress); err != nil {
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
logger.Errorf("error processing %q: %v", path, err)
|
||||
}
|
||||
|
||||
// skip the directory since we won't be able to process the files anyway
|
||||
return fs.SkipDir
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// if zip file is present, we handle immediately
|
||||
if zipFile != nil {
|
||||
progress.ExecuteTask("Scanning "+path, func() {
|
||||
// don't increment progress in zip files
|
||||
if err := j.handleFile(ctx, ff, nil); err != nil {
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
logger.Errorf("error processing %q: %v", path, err)
|
||||
}
|
||||
// don't return an error, just skip the file
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.Tracef("Queueing file %s for scanning", path)
|
||||
j.fileQueue <- ff
|
||||
|
||||
j.count++
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (j *ScanJob) processQueue(ctx context.Context, parallelTasks int, progress *job.Progress) {
|
||||
if parallelTasks < 1 {
|
||||
parallelTasks = 1
|
||||
}
|
||||
|
||||
wg := sizedwaitgroup.New(parallelTasks)
|
||||
|
||||
func() {
|
||||
defer func() {
|
||||
wg.Wait()
|
||||
|
||||
// handle panics in goroutine
|
||||
if p := recover(); p != nil {
|
||||
logger.Errorf("panic while scanning files: %v", p)
|
||||
logger.Errorf(string(debug.Stack()))
|
||||
}
|
||||
}()
|
||||
|
||||
for f := range j.fileQueue {
|
||||
logger.Tracef("Processing queued file %s", f.Path)
|
||||
if err := ctx.Err(); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
wg.Add()
|
||||
ff := f
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
j.processQueueItem(ctx, ff, progress)
|
||||
}()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (j *ScanJob) processQueueItem(ctx context.Context, f file.ScannedFile, progress *job.Progress) {
|
||||
progress.ExecuteTask("Scanning "+f.Path, func() {
|
||||
var err error
|
||||
if f.Info.IsDir() {
|
||||
err = j.handleFolder(ctx, f, progress)
|
||||
} else {
|
||||
err = j.handleFile(ctx, f, progress)
|
||||
}
|
||||
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
logger.Errorf("error processing %q: %v", f.Path, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (j *ScanJob) handleFolder(ctx context.Context, f file.ScannedFile, progress *job.Progress) error {
|
||||
if progress != nil {
|
||||
defer progress.Increment()
|
||||
}
|
||||
|
||||
_, err := j.scanner.ScanFolder(ctx, f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j *ScanJob) handleFile(ctx context.Context, f file.ScannedFile, progress *job.Progress) error {
|
||||
if progress != nil {
|
||||
defer progress.Increment()
|
||||
}
|
||||
|
||||
r, err := j.scanner.ScanFile(ctx, f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// handle rename should have already handled the contents of the zip file
|
||||
// so shouldn't need to scan it again
|
||||
|
||||
if (r.New || r.Updated) && j.scanner.IsZipFile(f.Info.Name()) {
|
||||
ff := r.File
|
||||
f.BaseFile = ff.Base()
|
||||
|
||||
// scan zip files with a different context that is not cancellable
|
||||
// cancelling while scanning zip file contents results in the scan
|
||||
// contents being partially completed
|
||||
zipCtx := context.WithoutCancel(ctx)
|
||||
|
||||
if err := j.scanZipFile(zipCtx, f, progress); err != nil {
|
||||
logger.Errorf("Error scanning zip file %q: %v", f.Path, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j *ScanJob) scanZipFile(ctx context.Context, f file.ScannedFile, progress *job.Progress) error {
|
||||
zipFS, err := f.FS.OpenZip(f.Path, f.Size)
|
||||
if err != nil {
|
||||
if errors.Is(err, file.ErrNotReaderAt) {
|
||||
// can't walk the zip file
|
||||
// just return
|
||||
logger.Debugf("Skipping zip file %q as it cannot be opened for walking", f.Path)
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
defer zipFS.Close()
|
||||
|
||||
return file.SymWalk(zipFS, f.Path, j.queueFileFunc(ctx, zipFS, &f, progress))
|
||||
}
|
||||
|
||||
type extensionConfig struct {
|
||||
vidExt []string
|
||||
imgExt []string
|
||||
|
||||
@ -3,6 +3,10 @@ package file
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/stashapp/stash/pkg/models"
|
||||
"github.com/stashapp/stash/pkg/txn"
|
||||
@ -35,3 +39,23 @@ func (r *Repository) WithReadTxn(ctx context.Context, fn txn.TxnFunc) error {
|
||||
func (r *Repository) WithDB(ctx context.Context, fn txn.TxnFunc) error {
|
||||
return txn.WithDatabase(ctx, r.TxnManager, fn)
|
||||
}
|
||||
|
||||
// ModTime returns the modification time truncated to seconds.
|
||||
func ModTime(info fs.FileInfo) time.Time {
|
||||
// truncate to seconds, since we don't store beyond that in the database
|
||||
return info.ModTime().Truncate(time.Second)
|
||||
}
|
||||
|
||||
// GetFileSize gets the size of the file, taking into account symlinks.
|
||||
func GetFileSize(f models.FS, path string, info fs.FileInfo) (int64, error) {
|
||||
// #2196/#3042 - replace size with target size if file is a symlink
|
||||
if info.Mode()&os.ModeSymlink == os.ModeSymlink {
|
||||
targetInfo, err := f.Stat(path)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("reading info for symlink %q: %w", path, err)
|
||||
}
|
||||
return targetInfo.Size(), nil
|
||||
}
|
||||
|
||||
return info.Size(), nil
|
||||
}
|
||||
|
||||
@ -75,7 +75,7 @@ func (d *folderRenameDetector) bestCandidate() *models.Folder {
|
||||
return best.folder
|
||||
}
|
||||
|
||||
func (s *scanJob) detectFolderMove(ctx context.Context, file scanFile) (*models.Folder, error) {
|
||||
func (s *Scanner) detectFolderMove(ctx context.Context, file ScannedFile) (*models.Folder, error) {
|
||||
// in order for a folder to be considered moved, the existing folder must be
|
||||
// missing, and the majority of the old folder's files must be present, unchanged,
|
||||
// in the new folder.
|
||||
@ -88,7 +88,7 @@ func (s *scanJob) detectFolderMove(ctx context.Context, file scanFile) (*models.
|
||||
|
||||
r := s.Repository
|
||||
|
||||
if err := symWalk(file.fs, file.Path, func(path string, d fs.DirEntry, err error) error {
|
||||
if err := SymWalk(file.FS, file.Path, func(path string, d fs.DirEntry, err error) error {
|
||||
if err != nil {
|
||||
// don't let errors prevent scanning
|
||||
logger.Errorf("error scanning %s: %v", path, err)
|
||||
@ -111,11 +111,11 @@ func (s *scanJob) detectFolderMove(ctx context.Context, file scanFile) (*models.
|
||||
return nil
|
||||
}
|
||||
|
||||
if !s.acceptEntry(ctx, path, info) {
|
||||
if !s.AcceptEntry(ctx, path, info) {
|
||||
return nil
|
||||
}
|
||||
|
||||
size, err := getFileSize(file.fs, path, info)
|
||||
size, err := GetFileSize(file.FS, path, info)
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting file size for %q: %w", path, err)
|
||||
}
|
||||
@ -154,7 +154,7 @@ func (s *scanJob) detectFolderMove(ctx context.Context, file scanFile) (*models.
|
||||
}
|
||||
|
||||
// parent folder must be missing
|
||||
_, err = file.fs.Lstat(pf.Path)
|
||||
_, err = file.FS.Lstat(pf.Path)
|
||||
if err == nil {
|
||||
// parent folder exists, not a candidate
|
||||
detector.reject(parentFolderID)
|
||||
|
||||
594
pkg/file/scan.go
594
pkg/file/scan.go
@ -2,29 +2,18 @@ package file
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/remeh/sizedwaitgroup"
|
||||
"github.com/stashapp/stash/pkg/logger"
|
||||
"github.com/stashapp/stash/pkg/models"
|
||||
"github.com/stashapp/stash/pkg/txn"
|
||||
)
|
||||
|
||||
const (
|
||||
scanQueueSize = 200000
|
||||
// maximum number of times to retry in the event of a locked database
|
||||
// use -1 to retry forever
|
||||
maxRetries = -1
|
||||
)
|
||||
|
||||
// Scanner scans files into the database.
|
||||
//
|
||||
// The scan process works using two goroutines. The first walks through the provided paths
|
||||
@ -55,8 +44,26 @@ type Scanner struct {
|
||||
Repository Repository
|
||||
FingerprintCalculator FingerprintCalculator
|
||||
|
||||
// ZipFileExtensions is a list of file extensions that are considered zip files.
|
||||
// Extension does not include the . character.
|
||||
ZipFileExtensions []string
|
||||
|
||||
// ScanFilters are used to determine if a file should be scanned.
|
||||
ScanFilters []PathFilter
|
||||
|
||||
// HandlerRequiredFilters are used to determine if an unchanged file needs to be handled
|
||||
HandlerRequiredFilters []Filter
|
||||
|
||||
// FileDecorators are applied to files as they are scanned.
|
||||
FileDecorators []Decorator
|
||||
|
||||
// handlers are called after a file has been scanned.
|
||||
FileHandlers []Handler
|
||||
|
||||
// Rescan indicates whether files should be rescanned even if they haven't changed.
|
||||
Rescan bool
|
||||
|
||||
folderPathToID sync.Map
|
||||
}
|
||||
|
||||
// FingerprintCalculator calculates a fingerprint for the provided file.
|
||||
@ -91,257 +98,18 @@ func (d *FilteredDecorator) IsMissingMetadata(ctx context.Context, fs models.FS,
|
||||
return false
|
||||
}
|
||||
|
||||
// ProgressReporter is used to report progress of the scan.
|
||||
type ProgressReporter interface {
|
||||
AddTotal(total int)
|
||||
Increment()
|
||||
Definite()
|
||||
ExecuteTask(description string, fn func())
|
||||
}
|
||||
|
||||
type scanJob struct {
|
||||
*Scanner
|
||||
|
||||
// handlers are called after a file has been scanned.
|
||||
handlers []Handler
|
||||
|
||||
ProgressReports ProgressReporter
|
||||
options ScanOptions
|
||||
|
||||
startTime time.Time
|
||||
fileQueue chan scanFile
|
||||
retryList []scanFile
|
||||
retrying bool
|
||||
folderPathToID sync.Map
|
||||
zipPathToID sync.Map
|
||||
count int
|
||||
|
||||
txnRetryer txn.Retryer
|
||||
}
|
||||
|
||||
// ScanOptions provides options for scanning files.
|
||||
type ScanOptions struct {
|
||||
Paths []string
|
||||
|
||||
// ZipFileExtensions is a list of file extensions that are considered zip files.
|
||||
// Extension does not include the . character.
|
||||
ZipFileExtensions []string
|
||||
|
||||
// ScanFilters are used to determine if a file should be scanned.
|
||||
ScanFilters []PathFilter
|
||||
|
||||
// HandlerRequiredFilters are used to determine if an unchanged file needs to be handled
|
||||
HandlerRequiredFilters []Filter
|
||||
|
||||
ParallelTasks int
|
||||
|
||||
// When true files in path will be rescanned even if they haven't changed
|
||||
Rescan bool
|
||||
}
|
||||
|
||||
// Scan starts the scanning process.
|
||||
func (s *Scanner) Scan(ctx context.Context, handlers []Handler, options ScanOptions, progressReporter ProgressReporter) {
|
||||
job := &scanJob{
|
||||
Scanner: s,
|
||||
handlers: handlers,
|
||||
ProgressReports: progressReporter,
|
||||
options: options,
|
||||
txnRetryer: txn.Retryer{
|
||||
Manager: s.Repository.TxnManager,
|
||||
Retries: maxRetries,
|
||||
},
|
||||
}
|
||||
|
||||
job.execute(ctx)
|
||||
}
|
||||
|
||||
type scanFile struct {
|
||||
// ScannedFile represents a file being scanned.
|
||||
type ScannedFile struct {
|
||||
*models.BaseFile
|
||||
fs models.FS
|
||||
info fs.FileInfo
|
||||
FS models.FS
|
||||
Info fs.FileInfo
|
||||
}
|
||||
|
||||
func (s *scanJob) withTxn(ctx context.Context, fn func(ctx context.Context) error) error {
|
||||
return s.txnRetryer.WithTxn(ctx, fn)
|
||||
}
|
||||
|
||||
func (s *scanJob) withDB(ctx context.Context, fn func(ctx context.Context) error) error {
|
||||
return s.Repository.WithDB(ctx, fn)
|
||||
}
|
||||
|
||||
func (s *scanJob) execute(ctx context.Context) {
|
||||
paths := s.options.Paths
|
||||
logger.Infof("scanning %d paths", len(paths))
|
||||
s.startTime = time.Now()
|
||||
|
||||
s.fileQueue = make(chan scanFile, scanQueueSize)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
wg.Done()
|
||||
|
||||
// handle panics in goroutine
|
||||
if p := recover(); p != nil {
|
||||
logger.Errorf("panic while queuing files for scan: %v", p)
|
||||
logger.Errorf(string(debug.Stack()))
|
||||
}
|
||||
}()
|
||||
|
||||
if err := s.queueFiles(ctx, paths); err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return
|
||||
}
|
||||
|
||||
logger.Errorf("error queuing files for scan: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Infof("Finished adding files to queue. %d files queued", s.count)
|
||||
}()
|
||||
|
||||
defer wg.Wait()
|
||||
|
||||
if err := s.processQueue(ctx); err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return
|
||||
}
|
||||
|
||||
logger.Errorf("error scanning files: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (s *scanJob) queueFiles(ctx context.Context, paths []string) error {
|
||||
defer func() {
|
||||
close(s.fileQueue)
|
||||
|
||||
if s.ProgressReports != nil {
|
||||
s.ProgressReports.AddTotal(s.count)
|
||||
s.ProgressReports.Definite()
|
||||
}
|
||||
}()
|
||||
|
||||
var err error
|
||||
s.ProgressReports.ExecuteTask("Walking directory tree", func() {
|
||||
for _, p := range paths {
|
||||
err = symWalk(s.FS, p, s.queueFileFunc(ctx, s.FS, nil))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *scanJob) queueFileFunc(ctx context.Context, f models.FS, zipFile *scanFile) fs.WalkDirFunc {
|
||||
return func(path string, d fs.DirEntry, err error) error {
|
||||
if err != nil {
|
||||
// don't let errors prevent scanning
|
||||
logger.Errorf("error scanning %s: %v", path, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if err = ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
info, err := d.Info()
|
||||
if err != nil {
|
||||
logger.Errorf("reading info for %q: %v", path, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if !s.acceptEntry(ctx, path, info) {
|
||||
if info.IsDir() {
|
||||
return fs.SkipDir
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
size, err := getFileSize(f, path, info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ff := scanFile{
|
||||
BaseFile: &models.BaseFile{
|
||||
DirEntry: models.DirEntry{
|
||||
ModTime: modTime(info),
|
||||
},
|
||||
Path: path,
|
||||
Basename: filepath.Base(path),
|
||||
Size: size,
|
||||
},
|
||||
fs: f,
|
||||
info: info,
|
||||
}
|
||||
|
||||
if zipFile != nil {
|
||||
zipFileID, err := s.getZipFileID(ctx, zipFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ff.ZipFileID = zipFileID
|
||||
ff.ZipFile = zipFile
|
||||
}
|
||||
|
||||
if info.IsDir() {
|
||||
// handle folders immediately
|
||||
if err := s.handleFolder(ctx, ff); err != nil {
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
logger.Errorf("error processing %q: %v", path, err)
|
||||
}
|
||||
|
||||
// skip the directory since we won't be able to process the files anyway
|
||||
return fs.SkipDir
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// if zip file is present, we handle immediately
|
||||
if zipFile != nil {
|
||||
s.ProgressReports.ExecuteTask("Scanning "+path, func() {
|
||||
if err := s.handleFile(ctx, ff); err != nil {
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
logger.Errorf("error processing %q: %v", path, err)
|
||||
}
|
||||
// don't return an error, just skip the file
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
s.fileQueue <- ff
|
||||
|
||||
s.count++
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func getFileSize(f models.FS, path string, info fs.FileInfo) (int64, error) {
|
||||
// #2196/#3042 - replace size with target size if file is a symlink
|
||||
if info.Mode()&os.ModeSymlink == os.ModeSymlink {
|
||||
targetInfo, err := f.Stat(path)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("reading info for symlink %q: %w", path, err)
|
||||
}
|
||||
return targetInfo.Size(), nil
|
||||
}
|
||||
|
||||
return info.Size(), nil
|
||||
}
|
||||
|
||||
func (s *scanJob) acceptEntry(ctx context.Context, path string, info fs.FileInfo) bool {
|
||||
// AcceptEntry determines if the file entry should be accepted for scanning
|
||||
func (s *Scanner) AcceptEntry(ctx context.Context, path string, info fs.FileInfo) bool {
|
||||
// always accept if there's no filters
|
||||
accept := len(s.options.ScanFilters) == 0
|
||||
for _, filter := range s.options.ScanFilters {
|
||||
accept := len(s.ScanFilters) == 0
|
||||
for _, filter := range s.ScanFilters {
|
||||
// accept if any filter accepts the file
|
||||
if filter.Accept(ctx, path, info) {
|
||||
accept = true
|
||||
@ -352,102 +120,7 @@ func (s *scanJob) acceptEntry(ctx context.Context, path string, info fs.FileInfo
|
||||
return accept
|
||||
}
|
||||
|
||||
func (s *scanJob) scanZipFile(ctx context.Context, f scanFile) error {
|
||||
zipFS, err := f.fs.OpenZip(f.Path, f.Size)
|
||||
if err != nil {
|
||||
if errors.Is(err, errNotReaderAt) {
|
||||
// can't walk the zip file
|
||||
// just return
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
defer zipFS.Close()
|
||||
|
||||
return symWalk(zipFS, f.Path, s.queueFileFunc(ctx, zipFS, &f))
|
||||
}
|
||||
|
||||
func (s *scanJob) processQueue(ctx context.Context) error {
|
||||
parallelTasks := s.options.ParallelTasks
|
||||
if parallelTasks < 1 {
|
||||
parallelTasks = 1
|
||||
}
|
||||
|
||||
wg := sizedwaitgroup.New(parallelTasks)
|
||||
|
||||
if err := func() error {
|
||||
defer wg.Wait()
|
||||
|
||||
for f := range s.fileQueue {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
wg.Add()
|
||||
ff := f
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
s.processQueueItem(ctx, ff)
|
||||
}()
|
||||
}
|
||||
|
||||
return nil
|
||||
}(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.retrying = true
|
||||
|
||||
if err := func() error {
|
||||
defer wg.Wait()
|
||||
|
||||
for _, f := range s.retryList {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
wg.Add()
|
||||
ff := f
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
s.processQueueItem(ctx, ff)
|
||||
}()
|
||||
}
|
||||
|
||||
return nil
|
||||
}(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *scanJob) incrementProgress(f scanFile) {
|
||||
// don't increment for files inside zip files since these aren't
|
||||
// counted during the initial walking
|
||||
if s.ProgressReports != nil && f.ZipFile == nil {
|
||||
s.ProgressReports.Increment()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *scanJob) processQueueItem(ctx context.Context, f scanFile) {
|
||||
s.ProgressReports.ExecuteTask("Scanning "+f.Path, func() {
|
||||
var err error
|
||||
if f.info.IsDir() {
|
||||
err = s.handleFolder(ctx, f)
|
||||
} else {
|
||||
err = s.handleFile(ctx, f)
|
||||
}
|
||||
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
logger.Errorf("error processing %q: %v", f.Path, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (s *scanJob) getFolderID(ctx context.Context, path string) (*models.FolderID, error) {
|
||||
func (s *Scanner) getFolderID(ctx context.Context, path string) (*models.FolderID, error) {
|
||||
// check the folder cache first
|
||||
if f, ok := s.folderPathToID.Load(path); ok {
|
||||
v := f.(models.FolderID)
|
||||
@ -470,48 +143,17 @@ func (s *scanJob) getFolderID(ctx context.Context, path string) (*models.FolderI
|
||||
return &ret.ID, nil
|
||||
}
|
||||
|
||||
func (s *scanJob) getZipFileID(ctx context.Context, zipFile *scanFile) (*models.FileID, error) {
|
||||
if zipFile == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if zipFile.ID != 0 {
|
||||
return &zipFile.ID, nil
|
||||
}
|
||||
|
||||
path := zipFile.Path
|
||||
|
||||
// check the folder cache first
|
||||
if f, ok := s.zipPathToID.Load(path); ok {
|
||||
v := f.(models.FileID)
|
||||
return &v, nil
|
||||
}
|
||||
|
||||
// assume case sensitive when searching for the zip file
|
||||
const caseSensitive = true
|
||||
|
||||
ret, err := s.Repository.File.FindByPath(ctx, path, caseSensitive)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting zip file ID for %q: %w", path, err)
|
||||
}
|
||||
|
||||
if ret == nil {
|
||||
return nil, fmt.Errorf("zip file %q doesn't exist in database", zipFile.Path)
|
||||
}
|
||||
|
||||
s.zipPathToID.Store(path, ret.Base().ID)
|
||||
return &ret.Base().ID, nil
|
||||
}
|
||||
|
||||
func (s *scanJob) handleFolder(ctx context.Context, file scanFile) error {
|
||||
// ScanFolder scans the provided folder into the database, returning the folder entry.
|
||||
// If the folder already exists, it is updated if necessary.
|
||||
func (s *Scanner) ScanFolder(ctx context.Context, file ScannedFile) (*models.Folder, error) {
|
||||
var f *models.Folder
|
||||
var err error
|
||||
path := file.Path
|
||||
|
||||
return s.withTxn(ctx, func(ctx context.Context) error {
|
||||
defer s.incrementProgress(file)
|
||||
|
||||
err = s.Repository.WithTxn(ctx, func(ctx context.Context) error {
|
||||
// determine if folder already exists in data store (by path)
|
||||
// assume case sensitive by default
|
||||
f, err := s.Repository.Folder.FindByPath(ctx, path, true)
|
||||
f, err = s.Repository.Folder.FindByPath(ctx, path, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("checking for existing folder %q: %w", path, err)
|
||||
}
|
||||
@ -520,7 +162,7 @@ func (s *scanJob) handleFolder(ctx context.Context, file scanFile) error {
|
||||
// case insensitive searching
|
||||
// assume case sensitive if in zip
|
||||
if f == nil && file.ZipFileID == nil {
|
||||
caseSensitive, _ := file.fs.IsPathCaseSensitive(file.Path)
|
||||
caseSensitive, _ := file.FS.IsPathCaseSensitive(file.Path)
|
||||
|
||||
if !caseSensitive {
|
||||
f, err = s.Repository.Folder.FindByPath(ctx, path, false)
|
||||
@ -547,9 +189,11 @@ func (s *scanJob) handleFolder(ctx context.Context, file scanFile) error {
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return f, err
|
||||
}
|
||||
|
||||
func (s *scanJob) onNewFolder(ctx context.Context, file scanFile) (*models.Folder, error) {
|
||||
func (s *Scanner) onNewFolder(ctx context.Context, file ScannedFile) (*models.Folder, error) {
|
||||
renamed, err := s.handleFolderRename(ctx, file)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -596,7 +240,7 @@ func (s *scanJob) onNewFolder(ctx context.Context, file scanFile) (*models.Folde
|
||||
return toCreate, nil
|
||||
}
|
||||
|
||||
func (s *scanJob) handleFolderRename(ctx context.Context, file scanFile) (*models.Folder, error) {
|
||||
func (s *Scanner) handleFolderRename(ctx context.Context, file ScannedFile) (*models.Folder, error) {
|
||||
// ignore folders in zip files
|
||||
if file.ZipFileID != nil {
|
||||
return nil, nil
|
||||
@ -637,7 +281,7 @@ func (s *scanJob) handleFolderRename(ctx context.Context, file scanFile) (*model
|
||||
return renamedFrom, nil
|
||||
}
|
||||
|
||||
func (s *scanJob) onExistingFolder(ctx context.Context, f scanFile, existing *models.Folder) (*models.Folder, error) {
|
||||
func (s *Scanner) onExistingFolder(ctx context.Context, f ScannedFile, existing *models.Folder) (*models.Folder, error) {
|
||||
update := false
|
||||
|
||||
// update if mod time is changed
|
||||
@ -678,22 +322,22 @@ func (s *scanJob) onExistingFolder(ctx context.Context, f scanFile, existing *mo
|
||||
return existing, nil
|
||||
}
|
||||
|
||||
func modTime(info fs.FileInfo) time.Time {
|
||||
// truncate to seconds, since we don't store beyond that in the database
|
||||
return info.ModTime().Truncate(time.Second)
|
||||
type ScanFileResult struct {
|
||||
File models.File
|
||||
New bool
|
||||
Renamed bool
|
||||
Updated bool
|
||||
}
|
||||
|
||||
func (s *scanJob) handleFile(ctx context.Context, f scanFile) error {
|
||||
defer s.incrementProgress(f)
|
||||
|
||||
var ff models.File
|
||||
// ScanFile scans the provided file into the database, returning the scan result.
|
||||
func (s *Scanner) ScanFile(ctx context.Context, f ScannedFile) (*ScanFileResult, error) {
|
||||
var r *ScanFileResult
|
||||
|
||||
// don't use a transaction to check if new or existing
|
||||
if err := s.withDB(ctx, func(ctx context.Context) error {
|
||||
if err := s.Repository.WithDB(ctx, func(ctx context.Context) error {
|
||||
// determine if file already exists in data store
|
||||
// assume case sensitive when searching for the file to begin with
|
||||
var err error
|
||||
ff, err = s.Repository.File.FindByPath(ctx, f.Path, true)
|
||||
ff, err := s.Repository.File.FindByPath(ctx, f.Path, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("checking for existing file %q: %w", f.Path, err)
|
||||
}
|
||||
@ -702,7 +346,7 @@ func (s *scanJob) handleFile(ctx context.Context, f scanFile) error {
|
||||
// case insensitive search
|
||||
// assume case sensitive if in zip
|
||||
if ff == nil && f.ZipFileID != nil {
|
||||
caseSensitive, _ := f.fs.IsPathCaseSensitive(f.Path)
|
||||
caseSensitive, _ := f.FS.IsPathCaseSensitive(f.Path)
|
||||
|
||||
if !caseSensitive {
|
||||
ff, err = s.Repository.File.FindByPath(ctx, f.Path, false)
|
||||
@ -714,35 +358,23 @@ func (s *scanJob) handleFile(ctx context.Context, f scanFile) error {
|
||||
|
||||
if ff == nil {
|
||||
// returns a file only if it is actually new
|
||||
ff, err = s.onNewFile(ctx, f)
|
||||
r, err = s.onNewFile(ctx, f)
|
||||
return err
|
||||
}
|
||||
|
||||
ff, err = s.onExistingFile(ctx, f, ff)
|
||||
r, err = s.onExistingFile(ctx, f, ff)
|
||||
return err
|
||||
}); err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if ff != nil && s.isZipFile(f.info.Name()) {
|
||||
f.BaseFile = ff.Base()
|
||||
|
||||
// scan zip files with a different context that is not cancellable
|
||||
// cancelling while scanning zip file contents results in the scan
|
||||
// contents being partially completed
|
||||
zipCtx := context.WithoutCancel(ctx)
|
||||
|
||||
if err := s.scanZipFile(zipCtx, f); err != nil {
|
||||
logger.Errorf("Error scanning zip file %q: %v", f.Path, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (s *scanJob) isZipFile(path string) bool {
|
||||
// IsZipFile determines if the provided path is a zip file based on its extension.
|
||||
func (s *Scanner) IsZipFile(path string) bool {
|
||||
fExt := filepath.Ext(path)
|
||||
for _, ext := range s.options.ZipFileExtensions {
|
||||
for _, ext := range s.ZipFileExtensions {
|
||||
if strings.EqualFold(fExt, "."+ext) {
|
||||
return true
|
||||
}
|
||||
@ -751,7 +383,7 @@ func (s *scanJob) isZipFile(path string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *scanJob) onNewFile(ctx context.Context, f scanFile) (models.File, error) {
|
||||
func (s *Scanner) onNewFile(ctx context.Context, f ScannedFile) (*ScanFileResult, error) {
|
||||
now := time.Now()
|
||||
|
||||
baseFile := f.BaseFile
|
||||
@ -767,28 +399,20 @@ func (s *scanJob) onNewFile(ctx context.Context, f scanFile) (models.File, error
|
||||
}
|
||||
|
||||
if parentFolderID == nil {
|
||||
// if parent folder doesn't exist, assume it's not yet created
|
||||
// add this file to the queue to be created later
|
||||
if s.retrying {
|
||||
// if we're retrying and the folder still doesn't exist, then it's a problem
|
||||
return nil, fmt.Errorf("parent folder for %q doesn't exist", path)
|
||||
}
|
||||
|
||||
s.retryList = append(s.retryList, f)
|
||||
return nil, nil
|
||||
return nil, fmt.Errorf("parent folder for %q doesn't exist", path)
|
||||
}
|
||||
|
||||
baseFile.ParentFolderID = *parentFolderID
|
||||
|
||||
const useExisting = false
|
||||
fp, err := s.calculateFingerprints(f.fs, baseFile, path, useExisting)
|
||||
fp, err := s.calculateFingerprints(f.FS, baseFile, path, useExisting)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
baseFile.SetFingerprints(fp)
|
||||
|
||||
file, err := s.fireDecorators(ctx, f.fs, baseFile)
|
||||
file, err := s.fireDecorators(ctx, f.FS, baseFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -801,14 +425,17 @@ func (s *scanJob) onNewFile(ctx context.Context, f scanFile) (models.File, error
|
||||
}
|
||||
|
||||
if renamed != nil {
|
||||
return &ScanFileResult{
|
||||
File: renamed,
|
||||
Renamed: true,
|
||||
}, nil
|
||||
// handle rename should have already handled the contents of the zip file
|
||||
// so shouldn't need to scan it again
|
||||
// return nil so it doesn't
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// if not renamed, queue file for creation
|
||||
if err := s.withTxn(ctx, func(ctx context.Context) error {
|
||||
if err := s.Repository.WithTxn(ctx, func(ctx context.Context) error {
|
||||
if err := s.Repository.File.Create(ctx, file); err != nil {
|
||||
return fmt.Errorf("creating file %q: %w", path, err)
|
||||
}
|
||||
@ -822,10 +449,13 @@ func (s *scanJob) onNewFile(ctx context.Context, f scanFile) (models.File, error
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return file, nil
|
||||
return &ScanFileResult{
|
||||
File: file,
|
||||
New: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *scanJob) fireDecorators(ctx context.Context, fs models.FS, f models.File) (models.File, error) {
|
||||
func (s *Scanner) fireDecorators(ctx context.Context, fs models.FS, f models.File) (models.File, error) {
|
||||
for _, h := range s.FileDecorators {
|
||||
var err error
|
||||
f, err = h.Decorate(ctx, fs, f)
|
||||
@ -837,8 +467,8 @@ func (s *scanJob) fireDecorators(ctx context.Context, fs models.FS, f models.Fil
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func (s *scanJob) fireHandlers(ctx context.Context, f models.File, oldFile models.File) error {
|
||||
for _, h := range s.handlers {
|
||||
func (s *Scanner) fireHandlers(ctx context.Context, f models.File, oldFile models.File) error {
|
||||
for _, h := range s.FileHandlers {
|
||||
if err := h.Handle(ctx, f, oldFile); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -847,7 +477,7 @@ func (s *scanJob) fireHandlers(ctx context.Context, f models.File, oldFile model
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *scanJob) calculateFingerprints(fs models.FS, f *models.BaseFile, path string, useExisting bool) (models.Fingerprints, error) {
|
||||
func (s *Scanner) calculateFingerprints(fs models.FS, f *models.BaseFile, path string, useExisting bool) (models.Fingerprints, error) {
|
||||
// only log if we're (re)calculating fingerprints
|
||||
if !useExisting {
|
||||
logger.Infof("Calculating fingerprints for %s ...", path)
|
||||
@ -884,7 +514,7 @@ func appendFileUnique(v []models.File, toAdd []models.File) []models.File {
|
||||
return v
|
||||
}
|
||||
|
||||
func (s *scanJob) getFileFS(f *models.BaseFile) (models.FS, error) {
|
||||
func (s *Scanner) getFileFS(f *models.BaseFile) (models.FS, error) {
|
||||
if f.ZipFile == nil {
|
||||
return s.FS, nil
|
||||
}
|
||||
@ -899,7 +529,7 @@ func (s *scanJob) getFileFS(f *models.BaseFile) (models.FS, error) {
|
||||
return fs.OpenZip(zipPath, zipSize)
|
||||
}
|
||||
|
||||
func (s *scanJob) handleRename(ctx context.Context, f models.File, fp []models.Fingerprint) (models.File, error) {
|
||||
func (s *Scanner) handleRename(ctx context.Context, f models.File, fp []models.Fingerprint) (models.File, error) {
|
||||
var others []models.File
|
||||
|
||||
for _, tfp := range fp {
|
||||
@ -941,7 +571,7 @@ func (s *scanJob) handleRename(ctx context.Context, f models.File, fp []models.F
|
||||
// treat as a move
|
||||
missing = append(missing, other)
|
||||
}
|
||||
case !s.acceptEntry(ctx, other.Base().Path, info):
|
||||
case !s.AcceptEntry(ctx, other.Base().Path, info):
|
||||
// #4393 - if the file is no longer in the configured library paths, treat it as a move
|
||||
logger.Debugf("File %q no longer in library paths. Treating as a move.", other.Base().Path)
|
||||
missing = append(missing, other)
|
||||
@ -974,12 +604,12 @@ func (s *scanJob) handleRename(ctx context.Context, f models.File, fp []models.F
|
||||
fBaseCopy.Fingerprints = updatedBase.Fingerprints
|
||||
*updatedBase = fBaseCopy
|
||||
|
||||
if err := s.withTxn(ctx, func(ctx context.Context) error {
|
||||
if err := s.Repository.WithTxn(ctx, func(ctx context.Context) error {
|
||||
if err := s.Repository.File.Update(ctx, updated); err != nil {
|
||||
return fmt.Errorf("updating file for rename %q: %w", newPath, err)
|
||||
}
|
||||
|
||||
if s.isZipFile(updatedBase.Basename) {
|
||||
if s.IsZipFile(updatedBase.Basename) {
|
||||
if err := transferZipHierarchy(ctx, s.Repository.Folder, s.Repository.File, updatedBase.ID, oldPath, newPath); err != nil {
|
||||
return fmt.Errorf("moving zip hierarchy for renamed zip file %q: %w", newPath, err)
|
||||
}
|
||||
@ -997,9 +627,9 @@ func (s *scanJob) handleRename(ctx context.Context, f models.File, fp []models.F
|
||||
return updated, nil
|
||||
}
|
||||
|
||||
func (s *scanJob) isHandlerRequired(ctx context.Context, f models.File) bool {
|
||||
accept := len(s.options.HandlerRequiredFilters) == 0
|
||||
for _, filter := range s.options.HandlerRequiredFilters {
|
||||
func (s *Scanner) isHandlerRequired(ctx context.Context, f models.File) bool {
|
||||
accept := len(s.HandlerRequiredFilters) == 0
|
||||
for _, filter := range s.HandlerRequiredFilters {
|
||||
// accept if any filter accepts the file
|
||||
if filter.Accept(ctx, f) {
|
||||
accept = true
|
||||
@ -1018,9 +648,9 @@ func (s *scanJob) isHandlerRequired(ctx context.Context, f models.File) bool {
|
||||
// - file size
|
||||
// - image format, width or height
|
||||
// - video codec, audio codec, format, width, height, framerate or bitrate
|
||||
func (s *scanJob) isMissingMetadata(ctx context.Context, f scanFile, existing models.File) bool {
|
||||
func (s *Scanner) isMissingMetadata(ctx context.Context, f ScannedFile, existing models.File) bool {
|
||||
for _, h := range s.FileDecorators {
|
||||
if h.IsMissingMetadata(ctx, f.fs, existing) {
|
||||
if h.IsMissingMetadata(ctx, f.FS, existing) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
@ -1028,20 +658,20 @@ func (s *scanJob) isMissingMetadata(ctx context.Context, f scanFile, existing mo
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *scanJob) setMissingMetadata(ctx context.Context, f scanFile, existing models.File) (models.File, error) {
|
||||
func (s *Scanner) setMissingMetadata(ctx context.Context, f ScannedFile, existing models.File) (models.File, error) {
|
||||
path := existing.Base().Path
|
||||
logger.Infof("Updating metadata for %s", path)
|
||||
|
||||
existing.Base().Size = f.Size
|
||||
|
||||
var err error
|
||||
existing, err = s.fireDecorators(ctx, f.fs, existing)
|
||||
existing, err = s.fireDecorators(ctx, f.FS, existing)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// queue file for update
|
||||
if err := s.withTxn(ctx, func(ctx context.Context) error {
|
||||
if err := s.Repository.WithTxn(ctx, func(ctx context.Context) error {
|
||||
if err := s.Repository.File.Update(ctx, existing); err != nil {
|
||||
return fmt.Errorf("updating file %q: %w", path, err)
|
||||
}
|
||||
@ -1054,9 +684,9 @@ func (s *scanJob) setMissingMetadata(ctx context.Context, f scanFile, existing m
|
||||
return existing, nil
|
||||
}
|
||||
|
||||
func (s *scanJob) setMissingFingerprints(ctx context.Context, f scanFile, existing models.File) (models.File, error) {
|
||||
func (s *Scanner) setMissingFingerprints(ctx context.Context, f ScannedFile, existing models.File) (models.File, error) {
|
||||
const useExisting = true
|
||||
fp, err := s.calculateFingerprints(f.fs, existing.Base(), f.Path, useExisting)
|
||||
fp, err := s.calculateFingerprints(f.FS, existing.Base(), f.Path, useExisting)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -1064,7 +694,7 @@ func (s *scanJob) setMissingFingerprints(ctx context.Context, f scanFile, existi
|
||||
if fp.ContentsChanged(existing.Base().Fingerprints) {
|
||||
existing.SetFingerprints(fp)
|
||||
|
||||
if err := s.withTxn(ctx, func(ctx context.Context) error {
|
||||
if err := s.Repository.WithTxn(ctx, func(ctx context.Context) error {
|
||||
if err := s.Repository.File.Update(ctx, existing); err != nil {
|
||||
return fmt.Errorf("updating file %q: %w", f.Path, err)
|
||||
}
|
||||
@ -1079,14 +709,14 @@ func (s *scanJob) setMissingFingerprints(ctx context.Context, f scanFile, existi
|
||||
}
|
||||
|
||||
// returns a file only if it was updated
|
||||
func (s *scanJob) onExistingFile(ctx context.Context, f scanFile, existing models.File) (models.File, error) {
|
||||
func (s *Scanner) onExistingFile(ctx context.Context, f ScannedFile, existing models.File) (*ScanFileResult, error) {
|
||||
base := existing.Base()
|
||||
path := base.Path
|
||||
|
||||
fileModTime := f.ModTime
|
||||
// #6326 - also force a rescan if the basename changed
|
||||
updated := !fileModTime.Equal(base.ModTime) || base.Basename != f.Basename
|
||||
forceRescan := s.options.Rescan
|
||||
forceRescan := s.Rescan
|
||||
|
||||
if !updated && !forceRescan {
|
||||
return s.onUnchangedFile(ctx, f, existing)
|
||||
@ -1108,7 +738,7 @@ func (s *scanJob) onExistingFile(ctx context.Context, f scanFile, existing model
|
||||
|
||||
// calculate and update fingerprints for the file
|
||||
const useExisting = false
|
||||
fp, err := s.calculateFingerprints(f.fs, base, path, useExisting)
|
||||
fp, err := s.calculateFingerprints(f.FS, base, path, useExisting)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -1116,13 +746,13 @@ func (s *scanJob) onExistingFile(ctx context.Context, f scanFile, existing model
|
||||
s.removeOutdatedFingerprints(existing, fp)
|
||||
existing.SetFingerprints(fp)
|
||||
|
||||
existing, err = s.fireDecorators(ctx, f.fs, existing)
|
||||
existing, err = s.fireDecorators(ctx, f.FS, existing)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// queue file for update
|
||||
if err := s.withTxn(ctx, func(ctx context.Context) error {
|
||||
if err := s.Repository.WithTxn(ctx, func(ctx context.Context) error {
|
||||
if err := s.Repository.File.Update(ctx, existing); err != nil {
|
||||
return fmt.Errorf("updating file %q: %w", path, err)
|
||||
}
|
||||
@ -1135,11 +765,13 @@ func (s *scanJob) onExistingFile(ctx context.Context, f scanFile, existing model
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return existing, nil
|
||||
return &ScanFileResult{
|
||||
File: existing,
|
||||
Updated: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *scanJob) removeOutdatedFingerprints(existing models.File, fp models.Fingerprints) {
|
||||
func (s *Scanner) removeOutdatedFingerprints(existing models.File, fp models.Fingerprints) {
|
||||
// HACK - if no MD5 fingerprint was returned, and the oshash is changed
|
||||
// then remove the MD5 fingerprint
|
||||
oshash := fp.For(models.FingerprintTypeOshash)
|
||||
@ -1167,7 +799,7 @@ func (s *scanJob) removeOutdatedFingerprints(existing models.File, fp models.Fin
|
||||
}
|
||||
|
||||
// returns a file only if it was updated
|
||||
func (s *scanJob) onUnchangedFile(ctx context.Context, f scanFile, existing models.File) (models.File, error) {
|
||||
func (s *Scanner) onUnchangedFile(ctx context.Context, f ScannedFile, existing models.File) (*ScanFileResult, error) {
|
||||
var err error
|
||||
|
||||
isMissingMetdata := s.isMissingMetadata(ctx, f, existing)
|
||||
@ -1186,7 +818,7 @@ func (s *scanJob) onUnchangedFile(ctx context.Context, f scanFile, existing mode
|
||||
}
|
||||
|
||||
handlerRequired := false
|
||||
if err := s.withDB(ctx, func(ctx context.Context) error {
|
||||
if err := s.Repository.WithDB(ctx, func(ctx context.Context) error {
|
||||
// check if the handler needs to be run
|
||||
handlerRequired = s.isHandlerRequired(ctx, existing)
|
||||
return nil
|
||||
@ -1196,15 +828,20 @@ func (s *scanJob) onUnchangedFile(ctx context.Context, f scanFile, existing mode
|
||||
|
||||
if !handlerRequired {
|
||||
// if this file is a zip file, then we need to rescan the contents
|
||||
// as well. We do this by returning the file, instead of nil.
|
||||
// as well. We do this by indicating that the file is updated.
|
||||
if isMissingMetdata {
|
||||
return existing, nil
|
||||
return &ScanFileResult{
|
||||
File: existing,
|
||||
Updated: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
return &ScanFileResult{
|
||||
File: existing,
|
||||
}, nil
|
||||
}
|
||||
|
||||
if err := s.withTxn(ctx, func(ctx context.Context) error {
|
||||
if err := s.Repository.WithTxn(ctx, func(ctx context.Context) error {
|
||||
if err := s.fireHandlers(ctx, existing, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1215,6 +852,9 @@ func (s *scanJob) onUnchangedFile(ctx context.Context, f scanFile, existing mode
|
||||
}
|
||||
|
||||
// if this file is a zip file, then we need to rescan the contents
|
||||
// as well. We do this by returning the file, instead of nil.
|
||||
return existing, nil
|
||||
// as well. We do this by indicating that the file is updated.
|
||||
return &ScanFileResult{
|
||||
File: existing,
|
||||
Updated: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -81,8 +81,8 @@ func walkSym(f models.FS, filename string, linkDirname string, walkFn fs.WalkDir
|
||||
return fsWalk(f, filename, symWalkFunc)
|
||||
}
|
||||
|
||||
// symWalk extends filepath.Walk to also follow symlinks
|
||||
func symWalk(fs models.FS, path string, walkFn fs.WalkDirFunc) error {
|
||||
// SymWalk extends filepath.Walk to also follow symlinks
|
||||
func SymWalk(fs models.FS, path string, walkFn fs.WalkDirFunc) error {
|
||||
return walkSym(fs, path, path, walkFn)
|
||||
}
|
||||
|
||||
|
||||
@ -18,7 +18,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
errNotReaderAt = errors.New("not a ReaderAt")
|
||||
ErrNotReaderAt = errors.New("invalid reader: does not implement io.ReaderAt")
|
||||
errZipFSOpenZip = errors.New("cannot open zip file inside zip file")
|
||||
)
|
||||
|
||||
@ -38,7 +38,7 @@ func newZipFS(fs models.FS, path string, size int64) (*zipFS, error) {
|
||||
asReaderAt, _ := reader.(io.ReaderAt)
|
||||
if asReaderAt == nil {
|
||||
reader.Close()
|
||||
return nil, errNotReaderAt
|
||||
return nil, ErrNotReaderAt
|
||||
}
|
||||
|
||||
zipReader, err := zip.NewReader(asReaderAt, size)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user