From c6f599c80e397ea1fb4fc8a8f247fdd65709c50e Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Fri, 6 May 2022 09:22:54 +0300 Subject: [PATCH 1/7] Resolve replica path in background verifier tests To test scenarios where the replica pointed to by a metadata record does not exist, the verifier tests delete replicas from the Gitalys as part of their setup. They do so by addressing the calls to the relative path without resolving the replica path. This causes the tests to fail when Praefect generates unique replica paths. This commit resolves the replica path in the tests so the deletions always target the correct repositories. As the helpers are mostly designed with Gitaly in mind, the GetReplicaPath helper currently expects a Gitaly config which it uses to dial the service. In Praefect's context, we have neither such a config nor even the address of the server. This commit extends the helper to optionally accept a ClientConn to use instead, as the other helpers do. --- internal/git/gittest/repo.go | 23 ++++++++++++++++++++--- internal/praefect/verifier_test.go | 6 ++++-- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/internal/git/gittest/repo.go b/internal/git/gittest/repo.go index 020ae82718..88c8f93d40 100644 --- a/internal/git/gittest/repo.go +++ b/internal/git/gittest/repo.go @@ -178,13 +178,30 @@ func CreateRepository(ctx context.Context, t testing.TB, cfg config.Cfg, configs return clonedRepo, filepath.Join(storage.Path, getReplicaPath(ctx, t, conn, repository)) } +// GetReplicaPathConfig allows for configuring the GetReplicaPath call. +type GetReplicaPathConfig struct { + // ClientConn is the connection used to create the repository. If unset, the config is used to + // dial the service. + ClientConn *grpc.ClientConn +} + // GetReplicaPath retrieves the repository's replica path if the test has been // run with Praefect in front of it.
This is necessary if the test creates a repository // through Praefect and peeks into the filesystem afterwards. Conn should be pointing to // Praefect. -func GetReplicaPath(ctx context.Context, t testing.TB, cfg config.Cfg, repo repository.GitRepo) string { - conn := dialService(ctx, t, cfg) - defer conn.Close() +func GetReplicaPath(ctx context.Context, t testing.TB, cfg config.Cfg, repo repository.GitRepo, opts ...GetReplicaPathConfig) string { + require.Less(t, len(opts), 2, "you must either pass no or exactly one option") + + var opt GetReplicaPathConfig + if len(opts) > 0 { + opt = opts[0] + } + + conn := opt.ClientConn + if conn == nil { + conn = dialService(ctx, t, cfg) + defer conn.Close() + } return getReplicaPath(ctx, t, conn, repo) } diff --git a/internal/praefect/verifier_test.go b/internal/praefect/verifier_test.go index 674089166c..33441ff072 100644 --- a/internal/praefect/verifier_test.go +++ b/internal/praefect/verifier_test.go @@ -546,11 +546,13 @@ func TestVerifier(t *testing.T) { for virtualStorage, relativePaths := range tc.replicas { for relativePath, storages := range relativePaths { // Create the expected repository. This creates all of the replicas transactionally. - gittest.CreateRepository(ctx, t, + repo, _ := gittest.CreateRepository(ctx, t, gitalyconfig.Cfg{Storages: []gitalyconfig.Storage{{Name: virtualStorage}}}, gittest.CreateRepositoryConfig{ClientConn: conn, RelativePath: relativePath}, ) + replicaPath := gittest.GetReplicaPath(ctx, t, gitalyconfig.Cfg{}, repo, gittest.GetReplicaPathConfig{ClientConn: conn}) + // Now remove the replicas that were created in the transaction but the test case // expects not to exist. We remove them directly from the Gitalys so the metadata // records are left in place. 
@@ -591,7 +593,7 @@ func TestVerifier(t *testing.T) { &gitalypb.RemoveRepositoryRequest{ Repository: &gitalypb.Repository{ StorageName: storage, - RelativePath: relativePath, + RelativePath: replicaPath, }, }, ) -- GitLab From e9f2924a3db3b3f64bdc810a53432b3fa6afedcb Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Wed, 23 Feb 2022 17:42:53 +0200 Subject: [PATCH 2/7] Derive identifiable replica paths for object pools Gitaly relies on the @pools prefix in OptimizeRepository to avoid pruning object pools. Pruning object pools could lead to data loss if some pool members still need the pruned objects. To ensure Gitaly can distinguish object pools from other repositories after their relative paths have been rewritten, this commit adds the DerivePoolPath function that will be used in the router to derive the replica paths for object pools. While doing so, the replica paths have also been changed to include a @cluster prefix, which groups the cluster's repositories and pools under a common directory. Using the existing @pools directory would also be possible as Rails hashes the repository IDs whereas Praefect doesn't. However, it's clearer to separate them into different directories and leave ownership to each service minting paths so each can independently ensure there are no conflicts.
--- internal/git/housekeeping/object_pool.go | 4 ++- internal/git/housekeeping/object_pool_test.go | 10 ++++++ .../praefect/praefectutil/replica_path.go | 27 ++++++++++++-- .../praefectutil/replica_path_test.go | 36 +++++++++++++++++-- 4 files changed, 72 insertions(+), 5 deletions(-) diff --git a/internal/git/housekeeping/object_pool.go b/internal/git/housekeeping/object_pool.go index 1e4a935c89..4d0a27e4b2 100644 --- a/internal/git/housekeeping/object_pool.go +++ b/internal/git/housekeeping/object_pool.go @@ -3,6 +3,8 @@ package housekeeping import ( "regexp" "strings" + + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/praefectutil" ) // railsPoolDirRegexp is used to validate object pool directory structure and name as generated by Rails. @@ -21,5 +23,5 @@ func IsRailsPoolPath(relativePath string) bool { // IsPoolPath returns whether the relative path indicates the repository is an object // pool. func IsPoolPath(relativePath string) bool { - return IsRailsPoolPath(relativePath) + return IsRailsPoolPath(relativePath) || praefectutil.IsPoolPath(relativePath) } diff --git a/internal/git/housekeeping/object_pool_test.go b/internal/git/housekeeping/object_pool_test.go index 6fdded51f2..50c3201cd8 100644 --- a/internal/git/housekeeping/object_pool_test.go +++ b/internal/git/housekeeping/object_pool_test.go @@ -5,6 +5,7 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/praefectutil" ) func TestIsPoolPath(t *testing.T) { @@ -18,6 +19,15 @@ func TestIsPoolPath(t *testing.T) { relativePath: gittest.NewObjectPoolName(t), isPoolPath: true, }, + { + desc: "praefect pool path", + relativePath: praefectutil.DerivePoolPath(1), + isPoolPath: true, + }, + { + desc: "praefect replica path", + relativePath: praefectutil.DeriveReplicaPath(1), + }, { desc: "empty string", }, diff --git a/internal/praefect/praefectutil/replica_path.go 
b/internal/praefect/praefectutil/replica_path.go index 5f35e1e814..576762d224 100644 --- a/internal/praefect/praefectutil/replica_path.go +++ b/internal/praefect/praefectutil/replica_path.go @@ -3,18 +3,41 @@ package praefectutil import ( "crypto/sha256" "fmt" + "path/filepath" "strconv" + "strings" ) +// poolPathPrefix is the prefix directory where Praefect places object pools. +const poolPathPrefix = "@cluster/pools/" + +// IsPoolPath returns whether the relative path indicates this is a Praefect generated object pool path. +func IsPoolPath(relativePath string) bool { + return strings.HasPrefix(relativePath, poolPathPrefix) +} + // DeriveReplicaPath derives a repository's disk storage path from its repository ID. The repository ID // is hashed with SHA256 and the first four hex digits of the hash are used as the two subdirectories to -// ensure even distribution into subdirectories. The format is @repositories/ab/cd/. +// ensure even distribution into subdirectories. The format is @cluster/repositories/ab/cd/. func DeriveReplicaPath(repositoryID int64) string { + return deriveDiskPath("@cluster/repositories", repositoryID) +} + +// DerivePoolPath derives an object pools's disk storage path from its repository ID. The repository ID +// is hashed with SHA256 and the first four hex digits of the hash are used as the two subdirectories to +// ensure even distribution into subdirectories. The format is @cluster/pools/ab/cd/. The pools +// have a different directory prefix from other repositories so Gitaly can identify them in OptimizeRepository +// and avoid pruning them. +func DerivePoolPath(repositoryID int64) string { + return deriveDiskPath(poolPathPrefix, repositoryID) +} + +func deriveDiskPath(prefixDir string, repositoryID int64) string { hasher := sha256.New() // String representation of the ID is used to make it easier to derive the replica paths with // external tools. The error is ignored as the hash.Hash interface is documented to never return // an error. 
hasher.Write([]byte(strconv.FormatInt(repositoryID, 10))) hash := hasher.Sum(nil) - return fmt.Sprintf("@repositories/%x/%x/%d", hash[0:1], hash[1:2], repositoryID) + return filepath.Join(prefixDir, fmt.Sprintf("%x/%x/%d", hash[0:1], hash[1:2], repositoryID)) } diff --git a/internal/praefect/praefectutil/replica_path_test.go b/internal/praefect/praefectutil/replica_path_test.go index 6084c13aea..572dfc2ed2 100644 --- a/internal/praefect/praefectutil/replica_path_test.go +++ b/internal/praefect/praefectutil/replica_path_test.go @@ -4,9 +4,41 @@ import ( "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" ) func TestDeriveReplicaPath(t *testing.T) { - require.Equal(t, "@repositories/6b/86/1", DeriveReplicaPath(1)) - require.Equal(t, "@repositories/d4/73/2", DeriveReplicaPath(2)) + require.Equal(t, "@cluster/repositories/6b/86/1", DeriveReplicaPath(1)) + require.Equal(t, "@cluster/repositories/d4/73/2", DeriveReplicaPath(2)) +} + +func TestDerivePoolPath(t *testing.T) { + require.Equal(t, "@cluster/pools/6b/86/1", DerivePoolPath(1)) + require.Equal(t, "@cluster/pools/d4/73/2", DerivePoolPath(2)) +} + +func TestIsPoolPath(t *testing.T) { + for _, tc := range []struct { + desc string + relativePath string + isPoolPath bool + }{ + { + desc: "praefect pool path", + relativePath: DerivePoolPath(1), + isPoolPath: true, + }, + { + desc: "praefect replica path", + relativePath: DeriveReplicaPath(1), + }, + { + desc: "rails pool path", + relativePath: gittest.NewObjectPoolName(t), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + require.Equal(t, tc.isPoolPath, IsPoolPath(tc.relativePath)) + }) + } } -- GitLab From 6ef60ea1d66fed382059e2dd8b66c12d41bc5ca1 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Fri, 22 Oct 2021 15:51:33 +0300 Subject: [PATCH 3/7] Add RenameRepositoryInPlace to RepositoryStore This commit adds RenameRepositoryInPlace to RepositoryStore. 
The new method will replace RenameRepository, which can be removed in a later release. Unlike RenameRepository, RenameRepositoryInPlace doesn't change the replica path stored in the database. As Praefect now routes using the replica_path, this enables Praefect to rename repositories atomically in the database without necessitating any disk changes. The old RenameRepository is left in place so any in-flight rename jobs can still be processed by it. --- .../praefect/datastore/repository_store.go | 36 +++++++++++++++ .../datastore/repository_store_mock.go | 6 +++ .../datastore/repository_store_test.go | 44 +++++++++++++++++++ 3 files changed, 86 insertions(+) diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index 780073155a..35a8a6a682 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -118,6 +118,9 @@ type RepositoryStore interface { // as the storage's which is calling it. Returns RepositoryNotExistsError when trying to rename a repository // which has no record in the virtual storage or the storage. RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error + // RenameRepositoryInPlace renames the repository in the database without changing the replica path. This will replace + // RenameRepository which can be removed in a later release. + RenameRepositoryInPlace(ctx context.Context, virtualStorage, relativePath, newRelativePath string) error // GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID. GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) ConsistentStoragesGetter @@ -535,6 +538,39 @@ AND storage = $2 return nil } +// RenameRepositoryInPlace renames the repository in the database without changing the replica path.
This will replace +// RenameRepository which can be removed in a later release. +func (rs *PostgresRepositoryStore) RenameRepositoryInPlace(ctx context.Context, virtualStorage, relativePath, newRelativePath string) error { + result, err := rs.db.ExecContext(ctx, ` +WITH repository AS ( + UPDATE repositories + SET relative_path = $3 + WHERE virtual_storage = $1 + AND relative_path = $2 + RETURNING repository_id +) + +UPDATE storage_repositories +SET relative_path = $3 +WHERE repository_id = (SELECT repository_id FROM repository) + `, virtualStorage, relativePath, newRelativePath) + if err != nil { + if glsql.IsUniqueViolation(err, "repository_lookup_index") { + return commonerr.ErrRepositoryAlreadyExists + } + + return fmt.Errorf("query: %w", err) + } + + if rowsAffected, err := result.RowsAffected(); err != nil { + return fmt.Errorf("rows affected: %w", err) + } else if rowsAffected == 0 { + return commonerr.ErrRepositoryNotFound + } + + return nil +} + //nolint: revive,stylecheck // This is unintentionally missing documentation. 
func (rs *PostgresRepositoryStore) RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error { const q = ` diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go index 2a021661d9..42acafb6ec 100644 --- a/internal/praefect/datastore/repository_store_mock.go +++ b/internal/praefect/datastore/repository_store_mock.go @@ -14,6 +14,7 @@ type MockRepositoryStore struct { SetAuthoritativeReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath string) (string, []string, error) DeleteReplicaFunc func(ctx context.Context, repositoryID int64, storage string) error + RenameRepositoryInPlaceFunc func(ctx context.Context, virtualStorage, relativePath, newRelativePath string) error RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error GetConsistentStoragesByRepositoryIDFunc func(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) @@ -99,6 +100,11 @@ func (m MockRepositoryStore) DeleteReplica(ctx context.Context, repositoryID int return m.DeleteReplicaFunc(ctx, repositoryID, storage) } +// RenameRepositoryInPlace runs the mock's RenameRepositoryInPlaceFunc. +func (m MockRepositoryStore) RenameRepositoryInPlace(ctx context.Context, virtualStorage, relativePath, newRelativePath string) error { + return m.RenameRepositoryInPlaceFunc(ctx, virtualStorage, relativePath, newRelativePath) +} + //nolint: revive,stylecheck // This is unintentionally missing documentation. 
func (m MockRepositoryStore) RenameRepository(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error { if m.RenameRepositoryFunc == nil { diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index 9c980376c5..afdfcdb14d 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -765,6 +765,50 @@ func TestRepositoryStore_Postgres(t *testing.T) { }) }) + t.Run("RenameRepositoryInPlace", func(t *testing.T) { + t.Run("rename non-existing", func(t *testing.T) { + rs := newRepositoryStore(t, nil) + + require.Equal(t, + commonerr.ErrRepositoryNotFound, + rs.RenameRepositoryInPlace(ctx, vs, repo, "new-relative-path"), + ) + }) + + t.Run("destination exists", func(t *testing.T) { + rs := newRepositoryStore(t, nil) + + require.NoError(t, rs.CreateRepository(ctx, 1, vs, "relative-path-1", "replica-path-1", "primary", nil, nil, true, false)) + require.NoError(t, rs.CreateRepository(ctx, 2, vs, "relative-path-2", "replica-path-2", "primary", nil, nil, true, false)) + + require.Equal(t, + commonerr.ErrRepositoryAlreadyExists, + rs.RenameRepositoryInPlace(ctx, vs, "relative-path-1", "relative-path-2"), + ) + }) + + t.Run("successfully renamed", func(t *testing.T) { + rs := newRepositoryStore(t, nil) + + require.NoError(t, rs.CreateRepository(ctx, 1, vs, "original-relative-path", "original-replica-path", "primary", nil, nil, false, false)) + require.NoError(t, rs.RenameRepositoryInPlace(ctx, vs, "original-relative-path", "renamed-relative-path")) + requireState(t, ctx, db, + virtualStorageState{ + vs: { + "renamed-relative-path": {repositoryID: 1, replicaPath: "original-replica-path"}, + }, + }, + storageState{ + vs: { + "renamed-relative-path": { + "primary": {repositoryID: 1}, + }, + }, + }, + ) + }) + }) + t.Run("RenameRepository", func(t *testing.T) { t.Run("rename non-existing", func(t *testing.T) { rs := 
newRepositoryStore(t, nil) -- GitLab From 3e92c2cf29a757444e11b39f61e58a8820751e2a Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Mon, 25 Oct 2021 15:59:11 +0300 Subject: [PATCH 4/7] Intercept RenameRepository calls in Praefect Praefect currently handles RenameRepository like every other mutator. This is problematic as Praefect ends up renaming repositories first on the disks of the Gitalys and then in the database. If only the first operation succeeds, Praefect has effectively lost track of the repositories. With Praefect now generating unique replica paths for each repository, there's no need to actually move the repositories on the disks anymore. It is sufficient to rename the repository only in the database and leave the replicas in their existing locations on disk. This commit intercepts RenameRepository in Praefect and renames repositories only in the database. The atomic rename handling will be rolled out progressively together with Praefect-generated replica paths, as the change does not make sense on its own. To do so, the RenameRepositoryFeatureFlagger checks the replica path of the repository and only applies the new logic if the repository has a Praefect-generated replica path.
Changelog: fixed --- .../gitaly/service/repository/rename_test.go | 15 +- internal/praefect/rename_repository.go | 160 ++++++++++++++++++ internal/praefect/server.go | 7 + internal/praefect/server_test.go | 126 +++++++------- 4 files changed, 233 insertions(+), 75 deletions(-) create mode 100644 internal/praefect/rename_repository.go diff --git a/internal/gitaly/service/repository/rename_test.go b/internal/gitaly/service/repository/rename_test.go index 9983eb8946..6c4f2b5207 100644 --- a/internal/gitaly/service/repository/rename_test.go +++ b/internal/gitaly/service/repository/rename_test.go @@ -70,9 +70,6 @@ func TestRenameRepository_DestinationExists(t *testing.T) { } func TestRenameRepository_invalidRequest(t *testing.T) { - // Prafect applies renames to metadata even on failed requests, which fails this test. - testhelper.SkipWithPraefect(t, "https://gitlab.com/gitlab-org/gitaly/-/issues/4003") - t.Parallel() ctx := testhelper.Context(t) @@ -87,7 +84,7 @@ func TestRenameRepository_invalidRequest(t *testing.T) { { desc: "empty repository", req: &gitalypb.RenameRepositoryRequest{Repository: nil, RelativePath: "/tmp/abc"}, - exp: status.Error(codes.InvalidArgument, gitalyOrPraefect("empty Repository", "repo scoped: empty Repository")), + exp: status.Error(codes.InvalidArgument, "empty Repository"), }, { desc: "empty destination relative path", @@ -101,20 +98,20 @@ func TestRenameRepository_invalidRequest(t *testing.T) { }, { desc: "repository storage doesn't exist", - req: &gitalypb.RenameRepositoryRequest{Repository: &gitalypb.Repository{StorageName: "stub", RelativePath: repo.RelativePath}, RelativePath: "../usr/bin"}, - exp: status.Error(codes.InvalidArgument, gitalyOrPraefect(`GetStorageByName: no such storage: "stub"`, "repo scoped: invalid Repository")), + req: &gitalypb.RenameRepositoryRequest{Repository: &gitalypb.Repository{StorageName: "stub", RelativePath: repo.RelativePath}, RelativePath: "usr/bin"}, + exp: status.Error(codes.InvalidArgument, 
`GetStorageByName: no such storage: "stub"`), }, { desc: "repository relative path doesn't exist", - req: &gitalypb.RenameRepositoryRequest{Repository: &gitalypb.Repository{StorageName: repo.StorageName, RelativePath: "stub"}, RelativePath: "../usr/bin"}, - exp: status.Error(codes.NotFound, fmt.Sprintf(`GetRepoPath: not a git repository: "%s/stub"`, storagePath)), + req: &gitalypb.RenameRepositoryRequest{Repository: &gitalypb.Repository{StorageName: repo.StorageName, RelativePath: "stub"}, RelativePath: "non-existent/directory"}, + exp: status.Error(codes.NotFound, fmt.Sprintf(`GetRepoPath: not a git repository: "%s/stub"`, gitalyOrPraefect(storagePath, repo.GetStorageName()))), }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { _, err := client.RenameRepository(ctx, tc.req) - testhelper.RequireGrpcError(t, err, tc.exp) + testhelper.RequireGrpcError(t, tc.exp, err) }) } } diff --git a/internal/praefect/rename_repository.go b/internal/praefect/rename_repository.go new file mode 100644 index 0000000000..495b18e4bc --- /dev/null +++ b/internal/praefect/rename_repository.go @@ -0,0 +1,160 @@ +package praefect + +import ( + "errors" + "fmt" + "strings" + + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "google.golang.org/grpc" +) + +type renamePeeker struct { + grpc.ServerStream + peeked *gitalypb.RenameRepositoryRequest +} + +func (peeker *renamePeeker) RecvMsg(msg interface{}) error { + // On the first read, we'll return the peeked first message of the stream. 
+ if peeker.peeked != nil { + peeked := peeker.peeked + peeker.peeked = nil + + codec := proxy.NewCodec() + payload, err := codec.Marshal(peeked) + if err != nil { + return fmt.Errorf("marshaling peeked rename request: %w", err) + } + + return codec.Unmarshal(payload, msg) + } + + return peeker.ServerStream.RecvMsg(msg) +} + +func validateRenameRepositoryRequest(req *gitalypb.RenameRepositoryRequest, virtualStorages map[string]struct{}) error { + // These checks are not strictly necessary but they exist to keep retain compatibility with + // Gitaly's tested behavior. + if req.GetRepository() == nil { + return helper.ErrInvalidArgumentf("empty Repository") + } else if req.GetRelativePath() == "" { + return helper.ErrInvalidArgumentf("destination relative path is empty") + } else if _, ok := virtualStorages[req.GetRepository().GetStorageName()]; !ok { + return helper.ErrInvalidArgumentf("GetStorageByName: no such storage: %q", req.GetRepository().GetStorageName()) + } else if _, err := storage.ValidateRelativePath("/fake-root", req.GetRelativePath()); err != nil { + // Gitaly uses ValidateRelativePath to verify there are no traversals, so we use the same function + // here. Praefect is not susceptible to path traversals as it generates its own disk paths but we + // do this to retain API compatibility with Gitaly. ValidateRelativePath checks for traversals by + // seeing whether the relative path escapes the root directory. It's not possible to traverse up + // from the /, so the traversals in the path wouldn't be caught. To allow for the check to work, + // we use the /fake-root directory simply to notice if there were traversals in the path. + return helper.ErrInvalidArgumentf("GetRepoPath: %s", err) + } + + return nil +} + +// RenameRepositoryFeatureFlagger decides whether Praefect should handle the rename request or whether it should +// be proxied to a Gitaly. 
Rolling out Praefect generated replica paths is difficult as the atomicity fixes depend on the +// unique replica paths. If the unique replica paths are disabled, the in-place rename handling makes no longer sense either. +// Since they don't work isolation, this method decides which handling is used based on whether the repository is using a Praefect +// generated replica path or not. Repositories with client set paths are handled non-atomically by proxying to Gitalys. +// The Praefect generated paths are always handled with the atomic handling, regardless whether the feature flag is disabled +// later. +// +// This function peeks the first request and forwards the call either to a Gitaly or handles it in Praefect. This requires +// peeking into the internals of the proxying so we can set restore the frame correctly. +func RenameRepositoryFeatureFlagger(virtualStorageNames []string, rs datastore.RepositoryStore, handleRenameRepository grpc.StreamHandler) grpc.StreamServerInterceptor { + virtualStorages := make(map[string]struct{}, len(virtualStorageNames)) + for _, virtualStorage := range virtualStorageNames { + virtualStorages[virtualStorage] = struct{}{} + } + + return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if info.FullMethod != "/gitaly.RepositoryService/RenameRepository" { + return handler(srv, stream) + } + + // Peek the message + var request gitalypb.RenameRepositoryRequest + if err := stream.RecvMsg(&request); err != nil { + return fmt.Errorf("peek rename repository request: %w", err) + } + + // In order for the handlers to work after the message is peeked, the stream is restored + // with an alternative implementation that returns the first message correctly. 
+ stream = &renamePeeker{ServerStream: stream, peeked: &request} + + if err := validateRenameRepositoryRequest(&request, virtualStorages); err != nil { + return err + } + + repo := request.GetRepository() + repositoryID, err := rs.GetRepositoryID(stream.Context(), repo.GetStorageName(), repo.GetRelativePath()) + if err != nil { + if errors.As(err, new(commonerr.RepositoryNotFoundError)) { + return helper.ErrNotFoundf("GetRepoPath: not a git repository: \"%s/%s\"", repo.GetStorageName(), repo.GetRelativePath()) + } + + return fmt.Errorf("get repository id: %w", err) + } + + replicaPath, err := rs.GetReplicaPath(stream.Context(), repositoryID) + if err != nil { + return fmt.Errorf("get replica path: %w", err) + } + + // Repositories that do not have a Praefect generated replica path are always handled in the old manner. + // Once the feature flag is removed, all of the repositories will be handled in the atomic manner. + if !strings.HasPrefix(replicaPath, "@cluster") { + return handler(srv, stream) + } + + return handleRenameRepository(srv, stream) + } +} + +// RenameRepositoryHandler handles /gitaly.RepositoryService/RenameRepository calls by renaming +// the repository in the lookup table stored in the database. 
+func RenameRepositoryHandler(virtualStoragesNames []string, rs datastore.RepositoryStore) grpc.StreamHandler { + virtualStorages := make(map[string]struct{}, len(virtualStoragesNames)) + for _, virtualStorage := range virtualStoragesNames { + virtualStorages[virtualStorage] = struct{}{} + } + + return func(srv interface{}, stream grpc.ServerStream) error { + var req gitalypb.RenameRepositoryRequest + if err := stream.RecvMsg(&req); err != nil { + return fmt.Errorf("receive request: %w", err) + } + + if err := validateRenameRepositoryRequest(&req, virtualStorages); err != nil { + return err + } + + if err := rs.RenameRepositoryInPlace(stream.Context(), + req.GetRepository().GetStorageName(), + req.GetRepository().GetRelativePath(), + req.GetRelativePath(), + ); err != nil { + if errors.Is(err, commonerr.ErrRepositoryNotFound) { + return helper.ErrNotFoundf( + `GetRepoPath: not a git repository: "%s/%s"`, + req.GetRepository().GetStorageName(), + req.GetRepository().GetRelativePath(), + ) + } else if errors.Is(err, commonerr.ErrRepositoryAlreadyExists) { + return helper.ErrAlreadyExistsf("target repo exists already") + } + + return helper.ErrInternal(err) + } + + return stream.SendMsg(&gitalypb.RenameRepositoryResponse{}) + } +} diff --git a/internal/praefect/server.go b/internal/praefect/server.go index d758102464..d894bb3f3a 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -111,6 +111,13 @@ func NewGRPCServer( panichandler.StreamPanicHandler, } + if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { + streamInterceptors = append( + streamInterceptors, + RenameRepositoryFeatureFlagger(conf.VirtualStorageNames(), rs, RenameRepositoryHandler(conf.VirtualStorageNames(), rs)), + ) + } + grpcOpts = append(grpcOpts, proxyRequiredOpts(director)...) 
grpcOpts = append(grpcOpts, []grpc.ServerOption{ grpc.StreamInterceptor(grpcmw.ChainStreamServer(streamInterceptors...)), diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index c3681a34c4..a47e825299 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -7,8 +7,6 @@ import ( "io" "math/rand" "net" - "os" - "path/filepath" "sort" "strings" "sync" @@ -26,8 +24,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" - "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text" "gitlab.com/gitlab-org/gitaly/v14/internal/listenmux" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy" @@ -568,43 +566,20 @@ func TestRemoveRepository(t *testing.T) { verifyReposExistence(t, codes.NotFound) } -func pollUntilRemoved(t testing.TB, path string, deadline <-chan time.Time) { - for { - select { - case <-deadline: - require.Failf(t, "unable to detect path removal for %s", path) - default: - _, err := os.Stat(path) - if os.IsNotExist(err) { - return - } - require.NoError(t, err, "unexpected error while checking path %s", path) - } - time.Sleep(time.Millisecond) - } -} - func TestRenameRepository(t *testing.T) { - t.Parallel() - ctx := testhelper.Context(t) + testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepository) +} +func testRenameRepository(t *testing.T, ctx context.Context) { gitalyStorages := []string{"gitaly-1", "gitaly-2", "gitaly-3"} - repoPaths := make([]string, len(gitalyStorages)) praefectCfg := config.Config{ VirtualStorages: []*config.VirtualStorage{{Name: "praefect"}}, Failover: config.Failover{Enabled: true, ElectionStrategy: 
config.ElectionStrategyPerRepository}, } - var repo *gitalypb.Repository - for i, storageName := range gitalyStorages { - const relativePath = "test-repository" - + for _, storageName := range gitalyStorages { cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages(storageName)) - gitalyCfg, repos := cfgBuilder.BuildWithRepoAt(t, relativePath) - if repo == nil { - repo = repos[0] - } - + gitalyCfg := cfgBuilder.Build(t) gitalyAddr := testserver.RunGitalyServer(t, gitalyCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) praefectCfg.VirtualStorages[0].Nodes = append(praefectCfg.VirtualStorages[0].Nodes, &config.Node{ @@ -612,76 +587,95 @@ func TestRenameRepository(t *testing.T) { Address: gitalyAddr, Token: gitalyCfg.Auth.Token, }) - - repoPaths[i] = filepath.Join(gitalyCfg.Storages[0].Path, relativePath) } evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) - tx := testdb.New(t).Begin(t) - defer tx.Rollback(t) + db := testdb.New(t) + + rs := datastore.NewPostgresRepositoryStore(db, nil) - rs := datastore.NewPostgresRepositoryStore(tx, nil) - require.NoError(t, rs.CreateRepository(ctx, 1, "praefect", repo.RelativePath, repo.RelativePath, "gitaly-1", []string{"gitaly-2", "gitaly-3"}, nil, true, false)) + txManager := transactions.NewManager(praefectCfg) + logger := testhelper.NewDiscardingLogEntry(t) + clientHandshaker := backchannel.NewClientHandshaker( + logger, + NewBackchannelServerFactory( + logger, + transaction.NewServer(txManager), + nil, + ), + ) - nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, nil, nil) + nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, clientHandshaker, nil) require.NoError(t, err) defer nodeSet.Close() - testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": praefectCfg.StorageNames()}) - cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{ withQueue: evq, withRepoStore: rs, 
withRouter: NewPerRepositoryRouter( nodeSet.Connections(), - nodes.NewPerRepositoryElector(tx), + nodes.NewPerRepositoryElector(db), StaticHealthChecker(praefectCfg.StorageNames()), NewLockedRandom(rand.New(rand.NewSource(0))), rs, - datastore.NewAssignmentStore(tx, praefectCfg.StorageNames()), + datastore.NewAssignmentStore(db, praefectCfg.StorageNames()), rs, nil, ), + withTxMgr: txManager, }) - defer cleanup() + t.Cleanup(cleanup) - // virtualRepo is a virtual repository all requests to it would be applied to the underline Gitaly nodes behind it - virtualRepo := proto.Clone(repo).(*gitalypb.Repository) - virtualRepo.StorageName = praefectCfg.VirtualStorages[0].Name + virtualRepo1, _ := gittest.CreateRepository(ctx, t, gconfig.Cfg{ + Storages: []gconfig.Storage{{Name: "praefect"}}, + }, gittest.CreateRepositoryConfig{ClientConn: cc}) + + virtualRepo2, _ := gittest.CreateRepository(ctx, t, gconfig.Cfg{ + Storages: []gconfig.Storage{{Name: "praefect"}}, + }, gittest.CreateRepositoryConfig{ClientConn: cc}) + + const newRelativePath = "unused-relative-path" repoServiceClient := gitalypb.NewRepositoryServiceClient(cc) - newName, err := text.RandomHex(20) - require.NoError(t, err) + _, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{ + Repository: &gitalypb.Repository{ + StorageName: virtualRepo1.StorageName, + RelativePath: "not-found", + }, + RelativePath: virtualRepo2.RelativePath, + }) + testhelper.RequireGrpcError(t, helper.ErrNotFoundf(`GetRepoPath: not a git repository: "praefect/not-found"`), err) + + _, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{ + Repository: virtualRepo1, + RelativePath: virtualRepo2.RelativePath, + }) + + expectedErr := helper.ErrAlreadyExistsf("target repo exists already") + testhelper.RequireGrpcError(t, expectedErr, err) _, err = repoServiceClient.RenameRepository(ctx, &gitalypb.RenameRepositoryRequest{ - Repository: virtualRepo, - RelativePath: newName, + Repository: 
virtualRepo1, + RelativePath: newRelativePath, }) require.NoError(t, err) resp, err := repoServiceClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ - Repository: virtualRepo, + Repository: virtualRepo1, }) require.NoError(t, err) require.False(t, resp.GetExists(), "repo with old name must gone") - // as we renamed the repo we need to update RelativePath before we could check if it exists - renamedVirtualRepo := virtualRepo - renamedVirtualRepo.RelativePath = newName - - // wait until replication jobs propagate changes to other storages - // as we don't know which one will be used to check because of reads distribution - require.NoError(t, evq.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool { - return len(i.GetAcknowledge()) == 2 - })) - - for _, oldLocation := range repoPaths { - pollUntilRemoved(t, oldLocation, time.After(10*time.Second)) - newLocation := filepath.Join(filepath.Dir(oldLocation), newName) - require.DirExists(t, newLocation, "must be renamed on secondary from %q to %q", oldLocation, newLocation) - } + resp, err = repoServiceClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ + Repository: &gitalypb.Repository{ + StorageName: virtualRepo1.StorageName, + RelativePath: newRelativePath, + }, + }) + require.NoError(t, err) + require.True(t, resp.GetExists(), "repo with new name must exist") } type mockSmartHTTP struct { -- GitLab From aec4a9343a82c80b919bb081a86fa60d6115a6c7 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Thu, 21 Oct 2021 17:28:33 +0300 Subject: [PATCH 5/7] Generate unique replica paths for repositories Renaming, creating and deleting repositories is racy in Praefect. They can also partially fail in awkward ways due to Praefect first applying the operations on the disks of the Gitalys and later updating its metadata store. 
In order to make these operations atomic, there has been ongoing work in the background to make it possible to perform them in the database in a manner where races are not possible and partial failures do not end up conflicting with future operations. Renames can be fixed by doing the rename atomically in the database without moving any repositories on the disks. Deletes can be modeled as simply deleting the repository's database record. Creates are made atomic by creating the repository's database record as the last step in the process. The last piece of the puzzle is to ensure repositories always land in different directories on the disk. This ensures that a partial failure doesn't block a future operation from succeeding. This commit implements that piece by deriving a unique path for each repository in which to store its replicas. Existing repositories stay where they are, but newly created repositories will land in unique directories. A create might succeed on a disk but fail to be persisted in the database. Prior to this change, attempting to recreate the repository might fail due to the stale state on the disk. With this in place, the next attempt at creating the repository succeeds, as the new attempt lands the repository in a different directory on the disk. Deletes had the same problem prior to this commit. The repository's metadata may be successfully deleted, but if we fail to delete the repository from the disk, future creations and renames may fail if they conflict with the stale state. As creates now always land in a different directory, the stale state no longer causes problems. Renames will work purely in the database, so any stale state on the disk will not affect them.
Changelog: fixed --- .../gitaly/service/repository/rename_test.go | 17 +++++++++-- .../ff_praefect_generated_paths.go | 4 +++ internal/praefect/coordinator.go | 6 ++-- internal/praefect/coordinator_test.go | 1 + internal/praefect/router_per_repository.go | 15 ++++++++-- .../praefect/router_per_repository_test.go | 29 ++++++++++++------- internal/testhelper/testhelper.go | 3 ++ 7 files changed, 58 insertions(+), 17 deletions(-) create mode 100644 internal/metadata/featureflag/ff_praefect_generated_paths.go diff --git a/internal/gitaly/service/repository/rename_test.go b/internal/gitaly/service/repository/rename_test.go index 6c4f2b5207..f28657fb85 100644 --- a/internal/gitaly/service/repository/rename_test.go +++ b/internal/gitaly/service/repository/rename_test.go @@ -1,6 +1,7 @@ package repository import ( + "context" "fmt" "os" "path/filepath" @@ -11,6 +12,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git" "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" @@ -19,8 +21,11 @@ import ( ) func TestRenameRepository_success(t *testing.T) { + testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepositorySuccess) +} + +func testRenameRepositorySuccess(t *testing.T, ctx context.Context) { t.Parallel() - ctx := testhelper.Context(t) // Praefect does not move repositories on the disk so this test case is not run with Praefect. 
cfg, repo, _, client := setupRepositoryService(ctx, t, testserver.WithDisablePraefect()) @@ -43,8 +48,11 @@ func TestRenameRepository_success(t *testing.T) { } func TestRenameRepository_DestinationExists(t *testing.T) { + testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepositoryDestinationExists) +} + +func testRenameRepositoryDestinationExists(t *testing.T, ctx context.Context) { t.Parallel() - ctx := testhelper.Context(t) cfg, client := setupRepositoryServiceWithoutRepo(t) @@ -70,8 +78,11 @@ func TestRenameRepository_DestinationExists(t *testing.T) { } func TestRenameRepository_invalidRequest(t *testing.T) { + testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testRenameRepositoryInvalidRequest) +} + +func testRenameRepositoryInvalidRequest(t *testing.T, ctx context.Context) { t.Parallel() - ctx := testhelper.Context(t) _, repo, repoPath, client := setupRepositoryService(ctx, t) storagePath := strings.TrimSuffix(repoPath, "/"+repo.RelativePath) diff --git a/internal/metadata/featureflag/ff_praefect_generated_paths.go b/internal/metadata/featureflag/ff_praefect_generated_paths.go new file mode 100644 index 0000000000..4109bd6362 --- /dev/null +++ b/internal/metadata/featureflag/ff_praefect_generated_paths.go @@ -0,0 +1,4 @@ +package featureflag + +// PraefectGeneratedReplicaPaths will enable Praefect generated replica paths for new repositories. 
+var PraefectGeneratedReplicaPaths = NewFeatureFlag("praefect_generated_replica_paths", false) diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 77e83ec5a6..3821db602b 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -488,6 +488,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall route.RepositoryID, virtualStorage, targetRepo, + route.ReplicaPath, route.Primary.Storage, nil, append(routerNodesToStorages(route.Secondaries), route.ReplicationTargets...), @@ -835,7 +836,7 @@ func (c *Coordinator) createTransactionFinalizer( } return c.newRequestFinalizer( - ctx, route.RepositoryID, virtualStorage, targetRepo, route.Primary.Storage, + ctx, route.RepositoryID, virtualStorage, targetRepo, route.ReplicaPath, route.Primary.Storage, updated, outdated, change, params, cause)() } } @@ -992,6 +993,7 @@ func (c *Coordinator) newRequestFinalizer( repositoryID int64, virtualStorage string, targetRepo *gitalypb.Repository, + replicaPath string, primary string, updatedSecondaries []string, outdatedSecondaries []string, @@ -1048,7 +1050,7 @@ func (c *Coordinator) newRequestFinalizer( repositoryID, virtualStorage, targetRepo.GetRelativePath(), - targetRepo.GetRelativePath(), + replicaPath, primary, updatedSecondaries, outdatedSecondaries, diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index fce4d5e5be..e0927ae693 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -2673,6 +2673,7 @@ func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) { 0, "virtual storage", &gitalypb.Repository{}, + "replica-path", "primary", []string{}, []string{"secondary"}, diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go index 6f1b4e7bd8..7067cb36ea 100644 --- a/internal/praefect/router_per_repository.go +++ 
b/internal/praefect/router_per_repository.go @@ -5,8 +5,11 @@ import ( "errors" "fmt" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/housekeeping" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/praefectutil" "google.golang.org/grpc" ) @@ -307,11 +310,19 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu return RepositoryMutatorRoute{}, fmt.Errorf("reserve repository id: %w", err) } + replicaPath := relativePath + if featureflag.PraefectGeneratedReplicaPaths.IsEnabled(ctx) { + replicaPath = praefectutil.DeriveReplicaPath(id) + if housekeeping.IsRailsPoolPath(relativePath) { + replicaPath = praefectutil.DerivePoolPath(id) + } + } + replicationFactor := r.defaultReplicationFactors[virtualStorage] if replicationFactor == 1 { return RepositoryMutatorRoute{ RepositoryID: id, - ReplicaPath: relativePath, + ReplicaPath: replicaPath, Primary: primary, }, nil } @@ -360,7 +371,7 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu return RepositoryMutatorRoute{ RepositoryID: id, - ReplicaPath: relativePath, + ReplicaPath: replicaPath, AdditionalReplicaPath: additionalReplicaPath, Primary: primary, Secondaries: secondaries, diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go index 44d8b10ce9..6e620ffe26 100644 --- a/internal/praefect/router_per_repository_test.go +++ b/internal/praefect/router_per_repository_test.go @@ -8,9 +8,11 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" 
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/praefectutil" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" "google.golang.org/grpc" @@ -662,6 +664,10 @@ func TestPerRepositoryRouter_RouteRepositoryMaintenance(t *testing.T) { } func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { + testhelper.NewFeatureSets(featureflag.PraefectGeneratedReplicaPaths).Run(t, testPerRepositoryRouterRouteRepositoryCreation) +} + +func testPerRepositoryRouterRouteRepositoryCreation(t *testing.T, ctx context.Context) { t.Parallel() configuredNodes := map[string][]string{ @@ -692,6 +698,11 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { additionalReplicaPath = "additional-replica-path" ) + replicaPath := relativePath + if featureflag.PraefectGeneratedReplicaPaths.IsEnabled(ctx) { + replicaPath = praefectutil.DeriveReplicaPath(1) + } + for _, tc := range []struct { desc string virtualStorage string @@ -726,7 +737,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { matchRoute: requireOneOf( RepositoryMutatorRoute{ RepositoryID: 1, - ReplicaPath: relativePath, + ReplicaPath: replicaPath, AdditionalReplicaPath: additionalReplicaPath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, ReplicationTargets: []string{"secondary-1", "secondary-2"}, @@ -742,7 +753,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { matchRoute: requireOneOf( RepositoryMutatorRoute{ RepositoryID: 1, - ReplicaPath: relativePath, + ReplicaPath: replicaPath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{ {Storage: "secondary-1", Connection: secondary1Conn}, @@ -760,7 +771,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { matchRoute: requireOneOf( RepositoryMutatorRoute{ RepositoryID: 1, - ReplicaPath: relativePath, + 
ReplicaPath: replicaPath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{ {Storage: "secondary-1", Connection: secondary1Conn}, @@ -779,7 +790,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { matchRoute: requireOneOf( RepositoryMutatorRoute{ RepositoryID: 1, - ReplicaPath: relativePath, + ReplicaPath: replicaPath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, }, ), @@ -795,13 +806,13 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { matchRoute: requireOneOf( RepositoryMutatorRoute{ RepositoryID: 1, - ReplicaPath: relativePath, + ReplicaPath: replicaPath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}}, }, RepositoryMutatorRoute{ RepositoryID: 1, - ReplicaPath: relativePath, + ReplicaPath: replicaPath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{{Storage: "secondary-2", Connection: secondary1Conn}}, }, @@ -818,7 +829,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { matchRoute: requireOneOf( RepositoryMutatorRoute{ RepositoryID: 1, - ReplicaPath: relativePath, + ReplicaPath: replicaPath, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}}, ReplicationTargets: []string{"secondary-2"}, @@ -848,14 +859,12 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - ctx := testhelper.Context(t) - db.TruncateAll(t) rs := datastore.NewPostgresRepositoryStore(db, nil) if tc.repositoryExists { require.NoError(t, - rs.CreateRepository(ctx, 1, "virtual-storage-1", relativePath, relativePath, "primary", nil, nil, true, true), + rs.CreateRepository(ctx, 1, "virtual-storage-1", relativePath, replicaPath, "primary", nil, nil, true, true), ) } diff --git 
a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go index 281dbc5b89..f3a917c709 100644 --- a/internal/testhelper/testhelper.go +++ b/internal/testhelper/testhelper.go @@ -180,6 +180,9 @@ func ContextWithoutCancel(opts ...ContextOpt) context.Context { // Randomly inject the Git flag so that we have coverage of tests with both old and new Git // version by pure chance. ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.GitV2361Gl1, rnd.Int()%2 == 0) + // PraefectGeneratedReplicaPaths affects many tests as it changes the repository creation logic. + // Randomly enable the flag to exercise both paths to some extent. + ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.PraefectGeneratedReplicaPaths, rnd.Int()%2 == 0) for _, opt := range opts { ctx = opt(ctx) -- GitLab From 5553cc8f515bbcb0e7414710387f330074f1e27d Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Fri, 19 Nov 2021 02:25:21 +0200 Subject: [PATCH 6/7] Use Praefect's RemoveRepository from remove-repository subcommand The remove-repository subcommand was working around the lack of atomicity in Praefect's repository removals. It manually cleaned up the database state and issued deletes to the Gitalys to ensure deletes went through even if there was no metadata for the repository. This ensured there would be no conflicts when recreating a repository after running the command. These workarounds are no longer necessary as Praefect now handles deletions. As soon as the repository's metadata is deleted, it stops existing from Praefect's point of view. A new repository created in the same virtual storage and relative path lands in a different directory on the Gitalys. As such, we can simplify the command to simply call Praefect's RemoveRepository to delete the repository's metadata and the known replicas. The command was changed to error if the repository does not exist, as it can't do anything in that case.
Likewise, the output was changed to not mention the nodes the repository exists on as it's mostly irrelevant. The deletion may succeed as far as the user cares but some replicas may still be left on the disk if Praefect was unable to remove them. Changelog: changed --- cmd/praefect/subcmd_remove_repository.go | 154 ++---------------- cmd/praefect/subcmd_remove_repository_test.go | 56 +++---- cmd/praefect/subcmd_track_repository_test.go | 15 +- 3 files changed, 38 insertions(+), 187 deletions(-) diff --git a/cmd/praefect/subcmd_remove_repository.go b/cmd/praefect/subcmd_remove_repository.go index 49b049615c..5298c713d2 100644 --- a/cmd/praefect/subcmd_remove_repository.go +++ b/cmd/praefect/subcmd_remove_repository.go @@ -7,7 +7,6 @@ import ( "flag" "fmt" "io" - "strings" "sync" "time" @@ -19,7 +18,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/labkit/correlation" "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" ) const ( @@ -61,15 +59,11 @@ func (cmd *removeRepository) FlagSet() *flag.FlagSet { " This command removes all state associated with a given repository from the Gitaly Cluster.\n" + " This includes both on-disk repositories on all relevant Gitaly nodes as well as any potential\n" + " database state as tracked by Praefect.\n" + - " Runs in dry-run mode by default and lists the gitaly nodes from which the repository will be removed from " + + " Runs in dry-run mode by default checks whether the repository exists" + " without actually removing it from the database and disk.\n" + " When -apply is used, the repository will be removed from the database as well as\n" + " the individual gitaly nodes on which they exist.\n") fs.PrintDefaults() - printfErr("NOTE:\n" + - " It may happen that parts of the repository continue to exist after this command, either because\n" + - " of an error that happened during deletion or because of in-flight RPC calls targeting the repository.\n" + - " It is safe and 
recommended to re-run this command in such a case.\n") } return fs } @@ -116,21 +110,10 @@ func (cmd *removeRepository) exec(ctx context.Context, logger logrus.FieldLogger if exists { fmt.Fprintln(cmd.w, "Repository found in the database.") } else { - fmt.Fprintln(cmd.w, "Repository is not being tracked in Praefect.") + return errors.New("repository is not being tracked in Praefect") } } - storagesOnDisk, err := cmd.getStoragesFromNodes(ctx, cfg) - if err != nil { - return err - } - - if len(storagesOnDisk) == 0 { - fmt.Fprintln(cmd.w, "Repository not found on any gitaly nodes.") - } else { - fmt.Fprintf(cmd.w, "Repository found on the following gitaly nodes: %s.\n", strings.Join(storagesOnDisk, ", ")) - } - if !cmd.apply { fmt.Fprintln(cmd.w, "Re-run the command with -apply to remove repositories from the database and disk.") return nil @@ -138,18 +121,21 @@ func (cmd *removeRepository) exec(ctx context.Context, logger logrus.FieldLogger fmt.Fprintf(cmd.w, "Attempting to remove %s from the database, and delete it from all gitaly nodes...\n", cmd.relativePath) - fmt.Fprintln(cmd.w, "Removing repository from the database...") - removed, err := cmd.removeRepositoryFromDatabase(ctx, db) + addr, err := getNodeAddress(cfg) if err != nil { - return fmt.Errorf("remove repository info from praefect database: %w", err) + return fmt.Errorf("get node address: %w", err) } - if !removed { - fmt.Fprintln(cmd.w, "The database has no information about this repository.") - } else { - fmt.Fprintln(cmd.w, "Removed repository metadata from the database.") + _, err = cmd.removeRepository(ctx, &gitalypb.Repository{ + StorageName: cmd.virtualStorage, + RelativePath: cmd.relativePath, + }, addr, cfg.Auth.Token) + if err != nil { + return fmt.Errorf("repository removal failed: %w", err) } + fmt.Fprintln(cmd.w, "Repository removal completed.") + fmt.Fprintln(cmd.w, "Removing replication events...") ticker := helper.NewTimerTicker(time.Second) defer ticker.Stop() @@ -157,80 +143,9 @@ func 
(cmd *removeRepository) exec(ctx context.Context, logger logrus.FieldLogger return fmt.Errorf("remove scheduled replication events: %w", err) } fmt.Fprintln(cmd.w, "Replication event removal completed.") - - // We should try to remove repository from each of gitaly nodes. - fmt.Fprintln(cmd.w, "Removing repository directly from gitaly nodes...") - cmd.removeRepositoryForEachGitaly(ctx, cfg, logger) - fmt.Fprintln(cmd.w, "Finished removing repository from gitaly nodes.") - return nil } -// getStoragesFromNodes looks on disk to finds the storages this repository exists for -func (cmd *removeRepository) getStoragesFromNodes(ctx context.Context, cfg config.Config) ([]string, error) { - var storages []string - for _, vs := range cfg.VirtualStorages { - if vs.Name != cmd.virtualStorage { - continue - } - - storages = make([]string, len(vs.Nodes)) - var wg sync.WaitGroup - - for i := 0; i < len(vs.Nodes); i++ { - wg.Add(1) - go func(node *config.Node, i int) { - defer wg.Done() - repo := &gitalypb.Repository{ - StorageName: node.Storage, - RelativePath: cmd.relativePath, - } - exists, err := repositoryExists(ctx, repo, node.Address, node.Token) - if err != nil { - cmd.logger.WithError(err).Errorf("checking if repository exists on %q", node.Storage) - } - if exists { - storages[i] = node.Storage - } - }(vs.Nodes[i], i) - } - - wg.Wait() - break - } - - var storagesFound []string - for _, storage := range storages { - if storage != "" { - storagesFound = append(storagesFound, storage) - } - } - - return storagesFound, nil -} - -func (cmd *removeRepository) removeRepositoryFromDatabase(ctx context.Context, db *sql.DB) (bool, error) { - var removed bool - if err := db.QueryRowContext( - ctx, - `WITH remove_storages_info AS ( - DELETE FROM storage_repositories - WHERE virtual_storage = $1 AND relative_path = $2 - ) - DELETE FROM repositories - WHERE virtual_storage = $1 AND relative_path = $2 - RETURNING TRUE`, - cmd.virtualStorage, - cmd.relativePath, - ).Scan(&removed); err 
!= nil { - if errors.Is(err, sql.ErrNoRows) { - return false, nil - } - return false, fmt.Errorf("query row: %w", err) - } - return removed, nil -} - func (cmd *removeRepository) removeRepository(ctx context.Context, repo *gitalypb.Repository, addr, token string) (bool, error) { conn, err := subCmdDial(ctx, addr, token, cmd.dialTimeout) if err != nil { @@ -241,10 +156,7 @@ func (cmd *removeRepository) removeRepository(ctx context.Context, repo *gitalyp ctx = metadata.AppendToOutgoingContext(ctx, "client_name", removeRepositoryCmdName) repositoryClient := gitalypb.NewRepositoryServiceClient(conn) if _, err := repositoryClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: repo}); err != nil { - if _, ok := status.FromError(err); !ok { - return false, fmt.Errorf("RemoveRepository: %w", err) - } - return false, nil + return false, err } return true, nil } @@ -291,43 +203,3 @@ func (cmd *removeRepository) removeReplicationEvents(ctx context.Context, logger } return nil } - -func (cmd *removeRepository) removeNode( - ctx context.Context, - logger logrus.FieldLogger, - node *config.Node, -) { - logger.Debugf("remove repository with gitaly %q at %q", node.Storage, node.Address) - repo := &gitalypb.Repository{ - StorageName: node.Storage, - RelativePath: cmd.relativePath, - } - removed, err := cmd.removeRepository(ctx, repo, node.Address, node.Token) - if err != nil { - logger.WithError(err).Warnf("repository removal failed for %q", node.Storage) - } else { - if removed { - fmt.Fprintf(cmd.w, "Successfully removed %s from %s\n", cmd.relativePath, node.Storage) - } else { - fmt.Fprintf(cmd.w, "Did not remove %s from %s\n", cmd.relativePath, node.Storage) - } - } - logger.Debugf("repository removal call to gitaly %q completed", node.Storage) -} - -func (cmd *removeRepository) removeRepositoryForEachGitaly(ctx context.Context, cfg config.Config, logger logrus.FieldLogger) { - for _, vs := range cfg.VirtualStorages { - if vs.Name == cmd.virtualStorage { - 
var wg sync.WaitGroup - for i := 0; i < len(vs.Nodes); i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - cmd.removeNode(ctx, logger, vs.Nodes[i]) - }(i) - } - wg.Wait() - break - } - } -} diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go index 040dd40e53..f00be8d1d6 100644 --- a/cmd/praefect/subcmd_remove_repository_test.go +++ b/cmd/praefect/subcmd_remove_repository_test.go @@ -6,15 +6,15 @@ import ( "fmt" "os" "path/filepath" - "strings" "testing" "time" "github.com/sirupsen/logrus" - "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/client" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + gitalycfg "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" @@ -107,6 +107,8 @@ func TestRemoveRepository_Exec(t *testing.T) { t.Run("dry run", func(t *testing.T) { var out bytes.Buffer repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name()) + replicaPath := gittest.GetReplicaPath(ctx, t, gitalycfg.Cfg{SocketPath: praefectServer.Address()}, repo) + cmd := &removeRepository{ logger: testhelper.NewDiscardingLogger(t), virtualStorage: repo.StorageName, @@ -117,10 +119,9 @@ func TestRemoveRepository_Exec(t *testing.T) { } require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) - require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath)) - require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath)) + require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, replicaPath)) + require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, replicaPath)) assert.Contains(t, out.String(), "Repository found in the database.\n") - assert.Contains(t, out.String(), "Repository 
found on the following gitaly nodes:") assert.Contains(t, out.String(), "Re-run the command with -apply to remove repositories from the database and disk.\n") repositoryRowExists, err := datastore.NewPostgresRepositoryStore(db, nil).RepositoryExists(ctx, cmd.virtualStorage, cmd.relativePath) @@ -144,10 +145,8 @@ func TestRemoveRepository_Exec(t *testing.T) { require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath)) require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath)) assert.Contains(t, out.String(), "Repository found in the database.\n") - assert.Contains(t, out.String(), "Repository found on the following gitaly nodes:") assert.Contains(t, out.String(), fmt.Sprintf("Attempting to remove %s from the database, and delete it from all gitaly nodes...\n", repo.RelativePath)) - assert.Contains(t, out.String(), fmt.Sprintf("Successfully removed %s from", repo.RelativePath)) - assert.NotContains(t, out.String(), fmt.Sprintf("Did not remove %s from", repo.RelativePath)) + assert.Contains(t, out.String(), "Repository removal completed.") var repositoryRowExists bool require.NoError(t, db.QueryRow( @@ -176,9 +175,8 @@ func TestRemoveRepository_Exec(t *testing.T) { require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath)) require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath)) assert.Contains(t, out.String(), "Repository found in the database.\n") - assert.Contains(t, out.String(), "Repository found on the following gitaly nodes: gitaly-1") assert.Contains(t, out.String(), fmt.Sprintf("Attempting to remove %s from the database, and delete it from all gitaly nodes...\n", repo.RelativePath)) - assert.Contains(t, out.String(), fmt.Sprintf("Successfully removed %s from gitaly-1", repo.RelativePath)) + assert.Contains(t, out.String(), "Repository removal completed.") var repositoryRowExists bool require.NoError(t, db.QueryRow( @@ -191,8 +189,10 @@ func TestRemoveRepository_Exec(t 
*testing.T) { t.Run("no info about repository on praefect", func(t *testing.T) { var out bytes.Buffer repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name()) + replicaPath := gittest.GetReplicaPath(ctx, t, gitalycfg.Cfg{SocketPath: praefectServer.Address()}, repo) + repoStore := datastore.NewPostgresRepositoryStore(db.DB, nil) - _, _, err := repoStore.DeleteRepository(ctx, repo.StorageName, repo.RelativePath) + _, _, err = repoStore.DeleteRepository(ctx, repo.StorageName, repo.RelativePath) require.NoError(t, err) cmd := &removeRepository{ @@ -203,12 +203,9 @@ func TestRemoveRepository_Exec(t *testing.T) { apply: true, w: &writer{w: &out}, } - require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) - assert.Contains(t, out.String(), "Repository is not being tracked in Praefect.\n") - assert.Contains(t, out.String(), "Repository found on the following gitaly nodes:") - assert.Contains(t, out.String(), "The database has no information about this repository.\n") - require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath)) - require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath)) + require.Error(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf), "repository is not being tracked in Praefect") + require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, replicaPath)) + require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, replicaPath)) requireNoDatabaseInfo(t, db, cmd) }) @@ -218,11 +215,12 @@ func TestRemoveRepository_Exec(t *testing.T) { repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name()) g2Srv.Shutdown() - logger := testhelper.NewDiscardingLogger(t) - loggerHook := test.NewLocal(logger) + replicaPath := gittest.GetReplicaPath(ctx, t, gitalycfg.Cfg{SocketPath: praefectServer.Address()}, repo) + require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, replicaPath)) + require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, replicaPath)) cmd := &removeRepository{ - 
logger: logrus.NewEntry(logger), + logger: logrus.NewEntry(testhelper.NewDiscardingLogger(t)), virtualStorage: praefectStorage, relativePath: repo.RelativePath, dialTimeout: 100 * time.Millisecond, @@ -231,22 +229,10 @@ func TestRemoveRepository_Exec(t *testing.T) { } require.NoError(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + assert.Contains(t, out.String(), "Repository removal completed.") - var checkExistsOnNodeErrFound, removeRepoFromDiskErrFound bool - for _, entry := range loggerHook.AllEntries() { - if strings.Contains(entry.Message, `checking if repository exists on "gitaly-2"`) { - checkExistsOnNodeErrFound = true - } - - if strings.Contains(entry.Message, `repository removal failed for "gitaly-2"`) { - removeRepoFromDiskErrFound = true - } - } - require.True(t, checkExistsOnNodeErrFound) - require.True(t, removeRepoFromDiskErrFound) - - require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, repo.RelativePath)) - require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, repo.RelativePath)) + require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, replicaPath)) + require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, replicaPath)) requireNoDatabaseInfo(t, db, cmd) }) diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go index 2e3c807bfe..3e8750f42a 100644 --- a/cmd/praefect/subcmd_track_repository_test.go +++ b/cmd/praefect/subcmd_track_repository_test.go @@ -3,7 +3,6 @@ package main import ( "bytes" "flag" - "io" "path/filepath" "testing" "time" @@ -170,15 +169,9 @@ func TestAddRepository_Exec(t *testing.T) { defer nodeMgr.Stop() repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - - rmRepoCmd := &removeRepository{ - logger: logger, - virtualStorage: virtualStorageName, - relativePath: tc.relativePath, - w: io.Discard, - apply: true, - } - require.NoError(t, rmRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)) + exists, err := 
repoDS.RepositoryExists(ctx, virtualStorageName, tc.relativePath) + require.NoError(t, err) + require.False(t, exists) // create the repo on Gitaly without Praefect knowing require.NoError(t, createRepoThroughGitaly1(tc.relativePath)) @@ -207,7 +200,7 @@ func TestAddRepository_Exec(t *testing.T) { assert.Contains(t, assignments, g1Cfg.Storages[0].Name) assert.Contains(t, assignments, g2Cfg.Storages[0].Name) - exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, tc.relativePath) + exists, err = repoDS.RepositoryExists(ctx, virtualStorageName, tc.relativePath) require.NoError(t, err) assert.True(t, exists) assert.Contains(t, stdout.String(), tc.expectedOutput) -- GitLab From 9c19ad66f4b691f47bf5fa900062b45bec36cd08 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Thu, 10 Feb 2022 14:40:32 +0200 Subject: [PATCH 7/7] Replace custom db assertions with a call to RepositoryExists TestRemoveRepository has some custom assertions to check whether DB records exist. This commit replaces those assertions with a call to RepositoryExists. RepositoryExists also consults the database to determine whether the repository exists, so this change simply replaces custom assertions with greater use of production code.
--- cmd/praefect/subcmd_remove_repository_test.go | 51 +++++-------------- 1 file changed, 13 insertions(+), 38 deletions(-) diff --git a/cmd/praefect/subcmd_remove_repository_test.go b/cmd/praefect/subcmd_remove_repository_test.go index f00be8d1d6..1831d131bc 100644 --- a/cmd/praefect/subcmd_remove_repository_test.go +++ b/cmd/praefect/subcmd_remove_repository_test.go @@ -104,6 +104,14 @@ func TestRemoveRepository_Exec(t *testing.T) { praefectStorage := conf.VirtualStorages[0].Name + repositoryExists := func(t testing.TB, repo *gitalypb.Repository) bool { + response, err := gitalypb.NewRepositoryServiceClient(cc).RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ + Repository: repo, + }) + require.NoError(t, err) + return response.GetExists() + } + t.Run("dry run", func(t *testing.T) { var out bytes.Buffer repo := createRepo(t, ctx, repoClient, praefectStorage, t.Name()) @@ -123,10 +131,7 @@ func TestRemoveRepository_Exec(t *testing.T) { require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, replicaPath)) assert.Contains(t, out.String(), "Repository found in the database.\n") assert.Contains(t, out.String(), "Re-run the command with -apply to remove repositories from the database and disk.\n") - - repositoryRowExists, err := datastore.NewPostgresRepositoryStore(db, nil).RepositoryExists(ctx, cmd.virtualStorage, cmd.relativePath) - require.NoError(t, err) - require.True(t, repositoryRowExists) + require.True(t, repositoryExists(t, repo)) }) t.Run("ok", func(t *testing.T) { @@ -147,13 +152,7 @@ func TestRemoveRepository_Exec(t *testing.T) { assert.Contains(t, out.String(), "Repository found in the database.\n") assert.Contains(t, out.String(), fmt.Sprintf("Attempting to remove %s from the database, and delete it from all gitaly nodes...\n", repo.RelativePath)) assert.Contains(t, out.String(), "Repository removal completed.") - - var repositoryRowExists bool - require.NoError(t, db.QueryRow( - `SELECT EXISTS(SELECT FROM repositories WHERE virtual_storage = 
$1 AND relative_path = $2)`, - cmd.virtualStorage, cmd.relativePath, - ).Scan(&repositoryRowExists)) - require.False(t, repositoryRowExists) + require.False(t, repositoryExists(t, repo)) }) t.Run("repository doesnt exist on one gitaly", func(t *testing.T) { @@ -177,13 +176,7 @@ func TestRemoveRepository_Exec(t *testing.T) { assert.Contains(t, out.String(), "Repository found in the database.\n") assert.Contains(t, out.String(), fmt.Sprintf("Attempting to remove %s from the database, and delete it from all gitaly nodes...\n", repo.RelativePath)) assert.Contains(t, out.String(), "Repository removal completed.") - - var repositoryRowExists bool - require.NoError(t, db.QueryRow( - `SELECT EXISTS(SELECT FROM repositories WHERE virtual_storage = $1 AND relative_path = $2)`, - cmd.virtualStorage, cmd.relativePath, - ).Scan(&repositoryRowExists)) - require.False(t, repositoryRowExists) + require.False(t, repositoryExists(t, repo)) }) t.Run("no info about repository on praefect", func(t *testing.T) { @@ -206,8 +199,7 @@ func TestRemoveRepository_Exec(t *testing.T) { require.Error(t, cmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf), "repository is not being tracked in Praefect") require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, replicaPath)) require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, replicaPath)) - - requireNoDatabaseInfo(t, db, cmd) + require.False(t, repositoryExists(t, repo)) }) t.Run("one of gitalies is out of service", func(t *testing.T) { @@ -233,27 +225,10 @@ func TestRemoveRepository_Exec(t *testing.T) { require.NoDirExists(t, filepath.Join(g1Cfg.Storages[0].Path, replicaPath)) require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, replicaPath)) - - requireNoDatabaseInfo(t, db, cmd) + require.False(t, repositoryExists(t, repo)) }) } -func requireNoDatabaseInfo(t *testing.T, db testdb.DB, cmd *removeRepository) { - t.Helper() - var repositoryRowExists bool - require.NoError(t, db.QueryRow( - `SELECT EXISTS(SELECT FROM repositories 
WHERE virtual_storage = $1 AND relative_path = $2)`, - cmd.virtualStorage, cmd.relativePath, - ).Scan(&repositoryRowExists)) - require.False(t, repositoryRowExists) - var storageRowExists bool - require.NoError(t, db.QueryRow( - `SELECT EXISTS(SELECT FROM storage_repositories WHERE virtual_storage = $1 AND relative_path = $2)`, - cmd.virtualStorage, cmd.relativePath, - ).Scan(&storageRowExists)) - require.False(t, storageRowExists) -} - func TestRemoveRepository_removeReplicationEvents(t *testing.T) { t.Parallel() const ( -- GitLab