- Fixed passive fallback logic
- Added active fallback setting notify from uptime monitor
This commit is contained in:
Toby Chui 2025-02-14 22:04:51 +08:00
parent 0abe4c12cf
commit 32f60dfba6
8 changed files with 204 additions and 105 deletions

View File

@ -3,6 +3,7 @@ package loadbalance
import ( import (
"strings" "strings"
"sync" "sync"
"time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/gorilla/sessions" "github.com/gorilla/sessions"
@ -26,10 +27,11 @@ type Options struct {
type RouteManager struct { type RouteManager struct {
SessionStore *sessions.CookieStore SessionStore *sessions.CookieStore
LoadBalanceMap sync.Map //Sync map to store the last load balance state of a given node OnlineStatus sync.Map //Store the online status notify by uptime monitor
OnlineStatusMap sync.Map //Sync map to store the online status of a given ip address or domain name
onlineStatusTickerStop chan bool //Stopping channel for the online status pinger
Options Options //Options for the load balancer Options Options //Options for the load balancer
cacheTicker *time.Ticker //Ticker for cache cleanup
cacheTickerStop chan bool //Stop the cache cleanup
} }
/* Upstream or Origin Server */ /* Upstream or Origin Server */
@ -55,14 +57,31 @@ func NewLoadBalancer(options *Options) *RouteManager {
options.SystemUUID = uuid.New().String() options.SystemUUID = uuid.New().String()
} }
//Create a ticker for cache cleanup every 12 hours
cacheTicker := time.NewTicker(12 * time.Hour)
cacheTickerStop := make(chan bool)
go func() {
options.Logger.PrintAndLog("LoadBalancer", "Upstream state cache ticker started", nil)
for {
select {
case <-cacheTickerStop:
return
case <-cacheTicker.C:
//Clean up the cache
options.Logger.PrintAndLog("LoadBalancer", "Cleaning up upstream state cache", nil)
}
}
}()
//Generate a session store for stickySession //Generate a session store for stickySession
store := sessions.NewCookieStore([]byte(options.SystemUUID)) store := sessions.NewCookieStore([]byte(options.SystemUUID))
return &RouteManager{ return &RouteManager{
SessionStore: store, SessionStore: store,
LoadBalanceMap: sync.Map{}, OnlineStatus: sync.Map{},
OnlineStatusMap: sync.Map{},
onlineStatusTickerStop: nil,
Options: *options, Options: *options,
cacheTicker: cacheTicker,
cacheTickerStop: cacheTickerStop,
} }
} }
@ -91,10 +110,14 @@ func GetUpstreamsAsString(upstreams []*Upstream) string {
} }
func (m *RouteManager) Close() { func (m *RouteManager) Close() {
if m.onlineStatusTickerStop != nil { //Close the session store
m.onlineStatusTickerStop <- true m.SessionStore.MaxAge(0)
}
//Stop the cache cleanup
if m.cacheTicker != nil {
m.cacheTicker.Stop()
}
close(m.cacheTickerStop)
} }
// Log Println, replace all log.Println or fmt.Println with this // Log Println, replace all log.Println or fmt.Println with this

View File

@ -1,39 +1,71 @@
package loadbalance package loadbalance
import ( import (
"net/http" "strconv"
"strings"
"time" "time"
) )
// Return the last ping status to see if the target is online // Return if the target host is online
func (m *RouteManager) IsTargetOnline(matchingDomainOrIp string) bool { func (m *RouteManager) IsTargetOnline(upstreamIP string) bool {
value, ok := m.LoadBalanceMap.Load(matchingDomainOrIp) value, ok := m.OnlineStatus.Load(upstreamIP)
if !ok { if !ok {
return false // Assume online if not found, also update the map
m.OnlineStatus.Store(upstreamIP, true)
return true
} }
isOnline, ok := value.(bool) isOnline, ok := value.(bool)
return ok && isOnline return ok && isOnline
} }
// Ping a target to see if it is online // Notify the host online state, should be called from uptime monitor
func PingTarget(targetMatchingDomainOrIp string, requireTLS bool) bool { func (m *RouteManager) NotifyHostOnlineState(upstreamIP string, isOnline bool) {
client := &http.Client{ //if the upstream IP contains http or https, strip it
Timeout: 10 * time.Second, upstreamIP = strings.TrimPrefix(upstreamIP, "http://")
upstreamIP = strings.TrimPrefix(upstreamIP, "https://")
//Check previous state and update
if m.IsTargetOnline(upstreamIP) == isOnline {
return
} }
url := targetMatchingDomainOrIp m.OnlineStatus.Store(upstreamIP, isOnline)
if requireTLS { m.println("Updating upstream "+upstreamIP+" online state to "+strconv.FormatBool(isOnline), nil)
url = "https://" + url
} else {
url = "http://" + url
} }
resp, err := client.Get(url) // Set this host unreachable for a given amount of time defined in timeout
if err != nil { // this shall be used in passive fallback. The uptime monitor should call to NotifyHostOnlineState() instead
return false func (m *RouteManager) NotifyHostUnreachableWithTimeout(upstreamIp string, timeout int64) {
//if the upstream IP contains http or https, strip it
upstreamIp = strings.TrimPrefix(upstreamIp, "http://")
upstreamIp = strings.TrimPrefix(upstreamIp, "https://")
if timeout <= 0 {
//Set to the default timeout
timeout = 60
} }
defer resp.Body.Close()
return resp.StatusCode >= 200 && resp.StatusCode <= 600 if !m.IsTargetOnline(upstreamIp) {
//Already offline
return
}
m.OnlineStatus.Store(upstreamIp, false)
m.println("Setting upstream "+upstreamIp+" unreachable for "+strconv.FormatInt(timeout, 10)+"s", nil)
go func() {
//Set the upstream back to online after the timeout
<-time.After(time.Duration(timeout) * time.Second)
m.NotifyHostOnlineState(upstreamIp, true)
}()
}
// FilterOfflineOrigins return only online origins from a list of origins
func (m *RouteManager) FilterOfflineOrigins(origins []*Upstream) []*Upstream {
var onlineOrigins []*Upstream
for _, origin := range origins {
if m.IsTargetOnline(origin.OriginIpOrDomain) {
onlineOrigins = append(onlineOrigins, origin)
}
}
return onlineOrigins
} }

View File

@ -19,12 +19,20 @@ func (m *RouteManager) GetRequestUpstreamTarget(w http.ResponseWriter, r *http.R
if len(origins) == 0 { if len(origins) == 0 {
return nil, errors.New("no upstream is defined for this host") return nil, errors.New("no upstream is defined for this host")
} }
var targetOrigin = origins[0]
//Pick the origin
if useStickySession { if useStickySession {
//Use stick session, check which origins this request previously used //Use stick session, check which origins this request previously used
targetOriginId, err := m.getSessionHandler(r, origins) targetOriginId, err := m.getSessionHandler(r, origins)
if err != nil { if err != nil || !m.IsTargetOnline(origins[targetOriginId].OriginIpOrDomain) {
//No valid session found. Assign a new upstream // No valid session found or origin is offline
// Filter the offline origins
origins = m.FilterOfflineOrigins(origins)
if len(origins) == 0 {
return nil, errors.New("no online upstream is available for origin: " + r.Host)
}
//Get a random origin
targetOrigin, index, err := getRandomUpstreamByWeight(origins) targetOrigin, index, err := getRandomUpstreamByWeight(origins)
if err != nil { if err != nil {
m.println("Unable to get random upstream", err) m.println("Unable to get random upstream", err)
@ -35,23 +43,34 @@ func (m *RouteManager) GetRequestUpstreamTarget(w http.ResponseWriter, r *http.R
return targetOrigin, nil return targetOrigin, nil
} }
//Valid session found. Resume the previous session //Valid session found and origin is online
return origins[targetOriginId], nil return origins[targetOriginId], nil
} else { }
//Do not use stick session. Get a random one //No sticky session, get a random origin
var err error
targetOrigin, _, err = getRandomUpstreamByWeight(origins) //Filter the offline origins
origins = m.FilterOfflineOrigins(origins)
if len(origins) == 0 {
return nil, errors.New("no online upstream is available for origin: " + r.Host)
}
//Get a random origin
targetOrigin, _, err := getRandomUpstreamByWeight(origins)
if err != nil { if err != nil {
m.println("Failed to get next origin", err) m.println("Failed to get next origin", err)
targetOrigin = origins[0] targetOrigin = origins[0]
} }
}
//fmt.Println("DEBUG: Picking origin " + targetOrigin.OriginIpOrDomain) //fmt.Println("DEBUG: Picking origin " + targetOrigin.OriginIpOrDomain)
return targetOrigin, nil return targetOrigin, nil
} }
// GetUsableUpstreamCounts return the number of usable upstreams
func (m *RouteManager) GetUsableUpstreamCounts(origins []*Upstream) int {
origins = m.FilterOfflineOrigins(origins)
return len(origins)
}
/* Features related to session access */ /* Features related to session access */
//Set a new origin for this connection by session //Set a new origin for this connection by session
func (m *RouteManager) setSessionHandler(w http.ResponseWriter, r *http.Request, originIpOrDomain string, index int) error { func (m *RouteManager) setSessionHandler(w http.ResponseWriter, r *http.Request, originIpOrDomain string, index int) error {

View File

@ -1,7 +1,9 @@
package dynamicproxy package dynamicproxy
import ( import (
"context"
"errors" "errors"
"fmt"
"log" "log"
"net" "net"
"net/http" "net/http"
@ -198,14 +200,21 @@ func (h *ProxyHandler) hostRequest(w http.ResponseWriter, r *http.Request, targe
Version: target.parent.Option.HostVersion, Version: target.parent.Option.HostVersion,
}) })
//validate the error
var dnsError *net.DNSError var dnsError *net.DNSError
if err != nil { if err != nil {
if errors.As(err, &dnsError) { if errors.As(err, &dnsError) {
http.ServeFile(w, r, "./web/hosterror.html") http.ServeFile(w, r, "./web/hosterror.html")
h.Parent.logRequest(r, false, 404, "host-http", r.URL.Hostname()) h.Parent.logRequest(r, false, 404, "host-http", r.URL.Hostname())
} else if errors.Is(err, context.Canceled) {
//Request canceled by client, usually due to manual refresh before page load
http.Error(w, "Request canceled", http.StatusRequestTimeout)
h.Parent.logRequest(r, false, http.StatusRequestTimeout, "host-http", r.URL.Hostname())
} else { } else {
//Notify the load balancer that the host is unreachable
fmt.Println(err.Error())
h.Parent.loadBalancer.NotifyHostUnreachableWithTimeout(selectedUpstream.OriginIpOrDomain, PassiveLoadBalanceNotifyTimeout)
http.ServeFile(w, r, "./web/rperror.html") http.ServeFile(w, r, "./web/rperror.html")
//TODO: Take this upstream offline automatically
h.Parent.logRequest(r, false, 521, "host-http", r.URL.Hostname()) h.Parent.logRequest(r, false, 521, "host-http", r.URL.Hostname())
} }
} }

View File

@ -28,6 +28,7 @@ import (
type ProxyType int type ProxyType int
const PassiveLoadBalanceNotifyTimeout = 60 //Time to assume a passive load balance is unreachable, in seconds
const ( const (
ProxyTypeRoot ProxyType = iota //Root Proxy, everything not matching will be routed here ProxyTypeRoot ProxyType = iota //Root Proxy, everything not matching will be routed here
ProxyTypeHost //Host Proxy, match by host (domain) name ProxyTypeHost //Host Proxy, match by host (domain) name

58
src/mod/uptime/typedef.go Normal file
View File

@ -0,0 +1,58 @@
package uptime
import "imuslab.com/zoraxy/mod/info/logger"
const (
logModuleName = "uptime-monitor"
)
type Record struct {
Timestamp int64
ID string
Name string
URL string
Protocol string
Online bool
StatusCode int
Latency int64
}
type ProxyType string
const (
ProxyType_Host ProxyType = "Origin Server"
ProxyType_Vdir ProxyType = "Virtual Directory"
)
type Target struct {
ID string
Name string
URL string
Protocol string
ProxyType ProxyType
}
type Config struct {
Targets []*Target
Interval int
MaxRecordsStore int
OnlineStateNotify func(upstreamIP string, isOnline bool)
Logger *logger.Logger
}
type Monitor struct {
Config *Config
OnlineStatusLog map[string][]*Record
}
// Default configs
var exampleTarget = Target{
ID: "example",
Name: "Example",
URL: "example.com",
Protocol: "https",
}
func defaultNotify(upstreamIP string, isOnline bool) {
// Do nothing
}

View File

@ -14,56 +14,6 @@ import (
"imuslab.com/zoraxy/mod/utils" "imuslab.com/zoraxy/mod/utils"
) )
const (
logModuleName = "uptime-monitor"
)
type Record struct {
Timestamp int64
ID string
Name string
URL string
Protocol string
Online bool
StatusCode int
Latency int64
}
type ProxyType string
const (
ProxyType_Host ProxyType = "Origin Server"
ProxyType_Vdir ProxyType = "Virtual Directory"
)
type Target struct {
ID string
Name string
URL string
Protocol string
ProxyType ProxyType
}
type Config struct {
Targets []*Target
Interval int
MaxRecordsStore int
Logger *logger.Logger
}
type Monitor struct {
Config *Config
OnlineStatusLog map[string][]*Record
}
// Default configs
var exampleTarget = Target{
ID: "example",
Name: "Example",
URL: "example.com",
Protocol: "https",
}
// Create a new uptime monitor // Create a new uptime monitor
func NewUptimeMonitor(config *Config) (*Monitor, error) { func NewUptimeMonitor(config *Config) (*Monitor, error) {
//Create new monitor object //Create new monitor object
@ -77,6 +27,11 @@ func NewUptimeMonitor(config *Config) (*Monitor, error) {
config.Logger, _ = logger.NewFmtLogger() config.Logger, _ = logger.NewFmtLogger()
} }
if config.OnlineStateNotify == nil {
//Use default notify function if not provided
config.OnlineStateNotify = defaultNotify
}
//Start the endpoint listener //Start the endpoint listener
ticker := time.NewTicker(time.Duration(config.Interval) * time.Second) ticker := time.NewTicker(time.Duration(config.Interval) * time.Second)
done := make(chan bool) done := make(chan bool)
@ -218,6 +173,7 @@ func (m *Monitor) getWebsiteStatusWithLatency(url string) (bool, int64, int) {
end := time.Now().UnixNano() / int64(time.Millisecond) end := time.Now().UnixNano() / int64(time.Millisecond)
if err != nil { if err != nil {
m.Config.Logger.PrintAndLog(logModuleName, "Ping upstream timeout. Assume offline", err) m.Config.Logger.PrintAndLog(logModuleName, "Ping upstream timeout. Assume offline", err)
m.Config.OnlineStateNotify(url, false)
return false, 0, 0 return false, 0, 0
} else { } else {
diff := end - start diff := end - start
@ -231,7 +187,7 @@ func (m *Monitor) getWebsiteStatusWithLatency(url string) (bool, int64, int) {
} else { } else {
succ = false succ = false
} }
m.Config.OnlineStateNotify(url, true)
return succ, diff, statusCode return succ, diff, statusCode
} }

View File

@ -166,6 +166,7 @@ func ReverseProxtInit() {
Targets: GetUptimeTargetsFromReverseProxyRules(dynamicProxyRouter), Targets: GetUptimeTargetsFromReverseProxyRules(dynamicProxyRouter),
Interval: 300, //5 minutes Interval: 300, //5 minutes
MaxRecordsStore: 288, //1 day MaxRecordsStore: 288, //1 day
OnlineStateNotify: loadBalancer.NotifyHostOnlineState, //Notify the load balancer for online state
Logger: SystemWideLogger, //Logger Logger: SystemWideLogger, //Logger
}) })