diff --git a/internal/backup/log_entry_test.go b/internal/backup/log_entry_test.go index dffedfd45cfacbed43262ae476625a02bdf24c38..45911641e0a73a948b0439154a76ab8326874147 100644 --- a/internal/backup/log_entry_test.go +++ b/internal/backup/log_entry_test.go @@ -20,6 +20,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) type mockLogManager struct { @@ -70,6 +71,10 @@ func (lm *mockLogManager) AcknowledgeTransaction(_ storagemgr.LogConsumer, lsn s } } +func (lm *mockLogManager) ReadEntry(lsn storage.LSN) (*gitalypb.LogEntry, error) { + return nil, nil +} + func (lm *mockLogManager) SendNotification() { n := lm.notifications[0] lm.archiver.NotifyNewTransactions(lm.partitionInfo.storageName, lm.partitionInfo.partitionID, n.lowWaterMark, n.highWaterMark) diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index d94704aac6a91662c5e09f4ca9f7d9817434c416..4ec5b970f9a1467c253d119eb173aefa272f5998 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -392,12 +392,12 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { return fmt.Errorf("resolving write-ahead log backup sink: %w", err) } - consumerFactory = func(lma storagemgr.LogManagerAccessor) (storagemgr.LogConsumer, func()) { + consumerFactory = func(lma storagemgr.LogManagerAccessor) storagemgr.LogConsumer { walArchiver := backup.NewLogEntryArchiver(logger, walSink, cfg.Backup.WALWorkerCount, lma) prometheus.MustRegister(walArchiver) walArchiver.Run() - return walArchiver, walArchiver.Close + return walArchiver } } diff --git a/internal/gitaly/storage/raft/logger.go b/internal/gitaly/storage/raft/logger.go index 733506e22fdd4881a607f65483d5d91174dfb79a..70d1c81f752759835ccbc0bf4bdb6125383adc79 100644 --- a/internal/gitaly/storage/raft/logger.go +++ b/internal/gitaly/storage/raft/logger.go @@ -73,7 +73,7 @@ func SetLogger(logger log.Logger, suppressDefaultLog bool) { name: pkgName, Logger: logger.WithFields(log.Fields{ "component": "raft", - "raft_component": strings.ToLower(pkgName), + "raft.component": strings.ToLower(pkgName), }), } }) diff --git a/internal/gitaly/storage/raft/manager.go b/internal/gitaly/storage/raft/manager.go index 6a068013da4f883ce95f365e394feb15e18b001f..14a708c6b52475ff3a8ace2cf19f2b9ff3dca757 100644 --- a/internal/gitaly/storage/raft/manager.go +++ b/internal/gitaly/storage/raft/manager.go @@ -79,9 +79,9 @@ func NewManager( managerConfig: managerCfg, logger: logger.WithFields(log.Fields{ "component": "raft", - "raft_component": "manager", - "raft_cluster_id": clusterCfg.ClusterID, - "raft_node_id": clusterCfg.NodeID, + "raft.component": "manager", + "raft.cluster_id": clusterCfg.ClusterID, + "raft.node_id": clusterCfg.NodeID, }), storageManagers: map[string]*storageManager{}, } @@ -96,7 +96,7 @@ func NewManager( DefaultNodeRegistryEnabled: false, EnableMetrics: true, RaftEventListener: &raftLogger{ - Logger: m.logger.WithField("raft_component", "system"), + Logger: m.logger.WithField("raft.component", "system"), }, Expert: managerCfg.expertConfig, }) @@ -138,8 +138,8 @@ func (m *Manager) Start() (returnedErr error) { }() m.logger.WithFields(log.Fields{ - "raft_config": m.clusterConfig, - "raft_manager_conf": m.managerConfig, + "raft.config": m.clusterConfig, + "raft.manager_conf": m.managerConfig, }).Info("Raft cluster is starting") // A Gitaly node contains multiple independent storages, and each storage maps to a dragonboat diff --git a/internal/gitaly/storage/raft/metadata_group.go b/internal/gitaly/storage/raft/metadata_group.go index 545bd6103e91eb71c21dbb6747b450860b6c35b9..bd69b8cc7aa8f4bab3752e1a67ddbeca1a4f6325 100644 --- a/internal/gitaly/storage/raft/metadata_group.go +++ b/internal/gitaly/storage/raft/metadata_group.go @@ -61,9 +61,9 @@ func newMetadataRaftGroup(ctx context.Context, nodeHost *dragonboat.NodeHost, db } groupLogger := logger.WithFields(log.Fields{ - "raft_group": "metadata", - "raft_group_id": MetadataGroupID, - "raft_replica_id": clusterCfg.NodeID, + "raft.group": "metadata", + "raft.group_id": MetadataGroupID, + "raft.replica_id": clusterCfg.NodeID, }) groupLogger.Info("joined metadata group") diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go index 0aa49fcaf4945b90c8ffee93dbbfdd337de88281..dca012bc443494002ad4fb7700d3db4ae2658b11 100644 --- a/internal/gitaly/storage/storagemgr/partition_manager.go +++ b/internal/gitaly/storage/storagemgr/partition_manager.go @@ -54,8 +54,10 @@ type PartitionManager struct { // transactionManagerFactory is a factory to create TransactionManagers. This shouldn't ever be changed // during normal operation, but can be used to adjust the transaction manager's behaviour in tests. transactionManagerFactory transactionManagerFactory - // consumerCleanup closes the LogConsumer. - consumerCleanup func() + // consumer is a hook that will be passed to the each TransactionManager. The Transactionmanager + // notifies the consumer of new transactions by invoking the NotifyNewTransaction method after + // they are committed. + consumer LogConsumer // metrics accounts for all metrics of transaction operations. It will be // passed down to each transaction manager and is shared between them. The // metrics must be registered to be collected by prometheus collector. @@ -284,13 +286,10 @@ func NewPartitionManager( metrics: metrics, } - var logConsumer LogConsumer - var cleanup func() if consumerFactory != nil { - logConsumer, cleanup = consumerFactory(pm) + pm.consumer = consumerFactory(pm) } - pm.consumerCleanup = cleanup pm.transactionManagerFactory = func( logger log.Logger, partitionID storage.PartitionID, @@ -312,7 +311,7 @@ func NewPartitionManager( metrics.housekeeping, metrics.snapshot.Scope(storageMgr.name), ), - logConsumer, + pm.consumer, ) } @@ -449,6 +448,11 @@ func (pm *PartitionManager) CallLogManager(ctx context.Context, storageName stri return nil } +// GetLogConsumer returns the registered log consumer if any. +func (pm *PartitionManager) GetLogConsumer() LogConsumer { + return pm.consumer +} + // StorageKV executes the provided function against the storage's metadata DB. All write operations // issued by the function are committed in an atomic fashion. All read operations are performed // against a snapshot of the database. @@ -603,8 +607,8 @@ func (pm *PartitionManager) Close() { }() } - if pm.consumerCleanup != nil { - pm.consumerCleanup() + if pm.consumer != nil { + pm.consumer.Close() } activeStorages.Wait() diff --git a/internal/gitaly/storage/storagemgr/testhelper_test.go b/internal/gitaly/storage/storagemgr/testhelper_test.go index 9171c431df5c047b6d43425f267e93a6f1f4a7e5..39d021e4403a173da3f2af3e5bfa5e4fb4dede50 100644 --- a/internal/gitaly/storage/storagemgr/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/testhelper_test.go @@ -842,6 +842,16 @@ type ConsumerAcknowledge struct { LSN storage.LSN } +// ConsumerReadEntry asserts the entry returned to the consumer at a point of time. +type ConsumerReadEntry struct { + // LSN is the target LSN by the consumers. + LSN storage.LSN + // ExpectedEntry is the expected returned log entry. + ExpectedEntry *gitalypb.LogEntry + // ExpectedErr is the expected returned error string. + ExpectedErr string +} + // RemoveRepository removes the repository from the disk. It must be run with the TransactionManager // closed. type RemoveRepository struct{} @@ -897,6 +907,8 @@ func (lc *MockLogConsumer) NotifyNewTransactions(storageName string, partitionID lc.highWaterMark = highWaterMark } +func (lc *MockLogConsumer) Close() {} + // ConsumerState is used to track the log positions received by the consumer and the corresponding // acknowledgements from the consumer to the manager. We deliberately do not track the LowWaterMark // sent to consumers as this is non-deterministic. @@ -1354,6 +1366,15 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas transaction.WriteCommitGraphs(step.Config) case ConsumerAcknowledge: transactionManager.AcknowledgeTransaction(transactionManager.consumer, step.LSN) + case ConsumerReadEntry: + entry, err := transactionManager.ReadEntry(step.LSN) + if step.ExpectedErr == "" { + require.NoError(t, err) + testhelper.ProtoEqual(t, step.ExpectedEntry, entry) + } else { + require.Error(t, err) + require.Contains(t, err.Error(), step.ExpectedErr) + } case RepositoryAssertion: require.Contains(t, openTransactions, step.TransactionID, "test error: transaction's snapshot asserted before beginning it") transaction := openTransactions[step.TransactionID] diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go index bf6e716f0604d66ada9309c4da8f8dd361fe67dc..8bad03b02beff89f4883863ec879b3de1f87f4c0 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager.go @@ -875,6 +875,8 @@ type LogConsumer interface { // LSNs are sent so that a newly initialized consumer is aware of the full range of // entries it can process. NotifyNewTransactions(storageName string, partitionID storage.PartitionID, lowWaterMark, highWaterMark storage.LSN) + // Close shuts down the log consumer. It is called when the partition manager shutdowns. + Close() } // LogManagerAccessor is the interface used by the LogManager coordinator. It is called by @@ -889,7 +891,7 @@ type LogManagerAccessor interface { // LogConsumerFactory returns a LogConsumer that requires a LogManagerAccessor for construction and // a function to close the LogConsumer. -type LogConsumerFactory func(LogManagerAccessor) (_ LogConsumer, cleanup func()) +type LogConsumerFactory func(LogManagerAccessor) LogConsumer // LogManager is the interface used on the consumer side of the integration. The consumer // has the ability to acknowledge transactions as having been processed with AcknowledgeTransaction. @@ -899,6 +901,8 @@ type LogManager interface { AcknowledgeTransaction(consumer LogConsumer, lsn storage.LSN) // GetTransactionPath returns the path of the log entry's root directory. GetTransactionPath(lsn storage.LSN) string + // ReadEntry returns the log entry object of the LSN. + ReadEntry(lsn storage.LSN) (*gitalypb.LogEntry, error) } // AcknowledgeTransaction acknowledges log entries up and including lsn as successfully processed @@ -919,6 +923,18 @@ func (mgr *TransactionManager) GetTransactionPath(lsn storage.LSN) string { return walFilesPathForLSN(mgr.stateDirectory, lsn) } +// ReadEntry returns the log entry object of the LSN. +func (mgr *TransactionManager) ReadEntry(lsn storage.LSN) (*gitalypb.LogEntry, error) { + if lsn < mgr.lowWaterMark() { + return nil, fmt.Errorf("requested log entry is gone") + } + entry, err := mgr.readLogEntry(lsn) + if err != nil { + return nil, fmt.Errorf("reading log entry: %w", err) + } + return entry, nil +} + // consumerPosition tracks the last LSN acknowledged for a consumer. type consumerPosition struct { // position is the last LSN acknowledged as completed by the consumer. @@ -2779,7 +2795,7 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction // to transaction operations. // // To ensure that we don't modify existing tables and autocompact, we lock the existing tables -// before applying the updates. This way the reftable backend willl only create new tables +// before applying the updates. This way the reftable backend will only create new tables func (mgr *TransactionManager) verifyReferencesWithGitForReftables( ctx context.Context, referenceTransactions []*gitalypb.LogEntry_ReferenceTransaction, diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_consumer_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_consumer_test.go index b9872f7b2e16d623b04b4a2b133fffbd0199fbd3..02702bb81717a8018f6740a57f38b90c9e994379 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager_consumer_test.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager_consumer_test.go @@ -2,6 +2,7 @@ package storagemgr import ( "context" + "path/filepath" "testing" "gitlab.com/gitlab-org/gitaly/v16/internal/git" @@ -9,6 +10,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transactionTestCase { @@ -80,9 +82,35 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, }, }, + ConsumerReadEntry{ + LSN: 1, + ExpectedEntry: &gitalypb.LogEntry{ + RelativePath: setup.RelativePath, + Operations: []*gitalypb.LogEntry_Operation{ + {Operation: &gitalypb.LogEntry_Operation_CreateHardLink_{ + CreateHardLink: &gitalypb.LogEntry_Operation_CreateHardLink{ + SourcePath: []byte("1"), + DestinationPath: []byte(filepath.Join(setup.RelativePath, "refs/heads/main")), + }, + }}, + }, + ReferenceTransactions: []*gitalypb.LogEntry_ReferenceTransaction{ + {Changes: []*gitalypb.LogEntry_ReferenceTransaction_Change{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(setup.Commits.First.OID), + }, + }}, + }, + }, + }, ConsumerAcknowledge{ LSN: 1, }, + ConsumerReadEntry{ + LSN: 1, + ExpectedErr: "requested log entry is gone", + }, }, expectedState: StateAssertion{ Database: DatabaseState{ @@ -137,6 +165,14 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti ConsumerAcknowledge{ LSN: 2, }, + ConsumerReadEntry{ + LSN: 1, + ExpectedErr: "requested log entry is gone", + }, + ConsumerReadEntry{ + LSN: 2, + ExpectedErr: "requested log entry is gone", + }, }, expectedState: StateAssertion{ Database: DatabaseState{