From 30a54cd173c467d7c3854e26d70c6a82c12c2755 Mon Sep 17 00:00:00 2001 From: John Cai Date: Fri, 22 May 2020 15:22:19 -0700 Subject: [PATCH 1/3] Multi node write --- changelogs/unreleased/jc-mult-node-write.yml | 5 + internal/praefect/coordinator.go | 91 ++++++--- internal/praefect/coordinator_test.go | 14 +- .../praefect/grpc-proxy/proxy/director.go | 36 ++-- .../grpc-proxy/proxy/examples_test.go | 5 +- internal/praefect/grpc-proxy/proxy/handler.go | 92 ++++++--- .../praefect/grpc-proxy/proxy/handler_test.go | 13 +- internal/praefect/grpc-proxy/proxy/peeker.go | 6 +- .../praefect/grpc-proxy/proxy/peeker_test.go | 7 +- internal/praefect/server_test.go | 178 ++++++++++++++++++ 10 files changed, 362 insertions(+), 85 deletions(-) create mode 100644 changelogs/unreleased/jc-mult-node-write.yml diff --git a/changelogs/unreleased/jc-mult-node-write.yml b/changelogs/unreleased/jc-mult-node-write.yml new file mode 100644 index 0000000000..0bf485bb35 --- /dev/null +++ b/changelogs/unreleased/jc-mult-node-write.yml @@ -0,0 +1,5 @@ +--- +title: Multi node write +merge_request: 2208 +author: +type: added diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 953918c416..4af15baf3e 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -145,13 +145,18 @@ func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCal } storage := node.GetStorage() - if err := c.rewriteStorageForRepositoryMessage(call.methodInfo, call.msg, call.peeker, storage); err != nil { + b, err := c.rewrittenRepositoryMessage(call.methodInfo, call.msg, storage) + if err != nil { return nil, fmt.Errorf("accessor call: rewrite storage: %w", err) } metrics.ReadDistribution.WithLabelValues(virtualStorage, storage).Inc() - return proxy.NewStreamParameters(ctx, node.GetConnection(), nil, nil), nil + return proxy.NewStreamParameters(proxy.Destination{ + Conn: node.GetConnection(), + Msg: b, + Ctx: helper.IncomingToOutgoing(ctx), + 
}, nil, nil, nil), nil } func (c *Coordinator) injectTransaction(ctx context.Context, node nodes.Node) (context.Context, func(), error) { @@ -172,6 +177,11 @@ func (c *Coordinator) injectTransaction(ctx context.Context, node nodes.Node) (c return ctx, cancel, nil } +var transactionRPCs = map[string]struct{}{ + "/gitaly.SmartHTTPService/PostReceivePack": {}, + "/gitaly.SSHService/SSHReceivePack": {}, +} + func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall, targetRepo *gitalypb.Repository) (*proxy.StreamParameters, error) { virtualStorage := targetRepo.StorageName @@ -184,7 +194,8 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall return nil, helper.ErrPreconditionFailed(ReadOnlyStorageError(call.targetRepo.GetStorageName())) } - if err = c.rewriteStorageForRepositoryMessage(call.methodInfo, call.msg, call.peeker, shard.Primary.GetStorage()); err != nil { + primaryMessage, err := c.rewrittenRepositoryMessage(call.methodInfo, call.msg, shard.Primary.GetStorage()) + if err != nil { return nil, fmt.Errorf("mutator call: rewrite storage: %w", err) } @@ -195,6 +206,12 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall var finalizers []func() + primaryDest := proxy.Destination{ + Conn: shard.Primary.GetConnection(), + Msg: primaryMessage, + Ctx: helper.IncomingToOutgoing(ctx), + } + if featureflag.IsEnabled(ctx, featureflag.ReferenceTransactions) { var transactionCleanup func() ctx, transactionCleanup, err = c.injectTransaction(ctx, shard.Primary) @@ -202,11 +219,39 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall return nil, err } finalizers = append(finalizers, transactionCleanup) + primaryDest.Ctx = helper.IncomingToOutgoing(ctx) + } + + var secondaryDests []proxy.Destination + + if _, ok := transactionRPCs[call.fullMethodName]; ok && featureflag.IsEnabled(ctx, featureflag.ReferenceTransactions) { + for _, secondary := range 
shard.Secondaries { + secondaryMsg, err := c.rewrittenRepositoryMessage(call.methodInfo, call.msg, secondary.GetStorage()) + if err != nil { + return nil, err + } + secondaryDest := proxy.Destination{ + Conn: secondary.GetConnection(), + Msg: secondaryMsg, + } + + if featureflag.IsEnabled(ctx, featureflag.ReferenceTransactions) { + var transactionCleanup func() + ctx, transactionCleanup, err = c.injectTransaction(ctx, secondary) + if err != nil { + return nil, err + } + finalizers = append(finalizers, transactionCleanup) + secondaryDest.Ctx = helper.IncomingToOutgoing(ctx) + } + + secondaryDests = append(secondaryDests, secondaryDest) + } } finalizers = append(finalizers, c.createReplicaJobs(ctx, virtualStorage, call.targetRepo, shard.Primary, shard.Secondaries, change, params)) - return proxy.NewStreamParameters(ctx, shard.Primary.GetConnection(), func() { + return proxy.NewStreamParameters(primaryDest, secondaryDests, func() { for _, finalizer := range finalizers { finalizer() } @@ -224,7 +269,12 @@ func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, return nil, err } - m, err := protoMessageFromPeeker(mi, peeker) + frame, err := peeker.Peek() + if err != nil { + return nil, err + } + + m, err := protoMessage(mi, frame) if err != nil { return nil, err } @@ -243,8 +293,8 @@ func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, fullMethodName: fullMethodName, methodInfo: mi, msg: m, - peeker: peeker, - targetRepo: targetRepo}, + targetRepo: targetRepo, + }, ) if err != nil { if errors.Is(err, nodes.ErrVirtualStorageNotExist) { @@ -266,13 +316,17 @@ func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, return nil, err } - return proxy.NewStreamParameters(ctx, shard.Primary.GetConnection(), func() {}, nil), nil + return proxy.NewStreamParameters(proxy.Destination{ + Ctx: helper.IncomingToOutgoing(ctx), + Conn: shard.Primary.GetConnection(), + Msg: frame, + }, nil, func() {}, nil), nil 
} -func (c *Coordinator) rewriteStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier, storage string) error { +func (c *Coordinator) rewrittenRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, storage string) ([]byte, error) { targetRepo, err := mi.TargetRepo(m) if err != nil { - return helper.ErrInvalidArgument(err) + return nil, helper.ErrInvalidArgument(err) } // rewrite storage name @@ -280,7 +334,7 @@ func (c *Coordinator) rewriteStorageForRepositoryMessage(mi protoregistry.Method additionalRepo, ok, err := mi.AdditionalRepo(m) if err != nil { - return helper.ErrInvalidArgument(err) + return nil, helper.ErrInvalidArgument(err) } if ok { @@ -289,22 +343,13 @@ func (c *Coordinator) rewriteStorageForRepositoryMessage(mi protoregistry.Method b, err := proxy.NewCodec().Marshal(m) if err != nil { - return err - } - - if err = peeker.Modify(b); err != nil { - return err + return nil, err } - return nil + return b, nil } -func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModifier) (proto.Message, error) { - frame, err := peeker.Peek() - if err != nil { - return nil, err - } - +func protoMessage(mi protoregistry.MethodInfo, frame []byte) (proto.Message, error) { m, err := mi.UnmarshalRequestProto(frame) if err != nil { return nil, err diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 4576738081..7886f9ddc5 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -200,16 +200,16 @@ func TestStreamDirectorMutator(t *testing.T) { peeker := &mockPeeker{frame} streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) require.NoError(t, err) - require.Equal(t, primaryAddress, streamParams.Conn().Target()) + require.Equal(t, primaryAddress, streamParams.PrimaryConn().Conn.Target()) - md, ok := 
metadata.FromOutgoingContext(streamParams.Context()) + md, ok := metadata.FromOutgoingContext(streamParams.PrimaryConn().Ctx) require.True(t, ok) require.Contains(t, md, "praefect-server") mi, err := coordinator.registry.LookupMethod(fullMethod) require.NoError(t, err) - m, err := protoMessageFromPeeker(mi, peeker) + m, err := mi.UnmarshalRequestProto(streamParams.PrimaryConn().Msg) require.NoError(t, err) rewrittenTargetRepo, err := mi.TargetRepo(m) @@ -298,16 +298,16 @@ func TestStreamDirectorAccessor(t *testing.T) { peeker := &mockPeeker{frame: frame} streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) require.NoError(t, err) - require.Equal(t, secondaryAddress, streamParams.Conn().Target()) + require.Equal(t, secondaryAddress, streamParams.PrimaryConn().Conn.Target()) - md, ok := metadata.FromOutgoingContext(streamParams.Context()) + md, ok := metadata.FromOutgoingContext(streamParams.PrimaryConn().Ctx) require.True(t, ok) require.Contains(t, md, "praefect-server") mi, err := coordinator.registry.LookupMethod(fullMethod) require.NoError(t, err) - m, err := protoMessageFromPeeker(mi, peeker) + m, err := mi.UnmarshalRequestProto(streamParams.PrimaryConn().Msg) require.NoError(t, err) rewrittenTargetRepo, err := mi.TargetRepo(m) @@ -393,7 +393,7 @@ func TestAbsentCorrelationID(t *testing.T) { peeker := &mockPeeker{frame} streamParams, err := coordinator.StreamDirector(ctx, fullMethod, peeker) require.NoError(t, err) - require.Equal(t, primaryAddress, streamParams.Conn().Target()) + require.Equal(t, primaryAddress, streamParams.PrimaryConn().Conn.Target()) replEventWait.Add(1) // expected only one event to be created // must be run as it adds replication events to the queue diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go index 50a0ee63fc..c6d9e11cd9 100644 --- a/internal/praefect/grpc-proxy/proxy/director.go +++ 
b/internal/praefect/grpc-proxy/proxy/director.go @@ -6,7 +6,6 @@ package proxy import ( "context" - "gitlab.com/gitlab-org/gitaly/internal/helper" "google.golang.org/grpc" ) @@ -28,30 +27,35 @@ type StreamDirector func(ctx context.Context, fullMethodName string, peeker Stre // StreamParameters encapsulates streaming parameters the praefect coordinator returns to the // proxy handler type StreamParameters struct { - ctx context.Context - conn *grpc.ClientConn - reqFinalizer func() - callOptions []grpc.CallOption + primaryConn Destination + reqFinalizer func() + callOptions []grpc.CallOption + secondaryConns []Destination +} + +// Destination contains a client connection as well as a rewritten protobuf message +type Destination struct { + Ctx context.Context + Conn *grpc.ClientConn + Msg []byte } // NewStreamParameters returns a new instance of StreamParameters -func NewStreamParameters(ctx context.Context, conn *grpc.ClientConn, reqFinalizer func(), callOpts []grpc.CallOption) *StreamParameters { +func NewStreamParameters(primaryConn Destination, secondaryConns []Destination, reqFinalizer func(), callOpts []grpc.CallOption) *StreamParameters { return &StreamParameters{ - ctx: helper.IncomingToOutgoing(ctx), - conn: conn, - reqFinalizer: reqFinalizer, - callOptions: callOpts, + primaryConn: primaryConn, + reqFinalizer: reqFinalizer, + callOptions: callOpts, + secondaryConns: secondaryConns, } } -// Context returns the outgoing context -func (s *StreamParameters) Context() context.Context { - return s.ctx +func (s *StreamParameters) PrimaryConn() Destination { + return s.primaryConn } -// Conn returns a grpc client connection -func (s *StreamParameters) Conn() *grpc.ClientConn { - return s.conn +func (s *StreamParameters) SecondaryConns() []Destination { + return s.secondaryConns } // RequestFinalizer calls the request finalizer diff --git a/internal/praefect/grpc-proxy/proxy/examples_test.go b/internal/praefect/grpc-proxy/proxy/examples_test.go index 
0f40508843..9ece680bb1 100644 --- a/internal/praefect/grpc-proxy/proxy/examples_test.go +++ b/internal/praefect/grpc-proxy/proxy/examples_test.go @@ -11,6 +11,7 @@ import ( "context" "strings" + "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -51,10 +52,10 @@ func ExampleStreamDirector() { if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" { // Make sure we use DialContext so the dialing can be cancelled/time out together with the context. conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec()))) - return proxy.NewStreamParameters(ctx, conn, nil, nil), err + return proxy.NewStreamParameters(proxy.Destination{Conn: conn, Ctx: helper.IncomingToOutgoing(ctx)}, nil, nil, nil), err } else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" { conn, err := grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec()))) - return proxy.NewStreamParameters(ctx, conn, nil, nil), err + return proxy.NewStreamParameters(proxy.Destination{Conn: conn, Ctx: helper.IncomingToOutgoing(ctx)}, nil, nil, nil), err } } return nil, status.Errorf(codes.Unimplemented, "Unknown method") diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index 5ea376e07c..235806b8e5 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -12,6 +12,7 @@ import ( "io" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -88,6 +89,11 @@ type handler struct { director StreamDirector } +type streamAndMsg struct { + grpc.ClientStream + msg []byte +} + // handler is where the 
real magic of proxying happens. // It is invoked like any gRPC server stream and uses the gRPC server framing to get and receive bytes from the wire, // forwarding it to a ClientStream established against the relevant ClientConn. @@ -108,18 +114,40 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error defer params.RequestFinalizer() - clientCtx, clientCancel := context.WithCancel(params.Context()) + clientCtx, clientCancel := context.WithCancel(params.PrimaryConn().Ctx) defer clientCancel() // TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For. - clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, params.Conn(), fullMethodName, params.CallOptions()...) + + primaryClientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, params.PrimaryConn().Conn, fullMethodName, params.CallOptions()...) if err != nil { return err } + + var allClientStreams []streamAndMsg + for _, conn := range params.SecondaryConns() { + secondaryClientStream, err := grpc.NewClientStream(conn.Ctx, clientStreamDescForProxying, conn.Conn, fullMethodName, params.CallOptions()...) + if err != nil { + return err + } + allClientStreams = append(allClientStreams, streamAndMsg{ + ClientStream: secondaryClientStream, + msg: conn.Msg, + }) + } + + allClientStreams = append(allClientStreams, streamAndMsg{ + ClientStream: primaryClientStream, + msg: params.PrimaryConn().Msg, + }) + // Explicitly *do not close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate. 
// Channels do not have to be closed, it is just a control flow mechanism, see + // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ - s2cErrChan := s.forwardServerToClient(serverStream, clientStream, peeker.consumedStream) - c2sErrChan := s.forwardClientToServer(clientStream, serverStream) + + s2cErrChan := s.forwardServerToClients(serverStream, allClientStreams) + c2sErrChan := s.forwardClientToServer(primaryClientStream, serverStream) + // We don't know which side is going to stop sending first, so we need a select between the two. for i := 0; i < 2; i++ { select { @@ -127,7 +155,9 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error if s2cErr == io.EOF { // this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./ // the clientStream>serverStream may continue pumping though. - clientStream.CloseSend() + for _, clientStream := range allClientStreams { + clientStream.CloseSend() + } } else { // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and @@ -139,7 +169,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error // This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two // cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers // will be nil. - trailer := clientStream.Trailer() + trailer := primaryClientStream.Trailer() serverStream.SetTrailer(trailer) // c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error. 
if c2sErr != io.EOF { @@ -187,27 +217,26 @@ func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerSt return ret } -func (s *handler) forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream, consumedStream *partialStream) chan error { - ret := make(chan error, 1) +func (s *handler) forwardServerToClients(src grpc.ServerStream, dsts []streamAndMsg) chan error { + ret := make(chan error, len(dsts)) go func() { - // send any consumed/peeked frames first - for _, frame := range consumedStream.frames { - if frame == nil { - // It is possible for peeked frames to be empty. This most likely - // occurs when the server stream returns an error before the desired - // number of frames can be peeked - break - } - if err := dst.SendMsg(frame); err != nil { - ret <- err - return - } + var g errgroup.Group + for _, dst := range dsts { + dst := dst + g.Go(func() error { + newPayloads := [][]byte{dst.msg} + for _, payload := range newPayloads { + if err := dst.SendMsg(&frame{payload: payload}); err != nil { + return err + } + } + + return nil + }) } - // we may have encountered an error earlier while peeking - if consumedStream.err != nil { - ret <- consumedStream.err - return + if err := g.Wait(); err != nil { + ret <- err } // resume two-way stream after peeked messages @@ -217,9 +246,20 @@ func (s *handler) forwardServerToClient(src grpc.ServerStream, dst grpc.ClientSt ret <- err // this can be io.EOF which is happy case break } - if err := dst.SendMsg(f); err != nil { + + var g errgroup.Group + for _, dst := range dsts { + dst := dst + f := f + g.Go(func() error { + if err := dst.SendMsg(f); err != nil { + return err + } + return nil + }) + } + if err := g.Wait(); err != nil { ret <- err - break } } }() diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go index 267ed1e85c..d53faa85c6 100644 --- a/internal/praefect/grpc-proxy/proxy/handler_test.go +++ 
b/internal/praefect/grpc-proxy/proxy/handler_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "gitlab.com/gitlab-org/gitaly/client" + "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" @@ -252,15 +253,21 @@ func (s *ProxyHappySuite) SetupSuite() { // Setup of the proxy's Director. s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec()))) require.NoError(s.T(), err, "must not error on deferred client Dial") - director := func(ctx context.Context, fullName string, _ proxy.StreamModifier) (*proxy.StreamParameters, error) { + director := func(ctx context.Context, fullName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) { + payload, err := peeker.Peek() + if err != nil { + return nil, err + } + md, ok := metadata.FromIncomingContext(ctx) if ok { if _, exists := md[rejectingMdKey]; exists { - return proxy.NewStreamParameters(ctx, nil, nil, nil), status.Errorf(codes.PermissionDenied, "testing rejection") + return proxy.NewStreamParameters(proxy.Destination{Ctx: helper.IncomingToOutgoing(ctx), Msg: payload}, nil, nil, nil), status.Errorf(codes.PermissionDenied, "testing rejection") } } + // Explicitly copy the metadata, otherwise the tests will fail. 
- return proxy.NewStreamParameters(ctx, s.serverClientConn, nil, nil), nil + return proxy.NewStreamParameters(proxy.Destination{Ctx: helper.IncomingToOutgoing(ctx), Conn: s.serverClientConn, Msg: payload}, nil, nil, nil), nil } s.proxy = grpc.NewServer( diff --git a/internal/praefect/grpc-proxy/proxy/peeker.go b/internal/praefect/grpc-proxy/proxy/peeker.go index 1d1e02df56..d47caa01af 100644 --- a/internal/praefect/grpc-proxy/proxy/peeker.go +++ b/internal/praefect/grpc-proxy/proxy/peeker.go @@ -15,7 +15,7 @@ type StreamModifier interface { // the backend server. Peek() (frame []byte, _ error) - // Modify replaces the peeked payload in the stream with the provided frame. + // Modify replaces the peeked payload in the strea with the provided frame. // If no payload was peeked, an error will be returned. // Note: Modify cannot be called after the director returns. Modify(payload []byte) error @@ -23,7 +23,6 @@ type StreamModifier interface { type partialStream struct { frames []*frame // frames encountered in partial stream - err error // error returned by partial stream } type peeker struct { @@ -70,8 +69,7 @@ func (p peeker) peek(n uint) ([][]byte, error) { for i := 0; i < len(p.consumedStream.frames); i++ { f := &frame{} if err := p.srcStream.RecvMsg(f); err != nil { - p.consumedStream.err = err - break + return nil, err } p.consumedStream.frames[i] = f peekedFrames[i] = f.payload diff --git a/internal/praefect/grpc-proxy/proxy/peeker_test.go b/internal/praefect/grpc-proxy/proxy/peeker_test.go index b5d882ff48..c02860343f 100644 --- a/internal/praefect/grpc-proxy/proxy/peeker_test.go +++ b/internal/praefect/grpc-proxy/proxy/peeker_test.go @@ -9,6 +9,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" testservice "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/testdata" 
) @@ -37,7 +38,7 @@ func TestStreamPeeking(t *testing.T) { require.NoError(t, err) require.True(t, proto.Equal(pingReqSent, peekedRequest), "expected to be the same") - return proxy.NewStreamParameters(ctx, backendCC, nil, nil), nil + return proxy.NewStreamParameters(proxy.Destination{Ctx: helper.IncomingToOutgoing(ctx), Conn: backendCC, Msg: peekedMsg}, nil, nil, nil), nil } pingResp := &testservice.PingResponse{ @@ -100,9 +101,7 @@ func TestStreamInjecting(t *testing.T) { newPayload, err := proto.Marshal(peekedRequest) require.NoError(t, err) - require.NoError(t, peeker.Modify(newPayload)) - - return proxy.NewStreamParameters(ctx, backendCC, nil, nil), nil + return proxy.NewStreamParameters(proxy.Destination{Ctx: helper.IncomingToOutgoing(ctx), Conn: backendCC, Msg: newPayload}, nil, nil, nil), nil } pingResp := &testservice.PingResponse{ diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index c4dd3be1df..269a8f5c71 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -2,7 +2,9 @@ package praefect import ( "context" + "io" "io/ioutil" + "net" "os" "path/filepath" "strings" @@ -11,6 +13,8 @@ import ( "time" "github.com/golang/protobuf/proto" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" @@ -18,14 +22,24 @@ import ( gconfig "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors" "gitlab.com/gitlab-org/gitaly/internal/helper/text" + "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag" + "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" 
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/internal/praefect/transactions" "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest" "gitlab.com/gitlab-org/gitaly/internal/version" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/reflection" ) func TestServerRouteServerAccessor(t *testing.T) { @@ -634,6 +648,170 @@ func TestRepoRename(t *testing.T) { defer func() { require.NoError(t, os.RemoveAll(expNewPath2)) }() } +type mockSmartHttp struct { + m sync.Mutex + methodsCalled map[string]int +} + +func (m *mockSmartHttp) InfoRefsUploadPack(req *gitalypb.InfoRefsRequest, stream gitalypb.SmartHTTPService_InfoRefsUploadPackServer) error { + m.m.Lock() + defer m.m.Unlock() + if m.methodsCalled == nil { + m.methodsCalled = make(map[string]int) + } + + m.methodsCalled["InfoRefsUploadPack"] += 1 + return nil +} + +func (m *mockSmartHttp) InfoRefsReceivePack(req *gitalypb.InfoRefsRequest, stream gitalypb.SmartHTTPService_InfoRefsReceivePackServer) error { + m.m.Lock() + defer m.m.Unlock() + if m.methodsCalled == nil { + m.methodsCalled = make(map[string]int) + } + + m.methodsCalled["InfoRefsReceivePack"] += 1 + return nil +} + +func (m *mockSmartHttp) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackServer) error { + m.m.Lock() + defer m.m.Unlock() + if m.methodsCalled == nil { + m.methodsCalled = make(map[string]int) + } + + m.methodsCalled["PostUploadPack"] += 1 + + return nil +} + +func (m *mockSmartHttp) PostReceivePack(stream gitalypb.SmartHTTPService_PostReceivePackServer) error { + m.m.Lock() + defer m.m.Unlock() + 
if m.methodsCalled == nil { + m.methodsCalled = make(map[string]int) + } + + m.methodsCalled["PostReceivePack"] += 1 + return nil +} + +func newGrpcServer(t *testing.T, srv gitalypb.SmartHTTPServiceServer) (string, *grpc.Server) { + grpcSrv := testhelper.NewTestGrpcServer(t, nil, nil) + socketPath := testhelper.GetTemporaryGitalySocketFileName() + + gitalypb.RegisterSmartHTTPServiceServer(grpcSrv, srv) + reflection.Register(grpcSrv) + + listener, err := net.Listen("unix", socketPath) + require.NoError(t, err) + + go func() { grpcSrv.Serve(listener) }() + + return socketPath, grpcSrv +} + +func TestProxyWrites(t *testing.T) { + smartHttp0, smartHttp1, smartHttp2 := &mockSmartHttp{}, &mockSmartHttp{}, &mockSmartHttp{} + + socket0, srv0 := newGrpcServer(t, smartHttp0) + defer srv0.Stop() + socket1, srv1 := newGrpcServer(t, smartHttp1) + defer srv1.Stop() + socket2, srv2 := newGrpcServer(t, smartHttp2) + defer srv2.Stop() + + conf := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + { + Name: "default", + Nodes: []*config.Node{ + { + DefaultPrimary: true, + Storage: "praefect-internal-0", + Address: "unix://" + socket0, + }, + { + Storage: "praefect-internal-1", + Address: "unix://" + socket1, + }, + { + Storage: "praefect-internal-2", + Address: "unix://" + socket2, + }, + }, + }, + }, + } + + queue := datastore.NewMemoryReplicationEventQueue(conf) + entry := testhelper.DiscardTestEntry(t) + + nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec()) + require.NoError(t, err) + txMgr := transactions.NewManager() + + coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered) + + server := grpc.NewServer( + grpc.CustomCodec(proxy.NewCodec()), + grpc.StreamInterceptor( + grpc_middleware.ChainStreamServer( + // context tags usage is required by sentryhandler.StreamLogHandler + 
grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor)), + // sentry middleware to capture errors + sentryhandler.StreamLogHandler, + ), + ), + grpc.UnknownServiceHandler(proxy.TransparentHandler(coordinator.StreamDirector)), + ) + + socket := testhelper.GetTemporaryGitalySocketFileName() + listener, err := net.Listen("unix", socket) + require.NoError(t, err) + + go server.Serve(listener) + defer server.Stop() + + client, _ := newSmartHTTPClient(t, "unix://"+socket) + + ctx, cancel := testhelper.Context() + defer cancel() + + ctx = featureflag.OutgoingCtxWithFeatureFlag(ctx, featureflag.ReferenceTransactions) + + testRepo, _, cleanup := testhelper.NewTestRepo(t) + defer cleanup() + + stream, err := client.PostReceivePack(ctx) + require.NoError(t, err) + require.NoError(t, stream.Send(&gitalypb.PostReceivePackRequest{ + Repository: testRepo, + })) + require.NoError(t, stream.CloseSend()) + + _, err = stream.Recv() + require.Equal(t, io.EOF, err) + + assert.Equal(t, 1, smartHttp0.methodsCalled["PostReceivePack"]) + assert.Equal(t, 1, smartHttp1.methodsCalled["PostReceivePack"]) + assert.Equal(t, 1, smartHttp2.methodsCalled["PostReceivePack"]) +} + +func newSmartHTTPClient(t *testing.T, serverSocketPath string) (gitalypb.SmartHTTPServiceClient, *grpc.ClientConn) { + connOpts := []grpc.DialOption{ + grpc.WithInsecure(), + } + conn, err := grpc.Dial(serverSocketPath, connOpts...) 
+ if err != nil { + t.Fatal(err) + } + + return gitalypb.NewSmartHTTPServiceClient(conn), conn +} + func tempStoragePath(t testing.TB) string { p, err := ioutil.TempDir("", t.Name()) require.NoError(t, err) -- GitLab From 6604f94d8da44699365302e7c7c8c0c1cc30ccb6 Mon Sep 17 00:00:00 2001 From: John Cai Date: Thu, 4 Jun 2020 12:42:34 -0700 Subject: [PATCH 2/3] Remove Modify, and change StreamModifier to StreamPeeker --- internal/praefect/coordinator.go | 4 ++-- internal/praefect/grpc-proxy/proxy/director.go | 2 +- internal/praefect/grpc-proxy/proxy/examples_test.go | 2 +- internal/praefect/grpc-proxy/proxy/handler_test.go | 4 ++-- internal/praefect/grpc-proxy/proxy/peeker.go | 11 +---------- internal/praefect/grpc-proxy/proxy/peeker_test.go | 4 ++-- 6 files changed, 9 insertions(+), 18 deletions(-) diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 4af15baf3e..c0d4ca8a15 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -72,7 +72,7 @@ type grpcCall struct { fullMethodName string methodInfo protoregistry.MethodInfo msg proto.Message - peeker proxy.StreamModifier + peeker proxy.StreamPeeker targetRepo *gitalypb.Repository } @@ -259,7 +259,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall } // streamDirector determines which downstream servers receive requests -func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) { +func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { // For phase 1, we need to route messages based on the storage location // to the appropriate Gitaly node. 
ctxlogrus.Extract(ctx).Debugf("Stream director received method %s", fullMethodName) diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go index c6d9e11cd9..ef54fbd81e 100644 --- a/internal/praefect/grpc-proxy/proxy/director.go +++ b/internal/praefect/grpc-proxy/proxy/director.go @@ -22,7 +22,7 @@ import ( // are invoked. So decisions around authorization, monitoring etc. are better to be handled there. // // See the rather rich example. -type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (*StreamParameters, error) +type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamPeeker) (*StreamParameters, error) // StreamParameters encapsulates streaming parameters the praefect coordinator returns to the // proxy handler diff --git a/internal/praefect/grpc-proxy/proxy/examples_test.go b/internal/praefect/grpc-proxy/proxy/examples_test.go index 9ece680bb1..eb5508de54 100644 --- a/internal/praefect/grpc-proxy/proxy/examples_test.go +++ b/internal/praefect/grpc-proxy/proxy/examples_test.go @@ -41,7 +41,7 @@ func ExampleTransparentHandler() { // Provide sa simple example of a director that shields internal services and dials a staging or production backend. // This is a *very naive* implementation that creates a new connection on every request. Consider using pooling. func ExampleStreamDirector() { - director = func(ctx context.Context, fullMethodName string, _ proxy.StreamModifier) (*proxy.StreamParameters, error) { + director = func(ctx context.Context, fullMethodName string, _ proxy.StreamPeeker) (*proxy.StreamParameters, error) { // Make sure we never forward internal services. 
if strings.HasPrefix(fullMethodName, "/com.example.internal.") { return nil, status.Errorf(codes.Unimplemented, "Unknown method") diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go index d53faa85c6..f9ef180a17 100644 --- a/internal/praefect/grpc-proxy/proxy/handler_test.go +++ b/internal/praefect/grpc-proxy/proxy/handler_test.go @@ -253,7 +253,7 @@ func (s *ProxyHappySuite) SetupSuite() { // Setup of the proxy's Director. s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec()))) require.NoError(s.T(), err, "must not error on deferred client Dial") - director := func(ctx context.Context, fullName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) { + director := func(ctx context.Context, fullName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { payload, err := peeker.Peek() if err != nil { return nil, err @@ -333,7 +333,7 @@ func TestRegisterStreamHandlers(t *testing.T) { server := grpc.NewServer( grpc.CustomCodec(proxy.NewCodec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) { + grpc.UnknownServiceHandler(proxy.TransparentHandler(func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { return nil, directorCalledError })), ) diff --git a/internal/praefect/grpc-proxy/proxy/peeker.go b/internal/praefect/grpc-proxy/proxy/peeker.go index d47caa01af..c3736ea31f 100644 --- a/internal/praefect/grpc-proxy/proxy/peeker.go +++ b/internal/praefect/grpc-proxy/proxy/peeker.go @@ -9,16 +9,11 @@ import ( // StreamModifier abstracts away the gRPC stream being forwarded so that it can // be inspected and modified. 
-type StreamModifier interface { +type StreamPeeker interface { // Peek allows a director to peek one message into the request stream without // removing those messages from the stream that will be forwarded to // the backend server. Peek() (frame []byte, _ error) - - // Modify replaces the peeked payload in the strea with the provided frame. - // If no payload was peeked, an error will be returned. - // Note: Modify cannot be called after the director returns. - Modify(payload []byte) error } type partialStream struct { @@ -54,10 +49,6 @@ func (p peeker) Peek() ([]byte, error) { return payloads[0], nil } -func (p peeker) Modify(payload []byte) error { - return p.modify([][]byte{payload}) -} - func (p peeker) peek(n uint) ([][]byte, error) { if n < 1 { return nil, ErrInvalidPeekCount diff --git a/internal/praefect/grpc-proxy/proxy/peeker_test.go b/internal/praefect/grpc-proxy/proxy/peeker_test.go index c02860343f..823cb50f61 100644 --- a/internal/praefect/grpc-proxy/proxy/peeker_test.go +++ b/internal/praefect/grpc-proxy/proxy/peeker_test.go @@ -27,7 +27,7 @@ func TestStreamPeeking(t *testing.T) { pingReqSent := &testservice.PingRequest{Value: "hi"} // director will peek into stream before routing traffic - director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) { + director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { t.Logf("director routing method %s to backend", fullMethodName) peekedMsg, err := peeker.Peek() @@ -86,7 +86,7 @@ func TestStreamInjecting(t *testing.T) { newValue := "bye" // director will peek into stream and change some frames - director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) { + director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { t.Logf("modifying request for method 
%s", fullMethodName) peekedMsg, err := peeker.Peek() -- GitLab From 274024f910dab28ee09a675d71c83955c9facb0d Mon Sep 17 00:00:00 2001 From: John Cai Date: Thu, 4 Jun 2020 12:12:18 -0700 Subject: [PATCH 3/3] long running goroutines --- internal/praefect/grpc-proxy/proxy/handler.go | 77 +++++++++++-------- 1 file changed, 47 insertions(+), 30 deletions(-) diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index 235806b8e5..a26bf06516 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -145,26 +145,24 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ - s2cErrChan := s.forwardServerToClients(serverStream, allClientStreams) + s2cErrChan, s2cDoneChan := s.forwardServerToClients(serverStream, allClientStreams) c2sErrChan := s.forwardClientToServer(primaryClientStream, serverStream) // We don't know which side is going to stop sending first, so we need a select between the two. for i := 0; i < 2; i++ { select { - case s2cErr := <-s2cErrChan: - if s2cErr == io.EOF { - // this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./ - // the clientStream>serverStream may continue pumping though. - for _, clientStream := range allClientStreams { - clientStream.CloseSend() - } - } else { - // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need - // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and - // exit with an error to the stack - clientCancel() - return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr) + case <-s2cDoneChan: + // this is the happy case where the sender has encountered io.EOF, and won't be sending anymore. + // the clientStream>serverStream may continue pumping though. 
+ for _, clientStream := range allClientStreams { + clientStream.CloseSend() } + case s2cErr := <-s2cErrChan: + // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need + // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and + // exit with an error to the stack + clientCancel() + return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr) case c2sErr := <-c2sErrChan: // This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two // cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers @@ -217,8 +215,10 @@ func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerSt return ret } -func (s *handler) forwardServerToClients(src grpc.ServerStream, dsts []streamAndMsg) chan error { +func (s *handler) forwardServerToClients(src grpc.ServerStream, dsts []streamAndMsg) (chan error, chan struct{}) { ret := make(chan error, len(dsts)) + done := make(chan struct{}) + go func() { var g errgroup.Group for _, dst := range dsts { @@ -239,29 +239,46 @@ func (s *handler) forwardServerToClients(src grpc.ServerStream, dsts []streamAnd ret <- err } + var frameChans []chan *frame + for _, dst := range dsts { + dst := dst + frameChan := make(chan *frame) + frameChans = append(frameChans, frameChan) + g.Go(func() error { + for { + f := <-frameChan + if f == nil { + return nil + } + if err := dst.SendMsg(f); err != nil { + return err + } + } + }) + } // resume two-way stream after peeked messages f := &frame{} for i := 0; ; i++ { if err := src.RecvMsg(f); err != nil { - ret <- err // this can be io.EOF which is happy case + for _, frameChan := range frameChans { + close(frameChan) + } + if err != io.EOF { + ret <- err // only real errors are reported; io.EOF is the happy case and is signalled via done + } break } - var g errgroup.Group - for _, dst := range dsts { - dst := dst - f := f - 
g.Go(func() error { - if err := dst.SendMsg(f); err != nil { - return err - } - return nil - }) - } - if err := g.Wait(); err != nil { - ret <- err + for _, frameChan := range frameChans { + frameChan <- f } } + + if err := g.Wait(); err != nil { + ret <- err + } + + done <- struct{}{} }() - return ret + return ret, done } -- GitLab