From 2307605ac7528bb9a9af744330a3eb9187713b51 Mon Sep 17 00:00:00 2001
From: Karthik Nayak
Date: Tue, 25 Jul 2023 11:31:18 +0200
Subject: [PATCH 1/2] cache: Rename `objectReaders` to `objectContentReaders`

In `ProcessCache` we keep separate lists of processes for `ObjectReaders`
and `ObjectInfoReaders`. We want to add a new list of processes for
cat-file's `--batch-command`. To make way for this new set of processes,
let's rename `objectReaders` to `objectContentReaders`.
---
 internal/git/catfile/cache.go      | 20 ++++++++++----------
 internal/git/catfile/cache_test.go | 16 ++++++++--------
 2 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/internal/git/catfile/cache.go b/internal/git/catfile/cache.go
index 3dd44707eb..94159cb01f 100644
--- a/internal/git/catfile/cache.go
+++ b/internal/git/catfile/cache.go
@@ -60,8 +60,8 @@ type ProcessCache struct {
 	monitorTicker helper.Ticker
 	monitorDone chan interface{}

-	objectReaders processes
-	objectInfoReaders processes
+	objectContentReaders processes
+	objectInfoReaders processes

 	catfileCacheCounter *prometheus.CounterVec
 	currentCatfileProcesses prometheus.Gauge
@@ -82,7 +82,7 @@ func newCache(ttl time.Duration, maxLen int, monitorTicker helper.Ticker) *Proce
 	processCache := &ProcessCache{
 		ttl: ttl,
-		objectReaders: processes{
+		objectContentReaders: processes{
 			maxLen: maxLen,
 		},
 		objectInfoReaders: processes{
@@ -149,7 +149,7 @@ func (c *ProcessCache) monitor() {
 	for {
 		select {
 		case <-c.monitorTicker.C():
-			c.objectReaders.EnforceTTL(time.Now())
+			c.objectContentReaders.EnforceTTL(time.Now())
 			c.objectInfoReaders.EnforceTTL(time.Now())
 			c.monitorTicker.Reset()
 		case <-c.monitorDone:
@@ -182,13 +182,13 @@ func (c *ProcessCache) ObjectReader(ctx context.Context, repo git.RepositoryExec
 	}

 	if featureflag.CatfileBatchCommand.IsEnabled(ctx) && version.CatfileSupportsNulTerminatedOutput() {
-		cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectReaders, func(ctx context.Context) (cacheable, error) {
+		cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectContentReaders, func(ctx context.Context) (cacheable, error) {
 			return newObjectReader(ctx, repo, c.catfileLookupCounter)
-		}, "catfile.ObjectReader")
+		}, "catfile.ObjectContentReader")
 	} else {
-		cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectReaders, func(ctx context.Context) (cacheable, error) {
+		cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectContentReaders, func(ctx context.Context) (cacheable, error) {
 			return newObjectContentReader(ctx, repo, c.catfileLookupCounter)
-		}, "catfile.ObjectReader")
+		}, "catfile.ObjectContentReader")
 	}
 	if err != nil {
 		return nil, nil, err
@@ -334,13 +334,13 @@ func (c *ProcessCache) getOrCreateProcess(
 }

 func (c *ProcessCache) reportCacheMembers() {
-	c.catfileCacheMembers.WithLabelValues("object_reader").Set(float64(c.objectReaders.EntryCount()))
+	c.catfileCacheMembers.WithLabelValues("object_content_reader").Set(float64(c.objectContentReaders.EntryCount()))
 	c.catfileCacheMembers.WithLabelValues("object_info_reader").Set(float64(c.objectInfoReaders.EntryCount()))
 }

 // Evict evicts all cached processes from the cache.
 func (c *ProcessCache) Evict() {
-	c.objectReaders.Evict()
+	c.objectContentReaders.Evict()
 	c.objectInfoReaders.Evict()
 }

diff --git a/internal/git/catfile/cache_test.go b/internal/git/catfile/cache_test.go
index 3bc0eecdc6..b0cc05a83e 100644
--- a/internal/git/catfile/cache_test.go
+++ b/internal/git/catfile/cache_test.go
@@ -185,10 +185,10 @@ func TestCache_autoExpiry(t *testing.T) {
 	// Add a process that has expired already.
 	key0 := mustCreateKey(t, "0", repo)
 	value0, cancel := mustCreateCacheable(t, cfg, repo)
-	c.objectReaders.Add(key0, value0, time.Now().Add(-time.Millisecond), cancel)
-	requireProcessesValid(t, &c.objectReaders)
+	c.objectContentReaders.Add(key0, value0, time.Now().Add(-time.Millisecond), cancel)
+	requireProcessesValid(t, &c.objectContentReaders)

-	require.Contains(t, keys(t, &c.objectReaders), key0, "key should still be in map")
+	require.Contains(t, keys(t, &c.objectContentReaders), key0, "key should still be in map")
 	require.False(t, value0.isClosed(), "value should not have been closed")

 	// We need to tick thrice to get deterministic results: the first tick is discarded before
@@ -199,7 +199,7 @@ func TestCache_autoExpiry(t *testing.T) {
 	monitorTicker.Tick()
 	monitorTicker.Tick()

-	require.Empty(t, keys(t, &c.objectReaders), "key should no longer be in map")
+	require.Empty(t, keys(t, &c.objectContentReaders), "key should no longer be in map")
 	require.True(t, value0.isClosed(), "value should be closed after eviction")
 }

@@ -232,7 +232,7 @@ func testCacheObjectReader(t *testing.T, ctx context.Context) {
 		cancel()

 		require.True(t, reader.isClosed())
-		require.Empty(t, keys(t, &cache.objectReaders))
+		require.Empty(t, keys(t, &cache.objectContentReaders))
 	})

 	t.Run("cacheable", func(t *testing.T) {
@@ -250,7 +250,7 @@ func testCacheObjectReader(t *testing.T, ctx context.Context) {
 		// cache and wait for the cache to collect it.
 		cancel()

-		keys := keys(t, &cache.objectReaders)
+		keys := keys(t, &cache.objectContentReaders)
 		require.Equal(t, []key{{
 			sessionID: "1",
 			repoStorage: repo.GetStorageName(),
 			repoRelPath: repo.GetRelativePath(),
 		}}, keys)
@@ -280,7 +280,7 @@ func testCacheObjectReader(t *testing.T, ctx context.Context) {
 		// Cancel the process such that it will be considered for return to the cache.
 		cancel()

-		require.Empty(t, keys(t, &cache.objectReaders))
+		require.Empty(t, keys(t, &cache.objectContentReaders))

 		// The process should be killed now, so reading the object must fail.
 		_, err = io.ReadAll(object)
@@ -304,7 +304,7 @@ func testCacheObjectReader(t *testing.T, ctx context.Context) {
 		// Cancel the process such that it will be considered for return to the cache.
 		cancel()

-		require.Empty(t, keys(t, &cache.objectReaders))
+		require.Empty(t, keys(t, &cache.objectContentReaders))
 	})
 }
-- 
GitLab

From 950ca5303b10f9da82612e5a4417e88302c7c44b Mon Sep 17 00:00:00 2001
From: Karthik Nayak
Date: Tue, 25 Jul 2023 10:13:14 +0200
Subject: [PATCH 2/2] catfile: Use same process list when using `--batch-command`

There are two types of catfile caches present, one for `ObjectReader` and
one for `ObjectInfoReader`. When we introduced `--batch-command`, we didn't
separate the caches for the old and new implementations. Thus, the newer
`--batch-command` implementation uses the same cache as the old
implementation. This is a bug: reverting the feature flag wouldn't switch
back to the old implementation immediately, because existing cache entries
would still serve the newer implementation. There is also a missed
improvement here, in that `--batch-command` can share a single cache entry
for both info and content retrieval.

Let's fix this by introducing and using a new process list called
`objectReaders`. Eventually we can merge the implementations into one, and
users can then simply request content and info together. But this requires
more changes, specifically to the queue, which would need to accept that
users can request info and content interchangeably.
---
 internal/git/catfile/cache.go      | 15 +++++++++++----
 internal/git/catfile/cache_test.go | 26 ++++++++++++++++++++++----
 2 files changed, 33 insertions(+), 8 deletions(-)

diff --git a/internal/git/catfile/cache.go b/internal/git/catfile/cache.go
index 94159cb01f..d44b0dcdf5 100644
--- a/internal/git/catfile/cache.go
+++ b/internal/git/catfile/cache.go
@@ -60,6 +60,7 @@ type ProcessCache struct {
 	monitorTicker helper.Ticker
 	monitorDone chan interface{}

+	objectReaders processes
 	objectContentReaders processes
 	objectInfoReaders processes

@@ -88,6 +89,9 @@ func newCache(ttl time.Duration, maxLen int, monitorTicker helper.Ticker) *Proce
 		objectInfoReaders: processes{
 			maxLen: maxLen,
 		},
+		objectReaders: processes{
+			maxLen: maxLen,
+		},
 		catfileCacheCounter: prometheus.NewCounterVec(
 			prometheus.CounterOpts{
 				Name: "gitaly_catfile_cache_total",
@@ -151,6 +155,7 @@ func (c *ProcessCache) monitor() {
 		case <-c.monitorTicker.C():
 			c.objectContentReaders.EnforceTTL(time.Now())
 			c.objectInfoReaders.EnforceTTL(time.Now())
+			c.objectReaders.EnforceTTL(time.Now())
 			c.monitorTicker.Reset()
 		case <-c.monitorDone:
 			close(c.monitorDone)
@@ -182,9 +187,9 @@ func (c *ProcessCache) ObjectReader(ctx context.Context, repo git.RepositoryExec
 	}

 	if featureflag.CatfileBatchCommand.IsEnabled(ctx) && version.CatfileSupportsNulTerminatedOutput() {
-		cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectContentReaders, func(ctx context.Context) (cacheable, error) {
+		cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectReaders, func(ctx context.Context) (cacheable, error) {
 			return newObjectReader(ctx, repo, c.catfileLookupCounter)
-		}, "catfile.ObjectContentReader")
+		}, "catfile.ObjectReader")
 	} else {
 		cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectContentReaders, func(ctx context.Context) (cacheable, error) {
 			return newObjectContentReader(ctx, repo, c.catfileLookupCounter)
@@ -214,9 +219,9 @@ func (c *ProcessCache) ObjectInfoReader(ctx context.Context, repo git.Repository
 	}

 	if featureflag.CatfileBatchCommand.IsEnabled(ctx) && version.CatfileSupportsNulTerminatedOutput() {
-		cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectInfoReaders, func(ctx context.Context) (cacheable, error) {
+		cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectReaders, func(ctx context.Context) (cacheable, error) {
 			return newObjectReader(ctx, repo, c.catfileLookupCounter)
-		}, "catfile.ObjectInfoReader")
+		}, "catfile.ObjectReader")
 	} else {
 		cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectInfoReaders, func(ctx context.Context) (cacheable, error) {
 			return newObjectInfoReader(ctx, repo, c.catfileLookupCounter)
@@ -336,12 +341,14 @@ func (c *ProcessCache) getOrCreateProcess(
 func (c *ProcessCache) reportCacheMembers() {
 	c.catfileCacheMembers.WithLabelValues("object_content_reader").Set(float64(c.objectContentReaders.EntryCount()))
 	c.catfileCacheMembers.WithLabelValues("object_info_reader").Set(float64(c.objectInfoReaders.EntryCount()))
+	c.catfileCacheMembers.WithLabelValues("object_reader").Set(float64(c.objectReaders.EntryCount()))
 }

 // Evict evicts all cached processes from the cache.
 func (c *ProcessCache) Evict() {
 	c.objectContentReaders.Evict()
 	c.objectInfoReaders.Evict()
+	c.objectReaders.Evict()
 }

 func (c *ProcessCache) returnToCache(p *processes, cacheKey key, value cacheable, cancel func()) {
diff --git a/internal/git/catfile/cache_test.go b/internal/git/catfile/cache_test.go
index b0cc05a83e..26c0a439f4 100644
--- a/internal/git/catfile/cache_test.go
+++ b/internal/git/catfile/cache_test.go
@@ -219,6 +219,9 @@ func testCacheObjectReader(t *testing.T, ctx context.Context) {

 	repoExecutor := newRepoExecutor(t, cfg, repo)

+	version, err := repoExecutor.GitVersion(ctx)
+	require.NoError(t, err)
+
 	cache := newCache(time.Hour, 10, helper.NewManualTicker())
 	defer cache.Stop()

@@ -250,12 +253,18 @@ func testCacheObjectReader(t *testing.T, ctx context.Context) {
 		// cache and wait for the cache to collect it.
 		cancel()

-		keys := keys(t, &cache.objectContentReaders)
+		var allKeys []key
+		if featureflag.CatfileBatchCommand.IsEnabled(ctx) && version.CatfileSupportsNulTerminatedOutput() {
+			allKeys = keys(t, &cache.objectReaders)
+		} else {
+			allKeys = keys(t, &cache.objectContentReaders)
+		}
+
 		require.Equal(t, []key{{
 			sessionID: "1",
 			repoStorage: repo.GetStorageName(),
 			repoRelPath: repo.GetRelativePath(),
-		}}, keys)
+		}}, allKeys)

 		// Assert that we can still read from the cached process.
 		_, err = reader.Object(ctx, "refs/heads/main")
@@ -324,6 +333,9 @@ func testCacheObjectInfoReader(t *testing.T, ctx context.Context) {

 	repoExecutor := newRepoExecutor(t, cfg, repo)

+	version, err := repoExecutor.GitVersion(ctx)
+	require.NoError(t, err)
+
 	cache := newCache(time.Hour, 10, helper.NewManualTicker())
 	defer cache.Stop()

@@ -354,12 +366,18 @@ func testCacheObjectInfoReader(t *testing.T, ctx context.Context) {
 		// Cancel the process such it will be considered for return to the cache.
 		cancel()

-		keys := keys(t, &cache.objectInfoReaders)
+		var allKeys []key
+		if featureflag.CatfileBatchCommand.IsEnabled(ctx) && version.CatfileSupportsNulTerminatedOutput() {
+			allKeys = keys(t, &cache.objectReaders)
+		} else {
+			allKeys = keys(t, &cache.objectInfoReaders)
+		}
+
 		require.Equal(t, []key{{
 			sessionID: "1",
 			repoStorage: repo.GetStorageName(),
 			repoRelPath: repo.GetRelativePath(),
-		}}, keys)
+		}}, allKeys)

 		// Assert that we can still read from the cached process.
 		_, err = reader.Info(ctx, "refs/heads/main")
-- 
GitLab
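
As an aside, the process-list selection that results from these two patches
can be summarised in a small standalone sketch. This is illustrative only and
uses simplified stand-in types: the `pickList` helper and its boolean
parameters do not exist in Gitaly, where the equivalent checks are inlined in
`ObjectReader` and `ObjectInfoReader` behind `featureflag.CatfileBatchCommand`
and the Git version check.

// Illustrative sketch only: simplified stand-ins, not Gitaly's real types.
package main

import "fmt"

type processes struct{ label string }

type processCache struct {
	objectReaders        processes // `cat-file --batch-command` processes (new, shared)
	objectContentReaders processes // `cat-file --batch` processes (old content path)
	objectInfoReaders    processes // `cat-file --batch-check` processes (old info path)
}

// pickList mirrors the checks from ObjectReader/ObjectInfoReader: when
// `--batch-command` is usable, content and info requests share the
// objectReaders list; otherwise each keeps its own list.
func (c *processCache) pickList(batchCommandEnabled, nulTerminatedOutput, wantInfo bool) *processes {
	if batchCommandEnabled && nulTerminatedOutput {
		return &c.objectReaders
	}
	if wantInfo {
		return &c.objectInfoReaders
	}
	return &c.objectContentReaders
}

func main() {
	c := &processCache{
		objectReaders:        processes{label: "object_reader"},
		objectContentReaders: processes{label: "object_content_reader"},
		objectInfoReaders:    processes{label: "object_info_reader"},
	}
	fmt.Println(c.pickList(false, true, false).label) // object_content_reader
	fmt.Println(c.pickList(false, true, true).label)  // object_info_reader
	fmt.Println(c.pickList(true, true, false).label)  // object_reader
	fmt.Println(c.pickList(true, true, true).label)   // object_reader (shared with content)
}

The point of the change shows up in the last two calls: once `--batch-command`
is usable, content and info lookups resolve to the same list, so a single
cached process can serve both kinds of request.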