diff --git a/internal/git/catfile/cache.go b/internal/git/catfile/cache.go index 3dd44707eb2708fe82c67be9dda57c0c44bdb8ce..d44b0dcdf5d3d133decb3ad2fffb61b7affdebb6 100644 --- a/internal/git/catfile/cache.go +++ b/internal/git/catfile/cache.go @@ -60,8 +60,9 @@ type ProcessCache struct { monitorTicker helper.Ticker monitorDone chan interface{} - objectReaders processes - objectInfoReaders processes + objectReaders processes + objectContentReaders processes + objectInfoReaders processes catfileCacheCounter *prometheus.CounterVec currentCatfileProcesses prometheus.Gauge @@ -82,12 +83,15 @@ func newCache(ttl time.Duration, maxLen int, monitorTicker helper.Ticker) *Proce processCache := &ProcessCache{ ttl: ttl, - objectReaders: processes{ + objectContentReaders: processes{ maxLen: maxLen, }, objectInfoReaders: processes{ maxLen: maxLen, }, + objectReaders: processes{ + maxLen: maxLen, + }, catfileCacheCounter: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "gitaly_catfile_cache_total", @@ -149,8 +153,9 @@ func (c *ProcessCache) monitor() { for { select { case <-c.monitorTicker.C(): - c.objectReaders.EnforceTTL(time.Now()) + c.objectContentReaders.EnforceTTL(time.Now()) c.objectInfoReaders.EnforceTTL(time.Now()) + c.objectReaders.EnforceTTL(time.Now()) c.monitorTicker.Reset() case <-c.monitorDone: close(c.monitorDone) @@ -186,9 +191,9 @@ func (c *ProcessCache) ObjectReader(ctx context.Context, repo git.RepositoryExec return newObjectReader(ctx, repo, c.catfileLookupCounter) }, "catfile.ObjectReader") } else { - cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectReaders, func(ctx context.Context) (cacheable, error) { + cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectContentReaders, func(ctx context.Context) (cacheable, error) { return newObjectContentReader(ctx, repo, c.catfileLookupCounter) - }, "catfile.ObjectReader") + }, "catfile.ObjectContentReader") } if err != nil { return nil, nil, err @@ -214,9 +219,9 @@ func (c *ProcessCache) ObjectInfoReader(ctx context.Context, repo git.Repository } if featureflag.CatfileBatchCommand.IsEnabled(ctx) && version.CatfileSupportsNulTerminatedOutput() { - cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectInfoReaders, func(ctx context.Context) (cacheable, error) { + cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectReaders, func(ctx context.Context) (cacheable, error) { return newObjectReader(ctx, repo, c.catfileLookupCounter) - }, "catfile.ObjectInfoReader") + }, "catfile.ObjectReader") } else { cached, cancel, err = c.getOrCreateProcess(ctx, repo, &c.objectInfoReaders, func(ctx context.Context) (cacheable, error) { return newObjectInfoReader(ctx, repo, c.catfileLookupCounter) @@ -334,14 +339,16 @@ func (c *ProcessCache) getOrCreateProcess( } func (c *ProcessCache) reportCacheMembers() { - c.catfileCacheMembers.WithLabelValues("object_reader").Set(float64(c.objectReaders.EntryCount())) + c.catfileCacheMembers.WithLabelValues("object_content_reader").Set(float64(c.objectContentReaders.EntryCount())) c.catfileCacheMembers.WithLabelValues("object_info_reader").Set(float64(c.objectInfoReaders.EntryCount())) + c.catfileCacheMembers.WithLabelValues("object_reader").Set(float64(c.objectReaders.EntryCount())) } // Evict evicts all cached processes from the cache. func (c *ProcessCache) Evict() { - c.objectReaders.Evict() + c.objectContentReaders.Evict() c.objectInfoReaders.Evict() + c.objectReaders.Evict() } func (c *ProcessCache) returnToCache(p *processes, cacheKey key, value cacheable, cancel func()) { diff --git a/internal/git/catfile/cache_test.go b/internal/git/catfile/cache_test.go index 3bc0eecdc667866ac86afdf2f0775f5c12ad1752..26c0a439f42c05ee42cb962ef2f7ef7a035b71fc 100644 --- a/internal/git/catfile/cache_test.go +++ b/internal/git/catfile/cache_test.go @@ -185,10 +185,10 @@ func TestCache_autoExpiry(t *testing.T) { // Add a process that has expired already. key0 := mustCreateKey(t, "0", repo) value0, cancel := mustCreateCacheable(t, cfg, repo) - c.objectReaders.Add(key0, value0, time.Now().Add(-time.Millisecond), cancel) - requireProcessesValid(t, &c.objectReaders) + c.objectContentReaders.Add(key0, value0, time.Now().Add(-time.Millisecond), cancel) + requireProcessesValid(t, &c.objectContentReaders) - require.Contains(t, keys(t, &c.objectReaders), key0, "key should still be in map") + require.Contains(t, keys(t, &c.objectContentReaders), key0, "key should still be in map") require.False(t, value0.isClosed(), "value should not have been closed") // We need to tick thrice to get deterministic results: the first tick is discarded before @@ -199,7 +199,7 @@ func TestCache_autoExpiry(t *testing.T) { monitorTicker.Tick() monitorTicker.Tick() - require.Empty(t, keys(t, &c.objectReaders), "key should no longer be in map") + require.Empty(t, keys(t, &c.objectContentReaders), "key should no longer be in map") require.True(t, value0.isClosed(), "value should be closed after eviction") } @@ -219,6 +219,9 @@ func testCacheObjectReader(t *testing.T, ctx context.Context) { repoExecutor := newRepoExecutor(t, cfg, repo) + version, err := repoExecutor.GitVersion(ctx) + require.NoError(t, err) + cache := newCache(time.Hour, 10, helper.NewManualTicker()) defer cache.Stop() @@ -232,7 +235,7 @@ func testCacheObjectReader(t *testing.T, ctx context.Context) { cancel() require.True(t, reader.isClosed()) - require.Empty(t, keys(t, &cache.objectReaders)) + require.Empty(t, keys(t, &cache.objectContentReaders)) }) t.Run("cacheable", func(t *testing.T) { @@ -250,12 +253,18 @@ func testCacheObjectReader(t *testing.T, ctx context.Context) { // cache and wait for the cache to collect it. cancel() - keys := keys(t, &cache.objectReaders) + var allKeys []key + if featureflag.CatfileBatchCommand.IsEnabled(ctx) && version.CatfileSupportsNulTerminatedOutput() { + allKeys = keys(t, &cache.objectReaders) + } else { + allKeys = keys(t, &cache.objectContentReaders) + } + require.Equal(t, []key{{ sessionID: "1", repoStorage: repo.GetStorageName(), repoRelPath: repo.GetRelativePath(), - }}, keys) + }}, allKeys) // Assert that we can still read from the cached process. _, err = reader.Object(ctx, "refs/heads/main") @@ -280,7 +289,7 @@ func testCacheObjectReader(t *testing.T, ctx context.Context) { // Cancel the process such that it will be considered for return to the cache. cancel() - require.Empty(t, keys(t, &cache.objectReaders)) + require.Empty(t, keys(t, &cache.objectContentReaders)) // The process should be killed now, so reading the object must fail. _, err = io.ReadAll(object) @@ -304,7 +313,7 @@ func testCacheObjectReader(t *testing.T, ctx context.Context) { // Cancel the process such that it will be considered for return to the cache. cancel() - require.Empty(t, keys(t, &cache.objectReaders)) + require.Empty(t, keys(t, &cache.objectContentReaders)) }) } @@ -324,6 +333,9 @@ func testCacheObjectInfoReader(t *testing.T, ctx context.Context) { repoExecutor := newRepoExecutor(t, cfg, repo) + version, err := repoExecutor.GitVersion(ctx) + require.NoError(t, err) + cache := newCache(time.Hour, 10, helper.NewManualTicker()) defer cache.Stop() @@ -354,12 +366,18 @@ func testCacheObjectInfoReader(t *testing.T, ctx context.Context) { // Cancel the process such it will be considered for return to the cache. cancel() - keys := keys(t, &cache.objectInfoReaders) + var allKeys []key + if featureflag.CatfileBatchCommand.IsEnabled(ctx) && version.CatfileSupportsNulTerminatedOutput() { + allKeys = keys(t, &cache.objectReaders) + } else { + allKeys = keys(t, &cache.objectInfoReaders) + } + require.Equal(t, []key{{ sessionID: "1", repoStorage: repo.GetStorageName(), repoRelPath: repo.GetRelativePath(), - }}, keys) + }}, allKeys) // Assert that we can still read from the cached process. _, err = reader.Info(ctx, "refs/heads/main")