diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index e67c17d4c49b2bcf930be8dc0b7f392725120819..ed908551c9d1a0490dd24c0c6c9160a03721d3bf 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -312,6 +312,7 @@ func run(cfgs []starter.Config, conf config.Config) error { praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))), csg, assignmentStore, + rs, conf.DefaultReplicationFactors(), ) } else { diff --git a/cmd/praefect/subcmd_accept_dataloss_test.go b/cmd/praefect/subcmd_accept_dataloss_test.go index cb3e9bf2b09b108817933fcefdb34ed0fa3db4e8..2945b43f1665d83111d15b6e3595be9554bc5be8 100644 --- a/cmd/praefect/subcmd_accept_dataloss_test.go +++ b/cmd/praefect/subcmd_accept_dataloss_test.go @@ -47,7 +47,7 @@ func TestAcceptDatalossSubcommand(t *testing.T) { if !repoCreated { repoCreated = true - require.NoError(t, rs.CreateRepository(ctx, vs, repo, storage, nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, storage, nil, nil, false, false)) } require.NoError(t, rs.SetGeneration(ctx, vs, repo, storage, generation)) diff --git a/cmd/praefect/subcmd_set_replication_factor_test.go b/cmd/praefect/subcmd_set_replication_factor_test.go index 5b0d07605518fd1555799a69980aba106a8162b1..197045a55bf521683c2af989a2757f9444a14ae3 100644 --- a/cmd/praefect/subcmd_set_replication_factor_test.go +++ b/cmd/praefect/subcmd_set_replication_factor_test.go @@ -92,7 +92,7 @@ func TestSetReplicationFactorSubcommand(t *testing.T) { // create a repository record require.NoError(t, - datastore.NewPostgresRepositoryStore(db, nil).CreateRepository(ctx, "virtual-storage", "relative-path", "primary", nil, nil, false, false), + datastore.NewPostgresRepositoryStore(db, nil).CreateRepository(ctx, 1, "virtual-storage", "relative-path", "primary", nil, nil, false, false), ) ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer( diff --git a/internal/helper/error.go b/internal/helper/error.go index 10551c0139aef2cc7fca4315cb769a6da4c77826..6075866aed288f7f60bb899e0aa43c072dea89a5 100644 --- a/internal/helper/error.go +++ b/internal/helper/error.go @@ -45,6 +45,9 @@ func ErrUnavailable(err error) error { return wrapError(codes.Unavailable, err) // ErrPermissionDenied wraps err with codes.PermissionDenied, unless err is already a gRPC error. func ErrPermissionDenied(err error) error { return wrapError(codes.PermissionDenied, err) } +// ErrAlreadyExists wraps err with codes.AlreadyExists, unless err is already a gRPC error. +func ErrAlreadyExists(err error) error { return wrapError(codes.AlreadyExists, err) } + // wrapError wraps the given error with the error code unless it's already a gRPC error. If given // nil it will return nil. func wrapError(code codes.Code, err error) error { diff --git a/internal/helper/error_test.go b/internal/helper/error_test.go index 65f7bab91361c60e4535decc7a36c330c6d8cd0d..ad07e90d2cd99a2d45c20677899535d27247a061 100644 --- a/internal/helper/error_test.go +++ b/internal/helper/error_test.go @@ -55,6 +55,11 @@ func TestError(t *testing.T) { errorf: ErrUnavailable, code: codes.Unavailable, }, + { + desc: "AlreadyExists", + errorf: ErrAlreadyExists, + code: codes.AlreadyExists, + }, } { t.Run(tc.desc, func(t *testing.T) { // tc.code and our canary test code must not diff --git a/internal/praefect/commonerr/error.go b/internal/praefect/commonerr/error.go index 6289f28211840b28bd98dfe8c180c05161166980..4ad2f6c800f63d105fed5172c93ab2ad64f7a4db 100644 --- a/internal/praefect/commonerr/error.go +++ b/internal/praefect/commonerr/error.go @@ -3,7 +3,10 @@ // due to cyclic imports. package commonerr -import "fmt" +import ( + "errors" + "fmt" +) // RepositoryNotFoundError is returned when attempting to operate on a repository // that does not exist in the virtual storage. @@ -21,3 +24,6 @@ func NewRepositoryNotFoundError(virtualStorage string, relativePath string) erro func (err RepositoryNotFoundError) Error() string { return fmt.Sprintf("repository %q/%q not found", err.virtualStorage, err.relativePath) } + +// ErrRepositoryAlreadyExists is returned when attempting to create a repository that already exists. +var ErrRepositoryAlreadyExists = errors.New("repository already exists") diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index c77b48eccb0cbecb2db2365ebd87f775c7ed559a..218d5fbb3c91321b3081508154eaf3eee1e7d247 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -333,8 +333,13 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call gr // the repositories are created directly on the filesystem. There is no call for the // CreateRepository that creates records in the database that is why we do it artificially // before redirecting the calls. - if err := c.rs.CreateRepository(ctx, call.targetRepo.StorageName, call.targetRepo.RelativePath, call.targetRepo.StorageName, nil, nil, true, true); err != nil { - if !errors.Is(err, datastore.RepositoryExistsError{}) { + id, err := c.rs.ReserveRepositoryID(ctx, call.targetRepo.StorageName, call.targetRepo.RelativePath) + if err != nil { + if !errors.Is(err, commonerr.ErrRepositoryAlreadyExists) { + return nil, err + } + } else { + if err := c.rs.CreateRepository(ctx, id, call.targetRepo.StorageName, call.targetRepo.RelativePath, call.targetRepo.StorageName, nil, nil, true, true); err != nil { return nil, err } } @@ -456,7 +461,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall var route RepositoryMutatorRoute switch change { case datastore.CreateRepo: - route, err = c.router.RouteRepositoryCreation(ctx, virtualStorage) + route, err = c.router.RouteRepositoryCreation(ctx, virtualStorage, targetRepo.RelativePath) if err != nil { return nil, fmt.Errorf("route repository creation: %w", err) } @@ -552,6 +557,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall finalizers = append(finalizers, c.newRequestFinalizer( ctx, + route.RepositoryID, virtualStorage, targetRepo, route.Primary.Storage, @@ -627,6 +633,11 @@ func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, if errors.Is(err, nodes.ErrVirtualStorageNotExist) { return nil, helper.ErrInvalidArgument(err) } + + if errors.Is(err, commonerr.ErrRepositoryAlreadyExists) { + return nil, helper.ErrAlreadyExists(err) + } + return nil, err } return sp, nil @@ -784,7 +795,7 @@ func (c *Coordinator) createTransactionFinalizer( } return c.newRequestFinalizer( - ctx, virtualStorage, targetRepo, route.Primary.Storage, + ctx, route.RepositoryID, virtualStorage, targetRepo, route.Primary.Storage, updated, outdated, change, params, cause)() } } @@ -916,6 +927,7 @@ func routerNodesToStorages(nodes []RouterNode) []string { func (c *Coordinator) newRequestFinalizer( ctx context.Context, + repositoryID int64, virtualStorage string, targetRepo *gitalypb.Repository, primary string, @@ -982,6 +994,7 @@ func (c *Coordinator) newRequestFinalizer( c.conf.DefaultReplicationFactors()[virtualStorage] > 0 if err := c.rs.CreateRepository(ctx, + repositoryID, virtualStorage, targetRepo.GetRelativePath(), primary, @@ -1005,6 +1018,7 @@ func (c *Coordinator) newRequestFinalizer( for _, secondary := range outdatedSecondaries { event := datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ + RepositoryID: repositoryID, Change: change, RelativePath: targetRepo.GetRelativePath(), VirtualStorage: virtualStorage, diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go index 3d887b23a9df0e2230ae77e0a211b9c7137f3bbd..a27f92820870262c312084cdfcc92c9a487e4256 100644 --- a/internal/praefect/coordinator_pg_test.go +++ b/internal/praefect/coordinator_pg_test.go @@ -217,7 +217,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { if !repoCreated { repoCreated = true - require.NoError(t, rs.CreateRepository(ctx, repo.StorageName, repo.RelativePath, storageNodes[i].Storage, nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, repo.StorageName, repo.RelativePath, storageNodes[i].Storage, nil, nil, false, false)) } require.NoError(t, rs.SetGeneration(ctx, repo.StorageName, repo.RelativePath, storageNodes[i].Storage, n.generation)) diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 0c24863b2827a2dba224bfce4ad10cec65e60214..91fd9f42d8a188e232ffbac395e786c852272080 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -803,8 +803,9 @@ func TestStreamDirector_repo_creation(t *testing.T) { var createRepositoryCalled int64 rs := datastore.MockRepositoryStore{ - CreateRepositoryFunc: func(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error { + CreateRepositoryFunc: func(ctx context.Context, repoID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error { atomic.AddInt64(&createRepositoryCalled, 1) + assert.Equal(t, int64(0), repoID) assert.Equal(t, targetRepo.StorageName, virtualStorage) assert.Equal(t, targetRepo.RelativePath, relativePath) assert.Equal(t, rewrittenStorage, primary) @@ -873,6 +874,7 @@ func TestStreamDirector_repo_creation(t *testing.T) { }, nil, nil, + rs, conf.DefaultReplicationFactors(), ) default: @@ -2068,7 +2070,7 @@ func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) { requireSuppressedCancellation(t, ctx) return err }, - CreateRepositoryFunc: func(ctx context.Context, _, _, _ string, _, _ []string, _, _ bool) error { + CreateRepositoryFunc: func(ctx context.Context, _ int64, _, _, _ string, _, _ []string, _, _ bool) error { requireSuppressedCancellation(t, ctx) return err }, @@ -2079,6 +2081,7 @@ func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) { nil, ).newRequestFinalizer( ctx, + 0, "virtual storage", &gitalypb.Repository{}, "primary", diff --git a/internal/praefect/datastore/assignment.go b/internal/praefect/datastore/assignment.go index b097272c46ec0b674feb9bba60bd44444d168648..7fc48eedcba0659d4ec85b8de9a2f50e54826228 100644 --- a/internal/praefect/datastore/assignment.go +++ b/internal/praefect/datastore/assignment.go @@ -123,7 +123,7 @@ func (s AssignmentStore) SetReplicationFactor(ctx context.Context, virtualStorag // current assignments. rows, err := s.db.QueryContext(ctx, ` WITH repository AS ( - SELECT virtual_storage, relative_path, "primary" + SELECT repository_id, virtual_storage, relative_path, "primary" FROM repositories WHERE virtual_storage = $1 AND relative_path = $2 @@ -139,7 +139,7 @@ existing_assignments AS ( created_assignments AS ( INSERT INTO repository_assignments - SELECT virtual_storage, relative_path, storage + SELECT virtual_storage, relative_path, storage, repository_id FROM repository CROSS JOIN ( SELECT unnest($4::text[]) AS storage ) AS configured_storages WHERE storage NOT IN ( SELECT storage FROM existing_assignments ) diff --git a/internal/praefect/datastore/assignment_test.go b/internal/praefect/datastore/assignment_test.go index c8ac5ad847d1d63fa12d29776fe8872b0d70ac7d..40dcb1c590c75ecb66b3f7c15efe6a3e6f3a1fc7 100644 --- a/internal/praefect/datastore/assignment_test.go +++ b/internal/praefect/datastore/assignment_test.go @@ -3,6 +3,7 @@ package datastore import ( "testing" + "github.com/lib/pq" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" @@ -204,15 +205,15 @@ func TestAssignmentStore_SetReplicationFactor(t *testing.T) { if !tc.nonExistentRepository { _, err := db.ExecContext(ctx, ` - INSERT INTO repositories (virtual_storage, relative_path, "primary") - VALUES ('virtual-storage', 'relative-path', 'primary') + INSERT INTO repositories (virtual_storage, relative_path, "primary", repository_id) + VALUES ('virtual-storage', 'relative-path', 'primary', 1) `) require.NoError(t, err) } for _, storage := range tc.existingAssignments { _, err := db.ExecContext(ctx, ` - INSERT INTO repository_assignments VALUES ('virtual-storage', 'relative-path', $1) + INSERT INTO repository_assignments VALUES ('virtual-storage', 'relative-path', $1, 1) `, storage) require.NoError(t, err) } @@ -230,6 +231,14 @@ func TestAssignmentStore_SetReplicationFactor(t *testing.T) { assignedStorages, err := store.GetHostAssignments(ctx, "virtual-storage", "relative-path") require.NoError(t, err) tc.requireStorages(t, assignedStorages) + + var storagesWithIncorrectRepositoryID pq.StringArray + require.NoError(t, db.QueryRowContext(ctx, ` + SELECT array_agg(storage) + FROM repository_assignments + WHERE COALESCE(repository_id != 1, true) + `).Scan(&storagesWithIncorrectRepositoryID)) + require.Empty(t, storagesWithIncorrectRepositoryID) }) } } diff --git a/internal/praefect/datastore/migrations/20210727085659_repository_ids.go b/internal/praefect/datastore/migrations/20210727085659_repository_ids.go new file mode 100644 index 0000000000000000000000000000000000000000..84bdee938731c2dd546d80ed0109160a890fbdf3 --- /dev/null +++ b/internal/praefect/datastore/migrations/20210727085659_repository_ids.go @@ -0,0 +1,31 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20210727085659_repository_ids", + Up: []string{ + "CREATE UNIQUE INDEX repository_lookup_index ON repositories (virtual_storage, relative_path)", + "ALTER TABLE repository_assignments DROP CONSTRAINT IF EXISTS repository_assignments_virtual_storage_relative_path_fkey", + "ALTER TABLE repository_assignments DROP CONSTRAINT IF EXISTS repository_assignments_virtual_storage_fkey", + "ALTER TABLE repositories DROP CONSTRAINT repositories_pkey", + "ALTER TABLE repository_assignments ADD CONSTRAINT repository_assignments_virtual_storage_relative_path_fkey FOREIGN KEY (virtual_storage, relative_path) REFERENCES repositories (virtual_storage, relative_path) ON UPDATE CASCADE ON DELETE CASCADE", + "ALTER TABLE repositories ADD COLUMN repository_id BIGSERIAL PRIMARY KEY", + "ALTER TABLE repository_assignments ADD COLUMN repository_id BIGINT REFERENCES repositories (repository_id) ON DELETE CASCADE", + "ALTER TABLE storage_repositories ADD COLUMN repository_id BIGINT REFERENCES repositories (repository_id) ON DELETE SET NULL", + }, + Down: []string{ + "ALTER TABLE storage_repositories DROP COLUMN repository_id", + "ALTER TABLE repository_assignments DROP COLUMN repository_id", + "ALTER TABLE repositories DROP COLUMN repository_id", + "CREATE UNIQUE INDEX repositories_pkey ON repositories (virtual_storage, relative_path)", + "ALTER TABLE repositories ADD PRIMARY KEY USING INDEX repositories_pkey", + "ALTER TABLE repository_assignments DROP CONSTRAINT repository_assignments_virtual_storage_relative_path_fkey", + "DROP INDEX repository_lookup_index", + "ALTER TABLE repository_assignments ADD FOREIGN KEY (virtual_storage, relative_path) REFERENCES repositories (virtual_storage, relative_path) ON UPDATE CASCADE ON DELETE CASCADE", + }, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index dcff2567e00c00568ddab15062f40873b72ddefa..72d88f970245c51d2f3cac81c5c884e19b8e6bf7 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -49,6 +49,10 @@ func allowToAck(state JobState) error { // ReplicationJob is a persistent representation of the replication job. type ReplicationJob struct { + // RepositoryID is the ID of the repository this job relates to. RepositoryID + // may be 0 if the job doesn't relate to any known repository. This can happen + // for example when the job is deleting an orphaned replica of a deleted repository. + RepositoryID int64 `json:"repository_id"` Change ChangeType `json:"change"` RelativePath string `json:"relative_path"` TargetNodeStorage string `json:"target_node_storage"` diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index 1819521947e7b56f2441e4e7457ea15ce2fd84a0..d0bc486619baefe43b41c5e4cfdaf09d5aa27238 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -104,7 +104,7 @@ type RepositoryStore interface { // // storeAssignments should be set when variable replication factor is enabled. When set, the primary and the // secondaries are stored as the assigned hosts of the repository. - CreateRepository(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error + CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error // SetAuthoritativeReplica sets the given replica of a repsitory as the authoritative one by setting its generation as the latest one. SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storage string) error // DeleteRepository deletes the repository's record from the virtual storage and the storages. Returns @@ -127,6 +127,12 @@ type RepositoryStore interface { // record of the invalid repository. If the storage was the only storage with the repository, the repository's // record on the virtual storage is also deleted. DeleteInvalidRepository(ctx context.Context, virtualStorage, relativePath, storage string) error + // ReserveRepositoryID reserves an ID for a repository that is about to be created and returns it. If a repository already + // exists with the given virtual storage and relative path combination, an error is returned. + ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) + // GetRepositoryID gets the ID of the repository identified via the given virtual storage and relative path. Returns a + // RepositoryNotFoundError if the repository doesn't exist. + GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) } // PostgresRepositoryStore is a Postgres implementation of RepositoryStore. @@ -229,13 +235,17 @@ WITH repository AS ( ) INSERT INTO storage_repositories ( + repository_id, virtual_storage, relative_path, storage, generation ) -VALUES ($1, $2, $3, $4) +SELECT + (SELECT repository_id FROM repositories WHERE virtual_storage = $1 AND relative_path = $2), + $1, $2, $3, $4 ON CONFLICT (virtual_storage, relative_path, storage) DO UPDATE SET + repository_id = EXCLUDED.repository_id, generation = EXCLUDED.generation ` @@ -324,26 +334,36 @@ AND storage = ANY($3) return sourceGeneration, nil } -// CreateRepository creates a new repository and assigns it to the given primary and secondary -// nodes. -func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error { +// CreateRepository creates a record for a repository in the specified virtual storage and relative path. +// Primary is the storage the repository was created on. UpdatedSecondaries are secondaries that participated +// and successfully completed the transaction. OutdatedSecondaries are secondaries that were outdated or failed +// the transaction. Returns RepositoryExistsError when trying to create a repository which already exists in the store. +// +// storePrimary should be set when repository specific primaries are enabled. When set, the primary is stored as +// the repository's primary. +// +// storeAssignments should be set when variable replication factor is enabled. When set, the primary and the +// secondaries are stored as the assigned hosts of the repository. +func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error { const q = ` WITH repo AS ( INSERT INTO repositories ( + repository_id, virtual_storage, relative_path, generation, "primary" - ) VALUES ($1, $2, 0, CASE WHEN $4 THEN $3 END) + ) VALUES ($8, $1, $2, 0, CASE WHEN $4 THEN $3 END) ), assignments AS ( INSERT INTO repository_assignments ( + repository_id, virtual_storage, relative_path, storage ) - SELECT $1, $2, storage + SELECT $8, $1, $2, storage FROM ( SELECT $3 AS storage UNION @@ -355,12 +375,13 @@ assignments AS ( ) INSERT INTO storage_repositories ( + repository_id, virtual_storage, relative_path, storage, generation ) -SELECT $1, $2, storage, 0 +SELECT $8, $1, $2, storage, 0 FROM ( SELECT $3 AS storage UNION @@ -376,10 +397,15 @@ FROM ( pq.StringArray(updatedSecondaries), pq.StringArray(outdatedSecondaries), storeAssignments, + repositoryID, ) var pqerr *pq.Error if errors.As(err, &pqerr) && pqerr.Code.Name() == "unique_violation" { + if pqerr.Constraint == "repositories_pkey" { + return fmt.Errorf("repository id %d already in use", repositoryID) + } + return RepositoryExistsError{ virtualStorage: virtualStorage, relativePath: relativePath, @@ -691,3 +717,45 @@ ORDER BY relative_path, "primary" return repos, rows.Err() } + +// ReserveRepositoryID reserves an ID for a repository that is about to be created and returns it. If a repository already +// exists with the given virtual storage and relative path combination, an error is returned. +func (rs *PostgresRepositoryStore) ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) { + var id int64 + if err := rs.db.QueryRowContext(ctx, ` +SELECT nextval('repositories_repository_id_seq') +WHERE NOT EXISTS ( + SELECT FROM repositories + WHERE virtual_storage = $1 + AND relative_path = $2 +) + `, virtualStorage, relativePath).Scan(&id); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return 0, commonerr.ErrRepositoryAlreadyExists + } + + return 0, fmt.Errorf("scan: %w", err) + } + + return id, nil +} + +// GetRepositoryID gets the ID of the repository identified via the given virtual storage and relative path. Returns a +// RepositoryNotFoundError if the repository doesn't exist. +func (rs *PostgresRepositoryStore) GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) { + var id int64 + if err := rs.db.QueryRowContext(ctx, ` +SELECT repository_id +FROM repositories +WHERE virtual_storage = $1 +AND relative_path = $2 + `, virtualStorage, relativePath).Scan(&id); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return 0, commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath) + } + + return 0, fmt.Errorf("scan: %w", err) + } + + return id, nil +} diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go index 82edb845e7987587c13a85a210b00d8f16ecb7f1..0e3c46c92b4479be85c9c7f4fd89b8de0d385cd6 100644 --- a/internal/praefect/datastore/repository_store_mock.go +++ b/internal/praefect/datastore/repository_store_mock.go @@ -9,7 +9,7 @@ type MockRepositoryStore struct { IncrementGenerationFunc func(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error - CreateRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error + CreateRepositoryFunc func(ctx context.Context, repositoryID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error SetAuthoritativeReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath string, storages []string) error DeleteReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error @@ -18,6 +18,8 @@ type MockRepositoryStore struct { GetPartiallyAvailableRepositoriesFunc func(ctx context.Context, virtualStorage string) ([]PartiallyAvailableRepository, error) DeleteInvalidRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error) + ReserveRepositoryIDFunc func(ctx context.Context, virtualStorage, relativePath string) (int64, error) + GetRepositoryIDFunc func(ctx context.Context, virtualStorage, relativePath string) (int64, error) } func (m MockRepositoryStore) GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) { @@ -53,12 +55,12 @@ func (m MockRepositoryStore) SetGeneration(ctx context.Context, virtualStorage, } // CreateRepository calls the mocked function. If no mock has been provided, it returns a nil error. -func (m MockRepositoryStore) CreateRepository(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error { +func (m MockRepositoryStore) CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error { if m.CreateRepositoryFunc == nil { return nil } - return m.CreateRepositoryFunc(ctx, virtualStorage, relativePath, primary, updatedSecondaries, outdatedSecondaries, storePrimary, storeAssignments) + return m.CreateRepositoryFunc(ctx, repositoryID, virtualStorage, relativePath, primary, updatedSecondaries, outdatedSecondaries, storePrimary, storeAssignments) } // SetAuthoritativeReplica calls the mocked function. If no mock has been provided, it returns a nil error. @@ -128,3 +130,21 @@ func (m MockRepositoryStore) RepositoryExists(ctx context.Context, virtualStorag return m.RepositoryExistsFunc(ctx, virtualStorage, relativePath) } + +// ReserveRepositoryID returns the result of ReserveRepositoryIDFunc or 0 if it is unset. +func (m MockRepositoryStore) ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) { + if m.ReserveRepositoryIDFunc == nil { + return 0, nil + } + + return m.ReserveRepositoryIDFunc(ctx, virtualStorage, relativePath) +} + +// GetRepositoryID returns the result of GetRepositoryIDFunc or 0 if it is unset. +func (m MockRepositoryStore) GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) { + if m.GetRepositoryIDFunc == nil { + return 0, nil + } + + return m.GetRepositoryIDFunc(ctx, virtualStorage, relativePath) +} diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index fda77a73f2ce47c85d8028ba3e85e8a826bcd6df..0e12c5d12bad3dbbade4fa2163ea78dfd263dce9 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -3,6 +3,7 @@ package datastore import ( "context" "database/sql" + "fmt" "testing" "time" @@ -15,17 +16,23 @@ import ( // repositoryRecord represents Praefect's records related to a repository. type repositoryRecord struct { - primary string - assignments []string + repositoryID int64 + primary string + assignments []string } // virtualStorageStates represents the virtual storage's view of which repositories should exist. // It's structured as virtual-storage->relative_path. type virtualStorageState map[string]map[string]repositoryRecord +type replicaRecord struct { + repositoryID int64 + generation int +} + // storageState contains individual storage's repository states. -// It structured as virtual-storage->relative_path->storage->generation. -type storageState map[string]map[string]map[string]int +// It structured as virtual-storage->relative_path->storage->replicaRecord +type storageState map[string]map[string]map[string]replicaRecord type ( requireStateFunc func(t *testing.T, ctx context.Context, vss virtualStorageState, ss storageState) @@ -37,13 +44,13 @@ func requireState(t testing.TB, ctx context.Context, db glsql.Querier, vss virtu requireVirtualStorageState := func(t testing.TB, ctx context.Context, exp virtualStorageState) { rows, err := db.QueryContext(ctx, ` -SELECT virtual_storage, relative_path, "primary", assigned_storages +SELECT repository_id, virtual_storage, relative_path, "primary", assigned_storages FROM repositories LEFT JOIN ( - SELECT virtual_storage, relative_path, array_agg(storage ORDER BY storage) AS assigned_storages + SELECT repository_id, virtual_storage, relative_path, array_agg(storage ORDER BY storage) AS assigned_storages FROM repository_assignments - GROUP BY virtual_storage, relative_path -) AS repository_assignments USING (virtual_storage, relative_path) + GROUP BY repository_id, virtual_storage, relative_path +) AS repository_assignments USING (repository_id, virtual_storage, relative_path) `) require.NoError(t, err) @@ -52,18 +59,20 @@ LEFT JOIN ( act := make(virtualStorageState) for rows.Next() { var ( + repositoryID sql.NullInt64 virtualStorage, relativePath string primary sql.NullString assignments pq.StringArray ) - require.NoError(t, rows.Scan(&virtualStorage, &relativePath, &primary, &assignments)) + require.NoError(t, rows.Scan(&repositoryID, &virtualStorage, &relativePath, &primary, &assignments)) if act[virtualStorage] == nil { act[virtualStorage] = make(map[string]repositoryRecord) } act[virtualStorage][relativePath] = repositoryRecord{ - primary: primary.String, - assignments: assignments, + repositoryID: repositoryID.Int64, + primary: primary.String, + assignments: assignments, } } @@ -73,7 +82,7 @@ LEFT JOIN ( requireStorageState := func(t testing.TB, ctx context.Context, exp storageState) { rows, err := db.QueryContext(ctx, ` -SELECT virtual_storage, relative_path, storage, generation +SELECT repository_id, virtual_storage, relative_path, storage, generation FROM storage_repositories `) require.NoError(t, err) @@ -81,18 +90,19 @@ FROM storage_repositories act := make(storageState) for rows.Next() { + var repositoryID sql.NullInt64 var vs, rel, storage string var gen int - require.NoError(t, rows.Scan(&vs, &rel, &storage, &gen)) + require.NoError(t, rows.Scan(&repositoryID, &vs, &rel, &storage, &gen)) if act[vs] == nil { - act[vs] = make(map[string]map[string]int) + act[vs] = make(map[string]map[string]replicaRecord) } if act[vs][rel] == nil { - act[vs][rel] = make(map[string]int) + act[vs][rel] = make(map[string]replicaRecord) } - act[vs][rel][storage] = gen + act[vs][rel][storage] = replicaRecord{repositoryID: repositoryID.Int64, generation: gen} } require.NoError(t, rows.Err()) @@ -144,8 +154,8 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { state: storageState{ "virtual-storage": { "relative-path": { - "primary": 2, - "secondary": 2, + "primary": {repositoryID: 1, generation: 2}, + "secondary": {repositoryID: 1, generation: 2}, }, }, }, @@ -162,8 +172,8 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { state: storageState{ "virtual-storage": { "relative-path": { - "primary": 2, - "secondary": 0, + "primary": {repositoryID: 1, generation: 2}, + "secondary": {repositoryID: 1, generation: 0}, }, }, }, @@ -180,8 +190,8 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { state: storageState{ "virtual-storage": { "relative-path": { - "primary": 1, - "secondary": 0, + "primary": {repositoryID: 1, generation: 1}, + "secondary": {repositoryID: 1, generation: 0}, }, }, }, @@ -193,7 +203,7 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { db.TruncateAll(t) - require.NoError(t, NewPostgresRepositoryStore(db, nil).CreateRepository(ctx, "virtual-storage", "relative-path", "primary", []string{"secondary"}, nil, false, false)) + require.NoError(t, NewPostgresRepositoryStore(db, nil).CreateRepository(ctx, 1, "virtual-storage", "relative-path", "primary", []string{"secondary"}, nil, false, false)) firstTx := db.Begin(t) secondTx := db.Begin(t) @@ -211,7 +221,7 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { secondTx.Commit(t) requireState(t, ctx, db, - virtualStorageState{"virtual-storage": {"relative-path": {}}}, + virtualStorageState{"virtual-storage": {"relative-path": {repositoryID: 1}}}, tc.state, ) }) @@ -273,7 +283,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("write to outdated nodes", func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, "latest-node", []string{"outdated-primary", "outdated-secondary"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "latest-node", []string{"outdated-primary", "outdated-secondary"}, nil, false, false)) require.NoError(t, rs.SetGeneration(ctx, vs, repo, "latest-node", 1)) require.Equal(t, @@ -283,15 +293,15 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{}, + "repository-1": {repositoryID: 1}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "latest-node": 1, - "outdated-primary": 0, - "outdated-secondary": 0, + "latest-node": {repositoryID: 1, generation: 1}, + "outdated-primary": {repositoryID: 1, generation: 0}, + "outdated-secondary": {repositoryID: 1, generation: 0}, }, }, }, @@ -301,14 +311,14 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("increments generation for up to date nodes", func(t *testing.T) { rs, requireState := newStore(t, nil) - for _, pair := range []struct{ virtualStorage, relativePath string }{ + for id, pair := range []struct{ virtualStorage, relativePath string }{ {vs, repo}, // create records that don't get modified to ensure the query is correctly scoped by virtual storage // and relative path {vs, "other-relative-path"}, {"other-virtual-storage", repo}, } { - require.NoError(t, rs.CreateRepository(ctx, pair.virtualStorage, pair.relativePath, "primary", []string{"up-to-date-secondary", "outdated-secondary"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, int64(id+1), pair.virtualStorage, pair.relativePath, "primary", []string{"up-to-date-secondary", "outdated-secondary"}, nil, false, false)) } require.NoError(t, rs.IncrementGeneration(ctx, vs, repo, "primary", []string{"up-to-date-secondary"})) @@ -316,31 +326,31 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{}, - "other-relative-path": {}, + "repository-1": {repositoryID: 1}, + "other-relative-path": {repositoryID: 2}, }, "other-virtual-storage": { - "repository-1": {}, + "repository-1": {repositoryID: 3}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "primary": 1, - "up-to-date-secondary": 1, - "outdated-secondary": 0, + "primary": {repositoryID: 1, generation: 1}, + "up-to-date-secondary": {repositoryID: 1, generation: 1}, + "outdated-secondary": {repositoryID: 1, generation: 0}, }, "other-relative-path": { - "primary": 0, - "up-to-date-secondary": 0, - "outdated-secondary": 0, + "primary": {repositoryID: 2}, + "up-to-date-secondary": {repositoryID: 2}, + "outdated-secondary": {repositoryID: 2}, }, }, "other-virtual-storage": { "repository-1": { - "primary": 0, - "up-to-date-secondary": 0, - "outdated-secondary": 0, + "primary": {repositoryID: 3}, + "up-to-date-secondary": {repositoryID: 3}, + "outdated-secondary": {repositoryID: 3}, }, }, }, @@ -352,31 +362,31 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{}, - "other-relative-path": {}, + "repository-1": {repositoryID: 1}, + "other-relative-path": {repositoryID: 2}, }, "other-virtual-storage": { - "repository-1": {}, + "repository-1": {repositoryID: 3}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "primary": 2, - "up-to-date-secondary": 2, - "outdated-secondary": 0, + "primary": {repositoryID: 1, generation: 2}, + "up-to-date-secondary": {repositoryID: 1, generation: 2}, + "outdated-secondary": {repositoryID: 1, generation: 0}, }, "other-relative-path": { - "primary": 0, - "up-to-date-secondary": 0, - "outdated-secondary": 0, + "primary": {repositoryID: 2}, + "up-to-date-secondary": {repositoryID: 2}, + "outdated-secondary": {repositoryID: 2}, }, }, "other-virtual-storage": { "repository-1": { - "primary": 0, - "up-to-date-secondary": 0, - "outdated-secondary": 0, + "primary": {repositoryID: 3}, + "up-to-date-secondary": {repositoryID: 3}, + "outdated-secondary": {repositoryID: 3}, }, }, }, @@ -395,7 +405,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": 1, + "storage-1": {repositoryID: 0, generation: 1}, }, }, }, @@ -405,19 +415,19 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("updates existing record", func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, "storage-1", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "storage-1", nil, nil, false, false)) require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 1)) require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 0)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{}, + "repository-1": {repositoryID: 1}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": 0, + "storage-1": {repositoryID: 1, generation: 0}, }, }, }, @@ -436,18 +446,18 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { }) t.Run("sets the given replica as the latest", func(t *testing.T) { - require.NoError(t, rs.CreateRepository(ctx, vs, repo, "storage-1", []string{"storage-2"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "storage-1", []string{"storage-2"}, nil, false, false)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{}, + "repository-1": {repositoryID: 1}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": 0, - "storage-2": 0, + "storage-1": {repositoryID: 1, generation: 0}, + "storage-2": {repositoryID: 1, generation: 0}, }, }, }, @@ -457,14 +467,14 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{}, + "repository-1": {repositoryID: 1}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": 1, - "storage-2": 0, + "storage-1": {repositoryID: 1, generation: 1}, + "storage-2": {repositoryID: 1, generation: 0}, }, }, }, @@ -597,26 +607,27 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run(tc.desc, func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, "primary", tc.updatedSecondaries, tc.outdatedSecondaries, tc.storePrimary, tc.storeAssignments)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "primary", tc.updatedSecondaries, tc.outdatedSecondaries, tc.storePrimary, tc.storeAssignments)) expectedStorageState := storageState{ vs: { repo: { - "primary": 0, + "primary": {repositoryID: 1, generation: 0}, }, }, } for _, updatedSecondary := range tc.updatedSecondaries { - expectedStorageState[vs][repo][updatedSecondary] = 0 + expectedStorageState[vs][repo][updatedSecondary] = replicaRecord{repositoryID: 1, generation: 0} } requireState(t, ctx, virtualStorageState{ vs: { - repo: repositoryRecord{ - primary: tc.expectedPrimary, - assignments: tc.expectedAssignments, + repo: { + repositoryID: 1, + primary: tc.expectedPrimary, + assignments: tc.expectedAssignments, }, }, }, @@ -626,13 +637,23 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { } }) - t.Run("conflict", func(t *testing.T) { + t.Run("conflict due to virtual storage and relative path", func(t *testing.T) { rs, _ := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, stor, nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false)) require.Equal(t, RepositoryExistsError{vs, repo, stor}, - rs.CreateRepository(ctx, vs, repo, stor, nil, nil, false, false), + rs.CreateRepository(ctx, 2, vs, repo, stor, nil, nil, false, false), + ) + }) + + t.Run("conflict due to repository id", func(t *testing.T) { + rs, _ := newStore(t, nil) + + require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "storage-1", nil, nil, false, false)) + require.Equal(t, + fmt.Errorf("repository id 1 already in use"), + rs.CreateRepository(ctx, 1, "virtual-storage-2", "relative-path-2", "storage-2", nil, nil, false, false), ) }) }) @@ -647,42 +668,42 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("delete existing", func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, "deleted", "deleted", "deleted", nil, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, "virtual-storage-1", "other-storages-remain", "deleted-storage", []string{"remaining-storage"}, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, "virtual-storage-2", "deleted-repo", "deleted-storage", nil, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, "virtual-storage-2", "other-repo-remains", "remaining-storage", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, "deleted", "deleted", "deleted", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 2, "virtual-storage-1", "other-storages-remain", "deleted-storage", []string{"remaining-storage"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 3, "virtual-storage-2", "deleted-repo", "deleted-storage", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 4, "virtual-storage-2", "other-repo-remains", "remaining-storage", nil, nil, false, false)) requireState(t, ctx, virtualStorageState{ "deleted": { - "deleted": repositoryRecord{}, + "deleted": {repositoryID: 1}, }, "virtual-storage-1": { - "other-storages-remain": repositoryRecord{}, + "other-storages-remain": {repositoryID: 2}, }, "virtual-storage-2": { - "deleted-repo": repositoryRecord{}, - "other-repo-remains": repositoryRecord{}, + "deleted-repo": repositoryRecord{repositoryID: 3}, + "other-repo-remains": {repositoryID: 4}, }, }, storageState{ "deleted": { "deleted": { - "deleted": 0, + "deleted": {repositoryID: 1, generation: 0}, }, }, "virtual-storage-1": { "other-storages-remain": { - "deleted-storage": 0, - "remaining-storage": 0, + "deleted-storage": {repositoryID: 2, generation: 0}, + "remaining-storage": {repositoryID: 2, generation: 0}, }, }, "virtual-storage-2": { "deleted-repo": { - "deleted-storage": 0, + "deleted-storage": {repositoryID: 3, generation: 0}, }, "other-repo-remains": { - "remaining-storage": 0, + "remaining-storage": {repositoryID: 4, generation: 0}, }, }, }, @@ -695,18 +716,18 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-2": { - "other-repo-remains": repositoryRecord{}, + "other-repo-remains": {repositoryID: 4}, }, }, storageState{ "virtual-storage-1": { "other-storages-remain": { - "remaining-storage": 0, + "remaining-storage": {generation: 0}, }, }, "virtual-storage-2": { "other-repo-remains": { - "remaining-storage": 0, + "remaining-storage": {repositoryID: 4, generation: 0}, }, }, }, @@ -715,20 +736,20 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("transactional delete", func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, "virtual-storage-1", "repository-1", "replica-1", []string{"replica-2", "replica-3"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "repository-1", "replica-1", []string{"replica-2", "replica-3"}, nil, false, false)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{}, + "repository-1": {repositoryID: 1}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "replica-1": 0, - "replica-2": 0, - "replica-3": 0, + "replica-1": {repositoryID: 1, generation: 0}, + "replica-2": {repositoryID: 1, generation: 0}, + "replica-3": {repositoryID: 1, generation: 0}, }, }, }, @@ -740,7 +761,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "repository-1": { - "replica-3": 0, + "replica-3": {generation: 0}, }, }, }, @@ -756,33 +777,33 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { }) t.Run("delete existing", func(t *testing.T) { - require.NoError(t, rs.CreateRepository(ctx, "virtual-storage-1", "relative-path-1", "storage-1", []string{"storage-2"}, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, "virtual-storage-1", "relative-path-2", "storage-1", nil, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, "virtual-storage-2", "relative-path-1", "storage-1", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "storage-1", []string{"storage-2"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 2, "virtual-storage-1", "relative-path-2", "storage-1", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 3, "virtual-storage-2", "relative-path-1", "storage-1", nil, nil, false, false)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "relative-path-1": repositoryRecord{}, - "relative-path-2": repositoryRecord{}, + "relative-path-1": {repositoryID: 1}, + "relative-path-2": {repositoryID: 2}, }, "virtual-storage-2": { - "relative-path-1": repositoryRecord{}, + "relative-path-1": {repositoryID: 3}, }, }, storageState{ "virtual-storage-1": { "relative-path-1": { - "storage-1": 0, - "storage-2": 0, + "storage-1": {repositoryID: 1, generation: 0}, + "storage-2": {repositoryID: 1, generation: 0}, }, "relative-path-2": { - "storage-1": 0, + "storage-1": {repositoryID: 2, generation: 0}, }, }, "virtual-storage-2": { "relative-path-1": { - "storage-1": 0, + "storage-1": {repositoryID: 3, generation: 0}, }, }, }, @@ -793,25 +814,25 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "relative-path-1": repositoryRecord{}, - "relative-path-2": repositoryRecord{}, + "relative-path-1": {repositoryID: 1}, + "relative-path-2": {repositoryID: 2}, }, "virtual-storage-2": { - "relative-path-1": repositoryRecord{}, + "relative-path-1": {repositoryID: 3}, }, }, storageState{ "virtual-storage-1": { "relative-path-1": { - "storage-2": 0, + "storage-2": {repositoryID: 1, generation: 0}, }, "relative-path-2": { - "storage-1": 0, + "storage-1": {repositoryID: 2, generation: 0}, }, }, "virtual-storage-2": { "relative-path-1": { - "storage-1": 0, + "storage-1": {repositoryID: 3, generation: 0}, }, }, }, @@ -832,24 +853,24 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("rename existing", func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, vs, "renamed-all", "storage-1", nil, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, vs, "renamed-some", "storage-1", []string{"storage-2"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, "renamed-all", "storage-1", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 2, vs, "renamed-some", "storage-1", []string{"storage-2"}, nil, false, false)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "renamed-all": repositoryRecord{}, - "renamed-some": repositoryRecord{}, + "renamed-all": repositoryRecord{repositoryID: 1}, + "renamed-some": {repositoryID: 2}, }, }, storageState{ "virtual-storage-1": { "renamed-all": { - "storage-1": 0, + "storage-1": {repositoryID: 1, generation: 0}, }, "renamed-some": { - "storage-1": 0, - "storage-2": 0, + "storage-1": {repositoryID: 2, generation: 0}, + "storage-2": {repositoryID: 2, generation: 0}, }, }, }, @@ -861,20 +882,20 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "renamed-all-new": repositoryRecord{}, - "renamed-some-new": repositoryRecord{}, + "renamed-all-new": repositoryRecord{repositoryID: 1}, + "renamed-some-new": {repositoryID: 2}, }, }, storageState{ "virtual-storage-1": { "renamed-all-new": { - "storage-1": 0, + "storage-1": {repositoryID: 1, generation: 0}, }, "renamed-some-new": { - "storage-1": 0, + "storage-1": {repositoryID: 2, generation: 0}, }, "renamed-some": { - "storage-2": 0, + "storage-2": {repositoryID: 2, generation: 0}, }, }, }, @@ -893,21 +914,21 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.Empty(t, secondaries) }) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, "primary", []string{"consistent-secondary"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "primary", []string{"consistent-secondary"}, nil, false, false)) require.NoError(t, rs.IncrementGeneration(ctx, vs, repo, "primary", []string{"consistent-secondary"})) require.NoError(t, rs.SetGeneration(ctx, vs, repo, "inconsistent-secondary", 0)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{}, + "repository-1": {repositoryID: 1}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "primary": 1, - "consistent-secondary": 1, - "inconsistent-secondary": 0, + "primary": {repositoryID: 1, generation: 1}, + "consistent-secondary": {repositoryID: 1, generation: 1}, + "inconsistent-secondary": {repositoryID: 1, generation: 0}, }, }, }, @@ -933,16 +954,16 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{}, + "repository-1": {repositoryID: 1}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "unknown": 2, - "primary": 1, - "consistent-secondary": 1, - "inconsistent-secondary": 0, + "unknown": {repositoryID: 1, generation: 2}, + "primary": {repositoryID: 1, generation: 1}, + "consistent-secondary": {repositoryID: 1, generation: 1}, + "inconsistent-secondary": {repositoryID: 1, generation: 0}, }, }, }, @@ -960,9 +981,9 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "repository-1": { - "unknown": 2, - "consistent-secondary": 1, - "inconsistent-secondary": 0, + "unknown": {generation: 2}, + "consistent-secondary": {generation: 1}, + "inconsistent-secondary": {generation: 0}, }, }, }, @@ -977,25 +998,25 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("DeleteInvalidRepository", func(t *testing.T) { t.Run("only replica", func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, "invalid-storage", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "invalid-storage", nil, nil, false, false)) require.NoError(t, rs.DeleteInvalidRepository(ctx, vs, repo, "invalid-storage")) requireState(t, ctx, virtualStorageState{}, storageState{}) }) t.Run("another replica", func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, "invalid-storage", []string{"other-storage"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "invalid-storage", []string{"other-storage"}, nil, false, false)) require.NoError(t, rs.DeleteInvalidRepository(ctx, vs, repo, "invalid-storage")) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{}, + "repository-1": {repositoryID: 1}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "other-storage": 0, + "other-storage": {repositoryID: 1, generation: 0}, }, }, }, @@ -1010,7 +1031,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.NoError(t, err) require.False(t, exists) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, stor, nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false)) exists, err = rs.RepositoryExists(ctx, vs, repo) require.NoError(t, err) require.True(t, exists) @@ -1020,6 +1041,42 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.NoError(t, err) require.False(t, exists) }) + + t.Run("ReserveRepositoryID", func(t *testing.T) { + rs, _ := newStore(t, nil) + + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.Equal(t, int64(1), id) + + id, err = rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.Equal(t, int64(2), id) + + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, stor, nil, nil, false, false)) + + id, err = rs.ReserveRepositoryID(ctx, vs, repo) + require.Equal(t, commonerr.ErrRepositoryAlreadyExists, err) + require.Equal(t, int64(0), id) + + id, err = rs.ReserveRepositoryID(ctx, vs, repo+"-2") + require.NoError(t, err) + require.Equal(t, int64(3), id) + }) + + t.Run("GetRepositoryID", func(t *testing.T) { + rs, _ := newStore(t, nil) + + id, err := rs.GetRepositoryID(ctx, vs, repo) + require.Equal(t, commonerr.NewRepositoryNotFoundError(vs, repo), err) + require.Equal(t, int64(0), id) + + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false)) + + id, err = rs.GetRepositoryID(ctx, vs, repo) + require.Nil(t, err) + require.Equal(t, int64(1), id) + }) } func TestPostgresRepositoryStore_GetPartiallyAvailableRepositories(t *testing.T) { diff --git a/internal/praefect/reconciler/reconciler.go b/internal/praefect/reconciler/reconciler.go index 308a7aff194ecc04bd1b4e59f6f8f30374a97f69..a37fd5a0555a94f285b6deae9edda649ee37f768 100644 --- a/internal/praefect/reconciler/reconciler.go +++ b/internal/praefect/reconciler/reconciler.go @@ -84,6 +84,7 @@ func (r *Reconciler) Run(ctx context.Context, ticker helper.Ticker) error { // job is an internal type for formatting log messages type job struct { + RepositoryID int64 `json:"repository_id"` Change string `json:"change"` CorrelationID string `json:"correlation_id"` VirtualStorage string `json:"virtual_storage"` @@ -148,10 +149,12 @@ healthy_storages AS ( delete_jobs AS ( SELECT DISTINCT ON (virtual_storage, relative_path) + repositories.repository_id, virtual_storage, relative_path, storage FROM storage_repositories + JOIN repositories USING (virtual_storage, relative_path) JOIN healthy_storages USING (virtual_storage, storage) WHERE ( -- Only unassigned repositories should be targeted for deletion. If no assignment exists, @@ -161,7 +164,7 @@ delete_jobs AS ( WHERE virtual_storage = storage_repositories.virtual_storage AND relative_path = storage_repositories.relative_path ) - AND generation <= ( + AND storage_repositories.generation <= ( -- Check whether the replica's generation is equal or lower than the generation of every assigned -- replica of the repository. If so, then it is eligible for deletion. SELECT MIN(COALESCE(generation, -1)) @@ -196,6 +199,7 @@ delete_jobs AS ( -- repository remains on storage unused (because it doesn't exist in the 'repositories' table anymore). SELECT * FROM ( SELECT DISTINCT ON (virtual_storage, relative_path) + 0 AS repository_id, virtual_storage, relative_path, storage @@ -219,12 +223,13 @@ delete_jobs AS ( update_jobs AS ( SELECT DISTINCT ON (virtual_storage, relative_path, target_node_storage) + repository_id, virtual_storage, relative_path, source_node_storage, target_node_storage FROM ( - SELECT virtual_storage, relative_path, storage AS target_node_storage + SELECT repositories.repository_id, virtual_storage, relative_path, storage AS target_node_storage FROM repositories JOIN healthy_storages USING (virtual_storage) LEFT JOIN storage_repositories USING (virtual_storage, relative_path, storage) @@ -273,6 +278,7 @@ reconciliation_jobs AS ( jsonb_build_object('correlation_id', encode(random()::text::bytea, 'base64')) FROM ( SELECT + COALESCE(repository_id, 0) AS repository_id, virtual_storage, relative_path, source_node_storage, @@ -281,6 +287,7 @@ reconciliation_jobs AS ( FROM update_jobs UNION ALL SELECT + COALESCE(repository_id, 0) AS repository_id, virtual_storage, relative_path, NULL AS source_node_storage, @@ -303,6 +310,7 @@ create_locks AS ( SELECT meta->>'correlation_id', + job->>'repository_id', job->>'change', job->>'virtual_storage', job->>'relative_path', @@ -326,6 +334,7 @@ FROM reconciliation_jobs var j job if err := rows.Scan( &j.CorrelationID, + &j.RepositoryID, &j.Change, &j.VirtualStorage, &j.RelativePath, diff --git a/internal/praefect/reconciler/reconciler_test.go b/internal/praefect/reconciler/reconciler_test.go index a76adb48ff8b168e8bdf52c5e256259e6577862e..a74a668f1a31e7ff0c7a9a57e2f00f2061db9cd3 100644 --- a/internal/praefect/reconciler/reconciler_test.go +++ b/internal/praefect/reconciler/reconciler_test.go @@ -12,6 +12,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" @@ -1078,7 +1079,11 @@ func TestReconciler(t *testing.T) { if repo.generation >= 0 { if !repoCreated { repoCreated = true - require.NoError(t, rs.CreateRepository(ctx, virtualStorage, relativePath, storage, nil, nil, false, false)) + + id, err := rs.ReserveRepositoryID(ctx, virtualStorage, relativePath) + require.NoError(t, err) + + require.NoError(t, rs.CreateRepository(ctx, id, virtualStorage, relativePath, storage, nil, nil, false, false)) } require.NoError(t, rs.SetGeneration(ctx, virtualStorage, relativePath, storage, repo.generation)) @@ -1149,6 +1154,16 @@ func TestReconciler(t *testing.T) { } } + // Fill the expected reconciliation jobs with generated repository IDs. + for i, job := range tc.reconciliationJobs { + id, err := rs.GetRepositoryID(ctx, job.VirtualStorage, job.RelativePath) + if err != nil { + require.Equal(t, commonerr.NewRepositoryNotFoundError(job.VirtualStorage, job.RelativePath), err) + } + + tc.reconciliationJobs[i].RepositoryID = id + } + // run reconcile in two concurrent transactions to ensure everything works // as expected with multiple Praefect's reconciling at the same time firstTx := db.Begin(t) diff --git a/internal/praefect/replicator_pg_test.go b/internal/praefect/replicator_pg_test.go index 63e21b223ac5d93af4e91abb88a2a96f3b13bb77..d790a88c3bae13f92152017a9b7bd53746fe2625 100644 --- a/internal/praefect/replicator_pg_test.go +++ b/internal/praefect/replicator_pg_test.go @@ -87,7 +87,7 @@ func TestReplicatorDestroy(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - require.NoError(t, rs.CreateRepository(ctx, "virtual-storage-1", "relative-path-1", "storage-1", []string{"storage-2"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "storage-1", []string{"storage-2"}, nil, false, false)) ln, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go index 75db7e81be1aa31b2f8ef4cd0cf29132caa78800..a5e894eaf13de66aed244e66382c4f6ac1bc95ec 100644 --- a/internal/praefect/repository_exists_test.go +++ b/internal/praefect/repository_exists_test.go @@ -74,7 +74,7 @@ func TestRepositoryExistsStreamInterceptor(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - require.NoError(t, rs.CreateRepository(ctx, "virtual-storage", "relative-path", "storage", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 0, "virtual-storage", "relative-path", "storage", nil, nil, false, false)) electionStrategy := config.ElectionStrategyPerRepository if tc.routeToGitaly { diff --git a/internal/praefect/router.go b/internal/praefect/router.go index a2147a27ee7c07dba1b8d6bc17ae94465859df49..fc99b4769a62e83dd9c3f49a61d9c40b359b23ae 100644 --- a/internal/praefect/router.go +++ b/internal/praefect/router.go @@ -25,6 +25,8 @@ type StorageMutatorRoute struct { // RepositoryMutatorRoute describes how to route a repository scoped mutator call. type RepositoryMutatorRoute struct { + // RepositoryID is the repository's ID as Praefect identifies it. + RepositoryID int64 // Primary is the primary node of the transaction. Primary RouterNode // Secondaries are the secondary participating in a transaction. @@ -50,5 +52,5 @@ type Router interface { RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) // RouteRepositoryCreation decides returns the primary and secondaries that should handle the repository creation // request. It is up to the caller to store the assignments and primary information after finishing the RPC. - RouteRepositoryCreation(ctx context.Context, virtualStorage string) (RepositoryMutatorRoute, error) + RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) } diff --git a/internal/praefect/router_node_manager.go b/internal/praefect/router_node_manager.go index 015a8985a48634fe355fe4b5e898bde941e6a5f6..4395d4dfc4314f47700bc2aa14252efee959acd0 100644 --- a/internal/praefect/router_node_manager.go +++ b/internal/praefect/router_node_manager.go @@ -119,7 +119,7 @@ func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualS // RouteRepositoryCreation includes healthy secondaries in the transaction and sets the unhealthy secondaries as // replication targets. The virtual storage's primary acts as the primary for every repository. -func (r *nodeManagerRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage string) (RepositoryMutatorRoute, error) { +func (r *nodeManagerRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) { shard, err := r.mgr.GetShard(ctx, virtualStorage) if err != nil { return RepositoryMutatorRoute{}, fmt.Errorf("get shard: %w", err) diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go index fc022cddb775fb648707229780b326cb3cccc2d7..4a70bb9125cd7bc3568a93977744258dd0837167 100644 --- a/internal/praefect/router_per_repository.go +++ b/internal/praefect/router_per_repository.go @@ -63,11 +63,20 @@ type PerRepositoryRouter struct { rand Random hc HealthChecker csg datastore.ConsistentStoragesGetter + rs datastore.RepositoryStore defaultReplicationFactors map[string]int } // NewPerRepositoryRouter returns a new PerRepositoryRouter using the passed configuration. -func NewPerRepositoryRouter(conns Connections, pg PrimaryGetter, hc HealthChecker, rand Random, csg datastore.ConsistentStoragesGetter, ag AssignmentGetter, defaultReplicationFactors map[string]int) *PerRepositoryRouter { +func NewPerRepositoryRouter( + conns Connections, + pg PrimaryGetter, + hc HealthChecker, + rand Random, + csg datastore.ConsistentStoragesGetter, + ag AssignmentGetter, + rs datastore.RepositoryStore, + defaultReplicationFactors map[string]int) *PerRepositoryRouter { return &PerRepositoryRouter{ conns: conns, pg: pg, @@ -75,6 +84,7 @@ func NewPerRepositoryRouter(conns Connections, pg PrimaryGetter, hc HealthChecke hc: hc, csg: csg, ag: ag, + rs: rs, defaultReplicationFactors: defaultReplicationFactors, } } @@ -204,7 +214,12 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua return RepositoryMutatorRoute{}, fmt.Errorf("get host assignments: %w", err) } - var route RepositoryMutatorRoute + repositoryID, err := r.rs.GetRepositoryID(ctx, virtualStorage, relativePath) + if err != nil { + return RepositoryMutatorRoute{}, fmt.Errorf("get repository id: %w", err) + } + + route := RepositoryMutatorRoute{RepositoryID: repositoryID} for _, assigned := range assignedStorages { node, healthy := healthySet[assigned] if assigned == primary { @@ -237,7 +252,7 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua // RouteRepositoryCreation picks a random healthy node to act as the primary node and selects the secondary nodes // if assignments are enabled. Healthy secondaries take part in the transaction, unhealthy secondaries are set as // replication targets. -func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage string) (RepositoryMutatorRoute, error) { +func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) { healthyNodes, err := r.healthyNodes(virtualStorage) if err != nil { return RepositoryMutatorRoute{}, err @@ -248,9 +263,17 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu return RepositoryMutatorRoute{}, err } + id, err := r.rs.ReserveRepositoryID(ctx, virtualStorage, relativePath) + if err != nil { + return RepositoryMutatorRoute{}, fmt.Errorf("reserve repository id: %w", err) + } + replicationFactor := r.defaultReplicationFactors[virtualStorage] if replicationFactor == 1 { - return RepositoryMutatorRoute{Primary: primary}, nil + return RepositoryMutatorRoute{ + RepositoryID: id, + Primary: primary, + }, nil } var secondaryNodes []RouterNode @@ -296,6 +319,7 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu } return RepositoryMutatorRoute{ + RepositoryID: id, Primary: primary, Secondaries: secondaries, ReplicationTargets: replicationTargets, diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go index 5a0a1e864600814ae741e4b359ef6da25c8eb261..332bf30b8c66bdbc99cefac176b384ddd0258b91 100644 --- a/internal/praefect/router_per_repository_test.go +++ b/internal/praefect/router_per_repository_test.go @@ -2,10 +2,12 @@ package praefect import ( "context" + "fmt" "sort" "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" @@ -95,6 +97,7 @@ func TestPerRepositoryRouter_RouteStorageAccessor(t *testing.T) { }, nil, nil, + datastore.MockRepositoryStore{}, nil, ) @@ -226,6 +229,7 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) { }, }, nil, + datastore.MockRepositoryStore{}, nil, ) @@ -367,6 +371,11 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) { }, }, tc.assignedNodes, + datastore.MockRepositoryStore{ + GetRepositoryIDFunc: func(ctx context.Context, virtualStorage, relativePath string) (int64, error) { + return 1, nil + }, + }, nil, ) @@ -382,6 +391,7 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) { } require.Equal(t, RepositoryMutatorRoute{ + RepositoryID: 1, Primary: RouterNode{ Storage: "primary", Connection: conns[tc.virtualStorage]["primary"], @@ -423,6 +433,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { primaryCandidates int primaryPick int secondaryCandidates int + repositoryExists bool matchRoute matcher error error }{ @@ -445,6 +456,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { primaryPick: 0, matchRoute: requireOneOf( RepositoryMutatorRoute{ + RepositoryID: 1, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, ReplicationTargets: []string{"secondary-1", "secondary-2"}, }, @@ -458,7 +470,8 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { primaryPick: 0, matchRoute: requireOneOf( RepositoryMutatorRoute{ - Primary: RouterNode{Storage: "primary", Connection: primaryConn}, + RepositoryID: 1, + Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{ {Storage: "secondary-1", Connection: secondary1Conn}, {Storage: "secondary-2", Connection: secondary2Conn}, @@ -474,7 +487,8 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { primaryPick: 0, matchRoute: requireOneOf( RepositoryMutatorRoute{ - Primary: RouterNode{Storage: "primary", Connection: primaryConn}, + RepositoryID: 1, + Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{ {Storage: "secondary-1", Connection: secondary1Conn}, }, @@ -491,7 +505,8 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { primaryPick: 0, matchRoute: requireOneOf( RepositoryMutatorRoute{ - Primary: RouterNode{Storage: "primary", Connection: primaryConn}, + RepositoryID: 1, + Primary: RouterNode{Storage: "primary", Connection: primaryConn}, }, ), }, @@ -505,12 +520,14 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { secondaryCandidates: 2, matchRoute: requireOneOf( RepositoryMutatorRoute{ - Primary: RouterNode{Storage: "primary", Connection: primaryConn}, - Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}}, + RepositoryID: 1, + Primary: RouterNode{Storage: "primary", Connection: primaryConn}, + Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}}, }, RepositoryMutatorRoute{ - Primary: RouterNode{Storage: "primary", Connection: primaryConn}, - Secondaries: []RouterNode{{Storage: "secondary-2", Connection: secondary1Conn}}, + RepositoryID: 1, + Primary: RouterNode{Storage: "primary", Connection: primaryConn}, + Secondaries: []RouterNode{{Storage: "secondary-2", Connection: secondary1Conn}}, }, ), }, @@ -524,12 +541,22 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { secondaryCandidates: 2, matchRoute: requireOneOf( RepositoryMutatorRoute{ + RepositoryID: 1, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}}, ReplicationTargets: []string{"secondary-2"}, }, ), }, + { + desc: "repository already exists", + virtualStorage: "virtual-storage-1", + healthyNodes: StaticHealthChecker(configuredNodes), + primaryCandidates: 3, + primaryPick: 0, + repositoryExists: true, + error: fmt.Errorf("reserve repository id: %w", commonerr.ErrRepositoryAlreadyExists), + }, } { t.Run(tc.desc, func(t *testing.T) { ctx, cancel := testhelper.Context() @@ -556,8 +583,17 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { }, nil, nil, + datastore.MockRepositoryStore{ + ReserveRepositoryIDFunc: func(ctx context.Context, virtualStorage, relativePath string) (int64, error) { + if tc.repositoryExists { + return 0, commonerr.ErrRepositoryAlreadyExists + } + + return 1, nil + }, + }, map[string]int{"virtual-storage-1": tc.replicationFactor}, - ).RouteRepositoryCreation(ctx, tc.virtualStorage) + ).RouteRepositoryCreation(ctx, tc.virtualStorage, "relative-path") if tc.error != nil { require.Equal(t, tc.error, err) return