From ebdd06b5a29cf67a340b118ee5e4f1de508d82d3 Mon Sep 17 00:00:00 2001 From: Mustafa Bayar Date: Mon, 22 Jul 2024 15:26:28 +0200 Subject: [PATCH 1/2] repository: Send acknowledgement for CreateBundleFromRefList CreateBundleFromRefList does not acknowledge the header message that the client sends as the first message in the stream. This makes problems harder to diagnose when the server returns an error after the client has already started its data stream without waiting for an acknowledgement. Therefore, the server should acknowledge the initial request with an empty response, and it should also validate that the first request does not contain patterns. --- .../repository/create_bundle_from_ref_list.go | 22 ++--- .../create_bundle_from_ref_list_test.go | 99 ++++++++++++------- 2 files changed, 77 insertions(+), 44 deletions(-) diff --git a/internal/gitaly/service/repository/create_bundle_from_ref_list.go b/internal/gitaly/service/repository/create_bundle_from_ref_list.go index 32b4cd45f8..c55d02f3ee 100644 --- a/internal/gitaly/service/repository/create_bundle_from_ref_list.go +++ b/internal/gitaly/service/repository/create_bundle_from_ref_list.go @@ -18,30 +18,30 @@ func (s *server) CreateBundleFromRefList(stream gitalypb.RepositoryService_Creat if err != nil { return err } + if firstRequest.GetPatterns() != nil { + return structerr.NewInvalidArgument("patterns should not be set for the first request in the stream") + } repository := firstRequest.GetRepository() if err := s.locator.ValidateRepository(ctx, repository); err != nil { return structerr.NewInvalidArgument("%w", err) } + // Acknowledge the stream header for client to start the data stream.
+ if err := stream.Send(&gitalypb.CreateBundleFromRefListResponse{}); err != nil { + return err + } + repo := s.localrepo(repository) if err := housekeeping.CleanupWorktrees(ctx, repo); err != nil { return structerr.NewInternal("cleaning up worktrees: %w", err) } - firstRead := true patterns := streamio.NewReader(func() ([]byte, error) { - var request *gitalypb.CreateBundleFromRefListRequest - if firstRead { - firstRead = false - request = firstRequest - } else { - var err error - request, err = stream.Recv() - if err != nil { - return nil, err - } + request, err := stream.Recv() + if err != nil { + return nil, err } return append(bytes.Join(request.GetPatterns(), []byte("\n")), '\n'), nil }) diff --git a/internal/gitaly/service/repository/create_bundle_from_ref_list_test.go b/internal/gitaly/service/repository/create_bundle_from_ref_list_test.go index cccded0f9d..a6000e2d58 100644 --- a/internal/gitaly/service/repository/create_bundle_from_ref_list_test.go +++ b/internal/gitaly/service/repository/create_bundle_from_ref_list_test.go @@ -44,8 +44,15 @@ func TestCreateBundleFromRefList_success(t *testing.T) { c, err := client.CreateBundleFromRefList(ctx) require.NoError(t, err) + // Send only the repo first and get the acknowledgement require.NoError(t, c.Send(&gitalypb.CreateBundleFromRefListRequest{ Repository: repo, + })) + response, err := c.Recv() + require.NoError(t, err) + require.Empty(t, response.Data) // First response is only an acknowledgement without any data + + require.NoError(t, c.Send(&gitalypb.CreateBundleFromRefListRequest{ Patterns: [][]byte{ []byte("master"), []byte("^master~1"), @@ -83,8 +90,15 @@ func TestCreateBundleFromRefList_missing_ref(t *testing.T) { c, err := client.CreateBundleFromRefList(ctx) require.NoError(t, err) + // Send only the repo first and get the acknowledgement require.NoError(t, c.Send(&gitalypb.CreateBundleFromRefListRequest{ Repository: repo, + })) + response, err := c.Recv() + require.NoError(t, err) + require.Empty(t, 
response.Data) // First response is only an acknowledgement without any data + + require.NoError(t, c.Send(&gitalypb.CreateBundleFromRefListRequest{ Patterns: [][]byte{ []byte("refs/heads/master"), []byte("refs/heads/totally_missing"), @@ -110,45 +124,64 @@ func TestCreateBundleFromRefList_missing_ref(t *testing.T) { require.Contains(t, string(output), fmt.Sprintf("The bundle contains this ref:\n%s refs/heads/master", commitOID)) } -func TestCreateBundleFromRefList_validations(t *testing.T) { +func TestCreateBundleFromRefList_PatternsValidation(t *testing.T) { t.Parallel() ctx := testhelper.Context(t) cfg, client := setupRepositoryService(t) repo, _ := gittest.CreateRepository(t, ctx, cfg) - testCases := []struct { - desc string - request *gitalypb.CreateBundleFromRefListRequest - expectedErr error - }{ - { - desc: "empty repository", - request: &gitalypb.CreateBundleFromRefListRequest{ - Patterns: [][]byte{[]byte("master")}, - }, - expectedErr: structerr.NewInvalidArgument("%w", storage.ErrRepositoryNotSet), - }, - { - desc: "empty bundle", - request: &gitalypb.CreateBundleFromRefListRequest{ - Repository: repo, - Patterns: [][]byte{[]byte("master"), []byte("^master")}, - }, - expectedErr: status.Error(codes.FailedPrecondition, "create bundle: refusing to create empty bundle"), - }, - } + stream, err := client.CreateBundleFromRefList(ctx) + require.NoError(t, err) + + require.NoError(t, stream.Send(&gitalypb.CreateBundleFromRefListRequest{ + Repository: repo, + Patterns: [][]byte{[]byte("master")}, + })) + require.NoError(t, stream.CloseSend()) + _, err = stream.Recv() + testhelper.RequireGrpcError(t, structerr.NewInvalidArgument("patterns should not be set for the first request in the stream"), err) +} + +func TestCreateBundleFromRefList_RepositoryValidation(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + _, client := setupRepositoryService(t) + + stream, err := client.CreateBundleFromRefList(ctx) + require.NoError(t, err) + + require.NoError(t, 
stream.Send(&gitalypb.CreateBundleFromRefListRequest{})) + require.NoError(t, stream.CloseSend()) + _, err = stream.Recv() + testhelper.RequireGrpcError(t, structerr.NewInvalidArgument("%w", storage.ErrRepositoryNotSet), err) +} + +func TestCreateBundleFromRefList_BundleValidation(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg, client := setupRepositoryService(t) + repo, _ := gittest.CreateRepository(t, ctx, cfg) + + stream, err := client.CreateBundleFromRefList(ctx) + require.NoError(t, err) + + // Send only the repo first and get the acknowledgement + require.NoError(t, stream.Send(&gitalypb.CreateBundleFromRefListRequest{ + Repository: repo, + })) + response, err := stream.Recv() + require.NoError(t, err) + require.Empty(t, response.Data) // First response is only an acknowledgement without any data + + require.NoError(t, stream.Send(&gitalypb.CreateBundleFromRefListRequest{ + Patterns: [][]byte{[]byte("master"), []byte("^master")}, + })) - for _, testCase := range testCases { - t.Run(testCase.desc, func(t *testing.T) { - stream, err := client.CreateBundleFromRefList(ctx) - require.NoError(t, err) - - require.NoError(t, stream.Send(testCase.request)) - require.NoError(t, stream.CloseSend()) - for _, err = stream.Recv(); err == nil; _, err = stream.Recv() { - } - testhelper.RequireGrpcError(t, testCase.expectedErr, err) - }) + require.NoError(t, stream.CloseSend()) + for _, err = stream.Recv(); err == nil; _, err = stream.Recv() { } + testhelper.RequireGrpcError(t, status.Error(codes.FailedPrecondition, "create bundle: refusing to create empty bundle"), err) } -- GitLab From 3c0fcdefdc511847f983e539a5e8b9f3612a7bb3 Mon Sep 17 00:00:00 2001 From: Mustafa Bayar Date: Mon, 22 Jul 2024 15:08:37 +0200 Subject: [PATCH 2/2] repository: Wait for acknowledgement in CreateBundleFromRefList CreateBundleFromRefList immediately starts streaming the data without first waiting for an acknowledgement, which makes it more difficult to retry write requests as the
client may not be able to retry with another Gitaly as it may no longer have the data it is streaming. Also it becomes more tricky to understand the actual error returned from server if connection gets closed during the data stream. Client should wait for acknowledgement from server first. --- internal/backup/repository.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/internal/backup/repository.go b/internal/backup/repository.go index 73fd549bea..b9d75ea180 100644 --- a/internal/backup/repository.go +++ b/internal/backup/repository.go @@ -129,6 +129,22 @@ func (rr *remoteRepository) createBundlePatterns(ctx context.Context, out io.Wri if err != nil { return fmt.Errorf("remote repository: create bundle patterns: %w", err) } + + if err := stream.Send(&gitalypb.CreateBundleFromRefListRequest{ + Repository: rr.repo, + }); err != nil { + return fmt.Errorf("remote repository: create bundle patterns: %w", err) + } + + // If we immediately start streaming the data without waiting for acknowledgement, it can cause: + // - Hard to debug errors when an RPC is rejected due to an error such as repository not existing. + // - The client not being able to retry the request on another node if it already started streaming + // the data to the other node without buffering the data. + // Therefore, here we are waiting for the acknowledgement from the server before starting the data stream. 
+ if _, err = stream.Recv(); err != nil { + return fmt.Errorf("remote repository: create bundle patterns stream header is not acknowledged: %w", err) + } + c := chunk.New(&createBundleFromRefListSender{ stream: stream, }) @@ -145,8 +161,7 @@ func (rr *remoteRepository) createBundlePatterns(ctx context.Context, out io.Wri line = bytes.TrimSuffix(line, []byte("\n")) if err := c.Send(&gitalypb.CreateBundleFromRefListRequest{ - Repository: rr.repo, - Patterns: [][]byte{line}, + Patterns: [][]byte{line}, }); err != nil { return fmt.Errorf("remote repository: create bundle patterns: %w", err) } -- GitLab