Optimized structure for stream proxy

- Separated instance and config for stream proxy
This commit is contained in:
Toby Chui
2025-07-02 21:03:57 +08:00
parent 6c5eba01c2
commit 2d611a559a
6 changed files with 164 additions and 128 deletions

View File

@@ -89,8 +89,20 @@ func (m *Manager) HandleEditProxyConfigs(w http.ResponseWriter, r *http.Request)
} }
} }
// Create a new ProxyRuleUpdateConfig with the extracted parameters
newConfig := &ProxyRuleUpdateConfig{
InstanceUUID: configUUID,
NewName: newName,
NewListeningAddr: listenAddr,
NewProxyAddr: proxyAddr,
UseTCP: useTCP,
UseUDP: useUDP,
UseProxyProtocol: useProxyProtocol,
NewTimeout: newTimeout,
}
// Call the EditConfig method to modify the configuration // Call the EditConfig method to modify the configuration
err = m.EditConfig(configUUID, newName, listenAddr, proxyAddr, useTCP, useUDP, useProxyProtocol, newTimeout) err = m.EditConfig(newConfig)
if err != nil { if err != nil {
utils.SendErrorResponse(w, err.Error()) utils.SendErrorResponse(w, err.Error())
return return

View File

@@ -0,0 +1,97 @@
package streamproxy
/*
Instances.go
This file contains the methods to start, stop, and manage the proxy relay instances.
*/
import (
"errors"
"time"
)
// Start a proxy if stopped
func (c *ProxyRelayInstance) Start() error {
if c.IsRunning() {
c.Running = true
return errors.New("proxy already running")
}
// Create a stopChan to control the loop
tcpStopChan := make(chan bool)
udpStopChan := make(chan bool)
//Start the proxy service
if c.UseUDP {
c.udpStopChan = udpStopChan
go func() {
err := c.ForwardUDP(c.ListeningAddress, c.ProxyTargetAddr, udpStopChan)
if err != nil {
if !c.UseTCP {
c.Running = false
c.udpStopChan = nil
c.parent.SaveConfigToDatabase()
}
c.parent.logf("[proto:udp] Error starting stream proxy "+c.Name+"("+c.UUID+")", err)
}
}()
}
if c.UseTCP {
c.tcpStopChan = tcpStopChan
go func() {
//Default to transport mode
err := c.Port2host(c.ListeningAddress, c.ProxyTargetAddr, tcpStopChan)
if err != nil {
c.Running = false
c.tcpStopChan = nil
c.parent.SaveConfigToDatabase()
c.parent.logf("[proto:tcp] Error starting stream proxy "+c.Name+"("+c.UUID+")", err)
}
}()
}
//Successfully spawned off the proxy routine
c.Running = true
c.parent.SaveConfigToDatabase()
return nil
}
// Return if a proxy config is running
func (c *ProxyRelayInstance) IsRunning() bool {
return c.tcpStopChan != nil || c.udpStopChan != nil
}
// Restart a proxy config
func (c *ProxyRelayInstance) Restart() {
if c.IsRunning() {
c.Stop()
}
time.Sleep(3000 * time.Millisecond)
c.Start()
}
// Stop a running proxy if running
func (c *ProxyRelayInstance) Stop() {
c.parent.logf("Stopping Stream Proxy "+c.Name, nil)
if c.udpStopChan != nil {
c.parent.logf("Stopping UDP for "+c.Name, nil)
c.udpStopChan <- true
c.udpStopChan = nil
}
if c.tcpStopChan != nil {
c.parent.logf("Stopping TCP for "+c.Name, nil)
c.tcpStopChan <- true
c.tcpStopChan = nil
}
c.parent.logf("Stopped Stream Proxy "+c.Name, nil)
c.Running = false
//Update the running status
c.parent.SaveConfigToDatabase()
}

View File

@@ -8,7 +8,6 @@ import (
"path/filepath" "path/filepath"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"github.com/google/uuid" "github.com/google/uuid"
"imuslab.com/zoraxy/mod/info/logger" "imuslab.com/zoraxy/mod/info/logger"
@@ -33,17 +32,32 @@ type ProxyRelayOptions struct {
UseProxyProtocol bool UseProxyProtocol bool
} }
type ProxyRelayConfig struct { // ProxyRuleUpdateConfig is used to update the proxy rule config
UUID string //A UUIDv4 representing this config type ProxyRuleUpdateConfig struct {
Name string //Name of the config InstanceUUID string //The target instance UUID to update
Running bool //Status, read only NewName string //New name for the instance, leave empty for no change
AutoStart bool //If the service suppose to started automatically NewListeningAddr string //New listening address, leave empty for no change
ListeningAddress string //Listening Address, usually 127.0.0.1:port NewProxyAddr string //New proxy target address, leave empty for no change
ProxyTargetAddr string //Proxy target address UseTCP bool //Enable TCP proxy, default to false
UseTCP bool //Enable TCP proxy UseUDP bool //Enable UDP proxy, default to false
UseUDP bool //Enable UDP proxy UseProxyProtocol bool //Enable Proxy Protocol, default to false
UseProxyProtocol bool //Enable Proxy Protocol NewTimeout int //New timeout for the connection, leave -1 for no change
Timeout int //Timeout for connection in sec }
type ProxyRelayInstance struct {
/* Runtime Config */
UUID string //A UUIDv4 representing this config
Name string //Name of the config
Running bool //Status, read only
AutoStart bool //If the service suppose to started automatically
ListeningAddress string //Listening Address, usually 127.0.0.1:port
ProxyTargetAddr string //Proxy target address
UseTCP bool //Enable TCP proxy
UseUDP bool //Enable UDP proxy
UseProxyProtocol bool //Enable Proxy Protocol
Timeout int //Timeout for connection in sec
/* Internal */
tcpStopChan chan bool //Stop channel for TCP listener tcpStopChan chan bool //Stop channel for TCP listener
udpStopChan chan bool //Stop channel for UDP listener udpStopChan chan bool //Stop channel for UDP listener
aTobAccumulatedByteTransfer atomic.Int64 //Accumulated byte transfer from A to B aTobAccumulatedByteTransfer atomic.Int64 //Accumulated byte transfer from A to B
@@ -62,13 +76,14 @@ type Options struct {
type Manager struct { type Manager struct {
//Config and stores //Config and stores
Options *Options Options *Options
Configs []*ProxyRelayConfig Configs []*ProxyRelayInstance
//Realtime Statistics //Realtime Statistics
Connections int //currently connected connect counts Connections int //currently connected connect counts
} }
// NewStreamProxy creates a new stream proxy manager with the given options
func NewStreamProxy(options *Options) (*Manager, error) { func NewStreamProxy(options *Options) (*Manager, error) {
if !utils.FileExists(options.ConfigStore) { if !utils.FileExists(options.ConfigStore) {
err := os.MkdirAll(options.ConfigStore, 0775) err := os.MkdirAll(options.ConfigStore, 0775)
@@ -78,7 +93,7 @@ func NewStreamProxy(options *Options) (*Manager, error) {
} }
//Load relay configs from db //Load relay configs from db
previousRules := []*ProxyRelayConfig{} previousRules := []*ProxyRelayInstance{}
streamProxyConfigFiles, err := filepath.Glob(options.ConfigStore + "/*.config") streamProxyConfigFiles, err := filepath.Glob(options.ConfigStore + "/*.config")
if err != nil { if err != nil {
return nil, err return nil, err
@@ -91,7 +106,7 @@ func NewStreamProxy(options *Options) (*Manager, error) {
options.Logger.PrintAndLog("stream-prox", "Read stream proxy config failed", err) options.Logger.PrintAndLog("stream-prox", "Read stream proxy config failed", err)
continue continue
} }
thisRelayConfig := &ProxyRelayConfig{} thisRelayConfig := &ProxyRelayInstance{}
err = json.Unmarshal(configBytes, thisRelayConfig) err = json.Unmarshal(configBytes, thisRelayConfig)
if err != nil { if err != nil {
options.Logger.PrintAndLog("stream-prox", "Unmarshal stream proxy config failed", err) options.Logger.PrintAndLog("stream-prox", "Unmarshal stream proxy config failed", err)
@@ -144,6 +159,7 @@ func (m *Manager) logf(message string, originalError error) {
m.Options.Logger.PrintAndLog("stream-prox", message, originalError) m.Options.Logger.PrintAndLog("stream-prox", message, originalError)
} }
// NewConfig creates a new proxy relay config with the given options
func (m *Manager) NewConfig(config *ProxyRelayOptions) string { func (m *Manager) NewConfig(config *ProxyRelayOptions) string {
//Generate two zero value for atomic int64 //Generate two zero value for atomic int64
aAcc := atomic.Int64{} aAcc := atomic.Int64{}
@@ -152,7 +168,7 @@ func (m *Manager) NewConfig(config *ProxyRelayOptions) string {
bAcc.Store(0) bAcc.Store(0)
//Generate a new config from options //Generate a new config from options
configUUID := uuid.New().String() configUUID := uuid.New().String()
thisConfig := ProxyRelayConfig{ thisConfig := ProxyRelayInstance{
UUID: configUUID, UUID: configUUID,
Name: config.Name, Name: config.Name,
ListeningAddress: config.ListeningAddr, ListeningAddress: config.ListeningAddr,
@@ -173,7 +189,7 @@ func (m *Manager) NewConfig(config *ProxyRelayOptions) string {
return configUUID return configUUID
} }
func (m *Manager) GetConfigByUUID(configUUID string) (*ProxyRelayConfig, error) { func (m *Manager) GetConfigByUUID(configUUID string) (*ProxyRelayInstance, error) {
// Find and return the config with the specified UUID // Find and return the config with the specified UUID
for _, config := range m.Configs { for _, config := range m.Configs {
if config.UUID == configUUID { if config.UUID == configUUID {
@@ -184,33 +200,33 @@ func (m *Manager) GetConfigByUUID(configUUID string) (*ProxyRelayConfig, error)
} }
// Edit the config based on config UUID, leave empty for unchange fields // Edit the config based on config UUID, leave empty for unchange fields
func (m *Manager) EditConfig(configUUID string, newName string, newListeningAddr string, newProxyAddr string, useTCP bool, useUDP bool, useProxyProtocol bool, newTimeout int) error { func (m *Manager) EditConfig(newConfig *ProxyRuleUpdateConfig) error {
// Find the config with the specified UUID // Find the config with the specified UUID
foundConfig, err := m.GetConfigByUUID(configUUID) foundConfig, err := m.GetConfigByUUID(newConfig.InstanceUUID)
if err != nil { if err != nil {
return err return err
} }
// Validate and update the fields // Validate and update the fields
if newName != "" { if newConfig.NewName != "" {
foundConfig.Name = newName foundConfig.Name = newConfig.NewName
} }
if newListeningAddr != "" { if newConfig.NewListeningAddr != "" {
foundConfig.ListeningAddress = newListeningAddr foundConfig.ListeningAddress = newConfig.NewListeningAddr
} }
if newProxyAddr != "" { if newConfig.NewProxyAddr != "" {
foundConfig.ProxyTargetAddr = newProxyAddr foundConfig.ProxyTargetAddr = newConfig.NewProxyAddr
} }
foundConfig.UseTCP = useTCP foundConfig.UseTCP = newConfig.UseTCP
foundConfig.UseUDP = useUDP foundConfig.UseUDP = newConfig.UseUDP
foundConfig.UseProxyProtocol = useProxyProtocol foundConfig.UseProxyProtocol = newConfig.UseProxyProtocol
if newTimeout != -1 { if newConfig.NewTimeout != -1 {
if newTimeout < 0 { if newConfig.NewTimeout < 0 {
return errors.New("invalid timeout value given") return errors.New("invalid timeout value given")
} }
foundConfig.Timeout = newTimeout foundConfig.Timeout = newConfig.NewTimeout
} }
m.SaveConfigToDatabase() m.SaveConfigToDatabase()
@@ -219,12 +235,11 @@ func (m *Manager) EditConfig(configUUID string, newName string, newListeningAddr
if foundConfig.IsRunning() { if foundConfig.IsRunning() {
foundConfig.Restart() foundConfig.Restart()
} }
return nil return nil
} }
// Remove the config from file by UUID
func (m *Manager) RemoveConfig(configUUID string) error { func (m *Manager) RemoveConfig(configUUID string) error {
//Remove the config from file
err := os.Remove(filepath.Join(m.Options.ConfigStore, configUUID+".config")) err := os.Remove(filepath.Join(m.Options.ConfigStore, configUUID+".config"))
if err != nil { if err != nil {
return err return err
@@ -254,91 +269,3 @@ func (m *Manager) SaveConfigToDatabase() {
} }
} }
} }
/*
Config Functions
*/
// Start a proxy if stopped
func (c *ProxyRelayConfig) Start() error {
if c.IsRunning() {
c.Running = true
return errors.New("proxy already running")
}
// Create a stopChan to control the loop
tcpStopChan := make(chan bool)
udpStopChan := make(chan bool)
//Start the proxy service
if c.UseUDP {
c.udpStopChan = udpStopChan
go func() {
err := c.ForwardUDP(c.ListeningAddress, c.ProxyTargetAddr, udpStopChan)
if err != nil {
if !c.UseTCP {
c.Running = false
c.udpStopChan = nil
c.parent.SaveConfigToDatabase()
}
c.parent.logf("[proto:udp] Error starting stream proxy "+c.Name+"("+c.UUID+")", err)
}
}()
}
if c.UseTCP {
c.tcpStopChan = tcpStopChan
go func() {
//Default to transport mode
err := c.Port2host(c.ListeningAddress, c.ProxyTargetAddr, tcpStopChan)
if err != nil {
c.Running = false
c.tcpStopChan = nil
c.parent.SaveConfigToDatabase()
c.parent.logf("[proto:tcp] Error starting stream proxy "+c.Name+"("+c.UUID+")", err)
}
}()
}
//Successfully spawned off the proxy routine
c.Running = true
c.parent.SaveConfigToDatabase()
return nil
}
// Return if a proxy config is running
func (c *ProxyRelayConfig) IsRunning() bool {
return c.tcpStopChan != nil || c.udpStopChan != nil
}
// Restart a proxy config
func (c *ProxyRelayConfig) Restart() {
if c.IsRunning() {
c.Stop()
}
time.Sleep(3000 * time.Millisecond)
c.Start()
}
// Stop a running proxy if running
func (c *ProxyRelayConfig) Stop() {
c.parent.logf("Stopping Stream Proxy "+c.Name, nil)
if c.udpStopChan != nil {
c.parent.logf("Stopping UDP for "+c.Name, nil)
c.udpStopChan <- true
c.udpStopChan = nil
}
if c.tcpStopChan != nil {
c.parent.logf("Stopping TCP for "+c.Name, nil)
c.tcpStopChan <- true
c.tcpStopChan = nil
}
c.parent.logf("Stopped Stream Proxy "+c.Name, nil)
c.Running = false
//Update the running status
c.parent.SaveConfigToDatabase()
}

View File

@@ -12,7 +12,7 @@ func TestPort2Port(t *testing.T) {
stopChan := make(chan bool) stopChan := make(chan bool)
// Create a ProxyRelayConfig with dummy values // Create a ProxyRelayConfig with dummy values
config := &streamproxy.ProxyRelayConfig{ config := &streamproxy.ProxyRelayInstance{
Timeout: 1, Timeout: 1,
} }

View File

@@ -72,7 +72,7 @@ func forward(conn1 net.Conn, conn2 net.Conn, aTob *atomic.Int64, bToa *atomic.In
wg.Wait() wg.Wait()
} }
func (c *ProxyRelayConfig) accept(listener net.Listener) (net.Conn, error) { func (c *ProxyRelayInstance) accept(listener net.Listener) (net.Conn, error) {
conn, err := listener.Accept() conn, err := listener.Accept()
if err != nil { if err != nil {
return nil, err return nil, err
@@ -110,7 +110,7 @@ func startListener(address string) (net.Listener, error) {
portA -> server portA -> server
server -> portB server -> portB
*/ */
func (c *ProxyRelayConfig) Port2host(allowPort string, targetAddress string, stopChan chan bool) error { func (c *ProxyRelayInstance) Port2host(allowPort string, targetAddress string, stopChan chan bool) error {
listenerStartingAddr := allowPort listenerStartingAddr := allowPort
if isValidPort(allowPort) { if isValidPort(allowPort) {
//number only, e.g. 8080 //number only, e.g. 8080

View File

@@ -53,7 +53,7 @@ func initUDPConnections(listenAddr string, targetAddress string) (*net.UDPConn,
} }
// Go routine which manages connection from server to single client // Go routine which manages connection from server to single client
func (c *ProxyRelayConfig) RunUDPConnectionRelay(conn *udpClientServerConn, lisenter *net.UDPConn) { func (c *ProxyRelayInstance) RunUDPConnectionRelay(conn *udpClientServerConn, lisenter *net.UDPConn) {
var buffer [1500]byte var buffer [1500]byte
for { for {
// Read from server // Read from server
@@ -74,7 +74,7 @@ func (c *ProxyRelayConfig) RunUDPConnectionRelay(conn *udpClientServerConn, lise
} }
// Close all connections that waiting for read from server // Close all connections that waiting for read from server
func (c *ProxyRelayConfig) CloseAllUDPConnections() { func (c *ProxyRelayInstance) CloseAllUDPConnections() {
c.udpClientMap.Range(func(clientAddr, clientServerConn interface{}) bool { c.udpClientMap.Range(func(clientAddr, clientServerConn interface{}) bool {
conn := clientServerConn.(*udpClientServerConn) conn := clientServerConn.(*udpClientServerConn)
conn.ServerConn.Close() conn.ServerConn.Close()
@@ -82,7 +82,7 @@ func (c *ProxyRelayConfig) CloseAllUDPConnections() {
}) })
} }
func (c *ProxyRelayConfig) ForwardUDP(address1, address2 string, stopChan chan bool) error { func (c *ProxyRelayInstance) ForwardUDP(address1, address2 string, stopChan chan bool) error {
//By default the incoming listen Address is int //By default the incoming listen Address is int
//We need to add the loopback address into it //We need to add the loopback address into it
if isValidPort(address1) { if isValidPort(address1) {