From dc6b6211545445579e27ac94a852d53360b544d0 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Mon, 2 Nov 2020 11:49:30 +0100 Subject: [PATCH 1/2] split nodeManagerRouter in to its own file Splits nodeManagerRouter's implementation in its own file in preparation for adding other router implementations. --- internal/praefect/router.go | 97 --------------------- internal/praefect/router_node_manager.go | 103 +++++++++++++++++++++++ 2 files changed, 103 insertions(+), 97 deletions(-) create mode 100644 internal/praefect/router_node_manager.go diff --git a/internal/praefect/router.go b/internal/praefect/router.go index ad27675cd0..57f4c2bcb0 100644 --- a/internal/praefect/router.go +++ b/internal/praefect/router.go @@ -2,10 +2,7 @@ package praefect import ( "context" - "fmt" - "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "google.golang.org/grpc" ) @@ -49,97 +46,3 @@ type Router interface { // Additionally, it returns nodes which should have the change replicated to. RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) } - -type nodeManagerRouter struct { - mgr nodes.Manager - rs datastore.RepositoryStore -} - -func toRouterNode(node nodes.Node) RouterNode { - return RouterNode{ - Storage: node.GetStorage(), - Connection: node.GetConnection(), - } -} - -func toRouterNodes(nodes []nodes.Node) []RouterNode { - out := make([]RouterNode, len(nodes)) - for i := range nodes { - out[i] = toRouterNode(nodes[i]) - } - return out -} - -// NeWNodeManagerRouter returns a router that uses the NodeManager to make routing decisions. 
-func NewNodeManagerRouter(mgr nodes.Manager, rs datastore.RepositoryStore) Router { - return &nodeManagerRouter{mgr: mgr, rs: rs} -} - -func (r *nodeManagerRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string) (RouterNode, error) { - node, err := r.mgr.GetSyncedNode(ctx, virtualStorage, relativePath) - if err != nil { - return RouterNode{}, fmt.Errorf("get synced node: %w", err) - } - - return toRouterNode(node), nil -} - -func (r *nodeManagerRouter) RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error) { - shard, err := r.mgr.GetShard(virtualStorage) - if err != nil { - return RouterNode{}, err - } - - return toRouterNode(shard.Primary), nil -} - -func (r *nodeManagerRouter) RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error) { - shard, err := r.mgr.GetShard(virtualStorage) - if err != nil { - return StorageMutatorRoute{}, err - } - - return StorageMutatorRoute{ - Primary: toRouterNode(shard.Primary), - Secondaries: toRouterNodes(shard.GetHealthySecondaries()), - }, nil -} - -func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) { - shard, err := r.mgr.GetShard(virtualStorage) - if err != nil { - return RepositoryMutatorRoute{}, fmt.Errorf("get shard: %w", err) - } - - if latest, err := r.rs.IsLatestGeneration(ctx, virtualStorage, relativePath, shard.Primary.GetStorage()); err != nil { - return RepositoryMutatorRoute{}, fmt.Errorf("check generation: %w", err) - } else if !latest { - return RepositoryMutatorRoute{}, ErrRepositoryReadOnly - } - - // Only healthy secondaries which are consistent with the primary are allowed to take - // part in the transaction. Unhealthy nodes would block the transaction until they come back. - // Inconsistent nodes will anyway need repair so including them doesn't make sense. 
They
-	// also might vote to abort which might unnecessarily fail the transaction.
-	consistentSecondaries, err := r.rs.GetConsistentSecondaries(ctx, virtualStorage, relativePath, shard.Primary.GetStorage())
-	if err != nil {
-		return RepositoryMutatorRoute{}, fmt.Errorf("consistent secondaries: %w", err)
-	}
-
-	var replicationTargets []string
-	participatingSecondaries := make([]nodes.Node, 0, len(consistentSecondaries))
-	for _, secondary := range shard.Secondaries {
-		if _, ok := consistentSecondaries[secondary.GetStorage()]; ok && secondary.IsHealthy() {
-			participatingSecondaries = append(participatingSecondaries, secondary)
-			continue
-		}
-
-		replicationTargets = append(replicationTargets, secondary.GetStorage())
-	}
-
-	return RepositoryMutatorRoute{
-		Primary:            toRouterNode(shard.Primary),
-		Secondaries:        toRouterNodes(participatingSecondaries),
-		ReplicationTargets: replicationTargets,
-	}, nil
-}
diff --git a/internal/praefect/router_node_manager.go b/internal/praefect/router_node_manager.go
new file mode 100644
index 0000000000..4df9e76967
--- /dev/null
+++ b/internal/praefect/router_node_manager.go
@@ -0,0 +1,103 @@
+package praefect
+
+import (
+	"context"
+	"fmt"
+
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+)
+
+type nodeManagerRouter struct {
+	mgr nodes.Manager
+	rs  datastore.RepositoryStore
+}
+
+func toRouterNode(node nodes.Node) RouterNode {
+	return RouterNode{
+		Storage:    node.GetStorage(),
+		Connection: node.GetConnection(),
+	}
+}
+
+func toRouterNodes(nodes []nodes.Node) []RouterNode {
+	out := make([]RouterNode, len(nodes))
+	for i := range nodes {
+		out[i] = toRouterNode(nodes[i])
+	}
+	return out
+}
+
+// NewNodeManagerRouter returns a router that uses the NodeManager to make routing decisions.
+func NewNodeManagerRouter(mgr nodes.Manager, rs datastore.RepositoryStore) Router { + return &nodeManagerRouter{mgr: mgr, rs: rs} +} + +func (r *nodeManagerRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string) (RouterNode, error) { + node, err := r.mgr.GetSyncedNode(ctx, virtualStorage, relativePath) + if err != nil { + return RouterNode{}, fmt.Errorf("get synced node: %w", err) + } + + return toRouterNode(node), nil +} + +func (r *nodeManagerRouter) RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error) { + shard, err := r.mgr.GetShard(virtualStorage) + if err != nil { + return RouterNode{}, err + } + + return toRouterNode(shard.Primary), nil +} + +func (r *nodeManagerRouter) RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error) { + shard, err := r.mgr.GetShard(virtualStorage) + if err != nil { + return StorageMutatorRoute{}, err + } + + return StorageMutatorRoute{ + Primary: toRouterNode(shard.Primary), + Secondaries: toRouterNodes(shard.GetHealthySecondaries()), + }, nil +} + +func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) { + shard, err := r.mgr.GetShard(virtualStorage) + if err != nil { + return RepositoryMutatorRoute{}, fmt.Errorf("get shard: %w", err) + } + + if latest, err := r.rs.IsLatestGeneration(ctx, virtualStorage, relativePath, shard.Primary.GetStorage()); err != nil { + return RepositoryMutatorRoute{}, fmt.Errorf("check generation: %w", err) + } else if !latest { + return RepositoryMutatorRoute{}, ErrRepositoryReadOnly + } + + // Only healthy secondaries which are consistent with the primary are allowed to take + // part in the transaction. Unhealthy nodes would block the transaction until they come back. + // Inconsistent nodes will anyway need repair so including them doesn't make sense. 
They + // also might vote to abort which might unnecessarily fail the transaction. + consistentSecondaries, err := r.rs.GetConsistentSecondaries(ctx, virtualStorage, relativePath, shard.Primary.GetStorage()) + if err != nil { + return RepositoryMutatorRoute{}, fmt.Errorf("consistent secondaries: %w", err) + } + + var replicationTargets []string + participatingSecondaries := make([]nodes.Node, 0, len(consistentSecondaries)) + for _, secondary := range shard.Secondaries { + if _, ok := consistentSecondaries[secondary.GetStorage()]; ok && secondary.IsHealthy() { + participatingSecondaries = append(participatingSecondaries, secondary) + continue + } + + replicationTargets = append(replicationTargets, secondary.GetStorage()) + } + + return RepositoryMutatorRoute{ + Primary: toRouterNode(shard.Primary), + Secondaries: toRouterNodes(participatingSecondaries), + ReplicationTargets: replicationTargets, + }, nil +} -- GitLab From d04a0391101ae6a0476aa65cdeb7d5b7cdd63918 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Mon, 2 Nov 2020 11:50:09 +0100 Subject: [PATCH 2/2] implement a router on the per repository elector stack This commit adds a router implementation based on the per repository elector stack. The behavior deviates in few ways from the current nodeManager router: 1. RouteStorageMutator is unimplemented as it doesn't appear we should receive such requests any more with hashed storage. All mutators should be repository scoped given repository scoped primaries. 2. RouteStorageAccessor routes the requests to any healthy node. We can't match the behavior of nodeManagerRouter as we don't have a primary for a virtual storage. This should be fine though, as the only RPC that is scoped like this only makes a remote call and doesn't depend on any state on the server. 3. Repository scoped messages respect repository specific primaries as opposed to virtual storage scoped primaries in the nodeManagerRouter. 
This commit does not yet hook up the router to the rest of the code.
---
 internal/praefect/router.go                   |   3 +-
 internal/praefect/router_per_repository.go    | 184 ++++++++++
 .../praefect/router_per_repository_test.go    | 338 ++++++++++++++++++
 3 files changed, 524 insertions(+), 1 deletion(-)
 create mode 100644 internal/praefect/router_per_repository.go
 create mode 100644 internal/praefect/router_per_repository_test.go

diff --git a/internal/praefect/router.go b/internal/praefect/router.go
index 57f4c2bcb0..d4b08989c5 100644
--- a/internal/praefect/router.go
+++ b/internal/praefect/router.go
@@ -6,7 +6,8 @@ import (
 	"google.golang.org/grpc"
 )
 
-// RouterNode represents a Node the router in a routing decision.
+// RouterNode is a subset of a node's configuration needed to perform
+// request routing.
 type RouterNode struct {
 	// Storage is storage of the node.
 	Storage string
diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go
new file mode 100644
index 0000000000..47d8bfb043
--- /dev/null
+++ b/internal/praefect/router_per_repository.go
@@ -0,0 +1,184 @@
+package praefect
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+	"google.golang.org/grpc"
+)
+
+// ErrNoSuitableNode is returned when there is no suitable node to serve a request.
+var ErrNoSuitableNode = errors.New("no suitable node to serve the request")
+
+// ErrNoHealthyNodes is returned when there are no healthy nodes to serve a request.
+var ErrNoHealthyNodes = errors.New("no healthy nodes")
+
+// Connections is a set of connections to configured storage nodes by their virtual storages.
+type Connections map[string]map[string]*grpc.ClientConn
+
+// PrimaryGetter is an interface for getting a primary of a repository.
+type PrimaryGetter interface {
+	// GetPrimary returns the primary storage for a given repository.
+	GetPrimary(ctx context.Context, virtualStorage string, relativePath string) (string, error)
+}
+
+// Random is the interface of the Go random number generator.
+type Random interface {
+	// Intn returns a random integer in the range [0,n).
+	Intn(n int) int
+}
+
+// RandomFunc is an adapter to turn conforming functions into a Random.
+type RandomFunc func(n int) int
+
+func (fn RandomFunc) Intn(n int) int { return fn(n) }
+
+// PerRepositoryRouter implements a router that routes requests respecting per repository primary nodes.
+type PerRepositoryRouter struct {
+	conns Connections
+	pg    PrimaryGetter
+	rand  Random
+	hc    HealthChecker
+	rs    datastore.RepositoryStore
+}
+
+// NewPerRepositoryRouter returns a new PerRepositoryRouter using the passed configuration.
+func NewPerRepositoryRouter(conns Connections, pg PrimaryGetter, hc HealthChecker, rand Random, rs datastore.RepositoryStore) *PerRepositoryRouter {
+	return &PerRepositoryRouter{
+		conns: conns,
+		pg:    pg,
+		rand:  rand,
+		hc:    hc,
+		rs:    rs,
+	}
+}
+
+func (r *PerRepositoryRouter) healthyNodes(virtualStorage string) ([]RouterNode, error) {
+	conns, ok := r.conns[virtualStorage]
+	if !ok {
+		return nil, nodes.ErrVirtualStorageNotExist
+	}
+
+	healthyNodes := make([]RouterNode, 0, len(conns))
+	for _, storage := range r.hc.HealthyNodes()[virtualStorage] {
+		conn, ok := conns[storage]
+		if !ok {
+			return nil, fmt.Errorf("no connection to node %q/%q", virtualStorage, storage)
+		}
+
+		healthyNodes = append(healthyNodes, RouterNode{
+			Storage:    storage,
+			Connection: conn,
+		})
+	}
+
+	if len(healthyNodes) == 0 {
+		return nil, ErrNoHealthyNodes
+	}
+
+	return healthyNodes, nil
+}
+
+func (r *PerRepositoryRouter) pickRandom(nodes []RouterNode) (RouterNode, error) {
+	if len(nodes) == 0 {
+		return RouterNode{}, ErrNoSuitableNode
+	}
+
+	return nodes[r.rand.Intn(len(nodes))], nil
+}
+
+// The only storage scoped
accessor RPC is RemoteService/FindRemoteRepository, which in turn executes a command
+// without a repository. This can be done by any Gitaly server as it doesn't depend on the state on the server.
+func (r *PerRepositoryRouter) RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error) {
+	healthyNodes, err := r.healthyNodes(virtualStorage)
+	if err != nil {
+		return RouterNode{}, err
+	}
+
+	return r.pickRandom(healthyNodes)
+}
+
+// RouteStorageMutator is not implemented here. The only storage scoped mutator RPC is related to namespace operations.
+// These are not relevant anymore, given hashed storage is default everywhere, and should be eventually removed.
+func (r *PerRepositoryRouter) RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error) {
+	return StorageMutatorRoute{}, errors.New("RouteStorageMutator is not implemented on PerRepositoryRouter")
+}
+
+// RouteRepositoryAccessor picks a random healthy node out of the primary and its consistent secondaries.
+func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string) (RouterNode, error) {
+	healthyNodes, err := r.healthyNodes(virtualStorage)
+	if err != nil {
+		return RouterNode{}, err
+	}
+
+	primary, err := r.pg.GetPrimary(ctx, virtualStorage, relativePath)
+	if err != nil {
+		return RouterNode{}, fmt.Errorf("get primary: %w", err)
+	}
+
+	consistentSecondaries, err := r.rs.GetConsistentSecondaries(ctx, virtualStorage, relativePath, primary)
+	if err != nil {
+		// this is recoverable error - proceed with primary node
+		ctxlogrus.Extract(ctx).WithError(err).Warn("get up to date secondaries")
+	}
+
+	healthyConsistentNodes := make([]RouterNode, 0, len(healthyNodes))
+	for _, node := range healthyNodes {
+		// The primary is matched by name: consistentSecondaries may be nil if the lookup failed.
+		if _, ok := consistentSecondaries[node.Storage]; !ok && node.Storage != primary {
+			continue
+		}
+
+		healthyConsistentNodes = append(healthyConsistentNodes, node)
+	}
+
+	return r.pickRandom(healthyConsistentNodes)
+}
+
+func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx
context.Context, virtualStorage, relativePath string) (RepositoryMutatorRoute, error) {
+	healthyNodes, err := r.healthyNodes(virtualStorage)
+	if err != nil {
+		return RepositoryMutatorRoute{}, err
+	}
+
+	primary, err := r.pg.GetPrimary(ctx, virtualStorage, relativePath)
+	if err != nil {
+		return RepositoryMutatorRoute{}, fmt.Errorf("get primary: %w", err)
+	}
+
+	if latest, err := r.rs.IsLatestGeneration(ctx, virtualStorage, relativePath, primary); err != nil {
+		return RepositoryMutatorRoute{}, fmt.Errorf("is latest generation: %w", err)
+	} else if !latest {
+		return RepositoryMutatorRoute{}, ErrRepositoryReadOnly
+	}
+
+	consistentSecondaries, err := r.rs.GetConsistentSecondaries(ctx, virtualStorage, relativePath, primary)
+	if err != nil {
+		return RepositoryMutatorRoute{}, fmt.Errorf("consistent secondaries: %w", err)
+	}
+
+	var route RepositoryMutatorRoute
+	for _, node := range healthyNodes {
+		if node.Storage == primary {
+			route.Primary = node
+			continue
+		}
+
+		if _, ok := consistentSecondaries[node.Storage]; !ok {
+			route.ReplicationTargets = append(route.ReplicationTargets, node.Storage)
+			continue
+		}
+
+		route.Secondaries = append(route.Secondaries, node)
+	}
+
+	if (route.Primary == RouterNode{}) {
+		return RepositoryMutatorRoute{}, nodes.ErrPrimaryNotHealthy
+	}
+
+	return route, nil
+}
diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go
new file mode 100644
index 0000000000..9d64a14176
--- /dev/null
+++ b/internal/praefect/router_per_repository_test.go
@@ -0,0 +1,338 @@
+package praefect
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
+	"gitlab.com/gitlab-org/gitaly/internal/testhelper"
+	"google.golang.org/grpc"
+)
+
+// PrimaryGetterFunc is an adapter to turn conforming functions into a PrimaryGetter.
+type PrimaryGetterFunc func(ctx context.Context, virtualStorage, relativePath string) (string, error) + +func (fn PrimaryGetterFunc) GetPrimary(ctx context.Context, virtualStorage, relativePath string) (string, error) { + return fn(ctx, virtualStorage, relativePath) +} + +type MockRepositoryStore struct { + datastore.RepositoryStore + GetConsistentSecondariesFunc func(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error) + IsLatestGenerationFunc func(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error) +} + +func (m MockRepositoryStore) GetConsistentSecondaries(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error) { + return m.GetConsistentSecondariesFunc(ctx, virtualStorage, relativePath, primary) +} + +func (m MockRepositoryStore) IsLatestGeneration(ctx context.Context, virtualStorage, relativePath, storage string) (bool, error) { + return m.IsLatestGenerationFunc(ctx, virtualStorage, relativePath, storage) +} + +func TestPerRepositoryRouter_RouteStorageAccessor(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + for _, tc := range []struct { + desc string + virtualStorage string + numCandidates int + pickCandidate int + error error + node string + }{ + { + desc: "unknown virtual storage", + virtualStorage: "unknown", + error: nodes.ErrVirtualStorageNotExist, + }, + { + desc: "picks randomly first candidate", + virtualStorage: "virtual-storage-1", + numCandidates: 2, + pickCandidate: 0, + node: "valid-choice-1", + }, + { + desc: "picks randomly second candidate", + virtualStorage: "virtual-storage-1", + numCandidates: 2, + pickCandidate: 1, + node: "valid-choice-2", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + conns := Connections{ + "virtual-storage-1": { + "valid-choice-1": &grpc.ClientConn{}, + "valid-choice-2": &grpc.ClientConn{}, + "unhealthy": &grpc.ClientConn{}, + }, + } + + router := NewPerRepositoryRouter( + conns, 
+ nil, + StaticHealthChecker{ + "virtual-storage-1": { + "valid-choice-1", + "valid-choice-2", + }, + }, + RandomFunc(func(n int) int { + require.Equal(t, tc.numCandidates, n) + return tc.pickCandidate + }), + nil, + ) + + node, err := router.RouteStorageAccessor(ctx, tc.virtualStorage) + require.Equal(t, tc.error, err) + require.Equal(t, RouterNode{ + Storage: tc.node, + Connection: conns["virtual-storage-1"][tc.node], + }, node) + }) + } +} + +func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) { + for _, tc := range []struct { + desc string + virtualStorage string + healthyNodes StaticHealthChecker + numCandidates int + pickCandidate int + error error + node string + }{ + { + desc: "unknown virtual storage", + virtualStorage: "unknown", + error: nodes.ErrVirtualStorageNotExist, + }, + { + desc: "no healthy nodes", + virtualStorage: "virtual-storage-1", + healthyNodes: map[string][]string{}, + error: ErrNoHealthyNodes, + }, + { + desc: "primary picked randomly", + virtualStorage: "virtual-storage-1", + healthyNodes: map[string][]string{ + "virtual-storage-1": {"primary", "consistent-secondary"}, + }, + numCandidates: 2, + pickCandidate: 0, + node: "primary", + }, + { + desc: "secondary picked randomly", + virtualStorage: "virtual-storage-1", + healthyNodes: map[string][]string{ + "virtual-storage-1": {"primary", "consistent-secondary"}, + }, + numCandidates: 2, + pickCandidate: 1, + node: "consistent-secondary", + }, + { + desc: "secondary picked when primary is unhealthy", + virtualStorage: "virtual-storage-1", + healthyNodes: map[string][]string{ + "virtual-storage-1": {"consistent-secondary"}, + }, + numCandidates: 1, + node: "consistent-secondary", + }, + { + desc: "no suitable nodes", + virtualStorage: "virtual-storage-1", + healthyNodes: map[string][]string{ + "virtual-storage-1": {"inconistent-secondary"}, + }, + error: ErrNoSuitableNode, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() 
+ + conns := Connections{ + "virtual-storage-1": { + "primary": &grpc.ClientConn{}, + "consistent-secondary": &grpc.ClientConn{}, + "inconistent-secondary": &grpc.ClientConn{}, + "unhealthy-secondary": &grpc.ClientConn{}, + }, + } + + router := NewPerRepositoryRouter( + conns, + PrimaryGetterFunc(func(ctx context.Context, virtualStorage, relativePath string) (string, error) { + t.Helper() + require.Equal(t, tc.virtualStorage, virtualStorage) + require.Equal(t, "repository", relativePath) + return "primary", nil + }), + tc.healthyNodes, + RandomFunc(func(n int) int { + t.Helper() + require.Equal(t, tc.numCandidates, n) + return tc.pickCandidate + }), + MockRepositoryStore{ + GetConsistentSecondariesFunc: func(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error) { + t.Helper() + require.Equal(t, tc.virtualStorage, virtualStorage) + require.Equal(t, "repository", relativePath) + require.Equal(t, "primary", primary) + return map[string]struct{}{"consistent-secondary": struct{}{}}, nil + }, + }, + ) + + node, err := router.RouteRepositoryAccessor(ctx, tc.virtualStorage, "repository") + require.Equal(t, tc.error, err) + if tc.node != "" { + require.Equal(t, RouterNode{ + Storage: tc.node, + Connection: conns[tc.virtualStorage][tc.node], + }, node) + } else { + require.Empty(t, node) + } + }) + } +} + +func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) { + for _, tc := range []struct { + desc string + virtualStorage string + healthyNodes StaticHealthChecker + isLatestGeneration bool + consistentSecondaries []string + secondaries []string + replicationTargets []string + error error + }{ + { + desc: "unknown virtual storage", + virtualStorage: "unknown", + error: nodes.ErrVirtualStorageNotExist, + }, + { + desc: "primary outdated", + virtualStorage: "virtual-storage-1", + isLatestGeneration: false, + healthyNodes: map[string][]string{"virtual-storage-1": {"primary", "secondary-1", "secondary-2"}}, + 
consistentSecondaries: []string{"secondary-1", "secondary-2"}, + error: ErrRepositoryReadOnly, + }, + { + desc: "primary unhealthy", + virtualStorage: "virtual-storage-1", + isLatestGeneration: true, + healthyNodes: map[string][]string{"virtual-storage-1": {"secondary-1", "secondary-2"}}, + consistentSecondaries: []string{"secondary-1", "secondary-2"}, + error: nodes.ErrPrimaryNotHealthy, + }, + { + desc: "all secondaries consistent", + virtualStorage: "virtual-storage-1", + isLatestGeneration: true, + healthyNodes: map[string][]string{"virtual-storage-1": {"primary", "secondary-1", "secondary-2"}}, + consistentSecondaries: []string{"secondary-1", "secondary-2"}, + secondaries: []string{"secondary-1", "secondary-2"}, + }, + { + desc: "inconsistent secondary", + virtualStorage: "virtual-storage-1", + isLatestGeneration: true, + healthyNodes: map[string][]string{"virtual-storage-1": {"primary", "secondary-1", "secondary-2"}}, + consistentSecondaries: []string{"secondary-2"}, + secondaries: []string{"secondary-2"}, + replicationTargets: []string{"secondary-1"}, + }, + { + desc: "unhealthy secondaries", + virtualStorage: "virtual-storage-1", + isLatestGeneration: true, + healthyNodes: map[string][]string{"virtual-storage-1": {"primary"}}, + consistentSecondaries: []string{"secondary-1"}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + conns := Connections{ + "virtual-storage-1": { + "primary": &grpc.ClientConn{}, + "secondary-1": &grpc.ClientConn{}, + "secondary-2": &grpc.ClientConn{}, + }, + } + + router := NewPerRepositoryRouter( + conns, + PrimaryGetterFunc(func(ctx context.Context, virtualStorage, relativePath string) (string, error) { + t.Helper() + require.Equal(t, tc.virtualStorage, virtualStorage) + require.Equal(t, "repository", relativePath) + return "primary", nil + }), + tc.healthyNodes, + nil, + MockRepositoryStore{ + IsLatestGenerationFunc: func(ctx context.Context, virtualStorage, relativePath, 
storage string) (bool, error) { + t.Helper() + require.Equal(t, tc.virtualStorage, virtualStorage) + require.Equal(t, "repository", relativePath) + require.Equal(t, "primary", storage) + return tc.isLatestGeneration, nil + }, + + GetConsistentSecondariesFunc: func(ctx context.Context, virtualStorage, relativePath, primary string) (map[string]struct{}, error) { + t.Helper() + require.Equal(t, tc.virtualStorage, virtualStorage) + require.Equal(t, "repository", relativePath) + require.Equal(t, "primary", primary) + consistentSecondaries := map[string]struct{}{} + for _, storage := range tc.consistentSecondaries { + consistentSecondaries[storage] = struct{}{} + } + + return consistentSecondaries, nil + }, + }, + ) + + route, err := router.RouteRepositoryMutator(ctx, tc.virtualStorage, "repository") + require.Equal(t, tc.error, err) + if err == nil { + var secondaries []RouterNode + for _, secondary := range tc.secondaries { + secondaries = append(secondaries, RouterNode{ + Storage: secondary, + Connection: conns[tc.virtualStorage][secondary], + }) + } + + require.Equal(t, RepositoryMutatorRoute{ + Primary: RouterNode{ + Storage: "primary", + Connection: conns[tc.virtualStorage]["primary"], + }, + Secondaries: secondaries, + ReplicationTargets: tc.replicationTargets, + }, route) + } + }) + } +} -- GitLab