diff --git a/internal/backup/log_entry_test.go b/internal/backup/log_entry_test.go
index dffedfd45cfacbed43262ae476625a02bdf24c38..45911641e0a73a948b0439154a76ab8326874147 100644
--- a/internal/backup/log_entry_test.go
+++ b/internal/backup/log_entry_test.go
@@ -20,6 +20,7 @@ import (
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
 )
 
 type mockLogManager struct {
@@ -70,6 +71,10 @@ func (lm *mockLogManager) AcknowledgeTransaction(_ storagemgr.LogConsumer, lsn s
 	}
 }
 
+func (lm *mockLogManager) ReadEntry(lsn storage.LSN) (*gitalypb.LogEntry, error) {
+	return nil, nil
+}
+
 func (lm *mockLogManager) SendNotification() {
 	n := lm.notifications[0]
 	lm.archiver.NotifyNewTransactions(lm.partitionInfo.storageName, lm.partitionInfo.partitionID, n.lowWaterMark, n.highWaterMark)
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index d94704aac6a91662c5e09f4ca9f7d9817434c416..067d101401504e8ea08cfc3eb47ad7cbcca76283 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -392,12 +392,19 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error {
 			return fmt.Errorf("resolving write-ahead log backup sink: %w", err)
 		}
 
-		consumerFactory = func(lma storagemgr.LogManagerAccessor) (storagemgr.LogConsumer, func()) {
+		consumerFactory = func(lma storagemgr.LogManagerAccessor) storagemgr.LogConsumer {
 			walArchiver := backup.NewLogEntryArchiver(logger, walSink, cfg.Backup.WALWorkerCount, lma)
 			prometheus.MustRegister(walArchiver)
 			walArchiver.Run()
-			return walArchiver, walArchiver.Close
+			return walArchiver
+		}
+	}
+	// WAL doesn't support multiple log consumers yet, so Raft's consumer shadows any other consumer.
+	// This problem will be handled in https://gitlab.com/gitlab-org/gitaly/-/issues/6105.
+	if cfg.Raft.Enabled {
+		consumerFactory = func(lma storagemgr.LogManagerAccessor) storagemgr.LogConsumer {
+			return raft.NewLogConsumer(ctx, lma, logger)
 		}
 	}
 
diff --git a/internal/gitaly/storage/raft/db.go b/internal/gitaly/storage/raft/db.go
index 143f4083771c2fa450a411fcfcf432c822e69bca..82ced10accd0106135761db1a3b1c20c25ca7831 100644
--- a/internal/gitaly/storage/raft/db.go
+++ b/internal/gitaly/storage/raft/db.go
@@ -2,6 +2,7 @@ package raft
 
 import (
 	"context"
+	"fmt"
 
 	"github.com/lni/dragonboat/v4/statemachine"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue"
@@ -38,19 +39,29 @@ func newNamespacedDBAccessor(ptnMgr *storagemgr.PartitionManager, storageName st
 	}
 }
 
-// dbForStorage returns a namedspaced DB accessor function for specific information of a storage in
+// dbForStorage returns a namespaced DB accessor for specific information of a storage in the
 // Raft cluster such as allocated storage ID, last applied replica groups, etc.
 func dbForStorage(ptnMgr *storagemgr.PartitionManager, storageName string) dbAccessor {
 	return newNamespacedDBAccessor(ptnMgr, storageName, []byte("raft/self/"))
 }
 
-// dbForMetadataGroup returns a namedspaced DB accessfor function to store the data of metadata Raft
-// group. Those data consists of cluster-wide information such as list of registered storages and
-// their replication groups, etc.
+// dbForMetadataGroup returns a namespaced DB accessor to store the data of the metadata Raft group.
+// This data consists of cluster-wide information such as the list of registered storages and their
+// replication groups.
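+//
+// A usage sketch, mirroring TestDbForMetadataGroup (key and value are illustrative only):
+//
+//	db := dbForMetadataGroup(ptnMgr, "node-1")
+//	err := db.write(ctx, func(txn keyvalue.ReadWriter) error {
+//		return txn.Set([]byte("data-1"), []byte("one"))
+//	})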
 func dbForMetadataGroup(ptnMgr *storagemgr.PartitionManager, storageName string) dbAccessor {
 	return newNamespacedDBAccessor(ptnMgr, storageName, []byte("raft/cluster/"))
 }
 
+// dbForReplicationGroup returns a namespaced DB accessor to store the list of partitions under
+// the management of a storage. They could be either replicated partitions or ones created by the
+// authoritative storage.
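+//
+// Key layout, as exercised by TestDbForReplicationGroup in db_test.go: with storage ID 1 as the
+// authority, keys for target 1 live under "raft/authority/" while keys for targets 2 and 3 live
+// under "raft/replicas/<8-byte big-endian target ID>/".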
+func dbForReplicationGroup(ptnMgr *storagemgr.PartitionManager, storageID raftID, storageName string, targetID raftID) dbAccessor {
+	if storageID == targetID {
+		return newNamespacedDBAccessor(ptnMgr, storageName, []byte("raft/authority/"))
+	}
+	return newNamespacedDBAccessor(ptnMgr, storageName, []byte(fmt.Sprintf("raft/replicas/%s/", targetID.MarshalBinary())))
+}
+
 var keyLastApplied = []byte("applied_lsn")
 
 // Statemachine is an interface that wraps dragonboat's statemachine. It is a superset of
diff --git a/internal/gitaly/storage/raft/db_test.go b/internal/gitaly/storage/raft/db_test.go
index 6f4697ccbb7a69faab83fb8038b43184b1e6b992..41b3aa67315df74cb120b2fd3ba90743bf699b6f 100644
--- a/internal/gitaly/storage/raft/db_test.go
+++ b/internal/gitaly/storage/raft/db_test.go
@@ -4,13 +4,8 @@ import (
 	"testing"
 
 	"github.com/stretchr/testify/require"
-	"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
-	"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
-	"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
-	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue"
-	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
 	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
@@ -23,20 +18,9 @@ func TestDbForMetadataGroup(t *testing.T) {
 	ctx := testhelper.Context(t)
 	cfg := testcfg.Build(t, testcfg.WithStorages("node-1"))
 
-	logger := testhelper.NewLogger(t)
-	dbMgr := setupTestDBManager(t, cfg)
+	dbMgr, ptnManager := setupTestDBManagerAndPartitionManager(t, cfg)
 
-	cmdFactory := gittest.NewCommandFactory(t, cfg)
-	catfileCache := catfile.NewCache(cfg)
-	t.Cleanup(catfileCache.Stop)
-
-	localRepoFactory := localrepo.NewFactory(logger, config.NewLocator(cfg), cmdFactory, catfileCache)
-
-	partitionManager, err := storagemgr.NewPartitionManager(testhelper.Context(t), cfg.Storages, cmdFactory, localRepoFactory, logger, dbMgr, cfg.Prometheus, nil)
-	require.NoError(t, err)
-	t.Cleanup(partitionManager.Close)
-
-	db := dbForMetadataGroup(partitionManager, "node-1")
+	db := dbForMetadataGroup(ptnManager, "node-1")
 	require.NoError(t, db.write(ctx, func(txn keyvalue.ReadWriter) error {
 		require.NoError(t, txn.Set([]byte("data-1"), []byte("one")))
 		return nil
@@ -96,19 +80,7 @@ func TestDbForStorage(t *testing.T) {
 	ctx := testhelper.Context(t)
 	cfg := testcfg.Build(t, testcfg.WithStorages("node-1"))
 
-	logger := testhelper.NewLogger(t)
-	dbMgr := setupTestDBManager(t, cfg)
-
-	cmdFactory := gittest.NewCommandFactory(t, cfg)
-	catfileCache := catfile.NewCache(cfg)
-	t.Cleanup(catfileCache.Stop)
-
-	localRepoFactory := localrepo.NewFactory(logger, config.NewLocator(cfg), cmdFactory, catfileCache)
-
-	partitionManager, err := storagemgr.NewPartitionManager(testhelper.Context(t), cfg.Storages, cmdFactory, localRepoFactory, logger, dbMgr, cfg.Prometheus, nil)
-	require.NoError(t, err)
-	t.Cleanup(partitionManager.Close)
-
+	dbMgr, ptnManager := setupTestDBManagerAndPartitionManager(t, cfg)
 	storageInfo := &gitalypb.Storage{
 		StorageId:         1,
 		Name:              "node-1",
@@ -117,7 +89,7 @@ func TestDbForStorage(t *testing.T) {
 		ReplicaGroups:     []uint64{2, 3},
 	}
 
-	db := dbForStorage(partitionManager, "node-1")
+	db := dbForStorage(ptnManager, "node-1")
 	require.NoError(t, db.write(ctx, func(txn keyvalue.ReadWriter) error {
 		storage, err := proto.Marshal(storageInfo)
 		require.NoError(t, err)
@@ -169,3 +141,63 @@ func TestDbForStorage(t *testing.T) {
 		return nil
 	}))
 }
+
+func TestDbForReplicationGroup(t *testing.T) {
+	t.Parallel()
+
+	ctx := testhelper.Context(t)
+	cfg := testcfg.Build(t, testcfg.WithStorages("node-1"))
+
+	dbMgr, ptnManager := setupTestDBManagerAndPartitionManager(t, cfg)
+
+	db1 := dbForReplicationGroup(ptnManager, 1, "node-1", 1)
+	db2 := dbForReplicationGroup(ptnManager, 1, "node-1", 2)
+	db3 := dbForReplicationGroup(ptnManager, 1, "node-1", 3)
+
+	require.NoError(t, db1.write(ctx, func(txn keyvalue.ReadWriter) error {
+		require.NoError(t, txn.Set([]byte("partitions/1"), []byte("@hashed/aa/bb/aabb")))
+		require.NoError(t, txn.Set([]byte("partitions/2"), []byte("@hashed/cc/dd/ccdd")))
+		require.NoError(t, txn.Set([]byte("partitions/3"), []byte("@hashed/ii/jj/iijj")))
+		return nil
+	}))
+
+	require.NoError(t, db2.write(ctx, func(txn keyvalue.ReadWriter) error {
+		require.NoError(t, txn.Set([]byte("partitions/1"), []byte("@hashed/aa/bb/aabb")))
+		require.NoError(t, txn.Set([]byte("partitions/2"), []byte("@hashed/ee/ff/eeff")))
+		return nil
+	}))
+
+	require.NoError(t, db3.write(ctx, func(txn keyvalue.ReadWriter) error {
+		require.NoError(t, txn.Set([]byte("partitions/1"), []byte("@hashed/aa/bb/aabb")))
+		require.NoError(t, txn.Set([]byte("partitions/2"), []byte("@hashed/gg/hh/gghh")))
+		return nil
+	}))
+
+	store, err := dbMgr.GetDB("node-1")
+	require.NoError(t, err)
+
+	require.NoError(t, store.View(func(txn keyvalue.ReadWriter) error {
+		it := txn.NewIterator(keyvalue.IteratorOptions{
+			Prefix: []byte("p/\x00\x00\x00\x00\x00\x00\x00\x01/kv/raft/"),
+		})
+		defer it.Close()
+
+		values := map[string][]byte{}
+		for it.Rewind(); it.Valid(); it.Next() {
+			value, err := it.Item().ValueCopy(nil)
+			require.NoError(t, err)
+			values[string(it.Item().Key())] = value
+		}
+
+		require.Equal(t, map[string][]byte{
+			"p/\x00\x00\x00\x00\x00\x00\x00\x01/kv/raft/authority/partitions/1": []byte("@hashed/aa/bb/aabb"),
+			"p/\x00\x00\x00\x00\x00\x00\x00\x01/kv/raft/authority/partitions/2": []byte("@hashed/cc/dd/ccdd"),
+			"p/\x00\x00\x00\x00\x00\x00\x00\x01/kv/raft/authority/partitions/3": []byte("@hashed/ii/jj/iijj"),
+			"p/\x00\x00\x00\x00\x00\x00\x00\x01/kv/raft/replicas/\x00\x00\x00\x00\x00\x00\x00\x02/partitions/1": []byte("@hashed/aa/bb/aabb"),
+			"p/\x00\x00\x00\x00\x00\x00\x00\x01/kv/raft/replicas/\x00\x00\x00\x00\x00\x00\x00\x02/partitions/2": []byte("@hashed/ee/ff/eeff"),
+			"p/\x00\x00\x00\x00\x00\x00\x00\x01/kv/raft/replicas/\x00\x00\x00\x00\x00\x00\x00\x03/partitions/1": []byte("@hashed/aa/bb/aabb"),
+			"p/\x00\x00\x00\x00\x00\x00\x00\x01/kv/raft/replicas/\x00\x00\x00\x00\x00\x00\x00\x03/partitions/2": []byte("@hashed/gg/hh/gghh"),
+		}, values)
+		return nil
+	}))
+}
diff --git a/internal/gitaly/storage/raft/ids.go b/internal/gitaly/storage/raft/ids.go
index b7bfed58b8d6301dfd6057f5dc68f18d9b1972fb..d82c56fcd10f0f443494d13b53efcbd4e296ba51 100644
--- a/internal/gitaly/storage/raft/ids.go
+++ b/internal/gitaly/storage/raft/ids.go
@@ -10,9 +10,6 @@
 // internally. In Gitaly, "group" is used exclusively to refer to a Raft group.
 type raftID uint64
 
-// MetadataGroupID is a hard-coded ID of the cluster-wide metadata Raft group.
-const MetadataGroupID = raftID(1)
-
 // MarshalBinary returns a binary representation of the raftID.
 func (id raftID) MarshalBinary() []byte {
 	marshaled := make([]byte, binary.Size(id))
@@ -40,3 +37,13 @@ func (id raftID) ToUint64() uint64 {
 // needs to perform all necessary checks beforehand. However, the state machine must perform state
 // validation at its layer. The result is propagated to the caller to handle respectively.
 type updateResult uint64
+
+// MetadataGroupID is a hard-coded ID of the cluster-wide metadata Raft group.
+const MetadataGroupID = raftID(1)
+
+// AuthorityGroupID returns the ID of the Raft group that manages the partitions created by the
+// input storage. The ID has the form "XX000000", in which the first two bytes are the storage ID.
+// The cluster supports a maximum of 2^16 = 65536 storages.
+func AuthorityGroupID(storageID raftID) raftID {
+	return storageID << 48
+}
diff --git a/internal/gitaly/storage/raft/log_consumer.go b/internal/gitaly/storage/raft/log_consumer.go
new file mode 100644
index 0000000000000000000000000000000000000000..8b1146b2bb9f37c882f2c1bdd815e4d65105bd2a
--- /dev/null
+++ b/internal/gitaly/storage/raft/log_consumer.go
@@ -0,0 +1,203 @@
+package raft
+
+import (
+	"container/list"
+	"context"
+	"fmt"
+	"sync"
+
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/log"
+)
+
+type notification struct {
+	partitionID   storage.PartitionID
+	lowWaterMark  storage.LSN
+	highWaterMark storage.LSN
+}
+
+type partitionState struct {
+	nextLSN storage.LSN
+}
+
+type notifications struct {
+	sync.Mutex
+	storageName     string
+	list            *list.List
+	signal          chan struct{}
+	partitionStates map[storage.PartitionID]*partitionState
+}
+
+func (n *notifications) pop() *notification {
+	n.Lock()
+	defer n.Unlock()
+
+	item := n.list.Front()
+	if item == nil {
+		return nil
+	}
+
+	n.list.Remove(item)
+	return item.Value.(*notification)
+}
+
+type replicatorPusher struct {
+	finish     chan struct{}
+	done       chan struct{}
+	replicator *replicator
+}
+
+// LogConsumer acts as a bridge that connects the WAL and the Raft replicator. It implements WAL's
+// storagemgr.LogConsumer interface, listens for new notifications, and pushes them to the
+// corresponding Raft replicator.
+type LogConsumer struct {
+	sync.Mutex
+
+	ctx           context.Context
+	logger        log.Logger
+	logManager    storagemgr.LogManagerAccessor
+	pushers       map[string]*replicatorPusher
+	notifications map[string]*notifications
+}
+
+// Push starts a goroutine that pipes the notifications from the WAL to the corresponding
+// replicator of a storage.
+func (l *LogConsumer) Push(storageName string, replicator *replicator) error {
+	if _, exist := l.pushers[storageName]; exist {
+		return fmt.Errorf("storage %q already registered", storageName)
+	}
+
+	l.pushers[storageName] = &replicatorPusher{
+		finish:     make(chan struct{}),
+		done:       make(chan struct{}),
+		replicator: replicator,
+	}
+	go l.pushNotifications(l.initializeNotifications(storageName), l.pushers[storageName])
+
+	return nil
+}
+
+// NotifyNewTransactions is called by WAL's transaction manager when there are newly appended log
+// entries or when the manager restarts. It appends the notification to a linked list. In the
+// background, that linked list is consumed by another goroutine initiated by (*LogConsumer).Push.
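+//
+// A hypothetical invocation, as a WAL transaction manager might issue it (values illustrative):
+//
+//	consumer.NotifyNewTransactions("default", partitionID, storage.LSN(1), storage.LSN(5))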
+func (l *LogConsumer) NotifyNewTransactions(storageName string, partitionID storage.PartitionID, lowWaterMark storage.LSN, highWaterMark storage.LSN) {
+	notifications := l.initializeNotifications(storageName)
+
+	notifications.Lock()
+	notifications.list.PushBack(&notification{
+		partitionID:   partitionID,
+		lowWaterMark:  lowWaterMark,
+		highWaterMark: highWaterMark,
+	})
+	notifications.Unlock()
+
+	select {
+	case notifications.signal <- struct{}{}:
+	default:
+	}
+}
+
+// Close stops all activities of this log consumer. The function exits when all background
+// goroutines finish.
+func (l *LogConsumer) Close() {
+	for storageName := range l.pushers {
+		close(l.pushers[storageName].finish)
+	}
+	for storageName := range l.pushers {
+		<-l.pushers[storageName].done
+	}
+	l.pushers = map[string]*replicatorPusher{}
+	l.notifications = map[string]*notifications{}
+}
+
+func (l *LogConsumer) pushNotifications(notifications *notifications, pusher *replicatorPusher) {
+	defer close(pusher.done)
+
+	for {
+		select {
+		case <-pusher.finish:
+			return
+		case <-notifications.signal:
+		}
+
+		for {
+			select {
+			case <-pusher.finish:
+				return
+			default:
+			}
+
+			n := notifications.pop()
+			if n == nil {
+				break
+			}
+
+			state, ok := notifications.partitionStates[n.partitionID]
+			if !ok {
+				state = &partitionState{nextLSN: n.lowWaterMark}
+				notifications.partitionStates[n.partitionID] = state
+			}
+
+			// All log entries are already handled.
+			if state.nextLSN > n.highWaterMark {
+				continue
+			} else if state.nextLSN < n.lowWaterMark {
+				state.nextLSN = n.lowWaterMark
+			}
+
+			if err := l.logManager.CallLogManager(l.ctx, notifications.storageName, n.partitionID, func(manager storagemgr.LogManager) {
+				for lsn := state.nextLSN; lsn <= n.highWaterMark; lsn++ {
+					entry, err := manager.ReadEntry(lsn)
+					if err != nil {
+						l.logger.WithError(err).WithFields(log.Fields{
+							"raft.authority_name": notifications.storageName,
+							"raft.partition_id":   n.partitionID,
+							"raft.lsn":            lsn,
+						}).Error("failed to read log entry")
+						continue
+					}
+					if entry.GetRepositoryCreation() != nil {
+						if err := pusher.replicator.BroadcastNewPartition(raftID(n.partitionID), entry.RelativePath); err != nil {
+							l.logger.WithError(err).WithFields(log.Fields{
+								"raft.authority_name": notifications.storageName,
+								"raft.partition_id":   n.partitionID,
+							}).Error("failed to broadcast new partition")
+						}
+					}
+					manager.AcknowledgeTransaction(l, lsn)
+				}
+				state.nextLSN = n.highWaterMark + 1
+			}); err != nil {
+				l.logger.WithError(err).Error("failed to acknowledge log entry")
+			}
+		}
+	}
+}
+
+func (l *LogConsumer) initializeNotifications(storageName string) *notifications {
+	l.Lock()
+	defer l.Unlock()
+
+	if _, ok := l.notifications[storageName]; !ok {
+		l.notifications[storageName] = &notifications{
+			storageName:     storageName,
+			list:            &list.List{},
+			signal:          make(chan struct{}, 1),
+			partitionStates: map[storage.PartitionID]*partitionState{},
+		}
+	}
+	return l.notifications[storageName]
+}
+
+// NewLogConsumer is a factory that returns a new LogConsumer for the input LogManagerAccessor.
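+//
+// In serve.go this is wired up as the WAL consumer factory when Raft is enabled:
+//
+//	consumerFactory = func(lma storagemgr.LogManagerAccessor) storagemgr.LogConsumer {
+//		return raft.NewLogConsumer(ctx, lma, logger)
+//	}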
+func NewLogConsumer(ctx context.Context, lma storagemgr.LogManagerAccessor, logger log.Logger) storagemgr.LogConsumer { + return &LogConsumer{ + ctx: ctx, + logManager: lma, + logger: logger.WithFields(log.Fields{ + "component": "raft", + "raft.component": "log_consumer", + }), + pushers: map[string]*replicatorPusher{}, + notifications: map[string]*notifications{}, + } +} diff --git a/internal/gitaly/storage/raft/logger.go b/internal/gitaly/storage/raft/logger.go index 733506e22fdd4881a607f65483d5d91174dfb79a..9118516ecac818f89b16c1ad7f79a56f7c13d943 100644 --- a/internal/gitaly/storage/raft/logger.go +++ b/internal/gitaly/storage/raft/logger.go @@ -73,7 +73,7 @@ func SetLogger(logger log.Logger, suppressDefaultLog bool) { name: pkgName, Logger: logger.WithFields(log.Fields{ "component": "raft", - "raft_component": strings.ToLower(pkgName), + "raft.component": strings.ToLower(pkgName), }), } }) @@ -85,12 +85,14 @@ func SetLogger(logger log.Logger, suppressDefaultLog bool) { dragonboatLogger.GetLogger("rsm").SetLevel(dragonboatLogger.WARNING) dragonboatLogger.GetLogger("transport").SetLevel(dragonboatLogger.ERROR) dragonboatLogger.GetLogger("grpc").SetLevel(dragonboatLogger.ERROR) + dragonboatLogger.GetLogger("logdb").SetLevel(dragonboatLogger.ERROR) } else { dragonboatLogger.GetLogger("dragonboat").SetLevel(dragonboatLogger.INFO) dragonboatLogger.GetLogger("raft").SetLevel(dragonboatLogger.INFO) dragonboatLogger.GetLogger("rsm").SetLevel(dragonboatLogger.INFO) dragonboatLogger.GetLogger("transport").SetLevel(dragonboatLogger.INFO) dragonboatLogger.GetLogger("grpc").SetLevel(dragonboatLogger.INFO) + dragonboatLogger.GetLogger("logdb").SetLevel(dragonboatLogger.INFO) } }) } diff --git a/internal/gitaly/storage/raft/manager.go b/internal/gitaly/storage/raft/manager.go index 6a068013da4f883ce95f365e394feb15e18b001f..a022c0862bb84b19933b6b08f8c843be63f9e08d 100644 --- a/internal/gitaly/storage/raft/manager.go +++ b/internal/gitaly/storage/raft/manager.go @@ -3,11 +3,15 @@ package raft import ( "context" "fmt" + "math/rand" "path/filepath" + "sync" "sync/atomic" + "time" "github.com/lni/dragonboat/v4" dragonboatConf "github.com/lni/dragonboat/v4/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/backoff" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/log" @@ -39,12 +43,15 @@ type ManagerConfig struct { // Manager is responsible for managing the Raft cluster for all storages. 
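+// A construction sketch mirroring the tests (a zero-value ManagerConfig is illustrative only):
+//
+//	mgr, err := NewManager(ctx, cfg.Storages, clusterCfg, ManagerConfig{}, ptnMgr, logger)
+//	// handle err, then:
+//	err = mgr.Start()
+//	defer mgr.Close()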
 type Manager struct {
 	ctx             context.Context
+	cancel          context.CancelFunc
 	clusterConfig   config.Raft
 	managerConfig   ManagerConfig
 	logger          log.Logger
 
 	started atomic.Bool
 	closed  atomic.Bool
+	running sync.WaitGroup
 
+	logConsumer     *LogConsumer
 	storageManagers map[string]*storageManager
 	firstStorage    *storageManager
 	metadataGroup   *metadataRaftGroup
@@ -72,18 +79,20 @@ func NewManager(
 	if len(storages) > 1 {
 		return nil, fmt.Errorf("the support for multiple storages is temporarily disabled")
 	}
-
+	ctx, cancel := context.WithCancel(ctx)
 	m := &Manager{
 		ctx:           ctx,
+		cancel:        cancel,
 		clusterConfig: clusterCfg,
 		managerConfig: managerCfg,
 		logger: logger.WithFields(log.Fields{
 			"component":       "raft",
-			"raft_component":  "manager",
-			"raft_cluster_id": clusterCfg.ClusterID,
-			"raft_node_id":    clusterCfg.NodeID,
+			"raft.component":  "manager",
+			"raft.cluster_id": clusterCfg.ClusterID,
+			"raft.node_id":    clusterCfg.NodeID,
 		}),
 		storageManagers: map[string]*storageManager{},
+		running:         sync.WaitGroup{},
 	}
 
 	storage := storages[0]
@@ -96,7 +105,7 @@ func NewManager(
 		DefaultNodeRegistryEnabled: false,
 		EnableMetrics:              true,
 		RaftEventListener: &raftLogger{
-			Logger: m.logger.WithField("raft_component", "system"),
+			Logger: m.logger.WithField("raft.component", "system"),
 		},
 		Expert: managerCfg.expertConfig,
 	})
@@ -108,6 +117,15 @@ func NewManager(
 	if m.firstStorage == nil {
 		m.firstStorage = m.storageManagers[storage.Name]
 	}
+	consumer := ptnMgr.GetLogConsumer()
+	if consumer == nil {
+		return nil, fmt.Errorf("WAL log consumer has not been initialized")
+	}
+	raftLogConsumer, ok := consumer.(*LogConsumer)
+	if !ok {
+		return nil, fmt.Errorf("mismatched WAL log consumer, %T expected, %T found", raftLogConsumer, consumer)
+	}
+	m.logConsumer = raftLogConsumer
 
 	return m, nil
 }
@@ -126,6 +144,7 @@ func NewManager(
 // - Register the node's storage with the metadata Raft group. The metadata Raft group allocates a
 //   new storage ID for each of them. They persist in their IDs. This type of ID is used for future
 //   interaction with the cluster.
+// - Start the replicator that monitors storage/partition changes from the desired groups.
 func (m *Manager) Start() (returnedErr error) {
 	if m.started.Load() {
 		return fmt.Errorf("raft manager already started")
@@ -138,8 +157,8 @@ func (m *Manager) Start() (returnedErr error) {
 	}()
 
 	m.logger.WithFields(log.Fields{
-		"raft_config":       m.clusterConfig,
-		"raft_manager_conf": m.managerConfig,
+		"raft.config":       m.clusterConfig,
+		"raft.manager_conf": m.managerConfig,
 	}).Info("Raft cluster is starting")
 
 	// A Gitaly node contains multiple independent storages, and each storage maps to a dragonboat
@@ -160,18 +179,11 @@ func (m *Manager) Start() (returnedErr error) {
 		m.logger.WithField("cluster", cluster).Info("Raft cluster bootstrapped")
 	}
 
-	// Temporarily, we fetch the cluster info from the metadata Raft group directly. In the future,
-	// this node needs to contact a metadata authority.
- // For more information: https://gitlab.com/groups/gitlab-org/-/epics/10864 - cluster, err := m.metadataGroup.ClusterInfo() - if err != nil { - return fmt.Errorf("getting cluster info: %w", err) - } - if cluster.ClusterId != m.clusterConfig.ClusterID { - return fmt.Errorf("joining the wrong cluster, expected to join %q but joined %q", m.clusterConfig.ClusterID, cluster.ClusterId) + if err := m.registerStorages(); err != nil { + return err } - if err := m.registerStorages(); err != nil { + if err := m.startReplicators(); err != nil { return err } @@ -196,6 +208,17 @@ func (m *Manager) initMetadataGroup(storageMgr *storageManager) error { } func (m *Manager) registerStorages() error { + // Temporarily, we fetch the cluster info from the metadata Raft group directly. In the future, + // this node needs to contact a metadata authority. + // For more information: https://gitlab.com/groups/gitlab-org/-/epics/10864 + cluster, err := m.metadataGroup.ClusterInfo() + if err != nil { + return fmt.Errorf("getting cluster info: %w", err) + } + if cluster.ClusterId != m.clusterConfig.ClusterID { + return fmt.Errorf("joining the wrong cluster, expected to join %q but joined %q", m.clusterConfig.ClusterID, cluster.ClusterId) + } + if m.managerConfig.testBeforeRegister != nil { m.managerConfig.testBeforeRegister() } @@ -206,7 +229,7 @@ func (m *Manager) registerStorages() error { if err := storageMgr.loadStorageInfo(m.ctx); err != nil { return fmt.Errorf("loading persisted storage info: %w", err) } - if storageMgr.persistedInfo == nil || storageMgr.persistedInfo.GetStorageId() == 0 { + if storageMgr.ID() == 0 { storageInfo, err := m.metadataGroup.RegisterStorage(storageName) if err != nil { return fmt.Errorf("registering storage info: %w", err) @@ -214,6 +237,10 @@ func (m *Manager) registerStorages() error { if err := storageMgr.saveStorageInfo(m.ctx, storageInfo); err != nil { return fmt.Errorf("saving storage info: %w", err) } + m.logger.WithFields(log.Fields{ + "raft.storage_name": storageName, + "raft.storage_id": storageMgr.persistedInfo.GetStorageId(), + }).Info("storage registered") } else if storageMgr.persistedInfo.NodeId != m.clusterConfig.NodeID || storageMgr.persistedInfo.ReplicationFactor != m.clusterConfig.ReplicationFactor { // Changes that gonna affect replication. Gitaly needs to sync up those changes to metadata // Raft group to shuffle the replication groups. We don't persit new info intentionally. 
The
@@ -227,15 +254,131 @@ func (m *Manager) registerStorages() error {
 		}
 		m.logger.WithFields(log.Fields{
-			"storage_name":       storageName,
-			"storage_id":         storageMgr.persistedInfo.GetStorageId(),
-			"replication_factor": storageMgr.persistedInfo.GetReplicationFactor(),
+			"raft.storage_name":       storageName,
+			"raft.storage_id":         storageMgr.persistedInfo.GetStorageId(),
+			"raft.replication_factor": storageMgr.persistedInfo.GetReplicationFactor(),
 		}).Info("storage joined the cluster")
 	}
 
 	return nil
 }
 
+func (m *Manager) initReplicationGroup(hostingID raftID, storageInfo *gitalypb.Storage, authority bool) (*replicationRaftGroup, error) {
+	var targetMgr *storageManager
+	for _, storageMgr := range m.storageManagers {
+		if storageMgr.ID() == hostingID {
+			targetMgr = storageMgr
+		}
+	}
+
+	if targetMgr == nil {
+		return nil, fmt.Errorf("storage %q is not managed by this node", storageInfo.GetName())
+	}
+
+	group, err := newReplicationGroup(
+		m.ctx,
+		authority,
+		storageInfo,
+		targetMgr.nodeHost,
+		targetMgr.dbForReplicationGroup(raftID(storageInfo.GetStorageId())),
+		m.clusterConfig,
+		m.logger,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("initializing replication Raft group: %w", err)
+	}
+	return group, nil
+}
+
+func (m *Manager) startReplicators() error {
+	for storageName, storageMgr := range m.storageManagers {
+		replicator := newReplicator(m.ctx, storageMgr.ID(), m.logger, replicatorConfig{
+			initAuthorityGroup: func(storageInfo *gitalypb.Storage) (authority, error) {
+				return m.initReplicationGroup(raftID(storageInfo.GetStorageId()), storageInfo, true)
+			},
+			initReplicaGroup: func(hostingID raftID, storageInfo *gitalypb.Storage) (replica, error) {
+				return m.initReplicationGroup(hostingID, storageInfo, false)
+			},
+			getNodeAddr: m.getNodeAddr,
+			initPartitionGroup: func(_authorityID raftID, _partitionID raftID, _relativePath string) error {
+				// No-op for now.
+				return nil
+			},
+		})
+		storageMgr.replicator = replicator
+		if err := m.logConsumer.Push(storageName, replicator); err != nil {
+			return fmt.Errorf("registering storage %q to Raft's WAL log consumer: %w", storageName, err)
+		}
+	}
+
+	delay := backoff.NewDefaultExponential(rand.New(rand.NewSource(time.Now().UnixNano())))
+	delay.BaseDelay = 5 * time.Second
+	delay.MaxDelay = 60 * time.Second
+
+	errTimer := time.NewTimer(m.metadataGroup.maxNextElectionWait())
+
+	const maxNoUpdates = 30
+	var noUpdates uint
+	noUpdatesTimer := time.NewTimer(delay.BaseDelay)
+
+	m.running.Add(1)
+	go func() {
+		defer m.running.Done()
+		for {
+			var err error
+
+			clusterInfo, err := m.ClusterInfo()
+			if err == nil {
+				for _, storageMgr := range m.storageManagers {
+					var changes int
+					if changes, err = storageMgr.replicator.ApplyReplicaGroups(clusterInfo); err != nil {
+						break
+					}
+					if err = storageMgr.saveStorageInfo(m.ctx, clusterInfo.Storages[storageMgr.ID().ToUint64()]); err != nil {
+						break
+					}
+					if changes != 0 {
+						noUpdates = 0
+					}
+				}
+			}
+
+			if err != nil {
+				if m.ctx.Err() != nil {
+					return
+				}
+				m.logger.WithError(err).WithField("cluster", clusterInfo).Error("error while monitoring replica changes")
+				errTimer.Reset(m.metadataGroup.maxNextElectionWait())
+				noUpdates = 0
+				select {
+				case <-m.ctx.Done():
+					errTimer.Stop()
+					return
+				case <-errTimer.C:
+					errTimer.Stop()
+					continue
+				}
+			}
+
+			// Push back the next polling time. This reduces the polling rate when the
+			// cluster is stable.
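+			// With the settings above, the interval grows from the 5s base toward the 60s cap.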
+			if noUpdates < maxNoUpdates {
+				noUpdates++
+			}
+			noUpdatesTimer.Reset(delay.Backoff(noUpdates))
+			select {
+			case <-m.ctx.Done():
+				noUpdatesTimer.Stop()
+				return
+			case <-noUpdatesTimer.C:
+				noUpdatesTimer.Stop()
+			}
+		}
+	}()
+
+	return nil
+}
+
 // Ready returns if the Raft manager is ready.
 func (m *Manager) Ready() bool {
 	return m.started.Load()
@@ -248,9 +391,16 @@ func (m *Manager) Close() {
 	}
 	defer m.closed.Store(true)
 
+	if err := m.metadataGroup.Close(); err != nil {
+		m.logger.WithError(err).Warn("failed to stop metadata Raft group")
+	}
 	for _, storageMgr := range m.storageManagers {
-		storageMgr.Close()
+		if err := storageMgr.Close(); err != nil {
+			m.logger.WithError(err).Error("stopping storage")
+		}
 	}
+	m.cancel()
+	m.running.Wait()
 
 	m.logger.Info("Raft cluster has stopped")
 }
@@ -264,3 +414,13 @@ func (m *Manager) ClusterInfo() (*gitalypb.Cluster, error) {
 	}
 	return m.metadataGroup.ClusterInfo()
 }
+
+func (m *Manager) getNodeAddr(nodeID raftID) (string, error) {
+	// Right now, all storages are also initial members. In the future, the manager needs to contact
+	// the metadata Raft group or gossip.
+	addr, exist := m.clusterConfig.InitialMembers[fmt.Sprintf("%d", nodeID)]
+	if !exist {
+		return "", fmt.Errorf("address of node %d does not exist", nodeID)
+	}
+	return addr, nil
+}
diff --git a/internal/gitaly/storage/raft/manager_test.go b/internal/gitaly/storage/raft/manager_test.go
index 381f6300bb9db59cb967b8a00d29a2cd47f6c8db..b582b80971af321ffb6df5e97f6c944321d7b0ac 100644
--- a/internal/gitaly/storage/raft/manager_test.go
+++ b/internal/gitaly/storage/raft/manager_test.go
@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"testing"
 
-	"github.com/lni/dragonboat/v4"
 	"github.com/stretchr/testify/require"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
@@ -54,23 +53,29 @@ func TestManager_Start(t *testing.T) {
 		}
 
 		return &testNode{
-			manager: mgr,
-			close:   mgr.Close,
+			cfg:        cfg,
+			manager:    mgr,
+			ptnManager: ptnMgr,
+			close:      mgr.Close,
 		}, nil
 	}
 }
 
-	resetManager := func(t *testing.T, m *Manager) {
-		m.metadataGroup = nil
-		for _, storageMgr := range m.storageManagers {
-			storageMgr.clearStorageInfo()
-			storageMgr.nodeHost.Close()
-			nodeHost, err := dragonboat.NewNodeHost(storageMgr.nodeHost.NodeHostConfig())
-			require.NoError(t, err)
-			storageMgr.nodeHost = nodeHost
-		}
-		m.started.Store(false)
-		m.closed.Store(false)
+	resetManager := func(t *testing.T, node *testNode) {
+		node.manager.Close()
+		node.ptnManager.GetLogConsumer().Close()
+
+		m2, err := NewManager(
+			testhelper.Context(t),
+			node.cfg.Storages,
+			node.manager.clusterConfig,
+			node.manager.managerConfig,
+			node.ptnManager,
+			node.manager.logger)
+		require.NoError(t, err)
+
+		node.manager = m2
+		node.close = m2.Close
 	}
 
 	t.Run("bootstrap a singular cluster", func(t *testing.T) {
@@ -80,7 +85,7 @@ func TestManager_Start(t *testing.T) {
 		defer cluster.closeAll()
 
 		require.NoError(t, cluster.nodes[1].manager.Start())
-		require.Equal(t, raftID(1), cluster.nodes[1].manager.firstStorage.id)
+		require.Equal(t, raftID(1), cluster.nodes[1].manager.firstStorage.ID())
 
 		clusterInfo, err := cluster.nodes[1].manager.ClusterInfo()
 		require.NoError(t, err)
@@ -101,7 +106,7 @@ func TestManager_Start(t *testing.T) {
 		defer cluster.closeAll()
 
 		require.NoError(t, cluster.nodes[1].manager.Start())
-		require.Equal(t, raftID(1), cluster.nodes[1].manager.firstStorage.id)
+		require.Equal(t, raftID(1), cluster.nodes[1].manager.firstStorage.ID())
 
 		require.EqualError(t, cluster.nodes[1].manager.Start(),
"raft manager already started") }) @@ -118,7 +123,7 @@ func TestManager_Start(t *testing.T) { require.NoError(t, cluster.nodes[node].manager.Start()) storage := cluster.nodes[node].manager.firstStorage - require.Equal(t, storage.id.ToUint64(), storage.persistedInfo.StorageId) + require.Equal(t, storage.ID().ToUint64(), storage.persistedInfo.StorageId) require.Equal(t, storage.name, storage.persistedInfo.Name) require.Equal(t, uint64(3), storage.persistedInfo.ReplicationFactor) require.Equal(t, node.ToUint64(), storage.persistedInfo.NodeId) @@ -127,7 +132,7 @@ func TestManager_Start(t *testing.T) { var expectedIDs, allocatedIDs []raftID for i := raftID(1); i <= raftID(numNode); i++ { expectedIDs = append(expectedIDs, i) - allocatedIDs = append(allocatedIDs, cluster.nodes[i].manager.firstStorage.id) + allocatedIDs = append(allocatedIDs, cluster.nodes[i].manager.firstStorage.ID()) } require.ElementsMatch(t, expectedIDs, allocatedIDs) @@ -141,13 +146,13 @@ func TestManager_Start(t *testing.T) { require.Equal(t, cluster.clusterID, clusterInfo.ClusterId) require.Equal(t, uint64(numNode+1), clusterInfo.NextStorageId) expectedInfo := &gitalypb.Storage{ - StorageId: storage.id.ToUint64(), + StorageId: storage.ID().ToUint64(), Name: storage.name, ReplicationFactor: 3, NodeId: node.ToUint64(), - ReplicaGroups: replicaGroups(storage.id, uint64(numNode)), + ReplicaGroups: replicaGroups(storage.ID(), uint64(numNode)), } - testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[storage.id.ToUint64()]) + testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[storage.ID().ToUint64()]) }) }) }(numNode) @@ -167,7 +172,7 @@ func TestManager_Start(t *testing.T) { require.Equal(t, true, cluster.nodes[node].manager.Ready()) storage := cluster.nodes[node].manager.firstStorage - require.Equal(t, storage.id.ToUint64(), storage.persistedInfo.StorageId) + require.Equal(t, storage.ID().ToUint64(), storage.persistedInfo.StorageId) require.Equal(t, storage.name, storage.persistedInfo.Name) require.Equal(t, uint64(3), storage.persistedInfo.ReplicationFactor) require.Equal(t, node.ToUint64(), storage.persistedInfo.NodeId) @@ -175,8 +180,8 @@ func TestManager_Start(t *testing.T) { // The quorum is reached require.ElementsMatch(t, []raftID{1, 2}, []raftID{ - cluster.nodes[1].manager.firstStorage.id, - cluster.nodes[2].manager.firstStorage.id, + cluster.nodes[1].manager.firstStorage.ID(), + cluster.nodes[2].manager.firstStorage.ID(), }) fanOut(2, func(node raftID) { @@ -184,19 +189,18 @@ func TestManager_Start(t *testing.T) { storage := mgr.firstStorage clusterInfo, err := mgr.ClusterInfo() - fmt.Printf("%+v %+v\n", node, clusterInfo) require.NoError(t, err) require.Equal(t, cluster.clusterID, clusterInfo.ClusterId) require.Equal(t, uint64(3), clusterInfo.NextStorageId) expectedInfo := &gitalypb.Storage{ - StorageId: storage.id.ToUint64(), + StorageId: storage.ID().ToUint64(), Name: storage.name, ReplicationFactor: 3, NodeId: node.ToUint64(), - ReplicaGroups: replicaGroups(storage.id, 2), + ReplicaGroups: replicaGroups(storage.ID(), 2), } - testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[storage.id.ToUint64()]) + testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[storage.ID().ToUint64()]) }) // Now the third node joins. It does not matter whether the third node bootstraps the cluster. 
@@ -213,13 +217,13 @@ func TestManager_Start(t *testing.T) { require.Equal(t, cluster.clusterID, clusterInfo.ClusterId) require.Equal(t, uint64(4), clusterInfo.NextStorageId) expectedInfo := &gitalypb.Storage{ - StorageId: storage.id.ToUint64(), + StorageId: storage.ID().ToUint64(), Name: storage.name, ReplicationFactor: 3, NodeId: node.ToUint64(), - ReplicaGroups: replicaGroups(storage.id, 3), + ReplicaGroups: replicaGroups(storage.ID(), 3), } - testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[storage.id.ToUint64()]) + testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[storage.ID().ToUint64()]) }) }) }(bootstrap) @@ -273,7 +277,7 @@ func TestManager_Start(t *testing.T) { require.NoError(t, cluster.nodes[node].manager.Start()) storage := cluster.nodes[node].manager.firstStorage - require.Equal(t, storage.id.ToUint64(), storage.persistedInfo.StorageId) + require.Equal(t, storage.ID().ToUint64(), storage.persistedInfo.StorageId) require.Equal(t, storage.name, storage.persistedInfo.Name) require.Equal(t, uint64(3), storage.persistedInfo.ReplicationFactor) require.Equal(t, node.ToUint64(), storage.persistedInfo.NodeId) @@ -285,8 +289,8 @@ func TestManager_Start(t *testing.T) { }) require.ElementsMatch(t, []raftID{1, 2}, []raftID{ - cluster.nodes[1].manager.firstStorage.id, - cluster.nodes[2].manager.firstStorage.id, + cluster.nodes[1].manager.firstStorage.ID(), + cluster.nodes[2].manager.firstStorage.ID(), }) fanOut(3, func(node raftID) { @@ -305,13 +309,13 @@ func TestManager_Start(t *testing.T) { require.Equal(t, uint64(3), clusterInfo.NextStorageId) expectedInfo := &gitalypb.Storage{ - StorageId: storage.id.ToUint64(), + StorageId: storage.ID().ToUint64(), Name: storage.name, ReplicationFactor: 3, NodeId: node.ToUint64(), - ReplicaGroups: replicaGroups(storage.id, 2), + ReplicaGroups: replicaGroups(storage.ID(), 2), } - testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[storage.id.ToUint64()]) + testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[storage.ID().ToUint64()]) } }) }) @@ -326,14 +330,14 @@ func TestManager_Start(t *testing.T) { require.NoError(t, cluster.nodes[node].manager.Start()) storage := cluster.nodes[node].manager.firstStorage - require.Equal(t, storage.id.ToUint64(), storage.persistedInfo.StorageId) + require.Equal(t, storage.ID().ToUint64(), storage.persistedInfo.StorageId) require.Equal(t, storage.name, storage.persistedInfo.Name) require.Equal(t, uint64(3), storage.persistedInfo.ReplicationFactor) require.Equal(t, node.ToUint64(), storage.persistedInfo.NodeId) }) for _, node := range cluster.nodes { - resetManager(t, node.manager) + resetManager(t, node) } fanOut(3, func(node raftID) { @@ -349,13 +353,13 @@ func TestManager_Start(t *testing.T) { require.Equal(t, uint64(4), clusterInfo.NextStorageId) expectedInfo := &gitalypb.Storage{ - StorageId: mgr.firstStorage.id.ToUint64(), + StorageId: mgr.firstStorage.ID().ToUint64(), Name: mgr.firstStorage.name, ReplicationFactor: 3, NodeId: node.ToUint64(), - ReplicaGroups: replicaGroups(mgr.firstStorage.id, 3), + ReplicaGroups: replicaGroups(mgr.firstStorage.ID(), 3), } - testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[mgr.firstStorage.id.ToUint64()]) + testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[mgr.firstStorage.ID().ToUint64()]) }) }) @@ -369,14 +373,14 @@ func TestManager_Start(t *testing.T) { require.NoError(t, cluster.nodes[node].manager.Start()) storage := cluster.nodes[node].manager.firstStorage - require.Equal(t, storage.id.ToUint64(), 
storage.persistedInfo.StorageId) + require.Equal(t, storage.ID().ToUint64(), storage.persistedInfo.StorageId) require.Equal(t, storage.name, storage.persistedInfo.Name) require.Equal(t, uint64(3), storage.persistedInfo.ReplicationFactor) require.Equal(t, node.ToUint64(), storage.persistedInfo.NodeId) }) for _, node := range cluster.nodes { - resetManager(t, node.manager) + resetManager(t, node) node.manager.managerConfig.BootstrapCluster = false } @@ -393,13 +397,13 @@ func TestManager_Start(t *testing.T) { require.Equal(t, uint64(4), clusterInfo.NextStorageId) expectedInfo := &gitalypb.Storage{ - StorageId: mgr.firstStorage.id.ToUint64(), + StorageId: mgr.firstStorage.ID().ToUint64(), Name: mgr.firstStorage.name, ReplicationFactor: 3, NodeId: node.ToUint64(), - ReplicaGroups: replicaGroups(mgr.firstStorage.id, 3), + ReplicaGroups: replicaGroups(mgr.firstStorage.ID(), 3), } - testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[mgr.firstStorage.id.ToUint64()]) + testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[mgr.firstStorage.ID().ToUint64()]) }) }) @@ -448,7 +452,7 @@ func TestManager_Start(t *testing.T) { require.NoError(t, cluster.nodes[node].manager.Start()) storage := cluster.nodes[node].manager.firstStorage - require.Equal(t, storage.id.ToUint64(), storage.persistedInfo.StorageId) + require.Equal(t, storage.ID().ToUint64(), storage.persistedInfo.StorageId) require.Equal(t, storage.name, storage.persistedInfo.Name) require.Equal(t, uint64(1), storage.persistedInfo.ReplicationFactor) require.Equal(t, node.ToUint64(), storage.persistedInfo.NodeId) @@ -456,7 +460,7 @@ func TestManager_Start(t *testing.T) { }) for _, node := range cluster.nodes { - resetManager(t, node.manager) + resetManager(t, node) node.manager.managerConfig.BootstrapCluster = false node.manager.clusterConfig.ReplicationFactor = 3 } @@ -474,13 +478,13 @@ func TestManager_Start(t *testing.T) { require.Equal(t, uint64(4), clusterInfo.NextStorageId) expectedInfo := &gitalypb.Storage{ - StorageId: mgr.firstStorage.id.ToUint64(), + StorageId: mgr.firstStorage.ID().ToUint64(), Name: mgr.firstStorage.name, ReplicationFactor: 3, // New replication factor NodeId: node.ToUint64(), - ReplicaGroups: replicaGroups(mgr.firstStorage.id, 3), // New replica groups + ReplicaGroups: replicaGroups(mgr.firstStorage.ID(), 3), // New replica groups } - testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[mgr.firstStorage.id.ToUint64()]) + testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[mgr.firstStorage.ID().ToUint64()]) }) }) } diff --git a/internal/gitaly/storage/raft/metadata_group.go b/internal/gitaly/storage/raft/metadata_group.go index 545bd6103e91eb71c21dbb6747b450860b6c35b9..09369acd47e73a441a2524199ff90d57eed0acd7 100644 --- a/internal/gitaly/storage/raft/metadata_group.go +++ b/internal/gitaly/storage/raft/metadata_group.go @@ -61,9 +61,9 @@ func newMetadataRaftGroup(ctx context.Context, nodeHost *dragonboat.NodeHost, db } groupLogger := logger.WithFields(log.Fields{ - "raft_group": "metadata", - "raft_group_id": MetadataGroupID, - "raft_replica_id": clusterCfg.NodeID, + "raft.group": "metadata", + "raft.group_id": MetadataGroupID, + "raft.replica_id": clusterCfg.NodeID, }) groupLogger.Info("joined metadata group") @@ -233,6 +233,11 @@ func (g *metadataRaftGroup) WaitReady() error { return WaitGroupReady(g.ctx, g.nodeHost, g.groupID) } +// Close closes the Raft group. 
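+// It stops the local dragonboat replica of the metadata shard via StopReplica.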
+func (g *metadataRaftGroup) Close() error {
+	return g.nodeHost.StopReplica(g.groupConfig.ShardID, g.groupConfig.ReplicaID)
+}
+
 func (g *metadataRaftGroup) maxHeartbeatWait() time.Duration {
 	return time.Millisecond * time.Duration(g.groupConfig.HeartbeatRTT*g.nodeHost.NodeHostConfig().RTTMillisecond)
 }
diff --git a/internal/gitaly/storage/raft/replication_group.go b/internal/gitaly/storage/raft/replication_group.go
new file mode 100644
index 0000000000000000000000000000000000000000..b4b0b70bccc3d6df7c32f7cb32998cb0c507e98c
--- /dev/null
+++ b/internal/gitaly/storage/raft/replication_group.go
@@ -0,0 +1,314 @@
+package raft
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"sync"
+	"time"
+
+	"github.com/lni/dragonboat/v4"
+	dragonboatConf "github.com/lni/dragonboat/v4/config"
+	"github.com/lni/dragonboat/v4/statemachine"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/backoff"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/log"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+)
+
+// replicationRaftGroup is a Raft group that either manages the partitions created by the input
+// storage or manages the partitions replicated from another storage.
+// This type of Raft group is supposed to establish a one-way connection from an authority to
+// its replicas. Each related node starts a replication Raft group as follows:
+// - The group member on the authority storage always holds the "leader" role.
+// - The group members on replicas always hold the "non-voting" role. They can sync the list of
+// partitions created by the authority but don't take part in elections.
+//
+// All participants join the same group ID, generated from the authority's ID. This approach
+// allows the authority to communicate partition creation and deletion events to its replicas
+// reliably. The list of partitions is persisted in the state machine of each replication Raft
+// group member. If the authority is offline, no further partitions are created in that authority
+// storage; the clients must find another authority.
+//
+// This group provides functions that allow outside callers to access the list of partitions at
+// startup as well as any partition updates along the way.
+//
+// The startup order of members of this group matters. The group leader, aka the authority, must
+// start the group first. It then registers the list of replicas to the group. Finally, replica
+// group members join the group.
+type replicationRaftGroup struct {
+	Group
+	sync.Mutex
+	cancel         context.CancelFunc
+	backoffProfile *backoff.Exponential
+
+	// authority tells whether this Raft group is for an authority.
+	authority   bool
+	storageID   raftID
+	storageName string
+
+	initialized bool
+	// pendingOperations contains the list of partitions waiting to be processed.
+	pendingOperations []*partitionOperation
+	// newOperation is a channel signalling that a new operation is available.
+	newOperation chan struct{}
+}
+
+// newReplicationGroup starts an instance of the replication Raft group.
+func newReplicationGroup(
+	ctx context.Context,
+	authority bool,
+	storage *gitalypb.Storage,
+	nodeHost *dragonboat.NodeHost,
+	db dbAccessor,
+	clusterCfg config.Raft,
+	logger log.Logger,
+) (*replicationRaftGroup, error) {
+	ctx, cancel := context.WithCancel(ctx)
+
+	// Both authority and replicas share the same group ID, which is generated from the authority ID.
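+	// For example, storage ID 2 yields group ID 2<<48 = 0x0002000000000000 (see AuthorityGroupID).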
+	groupID := AuthorityGroupID(raftID(storage.GetStorageId()))
+	groupCfg := dragonboatConf.Config{
+		ReplicaID:    clusterCfg.NodeID,
+		ShardID:      groupID.ToUint64(),
+		ElectionRTT:  clusterCfg.ElectionTicks,
+		HeartbeatRTT: clusterCfg.HeartbeatTicks,
+		// Replicas must have non-voting role.
+		IsNonVoting: !authority,
+		WaitReady:   true,
+	}
+
+	backoffProfile := backoff.NewDefaultExponential(rand.New(rand.NewSource(time.Now().UnixNano())))
+	backoffProfile.BaseDelay = time.Duration(clusterCfg.ElectionTicks) * time.Duration(clusterCfg.RTTMilliseconds) * time.Millisecond
+
+	groupLogger := logger.WithFields(log.Fields{
+		"raft.group":        "replication",
+		"raft.group_id":     groupID,
+		"raft.replica_id":   clusterCfg.NodeID,
+		"raft.authority":    authority,
+		"raft.storage_id":   storage.GetStorageId(),
+		"raft.storage_name": storage.GetName(),
+	})
+
+	group := &replicationRaftGroup{
+		Group: Group{
+			ctx:           ctx,
+			groupID:       groupID,
+			replicaID:     raftID(clusterCfg.NodeID),
+			clusterConfig: clusterCfg,
+			groupConfig:   groupCfg,
+			logger:        groupLogger,
+			nodeHost:      nodeHost,
+		},
+		cancel:         cancel,
+		storageID:      raftID(storage.GetStorageId()),
+		storageName:    storage.GetName(),
+		authority:      authority,
+		backoffProfile: backoffProfile,
+		newOperation:   make(chan struct{}, 1),
+	}
+
+	initialMembers := map[uint64]string{}
+	// The authority is the only initial member of this group. Replicas don't need the list of
+	// initial members; they join the group as non-voting members.
+	if authority {
+		initialMembers[clusterCfg.NodeID] = clusterCfg.RaftAddr
+	}
+
+	if err := nodeHost.StartOnDiskReplica(initialMembers, !authority, func(_groupID, _replicaID uint64) statemachine.IOnDiskStateMachine {
+		return newReplicationStateMachine(ctx, db, authority, raftID(storage.GetStorageId()), group.appendPartitionOps)
+	}, groupCfg); err != nil {
+		return nil, fmt.Errorf("joining replication group: %w", err)
+	}
+
+	return group, nil
+}
+
+// Close stops the group by cancelling the context and closing the signalling channel.
+func (g *replicationRaftGroup) Close() (returnedErr error) {
+	g.cancel()
+	if err := g.nodeHost.StopReplica(g.groupConfig.ShardID, g.groupConfig.ReplicaID); err != nil {
+		returnedErr = err
+	}
+	close(g.newOperation)
+	return
+}
+
+// PollPartitions returns the list of pending operations. If there are pending operations, they are
+// returned immediately. Otherwise, the caller blocks until a new operation is added or the context
+// is cancelled.
+// If this function is called for the first time after the group has just started, it returns all
+// partitions in the state machine. Subsequent calls return new updates from the authority member.
+func (g *replicationRaftGroup) PollPartitions() ([]*partitionOperation, error) {
+	if err := g.initialize(); err != nil {
+		return nil, fmt.Errorf("initializing replication Raft group: %w", err)
+	}
+
+	for {
+		g.Lock()
+		// If there are any pending operations, return them immediately.
+		if len(g.pendingOperations) > 0 {
+			partitions := g.pendingOperations
+			g.pendingOperations = []*partitionOperation{}
+			g.Unlock()
+			return partitions, nil
+		}
+		g.Unlock()
+
+		// Otherwise, block until a new operation arrives.
+		select {
+		case <-g.ctx.Done():
+			return nil, nil
+		case <-g.newOperation:
+		}
+	}
+}
+
+// WaitReady waits until the replication group is ready or its context is cancelled.
+func (g *replicationRaftGroup) WaitReady() error {
+	return WaitGroupReady(g.ctx, g.nodeHost, g.groupID)
+}
+
+// RegisterReplica adds another node as a replica of this authority storage. Without this call,
+// the replication Raft group members on other nodes are not able to join the group. This function
+// doesn't kick off replication. The replication starts when other nodes kick off their counterpart
+// replication Raft groups.
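+//
+// A hypothetical registration from the authority side (address illustrative):
+//
+//	err := group.RegisterReplica(ctx, raftID(2), "node-2:8075")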
+func (g *replicationRaftGroup) RegisterReplica(ctx context.Context, nodeID raftID, replicaAddr string) error {
+	if !g.authority {
+		return fmt.Errorf("only the authority storage can register its replicas")
+	}
+	membership, err := g.nodeHost.SyncGetShardMembership(ctx, g.groupID.ToUint64())
+	if err != nil {
+		return fmt.Errorf("fetching shard membership: %w", err)
+	}
+	if _, exist := membership.NonVotings[nodeID.ToUint64()]; exist {
+		return nil
+	}
+
+	if err := g.nodeHost.SyncRequestAddNonVoting(ctx, g.groupID.ToUint64(), nodeID.ToUint64(), replicaAddr, membership.ConfigChangeID); err != nil {
+		return fmt.Errorf("requesting to add node %d (%s) as a replica of group %d (storage ID %d): %w", nodeID, replicaAddr, g.groupID, g.storageID, err)
+	}
+	return nil
+}
+
+// StorageName is a getter that returns the group's target storage name.
+func (g *replicationRaftGroup) StorageName() string {
+	return g.storageName
+}
+
+// StorageID is a getter that returns the group's target storage ID.
+func (g *replicationRaftGroup) StorageID() raftID {
+	return g.storageID
+}
+
+// initialize pushes all partitions in the state machine to the list of pending operations. It runs
+// once when a caller gets the list of partitions after the group has just started.
+func (g *replicationRaftGroup) initialize() error {
+	g.Lock()
+	defer g.Unlock()
+
+	if g.initialized {
+		return nil
+	}
+
+	if err := g.WaitReady(); err != nil {
+		return err
+	}
+
+	// Push all registered partitions to the pending list.
+	partitions, err := g.GetRegisteredPartitions()
+	if err != nil {
+		return err
+	}
+	if len(partitions) > 0 {
+		for _, partition := range partitions {
+			g.pendingOperations = append(g.pendingOperations, &partitionOperation{
+				op:        opTrackPartition,
+				partition: partition,
+			})
+		}
+		g.signalNewPartition()
+	}
+	g.initialized = true
+	return nil
+}
+
+func (g *replicationRaftGroup) signalNewPartition() {
+	select {
+	case g.newOperation <- struct{}{}:
+	default:
+		// If the signal channel is full, no need to wait.
+	}
+}
+
+func (g *replicationRaftGroup) appendPartitionOps(ops []*partitionOperation) {
+	g.Lock()
+	defer g.Unlock()
+
+	g.pendingOperations = append(g.pendingOperations, ops...)
+	g.signalNewPartition()
+}
+
+// RegisterPartition adds a partition to the tracking list of the replication Raft group.
+func (g *replicationRaftGroup) RegisterPartition(partitionID raftID, relativePath string) (*gitalypb.Partition, error) {
+	result, response, err := g.requestRegisterPartition(partitionID, relativePath)
+	if err != nil {
+		return nil, fmt.Errorf("sending RegisterPartitionRequest: %w", err)
+	}
+
+	switch result {
+	case resultRegisterPartitionSuccessfully:
+		return response.GetPartition(), nil
+	case resultRegisterPartitionExisted:
+		return nil, structerr.New("partition already exists").WithMetadataItems(
+			structerr.MetadataItem{Key: "partition_id", Value: partitionID},
+			structerr.MetadataItem{Key: "relative_path", Value: relativePath},
+		)
+	default:
+		return nil, fmt.Errorf("unknown result code %d", result)
+	}
+}
+
+// GetRegisteredPartitions returns the list of all registered partitions. Warning: this function is
+// not suitable to be called frequently.
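+//
+// A hypothetical startup read (the track helper is illustrative):
+//
+//	partitions, err := group.GetRegisteredPartitions()
+//	if err == nil {
+//		for _, p := range partitions {
+//			track(p.GetPartitionId(), p.GetRelativePath())
+//		}
+//	}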
+func (g *replicationRaftGroup) GetRegisteredPartitions() ([]*gitalypb.Partition, error) {
+	response, err := g.requestGetPartitions()
+	if err != nil {
+		return nil, fmt.Errorf("sending GetRegisteredPartitionsRequest: %w", err)
+	}
+	return response.GetPartitions(), nil
+}
+
+func (g *replicationRaftGroup) requestRegisterPartition(partitionID raftID, relativePath string) (updateResult, *gitalypb.RegisterPartitionResponse, error) {
+	requester := NewRequester[*gitalypb.RegisterPartitionRequest, *gitalypb.RegisterPartitionResponse](
+		g.nodeHost, g.groupID, g.logger, requestOption{
+			retry:       defaultRetry,
+			timeout:     g.maxNextElectionWait(),
+			exponential: g.backoffProfile,
+		},
+	)
+	return requester.SyncWrite(g.ctx, &gitalypb.RegisterPartitionRequest{
+		Partition: &gitalypb.Partition{
+			AuthorityName: g.storageName,
+			AuthorityId:   g.storageID.ToUint64(),
+			PartitionId:   partitionID.ToUint64(),
+			RelativePath:  relativePath,
+		},
+	})
+}
+
+func (g *replicationRaftGroup) requestGetPartitions() (*gitalypb.GetRegisteredPartitionsResponse, error) {
+	requester := NewRequester[*gitalypb.GetRegisteredPartitionsRequest, *gitalypb.GetRegisteredPartitionsResponse](
+		g.nodeHost, g.groupID, g.logger, requestOption{
+			retry:       defaultRetry,
+			timeout:     g.maxNextElectionWait(),
+			exponential: g.backoffProfile,
+		},
+	)
+	return requester.SyncRead(g.ctx, &gitalypb.GetRegisteredPartitionsRequest{})
+}
+
+func (g *replicationRaftGroup) maxNextElectionWait() time.Duration {
+	return time.Millisecond * time.Duration(g.groupConfig.ElectionRTT*g.nodeHost.NodeHostConfig().RTTMillisecond)
+}
diff --git a/internal/gitaly/storage/raft/replication_statemachine.go b/internal/gitaly/storage/raft/replication_statemachine.go
new file mode 100644
index 0000000000000000000000000000000000000000..e7de8f958d5300274f6bd8ce35d33255c85f9cba
--- /dev/null
+++ b/internal/gitaly/storage/raft/replication_statemachine.go
@@ -0,0 +1,284 @@
+package raft
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+
+	"github.com/dgraph-io/badger/v4"
+	"github.com/lni/dragonboat/v4/statemachine"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue"
+	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+	"google.golang.org/protobuf/proto"
+)
+
+// partitionOpType denotes the type of operation to perform for a partition managed by this Raft
+// group.
+type partitionOpType byte
+
+const (
+	// opTrackPartition tells the observer to track a partition. There are two occasions:
+	// - When the Raft group is restarted.
+	// - When the Raft group detects a new partition creation.
+	opTrackPartition = partitionOpType(iota)
+)
+
+// partitionOperation defines an operation to perform on a partition. Within the scope of
+// replication metadata, only partition creation and deletion are supported.
+type partitionOperation struct {
+	op        partitionOpType
+	partition *gitalypb.Partition
+}
+
+const (
+	resultRegisterPartitionSuccessfully = updateResult(iota)
+	resultRegisterPartitionExisted
+)
+
+var keyPartitionsList = []byte("partitions")
+
+// replicationStateMachine is a state machine that manages the data for a replication Raft group.
+// It manages two types of objects:
+// - If the replication Raft group is for an authority, the state machine manages the partitions
+// created by that authority storage.
+// - If the replication Raft group is for a replica, the state machine manages the partitions the
+// replica should manage.
+// The state machine triggers the configured callback when it receives new creation/deletion
+// operations.
+type replicationStateMachine struct {
+	ctx         context.Context
+	db          dbAccessor
+	authority   bool
+	storageID   raftID
+	callback    func([]*partitionOperation)
+	callbackOps []*partitionOperation
+}
+
+// Open initializes the replication state machine and returns the last applied index. The data are
+// persisted to disk by the keyvalue package, so we don't maintain a separate in-memory
+// representation here.
+func (s *replicationStateMachine) Open(stopC <-chan struct{}) (uint64, error) {
+	lastApplied, err := s.LastApplied()
+	if err != nil {
+		return 0, fmt.Errorf("reading last index from DB: %w", err)
+	}
+
+	select {
+	case <-stopC:
+		return 0, statemachine.ErrOpenStopped
+	default:
+		return lastApplied.ToUint64(), nil
+	}
+}
+
+// LastApplied returns the last applied index of the state machine.
+func (s *replicationStateMachine) LastApplied() (lastApplied raftID, err error) {
+	return lastApplied, s.db.read(s.ctx, func(txn keyvalue.ReadWriter) error {
+		lastApplied, err = s.getLastIndex(txn)
+		if err != nil {
+			err = fmt.Errorf("getting last index from DB: %w", err)
+		}
+		return err
+	})
+}
+
+// Update applies entries to the replication group. Only one operation is supported for now:
+// - *gitalypb.RegisterPartitionRequest
+func (s *replicationStateMachine) Update(entries []statemachine.Entry) ([]statemachine.Entry, error) {
+	defer func() {
+		// Always reset the list of operations for callback, regardless of the update result.
+		s.callbackOps = []*partitionOperation{}
+	}()
+	var returnedEntries []statemachine.Entry
+
+	if err := s.db.write(s.ctx, func(txn keyvalue.ReadWriter) error {
+		entries, err := s.update(txn, entries)
+		if err != nil {
+			return err
+		}
+		returnedEntries = entries
+		return nil
+	}); err != nil {
+		return nil, err
+	}
+
+	if len(s.callbackOps) > 0 {
+		s.callback(s.callbackOps)
+	}
+	return returnedEntries, nil
+}
+
+func (s *replicationStateMachine) update(txn keyvalue.ReadWriter, entries []statemachine.Entry) (_ []statemachine.Entry, returnedErr error) {
+	lastApplied, err := s.getLastIndex(txn)
+	if err != nil {
+		return nil, fmt.Errorf("reading last index from DB: %w", err)
+	}
+
+	var returnedEntries []statemachine.Entry
+	for _, entry := range entries {
+		if lastApplied >= raftID(entry.Index) {
+			return nil, fmt.Errorf("log entry with previously applied index, last applied %d entry index %d", lastApplied, entry.Index)
+		}
+		result, err := s.updateEntry(txn, &entry)
+		if err != nil {
+			return nil, fmt.Errorf("updating entry index %d: %w", entry.Index, err)
+		}
+		returnedEntries = append(returnedEntries, statemachine.Entry{
+			Index:  entry.Index,
+			Result: *result,
+		})
+		lastApplied = raftID(entry.Index)
+	}
+	if err := txn.Set(keyLastApplied, lastApplied.MarshalBinary()); err != nil {
+		return nil, fmt.Errorf("setting last index: %w", err)
+	}
+	return returnedEntries, nil
+}
+
+func (s *replicationStateMachine) updateEntry(txn keyvalue.ReadWriter, entry *statemachine.Entry) (*statemachine.Result, error) {
+	var result statemachine.Result
+
+	msg, err := anyProtoUnmarshal(entry.Cmd)
+	if err != nil {
+		return nil, fmt.Errorf("unmarshalling command: %w", err)
+	}
+
+	switch req := msg.(type) {
+	case *gitalypb.RegisterPartitionRequest:
+		returnedPartition, err := s.upsertPartition(txn, req.GetPartition())
+		if err != nil {
+			return nil, fmt.Errorf("handling RegisterPartitionRequest: %w", err)
+		}
+
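+		// upsertPartition returns nil when the partition was newly persisted; otherwise it
+		// returns the previously stored partition, which is reported as already registered.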
req.GetPartition()
+			result.Value = uint64(resultRegisterPartitionSuccessfully)
+
+			s.callbackOps = append(s.callbackOps, &partitionOperation{
+				op:        opTrackPartition,
+				partition: returnedPartition,
+			})
+		} else {
+			result.Value = uint64(resultRegisterPartitionExisted)
+		}
+
+		response, err := anyProtoMarshal(&gitalypb.RegisterPartitionResponse{Partition: returnedPartition})
+		if err != nil {
+			return nil, fmt.Errorf("marshaling RegisterPartitionResponse: %w", err)
+		}
+		result.Data = response
+
+		return &result, nil
+	default:
+		return nil, fmt.Errorf("request not supported: %s", msg.ProtoReflect().Descriptor().Name())
+	}
+}
+
+// Lookup queries the state machine. It currently supports only one operation:
+// - *gitalypb.GetRegisteredPartitionsRequest
+func (s *replicationStateMachine) Lookup(cmd interface{}) (interface{}, error) {
+	switch cmd.(type) {
+	case *gitalypb.GetRegisteredPartitionsRequest:
+		response := &gitalypb.GetRegisteredPartitionsResponse{}
+
+		if err := s.db.read(s.ctx, func(txn keyvalue.ReadWriter) error {
+			it := txn.NewIterator(keyvalue.IteratorOptions{
+				Prefix: []byte(fmt.Sprintf("%s/", keyPartitionsList)),
+			})
+			defer it.Close()
+
+			for it.Rewind(); it.Valid(); it.Next() {
+				if err := it.Item().Value(func(value []byte) error {
+					var partition gitalypb.Partition
+					if err := proto.Unmarshal(value, &partition); err != nil {
+						return fmt.Errorf("unmarshalling partition from DB: %w", err)
+					}
+					response.Partitions = append(response.Partitions, &partition)
+
+					return nil
+				}); err != nil {
+					return fmt.Errorf("iterating through partitions: %w", err)
+				}
+			}
+			return nil
+		}); err != nil {
+			return nil, fmt.Errorf("reading registered partitions: %w", err)
+		}
+
+		return response, nil
+	default:
+		return nil, fmt.Errorf("request not supported: %T", cmd)
+	}
+}
+
+// Sync is a no-op because our DB flushes to disk on commit.
+func (s *replicationStateMachine) Sync() error { return nil }
+
+// PrepareSnapshot returns an error until we start supporting snapshots.
+func (s *replicationStateMachine) PrepareSnapshot() (interface{}, error) {
+	return nil, fmt.Errorf("PrepareSnapshot is not supported yet")
+}
+
+// SaveSnapshot returns an error until we start supporting snapshots.
+func (s *replicationStateMachine) SaveSnapshot(_ interface{}, _ io.Writer, _ <-chan struct{}) error {
+	return fmt.Errorf("SaveSnapshot is not supported yet")
+}
+
+// RecoverFromSnapshot returns an error until we start supporting snapshots.
+func (s *replicationStateMachine) RecoverFromSnapshot(_ io.Reader, _ <-chan struct{}) error {
+	return fmt.Errorf("RecoverFromSnapshot is not supported yet")
+}
+
+// Close is a no-op because our DB is managed externally.
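+// The backing keyvalue store belongs to the partition manager, so there is nothing to release here.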
+func (s *replicationStateMachine) Close() error { return nil }
+
+func (s *replicationStateMachine) getLastIndex(txn keyvalue.ReadWriter) (raftID, error) {
+	var appliedIndex raftID
+
+	item, err := txn.Get(keyLastApplied)
+	if err != nil {
+		if errors.Is(err, badger.ErrKeyNotFound) {
+			return 0, nil
+		}
+		return 0, err
+	}
+	return appliedIndex, item.Value(func(value []byte) error {
+		appliedIndex.UnmarshalBinary(value)
+		return nil
+	})
+}
+
+func (s *replicationStateMachine) upsertPartition(txn keyvalue.ReadWriter, partition *gitalypb.Partition) (*gitalypb.Partition, error) {
+	key := s.partitionKey(partition)
+	item, err := txn.Get(key)
+	if err != nil {
+		if errors.Is(err, badger.ErrKeyNotFound) {
+			marshaledPartition, err := proto.Marshal(partition)
+			if err != nil {
+				return nil, fmt.Errorf("marshaling partition: %w", err)
+			}
+			if err := txn.Set(key, marshaledPartition); err != nil {
+				return nil, fmt.Errorf("saving partition %d of authority storage %d: %w", partition.GetPartitionId(), partition.GetAuthorityId(), err)
+			}
+			return nil, nil
+		}
+		return nil, err
+	}
+
+	var existingPartition gitalypb.Partition
+	return &existingPartition, item.Value(func(value []byte) error {
+		return proto.Unmarshal(value, &existingPartition)
+	})
+}
+
+func (s *replicationStateMachine) partitionKey(partition *gitalypb.Partition) []byte {
+	return []byte(fmt.Sprintf("%s/%s", keyPartitionsList, raftID(partition.GetPartitionId()).MarshalBinary()))
+}
+
+var _ = Statemachine(&replicationStateMachine{})
+
+// newReplicationStateMachine returns a state machine that manages data for a replication Raft group.
+func newReplicationStateMachine(ctx context.Context, db dbAccessor, authority bool, storageID raftID, callback func([]*partitionOperation)) *replicationStateMachine {
+	return &replicationStateMachine{ctx: ctx, db: db, authority: authority, storageID: storageID, callback: callback}
+}
diff --git a/internal/gitaly/storage/raft/replicator.go b/internal/gitaly/storage/raft/replicator.go
new file mode 100644
index 0000000000000000000000000000000000000000..162652e49b3fc2d445c008f8f66f3888e1a38133
--- /dev/null
+++ b/internal/gitaly/storage/raft/replicator.go
@@ -0,0 +1,358 @@
+package raft
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"slices"
+	"time"
+
+	"gitlab.com/gitlab-org/gitaly/v16/internal/log"
+	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+)
+
+// defaultReplicatorTimeout defines the default timeout period used for the replicator's operations.
+const defaultReplicatorTimeout = 30 * time.Second
+
+// replica is an interface that represents the available actions of a replica group.
+type replica interface {
+	PollPartitions() ([]*partitionOperation, error)
+	WaitReady() error
+	StorageID() raftID
+	StorageName() string
+	Close() error
+}
+
+// authority is an interface that represents the actions of an authority group.
+type authority interface {
+	replica
+
+	// RegisterPartition registers a partition with the authority.
+	RegisterPartition(partitionID raftID, relativePath string) (*gitalypb.Partition, error)
+	// RegisterReplica registers a replica to the list of known replicas of the authority.
+	RegisterReplica(ctx context.Context, nodeID raftID, replicaAddr string) error
+}
+
+// storageTracker tracks the partition deletion/creation activities of a replica.
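+// Each tracker owns one polling goroutine: cancel stops the polling loop, and done is closed once
+// the goroutine has exited.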
+type storageTracker struct {
+	ctx    context.Context
+	cancel context.CancelFunc
+	done   chan struct{}
+	poller replica
+}
+
+type replicatorConfig struct {
+	initAuthorityGroup func(storage *gitalypb.Storage) (authority, error)
+	initReplicaGroup   func(authorityID raftID, storage *gitalypb.Storage) (replica, error)
+	initPartitionGroup func(authorityID raftID, partitionID raftID, relativePath string) error
+	getNodeAddr        func(nodeID raftID) (string, error)
+}
+
+// replicator manages the replication activities of a storage, including:
+// - Apply the list of replica groups.
+// - Poll the list of partitions under management and start/stop corresponding partition groups accordingly.
+// - Manage the authority storage of this node.
+//   - Push any changes from partitions of the authority storage, if any.
+// - Manage the replicas assigned to this storage.
+//   - Poll and apply any changes from partitions of replicas, if any.
+//
+/*
+                                                  ┌─►Partition Group N ──► Other nodes
+                                                  │
+                                                  ├─►Partition Group 2 ──► Other nodes
+                                     Push changes │
+                                                  ├─►Partition Group 1 ──► Other nodes
+                                                  │
+                          ┌─────► Authority Group
+                          │
+   Push authority changes │                        Poll changes
+WAL ────────────────────► Replicator ◄────────────────────────────────┐
+ ▲                            │                                       │
+ │                            │                                       │
+ └────────────────────────────┘                                       │
+     Apply replica changes    │                                       │
+                              ├────── Replica Group A                 │
+                              │          ├──Partition Group 1 ────────┤
+                              │          ├──Partition Group 2 ────────┤
+                              │          └──Partition Group N ────────┤
+                              │                                       │
+                              └────── Replica Group B                 │
+                                         ├──Partition Group 1 ────────┤
+                                         ├──Partition Group 2 ────────┤
+                                         └──Partition Group N ────────┘
+*/
+type replicator struct {
+	ctx             context.Context
+	logger          log.Logger
+	config          replicatorConfig
+	authorityID     raftID
+	authority       authority
+	storageTrackers map[raftID]*storageTracker
+}
+
+// ApplyReplicaGroups applies the list of replica groups.
+func (r *replicator) ApplyReplicaGroups(cluster *gitalypb.Cluster) (int, error) {
+	var changes int
+
+	authorityStorageInfo := cluster.Storages[r.authorityID.ToUint64()]
+	if authorityStorageInfo == nil {
+		return 0, fmt.Errorf("storage with ID %d does not exist", r.authorityID)
+	}
+
+	replicatingStorages := r.stringifyReplicatingStorages()
+	storagesToReplicate := r.storagesToReplicate(cluster)
+
+	// First, initialize the authority storage, which is the storage managed by this node.
+	if r.authority == nil {
+		authority, err := r.config.initAuthorityGroup(authorityStorageInfo)
+		if err != nil {
+			return changes, fmt.Errorf("creating authority Raft group: %w", err)
+		}
+
+		if err := authority.WaitReady(); err != nil {
+			if closingErr := authority.Close(); closingErr != nil {
+				return changes, errors.Join(err, closingErr)
+			}
+			return changes, fmt.Errorf("waiting for authority Raft group: %w", err)
+		}
+
+		// Track this storage. As the group does not communicate with this replicator directly, we
+		// need to poll from its state machine.
+		r.trackStorage(r.authorityID, authority)
+		r.authority = authority
+		changes++
+
+		r.logger.WithFields(log.Fields{
+			"raft.storage_id":   r.authorityID,
+			"raft.storage_name": authorityStorageInfo.GetName(),
+		}).Info("tracking authority storage")
+	}
+
+	// Register all replicas of this authority storage. The replication won't start until the
+	// replication Raft groups on other nodes start. This node doesn't remove obsolete replicas on
+	// other nodes although it could. It lets other nodes shut down obsolete replicas themselves so
+	// that they have a chance to clean up resources.
+	for _, replicaID := range authorityStorageInfo.ReplicaGroups {
+		if err := r.registerReplica(replicaID, cluster); err != nil {
+			return changes, fmt.Errorf("registering replica: %w", err)
+		}
+	}
+
+	// Untrack obsolete replicas when the list of replica groups changes. At the moment, the
+	// replicator doesn't clean up data. It simply stops listening to new changes. Data deletion
+	// will be handled in https://gitlab.com/gitlab-org/gitaly/-/issues/6248.
+	for storageID := range r.storageTrackers {
+		if storageID == r.authorityID {
+			continue
+		}
+		if _, exist := storagesToReplicate[storageID]; !exist {
+			changes++
+			if err := r.untrackStorage(storageID); err != nil {
+				return 0, fmt.Errorf("untrack storage ID %d: %w", storageID, err)
+			}
+			r.logger.WithFields(log.Fields{"raft.storage_id": storageID}).Info("untracked storage")
+		}
+	}
+
+	// Listen to existing/new partitions from tracked replicas.
+	for storageID, storageInfo := range storagesToReplicate {
+		if _, exist := r.storageTrackers[storageID]; exist {
+			continue
+		}
+		changes++
+		replicated, err := r.config.initReplicaGroup(r.authorityID, storageInfo)
+		if err != nil {
+			return 0, fmt.Errorf("creating replication Raft group: %w", err)
+		}
+		r.trackStorage(storageID, replicated)
+		r.logger.WithFields(log.Fields{
+			"raft.storage_id":   storageID,
+			"raft.storage_name": storageInfo.GetName(),
+		}).Info("tracking storage")
+	}
+
+	if changes != 0 {
+		r.logger.WithFields(log.Fields{
+			"raft.replicating_storages":  replicatingStorages,
+			"raft.storages_to_replicate": r.stringifyStorageMap(storagesToReplicate),
+		}).Info(fmt.Sprintf("applied replica groups, %d changes", changes))
+	}
+	return changes, nil
+}
+
+// BroadcastNewPartition is triggered by a caller. It makes a call to the authority group to
+// register the new partition. This partition is broadcast to the corresponding replica Raft groups.
+func (r *replicator) BroadcastNewPartition(partitionID raftID, relativePath string) error {
+	if _, err := r.authority.RegisterPartition(partitionID, relativePath); err != nil {
+		return fmt.Errorf("registering partition: %w", err)
+	}
+	return nil
+}
+
+// Close stops all activities of the replicator. This function exits after background goroutines finish.
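+// Errors from closing individual trackers are joined rather than short-circuiting, so every
+// tracker gets a chance to shut down.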
+func (r *replicator) Close() error {
+	var mergedErr error
+
+	for storageID := range r.storageTrackers {
+		if err := r.untrackStorage(storageID); err != nil {
+			if mergedErr == nil {
+				mergedErr = err
+			} else {
+				mergedErr = errors.Join(mergedErr, err)
+			}
+		}
+	}
+	return mergedErr
+}
+
+func (r *replicator) storagesToReplicate(cluster *gitalypb.Cluster) map[raftID]*gitalypb.Storage {
+	replications := map[raftID]*gitalypb.Storage{}
+	for storageID, storage := range cluster.Storages {
+		if slices.Contains(storage.ReplicaGroups, r.authorityID.ToUint64()) {
+			replications[raftID(storageID)] = storage
+		}
+	}
+	return replications
+}
+
+func (r *replicator) stringifyStorageMap(m map[raftID]*gitalypb.Storage) map[raftID]string {
+	result := map[raftID]string{}
+	for k, v := range m {
+		result[k] = v.GetName()
+	}
+	return result
+}
+
+func (r *replicator) stringifyReplicatingStorages() map[raftID]string {
+	result := map[raftID]string{}
+	for k, v := range r.storageTrackers {
+		if k != r.authorityID {
+			result[k] = v.poller.StorageName()
+		}
+	}
+	return result
+}
+
+func (r *replicator) trackStorage(storageID raftID, poller replica) {
+	ctx, cancel := context.WithCancel(r.ctx)
+	r.storageTrackers[storageID] = &storageTracker{
+		ctx:    ctx,
+		cancel: cancel,
+		poller: poller,
+		done:   make(chan struct{}),
+	}
+	go r.pollPartitionsFromStorage(r.storageTrackers[storageID])
+}
+
+func (r *replicator) untrackStorage(storageID raftID) error {
+	tracker := r.storageTrackers[storageID]
+	tracker.cancel()
+	defer func() {
+		delete(r.storageTrackers, storageID)
+	}()
+
+	if err := tracker.poller.Close(); err != nil {
+		return fmt.Errorf("closing poller: %w", err)
+	}
+
+	select {
+	case <-time.After(defaultReplicatorTimeout):
+		return fmt.Errorf("deadline exceeded while untracking storage")
+	case <-tracker.done:
+		return nil
+	}
+}
+
+func (r *replicator) pollPartitionsFromStorage(tracker *storageTracker) {
+	defer close(tracker.done)
+
+	for {
+		select {
+		case <-tracker.ctx.Done():
+			return
+		default:
+		}
+
+		partitions, err := tracker.poller.PollPartitions()
+		if err != nil {
+			if errors.Is(err, context.Canceled) {
+				return
+			}
+			r.logger.WithFields(log.Fields{
+				"raft.storage_id":   tracker.poller.StorageID(),
+				"raft.storage_name": tracker.poller.StorageName(),
+			}).WithError(err).Error("failed to poll partitions from storage")
+			continue
+		}
+		if len(partitions) == 0 {
+			continue
+		}
+		for _, operation := range partitions {
+			partition := operation.partition
+			if partition == nil {
+				r.logger.Error("tracking unknown partition")
+				continue
+			}
+			switch operation.op {
+			case opTrackPartition:
+				fields := log.Fields{
+					"raft.authority_id":   partition.GetAuthorityId(),
+					"raft.authority_name": partition.GetAuthorityName(),
+					"raft.storage_id":     r.authority.StorageID(),
+					"raft.storage_name":   r.authority.StorageName(),
+					"raft.partition_id":   partition.GetPartitionId(),
+					"raft.relative_path":  partition.GetRelativePath(),
+				}
+				if err := r.config.initPartitionGroup(
+					raftID(partition.GetAuthorityId()),
+					raftID(partition.GetPartitionId()),
+					partition.GetRelativePath(),
+				); err != nil {
+					r.logger.WithFields(fields).WithError(err).Error("failed to initialize partition Raft group")
+					continue
+				}
+				r.logger.WithFields(fields).Debug("tracking partition")
+			default:
+				r.logger.WithFields(log.Fields{
+					"raft.partition_id":  partition.GetPartitionId(),
+					"raft.relative_path": partition.GetRelativePath(),
+				}).Error("unsupported op")
+			}
+		}
+	}
+}
+
+func (r *replicator) registerReplica(replicaID 
uint64, cluster *gitalypb.Cluster) error { + replicaInfo := cluster.Storages[replicaID] + if replicaInfo == nil { + return fmt.Errorf("replica ID %d of storage ID %d not found", replicaID, r.authorityID) + } + addr, err := r.config.getNodeAddr(raftID(replicaInfo.GetNodeId())) + if err != nil { + return fmt.Errorf("getting storage address: %w", err) + } + ctx, cancel := context.WithTimeout(r.ctx, defaultReplicatorTimeout) + defer cancel() + return r.authority.RegisterReplica(ctx, raftID(replicaInfo.GetNodeId()), addr) +} + +func newReplicator(ctx context.Context, authorityID raftID, logger log.Logger, config replicatorConfig) *replicator { + return &replicator{ + ctx: ctx, + authorityID: authorityID, + logger: logger.WithFields(log.Fields{ + "raft.component": "replicator", + "raft.storage_id": authorityID, + }), + config: config, + storageTrackers: map[raftID]*storageTracker{}, + } +} diff --git a/internal/gitaly/storage/raft/storage.go b/internal/gitaly/storage/raft/storage.go index d981bcabfeaa2da477da5d79b5292978e70b3a72..5d476cdd2c56ef19b1c161b50cd5be5369f56343 100644 --- a/internal/gitaly/storage/raft/storage.go +++ b/internal/gitaly/storage/raft/storage.go @@ -3,7 +3,6 @@ package raft import ( "context" "errors" - "fmt" "github.com/dgraph-io/badger/v4" "github.com/lni/dragonboat/v4" @@ -17,12 +16,12 @@ import ( // keyvalue.Transactioner for each Raft group, allowing the Raft groups to store their data in the // underlying keyvalue store. type storageManager struct { - id raftID name string ptnMgr *storagemgr.PartitionManager db dbAccessor nodeHost *dragonboat.NodeHost persistedInfo *gitalypb.Storage + replicator *replicator } // newStorageManager returns an instance of storage manager. @@ -36,7 +35,23 @@ func newStorageManager(name string, ptnMgr *storagemgr.PartitionManager, nodeHos } // Close closes the storage manager. -func (m *storageManager) Close() { m.nodeHost.Close() } +func (m *storageManager) Close() (returnedErr error) { + if m.replicator != nil { + if err := m.replicator.Close(); err != nil { + returnedErr = err + } + } + m.nodeHost.Close() + return +} + +// ID returns the ID of the storage from persistent storage. +func (m *storageManager) ID() raftID { + if m.persistedInfo == nil { + return 0 + } + return raftID(m.persistedInfo.GetStorageId()) +} func (m *storageManager) loadStorageInfo(ctx context.Context) error { return m.db.read(ctx, func(txn keyvalue.ReadWriter) error { @@ -53,7 +68,6 @@ func (m *storageManager) loadStorageInfo(ctx context.Context) error { return err } m.persistedInfo = &persistedInfo - m.id = raftID(m.persistedInfo.StorageId) return nil }) }) @@ -61,12 +75,6 @@ func (m *storageManager) loadStorageInfo(ctx context.Context) error { func (m *storageManager) saveStorageInfo(ctx context.Context, storage *gitalypb.Storage) error { return m.db.write(ctx, func(txn keyvalue.ReadWriter) error { - _, err := txn.Get([]byte("storage")) - if err == nil { - return fmt.Errorf("storage already exists") - } else if !errors.Is(err, badger.ErrKeyNotFound) { - return err - } marshaled, err := proto.Marshal(storage) if err != nil { return err @@ -75,18 +83,14 @@ func (m *storageManager) saveStorageInfo(ctx context.Context, storage *gitalypb. return err } m.persistedInfo = storage - m.id = raftID(m.persistedInfo.StorageId) return nil }) } -// clearStorageInfo clears the storage info inside the in-memory storage of the storage manager. It -// does not clean the persisted info the DB. 
-func (m *storageManager) clearStorageInfo() {
-	m.id = 0
-	m.persistedInfo = nil
-}
-
 func (m *storageManager) dbForMetadataGroup() dbAccessor {
 	return dbForMetadataGroup(m.ptnMgr, m.name)
 }
+
+func (m *storageManager) dbForReplicationGroup(targetID raftID) dbAccessor {
+	return dbForReplicationGroup(m.ptnMgr, m.ID(), m.name, targetID)
+}
diff --git a/internal/gitaly/storage/raft/testhelper_test.go b/internal/gitaly/storage/raft/testhelper_test.go
index e853a44a1869acbb90f7809e4633788271caad1d..919ab5528ff598f982333373dab836dad18588ed 100644
--- a/internal/gitaly/storage/raft/testhelper_test.go
+++ b/internal/gitaly/storage/raft/testhelper_test.go
@@ -125,10 +125,15 @@ var dragonboatTestingProfile = func() dragonboatConfig.ExpertConfig {
 }()
 
 type testNode struct {
-	nodeHost *dragonboat.NodeHost
-	manager  *Manager
+	// Variables for a real manager.
+	cfg        config.Cfg
+	manager    *Manager
+	ptnManager *storagemgr.PartitionManager
+	close      func()
+
+	// Variables for a mocked manager.
 	sm       *testStateMachine
-	close    func()
+	nodeHost *dragonboat.NodeHost
 }
 
 type testRaftCluster struct {
@@ -355,7 +360,7 @@ func setupTestDBManager(t *testing.T, cfg config.Cfg) *keyvalue.DBManager {
 	return dbMgr
 }
 
-func setupTestPartitionManager(t *testing.T, cfg config.Cfg) *storagemgr.PartitionManager {
+func setupTestDBManagerAndPartitionManager(t *testing.T, cfg config.Cfg) (*keyvalue.DBManager, *storagemgr.PartitionManager) {
 	logger := testhelper.NewLogger(t)
 	dbMgr := setupTestDBManager(t, cfg)
 
@@ -365,10 +370,20 @@ func setupTestPartitionManager(t *testing.T, cfg config.Cfg) *storagemgr.Partiti
 
 	localRepoFactory := localrepo.NewFactory(logger, config.NewLocator(cfg), cmdFactory, catfileCache)
 
-	partitionManager, err := storagemgr.NewPartitionManager(testhelper.Context(t), cfg.Storages, cmdFactory, localRepoFactory, logger, dbMgr, cfg.Prometheus, nil)
+	consumerFactory := func(lma storagemgr.LogManagerAccessor) storagemgr.LogConsumer {
+		return NewLogConsumer(testhelper.Context(t), lma, logger)
+	}
+
+	partitionManager, err := storagemgr.NewPartitionManager(testhelper.Context(t), cfg.Storages, cmdFactory, localRepoFactory, logger, dbMgr, cfg.Prometheus, consumerFactory)
 	require.NoError(t, err)
 	t.Cleanup(partitionManager.Close)
 
+	return dbMgr, partitionManager
+}
+
+func setupTestPartitionManager(t *testing.T, cfg config.Cfg) *storagemgr.PartitionManager {
+	_, partitionManager := setupTestDBManagerAndPartitionManager(t, cfg)
+	return partitionManager
 }
diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go
index 0aa49fcaf4945b90c8ffee93dbbfdd337de88281..dca012bc443494002ad4fb7700d3db4ae2658b11 100644
--- a/internal/gitaly/storage/storagemgr/partition_manager.go
+++ b/internal/gitaly/storage/storagemgr/partition_manager.go
@@ -54,8 +54,10 @@ type PartitionManager struct {
 	// transactionManagerFactory is a factory to create TransactionManagers. This shouldn't ever be changed
 	// during normal operation, but can be used to adjust the transaction manager's behaviour in tests.
 	transactionManagerFactory transactionManagerFactory
-	// consumerCleanup closes the LogConsumer.
-	consumerCleanup func()
+	// consumer is a hook that will be passed to each TransactionManager. The TransactionManager
+	// notifies the consumer of new transactions by invoking the NotifyNewTransactions method after
+	// they are committed.
+	consumer LogConsumer
 	// metrics accounts for all metrics of transaction operations. 
It will be
 	// passed down to each transaction manager and is shared between them. The
 	// metrics must be registered to be collected by prometheus collector.
@@ -284,13 +286,10 @@ func NewPartitionManager(
 		metrics: metrics,
 	}
 
-	var logConsumer LogConsumer
-	var cleanup func()
 	if consumerFactory != nil {
-		logConsumer, cleanup = consumerFactory(pm)
+		pm.consumer = consumerFactory(pm)
 	}
 
-	pm.consumerCleanup = cleanup
 	pm.transactionManagerFactory = func(
 		logger log.Logger,
 		partitionID storage.PartitionID,
@@ -312,7 +311,7 @@ func NewPartitionManager(
 			metrics.housekeeping,
 			metrics.snapshot.Scope(storageMgr.name),
 		),
-		logConsumer,
+		pm.consumer,
 	)
 }
 
@@ -449,6 +448,11 @@ func (pm *PartitionManager) CallLogManager(ctx context.Context, storageName stri
 	return nil
 }
 
+// GetLogConsumer returns the registered log consumer, if any.
+func (pm *PartitionManager) GetLogConsumer() LogConsumer {
+	return pm.consumer
+}
+
 // StorageKV executes the provided function against the storage's metadata DB. All write operations
 // issued by the function are committed in an atomic fashion. All read operations are performed
 // against a snapshot of the database.
@@ -603,8 +607,8 @@ func (pm *PartitionManager) Close() {
 		}()
 	}
 
-	if pm.consumerCleanup != nil {
-		pm.consumerCleanup()
+	if pm.consumer != nil {
+		pm.consumer.Close()
 	}
 
 	activeStorages.Wait()
diff --git a/internal/gitaly/storage/storagemgr/testhelper_test.go b/internal/gitaly/storage/storagemgr/testhelper_test.go
index 9171c431df5c047b6d43425f267e93a6f1f4a7e5..39d021e4403a173da3f2af3e5bfa5e4fb4dede50 100644
--- a/internal/gitaly/storage/storagemgr/testhelper_test.go
+++ b/internal/gitaly/storage/storagemgr/testhelper_test.go
@@ -842,6 +842,16 @@ type ConsumerAcknowledge struct {
 	LSN storage.LSN
 }
 
+// ConsumerReadEntry asserts the entry returned to the consumer at a given point in time.
+type ConsumerReadEntry struct {
+	// LSN is the LSN targeted by the consumer.
+	LSN storage.LSN
+	// ExpectedEntry is the expected returned log entry.
+	ExpectedEntry *gitalypb.LogEntry
+	// ExpectedErr is the expected returned error string.
+	ExpectedErr string
+}
+
 // RemoveRepository removes the repository from the disk. It must be run with the TransactionManager
 // closed.
 type RemoveRepository struct{}
@@ -897,6 +907,8 @@ func (lc *MockLogConsumer) NotifyNewTransactions(storageName string, partitionID
 	lc.highWaterMark = highWaterMark
 }
 
+func (lc *MockLogConsumer) Close() {}
+
 // ConsumerState is used to track the log positions received by the consumer and the corresponding
 // acknowledgements from the consumer to the manager. We deliberately do not track the LowWaterMark
 // sent to consumers as this is non-deterministic.
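Taken together, the Close addition and the MockLogConsumer stub pin down the lifecycle contract for consumers: the PartitionManager constructs the consumer through the factory and is also the one that calls Close during shutdown, so implementations no longer hand back a separate cleanup function. A minimal sketch of a conforming implementation, using only the two interface methods from this change; the recordingConsumer name and its fields are hypothetical:

package consumerexample

import (
	"sync"

	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
)

// recordingConsumer is a hypothetical LogConsumer that only records the most recent water marks.
type recordingConsumer struct {
	mu            sync.Mutex
	lowWaterMark  storage.LSN
	highWaterMark storage.LSN
}

// NotifyNewTransactions stores the range of log entries currently available to process. A real
// consumer would wake up a worker here to start consuming the range.
func (c *recordingConsumer) NotifyNewTransactions(storageName string, partitionID storage.PartitionID, low, high storage.LSN) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.lowWaterMark, c.highWaterMark = low, high
}

// Close is invoked by PartitionManager.Close during shutdown. A real consumer would stop its
// background goroutines here; this sketch holds no resources.
func (c *recordingConsumer) Close() {}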
@@ -1354,6 +1366,15 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas
 			transaction.WriteCommitGraphs(step.Config)
 		case ConsumerAcknowledge:
 			transactionManager.AcknowledgeTransaction(transactionManager.consumer, step.LSN)
+		case ConsumerReadEntry:
+			entry, err := transactionManager.ReadEntry(step.LSN)
+			if step.ExpectedErr == "" {
+				require.NoError(t, err)
+				testhelper.ProtoEqual(t, step.ExpectedEntry, entry)
+			} else {
+				require.Error(t, err)
+				require.Contains(t, err.Error(), step.ExpectedErr)
+			}
 		case RepositoryAssertion:
 			require.Contains(t, openTransactions, step.TransactionID, "test error: transaction's snapshot asserted before beginning it")
 			transaction := openTransactions[step.TransactionID]
diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go
index bf6e716f0604d66ada9309c4da8f8dd361fe67dc..97d48e9eb53a7af86824e1f20308aaa8530d9ce9 100644
--- a/internal/gitaly/storage/storagemgr/transaction_manager.go
+++ b/internal/gitaly/storage/storagemgr/transaction_manager.go
@@ -875,6 +875,8 @@ type LogConsumer interface {
 	// LSNs are sent so that a newly initialized consumer is aware of the full range of
 	// entries it can process.
 	NotifyNewTransactions(storageName string, partitionID storage.PartitionID, lowWaterMark, highWaterMark storage.LSN)
+	// Close shuts down the log consumer. It is called when the partition manager shuts down.
+	Close()
 }
 
 // LogManagerAccessor is the interface used by the LogManager coordinator. It is called by
@@ -889,7 +891,6 @@ type LogManagerAccessor interface {
 
-// LogConsumerFactory returns a LogConsumer that requires a LogManagerAccessor for construction and
-// a function to close the LogConsumer.
+// LogConsumerFactory returns a LogConsumer constructed with the given LogManagerAccessor.
 type LogConsumerFactory func(LogManagerAccessor) LogConsumer
 
 // LogManager is the interface used on the consumer side of the integration. The consumer
 // has the ability to acknowledge transactions as having been processed with AcknowledgeTransaction.
@@ -899,6 +901,8 @@ type LogManager interface {
 	AcknowledgeTransaction(consumer LogConsumer, lsn storage.LSN)
 	// GetTransactionPath returns the path of the log entry's root directory.
 	GetTransactionPath(lsn storage.LSN) string
+	// ReadEntry returns the log entry for the given LSN.
+	ReadEntry(lsn storage.LSN) (*gitalypb.LogEntry, error)
 }
 
 // AcknowledgeTransaction acknowledges log entries up and including lsn as successfully processed
@@ -919,6 +923,18 @@ func (mgr *TransactionManager) GetTransactionPath(lsn storage.LSN) string {
 	return walFilesPathForLSN(mgr.stateDirectory, lsn)
 }
 
+// ReadEntry returns the log entry for the given LSN.
+func (mgr *TransactionManager) ReadEntry(lsn storage.LSN) (*gitalypb.LogEntry, error) {
+	if lsn < mgr.lowWaterMark() {
+		return nil, fmt.Errorf("requested log entry is gone")
+	}
+	entry, err := mgr.readLogEntry(lsn)
+	if err != nil {
+		return nil, fmt.Errorf("reading log entry: %w", err)
+	}
+	return entry, nil
+}
+
 // consumerPosition tracks the last LSN acknowledged for a consumer.
 type consumerPosition struct {
 	// position is the last LSN acknowledged as completed by the consumer. 
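ReadEntry only serves entries at or above the manager's low water mark: once every registered consumer has acknowledged an LSN, the entry becomes eligible for pruning and later reads fail with "requested log entry is gone", as the consumer tests below exercise. A rough sketch of how a consumer might drain a notified range through the accessor; drainRange is hypothetical, and the exact shape of the CallLogManager callback is assumed from the PartitionManager method in this change:

package consumerexample

import (
	"context"

	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
)

// drainRange reads and acknowledges every entry in [low, high]. The consumer must be the same
// value registered with the manager, as acknowledgement positions are tracked per consumer.
func drainRange(ctx context.Context, accessor storagemgr.LogManagerAccessor, consumer storagemgr.LogConsumer,
	storageName string, partitionID storage.PartitionID, low, high storage.LSN,
) error {
	return accessor.CallLogManager(ctx, storageName, partitionID, func(lm storagemgr.LogManager) {
		for lsn := low; lsn <= high; lsn++ {
			entry, err := lm.ReadEntry(lsn)
			if err != nil {
				// Either the entry was pruned below the low water mark or reading failed;
				// a real consumer would log the error and decide whether to retry.
				continue
			}
			_ = entry // process the entry, e.g. archive the files under GetTransactionPath(lsn)
			lm.AcknowledgeTransaction(consumer, lsn)
		}
	})
}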
@@ -2177,6 +2193,10 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { } } + if transaction.repositoryCreation != nil { + logEntry.RepositoryCreation = &gitalypb.LogEntry_RepositoryCreation{} + } + if transaction.deleteRepository { logEntry.RepositoryDeletion = &gitalypb.LogEntry_RepositoryDeletion{} @@ -2779,7 +2799,7 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction // to transaction operations. // // To ensure that we don't modify existing tables and autocompact, we lock the existing tables -// before applying the updates. This way the reftable backend willl only create new tables +// before applying the updates. This way the reftable backend will only create new tables func (mgr *TransactionManager) verifyReferencesWithGitForReftables( ctx context.Context, referenceTransactions []*gitalypb.LogEntry_ReferenceTransaction, diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_consumer_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_consumer_test.go index b9872f7b2e16d623b04b4a2b133fffbd0199fbd3..02702bb81717a8018f6740a57f38b90c9e994379 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager_consumer_test.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager_consumer_test.go @@ -2,6 +2,7 @@ package storagemgr import ( "context" + "path/filepath" "testing" "gitlab.com/gitlab-org/gitaly/v16/internal/git" @@ -9,6 +10,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transactionTestCase { @@ -80,9 +82,35 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, }, }, + ConsumerReadEntry{ + LSN: 1, + ExpectedEntry: &gitalypb.LogEntry{ + RelativePath: setup.RelativePath, + Operations: []*gitalypb.LogEntry_Operation{ + {Operation: &gitalypb.LogEntry_Operation_CreateHardLink_{ + CreateHardLink: &gitalypb.LogEntry_Operation_CreateHardLink{ + SourcePath: []byte("1"), + DestinationPath: []byte(filepath.Join(setup.RelativePath, "refs/heads/main")), + }, + }}, + }, + ReferenceTransactions: []*gitalypb.LogEntry_ReferenceTransaction{ + {Changes: []*gitalypb.LogEntry_ReferenceTransaction_Change{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(setup.Commits.First.OID), + }, + }}, + }, + }, + }, ConsumerAcknowledge{ LSN: 1, }, + ConsumerReadEntry{ + LSN: 1, + ExpectedErr: "requested log entry is gone", + }, }, expectedState: StateAssertion{ Database: DatabaseState{ @@ -137,6 +165,14 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti ConsumerAcknowledge{ LSN: 2, }, + ConsumerReadEntry{ + LSN: 1, + ExpectedErr: "requested log entry is gone", + }, + ConsumerReadEntry{ + LSN: 2, + ExpectedErr: "requested log entry is gone", + }, }, expectedState: StateAssertion{ Database: DatabaseState{ diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_hook_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_hook_test.go index 48b04bb09876a22f792c6ed89a5dd74436e04d06..21abe41858bf8a90780ce9fc8d4d932a839ad79b 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager_hook_test.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager_hook_test.go 
@@ -18,6 +18,8 @@ type hookFunc func(hookContext) type hookContext struct { // closeManager calls the calls Close on the TransactionManager. closeManager func() + // accessManager triggers a callback against the TransactionManager. + accessManager func(func(*TransactionManager)) } // installHooks takes the hooks in the test setup and configures them in the TransactionManager. @@ -41,6 +43,9 @@ func installHooks(mgr *TransactionManager, inflightTransactions *sync.WaitGroup, *destination = func() { runHook(hookContext{ closeManager: mgr.Close, + accessManager: func(callback func(*TransactionManager)) { + callback(mgr) + }, }) } } diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_repo_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_repo_test.go index 5df78150548f9e88f9bb4869eeaf0b05a474b94f..86135cddfeeac23983d05a45d893443b0e6cc522 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager_repo_test.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager_repo_test.go @@ -3,20 +3,33 @@ package storagemgr import ( "testing" + "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/snapshot" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) func generateCreateRepositoryTests(t *testing.T, setup testTransactionSetup) []transactionTestCase { + assertLogEntryHasRepositoryCreation := func(hookCtx hookContext) { + hookCtx.accessManager(func(mgr *TransactionManager) { + entry, err := mgr.readLogEntry(mgr.appliedLSN + 1) + require.NoError(t, err) + testhelper.ProtoEqual(t, &gitalypb.LogEntry_RepositoryCreation{}, entry.RepositoryCreation) + }) + } return []transactionTestCase{ { desc: "create repository when it doesn't exist", steps: steps{ RemoveRepository{}, - StartManager{}, + StartManager{ + Hooks: testTransactionHooks{ + BeforeApplyLogEntry: assertLogEntryHasRepositoryCreation, + }, + }, Begin{ RelativePaths: []string{setup.RelativePath}, }, @@ -78,7 +91,11 @@ func generateCreateRepositoryTests(t *testing.T, setup testTransactionSetup) []t desc: "create repository with full state", steps: steps{ RemoveRepository{}, - StartManager{}, + StartManager{ + Hooks: testTransactionHooks{ + BeforeApplyLogEntry: assertLogEntryHasRepositoryCreation, + }, + }, Begin{ RelativePaths: []string{setup.RelativePath}, }, @@ -263,7 +280,8 @@ func generateCreateRepositoryTests(t *testing.T, setup testTransactionSetup) []t RemoveRepository{}, StartManager{ Hooks: testTransactionHooks{ - BeforeApplyLogEntry: func(hookContext) { + BeforeApplyLogEntry: func(hc hookContext) { + assertLogEntryHasRepositoryCreation(hc) panic(errSimulatedCrash) }, }, @@ -333,7 +351,11 @@ func generateCreateRepositoryTests(t *testing.T, setup testTransactionSetup) []t AssertManager{ ExpectedError: errSimulatedCrash, }, - StartManager{}, + StartManager{ + Hooks: testTransactionHooks{ + BeforeApplyLogEntry: assertLogEntryHasRepositoryCreation, + }, + }, }, expectedState: StateAssertion{ Database: DatabaseState{ diff --git a/proto/cluster.proto b/proto/cluster.proto index c7ad3889602fdd19548aef08776612bfeb84c1f8..6315b54498f9b73e9e1a6f37a47cf11099c57103 100644 --- a/proto/cluster.proto +++ b/proto/cluster.proto @@ -17,6 +17,20 @@ message Cluster { map storages = 3; } +// Partition represents a partition in a Raft 
cluster.
+message Partition {
+  // authority_name is the name of storage that creates this partition.
+  string authority_name = 1;
+  // authority_id is the unique ID of storage that creates this partition. The ID was allocated by
+  // the metadata Raft group.
+  uint64 authority_id = 2;
+  // partition_id is the partition ID of a partition *within the authority storage*. All
+  // repositories sharing the same origin have the same partition ID.
+  uint64 partition_id = 3;
+  // relative_path is the unique relative path of that partition.
+  string relative_path = 4;
+}
+
 // Storage represents a storage unit within a cluster.
 message Storage {
   // storage_id is the unique identifier for the storage.
@@ -99,4 +113,30 @@ message UpdateStorageRequest {
 message UpdateStorageResponse {
   // storage contains the details of the newly updated storage.
   Storage storage = 1;
+}
+
+// RegisterPartitionRequest is the request message to add a partition (repository + forks) to a
+// storage for tracking. The destination storage is not necessarily the same as the authority
+// storage. For example, a partition may be tracked by another storage on a different node.
+message RegisterPartitionRequest {
+  // partition is the partition being registered.
+  Partition partition = 1;
+}
+
+// RegisterPartitionResponse is the response message of RegisterPartitionRequest.
+message RegisterPartitionResponse {
+  // partition contains the info of the registered partition. If the registered partition already
+  // exists, it contains the partition info stored inside the state machine.
+  Partition partition = 1;
+}
+
+// GetRegisteredPartitionsRequest is the request message to query for all registered partitions
+// that a storage is tracking.
+message GetRegisteredPartitionsRequest {
+}
+
+// GetRegisteredPartitionsResponse is the response message of GetRegisteredPartitionsRequest.
+message GetRegisteredPartitionsResponse {
+  // partitions is the list of registered partitions.
+  repeated Partition partitions = 1;
+}
\ No newline at end of file
diff --git a/proto/go/gitalypb/cluster.pb.go b/proto/go/gitalypb/cluster.pb.go
index 399e251a9dcc47d8ae5fb906cb38b735e23a240d..5e8ab974081e0abcc93139e0f934c94f0f815700 100644
--- a/proto/go/gitalypb/cluster.pb.go
+++ b/proto/go/gitalypb/cluster.pb.go
@@ -90,6 +90,84 @@ func (x *Cluster) GetStorages() map[uint64]*Storage {
 	return nil
 }
 
+// Partition represents a partition in a Raft cluster.
+type Partition struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	// authority_name is the name of storage that creates this partition.
+	AuthorityName string `protobuf:"bytes,1,opt,name=authority_name,json=authorityName,proto3" json:"authority_name,omitempty"`
+	// authority_id is the unique ID of storage that creates this partition. The ID was allocated by
+	// the metadata Raft group.
+	AuthorityId uint64 `protobuf:"varint,2,opt,name=authority_id,json=authorityId,proto3" json:"authority_id,omitempty"`
+	// partition_id is the partition ID of a partition *within the authority storage*. All
+	// repositories sharing a same origin have the same partition ID.
+	PartitionId uint64 `protobuf:"varint,3,opt,name=partition_id,json=partitionId,proto3" json:"partition_id,omitempty"`
+	// relative_path is the unique relative path of that partition. 
+ RelativePath string `protobuf:"bytes,4,opt,name=relative_path,json=relativePath,proto3" json:"relative_path,omitempty"` +} + +func (x *Partition) Reset() { + *x = Partition{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Partition) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Partition) ProtoMessage() {} + +func (x *Partition) ProtoReflect() protoreflect.Message { + mi := &file_cluster_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Partition.ProtoReflect.Descriptor instead. +func (*Partition) Descriptor() ([]byte, []int) { + return file_cluster_proto_rawDescGZIP(), []int{1} +} + +func (x *Partition) GetAuthorityName() string { + if x != nil { + return x.AuthorityName + } + return "" +} + +func (x *Partition) GetAuthorityId() uint64 { + if x != nil { + return x.AuthorityId + } + return 0 +} + +func (x *Partition) GetPartitionId() uint64 { + if x != nil { + return x.PartitionId + } + return 0 +} + +func (x *Partition) GetRelativePath() string { + if x != nil { + return x.RelativePath + } + return "" +} + // Storage represents a storage unit within a cluster. type Storage struct { state protoimpl.MessageState @@ -111,7 +189,7 @@ type Storage struct { func (x *Storage) Reset() { *x = Storage{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[1] + mi := &file_cluster_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -124,7 +202,7 @@ func (x *Storage) String() string { func (*Storage) ProtoMessage() {} func (x *Storage) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[1] + mi := &file_cluster_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -137,7 +215,7 @@ func (x *Storage) ProtoReflect() protoreflect.Message { // Deprecated: Use Storage.ProtoReflect.Descriptor instead. func (*Storage) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{1} + return file_cluster_proto_rawDescGZIP(), []int{2} } func (x *Storage) GetStorageId() uint64 { @@ -196,7 +274,7 @@ type LeaderState struct { func (x *LeaderState) Reset() { *x = LeaderState{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[2] + mi := &file_cluster_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -209,7 +287,7 @@ func (x *LeaderState) String() string { func (*LeaderState) ProtoMessage() {} func (x *LeaderState) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[2] + mi := &file_cluster_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -222,7 +300,7 @@ func (x *LeaderState) ProtoReflect() protoreflect.Message { // Deprecated: Use LeaderState.ProtoReflect.Descriptor instead. 
func (*LeaderState) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{2} + return file_cluster_proto_rawDescGZIP(), []int{3} } func (x *LeaderState) GetGroupId() uint64 { @@ -266,7 +344,7 @@ type BootstrapClusterRequest struct { func (x *BootstrapClusterRequest) Reset() { *x = BootstrapClusterRequest{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[3] + mi := &file_cluster_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -279,7 +357,7 @@ func (x *BootstrapClusterRequest) String() string { func (*BootstrapClusterRequest) ProtoMessage() {} func (x *BootstrapClusterRequest) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[3] + mi := &file_cluster_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -292,7 +370,7 @@ func (x *BootstrapClusterRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use BootstrapClusterRequest.ProtoReflect.Descriptor instead. func (*BootstrapClusterRequest) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{3} + return file_cluster_proto_rawDescGZIP(), []int{4} } func (x *BootstrapClusterRequest) GetClusterId() string { @@ -315,7 +393,7 @@ type BootstrapClusterResponse struct { func (x *BootstrapClusterResponse) Reset() { *x = BootstrapClusterResponse{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[4] + mi := &file_cluster_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -328,7 +406,7 @@ func (x *BootstrapClusterResponse) String() string { func (*BootstrapClusterResponse) ProtoMessage() {} func (x *BootstrapClusterResponse) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[4] + mi := &file_cluster_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -341,7 +419,7 @@ func (x *BootstrapClusterResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use BootstrapClusterResponse.ProtoReflect.Descriptor instead. func (*BootstrapClusterResponse) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{4} + return file_cluster_proto_rawDescGZIP(), []int{5} } func (x *BootstrapClusterResponse) GetCluster() *Cluster { @@ -361,7 +439,7 @@ type GetClusterRequest struct { func (x *GetClusterRequest) Reset() { *x = GetClusterRequest{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[5] + mi := &file_cluster_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -374,7 +452,7 @@ func (x *GetClusterRequest) String() string { func (*GetClusterRequest) ProtoMessage() {} func (x *GetClusterRequest) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[5] + mi := &file_cluster_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -387,7 +465,7 @@ func (x *GetClusterRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use GetClusterRequest.ProtoReflect.Descriptor instead. func (*GetClusterRequest) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{5} + return file_cluster_proto_rawDescGZIP(), []int{6} } // GetClusterResponse is the response message for retrieving information about a cluster. 
@@ -403,7 +481,7 @@ type GetClusterResponse struct { func (x *GetClusterResponse) Reset() { *x = GetClusterResponse{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[6] + mi := &file_cluster_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -416,7 +494,7 @@ func (x *GetClusterResponse) String() string { func (*GetClusterResponse) ProtoMessage() {} func (x *GetClusterResponse) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[6] + mi := &file_cluster_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -429,7 +507,7 @@ func (x *GetClusterResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use GetClusterResponse.ProtoReflect.Descriptor instead. func (*GetClusterResponse) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{6} + return file_cluster_proto_rawDescGZIP(), []int{7} } func (x *GetClusterResponse) GetCluster() *Cluster { @@ -456,7 +534,7 @@ type RegisterStorageRequest struct { func (x *RegisterStorageRequest) Reset() { *x = RegisterStorageRequest{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[7] + mi := &file_cluster_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -469,7 +547,7 @@ func (x *RegisterStorageRequest) String() string { func (*RegisterStorageRequest) ProtoMessage() {} func (x *RegisterStorageRequest) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[7] + mi := &file_cluster_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -482,7 +560,7 @@ func (x *RegisterStorageRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use RegisterStorageRequest.ProtoReflect.Descriptor instead. func (*RegisterStorageRequest) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{7} + return file_cluster_proto_rawDescGZIP(), []int{8} } func (x *RegisterStorageRequest) GetStorageName() string { @@ -519,7 +597,7 @@ type RegisterStorageResponse struct { func (x *RegisterStorageResponse) Reset() { *x = RegisterStorageResponse{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[8] + mi := &file_cluster_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -532,7 +610,7 @@ func (x *RegisterStorageResponse) String() string { func (*RegisterStorageResponse) ProtoMessage() {} func (x *RegisterStorageResponse) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[8] + mi := &file_cluster_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -545,7 +623,7 @@ func (x *RegisterStorageResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use RegisterStorageResponse.ProtoReflect.Descriptor instead. 
func (*RegisterStorageResponse) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{8} + return file_cluster_proto_rawDescGZIP(), []int{9} } func (x *RegisterStorageResponse) GetStorage() *Storage { @@ -574,7 +652,7 @@ type UpdateStorageRequest struct { func (x *UpdateStorageRequest) Reset() { *x = UpdateStorageRequest{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[9] + mi := &file_cluster_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -587,7 +665,7 @@ func (x *UpdateStorageRequest) String() string { func (*UpdateStorageRequest) ProtoMessage() {} func (x *UpdateStorageRequest) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[9] + mi := &file_cluster_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -600,7 +678,7 @@ func (x *UpdateStorageRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use UpdateStorageRequest.ProtoReflect.Descriptor instead. func (*UpdateStorageRequest) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{9} + return file_cluster_proto_rawDescGZIP(), []int{10} } func (x *UpdateStorageRequest) GetStorageId() uint64 { @@ -637,7 +715,7 @@ type UpdateStorageResponse struct { func (x *UpdateStorageResponse) Reset() { *x = UpdateStorageResponse{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[10] + mi := &file_cluster_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -650,7 +728,7 @@ func (x *UpdateStorageResponse) String() string { func (*UpdateStorageResponse) ProtoMessage() {} func (x *UpdateStorageResponse) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[10] + mi := &file_cluster_proto_msgTypes[11] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -663,7 +741,7 @@ func (x *UpdateStorageResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use UpdateStorageResponse.ProtoReflect.Descriptor instead. func (*UpdateStorageResponse) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{10} + return file_cluster_proto_rawDescGZIP(), []int{11} } func (x *UpdateStorageResponse) GetStorage() *Storage { @@ -673,6 +751,196 @@ func (x *UpdateStorageResponse) GetStorage() *Storage { return nil } +// RegisterPartitionRequest is the request message to add a partition (repository + forks) to a +// storage for tracking. The destinating storage is not necessarily the same as the authority +// storage. For example, a partition is tracked by another storage on different node. +type RegisterPartitionRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // partition is the registerting one. 
+ Partition *Partition `protobuf:"bytes,1,opt,name=partition,proto3" json:"partition,omitempty"` +} + +func (x *RegisterPartitionRequest) Reset() { + *x = RegisterPartitionRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_proto_msgTypes[12] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RegisterPartitionRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterPartitionRequest) ProtoMessage() {} + +func (x *RegisterPartitionRequest) ProtoReflect() protoreflect.Message { + mi := &file_cluster_proto_msgTypes[12] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterPartitionRequest.ProtoReflect.Descriptor instead. +func (*RegisterPartitionRequest) Descriptor() ([]byte, []int) { + return file_cluster_proto_rawDescGZIP(), []int{12} +} + +func (x *RegisterPartitionRequest) GetPartition() *Partition { + if x != nil { + return x.Partition + } + return nil +} + +// RegisterPartitionResponse is the response message of RegisterPartitionRequest. +type RegisterPartitionResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // partition contains info of registered partition. If the registering partition already exists, + // it contains the partition info inside the statemachine. + Partition *Partition `protobuf:"bytes,1,opt,name=partition,proto3" json:"partition,omitempty"` +} + +func (x *RegisterPartitionResponse) Reset() { + *x = RegisterPartitionResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RegisterPartitionResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterPartitionResponse) ProtoMessage() {} + +func (x *RegisterPartitionResponse) ProtoReflect() protoreflect.Message { + mi := &file_cluster_proto_msgTypes[13] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterPartitionResponse.ProtoReflect.Descriptor instead. +func (*RegisterPartitionResponse) Descriptor() ([]byte, []int) { + return file_cluster_proto_rawDescGZIP(), []int{13} +} + +func (x *RegisterPartitionResponse) GetPartition() *Partition { + if x != nil { + return x.Partition + } + return nil +} + +// GetRegisteredPartitionsRequest is the request message to query for all registered partitions +// that a storage is tracking. 
+type GetRegisteredPartitionsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *GetRegisteredPartitionsRequest) Reset() { + *x = GetRegisteredPartitionsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_proto_msgTypes[14] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetRegisteredPartitionsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetRegisteredPartitionsRequest) ProtoMessage() {} + +func (x *GetRegisteredPartitionsRequest) ProtoReflect() protoreflect.Message { + mi := &file_cluster_proto_msgTypes[14] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetRegisteredPartitionsRequest.ProtoReflect.Descriptor instead. +func (*GetRegisteredPartitionsRequest) Descriptor() ([]byte, []int) { + return file_cluster_proto_rawDescGZIP(), []int{14} +} + +// GetRegisteredPartitionsResponse is the response message of GetRegisteredPartitionsRequest. +type GetRegisteredPartitionsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // partitions is the list of registered partitions. + Partitions []*Partition `protobuf:"bytes,1,rep,name=partitions,proto3" json:"partitions,omitempty"` +} + +func (x *GetRegisteredPartitionsResponse) Reset() { + *x = GetRegisteredPartitionsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_proto_msgTypes[15] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetRegisteredPartitionsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetRegisteredPartitionsResponse) ProtoMessage() {} + +func (x *GetRegisteredPartitionsResponse) ProtoReflect() protoreflect.Message { + mi := &file_cluster_proto_msgTypes[15] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetRegisteredPartitionsResponse.ProtoReflect.Descriptor instead. 
+func (*GetRegisteredPartitionsResponse) Descriptor() ([]byte, []int) { + return file_cluster_proto_rawDescGZIP(), []int{15} +} + +func (x *GetRegisteredPartitionsResponse) GetPartitions() []*Partition { + if x != nil { + return x.Partitions + } + return nil +} + var File_cluster_proto protoreflect.FileDescriptor var file_cluster_proto_rawDesc = []byte{ @@ -691,7 +959,17 @@ var file_cluster_proto_rawDesc = []byte{ 0x01, 0x28, 0x04, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x25, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, - 0x02, 0x38, 0x01, 0x22, 0xab, 0x01, 0x0a, 0x07, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, + 0x02, 0x38, 0x01, 0x22, 0x9d, 0x01, 0x0a, 0x09, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x61, 0x75, 0x74, 0x68, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x5f, 0x6e, + 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x61, 0x75, 0x74, 0x68, 0x6f, + 0x72, 0x69, 0x74, 0x79, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x75, 0x74, 0x68, + 0x6f, 0x72, 0x69, 0x74, 0x79, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, + 0x61, 0x75, 0x74, 0x68, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x70, + 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x23, + 0x0a, 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, + 0x61, 0x74, 0x68, 0x22, 0xab, 0x01, 0x0a, 0x07, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, @@ -747,11 +1025,28 @@ var file_cluster_proto_rawDesc = []byte{ 0x6f, 0x72, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x52, - 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, - 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, - 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x22, 0x4b, 0x0a, 0x18, 0x52, 0x65, 0x67, 0x69, + 0x73, 0x74, 0x65, 0x72, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x2f, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, + 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x4c, 0x0a, 0x19, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, + 0x72, 0x50, 
0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x2f, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, + 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, + 0x69, 0x6f, 0x6e, 0x22, 0x20, 0x0a, 0x1e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, + 0x65, 0x72, 0x65, 0x64, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x54, 0x0a, 0x1f, 0x47, 0x65, 0x74, 0x52, 0x65, 0x67, 0x69, + 0x73, 0x74, 0x65, 0x72, 0x65, 0x64, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x31, 0x0a, 0x0a, 0x70, 0x61, 0x72, 0x74, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, + 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, + 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x42, 0x34, 0x5a, 0x32, 0x67, + 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, + 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, + 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -766,33 +1061,41 @@ func file_cluster_proto_rawDescGZIP() []byte { return file_cluster_proto_rawDescData } -var file_cluster_proto_msgTypes = make([]protoimpl.MessageInfo, 12) +var file_cluster_proto_msgTypes = make([]protoimpl.MessageInfo, 17) var file_cluster_proto_goTypes = []any{ - (*Cluster)(nil), // 0: gitaly.Cluster - (*Storage)(nil), // 1: gitaly.Storage - (*LeaderState)(nil), // 2: gitaly.LeaderState - (*BootstrapClusterRequest)(nil), // 3: gitaly.BootstrapClusterRequest - (*BootstrapClusterResponse)(nil), // 4: gitaly.BootstrapClusterResponse - (*GetClusterRequest)(nil), // 5: gitaly.GetClusterRequest - (*GetClusterResponse)(nil), // 6: gitaly.GetClusterResponse - (*RegisterStorageRequest)(nil), // 7: gitaly.RegisterStorageRequest - (*RegisterStorageResponse)(nil), // 8: gitaly.RegisterStorageResponse - (*UpdateStorageRequest)(nil), // 9: gitaly.UpdateStorageRequest - (*UpdateStorageResponse)(nil), // 10: gitaly.UpdateStorageResponse - nil, // 11: gitaly.Cluster.StoragesEntry + (*Cluster)(nil), // 0: gitaly.Cluster + (*Partition)(nil), // 1: gitaly.Partition + (*Storage)(nil), // 2: gitaly.Storage + (*LeaderState)(nil), // 3: gitaly.LeaderState + (*BootstrapClusterRequest)(nil), // 4: gitaly.BootstrapClusterRequest + (*BootstrapClusterResponse)(nil), // 5: gitaly.BootstrapClusterResponse + (*GetClusterRequest)(nil), // 6: gitaly.GetClusterRequest + (*GetClusterResponse)(nil), // 7: gitaly.GetClusterResponse + (*RegisterStorageRequest)(nil), // 8: gitaly.RegisterStorageRequest + (*RegisterStorageResponse)(nil), // 9: gitaly.RegisterStorageResponse + (*UpdateStorageRequest)(nil), // 10: gitaly.UpdateStorageRequest + (*UpdateStorageResponse)(nil), // 11: gitaly.UpdateStorageResponse + (*RegisterPartitionRequest)(nil), // 12: gitaly.RegisterPartitionRequest + (*RegisterPartitionResponse)(nil), // 13: gitaly.RegisterPartitionResponse + (*GetRegisteredPartitionsRequest)(nil), // 14: gitaly.GetRegisteredPartitionsRequest + (*GetRegisteredPartitionsResponse)(nil), // 15: 
gitaly.GetRegisteredPartitionsResponse + nil, // 16: gitaly.Cluster.StoragesEntry } var file_cluster_proto_depIdxs = []int32{ - 11, // 0: gitaly.Cluster.storages:type_name -> gitaly.Cluster.StoragesEntry + 16, // 0: gitaly.Cluster.storages:type_name -> gitaly.Cluster.StoragesEntry 0, // 1: gitaly.BootstrapClusterResponse.cluster:type_name -> gitaly.Cluster 0, // 2: gitaly.GetClusterResponse.cluster:type_name -> gitaly.Cluster - 1, // 3: gitaly.RegisterStorageResponse.storage:type_name -> gitaly.Storage - 1, // 4: gitaly.UpdateStorageResponse.storage:type_name -> gitaly.Storage - 1, // 5: gitaly.Cluster.StoragesEntry.value:type_name -> gitaly.Storage - 6, // [6:6] is the sub-list for method output_type - 6, // [6:6] is the sub-list for method input_type - 6, // [6:6] is the sub-list for extension type_name - 6, // [6:6] is the sub-list for extension extendee - 0, // [0:6] is the sub-list for field type_name + 2, // 3: gitaly.RegisterStorageResponse.storage:type_name -> gitaly.Storage + 2, // 4: gitaly.UpdateStorageResponse.storage:type_name -> gitaly.Storage + 1, // 5: gitaly.RegisterPartitionRequest.partition:type_name -> gitaly.Partition + 1, // 6: gitaly.RegisterPartitionResponse.partition:type_name -> gitaly.Partition + 1, // 7: gitaly.GetRegisteredPartitionsResponse.partitions:type_name -> gitaly.Partition + 2, // 8: gitaly.Cluster.StoragesEntry.value:type_name -> gitaly.Storage + 9, // [9:9] is the sub-list for method output_type + 9, // [9:9] is the sub-list for method input_type + 9, // [9:9] is the sub-list for extension type_name + 9, // [9:9] is the sub-list for extension extendee + 0, // [0:9] is the sub-list for field type_name } func init() { file_cluster_proto_init() } @@ -814,7 +1117,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[1].Exporter = func(v any, i int) any { - switch v := v.(*Storage); i { + switch v := v.(*Partition); i { case 0: return &v.state case 1: @@ -826,7 +1129,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[2].Exporter = func(v any, i int) any { - switch v := v.(*LeaderState); i { + switch v := v.(*Storage); i { case 0: return &v.state case 1: @@ -838,7 +1141,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[3].Exporter = func(v any, i int) any { - switch v := v.(*BootstrapClusterRequest); i { + switch v := v.(*LeaderState); i { case 0: return &v.state case 1: @@ -850,7 +1153,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[4].Exporter = func(v any, i int) any { - switch v := v.(*BootstrapClusterResponse); i { + switch v := v.(*BootstrapClusterRequest); i { case 0: return &v.state case 1: @@ -862,7 +1165,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[5].Exporter = func(v any, i int) any { - switch v := v.(*GetClusterRequest); i { + switch v := v.(*BootstrapClusterResponse); i { case 0: return &v.state case 1: @@ -874,7 +1177,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[6].Exporter = func(v any, i int) any { - switch v := v.(*GetClusterResponse); i { + switch v := v.(*GetClusterRequest); i { case 0: return &v.state case 1: @@ -886,7 +1189,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[7].Exporter = func(v any, i int) any { - switch v := v.(*RegisterStorageRequest); i { + switch v := v.(*GetClusterResponse); i { case 0: return &v.state case 1: @@ -898,7 +1201,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[8].Exporter = func(v any, i int) any { - switch v := 
v.(*RegisterStorageResponse); i { + switch v := v.(*RegisterStorageRequest); i { case 0: return &v.state case 1: @@ -910,7 +1213,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[9].Exporter = func(v any, i int) any { - switch v := v.(*UpdateStorageRequest); i { + switch v := v.(*RegisterStorageResponse); i { case 0: return &v.state case 1: @@ -922,6 +1225,18 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[10].Exporter = func(v any, i int) any { + switch v := v.(*UpdateStorageRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cluster_proto_msgTypes[11].Exporter = func(v any, i int) any { switch v := v.(*UpdateStorageResponse); i { case 0: return &v.state @@ -933,6 +1248,54 @@ func file_cluster_proto_init() { return nil } } + file_cluster_proto_msgTypes[12].Exporter = func(v any, i int) any { + switch v := v.(*RegisterPartitionRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cluster_proto_msgTypes[13].Exporter = func(v any, i int) any { + switch v := v.(*RegisterPartitionResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cluster_proto_msgTypes[14].Exporter = func(v any, i int) any { + switch v := v.(*GetRegisteredPartitionsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cluster_proto_msgTypes[15].Exporter = func(v any, i int) any { + switch v := v.(*GetRegisteredPartitionsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -940,7 +1303,7 @@ func file_cluster_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_cluster_proto_rawDesc, NumEnums: 0, - NumMessages: 12, + NumMessages: 17, NumExtensions: 0, NumServices: 0, }, diff --git a/proto/go/gitalypb/log.pb.go b/proto/go/gitalypb/log.pb.go index b5cb6d241ea1e95b3419f503d53fe5aacc3306f2..dc016edb0c9867ea27dfcd695aa8d1d81dce3d16 100644 --- a/proto/go/gitalypb/log.pb.go +++ b/proto/go/gitalypb/log.pb.go @@ -39,6 +39,8 @@ type LogEntry struct { ReferenceTransactions []*LogEntry_ReferenceTransaction `protobuf:"bytes,2,rep,name=reference_transactions,json=referenceTransactions,proto3" json:"reference_transactions,omitempty"` // repository_deletion, when set, indicates this log entry deletes the repository. RepositoryDeletion *LogEntry_RepositoryDeletion `protobuf:"bytes,6,opt,name=repository_deletion,json=repositoryDeletion,proto3" json:"repository_deletion,omitempty"` + // repository_creation, when set, indicates this log entry creates the repository. + RepositoryCreation *LogEntry_RepositoryCreation `protobuf:"bytes,11,opt,name=repository_creation,json=repositoryCreation,proto3" json:"repository_creation,omitempty"` // housekeeping, when set, indicates this log entry contains a housekeeping task. 
Housekeeping *LogEntry_Housekeeping `protobuf:"bytes,9,opt,name=housekeeping,proto3" json:"housekeeping,omitempty"` // operations is an ordered list of operations to run in order to apply @@ -99,6 +101,13 @@ func (x *LogEntry) GetRepositoryDeletion() *LogEntry_RepositoryDeletion { return nil } +func (x *LogEntry) GetRepositoryCreation() *LogEntry_RepositoryCreation { + if x != nil { + return x.RepositoryCreation + } + return nil +} + func (x *LogEntry) GetHousekeeping() *LogEntry_Housekeeping { if x != nil { return x.Housekeeping @@ -254,6 +263,45 @@ func (*LogEntry_RepositoryDeletion) Descriptor() ([]byte, []int) { return file_log_proto_rawDescGZIP(), []int{0, 1} } +// RepositoryCreation models a repository creation. +type LogEntry_RepositoryCreation struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *LogEntry_RepositoryCreation) Reset() { + *x = LogEntry_RepositoryCreation{} + if protoimpl.UnsafeEnabled { + mi := &file_log_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LogEntry_RepositoryCreation) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LogEntry_RepositoryCreation) ProtoMessage() {} + +func (x *LogEntry_RepositoryCreation) ProtoReflect() protoreflect.Message { + mi := &file_log_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LogEntry_RepositoryCreation.ProtoReflect.Descriptor instead. +func (*LogEntry_RepositoryCreation) Descriptor() ([]byte, []int) { + return file_log_proto_rawDescGZIP(), []int{0, 2} +} + // Housekeeping models a housekeeping run. It is supposed to handle housekeeping tasks for repositories such as the // cleanup of unneeded files and optimizations for the repository's data structures. It is a collection of smaller // tasks. @@ -273,7 +321,7 @@ type LogEntry_Housekeeping struct { func (x *LogEntry_Housekeeping) Reset() { *x = LogEntry_Housekeeping{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[4] + mi := &file_log_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -286,7 +334,7 @@ func (x *LogEntry_Housekeeping) String() string { func (*LogEntry_Housekeeping) ProtoMessage() {} func (x *LogEntry_Housekeeping) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[4] + mi := &file_log_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -299,7 +347,7 @@ func (x *LogEntry_Housekeeping) ProtoReflect() protoreflect.Message { // Deprecated: Use LogEntry_Housekeeping.ProtoReflect.Descriptor instead. 
func (*LogEntry_Housekeeping) Descriptor() ([]byte, []int) { - return file_log_proto_rawDescGZIP(), []int{0, 2} + return file_log_proto_rawDescGZIP(), []int{0, 3} } func (x *LogEntry_Housekeeping) GetPackRefs() *LogEntry_Housekeeping_PackRefs { @@ -342,7 +390,7 @@ type LogEntry_Operation struct { func (x *LogEntry_Operation) Reset() { *x = LogEntry_Operation{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[5] + mi := &file_log_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -355,7 +403,7 @@ func (x *LogEntry_Operation) String() string { func (*LogEntry_Operation) ProtoMessage() {} func (x *LogEntry_Operation) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[5] + mi := &file_log_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -368,7 +416,7 @@ func (x *LogEntry_Operation) ProtoReflect() protoreflect.Message { // Deprecated: Use LogEntry_Operation.ProtoReflect.Descriptor instead. func (*LogEntry_Operation) Descriptor() ([]byte, []int) { - return file_log_proto_rawDescGZIP(), []int{0, 3} + return file_log_proto_rawDescGZIP(), []int{0, 4} } func (m *LogEntry_Operation) GetOperation() isLogEntry_Operation_Operation { @@ -474,7 +522,7 @@ type LogEntry_ReferenceTransaction_Change struct { func (x *LogEntry_ReferenceTransaction_Change) Reset() { *x = LogEntry_ReferenceTransaction_Change{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[6] + mi := &file_log_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -487,7 +535,7 @@ func (x *LogEntry_ReferenceTransaction_Change) String() string { func (*LogEntry_ReferenceTransaction_Change) ProtoMessage() {} func (x *LogEntry_ReferenceTransaction_Change) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[6] + mi := &file_log_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -539,7 +587,7 @@ type LogEntry_Housekeeping_PackRefs struct { func (x *LogEntry_Housekeeping_PackRefs) Reset() { *x = LogEntry_Housekeeping_PackRefs{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[7] + mi := &file_log_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -552,7 +600,7 @@ func (x *LogEntry_Housekeeping_PackRefs) String() string { func (*LogEntry_Housekeeping_PackRefs) ProtoMessage() {} func (x *LogEntry_Housekeeping_PackRefs) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[7] + mi := &file_log_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -565,7 +613,7 @@ func (x *LogEntry_Housekeeping_PackRefs) ProtoReflect() protoreflect.Message { // Deprecated: Use LogEntry_Housekeeping_PackRefs.ProtoReflect.Descriptor instead. 
func (*LogEntry_Housekeeping_PackRefs) Descriptor() ([]byte, []int) { - return file_log_proto_rawDescGZIP(), []int{0, 2, 0} + return file_log_proto_rawDescGZIP(), []int{0, 3, 0} } func (x *LogEntry_Housekeeping_PackRefs) GetPrunedRefs() [][]byte { @@ -595,7 +643,7 @@ type LogEntry_Housekeeping_Repack struct { func (x *LogEntry_Housekeeping_Repack) Reset() { *x = LogEntry_Housekeeping_Repack{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[8] + mi := &file_log_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -608,7 +656,7 @@ func (x *LogEntry_Housekeeping_Repack) String() string { func (*LogEntry_Housekeeping_Repack) ProtoMessage() {} func (x *LogEntry_Housekeeping_Repack) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[8] + mi := &file_log_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -621,7 +669,7 @@ func (x *LogEntry_Housekeeping_Repack) ProtoReflect() protoreflect.Message { // Deprecated: Use LogEntry_Housekeeping_Repack.ProtoReflect.Descriptor instead. func (*LogEntry_Housekeeping_Repack) Descriptor() ([]byte, []int) { - return file_log_proto_rawDescGZIP(), []int{0, 2, 1} + return file_log_proto_rawDescGZIP(), []int{0, 3, 1} } func (x *LogEntry_Housekeeping_Repack) GetNewFiles() []string { @@ -656,7 +704,7 @@ type LogEntry_Housekeeping_WriteCommitGraphs struct { func (x *LogEntry_Housekeeping_WriteCommitGraphs) Reset() { *x = LogEntry_Housekeeping_WriteCommitGraphs{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[9] + mi := &file_log_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -669,7 +717,7 @@ func (x *LogEntry_Housekeeping_WriteCommitGraphs) String() string { func (*LogEntry_Housekeeping_WriteCommitGraphs) ProtoMessage() {} func (x *LogEntry_Housekeeping_WriteCommitGraphs) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[9] + mi := &file_log_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -682,7 +730,7 @@ func (x *LogEntry_Housekeeping_WriteCommitGraphs) ProtoReflect() protoreflect.Me // Deprecated: Use LogEntry_Housekeeping_WriteCommitGraphs.ProtoReflect.Descriptor instead. func (*LogEntry_Housekeeping_WriteCommitGraphs) Descriptor() ([]byte, []int) { - return file_log_proto_rawDescGZIP(), []int{0, 2, 2} + return file_log_proto_rawDescGZIP(), []int{0, 3, 2} } // CreateHardLink creates a hard link. 
The existing inode metadata, including @@ -705,7 +753,7 @@ type LogEntry_Operation_CreateHardLink struct { func (x *LogEntry_Operation_CreateHardLink) Reset() { *x = LogEntry_Operation_CreateHardLink{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[10] + mi := &file_log_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -718,7 +766,7 @@ func (x *LogEntry_Operation_CreateHardLink) String() string { func (*LogEntry_Operation_CreateHardLink) ProtoMessage() {} func (x *LogEntry_Operation_CreateHardLink) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[10] + mi := &file_log_proto_msgTypes[11] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -731,7 +779,7 @@ func (x *LogEntry_Operation_CreateHardLink) ProtoReflect() protoreflect.Message // Deprecated: Use LogEntry_Operation_CreateHardLink.ProtoReflect.Descriptor instead. func (*LogEntry_Operation_CreateHardLink) Descriptor() ([]byte, []int) { - return file_log_proto_rawDescGZIP(), []int{0, 3, 0} + return file_log_proto_rawDescGZIP(), []int{0, 4, 0} } func (x *LogEntry_Operation_CreateHardLink) GetSourcePath() []byte { @@ -770,7 +818,7 @@ type LogEntry_Operation_RemoveDirectoryEntry struct { func (x *LogEntry_Operation_RemoveDirectoryEntry) Reset() { *x = LogEntry_Operation_RemoveDirectoryEntry{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[11] + mi := &file_log_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -783,7 +831,7 @@ func (x *LogEntry_Operation_RemoveDirectoryEntry) String() string { func (*LogEntry_Operation_RemoveDirectoryEntry) ProtoMessage() {} func (x *LogEntry_Operation_RemoveDirectoryEntry) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[11] + mi := &file_log_proto_msgTypes[12] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -796,7 +844,7 @@ func (x *LogEntry_Operation_RemoveDirectoryEntry) ProtoReflect() protoreflect.Me // Deprecated: Use LogEntry_Operation_RemoveDirectoryEntry.ProtoReflect.Descriptor instead. func (*LogEntry_Operation_RemoveDirectoryEntry) Descriptor() ([]byte, []int) { - return file_log_proto_rawDescGZIP(), []int{0, 3, 1} + return file_log_proto_rawDescGZIP(), []int{0, 4, 1} } func (x *LogEntry_Operation_RemoveDirectoryEntry) GetPath() []byte { @@ -821,7 +869,7 @@ type LogEntry_Operation_CreateDirectory struct { func (x *LogEntry_Operation_CreateDirectory) Reset() { *x = LogEntry_Operation_CreateDirectory{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[12] + mi := &file_log_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -834,7 +882,7 @@ func (x *LogEntry_Operation_CreateDirectory) String() string { func (*LogEntry_Operation_CreateDirectory) ProtoMessage() {} func (x *LogEntry_Operation_CreateDirectory) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[12] + mi := &file_log_proto_msgTypes[13] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -847,7 +895,7 @@ func (x *LogEntry_Operation_CreateDirectory) ProtoReflect() protoreflect.Message // Deprecated: Use LogEntry_Operation_CreateDirectory.ProtoReflect.Descriptor instead. 
func (*LogEntry_Operation_CreateDirectory) Descriptor() ([]byte, []int) { - return file_log_proto_rawDescGZIP(), []int{0, 3, 2} + return file_log_proto_rawDescGZIP(), []int{0, 4, 2} } func (x *LogEntry_Operation_CreateDirectory) GetPath() []byte { @@ -879,7 +927,7 @@ type LogEntry_Operation_SetKey struct { func (x *LogEntry_Operation_SetKey) Reset() { *x = LogEntry_Operation_SetKey{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[13] + mi := &file_log_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -892,7 +940,7 @@ func (x *LogEntry_Operation_SetKey) String() string { func (*LogEntry_Operation_SetKey) ProtoMessage() {} func (x *LogEntry_Operation_SetKey) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[13] + mi := &file_log_proto_msgTypes[14] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -905,7 +953,7 @@ func (x *LogEntry_Operation_SetKey) ProtoReflect() protoreflect.Message { // Deprecated: Use LogEntry_Operation_SetKey.ProtoReflect.Descriptor instead. func (*LogEntry_Operation_SetKey) Descriptor() ([]byte, []int) { - return file_log_proto_rawDescGZIP(), []int{0, 3, 3} + return file_log_proto_rawDescGZIP(), []int{0, 4, 3} } func (x *LogEntry_Operation_SetKey) GetKey() []byte { @@ -935,7 +983,7 @@ type LogEntry_Operation_DeleteKey struct { func (x *LogEntry_Operation_DeleteKey) Reset() { *x = LogEntry_Operation_DeleteKey{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[14] + mi := &file_log_proto_msgTypes[15] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -948,7 +996,7 @@ func (x *LogEntry_Operation_DeleteKey) String() string { func (*LogEntry_Operation_DeleteKey) ProtoMessage() {} func (x *LogEntry_Operation_DeleteKey) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[14] + mi := &file_log_proto_msgTypes[15] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -961,7 +1009,7 @@ func (x *LogEntry_Operation_DeleteKey) ProtoReflect() protoreflect.Message { // Deprecated: Use LogEntry_Operation_DeleteKey.ProtoReflect.Descriptor instead. 
func (*LogEntry_Operation_DeleteKey) Descriptor() ([]byte, []int) { - return file_log_proto_rawDescGZIP(), []int{0, 3, 4} + return file_log_proto_rawDescGZIP(), []int{0, 4, 4} } func (x *LogEntry_Operation_DeleteKey) GetKey() []byte { @@ -975,7 +1023,7 @@ var File_log_proto protoreflect.FileDescriptor var file_log_proto_rawDesc = []byte{ 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x67, 0x69, 0x74, - 0x61, 0x6c, 0x79, 0x22, 0xe7, 0x0d, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, + 0x61, 0x6c, 0x79, 0x22, 0xd3, 0x0e, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x61, 0x74, 0x68, 0x12, 0x5c, 0x0a, 0x16, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, @@ -989,109 +1037,116 @@ var file_log_proto_rawDesc = []byte{ 0x32, 0x23, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x12, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, - 0x79, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x41, 0x0a, 0x0c, 0x68, 0x6f, 0x75, - 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x1d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, - 0x79, 0x2e, 0x48, 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x52, 0x0c, - 0x68, 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x12, 0x3a, 0x0a, 0x0a, - 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x0a, 0x20, 0x03, 0x28, 0x0b, - 0x32, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, - 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x6f, 0x70, - 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0xc7, 0x01, 0x0a, 0x14, 0x52, 0x65, 0x66, - 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, - 0x6e, 0x12, 0x46, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, - 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, - 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, - 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, - 0x52, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x1a, 0x67, 0x0a, 0x06, 0x43, 0x68, 0x61, - 0x6e, 0x67, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, - 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, 0x65, 0x66, - 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x6e, 0x65, - 0x77, 0x5f, 0x6f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6e, 0x65, 0x77, - 0x4f, 0x69, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x6e, 0x65, 0x77, 0x5f, 0x74, 0x61, 0x72, 0x67, 0x65, - 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x6e, 0x65, 0x77, 0x54, 0x61, 0x72, 0x67, - 0x65, 0x74, 0x1a, 0x14, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, - 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0xa6, 0x03, 0x0a, 0x0c, 0x48, 0x6f, 0x75, - 0x73, 0x65, 0x6b, 0x65, 
0x65, 0x70, 0x69, 0x6e, 0x67, 0x12, 0x43, 0x0a, 0x09, 0x70, 0x61, 0x63, - 0x6b, 0x5f, 0x72, 0x65, 0x66, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x67, - 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x48, - 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x50, 0x61, 0x63, 0x6b, - 0x52, 0x65, 0x66, 0x73, 0x52, 0x08, 0x70, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x66, 0x73, 0x12, 0x3c, - 0x0a, 0x06, 0x72, 0x65, 0x70, 0x61, 0x63, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, - 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, - 0x2e, 0x48, 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x52, 0x65, - 0x70, 0x61, 0x63, 0x6b, 0x52, 0x06, 0x72, 0x65, 0x70, 0x61, 0x63, 0x6b, 0x12, 0x5f, 0x0a, 0x13, - 0x77, 0x72, 0x69, 0x74, 0x65, 0x5f, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x5f, 0x67, 0x72, 0x61, - 0x70, 0x68, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2f, 0x2e, 0x67, 0x69, 0x74, 0x61, - 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x48, 0x6f, 0x75, 0x73, - 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x43, 0x6f, - 0x6d, 0x6d, 0x69, 0x74, 0x47, 0x72, 0x61, 0x70, 0x68, 0x73, 0x52, 0x11, 0x77, 0x72, 0x69, 0x74, - 0x65, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x47, 0x72, 0x61, 0x70, 0x68, 0x73, 0x1a, 0x2b, 0x0a, - 0x08, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x66, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x70, 0x72, 0x75, - 0x6e, 0x65, 0x64, 0x5f, 0x72, 0x65, 0x66, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x0a, - 0x70, 0x72, 0x75, 0x6e, 0x65, 0x64, 0x52, 0x65, 0x66, 0x73, 0x1a, 0x70, 0x0a, 0x06, 0x52, 0x65, - 0x70, 0x61, 0x63, 0x6b, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x65, 0x77, 0x5f, 0x66, 0x69, 0x6c, 0x65, - 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x65, 0x77, 0x46, 0x69, 0x6c, 0x65, - 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x5f, 0x66, 0x69, 0x6c, - 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, - 0x64, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x12, 0x24, 0x0a, 0x0e, 0x69, 0x73, 0x5f, 0x66, 0x75, 0x6c, - 0x6c, 0x5f, 0x72, 0x65, 0x70, 0x61, 0x63, 0x6b, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, - 0x69, 0x73, 0x46, 0x75, 0x6c, 0x6c, 0x52, 0x65, 0x70, 0x61, 0x63, 0x6b, 0x1a, 0x13, 0x0a, 0x11, - 0x57, 0x72, 0x69, 0x74, 0x65, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x47, 0x72, 0x61, 0x70, 0x68, - 0x73, 0x1a, 0xf9, 0x05, 0x0a, 0x09, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, - 0x55, 0x0a, 0x10, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x5f, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x6c, - 0x69, 0x6e, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x67, 0x69, 0x74, 0x61, - 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, 0x72, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x48, 0x61, 0x72, 0x64, - 0x4c, 0x69, 0x6e, 0x6b, 0x48, 0x00, 0x52, 0x0e, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x48, 0x61, - 0x72, 0x64, 0x4c, 0x69, 0x6e, 0x6b, 0x12, 0x67, 0x0a, 0x16, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, - 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x5f, 0x65, 0x6e, 0x74, 0x72, 0x79, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, + 0x79, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x54, 0x0a, 0x13, 0x72, 0x65, 0x70, + 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x5f, 
0x63, 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, + 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, + 0x6f, 0x72, 0x79, 0x43, 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x12, 0x72, 0x65, 0x70, + 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x43, 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, + 0x41, 0x0a, 0x0c, 0x68, 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x18, + 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, + 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x48, 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, 0x65, + 0x70, 0x69, 0x6e, 0x67, 0x52, 0x0c, 0x68, 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, + 0x6e, 0x67, 0x12, 0x3a, 0x0a, 0x0a, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x18, 0x0a, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, + 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x52, 0x0a, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0xc7, + 0x01, 0x0a, 0x14, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, + 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x46, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x67, + 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, + 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x66, 0x65, 0x72, + 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, + 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x1a, + 0x67, 0x0a, 0x06, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x66, + 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x0d, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, + 0x12, 0x17, 0x0a, 0x07, 0x6e, 0x65, 0x77, 0x5f, 0x6f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x06, 0x6e, 0x65, 0x77, 0x4f, 0x69, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x6e, 0x65, 0x77, + 0x5f, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x6e, + 0x65, 0x77, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x1a, 0x14, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6f, + 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x14, + 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x43, 0x72, 0x65, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0xa6, 0x03, 0x0a, 0x0c, 0x48, 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, + 0x65, 0x70, 0x69, 0x6e, 0x67, 0x12, 0x43, 0x0a, 0x09, 0x70, 0x61, 0x63, 0x6b, 0x5f, 0x72, 0x65, + 0x66, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, + 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x48, 0x6f, 0x75, 0x73, 0x65, + 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x66, 0x73, + 0x52, 0x08, 0x70, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x66, 0x73, 0x12, 0x3c, 0x0a, 0x06, 0x72, 0x65, + 0x70, 0x61, 0x63, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x67, 0x69, 0x74, + 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x48, 0x6f, 0x75, + 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x52, 0x65, 
0x70, 0x61, 0x63, 0x6b, + 0x52, 0x06, 0x72, 0x65, 0x70, 0x61, 0x63, 0x6b, 0x12, 0x5f, 0x0a, 0x13, 0x77, 0x72, 0x69, 0x74, + 0x65, 0x5f, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x5f, 0x67, 0x72, 0x61, 0x70, 0x68, 0x73, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, + 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x48, 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, 0x65, + 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, + 0x47, 0x72, 0x61, 0x70, 0x68, 0x73, 0x52, 0x11, 0x77, 0x72, 0x69, 0x74, 0x65, 0x43, 0x6f, 0x6d, + 0x6d, 0x69, 0x74, 0x47, 0x72, 0x61, 0x70, 0x68, 0x73, 0x1a, 0x2b, 0x0a, 0x08, 0x50, 0x61, 0x63, + 0x6b, 0x52, 0x65, 0x66, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x70, 0x72, 0x75, 0x6e, 0x65, 0x64, 0x5f, + 0x72, 0x65, 0x66, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x0a, 0x70, 0x72, 0x75, 0x6e, + 0x65, 0x64, 0x52, 0x65, 0x66, 0x73, 0x1a, 0x70, 0x0a, 0x06, 0x52, 0x65, 0x70, 0x61, 0x63, 0x6b, + 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x65, 0x77, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x18, 0x01, 0x20, + 0x03, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x65, 0x77, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x12, 0x23, 0x0a, + 0x0d, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x18, 0x02, + 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x46, 0x69, 0x6c, + 0x65, 0x73, 0x12, 0x24, 0x0a, 0x0e, 0x69, 0x73, 0x5f, 0x66, 0x75, 0x6c, 0x6c, 0x5f, 0x72, 0x65, + 0x70, 0x61, 0x63, 0x6b, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x69, 0x73, 0x46, 0x75, + 0x6c, 0x6c, 0x52, 0x65, 0x70, 0x61, 0x63, 0x6b, 0x1a, 0x13, 0x0a, 0x11, 0x57, 0x72, 0x69, 0x74, + 0x65, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x47, 0x72, 0x61, 0x70, 0x68, 0x73, 0x1a, 0xf9, 0x05, + 0x0a, 0x09, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x55, 0x0a, 0x10, 0x63, + 0x72, 0x65, 0x61, 0x74, 0x65, 0x5f, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x6c, 0x69, 0x6e, 0x6b, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, + 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x48, 0x61, 0x72, 0x64, 0x4c, 0x69, 0x6e, 0x6b, + 0x48, 0x00, 0x52, 0x0e, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x48, 0x61, 0x72, 0x64, 0x4c, 0x69, + 0x6e, 0x6b, 0x12, 0x67, 0x0a, 0x16, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x5f, 0x64, 0x69, 0x72, + 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x5f, 0x65, 0x6e, 0x74, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x2f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x52, + 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x45, 0x6e, + 0x74, 0x72, 0x79, 0x48, 0x00, 0x52, 0x14, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x44, 0x69, 0x72, + 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x57, 0x0a, 0x10, 0x63, + 0x72, 0x65, 0x61, 0x74, 0x65, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, + 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, + 0x79, 0x48, 0x00, 0x52, 0x0f, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x69, 0x72, 0x65, 0x63, + 
0x74, 0x6f, 0x72, 0x79, 0x12, 0x3c, 0x0a, 0x07, 0x73, 0x65, 0x74, 0x5f, 0x6b, 0x65, 0x79, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, + 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x2e, 0x53, 0x65, 0x74, 0x4b, 0x65, 0x79, 0x48, 0x00, 0x52, 0x06, 0x73, 0x65, 0x74, 0x4b, + 0x65, 0x79, 0x12, 0x45, 0x0a, 0x0a, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x5f, 0x6b, 0x65, 0x79, + 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, - 0x72, 0x79, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x48, 0x00, 0x52, 0x14, 0x72, 0x65, 0x6d, 0x6f, 0x76, - 0x65, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, - 0x57, 0x0a, 0x10, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, - 0x6f, 0x72, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x67, 0x69, 0x74, 0x61, - 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, 0x72, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x69, 0x72, 0x65, - 0x63, 0x74, 0x6f, 0x72, 0x79, 0x48, 0x00, 0x52, 0x0f, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, - 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x3c, 0x0a, 0x07, 0x73, 0x65, 0x74, 0x5f, - 0x6b, 0x65, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x67, 0x69, 0x74, 0x61, - 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, 0x72, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x53, 0x65, 0x74, 0x4b, 0x65, 0x79, 0x48, 0x00, 0x52, 0x06, - 0x73, 0x65, 0x74, 0x4b, 0x65, 0x79, 0x12, 0x45, 0x0a, 0x0a, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, - 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x67, 0x69, 0x74, - 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, - 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x4b, 0x65, 0x79, - 0x48, 0x00, 0x52, 0x09, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x1a, 0x88, 0x01, - 0x0a, 0x0e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x48, 0x61, 0x72, 0x64, 0x4c, 0x69, 0x6e, 0x6b, - 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x50, 0x61, 0x74, - 0x68, 0x12, 0x2a, 0x0a, 0x11, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x6e, 0x5f, 0x73, - 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0f, 0x73, 0x6f, - 0x75, 0x72, 0x63, 0x65, 0x49, 0x6e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x29, 0x0a, - 0x10, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x70, 0x61, 0x74, - 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x61, 0x74, 0x68, 0x1a, 0x2a, 0x0a, 0x14, 0x52, 0x65, 0x6d, 0x6f, - 0x76, 0x65, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x45, 0x6e, 0x74, 0x72, 0x79, - 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, - 0x70, 0x61, 0x74, 0x68, 0x1a, 0x39, 0x0a, 0x0f, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x69, - 0x72, 0x65, 0x63, 0x74, 
0x6f, 0x72, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, 0x12, 0x0a, 0x04, 0x6d, - 0x6f, 0x64, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x1a, - 0x30, 0x0a, 0x06, 0x53, 0x65, 0x74, 0x4b, 0x65, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, - 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, - 0x65, 0x1a, 0x1d, 0x0a, 0x09, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x10, - 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, - 0x42, 0x0b, 0x0a, 0x09, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x1b, 0x0a, - 0x03, 0x4c, 0x53, 0x4e, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x04, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, - 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, - 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6f, 0x6e, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x48, 0x00, 0x52, 0x09, + 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x1a, 0x88, 0x01, 0x0a, 0x0e, 0x43, 0x72, + 0x65, 0x61, 0x74, 0x65, 0x48, 0x61, 0x72, 0x64, 0x4c, 0x69, 0x6e, 0x6b, 0x12, 0x1f, 0x0a, 0x0b, + 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x50, 0x61, 0x74, 0x68, 0x12, 0x2a, 0x0a, + 0x11, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x6e, 0x5f, 0x73, 0x74, 0x6f, 0x72, 0x61, + 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x49, 0x6e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x29, 0x0a, 0x10, 0x64, 0x65, 0x73, + 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x50, 0x61, 0x74, 0x68, 0x1a, 0x2a, 0x0a, 0x14, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x44, 0x69, + 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x12, 0x0a, 0x04, + 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, + 0x1a, 0x39, 0x0a, 0x0f, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, + 0x6f, 0x72, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, 0x12, 0x0a, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x1a, 0x30, 0x0a, 0x06, 0x53, + 0x65, 0x74, 0x4b, 0x65, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x1a, 0x1d, 0x0a, + 0x09, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x42, 0x0b, 0x0a, 0x09, + 
0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x1b, 0x0a, 0x03, 0x4c, 0x53, 0x4e, + 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1106,43 +1161,45 @@ func file_log_proto_rawDescGZIP() []byte { return file_log_proto_rawDescData } -var file_log_proto_msgTypes = make([]protoimpl.MessageInfo, 15) +var file_log_proto_msgTypes = make([]protoimpl.MessageInfo, 16) var file_log_proto_goTypes = []any{ (*LogEntry)(nil), // 0: gitaly.LogEntry (*LSN)(nil), // 1: gitaly.LSN (*LogEntry_ReferenceTransaction)(nil), // 2: gitaly.LogEntry.ReferenceTransaction (*LogEntry_RepositoryDeletion)(nil), // 3: gitaly.LogEntry.RepositoryDeletion - (*LogEntry_Housekeeping)(nil), // 4: gitaly.LogEntry.Housekeeping - (*LogEntry_Operation)(nil), // 5: gitaly.LogEntry.Operation - (*LogEntry_ReferenceTransaction_Change)(nil), // 6: gitaly.LogEntry.ReferenceTransaction.Change - (*LogEntry_Housekeeping_PackRefs)(nil), // 7: gitaly.LogEntry.Housekeeping.PackRefs - (*LogEntry_Housekeeping_Repack)(nil), // 8: gitaly.LogEntry.Housekeeping.Repack - (*LogEntry_Housekeeping_WriteCommitGraphs)(nil), // 9: gitaly.LogEntry.Housekeeping.WriteCommitGraphs - (*LogEntry_Operation_CreateHardLink)(nil), // 10: gitaly.LogEntry.Operation.CreateHardLink - (*LogEntry_Operation_RemoveDirectoryEntry)(nil), // 11: gitaly.LogEntry.Operation.RemoveDirectoryEntry - (*LogEntry_Operation_CreateDirectory)(nil), // 12: gitaly.LogEntry.Operation.CreateDirectory - (*LogEntry_Operation_SetKey)(nil), // 13: gitaly.LogEntry.Operation.SetKey - (*LogEntry_Operation_DeleteKey)(nil), // 14: gitaly.LogEntry.Operation.DeleteKey + (*LogEntry_RepositoryCreation)(nil), // 4: gitaly.LogEntry.RepositoryCreation + (*LogEntry_Housekeeping)(nil), // 5: gitaly.LogEntry.Housekeeping + (*LogEntry_Operation)(nil), // 6: gitaly.LogEntry.Operation + (*LogEntry_ReferenceTransaction_Change)(nil), // 7: gitaly.LogEntry.ReferenceTransaction.Change + (*LogEntry_Housekeeping_PackRefs)(nil), // 8: gitaly.LogEntry.Housekeeping.PackRefs + (*LogEntry_Housekeeping_Repack)(nil), // 9: gitaly.LogEntry.Housekeeping.Repack + (*LogEntry_Housekeeping_WriteCommitGraphs)(nil), // 10: gitaly.LogEntry.Housekeeping.WriteCommitGraphs + (*LogEntry_Operation_CreateHardLink)(nil), // 11: gitaly.LogEntry.Operation.CreateHardLink + (*LogEntry_Operation_RemoveDirectoryEntry)(nil), // 12: gitaly.LogEntry.Operation.RemoveDirectoryEntry + (*LogEntry_Operation_CreateDirectory)(nil), // 13: gitaly.LogEntry.Operation.CreateDirectory + (*LogEntry_Operation_SetKey)(nil), // 14: gitaly.LogEntry.Operation.SetKey + (*LogEntry_Operation_DeleteKey)(nil), // 15: gitaly.LogEntry.Operation.DeleteKey } var file_log_proto_depIdxs = []int32{ 2, // 0: gitaly.LogEntry.reference_transactions:type_name -> gitaly.LogEntry.ReferenceTransaction 3, // 1: gitaly.LogEntry.repository_deletion:type_name -> gitaly.LogEntry.RepositoryDeletion - 4, // 2: gitaly.LogEntry.housekeeping:type_name -> gitaly.LogEntry.Housekeeping - 5, // 3: gitaly.LogEntry.operations:type_name -> gitaly.LogEntry.Operation - 6, // 4: 
gitaly.LogEntry.ReferenceTransaction.changes:type_name -> gitaly.LogEntry.ReferenceTransaction.Change - 7, // 5: gitaly.LogEntry.Housekeeping.pack_refs:type_name -> gitaly.LogEntry.Housekeeping.PackRefs - 8, // 6: gitaly.LogEntry.Housekeeping.repack:type_name -> gitaly.LogEntry.Housekeeping.Repack - 9, // 7: gitaly.LogEntry.Housekeeping.write_commit_graphs:type_name -> gitaly.LogEntry.Housekeeping.WriteCommitGraphs - 10, // 8: gitaly.LogEntry.Operation.create_hard_link:type_name -> gitaly.LogEntry.Operation.CreateHardLink - 11, // 9: gitaly.LogEntry.Operation.remove_directory_entry:type_name -> gitaly.LogEntry.Operation.RemoveDirectoryEntry - 12, // 10: gitaly.LogEntry.Operation.create_directory:type_name -> gitaly.LogEntry.Operation.CreateDirectory - 13, // 11: gitaly.LogEntry.Operation.set_key:type_name -> gitaly.LogEntry.Operation.SetKey - 14, // 12: gitaly.LogEntry.Operation.delete_key:type_name -> gitaly.LogEntry.Operation.DeleteKey - 13, // [13:13] is the sub-list for method output_type - 13, // [13:13] is the sub-list for method input_type - 13, // [13:13] is the sub-list for extension type_name - 13, // [13:13] is the sub-list for extension extendee - 0, // [0:13] is the sub-list for field type_name + 4, // 2: gitaly.LogEntry.repository_creation:type_name -> gitaly.LogEntry.RepositoryCreation + 5, // 3: gitaly.LogEntry.housekeeping:type_name -> gitaly.LogEntry.Housekeeping + 6, // 4: gitaly.LogEntry.operations:type_name -> gitaly.LogEntry.Operation + 7, // 5: gitaly.LogEntry.ReferenceTransaction.changes:type_name -> gitaly.LogEntry.ReferenceTransaction.Change + 8, // 6: gitaly.LogEntry.Housekeeping.pack_refs:type_name -> gitaly.LogEntry.Housekeeping.PackRefs + 9, // 7: gitaly.LogEntry.Housekeeping.repack:type_name -> gitaly.LogEntry.Housekeeping.Repack + 10, // 8: gitaly.LogEntry.Housekeeping.write_commit_graphs:type_name -> gitaly.LogEntry.Housekeeping.WriteCommitGraphs + 11, // 9: gitaly.LogEntry.Operation.create_hard_link:type_name -> gitaly.LogEntry.Operation.CreateHardLink + 12, // 10: gitaly.LogEntry.Operation.remove_directory_entry:type_name -> gitaly.LogEntry.Operation.RemoveDirectoryEntry + 13, // 11: gitaly.LogEntry.Operation.create_directory:type_name -> gitaly.LogEntry.Operation.CreateDirectory + 14, // 12: gitaly.LogEntry.Operation.set_key:type_name -> gitaly.LogEntry.Operation.SetKey + 15, // 13: gitaly.LogEntry.Operation.delete_key:type_name -> gitaly.LogEntry.Operation.DeleteKey + 14, // [14:14] is the sub-list for method output_type + 14, // [14:14] is the sub-list for method input_type + 14, // [14:14] is the sub-list for extension type_name + 14, // [14:14] is the sub-list for extension extendee + 0, // [0:14] is the sub-list for field type_name } func init() { file_log_proto_init() } @@ -1200,7 +1257,7 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[4].Exporter = func(v any, i int) any { - switch v := v.(*LogEntry_Housekeeping); i { + switch v := v.(*LogEntry_RepositoryCreation); i { case 0: return &v.state case 1: @@ -1212,7 +1269,7 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[5].Exporter = func(v any, i int) any { - switch v := v.(*LogEntry_Operation); i { + switch v := v.(*LogEntry_Housekeeping); i { case 0: return &v.state case 1: @@ -1224,7 +1281,7 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[6].Exporter = func(v any, i int) any { - switch v := v.(*LogEntry_ReferenceTransaction_Change); i { + switch v := v.(*LogEntry_Operation); i { case 0: return &v.state case 1: @@ -1236,7 +1293,7 @@ func file_log_proto_init() { } 
} file_log_proto_msgTypes[7].Exporter = func(v any, i int) any { - switch v := v.(*LogEntry_Housekeeping_PackRefs); i { + switch v := v.(*LogEntry_ReferenceTransaction_Change); i { case 0: return &v.state case 1: @@ -1248,7 +1305,7 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[8].Exporter = func(v any, i int) any { - switch v := v.(*LogEntry_Housekeeping_Repack); i { + switch v := v.(*LogEntry_Housekeeping_PackRefs); i { case 0: return &v.state case 1: @@ -1260,7 +1317,7 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[9].Exporter = func(v any, i int) any { - switch v := v.(*LogEntry_Housekeeping_WriteCommitGraphs); i { + switch v := v.(*LogEntry_Housekeeping_Repack); i { case 0: return &v.state case 1: @@ -1272,7 +1329,7 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[10].Exporter = func(v any, i int) any { - switch v := v.(*LogEntry_Operation_CreateHardLink); i { + switch v := v.(*LogEntry_Housekeeping_WriteCommitGraphs); i { case 0: return &v.state case 1: @@ -1284,7 +1341,7 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[11].Exporter = func(v any, i int) any { - switch v := v.(*LogEntry_Operation_RemoveDirectoryEntry); i { + switch v := v.(*LogEntry_Operation_CreateHardLink); i { case 0: return &v.state case 1: @@ -1296,7 +1353,7 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[12].Exporter = func(v any, i int) any { - switch v := v.(*LogEntry_Operation_CreateDirectory); i { + switch v := v.(*LogEntry_Operation_RemoveDirectoryEntry); i { case 0: return &v.state case 1: @@ -1308,7 +1365,7 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[13].Exporter = func(v any, i int) any { - switch v := v.(*LogEntry_Operation_SetKey); i { + switch v := v.(*LogEntry_Operation_CreateDirectory); i { case 0: return &v.state case 1: @@ -1320,6 +1377,18 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[14].Exporter = func(v any, i int) any { + switch v := v.(*LogEntry_Operation_SetKey); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_log_proto_msgTypes[15].Exporter = func(v any, i int) any { switch v := v.(*LogEntry_Operation_DeleteKey); i { case 0: return &v.state @@ -1332,7 +1401,7 @@ func file_log_proto_init() { } } } - file_log_proto_msgTypes[5].OneofWrappers = []any{ + file_log_proto_msgTypes[6].OneofWrappers = []any{ (*LogEntry_Operation_CreateHardLink_)(nil), (*LogEntry_Operation_RemoveDirectoryEntry_)(nil), (*LogEntry_Operation_CreateDirectory_)(nil), @@ -1345,7 +1414,7 @@ func file_log_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_log_proto_rawDesc, NumEnums: 0, - NumMessages: 15, + NumMessages: 16, NumExtensions: 0, NumServices: 0, }, diff --git a/proto/log.proto b/proto/log.proto index fbf7879b79c664a5552a2faf42abd92d6fed16a4..e18ea34f9b82489ea4094eb3b4594fdf575d594e 100644 --- a/proto/log.proto +++ b/proto/log.proto @@ -34,6 +34,10 @@ message LogEntry { message RepositoryDeletion { } + // RepositoryCreation models a repository creation. + message RepositoryCreation { + } + // Housekeeping models a housekeeping run. It is supposed to handle housekeeping tasks for repositories such as the // cleanup of unneeded files and optimizations for the repository's data structures. It is a collection of smaller // tasks. @@ -84,6 +88,8 @@ message LogEntry { repeated ReferenceTransaction reference_transactions = 2; // repository_deletion, when set, indicates this log entry deletes the repository. 
RepositoryDeletion repository_deletion = 6; + // repository_creation, when set, indicates this log entry creates the repository. + RepositoryCreation repository_creation = 11; // housekeeping, when set, indicates this log entry contains a housekeeping task. Housekeeping housekeeping = 9;
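A minimal sketch of how a log consumer might branch on the new repository_creation field once this diff is applied; `entry` is an assumed *gitalypb.LogEntry decoded from a write-ahead log entry, and the comment bodies describe intent only, not Gitaly's actual apply logic:

    // entry is an assumed *gitalypb.LogEntry read from the write-ahead log.
    switch {
    case entry.GetRepositoryCreation() != nil:
        // Field 11 is set: applying this entry creates the repository, so a
        // consumer would materialize its directory before replaying the
        // entry's operations.
    case entry.GetRepositoryDeletion() != nil:
        // Field 6 is set: applying this entry deletes the repository.
    }

Both accessors are nil-safe on a nil receiver, as is conventional for generated protobuf getters, so the switch needs no preceding nil check on `entry`.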