diff options
Diffstat (limited to 'workhorse/internal/gitaly/smarthttp.go')
-rw-r--r-- | workhorse/internal/gitaly/smarthttp.go | 139 |
1 files changed, 139 insertions, 0 deletions
diff --git a/workhorse/internal/gitaly/smarthttp.go b/workhorse/internal/gitaly/smarthttp.go new file mode 100644 index 00000000000..d1fe6fae5ba --- /dev/null +++ b/workhorse/internal/gitaly/smarthttp.go @@ -0,0 +1,139 @@ +package gitaly + +import ( + "context" + "fmt" + "io" + + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitaly/streamio" +) + +type SmartHTTPClient struct { + gitalypb.SmartHTTPServiceClient +} + +func (client *SmartHTTPClient) InfoRefsResponseReader(ctx context.Context, repo *gitalypb.Repository, rpc string, gitConfigOptions []string, gitProtocol string) (io.Reader, error) { + rpcRequest := &gitalypb.InfoRefsRequest{ + Repository: repo, + GitConfigOptions: gitConfigOptions, + GitProtocol: gitProtocol, + } + + switch rpc { + case "git-upload-pack": + stream, err := client.InfoRefsUploadPack(ctx, rpcRequest) + return infoRefsReader(stream), err + case "git-receive-pack": + stream, err := client.InfoRefsReceivePack(ctx, rpcRequest) + return infoRefsReader(stream), err + default: + return nil, fmt.Errorf("InfoRefsResponseWriterTo: Unsupported RPC: %q", rpc) + } +} + +type infoRefsClient interface { + Recv() (*gitalypb.InfoRefsResponse, error) +} + +func infoRefsReader(stream infoRefsClient) io.Reader { + return streamio.NewReader(func() ([]byte, error) { + resp, err := stream.Recv() + return resp.GetData(), err + }) +} + +func (client *SmartHTTPClient) ReceivePack(ctx context.Context, repo *gitalypb.Repository, glId string, glUsername string, glRepository string, gitConfigOptions []string, clientRequest io.Reader, clientResponse io.Writer, gitProtocol string) error { + stream, err := client.PostReceivePack(ctx) + if err != nil { + return err + } + + rpcRequest := &gitalypb.PostReceivePackRequest{ + Repository: repo, + GlId: glId, + GlUsername: glUsername, + GlRepository: glRepository, + GitConfigOptions: gitConfigOptions, + GitProtocol: gitProtocol, + } + + if err := stream.Send(rpcRequest); err != nil { + return fmt.Errorf("initial request: %v", err) + } + + numStreams := 2 + errC := make(chan error, numStreams) + + go func() { + rr := streamio.NewReader(func() ([]byte, error) { + response, err := stream.Recv() + return response.GetData(), err + }) + _, err := io.Copy(clientResponse, rr) + errC <- err + }() + + go func() { + sw := streamio.NewWriter(func(data []byte) error { + return stream.Send(&gitalypb.PostReceivePackRequest{Data: data}) + }) + _, err := io.Copy(sw, clientRequest) + stream.CloseSend() + errC <- err + }() + + for i := 0; i < numStreams; i++ { + if err := <-errC; err != nil { + return err + } + } + + return nil +} + +func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error { + stream, err := client.PostUploadPack(ctx) + if err != nil { + return err + } + + rpcRequest := &gitalypb.PostUploadPackRequest{ + Repository: repo, + GitConfigOptions: gitConfigOptions, + GitProtocol: gitProtocol, + } + + if err := stream.Send(rpcRequest); err != nil { + return fmt.Errorf("initial request: %v", err) + } + + numStreams := 2 + errC := make(chan error, numStreams) + + go func() { + rr := streamio.NewReader(func() ([]byte, error) { + response, err := stream.Recv() + return response.GetData(), err + }) + _, err := io.Copy(clientResponse, rr) + errC <- err + }() + + go func() { + sw := streamio.NewWriter(func(data []byte) error { + return stream.Send(&gitalypb.PostUploadPackRequest{Data: data}) + }) + _, err := io.Copy(sw, clientRequest) + stream.CloseSend() + errC <- err + }() + + for i := 0; i < numStreams; i++ { + if err := <-errC; err != nil { + return err + } + } + + return nil +} |