From 2d611a559a344020f1a131d1075c16bbdc4aca0f Mon Sep 17 00:00:00 2001 From: Toby Chui Date: Wed, 2 Jul 2025 21:03:57 +0800 Subject: [PATCH] Optimized structure for stream proxy - Separated instance and config for stream proxy --- src/mod/streamproxy/handler.go | 14 +- src/mod/streamproxy/instances.go | 97 ++++++++++++++ src/mod/streamproxy/streamproxy.go | 169 +++++++----------------- src/mod/streamproxy/streamproxy_test.go | 2 +- src/mod/streamproxy/tcpprox.go | 4 +- src/mod/streamproxy/udpprox.go | 6 +- 6 files changed, 164 insertions(+), 128 deletions(-) create mode 100644 src/mod/streamproxy/instances.go diff --git a/src/mod/streamproxy/handler.go b/src/mod/streamproxy/handler.go index 22d523a..50a7cb4 100644 --- a/src/mod/streamproxy/handler.go +++ b/src/mod/streamproxy/handler.go @@ -89,8 +89,20 @@ func (m *Manager) HandleEditProxyConfigs(w http.ResponseWriter, r *http.Request) } } + // Create a new ProxyRuleUpdateConfig with the extracted parameters + newConfig := &ProxyRuleUpdateConfig{ + InstanceUUID: configUUID, + NewName: newName, + NewListeningAddr: listenAddr, + NewProxyAddr: proxyAddr, + UseTCP: useTCP, + UseUDP: useUDP, + UseProxyProtocol: useProxyProtocol, + NewTimeout: newTimeout, + } + // Call the EditConfig method to modify the configuration - err = m.EditConfig(configUUID, newName, listenAddr, proxyAddr, useTCP, useUDP, useProxyProtocol, newTimeout) + err = m.EditConfig(newConfig) if err != nil { utils.SendErrorResponse(w, err.Error()) return diff --git a/src/mod/streamproxy/instances.go b/src/mod/streamproxy/instances.go new file mode 100644 index 0000000..2447143 --- /dev/null +++ b/src/mod/streamproxy/instances.go @@ -0,0 +1,97 @@ +package streamproxy + +/* + Instances.go + + This file contains the methods to start, stop, and manage the proxy relay instances. + +*/ + +import ( + "errors" + "time" +) + +// Start a proxy if stopped +func (c *ProxyRelayInstance) Start() error { + if c.IsRunning() { + c.Running = true + return errors.New("proxy already running") + } + + // Create a stopChan to control the loop + tcpStopChan := make(chan bool) + udpStopChan := make(chan bool) + + //Start the proxy service + if c.UseUDP { + c.udpStopChan = udpStopChan + go func() { + err := c.ForwardUDP(c.ListeningAddress, c.ProxyTargetAddr, udpStopChan) + if err != nil { + if !c.UseTCP { + c.Running = false + c.udpStopChan = nil + c.parent.SaveConfigToDatabase() + } + c.parent.logf("[proto:udp] Error starting stream proxy "+c.Name+"("+c.UUID+")", err) + } + }() + } + + if c.UseTCP { + c.tcpStopChan = tcpStopChan + go func() { + //Default to transport mode + err := c.Port2host(c.ListeningAddress, c.ProxyTargetAddr, tcpStopChan) + if err != nil { + c.Running = false + c.tcpStopChan = nil + c.parent.SaveConfigToDatabase() + c.parent.logf("[proto:tcp] Error starting stream proxy "+c.Name+"("+c.UUID+")", err) + } + }() + } + + //Successfully spawned off the proxy routine + c.Running = true + c.parent.SaveConfigToDatabase() + return nil +} + +// Return if a proxy config is running +func (c *ProxyRelayInstance) IsRunning() bool { + return c.tcpStopChan != nil || c.udpStopChan != nil +} + +// Restart a proxy config +func (c *ProxyRelayInstance) Restart() { + if c.IsRunning() { + c.Stop() + } + time.Sleep(3000 * time.Millisecond) + c.Start() +} + +// Stop a running proxy if running +func (c *ProxyRelayInstance) Stop() { + c.parent.logf("Stopping Stream Proxy "+c.Name, nil) + + if c.udpStopChan != nil { + c.parent.logf("Stopping UDP for "+c.Name, nil) + c.udpStopChan <- true + c.udpStopChan = nil + } + + if c.tcpStopChan != nil { + c.parent.logf("Stopping TCP for "+c.Name, nil) + c.tcpStopChan <- true + c.tcpStopChan = nil + } + + c.parent.logf("Stopped Stream Proxy "+c.Name, nil) + c.Running = false + + //Update the running status + c.parent.SaveConfigToDatabase() +} diff --git a/src/mod/streamproxy/streamproxy.go b/src/mod/streamproxy/streamproxy.go index 4e3cc90..6804459 100644 --- a/src/mod/streamproxy/streamproxy.go +++ b/src/mod/streamproxy/streamproxy.go @@ -8,7 +8,6 @@ import ( "path/filepath" "sync" "sync/atomic" - "time" "github.com/google/uuid" "imuslab.com/zoraxy/mod/info/logger" @@ -33,17 +32,32 @@ type ProxyRelayOptions struct { UseProxyProtocol bool } -type ProxyRelayConfig struct { - UUID string //A UUIDv4 representing this config - Name string //Name of the config - Running bool //Status, read only - AutoStart bool //If the service suppose to started automatically - ListeningAddress string //Listening Address, usually 127.0.0.1:port - ProxyTargetAddr string //Proxy target address - UseTCP bool //Enable TCP proxy - UseUDP bool //Enable UDP proxy - UseProxyProtocol bool //Enable Proxy Protocol - Timeout int //Timeout for connection in sec +// ProxyRuleUpdateConfig is used to update the proxy rule config +type ProxyRuleUpdateConfig struct { + InstanceUUID string //The target instance UUID to update + NewName string //New name for the instance, leave empty for no change + NewListeningAddr string //New listening address, leave empty for no change + NewProxyAddr string //New proxy target address, leave empty for no change + UseTCP bool //Enable TCP proxy, default to false + UseUDP bool //Enable UDP proxy, default to false + UseProxyProtocol bool //Enable Proxy Protocol, default to false + NewTimeout int //New timeout for the connection, leave -1 for no change +} + +type ProxyRelayInstance struct { + /* Runtime Config */ + UUID string //A UUIDv4 representing this config + Name string //Name of the config + Running bool //Status, read only + AutoStart bool //If the service suppose to started automatically + ListeningAddress string //Listening Address, usually 127.0.0.1:port + ProxyTargetAddr string //Proxy target address + UseTCP bool //Enable TCP proxy + UseUDP bool //Enable UDP proxy + UseProxyProtocol bool //Enable Proxy Protocol + Timeout int //Timeout for connection in sec + + /* Internal */ tcpStopChan chan bool //Stop channel for TCP listener udpStopChan chan bool //Stop channel for UDP listener aTobAccumulatedByteTransfer atomic.Int64 //Accumulated byte transfer from A to B @@ -62,13 +76,14 @@ type Options struct { type Manager struct { //Config and stores Options *Options - Configs []*ProxyRelayConfig + Configs []*ProxyRelayInstance //Realtime Statistics Connections int //currently connected connect counts } +// NewStreamProxy creates a new stream proxy manager with the given options func NewStreamProxy(options *Options) (*Manager, error) { if !utils.FileExists(options.ConfigStore) { err := os.MkdirAll(options.ConfigStore, 0775) @@ -78,7 +93,7 @@ func NewStreamProxy(options *Options) (*Manager, error) { } //Load relay configs from db - previousRules := []*ProxyRelayConfig{} + previousRules := []*ProxyRelayInstance{} streamProxyConfigFiles, err := filepath.Glob(options.ConfigStore + "/*.config") if err != nil { return nil, err @@ -91,7 +106,7 @@ func NewStreamProxy(options *Options) (*Manager, error) { options.Logger.PrintAndLog("stream-prox", "Read stream proxy config failed", err) continue } - thisRelayConfig := &ProxyRelayConfig{} + thisRelayConfig := &ProxyRelayInstance{} err = json.Unmarshal(configBytes, thisRelayConfig) if err != nil { options.Logger.PrintAndLog("stream-prox", "Unmarshal stream proxy config failed", err) @@ -144,6 +159,7 @@ func (m *Manager) logf(message string, originalError error) { m.Options.Logger.PrintAndLog("stream-prox", message, originalError) } +// NewConfig creates a new proxy relay config with the given options func (m *Manager) NewConfig(config *ProxyRelayOptions) string { //Generate two zero value for atomic int64 aAcc := atomic.Int64{} @@ -152,7 +168,7 @@ func (m *Manager) NewConfig(config *ProxyRelayOptions) string { bAcc.Store(0) //Generate a new config from options configUUID := uuid.New().String() - thisConfig := ProxyRelayConfig{ + thisConfig := ProxyRelayInstance{ UUID: configUUID, Name: config.Name, ListeningAddress: config.ListeningAddr, @@ -173,7 +189,7 @@ func (m *Manager) NewConfig(config *ProxyRelayOptions) string { return configUUID } -func (m *Manager) GetConfigByUUID(configUUID string) (*ProxyRelayConfig, error) { +func (m *Manager) GetConfigByUUID(configUUID string) (*ProxyRelayInstance, error) { // Find and return the config with the specified UUID for _, config := range m.Configs { if config.UUID == configUUID { @@ -184,33 +200,33 @@ func (m *Manager) GetConfigByUUID(configUUID string) (*ProxyRelayConfig, error) } // Edit the config based on config UUID, leave empty for unchange fields -func (m *Manager) EditConfig(configUUID string, newName string, newListeningAddr string, newProxyAddr string, useTCP bool, useUDP bool, useProxyProtocol bool, newTimeout int) error { +func (m *Manager) EditConfig(newConfig *ProxyRuleUpdateConfig) error { // Find the config with the specified UUID - foundConfig, err := m.GetConfigByUUID(configUUID) + foundConfig, err := m.GetConfigByUUID(newConfig.InstanceUUID) if err != nil { return err } // Validate and update the fields - if newName != "" { - foundConfig.Name = newName + if newConfig.NewName != "" { + foundConfig.Name = newConfig.NewName } - if newListeningAddr != "" { - foundConfig.ListeningAddress = newListeningAddr + if newConfig.NewListeningAddr != "" { + foundConfig.ListeningAddress = newConfig.NewListeningAddr } - if newProxyAddr != "" { - foundConfig.ProxyTargetAddr = newProxyAddr + if newConfig.NewProxyAddr != "" { + foundConfig.ProxyTargetAddr = newConfig.NewProxyAddr } - foundConfig.UseTCP = useTCP - foundConfig.UseUDP = useUDP - foundConfig.UseProxyProtocol = useProxyProtocol + foundConfig.UseTCP = newConfig.UseTCP + foundConfig.UseUDP = newConfig.UseUDP + foundConfig.UseProxyProtocol = newConfig.UseProxyProtocol - if newTimeout != -1 { - if newTimeout < 0 { + if newConfig.NewTimeout != -1 { + if newConfig.NewTimeout < 0 { return errors.New("invalid timeout value given") } - foundConfig.Timeout = newTimeout + foundConfig.Timeout = newConfig.NewTimeout } m.SaveConfigToDatabase() @@ -219,12 +235,11 @@ func (m *Manager) EditConfig(configUUID string, newName string, newListeningAddr if foundConfig.IsRunning() { foundConfig.Restart() } - return nil } +// Remove the config from file by UUID func (m *Manager) RemoveConfig(configUUID string) error { - //Remove the config from file err := os.Remove(filepath.Join(m.Options.ConfigStore, configUUID+".config")) if err != nil { return err @@ -254,91 +269,3 @@ func (m *Manager) SaveConfigToDatabase() { } } } - -/* - Config Functions -*/ - -// Start a proxy if stopped -func (c *ProxyRelayConfig) Start() error { - if c.IsRunning() { - c.Running = true - return errors.New("proxy already running") - } - - // Create a stopChan to control the loop - tcpStopChan := make(chan bool) - udpStopChan := make(chan bool) - - //Start the proxy service - if c.UseUDP { - c.udpStopChan = udpStopChan - go func() { - err := c.ForwardUDP(c.ListeningAddress, c.ProxyTargetAddr, udpStopChan) - if err != nil { - if !c.UseTCP { - c.Running = false - c.udpStopChan = nil - c.parent.SaveConfigToDatabase() - } - c.parent.logf("[proto:udp] Error starting stream proxy "+c.Name+"("+c.UUID+")", err) - } - }() - } - - if c.UseTCP { - c.tcpStopChan = tcpStopChan - go func() { - //Default to transport mode - err := c.Port2host(c.ListeningAddress, c.ProxyTargetAddr, tcpStopChan) - if err != nil { - c.Running = false - c.tcpStopChan = nil - c.parent.SaveConfigToDatabase() - c.parent.logf("[proto:tcp] Error starting stream proxy "+c.Name+"("+c.UUID+")", err) - } - }() - } - - //Successfully spawned off the proxy routine - c.Running = true - c.parent.SaveConfigToDatabase() - return nil -} - -// Return if a proxy config is running -func (c *ProxyRelayConfig) IsRunning() bool { - return c.tcpStopChan != nil || c.udpStopChan != nil -} - -// Restart a proxy config -func (c *ProxyRelayConfig) Restart() { - if c.IsRunning() { - c.Stop() - } - time.Sleep(3000 * time.Millisecond) - c.Start() -} - -// Stop a running proxy if running -func (c *ProxyRelayConfig) Stop() { - c.parent.logf("Stopping Stream Proxy "+c.Name, nil) - - if c.udpStopChan != nil { - c.parent.logf("Stopping UDP for "+c.Name, nil) - c.udpStopChan <- true - c.udpStopChan = nil - } - - if c.tcpStopChan != nil { - c.parent.logf("Stopping TCP for "+c.Name, nil) - c.tcpStopChan <- true - c.tcpStopChan = nil - } - - c.parent.logf("Stopped Stream Proxy "+c.Name, nil) - c.Running = false - - //Update the running status - c.parent.SaveConfigToDatabase() -} diff --git a/src/mod/streamproxy/streamproxy_test.go b/src/mod/streamproxy/streamproxy_test.go index a9ccb04..35c777c 100644 --- a/src/mod/streamproxy/streamproxy_test.go +++ b/src/mod/streamproxy/streamproxy_test.go @@ -12,7 +12,7 @@ func TestPort2Port(t *testing.T) { stopChan := make(chan bool) // Create a ProxyRelayConfig with dummy values - config := &streamproxy.ProxyRelayConfig{ + config := &streamproxy.ProxyRelayInstance{ Timeout: 1, } diff --git a/src/mod/streamproxy/tcpprox.go b/src/mod/streamproxy/tcpprox.go index f817edf..64cf468 100644 --- a/src/mod/streamproxy/tcpprox.go +++ b/src/mod/streamproxy/tcpprox.go @@ -72,7 +72,7 @@ func forward(conn1 net.Conn, conn2 net.Conn, aTob *atomic.Int64, bToa *atomic.In wg.Wait() } -func (c *ProxyRelayConfig) accept(listener net.Listener) (net.Conn, error) { +func (c *ProxyRelayInstance) accept(listener net.Listener) (net.Conn, error) { conn, err := listener.Accept() if err != nil { return nil, err @@ -110,7 +110,7 @@ func startListener(address string) (net.Listener, error) { portA -> server server -> portB */ -func (c *ProxyRelayConfig) Port2host(allowPort string, targetAddress string, stopChan chan bool) error { +func (c *ProxyRelayInstance) Port2host(allowPort string, targetAddress string, stopChan chan bool) error { listenerStartingAddr := allowPort if isValidPort(allowPort) { //number only, e.g. 8080 diff --git a/src/mod/streamproxy/udpprox.go b/src/mod/streamproxy/udpprox.go index 9e78639..405739c 100644 --- a/src/mod/streamproxy/udpprox.go +++ b/src/mod/streamproxy/udpprox.go @@ -53,7 +53,7 @@ func initUDPConnections(listenAddr string, targetAddress string) (*net.UDPConn, } // Go routine which manages connection from server to single client -func (c *ProxyRelayConfig) RunUDPConnectionRelay(conn *udpClientServerConn, lisenter *net.UDPConn) { +func (c *ProxyRelayInstance) RunUDPConnectionRelay(conn *udpClientServerConn, lisenter *net.UDPConn) { var buffer [1500]byte for { // Read from server @@ -74,7 +74,7 @@ func (c *ProxyRelayConfig) RunUDPConnectionRelay(conn *udpClientServerConn, lise } // Close all connections that waiting for read from server -func (c *ProxyRelayConfig) CloseAllUDPConnections() { +func (c *ProxyRelayInstance) CloseAllUDPConnections() { c.udpClientMap.Range(func(clientAddr, clientServerConn interface{}) bool { conn := clientServerConn.(*udpClientServerConn) conn.ServerConn.Close() @@ -82,7 +82,7 @@ func (c *ProxyRelayConfig) CloseAllUDPConnections() { }) } -func (c *ProxyRelayConfig) ForwardUDP(address1, address2 string, stopChan chan bool) error { +func (c *ProxyRelayInstance) ForwardUDP(address1, address2 string, stopChan chan bool) error { //By default the incoming listen Address is int //We need to add the loopback address into it if isValidPort(address1) {