From a28860dec4f3c0e558eb38d33006179dbe0f9914 Mon Sep 17 00:00:00 2001 From: Tobias Goerke Date: Tue, 25 Jul 2023 17:04:06 +0200 Subject: [PATCH] feat: implement a custom tus datastore --- frontend/src/api/tus.js | 23 ++-- frontend/src/i18n/en.json | 2 +- frontend/src/views/settings/Global.vue | 22 +-- http/tus.go | 165 ++-------------------- http/tus_store.go | 184 +++++++++++++++++++++++++ 5 files changed, 219 insertions(+), 177 deletions(-) create mode 100644 http/tus_store.go diff --git a/frontend/src/api/tus.js b/frontend/src/api/tus.js index 470e3507..3a699b3a 100644 --- a/frontend/src/api/tus.js +++ b/frontend/src/api/tus.js @@ -13,20 +13,26 @@ export async function upload(url, content = "", overwrite = false, onupload) { } return new Promise((resolve, reject) => { + const metadata = { + filename: content.name, + filetype: content.type, + overwrite: overwrite.toString(), + // url is URI encoded and needs to be decoded for metadata first + destination: decodeURIComponent(removePrefix(url)), + }; var upload = new tus.Upload(content, { endpoint: tusEndpoint, chunkSize: tusSettings.chunkSize, retryDelays: computeRetryDelays(tusSettings), parallelUploads: tusSettings.parallelUploads || 1, - metadata: { - filename: content.name, - filetype: content.type, - overwrite: overwrite.toString(), - // url is URI encoded and needs to be decoded for metadata first - destination: decodeURIComponent(removePrefix(url)), - }, headers: { "X-Auth": store.state.jwt, + // Send the metadata with every request + // If we used the tus client's metadata option, it would only be sent + // with some of the requests. + "Upload-Metadata": Object.entries(metadata) + .map(([key, value]) => `${key} ${btoa(value)}`) + .join(","), }, onError: function (error) { reject("Upload failed: " + error); @@ -42,13 +48,12 @@ export async function upload(url, content = "", overwrite = false, onupload) { resolve(); }, }); - upload.findPreviousUploads().then(function (previousUploads) { if (previousUploads.length) { upload.resumeFromPreviousUpload(previousUploads[0]); } - upload.start(); }); + upload.start(); }); } diff --git a/frontend/src/i18n/en.json b/frontend/src/i18n/en.json index f4d1d3b9..9f8eec8e 100644 --- a/frontend/src/i18n/en.json +++ b/frontend/src/i18n/en.json @@ -188,8 +188,8 @@ "tusUploads": "Chunked Uploads", "tusUploadsHelp": "File Browser supports the tus.io protocol for resumable file uploads, allowing for the creation of efficient, reliable, resumable and chunked file uploads even on unreliable networks.", "tusUploadsEnabled": "Enable chunked file uploads", - "tusUploadsParallelUploads": "Number of parallel uploads", "tusUploadsChunkSize": "Indicates to maximum size of a request (direct uploads will be used for smaller uploads). You may input a plain integer denoting a bytes input or a string like 10MB, 1.00Ti etc.", + "tusUploadsParallelUploads": "Number of parallel uploads", "tusUploadsRetryCount": "Number of times to retry a failed upload (set to 0 to disable retries)", "userHomeBasePath": "Base path for user home directories", "userScopeGenerationPlaceholder": "The scope will be auto generated", diff --git a/frontend/src/views/settings/Global.vue b/frontend/src/views/settings/Global.vue index 15e8a41c..f2f882bf 100644 --- a/frontend/src/views/settings/Global.vue +++ b/frontend/src/views/settings/Global.vue @@ -116,6 +116,17 @@

+ + + @@ -128,17 +139,6 @@ min="1" /> - - - diff --git a/http/tus.go b/http/tus.go index ba468450..9b0fc062 100644 --- a/http/tus.go +++ b/http/tus.go @@ -1,35 +1,25 @@ package http import ( - "errors" "fmt" "log" "net/http" "net/url" - "os" - "path/filepath" - "reflect" - "strconv" "sync" - "github.com/tus/tusd/pkg/filestore" tusd "github.com/tus/tusd/pkg/handler" "github.com/filebrowser/filebrowser/v2/settings" "github.com/filebrowser/filebrowser/v2/storage" - "github.com/filebrowser/filebrowser/v2/users" ) -const uploadDirName = ".tmp_upload" - type TusHandler struct { - store *storage.Storage - server *settings.Server - settings *settings.Settings - tusdHandlers map[uint]*tusd.UnroutedHandler - notifyNewTusdHandler chan struct{} - apiPath string - mutex *sync.Mutex + store *storage.Storage + server *settings.Server + settings *settings.Settings + tusdHandlers map[uint]*tusd.UnroutedHandler + apiPath string + mutex *sync.Mutex } func NewTusHandler(store *storage.Storage, server *settings.Server, apiPath string) (_ *TusHandler, err error) { @@ -37,7 +27,6 @@ func NewTusHandler(store *storage.Storage, server *settings.Server, apiPath stri tusHandler.store = store tusHandler.server = server tusHandler.tusdHandlers = make(map[uint]*tusd.UnroutedHandler) - tusHandler.notifyNewTusdHandler = make(chan struct{}) tusHandler.apiPath = apiPath tusHandler.mutex = &sync.Mutex{} @@ -45,9 +34,6 @@ func NewTusHandler(store *storage.Storage, server *settings.Server, apiPath stri return tusHandler, fmt.Errorf("couldn't get settings: %w", err) } - // Create a goroutine that handles uploaded file events for all users - go tusHandler.handleFileUploadedEvents() - return tusHandler, nil } @@ -79,7 +65,6 @@ func (th *TusHandler) getOrCreateTusdHandler(d *data, r *http.Request) (_ *tusd. return nil, err } th.tusdHandlers[d.user.ID] = tusdHandler - th.notifyNewTusdHandler <- struct{}{} } return tusdHandler, nil @@ -89,17 +74,10 @@ func (th TusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { code, err := withUser(func(w http.ResponseWriter, r *http.Request, d *data) (int, error) { // Create a new tus handler for current user if it doesn't exist yet tusdHandler, err := th.getOrCreateTusdHandler(d, r) - if err != nil { return http.StatusBadRequest, err } - // Create upload directory for each request - uploadDir := filepath.Join(d.user.FullPath("/"), uploadDirName) - if err := os.MkdirAll(uploadDir, os.ModePerm); err != nil { - return http.StatusInternalServerError, err - } - switch r.Method { case "POST": tusdHandler.PostFile(w, r) @@ -128,17 +106,13 @@ func (th TusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (th TusHandler) createTusdHandler(d *data, basePath string) (*tusd.UnroutedHandler, error) { - uploadDir := filepath.Join(d.user.FullPath("/"), uploadDirName) - tusStore := filestore.FileStore{ - Path: uploadDir, - } + tusStore := NewInPlaceDataStore(d.user.FullPath("/")) composer := tusd.NewStoreComposer() tusStore.UseIn(composer) tusdHandler, err := tusd.NewUnroutedHandler(tusd.Config{ - BasePath: basePath, - StoreComposer: composer, - NotifyCompleteUploads: true, + BasePath: basePath, + StoreComposer: composer, }) if err != nil { return nil, fmt.Errorf("unable to create tusdHandler: %w", err) @@ -146,124 +120,3 @@ func (th TusHandler) createTusdHandler(d *data, basePath string) (*tusd.Unrouted return tusdHandler, nil } - -func getMetadataField(metadata tusd.MetaData, field string) (string, error) { - if value, ok := metadata[field]; ok { - return value, nil - } else { - return "", fmt.Errorf("metadata field %s not found in upload request", field) - } -} - -func (th TusHandler) handleFileUploadedEvents() { - // Instead of running a goroutine for each user, we use a single goroutine that handles events for all users. - // This works by using a reflect select statement that waits for events from all users. - // On top of this, the reflect select statement also waits for a notification channel that is used to notify - // the goroutine when a new user has been added to so that the reflect select statement can be updated. - for { - cases := make([]reflect.SelectCase, len(th.tusdHandlers)+1) - // UserIDs != position in select statement, so store mapping - caseIdsToUserIds := make(map[int]uint, len(th.tusdHandlers)) - cases[0] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(th.notifyNewTusdHandler)} - i := 1 - for userID, tusdHandler := range th.tusdHandlers { - cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(tusdHandler.CompleteUploads)} - caseIdsToUserIds[i] = userID - i++ - } - - for { - chosen, value, _ := reflect.Select(cases) - if chosen == 0 { - // Notification channel has been triggered, - // so we need to update the reflect select statement - break - } - - // Get user ID from reflect select statement - userID := caseIdsToUserIds[chosen] - user, err := th.store.Users.Get(th.server.Root, userID) - if err != nil { - log.Printf("ERROR: couldn't get user with ID %d: %s\n", userID, err) - continue - } - event := value.Interface().(tusd.HookEvent) - if err := th.handleFileUploaded(user, &event); err != nil { - log.Printf("ERROR: couldn't handle completed upload: %s\n", err) - } - } - } -} - -func (th TusHandler) handleFileUploaded(user *users.User, event *tusd.HookEvent) error { - // Clean up only if an upload has been finalized - if !event.Upload.IsFinal { - return nil - } - - filename, err := getMetadataField(event.Upload.MetaData, "filename") - if err != nil { - return err - } - destination, err := getMetadataField(event.Upload.MetaData, "destination") - if err != nil { - return err - } - overwriteStr, err := getMetadataField(event.Upload.MetaData, "overwrite") - if err != nil { - return err - } - userPath := user.FullPath("/") - uploadDir := filepath.Join(userPath, uploadDirName) - uploadedFile := filepath.Join(uploadDir, event.Upload.ID) - fullDestination := filepath.Join(userPath, destination) - - log.Printf("Upload of %s (%s) is finished. Moving file to destination (%s) "+ - "and cleaning up temporary files.\n", filename, uploadedFile, fullDestination) - - // Check if destination file already exists. If so, we require overwrite to be set - if _, err := os.Stat(fullDestination); !errors.Is(err, os.ErrNotExist) { - if overwrite, err := strconv.ParseBool(overwriteStr); err != nil { - return err - } else if !overwrite { - return fmt.Errorf("overwrite is set to false while destination file %s exists", destination) - } - } - - // Move uploaded file from tmp upload folder to user folder - if err := os.Rename(uploadedFile, fullDestination); err != nil { - return err - } - - return th.removeTemporaryFiles(uploadDir, &event.Upload) -} - -func (th TusHandler) removeTemporaryFiles(uploadDir string, upload *tusd.FileInfo) error { - // Remove uploaded tmp files for finished upload (.info objects are created and need to be removed, too)) - for _, partialUpload := range append(upload.PartialUploads, upload.ID) { - filesToDelete, err := filepath.Glob(filepath.Join(uploadDir, partialUpload+"*")) - if err != nil { - return err - } - for _, f := range filesToDelete { - if err := os.Remove(f); err != nil { - return err - } - } - } - - // Delete folder basePath if it is empty after the request - dir, err := os.ReadDir(uploadDir) - if err != nil { - return err - } - - if len(dir) == 0 { - // os.Remove won't remove non-empty folders in case of race condition - if err := os.Remove(uploadDir); err != nil { - return err - } - } - - return nil -} diff --git a/http/tus_store.go b/http/tus_store.go new file mode 100644 index 00000000..818821ac --- /dev/null +++ b/http/tus_store.go @@ -0,0 +1,184 @@ +package http + +import ( + "context" + "crypto/rand" + "encoding/hex" + "fmt" + "io" + "os" + "path/filepath" + "sync" + + tusd "github.com/tus/tusd/pkg/handler" +) + +const uidLength = 16 +const filePerm = 0644 + +type InPlaceDataStore struct { + // All uploads will be stored relative to this directory. + // It equals the user's root directory. + basePath string + + // Maps an upload ID to its object. + // Required, since GetUpload only provides us with the id of an upload + // and expects us to return the Info object. + uploads map[string]*InPlaceUpload + + // Each upload appends to the file, so we need to make sure + // each upload has expanded the file by info.Size bytes, before the next + // upload is created. + mutex *sync.Mutex +} + +func NewInPlaceDataStore(basePath string) *InPlaceDataStore { + return &InPlaceDataStore{ + basePath: basePath, + uploads: make(map[string]*InPlaceUpload), + mutex: &sync.Mutex{}, + } +} + +func (store *InPlaceDataStore) UseIn(composer *tusd.StoreComposer) { + composer.UseCore(store) + composer.UseConcater(store) +} + +func prepareFile(filePath string, uploadSize int64, mutex *sync.Mutex) (int64, error) { + mutex.Lock() + defer mutex.Unlock() + + // Create the file if it doesn't exist. + file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, filePerm) + if err != nil { + return 0, err + } + defer file.Close() + + // Get the file's current size. + actualOffset, err := file.Seek(0, io.SeekEnd) + if err != nil { + return 0, err + } + // Enlarge the file by the upload's size. + _, err = file.Write(make([]byte, uploadSize)) + if err != nil { + return 0, err + } + + return actualOffset, nil +} + +func (store *InPlaceDataStore) NewUpload(ctx context.Context, info tusd.FileInfo) (_ tusd.Upload, err error) { //nolint: gocritic + // The method must return an unique id which is used to identify the upload + if info.ID == "" { + info.ID, err = uid() + if err != nil { + return nil, err + } + } + + destination, ok := info.MetaData["destination"] + if !ok { + return nil, fmt.Errorf("metadata field 'destination' not found in upload request") + } + filePath := filepath.Join(store.basePath, destination) + + // Tus creates a POST request for the final concatenation. + // In that case, we don't need to create a new upload. + actualOffset := info.Size + if info.IsPartial { + actualOffset, err = prepareFile(filePath, info.Size, store.mutex) + if err != nil { + return nil, err + } + } + + upload := &InPlaceUpload{ + FileInfo: info, + filePath: filePath, + actualOffset: actualOffset, + parent: store, + } + + store.uploads[info.ID] = upload + + return upload, nil +} + +func (store *InPlaceDataStore) GetUpload(ctx context.Context, id string) (tusd.Upload, error) { + if upload, ok := store.uploads[id]; ok { + return upload, nil + } else { + return nil, fmt.Errorf("upload not found") + } +} + +// We need to define a concater, as client libraries will automatically ask for a concatenation. +func (store *InPlaceDataStore) AsConcatableUpload(upload tusd.Upload) tusd.ConcatableUpload { + return upload.(*InPlaceUpload) +} + +type InPlaceUpload struct { + tusd.FileInfo + // Extend the tusd.FileInfo struct with the target path of our uploaded file. + filePath string + // tusd expects offset to equal the upload's written bytes. + // As we can have multiple uploads working on the same file, + // this is not the case for us. Thus, store the actual offset. + // See: https://github.com/tus/tusd/blob/main/pkg/handler/unrouted_handler.go#L714 + actualOffset int64 + // Enable the upload to remove itself from the active uploads map. + parent *InPlaceDataStore +} + +func (upload *InPlaceUpload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) { + // Open the file and seek to the given offset. + // Then, copy the given reader to the file, update the offset and return. + file, err := os.OpenFile(upload.filePath, os.O_WRONLY, filePerm) + if err != nil { + return 0, err + } + defer file.Close() + + _, err = file.Seek(upload.actualOffset+offset, io.SeekStart) + if err != nil { + return 0, err + } + + n, err := io.Copy(file, src) + if err != nil { + return 0, err + } + + upload.Offset += n + return n, nil +} + +func (upload *InPlaceUpload) GetInfo(ctx context.Context) (tusd.FileInfo, error) { + return upload.FileInfo, nil +} + +func (upload *InPlaceUpload) GetReader(ctx context.Context) (io.Reader, error) { + return os.Open(upload.filePath) +} + +func (upload *InPlaceUpload) FinishUpload(ctx context.Context) error { + // Remove the upload from the ID-mapping. + delete(upload.parent.uploads, upload.filePath) + return nil +} + +func (upload *InPlaceUpload) ConcatUploads(ctx context.Context, uploads []tusd.Upload) (err error) { + return nil +} + +func uid() (string, error) { + id := make([]byte, uidLength) + _, err := io.ReadFull(rand.Reader, id) + if err != nil { + return "", err + } + return hex.EncodeToString(id), err +}