From 32f60dfba672609bc00f42971c7daa0fcea79e95 Mon Sep 17 00:00:00 2001 From: Toby Chui Date: Fri, 14 Feb 2025 22:04:51 +0800 Subject: [PATCH] Fixed #523 - Fixed passive fallback logic - Added active fallback setting notify from uptime monitor --- .../dynamicproxy/loadbalance/loadbalance.go | 49 ++++++++---- .../dynamicproxy/loadbalance/onlineStatus.go | 78 +++++++++++++------ .../dynamicproxy/loadbalance/originPicker.go | 43 +++++++--- src/mod/dynamicproxy/proxyRequestHandler.go | 11 ++- src/mod/dynamicproxy/typedef.go | 3 +- src/mod/uptime/typedef.go | 58 ++++++++++++++ src/mod/uptime/uptime.go | 58 ++------------ src/reverseproxy.go | 9 ++- 8 files changed, 204 insertions(+), 105 deletions(-) create mode 100644 src/mod/uptime/typedef.go diff --git a/src/mod/dynamicproxy/loadbalance/loadbalance.go b/src/mod/dynamicproxy/loadbalance/loadbalance.go index b6ab3fc..b1be8e8 100644 --- a/src/mod/dynamicproxy/loadbalance/loadbalance.go +++ b/src/mod/dynamicproxy/loadbalance/loadbalance.go @@ -3,6 +3,7 @@ package loadbalance import ( "strings" "sync" + "time" "github.com/google/uuid" "github.com/gorilla/sessions" @@ -25,11 +26,12 @@ type Options struct { } type RouteManager struct { - SessionStore *sessions.CookieStore - LoadBalanceMap sync.Map //Sync map to store the last load balance state of a given node - OnlineStatusMap sync.Map //Sync map to store the online status of a given ip address or domain name - onlineStatusTickerStop chan bool //Stopping channel for the online status pinger - Options Options //Options for the load balancer + SessionStore *sessions.CookieStore + OnlineStatus sync.Map //Store the online status notify by uptime monitor + Options Options //Options for the load balancer + + cacheTicker *time.Ticker //Ticker for cache cleanup + cacheTickerStop chan bool //Stop the cache cleanup } /* Upstream or Origin Server */ @@ -55,14 +57,31 @@ func NewLoadBalancer(options *Options) *RouteManager { options.SystemUUID = uuid.New().String() } + //Create a ticker for cache cleanup every 12 hours + cacheTicker := time.NewTicker(12 * time.Hour) + cacheTickerStop := make(chan bool) + go func() { + options.Logger.PrintAndLog("LoadBalancer", "Upstream state cache ticker started", nil) + for { + select { + case <-cacheTickerStop: + return + case <-cacheTicker.C: + //Clean up the cache + options.Logger.PrintAndLog("LoadBalancer", "Cleaning up upstream state cache", nil) + } + } + }() + //Generate a session store for stickySession store := sessions.NewCookieStore([]byte(options.SystemUUID)) return &RouteManager{ - SessionStore: store, - LoadBalanceMap: sync.Map{}, - OnlineStatusMap: sync.Map{}, - onlineStatusTickerStop: nil, - Options: *options, + SessionStore: store, + OnlineStatus: sync.Map{}, + Options: *options, + + cacheTicker: cacheTicker, + cacheTickerStop: cacheTickerStop, } } @@ -91,10 +110,14 @@ func GetUpstreamsAsString(upstreams []*Upstream) string { } func (m *RouteManager) Close() { - if m.onlineStatusTickerStop != nil { - m.onlineStatusTickerStop <- true - } + //Close the session store + m.SessionStore.MaxAge(0) + //Stop the cache cleanup + if m.cacheTicker != nil { + m.cacheTicker.Stop() + } + close(m.cacheTickerStop) } // Log Println, replace all log.Println or fmt.Println with this diff --git a/src/mod/dynamicproxy/loadbalance/onlineStatus.go b/src/mod/dynamicproxy/loadbalance/onlineStatus.go index 2dcd4e3..860c266 100644 --- a/src/mod/dynamicproxy/loadbalance/onlineStatus.go +++ b/src/mod/dynamicproxy/loadbalance/onlineStatus.go @@ -1,39 +1,71 @@ package loadbalance import ( - "net/http" + "strconv" + "strings" "time" ) -// Return the last ping status to see if the target is online -func (m *RouteManager) IsTargetOnline(matchingDomainOrIp string) bool { - value, ok := m.LoadBalanceMap.Load(matchingDomainOrIp) +// Return if the target host is online +func (m *RouteManager) IsTargetOnline(upstreamIP string) bool { + value, ok := m.OnlineStatus.Load(upstreamIP) if !ok { - return false + // Assume online if not found, also update the map + m.OnlineStatus.Store(upstreamIP, true) + return true } isOnline, ok := value.(bool) return ok && isOnline } -// Ping a target to see if it is online -func PingTarget(targetMatchingDomainOrIp string, requireTLS bool) bool { - client := &http.Client{ - Timeout: 10 * time.Second, +// Notify the host online state, should be called from uptime monitor +func (m *RouteManager) NotifyHostOnlineState(upstreamIP string, isOnline bool) { + //if the upstream IP contains http or https, strip it + upstreamIP = strings.TrimPrefix(upstreamIP, "http://") + upstreamIP = strings.TrimPrefix(upstreamIP, "https://") + + //Check previous state and update + if m.IsTargetOnline(upstreamIP) == isOnline { + return } - url := targetMatchingDomainOrIp - if requireTLS { - url = "https://" + url - } else { - url = "http://" + url - } - - resp, err := client.Get(url) - if err != nil { - return false - } - defer resp.Body.Close() - - return resp.StatusCode >= 200 && resp.StatusCode <= 600 + m.OnlineStatus.Store(upstreamIP, isOnline) + m.println("Updating upstream "+upstreamIP+" online state to "+strconv.FormatBool(isOnline), nil) +} + +// Set this host unreachable for a given amount of time defined in timeout +// this shall be used in passive fallback. The uptime monitor should call to NotifyHostOnlineState() instead +func (m *RouteManager) NotifyHostUnreachableWithTimeout(upstreamIp string, timeout int64) { + //if the upstream IP contains http or https, strip it + upstreamIp = strings.TrimPrefix(upstreamIp, "http://") + upstreamIp = strings.TrimPrefix(upstreamIp, "https://") + if timeout <= 0 { + //Set to the default timeout + timeout = 60 + } + + if !m.IsTargetOnline(upstreamIp) { + //Already offline + return + } + + m.OnlineStatus.Store(upstreamIp, false) + m.println("Setting upstream "+upstreamIp+" unreachable for "+strconv.FormatInt(timeout, 10)+"s", nil) + go func() { + //Set the upstream back to online after the timeout + <-time.After(time.Duration(timeout) * time.Second) + m.NotifyHostOnlineState(upstreamIp, true) + }() +} + +// FilterOfflineOrigins return only online origins from a list of origins +func (m *RouteManager) FilterOfflineOrigins(origins []*Upstream) []*Upstream { + var onlineOrigins []*Upstream + for _, origin := range origins { + if m.IsTargetOnline(origin.OriginIpOrDomain) { + onlineOrigins = append(onlineOrigins, origin) + } + } + return onlineOrigins } diff --git a/src/mod/dynamicproxy/loadbalance/originPicker.go b/src/mod/dynamicproxy/loadbalance/originPicker.go index cbc294d..92d0cfd 100644 --- a/src/mod/dynamicproxy/loadbalance/originPicker.go +++ b/src/mod/dynamicproxy/loadbalance/originPicker.go @@ -19,12 +19,20 @@ func (m *RouteManager) GetRequestUpstreamTarget(w http.ResponseWriter, r *http.R if len(origins) == 0 { return nil, errors.New("no upstream is defined for this host") } - var targetOrigin = origins[0] + + //Pick the origin if useStickySession { //Use stick session, check which origins this request previously used targetOriginId, err := m.getSessionHandler(r, origins) - if err != nil { - //No valid session found. Assign a new upstream + if err != nil || !m.IsTargetOnline(origins[targetOriginId].OriginIpOrDomain) { + // No valid session found or origin is offline + // Filter the offline origins + origins = m.FilterOfflineOrigins(origins) + if len(origins) == 0 { + return nil, errors.New("no online upstream is available for origin: " + r.Host) + } + + //Get a random origin targetOrigin, index, err := getRandomUpstreamByWeight(origins) if err != nil { m.println("Unable to get random upstream", err) @@ -35,23 +43,34 @@ func (m *RouteManager) GetRequestUpstreamTarget(w http.ResponseWriter, r *http.R return targetOrigin, nil } - //Valid session found. Resume the previous session + //Valid session found and origin is online return origins[targetOriginId], nil - } else { - //Do not use stick session. Get a random one - var err error - targetOrigin, _, err = getRandomUpstreamByWeight(origins) - if err != nil { - m.println("Failed to get next origin", err) - targetOrigin = origins[0] - } + } + //No sticky session, get a random origin + //Filter the offline origins + origins = m.FilterOfflineOrigins(origins) + if len(origins) == 0 { + return nil, errors.New("no online upstream is available for origin: " + r.Host) + } + + //Get a random origin + targetOrigin, _, err := getRandomUpstreamByWeight(origins) + if err != nil { + m.println("Failed to get next origin", err) + targetOrigin = origins[0] } //fmt.Println("DEBUG: Picking origin " + targetOrigin.OriginIpOrDomain) return targetOrigin, nil } +// GetUsableUpstreamCounts return the number of usable upstreams +func (m *RouteManager) GetUsableUpstreamCounts(origins []*Upstream) int { + origins = m.FilterOfflineOrigins(origins) + return len(origins) +} + /* Features related to session access */ //Set a new origin for this connection by session func (m *RouteManager) setSessionHandler(w http.ResponseWriter, r *http.Request, originIpOrDomain string, index int) error { diff --git a/src/mod/dynamicproxy/proxyRequestHandler.go b/src/mod/dynamicproxy/proxyRequestHandler.go index 5719fe2..142972d 100644 --- a/src/mod/dynamicproxy/proxyRequestHandler.go +++ b/src/mod/dynamicproxy/proxyRequestHandler.go @@ -1,7 +1,9 @@ package dynamicproxy import ( + "context" "errors" + "fmt" "log" "net" "net/http" @@ -198,14 +200,21 @@ func (h *ProxyHandler) hostRequest(w http.ResponseWriter, r *http.Request, targe Version: target.parent.Option.HostVersion, }) + //validate the error var dnsError *net.DNSError if err != nil { if errors.As(err, &dnsError) { http.ServeFile(w, r, "./web/hosterror.html") h.Parent.logRequest(r, false, 404, "host-http", r.URL.Hostname()) + } else if errors.Is(err, context.Canceled) { + //Request canceled by client, usually due to manual refresh before page load + http.Error(w, "Request canceled", http.StatusRequestTimeout) + h.Parent.logRequest(r, false, http.StatusRequestTimeout, "host-http", r.URL.Hostname()) } else { + //Notify the load balancer that the host is unreachable + fmt.Println(err.Error()) + h.Parent.loadBalancer.NotifyHostUnreachableWithTimeout(selectedUpstream.OriginIpOrDomain, PassiveLoadBalanceNotifyTimeout) http.ServeFile(w, r, "./web/rperror.html") - //TODO: Take this upstream offline automatically h.Parent.logRequest(r, false, 521, "host-http", r.URL.Hostname()) } } diff --git a/src/mod/dynamicproxy/typedef.go b/src/mod/dynamicproxy/typedef.go index 2919a7e..6761674 100644 --- a/src/mod/dynamicproxy/typedef.go +++ b/src/mod/dynamicproxy/typedef.go @@ -28,6 +28,7 @@ import ( type ProxyType int +const PassiveLoadBalanceNotifyTimeout = 60 //Time to assume a passive load balance is unreachable, in seconds const ( ProxyTypeRoot ProxyType = iota //Root Proxy, everything not matching will be routed here ProxyTypeHost //Host Proxy, match by host (domain) name @@ -193,7 +194,7 @@ type ProxyEndpoint struct { DefaultSiteValue string //Fallback routing target, optional //Internal Logic Elements - parent *Router `json:"-"` + parent *Router `json:"-"` Tags []string // Tags for the proxy endpoint } diff --git a/src/mod/uptime/typedef.go b/src/mod/uptime/typedef.go new file mode 100644 index 0000000..d9c552d --- /dev/null +++ b/src/mod/uptime/typedef.go @@ -0,0 +1,58 @@ +package uptime + +import "imuslab.com/zoraxy/mod/info/logger" + +const ( + logModuleName = "uptime-monitor" +) + +type Record struct { + Timestamp int64 + ID string + Name string + URL string + Protocol string + Online bool + StatusCode int + Latency int64 +} + +type ProxyType string + +const ( + ProxyType_Host ProxyType = "Origin Server" + ProxyType_Vdir ProxyType = "Virtual Directory" +) + +type Target struct { + ID string + Name string + URL string + Protocol string + ProxyType ProxyType +} + +type Config struct { + Targets []*Target + Interval int + MaxRecordsStore int + OnlineStateNotify func(upstreamIP string, isOnline bool) + Logger *logger.Logger +} + +type Monitor struct { + Config *Config + OnlineStatusLog map[string][]*Record +} + +// Default configs +var exampleTarget = Target{ + ID: "example", + Name: "Example", + URL: "example.com", + Protocol: "https", +} + +func defaultNotify(upstreamIP string, isOnline bool) { + // Do nothing +} diff --git a/src/mod/uptime/uptime.go b/src/mod/uptime/uptime.go index 44eeef9..7d31da0 100644 --- a/src/mod/uptime/uptime.go +++ b/src/mod/uptime/uptime.go @@ -14,56 +14,6 @@ import ( "imuslab.com/zoraxy/mod/utils" ) -const ( - logModuleName = "uptime-monitor" -) - -type Record struct { - Timestamp int64 - ID string - Name string - URL string - Protocol string - Online bool - StatusCode int - Latency int64 -} - -type ProxyType string - -const ( - ProxyType_Host ProxyType = "Origin Server" - ProxyType_Vdir ProxyType = "Virtual Directory" -) - -type Target struct { - ID string - Name string - URL string - Protocol string - ProxyType ProxyType -} - -type Config struct { - Targets []*Target - Interval int - MaxRecordsStore int - Logger *logger.Logger -} - -type Monitor struct { - Config *Config - OnlineStatusLog map[string][]*Record -} - -// Default configs -var exampleTarget = Target{ - ID: "example", - Name: "Example", - URL: "example.com", - Protocol: "https", -} - // Create a new uptime monitor func NewUptimeMonitor(config *Config) (*Monitor, error) { //Create new monitor object @@ -77,6 +27,11 @@ func NewUptimeMonitor(config *Config) (*Monitor, error) { config.Logger, _ = logger.NewFmtLogger() } + if config.OnlineStateNotify == nil { + //Use default notify function if not provided + config.OnlineStateNotify = defaultNotify + } + //Start the endpoint listener ticker := time.NewTicker(time.Duration(config.Interval) * time.Second) done := make(chan bool) @@ -218,6 +173,7 @@ func (m *Monitor) getWebsiteStatusWithLatency(url string) (bool, int64, int) { end := time.Now().UnixNano() / int64(time.Millisecond) if err != nil { m.Config.Logger.PrintAndLog(logModuleName, "Ping upstream timeout. Assume offline", err) + m.Config.OnlineStateNotify(url, false) return false, 0, 0 } else { diff := end - start @@ -231,7 +187,7 @@ func (m *Monitor) getWebsiteStatusWithLatency(url string) (bool, int64, int) { } else { succ = false } - + m.Config.OnlineStateNotify(url, true) return succ, diff, statusCode } diff --git a/src/reverseproxy.go b/src/reverseproxy.go index 2313bd5..62c8c3a 100644 --- a/src/reverseproxy.go +++ b/src/reverseproxy.go @@ -163,10 +163,11 @@ func ReverseProxtInit() { go func() { //This must be done in go routine to prevent blocking on system startup uptimeMonitor, _ = uptime.NewUptimeMonitor(&uptime.Config{ - Targets: GetUptimeTargetsFromReverseProxyRules(dynamicProxyRouter), - Interval: 300, //5 minutes - MaxRecordsStore: 288, //1 day - Logger: SystemWideLogger, //Logger + Targets: GetUptimeTargetsFromReverseProxyRules(dynamicProxyRouter), + Interval: 300, //5 minutes + MaxRecordsStore: 288, //1 day + OnlineStateNotify: loadBalancer.NotifyHostOnlineState, //Notify the load balancer for online state + Logger: SystemWideLogger, //Logger }) SystemWideLogger.Println("Uptime Monitor background service started")