diff options
Diffstat (limited to 'workhorse/internal/upstream/upstream.go')
-rw-r--r-- | workhorse/internal/upstream/upstream.go | 146 |
1 files changed, 117 insertions, 29 deletions
diff --git a/workhorse/internal/upstream/upstream.go b/workhorse/internal/upstream/upstream.go index c41eb98683b..d6e5e7766b5 100644 --- a/workhorse/internal/upstream/upstream.go +++ b/workhorse/internal/upstream/upstream.go @@ -9,21 +9,26 @@ package upstream import ( "fmt" "os" + "sync" + "time" "net/http" + "net/url" "strings" "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/labkit/correlation" - apipkg "gitlab.com/gitlab-org/gitlab-workhorse/internal/api" - "gitlab.com/gitlab-org/gitlab-workhorse/internal/config" - "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper" - "gitlab.com/gitlab-org/gitlab-workhorse/internal/log" - "gitlab.com/gitlab-org/gitlab-workhorse/internal/rejectmethods" - "gitlab.com/gitlab-org/gitlab-workhorse/internal/upload" - "gitlab.com/gitlab-org/gitlab-workhorse/internal/upstream/roundtripper" - "gitlab.com/gitlab-org/gitlab-workhorse/internal/urlprefix" + apipkg "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/log" + proxypkg "gitlab.com/gitlab-org/gitlab/workhorse/internal/proxy" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/rejectmethods" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/upstream/roundtripper" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/urlprefix" ) var ( @@ -31,6 +36,7 @@ var ( requestHeaderBlacklist = []string{ upload.RewrittenFieldsHeader, } + geoProxyApiPollingInterval = 10 * time.Second ) type upstream struct { @@ -40,8 +46,14 @@ type upstream struct { RoundTripper http.RoundTripper CableRoundTripper http.RoundTripper APIClient *apipkg.API + geoProxyBackend *url.URL + geoLocalRoutes []routeEntry + geoProxyCableRoute routeEntry + geoProxyRoute routeEntry + geoProxyTestChannel chan struct{} accessLogger *logrus.Logger enableGeoProxyFeature bool + mu sync.RWMutex } func NewUpstream(cfg config.Config, accessLogger *logrus.Logger) http.Handler { @@ -52,6 +64,9 @@ func newUpstream(cfg config.Config, accessLogger *logrus.Logger, routesCallback up := upstream{ Config: cfg, accessLogger: accessLogger, + // Kind of a feature flag. See https://gitlab.com/groups/gitlab-org/-/epics/5914#note_564974130 + enableGeoProxyFeature: os.Getenv("GEO_SECONDARY_PROXY") == "1", + geoProxyBackend: &url.URL{}, } if up.Backend == nil { up.Backend = DefaultBackend @@ -70,17 +85,26 @@ func newUpstream(cfg config.Config, accessLogger *logrus.Logger, routesCallback up.Version, up.RoundTripper, ) - // Kind of a feature flag. See https://gitlab.com/groups/gitlab-org/-/epics/5914#note_564974130 - up.enableGeoProxyFeature = os.Getenv("GEO_SECONDARY_PROXY") == "1" + routesCallback(&up) + if up.enableGeoProxyFeature { + go up.pollGeoProxyAPI() + } + var correlationOpts []correlation.InboundHandlerOption if cfg.PropagateCorrelationID { correlationOpts = append(correlationOpts, correlation.WithPropagation()) } + if cfg.TrustedCIDRsForPropagation != nil { + correlationOpts = append(correlationOpts, correlation.WithCIDRsTrustedForPropagation(cfg.TrustedCIDRsForPropagation)) + } + if cfg.TrustedCIDRsForXForwardedFor != nil { + correlationOpts = append(correlationOpts, correlation.WithCIDRsTrustedForXForwardedFor(cfg.TrustedCIDRsForXForwardedFor)) + } handler := correlation.InjectCorrelationID(&up, correlationOpts...) - // TODO: move to LabKit https://gitlab.com/gitlab-org/gitlab-workhorse/-/issues/339 + // TODO: move to LabKit https://gitlab.com/gitlab-org/gitlab/-/issues/324823 handler = rejectmethods.NewMiddleware(handler) return handler } @@ -118,25 +142,9 @@ func (u *upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - // Look for a matching route - var route *routeEntry + cleanedPath := prefix.Strip(URIPath) - if u.enableGeoProxyFeature { - geoProxyURL, err := u.APIClient.GetGeoProxyURL() - - if err == nil { - log.WithRequest(r).WithFields(log.Fields{"geoProxyURL": geoProxyURL}).Info("Geo Proxy: Set route according to Geo Proxy logic") - } else if err != apipkg.ErrNotGeoSecondary { - log.WithRequest(r).WithError(err).Error("Geo Proxy: Unable to determine Geo Proxy URL. Falling back to normal routing") - } - } - - for _, ro := range u.Routes { - if ro.isMatch(prefix.Strip(URIPath), r) { - route = &ro - break - } - } + route := u.findRoute(cleanedPath, r) if route == nil { // The protocol spec in git/Documentation/technical/http-protocol.txt @@ -151,3 +159,83 @@ func (u *upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) { route.handler.ServeHTTP(w, r) } + +func (u *upstream) findRoute(cleanedPath string, r *http.Request) *routeEntry { + if u.enableGeoProxyFeature { + if route := u.findGeoProxyRoute(cleanedPath, r); route != nil { + return route + } + } + + for _, ro := range u.Routes { + if ro.isMatch(cleanedPath, r) { + return &ro + } + } + + return nil +} + +func (u *upstream) findGeoProxyRoute(cleanedPath string, r *http.Request) *routeEntry { + u.mu.RLock() + defer u.mu.RUnlock() + + if u.geoProxyBackend.String() == "" { + log.WithRequest(r).Debug("Geo Proxy: Not a Geo proxy") + return nil + } + + // Some routes are safe to serve from this GitLab instance + for _, ro := range u.geoLocalRoutes { + if ro.isMatch(cleanedPath, r) { + log.WithRequest(r).Debug("Geo Proxy: Handle this request locally") + return &ro + } + } + + log.WithRequest(r).WithFields(log.Fields{"geoProxyBackend": u.geoProxyBackend}).Debug("Geo Proxy: Forward this request") + + if cleanedPath == "/-/cable" { + return &u.geoProxyCableRoute + } + + return &u.geoProxyRoute +} + +func (u *upstream) pollGeoProxyAPI() { + for { + u.callGeoProxyAPI() + + // Notify tests when callGeoProxyAPI() finishes + if u.geoProxyTestChannel != nil { + u.geoProxyTestChannel <- struct{}{} + } + + time.Sleep(geoProxyApiPollingInterval) + } +} + +// Calls /api/v4/geo/proxy and sets up routes +func (u *upstream) callGeoProxyAPI() { + geoProxyURL, err := u.APIClient.GetGeoProxyURL() + if err != nil { + log.WithError(err).WithFields(log.Fields{"geoProxyBackend": u.geoProxyBackend}).Error("Geo Proxy: Unable to determine Geo Proxy URL. Fallback on cached value.") + return + } + + if u.geoProxyBackend.String() != geoProxyURL.String() { + log.WithFields(log.Fields{"oldGeoProxyURL": u.geoProxyBackend, "newGeoProxyURL": geoProxyURL}).Info("Geo Proxy: URL changed") + u.updateGeoProxyFields(geoProxyURL) + } +} + +func (u *upstream) updateGeoProxyFields(geoProxyURL *url.URL) { + u.mu.Lock() + defer u.mu.Unlock() + + u.geoProxyBackend = geoProxyURL + geoProxyRoundTripper := roundtripper.NewBackendRoundTripper(u.geoProxyBackend, "", u.ProxyHeadersTimeout, u.DevelopmentMode) + geoProxyUpstream := proxypkg.NewProxy(u.geoProxyBackend, u.Version, geoProxyRoundTripper) + u.geoProxyCableRoute = u.wsRoute(`^/-/cable\z`, geoProxyUpstream) + u.geoProxyRoute = u.route("", "", geoProxyUpstream) +} |