diff --git a/internal/backup/repository.go b/internal/backup/repository.go index 73fd549bead129008c967bc2740348d5e14ff8ef..b9d75ea1807152452d159e87b6f681b430daa3d3 100644 --- a/internal/backup/repository.go +++ b/internal/backup/repository.go @@ -129,6 +129,22 @@ func (rr *remoteRepository) createBundlePatterns(ctx context.Context, out io.Wri if err != nil { return fmt.Errorf("remote repository: create bundle patterns: %w", err) } + + if err := stream.Send(&gitalypb.CreateBundleFromRefListRequest{ + Repository: rr.repo, + }); err != nil { + return fmt.Errorf("remote repository: create bundle patterns: %w", err) + } + + // If we immediately start streaming the data without waiting for acknowledgement, it can cause: + // - Hard to debug errors when an RPC is rejected due to an error such as repository not existing. + // - The client not being able to retry the request on another node if it already started streaming + // the data to the other node without buffering the data. + // Therefore, here we are waiting for the acknowledgement from the server before starting the data stream. + if _, err = stream.Recv(); err != nil { + return fmt.Errorf("remote repository: create bundle patterns stream header is not acknowledged: %w", err) + } + c := chunk.New(&createBundleFromRefListSender{ stream: stream, }) @@ -145,8 +161,7 @@ func (rr *remoteRepository) createBundlePatterns(ctx context.Context, out io.Wri line = bytes.TrimSuffix(line, []byte("\n")) if err := c.Send(&gitalypb.CreateBundleFromRefListRequest{ - Repository: rr.repo, - Patterns: [][]byte{line}, + Patterns: [][]byte{line}, }); err != nil { return fmt.Errorf("remote repository: create bundle patterns: %w", err) } diff --git a/internal/gitaly/service/repository/create_bundle_from_ref_list.go b/internal/gitaly/service/repository/create_bundle_from_ref_list.go index 32b4cd45f8895d9034f5394e2a6393cdb97f6593..c55d02f3ee1bdafdc2d791e46bfb047e6da2e769 100644 --- a/internal/gitaly/service/repository/create_bundle_from_ref_list.go +++ b/internal/gitaly/service/repository/create_bundle_from_ref_list.go @@ -18,30 +18,30 @@ func (s *server) CreateBundleFromRefList(stream gitalypb.RepositoryService_Creat if err != nil { return err } + if firstRequest.GetPatterns() != nil { + return structerr.NewInvalidArgument("patterns should not be set for the first request in the stream") + } repository := firstRequest.GetRepository() if err := s.locator.ValidateRepository(ctx, repository); err != nil { return structerr.NewInvalidArgument("%w", err) } + // Acknowledge the stream header for client to start the data stream. + if err := stream.Send(&gitalypb.CreateBundleFromRefListResponse{}); err != nil { + return err + } + repo := s.localrepo(repository) if err := housekeeping.CleanupWorktrees(ctx, repo); err != nil { return structerr.NewInternal("cleaning up worktrees: %w", err) } - firstRead := true patterns := streamio.NewReader(func() ([]byte, error) { - var request *gitalypb.CreateBundleFromRefListRequest - if firstRead { - firstRead = false - request = firstRequest - } else { - var err error - request, err = stream.Recv() - if err != nil { - return nil, err - } + request, err := stream.Recv() + if err != nil { + return nil, err } return append(bytes.Join(request.GetPatterns(), []byte("\n")), '\n'), nil }) diff --git a/internal/gitaly/service/repository/create_bundle_from_ref_list_test.go b/internal/gitaly/service/repository/create_bundle_from_ref_list_test.go index cccded0f9d1310de154c8ca23f73125f24861771..a6000e2d58966cfe6cbb01b6b4ba6e224fbf6218 100644 --- a/internal/gitaly/service/repository/create_bundle_from_ref_list_test.go +++ b/internal/gitaly/service/repository/create_bundle_from_ref_list_test.go @@ -44,8 +44,15 @@ func TestCreateBundleFromRefList_success(t *testing.T) { c, err := client.CreateBundleFromRefList(ctx) require.NoError(t, err) + // Send only the repo first and get the acknowledgement require.NoError(t, c.Send(&gitalypb.CreateBundleFromRefListRequest{ Repository: repo, + })) + response, err := c.Recv() + require.NoError(t, err) + require.Empty(t, response.Data) // First response is only an acknowledgement without any data + + require.NoError(t, c.Send(&gitalypb.CreateBundleFromRefListRequest{ Patterns: [][]byte{ []byte("master"), []byte("^master~1"), @@ -83,8 +90,15 @@ func TestCreateBundleFromRefList_missing_ref(t *testing.T) { c, err := client.CreateBundleFromRefList(ctx) require.NoError(t, err) + // Send only the repo first and get the acknowledgement require.NoError(t, c.Send(&gitalypb.CreateBundleFromRefListRequest{ Repository: repo, + })) + response, err := c.Recv() + require.NoError(t, err) + require.Empty(t, response.Data) // First response is only an acknowledgement without any data + + require.NoError(t, c.Send(&gitalypb.CreateBundleFromRefListRequest{ Patterns: [][]byte{ []byte("refs/heads/master"), []byte("refs/heads/totally_missing"), @@ -110,45 +124,64 @@ func TestCreateBundleFromRefList_missing_ref(t *testing.T) { require.Contains(t, string(output), fmt.Sprintf("The bundle contains this ref:\n%s refs/heads/master", commitOID)) } -func TestCreateBundleFromRefList_validations(t *testing.T) { +func TestCreateBundleFromRefList_PatternsValidation(t *testing.T) { t.Parallel() ctx := testhelper.Context(t) cfg, client := setupRepositoryService(t) repo, _ := gittest.CreateRepository(t, ctx, cfg) - testCases := []struct { - desc string - request *gitalypb.CreateBundleFromRefListRequest - expectedErr error - }{ - { - desc: "empty repository", - request: &gitalypb.CreateBundleFromRefListRequest{ - Patterns: [][]byte{[]byte("master")}, - }, - expectedErr: structerr.NewInvalidArgument("%w", storage.ErrRepositoryNotSet), - }, - { - desc: "empty bundle", - request: &gitalypb.CreateBundleFromRefListRequest{ - Repository: repo, - Patterns: [][]byte{[]byte("master"), []byte("^master")}, - }, - expectedErr: status.Error(codes.FailedPrecondition, "create bundle: refusing to create empty bundle"), - }, - } + stream, err := client.CreateBundleFromRefList(ctx) + require.NoError(t, err) + + require.NoError(t, stream.Send(&gitalypb.CreateBundleFromRefListRequest{ + Repository: repo, + Patterns: [][]byte{[]byte("master")}, + })) + require.NoError(t, stream.CloseSend()) + _, err = stream.Recv() + testhelper.RequireGrpcError(t, structerr.NewInvalidArgument("patterns should not be set for the first request in the stream"), err) +} + +func TestCreateBundleFromRefList_RepositoryValidation(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + _, client := setupRepositoryService(t) + + stream, err := client.CreateBundleFromRefList(ctx) + require.NoError(t, err) + + require.NoError(t, stream.Send(&gitalypb.CreateBundleFromRefListRequest{})) + require.NoError(t, stream.CloseSend()) + _, err = stream.Recv() + testhelper.RequireGrpcError(t, structerr.NewInvalidArgument("%w", storage.ErrRepositoryNotSet), err) +} + +func TestCreateBundleFromRefList_BundleValidation(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg, client := setupRepositoryService(t) + repo, _ := gittest.CreateRepository(t, ctx, cfg) + + stream, err := client.CreateBundleFromRefList(ctx) + require.NoError(t, err) + + // Send only the repo first and get the acknowledgement + require.NoError(t, stream.Send(&gitalypb.CreateBundleFromRefListRequest{ + Repository: repo, + })) + response, err := stream.Recv() + require.NoError(t, err) + require.Empty(t, response.Data) // First response is only an acknowledgement without any data + + require.NoError(t, stream.Send(&gitalypb.CreateBundleFromRefListRequest{ + Patterns: [][]byte{[]byte("master"), []byte("^master")}, + })) - for _, testCase := range testCases { - t.Run(testCase.desc, func(t *testing.T) { - stream, err := client.CreateBundleFromRefList(ctx) - require.NoError(t, err) - - require.NoError(t, stream.Send(testCase.request)) - require.NoError(t, stream.CloseSend()) - for _, err = stream.Recv(); err == nil; _, err = stream.Recv() { - } - testhelper.RequireGrpcError(t, testCase.expectedErr, err) - }) + require.NoError(t, stream.CloseSend()) + for _, err = stream.Recv(); err == nil; _, err = stream.Recv() { } + testhelper.RequireGrpcError(t, status.Error(codes.FailedPrecondition, "create bundle: refusing to create empty bundle"), err) }