feat(downloader): add download status tracking and error handling
This commit is contained in:
parent
b44b95c9f6
commit
27f0fd9ccb
@ -1,6 +1,7 @@
|
|||||||
package http
|
package http
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/filebrowser/filebrowser/v2/files"
|
"github.com/filebrowser/filebrowser/v2/files"
|
||||||
@ -22,6 +23,13 @@ type DownloadTask struct {
|
|||||||
totalSize int64
|
totalSize int64
|
||||||
savedSize int64
|
savedSize int64
|
||||||
cache *cache.Cache
|
cache *cache.Cache
|
||||||
|
cancel context.CancelFunc
|
||||||
|
status string
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DownloadTask) SaveCache() {
|
||||||
|
d.cache.Set(d.TaskID.String(), d, cache.NoExpiration)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DownloadTask) Progress() float64 {
|
func (d *DownloadTask) Progress() float64 {
|
||||||
@ -31,6 +39,14 @@ func (d *DownloadTask) Progress() float64 {
|
|||||||
return float64(d.savedSize) / float64(d.totalSize)
|
return float64(d.savedSize) / float64(d.totalSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *DownloadTask) ResolveErr(err error) {
|
||||||
|
if err != nil {
|
||||||
|
d.status = "error"
|
||||||
|
d.err = err
|
||||||
|
d.SaveCache()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func NewDownloadTask(url, filename, pathname string, downloaderCache *cache.Cache) *DownloadTask {
|
func NewDownloadTask(url, filename, pathname string, downloaderCache *cache.Cache) *DownloadTask {
|
||||||
taskId := uuid.New()
|
taskId := uuid.New()
|
||||||
downloadTask := &DownloadTask{
|
downloadTask := &DownloadTask{
|
||||||
@ -43,17 +59,6 @@ func NewDownloadTask(url, filename, pathname string, downloaderCache *cache.Cach
|
|||||||
return downloadTask
|
return downloadTask
|
||||||
}
|
}
|
||||||
|
|
||||||
type WriteCounter struct {
|
|
||||||
task *DownloadTask
|
|
||||||
}
|
|
||||||
|
|
||||||
func (wc *WriteCounter) Write(p []byte) (int, error) {
|
|
||||||
n := len(p)
|
|
||||||
wc.task.savedSize += int64(n)
|
|
||||||
wc.task.cache.Set(wc.task.TaskID.String(), wc.task, cache.NoExpiration)
|
|
||||||
return n, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func downloadHandler(downloaderCache *cache.Cache) handleFunc {
|
func downloadHandler(downloaderCache *cache.Cache) handleFunc {
|
||||||
return withUser(func(w http.ResponseWriter, r *http.Request, d *data) (int, error) {
|
return withUser(func(w http.ResponseWriter, r *http.Request, d *data) (int, error) {
|
||||||
if !d.user.Perm.Create || !d.Check(r.URL.Path) {
|
if !d.user.Perm.Create || !d.Check(r.URL.Path) {
|
||||||
@ -102,6 +107,8 @@ func downloadStatusHandler(downloaderCache *cache.Cache) handleFunc {
|
|||||||
"pathname": taskCache.Pathname,
|
"pathname": taskCache.Pathname,
|
||||||
"url": taskCache.URL,
|
"url": taskCache.URL,
|
||||||
"taskID": taskCache.TaskID.String(),
|
"taskID": taskCache.TaskID.String(),
|
||||||
|
"status": taskCache.status,
|
||||||
|
"error": taskCache.err,
|
||||||
}
|
}
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
err := json.NewEncoder(w).Encode(&responseBody)
|
err := json.NewEncoder(w).Encode(&responseBody)
|
||||||
@ -113,30 +120,67 @@ func downloadStatusHandler(downloaderCache *cache.Cache) handleFunc {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func downloadWithTask(fs afero.Fs, task *DownloadTask) error {
|
func downloadWithTask(fs afero.Fs, task *DownloadTask) error {
|
||||||
task.cache.Set(task.TaskID.String(), task, cache.NoExpiration)
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
task.cancel = cancel
|
||||||
|
task.status = "downloading"
|
||||||
|
task.SaveCache()
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
err := fs.MkdirAll(task.Pathname, files.PermDir)
|
err := fs.MkdirAll(task.Pathname, files.PermDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
task.ResolveErr(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
file, err := fs.OpenFile(path.Join(task.Pathname, task.Filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, files.PermFile)
|
file, err := fs.OpenFile(path.Join(task.Pathname, task.Filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, files.PermFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
task.ResolveErr(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer file.Close()
|
defer file.Close()
|
||||||
resp, err := http.Get(task.URL)
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, task.URL, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
task.ResolveErr(err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
task.ResolveErr(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
task.totalSize = resp.ContentLength
|
task.totalSize = resp.ContentLength
|
||||||
|
buf := make([]byte, 32*1024)
|
||||||
_, err = io.Copy(file, io.TeeReader(resp.Body, &WriteCounter{task: task}))
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
task.status = "canceled"
|
||||||
|
task.SaveCache()
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
rn, err := resp.Body.Read(buf)
|
||||||
|
if err == io.EOF {
|
||||||
|
task.status = "completed"
|
||||||
|
task.SaveCache()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
task.ResolveErr(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if rn > 0 {
|
||||||
|
wn, err := file.Write(buf[:rn])
|
||||||
|
if err != nil {
|
||||||
|
task.ResolveErr(err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
task.savedSize += int64(wn)
|
||||||
|
task.SaveCache()
|
||||||
|
|
||||||
return nil
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func asyncDownloadWithTask(fs afero.Fs, task *DownloadTask) {
|
func asyncDownloadWithTask(fs afero.Fs, task *DownloadTask) {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user