diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 7b52962340465ba2d96b938655341cb588d5295c..4c80ae4fdd89a8a88c90f81b2742101fe63cb32c 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -29,7 +29,8 @@ type Config struct {
 	Prometheus       prometheus.Config `toml:"prometheus"`
 	Auth             auth.Config       `toml:"auth"`
 	DB               `toml:"database"`
-	FailoverEnabled  bool              `toml:"failover_enabled"`
+	FailoverEnabled  bool              `toml:"failover_enabled"`
+	ElectionStrategy string            `toml:"election_strategy"`
 }
 
 // VirtualStorage represents a set of nodes for a storage
diff --git a/internal/praefect/datastore/migrations/20200324001604_add_shard_elections_table.go b/internal/praefect/datastore/migrations/20200324001604_add_shard_elections_table.go
new file mode 100644
index 0000000000000000000000000000000000000000..b7b88cc564705f1014402825b8d440bd46d7da25
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20200324001604_add_shard_elections_table.go
@@ -0,0 +1,20 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+	m := &migrate.Migration{
+		Id: "20200324001604_add_shard_elections_table",
+		Up: []string{`CREATE TABLE shard_elections (
+			is_primary boolean DEFAULT 'true' NOT NULL,
+			shard_name varchar(255) NOT NULL,
+			node_name varchar(255) NOT NULL,
+			last_seen_active timestamp NOT NULL
+		)`,
+			"CREATE UNIQUE INDEX primary_shard_idx ON shard_elections (is_primary, shard_name)",
+		},
+		Down: []string{"DROP TABLE shard_elections"},
+	}
+
+	allMigrations = append(allMigrations, m)
+}
diff --git a/internal/praefect/nodes/local_memory_election_strategy.go b/internal/praefect/nodes/local_memory_election_strategy.go
new file mode 100644
index 0000000000000000000000000000000000000000..a5a4d13bbf5b36ef4d239077177f35e4f6b53c76
--- /dev/null
+++ b/internal/praefect/nodes/local_memory_election_strategy.go
@@ -0,0 +1,200 @@
+package nodes
+
+import (
+	"sync"
+	"time"
+
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
+)
+
+type managedNode struct {
+	node     Node
+	up       bool
+	primary  bool
+	statuses []bool
+}
+
+// LocalMemoryElectionStrategy relies on an in-memory datastore to track
+// the primary and secondaries. It does NOT support multiple Praefect
+// nodes or have any persistence. This is used mostly for testing and
+// development.
+type LocalMemoryElectionStrategy struct {
+	m               sync.RWMutex
+	failoverEnabled bool
+	shardName       string
+	nodes           []*managedNode
+	primaryNode     *managedNode
+}
+
+// healthcheckThreshold is the number of consecutive successful health checks
+// necessary for deeming a node "healthy"
+const healthcheckThreshold = 3
+
+func (n *managedNode) isHealthy() bool {
+	if len(n.statuses) < healthcheckThreshold {
+		return false
+	}
+
+	for _, status := range n.statuses[len(n.statuses)-healthcheckThreshold:] {
+		if !status {
+			return false
+		}
+	}
+
+	return true
+}
+
+func newLocalMemoryElectionStrategy(name string, failoverEnabled bool) *LocalMemoryElectionStrategy {
+	return &LocalMemoryElectionStrategy{
+		shardName:       name,
+		failoverEnabled: failoverEnabled,
+	}
+}
+
+// AddNode registers a primary or secondary in the internal
+// datastore.
+func (s *LocalMemoryElectionStrategy) AddNode(node Node, primary bool) {
+	localNode := managedNode{
+		node:     node,
+		primary:  primary,
+		statuses: make([]bool, 0),
+		up:       false,
+	}
+
+	// If failover hasn't been activated, we assume all nodes are up
+	// since health checks aren't run.
+ if !s.failoverEnabled { + localNode.up = true + } + + s.m.Lock() + defer s.m.Unlock() + + if primary { + s.primaryNode = &localNode + } + + s.nodes = append(s.nodes, &localNode) +} + +// Start launches a Goroutine to check the state of the nodes and +// continuously monitor their health via gRPC health checks. +func (s *LocalMemoryElectionStrategy) Start(bootstrapInterval, monitorInterval time.Duration) error { + s.bootstrap(bootstrapInterval) + go s.monitor(monitorInterval) + + return nil +} + +func (s *LocalMemoryElectionStrategy) bootstrap(d time.Duration) error { + timer := time.NewTimer(d) + defer timer.Stop() + + for i := 0; i < healthcheckThreshold; i++ { + <-timer.C + s.CheckShard() + timer.Reset(d) + } + + return nil +} + +func (s *LocalMemoryElectionStrategy) monitor(d time.Duration) { + ticker := time.NewTicker(d) + defer ticker.Stop() + + for { + <-ticker.C + s.CheckShard() + } +} + +// CheckShard issues a gRPC health check for each node managed by the +// shard. +func (s *LocalMemoryElectionStrategy) CheckShard() { + defer s.updateMetrics() + + for _, n := range s.nodes { + status := n.node.check() + n.statuses = append(n.statuses, status) + + if len(n.statuses) > healthcheckThreshold { + n.statuses = n.statuses[1:] + } + + up := n.isHealthy() + n.up = up + } + + if s.primaryNode != nil && s.primaryNode.isHealthy() { + return + } + + var newPrimary *managedNode + + for _, node := range s.nodes { + if !node.primary && node.isHealthy() { + newPrimary = node + break + } + } + + if newPrimary == nil { + return + } + + s.m.Lock() + defer s.m.Unlock() + + s.primaryNode.primary = false + s.primaryNode = newPrimary + newPrimary.primary = true +} + +// GetPrimary gets the primary of a shard. If no primary exists, it will +// be nil. If a primary has been elected but is down, err will be +// ErrPrimaryNotHealthy. 
+func (s *LocalMemoryElectionStrategy) GetPrimary() (Node, error) { + s.m.RLock() + defer s.m.RUnlock() + + if s.primaryNode == nil { + return nil, ErrPrimaryNotHealthy + } + + if !s.primaryNode.up { + return s.primaryNode.node, ErrPrimaryNotHealthy + } + + return s.primaryNode.node, nil +} + +// GetSecondaries gets the secondaries of a shard +func (s *LocalMemoryElectionStrategy) GetSecondaries() ([]Node, error) { + s.m.RLock() + defer s.m.RUnlock() + + var secondaries []Node + for _, n := range s.nodes { + if !n.primary { + secondaries = append(secondaries, n.node) + } + } + + return secondaries, nil +} + +func (s *LocalMemoryElectionStrategy) updateMetrics() { + s.m.RLock() + defer s.m.RUnlock() + + for _, node := range s.nodes { + val := float64(0) + + if node.primary { + val = float64(1) + } + + metrics.PrimaryGauge.WithLabelValues(s.shardName, node.node.GetStorage()).Set(val) + } +} diff --git a/internal/praefect/nodes/local_memory_election_strategy_test.go b/internal/praefect/nodes/local_memory_election_strategy_test.go new file mode 100644 index 0000000000000000000000000000000000000000..cbb5b2101f485afc7aae79eeb69a9f64bbb7350c --- /dev/null +++ b/internal/praefect/nodes/local_memory_election_strategy_test.go @@ -0,0 +1,46 @@ +package nodes + +import ( + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "google.golang.org/grpc" +) + +func TestPrimaryAndSecondaries(t *testing.T) { + socket := testhelper.GetTemporaryGitalySocketFileName() + + cc, err := grpc.Dial( + "unix://"+socket, + grpc.WithInsecure(), + ) + + require.NoError(t, err) + + storageName := "default" + cs := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), nil) + strategy := newLocalMemoryElectionStrategy(storageName, true) + + strategy.AddNode(cs, true) + + primary, err := strategy.GetPrimary() + + require.NoError(t, err) + require.Equal(t, primary, cs) + + secondaries, err := strategy.GetSecondaries() + + require.NoError(t, err) + require.Equal(t, 0, len(secondaries)) + + secondary := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), nil) + strategy.AddNode(secondary, false) + + secondaries, err = strategy.GetSecondaries() + + require.NoError(t, err) + require.Equal(t, 1, len(secondaries)) + require.Equal(t, secondary, secondaries[0]) +} diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index 19bf6eef8b71474bd3e5684d4d966ab3a9c0b7b4..74b3dba76354794017aad83715818e656830c646 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -3,7 +3,6 @@ package nodes import ( "context" "errors" - "sync" "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" @@ -38,55 +37,48 @@ type Node interface { GetAddress() string GetToken() string GetConnection() *grpc.ClientConn -} - -type shard struct { - m sync.RWMutex - primary *nodeStatus - secondaries []*nodeStatus -} - -// GetPrimary gets the primary of a shard -func (s *shard) GetPrimary() (Node, error) { - s.m.RLock() - defer s.m.RUnlock() - - return s.primary, nil -} - -// GetSecondaries gets the secondaries of a shard -func (s *shard) GetSecondaries() ([]Node, error) { - s.m.RLock() - defer s.m.RUnlock() - - var secondaries []Node - for _, secondary := range s.secondaries { - secondaries = append(secondaries, secondary) - } - - return secondaries, nil + check() bool } // Mgr is a concrete type that 
adheres to the Manager interface type Mgr struct { - // shards is a map of shards keyed on virtual storage name - shards map[string]*shard - // staticShards never changes based on node health. It is a static set of shards that comes from the config's - // VirtualStorages failoverEnabled bool log *logrus.Entry + // strategies is a map of strategies keyed on virtual storage name + strategies map[string]LeaderElectionStrategy +} + +// LeaderElectionStrategy defines the interface by which primary and +// secondaries are managed. +type LeaderElectionStrategy interface { + Start(bootstrapInterval, monitorInterval time.Duration) error + AddNode(node Node, primary bool) + CheckShard() + + Shard } // ErrPrimaryNotHealthy indicates the primary of a shard is not in a healthy state and hence // should not be used for a new request var ErrPrimaryNotHealthy = errors.New("primary is not healthy") -// NewNodeManager creates a new NodeMgr based on virtual storage configs +// NewManager creates a new NodeMgr based on virtual storage configs func NewManager(log *logrus.Entry, c config.Config, latencyHistogram metrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) { - shards := make(map[string]*shard) + mgr := Mgr{ + log: log, + failoverEnabled: c.FailoverEnabled} + + mgr.strategies = make(map[string]LeaderElectionStrategy) + for _, virtualStorage := range c.VirtualStorages { - var secondaries []*nodeStatus - var primary *nodeStatus + strategy, err := mgr.createElectionStrategy(virtualStorage.Name, c) + + if err != nil { + return nil, err + } + + mgr.strategies[virtualStorage.Name] = strategy + for _, node := range virtualStorage.Nodes { conn, err := client.Dial(node.Address, append( @@ -110,60 +102,43 @@ func NewManager(log *logrus.Entry, c config.Config, latencyHistogram metrics.His } ns := newConnectionStatus(*node, conn, log, latencyHistogram) - if node.DefaultPrimary { - primary = ns - continue - } - - secondaries = append(secondaries, ns) - } - - shards[virtualStorage.Name] = &shard{ - primary: primary, - secondaries: secondaries, + strategy.AddNode(ns, node.DefaultPrimary) } } - return &Mgr{ - shards: shards, - log: log, - failoverEnabled: c.FailoverEnabled, - }, nil + return &mgr, nil } -// healthcheckThreshold is the number of consecutive healthpb.HealthCheckResponse_SERVING necessary -// for deeming a node "healthy" -const healthcheckThreshold = 3 +func (n *Mgr) createElectionStrategy(shardName string, c config.Config) (LeaderElectionStrategy, error) { + if c.ElectionStrategy == "sql" { + strategy, err := newSqlElectionStrategy(shardName, c, n.log) -func (n *Mgr) bootstrap(d time.Duration) error { - timer := time.NewTimer(d) - defer timer.Stop() + if err != nil { + return nil, err + } - for i := 0; i < healthcheckThreshold; i++ { - <-timer.C - n.checkShards() - timer.Reset(d) + return strategy, err } - return nil -} - -func (n *Mgr) monitor(d time.Duration) { - ticker := time.NewTicker(d) - defer ticker.Stop() - - for { - <-ticker.C - n.checkShards() - } + return newLocalMemoryElectionStrategy(shardName, c.FailoverEnabled), nil } // Start will bootstrap the node manager by calling healthcheck on the nodes as well as kicking off // the monitoring process. Start must be called before NodeMgr can be used. 
func (n *Mgr) Start(bootstrapInterval, monitorInterval time.Duration) { if n.failoverEnabled { - n.bootstrap(bootstrapInterval) - go n.monitor(monitorInterval) + for _, strategy := range n.strategies { + strategy.Start(bootstrapInterval, monitorInterval) + } + } +} + +// CheckShards performs health checks on all the available shards. The +// election strategy is responsible for determining the criteria for +// when to elect a new primary and when a node is down. +func (n *Mgr) CheckShards() { + for _, strategy := range n.strategies { + strategy.CheckShard() } } @@ -172,13 +147,15 @@ var ErrVirtualStorageNotExist = errors.New("virtual storage does not exist") // GetShard retrieves a shard for a virtual storage name func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) { - shard, ok := n.shards[virtualStorageName] + shard, ok := n.strategies[virtualStorageName] if !ok { return nil, ErrVirtualStorageNotExist } if n.failoverEnabled { - if !shard.primary.isHealthy() { + _, err := shard.GetPrimary() + + if err != nil { return nil, ErrPrimaryNotHealthy } } @@ -186,54 +163,10 @@ func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) { return shard, nil } -func checkShard(virtualStorage string, s *shard) { - defer func() { - metrics.PrimaryGauge.WithLabelValues(virtualStorage, s.primary.GetStorage()).Set(1) - for _, secondary := range s.secondaries { - metrics.PrimaryGauge.WithLabelValues(virtualStorage, secondary.GetStorage()).Set(0) - } - }() - - s.primary.check() - for _, secondary := range s.secondaries { - secondary.check() - } - - if s.primary.isHealthy() { - return - } - - newPrimaryIndex := -1 - for i, secondary := range s.secondaries { - if secondary.isHealthy() { - newPrimaryIndex = i - break - } - } - - if newPrimaryIndex < 0 { - // no healthy secondaries exist - return - } - s.m.Lock() - newPrimary := s.secondaries[newPrimaryIndex] - s.secondaries = append(s.secondaries[:newPrimaryIndex], s.secondaries[newPrimaryIndex+1:]...) 
- s.secondaries = append(s.secondaries, s.primary) - s.primary = newPrimary - s.m.Unlock() -} - -func (n *Mgr) checkShards() { - for virtualStorage, shard := range n.shards { - checkShard(virtualStorage, shard) - } -} - func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry, latencyHist metrics.HistogramVec) *nodeStatus { return &nodeStatus{ Node: node, ClientConn: cc, - statuses: make([]healthpb.HealthCheckResponse_ServingStatus, 0), log: l, latencyHist: latencyHist, } @@ -242,7 +175,6 @@ func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry, type nodeStatus struct { models.Node *grpc.ClientConn - statuses []healthpb.HealthCheckResponse_ServingStatus log *logrus.Entry latencyHist metrics.HistogramVec } @@ -267,38 +199,24 @@ func (n *nodeStatus) GetConnection() *grpc.ClientConn { return n.ClientConn } -func (n *nodeStatus) isHealthy() bool { - if len(n.statuses) < healthcheckThreshold { - return false - } - - for _, status := range n.statuses[len(n.statuses)-healthcheckThreshold:] { - if status != healthpb.HealthCheckResponse_SERVING { - return false - } - } - - return true -} - -func (n *nodeStatus) check() { +func (n *nodeStatus) check() bool { client := healthpb.NewHealthClient(n.ClientConn) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() + status := false start := time.Now() resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{Service: ""}) n.latencyHist.WithLabelValues(n.Storage).Observe(time.Since(start).Seconds()) - if err != nil { - n.log.WithError(err).WithField("storage", n.Storage).WithField("address", n.Address).Warn("error when pinging healthcheck") - resp = &healthpb.HealthCheckResponse{ - Status: healthpb.HealthCheckResponse_UNKNOWN, - } + if err == nil && resp.Status == healthpb.HealthCheckResponse_SERVING { + status = true + } else { + n.log.WithError(err).WithFields(logrus.Fields{ + "storage": n.Storage, + "address": n.Address, + }).Warn("error when pinging healthcheck") } - n.statuses = append(n.statuses, resp.Status) - if len(n.statuses) > healthcheckThreshold { - n.statuses = n.statuses[1:] - } + return status } diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go index 8f5e1443bba30db3e53c0a1e8f88f9d04074666d..135747afed52b3067ec3e5e141da4e1acd3dd19e 100644 --- a/internal/praefect/nodes/manager_test.go +++ b/internal/praefect/nodes/manager_test.go @@ -30,21 +30,17 @@ func TestNodeStatus(t *testing.T) { storageName := "default" cs := newConnectionStatus(models.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec) - require.False(t, cs.isHealthy()) - var expectedLabels [][]string for i := 0; i < healthcheckThreshold; i++ { - cs.check() + require.True(t, cs.check()) expectedLabels = append(expectedLabels, []string{storageName}) } - require.True(t, cs.isHealthy()) + require.Equal(t, expectedLabels, mockHistogramVec.LabelsCalled()) require.Len(t, mockHistogramVec.Observer().Observed(), healthcheckThreshold) healthSvr.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) - - cs.check() - require.False(t, cs.isHealthy()) + require.False(t, cs.check()) } func TestNodeManager(t *testing.T) { @@ -122,7 +118,7 @@ func TestNodeManager(t *testing.T) { require.Equal(t, virtualStorages[0].Nodes[1].Address, secondaries[0].GetAddress()) healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) - nm.checkShards() + nm.CheckShards() labelsCalled := mockHistogram.LabelsCalled() for _, node 
:= range virtualStorages[0].Nodes { @@ -167,7 +163,7 @@ func TestNodeManager(t *testing.T) { require.Equal(t, virtualStorages[0].Nodes[0].Address, secondaries[0].GetAddress()) healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) - nm.checkShards() + nm.CheckShards() _, err = nm.GetShard("virtual-storage-0") require.Error(t, err, "should return error since no nodes are healthy") diff --git a/internal/praefect/nodes/sql_election_strategy.go b/internal/praefect/nodes/sql_election_strategy.go new file mode 100644 index 0000000000000000000000000000000000000000..3d6e802ef46d951f9aa605d2292fb7579a433ab1 --- /dev/null +++ b/internal/praefect/nodes/sql_election_strategy.go @@ -0,0 +1,225 @@ +package nodes + +import ( + "database/sql" + "fmt" + "sync" + "time" + + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" +) + +var failoverThresholdSeconds = 20 + +type sqlNode struct { + node Node + primary bool +} + +type SqlElectionStrategy struct { + m sync.RWMutex + shardName string + nodes []*sqlNode + primaryNode *sqlNode + db *sql.DB + log *logrus.Entry +} + +func newSqlElectionStrategy(name string, c config.Config, log *logrus.Entry) (*SqlElectionStrategy, error) { + db, err := glsql.OpenDB(c.DB) + if err != nil { + return nil, err + } + + return &SqlElectionStrategy{ + shardName: name, + db: db, + log: log, + }, nil +} + +// AddNode registers a primary or secondary in the internal +// datastore. +func (s *SqlElectionStrategy) AddNode(node Node, primary bool) { + localNode := sqlNode{ + node: node, + primary: primary, + } + + s.m.Lock() + defer s.m.Unlock() + + if primary { + s.primaryNode = &localNode + } + + s.nodes = append(s.nodes, &localNode) +} + +// Start launches a Goroutine to check the state of the nodes and +// continuously monitor their health via gRPC health checks. +func (s *SqlElectionStrategy) Start(bootstrapInterval, monitorInterval time.Duration) error { + s.bootstrap(bootstrapInterval) + go s.monitor(monitorInterval) + + return nil +} + +func (s *SqlElectionStrategy) bootstrap(d time.Duration) error { + s.CheckShard() + + return nil +} + +func (s *SqlElectionStrategy) monitor(d time.Duration) { + ticker := time.NewTicker(d) + defer ticker.Stop() + + for { + <-ticker.C + s.CheckShard() + } +} + +// CheckShard issues a gRPC health check for each node managed by the +// shard. +func (s *SqlElectionStrategy) CheckShard() { + defer s.updateMetrics() + var wg sync.WaitGroup + + for _, n := range s.nodes { + s.log.Debug("checking node " + n.node.GetStorage() + ": " + n.node.GetAddress()) + + wg.Add(1) + go func(n *sqlNode) { + defer wg.Done() + + if n.node.check() { + s.updateLeader(n.node.GetStorage()) + } else { + s.log.Info("No response from " + n.node.GetStorage()) + } + }(n) + } + + wg.Wait() + candidate, err := s.lookupPrimary() + + if err == nil && candidate != s.primaryNode { + s.log.WithFields(logrus.Fields{ + "old_primary": s.primaryNode.node.GetStorage(), + "new_primary": candidate.node.GetStorage(), + "shard": s.shardName}).Info("primary node changed") + + s.m.Lock() + defer s.m.Unlock() + + if s.primaryNode != nil { + s.primaryNode.primary = false + } + + s.primaryNode = candidate + candidate.primary = true + } +} + +// GetPrimary gets the primary of a shard. If no primary exists, it will +// be nil. If a primary has been elected but is down, err will be +// ErrPrimaryNotHealthy. 
+func (s *SqlElectionStrategy) GetPrimary() (Node, error) { + s.m.RLock() + defer s.m.RUnlock() + + if s.primaryNode == nil { + return nil, ErrPrimaryNotHealthy + } + + return s.primaryNode.node, nil +} + +// GetSecondaries gets the secondaries of a shard +func (s *SqlElectionStrategy) GetSecondaries() ([]Node, error) { + s.m.RLock() + defer s.m.RUnlock() + + var secondaries []Node + for _, n := range s.nodes { + if !n.primary { + secondaries = append(secondaries, n.node) + } + } + + return secondaries, nil +} + +func (s *SqlElectionStrategy) updateMetrics() { + s.m.RLock() + defer s.m.RUnlock() + + for _, node := range s.nodes { + val := float64(0) + + if node.primary { + val = float64(1) + } + + metrics.PrimaryGauge.WithLabelValues(s.shardName, node.node.GetStorage()).Set(val) + } +} + +func (s *SqlElectionStrategy) updateLeader(storageName string) error { + q := `INSERT INTO shard_elections (is_primary, shard_name, node_name, last_seen_active) + VALUES ('t', '%s', '%s', now()) ON CONFLICT (is_primary, shard_name) + DO UPDATE SET + node_name = + CASE WHEN (shard_elections.last_seen_active < now() - interval '%d seconds') THEN + excluded.node_name + ELSE + shard_elections.node_name + END, + last_seen_active = + CASE WHEN (shard_elections.last_seen_active < now() - interval '%d seconds') THEN + now() + ELSE + shard_elections.last_seen_active + END` + + _, err := s.db.Exec(fmt.Sprintf(q, s.shardName, storageName, failoverThresholdSeconds, failoverThresholdSeconds)) + + if err != nil { + s.log.Errorf("Error updating leader: %s", err) + } + return err +} + +func (s *SqlElectionStrategy) lookupPrimary() (*sqlNode, error) { + q := fmt.Sprintf(`SELECT node_name FROM shard_elections + WHERE last_seen_active > now() - interval '%d seconds' + AND is_primary IS TRUE + AND shard_name = '%s'`, failoverThresholdSeconds, s.shardName) + + rows, err := s.db.Query(q) + if err != nil { + s.log.Errorf("Error looking up primary: %s", err) + return nil, err + } + defer rows.Close() + + for rows.Next() { + var name string + if err := rows.Scan(&name); err != nil { + return nil, err + } + + for _, n := range s.nodes { + if n.node.GetStorage() == name { + return n, nil + } + } + } + + return nil, err +}
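
Operators opt into the new behaviour through the two config keys added in config.go. A minimal sketch of the relevant config.toml fragment, assuming the existing Praefect configuration layout (surrounding sections unchanged and omitted):

failover_enabled = true
# "sql" selects SqlElectionStrategy; any other value, or omitting the key,
# falls back to LocalMemoryElectionStrategy (see createElectionStrategy).
election_strategy = "sql"

The SQL strategy opens its own connection with glsql.OpenDB(c.DB), so the existing [database] section must point at the PostgreSQL instance that holds the shard_elections table created by the new migration.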
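
For callers the change is mostly invisible: GetShard still returns a Shard, only now the value behind it is whichever election strategy the config selected. Below is a sketch of the consuming side, written as if it lived in a package inside the Gitaly repository that can import internal/praefect/nodes; the virtual-storage name "praefect" and the function itself are illustrative, not part of the diff.

package praefectexample

import (
	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
	"google.golang.org/grpc"
)

// routeToPrimary resolves the current primary of one virtual storage and
// returns its connection. nodeMgr is assumed to have been built with
// nodes.NewManager and started via Start.
func routeToPrimary(nodeMgr *nodes.Mgr) (*grpc.ClientConn, error) {
	shard, err := nodeMgr.GetShard("praefect")
	if err != nil {
		// Covers an unknown virtual storage and, with failover enabled,
		// a shard whose primary is currently unhealthy.
		return nil, err
	}

	primary, err := shard.GetPrimary()
	if err != nil {
		// nodes.ErrPrimaryNotHealthy: a primary exists but has not passed
		// its recent health checks.
		return nil, err
	}

	return primary.GetConnection(), nil
}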
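
LeaderElectionStrategy is the seam any further election backends would plug into. A small compile-time check, which is not part of the diff but could sit in internal/praefect/nodes, makes it obvious at build time if either strategy drifts away from the interface:

// Compile-time assertions that both strategies satisfy
// LeaderElectionStrategy (and therefore Shard). Illustrative only.
var (
	_ LeaderElectionStrategy = (*LocalMemoryElectionStrategy)(nil)
	_ LeaderElectionStrategy = (*SqlElectionStrategy)(nil)
)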
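
updateLeader and lookupPrimary interpolate the shard name, node name, and threshold into the SQL with fmt.Sprintf. A possible hardening is sketched below using bind parameters instead; the *Parameterized names are hypothetical, the interval arithmetic ($n * interval '1 second') assumes PostgreSQL, and the lookup deliberately falls back to the current primary rather than returning nil, nil when no node has reported in within the threshold, so CheckShard keeps the existing primary instead of promoting a nil candidate.

// Sketch only: same queries as sql_election_strategy.go, issued with bind
// parameters. Assumes the same SqlElectionStrategy fields and imports.
func (s *SqlElectionStrategy) updateLeaderParameterized(storageName string) error {
	const q = `INSERT INTO shard_elections (is_primary, shard_name, node_name, last_seen_active)
	VALUES ('t', $1, $2, now()) ON CONFLICT (is_primary, shard_name)
	DO UPDATE SET
	  node_name =
	    CASE WHEN (shard_elections.last_seen_active < now() - $3 * interval '1 second') THEN
	      excluded.node_name
	    ELSE
	      shard_elections.node_name
	    END,
	  last_seen_active =
	    CASE WHEN (shard_elections.last_seen_active < now() - $3 * interval '1 second') THEN
	      now()
	    ELSE
	      shard_elections.last_seen_active
	    END`

	_, err := s.db.Exec(q, s.shardName, storageName, failoverThresholdSeconds)
	if err != nil {
		s.log.Errorf("Error updating leader: %s", err)
	}

	return err
}

func (s *SqlElectionStrategy) lookupPrimaryParameterized() (*sqlNode, error) {
	const q = `SELECT node_name FROM shard_elections
	WHERE last_seen_active > now() - $1 * interval '1 second'
	AND is_primary IS TRUE
	AND shard_name = $2`

	var name string
	if err := s.db.QueryRow(q, failoverThresholdSeconds, s.shardName).Scan(&name); err != nil {
		if err == sql.ErrNoRows {
			// No node has been active within the threshold; keep the
			// current primary instead of handing back nil, nil.
			return s.primaryNode, nil
		}

		return nil, err
	}

	for _, n := range s.nodes {
		if n.node.GetStorage() == name {
			return n, nil
		}
	}

	return s.primaryNode, nil
}

Either way, failover is driven purely by last_seen_active: a standby can only take over once the primary's row is older than failoverThresholdSeconds (20 seconds), so the monitor interval passed to Start needs to stay comfortably below that.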