fix: disable parallel uploads and refactor

This commit is contained in:
Tobias Goerke 2023-07-27 08:22:15 +02:00
parent 676dd2fec5
commit cd7c518b14
8 changed files with 120 additions and 93 deletions

View File

@ -335,10 +335,9 @@ func quickSetup(flags *pflag.FlagSet, d pythonData) {
AuthMethod: "", AuthMethod: "",
Branding: settings.Branding{}, Branding: settings.Branding{},
Tus: settings.Tus{ Tus: settings.Tus{
Enabled: true, Enabled: true,
ChunkSize: settings.DefaultTusChunkSize, ChunkSize: settings.DefaultTusChunkSize,
ParallelUploads: settings.DefaultTusParallelUploads, RetryCount: settings.DefaultTusRetryCount,
RetryCount: settings.DefaultTusRetryCount,
}, },
Commands: nil, Commands: nil,
Shell: nil, Shell: nil,

View File

@ -22,7 +22,7 @@ export async function upload(url, content = "", overwrite = false, onupload) {
endpoint: tusEndpoint, endpoint: tusEndpoint,
chunkSize: tusSettings.chunkSize, chunkSize: tusSettings.chunkSize,
retryDelays: computeRetryDelays(tusSettings), retryDelays: computeRetryDelays(tusSettings),
parallelUploads: tusSettings.parallelUploads || 1, parallelUploads: 1,
headers: { headers: {
"X-Auth": store.state.jwt, "X-Auth": store.state.jwt,
// Send the metadata with every request // Send the metadata with every request

View File

@ -189,7 +189,6 @@
"tusUploadsHelp": "File Browser supports the tus.io protocol for resumable file uploads, allowing for the creation of efficient, reliable, resumable and chunked file uploads even on unreliable networks.", "tusUploadsHelp": "File Browser supports the tus.io protocol for resumable file uploads, allowing for the creation of efficient, reliable, resumable and chunked file uploads even on unreliable networks.",
"tusUploadsEnabled": "Enable chunked file uploads", "tusUploadsEnabled": "Enable chunked file uploads",
"tusUploadsChunkSize": "Indicates to maximum size of a request (direct uploads will be used for smaller uploads). You may input a plain integer denoting a bytes input or a string like 10MB, 1.00Ti etc.", "tusUploadsChunkSize": "Indicates to maximum size of a request (direct uploads will be used for smaller uploads). You may input a plain integer denoting a bytes input or a string like 10MB, 1.00Ti etc.",
"tusUploadsParallelUploads": "Number of parallel uploads",
"tusUploadsRetryCount": "Number of times to retry a failed upload (set to 0 to disable retries)", "tusUploadsRetryCount": "Number of times to retry a failed upload (set to 0 to disable retries)",
"userHomeBasePath": "Base path for user home directories", "userHomeBasePath": "Base path for user home directories",
"userScopeGenerationPlaceholder": "The scope will be auto generated", "userScopeGenerationPlaceholder": "The scope will be auto generated",

View File

@ -127,18 +127,6 @@
v-bind:disabled="!settings.tus.enabled" v-bind:disabled="!settings.tus.enabled"
/> />
<label for="tus-parallelUploads">{{
$t("settings.tusUploadsParallelUploads")
}}</label>
<input
class="input input--block"
type="number"
v-model.number="settings.tus.parallelUploads"
id="tus-parallelUploads"
v-bind:disabled="!settings.tus.enabled"
min="1"
/>
<label for="tus-retryCount">{{ <label for="tus-retryCount">{{
$t("settings.tusUploadsRetryCount") $t("settings.tusUploadsRetryCount")
}}</label> }}</label>

View File

@ -72,11 +72,6 @@ func (th *TusHandler) getOrCreateTusdHandler(d *data, r *http.Request) (_ *tusd.
func (th TusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (th TusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
code, err := withUser(func(w http.ResponseWriter, r *http.Request, d *data) (int, error) { code, err := withUser(func(w http.ResponseWriter, r *http.Request, d *data) (int, error) {
// Check if user has permission to create files
if !d.user.Perm.Create {
return http.StatusForbidden, nil
}
// Create a new tus handler for current user if it doesn't exist yet // Create a new tus handler for current user if it doesn't exist yet
tusdHandler, err := th.getOrCreateTusdHandler(d, r) tusdHandler, err := th.getOrCreateTusdHandler(d, r)
if err != nil { if err != nil {
@ -111,7 +106,7 @@ func (th TusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
func (th TusHandler) createTusdHandler(d *data, basePath string) (*tusd.UnroutedHandler, error) { func (th TusHandler) createTusdHandler(d *data, basePath string) (*tusd.UnroutedHandler, error) {
tusStore := NewInPlaceDataStore(d.user.FullPath("/"), d.user.Perm.Modify) tusStore := NewInPlaceDataStore(d.user.FullPath("/"), d.user.Perm.Create, d.user.Perm.Modify)
composer := tusd.NewStoreComposer() composer := tusd.NewStoreComposer()
tusStore.UseIn(composer) tusStore.UseIn(composer)

View File

@ -1,11 +1,13 @@
// InPlaceDataStore is a storage backend for tusd, which stores the uploaded // InPlaceDataStore is a storage backend for tusd, which stores the uploaded
// files in the user's root directory. It features parallel and resumable uploads. // files in the user's root directory, without creating any auxiliary files.
// It only touches the target file, without creating any lock files or separate // It thus requires no clean-up on failed uploads.
// files for upload parts. It thus requires no clean-up on failed uploads. // The destination metadata field needs to be set in the upload request.
// It requires the destination metadata field to be set in the upload request.
// For each NewUpload, the target file is expanded by the upload's size. // For each NewUpload, the target file is expanded by the upload's size.
// This way, multiple uploads can work on the same file, without interfering // This way, multiple uploads can work on the same file, without interfering
// with each other. // with each other.
// The uploads are resumable. Also, parallel uploads are supported, however,
// the initial POST requests to NewUpload must be synchronized and in order.
// Otherwise, no guarantee of the upload's integrity can be given.
package http package http
@ -13,8 +15,9 @@ import (
"context" "context"
"crypto/rand" "crypto/rand"
"encoding/hex" "encoding/hex"
"fmt" "errors"
"io" "io"
"net/http"
"os" "os"
"path/filepath" "path/filepath"
"sync" "sync"
@ -30,13 +33,21 @@ type InPlaceDataStore struct {
// It equals the user's root directory. // It equals the user's root directory.
path string path string
// Store whether the user is permitted to create new files.
createPerm bool
// Store whether the user is permitted to modify files or only create new ones. // Store whether the user is permitted to modify files or only create new ones.
modifyPerm bool modifyPerm bool
// Maps an upload ID to its object. // Maps an upload ID to its object.
// Required, since GetUpload only provides us with the id of an upload // Required, since GetUpload only provides us with the id of an upload
// and expects us to return the Info object. // and expects us to return the Info object.
uploads map[string]*InPlaceUpload uploadsByID map[string]*InPlaceUpload
// Map all uploads by their path.
// Each path can have multiple uploads, as multiple uploads can work on the same file
// when parallel uploads are enabled.
uploadsByPath map[string][]*InPlaceUpload
// Each upload appends to the file, so we need to make sure // Each upload appends to the file, so we need to make sure
// each upload has expanded the file by info.Size bytes, before the next // each upload has expanded the file by info.Size bytes, before the next
@ -44,12 +55,14 @@ type InPlaceDataStore struct {
mutex *sync.Mutex mutex *sync.Mutex
} }
func NewInPlaceDataStore(path string, modifyPerm bool) *InPlaceDataStore { func NewInPlaceDataStore(path string, createPerm, modifyPerm bool) *InPlaceDataStore {
return &InPlaceDataStore{ return &InPlaceDataStore{
path: path, path: path,
modifyPerm: modifyPerm, createPerm: createPerm,
uploads: make(map[string]*InPlaceUpload), modifyPerm: modifyPerm,
mutex: &sync.Mutex{}, uploadsByID: make(map[string]*InPlaceUpload),
uploadsByPath: make(map[string][]*InPlaceUpload),
mutex: &sync.Mutex{},
} }
} }
@ -58,63 +71,79 @@ func (store *InPlaceDataStore) UseIn(composer *tusd.StoreComposer) {
composer.UseConcater(store) composer.UseConcater(store)
} }
func (store *InPlaceDataStore) cleanupOrphanedUploads(filePath string) error { func (store *InPlaceDataStore) isPartOfNewUpload(fileExists bool, filePath string) bool {
// If the file doesn't exist, remove all upload references. if !fileExists {
// This way we can eliminate inconsistencies for failed uploads. // If the file doesn't exist, remove all upload references.
if _, err := os.Stat(filePath); os.IsNotExist(err) { // This way we can eliminate inconsistencies for failed uploads.
for id, upload := range store.uploads { for _, upload := range store.uploadsByPath[filePath] {
if upload.filePath == filePath { delete(store.uploadsByID, upload.ID)
delete(store.uploads, id)
}
} }
} else { delete(store.uploadsByPath, filePath)
// If the file but no uploads exist for it,
// we need to remove the file to make sure we don't append to an existing file. return true
// This would lead to files with duplicate content. }
uploadExists := false
for _, upload := range store.uploads { // In case the file exists, it is still possible that it is a new upload.
if upload.filePath == filePath { // E.g.: the user wants to overwrite an existing file.
uploadExists = true return store.uploadsByPath[filePath] == nil
break }
}
} func (store *InPlaceDataStore) checkPermissions(isPartOfNewUpload bool) error {
if !uploadExists { // Return tusd.HTTPErrors, as they are handled by tusd.
if !store.modifyPerm { if isPartOfNewUpload {
// Gets interpreted as a 400 by tusd. if !store.createPerm {
// There is no way to return a 403, so a 400 is better than a 500. return tusd.NewHTTPError(errors.New("user is not allowed to create a new upload"), http.StatusForbidden)
return tusd.ErrUploadStoppedByServer
}
if err := os.Remove(filePath); err != nil {
return err
}
} }
} }
if !store.modifyPerm {
return tusd.NewHTTPError(errors.New("user is not allowed to modify existing files"), http.StatusForbidden)
}
return nil return nil
} }
func (store *InPlaceDataStore) initializeUpload(filePath string, info *tusd.FileInfo) (int64, error) { func (store *InPlaceDataStore) initializeUpload(filePath string, info *tusd.FileInfo) (int64, error) {
store.mutex.Lock() fileExists := true
defer store.mutex.Unlock() if _, err := os.Stat(filePath); os.IsNotExist(err) {
fileExists = false
if err := store.cleanupOrphanedUploads(filePath); err != nil { } else if err != nil {
return 0, err return 0, err
} }
// Create the file if it doesn't exist. // Delete existing files and references, if necessary.
file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, filePerm) isPartOfNewUpload := store.isPartOfNewUpload(fileExists, filePath)
if err := store.checkPermissions(isPartOfNewUpload); err != nil {
return 0, err
}
if isPartOfNewUpload && fileExists {
// Remove the file's contents (instead of re-creating it).
return 0, os.Truncate(filePath, 0)
}
if isPartOfNewUpload && !fileExists {
// Create the file, if it doesn't exist.
if _, err := os.Create(filePath); err != nil {
return 0, err
}
return 0, nil
}
// The file exists and is part of an existing upload.
// Open the file and enlarge it by the upload's size.
file, err := os.OpenFile(filePath, os.O_WRONLY, filePerm)
if err != nil { if err != nil {
return 0, err return 0, err
} }
defer file.Close() defer file.Close()
// Get the file's current size. // Get the file's current size and offset to the end of the file.
actualOffset, err := file.Seek(0, io.SeekEnd) actualOffset, err := file.Seek(0, io.SeekEnd)
if err != nil { if err != nil {
return 0, err return 0, err
} }
// Enlarge the file by the upload's size. // Enlarge the file by the upload's size (starting from the current offset).
_, err = file.Write(make([]byte, info.Size)) if _, err = file.Write(make([]byte, info.Size)); err != nil {
if err != nil {
return 0, err return 0, err
} }
@ -129,7 +158,7 @@ func (store *InPlaceDataStore) NewUpload(ctx context.Context, info tusd.FileInfo
destination, ok := info.MetaData["destination"] destination, ok := info.MetaData["destination"]
if !ok { if !ok {
return nil, fmt.Errorf("metadata field 'destination' not found in upload request") return nil, errors.New("metadata field 'destination' not found in upload request")
} }
filePath := filepath.Join(store.path, destination) filePath := filepath.Join(store.path, destination)
@ -139,23 +168,29 @@ func (store *InPlaceDataStore) NewUpload(ctx context.Context, info tusd.FileInfo
actualOffset: info.Size, actualOffset: info.Size,
parent: store, parent: store,
} }
// Lock the mutex, as we need to modify the target file synchronously.
store.mutex.Lock()
defer store.mutex.Unlock()
// Tus creates a POST request for the final concatenation. // Tus creates a POST request for the final concatenation.
// In that case, we don't need to create a new upload. // In that case, we don't need to create a new upload.
if !info.IsFinal { if !info.IsFinal {
if upload.actualOffset, err = store.initializeUpload(filePath, &info); err != nil { if upload.actualOffset, err = store.initializeUpload(filePath, &info); err != nil {
return nil, err return nil, err
} }
store.uploads[info.ID] = upload
} }
store.uploadsByID[upload.ID] = upload
store.uploadsByPath[upload.filePath] = append(store.uploadsByPath[upload.filePath], upload)
return upload, nil return upload, nil
} }
func (store *InPlaceDataStore) GetUpload(ctx context.Context, id string) (tusd.Upload, error) { func (store *InPlaceDataStore) GetUpload(ctx context.Context, id string) (tusd.Upload, error) {
if upload, ok := store.uploads[id]; ok { if upload, ok := store.uploadsByID[id]; ok {
return upload, nil return upload, nil
} else { } else {
return nil, fmt.Errorf("upload not found") return nil, errors.New("upload not found")
} }
} }
@ -186,8 +221,7 @@ func (upload *InPlaceUpload) WriteChunk(ctx context.Context, offset int64, src i
} }
defer file.Close() defer file.Close()
_, err = file.Seek(upload.actualOffset+offset, io.SeekStart) if _, err = file.Seek(upload.actualOffset+offset, io.SeekStart); err != nil {
if err != nil {
return 0, err return 0, err
} }
@ -209,16 +243,31 @@ func (upload *InPlaceUpload) GetReader(ctx context.Context) (io.Reader, error) {
} }
func (upload *InPlaceUpload) FinishUpload(ctx context.Context) error { func (upload *InPlaceUpload) FinishUpload(ctx context.Context) error {
upload.parent.mutex.Lock()
defer upload.parent.mutex.Unlock()
delete(upload.parent.uploadsByID, upload.ID)
uploadsByPath := upload.parent.uploadsByPath[upload.filePath]
for i, u := range uploadsByPath {
if u.ID == upload.ID {
upload.parent.uploadsByPath[upload.filePath] = append(uploadsByPath[:i], uploadsByPath[i+1:]...)
break
}
}
if len(upload.parent.uploadsByPath[upload.filePath]) == 0 {
delete(upload.parent.uploadsByPath, upload.filePath)
}
return nil return nil
} }
func (upload *InPlaceUpload) ConcatUploads(ctx context.Context, uploads []tusd.Upload) (err error) { func (upload *InPlaceUpload) ConcatUploads(ctx context.Context, uploads []tusd.Upload) (err error) {
parent := upload.parent
for _, u := range uploads { for _, u := range uploads {
delete(parent.uploads, (u.(*InPlaceUpload)).ID) if err := (u.(*InPlaceUpload)).FinishUpload(ctx); err != nil {
return err
}
} }
delete(parent.uploads, upload.ID) return upload.FinishUpload(ctx)
return nil
} }
func uid() (string, error) { func uid() (string, error) {

View File

@ -35,10 +35,9 @@ func (s *Storage) Get() (*Settings, error) {
} }
if set.Tus == (Tus{}) { if set.Tus == (Tus{}) {
set.Tus = Tus{ set.Tus = Tus{
Enabled: false, Enabled: false,
ChunkSize: DefaultTusChunkSize, ChunkSize: DefaultTusChunkSize,
ParallelUploads: DefaultTusParallelUploads, RetryCount: DefaultTusRetryCount,
RetryCount: DefaultTusRetryCount,
} }
} }
return set, nil return set, nil

View File

@ -1,13 +1,11 @@
package settings package settings
const DefaultTusChunkSize = 20 * 1024 * 1024 // 20MB const DefaultTusChunkSize = 5 * 1024 * 1024 // 5MB
const DefaultTusParallelUploads = 3
const DefaultTusRetryCount = 3 const DefaultTusRetryCount = 3
// Tus contains the tus.io settings of the app. // Tus contains the tus.io settings of the app.
type Tus struct { type Tus struct {
Enabled bool `json:"enabled"` Enabled bool `json:"enabled"`
ChunkSize uint64 `json:"chunkSize"` ChunkSize uint64 `json:"chunkSize"`
ParallelUploads uint8 `json:"parallelUploads"` RetryCount uint16 `json:"retryCount"`
RetryCount uint16 `json:"retryCount"`
} }