feat: implement a custom tus datastore
This commit is contained in:
parent
2990cf8c29
commit
a28860dec4
@ -13,20 +13,26 @@ export async function upload(url, content = "", overwrite = false, onupload) {
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
var upload = new tus.Upload(content, {
|
||||
endpoint: tusEndpoint,
|
||||
chunkSize: tusSettings.chunkSize,
|
||||
retryDelays: computeRetryDelays(tusSettings),
|
||||
parallelUploads: tusSettings.parallelUploads || 1,
|
||||
metadata: {
|
||||
const metadata = {
|
||||
filename: content.name,
|
||||
filetype: content.type,
|
||||
overwrite: overwrite.toString(),
|
||||
// url is URI encoded and needs to be decoded for metadata first
|
||||
destination: decodeURIComponent(removePrefix(url)),
|
||||
},
|
||||
};
|
||||
var upload = new tus.Upload(content, {
|
||||
endpoint: tusEndpoint,
|
||||
chunkSize: tusSettings.chunkSize,
|
||||
retryDelays: computeRetryDelays(tusSettings),
|
||||
parallelUploads: tusSettings.parallelUploads || 1,
|
||||
headers: {
|
||||
"X-Auth": store.state.jwt,
|
||||
// Send the metadata with every request
|
||||
// If we used the tus client's metadata option, it would only be sent
|
||||
// with some of the requests.
|
||||
"Upload-Metadata": Object.entries(metadata)
|
||||
.map(([key, value]) => `${key} ${btoa(value)}`)
|
||||
.join(","),
|
||||
},
|
||||
onError: function (error) {
|
||||
reject("Upload failed: " + error);
|
||||
@ -42,13 +48,12 @@ export async function upload(url, content = "", overwrite = false, onupload) {
|
||||
resolve();
|
||||
},
|
||||
});
|
||||
|
||||
upload.findPreviousUploads().then(function (previousUploads) {
|
||||
if (previousUploads.length) {
|
||||
upload.resumeFromPreviousUpload(previousUploads[0]);
|
||||
}
|
||||
upload.start();
|
||||
});
|
||||
upload.start();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@ -188,8 +188,8 @@
|
||||
"tusUploads": "Chunked Uploads",
|
||||
"tusUploadsHelp": "File Browser supports the tus.io protocol for resumable file uploads, allowing for the creation of efficient, reliable, resumable and chunked file uploads even on unreliable networks.",
|
||||
"tusUploadsEnabled": "Enable chunked file uploads",
|
||||
"tusUploadsParallelUploads": "Number of parallel uploads",
|
||||
"tusUploadsChunkSize": "Indicates to maximum size of a request (direct uploads will be used for smaller uploads). You may input a plain integer denoting a bytes input or a string like 10MB, 1.00Ti etc.",
|
||||
"tusUploadsParallelUploads": "Number of parallel uploads",
|
||||
"tusUploadsRetryCount": "Number of times to retry a failed upload (set to 0 to disable retries)",
|
||||
"userHomeBasePath": "Base path for user home directories",
|
||||
"userScopeGenerationPlaceholder": "The scope will be auto generated",
|
||||
|
||||
@ -116,6 +116,17 @@
|
||||
</p>
|
||||
|
||||
<div class="tusConditionalSettings">
|
||||
<label for="tus-chunkSize">{{
|
||||
$t("settings.tusUploadsChunkSize")
|
||||
}}</label>
|
||||
<input
|
||||
class="input input--block"
|
||||
type="text"
|
||||
v-model="formattedChunkSize"
|
||||
id="tus-chunkSize"
|
||||
v-bind:disabled="!settings.tus.enabled"
|
||||
/>
|
||||
|
||||
<label for="tus-parallelUploads">{{
|
||||
$t("settings.tusUploadsParallelUploads")
|
||||
}}</label>
|
||||
@ -128,17 +139,6 @@
|
||||
min="1"
|
||||
/>
|
||||
|
||||
<label for="tus-chunkSize">{{
|
||||
$t("settings.tusUploadsChunkSize")
|
||||
}}</label>
|
||||
<input
|
||||
class="input input--block"
|
||||
type="text"
|
||||
v-model="formattedChunkSize"
|
||||
id="tus-chunkSize"
|
||||
v-bind:disabled="!settings.tus.enabled"
|
||||
/>
|
||||
|
||||
<label for="tus-retryCount">{{
|
||||
$t("settings.tusUploadsRetryCount")
|
||||
}}</label>
|
||||
|
||||
149
http/tus.go
149
http/tus.go
@ -1,33 +1,23 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/tus/tusd/pkg/filestore"
|
||||
tusd "github.com/tus/tusd/pkg/handler"
|
||||
|
||||
"github.com/filebrowser/filebrowser/v2/settings"
|
||||
"github.com/filebrowser/filebrowser/v2/storage"
|
||||
"github.com/filebrowser/filebrowser/v2/users"
|
||||
)
|
||||
|
||||
const uploadDirName = ".tmp_upload"
|
||||
|
||||
type TusHandler struct {
|
||||
store *storage.Storage
|
||||
server *settings.Server
|
||||
settings *settings.Settings
|
||||
tusdHandlers map[uint]*tusd.UnroutedHandler
|
||||
notifyNewTusdHandler chan struct{}
|
||||
apiPath string
|
||||
mutex *sync.Mutex
|
||||
}
|
||||
@ -37,7 +27,6 @@ func NewTusHandler(store *storage.Storage, server *settings.Server, apiPath stri
|
||||
tusHandler.store = store
|
||||
tusHandler.server = server
|
||||
tusHandler.tusdHandlers = make(map[uint]*tusd.UnroutedHandler)
|
||||
tusHandler.notifyNewTusdHandler = make(chan struct{})
|
||||
tusHandler.apiPath = apiPath
|
||||
tusHandler.mutex = &sync.Mutex{}
|
||||
|
||||
@ -45,9 +34,6 @@ func NewTusHandler(store *storage.Storage, server *settings.Server, apiPath stri
|
||||
return tusHandler, fmt.Errorf("couldn't get settings: %w", err)
|
||||
}
|
||||
|
||||
// Create a goroutine that handles uploaded file events for all users
|
||||
go tusHandler.handleFileUploadedEvents()
|
||||
|
||||
return tusHandler, nil
|
||||
}
|
||||
|
||||
@ -79,7 +65,6 @@ func (th *TusHandler) getOrCreateTusdHandler(d *data, r *http.Request) (_ *tusd.
|
||||
return nil, err
|
||||
}
|
||||
th.tusdHandlers[d.user.ID] = tusdHandler
|
||||
th.notifyNewTusdHandler <- struct{}{}
|
||||
}
|
||||
|
||||
return tusdHandler, nil
|
||||
@ -89,17 +74,10 @@ func (th TusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
code, err := withUser(func(w http.ResponseWriter, r *http.Request, d *data) (int, error) {
|
||||
// Create a new tus handler for current user if it doesn't exist yet
|
||||
tusdHandler, err := th.getOrCreateTusdHandler(d, r)
|
||||
|
||||
if err != nil {
|
||||
return http.StatusBadRequest, err
|
||||
}
|
||||
|
||||
// Create upload directory for each request
|
||||
uploadDir := filepath.Join(d.user.FullPath("/"), uploadDirName)
|
||||
if err := os.MkdirAll(uploadDir, os.ModePerm); err != nil {
|
||||
return http.StatusInternalServerError, err
|
||||
}
|
||||
|
||||
switch r.Method {
|
||||
case "POST":
|
||||
tusdHandler.PostFile(w, r)
|
||||
@ -128,17 +106,13 @@ func (th TusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (th TusHandler) createTusdHandler(d *data, basePath string) (*tusd.UnroutedHandler, error) {
|
||||
uploadDir := filepath.Join(d.user.FullPath("/"), uploadDirName)
|
||||
tusStore := filestore.FileStore{
|
||||
Path: uploadDir,
|
||||
}
|
||||
tusStore := NewInPlaceDataStore(d.user.FullPath("/"))
|
||||
composer := tusd.NewStoreComposer()
|
||||
tusStore.UseIn(composer)
|
||||
|
||||
tusdHandler, err := tusd.NewUnroutedHandler(tusd.Config{
|
||||
BasePath: basePath,
|
||||
StoreComposer: composer,
|
||||
NotifyCompleteUploads: true,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create tusdHandler: %w", err)
|
||||
@ -146,124 +120,3 @@ func (th TusHandler) createTusdHandler(d *data, basePath string) (*tusd.Unrouted
|
||||
|
||||
return tusdHandler, nil
|
||||
}
|
||||
|
||||
func getMetadataField(metadata tusd.MetaData, field string) (string, error) {
|
||||
if value, ok := metadata[field]; ok {
|
||||
return value, nil
|
||||
} else {
|
||||
return "", fmt.Errorf("metadata field %s not found in upload request", field)
|
||||
}
|
||||
}
|
||||
|
||||
func (th TusHandler) handleFileUploadedEvents() {
|
||||
// Instead of running a goroutine for each user, we use a single goroutine that handles events for all users.
|
||||
// This works by using a reflect select statement that waits for events from all users.
|
||||
// On top of this, the reflect select statement also waits for a notification channel that is used to notify
|
||||
// the goroutine when a new user has been added to so that the reflect select statement can be updated.
|
||||
for {
|
||||
cases := make([]reflect.SelectCase, len(th.tusdHandlers)+1)
|
||||
// UserIDs != position in select statement, so store mapping
|
||||
caseIdsToUserIds := make(map[int]uint, len(th.tusdHandlers))
|
||||
cases[0] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(th.notifyNewTusdHandler)}
|
||||
i := 1
|
||||
for userID, tusdHandler := range th.tusdHandlers {
|
||||
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(tusdHandler.CompleteUploads)}
|
||||
caseIdsToUserIds[i] = userID
|
||||
i++
|
||||
}
|
||||
|
||||
for {
|
||||
chosen, value, _ := reflect.Select(cases)
|
||||
if chosen == 0 {
|
||||
// Notification channel has been triggered,
|
||||
// so we need to update the reflect select statement
|
||||
break
|
||||
}
|
||||
|
||||
// Get user ID from reflect select statement
|
||||
userID := caseIdsToUserIds[chosen]
|
||||
user, err := th.store.Users.Get(th.server.Root, userID)
|
||||
if err != nil {
|
||||
log.Printf("ERROR: couldn't get user with ID %d: %s\n", userID, err)
|
||||
continue
|
||||
}
|
||||
event := value.Interface().(tusd.HookEvent)
|
||||
if err := th.handleFileUploaded(user, &event); err != nil {
|
||||
log.Printf("ERROR: couldn't handle completed upload: %s\n", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (th TusHandler) handleFileUploaded(user *users.User, event *tusd.HookEvent) error {
|
||||
// Clean up only if an upload has been finalized
|
||||
if !event.Upload.IsFinal {
|
||||
return nil
|
||||
}
|
||||
|
||||
filename, err := getMetadataField(event.Upload.MetaData, "filename")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
destination, err := getMetadataField(event.Upload.MetaData, "destination")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
overwriteStr, err := getMetadataField(event.Upload.MetaData, "overwrite")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
userPath := user.FullPath("/")
|
||||
uploadDir := filepath.Join(userPath, uploadDirName)
|
||||
uploadedFile := filepath.Join(uploadDir, event.Upload.ID)
|
||||
fullDestination := filepath.Join(userPath, destination)
|
||||
|
||||
log.Printf("Upload of %s (%s) is finished. Moving file to destination (%s) "+
|
||||
"and cleaning up temporary files.\n", filename, uploadedFile, fullDestination)
|
||||
|
||||
// Check if destination file already exists. If so, we require overwrite to be set
|
||||
if _, err := os.Stat(fullDestination); !errors.Is(err, os.ErrNotExist) {
|
||||
if overwrite, err := strconv.ParseBool(overwriteStr); err != nil {
|
||||
return err
|
||||
} else if !overwrite {
|
||||
return fmt.Errorf("overwrite is set to false while destination file %s exists", destination)
|
||||
}
|
||||
}
|
||||
|
||||
// Move uploaded file from tmp upload folder to user folder
|
||||
if err := os.Rename(uploadedFile, fullDestination); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return th.removeTemporaryFiles(uploadDir, &event.Upload)
|
||||
}
|
||||
|
||||
func (th TusHandler) removeTemporaryFiles(uploadDir string, upload *tusd.FileInfo) error {
|
||||
// Remove uploaded tmp files for finished upload (.info objects are created and need to be removed, too))
|
||||
for _, partialUpload := range append(upload.PartialUploads, upload.ID) {
|
||||
filesToDelete, err := filepath.Glob(filepath.Join(uploadDir, partialUpload+"*"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, f := range filesToDelete {
|
||||
if err := os.Remove(f); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Delete folder basePath if it is empty after the request
|
||||
dir, err := os.ReadDir(uploadDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(dir) == 0 {
|
||||
// os.Remove won't remove non-empty folders in case of race condition
|
||||
if err := os.Remove(uploadDir); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
184
http/tus_store.go
Normal file
184
http/tus_store.go
Normal file
@ -0,0 +1,184 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
tusd "github.com/tus/tusd/pkg/handler"
|
||||
)
|
||||
|
||||
const uidLength = 16
|
||||
const filePerm = 0644
|
||||
|
||||
type InPlaceDataStore struct {
|
||||
// All uploads will be stored relative to this directory.
|
||||
// It equals the user's root directory.
|
||||
basePath string
|
||||
|
||||
// Maps an upload ID to its object.
|
||||
// Required, since GetUpload only provides us with the id of an upload
|
||||
// and expects us to return the Info object.
|
||||
uploads map[string]*InPlaceUpload
|
||||
|
||||
// Each upload appends to the file, so we need to make sure
|
||||
// each upload has expanded the file by info.Size bytes, before the next
|
||||
// upload is created.
|
||||
mutex *sync.Mutex
|
||||
}
|
||||
|
||||
func NewInPlaceDataStore(basePath string) *InPlaceDataStore {
|
||||
return &InPlaceDataStore{
|
||||
basePath: basePath,
|
||||
uploads: make(map[string]*InPlaceUpload),
|
||||
mutex: &sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
||||
func (store *InPlaceDataStore) UseIn(composer *tusd.StoreComposer) {
|
||||
composer.UseCore(store)
|
||||
composer.UseConcater(store)
|
||||
}
|
||||
|
||||
func prepareFile(filePath string, uploadSize int64, mutex *sync.Mutex) (int64, error) {
|
||||
mutex.Lock()
|
||||
defer mutex.Unlock()
|
||||
|
||||
// Create the file if it doesn't exist.
|
||||
file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, filePerm)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// Get the file's current size.
|
||||
actualOffset, err := file.Seek(0, io.SeekEnd)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// Enlarge the file by the upload's size.
|
||||
_, err = file.Write(make([]byte, uploadSize))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return actualOffset, nil
|
||||
}
|
||||
|
||||
func (store *InPlaceDataStore) NewUpload(ctx context.Context, info tusd.FileInfo) (_ tusd.Upload, err error) { //nolint: gocritic
|
||||
// The method must return an unique id which is used to identify the upload
|
||||
if info.ID == "" {
|
||||
info.ID, err = uid()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
destination, ok := info.MetaData["destination"]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("metadata field 'destination' not found in upload request")
|
||||
}
|
||||
filePath := filepath.Join(store.basePath, destination)
|
||||
|
||||
// Tus creates a POST request for the final concatenation.
|
||||
// In that case, we don't need to create a new upload.
|
||||
actualOffset := info.Size
|
||||
if info.IsPartial {
|
||||
actualOffset, err = prepareFile(filePath, info.Size, store.mutex)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
upload := &InPlaceUpload{
|
||||
FileInfo: info,
|
||||
filePath: filePath,
|
||||
actualOffset: actualOffset,
|
||||
parent: store,
|
||||
}
|
||||
|
||||
store.uploads[info.ID] = upload
|
||||
|
||||
return upload, nil
|
||||
}
|
||||
|
||||
func (store *InPlaceDataStore) GetUpload(ctx context.Context, id string) (tusd.Upload, error) {
|
||||
if upload, ok := store.uploads[id]; ok {
|
||||
return upload, nil
|
||||
} else {
|
||||
return nil, fmt.Errorf("upload not found")
|
||||
}
|
||||
}
|
||||
|
||||
// We need to define a concater, as client libraries will automatically ask for a concatenation.
|
||||
func (store *InPlaceDataStore) AsConcatableUpload(upload tusd.Upload) tusd.ConcatableUpload {
|
||||
return upload.(*InPlaceUpload)
|
||||
}
|
||||
|
||||
type InPlaceUpload struct {
|
||||
tusd.FileInfo
|
||||
// Extend the tusd.FileInfo struct with the target path of our uploaded file.
|
||||
filePath string
|
||||
// tusd expects offset to equal the upload's written bytes.
|
||||
// As we can have multiple uploads working on the same file,
|
||||
// this is not the case for us. Thus, store the actual offset.
|
||||
// See: https://github.com/tus/tusd/blob/main/pkg/handler/unrouted_handler.go#L714
|
||||
actualOffset int64
|
||||
// Enable the upload to remove itself from the active uploads map.
|
||||
parent *InPlaceDataStore
|
||||
}
|
||||
|
||||
func (upload *InPlaceUpload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
|
||||
// Open the file and seek to the given offset.
|
||||
// Then, copy the given reader to the file, update the offset and return.
|
||||
file, err := os.OpenFile(upload.filePath, os.O_WRONLY, filePerm)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
_, err = file.Seek(upload.actualOffset+offset, io.SeekStart)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
n, err := io.Copy(file, src)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
upload.Offset += n
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (upload *InPlaceUpload) GetInfo(ctx context.Context) (tusd.FileInfo, error) {
|
||||
return upload.FileInfo, nil
|
||||
}
|
||||
|
||||
func (upload *InPlaceUpload) GetReader(ctx context.Context) (io.Reader, error) {
|
||||
return os.Open(upload.filePath)
|
||||
}
|
||||
|
||||
func (upload *InPlaceUpload) FinishUpload(ctx context.Context) error {
|
||||
// Remove the upload from the ID-mapping.
|
||||
delete(upload.parent.uploads, upload.filePath)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (upload *InPlaceUpload) ConcatUploads(ctx context.Context, uploads []tusd.Upload) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func uid() (string, error) {
|
||||
id := make([]byte, uidLength)
|
||||
_, err := io.ReadFull(rand.Reader, id)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return hex.EncodeToString(id), err
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user