From e11956afe6753f542be57876f3e5551e5c64d5b8 Mon Sep 17 00:00:00 2001 From: John Cai Date: Tue, 7 Jan 2020 18:10:22 -0800 Subject: [PATCH 1/2] Add test server to automatically start praefect --- .gitlab-ci.yml | 5 + Makefile | 8 ++ internal/errors/errors.go | 10 ++ internal/praefect/coordinator.go | 16 ++- internal/praefect/datastore/datastore.go | 5 +- internal/praefect/protoregistry/find_oid.go | 2 +- internal/service/blob/get_blob_test.go | 12 +- internal/service/blob/get_blobs_test.go | 8 +- internal/service/blob/lfs_pointers_test.go | 24 ++-- internal/service/blob/testhelper_test.go | 22 ++-- internal/testhelper/testhelper.go | 138 ++++++++++++++++++++ 11 files changed, 209 insertions(+), 41 deletions(-) create mode 100644 internal/errors/errors.go diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index f9dbc908a8..d02ca67e16 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -137,6 +137,11 @@ test:proxy: script: - make test-with-proxies +test:praefect: + <<: *test_definition + script: + - make test-with-praefect + race: <<: *go_test_definition script: diff --git a/Makefile b/Makefile index 10afe2289b..7e220a8fdb 100644 --- a/Makefile +++ b/Makefile @@ -60,6 +60,14 @@ prepare-tests: prepare-build test: prepare-build cd $(BUILD_DIR) && $(MAKE) $@ +.PHONY: test-with-praefect +test-with-praefect: build-praefect + cd $(BUILD_DIR) && GITALY_TEST_PRAEFECT_BIN=1 PRAEFECT_BIN_PATH="$(BUILD_DIR)/bin/praefect" $(MAKE) test + +.PHONY: build-praefect +build-praefect: prepare-build + cd cmd/praefect && go build -o "$(BUILD_DIR)/bin/praefect" + .PHONY: test-with-proxies test-with-proxies: prepare-build cd $(BUILD_DIR) && $(MAKE) $@ diff --git a/internal/errors/errors.go b/internal/errors/errors.go new file mode 100644 index 0000000000..d2efaef524 --- /dev/null +++ b/internal/errors/errors.go @@ -0,0 +1,10 @@ +package errors + +import "errors" + +var ( + // ErrEmptyRepository is returned when an RPC is missing a repository as an argument + ErrEmptyRepository = errors.New("empty Repository") + // ErrInvalidRepository is returned when an RPC has an invalid repository as an argument + ErrInvalidRepository = errors.New("invalid Repository") +) diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index ef9bbdc285..2a4859a8ef 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -12,6 +12,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/protoc-gen-go/descriptor" "github.com/sirupsen/logrus" + internalerrs "gitlab.com/gitlab-org/gitaly/internal/errors" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" @@ -19,6 +20,8 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func isDestructive(methodName string) bool { @@ -80,10 +83,14 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, var requestFinalizer func() var storage string + var getRepoErr error if mi.Scope == protoregistry.ScopeRepository { - storage, requestFinalizer, err = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName) - if err != nil { - return nil, err + storage, requestFinalizer, getRepoErr = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName) + if getRepoErr != nil { + if getRepoErr == protoregistry.ErrTargetRepoMissing { + return nil, status.Errorf(codes.InvalidArgument, getRepoErr.Error()) + } + return nil, getRepoErr } } else { storage, requestFinalizer, err = c.getAnyStorageNode() @@ -185,6 +192,9 @@ func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *git newPrimary, err := c.datastore.PickAPrimary(targetRepo.GetStorageName()) if err != nil { + if err == datastore.ErrNoPrimaryForStorage { + return nil, status.Error(codes.InvalidArgument, internalerrs.ErrInvalidRepository.Error()) + } return nil, fmt.Errorf("could not choose a primary: %v", err) } diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go index 2830b197a7..c8de9d82c5 100644 --- a/internal/praefect/datastore/datastore.go +++ b/internal/praefect/datastore/datastore.go @@ -179,6 +179,9 @@ func NewInMemory(cfg config.Config) *MemoryDatastore { return m } +// ErrNoPrimaryForStorage indicates a virtual storage has no primary associated with it +var ErrNoPrimaryForStorage = errors.New("no primary for storage") + // PickAPrimary returns the primary configured in the config file func (md *MemoryDatastore) PickAPrimary(virtualStorage string) (models.Node, error) { for _, node := range md.virtualStorages[virtualStorage] { @@ -187,7 +190,7 @@ func (md *MemoryDatastore) PickAPrimary(virtualStorage string) (models.Node, err } } - return models.Node{}, errors.New("no default primaries found") + return models.Node{}, ErrNoPrimaryForStorage } // GetReplicas gets the secondaries for a repository based on the relative path diff --git a/internal/praefect/protoregistry/find_oid.go b/internal/praefect/protoregistry/find_oid.go index c17324a965..3e3db44856 100644 --- a/internal/praefect/protoregistry/find_oid.go +++ b/internal/praefect/protoregistry/find_oid.go @@ -17,7 +17,7 @@ const ( ) // ErrTargetRepoMissing indicates that the target repo is missing or not set -var ErrTargetRepoMissing = errors.New("target repo is not set") +var ErrTargetRepoMissing = errors.New("empty Repository") func reflectFindRepoTarget(pbMsg proto.Message, targetOID []int) (*gitalypb.Repository, error) { msgV, e := reflectFindOID(pbMsg, targetOID) diff --git a/internal/service/blob/get_blob_test.go b/internal/service/blob/get_blob_test.go index 728a1b2479..ca24b07ad2 100644 --- a/internal/service/blob/get_blob_test.go +++ b/internal/service/blob/get_blob_test.go @@ -13,8 +13,8 @@ import ( ) func TestSuccessfulGetBlob(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() testRepo, _, cleanupFn := testhelper.NewTestRepo(t) defer cleanupFn() @@ -90,8 +90,8 @@ func TestSuccessfulGetBlob(t *testing.T) { } func TestGetBlobNotFound(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() client, conn := newBlobClient(t, serverSocketPath) defer conn.Close() @@ -146,8 +146,8 @@ func getBlob(stream gitalypb.BlobService_GetBlobClient) (int64, string, []byte, } func TestFailedGetBlobRequestDueToValidationError(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() testRepo, _, cleanupFn := testhelper.NewTestRepo(t) defer cleanupFn() diff --git a/internal/service/blob/get_blobs_test.go b/internal/service/blob/get_blobs_test.go index 8b71997316..8ecfcb6964 100644 --- a/internal/service/blob/get_blobs_test.go +++ b/internal/service/blob/get_blobs_test.go @@ -13,8 +13,8 @@ import ( ) func TestSuccessfulGetBlobsRequest(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() testRepo, testRepoPath, cleanupFn := testhelper.NewTestRepo(t) defer cleanupFn() @@ -133,8 +133,8 @@ func TestSuccessfulGetBlobsRequest(t *testing.T) { } func TestFailedGetBlobsRequestDueToValidation(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() testRepo, _, cleanupFn := testhelper.NewTestRepo(t) defer cleanupFn() diff --git a/internal/service/blob/lfs_pointers_test.go b/internal/service/blob/lfs_pointers_test.go index 019caa37c9..7dd31eda24 100644 --- a/internal/service/blob/lfs_pointers_test.go +++ b/internal/service/blob/lfs_pointers_test.go @@ -13,8 +13,8 @@ import ( ) func TestSuccessfulGetLFSPointersRequest(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() testRepo, _, cleanupFn := testhelper.NewTestRepo(t) defer cleanupFn() @@ -77,8 +77,8 @@ func TestSuccessfulGetLFSPointersRequest(t *testing.T) { } func TestFailedGetLFSPointersRequestDueToValidations(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() testRepo, _, cleanupFn := testhelper.NewTestRepo(t) defer cleanupFn() @@ -125,8 +125,8 @@ func TestFailedGetLFSPointersRequestDueToValidations(t *testing.T) { } func TestSuccessfulGetNewLFSPointersRequest(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() client, conn := newBlobClient(t, serverSocketPath) defer conn.Close() @@ -291,8 +291,8 @@ func TestSuccessfulGetNewLFSPointersRequest(t *testing.T) { } func TestFailedGetNewLFSPointersRequestDueToValidations(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() client, conn := newBlobClient(t, serverSocketPath) defer conn.Close() @@ -352,8 +352,8 @@ func drainNewPointers(c gitalypb.BlobService_GetNewLFSPointersClient) error { } func TestSuccessfulGetAllLFSPointersRequest(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() client, conn := newBlobClient(t, serverSocketPath) defer conn.Close() @@ -423,8 +423,8 @@ func getAllPointers(t *testing.T, c gitalypb.BlobService_GetAllLFSPointersClient } func TestFailedGetAllLFSPointersRequestDueToValidations(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() client, conn := newBlobClient(t, serverSocketPath) defer conn.Close() diff --git a/internal/service/blob/testhelper_test.go b/internal/service/blob/testhelper_test.go index c8d9909950..d28ad8eb82 100644 --- a/internal/service/blob/testhelper_test.go +++ b/internal/service/blob/testhelper_test.go @@ -2,10 +2,10 @@ package blob import ( "log" - "net" "os" "testing" + "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" @@ -23,6 +23,7 @@ func testMain(m *testing.M) int { defer testhelper.MustHaveNoChildProcess() testhelper.ConfigureRuby() + if err := rubyServer.Start(); err != nil { log.Fatal(err) } @@ -31,22 +32,15 @@ func testMain(m *testing.M) int { return m.Run() } -func runBlobServer(t *testing.T) (*grpc.Server, string) { - grpcServer := testhelper.NewTestGrpcServer(t, nil, nil) - - serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() - listener, err := net.Listen("unix", serverSocketPath) - - if err != nil { - t.Fatal(err) - } +func runBlobServer(t *testing.T) (func(), string) { + srv := testhelper.NewServer(t, nil, nil) - gitalypb.RegisterBlobServiceServer(grpcServer, &server{ruby: rubyServer}) - reflection.Register(grpcServer) + gitalypb.RegisterBlobServiceServer(srv.GrpcServer(), &server{ruby: rubyServer}) + reflection.Register(srv.GrpcServer()) - go grpcServer.Serve(listener) + require.NoError(t, srv.Start()) - return grpcServer, "unix://" + serverSocketPath + return srv.Stop, "unix://" + srv.Socket() } func newBlobClient(t *testing.T, serverSocketPath string) (gitalypb.BlobServiceClient, *grpc.ClientConn) { diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go index a052c51197..e9de2fd304 100644 --- a/internal/testhelper/testhelper.go +++ b/internal/testhelper/testhelper.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/BurntSushi/toml" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" @@ -32,10 +33,13 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors" "gitlab.com/gitlab-org/gitaly/internal/helper/text" gitalylog "gitlab.com/gitlab-org/gitaly/internal/log" + praefectconfig "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/storage" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc" "google.golang.org/grpc/codes" + healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -313,6 +317,121 @@ func GetGitEnvData() (string, error) { return string(gitEnvBytes), nil } +// NewTestServer instantiates a new TestServer +func NewTestServer(srv *grpc.Server) *TestServer { + return &TestServer{ + grpcServer: srv, + } +} + +// TestServer wraps a grpc Server and handles automatically putting a praefect in front of a gitaly instance +// if necessary +type TestServer struct { + grpcServer *grpc.Server + socket string + process *os.Process +} + +// GrpcServer returns the underlying grpc.Server +func (p *TestServer) GrpcServer() *grpc.Server { + return p.grpcServer +} + +// Stop will stop both the grpc server as well as the praefect process +func (p *TestServer) Stop() { + p.grpcServer.Stop() + if p.process != nil { + p.process.Kill() + } +} + +// Socket returns the socket file the test server is listening on +func (p *TestServer) Socket() string { + return p.socket +} + +// Start will start the grpc server as well as spawn a praefect instance if GITALY_TEST_PRAEFECT_BIN is enabled +func (p *TestServer) Start() error { + gitalyServerSocketPath := GetTemporaryGitalySocketFileName() + + listener, err := net.Listen("unix", gitalyServerSocketPath) + if err != nil { + return err + } + + go p.grpcServer.Serve(listener) + + if os.Getenv("GITALY_TEST_PRAEFECT_BIN") == "1" { + tempDir, err := ioutil.TempDir("", "praefect-test-server") + if err != nil { + return err + } + + praefectServerSocketPath := GetTemporaryGitalySocketFileName() + + configFilePath := filepath.Join(tempDir, "config.toml") + configFile, err := os.Create(configFilePath) + if err != nil { + return err + } + defer configFile.Close() + + c := praefectconfig.Config{ + SocketPath: praefectServerSocketPath, + VirtualStorages: []*praefectconfig.VirtualStorage{ + { + Name: "default", + Nodes: []*models.Node{ + { + Storage: "default", + Address: "unix:/" + gitalyServerSocketPath, + DefaultPrimary: true, + }, + }, + }, + }, + } + + if err := toml.NewEncoder(configFile).Encode(&c); err != nil { + return err + } + + cmd := exec.Command(os.Getenv("PRAEFECT_BIN_PATH"), "-config", configFilePath) + cmd.Stderr = os.Stderr + + p.socket = praefectServerSocketPath + + go cmd.Run() + + conn, err := grpc.Dial("unix://"+praefectServerSocketPath, grpc.WithInsecure()) + + if err != nil { + return fmt.Errorf("dial: %v", err) + } + defer conn.Close() + + client := healthpb.NewHealthClient(conn) + ctx, cancel := Context() + defer cancel() + + for { + resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{}) + if err == nil && resp.Status == healthpb.HealthCheckResponse_SERVING { + break + } + time.Sleep(1 * time.Microsecond) + } + + os.Remove(tempDir) + p.process = cmd.Process + + return nil + } + + p.socket = gitalyServerSocketPath + return nil +} + // NewTestGrpcServer creates a GRPC Server for testing purposes func NewTestGrpcServer(tb testing.TB, streamInterceptors []grpc.StreamServerInterceptor, unaryInterceptors []grpc.UnaryServerInterceptor) *grpc.Server { logger := NewTestLogger(tb) @@ -331,6 +450,25 @@ func NewTestGrpcServer(tb testing.TB, streamInterceptors []grpc.StreamServerInte ) } +// NewServer creates a Server for testing purposes +func NewServer(tb testing.TB, streamInterceptors []grpc.StreamServerInterceptor, unaryInterceptors []grpc.UnaryServerInterceptor) *TestServer { + logger := NewTestLogger(tb) + logrusEntry := log.NewEntry(logger).WithField("test", tb.Name()) + + ctxTagger := grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor) + ctxStreamTagger := grpc_ctxtags.StreamServerInterceptor(ctxTagger) + ctxUnaryTagger := grpc_ctxtags.UnaryServerInterceptor(ctxTagger) + + streamInterceptors = append([]grpc.StreamServerInterceptor{ctxStreamTagger, grpc_logrus.StreamServerInterceptor(logrusEntry)}, streamInterceptors...) + unaryInterceptors = append([]grpc.UnaryServerInterceptor{ctxUnaryTagger, grpc_logrus.UnaryServerInterceptor(logrusEntry)}, unaryInterceptors...) + + return NewTestServer( + grpc.NewServer( + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)), + grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)), + )) +} + // MustHaveNoChildProcess panics if it finds a running or finished child // process. It waits for 2 seconds for processes to be cleaned up by other // goroutines. -- GitLab From 80f37b6b0da168f53086d05a1e869081d2d8d11a Mon Sep 17 00:00:00 2001 From: John Cai Date: Tue, 14 Jan 2020 16:14:25 -0800 Subject: [PATCH 2/2] Updating blob tests with new praefect test server --- Makefile | 8 +- _support/Makefile.template | 6 + changelogs/unreleased/jc-praefect-test.yml | 5 + internal/praefect/coordinator.go | 14 +- internal/praefect/protoregistry/find_oid.go | 2 +- internal/service/blob/lfs_pointers.go | 3 +- internal/service/blob/lfs_pointers_test.go | 4 +- internal/testhelper/testhelper.go | 138 --------------- internal/testhelper/testserver.go | 178 ++++++++++++++++++++ 9 files changed, 206 insertions(+), 152 deletions(-) create mode 100644 changelogs/unreleased/jc-praefect-test.yml create mode 100644 internal/testhelper/testserver.go diff --git a/Makefile b/Makefile index 7e220a8fdb..4a49729c27 100644 --- a/Makefile +++ b/Makefile @@ -61,12 +61,8 @@ test: prepare-build cd $(BUILD_DIR) && $(MAKE) $@ .PHONY: test-with-praefect -test-with-praefect: build-praefect - cd $(BUILD_DIR) && GITALY_TEST_PRAEFECT_BIN=1 PRAEFECT_BIN_PATH="$(BUILD_DIR)/bin/praefect" $(MAKE) test - -.PHONY: build-praefect -build-praefect: prepare-build - cd cmd/praefect && go build -o "$(BUILD_DIR)/bin/praefect" +test-with-praefect: prepare-build + cd $(BUILD_DIR) && $(MAKE) $@ .PHONY: test-with-proxies test-with-proxies: prepare-build diff --git a/_support/Makefile.template b/_support/Makefile.template index 118100d6f5..80a3b0457e 100644 --- a/_support/Makefile.template +++ b/_support/Makefile.template @@ -130,6 +130,12 @@ test-with-proxies: prepare-tests @cd {{ .SourceDir }} &&\ go test -tags "$(BUILD_TAGS)" -count=1 -exec {{ .SourceDir }}/_support/bad-proxies {{ .Pkg }}/internal/rubyserver/ + +.PHONY: test-with-praefect +test-with-praefect: build prepare-tests + @cd {{ .SourceDir }} &&\ + GITALY_TEST_PRAEFECT_BIN={{ .BuildDir }}/bin/praefect go test -tags "$(BUILD_TAGS)" -count=1 {{ join .AllPackages " " }} # count=1 bypasses go 1.10 test caching + .PHONY: race-go race-go: prepare-tests @cd {{ .SourceDir }} && go test -tags "$(BUILD_TAGS)" -race {{ join .AllPackages " " }} diff --git a/changelogs/unreleased/jc-praefect-test.yml b/changelogs/unreleased/jc-praefect-test.yml new file mode 100644 index 0000000000..94149b7550 --- /dev/null +++ b/changelogs/unreleased/jc-praefect-test.yml @@ -0,0 +1,5 @@ +--- +title: Add praefect as a transparent pass through for tests +merge_request: 1736 +author: +type: other diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 2a4859a8ef..a476b61c92 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -83,15 +83,21 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, var requestFinalizer func() var storage string - var getRepoErr error if mi.Scope == protoregistry.ScopeRepository { + var getRepoErr error storage, requestFinalizer, getRepoErr = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName) + + if getRepoErr == protoregistry.ErrTargetRepoMissing { + return nil, status.Errorf(codes.InvalidArgument, getRepoErr.Error()) + } + if getRepoErr != nil { - if getRepoErr == protoregistry.ErrTargetRepoMissing { - return nil, status.Errorf(codes.InvalidArgument, getRepoErr.Error()) - } return nil, getRepoErr } + + if storage == "" { + return nil, status.Error(codes.InvalidArgument, "storage not found") + } } else { storage, requestFinalizer, err = c.getAnyStorageNode() if err != nil { diff --git a/internal/praefect/protoregistry/find_oid.go b/internal/praefect/protoregistry/find_oid.go index 3e3db44856..8b73d49431 100644 --- a/internal/praefect/protoregistry/find_oid.go +++ b/internal/praefect/protoregistry/find_oid.go @@ -58,7 +58,7 @@ func reflectFindOID(pbMsg proto.Message, targetOID []int) (reflect.Value, error) msgV, err = findProtoField(msgV, fieldNo) if err != nil { return reflect.Value{}, fmt.Errorf( - "unable to descend OID %+v into message %s: %s", + "unable to descend OID %+v into message %s: %v", targetOID, proto.MessageName(pbMsg), err, ) } diff --git a/internal/service/blob/lfs_pointers.go b/internal/service/blob/lfs_pointers.go index 9fffdb7a32..e4b61490f5 100644 --- a/internal/service/blob/lfs_pointers.go +++ b/internal/service/blob/lfs_pointers.go @@ -3,6 +3,7 @@ package blob import ( "fmt" + "gitlab.com/gitlab-org/gitaly/internal/errors" "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" @@ -62,7 +63,7 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita func validateGetLFSPointersRequest(req *gitalypb.GetLFSPointersRequest) error { if req.GetRepository() == nil { - return fmt.Errorf("empty Repository") + return errors.ErrEmptyRepository } if len(req.GetBlobIds()) == 0 { diff --git a/internal/service/blob/lfs_pointers_test.go b/internal/service/blob/lfs_pointers_test.go index 7dd31eda24..7126b35f86 100644 --- a/internal/service/blob/lfs_pointers_test.go +++ b/internal/service/blob/lfs_pointers_test.go @@ -470,8 +470,8 @@ func drainAllPointers(c gitalypb.BlobService_GetAllLFSPointersClient) error { // TestGetAllLFSPointersVerifyScope verifies that this RPC returns all LFS // pointers in a repository, not only ones reachable from the default branch func TestGetAllLFSPointersVerifyScope(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() client, conn := newBlobClient(t, serverSocketPath) defer conn.Close() diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go index e9de2fd304..a052c51197 100644 --- a/internal/testhelper/testhelper.go +++ b/internal/testhelper/testhelper.go @@ -21,7 +21,6 @@ import ( "testing" "time" - "github.com/BurntSushi/toml" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" @@ -33,13 +32,10 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors" "gitlab.com/gitlab-org/gitaly/internal/helper/text" gitalylog "gitlab.com/gitlab-org/gitaly/internal/log" - praefectconfig "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/storage" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc" "google.golang.org/grpc/codes" - healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -317,121 +313,6 @@ func GetGitEnvData() (string, error) { return string(gitEnvBytes), nil } -// NewTestServer instantiates a new TestServer -func NewTestServer(srv *grpc.Server) *TestServer { - return &TestServer{ - grpcServer: srv, - } -} - -// TestServer wraps a grpc Server and handles automatically putting a praefect in front of a gitaly instance -// if necessary -type TestServer struct { - grpcServer *grpc.Server - socket string - process *os.Process -} - -// GrpcServer returns the underlying grpc.Server -func (p *TestServer) GrpcServer() *grpc.Server { - return p.grpcServer -} - -// Stop will stop both the grpc server as well as the praefect process -func (p *TestServer) Stop() { - p.grpcServer.Stop() - if p.process != nil { - p.process.Kill() - } -} - -// Socket returns the socket file the test server is listening on -func (p *TestServer) Socket() string { - return p.socket -} - -// Start will start the grpc server as well as spawn a praefect instance if GITALY_TEST_PRAEFECT_BIN is enabled -func (p *TestServer) Start() error { - gitalyServerSocketPath := GetTemporaryGitalySocketFileName() - - listener, err := net.Listen("unix", gitalyServerSocketPath) - if err != nil { - return err - } - - go p.grpcServer.Serve(listener) - - if os.Getenv("GITALY_TEST_PRAEFECT_BIN") == "1" { - tempDir, err := ioutil.TempDir("", "praefect-test-server") - if err != nil { - return err - } - - praefectServerSocketPath := GetTemporaryGitalySocketFileName() - - configFilePath := filepath.Join(tempDir, "config.toml") - configFile, err := os.Create(configFilePath) - if err != nil { - return err - } - defer configFile.Close() - - c := praefectconfig.Config{ - SocketPath: praefectServerSocketPath, - VirtualStorages: []*praefectconfig.VirtualStorage{ - { - Name: "default", - Nodes: []*models.Node{ - { - Storage: "default", - Address: "unix:/" + gitalyServerSocketPath, - DefaultPrimary: true, - }, - }, - }, - }, - } - - if err := toml.NewEncoder(configFile).Encode(&c); err != nil { - return err - } - - cmd := exec.Command(os.Getenv("PRAEFECT_BIN_PATH"), "-config", configFilePath) - cmd.Stderr = os.Stderr - - p.socket = praefectServerSocketPath - - go cmd.Run() - - conn, err := grpc.Dial("unix://"+praefectServerSocketPath, grpc.WithInsecure()) - - if err != nil { - return fmt.Errorf("dial: %v", err) - } - defer conn.Close() - - client := healthpb.NewHealthClient(conn) - ctx, cancel := Context() - defer cancel() - - for { - resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{}) - if err == nil && resp.Status == healthpb.HealthCheckResponse_SERVING { - break - } - time.Sleep(1 * time.Microsecond) - } - - os.Remove(tempDir) - p.process = cmd.Process - - return nil - } - - p.socket = gitalyServerSocketPath - return nil -} - // NewTestGrpcServer creates a GRPC Server for testing purposes func NewTestGrpcServer(tb testing.TB, streamInterceptors []grpc.StreamServerInterceptor, unaryInterceptors []grpc.UnaryServerInterceptor) *grpc.Server { logger := NewTestLogger(tb) @@ -450,25 +331,6 @@ func NewTestGrpcServer(tb testing.TB, streamInterceptors []grpc.StreamServerInte ) } -// NewServer creates a Server for testing purposes -func NewServer(tb testing.TB, streamInterceptors []grpc.StreamServerInterceptor, unaryInterceptors []grpc.UnaryServerInterceptor) *TestServer { - logger := NewTestLogger(tb) - logrusEntry := log.NewEntry(logger).WithField("test", tb.Name()) - - ctxTagger := grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor) - ctxStreamTagger := grpc_ctxtags.StreamServerInterceptor(ctxTagger) - ctxUnaryTagger := grpc_ctxtags.UnaryServerInterceptor(ctxTagger) - - streamInterceptors = append([]grpc.StreamServerInterceptor{ctxStreamTagger, grpc_logrus.StreamServerInterceptor(logrusEntry)}, streamInterceptors...) - unaryInterceptors = append([]grpc.UnaryServerInterceptor{ctxUnaryTagger, grpc_logrus.UnaryServerInterceptor(logrusEntry)}, unaryInterceptors...) - - return NewTestServer( - grpc.NewServer( - grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)), - grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)), - )) -} - // MustHaveNoChildProcess panics if it finds a running or finished child // process. It waits for 2 seconds for processes to be cleaned up by other // goroutines. diff --git a/internal/testhelper/testserver.go b/internal/testhelper/testserver.go new file mode 100644 index 0000000000..4d69fbfc2a --- /dev/null +++ b/internal/testhelper/testserver.go @@ -0,0 +1,178 @@ +package testhelper + +import ( + "context" + "errors" + "fmt" + "io/ioutil" + "net" + "os" + "os/exec" + "path/filepath" + "testing" + "time" + + "github.com/BurntSushi/toml" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" + grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" + log "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors" + praefectconfig "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "google.golang.org/grpc" + healthpb "google.golang.org/grpc/health/grpc_health_v1" +) + +// NewTestServer instantiates a new TestServer +func NewTestServer(srv *grpc.Server) *TestServer { + return &TestServer{ + grpcServer: srv, + } +} + +// TestServer wraps a grpc Server and handles automatically putting a praefect in front of a gitaly instance +// if necessary +type TestServer struct { + grpcServer *grpc.Server + socket string + process *os.Process +} + +// GrpcServer returns the underlying grpc.Server +func (p *TestServer) GrpcServer() *grpc.Server { + return p.grpcServer +} + +// Stop will stop both the grpc server as well as the praefect process +func (p *TestServer) Stop() { + p.grpcServer.Stop() + if p.process != nil { + p.process.Kill() + } +} + +// Socket returns the socket file the test server is listening on +func (p *TestServer) Socket() string { + return p.socket +} + +// Start will start the grpc server as well as spawn a praefect instance if GITALY_TEST_PRAEFECT_BIN is enabled +func (p *TestServer) Start() error { + gitalyServerSocketPath := GetTemporaryGitalySocketFileName() + + listener, err := net.Listen("unix", gitalyServerSocketPath) + if err != nil { + return err + } + + go p.grpcServer.Serve(listener) + + praefectBinPath, ok := os.LookupEnv("GITALY_TEST_PRAEFECT_BIN") + if !ok { + p.socket = gitalyServerSocketPath + return nil + } + + tempDir, err := ioutil.TempDir("", "praefect-test-server") + if err != nil { + return err + } + defer os.RemoveAll(tempDir) + + praefectServerSocketPath := GetTemporaryGitalySocketFileName() + + configFilePath := filepath.Join(tempDir, "config.toml") + configFile, err := os.Create(configFilePath) + if err != nil { + return err + } + defer configFile.Close() + + c := praefectconfig.Config{ + SocketPath: praefectServerSocketPath, + VirtualStorages: []*praefectconfig.VirtualStorage{ + { + Name: "default", + Nodes: []*models.Node{ + { + Storage: "default", + Address: "unix:/" + gitalyServerSocketPath, + DefaultPrimary: true, + }, + }, + }, + }, + } + + if err := toml.NewEncoder(configFile).Encode(&c); err != nil { + return err + } + if err = configFile.Sync(); err != nil { + return err + } + configFile.Close() + + cmd := exec.Command(praefectBinPath, "-config", configFilePath) + cmd.Stderr = os.Stderr + cmd.Stdout = os.Stdout + + p.socket = praefectServerSocketPath + + if err := cmd.Start(); err != nil { + return err + } + go cmd.Wait() + + conn, err := grpc.Dial("unix://"+praefectServerSocketPath, grpc.WithInsecure()) + + if err != nil { + return fmt.Errorf("dial: %v", err) + } + defer conn.Close() + + if err = waitForPraefectStartup(conn); err != nil { + return err + } + + p.process = cmd.Process + + return nil +} + +func waitForPraefectStartup(conn *grpc.ClientConn) error { + client := healthpb.NewHealthClient(conn) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{}, grpc.WaitForReady(true)) + if err != nil { + return err + } + + if resp.Status != healthpb.HealthCheckResponse_SERVING { + return errors.New("server not yet ready to serve") + } + + return nil +} + +// NewServer creates a Server for testing purposes +func NewServer(tb testing.TB, streamInterceptors []grpc.StreamServerInterceptor, unaryInterceptors []grpc.UnaryServerInterceptor) *TestServer { + logger := NewTestLogger(tb) + logrusEntry := log.NewEntry(logger).WithField("test", tb.Name()) + + ctxTagger := grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor) + ctxStreamTagger := grpc_ctxtags.StreamServerInterceptor(ctxTagger) + ctxUnaryTagger := grpc_ctxtags.UnaryServerInterceptor(ctxTagger) + + streamInterceptors = append([]grpc.StreamServerInterceptor{ctxStreamTagger, grpc_logrus.StreamServerInterceptor(logrusEntry)}, streamInterceptors...) + unaryInterceptors = append([]grpc.UnaryServerInterceptor{ctxUnaryTagger, grpc_logrus.UnaryServerInterceptor(logrusEntry)}, unaryInterceptors...) + + return NewTestServer( + grpc.NewServer( + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)), + grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)), + )) +} -- GitLab