diff --git a/internal/praefect/router.go b/internal/praefect/router.go index ad27675cd06305d1a9c5f117fd8fd601885da2f6..d4b08989c509785fa654ec7f62b434c26a32c89d 100644 --- a/internal/praefect/router.go +++ b/internal/praefect/router.go @@ -2,14 +2,12 @@ package praefect import ( "context" - "fmt" - "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "google.golang.org/grpc" ) -// RouterNode represents a Node the router in a routing decision. +// RouterNode is a subset of a node's configuration needed to perform +// request routing. type RouterNode struct { // Storage is storage of the node. Storage string @@ -49,97 +47,3 @@ type Router interface { // Additionally, it returns nodes which should have the change replicated to. RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) } - -type nodeManagerRouter struct { - mgr nodes.Manager - rs datastore.RepositoryStore -} - -func toRouterNode(node nodes.Node) RouterNode { - return RouterNode{ - Storage: node.GetStorage(), - Connection: node.GetConnection(), - } -} - -func toRouterNodes(nodes []nodes.Node) []RouterNode { - out := make([]RouterNode, len(nodes)) - for i := range nodes { - out[i] = toRouterNode(nodes[i]) - } - return out -} - -// NeWNodeManagerRouter returns a router that uses the NodeManager to make routing decisions. 
-func NewNodeManagerRouter(mgr nodes.Manager, rs datastore.RepositoryStore) Router { - return &nodeManagerRouter{mgr: mgr, rs: rs} -} - -func (r *nodeManagerRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string) (RouterNode, error) { - node, err := r.mgr.GetSyncedNode(ctx, virtualStorage, relativePath) - if err != nil { - return RouterNode{}, fmt.Errorf("get synced node: %w", err) - } - - return toRouterNode(node), nil -} - -func (r *nodeManagerRouter) RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error) { - shard, err := r.mgr.GetShard(virtualStorage) - if err != nil { - return RouterNode{}, err - } - - return toRouterNode(shard.Primary), nil -} - -func (r *nodeManagerRouter) RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error) { - shard, err := r.mgr.GetShard(virtualStorage) - if err != nil { - return StorageMutatorRoute{}, err - } - - return StorageMutatorRoute{ - Primary: toRouterNode(shard.Primary), - Secondaries: toRouterNodes(shard.GetHealthySecondaries()), - }, nil -} - -func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) { - shard, err := r.mgr.GetShard(virtualStorage) - if err != nil { - return RepositoryMutatorRoute{}, fmt.Errorf("get shard: %w", err) - } - - if latest, err := r.rs.IsLatestGeneration(ctx, virtualStorage, relativePath, shard.Primary.GetStorage()); err != nil { - return RepositoryMutatorRoute{}, fmt.Errorf("check generation: %w", err) - } else if !latest { - return RepositoryMutatorRoute{}, ErrRepositoryReadOnly - } - - // Only healthy secondaries which are consistent with the primary are allowed to take - // part in the transaction. Unhealthy nodes would block the transaction until they come back. - // Inconsistent nodes will anyway need repair so including them doesn't make sense. 
They - // also might vote to abort which might unnecessarily fail the transaction. - consistentSecondaries, err := r.rs.GetConsistentSecondaries(ctx, virtualStorage, relativePath, shard.Primary.GetStorage()) - if err != nil { - return RepositoryMutatorRoute{}, fmt.Errorf("consistent secondaries: %w", err) - } - - var replicationTargets []string - participatingSecondaries := make([]nodes.Node, 0, len(consistentSecondaries)) - for _, secondary := range shard.Secondaries { - if _, ok := consistentSecondaries[secondary.GetStorage()]; ok && secondary.IsHealthy() { - participatingSecondaries = append(participatingSecondaries, secondary) - continue - } - - replicationTargets = append(replicationTargets, secondary.GetStorage()) - } - - return RepositoryMutatorRoute{ - Primary: toRouterNode(shard.Primary), - Secondaries: toRouterNodes(participatingSecondaries), - ReplicationTargets: replicationTargets, - }, nil -} diff --git a/internal/praefect/router_node_manager.go b/internal/praefect/router_node_manager.go new file mode 100644 index 0000000000000000000000000000000000000000..4df9e769676eb83f329b509037e91bcf4a8e416e --- /dev/null +++ b/internal/praefect/router_node_manager.go @@ -0,0 +1,103 @@ +package praefect + +import ( + "context" + "fmt" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" +) + +type nodeManagerRouter struct { + mgr nodes.Manager + rs datastore.RepositoryStore +} + +func toRouterNode(node nodes.Node) RouterNode { + return RouterNode{ + Storage: node.GetStorage(), + Connection: node.GetConnection(), + } +} + +func toRouterNodes(nodes []nodes.Node) []RouterNode { + out := make([]RouterNode, len(nodes)) + for i := range nodes { + out[i] = toRouterNode(nodes[i]) + } + return out +} + +// NewNodeManagerRouter returns a router that uses the NodeManager to make routing decisions. 
+func NewNodeManagerRouter(mgr nodes.Manager, rs datastore.RepositoryStore) Router { + return &nodeManagerRouter{mgr: mgr, rs: rs} +} + +func (r *nodeManagerRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string) (RouterNode, error) { + node, err := r.mgr.GetSyncedNode(ctx, virtualStorage, relativePath) + if err != nil { + return RouterNode{}, fmt.Errorf("get synced node: %w", err) + } + + return toRouterNode(node), nil +} + +func (r *nodeManagerRouter) RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error) { + shard, err := r.mgr.GetShard(virtualStorage) + if err != nil { + return RouterNode{}, err + } + + return toRouterNode(shard.Primary), nil +} + +func (r *nodeManagerRouter) RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error) { + shard, err := r.mgr.GetShard(virtualStorage) + if err != nil { + return StorageMutatorRoute{}, err + } + + return StorageMutatorRoute{ + Primary: toRouterNode(shard.Primary), + Secondaries: toRouterNodes(shard.GetHealthySecondaries()), + }, nil +} + +func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) { + shard, err := r.mgr.GetShard(virtualStorage) + if err != nil { + return RepositoryMutatorRoute{}, fmt.Errorf("get shard: %w", err) + } + + if latest, err := r.rs.IsLatestGeneration(ctx, virtualStorage, relativePath, shard.Primary.GetStorage()); err != nil { + return RepositoryMutatorRoute{}, fmt.Errorf("check generation: %w", err) + } else if !latest { + return RepositoryMutatorRoute{}, ErrRepositoryReadOnly + } + + // Only healthy secondaries which are consistent with the primary are allowed to take + // part in the transaction. Unhealthy nodes would block the transaction until they come back. + // Inconsistent nodes will anyway need repair so including them doesn't make sense. 
They + // also might vote to abort which might unnecessarily fail the transaction. + consistentSecondaries, err := r.rs.GetConsistentSecondaries(ctx, virtualStorage, relativePath, shard.Primary.GetStorage()) + if err != nil { + return RepositoryMutatorRoute{}, fmt.Errorf("consistent secondaries: %w", err) + } + + var replicationTargets []string + participatingSecondaries := make([]nodes.Node, 0, len(consistentSecondaries)) + for _, secondary := range shard.Secondaries { + if _, ok := consistentSecondaries[secondary.GetStorage()]; ok && secondary.IsHealthy() { + participatingSecondaries = append(participatingSecondaries, secondary) + continue + } + + replicationTargets = append(replicationTargets, secondary.GetStorage()) + } + + return RepositoryMutatorRoute{ + Primary: toRouterNode(shard.Primary), + Secondaries: toRouterNodes(participatingSecondaries), + ReplicationTargets: replicationTargets, + }, nil +} diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go new file mode 100644 index 0000000000000000000000000000000000000000..47d8bfb04306787a3e846e1be5128b463eebdd30 --- /dev/null +++ b/internal/praefect/router_per_repository.go @@ -0,0 +1,184 @@ +package praefect + +import ( + "context" + "errors" + "fmt" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" + "google.golang.org/grpc" +) + +// ErrNoSuitableNode is returned when there is no suitable node to serve a request. +var ErrNoSuitableNode = errors.New("no suitable node to serve the request") + +// ErrNoHealthyNodes is returned when there are no healthy nodes to serve a request. +var ErrNoHealthyNodes = errors.New("no healthy nodes") + +// Connections is a set of connections to configured storage nodes by their virtual storages. 
+type Connections map[string]map[string]*grpc.ClientConn + +// PrimaryGetter is an interface for getting a primary of a repository. +type PrimaryGetter interface { + // GetPrimary returns the primary storage for a given repository. + GetPrimary(ctx context.Context, virtualStorage string, relativePath string) (string, error) +} + +// Random is the interface of the Go random number generator. +type Random interface { + // Intn returns a random integer in the range [0,n). + Intn(n int) int +} + +// RandomFunc is an adapter to turn conforming functions in to a Random. +type RandomFunc func(n int) int + +func (fn RandomFunc) Intn(n int) int { return fn(n) } + +// PerRepositoryRouter implements a router that routes requests respecting per repository primary nodes. +type PerRepositoryRouter struct { + conns Connections + pg PrimaryGetter + rand Random + hc HealthChecker + rs datastore.RepositoryStore +} + +// NewPerRepositoryRouter returns a new PerRepositoryRouter using the passed configuration. 
+func NewPerRepositoryRouter(conns Connections, pg PrimaryGetter, hc HealthChecker, rand Random, rs datastore.RepositoryStore) *PerRepositoryRouter { + return &PerRepositoryRouter{ + conns: conns, + pg: pg, + rand: rand, + hc: hc, + rs: rs, + } +} + +func (r *PerRepositoryRouter) healthyNodes(virtualStorage string) ([]RouterNode, error) { + conns, ok := r.conns[virtualStorage] + if !ok { + return nil, nodes.ErrVirtualStorageNotExist + } + + healthyNodes := make([]RouterNode, 0, len(conns)) + for _, storage := range r.hc.HealthyNodes()[virtualStorage] { + conn, ok := conns[storage] + if !ok { + return nil, fmt.Errorf("no connection to node %q/%q", virtualStorage, storage) + } + + healthyNodes = append(healthyNodes, RouterNode{ + Storage: storage, + Connection: conn, + }) + } + + if len(healthyNodes) == 0 { + return nil, ErrNoHealthyNodes + } + + return healthyNodes, nil +} + +func (r *PerRepositoryRouter) pickRandom(nodes []RouterNode) (RouterNode, error) { + if len(nodes) == 0 { + return RouterNode{}, ErrNoSuitableNode + } + + return nodes[r.rand.Intn(len(nodes))], nil +} + +// The only storage scoped accessor RPC is RemoteService/FindRemoteRepository, which in turn executes a command +// without a repository. This can be done by any Gitaly server as it doesn't depend on the state on the server. +func (r *PerRepositoryRouter) RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error) { + healthyNodes, err := r.healthyNodes(virtualStorage) + if err != nil { + return RouterNode{}, err + } + + return r.pickRandom(healthyNodes) +} + +// RouteStorageMutator is not implemented here. The only storage scoped mutator RPC is related to namespace operations. +// These are not relevant anymore, given hashed storage is default everywhere, and should be eventually removed. 
+func (r *PerRepositoryRouter) RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error) { + return StorageMutatorRoute{}, errors.New("RouteStorageMutator is not implemented on PerRepositoryRouter") +} + +func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string) (RouterNode, error) { + healthyNodes, err := r.healthyNodes(virtualStorage) + if err != nil { + return RouterNode{}, err + } + + primary, err := r.pg.GetPrimary(ctx, virtualStorage, relativePath) + if err != nil { + return RouterNode{}, fmt.Errorf("get primary: %w", err) + } + + consistentSecondaries, err := r.rs.GetConsistentSecondaries(ctx, virtualStorage, relativePath, primary) + if err != nil { + // this is recoverable error - proceed with primary node + ctxlogrus.Extract(ctx).WithError(err).Warn("get up to date secondaries") + } + + consistentSecondaries[primary] = struct{}{} + + healthyConsistentNodes := make([]RouterNode, 0, len(healthyNodes)) + for _, node := range healthyNodes { + if _, ok := consistentSecondaries[node.Storage]; !ok { + continue + } + + healthyConsistentNodes = append(healthyConsistentNodes, node) + } + + return r.pickRandom(healthyConsistentNodes) +} + +func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) { + healthyNodes, err := r.healthyNodes(virtualStorage) + if err != nil { + return RepositoryMutatorRoute{}, err + } + + primary, err := r.pg.GetPrimary(ctx, virtualStorage, relativePath) + if err != nil { + return RepositoryMutatorRoute{}, fmt.Errorf("get primary: %w", err) + } + + if latest, err := r.rs.IsLatestGeneration(ctx, virtualStorage, relativePath, primary); err != nil { + return RepositoryMutatorRoute{}, fmt.Errorf("is latest generation: %w", err) + } else if !latest { + return RepositoryMutatorRoute{}, ErrRepositoryReadOnly + } + + consistentSecondaries, err := 
r.rs.GetConsistentSecondaries(ctx, virtualStorage, relativePath, primary) + if err != nil { + return RepositoryMutatorRoute{}, fmt.Errorf("consistent secondaries: %w", err) + } + + var route RepositoryMutatorRoute + for _, node := range healthyNodes { + if node.Storage == primary { + route.Primary = node + continue + } + + if _, ok := consistentSecondaries[node.Storage]; !ok { + route.ReplicationTargets = append(route.ReplicationTargets, node.Storage) + continue + } + + route.Secondaries = append(route.Secondaries, node) + } + + if (route.Primary == RouterNode{}) { + return RepositoryMutatorRoute{}, nodes.ErrPrimaryNotHealthy + } + + return route, nil +} diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go new file mode 100644 index 0000000000000000000000000000000000000000..9d64a14176fdd17b733e79c0cdc7c85e25a9d151 --- /dev/null +++ b/internal/praefect/router_per_repository_test.go @@ -0,0 +1,338 @@ +package praefect + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "google.golang.org/grpc" +) + +// PrimaryGetterFunc is an adapter to turn conforming functions in to a PrimaryGetter. 
+type PrimaryGetterFunc func(ctx context.Context, virtualStorage, relativePath string) (string, error) + +func (fn PrimaryGetterFunc) GetPrimary(ctx context.Context, virtualStorage, relativePath string) (string, error) { + return fn(ctx, virtualStorage, relativePath) +} + +type MockRepositoryStore struct { + datastore.RepositoryStore + GetConsistentSecondariesFunc func(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error) + IsLatestGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error) +} + +func (m MockRepositoryStore) GetConsistentSecondaries(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error) { + return m.GetConsistentSecondariesFunc(ctx, virtualStorage, relativePath, primary) +} + +func (m MockRepositoryStore) IsLatestGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error) { + return m.IsLatestGenerationFunc(ctx, virtualStorage, relativePath, storage) +} + +func TestPerRepositoryRouter_RouteStorageAccessor(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + for _, tc := range []struct { + desc string + virtualStorage string + numCandidates int + pickCandidate int + error error + node string + }{ + { + desc: "unknown virtual storage", + virtualStorage: "unknown", + error: nodes.ErrVirtualStorageNotExist, + }, + { + desc: "picks randomly first candidate", + virtualStorage: "virtual-storage-1", + numCandidates: 2, + pickCandidate: 0, + node: "valid-choice-1", + }, + { + desc: "picks randomly second candidate", + virtualStorage: "virtual-storage-1", + numCandidates: 2, + pickCandidate: 1, + node: "valid-choice-2", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + conns := Connections{ + "virtual-storage-1": { + "valid-choice-1": &grpc.ClientConn{}, + "valid-choice-2": &grpc.ClientConn{}, + "unhealthy": &grpc.ClientConn{}, + }, + } + + router := NewPerRepositoryRouter( + conns, 
+ nil, + StaticHealthChecker{ + "virtual-storage-1": { + "valid-choice-1", + "valid-choice-2", + }, + }, + RandomFunc(func(n int) int { + require.Equal(t, tc.numCandidates, n) + return tc.pickCandidate + }), + nil, + ) + + node, err := router.RouteStorageAccessor(ctx, tc.virtualStorage) + require.Equal(t, tc.error, err) + require.Equal(t, RouterNode{ + Storage: tc.node, + Connection: conns["virtual-storage-1"][tc.node], + }, node) + }) + } +} + +func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) { + for _, tc := range []struct { + desc string + virtualStorage string + healthyNodes StaticHealthChecker + numCandidates int + pickCandidate int + error error + node string + }{ + { + desc: "unknown virtual storage", + virtualStorage: "unknown", + error: nodes.ErrVirtualStorageNotExist, + }, + { + desc: "no healthy nodes", + virtualStorage: "virtual-storage-1", + healthyNodes: map[string][]string{}, + error: ErrNoHealthyNodes, + }, + { + desc: "primary picked randomly", + virtualStorage: "virtual-storage-1", + healthyNodes: map[string][]string{ + "virtual-storage-1": {"primary", "consistent-secondary"}, + }, + numCandidates: 2, + pickCandidate: 0, + node: "primary", + }, + { + desc: "secondary picked randomly", + virtualStorage: "virtual-storage-1", + healthyNodes: map[string][]string{ + "virtual-storage-1": {"primary", "consistent-secondary"}, + }, + numCandidates: 2, + pickCandidate: 1, + node: "consistent-secondary", + }, + { + desc: "secondary picked when primary is unhealthy", + virtualStorage: "virtual-storage-1", + healthyNodes: map[string][]string{ + "virtual-storage-1": {"consistent-secondary"}, + }, + numCandidates: 1, + node: "consistent-secondary", + }, + { + desc: "no suitable nodes", + virtualStorage: "virtual-storage-1", + healthyNodes: map[string][]string{ + "virtual-storage-1": {"inconistent-secondary"}, + }, + error: ErrNoSuitableNode, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() 
+ + conns := Connections{ + "virtual-storage-1": { + "primary": &grpc.ClientConn{}, + "consistent-secondary": &grpc.ClientConn{}, + "inconistent-secondary": &grpc.ClientConn{}, + "unhealthy-secondary": &grpc.ClientConn{}, + }, + } + + router := NewPerRepositoryRouter( + conns, + PrimaryGetterFunc(func(ctx context.Context, virtualStorage, relativePath string) (string, error) { + t.Helper() + require.Equal(t, tc.virtualStorage, virtualStorage) + require.Equal(t, "repository", relativePath) + return "primary", nil + }), + tc.healthyNodes, + RandomFunc(func(n int) int { + t.Helper() + require.Equal(t, tc.numCandidates, n) + return tc.pickCandidate + }), + MockRepositoryStore{ + GetConsistentSecondariesFunc: func(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error) { + t.Helper() + require.Equal(t, tc.virtualStorage, virtualStorage) + require.Equal(t, "repository", relativePath) + require.Equal(t, "primary", primary) + return map[string]struct{}{"consistent-secondary": struct{}{}}, nil + }, + }, + ) + + node, err := router.RouteRepositoryAccessor(ctx, tc.virtualStorage, "repository") + require.Equal(t, tc.error, err) + if tc.node != "" { + require.Equal(t, RouterNode{ + Storage: tc.node, + Connection: conns[tc.virtualStorage][tc.node], + }, node) + } else { + require.Empty(t, node) + } + }) + } +} + +func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) { + for _, tc := range []struct { + desc string + virtualStorage string + healthyNodes StaticHealthChecker + isLatestGeneration bool + consistentSecondaries []string + secondaries []string + replicationTargets []string + error error + }{ + { + desc: "unknown virtual storage", + virtualStorage: "unknown", + error: nodes.ErrVirtualStorageNotExist, + }, + { + desc: "primary outdated", + virtualStorage: "virtual-storage-1", + isLatestGeneration: false, + healthyNodes: map[string][]string{"virtual-storage-1": {"primary", "secondary-1", "secondary-2"}}, + 
consistentSecondaries: []string{"secondary-1", "secondary-2"}, + error: ErrRepositoryReadOnly, + }, + { + desc: "primary unhealthy", + virtualStorage: "virtual-storage-1", + isLatestGeneration: true, + healthyNodes: map[string][]string{"virtual-storage-1": {"secondary-1", "secondary-2"}}, + consistentSecondaries: []string{"secondary-1", "secondary-2"}, + error: nodes.ErrPrimaryNotHealthy, + }, + { + desc: "all secondaries consistent", + virtualStorage: "virtual-storage-1", + isLatestGeneration: true, + healthyNodes: map[string][]string{"virtual-storage-1": {"primary", "secondary-1", "secondary-2"}}, + consistentSecondaries: []string{"secondary-1", "secondary-2"}, + secondaries: []string{"secondary-1", "secondary-2"}, + }, + { + desc: "inconsistent secondary", + virtualStorage: "virtual-storage-1", + isLatestGeneration: true, + healthyNodes: map[string][]string{"virtual-storage-1": {"primary", "secondary-1", "secondary-2"}}, + consistentSecondaries: []string{"secondary-2"}, + secondaries: []string{"secondary-2"}, + replicationTargets: []string{"secondary-1"}, + }, + { + desc: "unhealthy secondaries", + virtualStorage: "virtual-storage-1", + isLatestGeneration: true, + healthyNodes: map[string][]string{"virtual-storage-1": {"primary"}}, + consistentSecondaries: []string{"secondary-1"}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + conns := Connections{ + "virtual-storage-1": { + "primary": &grpc.ClientConn{}, + "secondary-1": &grpc.ClientConn{}, + "secondary-2": &grpc.ClientConn{}, + }, + } + + router := NewPerRepositoryRouter( + conns, + PrimaryGetterFunc(func(ctx context.Context, virtualStorage, relativePath string) (string, error) { + t.Helper() + require.Equal(t, tc.virtualStorage, virtualStorage) + require.Equal(t, "repository", relativePath) + return "primary", nil + }), + tc.healthyNodes, + nil, + MockRepositoryStore{ + IsLatestGenerationFunc: func(ctx context.Context, virtualStorage, relativePath, 
storage string) (bool, error) { + t.Helper() + require.Equal(t, tc.virtualStorage, virtualStorage) + require.Equal(t, "repository", relativePath) + require.Equal(t, "primary", storage) + return tc.isLatestGeneration, nil + }, + + GetConsistentSecondariesFunc: func(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error) { + t.Helper() + require.Equal(t, tc.virtualStorage, virtualStorage) + require.Equal(t, "repository", relativePath) + require.Equal(t, "primary", primary) + consistentSecondaries := map[string]struct{}{} + for _, storage := range tc.consistentSecondaries { + consistentSecondaries[storage] = struct{}{} + } + + return consistentSecondaries, nil + }, + }, + ) + + route, err := router.RouteRepositoryMutator(ctx, tc.virtualStorage, "repository") + require.Equal(t, tc.error, err) + if err == nil { + var secondaries []RouterNode + for _, secondary := range tc.secondaries { + secondaries = append(secondaries, RouterNode{ + Storage: secondary, + Connection: conns[tc.virtualStorage][secondary], + }) + } + + require.Equal(t, RepositoryMutatorRoute{ + Primary: RouterNode{ + Storage: "primary", + Connection: conns[tc.virtualStorage]["primary"], + }, + Secondaries: secondaries, + ReplicationTargets: tc.replicationTargets, + }, route) + } + }) + } +}