From cd7c518b1430e0ea36d9273761fd01f5e2cf3f8f Mon Sep 17 00:00:00 2001 From: Tobias Goerke Date: Thu, 27 Jul 2023 08:22:15 +0200 Subject: [PATCH] fix: disable parallel uploads and refactor --- cmd/root.go | 7 +- frontend/src/api/tus.js | 2 +- frontend/src/i18n/en.json | 1 - frontend/src/views/settings/Global.vue | 12 -- http/tus.go | 7 +- http/tus_store.go | 167 ++++++++++++++++--------- settings/storage.go | 7 +- settings/tus.go | 10 +- 8 files changed, 120 insertions(+), 93 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 2b18cc7a..55c3d23d 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -335,10 +335,9 @@ func quickSetup(flags *pflag.FlagSet, d pythonData) { AuthMethod: "", Branding: settings.Branding{}, Tus: settings.Tus{ - Enabled: true, - ChunkSize: settings.DefaultTusChunkSize, - ParallelUploads: settings.DefaultTusParallelUploads, - RetryCount: settings.DefaultTusRetryCount, + Enabled: true, + ChunkSize: settings.DefaultTusChunkSize, + RetryCount: settings.DefaultTusRetryCount, }, Commands: nil, Shell: nil, diff --git a/frontend/src/api/tus.js b/frontend/src/api/tus.js index bf7c6811..cd4456b9 100644 --- a/frontend/src/api/tus.js +++ b/frontend/src/api/tus.js @@ -22,7 +22,7 @@ export async function upload(url, content = "", overwrite = false, onupload) { endpoint: tusEndpoint, chunkSize: tusSettings.chunkSize, retryDelays: computeRetryDelays(tusSettings), - parallelUploads: tusSettings.parallelUploads || 1, + parallelUploads: 1, headers: { "X-Auth": store.state.jwt, // Send the metadata with every request diff --git a/frontend/src/i18n/en.json b/frontend/src/i18n/en.json index 9f8eec8e..b9a932ba 100644 --- a/frontend/src/i18n/en.json +++ b/frontend/src/i18n/en.json @@ -189,7 +189,6 @@ "tusUploadsHelp": "File Browser supports the tus.io protocol for resumable file uploads, allowing for the creation of efficient, reliable, resumable and chunked file uploads even on unreliable networks.", "tusUploadsEnabled": "Enable chunked file uploads", "tusUploadsChunkSize": "Indicates to maximum size of a request (direct uploads will be used for smaller uploads). You may input a plain integer denoting a bytes input or a string like 10MB, 1.00Ti etc.", - "tusUploadsParallelUploads": "Number of parallel uploads", "tusUploadsRetryCount": "Number of times to retry a failed upload (set to 0 to disable retries)", "userHomeBasePath": "Base path for user home directories", "userScopeGenerationPlaceholder": "The scope will be auto generated", diff --git a/frontend/src/views/settings/Global.vue b/frontend/src/views/settings/Global.vue index f2f882bf..cbe686a6 100644 --- a/frontend/src/views/settings/Global.vue +++ b/frontend/src/views/settings/Global.vue @@ -127,18 +127,6 @@ v-bind:disabled="!settings.tus.enabled" /> - - - diff --git a/http/tus.go b/http/tus.go index 6b8a2fdc..1990c01b 100644 --- a/http/tus.go +++ b/http/tus.go @@ -72,11 +72,6 @@ func (th *TusHandler) getOrCreateTusdHandler(d *data, r *http.Request) (_ *tusd. func (th TusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { code, err := withUser(func(w http.ResponseWriter, r *http.Request, d *data) (int, error) { - // Check if user has permission to create files - if !d.user.Perm.Create { - return http.StatusForbidden, nil - } - // Create a new tus handler for current user if it doesn't exist yet tusdHandler, err := th.getOrCreateTusdHandler(d, r) if err != nil { @@ -111,7 +106,7 @@ func (th TusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (th TusHandler) createTusdHandler(d *data, basePath string) (*tusd.UnroutedHandler, error) { - tusStore := NewInPlaceDataStore(d.user.FullPath("/"), d.user.Perm.Modify) + tusStore := NewInPlaceDataStore(d.user.FullPath("/"), d.user.Perm.Create, d.user.Perm.Modify) composer := tusd.NewStoreComposer() tusStore.UseIn(composer) diff --git a/http/tus_store.go b/http/tus_store.go index 76b4607c..90ba7b61 100644 --- a/http/tus_store.go +++ b/http/tus_store.go @@ -1,11 +1,13 @@ // InPlaceDataStore is a storage backend for tusd, which stores the uploaded -// files in the user's root directory. It features parallel and resumable uploads. -// It only touches the target file, without creating any lock files or separate -// files for upload parts. It thus requires no clean-up on failed uploads. -// It requires the destination metadata field to be set in the upload request. +// files in the user's root directory, without creating any auxiliary files. +// It thus requires no clean-up on failed uploads. +// The destination metadata field needs to be set in the upload request. // For each NewUpload, the target file is expanded by the upload's size. // This way, multiple uploads can work on the same file, without interfering // with each other. +// The uploads are resumable. Also, parallel uploads are supported, however, +// the initial POST requests to NewUpload must be synchronized and in order. +// Otherwise, no guarantee of the upload's integrity can be given. package http @@ -13,8 +15,9 @@ import ( "context" "crypto/rand" "encoding/hex" - "fmt" + "errors" "io" + "net/http" "os" "path/filepath" "sync" @@ -30,13 +33,21 @@ type InPlaceDataStore struct { // It equals the user's root directory. path string + // Store whether the user is permitted to create new files. + createPerm bool + // Store whether the user is permitted to modify files or only create new ones. modifyPerm bool // Maps an upload ID to its object. // Required, since GetUpload only provides us with the id of an upload // and expects us to return the Info object. - uploads map[string]*InPlaceUpload + uploadsByID map[string]*InPlaceUpload + + // Map all uploads by their path. + // Each path can have multiple uploads, as multiple uploads can work on the same file + // when parallel uploads are enabled. + uploadsByPath map[string][]*InPlaceUpload // Each upload appends to the file, so we need to make sure // each upload has expanded the file by info.Size bytes, before the next @@ -44,12 +55,14 @@ type InPlaceDataStore struct { mutex *sync.Mutex } -func NewInPlaceDataStore(path string, modifyPerm bool) *InPlaceDataStore { +func NewInPlaceDataStore(path string, createPerm, modifyPerm bool) *InPlaceDataStore { return &InPlaceDataStore{ - path: path, - modifyPerm: modifyPerm, - uploads: make(map[string]*InPlaceUpload), - mutex: &sync.Mutex{}, + path: path, + createPerm: createPerm, + modifyPerm: modifyPerm, + uploadsByID: make(map[string]*InPlaceUpload), + uploadsByPath: make(map[string][]*InPlaceUpload), + mutex: &sync.Mutex{}, } } @@ -58,63 +71,79 @@ func (store *InPlaceDataStore) UseIn(composer *tusd.StoreComposer) { composer.UseConcater(store) } -func (store *InPlaceDataStore) cleanupOrphanedUploads(filePath string) error { - // If the file doesn't exist, remove all upload references. - // This way we can eliminate inconsistencies for failed uploads. - if _, err := os.Stat(filePath); os.IsNotExist(err) { - for id, upload := range store.uploads { - if upload.filePath == filePath { - delete(store.uploads, id) - } +func (store *InPlaceDataStore) isPartOfNewUpload(fileExists bool, filePath string) bool { + if !fileExists { + // If the file doesn't exist, remove all upload references. + // This way we can eliminate inconsistencies for failed uploads. + for _, upload := range store.uploadsByPath[filePath] { + delete(store.uploadsByID, upload.ID) } - } else { - // If the file but no uploads exist for it, - // we need to remove the file to make sure we don't append to an existing file. - // This would lead to files with duplicate content. - uploadExists := false - for _, upload := range store.uploads { - if upload.filePath == filePath { - uploadExists = true - break - } - } - if !uploadExists { - if !store.modifyPerm { - // Gets interpreted as a 400 by tusd. - // There is no way to return a 403, so a 400 is better than a 500. - return tusd.ErrUploadStoppedByServer - } - if err := os.Remove(filePath); err != nil { - return err - } + delete(store.uploadsByPath, filePath) + + return true + } + + // In case the file exists, it is still possible that it is a new upload. + // E.g.: the user wants to overwrite an existing file. + return store.uploadsByPath[filePath] == nil +} + +func (store *InPlaceDataStore) checkPermissions(isPartOfNewUpload bool) error { + // Return tusd.HTTPErrors, as they are handled by tusd. + if isPartOfNewUpload { + if !store.createPerm { + return tusd.NewHTTPError(errors.New("user is not allowed to create a new upload"), http.StatusForbidden) } } + + if !store.modifyPerm { + return tusd.NewHTTPError(errors.New("user is not allowed to modify existing files"), http.StatusForbidden) + } + return nil } func (store *InPlaceDataStore) initializeUpload(filePath string, info *tusd.FileInfo) (int64, error) { - store.mutex.Lock() - defer store.mutex.Unlock() - - if err := store.cleanupOrphanedUploads(filePath); err != nil { + fileExists := true + if _, err := os.Stat(filePath); os.IsNotExist(err) { + fileExists = false + } else if err != nil { return 0, err } - // Create the file if it doesn't exist. - file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, filePerm) + // Delete existing files and references, if necessary. + isPartOfNewUpload := store.isPartOfNewUpload(fileExists, filePath) + + if err := store.checkPermissions(isPartOfNewUpload); err != nil { + return 0, err + } + if isPartOfNewUpload && fileExists { + // Remove the file's contents (instead of re-creating it). + return 0, os.Truncate(filePath, 0) + } + if isPartOfNewUpload && !fileExists { + // Create the file, if it doesn't exist. + if _, err := os.Create(filePath); err != nil { + return 0, err + } + return 0, nil + } + + // The file exists and is part of an existing upload. + // Open the file and enlarge it by the upload's size. + file, err := os.OpenFile(filePath, os.O_WRONLY, filePerm) if err != nil { return 0, err } defer file.Close() - // Get the file's current size. + // Get the file's current size and offset to the end of the file. actualOffset, err := file.Seek(0, io.SeekEnd) if err != nil { return 0, err } - // Enlarge the file by the upload's size. - _, err = file.Write(make([]byte, info.Size)) - if err != nil { + // Enlarge the file by the upload's size (starting from the current offset). + if _, err = file.Write(make([]byte, info.Size)); err != nil { return 0, err } @@ -129,7 +158,7 @@ func (store *InPlaceDataStore) NewUpload(ctx context.Context, info tusd.FileInfo destination, ok := info.MetaData["destination"] if !ok { - return nil, fmt.Errorf("metadata field 'destination' not found in upload request") + return nil, errors.New("metadata field 'destination' not found in upload request") } filePath := filepath.Join(store.path, destination) @@ -139,23 +168,29 @@ func (store *InPlaceDataStore) NewUpload(ctx context.Context, info tusd.FileInfo actualOffset: info.Size, parent: store, } + + // Lock the mutex, as we need to modify the target file synchronously. + store.mutex.Lock() + defer store.mutex.Unlock() + // Tus creates a POST request for the final concatenation. // In that case, we don't need to create a new upload. if !info.IsFinal { if upload.actualOffset, err = store.initializeUpload(filePath, &info); err != nil { return nil, err } - store.uploads[info.ID] = upload } + store.uploadsByID[upload.ID] = upload + store.uploadsByPath[upload.filePath] = append(store.uploadsByPath[upload.filePath], upload) return upload, nil } func (store *InPlaceDataStore) GetUpload(ctx context.Context, id string) (tusd.Upload, error) { - if upload, ok := store.uploads[id]; ok { + if upload, ok := store.uploadsByID[id]; ok { return upload, nil } else { - return nil, fmt.Errorf("upload not found") + return nil, errors.New("upload not found") } } @@ -186,8 +221,7 @@ func (upload *InPlaceUpload) WriteChunk(ctx context.Context, offset int64, src i } defer file.Close() - _, err = file.Seek(upload.actualOffset+offset, io.SeekStart) - if err != nil { + if _, err = file.Seek(upload.actualOffset+offset, io.SeekStart); err != nil { return 0, err } @@ -209,16 +243,31 @@ func (upload *InPlaceUpload) GetReader(ctx context.Context) (io.Reader, error) { } func (upload *InPlaceUpload) FinishUpload(ctx context.Context) error { + upload.parent.mutex.Lock() + defer upload.parent.mutex.Unlock() + + delete(upload.parent.uploadsByID, upload.ID) + uploadsByPath := upload.parent.uploadsByPath[upload.filePath] + for i, u := range uploadsByPath { + if u.ID == upload.ID { + upload.parent.uploadsByPath[upload.filePath] = append(uploadsByPath[:i], uploadsByPath[i+1:]...) + break + } + } + if len(upload.parent.uploadsByPath[upload.filePath]) == 0 { + delete(upload.parent.uploadsByPath, upload.filePath) + } + return nil } func (upload *InPlaceUpload) ConcatUploads(ctx context.Context, uploads []tusd.Upload) (err error) { - parent := upload.parent for _, u := range uploads { - delete(parent.uploads, (u.(*InPlaceUpload)).ID) + if err := (u.(*InPlaceUpload)).FinishUpload(ctx); err != nil { + return err + } } - delete(parent.uploads, upload.ID) - return nil + return upload.FinishUpload(ctx) } func uid() (string, error) { diff --git a/settings/storage.go b/settings/storage.go index bbeffe15..a73a1946 100644 --- a/settings/storage.go +++ b/settings/storage.go @@ -35,10 +35,9 @@ func (s *Storage) Get() (*Settings, error) { } if set.Tus == (Tus{}) { set.Tus = Tus{ - Enabled: false, - ChunkSize: DefaultTusChunkSize, - ParallelUploads: DefaultTusParallelUploads, - RetryCount: DefaultTusRetryCount, + Enabled: false, + ChunkSize: DefaultTusChunkSize, + RetryCount: DefaultTusRetryCount, } } return set, nil diff --git a/settings/tus.go b/settings/tus.go index b6ea3f79..f5eceb44 100644 --- a/settings/tus.go +++ b/settings/tus.go @@ -1,13 +1,11 @@ package settings -const DefaultTusChunkSize = 20 * 1024 * 1024 // 20MB -const DefaultTusParallelUploads = 3 +const DefaultTusChunkSize = 5 * 1024 * 1024 // 20MB const DefaultTusRetryCount = 3 // Tus contains the tus.io settings of the app. type Tus struct { - Enabled bool `json:"enabled"` - ChunkSize uint64 `json:"chunkSize"` - ParallelUploads uint8 `json:"parallelUploads"` - RetryCount uint16 `json:"retryCount"` + Enabled bool `json:"enabled"` + ChunkSize uint64 `json:"chunkSize"` + RetryCount uint16 `json:"retryCount"` }