diff --git a/internal/git/housekeeping/offloading_storage_gcp.go b/internal/git/housekeeping/offloading_storage_gcp.go
new file mode 100644
index 0000000000000000000000000000000000000000..3407eb6302e180466ab8b631c2e7a8abc92f2d60
--- /dev/null
+++ b/internal/git/housekeeping/offloading_storage_gcp.go
@@ -0,0 +1,189 @@
+package housekeeping
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"sync"
+	"time"
+
+	gs "cloud.google.com/go/storage"
+	"github.com/googleapis/gax-go/v2"
+	"google.golang.org/api/iterator"
+)
+
+// OffloadingStorageClient defines the interface for operations required to interact with offloading storage.
+// This interface can be extended to support additional types of storage systems as needed.
+type OffloadingStorageClient interface {
+	Upload(ctx context.Context, path string, bucketPrefix string) error
+	TryDelete(ctx context.Context, bucketPrefix string, objects []string) map[string]error
+	HasObjects(ctx context.Context, bucketPrefix string) (bool, error)
+}
+
+// OffloadingStorageGcpClient is the implementation of OffloadingStorageClient for GCP.
+type OffloadingStorageGcpClient struct {
+	bucketClient *gs.BucketHandle
+}
+
+// NewOffloadingStorageGcpClient initializes and returns a new GCP bucket client
+// to facilitate bucket operations such as uploading, listing, and deleting objects.
+func NewOffloadingStorageGcpClient(client *gs.Client, bucket string) *OffloadingStorageGcpClient {
+	bucketClient := client.Bucket(bucket)
+	return &OffloadingStorageGcpClient{
+		bucketClient: bucketClient,
+	}
+}
+
+// Upload uploads a file to the GCP bucket under the specified bucketPrefix.
+// The source file is located on the local file system at the provided path.
+func (r *OffloadingStorageGcpClient) Upload(ctx context.Context, path string, bucketPrefix string) error {
+	// Use context timeouts to set an overall deadline for the operation, including all potential retries.
+	// Having a cancelable context is critical here because the retry mechanism does not have a maximum
+	// retry count and will continue retrying until the context is explicitly canceled.
+	ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
+	defer cancel()
+
+	file, err := os.Open(path)
+	if err != nil {
+		return fmt.Errorf("open file: %w", err)
+	}
+	objectPath := fmt.Sprintf("%s/%s", bucketPrefix, filepath.Base(path))
+
+	o := r.bucketClient.Object(objectPath).Retryer(
+		// Use WithBackoff to control the timing of the exponential backoff.
+		gs.WithBackoff(gax.Backoff{
+			Max: 3 * time.Second,
+		}),
+		// We use preconditions when uploading objects to prevent race conditions.
+		// This approach is categorized as "Conditionally idempotent" based on the guidelines at:
+		// https://cloud.google.com/storage/docs/retry-strategy#idempotency-operations
+		gs.WithPolicy(gs.RetryIdempotent),
+	)
+
+	// Preconditions are used here to prevent race conditions.
+	// We want to make sure that no object with the same name is being uploaded concurrently.
+	o = o.If(gs.Conditions{DoesNotExist: true})
+	wc := o.NewWriter(ctx)
+	_, err = io.Copy(wc, file)
+	if err != nil {
+		return fmt.Errorf("copy local file to object writer: %w", err)
+	}
+	if err := wc.Close(); err != nil {
+		return fmt.Errorf("close object writer: %w", err)
+	}
+	if err := file.Close(); err != nil {
+		return fmt.Errorf("close local file writer: %w", err)
+	}
+	return nil
+}
+
+// TryDelete attempts to delete given objects within the specified bucketPrefix.
+// It employs a retry mechanism to delete objects on a best-effort basis.
+func (r *OffloadingStorageGcpClient) TryDelete(ctx context.Context, bucketPrefix string, objects []string) map[string]error {
+	res := make(map[string]error)
+	if len(objects) == 0 {
+		return res
+	}
+
+	// Use context timeouts to set an overall deadline for the operation, including all potential retries.
+	// Having a cancelable context is critical here because the retry mechanism does not have a maximum
+	// retry count and will continue retrying until the context is explicitly canceled.
+	ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
+	defer cancel()
+	wg := sync.WaitGroup{}
+	type deleteResult struct {
+		object string
+		err    error
+	}
+	resCh := make(chan deleteResult, len(objects))
+
+	// In the context of inactive repo offloading, the number of objects is expected to be small.
+	// Therefore, we can afford to use a goroutine for each object.
+	for _, object := range objects {
+		wg.Add(1)
+		objectPath := fmt.Sprintf("%s/%s", bucketPrefix, filepath.Base(object))
+		go func(obj string) {
+			defer wg.Done()
+			o := r.bucketClient.Object(obj).Retryer(
+				// Use WithBackoff to control the timing of the exponential backoff.
+				gs.WithBackoff(gax.Backoff{
+					Max: 5 * time.Second,
+				}),
+				// Deleting an object with ifGenerationMatch is categorized as "Conditionally idempotent"
+				// based on the guidelines at: https://cloud.google.com/storage/docs/retry-strategy#idempotency-operations
+				gs.WithPolicy(gs.RetryIdempotent),
+			)
+
+			// Set a generation-match precondition to avoid potential race
+			// conditions and data corruption. The request to delete the file is aborted
+			// if the object's generation number does not match the precondition.
+			attrs, err := o.Attrs(ctx)
+			if err != nil {
+				resCh <- deleteResult{object: obj, err: err}
+				return
+			}
+			o = o.If(gs.Conditions{GenerationMatch: attrs.Generation})
+			err = o.Delete(ctx)
+			resCh <- deleteResult{object: obj, err: err}
+		}(objectPath)
+	}
+
+	wg.Wait()
+	close(resCh)
+	for delRes := range resCh {
+		res[delRes.object] = delRes.err
+	}
+
+	return res
+}
+
+// HasObjects checks whether the GCP bucket contains any objects that match the specified bucketPrefix.
+func (r *OffloadingStorageGcpClient) HasObjects(ctx context.Context, bucketPrefix string) (bool, error) {
+	q := gs.Query{Prefix: bucketPrefix + "/", Delimiter: "/"}
+	it := r.bucketClient.Objects(ctx, &q)
+
+	for {
+		attrs, err := it.Next()
+		if errors.Is(err, iterator.Done) {
+			return false, nil
+		}
+
+		if err != nil {
+			return false, fmt.Errorf("list object on GCP bucket: %w", err)
+		}
+
+		// Exclude the bucketPrefix "folder" itself
+		if attrs != nil && attrs.Name == bucketPrefix+"/" {
+			continue
+		}
+
+		return true, nil
+	}
+}
+
+// List lists all objects that match the specified bucketPrefix.
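[Editor's note: the sketch below is illustrative only and is not part of the change above. It shows how a caller might wire up the client defined in this file, assuming ambient GCP credentials; the bucket name, paths, and the offloadPackfile helper are hypothetical.]

	// offloadPackfile uploads a single packfile under the repository's prefix and then
	// verifies that the prefix is no longer empty. It only uses identifiers defined in
	// the file above (NewOffloadingStorageGcpClient, Upload, HasObjects).
	func offloadPackfile(ctx context.Context, gcsClient *gs.Client, packfilePath, repoPrefix string) error {
		client := NewOffloadingStorageGcpClient(gcsClient, "my-offload-bucket")
		if err := client.Upload(ctx, packfilePath, repoPrefix); err != nil {
			return fmt.Errorf("upload packfile: %w", err)
		}
		has, err := client.HasObjects(ctx, repoPrefix)
		if err != nil {
			return fmt.Errorf("check offloaded objects: %w", err)
		}
		if !has {
			return fmt.Errorf("no objects found under prefix %q", repoPrefix)
		}
		return nil
	}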
+func (r *OffloadingStorageGcpClient) List(ctx context.Context, bucketPrefix string) ([]string, error) { + q := gs.Query{Prefix: bucketPrefix + "/", Delimiter: "/"} + it := r.bucketClient.Objects(ctx, &q) + objects := make([]string, 0) + for { + attrs, err := it.Next() + if errors.Is(err, iterator.Done) { + return objects, nil + } + + if err != nil { + return []string{}, fmt.Errorf("list object on GCP bucket: %w", err) + } + + // Exclude the bucketPrefix "folder" itself + if attrs != nil && attrs.Name == bucketPrefix+"/" { + continue + } + + objects = append(objects, filepath.Base(attrs.Name)) + } +} diff --git a/internal/git/housekeeping/offloading_storage_gcp_test.go b/internal/git/housekeeping/offloading_storage_gcp_test.go new file mode 100644 index 0000000000000000000000000000000000000000..2d1ed488e596a8c7eb0e40ccf64be26f47fab742 --- /dev/null +++ b/internal/git/housekeeping/offloading_storage_gcp_test.go @@ -0,0 +1,320 @@ +package housekeeping + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sync" + "testing" + + gs "cloud.google.com/go/storage" + "github.com/fullstorydev/emulators/storage/gcsemu" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" +) + +func TestOffloadingStorageGcpClient_Upload(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + client, svr := setupGcsEmulator(t, ctx) + defer teardownGcsEmulator(t, client, svr) + + bucket := "test-bucket" + offloadingClient := NewOffloadingStorageGcpClient(client, bucket) + + for _, tc := range []struct { + desc string + + // filesToUpload is a list of files to upload + filesToUpload []string + + // filesOnLocal is a list of files actually on the local filesystem + filesOnLocal []string + expectedErrMsgs []string + }{ + { + desc: "Upload objects", + filesToUpload: []string{"obj1.txt", "obj2.txt", "obj3.txt"}, + filesOnLocal: []string{"obj1.txt", "obj2.txt", "obj3.txt"}, + // expect no error + }, + { + desc: "Upload objects but one is missing on local", + filesToUpload: []string{"obj1.txt", "obj2.txt", "obj3.txt"}, + filesOnLocal: []string{"obj1.txt", "obj2.txt"}, + expectedErrMsgs: []string{ + "", + "", + "open file: open %s/obj3.txt: no such file or directory", + }, + }, + { + desc: "No object to upload", + filesToUpload: []string{}, + filesOnLocal: []string{}, + // expect no error + }, + } { + t.Run(tc.desc, func(t *testing.T) { + dir := testhelper.TempDir(t) + + prefix := "test-upload" + testhelper.TempDir(t) + for _, f := range tc.filesOnLocal { + err := os.WriteFile(filepath.Join(dir, f), []byte("hello world"), 0o666) + require.NoError(t, err) + } + + errCh := make(chan error) + var wg sync.WaitGroup + for _, f := range tc.filesToUpload { + wg.Add(1) + go func(file string) { + defer wg.Done() + errCh <- offloadingClient.Upload(ctx, filepath.Join(dir, file), prefix) + }(f) + } + + // Close the error channel once all goroutines are done + go func() { + wg.Wait() + close(errCh) // Close the channel after all goroutines finish + }() + + // Process errors from the channel + var actualErrMsgs []string + if len(tc.expectedErrMsgs) == 0 { + for err := range errCh { + require.NoError(t, err) // Handle any errors + } + // Validate uploaded files + uploadedFiles, _ := offloadingClient.List(ctx, prefix) + require.ElementsMatch(t, tc.filesToUpload, uploadedFiles) + } else { + + // prepare error messages + + for i, f := range tc.expectedErrMsgs { + if f != "" { + tc.expectedErrMsgs[i] = fmt.Sprintf(f, dir) + } + } + + for err := range errCh { + if err == nil { + 
actualErrMsgs = append(actualErrMsgs, "") + } else { + actualErrMsgs = append(actualErrMsgs, err.Error()) + } + } + require.ElementsMatch(t, tc.expectedErrMsgs, actualErrMsgs) + } + }) + } +} + +func TestOffloadingStorageGcpClient_Delete(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + client, svr := setupGcsEmulator(t, ctx) + defer teardownGcsEmulator(t, client, svr) + + bucket := "test-bucket" + offloadingClient := NewOffloadingStorageGcpClient(client, bucket) + + t.Run("Delete objects", func(t *testing.T) { + prefix := "test-delete" + testhelper.TempDir(t) + + filesToUpload := []string{"obj1.txt", "obj2.txt", "obj3.txt"} + createObjectsInOffloadingStorage(t, ctx, offloadingClient, prefix, filesToUpload) + + offloadingClient.TryDelete(ctx, prefix, filesToUpload) + + // Validate uploaded files are deleted + uploadedFiles, _ := offloadingClient.List(ctx, prefix) + require.Empty(t, uploadedFiles) + }) + + t.Run("Delete empty list", func(t *testing.T) { + prefix := "test-delete" + testhelper.TempDir(t) + + filesToUpload := []string{"obj1.txt", "obj2.txt", "obj3.txt"} + createObjectsInOffloadingStorage(t, ctx, offloadingClient, prefix, filesToUpload) + + res := offloadingClient.TryDelete(ctx, prefix, []string{}) + require.Empty(t, res) + + // Validate uploaded files are deleted + uploadedFiles, _ := offloadingClient.List(ctx, prefix) + require.ElementsMatch(t, filesToUpload, uploadedFiles) + }) + + t.Run("Delete objects", func(t *testing.T) { + prefix := "test-delete" + testhelper.TempDir(t) + + filesToUpload := []string{"obj1.txt", "obj2.txt"} + createObjectsInOffloadingStorage(t, ctx, offloadingClient, prefix, filesToUpload) + + res := offloadingClient.TryDelete(ctx, prefix, []string{"obj1.txt", "obj2.txt", "obj3.txt"}) + expectedRes := map[string]error{ + prefix + "/obj1.txt": nil, + prefix + "/obj2.txt": nil, + prefix + "/obj3.txt": gs.ErrObjectNotExist, + } + require.Equal(t, expectedRes, res) + // Validate uploaded files are deleted + uploadedFiles, _ := offloadingClient.List(ctx, prefix) + require.Empty(t, uploadedFiles) + }) +} + +func TestOffloadingStorageGcpClient_List(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + client, svr := setupGcsEmulator(t, ctx) + defer teardownGcsEmulator(t, client, svr) + bucket := "test-bucket" + offloadingClient := NewOffloadingStorageGcpClient(client, bucket) + + // Create a placeholder object first. This is because the emulator behaves + // differently from a real GCP bucket. It requires storing something to ensure + // the bucket's existence. Without this placeholder, operations like "No existing prefix" + // and "Empty prefix" may fail with an error indicating the bucket does not exist. + // In contrast, a real GCP bucket would return no error and simply provide an empty result. + // To ensure the test behaves consistently with a real GCP environment, we add + // placeholder data here to confirm the bucket's existence. 
+ prefix := "test-list-to-create-bucket" + filesToUpload := []string{"i-am-just-a-place-holder.txt"} + createObjectsInOffloadingStorage(t, ctx, offloadingClient, prefix, filesToUpload) + + t.Run("List existing objects", func(t *testing.T) { + prefix := "test-list-create-something" + + filesToUpload := []string{"obj1.txt", "obj2.txt", "obj3.txt"} + createObjectsInOffloadingStorage(t, ctx, offloadingClient, prefix, filesToUpload) + + // Validate uploaded files + uploadedFiles, _ := offloadingClient.List(ctx, prefix) + require.ElementsMatch(t, filesToUpload, uploadedFiles) + }) + t.Run("List empty folder", func(t *testing.T) { + // must end with "/" to be a folder + prefix := "test-list-empty-folder/" + + contentType := `inode/directory` + obj := offloadingClient.bucketClient.Object(prefix) + writer := obj.NewWriter(ctx) + writer.ObjectAttrs.ContentType = contentType + _, err := writer.Write(nil) + require.NoError(t, err) + err = writer.Close() + require.NoError(t, err) + + // Validate uploaded files + uploadedFiles, err := offloadingClient.List(ctx, prefix) + require.NoError(t, err) + require.Empty(t, uploadedFiles) + }) + + t.Run("Empty prefix", func(t *testing.T) { + prefix := "" + res, err := offloadingClient.List(ctx, prefix) + require.Empty(t, res) + require.NoError(t, err) + }) + t.Run("No existing prefix", func(t *testing.T) { + prefix := "i/dont/existing" + res, err := offloadingClient.List(ctx, prefix) + require.Empty(t, res) + require.NoError(t, err) + }) +} + +func TestOffloadingStorageGcpClient_HasObjects(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + client, svr := setupGcsEmulator(t, ctx) + defer teardownGcsEmulator(t, client, svr) + + bucket := "test-bucket" + offloadingClient := NewOffloadingStorageGcpClient(client, bucket) + + t.Run("Existing objects", func(t *testing.T) { + prefix := "test-has-objects" + filesToUpload := []string{"obj1.txt", "obj2.txt", "obj3.txt"} + createObjectsInOffloadingStorage(t, ctx, offloadingClient, prefix, filesToUpload) + // Validate uploaded files + hasObjects, err := offloadingClient.HasObjects(ctx, prefix) + require.True(t, hasObjects) + require.NoError(t, err) + }) + t.Run("Empty folder", func(t *testing.T) { + prefix := "test-has-objects-empty-folder" + + contentType := `inode/directory` + obj := offloadingClient.bucketClient.Object(prefix) + writer := obj.NewWriter(ctx) + writer.ObjectAttrs.ContentType = contentType + _, err := writer.Write(nil) + require.NoError(t, err) + err = writer.Close() + require.NoError(t, err) + + // Validate uploaded files + hasObjects, err := offloadingClient.HasObjects(ctx, prefix) + require.False(t, hasObjects) + require.NoError(t, err) + }) +} + +// setupGcsEmulator sets up an in-memory Google bucket emulator +func setupGcsEmulator(t *testing.T, ctx context.Context) (*gs.Client, *gcsemu.Server) { + svr, err := gcsemu.NewServer("localhost:0", gcsemu.Options{}) + require.NoError(t, err) + + // gcsemu.NewClient will look at this env var to figure out what host/port to talk to + err = os.Setenv("GCS_EMULATOR_HOST", svr.Addr) //nolint + require.NoError(t, err) + + client, err := gcsemu.NewClient(ctx) + require.NoError(t, err) + return client, svr +} + +// createObjectsInOffloadingStorage creates objects in the offloading storage with the given prefix. 
+func createObjectsInOffloadingStorage(t *testing.T, ctx context.Context, client OffloadingStorageClient, prefix string, objects []string) { + dir := testhelper.TempDir(t) + errCh := make(chan error) + var wg sync.WaitGroup + for _, f := range objects { + wg.Add(1) + go func(file string) { + defer wg.Done() + err := os.WriteFile(filepath.Join(dir, file), []byte("hello world from "+f), 0o666) + require.NoError(t, err) + errCh <- client.Upload(ctx, filepath.Join(dir, file), prefix) + }(f) + } + + // Close the error channel once all goroutines are done + go func() { + wg.Wait() + close(errCh) // Close the channel after all goroutines finish + }() + + // Process errors from the channel + for err := range errCh { + require.NoError(t, err) // Handle any errors + } +} + +// teardownGcsEmulator close in-memory Google bucket emulator and its client +func teardownGcsEmulator(t *testing.T, client *gs.Client, svr *gcsemu.Server) { + err := client.Close() + require.NoError(t, err) + svr.Close() +} diff --git a/internal/offloading/simulation_bucket.go b/internal/offloading/simulation_bucket.go new file mode 100644 index 0000000000000000000000000000000000000000..ce8f7c1c2b08d9c7f51f84e5c9cfc6fb71ed4d76 --- /dev/null +++ b/internal/offloading/simulation_bucket.go @@ -0,0 +1,200 @@ +package offloading + +import ( + "context" + "errors" + "gocloud.dev/blob" + "io" + "sync" + + "time" +) + +// Mock bucket drive based on fileblob, +// We intercepet upload, download and delete operations with delay and error + +var ( + SimulationErrorUploadCanceled = errors.New("upload canceled") + SimulationErrorDownloadCanceled = errors.New("download canceled") + SimulationErrorDeleteCanceled = errors.New("delete canceled") + SimulationErrorListCanceled = errors.New("list canceled") +) + +type SimulationBucket struct { + SimulationOn bool + Simulation map[string][]Simulation + + // We can intercept iterator + ListOperationCtx context.Context + ListOperationErr error + ListCurrentSimulationRound int + itSimulation []Simulation + + retryStat map[string]int + mu sync.Mutex + *blob.Bucket +} + +type Simulation struct { + + // each element is the N-th retry's simulated duration + Delay time.Duration + Err error +} + +// Client use this https://github.com/google/go-cloud/blob/9e81e8d4f2a83fc70eb74643da0485273ab2480d/blob/blob.go#L688 +// to create a mock bucket + +func NewSimulationBucket(bucket *blob.Bucket, simulationOn bool, simulation map[string][]Simulation) (Bucket, error) { + itSimulation := make([]Simulation, 0) + for _, s := range simulation { + itSimulation = append(itSimulation, s...) 
+ } + m := &SimulationBucket{ + Simulation: simulation, + Bucket: bucket, + SimulationOn: simulationOn, + retryStat: make(map[string]int), + mu: sync.Mutex{}, + ListCurrentSimulationRound: 0, + itSimulation: itSimulation, + } + return m, nil +} + +func (r *SimulationBucket) Download(ctx context.Context, key string, writer io.Writer, opts *blob.ReaderOptions) error { + simulation, found := r.Simulation[key] + if !r.SimulationOn || !found { + return r.Bucket.Download(ctx, key, writer, opts) + } + + r.mu.Lock() + retryIndex, found := r.retryStat[key] + if found { + r.retryStat[key] = r.retryStat[key] + 1 + } else { + r.retryStat[key] = 1 + } + thisSimulation := simulation[retryIndex] + timer := time.NewTimer(thisSimulation.Delay) + r.mu.Unlock() + + select { + case <-ctx.Done(): + return SimulationErrorDownloadCanceled + case <-timer.C: + if thisSimulation.Err != nil { + return thisSimulation.Err + } + return r.Bucket.Download(ctx, key, writer, opts) + } + +} + +func (r *SimulationBucket) Upload(ctx context.Context, key string, reader io.Reader, opts *blob.WriterOptions) error { + simulation, found := r.Simulation[key] + if !r.SimulationOn || !found { + return r.Bucket.Upload(ctx, key, reader, opts) + } + + r.mu.Lock() + retryIndex, found := r.retryStat[key] + if found { + r.retryStat[key] = r.retryStat[key] + 1 + } else { + r.retryStat[key] = 1 + } + thisSimulation := simulation[retryIndex] + timer := time.NewTimer(thisSimulation.Delay) + r.mu.Unlock() + + select { + case <-ctx.Done(): + return SimulationErrorUploadCanceled + case <-timer.C: + if thisSimulation.Err != nil { + return thisSimulation.Err + } + return r.Bucket.Upload(ctx, key, reader, opts) + } + +} + +func (r *SimulationBucket) Delete(ctx context.Context, key string) error { + simulation, found := r.Simulation[key] + if !r.SimulationOn || !found { + return r.Bucket.Delete(ctx, key) + } + + r.mu.Lock() + retryIndex, found := r.retryStat[key] + if found { + r.retryStat[key] = r.retryStat[key] + 1 + } else { + r.retryStat[key] = 1 + } + thisSimulation := simulation[retryIndex] + timer := time.NewTimer(thisSimulation.Delay) + r.mu.Unlock() + + select { + case <-ctx.Done(): + return SimulationErrorDeleteCanceled + case <-timer.C: + if thisSimulation.Err != nil { + return thisSimulation.Err + } + return r.Bucket.Delete(ctx, key) + } + +} + +func (r *SimulationBucket) ListSimu(opts *blob.ListOptions) *ListIteratorWrapper { + defer func() { + r.ListCurrentSimulationRound++ + }() + + it := r.Bucket.List(opts) + + simulationOn := r.SimulationOn + var currentSimulation Simulation + if r.ListCurrentSimulationRound >= len(r.itSimulation) { + simulationOn = false + currentSimulation = Simulation{} + } else { + currentSimulation = r.itSimulation[r.ListCurrentSimulationRound] + } + + return &ListIteratorWrapper{ + ListIterator: it, + SimulationOn: simulationOn, + Simulation: currentSimulation, + } +} + +func (r *SimulationBucket) SetListContext(ctx context.Context) { + r.ListOperationCtx = ctx +} + +type ListIteratorWrapper struct { + SimulationOn bool + Simulation Simulation + *blob.ListIterator +} + +func (r *ListIteratorWrapper) Next(ctx context.Context) (*blob.ListObject, error) { + if !r.SimulationOn { + return r.ListIterator.Next(ctx) + } + + timer := time.NewTimer(r.Simulation.Delay) + select { + case <-ctx.Done(): + return nil, SimulationErrorListCanceled + case <-timer.C: + if r.Simulation.Err != nil { + return nil, r.Simulation.Err + } + return r.ListIterator.Next(ctx) + } +} diff --git a/internal/offloading/sink.go 
b/internal/offloading/sink.go new file mode 100644 index 0000000000000000000000000000000000000000..7ac411d3c50b3f3fe9aac955b3d50321bf2ba151 --- /dev/null +++ b/internal/offloading/sink.go @@ -0,0 +1,319 @@ +package offloading + +import ( + "context" + "errors" + "fmt" + "gitlab.com/gitlab-org/gitaly/v16/internal/backoff" + "gocloud.dev/blob" + "io" + "math/rand" + "os" + "path/filepath" + "sync" + "time" + + _ "gocloud.dev/blob/azureblob" // register Azure driver + _ "gocloud.dev/blob/fileblob" // register file driver + _ "gocloud.dev/blob/gcsblob" // register Google Cloud driver + _ "gocloud.dev/blob/memblob" // register in-memory driver + _ "gocloud.dev/blob/s3blob" +) + +type Bucket interface { + Download(ctx context.Context, key string, w io.Writer, opts *blob.ReaderOptions) error + Upload(ctx context.Context, key string, r io.Reader, opts *blob.WriterOptions) error + List(opts *blob.ListOptions) *blob.ListIterator + Delete(ctx context.Context, key string) (err error) + Attributes(ctx context.Context, key string) (*blob.Attributes, error) + Close() error +} + +type Iterator interface { + Next(ctx context.Context) (*blob.ListObject, error) +} + +// Sink is a wrapper around the storage bucket used for uploading/downloading packfiles +type Sink struct { + ctx context.Context + globalCancel context.CancelFunc + //sink *backup.Sink + timeout time.Duration + bucket Bucket + retry backoff.Strategy + + // maxRetry is 0 mean no retry + maxRetry uint + retryTimeout time.Duration +} + +// NewSink creates a Sink from the given parameters. +func NewSink(ctx context.Context, bucket Bucket, timeout time.Duration, maxRetry uint, retryTimeout time.Duration) (*Sink, error) { + //sink, err := backup.ResolveSink(ctx, uri) + //bucket, err := blob.OpenBucket(ctx, uri) + //if err != nil { + // return nil, fmt.Errorf("open bucket: %w", err) + //} + ctx, cancel := context.WithTimeout(ctx, timeout) + return &Sink{ + ctx: ctx, + globalCancel: cancel, + timeout: timeout, + bucket: bucket, + retry: backoff.NewDefaultExponential(rand.New(rand.NewSource(time.Now().UnixNano()))), + maxRetry: maxRetry, + retryTimeout: retryTimeout, + }, nil +} + +// TODO upload just upload one single file to the bucket +func (r *Sink) Upload(ctx context.Context, fullFilePath string, prefix string) (returnErr error) { + // Use context timeouts to set an overall deadline for the operation, including all potential retries. + // Having a cancelable context is critical here because the retry mechanism does not have a maximum + // retry count and will continue retrying until the context is explicitly canceled. 
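	// Editor's note (illustrative only, not part of this change): the overall timeout used
	// below bounds the whole call including all retries, retryTimeout bounds each individual
	// attempt, and maxRetry caps additional attempts after the first (0 means no retry).
	// A caller might wire these knobs up roughly as follows; the bucket URL and durations
	// are hypothetical:
	//
	//	bucket, err := blob.OpenBucket(ctx, "gs://my-offload-bucket")
	//	if err != nil {
	//		return err
	//	}
	//	sink, err := NewSink(ctx, bucket, 60*time.Second, 3, 10*time.Second)
	//	if err != nil {
	//		return err
	//	}
	//	err = sink.Upload(ctx, "/path/to/pack-1234.pack", "repo-prefix")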
+ ctx, cancel := context.WithTimeout(ctx, r.timeout) + //var cancel context.CancelFunc + defer cancel() + + file, err := os.Open(fullFilePath) + defer func() { + if err := file.Close(); err != nil { + returnErr = errors.Join(returnErr, fmt.Errorf("close local file writer: %w", err)) + } + }() + if err != nil { + return fmt.Errorf("open file: %w", err) + } + + objectKey := fmt.Sprintf("%s/%s", prefix, filepath.Base(fullFilePath)) + for retry := uint(0); retry <= r.maxRetry; { + operationCtx, operationCancel := context.WithTimeout(ctx, r.retryTimeout) + err = r.bucket.Upload(operationCtx, objectKey, file, &blob.WriterOptions{ + // 'no-store' - we don't want the backup to be cached as the content could be changed, + // so we always want a fresh and up to date data + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control#cacheability + // 'no-transform' - disallows intermediates to modify data + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control#other + CacheControl: "no-store, no-transform", + ContentType: "application/octet-stream", + ContentMD5: make([]byte, 0), + }) + operationCancel() + if err == nil || r.maxRetry == 0 { + break + } else { + timer := time.NewTimer(r.retry.Backoff(retry)) + select { + case <-ctx.Done(): + timer.Stop() + return fmt.Errorf("uoload object cancelled %q: %w", objectKey, err) + case <-timer.C: + // Refresh timer expires, issue another try. + retry++ + } + } + } + if err != nil { + return fmt.Errorf("upload object %q: %w", objectKey, err) + } + return nil +} + +// download +func (r *Sink) Download(ctx context.Context, objectKey string, fullFilePath string) (returnErr error) { + // Use context timeouts to set an overall deadline for the operation, including all potential retries. + // Having a cancelable context is critical here because the retry mechanism does not have a maximum + // retry count and will continue retrying until the context is explicitly canceled. + ctx, cancel := context.WithTimeout(ctx, r.timeout) + defer cancel() + + file, err := os.Create(fullFilePath) + defer func() { + if err := file.Close(); err != nil { + returnErr = errors.Join(returnErr, fmt.Errorf("close local file: %w", err)) + } + }() + if err != nil { + return fmt.Errorf("create file: %w", err) + } + + for retry := uint(0); retry <= r.maxRetry; { + operationCtx, operationCancel := context.WithTimeout(ctx, r.retryTimeout) + err = r.bucket.Download(operationCtx, objectKey, file, nil) + operationCancel() + if err == nil || r.maxRetry == 0 { + break + } else { + timer := time.NewTimer(r.retry.Backoff(retry)) + select { + case <-ctx.Done(): + timer.Stop() + return fmt.Errorf("download object cancelled %q: %w", objectKey, err) + case <-timer.C: + // Refresh timer expires, issue another try. 
+ retry++ + } + } + } + if err != nil { + return fmt.Errorf("download object: %w", err) + } + return nil +} + +// List +func (r *Sink) List(ctx context.Context, prefix string) (res []string, err error) { + return list(ctx, r, prefix, r.bucket.List) +} + +type ListFunc[T Iterator] func(opts *blob.ListOptions) T + +func list[T Iterator](ctx context.Context, r *Sink, prefix string, listFunc ListFunc[T]) (res []string, err error) { + //defer r.globalCancel() + + core := func() (res []string, err error) { + var listErr error + var attrs *blob.ListObject + it := listFunc(&blob.ListOptions{ + Prefix: prefix + "/", + Delimiter: "/", + }) + objects := make([]string, 0) + for { + operationCtx, operationCancel := context.WithTimeout(ctx, r.retryTimeout) + attrs, listErr = it.Next(operationCtx) + operationCancel() + + if errors.Is(listErr, io.EOF) { + return objects, nil + } + + if listErr != nil { + return []string{}, fmt.Errorf("list object on GCP bucket: %w", listErr) + } + + // Exclude the bucketPrefix "folder" itself + if attrs != nil && attrs.Key == prefix+"/" { + continue + } + + objects = append(objects, filepath.Base(attrs.Key)) + } + } + + ctx, cancel := context.WithTimeout(ctx, r.timeout) + defer cancel() + for retry := uint(0); retry <= r.maxRetry; { + + res, err = core() + + if err == nil || r.maxRetry == 0 { + break + } else { + timer := time.NewTimer(r.retry.Backoff(retry)) + select { + case <-ctx.Done(): + timer.Stop() + return []string{}, fmt.Errorf("list object cancelled %w", err) + case <-timer.C: + // Refresh timer expires, issue another try. + retry++ + } + } + } + + if err != nil { + return []string{}, fmt.Errorf("list object %w", err) + + } + return res, nil + +} + +// HasObjects +func (r *Sink) HasObjects(ctx context.Context, prefix string) (bool, error) { + it := r.bucket.List(&blob.ListOptions{ + Prefix: prefix + "/", + Delimiter: "/", + }) + + for { + attrs, err := it.Next(ctx) + if errors.Is(err, io.EOF) { + return false, nil + } + + if err != nil { + return false, fmt.Errorf("list objects: %w", err) + } + + // Exclude the bucketPrefix "folder" itself + if attrs != nil && attrs.Key == prefix+"/" { + continue + } + + return true, nil + } +} + +// DeleteObjects attempts to delete given objects within the specified bucketPrefix. +// It employs a retry mechanism to ensure objects are deleted with best-effort. +// Result is objectKey map to error +func (r *Sink) DeleteObjects(ctx context.Context, prefix string, objectNames []string) map[string]error { + res := make(map[string]error) + if len(objectNames) == 0 { + return res + } + + // Use context timeouts to set an overall deadline for the operation, including all potential retries. + // Having a cancelable context is critical here because the retry mechanism does not have a maximum + // retry count and will continue retrying until the context is explicitly canceled. + ctx, cancel := context.WithTimeout(ctx, r.timeout) + defer cancel() + wg := sync.WaitGroup{} + type deleteResult struct { + object string + err error + } + resCh := make(chan deleteResult, len(objectNames)) + + // In the context of inactive repo offloading, the number of objects are expected to be small. + // Therefore, we can afford to use a goroutine for each object. 
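	// Editor's note (illustrative only, not part of this change): the returned map contains
	// an entry only for objects whose deletion ultimately failed, so a caller might consume
	// it roughly as follows; names are hypothetical:
	//
	//	failed := sink.DeleteObjects(ctx, "repo-prefix", []string{"pack-1.pack", "pack-1.idx"})
	//	for objectKey, delErr := range failed {
	//		log.Printf("could not delete %s: %v", objectKey, delErr)
	//	}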
+ for _, object := range objectNames { + wg.Add(1) + objectPath := fmt.Sprintf("%s/%s", prefix, filepath.Base(object)) + go func(obj string) { + defer wg.Done() + var err error + for retry := uint(0); retry <= r.maxRetry; { + operationCtx, operationCancel := context.WithTimeout(ctx, r.retryTimeout) + err = r.bucket.Delete(operationCtx, obj) + operationCancel() + if err == nil || r.maxRetry == 0 { + break + } else { + timer := time.NewTimer(r.retry.Backoff(retry)) + select { + case <-ctx.Done(): + timer.Stop() + err = fmt.Errorf("delete object cancelled %q: %w", obj, err) + case <-timer.C: + // Refresh timer expires, issue another try. + retry++ + } + } + } + resCh <- deleteResult{object: obj, err: err} + }(objectPath) + } + + wg.Wait() + close(resCh) + for delRes := range resCh { + if delRes.err != nil { + res[delRes.object] = delRes.err + } + } + + return res +} diff --git a/internal/offloading/sink_test.go b/internal/offloading/sink_test.go new file mode 100644 index 0000000000000000000000000000000000000000..965898c8323a6897dbcf630ad99456616262f8c9 --- /dev/null +++ b/internal/offloading/sink_test.go @@ -0,0 +1,1277 @@ +package offloading + +import ( + "context" + "errors" + "fmt" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gocloud.dev/blob" + "gocloud.dev/gcerrors" + "os" + "path/filepath" + "strings" + "testing" + "time" +) + +// Test Upload +func TestSink_Upload(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + desc string + + // filesToUpload is a list of files to upload + prefix string + + filesToUpload []string + + // filesOnLocal is a list of files actually on the local filesystem + filesOnLocal map[string]string + expectedSuccessful map[string]string + expectedErrored map[string]error + }{ + { + desc: "Upload objects", + prefix: "jerry" + testhelper.TempDir(t), + filesToUpload: []string{"C-131", "C-137"}, + filesOnLocal: map[string]string{ + "C-131": "I am Mr. Frundles", + "C-137": "Why are you here?", + }, + expectedSuccessful: map[string]string{ + "C-131": "I am Mr. Frundles", + "C-137": "Why are you here?", + }, + }, + { + desc: "Upload objects but one is missing on local", + prefix: "jerry" + testhelper.TempDir(t), + filesToUpload: []string{"C-131", "C-137", "5126"}, + filesOnLocal: map[string]string{ + "C-131": "I am Mr. Frundles", + "C-137": "Why are you here?", + }, + expectedSuccessful: map[string]string{ + "C-131": "I am Mr. Frundles", + "C-137": "Why are you here?", + }, + expectedErrored: map[string]error{ + "5126": os.ErrInvalid, + }, + }, + { + desc: "prefix can be empty", + prefix: "", + filesToUpload: []string{"C-131", "C-137", "5126"}, + filesOnLocal: map[string]string{ + "C-131": "I am Mr. Frundles", + "C-137": "Why are you here?", + }, + expectedSuccessful: map[string]string{ + "C-131": "I am Mr. 
Frundles", + "C-137": "Why are you here?", + }, + expectedErrored: map[string]error{ + "5126": os.ErrInvalid, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx := testhelper.Context(t) + sink := setupEmptyLocalBucket(t) + defer closeBucket(t, sink) + localDir := testhelper.TempDir(t) + + for k, v := range tc.filesOnLocal { + err := os.WriteFile(filepath.Join(localDir, k), []byte(v), 0o666) + require.NoError(t, err) + } + + //prefix := "jerry" + testhelper.TempDir(t) + for _, objectName := range tc.filesToUpload { + err := sink.Upload(ctx, filepath.Join(localDir, objectName), tc.prefix) + if err == nil { + objKey := tc.prefix + "/" + objectName + attr, err := sink.bucket.Attributes(ctx, objKey) + require.NoError(t, err) + require.Equal(t, attr.CacheControl, "no-store, no-transform") + require.Equal(t, attr.ContentType, "application/octet-stream") + + var builder strings.Builder + err = sink.bucket.Download(ctx, objKey, &builder, nil) + require.NoError(t, err) + require.Equal(t, tc.expectedSuccessful[objectName], builder.String()) + } else { + require.True(t, errors.Is(err, tc.expectedErrored[objectName])) + //require.Equal(t, tc.expectedErrored[objectName], err) + } + } + + }) + + } + + t.Run("upload duplicated key whose writer finished later wins", func(t *testing.T) { + ctx := testhelper.Context(t) + localDirLoser := testhelper.TempDir(t) + err := os.WriteFile(filepath.Join(localDirLoser, "i_am_key"), []byte("hello"), 0o666) + require.NoError(t, err) + localDirWinner := testhelper.TempDir(t) + err = os.WriteFile(filepath.Join(localDirWinner, "i_am_key"), []byte("world"), 0o666) + sink := setupEmptyLocalBucket(t) + + resCh := make(chan error, 2) + go func() { + time.Sleep(200 * time.Millisecond) + resCh <- sink.Upload(ctx, filepath.Join(localDirLoser, "i_am_key"), "some/prefix") + }() + go func() { + time.Sleep(300 * time.Millisecond) + resCh <- sink.Upload(ctx, filepath.Join(localDirWinner, "i_am_key"), "some/prefix") + }() + + <-resCh + <-resCh + + var builder strings.Builder + err = sink.bucket.Download(ctx, "some/prefix/i_am_key", &builder, nil) + require.NoError(t, err) + require.Equal(t, "world", builder.String()) + + }) + + t.Run("local file path cannot be empty", func(t *testing.T) { + ctx := testhelper.Context(t) + sink := setupEmptyLocalBucket(t) + defer closeBucket(t, sink) + + err := sink.Upload(ctx, "", "some/prefix") + require.Error(t, err) + }) + +} + +func TestSink_Upload_Timeout_Cancellation_And_Retry(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + maxRetry uint + overallTimeout time.Duration + + // operationTimeOut is the timer for each operation e.g. 
upload, download + operationTimeOut time.Duration + objectName string + simulationData []Simulation + expectedError error + }{ + { + desc: "upload failed with overall timer timeout no retry", + maxRetry: 0, + overallTimeout: 100 * time.Millisecond, + operationTimeOut: 5 * time.Second, + objectName: "overall_timeout_obj_key", + simulationData: []Simulation{ + {Delay: 10 * time.Second, Err: nil}, + }, + expectedError: SimulationErrorUploadCanceled, + }, + { + desc: "upload success with some operation timeout and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 100 * time.Millisecond, + objectName: "success_with_retry", + simulationData: []Simulation{ + {300 * time.Millisecond, nil}, // this will timeout + {300 * time.Millisecond, nil}, // this will timeout + {10 * time.Millisecond, nil}, // this should succeed + }, + expectedError: nil, + }, + { + desc: "upload failed with operation timer timeout and with retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 50 * time.Millisecond, + objectName: "failed_even_retry", + simulationData: []Simulation{ + {1 * time.Second, nil}, + {1 * time.Second, nil}, + {1 * time.Second, nil}, + }, + expectedError: SimulationErrorUploadCanceled, + }, + { + desc: "upload success with simulated cancel and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 10 * time.Second, + objectName: "success_with_cancel_and_retry", + simulationData: []Simulation{ + {500 * time.Millisecond, SimulationErrorUploadCanceled}, + {500 * time.Millisecond, SimulationErrorUploadCanceled}, + {500 * time.Millisecond, nil}, + }, + expectedError: nil, + }, + { + desc: "upload failed with simulated cancel and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 10 * time.Second, + objectName: "success_with_cancel_and_retry", + simulationData: []Simulation{ + {500 * time.Millisecond, SimulationErrorUploadCanceled}, + {500 * time.Millisecond, SimulationErrorUploadCanceled}, + {500 * time.Millisecond, SimulationErrorUploadCanceled}, + }, + expectedError: SimulationErrorUploadCanceled, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + prefix := "some/prefix" + localBucket := testhelper.TempDir(t) + localBucketUri := fmt.Sprintf("file://%s", localBucket) + bucket, err := blob.OpenBucket(ctx, localBucketUri) + require.NoError(t, err) + + objectKey := fmt.Sprintf("%s/%s", prefix, tc.objectName) + simulation := map[string][]Simulation{ + objectKey: tc.simulationData, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) + + sink, err := NewSink(ctx, simuBucket, tc.overallTimeout, tc.maxRetry, tc.operationTimeOut) + require.NoError(t, err) + defer closeBucket(t, sink) + + localDir := testhelper.TempDir(t) + err = os.WriteFile(filepath.Join(localDir, tc.objectName), []byte("Go long!"), 0o666) + require.NoError(t, err) + err = sink.Upload(ctx, filepath.Join(localDir, tc.objectName), prefix) + if tc.expectedError != nil { + require.ErrorIs(t, err, SimulationErrorUploadCanceled) + } else { + require.NoError(t, err) + } + + }) + } + + t.Run("upload failed with overall cancel triggered no retry", func(t *testing.T) { + prefix := "some/prefix" + objectName := "overall_cancel_called" + localBucket := testhelper.TempDir(t) + localBucketUri := fmt.Sprintf("file://%s", localBucket) + bucket, err := blob.OpenBucket(ctx, localBucketUri) + objectKey := fmt.Sprintf("%s/%s", prefix, objectName) + simulation := map[string][]Simulation{ + objectKey: { + {1 * 
time.Second, nil}, + }, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) + ctx, cancel := context.WithCancel(ctx) + + // the overall timer and operation timer are long enough + sink, err := NewSink(ctx, simuBucket, 100*time.Second, 0, 10*time.Second) + require.NoError(t, err) + defer closeBucket(t, sink) + + localDir := testhelper.TempDir(t) + err = os.WriteFile(filepath.Join(localDir, objectName), []byte("Go long!"), 0o666) + require.NoError(t, err) + errCh := make(chan error) + go func() { + errCh <- sink.Upload(ctx, filepath.Join(localDir, objectName), prefix) + }() + + // trigger cancel + cancel() + err = <-errCh + require.ErrorIs(t, err, SimulationErrorUploadCanceled) + }) + +} + +// Test Download +func TestSink_Download(t *testing.T) { + ctx := testhelper.Context(t) + for _, tc := range []struct { + desc string + queryPrefix string + queryObjectNames []string + expectedContent map[string]string + expectedError map[string]gcerrors.ErrorCode + }{ + { + desc: "download all objects", + queryPrefix: "jerry", + queryObjectNames: []string{"C-131", "C-137"}, + expectedContent: map[string]string{ + "C-131": "I am Mr. Frundles", + "C-137": "Why are you here?", + }, + }, + { + desc: "Download nonexistent objects", + queryPrefix: "jerry", + queryObjectNames: []string{"5126"}, + expectedContent: map[string]string{}, + expectedError: map[string]gcerrors.ErrorCode{ + "5126": gcerrors.NotFound, + }, + }, + { + desc: "Download nonexistent folder", + queryPrefix: "nonexistent", + queryObjectNames: []string{"C-131"}, + expectedContent: map[string]string{}, + expectedError: map[string]gcerrors.ErrorCode{ + "C-131": gcerrors.NotFound, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var sink *Sink + + prefix := "jerry" + objects := []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + { + ObjectName: "C-137", + Content: "Why are you here?", + WriterOpt: blob.WriterOptions{ + ContentType: "atext/html; charset=utf-8", + CacheControl: "no-store, no-transform", + }, + }, + } + sink, _ = setupLocalBucket(t, prefix, objects) + defer closeBucket(t, sink) + localDir := testhelper.TempDir(t) + + for _, obj := range tc.queryObjectNames { + objectKey := fmt.Sprintf("%s/%s", tc.queryPrefix, obj) + localFullPath := filepath.Join(localDir, obj) + err := sink.Download(ctx, objectKey, localFullPath) + if err == nil { + buf, err := os.ReadFile(localFullPath) + require.NoError(t, err) + require.Equal(t, tc.expectedContent[obj], string(buf)) + } else { + require.Equal(t, tc.expectedError[obj], gcerrors.Code(err)) + } + } + + }) + } +} + +func TestSink_Download_Timeout_Cancellation_And_Retry(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + maxRetry uint + overallTimeout time.Duration + + // operationTimeOut is the timer for each operation e.g. 
upload, download + operationTimeOut time.Duration + objectName string + objectContent string + simulationData []Simulation + expectedError error + }{ + { + desc: "download failed with overall timer timeout no retry", + maxRetry: 0, + overallTimeout: 100 * time.Millisecond, + operationTimeOut: 5 * time.Second, + objectName: "overall_timeout_obj_key", + objectContent: "it's about time", + simulationData: []Simulation{ + {Delay: 10 * time.Second, Err: nil}, + }, + expectedError: SimulationErrorDownloadCanceled, + }, + { + desc: "down success with some timeouts and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 100 * time.Millisecond, + objectName: "success_with_retry", + objectContent: "with some network jitter, we made it", + simulationData: []Simulation{ + {300 * time.Millisecond, nil}, // this will timeout + {300 * time.Millisecond, nil}, // this will timeout + {10 * time.Millisecond, nil}, // this should succeed + }, + expectedError: nil, + }, + { + desc: "download failed with operation timeout and with retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 50 * time.Millisecond, + objectName: "failed_even_retry", + objectContent: "we had a terrible network", + simulationData: []Simulation{ + {1 * time.Second, nil}, + {1 * time.Second, nil}, + {1 * time.Second, nil}, + }, + expectedError: SimulationErrorDownloadCanceled, + }, + { + desc: "down success with simulated cancel and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 10 * time.Second, + objectName: "success_with_cancel_and_retry", + simulationData: []Simulation{ + {500 * time.Millisecond, SimulationErrorDownloadCanceled}, + {500 * time.Millisecond, SimulationErrorDownloadCanceled}, + {500 * time.Millisecond, nil}, + }, + expectedError: nil, + }, + { + desc: "download failed with simulated cancel and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 10 * time.Second, + objectName: "success_with_cancel_and_retry", + simulationData: []Simulation{ + {500 * time.Millisecond, SimulationErrorDownloadCanceled}, + {500 * time.Millisecond, SimulationErrorDownloadCanceled}, + {500 * time.Millisecond, SimulationErrorDownloadCanceled}, + }, + expectedError: SimulationErrorDownloadCanceled, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + prefix := "some/prefix" + objects := []fileBucketData{ + { + ObjectName: tc.objectName, + Content: tc.objectContent, + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + } + _, bucket := setupLocalBucket(t, prefix, objects) + + objectKey := fmt.Sprintf("%s/%s", prefix, tc.objectName) + simulation := map[string][]Simulation{ + objectKey: tc.simulationData, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) + sink, err := NewSink(ctx, simuBucket, tc.overallTimeout, tc.maxRetry, tc.operationTimeOut) + require.NoError(t, err) + defer closeBucket(t, sink) + + localDir := testhelper.TempDir(t) + localFullPath := filepath.Join(localDir, tc.objectName) + err = sink.Download(ctx, objectKey, localFullPath) + + if tc.expectedError != nil { + require.ErrorIs(t, err, tc.expectedError) + } else { + buf, err := os.ReadFile(localFullPath) + require.NoError(t, err) + require.Equal(t, tc.objectContent, string(buf)) + } + }) + } + + t.Run("download failed with overall cancel triggered no retry", func(t *testing.T) { + prefix := "some/prefix" + objectName := "overall_cancel_called" + 
objectContent := "I can't wait forever" + objects := []fileBucketData{ + { + ObjectName: objectName, + Content: objectContent, + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + } + _, bucket := setupLocalBucket(t, prefix, objects) + + objectKey := fmt.Sprintf("%s/%s", prefix, objectName) + simulation := map[string][]Simulation{ + objectKey: { + {1 * time.Second, nil}, + }, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) + ctx, cancel := context.WithCancel(ctx) + sink, err := NewSink(ctx, simuBucket, 100*time.Second, 0, 10*time.Second) + require.NoError(t, err) + defer closeBucket(t, sink) + + localDir := testhelper.TempDir(t) + localFullPath := filepath.Join(localDir, objectName) + errCh := make(chan error) + go func() { + errCh <- sink.Download(ctx, objectKey, localFullPath) + }() + + // trigger cancel + cancel() + err = <-errCh + require.ErrorIs(t, err, SimulationErrorDownloadCanceled) + }) +} + +func TestSink_Delete(t *testing.T) { + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + queryingPrefix string + createBucket bool + bucket string + prefix string + objects []fileBucketData + objectsToDelete []string + objectsLeft []string + expectedRes map[string]gcerrors.ErrorCode + }{ + { + desc: "delete all objects", + queryingPrefix: "jerry", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + { + ObjectName: "C-137", + Content: "Why are you here?", + WriterOpt: blob.WriterOptions{ + ContentType: "atext/html; charset=utf-8", + CacheControl: "no-store, no-transform", + }, + }, + }, + objectsToDelete: []string{"C-131", "C-137"}, + expectedRes: map[string]gcerrors.ErrorCode{}, + }, + { + desc: "delete some objects", + queryingPrefix: "jerry", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + { + ObjectName: "C-137", + Content: "Why are you here?", + WriterOpt: blob.WriterOptions{ + ContentType: "atext/html; charset=utf-8", + CacheControl: "no-store, no-transform", + }, + }, + }, + objectsToDelete: []string{"C-131"}, + objectsLeft: []string{"C-137"}, + expectedRes: map[string]gcerrors.ErrorCode{}, + }, + { + desc: "delete more objects than you have", + queryingPrefix: "jerry", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + { + ObjectName: "C-137", + Content: "Why are you here?", + WriterOpt: blob.WriterOptions{ + ContentType: "atext/html; charset=utf-8", + CacheControl: "no-store, no-transform", + }, + }, + }, + objectsToDelete: []string{"C-131", "C-137", "5126"}, + objectsLeft: []string{}, + expectedRes: map[string]gcerrors.ErrorCode{ + "jerry/5126": gcerrors.NotFound, + }, + }, + { + desc: "delete nonexistent prefix", + queryingPrefix: "nonexistent", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. 
Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + }, + objectsToDelete: []string{"C-131"}, + objectsLeft: []string{"C-131"}, + expectedRes: map[string]gcerrors.ErrorCode{ + "nonexistent/C-131": gcerrors.NotFound, + }, + }, + { + desc: "empty bucket", + queryingPrefix: "i-am-empty", + prefix: "i-am-empty", + objects: []fileBucketData{}, + objectsToDelete: []string{"C-131"}, + objectsLeft: []string{}, + expectedRes: map[string]gcerrors.ErrorCode{ + "i-am-empty/C-131": gcerrors.NotFound, + }, + }, + { + desc: "delete parameter is empty", + queryingPrefix: "jerry", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + { + ObjectName: "C-137", + Content: "Why are you here?", + WriterOpt: blob.WriterOptions{ + ContentType: "atext/html; charset=utf-8", + CacheControl: "no-store, no-transform", + }, + }, + }, + objectsToDelete: []string{}, + expectedRes: map[string]gcerrors.ErrorCode{}, + objectsLeft: []string{"C-131", "C-137"}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var sink *Sink + + sink, _ = setupLocalBucket(t, tc.prefix, tc.objects) + defer closeBucket(t, sink) + + actualResWithErrorCode := make(map[string]gcerrors.ErrorCode) + res := sink.DeleteObjects(ctx, tc.queryingPrefix, tc.objectsToDelete) + fmt.Println(res) + for k, v := range res { + actualResWithErrorCode[k] = gcerrors.Code(v) + } + require.Equal(t, tc.expectedRes, actualResWithErrorCode) + + actualObjectsLeft, err := sink.List(ctx, tc.prefix) + require.NoError(t, err) + require.ElementsMatch(t, tc.objectsLeft, actualObjectsLeft) + }) + } +} + +func TestSink_Delete_Timeout_Cancellation_And_Retry(t *testing.T) { + + t.Parallel() + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + maxRetry uint + overallTimeout time.Duration + + // operationTimeOut is the timer for each operation e.g. 
upload, download + operationTimeOut time.Duration + objectName string + simulationData []Simulation + expectedError error + }{ + { + desc: "delete failed with overall timer timeout no retry", + maxRetry: 0, + overallTimeout: 100 * time.Millisecond, + operationTimeOut: 5 * time.Second, + objectName: "overall_timeout_obj_key", + simulationData: []Simulation{ + {Delay: 10 * time.Second, Err: nil}, + }, + expectedError: SimulationErrorDeleteCanceled, + }, + { + desc: "delete success with some timeouts and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 100 * time.Millisecond, + objectName: "success_with_retry", + simulationData: []Simulation{ + {300 * time.Millisecond, nil}, // this will timeout + {300 * time.Millisecond, nil}, // this will timeout + {10 * time.Millisecond, nil}, // this should succeed + }, + expectedError: nil, + }, + { + desc: "delete failed with operation timeout and with retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 50 * time.Millisecond, + objectName: "failed_even_retry", + simulationData: []Simulation{ + {1 * time.Second, nil}, + {1 * time.Second, nil}, + {1 * time.Second, nil}, + }, + expectedError: SimulationErrorDeleteCanceled, + }, + { + desc: "delete success with simulated cancel and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 10 * time.Second, + objectName: "success_with_cancel_and_retry", + simulationData: []Simulation{ + {500 * time.Millisecond, SimulationErrorDownloadCanceled}, + {500 * time.Millisecond, SimulationErrorDownloadCanceled}, + {500 * time.Millisecond, nil}, + }, + expectedError: nil, + }, + { + desc: "delete failed with simulated cancel and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 10 * time.Second, + objectName: "success_with_cancel_and_retry", + simulationData: []Simulation{ + {500 * time.Millisecond, SimulationErrorDeleteCanceled}, + {500 * time.Millisecond, SimulationErrorDeleteCanceled}, + {500 * time.Millisecond, SimulationErrorDeleteCanceled}, + }, + expectedError: SimulationErrorDeleteCanceled, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + prefix := "some/prefix" + objects := []fileBucketData{ + { + ObjectName: tc.objectName, + Content: "I am Mr. 
Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + } + _, bucket := setupLocalBucket(t, prefix, objects) + + objectKey := fmt.Sprintf("%s/%s", prefix, tc.objectName) + simulation := map[string][]Simulation{ + objectKey: tc.simulationData, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) + sink, err := NewSink(ctx, simuBucket, tc.overallTimeout, tc.maxRetry, tc.operationTimeOut) + require.NoError(t, err) + defer closeBucket(t, sink) + + res := sink.DeleteObjects(ctx, prefix, []string{objectKey}) + + if tc.expectedError != nil { + require.ErrorIs(t, res[objectKey], tc.expectedError) + } else { + require.NoError(t, res[objectKey]) + actualObjectsLeft, err := sink.List(ctx, prefix) + require.NoError(t, err) + require.Empty(t, actualObjectsLeft) + } + }) + } + + t.Run("download failed with overall cancel triggered no retry", func(t *testing.T) { + prefix := "some/prefix" + objectName := "overall_cancel_called" + objectContent := "I can't wait forever" + objects := []fileBucketData{ + { + ObjectName: objectName, + Content: objectContent, + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + } + _, bucket := setupLocalBucket(t, prefix, objects) + + objectKey := fmt.Sprintf("%s/%s", prefix, objectName) + simulation := map[string][]Simulation{ + objectKey: { + {1 * time.Second, nil}, + }, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) + ctx, cancel := context.WithCancel(ctx) + sink, err := NewSink(ctx, simuBucket, 100*time.Second, 0, 10*time.Second) + require.NoError(t, err) + defer closeBucket(t, sink) + + errCh := make(chan map[string]error) + go func() { + errCh <- sink.DeleteObjects(ctx, prefix, []string{objectKey}) + }() + + // trigger cancel + cancel() + res := <-errCh + require.ErrorIs(t, res[objectKey], SimulationErrorDeleteCanceled) + }) +} + +// Test List +func TestSink_List(t *testing.T) { + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + queryingPrefix string + createBucket bool + bucket string + prefix string + objects []fileBucketData + expectedRes []string + expectedErr error + }{ + { + desc: "bucket with objects", + queryingPrefix: "jerry", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + { + ObjectName: "C-137", + Content: "Why are you here?", + WriterOpt: blob.WriterOptions{ + ContentType: "atext/html; charset=utf-8", + CacheControl: "no-store, no-transform", + }, + }, + }, + expectedRes: []string{"C-131", "C-137"}, + expectedErr: nil, + }, + { + desc: "prefix with / ending", + queryingPrefix: "jerry/", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + }, + expectedRes: []string{}, + expectedErr: nil, + }, + { + desc: "nonexistent prefix in a bucket with objects", + queryingPrefix: "nonexistent", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. 
Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + }, + expectedRes: []string{}, + expectedErr: nil, + }, + { + desc: "empty bucket", + queryingPrefix: "i-am-empty", + prefix: "i-am-empty", + objects: []fileBucketData{}, + expectedRes: []string{}, + expectedErr: nil, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var sink *Sink + + sink, _ = setupLocalBucket(t, tc.prefix, tc.objects) + defer closeBucket(t, sink) + + res, err := sink.List(ctx, tc.queryingPrefix) + require.NoError(t, err) + + require.Equal(t, tc.expectedRes, res) + }) + } +} + +func TestSink_List_Timeout_Cancellation_And_Retry(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + maxRetry uint + overallTimeout time.Duration + + // operationTimeOut is the timer for each operation e.g. upload, download + operationTimeOut time.Duration + objectName string + simulationData []Simulation + expectedError error + }{ + { + desc: "list failed with overall timer timeout no retry", + maxRetry: 0, + overallTimeout: 100 * time.Millisecond, + operationTimeOut: 5 * time.Second, + objectName: "overall_timeout_obj_key", + simulationData: []Simulation{ + {Delay: 10 * time.Second, Err: nil}, + }, + expectedError: SimulationErrorListCanceled, + }, + { + desc: "list success with some timeouts and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 100 * time.Millisecond, + objectName: "success_with_retry", + simulationData: []Simulation{ + {300 * time.Millisecond, nil}, // this will timeout + {300 * time.Millisecond, nil}, // this will timeout + {10 * time.Millisecond, nil}, // this should succeed + }, + expectedError: nil, + }, + { + desc: "list failed with operation timeout and with retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 50 * time.Millisecond, + objectName: "failed_even_retry", + simulationData: []Simulation{ + {1 * time.Second, nil}, + {1 * time.Second, nil}, + {1 * time.Second, nil}, + }, + expectedError: SimulationErrorListCanceled, + }, + { + desc: "list success with simulated cancel and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 10 * time.Second, + objectName: "success_with_cancel_and_retry", + simulationData: []Simulation{ + {500 * time.Millisecond, SimulationErrorDownloadCanceled}, + {500 * time.Millisecond, SimulationErrorDownloadCanceled}, + {500 * time.Millisecond, nil}, + }, + expectedError: nil, + }, + { + desc: "delete failed with simulated cancel and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 10 * time.Second, + objectName: "success_with_cancel_and_retry", + simulationData: []Simulation{ + {500 * time.Millisecond, SimulationErrorDeleteCanceled}, + {500 * time.Millisecond, SimulationErrorDeleteCanceled}, + {500 * time.Millisecond, SimulationErrorDeleteCanceled}, + }, + expectedError: SimulationErrorDeleteCanceled, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + prefix := "some/prefix" + objects := []fileBucketData{ + { + ObjectName: tc.objectName, + Content: "I am Mr. 
Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + } + _, bucket := setupLocalBucket(t, prefix, objects) + + objectKey := fmt.Sprintf("%s/%s", prefix, tc.objectName) + simulation := map[string][]Simulation{ + objectKey: tc.simulationData, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) + + simuBucketPtr, ok := simuBucket.(*SimulationBucket) + require.True(t, ok) + + sink, err := NewSink(ctx, simuBucket, tc.overallTimeout, tc.maxRetry, tc.operationTimeOut) + //simuBucketPtr.ListOperationCtx = sink.ctx + + require.NoError(t, err) + defer closeBucket(t, sink) + + res, err := list(ctx, sink, prefix, simuBucketPtr.ListSimu) + if tc.expectedError != nil { + require.ErrorIs(t, err, tc.expectedError) + } else { + require.NoError(t, err) + require.ElementsMatch(t, res, []string{tc.objectName}) + } + }) + } + + t.Run("list failed with overall cancel triggered no retry", func(t *testing.T) { + prefix := "some/prefix" + objectName := "overall_cancel_called" + objectContent := "I can't wait forever" + objects := []fileBucketData{ + { + ObjectName: objectName, + Content: objectContent, + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + } + _, bucket := setupLocalBucket(t, prefix, objects) + + objectKey := fmt.Sprintf("%s/%s", prefix, objectName) + simulation := map[string][]Simulation{ + objectKey: { + {1 * time.Second, nil}, + }, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) + + simuBucketPtr, ok := simuBucket.(*SimulationBucket) + require.True(t, ok) + + ctx, cancel := context.WithCancel(ctx) + sink, err := NewSink(ctx, simuBucket, 100*time.Second, 0, 10*time.Second) + require.NoError(t, err) + defer closeBucket(t, sink) + + errCh := make(chan error) + go func() { + _, err := list(ctx, sink, prefix, simuBucketPtr.ListSimu) + errCh <- err + }() + + // trigger cancel + cancel() + res := <-errCh + require.ErrorIs(t, res, SimulationErrorListCanceled) + }) +} + +func TestSink_HasObjects(t *testing.T) { + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + queryingPrefix string + createBucket bool + bucket string + prefix string + objects []fileBucketData + expectedRes bool + expectedErr error + }{ + { + desc: "bucket with objects", + queryingPrefix: "jerry", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + { + ObjectName: "C-137", + Content: "Why are you here?", + WriterOpt: blob.WriterOptions{ + ContentType: "atext/html; charset=utf-8", + CacheControl: "no-store, no-transform", + }, + }, + }, + expectedRes: true, + expectedErr: nil, + }, + { + desc: "prefix with / ending", + queryingPrefix: "jerry/", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + }, + expectedRes: false, + expectedErr: nil, + }, + { + desc: "nonexistent prefix in a bucket with objects", + queryingPrefix: "nonexistent", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. 
Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + }, + expectedRes: false, + expectedErr: nil, + }, + { + desc: "empty bucket", + queryingPrefix: "i-am-empty", + prefix: "i-am-empty", + objects: []fileBucketData{}, + expectedRes: false, + expectedErr: nil, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var sink *Sink + + sink, _ = setupLocalBucket(t, tc.prefix, tc.objects) + defer closeBucket(t, sink) + + res, err := sink.HasObjects(ctx, tc.queryingPrefix) + require.NoError(t, err) + require.Equal(t, tc.expectedRes, res) + }) + } +} + +// Setup Bucket based on local file system +func setupEmptyLocalBucket(t *testing.T) *Sink { + ctx := testhelper.Context(t) + localBucket := testhelper.TempDir(t) + localBucketUri := fmt.Sprintf("file://%s", localBucket) + bucket, err := blob.OpenBucket(ctx, localBucketUri) + require.NoError(t, err) + sink, err := NewSink(ctx, bucket, 60*time.Second, 1, 3*time.Second) + require.NoError(t, err) + return sink +} + +// Add file to bucket +type fileBucketData struct { + ObjectName string + Content string + WriterOpt blob.WriterOptions +} + +func setupLocalBucket(t *testing.T, prefix string, objectsToUpload []fileBucketData) (*Sink, *blob.Bucket) { + + ctx := testhelper.Context(t) + bucket := testhelper.TempDir(t) + localBucketUri := fmt.Sprintf("file://%s", bucket) + localBucket, err := blob.OpenBucket(ctx, localBucketUri) + require.NoError(t, err) + sink, err := NewSink(ctx, localBucket, 60*time.Second, 1, 3*time.Second) + require.NoError(t, err) + + for _, obj := range objectsToUpload { + objectKey := filepath.Join(prefix, obj.ObjectName) + content := strings.NewReader(obj.Content) + require.NoError(t, err) + err = sink.bucket.Upload(ctx, objectKey, content, &obj.WriterOpt) + require.NoError(t, err) + } + + return sink, localBucket +} + +// Clean up bucket ?? +func closeBucket(t *testing.T, sink *Sink) { + err := sink.bucket.Close() + require.NoError(t, err) +} diff --git a/internal/offloading/testhelper_test.go b/internal/offloading/testhelper_test.go new file mode 100644 index 0000000000000000000000000000000000000000..911b7f1184fe02b28817fcdd57a8196549ee58f8 --- /dev/null +++ b/internal/offloading/testhelper_test.go @@ -0,0 +1,11 @@ +package offloading + +import ( + "testing" + + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" +) + +func TestMain(m *testing.M) { + testhelper.Run(m) +} diff --git a/internal/testhelper/leakage.go b/internal/testhelper/leakage.go index 3e23c56b5d43749bb89433aa6a6d1905aad7a19f..fdbd73c2d36ad2ed350ff941b7d2820dc9ad7aad 100644 --- a/internal/testhelper/leakage.go +++ b/internal/testhelper/leakage.go @@ -25,6 +25,10 @@ func mustHaveNoGoroutines() { // `init()` function. There is no way to stop this worker, so it will leak // whenever we import the package. goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + // cloud.google.com/go/storage client leaves cached connections to improve performance, + // and idle connections are garbage collected within a reasonable timeframe. + // See https://github.com/googleapis/google-cloud-go/issues/10948 for more details. + goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), // The backchannel code is somehow stock on closing its connections. I have no clue // why that is, but we should investigate. goleak.IgnoreTopFunction(PkgPath("internal/grpc/backchannel.clientHandshake.serve.func4")),