From 9c324373ecfece644f961dedecab91d6af0f22a8 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Tue, 27 Jul 2021 13:39:32 +0200 Subject: [PATCH 01/14] Add repository_id into the database schema Praefect's approach of identifying repositories solely by their virtual storage and relative path is causing issues. Removing and recreating a repository with the same virtual storage and relative path maps the new repository to the same disk path and database state. This can cause failures recreating the repository due to already existing state and is currently impacting backup restoration. The virtual storage and relative path are client-provided. They are used as the primary key in the database as they are currently the only method of identifying a repository. If the client changes the relative path, Praefect has to update the keys in the database and physically move the replicas on the disks of the Gitaly nodes. This is error-prone, as the operation may succeed only partially, causing Praefect to lose track of the repository. This is impacting the interoperability of Geo and Gitaly Cluster as Geo performs renames frequently. The delete/recreation problem can be avoided by ensuring the newly created repository does not map to the previously deleted repository's state even if it uses the same virtual storage and relative path. The rename problem can be avoided by storing the replicas in static paths determined by Praefect and giving the client only the illusion the repository was moved. To facilitate both of the fixes, this commit adds repository_id into the schema. A repository ID uniquely identifies a repository in the cluster and doesn't have any other meaning. Praefect can then use the generated repository ID to map the repository on the disk to a unique path regardless of the client-provided virtual storage and relative path. As the repository ID can be used to identify the replicas on the disks, the renames become atomic database updates. 
Praefect only needs to update a mapping from (virtual_storage, relative_path) to repository_id. Each newly created repository gets a unique ID, so a newly created repository won't map to the same database or disk state as its ID is different from the deleted repository's. Uniqueness of the (virtual_storage, relative_path) will still be enforced and there can be only one repository that maps to a given combination and is still considered to exist. The migration to repository IDs has to be split over three releases to ensure zero-downtime upgrades: 1. This first migration adds the columns in all relevant tables. Each logical repository in the `repositories` table gets an ID generated for it. In the same release, we'll change the code to link any newly created repositories and replicas correctly via the repository id. 2. The second release will contain a migration to connect any existing records via the repository id. Unupgraded Praefects creating records concurrently is fine as they already connect the records via the repository ID. Since we can now assume every record is connected via the repository id, we'll update every query to join the records based on repository_id rather than (virtual_storage, relative_path). The router will look up replica relative paths rather than assuming they are the client-provided ones. 3. As all queries are now joining via the repository_id, the replicas' relative path can now be decoupled from the logical repository's relative path. This release will hash the repository id to generate a unique relative path for each repository and store it. Unupgraded Praefects are already at this point looking up the relative path to use, so this is backwards compatible. 
--- .../20210727085659_repository_ids.go | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 internal/praefect/datastore/migrations/20210727085659_repository_ids.go diff --git a/internal/praefect/datastore/migrations/20210727085659_repository_ids.go b/internal/praefect/datastore/migrations/20210727085659_repository_ids.go new file mode 100644 index 0000000000..84bdee9387 --- /dev/null +++ b/internal/praefect/datastore/migrations/20210727085659_repository_ids.go @@ -0,0 +1,31 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20210727085659_repository_ids", + Up: []string{ + "CREATE UNIQUE INDEX repository_lookup_index ON repositories (virtual_storage, relative_path)", + "ALTER TABLE repository_assignments DROP CONSTRAINT IF EXISTS repository_assignments_virtual_storage_relative_path_fkey", + "ALTER TABLE repository_assignments DROP CONSTRAINT IF EXISTS repository_assignments_virtual_storage_fkey", + "ALTER TABLE repositories DROP CONSTRAINT repositories_pkey", + "ALTER TABLE repository_assignments ADD CONSTRAINT repository_assignments_virtual_storage_relative_path_fkey FOREIGN KEY (virtual_storage, relative_path) REFERENCES repositories (virtual_storage, relative_path) ON UPDATE CASCADE ON DELETE CASCADE", + "ALTER TABLE repositories ADD COLUMN repository_id BIGSERIAL PRIMARY KEY", + "ALTER TABLE repository_assignments ADD COLUMN repository_id BIGINT REFERENCES repositories (repository_id) ON DELETE CASCADE", + "ALTER TABLE storage_repositories ADD COLUMN repository_id BIGINT REFERENCES repositories (repository_id) ON DELETE SET NULL", + }, + Down: []string{ + "ALTER TABLE storage_repositories DROP COLUMN repository_id", + "ALTER TABLE repository_assignments DROP COLUMN repository_id", + "ALTER TABLE repositories DROP COLUMN repository_id", + "CREATE UNIQUE INDEX repositories_pkey ON repositories (virtual_storage, relative_path)", + "ALTER TABLE repositories ADD 
PRIMARY KEY USING INDEX repositories_pkey", + "ALTER TABLE repository_assignments DROP CONSTRAINT repository_assignments_virtual_storage_relative_path_fkey", + "DROP INDEX repository_lookup_index", + "ALTER TABLE repository_assignments ADD FOREIGN KEY (virtual_storage, relative_path) REFERENCES repositories (virtual_storage, relative_path) ON UPDATE CASCADE ON DELETE CASCADE", + }, + } + + allMigrations = append(allMigrations, m) +} -- GitLab From cc20f95ab7ead03a748ad73eb81b807fa052fbf9 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Tue, 27 Jul 2021 13:40:03 +0200 Subject: [PATCH 02/14] Add a method for reserving a repository ID from the database Praefect creates database records for a repository only after the creation operation has been completed by the Gitalys. As Praefect will later generate the relative paths where the Gitalys will store the replicas, it needs to know the repository ID prior to creating the database records. This commit adds a method for reserving a repository ID prior to creating the repository. This allows Praefect to generate the relative path and create the repository on the Gitalys prior to acknowledging the creation in the database. The generated ID is consumed even if the repository creation fails. This ensures any future creation attempts map to different disk state, so the client can always retry a failed repository creation. For completeness, the method also checks that no repository exists at the given virtual storage and relative path. This is mostly to fail early. If there are two concurrent creations for the same virtual storage and relative path, they both will get unique IDs and store the repositories in different directories on the Gitalys. The faster one will then win the race and create the database records with CreateRepository. The slower will receive a RepositoryExistsError from CreateRepository and not acknowledge the creation to the client. 
--- internal/praefect/commonerr/error.go | 8 +++++- .../praefect/datastore/repository_store.go | 25 +++++++++++++++++++ .../datastore/repository_store_mock.go | 10 ++++++++ .../datastore/repository_store_test.go | 22 ++++++++++++++++ 4 files changed, 64 insertions(+), 1 deletion(-) diff --git a/internal/praefect/commonerr/error.go b/internal/praefect/commonerr/error.go index 6289f28211..4ad2f6c800 100644 --- a/internal/praefect/commonerr/error.go +++ b/internal/praefect/commonerr/error.go @@ -3,7 +3,10 @@ // due to cyclic imports. package commonerr -import "fmt" +import ( + "errors" + "fmt" +) // RepositoryNotFoundError is returned when attempting to operate on a repository // that does not exist in the virtual storage. @@ -21,3 +24,6 @@ func NewRepositoryNotFoundError(virtualStorage string, relativePath string) erro func (err RepositoryNotFoundError) Error() string { return fmt.Sprintf("repository %q/%q not found", err.virtualStorage, err.relativePath) } + +// ErrRepositoryAlreadyExists is returned when attempting to create a repository that already exists. +var ErrRepositoryAlreadyExists = errors.New("repository already exists") diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index 1819521947..03121cb858 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -127,6 +127,9 @@ type RepositoryStore interface { // record of the invalid repository. If the storage was the only storage with the repository, the repository's // record on the virtual storage is also deleted. DeleteInvalidRepository(ctx context.Context, virtualStorage, relativePath, storage string) error + // ReserveRepositoryID reserves an ID for a repository that is about to be created and returns it. If a repository already + // exists with the given virtual storage and relative path combination, an error is returned. 
+ ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) } // PostgresRepositoryStore is a Postgres implementation of RepositoryStore. @@ -691,3 +694,25 @@ ORDER BY relative_path, "primary" return repos, rows.Err() } + +// ReserveRepositoryID reserves an ID for a repository that is about to be created and returns it. If a repository already +// exists with the given virtual storage and relative path combination, an error is returned. +func (rs *PostgresRepositoryStore) ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) { + var id int64 + if err := rs.db.QueryRowContext(ctx, ` +SELECT nextval('repositories_repository_id_seq') +WHERE NOT EXISTS ( + SELECT FROM repositories + WHERE virtual_storage = $1 + AND relative_path = $2 +) + `, virtualStorage, relativePath).Scan(&id); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return 0, commonerr.ErrRepositoryAlreadyExists + } + + return 0, fmt.Errorf("scan: %w", err) + } + + return id, nil +} diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go index 82edb845e7..8fdc383feb 100644 --- a/internal/praefect/datastore/repository_store_mock.go +++ b/internal/praefect/datastore/repository_store_mock.go @@ -18,6 +18,7 @@ type MockRepositoryStore struct { GetPartiallyAvailableRepositoriesFunc func(ctx context.Context, virtualStorage string) ([]PartiallyAvailableRepository, error) DeleteInvalidRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error) + ReserveRepositoryIDFunc func(ctx context.Context, virtualStorage, relativePath string) (int64, error) } func (m MockRepositoryStore) GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) { @@ -128,3 +129,12 @@ func (m MockRepositoryStore) RepositoryExists(ctx 
context.Context, virtualStorag return m.RepositoryExistsFunc(ctx, virtualStorage, relativePath) } + +// ReserveRepositoryID returns the result of ReserveRepositoryIDFunc or 0 if it is unset. +func (m MockRepositoryStore) ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) { + if m.ReserveRepositoryIDFunc == nil { + return 0, nil + } + + return m.ReserveRepositoryIDFunc(ctx, virtualStorage, relativePath) +} diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index fda77a73f2..181705bedf 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -1020,6 +1020,28 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.NoError(t, err) require.False(t, exists) }) + + t.Run("ReserveRepositoryID", func(t *testing.T) { + rs, _ := newStore(t, nil) + + id, err := rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.Equal(t, int64(1), id) + + id, err = rs.ReserveRepositoryID(ctx, vs, repo) + require.NoError(t, err) + require.Equal(t, int64(2), id) + + require.NoError(t, rs.CreateRepository(ctx, vs, repo, stor, nil, nil, false, false)) + + id, err = rs.ReserveRepositoryID(ctx, vs, repo) + require.Equal(t, commonerr.ErrRepositoryAlreadyExists, err) + require.Equal(t, int64(0), id) + + id, err = rs.ReserveRepositoryID(ctx, vs, repo+"-2") + require.NoError(t, err) + require.Equal(t, int64(4), id) + }) } func TestPostgresRepositoryStore_GetPartiallyAvailableRepositories(t *testing.T) { -- GitLab From 8710a86448097c24eda824c1040f9d6d22d024df Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Thu, 12 Aug 2021 12:34:13 +0200 Subject: [PATCH 03/14] Add method for getting a repository's ID Praefect receives requests from the client to repositories identified by a `(virtual_storage, relative_path)` tuple. 
Praefect needs to look up the correct repository ID for this tuple at the time of the request. Doing the lookup once for each request and operating on the repository id afterwards guarantees that any in-flight jobs and requests will always target the same repository. If Praefect depended on the tuple, concurrent modification could cause the updates to target wrong repositories in the database, for example if a repository is renamed while a request is in flight and generation increment is attempted with the old name after the request. This is not really a problem in practice, but this change fixes it nonetheless. In practice, Praefect needs GetRepositoryID to propagate the repository's ID in any replication jobs a mutator creates. This ensures any new replicas created from the replication jobs get linked to the correct repository. --- .../praefect/datastore/repository_store.go | 23 +++++++++++++++++++++++ .../datastore/repository_store_mock.go | 10 ++++++++++ .../datastore/repository_store_test.go | 14 ++++++++++++++ 3 files changed, 47 insertions(+) diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index 03121cb858..bc588b7cac 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -130,6 +130,9 @@ type RepositoryStore interface { // ReserveRepositoryID reserves an ID for a repository that is about to be created and returns it. If a repository already // exists with the given virtual storage and relative path combination, an error is returned. ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) + // GetRepositoryID gets the ID of the repository identified via the given virtual storage and relative path. Returns a + // RepositoryNotFoundError if the repository doesn't exist.
+ GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) } // PostgresRepositoryStore is a Postgres implementation of RepositoryStore. @@ -716,3 +719,23 @@ WHERE NOT EXISTS ( return id, nil } + +// GetRepositoryID gets the ID of the repository identified via the given virtual storage and relative path. Returns a +// RepositoryNotFoundError if the repository doesn't exist. +func (rs *PostgresRepositoryStore) GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) { + var id int64 + if err := rs.db.QueryRowContext(ctx, ` +SELECT repository_id +FROM repositories +WHERE virtual_storage = $1 +AND relative_path = $2 + `, virtualStorage, relativePath).Scan(&id); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return 0, commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath) + } + + return 0, fmt.Errorf("scan: %w", err) + } + + return id, nil +} diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go index 8fdc383feb..ed4b3c62ab 100644 --- a/internal/praefect/datastore/repository_store_mock.go +++ b/internal/praefect/datastore/repository_store_mock.go @@ -19,6 +19,7 @@ type MockRepositoryStore struct { DeleteInvalidRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error) ReserveRepositoryIDFunc func(ctx context.Context, virtualStorage, relativePath string) (int64, error) + GetRepositoryIDFunc func(ctx context.Context, virtualStorage, relativePath string) (int64, error) } func (m MockRepositoryStore) GetGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (int, error) { @@ -138,3 +139,12 @@ func (m MockRepositoryStore) ReserveRepositoryID(ctx context.Context, virtualSto return m.ReserveRepositoryIDFunc(ctx, virtualStorage, relativePath) } + +// GetRepositoryID returns the 
result of GetRepositoryIDFunc or 0 if it is unset. +func (m MockRepositoryStore) GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) { + if m.GetRepositoryIDFunc == nil { + return 0, nil + } + + return m.GetRepositoryIDFunc(ctx, virtualStorage, relativePath) +} diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index 181705bedf..8e7eb0208a 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -1042,6 +1042,20 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.NoError(t, err) require.Equal(t, int64(4), id) }) + + t.Run("GetRepositoryID", func(t *testing.T) { + rs, _ := newStore(t, nil) + + id, err := rs.GetRepositoryID(ctx, vs, repo) + require.Equal(t, commonerr.NewRepositoryNotFoundError(vs, repo), err) + require.Equal(t, int64(0), id) + + require.NoError(t, rs.CreateRepository(ctx, vs, repo, stor, nil, nil, false, false)) + + id, err = rs.GetRepositoryID(ctx, vs, repo) + require.Nil(t, err) + require.Equal(t, int64(1), id) + }) } func TestPostgresRepositoryStore_GetPartiallyAvailableRepositories(t *testing.T) { -- GitLab From b790880102f46230eec34519b8abbfd13cdaaab5 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Tue, 27 Jul 2021 13:47:09 +0200 Subject: [PATCH 04/14] Pass relative path to CreateRepository CreateRepository will need to know the relative path of the repository the client is trying to create in order to check whether a repository with it already exists or not. This commit pipes the relative path to CreateRepository to prepare for follow-up changes even though the relative path is not yet used. 
--- internal/praefect/coordinator.go | 2 +- internal/praefect/router.go | 2 +- internal/praefect/router_node_manager.go | 2 +- internal/praefect/router_per_repository.go | 2 +- internal/praefect/router_per_repository_test.go | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index c77b48eccb..b4c007e3ab 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -456,7 +456,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall var route RepositoryMutatorRoute switch change { case datastore.CreateRepo: - route, err = c.router.RouteRepositoryCreation(ctx, virtualStorage) + route, err = c.router.RouteRepositoryCreation(ctx, virtualStorage, targetRepo.RelativePath) if err != nil { return nil, fmt.Errorf("route repository creation: %w", err) } diff --git a/internal/praefect/router.go b/internal/praefect/router.go index a2147a27ee..bc708ffd9b 100644 --- a/internal/praefect/router.go +++ b/internal/praefect/router.go @@ -50,5 +50,5 @@ type Router interface { RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) // RouteRepositoryCreation decides returns the primary and secondaries that should handle the repository creation // request. It is up to the caller to store the assignments and primary information after finishing the RPC. 
- RouteRepositoryCreation(ctx context.Context, virtualStorage string) (RepositoryMutatorRoute, error) + RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) } diff --git a/internal/praefect/router_node_manager.go b/internal/praefect/router_node_manager.go index 015a8985a4..4395d4dfc4 100644 --- a/internal/praefect/router_node_manager.go +++ b/internal/praefect/router_node_manager.go @@ -119,7 +119,7 @@ func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualS // RouteRepositoryCreation includes healthy secondaries in the transaction and sets the unhealthy secondaries as // replication targets. The virtual storage's primary acts as the primary for every repository. -func (r *nodeManagerRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage string) (RepositoryMutatorRoute, error) { +func (r *nodeManagerRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) { shard, err := r.mgr.GetShard(ctx, virtualStorage) if err != nil { return RepositoryMutatorRoute{}, fmt.Errorf("get shard: %w", err) diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go index fc022cddb7..3a1a98f4e5 100644 --- a/internal/praefect/router_per_repository.go +++ b/internal/praefect/router_per_repository.go @@ -237,7 +237,7 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua // RouteRepositoryCreation picks a random healthy node to act as the primary node and selects the secondary nodes // if assignments are enabled. Healthy secondaries take part in the transaction, unhealthy secondaries are set as // replication targets. 
-func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage string) (RepositoryMutatorRoute, error) { +func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) { healthyNodes, err := r.healthyNodes(virtualStorage) if err != nil { return RepositoryMutatorRoute{}, err diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go index 5a0a1e8646..4bb27d0bf2 100644 --- a/internal/praefect/router_per_repository_test.go +++ b/internal/praefect/router_per_repository_test.go @@ -557,7 +557,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { nil, nil, map[string]int{"virtual-storage-1": tc.replicationFactor}, - ).RouteRepositoryCreation(ctx, tc.virtualStorage) + ).RouteRepositoryCreation(ctx, tc.virtualStorage, "relative-path") if tc.error != nil { require.Equal(t, tc.error, err) return -- GitLab From ec619962966cc59d889c7792b1613b3ac4c0968f Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Tue, 27 Jul 2021 15:00:35 +0200 Subject: [PATCH 05/14] helper: add already exists error decorator This commit adds a helper method for wrapping an error with AlreadyExists status code if the error isn't already a status. --- internal/helper/error.go | 3 +++ internal/helper/error_test.go | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/internal/helper/error.go b/internal/helper/error.go index 10551c0139..6075866aed 100644 --- a/internal/helper/error.go +++ b/internal/helper/error.go @@ -45,6 +45,9 @@ func ErrUnavailable(err error) error { return wrapError(codes.Unavailable, err) // ErrPermissionDenied wraps err with codes.PermissionDenied, unless err is already a gRPC error. func ErrPermissionDenied(err error) error { return wrapError(codes.PermissionDenied, err) } +// ErrAlreadyExists wraps err with codes.AlreadyExists, unless err is already a gRPC error. 
+func ErrAlreadyExists(err error) error { return wrapError(codes.AlreadyExists, err) } + // wrapError wraps the given error with the error code unless it's already a gRPC error. If given // nil it will return nil. func wrapError(code codes.Code, err error) error { diff --git a/internal/helper/error_test.go b/internal/helper/error_test.go index 65f7bab913..ad07e90d2c 100644 --- a/internal/helper/error_test.go +++ b/internal/helper/error_test.go @@ -55,6 +55,11 @@ func TestError(t *testing.T) { errorf: ErrUnavailable, code: codes.Unavailable, }, + { + desc: "AlreadyExists", + errorf: ErrAlreadyExists, + code: codes.AlreadyExists, + }, } { t.Run(tc.desc, func(t *testing.T) { // tc.code and our canary test code must not -- GitLab From 1836583ff226a634932a9e2f424acc3e8882d9d9 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Tue, 27 Jul 2021 16:25:10 +0200 Subject: [PATCH 06/14] Reserve repository ids when routing repository creations When Praefect is routing a repository creation, it needs to reserve a repository ID in order to later generate a relative path for the repository. This commit changes the PerRepositoryRouter to reserve a repository ID and return it in the route. The ID reservation also acts as a fail-fast check to see whether there already exists a repository with the given virtual storage and relative path. 
--- cmd/praefect/main.go | 1 + internal/praefect/coordinator.go | 5 +++ internal/praefect/coordinator_test.go | 1 + internal/praefect/router.go | 2 + internal/praefect/router_per_repository.go | 23 +++++++++- .../praefect/router_per_repository_test.go | 45 ++++++++++++++++--- 6 files changed, 68 insertions(+), 9 deletions(-) diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index e67c17d4c4..ed908551c9 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -312,6 +312,7 @@ func run(cfgs []starter.Config, conf config.Config) error { praefect.NewLockedRandom(rand.New(rand.NewSource(time.Now().UnixNano()))), csg, assignmentStore, + rs, conf.DefaultReplicationFactors(), ) } else { diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index b4c007e3ab..45346e488c 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -627,6 +627,11 @@ func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, if errors.Is(err, nodes.ErrVirtualStorageNotExist) { return nil, helper.ErrInvalidArgument(err) } + + if errors.Is(err, commonerr.ErrRepositoryAlreadyExists) { + return nil, helper.ErrAlreadyExists(err) + } + return nil, err } return sp, nil diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 0c24863b28..d2225f1a39 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -873,6 +873,7 @@ func TestStreamDirector_repo_creation(t *testing.T) { }, nil, nil, + rs, conf.DefaultReplicationFactors(), ) default: diff --git a/internal/praefect/router.go b/internal/praefect/router.go index bc708ffd9b..fc99b4769a 100644 --- a/internal/praefect/router.go +++ b/internal/praefect/router.go @@ -25,6 +25,8 @@ type StorageMutatorRoute struct { // RepositoryMutatorRoute describes how to route a repository scoped mutator call. 
type RepositoryMutatorRoute struct { + // RepositoryID is the repository's ID as Praefect identifies it. + RepositoryID int64 // Primary is the primary node of the transaction. Primary RouterNode // Secondaries are the secondary participating in a transaction. diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go index 3a1a98f4e5..326d375cb6 100644 --- a/internal/praefect/router_per_repository.go +++ b/internal/praefect/router_per_repository.go @@ -63,11 +63,20 @@ type PerRepositoryRouter struct { rand Random hc HealthChecker csg datastore.ConsistentStoragesGetter + rs datastore.RepositoryStore defaultReplicationFactors map[string]int } // NewPerRepositoryRouter returns a new PerRepositoryRouter using the passed configuration. -func NewPerRepositoryRouter(conns Connections, pg PrimaryGetter, hc HealthChecker, rand Random, csg datastore.ConsistentStoragesGetter, ag AssignmentGetter, defaultReplicationFactors map[string]int) *PerRepositoryRouter { +func NewPerRepositoryRouter( + conns Connections, + pg PrimaryGetter, + hc HealthChecker, + rand Random, + csg datastore.ConsistentStoragesGetter, + ag AssignmentGetter, + rs datastore.RepositoryStore, + defaultReplicationFactors map[string]int) *PerRepositoryRouter { return &PerRepositoryRouter{ conns: conns, pg: pg, @@ -75,6 +84,7 @@ func NewPerRepositoryRouter(conns Connections, pg PrimaryGetter, hc HealthChecke hc: hc, csg: csg, ag: ag, + rs: rs, defaultReplicationFactors: defaultReplicationFactors, } } @@ -248,9 +258,17 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu return RepositoryMutatorRoute{}, err } + id, err := r.rs.ReserveRepositoryID(ctx, virtualStorage, relativePath) + if err != nil { + return RepositoryMutatorRoute{}, fmt.Errorf("reserve repository id: %w", err) + } + replicationFactor := r.defaultReplicationFactors[virtualStorage] if replicationFactor == 1 { - return RepositoryMutatorRoute{Primary: primary}, nil + return 
RepositoryMutatorRoute{ + RepositoryID: id, + Primary: primary, + }, nil } var secondaryNodes []RouterNode @@ -296,6 +314,7 @@ func (r *PerRepositoryRouter) RouteRepositoryCreation(ctx context.Context, virtu } return RepositoryMutatorRoute{ + RepositoryID: id, Primary: primary, Secondaries: secondaries, ReplicationTargets: replicationTargets, diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go index 4bb27d0bf2..a964b41b1d 100644 --- a/internal/praefect/router_per_repository_test.go +++ b/internal/praefect/router_per_repository_test.go @@ -2,10 +2,12 @@ package praefect import ( "context" + "fmt" "sort" "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" @@ -95,6 +97,7 @@ func TestPerRepositoryRouter_RouteStorageAccessor(t *testing.T) { }, nil, nil, + datastore.MockRepositoryStore{}, nil, ) @@ -226,6 +229,7 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) { }, }, nil, + datastore.MockRepositoryStore{}, nil, ) @@ -367,6 +371,7 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) { }, }, tc.assignedNodes, + datastore.MockRepositoryStore{}, nil, ) @@ -423,6 +428,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { primaryCandidates int primaryPick int secondaryCandidates int + repositoryExists bool matchRoute matcher error error }{ @@ -445,6 +451,7 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { primaryPick: 0, matchRoute: requireOneOf( RepositoryMutatorRoute{ + RepositoryID: 1, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, ReplicationTargets: []string{"secondary-1", "secondary-2"}, }, @@ -458,7 +465,8 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { 
primaryPick: 0, matchRoute: requireOneOf( RepositoryMutatorRoute{ - Primary: RouterNode{Storage: "primary", Connection: primaryConn}, + RepositoryID: 1, + Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{ {Storage: "secondary-1", Connection: secondary1Conn}, {Storage: "secondary-2", Connection: secondary2Conn}, @@ -474,7 +482,8 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { primaryPick: 0, matchRoute: requireOneOf( RepositoryMutatorRoute{ - Primary: RouterNode{Storage: "primary", Connection: primaryConn}, + RepositoryID: 1, + Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{ {Storage: "secondary-1", Connection: secondary1Conn}, }, @@ -491,7 +500,8 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { primaryPick: 0, matchRoute: requireOneOf( RepositoryMutatorRoute{ - Primary: RouterNode{Storage: "primary", Connection: primaryConn}, + RepositoryID: 1, + Primary: RouterNode{Storage: "primary", Connection: primaryConn}, }, ), }, @@ -505,12 +515,14 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { secondaryCandidates: 2, matchRoute: requireOneOf( RepositoryMutatorRoute{ - Primary: RouterNode{Storage: "primary", Connection: primaryConn}, - Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}}, + RepositoryID: 1, + Primary: RouterNode{Storage: "primary", Connection: primaryConn}, + Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}}, }, RepositoryMutatorRoute{ - Primary: RouterNode{Storage: "primary", Connection: primaryConn}, - Secondaries: []RouterNode{{Storage: "secondary-2", Connection: secondary1Conn}}, + RepositoryID: 1, + Primary: RouterNode{Storage: "primary", Connection: primaryConn}, + Secondaries: []RouterNode{{Storage: "secondary-2", Connection: secondary1Conn}}, }, ), }, @@ -524,12 +536,22 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t 
*testing.T) { secondaryCandidates: 2, matchRoute: requireOneOf( RepositoryMutatorRoute{ + RepositoryID: 1, Primary: RouterNode{Storage: "primary", Connection: primaryConn}, Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}}, ReplicationTargets: []string{"secondary-2"}, }, ), }, + { + desc: "repository already exists", + virtualStorage: "virtual-storage-1", + healthyNodes: StaticHealthChecker(configuredNodes), + primaryCandidates: 3, + primaryPick: 0, + repositoryExists: true, + error: fmt.Errorf("reserve repository id: %w", commonerr.ErrRepositoryAlreadyExists), + }, } { t.Run(tc.desc, func(t *testing.T) { ctx, cancel := testhelper.Context() @@ -556,6 +578,15 @@ func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { }, nil, nil, + datastore.MockRepositoryStore{ + ReserveRepositoryIDFunc: func(ctx context.Context, virtualStorage, relativePath string) (int64, error) { + if tc.repositoryExists { + return 0, commonerr.ErrRepositoryAlreadyExists + } + + return 1, nil + }, + }, map[string]int{"virtual-storage-1": tc.replicationFactor}, ).RouteRepositoryCreation(ctx, tc.virtualStorage, "relative-path") if tc.error != nil { -- GitLab From 3ba2a6b75add398b278b5ac571350aca73c419bc Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Thu, 12 Aug 2021 12:47:20 +0200 Subject: [PATCH 07/14] Return repository id from the Router's RouteRepositoryMutator This commit returns the repository's ID from RouteRepositoryMutator. The repository ID is needed in the request finalizers later to propagate the repository id into the replication jobs. 
--- internal/praefect/router_per_repository.go | 7 ++++++- internal/praefect/router_per_repository_test.go | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go index 326d375cb6..4a70bb9125 100644 --- a/internal/praefect/router_per_repository.go +++ b/internal/praefect/router_per_repository.go @@ -214,7 +214,12 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua return RepositoryMutatorRoute{}, fmt.Errorf("get host assignments: %w", err) } - var route RepositoryMutatorRoute + repositoryID, err := r.rs.GetRepositoryID(ctx, virtualStorage, relativePath) + if err != nil { + return RepositoryMutatorRoute{}, fmt.Errorf("get repository id: %w", err) + } + + route := RepositoryMutatorRoute{RepositoryID: repositoryID} for _, assigned := range assignedStorages { node, healthy := healthySet[assigned] if assigned == primary { diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go index a964b41b1d..332bf30b8c 100644 --- a/internal/praefect/router_per_repository_test.go +++ b/internal/praefect/router_per_repository_test.go @@ -371,7 +371,11 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) { }, }, tc.assignedNodes, - datastore.MockRepositoryStore{}, + datastore.MockRepositoryStore{ + GetRepositoryIDFunc: func(ctx context.Context, virtualStorage, relativePath string) (int64, error) { + return 1, nil + }, + }, nil, ) @@ -387,6 +391,7 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) { } require.Equal(t, RepositoryMutatorRoute{ + RepositoryID: 1, Primary: RouterNode{ Storage: "primary", Connection: conns[tc.virtualStorage]["primary"], -- GitLab From 6556805a5fb74f6410c912a947ba0a76ad7da12b Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Tue, 27 Jul 2021 17:19:51 +0200 Subject: [PATCH 08/14] Assert repository id in repository store tests This 
commit changes the repository store tests to support asserting the repository IDs on the records. Since there's no code hooking up the repository IDs yet, the only records in the tests that currently get a repository ID are the `repositories` table's records, whose IDs are autogenerated. Later commits will introduce the code to link the other records. --- .../datastore/repository_store_test.go | 240 +++++++++--------- 1 file changed, 125 insertions(+), 115 deletions(-) diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index 8e7eb0208a..76b121e2fc 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -15,17 +15,23 @@ import ( // repositoryRecord represents Praefect's records related to a repository. type repositoryRecord struct { - primary string - assignments []string + repositoryID int64 + primary string + assignments []string } // virtualStorageStates represents the virtual storage's view of which repositories should exist. // It's structured as virtual-storage->relative_path. type virtualStorageState map[string]map[string]repositoryRecord +type replicaRecord struct { + repositoryID int64 + generation int +} + // storageState contains individual storage's repository states. -// It structured as virtual-storage->relative_path->storage->generation. 
-type storageState map[string]map[string]map[string]int +// It structured as virtual-storage->relative_path->storage->replicaRecord +type storageState map[string]map[string]map[string]replicaRecord type ( requireStateFunc func(t *testing.T, ctx context.Context, vss virtualStorageState, ss storageState) @@ -37,7 +43,7 @@ func requireState(t testing.TB, ctx context.Context, db glsql.Querier, vss virtu requireVirtualStorageState := func(t testing.TB, ctx context.Context, exp virtualStorageState) { rows, err := db.QueryContext(ctx, ` -SELECT virtual_storage, relative_path, "primary", assigned_storages +SELECT repository_id, virtual_storage, relative_path, "primary", assigned_storages FROM repositories LEFT JOIN ( SELECT virtual_storage, relative_path, array_agg(storage ORDER BY storage) AS assigned_storages @@ -52,18 +58,20 @@ LEFT JOIN ( act := make(virtualStorageState) for rows.Next() { var ( + repositoryID sql.NullInt64 virtualStorage, relativePath string primary sql.NullString assignments pq.StringArray ) - require.NoError(t, rows.Scan(&virtualStorage, &relativePath, &primary, &assignments)) + require.NoError(t, rows.Scan(&repositoryID, &virtualStorage, &relativePath, &primary, &assignments)) if act[virtualStorage] == nil { act[virtualStorage] = make(map[string]repositoryRecord) } act[virtualStorage][relativePath] = repositoryRecord{ - primary: primary.String, - assignments: assignments, + repositoryID: repositoryID.Int64, + primary: primary.String, + assignments: assignments, } } @@ -73,7 +81,7 @@ LEFT JOIN ( requireStorageState := func(t testing.TB, ctx context.Context, exp storageState) { rows, err := db.QueryContext(ctx, ` -SELECT virtual_storage, relative_path, storage, generation +SELECT repository_id, virtual_storage, relative_path, storage, generation FROM storage_repositories `) require.NoError(t, err) @@ -81,18 +89,19 @@ FROM storage_repositories act := make(storageState) for rows.Next() { + var repositoryID sql.NullInt64 var vs, rel, storage string var 
gen int - require.NoError(t, rows.Scan(&vs, &rel, &storage, &gen)) + require.NoError(t, rows.Scan(&repositoryID, &vs, &rel, &storage, &gen)) if act[vs] == nil { - act[vs] = make(map[string]map[string]int) + act[vs] = make(map[string]map[string]replicaRecord) } if act[vs][rel] == nil { - act[vs][rel] = make(map[string]int) + act[vs][rel] = make(map[string]replicaRecord) } - act[vs][rel][storage] = gen + act[vs][rel][storage] = replicaRecord{repositoryID: repositoryID.Int64, generation: gen} } require.NoError(t, rows.Err()) @@ -144,8 +153,8 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { state: storageState{ "virtual-storage": { "relative-path": { - "primary": 2, - "secondary": 2, + "primary": {generation: 2}, + "secondary": {generation: 2}, }, }, }, @@ -162,8 +171,8 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { state: storageState{ "virtual-storage": { "relative-path": { - "primary": 2, - "secondary": 0, + "primary": {generation: 2}, + "secondary": {generation: 0}, }, }, }, @@ -180,8 +189,8 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { state: storageState{ "virtual-storage": { "relative-path": { - "primary": 1, - "secondary": 0, + "primary": {generation: 1}, + "secondary": {generation: 0}, }, }, }, @@ -211,7 +220,7 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { secondTx.Commit(t) requireState(t, ctx, db, - virtualStorageState{"virtual-storage": {"relative-path": {}}}, + virtualStorageState{"virtual-storage": {"relative-path": {repositoryID: 1}}}, tc.state, ) }) @@ -283,15 +292,15 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{}, + "repository-1": repositoryRecord{repositoryID: 1}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "latest-node": 1, - "outdated-primary": 0, - "outdated-secondary": 0, + 
"latest-node": {generation: 1}, + "outdated-primary": {generation: 0}, + "outdated-secondary": {generation: 0}, }, }, }, @@ -316,31 +325,31 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{}, - "other-relative-path": {}, + "repository-1": {repositoryID: 1}, + "other-relative-path": {repositoryID: 2}, }, "other-virtual-storage": { - "repository-1": {}, + "repository-1": {repositoryID: 3}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "primary": 1, - "up-to-date-secondary": 1, - "outdated-secondary": 0, + "primary": {generation: 1}, + "up-to-date-secondary": {generation: 1}, + "outdated-secondary": {generation: 0}, }, "other-relative-path": { - "primary": 0, - "up-to-date-secondary": 0, - "outdated-secondary": 0, + "primary": {}, + "up-to-date-secondary": {}, + "outdated-secondary": {}, }, }, "other-virtual-storage": { "repository-1": { - "primary": 0, - "up-to-date-secondary": 0, - "outdated-secondary": 0, + "primary": {}, + "up-to-date-secondary": {}, + "outdated-secondary": {}, }, }, }, @@ -352,31 +361,31 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{}, - "other-relative-path": {}, + "repository-1": {repositoryID: 1}, + "other-relative-path": {repositoryID: 2}, }, "other-virtual-storage": { - "repository-1": {}, + "repository-1": {repositoryID: 3}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "primary": 2, - "up-to-date-secondary": 2, - "outdated-secondary": 0, + "primary": {generation: 2}, + "up-to-date-secondary": {generation: 2}, + "outdated-secondary": {generation: 0}, }, "other-relative-path": { - "primary": 0, - "up-to-date-secondary": 0, - "outdated-secondary": 0, + "primary": {}, + "up-to-date-secondary": {}, + "outdated-secondary": {}, }, }, "other-virtual-storage": 
{ "repository-1": { - "primary": 0, - "up-to-date-secondary": 0, - "outdated-secondary": 0, + "primary": {}, + "up-to-date-secondary": {}, + "outdated-secondary": {}, }, }, }, @@ -395,7 +404,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": 1, + "storage-1": {generation: 1}, }, }, }, @@ -411,13 +420,13 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{}, + "repository-1": repositoryRecord{repositoryID: 1}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": 0, + "storage-1": {generation: 0}, }, }, }, @@ -440,14 +449,14 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{}, + "repository-1": repositoryRecord{repositoryID: 1}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": 0, - "storage-2": 0, + "storage-1": {generation: 0}, + "storage-2": {generation: 0}, }, }, }, @@ -457,14 +466,14 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{}, + "repository-1": repositoryRecord{repositoryID: 1}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": 1, - "storage-2": 0, + "storage-1": {generation: 1}, + "storage-2": {generation: 0}, }, }, }, @@ -602,21 +611,22 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { expectedStorageState := storageState{ vs: { repo: { - "primary": 0, + "primary": {generation: 0}, }, }, } for _, updatedSecondary := range tc.updatedSecondaries { - expectedStorageState[vs][repo][updatedSecondary] = 0 + expectedStorageState[vs][repo][updatedSecondary] = replicaRecord{generation: 0} } 
requireState(t, ctx, virtualStorageState{ vs: { repo: repositoryRecord{ - primary: tc.expectedPrimary, - assignments: tc.expectedAssignments, + repositoryID: 1, + primary: tc.expectedPrimary, + assignments: tc.expectedAssignments, }, }, }, @@ -655,34 +665,34 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "deleted": { - "deleted": repositoryRecord{}, + "deleted": repositoryRecord{repositoryID: 1}, }, "virtual-storage-1": { - "other-storages-remain": repositoryRecord{}, + "other-storages-remain": repositoryRecord{repositoryID: 2}, }, "virtual-storage-2": { - "deleted-repo": repositoryRecord{}, - "other-repo-remains": repositoryRecord{}, + "deleted-repo": repositoryRecord{repositoryID: 3}, + "other-repo-remains": repositoryRecord{repositoryID: 4}, }, }, storageState{ "deleted": { "deleted": { - "deleted": 0, + "deleted": {generation: 0}, }, }, "virtual-storage-1": { "other-storages-remain": { - "deleted-storage": 0, - "remaining-storage": 0, + "deleted-storage": {generation: 0}, + "remaining-storage": {generation: 0}, }, }, "virtual-storage-2": { "deleted-repo": { - "deleted-storage": 0, + "deleted-storage": {generation: 0}, }, "other-repo-remains": { - "remaining-storage": 0, + "remaining-storage": {generation: 0}, }, }, }, @@ -695,18 +705,18 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-2": { - "other-repo-remains": repositoryRecord{}, + "other-repo-remains": repositoryRecord{repositoryID: 4}, }, }, storageState{ "virtual-storage-1": { "other-storages-remain": { - "remaining-storage": 0, + "remaining-storage": {generation: 0}, }, }, "virtual-storage-2": { "other-repo-remains": { - "remaining-storage": 0, + "remaining-storage": {generation: 0}, }, }, }, @@ -720,15 +730,15 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ 
"virtual-storage-1": { - "repository-1": repositoryRecord{}, + "repository-1": repositoryRecord{repositoryID: 1}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "replica-1": 0, - "replica-2": 0, - "replica-3": 0, + "replica-1": {generation: 0}, + "replica-2": {generation: 0}, + "replica-3": {generation: 0}, }, }, }, @@ -740,7 +750,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "repository-1": { - "replica-3": 0, + "replica-3": {generation: 0}, }, }, }, @@ -763,26 +773,26 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "relative-path-1": repositoryRecord{}, - "relative-path-2": repositoryRecord{}, + "relative-path-1": repositoryRecord{repositoryID: 1}, + "relative-path-2": repositoryRecord{repositoryID: 2}, }, "virtual-storage-2": { - "relative-path-1": repositoryRecord{}, + "relative-path-1": repositoryRecord{repositoryID: 3}, }, }, storageState{ "virtual-storage-1": { "relative-path-1": { - "storage-1": 0, - "storage-2": 0, + "storage-1": {generation: 0}, + "storage-2": {generation: 0}, }, "relative-path-2": { - "storage-1": 0, + "storage-1": {generation: 0}, }, }, "virtual-storage-2": { "relative-path-1": { - "storage-1": 0, + "storage-1": {generation: 0}, }, }, }, @@ -793,25 +803,25 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "relative-path-1": repositoryRecord{}, - "relative-path-2": repositoryRecord{}, + "relative-path-1": repositoryRecord{repositoryID: 1}, + "relative-path-2": repositoryRecord{repositoryID: 2}, }, "virtual-storage-2": { - "relative-path-1": repositoryRecord{}, + "relative-path-1": repositoryRecord{repositoryID: 3}, }, }, storageState{ "virtual-storage-1": { "relative-path-1": { - "storage-2": 0, + "storage-2": {generation: 0}, }, "relative-path-2": { - "storage-1": 
0, + "storage-1": {generation: 0}, }, }, "virtual-storage-2": { "relative-path-1": { - "storage-1": 0, + "storage-1": {generation: 0}, }, }, }, @@ -838,18 +848,18 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "renamed-all": repositoryRecord{}, - "renamed-some": repositoryRecord{}, + "renamed-all": repositoryRecord{repositoryID: 1}, + "renamed-some": repositoryRecord{repositoryID: 2}, }, }, storageState{ "virtual-storage-1": { "renamed-all": { - "storage-1": 0, + "storage-1": {generation: 0}, }, "renamed-some": { - "storage-1": 0, - "storage-2": 0, + "storage-1": {generation: 0}, + "storage-2": {generation: 0}, }, }, }, @@ -861,20 +871,20 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "renamed-all-new": repositoryRecord{}, - "renamed-some-new": repositoryRecord{}, + "renamed-all-new": repositoryRecord{repositoryID: 1}, + "renamed-some-new": repositoryRecord{repositoryID: 2}, }, }, storageState{ "virtual-storage-1": { "renamed-all-new": { - "storage-1": 0, + "storage-1": {generation: 0}, }, "renamed-some-new": { - "storage-1": 0, + "storage-1": {generation: 0}, }, "renamed-some": { - "storage-2": 0, + "storage-2": {generation: 0}, }, }, }, @@ -899,15 +909,15 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{}, + "repository-1": repositoryRecord{repositoryID: 1}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "primary": 1, - "consistent-secondary": 1, - "inconsistent-secondary": 0, + "primary": {generation: 1}, + "consistent-secondary": {generation: 1}, + "inconsistent-secondary": {generation: 0}, }, }, }, @@ -933,16 +943,16 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, 
virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{}, + "repository-1": repositoryRecord{repositoryID: 1}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "unknown": 2, - "primary": 1, - "consistent-secondary": 1, - "inconsistent-secondary": 0, + "unknown": {generation: 2}, + "primary": {generation: 1}, + "consistent-secondary": {generation: 1}, + "inconsistent-secondary": {generation: 0}, }, }, }, @@ -960,9 +970,9 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "repository-1": { - "unknown": 2, - "consistent-secondary": 1, - "inconsistent-secondary": 0, + "unknown": {generation: 2}, + "consistent-secondary": {generation: 1}, + "inconsistent-secondary": {generation: 0}, }, }, }, @@ -989,13 +999,13 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{}, + "repository-1": repositoryRecord{repositoryID: 1}, }, }, storageState{ "virtual-storage-1": { "repository-1": { - "other-storage": 0, + "other-storage": {generation: 0}, }, }, }, -- GitLab From 679d591d4dd3510525ef32e6a41376a4ab8b66f3 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Tue, 27 Jul 2021 17:30:00 +0200 Subject: [PATCH 09/14] Link records with repository ID in CreateRepository When a repository is being created, all of the records need to be connected via the repository ID. This commit expands CreateRepository to take in the repository ID Praefect reserved for the repository while routing the creation request. CreateRepository then links all of the records it creates via the repository ID. While the repository ID is not really used for anything yet, linking the newly created records via the ID allows us to run a migration in the following release that simply links all existing records correctly via the repository ID. 
Any concurrently created records would already be linked correctly by the code added in this commit. --- cmd/praefect/subcmd_accept_dataloss_test.go | 2 +- .../subcmd_set_replication_factor_test.go | 2 +- internal/praefect/coordinator.go | 14 +- internal/praefect/coordinator_pg_test.go | 2 +- internal/praefect/coordinator_test.go | 6 +- .../praefect/datastore/repository_store.go | 26 ++- .../datastore/repository_store_mock.go | 6 +- .../datastore/repository_store_test.go | 182 +++++++++--------- internal/praefect/replicator_pg_test.go | 2 +- internal/praefect/repository_exists_test.go | 2 +- 10 files changed, 133 insertions(+), 111 deletions(-) diff --git a/cmd/praefect/subcmd_accept_dataloss_test.go b/cmd/praefect/subcmd_accept_dataloss_test.go index cb3e9bf2b0..2945b43f16 100644 --- a/cmd/praefect/subcmd_accept_dataloss_test.go +++ b/cmd/praefect/subcmd_accept_dataloss_test.go @@ -47,7 +47,7 @@ func TestAcceptDatalossSubcommand(t *testing.T) { if !repoCreated { repoCreated = true - require.NoError(t, rs.CreateRepository(ctx, vs, repo, storage, nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, storage, nil, nil, false, false)) } require.NoError(t, rs.SetGeneration(ctx, vs, repo, storage, generation)) diff --git a/cmd/praefect/subcmd_set_replication_factor_test.go b/cmd/praefect/subcmd_set_replication_factor_test.go index 5b0d076055..197045a55b 100644 --- a/cmd/praefect/subcmd_set_replication_factor_test.go +++ b/cmd/praefect/subcmd_set_replication_factor_test.go @@ -92,7 +92,7 @@ func TestSetReplicationFactorSubcommand(t *testing.T) { // create a repository record require.NoError(t, - datastore.NewPostgresRepositoryStore(db, nil).CreateRepository(ctx, "virtual-storage", "relative-path", "primary", nil, nil, false, false), + datastore.NewPostgresRepositoryStore(db, nil).CreateRepository(ctx, 1, "virtual-storage", "relative-path", "primary", nil, nil, false, false), ) ln, clean := listenAndServe(t, 
[]svcRegistrar{registerPraefectInfoServer( diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 45346e488c..0d837224d4 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -333,8 +333,13 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call gr // the repositories are created directly on the filesystem. There is no call for the // CreateRepository that creates records in the database that is why we do it artificially // before redirecting the calls. - if err := c.rs.CreateRepository(ctx, call.targetRepo.StorageName, call.targetRepo.RelativePath, call.targetRepo.StorageName, nil, nil, true, true); err != nil { - if !errors.Is(err, datastore.RepositoryExistsError{}) { + id, err := c.rs.ReserveRepositoryID(ctx, call.targetRepo.StorageName, call.targetRepo.RelativePath) + if err != nil { + if !errors.Is(err, commonerr.ErrRepositoryAlreadyExists) { + return nil, err + } + } else { + if err := c.rs.CreateRepository(ctx, id, call.targetRepo.StorageName, call.targetRepo.RelativePath, call.targetRepo.StorageName, nil, nil, true, true); err != nil { return nil, err } } @@ -552,6 +557,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall finalizers = append(finalizers, c.newRequestFinalizer( ctx, + route.RepositoryID, virtualStorage, targetRepo, route.Primary.Storage, @@ -789,7 +795,7 @@ func (c *Coordinator) createTransactionFinalizer( } return c.newRequestFinalizer( - ctx, virtualStorage, targetRepo, route.Primary.Storage, + ctx, route.RepositoryID, virtualStorage, targetRepo, route.Primary.Storage, updated, outdated, change, params, cause)() } } @@ -921,6 +927,7 @@ func routerNodesToStorages(nodes []RouterNode) []string { func (c *Coordinator) newRequestFinalizer( ctx context.Context, + repositoryID int64, virtualStorage string, targetRepo *gitalypb.Repository, primary string, @@ -987,6 +994,7 @@ func (c *Coordinator) newRequestFinalizer( 
c.conf.DefaultReplicationFactors()[virtualStorage] > 0 if err := c.rs.CreateRepository(ctx, + repositoryID, virtualStorage, targetRepo.GetRelativePath(), primary, diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go index 3d887b23a9..a27f928208 100644 --- a/internal/praefect/coordinator_pg_test.go +++ b/internal/praefect/coordinator_pg_test.go @@ -217,7 +217,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { if !repoCreated { repoCreated = true - require.NoError(t, rs.CreateRepository(ctx, repo.StorageName, repo.RelativePath, storageNodes[i].Storage, nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, repo.StorageName, repo.RelativePath, storageNodes[i].Storage, nil, nil, false, false)) } require.NoError(t, rs.SetGeneration(ctx, repo.StorageName, repo.RelativePath, storageNodes[i].Storage, n.generation)) diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index d2225f1a39..91fd9f42d8 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -803,8 +803,9 @@ func TestStreamDirector_repo_creation(t *testing.T) { var createRepositoryCalled int64 rs := datastore.MockRepositoryStore{ - CreateRepositoryFunc: func(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error { + CreateRepositoryFunc: func(ctx context.Context, repoID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error { atomic.AddInt64(&createRepositoryCalled, 1) + assert.Equal(t, int64(0), repoID) assert.Equal(t, targetRepo.StorageName, virtualStorage) assert.Equal(t, targetRepo.RelativePath, relativePath) assert.Equal(t, rewrittenStorage, primary) @@ -2069,7 +2070,7 @@ func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) { 
requireSuppressedCancellation(t, ctx) return err }, - CreateRepositoryFunc: func(ctx context.Context, _, _, _ string, _, _ []string, _, _ bool) error { + CreateRepositoryFunc: func(ctx context.Context, _ int64, _, _, _ string, _, _ []string, _, _ bool) error { requireSuppressedCancellation(t, ctx) return err }, @@ -2080,6 +2081,7 @@ func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) { nil, ).newRequestFinalizer( ctx, + 0, "virtual storage", &gitalypb.Repository{}, "primary", diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index bc588b7cac..0b0650991f 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -104,7 +104,7 @@ type RepositoryStore interface { // // storeAssignments should be set when variable replication factor is enabled. When set, the primary and the // secondaries are stored as the assigned hosts of the repository. - CreateRepository(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error + CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error // SetAuthoritativeReplica sets the given replica of a repsitory as the authoritative one by setting its generation as the latest one. SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storage string) error // DeleteRepository deletes the repository's record from the virtual storage and the storages. Returns @@ -330,26 +330,36 @@ AND storage = ANY($3) return sourceGeneration, nil } -// CreateRepository creates a new repository and assigns it to the given primary and secondary -// nodes. 
-func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error { +// CreateRepository creates a record for a repository in the specified virtual storage and relative path. +// Primary is the storage the repository was created on. UpdatedSecondaries are secondaries that participated +// and successfully completed the transaction. OutdatedSecondaries are secondaries that were outdated or failed +// the transaction. Returns RepositoryExistsError when trying to create a repository which already exists in the store. +// +// storePrimary should be set when repository specific primaries are enabled. When set, the primary is stored as +// the repository's primary. +// +// storeAssignments should be set when variable replication factor is enabled. When set, the primary and the +// secondaries are stored as the assigned hosts of the repository. +func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error { const q = ` WITH repo AS ( INSERT INTO repositories ( + repository_id, virtual_storage, relative_path, generation, "primary" - ) VALUES ($1, $2, 0, CASE WHEN $4 THEN $3 END) + ) VALUES ($8, $1, $2, 0, CASE WHEN $4 THEN $3 END) ), assignments AS ( INSERT INTO repository_assignments ( + repository_id, virtual_storage, relative_path, storage ) - SELECT $1, $2, storage + SELECT $8, $1, $2, storage FROM ( SELECT $3 AS storage UNION @@ -361,12 +371,13 @@ assignments AS ( ) INSERT INTO storage_repositories ( + repository_id, virtual_storage, relative_path, storage, generation ) -SELECT $1, $2, storage, 0 +SELECT $8, $1, $2, storage, 0 FROM ( SELECT $3 AS storage UNION @@ -382,6 +393,7 @@ FROM ( pq.StringArray(updatedSecondaries), 
pq.StringArray(outdatedSecondaries), storeAssignments, + repositoryID, ) var pqerr *pq.Error diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go index ed4b3c62ab..0e3c46c92b 100644 --- a/internal/praefect/datastore/repository_store_mock.go +++ b/internal/praefect/datastore/repository_store_mock.go @@ -9,7 +9,7 @@ type MockRepositoryStore struct { IncrementGenerationFunc func(ctx context.Context, virtualStorage, relativePath, primary string, secondaries []string) error GetReplicatedGenerationFunc func(ctx context.Context, virtualStorage, relativePath, source, target string) (int, error) SetGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string, generation int) error - CreateRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error + CreateRepositoryFunc func(ctx context.Context, repositoryID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error SetAuthoritativeReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error DeleteRepositoryFunc func(ctx context.Context, virtualStorage, relativePath string, storages []string) error DeleteReplicaFunc func(ctx context.Context, virtualStorage, relativePath, storage string) error @@ -55,12 +55,12 @@ func (m MockRepositoryStore) SetGeneration(ctx context.Context, virtualStorage, } // CreateRepository calls the mocked function. If no mock has been provided, it returns a nil error. 
-func (m MockRepositoryStore) CreateRepository(ctx context.Context, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error { +func (m MockRepositoryStore) CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error { if m.CreateRepositoryFunc == nil { return nil } - return m.CreateRepositoryFunc(ctx, virtualStorage, relativePath, primary, updatedSecondaries, outdatedSecondaries, storePrimary, storeAssignments) + return m.CreateRepositoryFunc(ctx, repositoryID, virtualStorage, relativePath, primary, updatedSecondaries, outdatedSecondaries, storePrimary, storeAssignments) } // SetAuthoritativeReplica calls the mocked function. If no mock has been provided, it returns a nil error. diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index 76b121e2fc..912241112a 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -46,10 +46,10 @@ func requireState(t testing.TB, ctx context.Context, db glsql.Querier, vss virtu SELECT repository_id, virtual_storage, relative_path, "primary", assigned_storages FROM repositories LEFT JOIN ( - SELECT virtual_storage, relative_path, array_agg(storage ORDER BY storage) AS assigned_storages + SELECT repository_id, virtual_storage, relative_path, array_agg(storage ORDER BY storage) AS assigned_storages FROM repository_assignments - GROUP BY virtual_storage, relative_path -) AS repository_assignments USING (virtual_storage, relative_path) + GROUP BY repository_id, virtual_storage, relative_path +) AS repository_assignments USING (repository_id, virtual_storage, relative_path) `) require.NoError(t, err) @@ -153,8 +153,8 @@ func TestRepositoryStore_incrementGenerationConcurrently(t 
*testing.T) { state: storageState{ "virtual-storage": { "relative-path": { - "primary": {generation: 2}, - "secondary": {generation: 2}, + "primary": {repositoryID: 1, generation: 2}, + "secondary": {repositoryID: 1, generation: 2}, }, }, }, @@ -171,8 +171,8 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { state: storageState{ "virtual-storage": { "relative-path": { - "primary": {generation: 2}, - "secondary": {generation: 0}, + "primary": {repositoryID: 1, generation: 2}, + "secondary": {repositoryID: 1, generation: 0}, }, }, }, @@ -189,8 +189,8 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { state: storageState{ "virtual-storage": { "relative-path": { - "primary": {generation: 1}, - "secondary": {generation: 0}, + "primary": {repositoryID: 1, generation: 1}, + "secondary": {repositoryID: 1, generation: 0}, }, }, }, @@ -202,7 +202,7 @@ func TestRepositoryStore_incrementGenerationConcurrently(t *testing.T) { db.TruncateAll(t) - require.NoError(t, NewPostgresRepositoryStore(db, nil).CreateRepository(ctx, "virtual-storage", "relative-path", "primary", []string{"secondary"}, nil, false, false)) + require.NoError(t, NewPostgresRepositoryStore(db, nil).CreateRepository(ctx, 1, "virtual-storage", "relative-path", "primary", []string{"secondary"}, nil, false, false)) firstTx := db.Begin(t) secondTx := db.Begin(t) @@ -282,7 +282,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("write to outdated nodes", func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, "latest-node", []string{"outdated-primary", "outdated-secondary"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "latest-node", []string{"outdated-primary", "outdated-secondary"}, nil, false, false)) require.NoError(t, rs.SetGeneration(ctx, vs, repo, "latest-node", 1)) require.Equal(t, @@ -298,9 +298,9 @@ func testRepositoryStore(t 
*testing.T, newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "repository-1": { - "latest-node": {generation: 1}, - "outdated-primary": {generation: 0}, - "outdated-secondary": {generation: 0}, + "latest-node": {repositoryID: 1, generation: 1}, + "outdated-primary": {repositoryID: 1, generation: 0}, + "outdated-secondary": {repositoryID: 1, generation: 0}, }, }, }, @@ -310,14 +310,14 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("increments generation for up to date nodes", func(t *testing.T) { rs, requireState := newStore(t, nil) - for _, pair := range []struct{ virtualStorage, relativePath string }{ + for id, pair := range []struct{ virtualStorage, relativePath string }{ {vs, repo}, // create records that don't get modified to ensure the query is correctly scoped by virtual storage // and relative path {vs, "other-relative-path"}, {"other-virtual-storage", repo}, } { - require.NoError(t, rs.CreateRepository(ctx, pair.virtualStorage, pair.relativePath, "primary", []string{"up-to-date-secondary", "outdated-secondary"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, int64(id+1), pair.virtualStorage, pair.relativePath, "primary", []string{"up-to-date-secondary", "outdated-secondary"}, nil, false, false)) } require.NoError(t, rs.IncrementGeneration(ctx, vs, repo, "primary", []string{"up-to-date-secondary"})) @@ -335,21 +335,21 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "repository-1": { - "primary": {generation: 1}, - "up-to-date-secondary": {generation: 1}, - "outdated-secondary": {generation: 0}, + "primary": {repositoryID: 1, generation: 1}, + "up-to-date-secondary": {repositoryID: 1, generation: 1}, + "outdated-secondary": {repositoryID: 1, generation: 0}, }, "other-relative-path": { - "primary": {}, - "up-to-date-secondary": {}, - "outdated-secondary": {}, + "primary": {repositoryID: 2}, + "up-to-date-secondary": 
{repositoryID: 2}, + "outdated-secondary": {repositoryID: 2}, }, }, "other-virtual-storage": { "repository-1": { - "primary": {}, - "up-to-date-secondary": {}, - "outdated-secondary": {}, + "primary": {repositoryID: 3}, + "up-to-date-secondary": {repositoryID: 3}, + "outdated-secondary": {repositoryID: 3}, }, }, }, @@ -371,21 +371,21 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "repository-1": { - "primary": {generation: 2}, - "up-to-date-secondary": {generation: 2}, - "outdated-secondary": {generation: 0}, + "primary": {repositoryID: 1, generation: 2}, + "up-to-date-secondary": {repositoryID: 1, generation: 2}, + "outdated-secondary": {repositoryID: 1, generation: 0}, }, "other-relative-path": { - "primary": {}, - "up-to-date-secondary": {}, - "outdated-secondary": {}, + "primary": {repositoryID: 2}, + "up-to-date-secondary": {repositoryID: 2}, + "outdated-secondary": {repositoryID: 2}, }, }, "other-virtual-storage": { "repository-1": { - "primary": {}, - "up-to-date-secondary": {}, - "outdated-secondary": {}, + "primary": {repositoryID: 3}, + "up-to-date-secondary": {repositoryID: 3}, + "outdated-secondary": {repositoryID: 3}, }, }, }, @@ -404,7 +404,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": {generation: 1}, + "storage-1": {repositoryID: 0, generation: 1}, }, }, }, @@ -414,7 +414,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("updates existing record", func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, "storage-1", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "storage-1", nil, nil, false, false)) require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 1)) require.NoError(t, rs.SetGeneration(ctx, vs, repo, stor, 0)) requireState(t, ctx, @@ -426,7 +426,7 
@@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": {generation: 0}, + "storage-1": {repositoryID: 1, generation: 0}, }, }, }, @@ -445,7 +445,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { }) t.Run("sets the given replica as the latest", func(t *testing.T) { - require.NoError(t, rs.CreateRepository(ctx, vs, repo, "storage-1", []string{"storage-2"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "storage-1", []string{"storage-2"}, nil, false, false)) requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { @@ -455,8 +455,8 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": {generation: 0}, - "storage-2": {generation: 0}, + "storage-1": {repositoryID: 1, generation: 0}, + "storage-2": {repositoryID: 1, generation: 0}, }, }, }, @@ -472,8 +472,8 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "repository-1": { - "storage-1": {generation: 1}, - "storage-2": {generation: 0}, + "storage-1": {repositoryID: 1, generation: 1}, + "storage-2": {repositoryID: 1, generation: 0}, }, }, }, @@ -606,18 +606,18 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run(tc.desc, func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, "primary", tc.updatedSecondaries, tc.outdatedSecondaries, tc.storePrimary, tc.storeAssignments)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "primary", tc.updatedSecondaries, tc.outdatedSecondaries, tc.storePrimary, tc.storeAssignments)) expectedStorageState := storageState{ vs: { repo: { - "primary": {generation: 0}, + "primary": {repositoryID: 1, generation: 0}, }, }, } for _, updatedSecondary := range tc.updatedSecondaries { - 
expectedStorageState[vs][repo][updatedSecondary] = replicaRecord{generation: 0} + expectedStorageState[vs][repo][updatedSecondary] = replicaRecord{repositoryID: 1, generation: 0} } requireState(t, ctx, @@ -639,10 +639,10 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("conflict", func(t *testing.T) { rs, _ := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, stor, nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false)) require.Equal(t, RepositoryExistsError{vs, repo, stor}, - rs.CreateRepository(ctx, vs, repo, stor, nil, nil, false, false), + rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false), ) }) }) @@ -657,10 +657,10 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("delete existing", func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, "deleted", "deleted", "deleted", nil, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, "virtual-storage-1", "other-storages-remain", "deleted-storage", []string{"remaining-storage"}, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, "virtual-storage-2", "deleted-repo", "deleted-storage", nil, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, "virtual-storage-2", "other-repo-remains", "remaining-storage", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, "deleted", "deleted", "deleted", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 2, "virtual-storage-1", "other-storages-remain", "deleted-storage", []string{"remaining-storage"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 3, "virtual-storage-2", "deleted-repo", "deleted-storage", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 4, "virtual-storage-2", "other-repo-remains", "remaining-storage", nil, nil, false, false)) 
requireState(t, ctx, virtualStorageState{ @@ -678,21 +678,21 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { storageState{ "deleted": { "deleted": { - "deleted": {generation: 0}, + "deleted": {repositoryID: 1, generation: 0}, }, }, "virtual-storage-1": { "other-storages-remain": { - "deleted-storage": {generation: 0}, - "remaining-storage": {generation: 0}, + "deleted-storage": {repositoryID: 2, generation: 0}, + "remaining-storage": {repositoryID: 2, generation: 0}, }, }, "virtual-storage-2": { "deleted-repo": { - "deleted-storage": {generation: 0}, + "deleted-storage": {repositoryID: 3, generation: 0}, }, "other-repo-remains": { - "remaining-storage": {generation: 0}, + "remaining-storage": {repositoryID: 4, generation: 0}, }, }, }, @@ -716,7 +716,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { }, "virtual-storage-2": { "other-repo-remains": { - "remaining-storage": {generation: 0}, + "remaining-storage": {repositoryID: 4, generation: 0}, }, }, }, @@ -725,7 +725,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("transactional delete", func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, "virtual-storage-1", "repository-1", "replica-1", []string{"replica-2", "replica-3"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "repository-1", "replica-1", []string{"replica-2", "replica-3"}, nil, false, false)) requireState(t, ctx, virtualStorageState{ @@ -736,9 +736,9 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "repository-1": { - "replica-1": {generation: 0}, - "replica-2": {generation: 0}, - "replica-3": {generation: 0}, + "replica-1": {repositoryID: 1, generation: 0}, + "replica-2": {repositoryID: 1, generation: 0}, + "replica-3": {repositoryID: 1, generation: 0}, }, }, }, @@ -766,9 +766,9 @@ func 
testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { }) t.Run("delete existing", func(t *testing.T) { - require.NoError(t, rs.CreateRepository(ctx, "virtual-storage-1", "relative-path-1", "storage-1", []string{"storage-2"}, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, "virtual-storage-1", "relative-path-2", "storage-1", nil, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, "virtual-storage-2", "relative-path-1", "storage-1", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "storage-1", []string{"storage-2"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 2, "virtual-storage-1", "relative-path-2", "storage-1", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 3, "virtual-storage-2", "relative-path-1", "storage-1", nil, nil, false, false)) requireState(t, ctx, virtualStorageState{ @@ -783,16 +783,16 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "relative-path-1": { - "storage-1": {generation: 0}, - "storage-2": {generation: 0}, + "storage-1": {repositoryID: 1, generation: 0}, + "storage-2": {repositoryID: 1, generation: 0}, }, "relative-path-2": { - "storage-1": {generation: 0}, + "storage-1": {repositoryID: 2, generation: 0}, }, }, "virtual-storage-2": { "relative-path-1": { - "storage-1": {generation: 0}, + "storage-1": {repositoryID: 3, generation: 0}, }, }, }, @@ -813,15 +813,15 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "relative-path-1": { - "storage-2": {generation: 0}, + "storage-2": {repositoryID: 1, generation: 0}, }, "relative-path-2": { - "storage-1": {generation: 0}, + "storage-1": {repositoryID: 2, generation: 0}, }, }, "virtual-storage-2": { "relative-path-1": { - "storage-1": {generation: 0}, + "storage-1": {repositoryID: 3, generation: 0}, }, }, }, @@ -842,8 
+842,8 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("rename existing", func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, vs, "renamed-all", "storage-1", nil, nil, false, false)) - require.NoError(t, rs.CreateRepository(ctx, vs, "renamed-some", "storage-1", []string{"storage-2"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, "renamed-all", "storage-1", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 2, vs, "renamed-some", "storage-1", []string{"storage-2"}, nil, false, false)) requireState(t, ctx, virtualStorageState{ @@ -855,11 +855,11 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "renamed-all": { - "storage-1": {generation: 0}, + "storage-1": {repositoryID: 1, generation: 0}, }, "renamed-some": { - "storage-1": {generation: 0}, - "storage-2": {generation: 0}, + "storage-1": {repositoryID: 2, generation: 0}, + "storage-2": {repositoryID: 2, generation: 0}, }, }, }, @@ -878,13 +878,13 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "renamed-all-new": { - "storage-1": {generation: 0}, + "storage-1": {repositoryID: 1, generation: 0}, }, "renamed-some-new": { - "storage-1": {generation: 0}, + "storage-1": {repositoryID: 2, generation: 0}, }, "renamed-some": { - "storage-2": {generation: 0}, + "storage-2": {repositoryID: 2, generation: 0}, }, }, }, @@ -903,7 +903,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.Empty(t, secondaries) }) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, "primary", []string{"consistent-secondary"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "primary", []string{"consistent-secondary"}, nil, false, false)) require.NoError(t, rs.IncrementGeneration(ctx, vs, repo, "primary", 
[]string{"consistent-secondary"})) require.NoError(t, rs.SetGeneration(ctx, vs, repo, "inconsistent-secondary", 0)) requireState(t, ctx, @@ -915,8 +915,8 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "repository-1": { - "primary": {generation: 1}, - "consistent-secondary": {generation: 1}, + "primary": {repositoryID: 1, generation: 1}, + "consistent-secondary": {repositoryID: 1, generation: 1}, "inconsistent-secondary": {generation: 0}, }, }, @@ -950,8 +950,8 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { "virtual-storage-1": { "repository-1": { "unknown": {generation: 2}, - "primary": {generation: 1}, - "consistent-secondary": {generation: 1}, + "primary": {repositoryID: 1, generation: 1}, + "consistent-secondary": {repositoryID: 1, generation: 1}, "inconsistent-secondary": {generation: 0}, }, }, @@ -987,14 +987,14 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { t.Run("DeleteInvalidRepository", func(t *testing.T) { t.Run("only replica", func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, "invalid-storage", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "invalid-storage", nil, nil, false, false)) require.NoError(t, rs.DeleteInvalidRepository(ctx, vs, repo, "invalid-storage")) requireState(t, ctx, virtualStorageState{}, storageState{}) }) t.Run("another replica", func(t *testing.T) { rs, requireState := newStore(t, nil) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, "invalid-storage", []string{"other-storage"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, "invalid-storage", []string{"other-storage"}, nil, false, false)) require.NoError(t, rs.DeleteInvalidRepository(ctx, vs, repo, "invalid-storage")) requireState(t, ctx, virtualStorageState{ @@ -1005,7 +1005,7 @@ func testRepositoryStore(t *testing.T, 
newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "repository-1": { - "other-storage": {generation: 0}, + "other-storage": {repositoryID: 1, generation: 0}, }, }, }, @@ -1020,7 +1020,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.NoError(t, err) require.False(t, exists) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, stor, nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false)) exists, err = rs.RepositoryExists(ctx, vs, repo) require.NoError(t, err) require.True(t, exists) @@ -1042,7 +1042,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.NoError(t, err) require.Equal(t, int64(2), id) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, stor, nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, id, vs, repo, stor, nil, nil, false, false)) id, err = rs.ReserveRepositoryID(ctx, vs, repo) require.Equal(t, commonerr.ErrRepositoryAlreadyExists, err) @@ -1050,7 +1050,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { id, err = rs.ReserveRepositoryID(ctx, vs, repo+"-2") require.NoError(t, err) - require.Equal(t, int64(4), id) + require.Equal(t, int64(3), id) }) t.Run("GetRepositoryID", func(t *testing.T) { @@ -1060,7 +1060,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { require.Equal(t, commonerr.NewRepositoryNotFoundError(vs, repo), err) require.Equal(t, int64(0), id) - require.NoError(t, rs.CreateRepository(ctx, vs, repo, stor, nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false)) id, err = rs.GetRepositoryID(ctx, vs, repo) require.Nil(t, err) diff --git a/internal/praefect/replicator_pg_test.go b/internal/praefect/replicator_pg_test.go index 63e21b223a..d790a88c3b 100644 --- a/internal/praefect/replicator_pg_test.go +++ b/internal/praefect/replicator_pg_test.go @@ 
-87,7 +87,7 @@ func TestReplicatorDestroy(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - require.NoError(t, rs.CreateRepository(ctx, "virtual-storage-1", "relative-path-1", "storage-1", []string{"storage-2"}, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "storage-1", []string{"storage-2"}, nil, false, false)) ln, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go index 75db7e81be..a5e894eaf1 100644 --- a/internal/praefect/repository_exists_test.go +++ b/internal/praefect/repository_exists_test.go @@ -74,7 +74,7 @@ func TestRepositoryExistsStreamInterceptor(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - require.NoError(t, rs.CreateRepository(ctx, "virtual-storage", "relative-path", "storage", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 0, "virtual-storage", "relative-path", "storage", nil, nil, false, false)) electionStrategy := config.ElectionStrategyPerRepository if tc.routeToGitaly { -- GitLab From 056d610e14bb4c750659b10e6be12081040af9c4 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Tue, 10 Aug 2021 13:54:50 +0200 Subject: [PATCH 10/14] Link repository id to replicas created from replication jobs SetGeneration is called to set a replica's generation following a replication job. As it might create new replica records, it should ensure the record is linked to the correct repository via the repository ID. This commit links the replica's record by getting the correct repository ID from the repositories table. As all replicas still have the same (virtual_storage, relative_path) as the repository, this is still safe to do. It's also necessary as not every replication job yet carries a repository ID. 
This commit also includes the repository ID in replication jobs so the approach can later be updated to simply take the repository ID from the replication job. --- internal/praefect/coordinator.go | 1 + internal/praefect/datastore/queue.go | 4 ++++ internal/praefect/datastore/repository_store.go | 6 +++++- internal/praefect/datastore/repository_store_test.go | 6 +++--- 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 0d837224d4..218d5fbb3c 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -1018,6 +1018,7 @@ func (c *Coordinator) newRequestFinalizer( for _, secondary := range outdatedSecondaries { event := datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ + RepositoryID: repositoryID, Change: change, RelativePath: targetRepo.GetRelativePath(), VirtualStorage: virtualStorage, diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index dcff2567e0..72d88f9702 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -49,6 +49,10 @@ func allowToAck(state JobState) error { // ReplicationJob is a persistent representation of the replication job. type ReplicationJob struct { + // RepositoryID is the ID of the repository this job relates to. RepositoryID + // may be 0 if the job doesn't relate to any known repository. This can happen + // for example when the job is deleting an orphaned replica of a deleted repository. 
+ RepositoryID int64 `json:"repository_id"` Change ChangeType `json:"change"` RelativePath string `json:"relative_path"` TargetNodeStorage string `json:"target_node_storage"` diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index 0b0650991f..83571fa7e8 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -235,13 +235,17 @@ WITH repository AS ( ) INSERT INTO storage_repositories ( + repository_id, virtual_storage, relative_path, storage, generation ) -VALUES ($1, $2, $3, $4) +SELECT + (SELECT repository_id FROM repositories WHERE virtual_storage = $1 AND relative_path = $2), + $1, $2, $3, $4 ON CONFLICT (virtual_storage, relative_path, storage) DO UPDATE SET + repository_id = EXCLUDED.repository_id, generation = EXCLUDED.generation ` diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index 912241112a..3940e26ffc 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -917,7 +917,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { "repository-1": { "primary": {repositoryID: 1, generation: 1}, "consistent-secondary": {repositoryID: 1, generation: 1}, - "inconsistent-secondary": {generation: 0}, + "inconsistent-secondary": {repositoryID: 1, generation: 0}, }, }, }, @@ -949,10 +949,10 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { storageState{ "virtual-storage-1": { "repository-1": { - "unknown": {generation: 2}, + "unknown": {repositoryID: 1, generation: 2}, "primary": {repositoryID: 1, generation: 1}, "consistent-secondary": {repositoryID: 1, generation: 1}, - "inconsistent-secondary": {generation: 0}, + "inconsistent-secondary": {repositoryID: 1, generation: 0}, }, }, }, -- GitLab From 8022d334fb3d832ca1289fd00c61217dc997260a Mon Sep 17 00:00:00 
2001 From: Sami Hiltunen Date: Thu, 29 Jul 2021 12:11:34 +0200 Subject: [PATCH 11/14] Link created assignments via the repository ID to the repository The assignments are already being linked with the repository ID when they are being created through CreateRepository as part of repository creation. Assignment records can also be created when SetReplicationFactor is called. This commit updates SetReplicationFactor to link the created assignment records via the repository ID so we can later do a single migration to link all existing records to their repositories via the ID. --- internal/praefect/datastore/assignment.go | 4 ++-- internal/praefect/datastore/assignment_test.go | 15 ++++++++++++--- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/internal/praefect/datastore/assignment.go b/internal/praefect/datastore/assignment.go index b097272c46..7fc48eedcb 100644 --- a/internal/praefect/datastore/assignment.go +++ b/internal/praefect/datastore/assignment.go @@ -123,7 +123,7 @@ func (s AssignmentStore) SetReplicationFactor(ctx context.Context, virtualStorag // current assignments.
rows, err := s.db.QueryContext(ctx, ` WITH repository AS ( - SELECT virtual_storage, relative_path, "primary" + SELECT repository_id, virtual_storage, relative_path, "primary" FROM repositories WHERE virtual_storage = $1 AND relative_path = $2 @@ -139,7 +139,7 @@ existing_assignments AS ( created_assignments AS ( INSERT INTO repository_assignments - SELECT virtual_storage, relative_path, storage + SELECT virtual_storage, relative_path, storage, repository_id FROM repository CROSS JOIN ( SELECT unnest($4::text[]) AS storage ) AS configured_storages WHERE storage NOT IN ( SELECT storage FROM existing_assignments ) diff --git a/internal/praefect/datastore/assignment_test.go b/internal/praefect/datastore/assignment_test.go index c8ac5ad847..40dcb1c590 100644 --- a/internal/praefect/datastore/assignment_test.go +++ b/internal/praefect/datastore/assignment_test.go @@ -3,6 +3,7 @@ package datastore import ( "testing" + "github.com/lib/pq" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" @@ -204,15 +205,15 @@ func TestAssignmentStore_SetReplicationFactor(t *testing.T) { if !tc.nonExistentRepository { _, err := db.ExecContext(ctx, ` - INSERT INTO repositories (virtual_storage, relative_path, "primary") - VALUES ('virtual-storage', 'relative-path', 'primary') + INSERT INTO repositories (virtual_storage, relative_path, "primary", repository_id) + VALUES ('virtual-storage', 'relative-path', 'primary', 1) `) require.NoError(t, err) } for _, storage := range tc.existingAssignments { _, err := db.ExecContext(ctx, ` - INSERT INTO repository_assignments VALUES ('virtual-storage', 'relative-path', $1) + INSERT INTO repository_assignments VALUES ('virtual-storage', 'relative-path', $1, 1) `, storage) require.NoError(t, err) } @@ -230,6 +231,14 @@ func TestAssignmentStore_SetReplicationFactor(t *testing.T) { assignedStorages, err := store.GetHostAssignments(ctx, 
"virtual-storage", "relative-path") require.NoError(t, err) tc.requireStorages(t, assignedStorages) + + var storagesWithIncorrectRepositoryID pq.StringArray + require.NoError(t, db.QueryRowContext(ctx, ` + SELECT array_agg(storage) + FROM repository_assignments + WHERE COALESCE(repository_id != 1, true) + `).Scan(&storagesWithIncorrectRepositoryID)) + require.Empty(t, storagesWithIncorrectRepositoryID) }) } } -- GitLab From 40af81f89e52a642d86e420d62ada56b67a0614a Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Thu, 12 Aug 2021 13:19:22 +0200 Subject: [PATCH 12/14] Include repository ID in jobs scheduled by the reconciler Reconciler is the only other component that currently schedules replication jobs alongside the request finalizers. As such, we need to update it to produce replication jobs similar to those produced by the request finalizers. Namely, this commit includes the repository's ID in the replication jobs produced by the reconciler. --- internal/praefect/reconciler/reconciler.go | 13 +++++++++++-- internal/praefect/reconciler/reconciler_test.go | 17 ++++++++++++++++- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/internal/praefect/reconciler/reconciler.go b/internal/praefect/reconciler/reconciler.go index 308a7aff19..a37fd5a055 100644 --- a/internal/praefect/reconciler/reconciler.go +++ b/internal/praefect/reconciler/reconciler.go @@ -84,6 +84,7 @@ func (r *Reconciler) Run(ctx context.Context, ticker helper.Ticker) error { // job is an internal type for formatting log messages type job struct { + RepositoryID int64 `json:"repository_id"` Change string `json:"change"` CorrelationID string `json:"correlation_id"` VirtualStorage string `json:"virtual_storage"` @@ -148,10 +149,12 @@ healthy_storages AS ( delete_jobs AS ( SELECT DISTINCT ON (virtual_storage, relative_path) + repositories.repository_id, virtual_storage, relative_path, storage FROM storage_repositories + JOIN repositories USING (virtual_storage, relative_path) JOIN healthy_storages
USING (virtual_storage, storage) WHERE ( -- Only unassigned repositories should be targeted for deletion. If no assignment exists, @@ -161,7 +164,7 @@ delete_jobs AS ( WHERE virtual_storage = storage_repositories.virtual_storage AND relative_path = storage_repositories.relative_path ) - AND generation <= ( + AND storage_repositories.generation <= ( -- Check whether the replica's generation is equal or lower than the generation of every assigned -- replica of the repository. If so, then it is eligible for deletion. SELECT MIN(COALESCE(generation, -1)) @@ -196,6 +199,7 @@ delete_jobs AS ( -- repository remains on storage unused (because it doesn't exist in the 'repositories' table anymore). SELECT * FROM ( SELECT DISTINCT ON (virtual_storage, relative_path) + 0 AS repository_id, virtual_storage, relative_path, storage @@ -219,12 +223,13 @@ delete_jobs AS ( update_jobs AS ( SELECT DISTINCT ON (virtual_storage, relative_path, target_node_storage) + repository_id, virtual_storage, relative_path, source_node_storage, target_node_storage FROM ( - SELECT virtual_storage, relative_path, storage AS target_node_storage + SELECT repositories.repository_id, virtual_storage, relative_path, storage AS target_node_storage FROM repositories JOIN healthy_storages USING (virtual_storage) LEFT JOIN storage_repositories USING (virtual_storage, relative_path, storage) @@ -273,6 +278,7 @@ reconciliation_jobs AS ( jsonb_build_object('correlation_id', encode(random()::text::bytea, 'base64')) FROM ( SELECT + COALESCE(repository_id, 0) AS repository_id, virtual_storage, relative_path, source_node_storage, @@ -281,6 +287,7 @@ reconciliation_jobs AS ( FROM update_jobs UNION ALL SELECT + COALESCE(repository_id, 0) AS repository_id, virtual_storage, relative_path, NULL AS source_node_storage, @@ -303,6 +310,7 @@ create_locks AS ( SELECT meta->>'correlation_id', + job->>'repository_id', job->>'change', job->>'virtual_storage', job->>'relative_path', @@ -326,6 +334,7 @@ FROM reconciliation_jobs 
var j job if err := rows.Scan( &j.CorrelationID, + &j.RepositoryID, &j.Change, &j.VirtualStorage, &j.RelativePath, diff --git a/internal/praefect/reconciler/reconciler_test.go b/internal/praefect/reconciler/reconciler_test.go index a76adb48ff..a74a668f1a 100644 --- a/internal/praefect/reconciler/reconciler_test.go +++ b/internal/praefect/reconciler/reconciler_test.go @@ -12,6 +12,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" @@ -1078,7 +1079,11 @@ func TestReconciler(t *testing.T) { if repo.generation >= 0 { if !repoCreated { repoCreated = true - require.NoError(t, rs.CreateRepository(ctx, virtualStorage, relativePath, storage, nil, nil, false, false)) + + id, err := rs.ReserveRepositoryID(ctx, virtualStorage, relativePath) + require.NoError(t, err) + + require.NoError(t, rs.CreateRepository(ctx, id, virtualStorage, relativePath, storage, nil, nil, false, false)) } require.NoError(t, rs.SetGeneration(ctx, virtualStorage, relativePath, storage, repo.generation)) @@ -1149,6 +1154,16 @@ func TestReconciler(t *testing.T) { } } + // Fill the expected reconciliation jobs with generated repository IDs. 
+ for i, job := range tc.reconciliationJobs { + id, err := rs.GetRepositoryID(ctx, job.VirtualStorage, job.RelativePath) + if err != nil { + require.Equal(t, commonerr.NewRepositoryNotFoundError(job.VirtualStorage, job.RelativePath), err) + } + + tc.reconciliationJobs[i].RepositoryID = id + } + // run reconcile in two concurrent transactions to ensure everything works // as expected with multiple Praefect's reconciling at the same time firstTx := db.Begin(t) -- GitLab From 8e9163eb09c5b902c200f32dd6dfe8d2283124f5 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Wed, 18 Aug 2021 12:44:02 +0200 Subject: [PATCH 13/14] Remove redundant struct name in RepositoryStore tests RepositoryStore tests use helper types to represent state to be asserted in the tests. As the map type already contains the type of the struct it contains, it's not necessary to write out the struct name when writing a literal map. This commit removes the redundant struct name from the literals. --- .../datastore/repository_store_test.go | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index 3940e26ffc..0d774783fd 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -292,7 +292,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{repositoryID: 1}, + "repository-1": {repositoryID: 1}, }, }, storageState{ @@ -420,7 +420,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{repositoryID: 1}, + "repository-1": {repositoryID: 1}, }, }, storageState{ @@ -449,7 +449,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) 
{ requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{repositoryID: 1}, + "repository-1": {repositoryID: 1}, }, }, storageState{ @@ -466,7 +466,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{repositoryID: 1}, + "repository-1": {repositoryID: 1}, }, }, storageState{ @@ -623,7 +623,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ vs: { - repo: repositoryRecord{ + repo: { repositoryID: 1, primary: tc.expectedPrimary, assignments: tc.expectedAssignments, @@ -665,14 +665,14 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "deleted": { - "deleted": repositoryRecord{repositoryID: 1}, + "deleted": {repositoryID: 1}, }, "virtual-storage-1": { - "other-storages-remain": repositoryRecord{repositoryID: 2}, + "other-storages-remain": {repositoryID: 2}, }, "virtual-storage-2": { "deleted-repo": repositoryRecord{repositoryID: 3}, - "other-repo-remains": repositoryRecord{repositoryID: 4}, + "other-repo-remains": {repositoryID: 4}, }, }, storageState{ @@ -705,7 +705,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-2": { - "other-repo-remains": repositoryRecord{repositoryID: 4}, + "other-repo-remains": {repositoryID: 4}, }, }, storageState{ @@ -730,7 +730,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{repositoryID: 1}, + "repository-1": {repositoryID: 1}, }, }, storageState{ @@ -773,11 +773,11 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - 
"relative-path-1": repositoryRecord{repositoryID: 1}, - "relative-path-2": repositoryRecord{repositoryID: 2}, + "relative-path-1": {repositoryID: 1}, + "relative-path-2": {repositoryID: 2}, }, "virtual-storage-2": { - "relative-path-1": repositoryRecord{repositoryID: 3}, + "relative-path-1": {repositoryID: 3}, }, }, storageState{ @@ -803,11 +803,11 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "relative-path-1": repositoryRecord{repositoryID: 1}, - "relative-path-2": repositoryRecord{repositoryID: 2}, + "relative-path-1": {repositoryID: 1}, + "relative-path-2": {repositoryID: 2}, }, "virtual-storage-2": { - "relative-path-1": repositoryRecord{repositoryID: 3}, + "relative-path-1": {repositoryID: 3}, }, }, storageState{ @@ -849,7 +849,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { virtualStorageState{ "virtual-storage-1": { "renamed-all": repositoryRecord{repositoryID: 1}, - "renamed-some": repositoryRecord{repositoryID: 2}, + "renamed-some": {repositoryID: 2}, }, }, storageState{ @@ -872,7 +872,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { virtualStorageState{ "virtual-storage-1": { "renamed-all-new": repositoryRecord{repositoryID: 1}, - "renamed-some-new": repositoryRecord{repositoryID: 2}, + "renamed-some-new": {repositoryID: 2}, }, }, storageState{ @@ -909,7 +909,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{repositoryID: 1}, + "repository-1": {repositoryID: 1}, }, }, storageState{ @@ -943,7 +943,7 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{repositoryID: 1}, + "repository-1": {repositoryID: 1}, }, }, storageState{ @@ -999,7 +999,7 @@ func 
testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { requireState(t, ctx, virtualStorageState{ "virtual-storage-1": { - "repository-1": repositoryRecord{repositoryID: 1}, + "repository-1": {repositoryID: 1}, }, }, storageState{ -- GitLab From 42ff8a857c1c350baa7970141544aa5552bb4700 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Tue, 31 Aug 2021 14:52:27 +0200 Subject: [PATCH 14/14] Add test case for duplicate repository id in CreateRepository There's currently a test case that asserts an error is returned if a repository being created already exists. It doesn't test separately the two unique constraints we have, the first one being unique (virtual_storage, relative_path) and the second one being unique repository ID. While the repository IDs should always be unique due to the sequence being used to generate them, this commit splits the test case to assert both unique constraints separately. --- internal/praefect/datastore/repository_store.go | 4 ++++ .../praefect/datastore/repository_store_test.go | 15 +++++++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index 83571fa7e8..d0bc486619 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -402,6 +402,10 @@ FROM ( var pqerr *pq.Error if errors.As(err, &pqerr) && pqerr.Code.Name() == "unique_violation" { + if pqerr.Constraint == "repositories_pkey" { + return fmt.Errorf("repository id %d already in use", repositoryID) + } + return RepositoryExistsError{ virtualStorage: virtualStorage, relativePath: relativePath, diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index 0d774783fd..0e12c5d12b 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -3,6 +3,7 @@ package datastore import
( "context" "database/sql" + "fmt" "testing" "time" @@ -636,13 +637,23 @@ func testRepositoryStore(t *testing.T, newStore repositoryStoreFactory) { } }) - t.Run("conflict", func(t *testing.T) { + t.Run("conflict due to virtual storage and relative path", func(t *testing.T) { rs, _ := newStore(t, nil) require.NoError(t, rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false)) require.Equal(t, RepositoryExistsError{vs, repo, stor}, - rs.CreateRepository(ctx, 1, vs, repo, stor, nil, nil, false, false), + rs.CreateRepository(ctx, 2, vs, repo, stor, nil, nil, false, false), + ) + }) + + t.Run("conflict due to repository id", func(t *testing.T) { + rs, _ := newStore(t, nil) + + require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "storage-1", nil, nil, false, false)) + require.Equal(t, + fmt.Errorf("repository id 1 already in use"), + rs.CreateRepository(ctx, 1, "virtual-storage-2", "relative-path-2", "storage-2", nil, nil, false, false), ) }) }) -- GitLab