You've already forked openaccounting-server
forked from cybercinch/openaccounting-server
- Add JWT-based secure file access for local storage with 1-hour expiry - Implement GORM repository methods for attachment CRUD operations - Add secure file serving endpoint with token validation - Update storage interface to support user context in URL generation - Add comprehensive security features including path traversal protection - Update documentation with security model and configuration examples - Add utility functions for hex/byte conversion and UUID validation - Configure secure file permissions (0600) for uploaded files 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
205 lines
5.3 KiB
Go
205 lines
5.3 KiB
Go
package storage
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
)
|
|
|
|
func TestLocalStorage(t *testing.T) {
|
|
// Create temporary directory for testing
|
|
tmpDir := t.TempDir()
|
|
|
|
config := LocalConfig{
|
|
RootDir: tmpDir,
|
|
BaseURL: "http://localhost:8080/files",
|
|
}
|
|
|
|
storage, err := NewLocalStorage(config)
|
|
assert.NoError(t, err)
|
|
assert.NotNil(t, storage)
|
|
|
|
t.Run("Store and Retrieve File", func(t *testing.T) {
|
|
content := []byte("test file content")
|
|
reader := bytes.NewReader(content)
|
|
|
|
// Store file
|
|
path, err := storage.Store("test.txt", reader, "text/plain")
|
|
assert.NoError(t, err)
|
|
assert.NotEmpty(t, path)
|
|
|
|
// Verify file exists
|
|
exists, err := storage.Exists(path)
|
|
assert.NoError(t, err)
|
|
assert.True(t, exists)
|
|
|
|
// Retrieve file
|
|
retrievedReader, err := storage.Retrieve(path)
|
|
assert.NoError(t, err)
|
|
defer retrievedReader.Close()
|
|
|
|
retrievedContent, err := io.ReadAll(retrievedReader)
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, content, retrievedContent)
|
|
})
|
|
|
|
t.Run("Get File Metadata", func(t *testing.T) {
|
|
content := []byte("metadata test content")
|
|
reader := bytes.NewReader(content)
|
|
|
|
path, err := storage.Store("metadata.txt", reader, "text/plain")
|
|
assert.NoError(t, err)
|
|
|
|
metadata, err := storage.GetMetadata(path)
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, int64(len(content)), metadata.Size)
|
|
assert.False(t, metadata.LastModified.IsZero())
|
|
})
|
|
|
|
t.Run("Get File URL", func(t *testing.T) {
|
|
content := []byte("url test content")
|
|
reader := bytes.NewReader(content)
|
|
|
|
path, err := storage.Store("url.txt", reader, "text/plain")
|
|
assert.NoError(t, err)
|
|
|
|
url, err := storage.GetURL(path, time.Hour)
|
|
assert.NoError(t, err)
|
|
// New JWT token-based URLs should start with /secure-files and contain a token parameter
|
|
assert.Contains(t, url, "/secure-files")
|
|
assert.Contains(t, url, "token=")
|
|
// The token should be a JWT (contains dots for header.payload.signature)
|
|
assert.Contains(t, url, ".")
|
|
})
|
|
|
|
t.Run("Delete File", func(t *testing.T) {
|
|
content := []byte("delete test content")
|
|
reader := bytes.NewReader(content)
|
|
|
|
path, err := storage.Store("delete.txt", reader, "text/plain")
|
|
assert.NoError(t, err)
|
|
|
|
// Verify file exists
|
|
exists, err := storage.Exists(path)
|
|
assert.NoError(t, err)
|
|
assert.True(t, exists)
|
|
|
|
// Delete file
|
|
err = storage.Delete(path)
|
|
assert.NoError(t, err)
|
|
|
|
// Verify file no longer exists
|
|
exists, err = storage.Exists(path)
|
|
assert.NoError(t, err)
|
|
assert.False(t, exists)
|
|
})
|
|
|
|
t.Run("Path Validation", func(t *testing.T) {
|
|
// Test directory traversal prevention
|
|
_, err := storage.Retrieve("../../../etc/passwd")
|
|
assert.Error(t, err)
|
|
assert.IsType(t, &InvalidPathError{}, err)
|
|
|
|
// Test absolute path rejection
|
|
_, err = storage.Retrieve("/etc/passwd")
|
|
assert.Error(t, err)
|
|
assert.IsType(t, &InvalidPathError{}, err)
|
|
})
|
|
|
|
t.Run("File Not Found", func(t *testing.T) {
|
|
_, err := storage.Retrieve("nonexistent.txt")
|
|
assert.Error(t, err)
|
|
assert.IsType(t, &FileNotFoundError{}, err)
|
|
|
|
_, err = storage.GetMetadata("nonexistent.txt")
|
|
assert.Error(t, err)
|
|
assert.IsType(t, &FileNotFoundError{}, err)
|
|
|
|
_, err = storage.GetURL("nonexistent.txt", time.Hour)
|
|
assert.Error(t, err)
|
|
assert.IsType(t, &FileNotFoundError{}, err)
|
|
})
|
|
|
|
t.Run("Storage Path Generation", func(t *testing.T) {
|
|
content := []byte("path test content")
|
|
reader1 := bytes.NewReader(content)
|
|
reader2 := bytes.NewReader(content)
|
|
|
|
// Store two files with same name
|
|
path1, err := storage.Store("same.txt", reader1, "text/plain")
|
|
assert.NoError(t, err)
|
|
|
|
path2, err := storage.Store("same.txt", reader2, "text/plain")
|
|
assert.NoError(t, err)
|
|
|
|
// Paths should be different (unique)
|
|
assert.NotEqual(t, path1, path2)
|
|
|
|
// Both should exist
|
|
exists1, err := storage.Exists(path1)
|
|
assert.NoError(t, err)
|
|
assert.True(t, exists1)
|
|
|
|
exists2, err := storage.Exists(path2)
|
|
assert.NoError(t, err)
|
|
assert.True(t, exists2)
|
|
|
|
// Both should have correct extension
|
|
assert.True(t, strings.HasSuffix(path1, ".txt"))
|
|
assert.True(t, strings.HasSuffix(path2, ".txt"))
|
|
|
|
// Should be organized by date
|
|
now := time.Now()
|
|
expectedPrefix := filepath.Join(
|
|
fmt.Sprintf("%04d", now.Year()),
|
|
fmt.Sprintf("%02d", now.Month()),
|
|
fmt.Sprintf("%02d", now.Day()),
|
|
)
|
|
assert.True(t, strings.HasPrefix(path1, expectedPrefix))
|
|
assert.True(t, strings.HasPrefix(path2, expectedPrefix))
|
|
})
|
|
}
|
|
|
|
func TestLocalStorageConfig(t *testing.T) {
|
|
t.Run("Default Root Directory", func(t *testing.T) {
|
|
config := LocalConfig{} // Empty config
|
|
|
|
storage, err := NewLocalStorage(config)
|
|
assert.NoError(t, err)
|
|
assert.NotNil(t, storage)
|
|
|
|
// Should create default uploads directory
|
|
assert.Equal(t, "./uploads", storage.rootDir)
|
|
|
|
// Verify directory was created
|
|
_, err = os.Stat("./uploads")
|
|
assert.NoError(t, err)
|
|
|
|
// Clean up
|
|
os.RemoveAll("./uploads")
|
|
})
|
|
|
|
t.Run("Custom Root Directory", func(t *testing.T) {
|
|
tmpDir := t.TempDir()
|
|
customDir := filepath.Join(tmpDir, "custom", "storage")
|
|
|
|
config := LocalConfig{
|
|
RootDir: customDir,
|
|
}
|
|
|
|
storage, err := NewLocalStorage(config)
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, customDir, storage.rootDir)
|
|
|
|
// Verify custom directory was created
|
|
_, err = os.Stat(customDir)
|
|
assert.NoError(t, err)
|
|
})
|
|
} |