diff --git a/internal/praefect/datastore/migrations/20200304001604_add_shard_elections_table.go b/internal/praefect/datastore/migrations/20200304001604_add_shard_elections_table.go
new file mode 100644
index 0000000000000000000000000000000000000000..bb36436985e1e88cb8525118c4cd0a08a54b17e5
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20200304001604_add_shard_elections_table.go
@@ -0,0 +1,20 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+	m := &migrate.Migration{
+		Id: "20200304001604_add_shard_elections_table",
+		Up: []string{`CREATE TABLE shard_elections (
+			is_primary boolean DEFAULT 'true' NOT NULL,
+			shard_name varchar(255) NOT NULL,
+			node_name varchar(255) NOT NULL,
+			last_seen_active timestamp NOT NULL
+			)`,
+			"CREATE UNIQUE INDEX primary_shard_idx ON shard_elections (is_primary, shard_name)",
+		},
+		Down: []string{"DROP TABLE shard_elections"},
+	}
+
+	allMigrations = append(allMigrations, m)
+}
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index d45affbfec1fa181c9398d9f320ef35bfc9395d3..92403b74399499271347da8ae14e6aa196c387cd 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -2,7 +2,9 @@ package nodes
 
 import (
 	"context"
+	"database/sql"
 	"errors"
+	"fmt"
 	"sync"
 	"time"
 
@@ -11,12 +13,16 @@ import (
 	gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
 	"gitlab.com/gitlab-org/gitaly/client"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+	"gitlab.com/gitlab-org/labkit/log"
 	"google.golang.org/grpc"
 	healthpb "google.golang.org/grpc/health/grpc_health_v1"
 )
 
+var failoverThresholdSeconds = 20
+
 // Shard is a primary with a set of secondaries
 type Shard interface {
 	GetPrimary() (Node, error)
@@ -37,9 +43,9 @@ type Node interface {
 }
 
 type shard struct {
-	m           sync.RWMutex
-	primary     *nodeStatus
-	secondaries []*nodeStatus
+	m        sync.RWMutex
+	primary  *nodeStatus
+	allNodes []*nodeStatus
 }
 
 // GetPrimary gets the primary of a shard
@@ -56,8 +62,10 @@ func (s *shard) GetSecondaries() ([]Node, error) {
 	defer s.m.RUnlock()
 
 	var secondaries []Node
-	for _, secondary := range s.secondaries {
-		secondaries = append(secondaries, secondary)
+	for _, node := range s.allNodes {
+		if s.primary != node {
+			secondaries = append(secondaries, node)
+		}
 	}
 
 	return secondaries, nil
@@ -70,18 +78,24 @@ type Mgr struct {
 	// VirtualStorages
 	failoverEnabled bool
 	log             *logrus.Entry
+	db              *sql.DB
 }
 
 // ErrPrimaryNotHealthy indicates the primary of a shard is not in a healthy state and hence
 // should not be used for a new request
 var ErrPrimaryNotHealthy = errors.New("primary is not healthy")
 
-// NewNodeManager creates a new NodeMgr based on virtual storage configs
+// NewManager creates a new NodeMgr based on virtual storage configs
 func NewManager(log *logrus.Entry, c config.Config, dialOpts ...grpc.DialOption) (*Mgr, error) {
+	db, err := glsql.OpenDB(c.DB)
+	if err != nil {
+		return nil, err
+	}
+
 	shards := make(map[string]*shard)
 
 	for _, virtualStorage := range c.VirtualStorages {
-		var secondaries []*nodeStatus
 		var primary *nodeStatus
+		var nodes []*nodeStatus
 		for _, node := range virtualStorage.Nodes {
 			conn, err := client.Dial(node.Address, append(
@@ -97,17 +111,16 @@ func NewManager(log *logrus.Entry, c config.Config, dialOpts ...grpc.DialOption)
 			}
 
 			ns := newConnectionStatus(*node, conn, log)
+			nodes = append(nodes, ns)
+
 			if node.DefaultPrimary {
 				primary = ns
-				continue
 			}
-
-			secondaries = append(secondaries, ns)
 		}
 
 		shards[virtualStorage.Name] = &shard{
-			primary:     primary,
-			secondaries: secondaries,
+			primary:  primary,
+			allNodes: nodes,
 		}
 	}
 
@@ -115,6 +128,7 @@ func NewManager(log *logrus.Entry, c config.Config, dialOpts ...grpc.DialOption)
 		shards:          shards,
 		log:             log,
 		failoverEnabled: c.FailoverEnabled,
+		db:              db,
 	}, nil
 }
 
@@ -162,43 +176,102 @@ func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) {
 	}
 
 	if n.failoverEnabled {
-		if !shard.primary.isHealthy() {
-			return nil, ErrPrimaryNotHealthy
-		}
+		// if !shard.primary.isHealthy() {
+		// 	return nil, ErrPrimaryNotHealthy
+		// }
 	}
 
 	return shard, nil
 }
 
-func (n *Mgr) checkShards() {
-	for _, shard := range n.shards {
-		shard.primary.check()
-		for _, secondary := range shard.secondaries {
-			secondary.check()
-		}
+func (n *Mgr) updateLeader(shardName string, storageName string) error {
+	q := `INSERT INTO shard_elections (is_primary, shard_name, node_name, last_seen_active)
+	VALUES ('t', '%s', '%s', now()) ON CONFLICT (is_primary, shard_name)
+	DO UPDATE SET
+	node_name =
+		CASE WHEN (shard_elections.last_seen_active < now() - interval '%d seconds') THEN
+			excluded.node_name
+		ELSE
+			shard_elections.node_name
+		END,
+	last_seen_active =
+		CASE WHEN (shard_elections.last_seen_active < now() - interval '%d seconds') THEN
+			now()
+		ELSE
+			shard_elections.last_seen_active
+		END`
+
+	_, err := n.db.Exec(fmt.Sprintf(q, shardName, storageName, failoverThresholdSeconds, failoverThresholdSeconds))
+
+	if err != nil {
+		n.log.Errorf("Error updating leader: %s", err)
+	}
+
+	return err
+}
 
-		if shard.primary.isHealthy() {
-			continue
+func (n *Mgr) lookupPrimary(shardName string) (*nodeStatus, error) {
+	q := fmt.Sprintf(`SELECT node_name FROM shard_elections
+		WHERE last_seen_active > now() - interval '%d seconds'
+		AND is_primary IS TRUE
+		AND shard_name = '%s'`, failoverThresholdSeconds, shardName)
+
+	rows, err := n.db.Query(q)
+	if err != nil {
+		n.log.Errorf("Error looking up primary: %s", err)
+		return nil, err
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var name string
+		if err := rows.Scan(&name); err != nil {
+			return nil, err
 		}
 
-		newPrimaryIndex := -1
-		for i, secondary := range shard.secondaries {
-			if secondary.isHealthy() {
-				newPrimaryIndex = i
-				break
+		shard := n.shards[shardName]
+
+		for _, node := range shard.allNodes {
+			if node.GetStorage() == name {
+				return node, nil
 			}
 		}
+	}
+
+	return nil, err
+}
+
+func (n *Mgr) checkShards() {
+	var wg sync.WaitGroup
+
+	for shardName, shard := range n.shards {
+		for _, node := range shard.allNodes {
+			n.log.Info("checking node " + node.GetStorage() + ": " + node.GetAddress())
+
+			wg.Add(1)
+			go func(node *nodeStatus) {
+				defer wg.Done()
+
+				if node.check() {
+					n.updateLeader(shardName, node.GetStorage())
+				} else {
+					n.log.Info("No response from " + node.GetStorage())
+				}
+			}(node)
+		}
 
-		if newPrimaryIndex < 0 {
-			// no healthy secondaries exist
-			continue
+		wg.Wait()
+		primary, err := n.lookupPrimary(shardName)
+
+		if err == nil && primary != shard.primary {
+			n.log.WithFields(log.Fields{
+				"old_primary": shard.primary,
+				"new_primary": primary,
+				"shard":       shardName}).Info("primary node changed")
+
+			shard.m.Lock()
+			shard.primary = primary
+			shard.m.Unlock()
 		}
-		shard.m.Lock()
-		newPrimary := shard.secondaries[newPrimaryIndex]
-		shard.secondaries = append(shard.secondaries[:newPrimaryIndex], shard.secondaries[newPrimaryIndex+1:]...)
- shard.secondaries = append(shard.secondaries, shard.primary) - shard.primary = newPrimary - shard.m.Unlock() } } @@ -252,9 +325,10 @@ func (n *nodeStatus) isHealthy() bool { return true } -func (n *nodeStatus) check() { +func (n *nodeStatus) check() bool { client := healthpb.NewHealthClient(n.ClientConn) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + success := false defer cancel() resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{Service: ""}) @@ -263,10 +337,14 @@ func (n *nodeStatus) check() { resp = &healthpb.HealthCheckResponse{ Status: healthpb.HealthCheckResponse_UNKNOWN, } + } else { + success = resp.Status == healthpb.HealthCheckResponse_SERVING } n.statuses = append(n.statuses, resp.Status) if len(n.statuses) > healthcheckThreshold { n.statuses = n.statuses[1:] } + + return success }
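
Note on the queries above: updateLeader and lookupPrimary build their SQL with fmt.Sprintf, so shard and storage names are spliced directly into the statement text. Below is a minimal sketch of the same election upsert written with bind parameters instead. It assumes the Postgres driver opened via glsql.OpenDB accepts $n placeholders (as lib/pq does); the method name updateLeaderParameterized and the "$3 * interval '1 second'" arithmetic are illustrative, not part of the change above.

// Sketch only: a bind-parameter variant of updateLeader. Assumes a Postgres
// driver with $n placeholder support; not part of the diff above.
func (n *Mgr) updateLeaderParameterized(shardName, storageName string) error {
	q := `INSERT INTO shard_elections (is_primary, shard_name, node_name, last_seen_active)
	VALUES ('t', $1, $2, now()) ON CONFLICT (is_primary, shard_name)
	DO UPDATE SET
	node_name =
		CASE WHEN (shard_elections.last_seen_active < now() - $3 * interval '1 second') THEN
			excluded.node_name
		ELSE
			shard_elections.node_name
		END,
	last_seen_active =
		CASE WHEN (shard_elections.last_seen_active < now() - $3 * interval '1 second') THEN
			now()
		ELSE
			shard_elections.last_seen_active
		END`

	// shardName, storageName and the failover threshold travel as query
	// parameters, so no SQL text is assembled through string formatting.
	_, err := n.db.Exec(q, shardName, storageName, failoverThresholdSeconds)
	if err != nil {
		n.log.Errorf("Error updating leader: %s", err)
	}

	return err
}

The same approach fits the SELECT in lookupPrimary: failoverThresholdSeconds and shardName can be passed as $1 and $2 rather than being formatted into the query string.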