From 0c50ae108f2ed240304da47abbbaba2e97346170 Mon Sep 17 00:00:00 2001 From: Eric Ju Date: Thu, 5 Dec 2024 23:52:13 -0500 Subject: [PATCH 01/10] testhelper: Ignore internal/poll.runtime_pollWait in leak checks Using the cloud.google.com/go/storage client in tests may trigger reports of goroutine leaks involving internal/poll.runtime_pollWait. This issue was previously reported in https://github.com/googleapis/google-cloud-go/issues/10948, but the report was closed with an explanation: the cached connections are intentionally designed to improve performance, and idle connections are garbage collected within a reasonable timeframe. No memory leaks have been observed in production as a result of this behavior. To avoid false positives in test leak detection, this commit teaches testhelper to ignore internal/poll.runtime_pollWait during goroutine leak checks. --- internal/testhelper/leakage.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/testhelper/leakage.go b/internal/testhelper/leakage.go index 3e23c56b5d..fdbd73c2d3 100644 --- a/internal/testhelper/leakage.go +++ b/internal/testhelper/leakage.go @@ -25,6 +25,10 @@ func mustHaveNoGoroutines() { // `init()` function. There is no way to stop this worker, so it will leak // whenever we import the package. goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + // cloud.google.com/go/storage client leaves cached connections to improve performance, + // and idle connections are garbage collected within a reasonable timeframe. + // See https://github.com/googleapis/google-cloud-go/issues/10948 for more details. + goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), // The backchannel code is somehow stock on closing its connections. I have no clue // why that is, but we should investigate. goleak.IgnoreTopFunction(PkgPath("internal/grpc/backchannel.clientHandshake.serve.func4")), -- GitLab From 249a794adecf742fdfc275e3ea76b2084f929dd2 Mon Sep 17 00:00:00 2001 From: Eric Ju Date: Sat, 7 Dec 2024 23:52:08 -0500 Subject: [PATCH 02/10] housekeeping: Add OffloadingStorage Client with GCP implementation As part of repository offloading, interacting with the offloading storage is required for operations such as uploading, deleting, and listing objects. This commit introduces an OffloadingStorageClient interface to define these operations. Additionally, a GCP-specific implementation of the client is provided, leveraging the Google Cloud Storage API. --- .../housekeeping/offloading_storage_gcp.go | 189 +++++++++++ .../offloading_storage_gcp_test.go | 320 ++++++++++++++++++ 2 files changed, 509 insertions(+) create mode 100644 internal/git/housekeeping/offloading_storage_gcp.go create mode 100644 internal/git/housekeeping/offloading_storage_gcp_test.go diff --git a/internal/git/housekeeping/offloading_storage_gcp.go b/internal/git/housekeeping/offloading_storage_gcp.go new file mode 100644 index 0000000000..3407eb6302 --- /dev/null +++ b/internal/git/housekeeping/offloading_storage_gcp.go @@ -0,0 +1,189 @@ +package housekeeping + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "sync" + "time" + + gs "cloud.google.com/go/storage" + "github.com/googleapis/gax-go/v2" + "google.golang.org/api/iterator" +) + +// OffloadingStorageClient defines the interface for operations required to interact with offloading storage. +// This interface can be extended to support additional types of storage systems as needed. 
+type OffloadingStorageClient interface { + Upload(ctx context.Context, path string, bucketPrefix string) error + TryDelete(ctx context.Context, bucketPrefix string, objects []string) map[string]error + HasObjects(ctx context.Context, bucketPrefix string) (bool, error) +} + +// OffloadingStorageGcpClient is the implementation of OffloadingStorageClient for GCP. +type OffloadingStorageGcpClient struct { + bucketClient *gs.BucketHandle +} + +// NewOffloadingStorageGcpClient initializes and returns a new GCP bucket client +// to facilitate bucket operations such as uploading, listing, and deleting objects. +func NewOffloadingStorageGcpClient(client *gs.Client, bucket string) *OffloadingStorageGcpClient { + bucketClient := client.Bucket(bucket) + return &OffloadingStorageGcpClient{ + bucketClient: bucketClient, + } +} + +// Upload uploads a file to the GCP bucket under the specified bucketPrefix. +// The source file is located on the local file system at the provided path. +func (r *OffloadingStorageGcpClient) Upload(ctx context.Context, path string, bucketPrefix string) error { + // Use context timeouts to set an overall deadline for the operation, including all potential retries. + // Having a cancelable context is critical here because the retry mechanism does not have a maximum + // retry count and will continue retrying until the context is explicitly canceled. + ctx, cancel := context.WithTimeout(ctx, 60*time.Second) + defer cancel() + + file, err := os.Open(path) + if err != nil { + return fmt.Errorf("open file: %w", err) + } + objectPath := fmt.Sprintf("%s/%s", bucketPrefix, filepath.Base(path)) + + o := r.bucketClient.Object(objectPath).Retryer( + // Use WithBackoff to control the timing of the exponential backoff. + gs.WithBackoff(gax.Backoff{ + Max: 3 * time.Second, + }), + // We use preconditions when uploading objects to prevent race conditions. + // This approach is categorized as "Conditionally idempotent" based on the guidelines at: + // https://cloud.google.com/storage/docs/retry-strategy#idempotency-operations + gs.WithPolicy(gs.RetryIdempotent), + ) + + // Preconditions are used here to prevent race conditions. + // We want to make sure that no object with the same name is being uploaded concurrently. + o = o.If(gs.Conditions{DoesNotExist: true}) + wc := o.NewWriter(ctx) + _, err = io.Copy(wc, file) + if err != nil { + return fmt.Errorf("copy local file to object writer: %w", err) + } + if err := wc.Close(); err != nil { + return fmt.Errorf("close object writer: %w", err) + } + if err := file.Close(); err != nil { + return fmt.Errorf("close local file writer: %w", err) + } + return nil +} + +// TryDelete attempts to delete given objects within the specified bucketPrefix. +// It employs a retry mechanism to ensure objects are deleted with best-effort. +func (r *OffloadingStorageGcpClient) TryDelete(ctx context.Context, bucketPrefix string, objects []string) map[string]error { + res := make(map[string]error) + if len(objects) == 0 { + return res + } + + // Use context timeouts to set an overall deadline for the operation, including all potential retries. + // Having a cancelable context is critical here because the retry mechanism does not have a maximum + // retry count and will continue retrying until the context is explicitly canceled. 
+	ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
+	defer cancel()
+	wg := sync.WaitGroup{}
+	type deleteResult struct {
+		object string
+		err    error
+	}
+	resCh := make(chan deleteResult, len(objects))
+
+	// In the context of inactive repo offloading, the number of objects is expected to be small.
+	// Therefore, we can afford to use a goroutine for each object.
+	for _, object := range objects {
+		wg.Add(1)
+		objectPath := fmt.Sprintf("%s/%s", bucketPrefix, filepath.Base(object))
+		go func(obj string) {
+			defer wg.Done()
+			o := r.bucketClient.Object(obj).Retryer(
+				// Use WithBackoff to control the timing of the exponential backoff.
+				gs.WithBackoff(gax.Backoff{
+					Max: 5 * time.Second,
+				}),
+				// Deleting an object with ifGenerationMatch is categorized as "Conditionally idempotent"
+				// based on the guidelines at: https://cloud.google.com/storage/docs/retry-strategy#idempotency-operations
+				gs.WithPolicy(gs.RetryIdempotent),
+			)
+
+			// Set a generation-match precondition to avoid potential race
+			// conditions and data corruptions. The request to delete the file is aborted
+			// if the object's generation number does not match your precondition.
+			attrs, err := o.Attrs(ctx)
+			if err != nil {
+				resCh <- deleteResult{object: obj, err: err}
+				return
+			}
+			o = o.If(gs.Conditions{GenerationMatch: attrs.Generation})
+			err = o.Delete(ctx)
+			resCh <- deleteResult{object: obj, err: err}
+		}(objectPath)
+	}
+
+	wg.Wait()
+	close(resCh)
+	for delRes := range resCh {
+		res[delRes.object] = delRes.err
+	}
+
+	return res
+}
+
+// HasObjects checks whether the GCP bucket contains any objects that match the specified bucketPrefix.
+func (r *OffloadingStorageGcpClient) HasObjects(ctx context.Context, bucketPrefix string) (bool, error) {
+	q := gs.Query{Prefix: bucketPrefix + "/", Delimiter: "/"}
+	it := r.bucketClient.Objects(ctx, &q)
+
+	for {
+		attrs, err := it.Next()
+		if errors.Is(err, iterator.Done) {
+			return false, nil
+		}
+
+		if err != nil {
+			return false, fmt.Errorf("list object on GCP bucket: %w", err)
+		}
+
+		// Exclude the bucketPrefix "folder" itself
+		if attrs != nil && attrs.Name == bucketPrefix+"/" {
+			continue
+		}
+
+		return true, nil
+	}
+}
+
+// List lists all objects that match the specified bucketPrefix.
+func (r *OffloadingStorageGcpClient) List(ctx context.Context, bucketPrefix string) ([]string, error) { + q := gs.Query{Prefix: bucketPrefix + "/", Delimiter: "/"} + it := r.bucketClient.Objects(ctx, &q) + objects := make([]string, 0) + for { + attrs, err := it.Next() + if errors.Is(err, iterator.Done) { + return objects, nil + } + + if err != nil { + return []string{}, fmt.Errorf("list object on GCP bucket: %w", err) + } + + // Exclude the bucketPrefix "folder" itself + if attrs != nil && attrs.Name == bucketPrefix+"/" { + continue + } + + objects = append(objects, filepath.Base(attrs.Name)) + } +} diff --git a/internal/git/housekeeping/offloading_storage_gcp_test.go b/internal/git/housekeeping/offloading_storage_gcp_test.go new file mode 100644 index 0000000000..2d1ed488e5 --- /dev/null +++ b/internal/git/housekeeping/offloading_storage_gcp_test.go @@ -0,0 +1,320 @@ +package housekeeping + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sync" + "testing" + + gs "cloud.google.com/go/storage" + "github.com/fullstorydev/emulators/storage/gcsemu" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" +) + +func TestOffloadingStorageGcpClient_Upload(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + client, svr := setupGcsEmulator(t, ctx) + defer teardownGcsEmulator(t, client, svr) + + bucket := "test-bucket" + offloadingClient := NewOffloadingStorageGcpClient(client, bucket) + + for _, tc := range []struct { + desc string + + // filesToUpload is a list of files to upload + filesToUpload []string + + // filesOnLocal is a list of files actually on the local filesystem + filesOnLocal []string + expectedErrMsgs []string + }{ + { + desc: "Upload objects", + filesToUpload: []string{"obj1.txt", "obj2.txt", "obj3.txt"}, + filesOnLocal: []string{"obj1.txt", "obj2.txt", "obj3.txt"}, + // expect no error + }, + { + desc: "Upload objects but one is missing on local", + filesToUpload: []string{"obj1.txt", "obj2.txt", "obj3.txt"}, + filesOnLocal: []string{"obj1.txt", "obj2.txt"}, + expectedErrMsgs: []string{ + "", + "", + "open file: open %s/obj3.txt: no such file or directory", + }, + }, + { + desc: "No object to upload", + filesToUpload: []string{}, + filesOnLocal: []string{}, + // expect no error + }, + } { + t.Run(tc.desc, func(t *testing.T) { + dir := testhelper.TempDir(t) + + prefix := "test-upload" + testhelper.TempDir(t) + for _, f := range tc.filesOnLocal { + err := os.WriteFile(filepath.Join(dir, f), []byte("hello world"), 0o666) + require.NoError(t, err) + } + + errCh := make(chan error) + var wg sync.WaitGroup + for _, f := range tc.filesToUpload { + wg.Add(1) + go func(file string) { + defer wg.Done() + errCh <- offloadingClient.Upload(ctx, filepath.Join(dir, file), prefix) + }(f) + } + + // Close the error channel once all goroutines are done + go func() { + wg.Wait() + close(errCh) // Close the channel after all goroutines finish + }() + + // Process errors from the channel + var actualErrMsgs []string + if len(tc.expectedErrMsgs) == 0 { + for err := range errCh { + require.NoError(t, err) // Handle any errors + } + // Validate uploaded files + uploadedFiles, _ := offloadingClient.List(ctx, prefix) + require.ElementsMatch(t, tc.filesToUpload, uploadedFiles) + } else { + + // prepare error messages + + for i, f := range tc.expectedErrMsgs { + if f != "" { + tc.expectedErrMsgs[i] = fmt.Sprintf(f, dir) + } + } + + for err := range errCh { + if err == nil { + actualErrMsgs = append(actualErrMsgs, "") + } else { + actualErrMsgs 
= append(actualErrMsgs, err.Error()) + } + } + require.ElementsMatch(t, tc.expectedErrMsgs, actualErrMsgs) + } + }) + } +} + +func TestOffloadingStorageGcpClient_Delete(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + client, svr := setupGcsEmulator(t, ctx) + defer teardownGcsEmulator(t, client, svr) + + bucket := "test-bucket" + offloadingClient := NewOffloadingStorageGcpClient(client, bucket) + + t.Run("Delete objects", func(t *testing.T) { + prefix := "test-delete" + testhelper.TempDir(t) + + filesToUpload := []string{"obj1.txt", "obj2.txt", "obj3.txt"} + createObjectsInOffloadingStorage(t, ctx, offloadingClient, prefix, filesToUpload) + + offloadingClient.TryDelete(ctx, prefix, filesToUpload) + + // Validate uploaded files are deleted + uploadedFiles, _ := offloadingClient.List(ctx, prefix) + require.Empty(t, uploadedFiles) + }) + + t.Run("Delete empty list", func(t *testing.T) { + prefix := "test-delete" + testhelper.TempDir(t) + + filesToUpload := []string{"obj1.txt", "obj2.txt", "obj3.txt"} + createObjectsInOffloadingStorage(t, ctx, offloadingClient, prefix, filesToUpload) + + res := offloadingClient.TryDelete(ctx, prefix, []string{}) + require.Empty(t, res) + + // Validate uploaded files are deleted + uploadedFiles, _ := offloadingClient.List(ctx, prefix) + require.ElementsMatch(t, filesToUpload, uploadedFiles) + }) + + t.Run("Delete objects", func(t *testing.T) { + prefix := "test-delete" + testhelper.TempDir(t) + + filesToUpload := []string{"obj1.txt", "obj2.txt"} + createObjectsInOffloadingStorage(t, ctx, offloadingClient, prefix, filesToUpload) + + res := offloadingClient.TryDelete(ctx, prefix, []string{"obj1.txt", "obj2.txt", "obj3.txt"}) + expectedRes := map[string]error{ + prefix + "/obj1.txt": nil, + prefix + "/obj2.txt": nil, + prefix + "/obj3.txt": gs.ErrObjectNotExist, + } + require.Equal(t, expectedRes, res) + // Validate uploaded files are deleted + uploadedFiles, _ := offloadingClient.List(ctx, prefix) + require.Empty(t, uploadedFiles) + }) +} + +func TestOffloadingStorageGcpClient_List(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + client, svr := setupGcsEmulator(t, ctx) + defer teardownGcsEmulator(t, client, svr) + bucket := "test-bucket" + offloadingClient := NewOffloadingStorageGcpClient(client, bucket) + + // Create a placeholder object first. This is because the emulator behaves + // differently from a real GCP bucket. It requires storing something to ensure + // the bucket's existence. Without this placeholder, operations like "No existing prefix" + // and "Empty prefix" may fail with an error indicating the bucket does not exist. + // In contrast, a real GCP bucket would return no error and simply provide an empty result. + // To ensure the test behaves consistently with a real GCP environment, we add + // placeholder data here to confirm the bucket's existence. 
+ prefix := "test-list-to-create-bucket" + filesToUpload := []string{"i-am-just-a-place-holder.txt"} + createObjectsInOffloadingStorage(t, ctx, offloadingClient, prefix, filesToUpload) + + t.Run("List existing objects", func(t *testing.T) { + prefix := "test-list-create-something" + + filesToUpload := []string{"obj1.txt", "obj2.txt", "obj3.txt"} + createObjectsInOffloadingStorage(t, ctx, offloadingClient, prefix, filesToUpload) + + // Validate uploaded files + uploadedFiles, _ := offloadingClient.List(ctx, prefix) + require.ElementsMatch(t, filesToUpload, uploadedFiles) + }) + t.Run("List empty folder", func(t *testing.T) { + // must end with "/" to be a folder + prefix := "test-list-empty-folder/" + + contentType := `inode/directory` + obj := offloadingClient.bucketClient.Object(prefix) + writer := obj.NewWriter(ctx) + writer.ObjectAttrs.ContentType = contentType + _, err := writer.Write(nil) + require.NoError(t, err) + err = writer.Close() + require.NoError(t, err) + + // Validate uploaded files + uploadedFiles, err := offloadingClient.List(ctx, prefix) + require.NoError(t, err) + require.Empty(t, uploadedFiles) + }) + + t.Run("Empty prefix", func(t *testing.T) { + prefix := "" + res, err := offloadingClient.List(ctx, prefix) + require.Empty(t, res) + require.NoError(t, err) + }) + t.Run("No existing prefix", func(t *testing.T) { + prefix := "i/dont/existing" + res, err := offloadingClient.List(ctx, prefix) + require.Empty(t, res) + require.NoError(t, err) + }) +} + +func TestOffloadingStorageGcpClient_HasObjects(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + client, svr := setupGcsEmulator(t, ctx) + defer teardownGcsEmulator(t, client, svr) + + bucket := "test-bucket" + offloadingClient := NewOffloadingStorageGcpClient(client, bucket) + + t.Run("Existing objects", func(t *testing.T) { + prefix := "test-has-objects" + filesToUpload := []string{"obj1.txt", "obj2.txt", "obj3.txt"} + createObjectsInOffloadingStorage(t, ctx, offloadingClient, prefix, filesToUpload) + // Validate uploaded files + hasObjects, err := offloadingClient.HasObjects(ctx, prefix) + require.True(t, hasObjects) + require.NoError(t, err) + }) + t.Run("Empty folder", func(t *testing.T) { + prefix := "test-has-objects-empty-folder" + + contentType := `inode/directory` + obj := offloadingClient.bucketClient.Object(prefix) + writer := obj.NewWriter(ctx) + writer.ObjectAttrs.ContentType = contentType + _, err := writer.Write(nil) + require.NoError(t, err) + err = writer.Close() + require.NoError(t, err) + + // Validate uploaded files + hasObjects, err := offloadingClient.HasObjects(ctx, prefix) + require.False(t, hasObjects) + require.NoError(t, err) + }) +} + +// setupGcsEmulator sets up an in-memory Google bucket emulator +func setupGcsEmulator(t *testing.T, ctx context.Context) (*gs.Client, *gcsemu.Server) { + svr, err := gcsemu.NewServer("localhost:0", gcsemu.Options{}) + require.NoError(t, err) + + // gcsemu.NewClient will look at this env var to figure out what host/port to talk to + err = os.Setenv("GCS_EMULATOR_HOST", svr.Addr) //nolint + require.NoError(t, err) + + client, err := gcsemu.NewClient(ctx) + require.NoError(t, err) + return client, svr +} + +// createObjectsInOffloadingStorage creates objects in the offloading storage with the given prefix. 
+func createObjectsInOffloadingStorage(t *testing.T, ctx context.Context, client OffloadingStorageClient, prefix string, objects []string) { + dir := testhelper.TempDir(t) + errCh := make(chan error) + var wg sync.WaitGroup + for _, f := range objects { + wg.Add(1) + go func(file string) { + defer wg.Done() + err := os.WriteFile(filepath.Join(dir, file), []byte("hello world from "+f), 0o666) + require.NoError(t, err) + errCh <- client.Upload(ctx, filepath.Join(dir, file), prefix) + }(f) + } + + // Close the error channel once all goroutines are done + go func() { + wg.Wait() + close(errCh) // Close the channel after all goroutines finish + }() + + // Process errors from the channel + for err := range errCh { + require.NoError(t, err) // Handle any errors + } +} + +// teardownGcsEmulator close in-memory Google bucket emulator and its client +func teardownGcsEmulator(t *testing.T, client *gs.Client, svr *gcsemu.Server) { + err := client.Close() + require.NoError(t, err) + svr.Close() +} -- GitLab From cb1ba913e317dcfbf666035a02040a64e4160191 Mon Sep 17 00:00:00 2001 From: Eric Ju Date: Wed, 11 Dec 2024 22:21:55 -0500 Subject: [PATCH 03/10] Draft: Add offloading sink --- internal/offloading/sink.go | 274 ++++++++++++++++++++++++++++++++++++ 1 file changed, 274 insertions(+) create mode 100644 internal/offloading/sink.go diff --git a/internal/offloading/sink.go b/internal/offloading/sink.go new file mode 100644 index 0000000000..0aea5000d6 --- /dev/null +++ b/internal/offloading/sink.go @@ -0,0 +1,274 @@ +package offloading + +import ( + "context" + "errors" + "fmt" + "gocloud.dev/blob" + "google.golang.org/api/iterator" + "os" + "path/filepath" + "sync" + "time" + + _ "gocloud.dev/blob/azureblob" // register Azure driver + _ "gocloud.dev/blob/fileblob" // register file driver + _ "gocloud.dev/blob/gcsblob" // register Google Cloud driver + _ "gocloud.dev/blob/memblob" // register in-memory driver + _ "gocloud.dev/blob/s3blob" +) + +// Sink is a wrapper around the storage bucket used for uploading/downloading packfiles +type Sink struct { + //sink *backup.Sink + timeout time.Duration + bucket *blob.Bucket + // retry +} + +// NewSink creates a Sink from the given parameters. +func NewSink(ctx context.Context, uri string, timeout time.Duration) (*Sink, error) { + //sink, err := backup.ResolveSink(ctx, uri) + bucket, err := blob.OpenBucket(ctx, uri) + if err != nil { + return nil, fmt.Errorf("open bucket: %w", err) + } + return &Sink{ + //sink: sink, + timeout: timeout, + bucket: bucket, + }, nil +} + +// Offload + +// Rehydrate + +// upload just upload one single file to the bucket +func (r *Sink) Upload(ctx context.Context, fullFilePath string, prefix string) (returnErr error) { + // Use context timeouts to set an overall deadline for the operation, including all potential retries. + // Having a cancelable context is critical here because the retry mechanism does not have a maximum + // retry count and will continue retrying until the context is explicitly canceled. 
+ ctx, cancel := context.WithTimeout(ctx, r.timeout) + defer cancel() + + file, err := os.Open(fullFilePath) + defer func() { + if err := file.Close(); err != nil { + returnErr = errors.Join(returnErr, fmt.Errorf("close local file writer: %w", err)) + } + }() + if err != nil { + return fmt.Errorf("open file: %w", err) + } + + objectKey := fmt.Sprintf("%s/%s", prefix, filepath.Base(fullFilePath)) + if err := r.bucket.Upload(ctx, objectKey, file, &blob.WriterOptions{ + // 'no-store' - we don't want the backup to be cached as the content could be changed, + // so we always want a fresh and up to date data + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control#cacheability + // 'no-transform' - disallows intermediates to modify data + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control#other + CacheControl: "no-store, no-transform", + ContentType: "application/octet-stream", + }); err != nil { + return fmt.Errorf("uoload object: %w", err) + } + + return nil +} + +// download +func (r *Sink) Download(ctx context.Context, objectKey string, fullFilePath string) (returnErr error) { + // Use context timeouts to set an overall deadline for the operation, including all potential retries. + // Having a cancelable context is critical here because the retry mechanism does not have a maximum + // retry count and will continue retrying until the context is explicitly canceled. + ctx, cancel := context.WithTimeout(ctx, r.timeout) + defer cancel() + + file, err := os.Create(fullFilePath) + defer func() { + if err := file.Close(); err != nil { + returnErr = errors.Join(returnErr, fmt.Errorf("close local file: %w", err)) + } + }() + if err != nil { + return fmt.Errorf("create file: %w", err) + } + + // TODO hwo to do retry? + // TODO what is the option? + if err := r.bucket.Download(ctx, objectKey, file, nil); err != nil { + return fmt.Errorf("download object: %w", err) + } + + return nil +} + +//// upload just upload one single file to the bucket +//func (r *Sink) upload(ctx context.Context, fullFilePath string, prefix string) (returnErr error) { +// // Use context timeouts to set an overall deadline for the operation, including all potential retries. +// // Having a cancelable context is critical here because the retry mechanism does not have a maximum +// // retry count and will continue retrying until the context is explicitly canceled. +// ctx, cancel := context.WithTimeout(ctx, r.timeout) +// defer cancel() +// +// file, err := os.Open(fullFilePath) +// defer func() { +// if err := file.Close(); err != nil { +// returnErr = errors.Join(returnErr, fmt.Errorf("close local file writer: %w", err)) +// } +// }() +// if err != nil { +// return fmt.Errorf("open file: %w", err) +// } +// +// objectKey := fmt.Sprintf("%s/%s", prefix, filepath.Base(fullFilePath)) +// wc, err := r.sink.GetWriter(ctx, objectKey) +// defer func() { +// if err := wc.Close(); err != nil { +// returnErr = errors.Join(returnErr, fmt.Errorf("close object writer: %w", err)) +// } +// }() +// if err != nil { +// return fmt.Errorf("get writer: %w", err) +// } +// +// _, err = io.Copy(wc, file) +// if err != nil { +// return fmt.Errorf("copy local file to object writer: %w", err) +// } +// +// return nil +//} +// +//// download +//func (r *Sink) download(ctx context.Context, objectKey string, fullFilePath string) (returnErr error) { +// // Use context timeouts to set an overall deadline for the operation, including all potential retries. 
+//	// Having a cancelable context is critical here because the retry mechanism does not have a maximum
+//	// retry count and will continue retrying until the context is explicitly canceled.
+//	ctx, cancel := context.WithTimeout(ctx, r.timeout)
+//	defer cancel()
+//
+//	file, err := os.Create(fullFilePath)
+//	defer func() {
+//		if err := file.Close(); err != nil {
+//			returnErr = errors.Join(returnErr, fmt.Errorf("close local file: %w", err))
+//		}
+//	}()
+//	if err != nil {
+//		return fmt.Errorf("create file: %w", err)
+//	}
+//
+//	// TODO how to do retry?
+//	// TODO what is the option?
+//	rc, err := r.sink.GetReader(ctx, objectKey)
+//	defer func() {
+//		if err := rc.Close(); err != nil {
+//			returnErr = errors.Join(returnErr, fmt.Errorf("close object reader: %w", err))
+//		}
+//	}()
+//	if err != nil {
+//		return fmt.Errorf("get reader: %w", err)
+//	}
+//	_, err = io.Copy(file, rc)
+//	if err != nil {
+//		return fmt.Errorf("copy object reader to local file: %w", err)
+//	}
+//
+//	return nil
+//}
+
+// List lists all objects stored under the given prefix.
+func (r *Sink) List(ctx context.Context, prefix string) ([]string, error) {
+	it := r.bucket.List(&blob.ListOptions{
+		Prefix:    prefix + "/",
+		Delimiter: "/",
+	})
+	objects := make([]string, 0)
+	for {
+		attrs, err := it.Next(ctx)
+		if errors.Is(err, iterator.Done) {
+			return objects, nil
+		}
+
+		if err != nil {
+			return []string{}, fmt.Errorf("list objects: %w", err)
+		}
+
+		// Exclude the bucketPrefix "folder" itself
+		if attrs != nil && attrs.Key == prefix+"/" {
+			continue
+		}
+
+		objects = append(objects, filepath.Base(attrs.Key))
+	}
+
+}
+
+// HasObjects reports whether any object exists under the given prefix.
+func (r *Sink) HasObjects(ctx context.Context, prefix string) (bool, error) {
+	it := r.bucket.List(&blob.ListOptions{
+		Prefix:    prefix + "/",
+		Delimiter: "/",
+	})
+
+	for {
+		attrs, err := it.Next(ctx)
+		if errors.Is(err, iterator.Done) {
+			return false, nil
+		}
+
+		if err != nil {
+			return false, fmt.Errorf("list objects: %w", err)
+		}
+
+		// Exclude the bucketPrefix "folder" itself
+		if attrs != nil && attrs.Key == prefix+"/" {
+			continue
+		}
+
+		return true, nil
+	}
+}
+
+// DeleteObjects attempts to delete given objects within the specified bucketPrefix.
+// It employs a retry mechanism to ensure objects are deleted with best-effort.
+func (r *Sink) DeleteObjects(ctx context.Context, prefix string, objectNames []string) map[string]error {
+	res := make(map[string]error)
+	if len(objectNames) == 0 {
+		return res
+	}
+
+	// Use context timeouts to set an overall deadline for the operation, including all potential retries.
+	// Having a cancelable context is critical here because the retry mechanism does not have a maximum
+	// retry count and will continue retrying until the context is explicitly canceled.
+	ctx, cancel := context.WithTimeout(ctx, r.timeout)
+	defer cancel()
+	wg := sync.WaitGroup{}
+	type deleteResult struct {
+		object string
+		err    error
+	}
+	resCh := make(chan deleteResult, len(objectNames))
+
+	// In the context of inactive repo offloading, the number of objects is expected to be small.
+	// Therefore, we can afford to use a goroutine for each object.
+ for _, object := range objectNames { + wg.Add(1) + objectPath := fmt.Sprintf("%s/%s", prefix, filepath.Base(object)) + go func(obj string) { + defer wg.Done() + err := r.bucket.Delete(ctx, obj) + resCh <- deleteResult{object: obj, err: err} + }(objectPath) + } + + wg.Wait() + close(resCh) + for delRes := range resCh { + res[delRes.object] = delRes.err + } + + return res +} -- GitLab From f722ecb769edfa8bdb5c2c0c6092ac276aee6d59 Mon Sep 17 00:00:00 2001 From: Eric Ju Date: Fri, 13 Dec 2024 01:32:20 -0500 Subject: [PATCH 04/10] Draft: add more tests --- internal/offloading/sink.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/internal/offloading/sink.go b/internal/offloading/sink.go index 0aea5000d6..49efb2f161 100644 --- a/internal/offloading/sink.go +++ b/internal/offloading/sink.go @@ -5,7 +5,7 @@ import ( "errors" "fmt" "gocloud.dev/blob" - "google.golang.org/api/iterator" + "io" "os" "path/filepath" "sync" @@ -40,11 +40,7 @@ func NewSink(ctx context.Context, uri string, timeout time.Duration) (*Sink, err }, nil } -// Offload - -// Rehydrate - -// upload just upload one single file to the bucket +// TODO upload just upload one single file to the bucket func (r *Sink) Upload(ctx context.Context, fullFilePath string, prefix string) (returnErr error) { // Use context timeouts to set an overall deadline for the operation, including all potential retries. // Having a cancelable context is critical here because the retry mechanism does not have a maximum @@ -188,7 +184,7 @@ func (r *Sink) List(ctx context.Context, prefix string) ([]string, error) { objects := make([]string, 0) for { attrs, err := it.Next(ctx) - if errors.Is(err, iterator.Done) { + if errors.Is(err, io.EOF) { return objects, nil } @@ -215,7 +211,7 @@ func (r *Sink) HasObjects(ctx context.Context, prefix string) (bool, error) { for { attrs, err := it.Next(ctx) - if errors.Is(err, iterator.Done) { + if errors.Is(err, io.EOF) { return false, nil } @@ -232,8 +228,17 @@ func (r *Sink) HasObjects(ctx context.Context, prefix string) (bool, error) { } } +//func (r *Sink) IsEmpty(ctx context.Context, prefix string) (bool, error) { +// res, err := r.HasObjects(ctx, prefix) +// if err != nil { +// return false, err +// } +// return !res, nil +//} + // DeleteObjects attempts to delete given objects within the specified bucketPrefix. // It employs a retry mechanism to ensure objects are deleted with best-effort. 
+// Result is objectKey map to error func (r *Sink) DeleteObjects(ctx context.Context, prefix string, objectNames []string) map[string]error { res := make(map[string]error) if len(objectNames) == 0 { @@ -267,7 +272,9 @@ func (r *Sink) DeleteObjects(ctx context.Context, prefix string, objectNames []s wg.Wait() close(resCh) for delRes := range resCh { - res[delRes.object] = delRes.err + if delRes.err != nil { + res[delRes.object] = delRes.err + } } return res -- GitLab From c4c65ab87a0ea1bb95faec73c0766361138e5c35 Mon Sep 17 00:00:00 2001 From: Eric Ju Date: Fri, 13 Dec 2024 11:37:30 -0500 Subject: [PATCH 05/10] Draft: add sink test --- internal/offloading/sink_test.go | 644 +++++++++++++++++++++++++ internal/offloading/testhelper_test.go | 11 + 2 files changed, 655 insertions(+) create mode 100644 internal/offloading/sink_test.go create mode 100644 internal/offloading/testhelper_test.go diff --git a/internal/offloading/sink_test.go b/internal/offloading/sink_test.go new file mode 100644 index 0000000000..0a7ce3b3bb --- /dev/null +++ b/internal/offloading/sink_test.go @@ -0,0 +1,644 @@ +package offloading + +import ( + "errors" + "fmt" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gocloud.dev/blob" + "gocloud.dev/gcerrors" + "os" + "path/filepath" + "strings" + "testing" + "time" +) + +// Test Upload +func TestSink_Upload(t *testing.T) { + t.Parallel() + + // Most basic + // Uploade text + // upload binary + + for _, tc := range []struct { + desc string + + // filesToUpload is a list of files to upload + prefix string + + filesToUpload []string + + // filesOnLocal is a list of files actually on the local filesystem + filesOnLocal map[string]string + expectedSuccessful map[string]string + expectedErrored map[string]error + }{ + { + desc: "Upload objects", + prefix: "jerry" + testhelper.TempDir(t), + filesToUpload: []string{"C-131", "C-137"}, + filesOnLocal: map[string]string{ + "C-131": "I am Mr. Frundles", + "C-137": "Why are you here?", + }, + expectedSuccessful: map[string]string{ + "C-131": "I am Mr. Frundles", + "C-137": "Why are you here?", + }, + }, + { + desc: "Upload objects but one is missing on local", + prefix: "jerry" + testhelper.TempDir(t), + filesToUpload: []string{"C-131", "C-137", "5126"}, + filesOnLocal: map[string]string{ + "C-131": "I am Mr. Frundles", + "C-137": "Why are you here?", + }, + expectedSuccessful: map[string]string{ + "C-131": "I am Mr. Frundles", + "C-137": "Why are you here?", + }, + expectedErrored: map[string]error{ + "5126": os.ErrInvalid, + }, + }, + { + desc: "prefix can be empty", + prefix: "", + filesToUpload: []string{"C-131", "C-137", "5126"}, + filesOnLocal: map[string]string{ + "C-131": "I am Mr. Frundles", + "C-137": "Why are you here?", + }, + expectedSuccessful: map[string]string{ + "C-131": "I am Mr. 
Frundles", + "C-137": "Why are you here?", + }, + expectedErrored: map[string]error{ + "5126": os.ErrInvalid, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx := testhelper.Context(t) + sink := setupEmptyLocalBucket(t) + defer closeBucket(t, sink) + localDir := testhelper.TempDir(t) + + for k, v := range tc.filesOnLocal { + err := os.WriteFile(filepath.Join(localDir, k), []byte(v), 0o666) + require.NoError(t, err) + } + + //prefix := "jerry" + testhelper.TempDir(t) + for _, objectName := range tc.filesToUpload { + err := sink.Upload(ctx, filepath.Join(localDir, objectName), tc.prefix) + if err == nil { + objKey := tc.prefix + "/" + objectName + attr, err := sink.bucket.Attributes(ctx, objKey) + require.NoError(t, err) + require.Equal(t, attr.CacheControl, "no-store, no-transform") + require.Equal(t, attr.ContentType, "application/octet-stream") + + var builder strings.Builder + err = sink.bucket.Download(ctx, objKey, &builder, nil) + require.NoError(t, err) + require.Equal(t, tc.expectedSuccessful[objectName], builder.String()) + } else { + require.True(t, errors.Is(err, tc.expectedErrored[objectName])) + //require.Equal(t, tc.expectedErrored[objectName], err) + } + } + + }) + + } + + t.Run("local file path cannot be empty", func(t *testing.T) { + ctx := testhelper.Context(t) + sink := setupEmptyLocalBucket(t) + defer closeBucket(t, sink) + + err := sink.Upload(ctx, "", "some/prefix") + require.Error(t, err) + }) + //filesToUpload := []string{"obj1.txt", "obj2.txt"} + + //for _, f := range filesToUpload { + // err := sink.Upload(ctx, filepath.Join(dir, f), prefix) + // require.NoError(t, err) + //} + + // Get objects and verify content and arributes + //actualObjects, err := sink.List(ctx, prefix) + //require.NoError(t, err) + //require.ElementsMatch(t, filesToUpload, actualObjects) + // + //for _, obj := range actualObjects { + // objKey := prefix + "/" + obj + // attr, err := sink.bucket.Attributes(ctx, objKey) + // require.NoError(t, err) + // require.Equal(t, attr.CacheControl, "no-store, no-transform") + // require.Equal(t, attr.ContentType, "application/octet-stream") + // + // var builder strings.Builder + // err = sink.bucket.Download(ctx, objKey, &builder, nil) + // require.NoError(t, err) + // require.Equal(t, "hello world", builder.String()) + //} + +} + +// Test Download +func TestSink_Download(t *testing.T) { + ctx := testhelper.Context(t) + for _, tc := range []struct { + desc string + queryPrefix string + queryObjectNames []string + expectedContent map[string]string + expectedError map[string]gcerrors.ErrorCode + }{ + { + desc: "download all objects", + queryPrefix: "jerry", + queryObjectNames: []string{"C-131", "C-137"}, + expectedContent: map[string]string{ + "C-131": "I am Mr. Frundles", + "C-137": "Why are you here?", + }, + }, + { + desc: "Download nonexistent objects", + queryPrefix: "jerry", + queryObjectNames: []string{"5126"}, + expectedContent: map[string]string{}, + expectedError: map[string]gcerrors.ErrorCode{ + "5126": gcerrors.NotFound, + }, + }, + { + desc: "Download nonexistent folder", + queryPrefix: "nonexistent", + queryObjectNames: []string{"C-131"}, + expectedContent: map[string]string{}, + expectedError: map[string]gcerrors.ErrorCode{ + "C-131": gcerrors.NotFound, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var sink *Sink + + prefix := "jerry" + objects := []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. 
Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + { + ObjectName: "C-137", + Content: "Why are you here?", + WriterOpt: blob.WriterOptions{ + ContentType: "atext/html; charset=utf-8", + CacheControl: "no-store, no-transform", + }, + }, + } + sink = setupLocalBucket(t, prefix, objects) + defer closeBucket(t, sink) + localDir := testhelper.TempDir(t) + + for _, obj := range tc.queryObjectNames { + objectKey := fmt.Sprintf("%s/%s", tc.queryPrefix, obj) + localFullPath := filepath.Join(localDir, obj) + err := sink.Download(ctx, objectKey, localFullPath) + if err == nil { + require.NoError(t, err) + buf, err := os.ReadFile(localFullPath) + require.NoError(t, err) + require.Equal(t, tc.expectedContent[obj], string(buf)) + } else { + require.Equal(t, tc.expectedError[obj], gcerrors.Code(err)) + } + } + + }) + } +} + +func TestSink_Delete(t *testing.T) { + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + queryingPrefix string + createBucket bool + bucket string + prefix string + objects []fileBucketData + objectsToDelete []string + objectsLeft []string + expectedRes map[string]gcerrors.ErrorCode + }{ + { + desc: "delete all objects", + queryingPrefix: "jerry", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + { + ObjectName: "C-137", + Content: "Why are you here?", + WriterOpt: blob.WriterOptions{ + ContentType: "atext/html; charset=utf-8", + CacheControl: "no-store, no-transform", + }, + }, + }, + objectsToDelete: []string{"C-131", "C-137"}, + expectedRes: map[string]gcerrors.ErrorCode{}, + }, + { + desc: "delete some objects", + queryingPrefix: "jerry", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + { + ObjectName: "C-137", + Content: "Why are you here?", + WriterOpt: blob.WriterOptions{ + ContentType: "atext/html; charset=utf-8", + CacheControl: "no-store, no-transform", + }, + }, + }, + objectsToDelete: []string{"C-131"}, + objectsLeft: []string{"C-137"}, + expectedRes: map[string]gcerrors.ErrorCode{}, + }, + { + desc: "delete more objects than you have", + queryingPrefix: "jerry", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + { + ObjectName: "C-137", + Content: "Why are you here?", + WriterOpt: blob.WriterOptions{ + ContentType: "atext/html; charset=utf-8", + CacheControl: "no-store, no-transform", + }, + }, + }, + objectsToDelete: []string{"C-131", "C-137", "5126"}, + objectsLeft: []string{}, + expectedRes: map[string]gcerrors.ErrorCode{ + "jerry/5126": gcerrors.NotFound, + }, + }, + { + desc: "delete nonexistent prefix", + queryingPrefix: "nonexistent", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. 
Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + }, + objectsToDelete: []string{"C-131"}, + objectsLeft: []string{"C-131"}, + expectedRes: map[string]gcerrors.ErrorCode{ + "nonexistent/C-131": gcerrors.NotFound, + }, + }, + { + desc: "empty bucket", + queryingPrefix: "i-am-empty", + prefix: "i-am-empty", + objects: []fileBucketData{}, + objectsToDelete: []string{"C-131"}, + objectsLeft: []string{}, + expectedRes: map[string]gcerrors.ErrorCode{ + "i-am-empty/C-131": gcerrors.NotFound, + }, + }, + { + desc: "delete parameter is empty", + queryingPrefix: "jerry", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + { + ObjectName: "C-137", + Content: "Why are you here?", + WriterOpt: blob.WriterOptions{ + ContentType: "atext/html; charset=utf-8", + CacheControl: "no-store, no-transform", + }, + }, + }, + objectsToDelete: []string{}, + expectedRes: map[string]gcerrors.ErrorCode{}, + objectsLeft: []string{"C-131", "C-137"}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var sink *Sink + + sink = setupLocalBucket(t, tc.prefix, tc.objects) + defer closeBucket(t, sink) + + actualResWithErrorCode := make(map[string]gcerrors.ErrorCode) + res := sink.DeleteObjects(ctx, tc.queryingPrefix, tc.objectsToDelete) + fmt.Println(res) + for k, v := range res { + actualResWithErrorCode[k] = gcerrors.Code(v) + } + require.Equal(t, tc.expectedRes, actualResWithErrorCode) + + actualObjectsLeft, err := sink.List(ctx, tc.prefix) + require.NoError(t, err) + require.ElementsMatch(t, tc.objectsLeft, actualObjectsLeft) + }) + } +} + +// Test List +func TestSink_List(t *testing.T) { + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + queryingPrefix string + createBucket bool + bucket string + prefix string + objects []fileBucketData + expectedRes []string + expectedErr error + }{ + { + desc: "bucket with objects", + queryingPrefix: "jerry", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + { + ObjectName: "C-137", + Content: "Why are you here?", + WriterOpt: blob.WriterOptions{ + ContentType: "atext/html; charset=utf-8", + CacheControl: "no-store, no-transform", + }, + }, + }, + expectedRes: []string{"C-131", "C-137"}, + expectedErr: nil, + }, + { + desc: "prefix with / ending", + queryingPrefix: "jerry/", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + }, + expectedRes: []string{}, + expectedErr: nil, + }, + { + desc: "nonexistent prefix in a bucket with objects", + queryingPrefix: "nonexistent", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. 
Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + }, + expectedRes: []string{}, + expectedErr: nil, + }, + { + desc: "empty bucket", + queryingPrefix: "i-am-empty", + prefix: "i-am-empty", + objects: []fileBucketData{}, + expectedRes: []string{}, + expectedErr: nil, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var sink *Sink + + sink = setupLocalBucket(t, tc.prefix, tc.objects) + defer closeBucket(t, sink) + + res, err := sink.List(ctx, tc.queryingPrefix) + require.NoError(t, err) + + require.Equal(t, tc.expectedRes, res) + }) + } + //sink := setupLocalBucket(t) + //defer closeBucket(t, sink) + // + //res, err := sink.List(ctx, "jerryboree") + //require.NoError(t, err) + //require.ElementsMatch(t, res, []string{"C-137", "C-131"}) +} + +func TestSink_HasObjects(t *testing.T) { + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + queryingPrefix string + createBucket bool + bucket string + prefix string + objects []fileBucketData + expectedRes bool + expectedErr error + }{ + { + desc: "bucket with objects", + queryingPrefix: "jerry", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + { + ObjectName: "C-137", + Content: "Why are you here?", + WriterOpt: blob.WriterOptions{ + ContentType: "atext/html; charset=utf-8", + CacheControl: "no-store, no-transform", + }, + }, + }, + expectedRes: true, + expectedErr: nil, + }, + { + desc: "prefix with / ending", + queryingPrefix: "jerry/", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + }, + expectedRes: false, + expectedErr: nil, + }, + { + desc: "nonexistent prefix in a bucket with objects", + queryingPrefix: "nonexistent", + prefix: "jerry", + objects: []fileBucketData{ + { + ObjectName: "C-131", + Content: "I am Mr. 
Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + }, + expectedRes: false, + expectedErr: nil, + }, + { + desc: "empty bucket", + queryingPrefix: "i-am-empty", + prefix: "i-am-empty", + objects: []fileBucketData{}, + expectedRes: false, + expectedErr: nil, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var sink *Sink + + sink = setupLocalBucket(t, tc.prefix, tc.objects) + defer closeBucket(t, sink) + + res, err := sink.HasObjects(ctx, tc.queryingPrefix) + require.NoError(t, err) + require.Equal(t, tc.expectedRes, res) + }) + } +} + +// Setup Bucket based on local file system +func setupEmptyLocalBucket(t *testing.T) *Sink { + ctx := testhelper.Context(t) + localBucket := testhelper.TempDir(t) + localBucketUri := fmt.Sprintf("file://%s", localBucket) + sink, err := NewSink(ctx, localBucketUri, 60*time.Second) + require.NoError(t, err) + return sink +} + +// Add file to bucket +type fileBucketData struct { + ObjectName string + Content string + WriterOpt blob.WriterOptions +} + +func setupLocalBucket(t *testing.T, prefix string, objectsToUpload []fileBucketData) *Sink { + + ctx := testhelper.Context(t) + bucket := testhelper.TempDir(t) + localBucketUri := fmt.Sprintf("file://%s", bucket) + sink, err := NewSink(ctx, localBucketUri, 60*time.Second) + require.NoError(t, err) + + for _, obj := range objectsToUpload { + objectKey := filepath.Join(prefix, obj.ObjectName) + content := strings.NewReader(obj.Content) + require.NoError(t, err) + err = sink.bucket.Upload(ctx, objectKey, content, &obj.WriterOpt) + require.NoError(t, err) + } + + return sink +} + +// Clean up bucket ?? +func closeBucket(t *testing.T, sink *Sink) { + err := sink.bucket.Close() + require.NoError(t, err) +} diff --git a/internal/offloading/testhelper_test.go b/internal/offloading/testhelper_test.go new file mode 100644 index 0000000000..911b7f1184 --- /dev/null +++ b/internal/offloading/testhelper_test.go @@ -0,0 +1,11 @@ +package offloading + +import ( + "testing" + + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" +) + +func TestMain(m *testing.M) { + testhelper.Run(m) +} -- GitLab From 6c61eb986f615fe62afe520f07ae2755f2f90171 Mon Sep 17 00:00:00 2001 From: Eric Ju Date: Mon, 16 Dec 2024 13:16:31 -0500 Subject: [PATCH 06/10] Draft: add backoff --- internal/offloading/sink.go | 95 ++++++++++++++++++++++++-------- internal/offloading/sink_test.go | 4 +- 2 files changed, 75 insertions(+), 24 deletions(-) diff --git a/internal/offloading/sink.go b/internal/offloading/sink.go index 49efb2f161..be7e0a419b 100644 --- a/internal/offloading/sink.go +++ b/internal/offloading/sink.go @@ -4,8 +4,10 @@ import ( "context" "errors" "fmt" + "gitlab.com/gitlab-org/gitaly/v16/internal/backoff" "gocloud.dev/blob" "io" + "math/rand" "os" "path/filepath" "sync" @@ -21,13 +23,14 @@ import ( // Sink is a wrapper around the storage bucket used for uploading/downloading packfiles type Sink struct { //sink *backup.Sink - timeout time.Duration - bucket *blob.Bucket - // retry + timeout time.Duration + bucket *blob.Bucket + retry backoff.Strategy + maxRetry uint } // NewSink creates a Sink from the given parameters. 
-func NewSink(ctx context.Context, uri string, timeout time.Duration) (*Sink, error) { +func NewSink(ctx context.Context, uri string, timeout time.Duration, maxRetry uint) (*Sink, error) { //sink, err := backup.ResolveSink(ctx, uri) bucket, err := blob.OpenBucket(ctx, uri) if err != nil { @@ -35,8 +38,10 @@ func NewSink(ctx context.Context, uri string, timeout time.Duration) (*Sink, err } return &Sink{ //sink: sink, - timeout: timeout, - bucket: bucket, + timeout: timeout, + bucket: bucket, + retry: backoff.NewDefaultExponential(rand.New(rand.NewSource(time.Now().UnixNano()))), + maxRetry: maxRetry, }, nil } @@ -59,18 +64,34 @@ func (r *Sink) Upload(ctx context.Context, fullFilePath string, prefix string) ( } objectKey := fmt.Sprintf("%s/%s", prefix, filepath.Base(fullFilePath)) - if err := r.bucket.Upload(ctx, objectKey, file, &blob.WriterOptions{ - // 'no-store' - we don't want the backup to be cached as the content could be changed, - // so we always want a fresh and up to date data - // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control#cacheability - // 'no-transform' - disallows intermediates to modify data - // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control#other - CacheControl: "no-store, no-transform", - ContentType: "application/octet-stream", - }); err != nil { - return fmt.Errorf("uoload object: %w", err) + for retry := uint(0); retry <= r.maxRetry; { + err = r.bucket.Upload(ctx, objectKey, file, &blob.WriterOptions{ + // 'no-store' - we don't want the backup to be cached as the content could be changed, + // so we always want a fresh and up to date data + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control#cacheability + // 'no-transform' - disallows intermediates to modify data + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control#other + CacheControl: "no-store, no-transform", + ContentType: "application/octet-stream", + ContentMD5: make([]byte, 0), + }) + if err == nil { + break + } else { + timer := time.NewTimer(r.retry.Backoff(retry)) + select { + case <-ctx.Done(): + timer.Stop() + return fmt.Errorf("uoload object cancelled %q: %w", objectKey, err) + case <-timer.C: + // Refresh timer expires, issue another try. + retry++ + } + } + } + if err != nil { + return fmt.Errorf("upload object %q: %w", objectKey, err) } - return nil } @@ -92,12 +113,25 @@ func (r *Sink) Download(ctx context.Context, objectKey string, fullFilePath stri return fmt.Errorf("create file: %w", err) } - // TODO hwo to do retry? - // TODO what is the option? - if err := r.bucket.Download(ctx, objectKey, file, nil); err != nil { + for retry := uint(0); retry <= r.maxRetry; { + err = r.bucket.Download(ctx, objectKey, file, nil) + if err == nil { + break + } else { + timer := time.NewTimer(r.retry.Backoff(retry)) + select { + case <-ctx.Done(): + timer.Stop() + return fmt.Errorf("download object cancelled %q: %w", objectKey, err) + case <-timer.C: + // Refresh timer expires, issue another try. 
+ retry++ + } + } + } + if err != nil { return fmt.Errorf("download object: %w", err) } - return nil } @@ -264,7 +298,24 @@ func (r *Sink) DeleteObjects(ctx context.Context, prefix string, objectNames []s objectPath := fmt.Sprintf("%s/%s", prefix, filepath.Base(object)) go func(obj string) { defer wg.Done() - err := r.bucket.Delete(ctx, obj) + var err error + for retry := uint(0); retry <= r.maxRetry; { + err = r.bucket.Delete(ctx, obj) + if err == nil { + break + } else { + timer := time.NewTimer(r.retry.Backoff(retry)) + select { + case <-ctx.Done(): + timer.Stop() + err = fmt.Errorf("delete object cancelled %q: %w", obj, err) + return + case <-timer.C: + // Refresh timer expires, issue another try. + retry++ + } + } + } resCh <- deleteResult{object: obj, err: err} }(objectPath) } diff --git a/internal/offloading/sink_test.go b/internal/offloading/sink_test.go index 0a7ce3b3bb..4dfa3b138d 100644 --- a/internal/offloading/sink_test.go +++ b/internal/offloading/sink_test.go @@ -606,7 +606,7 @@ func setupEmptyLocalBucket(t *testing.T) *Sink { ctx := testhelper.Context(t) localBucket := testhelper.TempDir(t) localBucketUri := fmt.Sprintf("file://%s", localBucket) - sink, err := NewSink(ctx, localBucketUri, 60*time.Second) + sink, err := NewSink(ctx, localBucketUri, 60*time.Second, 1) require.NoError(t, err) return sink } @@ -623,7 +623,7 @@ func setupLocalBucket(t *testing.T, prefix string, objectsToUpload []fileBucketD ctx := testhelper.Context(t) bucket := testhelper.TempDir(t) localBucketUri := fmt.Sprintf("file://%s", bucket) - sink, err := NewSink(ctx, localBucketUri, 60*time.Second) + sink, err := NewSink(ctx, localBucketUri, 60*time.Second, 1) require.NoError(t, err) for _, obj := range objectsToUpload { -- GitLab From 49dff7576ab98cb2cc87d8ce148958841def20d9 Mon Sep 17 00:00:00 2001 From: Eric Ju Date: Mon, 16 Dec 2024 23:52:00 -0500 Subject: [PATCH 07/10] Draft: simulated bucket and test with timeout and cancel --- internal/offloading/simulation_bucket.go | 110 ++++++++++++ internal/offloading/sink.go | 46 +++-- internal/offloading/sink_test.go | 219 ++++++++++++++++++++--- 3 files changed, 332 insertions(+), 43 deletions(-) create mode 100644 internal/offloading/simulation_bucket.go diff --git a/internal/offloading/simulation_bucket.go b/internal/offloading/simulation_bucket.go new file mode 100644 index 0000000000..9e936285ab --- /dev/null +++ b/internal/offloading/simulation_bucket.go @@ -0,0 +1,110 @@ +package offloading + +import ( + "context" + "errors" + "gocloud.dev/blob" + "io" + "sync" + + "time" +) + +// Mock bucket drive based on fileblob, +// We intercepet upload, download and delete operations with delay and error + +var ( + SimulationErrorUploadCanceled = errors.New("upload canceled") + SimulationErrorDownloadCanceled = errors.New("download canceled") +) + +type SimulationBucket struct { + SimulationOn bool + Simulation map[string][]Simulation + retryStat map[string]int + mu sync.Mutex + *blob.Bucket +} + +type Simulation struct { + + // each element is the N-th retry's simulated duration + Delay time.Duration + Err error +} + +// Client use this https://github.com/google/go-cloud/blob/9e81e8d4f2a83fc70eb74643da0485273ab2480d/blob/blob.go#L688 +// to create a mock bucket + +func NewSimulationBucket(bucket *blob.Bucket, simulationOn bool, simulation map[string][]Simulation) (Bucket, error) { + //b, err := fileblob.OpenBucket(dir, nil) + //if err != nil { + // return nil, err + //} + m := &SimulationBucket{ + Simulation: simulation, + Bucket: bucket, + SimulationOn: 
simulationOn, + retryStat: make(map[string]int), + mu: sync.Mutex{}, + } + return m, nil +} + +func (r *SimulationBucket) Download(ctx context.Context, key string, writer io.Writer, opts *blob.ReaderOptions) error { + simulation, found := r.Simulation[key] + if !r.SimulationOn || !found { + return r.Bucket.Download(ctx, key, writer, opts) + } + + r.mu.Lock() + retryIndex, found := r.retryStat[key] + if found { + r.retryStat[key] = r.retryStat[key] + 1 + } else { + r.retryStat[key] = 1 + } + thisSimulation := simulation[retryIndex] + timer := time.NewTimer(thisSimulation.Delay) + r.mu.Unlock() + + select { + case <-ctx.Done(): + return SimulationErrorDownloadCanceled + case <-timer.C: + if thisSimulation.Err != nil { + return thisSimulation.Err + } + return r.Bucket.Download(ctx, key, writer, opts) + } + +} + +func (r *SimulationBucket) Upload(ctx context.Context, key string, reader io.Reader, opts *blob.WriterOptions) error { + simulation, found := r.Simulation[key] + if !r.SimulationOn || !found { + return r.Bucket.Upload(ctx, key, reader, opts) + } + + r.mu.Lock() + retryIndex, found := r.retryStat[key] + if found { + r.retryStat[key] = r.retryStat[key] + 1 + } else { + r.retryStat[key] = 1 + } + thisSimulation := simulation[retryIndex] + timer := time.NewTimer(thisSimulation.Delay) + r.mu.Unlock() + + select { + case <-ctx.Done(): + return SimulationErrorUploadCanceled + case <-timer.C: + if thisSimulation.Err != nil { + return thisSimulation.Err + } + return r.Bucket.Upload(ctx, key, reader, opts) + } + +} diff --git a/internal/offloading/sink.go b/internal/offloading/sink.go index be7e0a419b..484c9980f2 100644 --- a/internal/offloading/sink.go +++ b/internal/offloading/sink.go @@ -20,28 +20,41 @@ import ( _ "gocloud.dev/blob/s3blob" ) +type Bucket interface { + Download(ctx context.Context, key string, w io.Writer, opts *blob.ReaderOptions) error + Upload(ctx context.Context, key string, r io.Reader, opts *blob.WriterOptions) error + List(opts *blob.ListOptions) *blob.ListIterator + Delete(ctx context.Context, key string) (err error) + Attributes(ctx context.Context, key string) (*blob.Attributes, error) + Close() error +} + // Sink is a wrapper around the storage bucket used for uploading/downloading packfiles type Sink struct { //sink *backup.Sink - timeout time.Duration - bucket *blob.Bucket - retry backoff.Strategy - maxRetry uint + timeout time.Duration + bucket Bucket + retry backoff.Strategy + + // maxRetry is 0 mean no retry + maxRetry uint + retryTimeout time.Duration } // NewSink creates a Sink from the given parameters. 
-func NewSink(ctx context.Context, uri string, timeout time.Duration, maxRetry uint) (*Sink, error) { +func NewSink(ctx context.Context, bucket Bucket, timeout time.Duration, maxRetry uint, retryTimeout time.Duration) (*Sink, error) { //sink, err := backup.ResolveSink(ctx, uri) - bucket, err := blob.OpenBucket(ctx, uri) - if err != nil { - return nil, fmt.Errorf("open bucket: %w", err) - } + //bucket, err := blob.OpenBucket(ctx, uri) + //if err != nil { + // return nil, fmt.Errorf("open bucket: %w", err) + //} return &Sink{ //sink: sink, - timeout: timeout, - bucket: bucket, - retry: backoff.NewDefaultExponential(rand.New(rand.NewSource(time.Now().UnixNano()))), - maxRetry: maxRetry, + timeout: timeout, + bucket: bucket, + retry: backoff.NewDefaultExponential(rand.New(rand.NewSource(time.Now().UnixNano()))), + maxRetry: maxRetry, + retryTimeout: retryTimeout, }, nil } @@ -51,6 +64,7 @@ func (r *Sink) Upload(ctx context.Context, fullFilePath string, prefix string) ( // Having a cancelable context is critical here because the retry mechanism does not have a maximum // retry count and will continue retrying until the context is explicitly canceled. ctx, cancel := context.WithTimeout(ctx, r.timeout) + //var cancel context.CancelFunc defer cancel() file, err := os.Open(fullFilePath) @@ -65,7 +79,8 @@ func (r *Sink) Upload(ctx context.Context, fullFilePath string, prefix string) ( objectKey := fmt.Sprintf("%s/%s", prefix, filepath.Base(fullFilePath)) for retry := uint(0); retry <= r.maxRetry; { - err = r.bucket.Upload(ctx, objectKey, file, &blob.WriterOptions{ + operationCtx, operationCancel := context.WithTimeout(ctx, r.retryTimeout) + err = r.bucket.Upload(operationCtx, objectKey, file, &blob.WriterOptions{ // 'no-store' - we don't want the backup to be cached as the content could be changed, // so we always want a fresh and up to date data // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control#cacheability @@ -75,7 +90,8 @@ func (r *Sink) Upload(ctx context.Context, fullFilePath string, prefix string) ( ContentType: "application/octet-stream", ContentMD5: make([]byte, 0), }) - if err == nil { + operationCancel() + if err == nil || r.maxRetry == 0 { break } else { timer := time.NewTimer(r.retry.Backoff(retry)) diff --git a/internal/offloading/sink_test.go b/internal/offloading/sink_test.go index 4dfa3b138d..502d7ee4f8 100644 --- a/internal/offloading/sink_test.go +++ b/internal/offloading/sink_test.go @@ -1,6 +1,7 @@ package offloading import ( + "context" "errors" "fmt" "github.com/stretchr/testify/require" @@ -18,10 +19,6 @@ import ( func TestSink_Upload(t *testing.T) { t.Parallel() - // Most basic - // Uploade text - // upload binary - for _, tc := range []struct { desc string @@ -116,6 +113,8 @@ func TestSink_Upload(t *testing.T) { } + // Duplicated key + t.Run("local file path cannot be empty", func(t *testing.T) { ctx := testhelper.Context(t) sink := setupEmptyLocalBucket(t) @@ -124,30 +123,190 @@ func TestSink_Upload(t *testing.T) { err := sink.Upload(ctx, "", "some/prefix") require.Error(t, err) }) - //filesToUpload := []string{"obj1.txt", "obj2.txt"} - //for _, f := range filesToUpload { - // err := sink.Upload(ctx, filepath.Join(dir, f), prefix) - // require.NoError(t, err) - //} + t.Run("upload failed with overall timer timeout no retry", func(t *testing.T) { + ctx := testhelper.Context(t) - // Get objects and verify content and arributes - //actualObjects, err := sink.List(ctx, prefix) - //require.NoError(t, err) - //require.ElementsMatch(t, filesToUpload, 
actualObjects) - // - //for _, obj := range actualObjects { - // objKey := prefix + "/" + obj - // attr, err := sink.bucket.Attributes(ctx, objKey) - // require.NoError(t, err) - // require.Equal(t, attr.CacheControl, "no-store, no-transform") - // require.Equal(t, attr.ContentType, "application/octet-stream") - // - // var builder strings.Builder - // err = sink.bucket.Download(ctx, objKey, &builder, nil) - // require.NoError(t, err) - // require.Equal(t, "hello world", builder.String()) - //} + localBucket := testhelper.TempDir(t) + localBucketUri := fmt.Sprintf("file://%s", localBucket) + bucket, err := blob.OpenBucket(ctx, localBucketUri) + require.NoError(t, err) + simulation := map[string][]Simulation{ + "some/prefix/a_very_long_delay_key": { + { + Delay: 10 * time.Second, + Err: nil, + }, + }, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) + + // 100ms is the overall timeout, 5s is the timeout for each upload + sink, err := NewSink(ctx, simuBucket, 100*time.Millisecond, 0, 5*time.Second) + require.NoError(t, err) + defer closeBucket(t, sink) + + localDir := testhelper.TempDir(t) + err = os.WriteFile(filepath.Join(localDir, "a_very_long_delay_key"), []byte("Go long!"), 0o666) + require.NoError(t, err) + err = sink.Upload(ctx, filepath.Join(localDir, "a_very_long_delay_key"), "some/prefix") + require.ErrorIs(t, err, SimulationErrorUploadCanceled) + }) + + t.Run("upload timeout success with retry", func(t *testing.T) { + ctx := testhelper.Context(t) + + localBucket := testhelper.TempDir(t) + localBucketUri := fmt.Sprintf("file://%s", localBucket) + bucket, err := blob.OpenBucket(ctx, localBucketUri) + require.NoError(t, err) + simulation := map[string][]Simulation{ + "some/prefix/a_very_long_delay_key": { + {300 * time.Millisecond, nil}, + {300 * time.Millisecond, nil}, + {10 * time.Millisecond, nil}, + }, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) + + sink, err := NewSink(ctx, simuBucket, 60*time.Second, 2, 100*time.Millisecond) + require.NoError(t, err) + defer closeBucket(t, sink) + + localDir := testhelper.TempDir(t) + err = os.WriteFile(filepath.Join(localDir, "a_very_long_delay_key"), []byte("Go long!"), 0o666) + require.NoError(t, err) + err = sink.Upload(ctx, filepath.Join(localDir, "a_very_long_delay_key"), "some/prefix") + require.NoError(t, err) + }) + + t.Run("upload failed with operation timer timeout and with retry", func(t *testing.T) { + ctx := testhelper.Context(t) + + localBucket := testhelper.TempDir(t) + localBucketUri := fmt.Sprintf("file://%s", localBucket) + bucket, err := blob.OpenBucket(ctx, localBucketUri) + require.NoError(t, err) + simulation := map[string][]Simulation{ + "some/prefix/a_very_long_delay_key": { + {1 * time.Second, nil}, + {1 * time.Second, nil}, + {1 * time.Second, nil}, + }, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) + sink, err := NewSink(ctx, simuBucket, 60*time.Second, 2, 50*time.Millisecond) + require.NoError(t, err) + defer closeBucket(t, sink) + + localDir := testhelper.TempDir(t) + err = os.WriteFile(filepath.Join(localDir, "a_very_long_delay_key"), []byte("Go long!"), 0o666) + require.NoError(t, err) + err = sink.Upload(ctx, filepath.Join(localDir, "a_very_long_delay_key"), "some/prefix") + require.ErrorIs(t, err, SimulationErrorUploadCanceled) + }) + + t.Run("upload failed with overall cancel triggered no retry", func(t *testing.T) { + ctx := testhelper.Context(t) + + 
localBucket := testhelper.TempDir(t) + localBucketUri := fmt.Sprintf("file://%s", localBucket) + bucket, err := blob.OpenBucket(ctx, localBucketUri) + require.NoError(t, err) + simulation := map[string][]Simulation{ + "some/prefix/a_very_long_delay_key": { + //Delay: []time.Duration{1 * time.Second}, + //Err: nil, + {1 * time.Second, nil}, + }, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) + ctx, cancel := context.WithCancel(ctx) + + // the overall timer and operation timer are long enough + sink, err := NewSink(ctx, simuBucket, 100*time.Second, 0, 5*time.Second) + require.NoError(t, err) + defer closeBucket(t, sink) + + localDir := testhelper.TempDir(t) + err = os.WriteFile(filepath.Join(localDir, "a_very_long_delay_key"), []byte("Go long!"), 0o666) + require.NoError(t, err) + errCh := make(chan error) + go func() { + errCh <- sink.Upload(ctx, filepath.Join(localDir, "a_very_long_delay_key"), "some/prefix") + }() + + // trigger cancel + cancel() + err = <-errCh + require.ErrorIs(t, err, SimulationErrorUploadCanceled) + }) + + t.Run("upload success with simulated cancel and retry", func(t *testing.T) { + ctx := testhelper.Context(t) + + localBucket := testhelper.TempDir(t) + localBucketUri := fmt.Sprintf("file://%s", localBucket) + bucket, err := blob.OpenBucket(ctx, localBucketUri) + require.NoError(t, err) + simulation := map[string][]Simulation{ + "some/prefix/a_very_long_delay_key": { + //Delay: []time.Duration{1 * time.Second}, + //Err: nil, + {500 * time.Millisecond, SimulationErrorUploadCanceled}, + {500 * time.Millisecond, SimulationErrorUploadCanceled}, + {500 * time.Millisecond, nil}, + }, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) + + // the overall timer and operation timer are long enough + sink, err := NewSink(ctx, simuBucket, 100*time.Second, 2, 5*time.Second) + require.NoError(t, err) + defer closeBucket(t, sink) + + localDir := testhelper.TempDir(t) + err = os.WriteFile(filepath.Join(localDir, "a_very_long_delay_key"), []byte("Go long!"), 0o666) + require.NoError(t, err) + err = sink.Upload(ctx, filepath.Join(localDir, "a_very_long_delay_key"), "some/prefix") + require.NoError(t, err) + }) + + t.Run("upload failed with simulated cancel and retry", func(t *testing.T) { + ctx := testhelper.Context(t) + + localBucket := testhelper.TempDir(t) + localBucketUri := fmt.Sprintf("file://%s", localBucket) + bucket, err := blob.OpenBucket(ctx, localBucketUri) + require.NoError(t, err) + simulation := map[string][]Simulation{ + "some/prefix/a_very_long_delay_key": { + //Delay: []time.Duration{1 * time.Second}, + //Err: nil, + {500 * time.Millisecond, SimulationErrorUploadCanceled}, + {500 * time.Millisecond, SimulationErrorUploadCanceled}, + {500 * time.Millisecond, SimulationErrorUploadCanceled}, + }, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) + + // the overall timer and operation timer are long enough + sink, err := NewSink(ctx, simuBucket, 100*time.Second, 2, 5*time.Second) + require.NoError(t, err) + defer closeBucket(t, sink) + + localDir := testhelper.TempDir(t) + err = os.WriteFile(filepath.Join(localDir, "a_very_long_delay_key"), []byte("Go long!"), 0o666) + require.NoError(t, err) + err = sink.Upload(ctx, filepath.Join(localDir, "a_very_long_delay_key"), "some/prefix") + require.ErrorIs(t, err, SimulationErrorUploadCanceled) + }) } @@ -606,7 +765,9 @@ func setupEmptyLocalBucket(t *testing.T) *Sink { ctx := 
testhelper.Context(t) localBucket := testhelper.TempDir(t) localBucketUri := fmt.Sprintf("file://%s", localBucket) - sink, err := NewSink(ctx, localBucketUri, 60*time.Second, 1) + bucket, err := blob.OpenBucket(ctx, localBucketUri) + require.NoError(t, err) + sink, err := NewSink(ctx, bucket, 60*time.Second, 1, 3*time.Second) require.NoError(t, err) return sink } @@ -623,7 +784,9 @@ func setupLocalBucket(t *testing.T, prefix string, objectsToUpload []fileBucketD ctx := testhelper.Context(t) bucket := testhelper.TempDir(t) localBucketUri := fmt.Sprintf("file://%s", bucket) - sink, err := NewSink(ctx, localBucketUri, 60*time.Second, 1) + localBucket, err := blob.OpenBucket(ctx, localBucketUri) + require.NoError(t, err) + sink, err := NewSink(ctx, localBucket, 60*time.Second, 1, 3*time.Second) require.NoError(t, err) for _, obj := range objectsToUpload { -- GitLab From 838397d6091e9312a5aaad77ef4d706595735cdf Mon Sep 17 00:00:00 2001 From: Eric Ju Date: Tue, 17 Dec 2024 01:49:18 -0500 Subject: [PATCH 08/10] Draft: re org test with cancel and retry --- internal/offloading/sink_test.go | 282 +++++++++++++++---------------- 1 file changed, 136 insertions(+), 146 deletions(-) diff --git a/internal/offloading/sink_test.go b/internal/offloading/sink_test.go index 502d7ee4f8..dd5b1bb63c 100644 --- a/internal/offloading/sink_test.go +++ b/internal/offloading/sink_test.go @@ -113,113 +113,165 @@ func TestSink_Upload(t *testing.T) { } - // Duplicated key - - t.Run("local file path cannot be empty", func(t *testing.T) { + t.Run("upload duplicated key whose writer finished later wins", func(t *testing.T) { ctx := testhelper.Context(t) + localDirLoser := testhelper.TempDir(t) + err := os.WriteFile(filepath.Join(localDirLoser, "i_am_key"), []byte("hello"), 0o666) + require.NoError(t, err) + localDirWinner := testhelper.TempDir(t) + err = os.WriteFile(filepath.Join(localDirWinner, "i_am_key"), []byte("world"), 0o666) sink := setupEmptyLocalBucket(t) - defer closeBucket(t, sink) - err := sink.Upload(ctx, "", "some/prefix") - require.Error(t, err) - }) + resCh := make(chan error, 2) + go func() { + time.Sleep(200 * time.Millisecond) + resCh <- sink.Upload(ctx, filepath.Join(localDirLoser, "i_am_key"), "some/prefix") + }() + go func() { + time.Sleep(300 * time.Millisecond) + resCh <- sink.Upload(ctx, filepath.Join(localDirWinner, "i_am_key"), "some/prefix") + }() - t.Run("upload failed with overall timer timeout no retry", func(t *testing.T) { - ctx := testhelper.Context(t) + <-resCh + <-resCh - localBucket := testhelper.TempDir(t) - localBucketUri := fmt.Sprintf("file://%s", localBucket) - bucket, err := blob.OpenBucket(ctx, localBucketUri) + var builder strings.Builder + err = sink.bucket.Download(ctx, "some/prefix/i_am_key", &builder, nil) require.NoError(t, err) - simulation := map[string][]Simulation{ - "some/prefix/a_very_long_delay_key": { - { - Delay: 10 * time.Second, - Err: nil, - }, - }, - } - simuBucket, err := NewSimulationBucket(bucket, true, simulation) - require.NoError(t, err) - - // 100ms is the overall timeout, 5s is the timeout for each upload - sink, err := NewSink(ctx, simuBucket, 100*time.Millisecond, 0, 5*time.Second) - require.NoError(t, err) - defer closeBucket(t, sink) + require.Equal(t, "world", builder.String()) - localDir := testhelper.TempDir(t) - err = os.WriteFile(filepath.Join(localDir, "a_very_long_delay_key"), []byte("Go long!"), 0o666) - require.NoError(t, err) - err = sink.Upload(ctx, filepath.Join(localDir, "a_very_long_delay_key"), "some/prefix") - require.ErrorIs(t, 
err, SimulationErrorUploadCanceled) }) - t.Run("upload timeout success with retry", func(t *testing.T) { + t.Run("local file path cannot be empty", func(t *testing.T) { ctx := testhelper.Context(t) - - localBucket := testhelper.TempDir(t) - localBucketUri := fmt.Sprintf("file://%s", localBucket) - bucket, err := blob.OpenBucket(ctx, localBucketUri) - require.NoError(t, err) - simulation := map[string][]Simulation{ - "some/prefix/a_very_long_delay_key": { - {300 * time.Millisecond, nil}, - {300 * time.Millisecond, nil}, - {10 * time.Millisecond, nil}, - }, - } - simuBucket, err := NewSimulationBucket(bucket, true, simulation) - require.NoError(t, err) - - sink, err := NewSink(ctx, simuBucket, 60*time.Second, 2, 100*time.Millisecond) - require.NoError(t, err) + sink := setupEmptyLocalBucket(t) defer closeBucket(t, sink) - localDir := testhelper.TempDir(t) - err = os.WriteFile(filepath.Join(localDir, "a_very_long_delay_key"), []byte("Go long!"), 0o666) - require.NoError(t, err) - err = sink.Upload(ctx, filepath.Join(localDir, "a_very_long_delay_key"), "some/prefix") - require.NoError(t, err) + err := sink.Upload(ctx, "", "some/prefix") + require.Error(t, err) }) - t.Run("upload failed with operation timer timeout and with retry", func(t *testing.T) { - ctx := testhelper.Context(t) +} - localBucket := testhelper.TempDir(t) - localBucketUri := fmt.Sprintf("file://%s", localBucket) - bucket, err := blob.OpenBucket(ctx, localBucketUri) - require.NoError(t, err) - simulation := map[string][]Simulation{ - "some/prefix/a_very_long_delay_key": { +func TestSink_Upload_Timeout_Cancellation_And_Retry(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + maxRetry uint + overallTimeout time.Duration + + // operationTimeOut is the timer for each operation e.g. 
upload, download + operationTimeOut time.Duration + objectName string + simulationData []Simulation + expectedError error + }{ + { + desc: "upload failed with overall timer timeout no retry", + maxRetry: 0, + overallTimeout: 100 * time.Millisecond, + operationTimeOut: 5 * time.Second, + objectName: "overall_timeout_obj_key", + simulationData: []Simulation{ + {Delay: 10 * time.Second, Err: nil}, + }, + expectedError: SimulationErrorUploadCanceled, + }, + { + desc: "upload timeout success with retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 100 * time.Millisecond, + objectName: "success_with_retry", + simulationData: []Simulation{ + {300 * time.Millisecond, nil}, // this will timeout + {300 * time.Millisecond, nil}, // this will timeout + {10 * time.Millisecond, nil}, // this should succeed + }, + expectedError: nil, + }, + { + desc: "upload failed with operation timer timeout and with retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 50 * time.Millisecond, + objectName: "failed_even_retry", + simulationData: []Simulation{ {1 * time.Second, nil}, {1 * time.Second, nil}, {1 * time.Second, nil}, }, - } - simuBucket, err := NewSimulationBucket(bucket, true, simulation) - require.NoError(t, err) - sink, err := NewSink(ctx, simuBucket, 60*time.Second, 2, 50*time.Millisecond) - require.NoError(t, err) - defer closeBucket(t, sink) + expectedError: SimulationErrorUploadCanceled, + }, + { + desc: "upload success with simulated cancel and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 10 * time.Second, + objectName: "success_with_cancel_and_retry", + simulationData: []Simulation{ + {500 * time.Millisecond, SimulationErrorUploadCanceled}, + {500 * time.Millisecond, SimulationErrorUploadCanceled}, + {500 * time.Millisecond, nil}, + }, + expectedError: nil, + }, + { + desc: "upload failed with simulated cancel and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 10 * time.Second, + objectName: "success_with_cancel_and_retry", + simulationData: []Simulation{ + {500 * time.Millisecond, SimulationErrorUploadCanceled}, + {500 * time.Millisecond, SimulationErrorUploadCanceled}, + {500 * time.Millisecond, SimulationErrorUploadCanceled}, + }, + expectedError: SimulationErrorUploadCanceled, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + prefix := "some/prefix" + localBucket := testhelper.TempDir(t) + localBucketUri := fmt.Sprintf("file://%s", localBucket) + bucket, err := blob.OpenBucket(ctx, localBucketUri) + require.NoError(t, err) - localDir := testhelper.TempDir(t) - err = os.WriteFile(filepath.Join(localDir, "a_very_long_delay_key"), []byte("Go long!"), 0o666) - require.NoError(t, err) - err = sink.Upload(ctx, filepath.Join(localDir, "a_very_long_delay_key"), "some/prefix") - require.ErrorIs(t, err, SimulationErrorUploadCanceled) - }) + objectKey := fmt.Sprintf("%s/%s", prefix, tc.objectName) + simulation := map[string][]Simulation{ + objectKey: tc.simulationData, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) - t.Run("upload failed with overall cancel triggered no retry", func(t *testing.T) { - ctx := testhelper.Context(t) + sink, err := NewSink(ctx, simuBucket, tc.overallTimeout, tc.maxRetry, tc.operationTimeOut) + require.NoError(t, err) + defer closeBucket(t, sink) + localDir := testhelper.TempDir(t) + err = os.WriteFile(filepath.Join(localDir, tc.objectName), []byte("Go long!"), 0o666) + require.NoError(t, err) + err = 
sink.Upload(ctx, filepath.Join(localDir, tc.objectName), prefix) + if tc.expectedError != nil { + require.ErrorIs(t, err, SimulationErrorUploadCanceled) + } else { + require.NoError(t, err) + } + + }) + } + + t.Run("upload failed with overall cancel triggered no retry", func(t *testing.T) { + prefix := "some/prefix" + objectName := "overall_cancel_called" localBucket := testhelper.TempDir(t) localBucketUri := fmt.Sprintf("file://%s", localBucket) bucket, err := blob.OpenBucket(ctx, localBucketUri) - require.NoError(t, err) + objectKey := fmt.Sprintf("%s/%s", prefix, objectName) simulation := map[string][]Simulation{ - "some/prefix/a_very_long_delay_key": { - //Delay: []time.Duration{1 * time.Second}, - //Err: nil, + objectKey: { {1 * time.Second, nil}, }, } @@ -228,16 +280,16 @@ func TestSink_Upload(t *testing.T) { ctx, cancel := context.WithCancel(ctx) // the overall timer and operation timer are long enough - sink, err := NewSink(ctx, simuBucket, 100*time.Second, 0, 5*time.Second) + sink, err := NewSink(ctx, simuBucket, 100*time.Second, 0, 10*time.Second) require.NoError(t, err) defer closeBucket(t, sink) localDir := testhelper.TempDir(t) - err = os.WriteFile(filepath.Join(localDir, "a_very_long_delay_key"), []byte("Go long!"), 0o666) + err = os.WriteFile(filepath.Join(localDir, objectName), []byte("Go long!"), 0o666) require.NoError(t, err) errCh := make(chan error) go func() { - errCh <- sink.Upload(ctx, filepath.Join(localDir, "a_very_long_delay_key"), "some/prefix") + errCh <- sink.Upload(ctx, filepath.Join(localDir, objectName), prefix) }() // trigger cancel @@ -246,68 +298,6 @@ func TestSink_Upload(t *testing.T) { require.ErrorIs(t, err, SimulationErrorUploadCanceled) }) - t.Run("upload success with simulated cancel and retry", func(t *testing.T) { - ctx := testhelper.Context(t) - - localBucket := testhelper.TempDir(t) - localBucketUri := fmt.Sprintf("file://%s", localBucket) - bucket, err := blob.OpenBucket(ctx, localBucketUri) - require.NoError(t, err) - simulation := map[string][]Simulation{ - "some/prefix/a_very_long_delay_key": { - //Delay: []time.Duration{1 * time.Second}, - //Err: nil, - {500 * time.Millisecond, SimulationErrorUploadCanceled}, - {500 * time.Millisecond, SimulationErrorUploadCanceled}, - {500 * time.Millisecond, nil}, - }, - } - simuBucket, err := NewSimulationBucket(bucket, true, simulation) - require.NoError(t, err) - - // the overall timer and operation timer are long enough - sink, err := NewSink(ctx, simuBucket, 100*time.Second, 2, 5*time.Second) - require.NoError(t, err) - defer closeBucket(t, sink) - - localDir := testhelper.TempDir(t) - err = os.WriteFile(filepath.Join(localDir, "a_very_long_delay_key"), []byte("Go long!"), 0o666) - require.NoError(t, err) - err = sink.Upload(ctx, filepath.Join(localDir, "a_very_long_delay_key"), "some/prefix") - require.NoError(t, err) - }) - - t.Run("upload failed with simulated cancel and retry", func(t *testing.T) { - ctx := testhelper.Context(t) - - localBucket := testhelper.TempDir(t) - localBucketUri := fmt.Sprintf("file://%s", localBucket) - bucket, err := blob.OpenBucket(ctx, localBucketUri) - require.NoError(t, err) - simulation := map[string][]Simulation{ - "some/prefix/a_very_long_delay_key": { - //Delay: []time.Duration{1 * time.Second}, - //Err: nil, - {500 * time.Millisecond, SimulationErrorUploadCanceled}, - {500 * time.Millisecond, SimulationErrorUploadCanceled}, - {500 * time.Millisecond, SimulationErrorUploadCanceled}, - }, - } - simuBucket, err := NewSimulationBucket(bucket, true, simulation) - 
require.NoError(t, err) - - // the overall timer and operation timer are long enough - sink, err := NewSink(ctx, simuBucket, 100*time.Second, 2, 5*time.Second) - require.NoError(t, err) - defer closeBucket(t, sink) - - localDir := testhelper.TempDir(t) - err = os.WriteFile(filepath.Join(localDir, "a_very_long_delay_key"), []byte("Go long!"), 0o666) - require.NoError(t, err) - err = sink.Upload(ctx, filepath.Join(localDir, "a_very_long_delay_key"), "some/prefix") - require.ErrorIs(t, err, SimulationErrorUploadCanceled) - }) - } // Test Download -- GitLab From 9b8a902057fd013f15f45aa7bfca17c982341f9f Mon Sep 17 00:00:00 2001 From: Eric Ju Date: Wed, 18 Dec 2024 18:43:44 -0500 Subject: [PATCH 09/10] Draft: listing tests are working --- internal/offloading/simulation_bucket.go | 156 ++++++- internal/offloading/sink.go | 233 +++++++++- internal/offloading/sink_test.go | 517 ++++++++++++++++++++++- 3 files changed, 868 insertions(+), 38 deletions(-) diff --git a/internal/offloading/simulation_bucket.go b/internal/offloading/simulation_bucket.go index 9e936285ab..ec72424280 100644 --- a/internal/offloading/simulation_bucket.go +++ b/internal/offloading/simulation_bucket.go @@ -16,13 +16,22 @@ import ( var ( SimulationErrorUploadCanceled = errors.New("upload canceled") SimulationErrorDownloadCanceled = errors.New("download canceled") + SimulationErrorDeleteCanceled = errors.New("delete canceled") + SimulationErrorListCanceled = errors.New("list canceled") ) type SimulationBucket struct { SimulationOn bool Simulation map[string][]Simulation - retryStat map[string]int - mu sync.Mutex + + // We can intercept iterator + ListOperationCtx context.Context + ListOperationErr error + ListCurrentSimulationRound int + itSimulation []Simulation + + retryStat map[string]int + mu sync.Mutex *blob.Bucket } @@ -41,12 +50,18 @@ func NewSimulationBucket(bucket *blob.Bucket, simulationOn bool, simulation map[ //if err != nil { // return nil, err //} + itSimulation := make([]Simulation, 0) + for _, s := range simulation { + itSimulation = append(itSimulation, s...) 
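+		// The per-key entries are flattened because listing is simulated per
+		// attempt rather than per object key: ListSimu indexes this slice with
+		// ListCurrentSimulationRound. Map iteration order is random, so the order
+		// is only deterministic when a single key is configured, as in the tests.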
+ } m := &SimulationBucket{ - Simulation: simulation, - Bucket: bucket, - SimulationOn: simulationOn, - retryStat: make(map[string]int), - mu: sync.Mutex{}, + Simulation: simulation, + Bucket: bucket, + SimulationOn: simulationOn, + retryStat: make(map[string]int), + mu: sync.Mutex{}, + ListCurrentSimulationRound: 0, + itSimulation: itSimulation, } return m, nil } @@ -108,3 +123,130 @@ func (r *SimulationBucket) Upload(ctx context.Context, key string, reader io.Rea } } + +func (r *SimulationBucket) Delete(ctx context.Context, key string) error { + simulation, found := r.Simulation[key] + if !r.SimulationOn || !found { + return r.Bucket.Delete(ctx, key) + } + + r.mu.Lock() + retryIndex, found := r.retryStat[key] + if found { + r.retryStat[key] = r.retryStat[key] + 1 + } else { + r.retryStat[key] = 1 + } + thisSimulation := simulation[retryIndex] + timer := time.NewTimer(thisSimulation.Delay) + r.mu.Unlock() + + select { + case <-ctx.Done(): + return SimulationErrorDeleteCanceled + case <-timer.C: + if thisSimulation.Err != nil { + return thisSimulation.Err + } + return r.Bucket.Delete(ctx, key) + } + +} + +//func (r *SimulationBucket) List(opts *blob.ListOptions) *blob.ListIterator { +// +// simulation, found := r.Simulation[opts.Prefix] +// if !r.SimulationOn || !found { +// return r.Bucket.List(opts) +// } +// +// r.mu.Lock() +// retryIndex, found := r.retryStat[opts.Prefix] +// if found { +// r.retryStat[opts.Prefix] = r.retryStat[opts.Prefix] + 1 +// } else { +// r.retryStat[opts.Prefix] = 1 +// } +// thisSimulation := simulation[retryIndex] +// timer := time.NewTimer(thisSimulation.Delay) +// r.mu.Unlock() +// +// select { +// case <-r.ListOperationCtx.Done(): +// r.ListOperationErr = SimulationErrorListCanceled +// return nil +// case <-timer.C: +// if thisSimulation.Err != nil { +// r.ListOperationErr = thisSimulation.Err +// return nil +// } +// return r.Bucket.List(opts) +// } +// +//} + +func (r *SimulationBucket) ListSimu(opts *blob.ListOptions) *ListIteratorWrapper { + defer func() { + r.ListCurrentSimulationRound++ + }() + + it := r.Bucket.List(opts) + + simulationOn := r.SimulationOn + var currentSimulation Simulation + if r.ListCurrentSimulationRound >= len(r.itSimulation) { + simulationOn = false + currentSimulation = Simulation{} + } else { + currentSimulation = r.itSimulation[r.ListCurrentSimulationRound] + } + + return &ListIteratorWrapper{ + ListIterator: it, + SimulationOn: simulationOn, + Simulation: currentSimulation, + } +} + +func (r *SimulationBucket) SetListContext(ctx context.Context) { + r.ListOperationCtx = ctx +} + +type ListIteratorWrapper struct { + SimulationOn bool + Simulation Simulation + *blob.ListIterator +} + +func (r *ListIteratorWrapper) Next(ctx context.Context) (*blob.ListObject, error) { + // Create a channel for signaling when each timer completes + //done := make(chan err) + if !r.SimulationOn { + return r.ListIterator.Next(ctx) + } + + timer := time.NewTimer(r.Simulation.Delay) + select { + case <-ctx.Done(): + return nil, SimulationErrorListCanceled + case <-timer.C: + if r.Simulation.Err != nil { + return nil, r.Simulation.Err + } + return r.ListIterator.Next(ctx) + } + + // Iterate over durations and start a goroutine for each timer + //for i, d := range r.Simulation { + // go func(idx int, s Simulation) { + // // Wait for the time to pass + // <-time.After(s.Delay) + // r.ListIterator.Next(ctx) + // }(i, d) + //} + + // Listen for the completion of each timer + //for range durations { + // fmt.Println(<-done) + //} +} diff --git 
a/internal/offloading/sink.go b/internal/offloading/sink.go index 484c9980f2..21db917fcc 100644 --- a/internal/offloading/sink.go +++ b/internal/offloading/sink.go @@ -29,8 +29,14 @@ type Bucket interface { Close() error } +type Iterator interface { + Next(ctx context.Context) (*blob.ListObject, error) +} + // Sink is a wrapper around the storage bucket used for uploading/downloading packfiles type Sink struct { + ctx context.Context + globalCancel context.CancelFunc //sink *backup.Sink timeout time.Duration bucket Bucket @@ -48,8 +54,10 @@ func NewSink(ctx context.Context, bucket Bucket, timeout time.Duration, maxRetry //if err != nil { // return nil, fmt.Errorf("open bucket: %w", err) //} + ctx, cancel := context.WithTimeout(ctx, timeout) return &Sink{ - //sink: sink, + ctx: ctx, + globalCancel: cancel, timeout: timeout, bucket: bucket, retry: backoff.NewDefaultExponential(rand.New(rand.NewSource(time.Now().UnixNano()))), @@ -130,8 +138,10 @@ func (r *Sink) Download(ctx context.Context, objectKey string, fullFilePath stri } for retry := uint(0); retry <= r.maxRetry; { - err = r.bucket.Download(ctx, objectKey, file, nil) - if err == nil { + operationCtx, operationCancel := context.WithTimeout(ctx, r.retryTimeout) + err = r.bucket.Download(operationCtx, objectKey, file, nil) + operationCancel() + if err == nil || r.maxRetry == 0 { break } else { timer := time.NewTimer(r.retry.Backoff(retry)) @@ -226,30 +236,206 @@ func (r *Sink) Download(ctx context.Context, objectKey string, fullFilePath stri //} // List -func (r *Sink) List(ctx context.Context, prefix string) ([]string, error) { - it := r.bucket.List(&blob.ListOptions{ - Prefix: prefix + "/", - Delimiter: "/", - }) - objects := make([]string, 0) - for { - attrs, err := it.Next(ctx) - if errors.Is(err, io.EOF) { - return objects, nil - } +func (r *Sink) List(ctx context.Context, prefix string) (res []string, err error) { - if err != nil { - return []string{}, fmt.Errorf("list objects: %w", err) + return list(ctx, r, prefix, r.bucket.List) + + //ctx, cancel := context.WithTimeout(ctx, r.timeout) + //defer cancel() + //for retry := uint(0); retry <= r.maxRetry; { + // + // res, err = list(ctx, r, prefix, r.bucket.List) + // + // if err == nil || r.maxRetry == 0 { + // break + // } else { + // timer := time.NewTimer(r.retry.Backoff(retry)) + // select { + // case <-ctx.Done(): + // timer.Stop() + // return []string{}, fmt.Errorf("list object cancelled %w", err) + // case <-timer.C: + // // Refresh timer expires, issue another try. 
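Upload, Download, DeleteObjects and the list helper below all repeat the same loop: an overall deadline on the parent context, a per-attempt context derived from retryTimeout, and exponential backoff between attempts, giving up early when maxRetry is zero or the parent context is cancelled. Purely as an illustration of that shared shape (withRetry and its parameter names are not part of the patch; r.retry.Backoff would satisfy the backoff argument), a factored-out helper could look like this:

package offloading

import (
	"context"
	"fmt"
	"time"
)

// withRetry is a sketch only: it names the retry loop repeated by the Sink
// methods. attempt performs one bucket operation; backoff maps the attempt
// number to a wait duration, as r.retry.Backoff does.
func withRetry(ctx context.Context, maxRetry uint, perAttempt time.Duration, backoff func(uint) time.Duration, attempt func(context.Context) error) error {
	var err error
	for retry := uint(0); retry <= maxRetry; {
		attemptCtx, cancel := context.WithTimeout(ctx, perAttempt)
		err = attempt(attemptCtx)
		cancel()
		if err == nil || maxRetry == 0 {
			break
		}
		timer := time.NewTimer(backoff(retry))
		select {
		case <-ctx.Done():
			timer.Stop()
			return fmt.Errorf("operation cancelled: %w", err)
		case <-timer.C:
			// Backoff elapsed, issue another try.
			retry++
		}
	}
	return err
}

With such a helper each exported method would only wrap its single bucket call in attempt, keeping the cancellation and backoff behaviour in one place.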
+ // retry++ + // } + // } + //} + // + ////listFunc := + ////r.bucket.List(&blob.ListOptions{ + //// Prefix: prefix + "/", + //// Delimiter: "/", + //// }) + //if err != nil { + // return []string{}, fmt.Errorf("list object %w", err) + // + //} + //return res, nil + //defer r.globalCancel() + //for retry := uint(0); retry <= r.maxRetry; { + // operationCtx, operationCancel := context.WithTimeout(r.ctx, r.retryTimeout) + // it := r.bucket.List(&blob.ListOptions{ + // Prefix: prefix + "/", + // Delimiter: "/", + // }) + // objects := make([]string, 0) + // for { + // var err error + // var attrs *blob.ListObject + // if it != nil { + // attrs, err = it.Next(operationCtx) + // if errors.Is(err, io.EOF) { + // operationCancel() + // return objects, nil + // } + // } + // + // if it == nil || err != nil { + // if r.maxRetry == 0 { + // operationCancel() + // return []string{}, fmt.Errorf("list objects: %w", err) + // } + // timer := time.NewTimer(r.retry.Backoff(retry)) + // select { + // case <-ctx.Done(): + // timer.Stop() + // operationCancel() + // return []string{}, fmt.Errorf("download object cancelled: %w", err) + // case <-timer.C: + // // Refresh timer expires, issue another try. + // retry++ + // break + // } + // } + // + // // Exclude the bucketPrefix "folder" itself + // if attrs != nil && attrs.Key == prefix+"/" { + // continue + // } + // objects = append(objects, filepath.Base(attrs.Key)) + // } + //} + //return []string{}, fmt.Errorf("list object enter iteration loop") +} + +//type originalListFunc func(opts *blob.ListOptions) *blob.ListIterator +//type wrapperListFunc func(opts *blob.ListOptions) *ListIteratorWrapper + +type ListFunc[T Iterator] func(opts *blob.ListOptions) T + +func list[T Iterator](ctx context.Context, r *Sink, prefix string, listFunc ListFunc[T]) (res []string, err error) { + //defer r.globalCancel() + + core := func() (res []string, err error) { + var listErr error + var attrs *blob.ListObject + it := listFunc(&blob.ListOptions{ + Prefix: prefix + "/", + Delimiter: "/", + }) + objects := make([]string, 0) + for { + operationCtx, operationCancel := context.WithTimeout(ctx, r.retryTimeout) + attrs, listErr = it.Next(operationCtx) + operationCancel() + + if errors.Is(listErr, io.EOF) { + return objects, nil + } + + if listErr != nil { + return []string{}, fmt.Errorf("list object on GCP bucket: %w", listErr) + } + + // Exclude the bucketPrefix "folder" itself + if attrs != nil && attrs.Key == prefix+"/" { + continue + } + + objects = append(objects, filepath.Base(attrs.Key)) } + } - // Exclude the bucketPrefix "folder" itself - if attrs != nil && attrs.Key == prefix+"/" { - continue + ctx, cancel := context.WithTimeout(ctx, r.timeout) + defer cancel() + for retry := uint(0); retry <= r.maxRetry; { + + res, err = core() + + if err == nil || r.maxRetry == 0 { + break + } else { + timer := time.NewTimer(r.retry.Backoff(retry)) + select { + case <-ctx.Done(): + timer.Stop() + return []string{}, fmt.Errorf("list object cancelled %w", err) + case <-timer.C: + // Refresh timer expires, issue another try. 
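+				// Each retry re-invokes core(), which requests a brand-new iterator
+				// from listFunc and applies retryTimeout to every Next call, so a
+				// partially consumed listing is discarded rather than resumed.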
+ retry++ + } } + } + + if err != nil { + return []string{}, fmt.Errorf("list object %w", err) - objects = append(objects, filepath.Base(attrs.Key)) } + return res, nil + //ctx, cancel := context.WithTimeout(ctx, r.timeout) + //defer cancel() + //for retry := uint(0); retry <= r.maxRetry; { + // //it := r.bucket.List(&blob.ListOptions{ + // // Prefix: prefix + "/", + // // Delimiter: "/", + // //}) + // it := listFunc(&blob.ListOptions{ + // Prefix: prefix + "/", + // Delimiter: "/", + // }) + // objects := make([]string, 0) + // for { + // var err error + // var attrs *blob.ListObject + // + // operationCtx, operationCancel := context.WithTimeout(r.ctx, r.retryTimeout) + // attrs, err = it.Next(operationCtx) + // if errors.Is(err, io.EOF) { + // operationCancel() + // return objects, nil + // } + // + // if err != nil { + // if r.maxRetry == 0 { + // operationCancel() + // return []string{}, fmt.Errorf("list objects: %w", err) + // } + // timer := time.NewTimer(r.retry.Backoff(retry)) + // select { + // case <-ctx.Done(): + // timer.Stop() + // operationCancel() + // return []string{}, fmt.Errorf("download object cancelled: %w", err) + // case <-timer.C: + // // Refresh timer expires, issue another try. + // retry++ + // break + // } + // } + // + // // Exclude the bucketPrefix "folder" itself + // if attrs != nil && attrs.Key == prefix+"/" { + // continue + // } + // + // if attrs != nil { + // objects = append(objects, filepath.Base(attrs.Key)) + // } + // + // } + //} + //return []string{}, fmt.Errorf("list object enter iteration loop") } // HasObjects @@ -316,8 +502,10 @@ func (r *Sink) DeleteObjects(ctx context.Context, prefix string, objectNames []s defer wg.Done() var err error for retry := uint(0); retry <= r.maxRetry; { - err = r.bucket.Delete(ctx, obj) - if err == nil { + operationCtx, operationCancel := context.WithTimeout(ctx, r.retryTimeout) + err = r.bucket.Delete(operationCtx, obj) + operationCancel() + if err == nil || r.maxRetry == 0 { break } else { timer := time.NewTimer(r.retry.Backoff(retry)) @@ -325,7 +513,6 @@ func (r *Sink) DeleteObjects(ctx context.Context, prefix string, objectNames []s case <-ctx.Done(): timer.Stop() err = fmt.Errorf("delete object cancelled %q: %w", obj, err) - return case <-timer.C: // Refresh timer expires, issue another try. 
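+					// Only the timer branch advances retry; on ctx.Done the recorded
+					// cancellation error is kept and the loop runs again with the
+					// already-cancelled context.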
retry++ diff --git a/internal/offloading/sink_test.go b/internal/offloading/sink_test.go index dd5b1bb63c..5a3461e689 100644 --- a/internal/offloading/sink_test.go +++ b/internal/offloading/sink_test.go @@ -180,7 +180,7 @@ func TestSink_Upload_Timeout_Cancellation_And_Retry(t *testing.T) { expectedError: SimulationErrorUploadCanceled, }, { - desc: "upload timeout success with retry", + desc: "upload success with some operation timeout and retry", maxRetry: 2, overallTimeout: 60 * time.Second, operationTimeOut: 100 * time.Millisecond, @@ -360,7 +360,7 @@ func TestSink_Download(t *testing.T) { }, }, } - sink = setupLocalBucket(t, prefix, objects) + sink, _ = setupLocalBucket(t, prefix, objects) defer closeBucket(t, sink) localDir := testhelper.TempDir(t) @@ -369,7 +369,6 @@ func TestSink_Download(t *testing.T) { localFullPath := filepath.Join(localDir, obj) err := sink.Download(ctx, objectKey, localFullPath) if err == nil { - require.NoError(t, err) buf, err := os.ReadFile(localFullPath) require.NoError(t, err) require.Equal(t, tc.expectedContent[obj], string(buf)) @@ -382,6 +381,170 @@ func TestSink_Download(t *testing.T) { } } +func TestSink_Download_Timeout_Cancellation_And_Retry(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + maxRetry uint + overallTimeout time.Duration + + // operationTimeOut is the timer for each operation e.g. upload, download + operationTimeOut time.Duration + objectName string + objectContent string + simulationData []Simulation + expectedError error + }{ + { + desc: "download failed with overall timer timeout no retry", + maxRetry: 0, + overallTimeout: 100 * time.Millisecond, + operationTimeOut: 5 * time.Second, + objectName: "overall_timeout_obj_key", + objectContent: "it's about time", + simulationData: []Simulation{ + {Delay: 10 * time.Second, Err: nil}, + }, + expectedError: SimulationErrorDownloadCanceled, + }, + { + desc: "down success with some timeouts and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 100 * time.Millisecond, + objectName: "success_with_retry", + objectContent: "with some network jitter, we made it", + simulationData: []Simulation{ + {300 * time.Millisecond, nil}, // this will timeout + {300 * time.Millisecond, nil}, // this will timeout + {10 * time.Millisecond, nil}, // this should succeed + }, + expectedError: nil, + }, + { + desc: "download failed with operation timeout and with retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 50 * time.Millisecond, + objectName: "failed_even_retry", + objectContent: "we had a terrible network", + simulationData: []Simulation{ + {1 * time.Second, nil}, + {1 * time.Second, nil}, + {1 * time.Second, nil}, + }, + expectedError: SimulationErrorDownloadCanceled, + }, + { + desc: "down success with simulated cancel and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 10 * time.Second, + objectName: "success_with_cancel_and_retry", + simulationData: []Simulation{ + {500 * time.Millisecond, SimulationErrorDownloadCanceled}, + {500 * time.Millisecond, SimulationErrorDownloadCanceled}, + {500 * time.Millisecond, nil}, + }, + expectedError: nil, + }, + { + desc: "download failed with simulated cancel and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 10 * time.Second, + objectName: "success_with_cancel_and_retry", + simulationData: []Simulation{ + {500 * time.Millisecond, SimulationErrorDownloadCanceled}, + {500 * 
time.Millisecond, SimulationErrorDownloadCanceled}, + {500 * time.Millisecond, SimulationErrorDownloadCanceled}, + }, + expectedError: SimulationErrorDownloadCanceled, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + prefix := "some/prefix" + objects := []fileBucketData{ + { + ObjectName: tc.objectName, + Content: tc.objectContent, + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + } + _, bucket := setupLocalBucket(t, prefix, objects) + + objectKey := fmt.Sprintf("%s/%s", prefix, tc.objectName) + simulation := map[string][]Simulation{ + objectKey: tc.simulationData, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) + sink, err := NewSink(ctx, simuBucket, tc.overallTimeout, tc.maxRetry, tc.operationTimeOut) + require.NoError(t, err) + defer closeBucket(t, sink) + + localDir := testhelper.TempDir(t) + localFullPath := filepath.Join(localDir, tc.objectName) + err = sink.Download(ctx, objectKey, localFullPath) + + if tc.expectedError != nil { + require.ErrorIs(t, err, tc.expectedError) + } else { + buf, err := os.ReadFile(localFullPath) + require.NoError(t, err) + require.Equal(t, tc.objectContent, string(buf)) + } + }) + } + + t.Run("download failed with overall cancel triggered no retry", func(t *testing.T) { + prefix := "some/prefix" + objectName := "overall_cancel_called" + objectContent := "I can't wait forever" + objects := []fileBucketData{ + { + ObjectName: objectName, + Content: objectContent, + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + } + _, bucket := setupLocalBucket(t, prefix, objects) + + objectKey := fmt.Sprintf("%s/%s", prefix, objectName) + simulation := map[string][]Simulation{ + objectKey: { + {1 * time.Second, nil}, + }, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) + ctx, cancel := context.WithCancel(ctx) + sink, err := NewSink(ctx, simuBucket, 100*time.Second, 0, 10*time.Second) + require.NoError(t, err) + defer closeBucket(t, sink) + + localDir := testhelper.TempDir(t) + localFullPath := filepath.Join(localDir, objectName) + errCh := make(chan error) + go func() { + errCh <- sink.Download(ctx, objectKey, localFullPath) + }() + + // trigger cancel + cancel() + err = <-errCh + require.ErrorIs(t, err, SimulationErrorDownloadCanceled) + }) +} + func TestSink_Delete(t *testing.T) { ctx := testhelper.Context(t) @@ -536,7 +699,7 @@ func TestSink_Delete(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { var sink *Sink - sink = setupLocalBucket(t, tc.prefix, tc.objects) + sink, _ = setupLocalBucket(t, tc.prefix, tc.objects) defer closeBucket(t, sink) actualResWithErrorCode := make(map[string]gcerrors.ErrorCode) @@ -554,6 +717,164 @@ func TestSink_Delete(t *testing.T) { } } +func TestSink_Delete_Timeout_Cancellation_And_Retry(t *testing.T) { + + t.Parallel() + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + maxRetry uint + overallTimeout time.Duration + + // operationTimeOut is the timer for each operation e.g. 
upload, download + operationTimeOut time.Duration + objectName string + simulationData []Simulation + expectedError error + }{ + { + desc: "delete failed with overall timer timeout no retry", + maxRetry: 0, + overallTimeout: 100 * time.Millisecond, + operationTimeOut: 5 * time.Second, + objectName: "overall_timeout_obj_key", + simulationData: []Simulation{ + {Delay: 10 * time.Second, Err: nil}, + }, + expectedError: SimulationErrorDeleteCanceled, + }, + { + desc: "delete success with some timeouts and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 100 * time.Millisecond, + objectName: "success_with_retry", + simulationData: []Simulation{ + {300 * time.Millisecond, nil}, // this will timeout + {300 * time.Millisecond, nil}, // this will timeout + {10 * time.Millisecond, nil}, // this should succeed + }, + expectedError: nil, + }, + { + desc: "delete failed with operation timeout and with retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 50 * time.Millisecond, + objectName: "failed_even_retry", + simulationData: []Simulation{ + {1 * time.Second, nil}, + {1 * time.Second, nil}, + {1 * time.Second, nil}, + }, + expectedError: SimulationErrorDeleteCanceled, + }, + { + desc: "delete success with simulated cancel and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 10 * time.Second, + objectName: "success_with_cancel_and_retry", + simulationData: []Simulation{ + {500 * time.Millisecond, SimulationErrorDownloadCanceled}, + {500 * time.Millisecond, SimulationErrorDownloadCanceled}, + {500 * time.Millisecond, nil}, + }, + expectedError: nil, + }, + { + desc: "delete failed with simulated cancel and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 10 * time.Second, + objectName: "success_with_cancel_and_retry", + simulationData: []Simulation{ + {500 * time.Millisecond, SimulationErrorDeleteCanceled}, + {500 * time.Millisecond, SimulationErrorDeleteCanceled}, + {500 * time.Millisecond, SimulationErrorDeleteCanceled}, + }, + expectedError: SimulationErrorDeleteCanceled, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + prefix := "some/prefix" + objects := []fileBucketData{ + { + ObjectName: tc.objectName, + Content: "I am Mr. 
Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + } + _, bucket := setupLocalBucket(t, prefix, objects) + + objectKey := fmt.Sprintf("%s/%s", prefix, tc.objectName) + simulation := map[string][]Simulation{ + objectKey: tc.simulationData, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) + sink, err := NewSink(ctx, simuBucket, tc.overallTimeout, tc.maxRetry, tc.operationTimeOut) + require.NoError(t, err) + defer closeBucket(t, sink) + + res := sink.DeleteObjects(ctx, prefix, []string{objectKey}) + + if tc.expectedError != nil { + require.ErrorIs(t, res[objectKey], tc.expectedError) + } else { + require.NoError(t, res[objectKey]) + actualObjectsLeft, err := sink.List(ctx, prefix) + require.NoError(t, err) + require.Empty(t, actualObjectsLeft) + } + }) + } + + t.Run("download failed with overall cancel triggered no retry", func(t *testing.T) { + prefix := "some/prefix" + objectName := "overall_cancel_called" + objectContent := "I can't wait forever" + objects := []fileBucketData{ + { + ObjectName: objectName, + Content: objectContent, + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + } + _, bucket := setupLocalBucket(t, prefix, objects) + + objectKey := fmt.Sprintf("%s/%s", prefix, objectName) + simulation := map[string][]Simulation{ + objectKey: { + {1 * time.Second, nil}, + }, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) + ctx, cancel := context.WithCancel(ctx) + sink, err := NewSink(ctx, simuBucket, 100*time.Second, 0, 10*time.Second) + require.NoError(t, err) + defer closeBucket(t, sink) + + errCh := make(chan map[string]error) + go func() { + errCh <- sink.DeleteObjects(ctx, prefix, []string{objectKey}) + }() + + // trigger cancel + cancel() + res := <-errCh + require.ErrorIs(t, res[objectKey], SimulationErrorDeleteCanceled) + }) +} + // Test List func TestSink_List(t *testing.T) { ctx := testhelper.Context(t) @@ -639,7 +960,7 @@ func TestSink_List(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { var sink *Sink - sink = setupLocalBucket(t, tc.prefix, tc.objects) + sink, _ = setupLocalBucket(t, tc.prefix, tc.objects) defer closeBucket(t, sink) res, err := sink.List(ctx, tc.queryingPrefix) @@ -656,6 +977,186 @@ func TestSink_List(t *testing.T) { //require.ElementsMatch(t, res, []string{"C-137", "C-131"}) } +func TestSink_List_Timeout_Cancellation_And_Retry(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + maxRetry uint + overallTimeout time.Duration + + // operationTimeOut is the timer for each operation e.g. 
upload, download + operationTimeOut time.Duration + objectName string + simulationData []Simulation + expectedError error + }{ + { + desc: "list failed with overall timer timeout no retry", + maxRetry: 0, + overallTimeout: 100 * time.Millisecond, + operationTimeOut: 5 * time.Second, + objectName: "overall_timeout_obj_key", + simulationData: []Simulation{ + {Delay: 10 * time.Second, Err: nil}, + }, + expectedError: SimulationErrorListCanceled, + }, + { + desc: "list success with some timeouts and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 100 * time.Millisecond, + objectName: "success_with_retry", + simulationData: []Simulation{ + {300 * time.Millisecond, nil}, // this will timeout + {300 * time.Millisecond, nil}, // this will timeout + {10 * time.Millisecond, nil}, // this should succeed + }, + expectedError: nil, + }, + { + desc: "list failed with operation timeout and with retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 50 * time.Millisecond, + objectName: "failed_even_retry", + simulationData: []Simulation{ + {1 * time.Second, nil}, + {1 * time.Second, nil}, + {1 * time.Second, nil}, + }, + expectedError: SimulationErrorListCanceled, + }, + { + desc: "list success with simulated cancel and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 10 * time.Second, + objectName: "success_with_cancel_and_retry", + simulationData: []Simulation{ + {500 * time.Millisecond, SimulationErrorDownloadCanceled}, + {500 * time.Millisecond, SimulationErrorDownloadCanceled}, + {500 * time.Millisecond, nil}, + }, + expectedError: nil, + }, + { + desc: "delete failed with simulated cancel and retry", + maxRetry: 2, + overallTimeout: 60 * time.Second, + operationTimeOut: 10 * time.Second, + objectName: "success_with_cancel_and_retry", + simulationData: []Simulation{ + {500 * time.Millisecond, SimulationErrorDeleteCanceled}, + {500 * time.Millisecond, SimulationErrorDeleteCanceled}, + {500 * time.Millisecond, SimulationErrorDeleteCanceled}, + }, + expectedError: SimulationErrorDeleteCanceled, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + prefix := "some/prefix" + objects := []fileBucketData{ + { + ObjectName: tc.objectName, + Content: "I am Mr. 
Frundles", + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + } + _, bucket := setupLocalBucket(t, prefix, objects) + + // When simulating listing, simulation data key is prefix + "/" + // Since we pass prefix in List() + // TODO why + //simulationKey := fmt.Sprintf("%s/", prefix) + objectKey := fmt.Sprintf("%s/%s", prefix, tc.objectName) + simulation := map[string][]Simulation{ + objectKey: tc.simulationData, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) + + simuBucketPtr, ok := simuBucket.(*SimulationBucket) + require.True(t, ok) + + sink, err := NewSink(ctx, simuBucket, tc.overallTimeout, tc.maxRetry, tc.operationTimeOut) + //simuBucketPtr.ListOperationCtx = sink.ctx + + require.NoError(t, err) + defer closeBucket(t, sink) + + res, err := list(ctx, sink, prefix, simuBucketPtr.ListSimu) + if tc.expectedError != nil { + require.ErrorIs(t, err, tc.expectedError) + } else { + require.NoError(t, err) + require.ElementsMatch(t, res, []string{tc.objectName}) + } + + //res := sink.DeleteObjects(ctx, prefix, []string{objectKey}) + + //if tc.expectedError != nil { + // require.ErrorIs(t, res[objectKey], tc.expectedError) + //} else { + // require.NoError(t, res[objectKey]) + // actualObjectsLeft, err := sink.List(ctx, prefix) + // require.NoError(t, err) + // require.Empty(t, actualObjectsLeft) + //} + }) + } + + t.Run("list failed with overall cancel triggered no retry", func(t *testing.T) { + prefix := "some/prefix" + objectName := "overall_cancel_called" + objectContent := "I can't wait forever" + objects := []fileBucketData{ + { + ObjectName: objectName, + Content: objectContent, + WriterOpt: blob.WriterOptions{ + ContentType: "application/octet-stream", + CacheControl: "no-store, no-transform", + }, + }, + } + _, bucket := setupLocalBucket(t, prefix, objects) + + objectKey := fmt.Sprintf("%s/%s", prefix, objectName) + simulation := map[string][]Simulation{ + objectKey: { + {1 * time.Second, nil}, + }, + } + simuBucket, err := NewSimulationBucket(bucket, true, simulation) + require.NoError(t, err) + + simuBucketPtr, ok := simuBucket.(*SimulationBucket) + require.True(t, ok) + + ctx, cancel := context.WithCancel(ctx) + sink, err := NewSink(ctx, simuBucket, 100*time.Second, 0, 10*time.Second) + require.NoError(t, err) + defer closeBucket(t, sink) + + errCh := make(chan error) + go func() { + _, err := list(ctx, sink, prefix, simuBucketPtr.ListSimu) + errCh <- err + }() + + // trigger cancel + cancel() + res := <-errCh + require.ErrorIs(t, res, SimulationErrorListCanceled) + }) +} + func TestSink_HasObjects(t *testing.T) { ctx := testhelper.Context(t) @@ -740,7 +1241,7 @@ func TestSink_HasObjects(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { var sink *Sink - sink = setupLocalBucket(t, tc.prefix, tc.objects) + sink, _ = setupLocalBucket(t, tc.prefix, tc.objects) defer closeBucket(t, sink) res, err := sink.HasObjects(ctx, tc.queryingPrefix) @@ -769,7 +1270,7 @@ type fileBucketData struct { WriterOpt blob.WriterOptions } -func setupLocalBucket(t *testing.T, prefix string, objectsToUpload []fileBucketData) *Sink { +func setupLocalBucket(t *testing.T, prefix string, objectsToUpload []fileBucketData) (*Sink, *blob.Bucket) { ctx := testhelper.Context(t) bucket := testhelper.TempDir(t) @@ -787,7 +1288,7 @@ func setupLocalBucket(t *testing.T, prefix string, objectsToUpload []fileBucketD require.NoError(t, err) } - return sink + return sink, localBucket } // Clean up 
bucket ?? -- GitLab From afe26e1c3591478ea093e0036cd1ccbb844ed440 Mon Sep 17 00:00:00 2001 From: Eric Ju Date: Wed, 18 Dec 2024 18:48:02 -0500 Subject: [PATCH 10/10] Draft: clean up --- internal/offloading/simulation_bucket.go | 52 ------ internal/offloading/sink.go | 216 ----------------------- internal/offloading/sink_test.go | 21 --- 3 files changed, 289 deletions(-) diff --git a/internal/offloading/simulation_bucket.go b/internal/offloading/simulation_bucket.go index ec72424280..ce8f7c1c2b 100644 --- a/internal/offloading/simulation_bucket.go +++ b/internal/offloading/simulation_bucket.go @@ -46,10 +46,6 @@ type Simulation struct { // to create a mock bucket func NewSimulationBucket(bucket *blob.Bucket, simulationOn bool, simulation map[string][]Simulation) (Bucket, error) { - //b, err := fileblob.OpenBucket(dir, nil) - //if err != nil { - // return nil, err - //} itSimulation := make([]Simulation, 0) for _, s := range simulation { itSimulation = append(itSimulation, s...) @@ -153,38 +149,6 @@ func (r *SimulationBucket) Delete(ctx context.Context, key string) error { } -//func (r *SimulationBucket) List(opts *blob.ListOptions) *blob.ListIterator { -// -// simulation, found := r.Simulation[opts.Prefix] -// if !r.SimulationOn || !found { -// return r.Bucket.List(opts) -// } -// -// r.mu.Lock() -// retryIndex, found := r.retryStat[opts.Prefix] -// if found { -// r.retryStat[opts.Prefix] = r.retryStat[opts.Prefix] + 1 -// } else { -// r.retryStat[opts.Prefix] = 1 -// } -// thisSimulation := simulation[retryIndex] -// timer := time.NewTimer(thisSimulation.Delay) -// r.mu.Unlock() -// -// select { -// case <-r.ListOperationCtx.Done(): -// r.ListOperationErr = SimulationErrorListCanceled -// return nil -// case <-timer.C: -// if thisSimulation.Err != nil { -// r.ListOperationErr = thisSimulation.Err -// return nil -// } -// return r.Bucket.List(opts) -// } -// -//} - func (r *SimulationBucket) ListSimu(opts *blob.ListOptions) *ListIteratorWrapper { defer func() { r.ListCurrentSimulationRound++ @@ -219,8 +183,6 @@ type ListIteratorWrapper struct { } func (r *ListIteratorWrapper) Next(ctx context.Context) (*blob.ListObject, error) { - // Create a channel for signaling when each timer completes - //done := make(chan err) if !r.SimulationOn { return r.ListIterator.Next(ctx) } @@ -235,18 +197,4 @@ func (r *ListIteratorWrapper) Next(ctx context.Context) (*blob.ListObject, error } return r.ListIterator.Next(ctx) } - - // Iterate over durations and start a goroutine for each timer - //for i, d := range r.Simulation { - // go func(idx int, s Simulation) { - // // Wait for the time to pass - // <-time.After(s.Delay) - // r.ListIterator.Next(ctx) - // }(i, d) - //} - - // Listen for the completion of each timer - //for range durations { - // fmt.Println(<-done) - //} } diff --git a/internal/offloading/sink.go b/internal/offloading/sink.go index 21db917fcc..7ac411d3c5 100644 --- a/internal/offloading/sink.go +++ b/internal/offloading/sink.go @@ -161,166 +161,11 @@ func (r *Sink) Download(ctx context.Context, objectKey string, fullFilePath stri return nil } -//// upload just upload one single file to the bucket -//func (r *Sink) upload(ctx context.Context, fullFilePath string, prefix string) (returnErr error) { -// // Use context timeouts to set an overall deadline for the operation, including all potential retries. -// // Having a cancelable context is critical here because the retry mechanism does not have a maximum -// // retry count and will continue retrying until the context is explicitly canceled. 
-// ctx, cancel := context.WithTimeout(ctx, r.timeout) -// defer cancel() -// -// file, err := os.Open(fullFilePath) -// defer func() { -// if err := file.Close(); err != nil { -// returnErr = errors.Join(returnErr, fmt.Errorf("close local file writer: %w", err)) -// } -// }() -// if err != nil { -// return fmt.Errorf("open file: %w", err) -// } -// -// objectKey := fmt.Sprintf("%s/%s", prefix, filepath.Base(fullFilePath)) -// wc, err := r.sink.GetWriter(ctx, objectKey) -// defer func() { -// if err := wc.Close(); err != nil { -// returnErr = errors.Join(returnErr, fmt.Errorf("close object writer: %w", err)) -// } -// }() -// if err != nil { -// return fmt.Errorf("get writer: %w", err) -// } -// -// _, err = io.Copy(wc, file) -// if err != nil { -// return fmt.Errorf("copy local file to object writer: %w", err) -// } -// -// return nil -//} -// -//// download -//func (r *Sink) download(ctx context.Context, objectKey string, fullFilePath string) (returnErr error) { -// // Use context timeouts to set an overall deadline for the operation, including all potential retries. -// // Having a cancelable context is critical here because the retry mechanism does not have a maximum -// // retry count and will continue retrying until the context is explicitly canceled. -// ctx, cancel := context.WithTimeout(ctx, r.timeout) -// defer cancel() -// -// file, err := os.Create(fullFilePath) -// defer func() { -// if err := file.Close(); err != nil { -// returnErr = errors.Join(returnErr, fmt.Errorf("close local file: %w", err)) -// } -// }() -// if err != nil { -// return fmt.Errorf("create file: %w", err) -// } -// -// // TODO hwo to do retry? -// // TODO what is the option? -// rc, err := r.sink.GetReader(ctx, objectKey) -// defer func() { -// if err := rc.Close(); err != nil { -// returnErr = errors.Join(returnErr, fmt.Errorf("close obeject reader: %w", err)) -// } -// }() -// if err != nil { -// return fmt.Errorf("get reader: %w", err) -// } -// _, err = io.Copy(file, rc) -// if err != nil { -// return fmt.Errorf("copy object reader to local file: %w", err) -// } -// -// return nil -//} - // List func (r *Sink) List(ctx context.Context, prefix string) (res []string, err error) { - return list(ctx, r, prefix, r.bucket.List) - - //ctx, cancel := context.WithTimeout(ctx, r.timeout) - //defer cancel() - //for retry := uint(0); retry <= r.maxRetry; { - // - // res, err = list(ctx, r, prefix, r.bucket.List) - // - // if err == nil || r.maxRetry == 0 { - // break - // } else { - // timer := time.NewTimer(r.retry.Backoff(retry)) - // select { - // case <-ctx.Done(): - // timer.Stop() - // return []string{}, fmt.Errorf("list object cancelled %w", err) - // case <-timer.C: - // // Refresh timer expires, issue another try. 
- // retry++ - // } - // } - //} - // - ////listFunc := - ////r.bucket.List(&blob.ListOptions{ - //// Prefix: prefix + "/", - //// Delimiter: "/", - //// }) - //if err != nil { - // return []string{}, fmt.Errorf("list object %w", err) - // - //} - //return res, nil - //defer r.globalCancel() - //for retry := uint(0); retry <= r.maxRetry; { - // operationCtx, operationCancel := context.WithTimeout(r.ctx, r.retryTimeout) - // it := r.bucket.List(&blob.ListOptions{ - // Prefix: prefix + "/", - // Delimiter: "/", - // }) - // objects := make([]string, 0) - // for { - // var err error - // var attrs *blob.ListObject - // if it != nil { - // attrs, err = it.Next(operationCtx) - // if errors.Is(err, io.EOF) { - // operationCancel() - // return objects, nil - // } - // } - // - // if it == nil || err != nil { - // if r.maxRetry == 0 { - // operationCancel() - // return []string{}, fmt.Errorf("list objects: %w", err) - // } - // timer := time.NewTimer(r.retry.Backoff(retry)) - // select { - // case <-ctx.Done(): - // timer.Stop() - // operationCancel() - // return []string{}, fmt.Errorf("download object cancelled: %w", err) - // case <-timer.C: - // // Refresh timer expires, issue another try. - // retry++ - // break - // } - // } - // - // // Exclude the bucketPrefix "folder" itself - // if attrs != nil && attrs.Key == prefix+"/" { - // continue - // } - // objects = append(objects, filepath.Base(attrs.Key)) - // } - //} - //return []string{}, fmt.Errorf("list object enter iteration loop") } -//type originalListFunc func(opts *blob.ListOptions) *blob.ListIterator -//type wrapperListFunc func(opts *blob.ListOptions) *ListIteratorWrapper - type ListFunc[T Iterator] func(opts *blob.ListOptions) T func list[T Iterator](ctx context.Context, r *Sink, prefix string, listFunc ListFunc[T]) (res []string, err error) { @@ -383,59 +228,6 @@ func list[T Iterator](ctx context.Context, r *Sink, prefix string, listFunc List } return res, nil - //ctx, cancel := context.WithTimeout(ctx, r.timeout) - //defer cancel() - //for retry := uint(0); retry <= r.maxRetry; { - // //it := r.bucket.List(&blob.ListOptions{ - // // Prefix: prefix + "/", - // // Delimiter: "/", - // //}) - // it := listFunc(&blob.ListOptions{ - // Prefix: prefix + "/", - // Delimiter: "/", - // }) - // objects := make([]string, 0) - // for { - // var err error - // var attrs *blob.ListObject - // - // operationCtx, operationCancel := context.WithTimeout(r.ctx, r.retryTimeout) - // attrs, err = it.Next(operationCtx) - // if errors.Is(err, io.EOF) { - // operationCancel() - // return objects, nil - // } - // - // if err != nil { - // if r.maxRetry == 0 { - // operationCancel() - // return []string{}, fmt.Errorf("list objects: %w", err) - // } - // timer := time.NewTimer(r.retry.Backoff(retry)) - // select { - // case <-ctx.Done(): - // timer.Stop() - // operationCancel() - // return []string{}, fmt.Errorf("download object cancelled: %w", err) - // case <-timer.C: - // // Refresh timer expires, issue another try. 
- // retry++ - // break - // } - // } - // - // // Exclude the bucketPrefix "folder" itself - // if attrs != nil && attrs.Key == prefix+"/" { - // continue - // } - // - // if attrs != nil { - // objects = append(objects, filepath.Base(attrs.Key)) - // } - // - // } - //} - //return []string{}, fmt.Errorf("list object enter iteration loop") } // HasObjects @@ -464,14 +256,6 @@ func (r *Sink) HasObjects(ctx context.Context, prefix string) (bool, error) { } } -//func (r *Sink) IsEmpty(ctx context.Context, prefix string) (bool, error) { -// res, err := r.HasObjects(ctx, prefix) -// if err != nil { -// return false, err -// } -// return !res, nil -//} - // DeleteObjects attempts to delete given objects within the specified bucketPrefix. // It employs a retry mechanism to ensure objects are deleted with best-effort. // Result is objectKey map to error diff --git a/internal/offloading/sink_test.go b/internal/offloading/sink_test.go index 5a3461e689..965898c832 100644 --- a/internal/offloading/sink_test.go +++ b/internal/offloading/sink_test.go @@ -969,12 +969,6 @@ func TestSink_List(t *testing.T) { require.Equal(t, tc.expectedRes, res) }) } - //sink := setupLocalBucket(t) - //defer closeBucket(t, sink) - // - //res, err := sink.List(ctx, "jerryboree") - //require.NoError(t, err) - //require.ElementsMatch(t, res, []string{"C-137", "C-131"}) } func TestSink_List_Timeout_Cancellation_And_Retry(t *testing.T) { @@ -1070,10 +1064,6 @@ func TestSink_List_Timeout_Cancellation_And_Retry(t *testing.T) { } _, bucket := setupLocalBucket(t, prefix, objects) - // When simulating listing, simulation data key is prefix + "/" - // Since we pass prefix in List() - // TODO why - //simulationKey := fmt.Sprintf("%s/", prefix) objectKey := fmt.Sprintf("%s/%s", prefix, tc.objectName) simulation := map[string][]Simulation{ objectKey: tc.simulationData, @@ -1097,17 +1087,6 @@ func TestSink_List_Timeout_Cancellation_And_Retry(t *testing.T) { require.NoError(t, err) require.ElementsMatch(t, res, []string{tc.objectName}) } - - //res := sink.DeleteObjects(ctx, prefix, []string{objectKey}) - - //if tc.expectedError != nil { - // require.ErrorIs(t, res[objectKey], tc.expectedError) - //} else { - // require.NoError(t, res[objectKey]) - // actualObjectsLeft, err := sink.List(ctx, prefix) - // require.NoError(t, err) - // require.Empty(t, actualObjectsLeft) - //} }) } -- GitLab
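For context on what this cleanup leaves in place: the listing path in internal/offloading/sink.go wraps the whole operation in a context deadline and re-runs the iteration with a backoff when the bucket returns an error, which is the behaviour the deleted comments describe (retry until the context is cancelled). The sketch below is a simplified, self-contained approximation of that shape built directly on gocloud.dev/blob; the listWithRetry name, the file:// bucket URL, the 30-second deadline, the fixed one-second backoff, and the "offload" prefix are assumptions made for the example and are not taken from the patch.

package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"path/filepath"
	"time"

	"gocloud.dev/blob"
	_ "gocloud.dev/blob/fileblob" // registers the file:// scheme used by this example
)

// listWithRetry lists object names under prefix+"/" and retries the whole
// listing on error, bounded by an overall context deadline.
func listWithRetry(ctx context.Context, bucket *blob.Bucket, prefix string, maxRetry int) ([]string, error) {
	// Overall deadline for the operation, including every retry attempt.
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second) // assumed deadline
	defer cancel()

	var lastErr error
	for attempt := 0; attempt <= maxRetry; attempt++ {
		it := bucket.List(&blob.ListOptions{
			Prefix:    prefix + "/",
			Delimiter: "/",
		})

		objects := make([]string, 0)
		failed := false
		for {
			attrs, err := it.Next(ctx)
			if errors.Is(err, io.EOF) {
				return objects, nil
			}
			if err != nil {
				lastErr = err
				failed = true
				break
			}
			// Skip the entry that represents the prefix "folder" itself.
			if attrs.Key == prefix+"/" {
				continue
			}
			objects = append(objects, filepath.Base(attrs.Key))
		}
		if failed && attempt < maxRetry {
			// Fixed one-second backoff as a placeholder for a real backoff strategy.
			select {
			case <-ctx.Done():
				return nil, fmt.Errorf("list objects: %w", ctx.Err())
			case <-time.After(time.Second):
			}
		}
	}
	return nil, fmt.Errorf("list objects: %w", lastErr)
}

func main() {
	ctx := context.Background()

	// "file:///tmp/offloading-example" is an assumed local bucket for demonstration;
	// any gocloud.dev/blob URL would be opened the same way.
	bucket, err := blob.OpenBucket(ctx, "file:///tmp/offloading-example")
	if err != nil {
		log.Fatal(err)
	}
	defer bucket.Close()

	objects, err := listWithRetry(ctx, bucket, "offload", 3)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(objects)
}

The sink itself drives retries through its configured backoff strategy and per-attempt timeouts, so the fixed sleep above is only a placeholder for that behaviour.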
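Similarly, the generic ListFunc[T Iterator] signature that survives in sink.go exists so the listing helper can be fed either a real *blob.ListIterator or the ListIteratorWrapper kept in simulation_bucket.go, which injects delays and errors per retry round during tests. A minimal sketch of that pattern follows; the Iterator definition, the collectKeys helper, the fakeIterator double, and the canned object keys are illustrative assumptions, since only the wrapper's Next method and the ListFunc type are visible in this patch.

package main

import (
	"context"
	"errors"
	"fmt"
	"io"

	"gocloud.dev/blob"
)

// Iterator is the small surface the listing helper needs; *blob.ListIterator
// satisfies it, and so can a test double. The real definition is not part of
// this patch, so this one is an assumption for the sketch.
type Iterator interface {
	Next(ctx context.Context) (*blob.ListObject, error)
}

// ListFunc produces an iterator for the given options. In production this is
// (*blob.Bucket).List; in tests it can return a simulated iterator.
type ListFunc[T Iterator] func(opts *blob.ListOptions) T

// collectKeys drains the iterator, skipping the prefix "folder" entry itself.
func collectKeys[T Iterator](ctx context.Context, prefix string, listFunc ListFunc[T]) ([]string, error) {
	it := listFunc(&blob.ListOptions{Prefix: prefix + "/", Delimiter: "/"})
	var keys []string
	for {
		attrs, err := it.Next(ctx)
		if errors.Is(err, io.EOF) {
			return keys, nil
		}
		if err != nil {
			return nil, fmt.Errorf("list objects: %w", err)
		}
		if attrs.Key == prefix+"/" {
			continue
		}
		keys = append(keys, attrs.Key)
	}
}

// fakeIterator is a hypothetical stand-in for the patch's ListIteratorWrapper:
// it yields canned results so error and retry paths can be exercised in tests.
type fakeIterator struct {
	results []*blob.ListObject
	errs    []error
	pos     int
}

func (f *fakeIterator) Next(_ context.Context) (*blob.ListObject, error) {
	if f.pos >= len(f.results) {
		return nil, io.EOF
	}
	obj, err := f.results[f.pos], f.errs[f.pos]
	f.pos++
	return obj, err
}

func main() {
	ctx := context.Background()
	fake := &fakeIterator{
		results: []*blob.ListObject{{Key: "offload/"}, {Key: "offload/pack-1234.pack"}},
		errs:    []error{nil, nil},
	}
	keys, err := collectKeys[*fakeIterator](ctx, "offload", func(_ *blob.ListOptions) *fakeIterator {
		return fake
	})
	fmt.Println(keys, err) // [offload/pack-1234.pack] <nil>
}

Keeping the helper generic over the iterator rather than over the whole bucket keeps the production call site as simple as passing (*blob.Bucket).List, while still letting tests substitute a simulated iterator that fails or stalls on chosen rounds.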