From f8ca891330e58e1c3a57a3ada9861a08ea5c520d Mon Sep 17 00:00:00 2001 From: Stan Hu Date: Wed, 4 Mar 2020 07:22:21 -0800 Subject: [PATCH 1/5] Spike test: Do leader elections in SQL --- ...0200304001604_add_shard_elections_table.go | 20 +++ internal/praefect/nodes/manager.go | 123 ++++++++++++------ 2 files changed, 106 insertions(+), 37 deletions(-) create mode 100644 internal/praefect/datastore/migrations/20200304001604_add_shard_elections_table.go diff --git a/internal/praefect/datastore/migrations/20200304001604_add_shard_elections_table.go b/internal/praefect/datastore/migrations/20200304001604_add_shard_elections_table.go new file mode 100644 index 0000000000..bb36436985 --- /dev/null +++ b/internal/praefect/datastore/migrations/20200304001604_add_shard_elections_table.go @@ -0,0 +1,20 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20200304001604_add_shard_elections_table", + Up: []string{`CREATE TABLE shard_elections ( + is_primary boolean DEFAULT 'true' NOT NULL, + shard_name varchar(255) NOT NULL, + node_name varchar(255) NOT NULL, + last_seen_active timestamp NOT NULL + )`, + "CREATE UNIQUE INDEX primary_shard_idx ON shard_elections (is_primary, shard_name)", + }, + Down: []string{"DROP TABLE shard_elections"}, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index d45affbfec..47f80cc2ff 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -2,6 +2,7 @@ package nodes import ( "context" + "database/sql" "errors" "sync" "time" @@ -11,6 +12,7 @@ import ( gitalyauth "gitlab.com/gitlab-org/gitaly/auth" "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" 
"gitlab.com/gitlab-org/gitaly/internal/praefect/models" "google.golang.org/grpc" @@ -37,9 +39,9 @@ type Node interface { } type shard struct { - m sync.RWMutex - primary *nodeStatus - secondaries []*nodeStatus + m sync.RWMutex + primary *nodeStatus + allNodes []*nodeStatus } // GetPrimary gets the primary of a shard @@ -56,8 +58,10 @@ func (s *shard) GetSecondaries() ([]Node, error) { defer s.m.RUnlock() var secondaries []Node - for _, secondary := range s.secondaries { - secondaries = append(secondaries, secondary) + for _, node := range s.allNodes { + if s.primary != node { + secondaries = append(secondaries, node) + } } return secondaries, nil @@ -70,18 +74,24 @@ type Mgr struct { // VirtualStorages failoverEnabled bool log *logrus.Entry + db *sql.DB } // ErrPrimaryNotHealthy indicates the primary of a shard is not in a healthy state and hence // should not be used for a new request var ErrPrimaryNotHealthy = errors.New("primary is not healthy") -// NewNodeManager creates a new NodeMgr based on virtual storage configs +// NewManager creates a new NodeMgr based on virtual storage configs func NewManager(log *logrus.Entry, c config.Config, dialOpts ...grpc.DialOption) (*Mgr, error) { + db, err := glsql.OpenDB(c.DB) + if err != nil { + return nil, err + } + shards := make(map[string]*shard) for _, virtualStorage := range c.VirtualStorages { - var secondaries []*nodeStatus var primary *nodeStatus + var nodes []*nodeStatus for _, node := range virtualStorage.Nodes { conn, err := client.Dial(node.Address, append( @@ -97,17 +107,16 @@ func NewManager(log *logrus.Entry, c config.Config, dialOpts ...grpc.DialOption) } ns := newConnectionStatus(*node, conn, log) + nodes = append(nodes, ns) + if node.DefaultPrimary { primary = ns - continue } - - secondaries = append(secondaries, ns) } shards[virtualStorage.Name] = &shard{ - primary: primary, - secondaries: secondaries, + primary: primary, + allNodes: nodes, } } @@ -115,6 +124,7 @@ func NewManager(log *logrus.Entry, c 
config.Config, dialOpts ...grpc.DialOption) shards: shards, log: log, failoverEnabled: c.FailoverEnabled, + db: db, }, nil } @@ -140,6 +150,8 @@ func (n *Mgr) monitor(d time.Duration) { defer ticker.Stop() for { + n.log.Info("starting health checks!") + <-ticker.C n.checkShards() } @@ -162,43 +174,75 @@ func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) { } if n.failoverEnabled { - if !shard.primary.isHealthy() { - return nil, ErrPrimaryNotHealthy - } + // if !shard.primary.isHealthy() { + // return nil, ErrPrimaryNotHealthy + // } } return shard, nil } -func (n *Mgr) checkShards() { - for _, shard := range n.shards { - shard.primary.check() - for _, secondary := range shard.secondaries { - secondary.check() +func (n *Mgr) updateLeader(shardName string, storageName string) { + n.db.Exec("INSERT INTO shard_elections (is_primary, shard_name, node_name, last_seen_active) " + + "VALUES ('t', " + "'" + shardName + "'," + "'" + storageName + "', now()) ON CONFLICT (is_primary, shard_name) " + + "DO UPDATE SET " + + "node_name = " + + "CASE WHEN (shard_elections.last_seen_active < now() - interval '20 seconds') THEN " + + "shard_elections.node_name " + + "ELSE " + + "excluded.node_name " + + "END, " + + "last_seen_active = " + + "CASE WHEN (shard_elections.last_seen_active < now() - interval '20 seconds') THEN " + + " shard_elections.last_seen_active " + + "ELSE " + + "excluded.last_seen_active " + + "END") +} + +func (n *Mgr) LookupPrimary(shardName string) (*nodeStatus, error) { + rows, err := n.db.Query("SELECT node_name FROM shard_elections WHERE is_primary IS TRUE AND shard_name = '" + shardName + "'") + + if err != nil { + return nil, err + } + + for rows.Next() { + var name string + if err := rows.Scan(&name); err != nil { + return nil, err } - if shard.primary.isHealthy() { - continue + shard := n.shards[shardName] + + for _, node := range shard.allNodes { + if node.GetStorage() == name { + return node, nil + } } + } - newPrimaryIndex := -1 - for i, 
secondary := range shard.secondaries { - if secondary.isHealthy() { - newPrimaryIndex = i - break + return nil, err +} + +func (n *Mgr) checkShards() { + for shardName, shard := range n.shards { + for _, node := range shard.allNodes { + if node.check() { + n.log.Info("health check good for " + node.GetStorage()) + n.updateLeader(shardName, node.GetStorage()) + } else { + n.log.Info("health check failed for " + node.GetStorage()) } } - if newPrimaryIndex < 0 { - // no healthy secondaries exist - continue + primary, err := n.LookupPrimary(shardName) + + if err != nil { + shard.m.Lock() + shard.primary = primary + shard.m.Unlock() } - shard.m.Lock() - newPrimary := shard.secondaries[newPrimaryIndex] - shard.secondaries = append(shard.secondaries[:newPrimaryIndex], shard.secondaries[newPrimaryIndex+1:]...) - shard.secondaries = append(shard.secondaries, shard.primary) - shard.primary = newPrimary - shard.m.Unlock() } } @@ -252,9 +296,10 @@ func (n *nodeStatus) isHealthy() bool { return true } -func (n *nodeStatus) check() { +func (n *nodeStatus) check() bool { client := healthpb.NewHealthClient(n.ClientConn) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + success := false defer cancel() resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{Service: ""}) @@ -263,10 +308,14 @@ func (n *nodeStatus) check() { resp = &healthpb.HealthCheckResponse{ Status: healthpb.HealthCheckResponse_UNKNOWN, } + } else { + success = resp.Status == healthpb.HealthCheckResponse_SERVING } n.statuses = append(n.statuses, resp.Status) if len(n.statuses) > healthcheckThreshold { n.statuses = n.statuses[1:] } + + return success } -- GitLab From d693fe3cb0da3d7f64700f301a780606526aa950 Mon Sep 17 00:00:00 2001 From: Stan Hu Date: Thu, 5 Mar 2020 23:46:59 -0800 Subject: [PATCH 2/5] Fix primary promotion --- internal/praefect/nodes/manager.go | 63 ++++++++++++++++++------------ 1 file changed, 38 insertions(+), 25 deletions(-) diff --git 
a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index 47f80cc2ff..61a0884896 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "errors" + "fmt" "sync" "time" @@ -150,8 +151,6 @@ func (n *Mgr) monitor(d time.Duration) { defer ticker.Stop() for { - n.log.Info("starting health checks!") - <-ticker.C n.checkShards() } @@ -182,28 +181,40 @@ func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) { return shard, nil } -func (n *Mgr) updateLeader(shardName string, storageName string) { - n.db.Exec("INSERT INTO shard_elections (is_primary, shard_name, node_name, last_seen_active) " + - "VALUES ('t', " + "'" + shardName + "'," + "'" + storageName + "', now()) ON CONFLICT (is_primary, shard_name) " + - "DO UPDATE SET " + - "node_name = " + - "CASE WHEN (shard_elections.last_seen_active < now() - interval '20 seconds') THEN " + - "shard_elections.node_name " + - "ELSE " + - "excluded.node_name " + - "END, " + - "last_seen_active = " + - "CASE WHEN (shard_elections.last_seen_active < now() - interval '20 seconds') THEN " + - " shard_elections.last_seen_active " + - "ELSE " + - "excluded.last_seen_active " + - "END") +func (n *Mgr) updateLeader(shardName string, storageName string) error { + q := `INSERT INTO shard_elections (is_primary, shard_name, node_name, last_seen_active) + VALUES ('t', '%s', '%s', now()) ON CONFLICT (is_primary, shard_name) + DO UPDATE SET + node_name = + CASE WHEN (shard_elections.last_seen_active < now() - interval '20 seconds') THEN + excluded.node_name + ELSE + shard_elections.node_name + END, + last_seen_active = + CASE WHEN (shard_elections.last_seen_active < now() - interval '20 seconds') THEN + now() + ELSE + shard_elections.last_seen_active + END` + + _, err := n.db.Exec(fmt.Sprintf(q, shardName, storageName)) + + if err != nil { + n.log.Errorf("Error updating leader: %s", err) + } + return err } -func (n *Mgr) 
LookupPrimary(shardName string) (*nodeStatus, error) { - rows, err := n.db.Query("SELECT node_name FROM shard_elections WHERE is_primary IS TRUE AND shard_name = '" + shardName + "'") +func (n *Mgr) lookupPrimary(shardName string) (*nodeStatus, error) { + q := fmt.Sprintf(`SELECT node_name FROM shard_elections + WHERE last_seen_active > now() - interval '20 seconds' + AND is_primary IS TRUE + AND shard_name = '%s'`, shardName) + rows, err := n.db.Query(q) if err != nil { + n.log.Errorf("Error looking up primary: %s", err) return nil, err } @@ -229,16 +240,18 @@ func (n *Mgr) checkShards() { for shardName, shard := range n.shards { for _, node := range shard.allNodes { if node.check() { - n.log.Info("health check good for " + node.GetStorage()) n.updateLeader(shardName, node.GetStorage()) - } else { - n.log.Info("health check failed for " + node.GetStorage()) } } - primary, err := n.LookupPrimary(shardName) + primary, err := n.lookupPrimary(shardName) + + if err == nil && primary != shard.primary { + n.log.WithFields(log.Fields{ + "old_primary": shard.primary, + "new_primary": primary, + "shard": shardName}).Info("primary node changed") - if err != nil { shard.m.Lock() shard.primary = primary shard.m.Unlock() -- GitLab From 4b2058c165f0559cba439c897501ad77c408d7bb Mon Sep 17 00:00:00 2001 From: Stan Hu Date: Thu, 5 Mar 2020 23:50:19 -0800 Subject: [PATCH 3/5] Add missing import --- internal/praefect/nodes/manager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index 61a0884896..c2929785af 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -16,6 +16,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/labkit/log" "google.golang.org/grpc" healthpb 
"google.golang.org/grpc/health/grpc_health_v1" ) -- GitLab From 5033efb584146cd6d5b26bdc671ffa662a67a788 Mon Sep 17 00:00:00 2001 From: Stan Hu Date: Fri, 6 Mar 2020 14:29:01 -0800 Subject: [PATCH 4/5] Fix database session leak --- internal/praefect/nodes/manager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index c2929785af..07b077ff0e 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -218,6 +218,7 @@ func (n *Mgr) lookupPrimary(shardName string) (*nodeStatus, error) { n.log.Errorf("Error looking up primary: %s", err) return nil, err } + defer rows.Close() for rows.Next() { var name string -- GitLab From 95fca8eee78725ae1af7572a96934a6c1c975508 Mon Sep 17 00:00:00 2001 From: Stan Hu Date: Fri, 6 Mar 2020 15:42:43 -0800 Subject: [PATCH 5/5] Ping hosts in parallel Parameterize failover duration --- internal/praefect/nodes/manager.go | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index 07b077ff0e..92403b7439 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -21,6 +21,8 @@ import ( healthpb "google.golang.org/grpc/health/grpc_health_v1" ) +var failoverThresholdSeconds = 20 + // Shard is a primary with a set of secondaries type Shard interface { GetPrimary() (Node, error) @@ -187,19 +189,19 @@ func (n *Mgr) updateLeader(shardName string, storageName string) error { VALUES ('t', '%s', '%s', now()) ON CONFLICT (is_primary, shard_name) DO UPDATE SET node_name = - CASE WHEN (shard_elections.last_seen_active < now() - interval '20 seconds') THEN + CASE WHEN (shard_elections.last_seen_active < now() - interval '%d seconds') THEN excluded.node_name ELSE shard_elections.node_name END, last_seen_active = - CASE WHEN (shard_elections.last_seen_active < now() - interval '20 seconds') THEN + CASE 
WHEN (shard_elections.last_seen_active < now() - interval '%d seconds') THEN now() ELSE shard_elections.last_seen_active END` - _, err := n.db.Exec(fmt.Sprintf(q, shardName, storageName)) + _, err := n.db.Exec(fmt.Sprintf(q, shardName, storageName, failoverThresholdSeconds, failoverThresholdSeconds)) if err != nil { n.log.Errorf("Error updating leader: %s", err) @@ -209,9 +211,9 @@ func (n *Mgr) updateLeader(shardName string, storageName string) error { func (n *Mgr) lookupPrimary(shardName string) (*nodeStatus, error) { q := fmt.Sprintf(`SELECT node_name FROM shard_elections - WHERE last_seen_active > now() - interval '20 seconds' + WHERE last_seen_active > now() - interval '%d seconds' AND is_primary IS TRUE - AND shard_name = '%s'`, shardName) + AND shard_name = '%s'`, failoverThresholdSeconds, shardName) rows, err := n.db.Query(q) if err != nil { @@ -239,13 +241,25 @@ func (n *Mgr) lookupPrimary(shardName string) (*nodeStatus, error) { } func (n *Mgr) checkShards() { + var wg sync.WaitGroup + for shardName, shard := range n.shards { for _, node := range shard.allNodes { - if node.check() { - n.updateLeader(shardName, node.GetStorage()) - } + n.log.Info("checking node " + node.GetStorage() + ": " + node.GetAddress()) + + wg.Add(1) + go func(node *nodeStatus) { + defer wg.Done() + + if node.check() { + n.updateLeader(shardName, node.GetStorage()) + } else { + n.log.Info("No response from " + node.GetStorage()) + } + }(node) } + wg.Wait() primary, err := n.lookupPrimary(shardName) if err == nil && primary != shard.primary { -- GitLab