diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index f62607ab4b6df27eed2820012599cc91b9ad768b..398dadd283e95076d2e7a354f090a6edbb56ef1e 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -203,7 +203,7 @@ test: # using bundled Git binaries. - GO_VERSION: [ "1.18", "1.19" ] TEST_TARGET: test - - TEST_TARGET: [ test-with-praefect, race-go ] + - TEST_TARGET: [ test-with-praefect, race-go, test-wal, test-with-praefect-wal] # We also verify that things work as expected with a non-bundled Git # version matching our minimum required Git version. - TEST_TARGET: test diff --git a/Makefile b/Makefile index cb8e541403b20188ad9f305f04b3b5ddb8f15e4e..10cb3a293f6e2803f5967a860f998353471f718e 100644 --- a/Makefile +++ b/Makefile @@ -400,6 +400,16 @@ bench: ${BENCHMARK_REPO} prepare-tests test-with-praefect: prepare-tests ${Q}GITALY_TEST_WITH_PRAEFECT=YesPlease $(call run_go_tests) +.PHONY: test-wal +## Run Go tests with write-ahead logging enabled. +test-wal: export GITALY_TEST_WAL = YesPlease +test-wal: test + +.PHONY: test-with-praefect-wal +## Run Go tests with write-ahead logging and Praefect enabled. +test-with-praefect-wal: export GITALY_TEST_WAL = YesPlease +test-with-praefect-wal: test-with-praefect + .PHONY: race-go ## Run Go tests with race detection enabled. race-go: override TEST_OPTIONS := ${TEST_OPTIONS} -race diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 120f3fdb0b066dc5da91e84a554a2d989b208c98..5f069fd0edf23822d9da3fdd5352f30575290297 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -24,12 +24,12 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/sentry" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/maintenance" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/server" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" diff --git a/internal/gitaly/hook/custom.go b/internal/gitaly/hook/custom.go index 6ccf968a060aeca5ca39697bd1ec43e973d98cd5..35082881d4fce7923ac8a4b678efd151458cfce3 100644 --- a/internal/gitaly/hook/custom.go +++ b/internal/gitaly/hook/custom.go @@ -11,6 +11,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/command" "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/env" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "golang.org/x/sys/unix" @@ -37,27 +38,38 @@ func (e CustomHookError) Unwrap() error { return e.err } -// newCustomHooksExecutor creates a new hooks executor for custom hooks. Hooks -// are looked up and executed in the following order: +// newCustomHooksExecutor creates a new hooks executor for custom hooks. // -// 1. .git/custom_hooks/ - per project hook -// 2. .git/custom_hooks/.d/* - per project hooks +// Repository specific custom hooks are executed by default from `.git/custom_hooks`. +// This is the non-WAL behavior where hooks are updated in place. With WAL, the hooks are +// managed with MVCC. If the transaction is set, we'll use the hook path from its snapshot +// to ensure we execute the correct version of the hooks. +// +// Hooks are looked up and executed in the following order: +// +// 1. / - per project hook +// 2. /.d/* - per project hooks // 3. /hooks/.d/* - global hooks // // Any files which are either not executable or have a trailing `~` are ignored. -func (m *GitLabHookManager) newCustomHooksExecutor(repo *gitalypb.Repository, hookName string) (customHooksExecutor, error) { +func (m *GitLabHookManager) newCustomHooksExecutor(tx *gitaly.Transaction, repo *gitalypb.Repository, hookName string) (customHooksExecutor, error) { repoPath, err := m.locator.GetRepoPath(repo) if err != nil { return nil, err } + hookPath := filepath.Join(repoPath, "custom_hooks") + if tx != nil { + hookPath = tx.Snapshot().HookPath + } + var hookFiles []string - projectCustomHookFile := filepath.Join(repoPath, "custom_hooks", hookName) + projectCustomHookFile := filepath.Join(hookPath, hookName) if isValidHook(projectCustomHookFile) { hookFiles = append(hookFiles, projectCustomHookFile) } - projectCustomHookDir := filepath.Join(repoPath, "custom_hooks", fmt.Sprintf("%s.d", hookName)) + projectCustomHookDir := filepath.Join(hookPath, fmt.Sprintf("%s.d", hookName)) files, err := findHooks(projectCustomHookDir) if err != nil { return nil, err diff --git a/internal/gitaly/hook/custom_test.go b/internal/gitaly/hook/custom_test.go index 4df9c6732ae1cf6fcd63831a93ccf030199b98ea..930a41cbe1f2b8f75f0b83bcf0b280817ffe6c77 100644 --- a/internal/gitaly/hook/custom_test.go +++ b/internal/gitaly/hook/custom_test.go @@ -165,7 +165,7 @@ func TestCustomHookPartialFailure(t *testing.T) { locator: config.NewLocator(cfg), } - caller, err := mgr.newCustomHooksExecutor(repo, tc.hook) + caller, err := mgr.newCustomHooksExecutor(nil, repo, tc.hook) require.NoError(t, err) var stdout, stderr bytes.Buffer @@ -224,7 +224,7 @@ func TestCustomHooksMultipleHooks(t *testing.T) { cfg: cfg, locator: config.NewLocator(cfg), } - hooksExecutor, err := mgr.newCustomHooksExecutor(repo, "update") + hooksExecutor, err := mgr.newCustomHooksExecutor(nil, repo, "update") require.NoError(t, err) var stdout, stderr bytes.Buffer @@ -297,7 +297,7 @@ func TestCustomHooksWithSymlinks(t *testing.T) { cfg: cfg, locator: config.NewLocator(cfg), } - hooksExecutor, err := mgr.newCustomHooksExecutor(repo, "update") + hooksExecutor, err := mgr.newCustomHooksExecutor(nil, repo, "update") require.NoError(t, err) var stdout, stderr bytes.Buffer @@ -331,7 +331,7 @@ func TestMultilineStdin(t *testing.T) { locator: config.NewLocator(cfg), } - hooksExecutor, err := mgr.newCustomHooksExecutor(repo, "pre-receive") + hooksExecutor, err := mgr.newCustomHooksExecutor(nil, repo, "pre-receive") require.NoError(t, err) changes := `old1 new1 ref1 @@ -370,7 +370,7 @@ func TestMultipleScriptsStdin(t *testing.T) { locator: config.NewLocator(cfg), } - hooksExecutor, err := mgr.newCustomHooksExecutor(repo, "pre-receive") + hooksExecutor, err := mgr.newCustomHooksExecutor(nil, repo, "pre-receive") require.NoError(t, err) changes := "oldref11 newref00 ref123445" @@ -399,7 +399,7 @@ func callAndVerifyHooks(t *testing.T, cfg config.Cfg, locator storage.Locator, r locator: locator, } - callHooks, err := mgr.newCustomHooksExecutor(repo, hookName) + callHooks, err := mgr.newCustomHooksExecutor(nil, repo, hookName) require.NoError(t, err) require.NoError(t, callHooks(ctx, args, env, bytes.NewBufferString(stdin), &stdout, &stderr)) diff --git a/internal/gitaly/hook/disabled_manager.go b/internal/gitaly/hook/disabled_manager.go index e1dee47576ffb3cb99cd421a258b5d07e6a896ff..7255f0bb5add5064f407b41d6382aa600962f533 100644 --- a/internal/gitaly/hook/disabled_manager.go +++ b/internal/gitaly/hook/disabled_manager.go @@ -4,6 +4,7 @@ import ( "context" "io" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -11,17 +12,17 @@ import ( type DisabledManager struct{} // PreReceiveHook ignores its parameters and returns a nil error. -func (DisabledManager) PreReceiveHook(context.Context, *gitalypb.Repository, []string, []string, io.Reader, io.Writer, io.Writer) error { +func (DisabledManager) PreReceiveHook(context.Context, *gitaly.Transaction, *gitalypb.Repository, []string, []string, io.Reader, io.Writer, io.Writer) error { return nil } // PostReceiveHook ignores its parameters and returns a nil error. -func (DisabledManager) PostReceiveHook(context.Context, *gitalypb.Repository, []string, []string, io.Reader, io.Writer, io.Writer) error { +func (DisabledManager) PostReceiveHook(context.Context, *gitaly.Transaction, *gitalypb.Repository, []string, []string, io.Reader, io.Writer, io.Writer) error { return nil } // UpdateHook ignores its parameters and returns a nil error. -func (DisabledManager) UpdateHook(context.Context, *gitalypb.Repository, string, string, string, []string, io.Writer, io.Writer) error { +func (DisabledManager) UpdateHook(context.Context, *gitaly.Transaction, *gitalypb.Repository, string, string, string, []string, io.Writer, io.Writer) error { return nil } diff --git a/internal/gitaly/hook/manager.go b/internal/gitaly/hook/manager.go index b021d99090633bd2c990df2390f88b0804f416f2..6d1b2193e0fc3ae3b1a3e4e25db827410506cb7f 100644 --- a/internal/gitaly/hook/manager.go +++ b/internal/gitaly/hook/manager.go @@ -5,6 +5,7 @@ import ( "io" "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" @@ -32,15 +33,15 @@ const ( type Manager interface { // PreReceiveHook executes the pre-receive Git hook and any installed custom hooks. stdin // must contain all references to be updated and match the format specified in githooks(5). - PreReceiveHook(ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error + PreReceiveHook(ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error // PostReceiveHook executes the post-receive Git hook and any installed custom hooks. stdin // must contain all references to be updated and match the format specified in githooks(5). - PostReceiveHook(ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error + PostReceiveHook(ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error // UpdateHook executes the update Git hook and any installed custom hooks for the reference // `ref` getting updated from `oldValue` to `newValue`. - UpdateHook(ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error + UpdateHook(ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error // ReferenceTransactionHook executes the reference-transaction Git hook. stdin must contain // all references to be updated and match the format specified in githooks(5). diff --git a/internal/gitaly/hook/manager_mock.go b/internal/gitaly/hook/manager_mock.go index 51a7715e7b87f6072f1724a1face39d144e54059..b76b5eaab8b3a16139d82b4662c60ca3ad5ebe07 100644 --- a/internal/gitaly/hook/manager_mock.go +++ b/internal/gitaly/hook/manager_mock.go @@ -6,31 +6,32 @@ import ( "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) // MockManager mocks the Manager interface for Git hooks (e.g. pre-receive, post-receive) type MockManager struct { t *testing.T - preReceive func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error - postReceive func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error - update func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error + preReceive func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error + postReceive func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error + update func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error referenceTransaction func(t *testing.T, ctx context.Context, state ReferenceTransactionState, env []string, stdin io.Reader) error } var ( // NopPreReceive does nothing for the pre-receive hook - NopPreReceive = func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + NopPreReceive = func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { return nil } // NopPostReceive does nothing for the post-receive hook - NopPostReceive = func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + NopPostReceive = func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { return nil } // NopUpdate does nothing for the update hook - NopUpdate = func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { + NopUpdate = func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { return nil } @@ -43,9 +44,9 @@ var ( // NewMockManager returns a mocked hook Manager with the stubbed functions func NewMockManager( t *testing.T, - preReceive func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error, - postReceive func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error, - update func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error, + preReceive func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error, + postReceive func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error, + update func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error, referenceTransaction func(t *testing.T, ctx context.Context, state ReferenceTransactionState, env []string, stdin io.Reader) error, ) Manager { return &MockManager{ @@ -58,24 +59,24 @@ func NewMockManager( } // PreReceiveHook executes the mocked pre-receive hook -func (m *MockManager) PreReceiveHook(ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { +func (m *MockManager) PreReceiveHook(ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { require.NotNil(m.t, m.preReceive, "preReceive not implemented") - return m.preReceive(m.t, ctx, repo, pushOptions, env, stdin, stdout, stderr) + return m.preReceive(m.t, ctx, tx, repo, pushOptions, env, stdin, stdout, stderr) } // PostReceiveHook executes the mocked post-receive hook -func (m *MockManager) PostReceiveHook(ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { +func (m *MockManager) PostReceiveHook(ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { require.NotNil(m.t, m.postReceive, "postReceive not implemented") - return m.postReceive(m.t, ctx, repo, pushOptions, env, stdin, stdout, stderr) + return m.postReceive(m.t, ctx, tx, repo, pushOptions, env, stdin, stdout, stderr) } // UpdateHook executes the mocked update hook -func (m *MockManager) UpdateHook(ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { +func (m *MockManager) UpdateHook(ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { require.NotNil(m.t, m.update, "update not implemented") - return m.update(m.t, ctx, repo, ref, oldValue, newValue, env, stdout, stderr) + return m.update(m.t, ctx, tx, repo, ref, oldValue, newValue, env, stdout, stderr) } // ReferenceTransactionHook executes the mocked reference transaction hook diff --git a/internal/gitaly/hook/postreceive.go b/internal/gitaly/hook/postreceive.go index 094d09664b293240087b6ea10eaf4b5d369ffba9..b5847c45d28e6b1d26d8a8765499c4e6fdd33e07 100644 --- a/internal/gitaly/hook/postreceive.go +++ b/internal/gitaly/hook/postreceive.go @@ -11,6 +11,7 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitlab" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -106,7 +107,7 @@ func printAlert(m gitlab.PostReceiveMessage, w io.Writer) error { } //nolint:revive // This is unintentionally missing documentation. -func (m *GitLabHookManager) PostReceiveHook(ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { +func (m *GitLabHookManager) PostReceiveHook(ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { payload, err := git.HooksPayloadFromEnv(env) if err != nil { return structerr.NewInternal("extracting hooks payload: %w", err) @@ -118,7 +119,7 @@ func (m *GitLabHookManager) PostReceiveHook(ctx context.Context, repo *gitalypb. } if isPrimary(payload) { - if err := m.postReceiveHook(ctx, payload, repo, pushOptions, env, changes, stdout, stderr); err != nil { + if err := m.postReceiveHook(ctx, tx, payload, repo, pushOptions, env, changes, stdout, stderr); err != nil { ctxlogrus.Extract(ctx).WithError(err).Warn("stopping transaction because post-receive hook failed") // If the post-receive hook declines the push, then we need to stop any @@ -134,7 +135,7 @@ func (m *GitLabHookManager) PostReceiveHook(ctx context.Context, repo *gitalypb. return nil } -func (m *GitLabHookManager) postReceiveHook(ctx context.Context, payload git.HooksPayload, repo *gitalypb.Repository, pushOptions, env []string, stdin []byte, stdout, stderr io.Writer) error { +func (m *GitLabHookManager) postReceiveHook(ctx context.Context, tx *gitaly.Transaction, payload git.HooksPayload, repo *gitalypb.Repository, pushOptions, env []string, stdin []byte, stdout, stderr io.Writer) error { if len(stdin) == 0 { return structerr.NewInternal("hook got no reference updates") } @@ -167,7 +168,7 @@ func (m *GitLabHookManager) postReceiveHook(ctx context.Context, payload git.Hoo return errors.New("") } - executor, err := m.newCustomHooksExecutor(repo, "post-receive") + executor, err := m.newCustomHooksExecutor(tx, repo, "post-receive") if err != nil { return structerr.NewInternal("creating custom hooks executor: %w", err) } diff --git a/internal/gitaly/hook/postreceive_test.go b/internal/gitaly/hook/postreceive_test.go index 08b12655a830748ff6cb4adecd77ab28e42f1d26..9b0cf5adc68e39d0196c2bbab3a8f19707013fd9 100644 --- a/internal/gitaly/hook/postreceive_test.go +++ b/internal/gitaly/hook/postreceive_test.go @@ -217,7 +217,7 @@ func TestPostReceive_customHook(t *testing.T) { gittest.WriteCustomHook(t, repoPath, "post-receive", []byte(tc.hook)) var stdout, stderr bytes.Buffer - err = hookManager.PostReceiveHook(ctx, repo, tc.pushOptions, tc.env, strings.NewReader(tc.stdin), &stdout, &stderr) + err = hookManager.PostReceiveHook(ctx, nil, repo, tc.pushOptions, tc.env, strings.NewReader(tc.stdin), &stdout, &stderr) if tc.expectedErr != "" { require.Contains(t, err.Error(), tc.expectedErr) @@ -362,7 +362,7 @@ func TestPostReceive_gitlab(t *testing.T) { gittest.WriteCustomHook(t, repoPath, "post-receive", []byte("#!/bin/sh\necho hook called\n")) var stdout, stderr bytes.Buffer - err = hookManager.PostReceiveHook(ctx, repo, tc.pushOptions, tc.env, strings.NewReader(tc.changes), &stdout, &stderr) + err = hookManager.PostReceiveHook(ctx, nil, repo, tc.pushOptions, tc.env, strings.NewReader(tc.changes), &stdout, &stderr) if tc.expectedErr != nil { require.Equal(t, tc.expectedErr, err) @@ -424,7 +424,7 @@ func TestPostReceive_quarantine(t *testing.T) { gittest.DefaultObjectHash.ZeroOID, gittest.DefaultObjectHash.ZeroOID)) var stdout, stderr bytes.Buffer - require.NoError(t, hookManager.PostReceiveHook(ctx, repo, nil, + require.NoError(t, hookManager.PostReceiveHook(ctx, nil, repo, nil, []string{env}, stdin, &stdout, &stderr)) if isQuarantined { diff --git a/internal/gitaly/hook/prereceive.go b/internal/gitaly/hook/prereceive.go index e857888814901846186a0f329da04ae4f97b1e2f..e7e5d57f7b8ea7610915682172e40d88dc4ea1be 100644 --- a/internal/gitaly/hook/prereceive.go +++ b/internal/gitaly/hook/prereceive.go @@ -11,6 +11,7 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitlab" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/env" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" @@ -62,7 +63,7 @@ func getRelativeObjectDirs(repoPath, gitObjectDir, gitAlternateObjectDirs string // PreReceiveHook will try to authenticate the changes against the GitLab API. // If successful, it will execute custom hooks with the given parameters, push // options and environment. -func (m *GitLabHookManager) PreReceiveHook(ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { +func (m *GitLabHookManager) PreReceiveHook(ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { payload, err := git.HooksPayloadFromEnv(env) if err != nil { return structerr.NewInternal("extracting hooks payload: %w", err) @@ -75,7 +76,7 @@ func (m *GitLabHookManager) PreReceiveHook(ctx context.Context, repo *gitalypb.R // Only the primary should execute hooks and increment reference counters. if isPrimary(payload) { - if err := m.preReceiveHook(ctx, payload, repo, pushOptions, env, changes, stdout, stderr); err != nil { + if err := m.preReceiveHook(ctx, tx, payload, repo, pushOptions, env, changes, stdout, stderr); err != nil { ctxlogrus.Extract(ctx).WithError(err).Warn("stopping transaction because pre-receive hook failed") // If the pre-receive hook declines the push, then we need to stop any @@ -91,7 +92,7 @@ func (m *GitLabHookManager) PreReceiveHook(ctx context.Context, repo *gitalypb.R return nil } -func (m *GitLabHookManager) preReceiveHook(ctx context.Context, payload git.HooksPayload, repo *gitalypb.Repository, pushOptions, envs []string, changes []byte, stdout, stderr io.Writer) error { +func (m *GitLabHookManager) preReceiveHook(ctx context.Context, tx *gitaly.Transaction, payload git.HooksPayload, repo *gitalypb.Repository, pushOptions, envs []string, changes []byte, stdout, stderr io.Writer) error { repoPath, err := m.locator.GetRepoPath(repo) if err != nil { return structerr.NewInternal("getting repo path: %w", err) @@ -161,7 +162,7 @@ func (m *GitLabHookManager) preReceiveHook(ctx context.Context, payload git.Hook } } - executor, err := m.newCustomHooksExecutor(repo, "pre-receive") + executor, err := m.newCustomHooksExecutor(tx, repo, "pre-receive") if err != nil { return fmt.Errorf("creating custom hooks executor: %w", err) } diff --git a/internal/gitaly/hook/prereceive_test.go b/internal/gitaly/hook/prereceive_test.go index 9362b0ae316fb9c25e44669c417a98428fe09ca8..2b479b3243f80d0e2fae4828266662708c26bef3 100644 --- a/internal/gitaly/hook/prereceive_test.go +++ b/internal/gitaly/hook/prereceive_test.go @@ -173,7 +173,7 @@ func TestPrereceive_customHooks(t *testing.T) { gittest.WriteCustomHook(t, repoPath, "pre-receive", []byte(tc.hook)) var stdout, stderr bytes.Buffer - err = hookManager.PreReceiveHook(ctx, repo, tc.pushOptions, tc.env, strings.NewReader(tc.stdin), &stdout, &stderr) + err = hookManager.PreReceiveHook(ctx, nil, repo, tc.pushOptions, tc.env, strings.NewReader(tc.stdin), &stdout, &stderr) if tc.expectedErr != "" { require.Contains(t, err.Error(), tc.expectedErr) @@ -236,7 +236,7 @@ func TestPrereceive_quarantine(t *testing.T) { gittest.DefaultObjectHash.ZeroOID, gittest.DefaultObjectHash.ZeroOID)) var stdout, stderr bytes.Buffer - require.NoError(t, hookManager.PreReceiveHook(ctx, repo, nil, + require.NoError(t, hookManager.PreReceiveHook(ctx, nil, repo, nil, []string{env}, stdin, &stdout, &stderr)) if isQuarantined { @@ -396,7 +396,7 @@ func TestPrereceive_gitlab(t *testing.T) { gittest.WriteCustomHook(t, repoPath, "pre-receive", []byte("#!/bin/sh\necho called\n")) var stdout, stderr bytes.Buffer - err = hookManager.PreReceiveHook(ctx, repo, nil, tc.env, strings.NewReader(tc.changes), &stdout, &stderr) + err = hookManager.PreReceiveHook(ctx, nil, repo, nil, tc.env, strings.NewReader(tc.changes), &stdout, &stderr) if tc.expectedErr != nil { require.Equal(t, tc.expectedErr, err) diff --git a/internal/gitaly/hook/transactions_test.go b/internal/gitaly/hook/transactions_test.go index da2e6d1d8fbd38edd90695ef037a13a4e6f89663..d13e6c7e748e67b6bbe01311d272207c9dcd6e84 100644 --- a/internal/gitaly/hook/transactions_test.go +++ b/internal/gitaly/hook/transactions_test.go @@ -59,13 +59,13 @@ func TestHookManager_stopCalled(t *testing.T) { } preReceiveFunc := func(t *testing.T) error { - return hookManager.PreReceiveHook(ctx, repo, nil, []string{hooksPayload}, strings.NewReader("changes"), io.Discard, io.Discard) + return hookManager.PreReceiveHook(ctx, nil, repo, nil, []string{hooksPayload}, strings.NewReader("changes"), io.Discard, io.Discard) } updateFunc := func(t *testing.T) error { - return hookManager.UpdateHook(ctx, repo, "ref", gittest.DefaultObjectHash.ZeroOID.String(), gittest.DefaultObjectHash.ZeroOID.String(), []string{hooksPayload}, io.Discard, io.Discard) + return hookManager.UpdateHook(ctx, nil, repo, "ref", gittest.DefaultObjectHash.ZeroOID.String(), gittest.DefaultObjectHash.ZeroOID.String(), []string{hooksPayload}, io.Discard, io.Discard) } postReceiveFunc := func(t *testing.T) error { - return hookManager.PostReceiveHook(ctx, repo, nil, []string{hooksPayload}, strings.NewReader("changes"), io.Discard, io.Discard) + return hookManager.PostReceiveHook(ctx, nil, repo, nil, []string{hooksPayload}, strings.NewReader("changes"), io.Discard, io.Discard) } for _, tc := range []struct { diff --git a/internal/gitaly/hook/update.go b/internal/gitaly/hook/update.go index d8afa94dc431cff8ede8b2c1f1f1b3c26b75275a..acb729bb45d1d3dedac1891b2542ff8e8be55b3c 100644 --- a/internal/gitaly/hook/update.go +++ b/internal/gitaly/hook/update.go @@ -7,19 +7,20 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) //nolint:revive // This is unintentionally missing documentation. -func (m *GitLabHookManager) UpdateHook(ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { +func (m *GitLabHookManager) UpdateHook(ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { payload, err := git.HooksPayloadFromEnv(env) if err != nil { return structerr.NewInternal("extracting hooks payload: %w", err) } if isPrimary(payload) { - if err := m.updateHook(ctx, payload, repo, ref, oldValue, newValue, env, stdout, stderr); err != nil { + if err := m.updateHook(ctx, tx, payload, repo, ref, oldValue, newValue, env, stdout, stderr); err != nil { ctxlogrus.Extract(ctx).WithError(err).Warn("stopping transaction because update hook failed") // If the update hook declines the push, then we need @@ -35,7 +36,7 @@ func (m *GitLabHookManager) UpdateHook(ctx context.Context, repo *gitalypb.Repos return nil } -func (m *GitLabHookManager) updateHook(ctx context.Context, payload git.HooksPayload, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { +func (m *GitLabHookManager) updateHook(ctx context.Context, tx *gitaly.Transaction, payload git.HooksPayload, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { objectHash, err := git.ObjectHashByFormat(payload.ObjectFormat) if err != nil { return fmt.Errorf("looking up object hash: %w", err) @@ -54,7 +55,7 @@ func (m *GitLabHookManager) updateHook(ctx context.Context, payload git.HooksPay return structerr.NewInternal("payload has no receive hooks info") } - executor, err := m.newCustomHooksExecutor(repo, "update") + executor, err := m.newCustomHooksExecutor(tx, repo, "update") if err != nil { return structerr.NewInternal("%w", err) } diff --git a/internal/gitaly/hook/update_test.go b/internal/gitaly/hook/update_test.go index 55d8acc0601cfaaae78d5dea77c7beb54da7cf12..707ae26c43b96582cdae2860dce75cda4fffa7ac 100644 --- a/internal/gitaly/hook/update_test.go +++ b/internal/gitaly/hook/update_test.go @@ -203,7 +203,7 @@ func TestUpdate_customHooks(t *testing.T) { gittest.WriteCustomHook(t, repoPath, "update", []byte(tc.hook)) var stdout, stderr bytes.Buffer - err = hookManager.UpdateHook(ctx, repo, tc.reference, tc.oldHash.String(), tc.newHash.String(), tc.env, &stdout, &stderr) + err = hookManager.UpdateHook(ctx, nil, repo, tc.reference, tc.oldHash.String(), tc.newHash.String(), tc.env, &stdout, &stderr) if tc.expectedErr != "" { require.Contains(t, err.Error(), tc.expectedErr) @@ -263,7 +263,7 @@ func TestUpdate_quarantine(t *testing.T) { require.NoError(t, err) var stdout, stderr bytes.Buffer - require.NoError(t, hookManager.UpdateHook(ctx, repo, "refs/heads/master", + require.NoError(t, hookManager.UpdateHook(ctx, nil, repo, "refs/heads/master", gittest.DefaultObjectHash.ZeroOID.String(), gittest.DefaultObjectHash.ZeroOID.String(), []string{env}, &stdout, &stderr)) if isQuarantined { diff --git a/internal/git/updateref/update_with_hooks.go b/internal/gitaly/hook/updateref/update_with_hooks.go similarity index 66% rename from internal/git/updateref/update_with_hooks.go rename to internal/gitaly/hook/updateref/update_with_hooks.go index c2e19b86a63beec048877fecff98623dda4a1991..362aa1576f48bd7327e204d763eb395bdcd612b3 100644 --- a/internal/git/updateref/update_with_hooks.go +++ b/internal/gitaly/hook/updateref/update_with_hooks.go @@ -13,6 +13,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/quarantine" "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" @@ -144,8 +146,12 @@ func NewUpdaterWithHooks( // with the quarantined repository as returned by the quarantine structure. If these hooks succeed, // quarantined objects will be migrated and all subsequent hooks are executed via the unquarantined // repository. +// +// If transaction is set, the actual update is done through the WAL by calling Commit on it. The +// quarantine directory is taken from the transaction, and the other quarantineDir parameter is ignored. func (u *UpdaterWithHooks) UpdateReference( ctx context.Context, + tx *gitaly.Transaction, repoProto *gitalypb.Repository, user *gitalypb.User, quarantineDir *quarantine.Dir, @@ -191,7 +197,22 @@ func (u *UpdaterWithHooks) UpdateReference( // then subsequently passed to Rails, which can use the quarantine directory to more // efficiently query which objects are new. quarantinedRepo := repoProto - if quarantineDir != nil { + if tx != nil { + quarantineDir, err := tx.QuarantineDirectory() + if err != nil { + return fmt.Errorf("quarantine directory: %w", err) + } + + repoPath, err := repo.Path() + if err != nil { + return fmt.Errorf("repo path: %w", err) + } + + quarantinedRepo, err = quarantine.Apply(repoPath, repoProto, quarantineDir) + if err != nil { + return fmt.Errorf("quarantine repo: %w", err) + } + } else if quarantineDir != nil { quarantinedRepo = quarantineDir.QuarantinedRepo() } @@ -201,7 +222,7 @@ func (u *UpdaterWithHooks) UpdateReference( } var stdout, stderr bytes.Buffer - if err := u.hookManager.PreReceiveHook(ctx, quarantinedRepo, pushOptions, []string{hooksPayload}, strings.NewReader(changes), &stdout, &stderr); err != nil { + if err := u.hookManager.PreReceiveHook(ctx, tx, quarantinedRepo, pushOptions, []string{hooksPayload}, strings.NewReader(changes), &stdout, &stderr); err != nil { return fmt.Errorf("running pre-receive hooks: %w", wrapHookError(err, git.PreReceiveHook, stdout.String(), stderr.String())) } @@ -217,62 +238,97 @@ func (u *UpdaterWithHooks) UpdateReference( // We only need to update the hooks payload to the unquarantined repo in case we // had a quarantine environment. Otherwise, the initial hooks payload is for the // real repository anyway. + // + // With WAL, the update and reference transaction hooks must still execute with the quarantine + // as the objects are only written into the repository once the transaction has been committed. hooksPayload, err = git.NewHooksPayload(u.cfg, repoProto, objectHash, transaction, &receiveHooksPayload, git.ReceivePackHooks, featureflag.FromContext(ctx)).Env() if err != nil { return fmt.Errorf("constructing quarantined hooks payload: %w", err) } } - if err := u.hookManager.UpdateHook(ctx, quarantinedRepo, reference.String(), oldrev.String(), newrev.String(), []string{hooksPayload}, &stdout, &stderr); err != nil { + if err := u.hookManager.UpdateHook(ctx, tx, quarantinedRepo, reference.String(), oldrev.String(), newrev.String(), []string{hooksPayload}, &stdout, &stderr); err != nil { return fmt.Errorf("running update hooks: %w", wrapHookError(err, git.UpdateHook, stdout.String(), stderr.String())) } - // We are already manually invoking the reference-transaction hook, so there is no need to - // set up hooks again here. One could argue that it would be easier to just have git handle - // execution of the reference-transaction hook. But unfortunately, it has proven to be - // problematic: if we queue a deletion, and the reference to be deleted exists both as - // packed-ref and as loose ref, then we would see two transactions: first a transaction - // deleting the packed-ref which would otherwise get unshadowed by deleting the loose ref, - // and only then do we see the deletion of the loose ref. So this depends on how well a repo - // is packed, which is obviously a bad thing as Gitaly nodes may be differently packed. We - // thus continue to manually drive the reference-transaction hook here, which doesn't have - // this problem. - updater, err := New(ctx, repo, WithDisabledTransactions()) - if err != nil { - return fmt.Errorf("creating updater: %w", err) - } + if tx != nil { + // The prepared step deviates from the non-WAL behavior as it doesn't verify nor lock the references + // prior to casting the prepared vote. + if err := u.hookManager.ReferenceTransactionHook(ctx, hook.ReferenceTransactionPrepared, []string{hooksPayload}, strings.NewReader(changes)); err != nil { + return fmt.Errorf("executing preparatory reference-transaction hook: %w", err) + } - // We need to explicitly cancel the update here such that we release the lock when this - // function exits if there is any error between locking and committing. - defer func() { _ = updater.Close() }() + tx.UpdateReferences(gitaly.ReferenceUpdates{ + reference: {OldOID: oldrev, NewOID: newrev}, + }) + + if err := tx.Commit(ctx); err != nil { + var errReferenceVerification gitaly.ReferenceVerificationError + if errors.As(err, &errReferenceVerification) { + return Error{ + Reference: errReferenceVerification.ReferenceName, + OldOID: oldrev, + NewOID: newrev, + } + } + + return fmt.Errorf("commit: %w", err) + } - if err := updater.Start(); err != nil { - return fmt.Errorf("start reference transaction: %w", err) - } + // The quarantined objects are written into the repository following a commit and the quarantine directory + // removed. Replace the quarantined repository with the normal repository in the payload. + hooksPayload, err = git.NewHooksPayload(u.cfg, repoProto, objectHash, transaction, &receiveHooksPayload, git.ReceivePackHooks, featureflag.FromContext(ctx)).Env() + if err != nil { + return fmt.Errorf("constructing quarantined hooks payload: %w", err) + } + } else { + // We are already manually invoking the reference-transaction hook, so there is no need to + // set up hooks again here. One could argue that it would be easier to just have git handle + // execution of the reference-transaction hook. But unfortunately, it has proven to be + // problematic: if we queue a deletion, and the reference to be deleted exists both as + // packed-ref and as loose ref, then we would see two transactions: first a transaction + // deleting the packed-ref which would otherwise get unshadowed by deleting the loose ref, + // and only then do we see the deletion of the loose ref. So this depends on how well a repo + // is packed, which is obviously a bad thing as Gitaly nodes may be differently packed. We + // thus continue to manually drive the reference-transaction hook here, which doesn't have + // this problem. + updater, err := updateref.New(ctx, repo, updateref.WithDisabledTransactions()) + if err != nil { + return fmt.Errorf("creating updater: %w", err) + } - if err := updater.Update(reference, newrev, oldrev); err != nil { - return fmt.Errorf("queueing ref update: %w", err) - } + // We need to explicitly cancel the update here such that we release the lock when this + // function exits if there is any error between locking and committing. + defer func() { _ = updater.Close() }() - // We need to lock the reference before executing the reference-transaction hook such that - // there cannot be any concurrent modification. - if err := updater.Prepare(); err != nil { - return Error{ - Reference: reference, - OldOID: oldrev, - NewOID: newrev, + if err := updater.Start(); err != nil { + return fmt.Errorf("start reference transaction: %w", err) } - } - if err := u.hookManager.ReferenceTransactionHook(ctx, hook.ReferenceTransactionPrepared, []string{hooksPayload}, strings.NewReader(changes)); err != nil { - return fmt.Errorf("executing preparatory reference-transaction hook: %w", err) - } + if err := updater.Update(reference, newrev, oldrev); err != nil { + return fmt.Errorf("queueing ref update: %w", err) + } + + // We need to lock the reference before executing the reference-transaction hook such that + // there cannot be any concurrent modification. + if err := updater.Prepare(); err != nil { + return Error{ + Reference: reference, + OldOID: oldrev, + NewOID: newrev, + } + } + + if err := u.hookManager.ReferenceTransactionHook(ctx, hook.ReferenceTransactionPrepared, []string{hooksPayload}, strings.NewReader(changes)); err != nil { + return fmt.Errorf("executing preparatory reference-transaction hook: %w", err) + } - if err := updater.Commit(); err != nil { - return Error{ - Reference: reference, - OldOID: oldrev, - NewOID: newrev, + if err := updater.Commit(); err != nil { + return Error{ + Reference: reference, + OldOID: oldrev, + NewOID: newrev, + } } } @@ -280,7 +336,7 @@ func (u *UpdaterWithHooks) UpdateReference( return fmt.Errorf("executing committing reference-transaction hook: %w", err) } - if err := u.hookManager.PostReceiveHook(ctx, repoProto, pushOptions, []string{hooksPayload}, strings.NewReader(changes), &stdout, &stderr); err != nil { + if err := u.hookManager.PostReceiveHook(ctx, tx, repoProto, pushOptions, []string{hooksPayload}, strings.NewReader(changes), &stdout, &stderr); err != nil { // CustomHook errors are returned in case a custom hook has returned an error code. // The post-receive hook has special semantics though. Quoting githooks(5): // diff --git a/internal/git/updateref/update_with_hooks_test.go b/internal/gitaly/hook/updateref/update_with_hooks_test.go similarity index 75% rename from internal/git/updateref/update_with_hooks_test.go rename to internal/gitaly/hook/updateref/update_with_hooks_test.go index bea8839bd347097b94a2baba76ef1ed2dd4b6eaa..6d4d9eb2b05d136e10d0dd428a2c93f83b98896c 100644 --- a/internal/git/updateref/update_with_hooks_test.go +++ b/internal/gitaly/hook/updateref/update_with_hooks_test.go @@ -13,9 +13,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/quarantine" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" hookservice "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/hook" "gitlab.com/gitlab-org/gitaly/v16/internal/metadata/featureflag" @@ -26,6 +27,10 @@ import ( "google.golang.org/grpc" ) +func TestMain(m *testing.M) { + testhelper.Run(m) +} + func TestUpdaterWithHooks_UpdateReference_invalidParameters(t *testing.T) { t.Parallel() @@ -83,7 +88,7 @@ func TestUpdaterWithHooks_UpdateReference_invalidParameters(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - err := updater.UpdateReference(ctx, repo, gittest.TestUser, nil, tc.ref, tc.newRev, tc.oldRev) + err := updater.UpdateReference(ctx, nil, repo, gittest.TestUser, nil, tc.ref, tc.newRev, tc.oldRev) require.Equal(t, tc.expectedErr, err) }) } @@ -134,16 +139,16 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) { referenceTransactionCalls := 0 testCases := []struct { desc string - preReceive func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error - postReceive func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error - update func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error + preReceive func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error + postReceive func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error + update func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error referenceTransaction func(t *testing.T, ctx context.Context, state hook.ReferenceTransactionState, env []string, stdin io.Reader) error expectedErr string expectedRefDeletion bool }{ { desc: "successful update", - preReceive: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + preReceive: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { changes, err := io.ReadAll(stdin) require.NoError(t, err) require.Equal(t, fmt.Sprintf("%s %s refs/heads/main\n", commitID, gittest.DefaultObjectHash.ZeroOID.String()), string(changes)) @@ -151,14 +156,14 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) { requirePayload(t, env) return nil }, - update: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { + update: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { require.Equal(t, "refs/heads/main", ref) require.Equal(t, commitID.String(), oldValue) require.Equal(t, newValue, gittest.DefaultObjectHash.ZeroOID.String()) requirePayload(t, env) return nil }, - postReceive: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + postReceive: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { changes, err := io.ReadAll(stdin) require.NoError(t, err) require.Equal(t, fmt.Sprintf("%s %s refs/heads/main\n", commitID.String(), gittest.DefaultObjectHash.ZeroOID.String()), string(changes)) @@ -186,7 +191,7 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) { }, { desc: "prereceive error", - preReceive: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + preReceive: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { _, err := io.Copy(stderr, strings.NewReader("prereceive failure")) require.NoError(t, err) return errors.New("ignored") @@ -195,17 +200,17 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) { }, { desc: "prereceive error from GitLab API response", - preReceive: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + preReceive: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { return hook.NotAllowedError{Message: "GitLab: file is locked"} }, expectedErr: "GitLab: file is locked", }, { desc: "update error", - preReceive: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + preReceive: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { return nil }, - update: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { + update: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { _, err := io.Copy(stderr, strings.NewReader("update failure")) require.NoError(t, err) return errors.New("ignored") @@ -214,10 +219,10 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) { }, { desc: "reference-transaction error", - preReceive: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + preReceive: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { return nil }, - update: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { + update: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { return nil }, referenceTransaction: func(t *testing.T, ctx context.Context, state hook.ReferenceTransactionState, env []string, stdin io.Reader) error { @@ -230,16 +235,16 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) { }, { desc: "post-receive custom hooks error is ignored", - preReceive: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + preReceive: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { return nil }, - update: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { + update: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { return nil }, referenceTransaction: func(t *testing.T, ctx context.Context, state hook.ReferenceTransactionState, env []string, stdin io.Reader) error { return nil }, - postReceive: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + postReceive: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { _, err := io.Copy(stderr, strings.NewReader("post-receive failure")) require.NoError(t, err) return hook.NewCustomHookError(errors.New("ignored")) @@ -248,16 +253,16 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) { }, { desc: "post-receive non-custom hooks error returned", - preReceive: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + preReceive: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { return nil }, - update: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { + update: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { return nil }, referenceTransaction: func(t *testing.T, ctx context.Context, state hook.ReferenceTransactionState, env []string, stdin io.Reader) error { return nil }, - postReceive: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + postReceive: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { _, err := io.Copy(stderr, strings.NewReader("post-receive failure")) require.NoError(t, err) return errors.New("uh oh") @@ -275,7 +280,7 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) { gitCmdFactory := gittest.NewCommandFactory(t, cfg) updater := updateref.NewUpdaterWithHooks(cfg, config.NewLocator(cfg), hookManager, gitCmdFactory, nil) - err := updater.UpdateReference(ctx, repo, gittest.TestUser, nil, git.ReferenceName("refs/heads/main"), gittest.DefaultObjectHash.ZeroOID, commitID) + err := updater.UpdateReference(ctx, nil, repo, gittest.TestUser, nil, git.ReferenceName("refs/heads/main"), gittest.DefaultObjectHash.ZeroOID, commitID) if tc.expectedErr == "" { require.NoError(t, err) } else { @@ -342,7 +347,7 @@ func TestUpdaterWithHooks_quarantine(t *testing.T) { hookExecutions := make(map[string]int) hookManager := hook.NewMockManager(t, // The pre-receive hook is not expected to have the object in the normal repo. - func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { expectQuarantined(t, env, true) testhelper.ProtoEqual(t, quarantine.QuarantinedRepo(), repo) hookExecutions["prereceive"]++ @@ -350,7 +355,7 @@ func TestUpdaterWithHooks_quarantine(t *testing.T) { }, // But the post-receive hook shall get the unquarantined repository as input, with // objects already having been migrated into the target repo. - func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { expectQuarantined(t, env, false) testhelper.ProtoEqual(t, repoProto, repo) hookExecutions["postreceive"]++ @@ -360,7 +365,7 @@ func TestUpdaterWithHooks_quarantine(t *testing.T) { // each reference that we're updating. As it is called immediately before the ref // gets queued for update, objects must have already been migrated or otherwise // updating the refs will fail due to missing objects. - func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { + func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { expectQuarantined(t, env, false) testhelper.ProtoEqual(t, quarantine.QuarantinedRepo(), repo) hookExecutions["update"]++ @@ -383,6 +388,7 @@ func TestUpdaterWithHooks_quarantine(t *testing.T) { require.NoError(t, updateref.NewUpdaterWithHooks(cfg, locator, hookManager, gitCmdFactory, nil).UpdateReference( ctx, + nil, repoProto, &gitalypb.User{ GlId: "1234", diff --git a/internal/gitaly/partition_manager.go b/internal/gitaly/partition_manager.go index 408d5f4a32467c0eee70b6127313b5034f1c04f0..52a4a0c5f21fd7d67f097cd83893b90cc2cafc02 100644 --- a/internal/gitaly/partition_manager.go +++ b/internal/gitaly/partition_manager.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "io/fs" "os" + "path/filepath" "sync" "github.com/dgraph-io/badger/v3" @@ -13,6 +15,8 @@ import ( repo "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" + "gitlab.com/gitlab-org/gitaly/v16/internal/safe" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" ) @@ -21,29 +25,66 @@ var ErrPartitionManagerStopped = errors.New("partition manager stopped") // PartitionManager is responsible for managing the lifecycle of each TransactionManager. type PartitionManager struct { - // mu is the mutex to synchronize access to the partitions. + // storages are the storages configured in this Gitaly server. The map is keyed by the storage name. + storages map[string]*storagePartition +} + +// storagePartition represents a single storage. +type storagePartition struct { + // mu synchronizes access to the fields of storagePartition. mu sync.Mutex - // db is the handle to the key-value store used for storing the write-ahead log related state. - // It is used to create each transaction manager. - db *badger.DB - // partitions contains all the active partitions for which there are pending transactions. - // Each repository can have up to one partition. - partitions map[string]*partition - // localRepoFactory is used by PartitionManager to construct `localrepo.Repo`. - localRepoFactory localrepo.Factory - // logger handles all logging for PartitionManager. + // logger handles all logging for storagePartition. logger logrus.FieldLogger - // stopped tracks whether the PartitionManager has been stopped. If the manager is stopped, - // no new transactions are allowed to begin. - stopped bool - // partitionsWG keeps track of running partitions. - partitionsWG sync.WaitGroup + // path is the absolute path to the storage's root. + path string + // repoFactory is a factory type that builds localrepo instances for this storage. + repoFactory localrepo.StorageScopedFactory // stagingDirectory is the directory where all of the TransactionManager staging directories // should be created. stagingDirectory string - // storages are the storages configured in this Gitaly server. They are keyed by the name and the - // value is the storage's path. - storages map[string]string + // stopped tracks whether the storagePartition has been stopped. If it is is stopped, + // no new transactions are allowed to begin. + stopped bool + // db is the handle to the key-value store used for storing the storage's database state. + database *badger.DB + // partitions contains all the active partitions. Each repository can have up to one partition. + partitions map[string]*partition + // activePartitions keeps track of active partitions. + activePartitions sync.WaitGroup +} + +func (sp *storagePartition) stop() { + sp.mu.Lock() + // Mark the storage as stopped so no new transactions can begin anymore. This + // also means no more partitions are spawned. + sp.stopped = true + for _, ptn := range sp.partitions { + // Stop all partitions. + ptn.stop() + } + sp.mu.Unlock() + + // Wait for all partitions to finish. + sp.activePartitions.Wait() + + if err := sp.database.Close(); err != nil { + sp.logger.WithError(err).Error("failed closing storage's database") + } +} + +// transactionFinalizerFactory is executed when a transaction completes. The pending transaction counter +// for the partition is decremented by one and TransactionManager stopped if there are no longer +// any pending transactions. +func (sp *storagePartition) transactionFinalizerFactory(ptn *partition) func() { + return func() { + sp.mu.Lock() + defer sp.mu.Unlock() + + ptn.pendingTransactionCount-- + if ptn.pendingTransactionCount == 0 { + ptn.stop() + } + } } // partition contains the transaction manager and tracks the number of in-flight transactions for the partition. @@ -60,95 +101,124 @@ type partition struct { pendingTransactionCount uint } +// stop stops the partition's transaction manager. +func (ptn *partition) stop() { + ptn.shuttingDown = true + ptn.transactionManager.Stop() +} + // NewPartitionManager returns a new PartitionManager. -func NewPartitionManager(db *badger.DB, storages []config.Storage, localRepoFactory localrepo.Factory, logger logrus.FieldLogger, stagingDir string) *PartitionManager { - storagesMap := make(map[string]string, len(storages)) - for _, storage := range storages { - storagesMap[storage.Name] = storage.Path - } +func NewPartitionManager(configuredStorages []config.Storage, localRepoFactory localrepo.Factory, logger logrus.FieldLogger) (*PartitionManager, error) { + storages := make(map[string]*storagePartition, len(configuredStorages)) + for _, storage := range configuredStorages { + repoFactory, err := localRepoFactory.ScopeByStorage(storage.Name) + if err != nil { + return nil, fmt.Errorf("scope by storage: %w", err) + } - return &PartitionManager{ - db: db, - partitions: make(map[string]*partition), - localRepoFactory: localRepoFactory, - logger: logger, - stagingDirectory: stagingDir, - storages: storagesMap, + stagingDir := stagingDirectoryPath(storage.Path) + // Remove a possible already existing staging directory as it may contain stale files + // if the previous process didn't shutdown gracefully. + if err := os.RemoveAll(stagingDir); err != nil { + return nil, fmt.Errorf("failed clearing storage's staging directory: %w", err) + } + + if err := os.Mkdir(stagingDir, perm.PrivateDir); err != nil { + return nil, fmt.Errorf("create storage's staging directory: %w", err) + } + + databaseDir := filepath.Join(storage.Path, "database") + if err := os.Mkdir(databaseDir, perm.PrivateDir); err != nil && !errors.Is(err, fs.ErrExist) { + return nil, fmt.Errorf("create storage's database directory: %w", err) + } + + if err := safe.NewSyncer().SyncHierarchy(storage.Path, "database"); err != nil { + return nil, fmt.Errorf("sync database directory: %w", err) + } + + db, err := OpenDatabase(databaseDir) + if err != nil { + return nil, fmt.Errorf("create storage's database directory: %w", err) + } + + storages[storage.Name] = &storagePartition{ + logger: logrus.WithField("storage", storage.Name), + path: storage.Path, + repoFactory: repoFactory, + stagingDirectory: stagingDir, + database: db, + partitions: map[string]*partition{}, + } } + + return &PartitionManager{storages: storages}, nil } -// getPartitionKey returns a partitions's key. -func getPartitionKey(storageName, relativePath string) string { - return storageName + ":" + relativePath +func stagingDirectoryPath(storagePath string) string { + return filepath.Join(storagePath, "staging") } // Begin gets the TransactionManager for the specified repository and starts a Transaction. If a // TransactionManager is not already running, a new one is created and used. The partition tracks // the number of pending transactions and this counter gets incremented when Begin is invoked. func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Transaction, error) { - storagePath, ok := pm.storages[repo.GetStorageName()] + storagePtn, ok := pm.storages[repo.GetStorageName()] if !ok { return nil, structerr.NewNotFound("unknown storage: %q", repo.GetStorageName()) } - relativePath, err := storage.ValidateRelativePath(storagePath, repo.GetRelativePath()) + relativePath, err := storage.ValidateRelativePath(storagePtn.path, repo.GetRelativePath()) if err != nil { return nil, structerr.NewInvalidArgument("validate relative path: %w", err) } - partitionKey := getPartitionKey(repo.GetStorageName(), relativePath) - for { - pm.mu.Lock() - if pm.stopped { - pm.mu.Unlock() + storagePtn.mu.Lock() + if storagePtn.stopped { + storagePtn.mu.Unlock() return nil, ErrPartitionManagerStopped } - ptn, ok := pm.partitions[partitionKey] + ptn, ok := storagePtn.partitions[relativePath] if !ok { ptn = &partition{ shutdown: make(chan struct{}), } - stagingDir, err := os.MkdirTemp(pm.stagingDirectory, "") + stagingDir, err := os.MkdirTemp(storagePtn.stagingDirectory, "") if err != nil { - pm.mu.Unlock() + storagePtn.mu.Unlock() return nil, fmt.Errorf("create staging directory: %w", err) } - storageScopedFactory, err := pm.localRepoFactory.ScopeByStorage(repo.GetStorageName()) - if err != nil { - pm.mu.Unlock() - return nil, fmt.Errorf("scope by storage: %w", err) - } - - mgr := NewTransactionManager(pm.db, storagePath, relativePath, stagingDir, storageScopedFactory, pm.transactionFinalizerFactory(ptn)) + mgr := NewTransactionManager(storagePtn.database, storagePtn.path, relativePath, stagingDir, storagePtn.repoFactory, storagePtn.transactionFinalizerFactory(ptn)) ptn.transactionManager = mgr - pm.partitions[partitionKey] = ptn + storagePtn.partitions[relativePath] = ptn - pm.partitionsWG.Add(1) + storagePtn.activePartitions.Add(1) go func() { + logger := storagePtn.logger.WithField("partition", relativePath) + if err := mgr.Run(); err != nil { - pm.logger.WithError(err).Error("partition failed") + logger.WithError(err).Error("partition failed") } // In the event that TransactionManager stops running, a new TransactionManager will // need to be started in order to continue processing transactions. The partition is // deleted allowing the next transaction for the repository to create a new partition // and TransactionManager. - pm.mu.Lock() - delete(pm.partitions, partitionKey) - pm.mu.Unlock() + storagePtn.mu.Lock() + delete(storagePtn.partitions, relativePath) + storagePtn.mu.Unlock() close(ptn.shutdown) if err := os.RemoveAll(stagingDir); err != nil { - pm.logger.WithError(err).Error("failed removing partition's staging directory") + logger.WithError(err).Error("failed removing partition's staging directory") } - pm.partitionsWG.Done() + storagePtn.activePartitions.Done() }() } @@ -157,7 +227,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran // used. The lock is released while waiting for the partition to complete shutdown as to // not block other partitions from processing transactions. Once shutdown is complete, a // new attempt is made to get a valid partition. - pm.mu.Unlock() + storagePtn.mu.Unlock() select { case <-ctx.Done(): return nil, ctx.Err() @@ -168,7 +238,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran } ptn.pendingTransactionCount++ - pm.mu.Unlock() + storagePtn.mu.Unlock() transaction, err := ptn.transactionManager.Begin(ctx) if err != nil { @@ -176,7 +246,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran // inflight. A transaction failing does not necessarily mean the transaction manager has // stopped running. Consequently, if there are no other pending transactions the partition // should be stopped. - pm.transactionFinalizerFactory(ptn)() + storagePtn.transactionFinalizerFactory(ptn)() return nil, err } @@ -185,40 +255,17 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran } } -// Stop stops transaction processing for all running transaction managers and waits for shutdown -// completion. +// Stop stops transaction processing for all storages and waits for shutdown completion. func (pm *PartitionManager) Stop() { - pm.mu.Lock() - // Mark the PartitionManager as stopped so no new transactions can begin anymore. This - // also means no more partitions are spawned. - pm.stopped = true - for _, ptn := range pm.partitions { - // Stop all partitions. - ptn.stop() + var activeStorages sync.WaitGroup + for _, storagePtn := range pm.storages { + activeStorages.Add(1) + storagePtn := storagePtn + go func() { + storagePtn.stop() + activeStorages.Done() + }() } - pm.mu.Unlock() - - // Wait for all goroutines to complete. - pm.partitionsWG.Wait() -} - -// stop stops the partition's transaction manager. -func (ptn *partition) stop() { - ptn.shuttingDown = true - ptn.transactionManager.Stop() -} -// transactionFinalizerFactory is executed when a transaction completes. The pending transaction counter -// for the partition is decremented by one and TransactionManager stopped if there are no longer -// any pending transactions. -func (pm *PartitionManager) transactionFinalizerFactory(ptn *partition) func() { - return func() { - pm.mu.Lock() - defer pm.mu.Unlock() - - ptn.pendingTransactionCount-- - if ptn.pendingTransactionCount == 0 { - ptn.stop() - } - } + activeStorages.Wait() } diff --git a/internal/gitaly/partition_manager_test.go b/internal/gitaly/partition_manager_test.go index aeb26f7e30ede9cda81e8673d98d1550c6756304..d041c250c5ac7ec3479a3de634083baf8b3acc0e 100644 --- a/internal/gitaly/partition_manager_test.go +++ b/internal/gitaly/partition_manager_test.go @@ -16,8 +16,10 @@ import ( repo "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) func TestPartitionManager(t *testing.T) { @@ -40,9 +42,9 @@ func TestPartitionManager(t *testing.T) { ctx context.Context // repo is the repository that the transaction belongs to. repo repo.GitRepo - // expectedState contains the expected repositories and their pending transaction count at + // expectedState contains the partitions by their storages and their pending transaction count at // the end of the step. - expectedState map[string]uint + expectedState map[string]map[string]uint // expectedError is the error expected to be returned when beginning the transaction. expectedError error } @@ -53,9 +55,9 @@ func TestPartitionManager(t *testing.T) { transactionID int // ctx is the context used when `Commit()` gets invoked. ctx context.Context - // expectedState contains the expected repositories and their pending transaction count at + // expectedState contains the partitions by their storages and their pending transaction count at // the end of the step. - expectedState map[string]uint + expectedState map[string]map[string]uint // expectedError is the error that is expected to be returned when committing the transaction. expectedError error } @@ -64,9 +66,9 @@ func TestPartitionManager(t *testing.T) { type rollback struct { // transactionID identifies the transaction to rollback. transactionID int - // expectedState contains the expected repositories and their pending transaction count at + // expectedState contains the partitions by their storages and their pending transaction count at // the end of the step. - expectedState map[string]uint + expectedState map[string]map[string]uint } // stopPartition stops the transaction manager for the specified repository. This is done to @@ -87,55 +89,67 @@ func TestPartitionManager(t *testing.T) { // being processed without a running partition manager. type stopManager struct{} - // blockOnPartitionShutdown checks if the specified partition is currently in the process of - // shutting down. If it is, the function waits for the shutdown process to complete before + // blockOnPartitionShutdown checks if any partitions are currently in the process of + // shutting down. If some are, the function waits for the shutdown process to complete before // continuing. This is required in order to accurately validate partition state. - blockOnPartitionShutdown := func(t *testing.T, ptn *partition) { + blockOnPartitionShutdown := func(t *testing.T, pm *PartitionManager) { t.Helper() - if ptn != nil && ptn.shuttingDown { - <-ptn.shutdown + var waitFor []chan struct{} + for _, sp := range pm.storages { + sp.mu.Lock() + for _, ptn := range sp.partitions { + if ptn.shuttingDown { + waitFor = append(waitFor, ptn.shutdown) + } + } + sp.mu.Unlock() + } + + for _, shutdown := range waitFor { + <-shutdown } } // checkExpectedState validates that the partition manager contains the correct partitions and // associated transaction count at the point of execution. - checkExpectedState := func(t *testing.T, partitionManager *PartitionManager, expectedState map[string]uint) { + checkExpectedState := func(t *testing.T, cfg config.Cfg, partitionManager *PartitionManager, expectedState map[string]map[string]uint) { t.Helper() - require.Equal(t, len(expectedState), len(partitionManager.partitions)) - for k, v := range expectedState { - partition, ok := partitionManager.partitions[k] - require.True(t, ok, "expected partition %q to be present", k) - require.Equal(t, v, partition.pendingTransactionCount) + actualState := map[string]map[string]uint{} + for storageName, storagePtn := range partitionManager.storages { + for partitionKey, partition := range storagePtn.partitions { + if actualState[storageName] == nil { + actualState[storageName] = map[string]uint{} + } + + actualState[storageName][partitionKey] = partition.pendingTransactionCount + } } - } - cfg := testcfg.Build(t) + if expectedState == nil { + expectedState = map[string]map[string]uint{} + } + + require.Equal(t, expectedState, actualState) + } - setupRepository := func(t *testing.T) repo.GitRepo { + setupRepository := func(t *testing.T, cfg config.Cfg, storage config.Storage) repo.GitRepo { t.Helper() repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + Storage: storage, SkipCreationViaService: true, }) return repo } - cmdFactory, clean, err := git.NewExecCommandFactory(cfg) - require.NoError(t, err) - t.Cleanup(clean) - - catfileCache := catfile.NewCache(cfg) - t.Cleanup(catfileCache.Stop) - - localRepoFactory := localrepo.NewFactory(config.NewLocator(cfg), cmdFactory, catfileCache) - // transactionData holds relevant data for each transaction created during a testcase. type transactionData struct { - txn *Transaction - ptn *partition + txn *Transaction + storagePtn *storagePartition + ptn *partition } type setupData struct { @@ -144,19 +158,21 @@ func TestPartitionManager(t *testing.T) { for _, tc := range []struct { desc string - setup func(t *testing.T) setupData + setup func(t *testing.T, cfg config.Cfg) setupData }{ { desc: "transaction committed for single repository", - setup: func(t *testing.T) setupData { - repo := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg, cfg.Storages[0]) return setupData{ steps: steps{ begin{ repo: repo, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + repo.GetRelativePath(): 1, + }, }, }, commit{}, @@ -166,16 +182,18 @@ func TestPartitionManager(t *testing.T) { }, { desc: "two transactions committed for single repository sequentially", - setup: func(t *testing.T) setupData { - repo := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg, cfg.Storages[0]) return setupData{ steps: steps{ begin{ transactionID: 1, repo: repo, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + repo.GetRelativePath(): 1, + }, }, }, commit{ @@ -184,8 +202,10 @@ func TestPartitionManager(t *testing.T) { begin{ transactionID: 2, repo: repo, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + repo.GetRelativePath(): 1, + }, }, }, commit{ @@ -197,29 +217,35 @@ func TestPartitionManager(t *testing.T) { }, { desc: "two transactions committed for single repository in parallel", - setup: func(t *testing.T) setupData { - repo := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg, cfg.Storages[0]) return setupData{ steps: steps{ begin{ transactionID: 1, repo: repo, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + repo.GetRelativePath(): 1, + }, }, }, begin{ transactionID: 2, repo: repo, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 2, + expectedState: map[string]map[string]uint{ + "default": { + repo.GetRelativePath(): 2, + }, }, }, commit{ transactionID: 1, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + repo.GetRelativePath(): 1, + }, }, }, commit{ @@ -230,36 +256,67 @@ func TestPartitionManager(t *testing.T) { }, }, { - desc: "transaction committed for two repositories", - setup: func(t *testing.T) setupData { - repoA := setupRepository(t) - repoB := setupRepository(t) + desc: "transaction committed for multiple repositories", + setup: func(t *testing.T, cfg config.Cfg) setupData { + repoA := setupRepository(t, cfg, cfg.Storages[0]) + repoB := setupRepository(t, cfg, cfg.Storages[0]) + repoC := setupRepository(t, cfg, cfg.Storages[1]) return setupData{ steps: steps{ begin{ transactionID: 1, repo: repoA, - expectedState: map[string]uint{ - getPartitionKey(repoA.GetStorageName(), repoA.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + repoA.GetRelativePath(): 1, + }, }, }, begin{ transactionID: 2, repo: repoB, - expectedState: map[string]uint{ - getPartitionKey(repoA.GetStorageName(), repoA.GetRelativePath()): 1, - getPartitionKey(repoB.GetStorageName(), repoB.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + repoA.GetRelativePath(): 1, + repoB.GetRelativePath(): 1, + }, + }, + }, + begin{ + transactionID: 3, + repo: repoC, + expectedState: map[string]map[string]uint{ + "default": { + repoA.GetRelativePath(): 1, + repoB.GetRelativePath(): 1, + }, + "other-storage": { + repoC.GetRelativePath(): 1, + }, }, }, commit{ transactionID: 1, - expectedState: map[string]uint{ - getPartitionKey(repoB.GetStorageName(), repoB.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + repoB.GetRelativePath(): 1, + }, + "other-storage": { + repoC.GetRelativePath(): 1, + }, }, }, commit{ transactionID: 2, + expectedState: map[string]map[string]uint{ + "other-storage": { + repoC.GetRelativePath(): 1, + }, + }, + }, + commit{ + transactionID: 3, }, }, } @@ -267,15 +324,17 @@ func TestPartitionManager(t *testing.T) { }, { desc: "transaction rolled back for single repository", - setup: func(t *testing.T) setupData { - repo := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg, cfg.Storages[0]) return setupData{ steps: steps{ begin{ repo: repo, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + repo.GetRelativePath(): 1, + }, }, }, rollback{}, @@ -285,8 +344,8 @@ func TestPartitionManager(t *testing.T) { }, { desc: "starting transaction failed due to cancelled context", - setup: func(t *testing.T) setupData { - repo := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg, cfg.Storages[0]) stepCtx, cancel := context.WithCancel(ctx) cancel() @@ -304,8 +363,8 @@ func TestPartitionManager(t *testing.T) { }, { desc: "committing transaction failed due to cancelled context", - setup: func(t *testing.T) setupData { - repo := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg, cfg.Storages[0]) stepCtx, cancel := context.WithCancel(ctx) cancel() @@ -314,8 +373,10 @@ func TestPartitionManager(t *testing.T) { steps: steps{ begin{ repo: repo, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + repo.GetRelativePath(): 1, + }, }, }, commit{ @@ -328,15 +389,17 @@ func TestPartitionManager(t *testing.T) { }, { desc: "committing transaction failed due to stopped transaction manager", - setup: func(t *testing.T) setupData { - repo := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg, cfg.Storages[0]) return setupData{ steps: steps{ begin{ repo: repo, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + repo.GetRelativePath(): 1, + }, }, }, stopPartition{}, @@ -349,16 +412,18 @@ func TestPartitionManager(t *testing.T) { }, { desc: "transaction from previous transaction manager finalized after new manager started", - setup: func(t *testing.T) setupData { - repo := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg, cfg.Storages[0]) return setupData{ steps: steps{ begin{ transactionID: 1, repo: repo, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + repo.GetRelativePath(): 1, + }, }, }, stopPartition{ @@ -367,8 +432,10 @@ func TestPartitionManager(t *testing.T) { begin{ transactionID: 2, repo: repo, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + repo.GetRelativePath(): 1, + }, }, }, finalizeTransaction{ @@ -383,8 +450,8 @@ func TestPartitionManager(t *testing.T) { }, { desc: "transaction started after partition manager stopped", - setup: func(t *testing.T) setupData { - repo := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg, cfg.Storages[0]) return setupData{ steps: steps{ @@ -399,8 +466,8 @@ func TestPartitionManager(t *testing.T) { }, { desc: "multiple transactions started after partition manager stopped", - setup: func(t *testing.T) setupData { - repo := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg, cfg.Storages[0]) return setupData{ steps: steps{ @@ -419,33 +486,107 @@ func TestPartitionManager(t *testing.T) { } }, }, + { + desc: "transaction for a non-existent storage", + setup: func(t *testing.T, cfg config.Cfg) setupData { + return setupData{ + steps: steps{ + begin{ + repo: &gitalypb.Repository{ + StorageName: "non-existent", + }, + expectedError: structerr.NewNotFound("unknown storage: %q", "non-existent"), + }, + }, + } + }, + }, + + { + desc: "relative paths are cleaned", + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg, cfg.Storages[0]) + + return setupData{ + steps: steps{ + begin{ + transactionID: 1, + repo: repo, + expectedState: map[string]map[string]uint{ + "default": { + repo.GetRelativePath(): 1, + }, + }, + }, + begin{ + transactionID: 2, + repo: &gitalypb.Repository{ + StorageName: repo.GetStorageName(), + RelativePath: filepath.Join(repo.GetRelativePath(), "child-dir", ".."), + }, + expectedState: map[string]map[string]uint{ + "default": { + repo.GetRelativePath(): 2, + }, + }, + }, + }, + } + }, + }, } { tc := tc t.Run(tc.desc, func(t *testing.T) { t.Parallel() - setup := tc.setup(t) + cfg := testcfg.Build(t, testcfg.WithStorages("default", "other-storage")) - database, err := OpenDatabase(t.TempDir()) + cmdFactory, clean, err := git.NewExecCommandFactory(cfg) require.NoError(t, err) - defer testhelper.MustClose(t, database) + t.Cleanup(clean) + + catfileCache := catfile.NewCache(cfg) + t.Cleanup(catfileCache.Stop) + + localRepoFactory := localrepo.NewFactory(config.NewLocator(cfg), cmdFactory, catfileCache) - stagingDir := filepath.Join(t.TempDir(), "staging") - require.NoError(t, os.Mkdir(stagingDir, perm.PrivateDir)) + setup := tc.setup(t, cfg) - partitionManager := NewPartitionManager(database, cfg.Storages, localRepoFactory, logrus.StandardLogger(), stagingDir) + // Create some existing content in the staging directory so we can assert it gets removed and + // recreated. + for _, storage := range cfg.Storages { + require.NoError(t, + os.MkdirAll( + filepath.Join(stagingDirectoryPath(storage.Path), "existing-content"), + perm.PrivateDir, + ), + ) + } + + partitionManager, err := NewPartitionManager(cfg.Storages, localRepoFactory, logrus.StandardLogger()) + require.NoError(t, err) defer func() { partitionManager.Stop() - // Assert all staging directories have been removed. - testhelper.RequireDirectoryState(t, stagingDir, "", testhelper.DirectoryState{ - "/": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)}, - }) + for _, storage := range cfg.Storages { + // Assert all staging directories have been emptied at the end. + testhelper.RequireDirectoryState(t, storage.Path, "staging", testhelper.DirectoryState{ + "/staging": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)}, + }) + } }() + for _, storage := range cfg.Storages { + // Assert the existing content in the staging directory was removed. + testhelper.RequireDirectoryState(t, storage.Path, "staging", testhelper.DirectoryState{ + "/staging": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)}, + }) + } + // openTransactionData holds references to all transactions and its associated partition // created during the testcase. openTransactionData := map[int]*transactionData{} + var partitionManagerStopped bool for _, step := range setup.steps { switch step := step.(type) { case begin: @@ -459,16 +600,22 @@ func TestPartitionManager(t *testing.T) { txn, err := partitionManager.Begin(beginCtx, step.repo) require.Equal(t, step.expectedError, err) - partitionManager.mu.Lock() - ptn := partitionManager.partitions[getPartitionKey(step.repo.GetStorageName(), step.repo.GetRelativePath())] - partitionManager.mu.Unlock() + blockOnPartitionShutdown(t, partitionManager) + checkExpectedState(t, cfg, partitionManager, step.expectedState) - blockOnPartitionShutdown(t, ptn) - checkExpectedState(t, partitionManager, step.expectedState) + if err != nil { + continue + } + + storagePtn := partitionManager.storages[step.repo.GetStorageName()] + storagePtn.mu.Lock() + ptn := storagePtn.partitions[step.repo.GetRelativePath()] + storagePtn.mu.Unlock() openTransactionData[step.transactionID] = &transactionData{ - txn: txn, - ptn: ptn, + txn: txn, + storagePtn: storagePtn, + ptn: ptn, } case commit: require.Contains(t, openTransactionData, step.transactionID, "test error: transaction committed before being started") @@ -482,30 +629,32 @@ func TestPartitionManager(t *testing.T) { require.ErrorIs(t, data.txn.Commit(commitCtx), step.expectedError) - blockOnPartitionShutdown(t, data.ptn) - checkExpectedState(t, partitionManager, step.expectedState) + blockOnPartitionShutdown(t, partitionManager) + checkExpectedState(t, cfg, partitionManager, step.expectedState) case rollback: require.Contains(t, openTransactionData, step.transactionID, "test error: transaction rolled back before being started") data := openTransactionData[step.transactionID] require.NoError(t, data.txn.Rollback()) - blockOnPartitionShutdown(t, data.ptn) - checkExpectedState(t, partitionManager, step.expectedState) + blockOnPartitionShutdown(t, partitionManager) + checkExpectedState(t, cfg, partitionManager, step.expectedState) case stopPartition: require.Contains(t, openTransactionData, step.transactionID, "test error: transaction manager stopped before being started") data := openTransactionData[step.transactionID] data.ptn.stop() - blockOnPartitionShutdown(t, data.ptn) + blockOnPartitionShutdown(t, partitionManager) case finalizeTransaction: require.Contains(t, openTransactionData, step.transactionID, "test error: transaction finalized before being started") data := openTransactionData[step.transactionID] - partitionManager.transactionFinalizerFactory(data.ptn)() + + data.storagePtn.transactionFinalizerFactory(data.ptn)() case stopManager: - require.False(t, partitionManager.stopped, "test error: partition manager already stopped") + require.False(t, partitionManagerStopped, "test error: partition manager already stopped") + partitionManagerStopped = true partitionManager.Stop() } diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go index 566b5f93bd03a7e37454204cada1d668a4851de1..2e13e32ea1a1b039c11ce2f5b2b96b47033c52eb 100644 --- a/internal/gitaly/server/auth_test.go +++ b/internal/gitaly/server/auth_test.go @@ -20,10 +20,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/cache" "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/auth" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" diff --git a/internal/gitaly/service/conflicts/resolve_conflicts.go b/internal/gitaly/service/conflicts/resolve_conflicts.go index 3ff55ecbc26ff1cd2a281df7a1d64704a1a690b1..2e6c6d9fb67904a15318756b2a5cfe26e4590175 100644 --- a/internal/gitaly/service/conflicts/resolve_conflicts.go +++ b/internal/gitaly/service/conflicts/resolve_conflicts.go @@ -198,6 +198,7 @@ func (s *server) resolveConflicts(header *gitalypb.ResolveConflictsRequestHeader if err := s.updater.UpdateReference( ctx, + nil, header.Repository, header.User, quarantineDir, diff --git a/internal/gitaly/service/conflicts/resolve_conflicts_test.go b/internal/gitaly/service/conflicts/resolve_conflicts_test.go index ef18ff6a0766bed52e05d9ca6ed8ca46d9be34b2..80037942eabb14ced7705a2ce98cb6ce0afe7756 100644 --- a/internal/gitaly/service/conflicts/resolve_conflicts_test.go +++ b/internal/gitaly/service/conflicts/resolve_conflicts_test.go @@ -19,6 +19,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" @@ -60,7 +61,7 @@ var ( func TestSuccessfulResolveConflictsRequestHelper(t *testing.T) { var verifyFunc func(tb testing.TB, pushOptions []string, stdin io.Reader) - verifyFuncProxy := func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + verifyFuncProxy := func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { // We use a proxy func here as we need to provide the hookManager dependency while creating the service but we only // know the commit IDs after the service is created. The proxy allows us to modify the verifyFunc after the service // is already built. diff --git a/internal/gitaly/service/conflicts/server.go b/internal/gitaly/service/conflicts/server.go index 6e540a216c32f1f09e1583936c35906bfeda6bcc..52833c4356eba4a78bb805c64828c8e29fd125b6 100644 --- a/internal/gitaly/service/conflicts/server.go +++ b/internal/gitaly/service/conflicts/server.go @@ -6,9 +6,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) diff --git a/internal/gitaly/service/dependencies.go b/internal/gitaly/service/dependencies.go index 90044ff9cfd9df296b5d0b6c1388088a195d67de..2f9ff0254471698dc6b14feb967f5d3c3dc55a55 100644 --- a/internal/gitaly/service/dependencies.go +++ b/internal/gitaly/service/dependencies.go @@ -7,10 +7,11 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" gitalyhook "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/gitlab" @@ -37,6 +38,7 @@ type Dependencies struct { Git2goExecutor *git2go.Executor UpdaterWithHooks *updateref.UpdaterWithHooks HousekeepingManager housekeeping.Manager + PartitionManager *gitaly.PartitionManager } // GetCfg returns service configuration. @@ -123,3 +125,8 @@ func (dc *Dependencies) GetHousekeepingManager() housekeeping.Manager { func (dc *Dependencies) GetPackObjectsLimiter() limithandler.Limiter { return dc.PackObjectsLimiter } + +// GetPartitionManager returns the PartitionManager. +func (dc *Dependencies) GetPartitionManager() *gitaly.PartitionManager { + return dc.PartitionManager +} diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go index 4c2de55abc64afb6844cc639a87c78a5ace4a4fe..80db2a339c0342712d330246dc7f4d81b34e20ad 100644 --- a/internal/gitaly/service/hook/pack_objects_test.go +++ b/internal/gitaly/service/hook/pack_objects_test.go @@ -778,8 +778,6 @@ func TestPackObjects_concurrencyLimit(t *testing.T) { func testPackObjectsConcurrency(t *testing.T, ctx context.Context) { t.Parallel() - cfg := cfgWithCache(t, 0) - args := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"} for _, tc := range []struct { @@ -957,6 +955,8 @@ func testPackObjectsConcurrency(t *testing.T, ctx context.Context) { }, } { t.Run(tc.desc, func(t *testing.T) { + cfg := cfgWithCache(t, 0) + ticker := helper.NewManualTicker() monitor := limithandler.NewPackObjectsConcurrencyMonitor( cfg.Prometheus.GRPCLatencyBuckets, diff --git a/internal/gitaly/service/hook/post_receive.go b/internal/gitaly/service/hook/post_receive.go index f504fb80d97300ad0329248b59ce02ccd48ff00a..440325c0b2466c30ebccc8cb4c5170b291d163db 100644 --- a/internal/gitaly/service/hook/post_receive.go +++ b/internal/gitaly/service/hook/post_receive.go @@ -48,6 +48,7 @@ func (s *server) PostReceiveHook(stream gitalypb.HookService_PostReceiveHookServ if err := s.manager.PostReceiveHook( stream.Context(), + nil, firstRequest.Repository, firstRequest.GetGitPushOptions(), firstRequest.GetEnvironmentVariables(), diff --git a/internal/gitaly/service/hook/pre_receive.go b/internal/gitaly/service/hook/pre_receive.go index 0765b88af3e86b5ed8cce7ac4a1bd4130924fde0..ea2e69559aac3e1c1a1dced0fd5138016931d5b5 100644 --- a/internal/gitaly/service/hook/pre_receive.go +++ b/internal/gitaly/service/hook/pre_receive.go @@ -38,6 +38,7 @@ func (s *server) PreReceiveHook(stream gitalypb.HookService_PreReceiveHookServer if err := s.manager.PreReceiveHook( stream.Context(), + nil, repository, firstRequest.GetGitPushOptions(), firstRequest.GetEnvironmentVariables(), diff --git a/internal/gitaly/service/hook/update.go b/internal/gitaly/service/hook/update.go index 17774cbade49819721b033f84ff070a17de7c2f4..e12f5cfd7b789502a0fa543508cadccafdf7004e 100644 --- a/internal/gitaly/service/hook/update.go +++ b/internal/gitaly/service/hook/update.go @@ -30,6 +30,7 @@ func (s *server) UpdateHook(in *gitalypb.UpdateHookRequest, stream gitalypb.Hook if err := s.manager.UpdateHook( stream.Context(), + nil, in.GetRepository(), string(in.GetRef()), in.GetOldValue(), diff --git a/internal/gitaly/service/operations/apply_patch.go b/internal/gitaly/service/operations/apply_patch.go index 9cb9a5ee6db364fe52f7b40dce96305fb46676c2..7c181c9c11566e98c7f604781d84dd7e039bc5b6 100644 --- a/internal/gitaly/service/operations/apply_patch.go +++ b/internal/gitaly/service/operations/apply_patch.go @@ -193,7 +193,7 @@ func (s *Server) userApplyPatch(ctx context.Context, header *gitalypb.UserApplyP } } - if err := s.updateReferenceWithHooks(ctx, header.Repository, header.User, nil, targetBranch, patchedCommit, currentCommit); err != nil { + if err := s.updateReferenceWithHooks(ctx, nil, header.Repository, header.User, nil, targetBranch, patchedCommit, currentCommit); err != nil { return fmt.Errorf("update reference: %w", err) } diff --git a/internal/gitaly/service/operations/branches.go b/internal/gitaly/service/operations/branches.go index d88e816c372583b21a7460c724081cf860762b29..67b3c9faab03d18db28befb6423cdb8c5ee294a1 100644 --- a/internal/gitaly/service/operations/branches.go +++ b/internal/gitaly/service/operations/branches.go @@ -6,8 +6,8 @@ import ( "fmt" "gitlab.com/gitlab-org/gitaly/v16/internal/git" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -58,7 +58,7 @@ func (s *Server) UserCreateBranch(ctx context.Context, req *gitalypb.UserCreateB referenceName := git.NewReferenceNameFromBranchName(string(req.BranchName)) - if err := s.updateReferenceWithHooks(ctx, req.GetRepository(), req.User, quarantineDir, referenceName, startPointOID, git.ObjectHashSHA1.ZeroOID); err != nil { + if err := s.updateReferenceWithHooks(ctx, nil, req.GetRepository(), req.User, quarantineDir, referenceName, startPointOID, git.ObjectHashSHA1.ZeroOID); err != nil { var customHookErr updateref.CustomHookError if errors.As(err, &customHookErr) { @@ -140,7 +140,7 @@ func (s *Server) UserUpdateBranch(ctx context.Context, req *gitalypb.UserUpdateB return nil, err } - if err := s.updateReferenceWithHooks(ctx, req.GetRepository(), req.User, quarantineDir, referenceName, newOID, oldOID); err != nil { + if err := s.updateReferenceWithHooks(ctx, nil, req.GetRepository(), req.User, quarantineDir, referenceName, newOID, oldOID); err != nil { var customHookErr updateref.CustomHookError if errors.As(err, &customHookErr) { return &gitalypb.UserUpdateBranchResponse{ @@ -203,7 +203,7 @@ func (s *Server) UserDeleteBranch(ctx context.Context, req *gitalypb.UserDeleteB } } - if err := s.updateReferenceWithHooks(ctx, req.Repository, req.User, nil, referenceName, git.ObjectHashSHA1.ZeroOID, referenceValue); err != nil { + if err := s.updateReferenceWithHooks(ctx, nil, req.Repository, req.User, nil, referenceName, git.ObjectHashSHA1.ZeroOID, referenceValue); err != nil { var notAllowedError hook.NotAllowedError var customHookErr updateref.CustomHookError var updateRefError updateref.Error diff --git a/internal/gitaly/service/operations/branches_test.go b/internal/gitaly/service/operations/branches_test.go index a11bedc9a40e1fb5d960fe41baf1606d5231c9c5..f34fa9410f137cbddd9648595932a63647fed6ca 100644 --- a/internal/gitaly/service/operations/branches_test.go +++ b/internal/gitaly/service/operations/branches_test.go @@ -143,6 +143,7 @@ func TestUserCreateBranch_Transactions(t *testing.T) { deps.GetGitCmdFactory(), deps.GetCatfileCache(), deps.GetUpdaterWithHooks(), + deps.GetPartitionManager(), )) gitalypb.RegisterHookServiceServer(srv, hook.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter())) // Praefect proxy execution disabled as praefect runs only on the UNIX socket, but @@ -815,6 +816,7 @@ func TestUserDeleteBranch_transaction(t *testing.T) { deps.GetGitCmdFactory(), deps.GetCatfileCache(), deps.GetUpdaterWithHooks(), + deps.GetPartitionManager(), )) }) diff --git a/internal/gitaly/service/operations/cherry_pick.go b/internal/gitaly/service/operations/cherry_pick.go index 879456ef4c059ad6cf6a06e71bea8e748e915f64..797ad3ecf8a335ebbe8bf6600a31c1660255279b 100644 --- a/internal/gitaly/service/operations/cherry_pick.go +++ b/internal/gitaly/service/operations/cherry_pick.go @@ -8,8 +8,8 @@ import ( "time" "gitlab.com/gitlab-org/gitaly/v16/internal/git" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -145,7 +145,7 @@ func (s *Server) UserCherryPick(ctx context.Context, req *gitalypb.UserCherryPic } } - if err := s.updateReferenceWithHooks(ctx, req.GetRepository(), req.User, quarantineDir, referenceName, newrev, oldrev); err != nil { + if err := s.updateReferenceWithHooks(ctx, nil, req.GetRepository(), req.User, quarantineDir, referenceName, newrev, oldrev); err != nil { var customHookErr updateref.CustomHookError if errors.As(err, &customHookErr) { return nil, structerr.NewFailedPrecondition("access check failed").WithDetail( diff --git a/internal/gitaly/service/operations/commit_files.go b/internal/gitaly/service/operations/commit_files.go index 24f376801ea10012c3fffa982fbd3acd7b80b935..588de9e9a202b537f12adce2f757abc922cf6f11 100644 --- a/internal/gitaly/service/operations/commit_files.go +++ b/internal/gitaly/service/operations/commit_files.go @@ -14,8 +14,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/remoterepo" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" @@ -126,10 +126,11 @@ func validatePath(rootPath, relPath string) (string, error) { } func (s *Server) userCommitFiles(ctx context.Context, header *gitalypb.UserCommitFilesRequestHeader, stream gitalypb.OperationService_UserCommitFilesServer) error { - quarantineDir, quarantineRepo, err := s.quarantinedRepo(ctx, header.GetRepository()) + tx, quarantineDir, quarantineRepo, clean, err := s.begin(ctx, header.GetRepository()) if err != nil { - return err + return fmt.Errorf("begin: %w", err) } + defer func() { _ = clean() }() repoPath, err := quarantineRepo.Path() if err != nil { @@ -339,7 +340,7 @@ func (s *Server) userCommitFiles(ctx context.Context, header *gitalypb.UserCommi } } - if err := s.updateReferenceWithHooks(ctx, header.GetRepository(), header.User, quarantineDir, targetBranchName, commitID, oldRevision); err != nil { + if err := s.updateReferenceWithHooks(ctx, tx, header.GetRepository(), header.User, quarantineDir, targetBranchName, commitID, oldRevision); err != nil { if errors.As(err, &updateref.Error{}) { return structerr.NewFailedPrecondition("%w", err) } diff --git a/internal/gitaly/service/operations/commit_files_test.go b/internal/gitaly/service/operations/commit_files_test.go index b2293d3c8a3d6f73dae91ac6ac080584b9b7a15a..c22798fad40b3a9ec1a7c766f279b4c258acb3a9 100644 --- a/internal/gitaly/service/operations/commit_files_test.go +++ b/internal/gitaly/service/operations/commit_files_test.go @@ -952,6 +952,7 @@ func TestUserCommitFiles(t *testing.T) { continue } + require.NoError(t, err) require.Equal(t, step.branchCreated, resp.BranchUpdate.BranchCreated, "step %d", i+1) require.Equal(t, step.repoCreated, resp.BranchUpdate.RepoCreated, "step %d", i+1) gittest.RequireTree(t, cfg, repoPath, branch, step.treeEntries) diff --git a/internal/gitaly/service/operations/merge.go b/internal/gitaly/service/operations/merge.go index 4511a231942ccccb76ecc55f3331dfb7ec35e9dc..d2c6d8c9cf34299002605fe29d6871e09e8bf06a 100644 --- a/internal/gitaly/service/operations/merge.go +++ b/internal/gitaly/service/operations/merge.go @@ -11,9 +11,9 @@ import ( "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -193,7 +193,7 @@ func (s *Server) UserMergeBranch(stream gitalypb.OperationService_UserMergeBranc return structerr.NewFailedPrecondition("merge aborted by client") } - if err := s.updateReferenceWithHooks(ctx, firstRequest.GetRepository(), firstRequest.User, quarantineDir, referenceName, mergeOID, revision); err != nil { + if err := s.updateReferenceWithHooks(ctx, nil, firstRequest.GetRepository(), firstRequest.User, quarantineDir, referenceName, mergeOID, revision); err != nil { var notAllowedError hook.NotAllowedError var customHookErr updateref.CustomHookError var updateRefError updateref.Error @@ -331,7 +331,7 @@ func (s *Server) UserFFBranch(ctx context.Context, in *gitalypb.UserFFBranchRequ return nil, structerr.NewFailedPrecondition("not fast forward") } - if err := s.updateReferenceWithHooks(ctx, in.GetRepository(), in.User, quarantineDir, referenceName, commitID, revision); err != nil { + if err := s.updateReferenceWithHooks(ctx, nil, in.GetRepository(), in.User, quarantineDir, referenceName, commitID, revision); err != nil { var customHookErr updateref.CustomHookError if errors.As(err, &customHookErr) { return &gitalypb.UserFFBranchResponse{ diff --git a/internal/gitaly/service/operations/rebase.go b/internal/gitaly/service/operations/rebase.go index a11cae6878a7211fd14dfb965d5cfc12574b111a..9e2282a1eb4eec47d76fa56fb0c4841a147b1fa0 100644 --- a/internal/gitaly/service/operations/rebase.go +++ b/internal/gitaly/service/operations/rebase.go @@ -6,8 +6,8 @@ import ( "time" "gitlab.com/gitlab-org/gitaly/v16/internal/git" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -110,6 +110,7 @@ func (s *Server) UserRebaseConfirmable(stream gitalypb.OperationService_UserReba if err := s.updateReferenceWithHooks( ctx, + nil, header.GetRepository(), header.User, quarantineDir, diff --git a/internal/gitaly/service/operations/revert.go b/internal/gitaly/service/operations/revert.go index 8b6e3b66949613f57a789cc5db41b862297c4568..8f2bd15b2272df50374da7dfb3f397ce97a668c9 100644 --- a/internal/gitaly/service/operations/revert.go +++ b/internal/gitaly/service/operations/revert.go @@ -8,8 +8,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/remoterepo" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -120,7 +120,7 @@ func (s *Server) UserRevert(ctx context.Context, req *gitalypb.UserRevertRequest } } - if err := s.updateReferenceWithHooks(ctx, req.GetRepository(), req.User, quarantineDir, referenceName, newrev, oldrev); err != nil { + if err := s.updateReferenceWithHooks(ctx, nil, req.GetRepository(), req.User, quarantineDir, referenceName, newrev, oldrev); err != nil { var customHookErr updateref.CustomHookError if errors.As(err, &customHookErr) { return &gitalypb.UserRevertResponse{ diff --git a/internal/gitaly/service/operations/server.go b/internal/gitaly/service/operations/server.go index 865975f7d044912f30912efea14ea15fb0bf782b..b379dacd6fcf82d837cd82db6ce4438adfb118e3 100644 --- a/internal/gitaly/service/operations/server.go +++ b/internal/gitaly/service/operations/server.go @@ -2,6 +2,7 @@ package operations import ( "context" + "fmt" "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/git" @@ -9,9 +10,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/quarantine" "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" @@ -21,14 +23,15 @@ import ( //nolint:revive // This is unintentionally missing documentation. type Server struct { gitalypb.UnimplementedOperationServiceServer - hookManager hook.Manager - txManager transaction.Manager - locator storage.Locator - conns *client.Pool - git2goExecutor *git2go.Executor - gitCmdFactory git.CommandFactory - catfileCache catfile.Cache - updater *updateref.UpdaterWithHooks + hookManager hook.Manager + txManager transaction.Manager + locator storage.Locator + conns *client.Pool + git2goExecutor *git2go.Executor + gitCmdFactory git.CommandFactory + catfileCache catfile.Cache + updater *updateref.UpdaterWithHooks + partitionManager *gitaly.PartitionManager } // NewServer creates a new instance of a grpc OperationServiceServer @@ -41,16 +44,18 @@ func NewServer( gitCmdFactory git.CommandFactory, catfileCache catfile.Cache, updater *updateref.UpdaterWithHooks, + partitionManager *gitaly.PartitionManager, ) *Server { return &Server{ - hookManager: hookManager, - txManager: txManager, - locator: locator, - conns: conns, - git2goExecutor: git2goExecutor, - gitCmdFactory: gitCmdFactory, - catfileCache: catfileCache, - updater: updater, + hookManager: hookManager, + txManager: txManager, + locator: locator, + conns: conns, + git2goExecutor: git2goExecutor, + gitCmdFactory: gitCmdFactory, + catfileCache: catfileCache, + updater: updater, + partitionManager: partitionManager, } } @@ -58,6 +63,35 @@ func (s *Server) localrepo(repo repository.GitRepo) *localrepo.Repo { return localrepo.New(s.locator, s.gitCmdFactory, s.catfileCache, repo) } +func (s *Server) begin(ctx context.Context, repo *gitalypb.Repository) (_ *gitaly.Transaction, _ *quarantine.Dir, _ *localrepo.Repo, _ func() error, returnedErr error) { + if s.partitionManager != nil { + tx, err := s.partitionManager.Begin(ctx, repo) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("begin: %w", err) + } + defer func() { + if returnedErr != nil { + _ = tx.Rollback() + } + }() + + quarantineDir, err := tx.QuarantineDirectory() + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("quarantine directory: %w", err) + } + + quarantineRepo, err := s.localrepo(repo).Quarantine(quarantineDir) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("quarantine repo: %w", err) + } + + return tx, nil, quarantineRepo, tx.Rollback, nil + } + + quarantineDir, quarantinedRepo, err := s.quarantinedRepo(ctx, repo) + return nil, quarantineDir, quarantinedRepo, func() error { return nil }, err +} + func (s *Server) quarantinedRepo( ctx context.Context, repo *gitalypb.Repository, ) (*quarantine.Dir, *localrepo.Repo, error) { diff --git a/internal/gitaly/service/operations/submodules.go b/internal/gitaly/service/operations/submodules.go index f5daad03ce0a03bc1e5d237b3d8033f8048b1dbd..af10c681d77423f7b750a711644ba271a0c878ca 100644 --- a/internal/gitaly/service/operations/submodules.go +++ b/internal/gitaly/service/operations/submodules.go @@ -11,8 +11,8 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -258,6 +258,7 @@ func (s *Server) userUpdateSubmodule(ctx context.Context, req *gitalypb.UserUpda if err := s.updateReferenceWithHooks( ctx, + nil, req.GetRepository(), req.GetUser(), quarantineDir, diff --git a/internal/gitaly/service/operations/tags.go b/internal/gitaly/service/operations/tags.go index 944c9132f8c8017a56930f48f84e987094db0c57..b328ade667b38fc5b3fc818ebaa3d0ae447872d2 100644 --- a/internal/gitaly/service/operations/tags.go +++ b/internal/gitaly/service/operations/tags.go @@ -11,8 +11,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -63,7 +63,7 @@ func (s *Server) UserDeleteTag(ctx context.Context, req *gitalypb.UserDeleteTagR } } - if err := s.updateReferenceWithHooks(ctx, req.Repository, req.User, nil, referenceName, git.ObjectHashSHA1.ZeroOID, revision); err != nil { + if err := s.updateReferenceWithHooks(ctx, nil, req.Repository, req.User, nil, referenceName, git.ObjectHashSHA1.ZeroOID, revision); err != nil { var customHookErr updateref.CustomHookError if errors.As(err, &customHookErr) { return &gitalypb.UserDeleteTagResponse{ @@ -160,7 +160,7 @@ func (s *Server) UserCreateTag(ctx context.Context, req *gitalypb.UserCreateTagR ) } - if err := s.updateReferenceWithHooks(ctx, req.Repository, req.User, quarantineDir, referenceName, tagID, git.ObjectHashSHA1.ZeroOID); err != nil { + if err := s.updateReferenceWithHooks(ctx, nil, req.Repository, req.User, quarantineDir, referenceName, tagID, git.ObjectHashSHA1.ZeroOID); err != nil { var notAllowedError hook.NotAllowedError var customHookErr updateref.CustomHookError var updateRefError updateref.Error diff --git a/internal/gitaly/service/operations/testhelper_test.go b/internal/gitaly/service/operations/testhelper_test.go index ff6ada770a216b25ac1a4011417f89fcedd8856b..8cdab7afc2514a2ff61a7d232c958edf34879098 100644 --- a/internal/gitaly/service/operations/testhelper_test.go +++ b/internal/gitaly/service/operations/testhelper_test.go @@ -101,6 +101,7 @@ func runOperationServiceServer(tb testing.TB, cfg config.Cfg, options ...testser deps.GetGitCmdFactory(), deps.GetCatfileCache(), deps.GetUpdaterWithHooks(), + deps.GetPartitionManager(), ) gitalypb.RegisterOperationServiceServer(srv, operationServer) diff --git a/internal/gitaly/service/operations/update_with_hooks.go b/internal/gitaly/service/operations/update_with_hooks.go index 73df5e153211092b05bc839936c5928ca24b7251..4965edfc44816fa77ab2a0744bc9a2c84e1e50c1 100644 --- a/internal/gitaly/service/operations/update_with_hooks.go +++ b/internal/gitaly/service/operations/update_with_hooks.go @@ -5,11 +5,13 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/quarantine" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) func (s *Server) updateReferenceWithHooks( ctx context.Context, + tx *gitaly.Transaction, repo *gitalypb.Repository, user *gitalypb.User, quarantine *quarantine.Dir, @@ -17,5 +19,5 @@ func (s *Server) updateReferenceWithHooks( newrev, oldrev git.ObjectID, pushOptions ...string, ) error { - return s.updater.UpdateReference(ctx, repo, user, quarantine, reference, newrev, oldrev, pushOptions...) + return s.updater.UpdateReference(ctx, tx, repo, user, quarantine, reference, newrev, oldrev, pushOptions...) } diff --git a/internal/gitaly/service/repository/repository_exists_test.go b/internal/gitaly/service/repository/repository_exists_test.go index 70e2e3908152a241a67f5031a222cd653f8d38f5..b325388bed2017fcd1265badc45ed4d97036bed6 100644 --- a/internal/gitaly/service/repository/repository_exists_test.go +++ b/internal/gitaly/service/repository/repository_exists_test.go @@ -21,11 +21,11 @@ func TestRepositoryExists(t *testing.T) { cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages("default", "other", "broken")) cfg := cfgBuilder.Build(t) - require.NoError(t, os.RemoveAll(cfg.Storages[2].Path), "third storage needs to be invalid") - client, socketPath := runRepositoryService(t, cfg) cfg.SocketPath = socketPath + require.NoError(t, os.RemoveAll(cfg.Storages[2].Path), "third storage needs to be invalid") + repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{}) queries := []struct { diff --git a/internal/gitaly/service/server/disk_stats_test.go b/internal/gitaly/service/server/disk_stats_test.go index b445d5499e43c1a33af8940ae705c34064724068..8f014e338b1955121a85d0be8a78def1db83fd63 100644 --- a/internal/gitaly/service/server/disk_stats_test.go +++ b/internal/gitaly/service/server/disk_stats_test.go @@ -4,10 +4,10 @@ package server import ( "math" + "os" "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -15,11 +15,11 @@ import ( ) func TestStorageDiskStatistics(t *testing.T) { - cfg := testcfg.Build(t) - - cfg.Storages = append(cfg.Storages, config.Storage{Name: "broken", Path: "/does/not/exist"}) + cfg := testcfg.Build(t, testcfg.WithStorages("default", "broken")) addr := runServer(t, cfg) + require.NoError(t, os.RemoveAll(cfg.Storages[1].Path), "second storage needs to be invalid") + client := newServerClient(t, addr) ctx := testhelper.Context(t) diff --git a/internal/gitaly/service/server/info_test.go b/internal/gitaly/service/server/info_test.go index 78c1e140146942329ffb1da9007a9fa11d1a44d7..19d5c34c2afe3b780b9464275db3b2c7c25cf3bc 100644 --- a/internal/gitaly/service/server/info_test.go +++ b/internal/gitaly/service/server/info_test.go @@ -3,6 +3,7 @@ package server import ( + "os" "testing" "github.com/stretchr/testify/require" @@ -23,11 +24,10 @@ import ( ) func TestGitalyServerInfo(t *testing.T) { - cfg := testcfg.Build(t) - - cfg.Storages = append(cfg.Storages, config.Storage{Name: "broken", Path: "/does/not/exist"}) + cfg := testcfg.Build(t, testcfg.WithStorages("default", "broken")) addr := runServer(t, cfg, testserver.WithDisablePraefect()) + require.NoError(t, os.RemoveAll(cfg.Storages[1].Path), "second storage needs to be invalid") client := newServerClient(t, addr) ctx := testhelper.Context(t) diff --git a/internal/gitaly/service/setup/register.go b/internal/gitaly/service/setup/register.go index 50f0482c19c64de2e63a7c6aa08a3453c01a81db..c83928881e0d802397c5e77b67b6ad60356f7f33 100644 --- a/internal/gitaly/service/setup/register.go +++ b/internal/gitaly/service/setup/register.go @@ -83,6 +83,7 @@ func RegisterAll(srv *grpc.Server, deps *service.Dependencies) { deps.GetGitCmdFactory(), deps.GetCatfileCache(), deps.GetUpdaterWithHooks(), + deps.GetPartitionManager(), )) gitalypb.RegisterRefServiceServer(srv, ref.NewServer( deps.GetLocator(), diff --git a/internal/gitaly/service/smarthttp/receive_pack_test.go b/internal/gitaly/service/smarthttp/receive_pack_test.go index 7e14ca0ebff5f9ae43689d2f489e33bf5b2e3280..a19ff1e3fc0f033376e62d6ccbad7abdd935f2be 100644 --- a/internal/gitaly/service/smarthttp/receive_pack_test.go +++ b/internal/gitaly/service/smarthttp/receive_pack_test.go @@ -18,6 +18,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" gitalyhook "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v16/internal/gitlab" @@ -843,6 +844,7 @@ func TestPostReceivePack_notAllowed(t *testing.T) { func( t *testing.T, ctx context.Context, + tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer, diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go index 6a7f4f143637ae153c13b4c68b2e54bbbeac3a68..eaffeb4caec0b315231198250f05052b91079e37 100644 --- a/internal/gitaly/transaction_manager.go +++ b/internal/gitaly/transaction_manager.go @@ -115,6 +115,8 @@ type Snapshot struct { // HookIndex is index of the hooks on the disk that are included in this Transactions's snapshot // and were the latest on the read index. HookIndex LogIndex + // HookPath is an absolute filesystem path to the hooks in this snapshot. + HookPath string } // Transaction is a unit-of-work that contains reference changes to perform on the repository. @@ -179,9 +181,16 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error) snapshot: Snapshot{ ReadIndex: mgr.appendedLogIndex, HookIndex: mgr.hookIndex, + HookPath: hookPathForLogIndex(mgr.repositoryPath, mgr.hookIndex), }, } + // If there are no hooks stored through the WAL yet, then default to the custom hooks + // that may already exist in the repository for backwards compatibility. + if txn.snapshot.HookIndex == 0 { + txn.snapshot.HookPath = filepath.Join(mgr.repositoryPath, "custom_hooks") + } + readReady := mgr.applyNotifications[txn.snapshot.ReadIndex] mgr.mutex.RUnlock() if readReady == nil { @@ -192,6 +201,10 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error) } txn.initStagingDirectory = func() error { + if txn.stagingDirectory != "" { + return nil + } + stagingDirectory, err := os.MkdirTemp(mgr.stagingDirectory, "") if err != nil { return fmt.Errorf("mkdir temp: %w", err) @@ -400,6 +413,11 @@ type TransactionManager struct { // transactionFinalizer executes when a transaction is completed. transactionFinalizer func() + + // awaitingTransactions contains transactions waiting for their log entry to be applied to + // the repository. It's keyed by the log index the transaction is waiting to be applied and the + // value is the resultChannel that is waiting the result. + awaitingTransactions map[LogIndex]resultChannel } // repository is the localrepo interface used by TransactionManager. @@ -430,6 +448,7 @@ func NewTransactionManager(db *badger.DB, storagePath, relativePath, stagingDir applyNotifications: make(map[LogIndex]chan struct{}), stagingDirectory: stagingDir, transactionFinalizer: transactionFinalizer, + awaitingTransactions: make(map[LogIndex]resultChannel), } } @@ -643,7 +662,7 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { return err } - transaction.result <- func() (commitErr error) { + if err := func() (commitErr error) { logEntry := &gitalypb.LogEntry{} var err error @@ -692,7 +711,12 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { } return mgr.appendLogEntry(nextLogIndex, logEntry) - }() + }(); err != nil { + transaction.result <- err + return nil + } + + mgr.awaitingTransactions[mgr.appendedLogIndex] = transaction.result return nil } @@ -1050,7 +1074,7 @@ func (mgr *TransactionManager) appendLogEntry(nextLogIndex LogIndex, logEntry *g } // applyLogEntry reads a log entry at the given index and applies it to the repository. -func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIndex) error { +func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIndex) (returnedErr error) { logEntry, err := mgr.readLogEntry(logIndex) if err != nil { return fmt.Errorf("read log entry: %w", err) @@ -1101,6 +1125,13 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn delete(mgr.applyNotifications, logIndex) close(notificationCh) + // There is no awaiter for a transaction if the transaction manager is recovering + // transactions from the log after starting up. + if resultChan, ok := mgr.awaitingTransactions[logIndex]; ok { + resultChan <- nil + delete(mgr.awaitingTransactions, logIndex) + } + return nil } @@ -1124,10 +1155,7 @@ func (mgr *TransactionManager) applyCustomHooks(ctx context.Context, logIndex Lo return nil } - syncer := safe.NewSyncer() - - hooksPath := filepath.Join("wal", "hooks") - targetDirectory := filepath.Join(mgr.repositoryPath, hooksPath, logIndex.String()) + targetDirectory := hookPathForLogIndex(mgr.repositoryPath, logIndex) if err := os.Mkdir(targetDirectory, fs.ModePerm); err != nil { // The target directory may exist if we previously tried to extract the // hooks there. TAR overwrites existing files and the hooks files are @@ -1141,19 +1169,26 @@ func (mgr *TransactionManager) applyCustomHooks(ctx context.Context, logIndex Lo return fmt.Errorf("extract hooks: %w", err) } + syncer := safe.NewSyncer() // TAR doesn't sync the extracted files so do it manually here. if err := syncer.SyncRecursive(targetDirectory); err != nil { return fmt.Errorf("sync hooks: %w", err) } // Sync the parent directory as well. - if err := syncer.Sync(filepath.Join(mgr.repositoryPath, hooksPath)); err != nil { + if err := syncer.SyncParent(targetDirectory); err != nil { return fmt.Errorf("sync hook directory: %w", err) } return nil } +// hookPathForLogIndex returns the filesystem paths where the hooks for the +// given log index are stored. +func hookPathForLogIndex(repositoryPath string, logIndex LogIndex) string { + return filepath.Join(repositoryPath, "wal", "hooks", logIndex.String()) +} + // deleteLogEntry deletes the log entry at the given index from the log. func (mgr *TransactionManager) deleteLogEntry(index LogIndex) error { return mgr.deleteKey(keyLogEntry(mgr.relativePath, index)) diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go index 966209f6b4877aba381ccbf9af3b59560aeae133..fb3358097dde960cf2d19b5fa7b48b4e61b2b975 100644 --- a/internal/gitaly/transaction_manager_test.go +++ b/internal/gitaly/transaction_manager_test.go @@ -1077,6 +1077,7 @@ func TestTransactionManager(t *testing.T) { CustomHooksUpdate: &CustomHooksUpdate{ CustomHooksTAR: validCustomHooks(t), }, + ExpectedError: ErrTransactionProcessingStopped, }, AssertManager{ ExpectedError: errSimulatedCrash, @@ -1120,6 +1121,7 @@ func TestTransactionManager(t *testing.T) { CustomHooksUpdate: &CustomHooksUpdate{ CustomHooksTAR: validCustomHooks(t), }, + ExpectedError: ErrTransactionProcessingStopped, }, AssertManager{ ExpectedError: errSimulatedCrash, @@ -1302,6 +1304,7 @@ func TestTransactionManager(t *testing.T) { ReferenceUpdates: ReferenceUpdates{ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, }, + ExpectedError: ErrTransactionProcessingStopped, }, AssertManager{ ExpectedError: errSimulatedCrash, @@ -1346,6 +1349,7 @@ func TestTransactionManager(t *testing.T) { ReferenceUpdates: ReferenceUpdates{ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, }, + ExpectedError: ErrTransactionProcessingStopped, }, AssertManager{}, StartManager{}, @@ -1388,6 +1392,7 @@ func TestTransactionManager(t *testing.T) { ReferenceUpdates: ReferenceUpdates{ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, }, + ExpectedError: ErrTransactionProcessingStopped, }, AssertManager{}, StartManager{}, @@ -1510,6 +1515,7 @@ func TestTransactionManager(t *testing.T) { ReferenceUpdates: ReferenceUpdates{ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, }, + ExpectedError: ErrTransactionProcessingStopped, }, }, expectedState: StateAssertion{ @@ -1748,6 +1754,7 @@ func TestTransactionManager(t *testing.T) { DefaultBranchUpdate: &DefaultBranchUpdate{ Reference: "refs/heads/branch2", }, + ExpectedError: ErrTransactionProcessingStopped, }, AssertManager{ ExpectedError: errSimulatedCrash, @@ -1975,6 +1982,7 @@ func TestTransactionManager(t *testing.T) { "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.Commits.Third.OID}, }, QuarantinedPacks: [][]byte{setup.Commits.Third.Pack}, + ExpectedError: ErrTransactionProcessingStopped, }, AssertManager{ ExpectedError: errSimulatedCrash, @@ -2048,6 +2056,7 @@ func TestTransactionManager(t *testing.T) { "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, }, QuarantinedPacks: [][]byte{setup.Commits.First.Pack}, + ExpectedError: ErrTransactionProcessingStopped, }, AssertManager{ ExpectedError: errSimulatedCrash, @@ -2555,7 +2564,13 @@ func TestTransactionManager(t *testing.T) { transaction, err := transactionManager.Begin(beginCtx) require.Equal(t, step.ExpectedError, err) if err == nil { - require.Equal(t, step.ExpectedSnapshot, transaction.Snapshot()) + expectedSnapshot := step.ExpectedSnapshot + expectedSnapshot.HookPath = filepath.Join(repoPath, "custom_hooks") + if expectedSnapshot.HookIndex > 0 { + expectedSnapshot.HookPath = hookPathForLogIndex(repoPath, expectedSnapshot.HookIndex) + } + + require.Equal(t, expectedSnapshot, transaction.Snapshot()) } openTransactions[step.TransactionID] = transaction case Commit: diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index b9a62e3638cad1d8f5355f12cf36475ed3865c46..432695302096fcf482d080d595fca00df0434ada 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -17,13 +17,15 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/auth" gitalylog "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/log" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/server" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" @@ -348,6 +350,18 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * gsd.housekeepingManager = housekeeping.NewManager(cfg.Prometheus, gsd.txMgr) } + var partitionManager *gitaly.PartitionManager + if _, ok := os.LookupEnv("GITALY_TEST_WAL"); ok { + var err error + partitionManager, err = gitaly.NewPartitionManager( + cfg.Storages, + localrepo.NewFactory(gsd.locator, gsd.gitCmdFactory, gsd.catfileCache), + gsd.logger, + ) + require.NoError(tb, err) + tb.Cleanup(partitionManager.Stop) + } + return &service.Dependencies{ Cfg: cfg, ClientPool: gsd.conns, @@ -366,6 +380,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * Git2goExecutor: gsd.git2goExecutor, UpdaterWithHooks: gsd.updaterWithHooks, HousekeepingManager: gsd.housekeepingManager, + PartitionManager: partitionManager, } }