From 7724d223cd87f5405da5a0801918c22221792b6e Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Sun, 28 Jul 2024 14:00:46 +0700 Subject: [PATCH 1/4] storagemgr: Add ReadEntry function to LogManager This function lets a log consumer read the log entry at a given LSN while it is consuming the log. --- internal/backup/log_entry_test.go | 5 +++ .../storage/storagemgr/testhelper_test.go | 19 ++++++++++ .../storage/storagemgr/transaction_manager.go | 14 ++++++++ .../transaction_manager_consumer_test.go | 36 +++++++++++++++++++ 4 files changed, 74 insertions(+) diff --git a/internal/backup/log_entry_test.go b/internal/backup/log_entry_test.go index dffedfd45c..45911641e0 100644 --- a/internal/backup/log_entry_test.go +++ b/internal/backup/log_entry_test.go @@ -20,6 +20,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) type mockLogManager struct { @@ -70,6 +71,10 @@ func (lm *mockLogManager) AcknowledgeTransaction(_ storagemgr.LogConsumer, lsn s } } +func (lm *mockLogManager) ReadEntry(lsn storage.LSN) (*gitalypb.LogEntry, error) { + return nil, nil +} + func (lm *mockLogManager) SendNotification() { n := lm.notifications[0] lm.archiver.NotifyNewTransactions(lm.partitionInfo.storageName, lm.partitionInfo.partitionID, n.lowWaterMark, n.highWaterMark) diff --git a/internal/gitaly/storage/storagemgr/testhelper_test.go b/internal/gitaly/storage/storagemgr/testhelper_test.go index 9171c431df..4d5055b8de 100644 --- a/internal/gitaly/storage/storagemgr/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/testhelper_test.go @@ -842,6 +842,16 @@ type ConsumerAcknowledge struct { LSN storage.LSN } +// ConsumerReadEntry asserts the entry returned to the consumer at a point in time. +type ConsumerReadEntry struct { + // LSN is the LSN requested by the consumer. 
+ LSN storage.LSN + // ExpectedEntry is the expected returned log entry. + ExpectedEntry *gitalypb.LogEntry + // ExpectedErr is the expected returned error string. + ExpectedErr string +} + // RemoveRepository removes the repository from the disk. It must be run with the TransactionManager // closed. type RemoveRepository struct{} @@ -1354,6 +1364,15 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas transaction.WriteCommitGraphs(step.Config) case ConsumerAcknowledge: transactionManager.AcknowledgeTransaction(transactionManager.consumer, step.LSN) + case ConsumerReadEntry: + entry, err := transactionManager.ReadEntry(step.LSN) + if step.ExpectedErr == "" { + require.NoError(t, err) + testhelper.ProtoEqual(t, step.ExpectedEntry, entry) + } else { + require.Error(t, err) + require.Contains(t, err.Error(), step.ExpectedErr) + } case RepositoryAssertion: require.Contains(t, openTransactions, step.TransactionID, "test error: transaction's snapshot asserted before beginning it") transaction := openTransactions[step.TransactionID] diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go index bf6e716f06..36ea859a93 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager.go @@ -899,6 +899,8 @@ type LogManager interface { AcknowledgeTransaction(consumer LogConsumer, lsn storage.LSN) // GetTransactionPath returns the path of the log entry's root directory. GetTransactionPath(lsn storage.LSN) string + // ReadEntry returns the log entry object of the LSN. 
+ ReadEntry(lsn storage.LSN) (*gitalypb.LogEntry, error) } // AcknowledgeTransaction acknowledges log entries up and including lsn as successfully processed @@ -919,6 +921,18 @@ func (mgr *TransactionManager) GetTransactionPath(lsn storage.LSN) string { return walFilesPathForLSN(mgr.stateDirectory, lsn) } +// ReadEntry returns the log entry object of the LSN. +func (mgr *TransactionManager) ReadEntry(lsn storage.LSN) (*gitalypb.LogEntry, error) { + if lsn < mgr.lowWaterMark() { + return nil, fmt.Errorf("requested log entry is gone") + } + entry, err := mgr.readLogEntry(lsn) + if err != nil { + return nil, fmt.Errorf("reading log entry: %w", err) + } + return entry, nil +} + // consumerPosition tracks the last LSN acknowledged for a consumer. type consumerPosition struct { // position is the last LSN acknowledged as completed by the consumer. diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_consumer_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_consumer_test.go index b9872f7b2e..02702bb817 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager_consumer_test.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager_consumer_test.go @@ -2,6 +2,7 @@ package storagemgr import ( "context" + "path/filepath" "testing" "gitlab.com/gitlab-org/gitaly/v16/internal/git" @@ -9,6 +10,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transactionTestCase { @@ -80,9 +82,35 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, }, }, + ConsumerReadEntry{ + LSN: 1, + ExpectedEntry: &gitalypb.LogEntry{ + RelativePath: setup.RelativePath, + 
+ Operations: []*gitalypb.LogEntry_Operation{ + {Operation: &gitalypb.LogEntry_Operation_CreateHardLink_{ + CreateHardLink: &gitalypb.LogEntry_Operation_CreateHardLink{ + SourcePath: []byte("1"), + DestinationPath: []byte(filepath.Join(setup.RelativePath, "refs/heads/main")), + }, + }}, + }, + ReferenceTransactions: []*gitalypb.LogEntry_ReferenceTransaction{ + {Changes: []*gitalypb.LogEntry_ReferenceTransaction_Change{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(setup.Commits.First.OID), + }, + }}, + }, + }, + }, ConsumerAcknowledge{ LSN: 1, }, + ConsumerReadEntry{ + LSN: 1, + ExpectedErr: "requested log entry is gone", + }, }, expectedState: StateAssertion{ Database: DatabaseState{ @@ -137,6 +165,14 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti ConsumerAcknowledge{ LSN: 2, }, + ConsumerReadEntry{ + LSN: 1, + ExpectedErr: "requested log entry is gone", + }, + ConsumerReadEntry{ + LSN: 2, + ExpectedErr: "requested log entry is gone", + }, }, expectedState: StateAssertion{ Database: DatabaseState{ -- GitLab From 0f60d4c8fabaef3dcf1d157d98b70ba957f1c002 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Sun, 28 Jul 2024 14:22:01 +0700 Subject: [PATCH 2/4] storagemgr: Simplify log consumer factory Currently, the PartitionManager invokes a factory to create a log consumer with the corresponding arguments. The factory returns a log consumer object and a cleanup function. This commit simplifies the log consumer factory so that it returns only the log consumer object. The partition manager now calls the consumer's Close method directly instead. 
--- internal/cli/gitaly/serve.go | 4 ++-- .../storage/storagemgr/partition_manager.go | 22 +++++++++++-------- .../storage/storagemgr/testhelper_test.go | 2 ++ .../storage/storagemgr/transaction_manager.go | 4 +++- 4 files changed, 20 insertions(+), 12 deletions(-) diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index d94704aac6..4ec5b970f9 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -392,12 +392,12 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { return fmt.Errorf("resolving write-ahead log backup sink: %w", err) } - consumerFactory = func(lma storagemgr.LogManagerAccessor) (storagemgr.LogConsumer, func()) { + consumerFactory = func(lma storagemgr.LogManagerAccessor) storagemgr.LogConsumer { walArchiver := backup.NewLogEntryArchiver(logger, walSink, cfg.Backup.WALWorkerCount, lma) prometheus.MustRegister(walArchiver) walArchiver.Run() - return walArchiver, walArchiver.Close + return walArchiver } } diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go index 0aa49fcaf4..dca012bc44 100644 --- a/internal/gitaly/storage/storagemgr/partition_manager.go +++ b/internal/gitaly/storage/storagemgr/partition_manager.go @@ -54,8 +54,10 @@ type PartitionManager struct { // transactionManagerFactory is a factory to create TransactionManagers. This shouldn't ever be changed // during normal operation, but can be used to adjust the transaction manager's behaviour in tests. transactionManagerFactory transactionManagerFactory - // consumerCleanup closes the LogConsumer. - consumerCleanup func() + // consumer is a hook that will be passed to each TransactionManager. The TransactionManager + // notifies the consumer of new transactions by invoking the NotifyNewTransactions method after + // they are committed. + consumer LogConsumer // metrics accounts for all metrics of transaction operations. 
It will be // passed down to each transaction manager and is shared between them. The // metrics must be registered to be collected by prometheus collector. @@ -284,13 +286,10 @@ func NewPartitionManager( metrics: metrics, } - var logConsumer LogConsumer - var cleanup func() if consumerFactory != nil { - logConsumer, cleanup = consumerFactory(pm) + pm.consumer = consumerFactory(pm) } - pm.consumerCleanup = cleanup pm.transactionManagerFactory = func( logger log.Logger, partitionID storage.PartitionID, @@ -312,7 +311,7 @@ func NewPartitionManager( metrics.housekeeping, metrics.snapshot.Scope(storageMgr.name), ), - logConsumer, + pm.consumer, ) } @@ -449,6 +448,11 @@ func (pm *PartitionManager) CallLogManager(ctx context.Context, storageName stri return nil } +// GetLogConsumer returns the registered log consumer if any. +func (pm *PartitionManager) GetLogConsumer() LogConsumer { + return pm.consumer +} + // StorageKV executes the provided function against the storage's metadata DB. All write operations // issued by the function are committed in an atomic fashion. All read operations are performed // against a snapshot of the database. @@ -603,8 +607,8 @@ func (pm *PartitionManager) Close() { }() } - if pm.consumerCleanup != nil { - pm.consumerCleanup() + if pm.consumer != nil { + pm.consumer.Close() } activeStorages.Wait() diff --git a/internal/gitaly/storage/storagemgr/testhelper_test.go b/internal/gitaly/storage/storagemgr/testhelper_test.go index 4d5055b8de..39d021e440 100644 --- a/internal/gitaly/storage/storagemgr/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/testhelper_test.go @@ -907,6 +907,8 @@ func (lc *MockLogConsumer) NotifyNewTransactions(storageName string, partitionID lc.highWaterMark = highWaterMark } +func (lc *MockLogConsumer) Close() {} + // ConsumerState is used to track the log positions received by the consumer and the corresponding // acknowledgements from the consumer to the manager. 
We deliberately do not track the LowWaterMark // sent to consumers as this is non-deterministic. diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go index 36ea859a93..dce3524535 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager.go @@ -875,6 +875,8 @@ type LogConsumer interface { // LSNs are sent so that a newly initialized consumer is aware of the full range of // entries it can process. NotifyNewTransactions(storageName string, partitionID storage.PartitionID, lowWaterMark, highWaterMark storage.LSN) + // Close shuts down the log consumer. It is called when the partition manager shuts down. + Close() } // LogManagerAccessor is the interface used by the LogManager coordinator. It is called by @@ -889,7 +891,7 @@ type LogManagerAccessor interface { // LogConsumerFactory returns a LogConsumer that requires a LogManagerAccessor for construction and // a function to close the LogConsumer. -type LogConsumerFactory func(LogManagerAccessor) (_ LogConsumer, cleanup func()) +type LogConsumerFactory func(LogManagerAccessor) LogConsumer // LogManager is the interface used on the consumer side of the integration. The consumer // has the ability to acknowledge transactions as having been processed with AcknowledgeTransaction. -- GitLab From 5cf98af9f0c8de5fdc2a72ef5c81bd6c5117b565 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Sun, 28 Jul 2024 23:15:21 +0700 Subject: [PATCH 3/4] raft: Replace "raft_" log prefix with "raft." Currently, all log fields inside the Raft package use the "raft_" prefix. It works well in the local environment. However, in production, the fields are not grouped in log aggregation systems such as Kibana. This commit replaces all "raft_" log field prefixes with "raft.". 
--- internal/gitaly/storage/raft/logger.go | 2 +- internal/gitaly/storage/raft/manager.go | 12 ++++++------ internal/gitaly/storage/raft/metadata_group.go | 6 +++--- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/internal/gitaly/storage/raft/logger.go b/internal/gitaly/storage/raft/logger.go index 733506e22f..70d1c81f75 100644 --- a/internal/gitaly/storage/raft/logger.go +++ b/internal/gitaly/storage/raft/logger.go @@ -73,7 +73,7 @@ func SetLogger(logger log.Logger, suppressDefaultLog bool) { name: pkgName, Logger: logger.WithFields(log.Fields{ "component": "raft", - "raft_component": strings.ToLower(pkgName), + "raft.component": strings.ToLower(pkgName), }), } }) diff --git a/internal/gitaly/storage/raft/manager.go b/internal/gitaly/storage/raft/manager.go index 6a068013da..14a708c6b5 100644 --- a/internal/gitaly/storage/raft/manager.go +++ b/internal/gitaly/storage/raft/manager.go @@ -79,9 +79,9 @@ func NewManager( managerConfig: managerCfg, logger: logger.WithFields(log.Fields{ "component": "raft", - "raft_component": "manager", - "raft_cluster_id": clusterCfg.ClusterID, - "raft_node_id": clusterCfg.NodeID, + "raft.component": "manager", + "raft.cluster_id": clusterCfg.ClusterID, + "raft.node_id": clusterCfg.NodeID, }), storageManagers: map[string]*storageManager{}, } @@ -96,7 +96,7 @@ func NewManager( DefaultNodeRegistryEnabled: false, EnableMetrics: true, RaftEventListener: &raftLogger{ - Logger: m.logger.WithField("raft_component", "system"), + Logger: m.logger.WithField("raft.component", "system"), }, Expert: managerCfg.expertConfig, }) @@ -138,8 +138,8 @@ func (m *Manager) Start() (returnedErr error) { }() m.logger.WithFields(log.Fields{ - "raft_config": m.clusterConfig, - "raft_manager_conf": m.managerConfig, + "raft.config": m.clusterConfig, + "raft.manager_conf": m.managerConfig, }).Info("Raft cluster is starting") // A Gitaly node contains multiple independent storages, and each storage maps to a dragonboat diff --git 
a/internal/gitaly/storage/raft/metadata_group.go b/internal/gitaly/storage/raft/metadata_group.go index 545bd6103e..bd69b8cc7a 100644 --- a/internal/gitaly/storage/raft/metadata_group.go +++ b/internal/gitaly/storage/raft/metadata_group.go @@ -61,9 +61,9 @@ func newMetadataRaftGroup(ctx context.Context, nodeHost *dragonboat.NodeHost, db } groupLogger := logger.WithFields(log.Fields{ - "raft_group": "metadata", - "raft_group_id": MetadataGroupID, - "raft_replica_id": clusterCfg.NodeID, + "raft.group": "metadata", + "raft.group_id": MetadataGroupID, + "raft.replica_id": clusterCfg.NodeID, }) groupLogger.Info("joined metadata group") -- GitLab From 05a5a19c7c06a4f0fbcbd0be6e6c9fff178502d7 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Thu, 1 Aug 2024 16:39:40 +0700 Subject: [PATCH 4/4] storagemgr: Fix a small typo --- internal/gitaly/storage/storagemgr/transaction_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go index dce3524535..8bad03b02b 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager.go @@ -2795,7 +2795,7 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction // to transaction operations. // // To ensure that we don't modify existing tables and autocompact, we lock the existing tables -// before applying the updates. This way the reftable backend willl only create new tables +// before applying the updates. This way the reftable backend will only create new tables func (mgr *TransactionManager) verifyReferencesWithGitForReftables( ctx context.Context, referenceTransactions []*gitalypb.LogEntry_ReferenceTransaction, -- GitLab