summaryrefslogtreecommitdiff
path: root/workhorse/internal/upstream/upstream.go
diff options
context:
space:
mode:
Diffstat (limited to 'workhorse/internal/upstream/upstream.go')
-rw-r--r--workhorse/internal/upstream/upstream.go146
1 files changed, 117 insertions, 29 deletions
diff --git a/workhorse/internal/upstream/upstream.go b/workhorse/internal/upstream/upstream.go
index c41eb98683b..d6e5e7766b5 100644
--- a/workhorse/internal/upstream/upstream.go
+++ b/workhorse/internal/upstream/upstream.go
@@ -9,21 +9,26 @@ package upstream
import (
"fmt"
"os"
+ "sync"
+ "time"
"net/http"
+ "net/url"
"strings"
"github.com/sirupsen/logrus"
+
"gitlab.com/gitlab-org/labkit/correlation"
- apipkg "gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
- "gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
- "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
- "gitlab.com/gitlab-org/gitlab-workhorse/internal/log"
- "gitlab.com/gitlab-org/gitlab-workhorse/internal/rejectmethods"
- "gitlab.com/gitlab-org/gitlab-workhorse/internal/upload"
- "gitlab.com/gitlab-org/gitlab-workhorse/internal/upstream/roundtripper"
- "gitlab.com/gitlab-org/gitlab-workhorse/internal/urlprefix"
+ apipkg "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/log"
+ proxypkg "gitlab.com/gitlab-org/gitlab/workhorse/internal/proxy"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/rejectmethods"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upload"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/upstream/roundtripper"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/urlprefix"
)
var (
@@ -31,6 +36,7 @@ var (
requestHeaderBlacklist = []string{
upload.RewrittenFieldsHeader,
}
+ geoProxyApiPollingInterval = 10 * time.Second
)
type upstream struct {
@@ -40,8 +46,14 @@ type upstream struct {
RoundTripper http.RoundTripper
CableRoundTripper http.RoundTripper
APIClient *apipkg.API
+ geoProxyBackend *url.URL
+ geoLocalRoutes []routeEntry
+ geoProxyCableRoute routeEntry
+ geoProxyRoute routeEntry
+ geoProxyTestChannel chan struct{}
accessLogger *logrus.Logger
enableGeoProxyFeature bool
+ mu sync.RWMutex
}
func NewUpstream(cfg config.Config, accessLogger *logrus.Logger) http.Handler {
@@ -52,6 +64,9 @@ func newUpstream(cfg config.Config, accessLogger *logrus.Logger, routesCallback
up := upstream{
Config: cfg,
accessLogger: accessLogger,
+ // Kind of a feature flag. See https://gitlab.com/groups/gitlab-org/-/epics/5914#note_564974130
+ enableGeoProxyFeature: os.Getenv("GEO_SECONDARY_PROXY") == "1",
+ geoProxyBackend: &url.URL{},
}
if up.Backend == nil {
up.Backend = DefaultBackend
@@ -70,17 +85,26 @@ func newUpstream(cfg config.Config, accessLogger *logrus.Logger, routesCallback
up.Version,
up.RoundTripper,
)
- // Kind of a feature flag. See https://gitlab.com/groups/gitlab-org/-/epics/5914#note_564974130
- up.enableGeoProxyFeature = os.Getenv("GEO_SECONDARY_PROXY") == "1"
+
routesCallback(&up)
+ if up.enableGeoProxyFeature {
+ go up.pollGeoProxyAPI()
+ }
+
var correlationOpts []correlation.InboundHandlerOption
if cfg.PropagateCorrelationID {
correlationOpts = append(correlationOpts, correlation.WithPropagation())
}
+ if cfg.TrustedCIDRsForPropagation != nil {
+ correlationOpts = append(correlationOpts, correlation.WithCIDRsTrustedForPropagation(cfg.TrustedCIDRsForPropagation))
+ }
+ if cfg.TrustedCIDRsForXForwardedFor != nil {
+ correlationOpts = append(correlationOpts, correlation.WithCIDRsTrustedForXForwardedFor(cfg.TrustedCIDRsForXForwardedFor))
+ }
handler := correlation.InjectCorrelationID(&up, correlationOpts...)
- // TODO: move to LabKit https://gitlab.com/gitlab-org/gitlab-workhorse/-/issues/339
+ // TODO: move to LabKit https://gitlab.com/gitlab-org/gitlab/-/issues/324823
handler = rejectmethods.NewMiddleware(handler)
return handler
}
@@ -118,25 +142,9 @@ func (u *upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
- // Look for a matching route
- var route *routeEntry
+ cleanedPath := prefix.Strip(URIPath)
- if u.enableGeoProxyFeature {
- geoProxyURL, err := u.APIClient.GetGeoProxyURL()
-
- if err == nil {
- log.WithRequest(r).WithFields(log.Fields{"geoProxyURL": geoProxyURL}).Info("Geo Proxy: Set route according to Geo Proxy logic")
- } else if err != apipkg.ErrNotGeoSecondary {
- log.WithRequest(r).WithError(err).Error("Geo Proxy: Unable to determine Geo Proxy URL. Falling back to normal routing")
- }
- }
-
- for _, ro := range u.Routes {
- if ro.isMatch(prefix.Strip(URIPath), r) {
- route = &ro
- break
- }
- }
+ route := u.findRoute(cleanedPath, r)
if route == nil {
// The protocol spec in git/Documentation/technical/http-protocol.txt
@@ -151,3 +159,83 @@ func (u *upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
route.handler.ServeHTTP(w, r)
}
+
+func (u *upstream) findRoute(cleanedPath string, r *http.Request) *routeEntry {
+ if u.enableGeoProxyFeature {
+ if route := u.findGeoProxyRoute(cleanedPath, r); route != nil {
+ return route
+ }
+ }
+
+ for _, ro := range u.Routes {
+ if ro.isMatch(cleanedPath, r) {
+ return &ro
+ }
+ }
+
+ return nil
+}
+
+func (u *upstream) findGeoProxyRoute(cleanedPath string, r *http.Request) *routeEntry {
+ u.mu.RLock()
+ defer u.mu.RUnlock()
+
+ if u.geoProxyBackend.String() == "" {
+ log.WithRequest(r).Debug("Geo Proxy: Not a Geo proxy")
+ return nil
+ }
+
+ // Some routes are safe to serve from this GitLab instance
+ for _, ro := range u.geoLocalRoutes {
+ if ro.isMatch(cleanedPath, r) {
+ log.WithRequest(r).Debug("Geo Proxy: Handle this request locally")
+ return &ro
+ }
+ }
+
+ log.WithRequest(r).WithFields(log.Fields{"geoProxyBackend": u.geoProxyBackend}).Debug("Geo Proxy: Forward this request")
+
+ if cleanedPath == "/-/cable" {
+ return &u.geoProxyCableRoute
+ }
+
+ return &u.geoProxyRoute
+}
+
+func (u *upstream) pollGeoProxyAPI() {
+ for {
+ u.callGeoProxyAPI()
+
+ // Notify tests when callGeoProxyAPI() finishes
+ if u.geoProxyTestChannel != nil {
+ u.geoProxyTestChannel <- struct{}{}
+ }
+
+ time.Sleep(geoProxyApiPollingInterval)
+ }
+}
+
+// Calls /api/v4/geo/proxy and sets up routes
+func (u *upstream) callGeoProxyAPI() {
+ geoProxyURL, err := u.APIClient.GetGeoProxyURL()
+ if err != nil {
+ log.WithError(err).WithFields(log.Fields{"geoProxyBackend": u.geoProxyBackend}).Error("Geo Proxy: Unable to determine Geo Proxy URL. Fallback on cached value.")
+ return
+ }
+
+ if u.geoProxyBackend.String() != geoProxyURL.String() {
+ log.WithFields(log.Fields{"oldGeoProxyURL": u.geoProxyBackend, "newGeoProxyURL": geoProxyURL}).Info("Geo Proxy: URL changed")
+ u.updateGeoProxyFields(geoProxyURL)
+ }
+}
+
+func (u *upstream) updateGeoProxyFields(geoProxyURL *url.URL) {
+ u.mu.Lock()
+ defer u.mu.Unlock()
+
+ u.geoProxyBackend = geoProxyURL
+ geoProxyRoundTripper := roundtripper.NewBackendRoundTripper(u.geoProxyBackend, "", u.ProxyHeadersTimeout, u.DevelopmentMode)
+ geoProxyUpstream := proxypkg.NewProxy(u.geoProxyBackend, u.Version, geoProxyRoundTripper)
+ u.geoProxyCableRoute = u.wsRoute(`^/-/cable\z`, geoProxyUpstream)
+ u.geoProxyRoute = u.route("", "", geoProxyUpstream)
+}