From dfd5ef5578cdef7dec8cb5b4df1d6f17c64b8e65 Mon Sep 17 00:00:00 2001 From: Toby Chui Date: Mon, 10 Mar 2025 22:00:33 +0800 Subject: [PATCH] Modernized dpcore code - Rewritten dpcore transport object with deprecated API removed - Optimized concurrent connection counts from 32 to 256 - Updated random port range for plugins - Added debug output to plugin library --- .../mod/zoraxy_plugin/embed_webserver.go | 28 +++--- .../mod/zoraxy_plugin/embed_webserver.go | 28 +++--- example/plugins/ztnc/main.go | 4 +- .../plugins/ztnc/mod/ganserv/authkeyWin.go | 11 --- .../ztnc/mod/zoraxy_plugin/embed_webserver.go | 28 +++--- src/mod/access/loopback.go | 2 +- src/mod/dynamicproxy/Server.go | 3 +- src/mod/dynamicproxy/dpcore/dpcore.go | 85 +++++-------------- src/mod/dynamicproxy/dpcore/dpcore_test.go | 54 ++++++++++++ .../dynamicproxy/loadbalance/loadbalance.go | 1 - src/mod/dynamicproxy/loadbalance/upstream.go | 1 - src/mod/plugins/lifecycle.go | 63 +++++++------- src/mod/plugins/plugins.go | 33 ++++--- src/mod/plugins/static_forwarder.go | 12 +-- src/mod/plugins/traffic_router.go | 41 +++------ src/mod/plugins/typdef.go | 5 +- src/mod/plugins/ui_router.go | 8 +- src/mod/plugins/utils.go | 11 ++- .../plugins/zoraxy_plugin/embed_webserver.go | 28 +++--- src/router.go | 1 + src/start.go | 16 ++-- src/upstreams.go | 7 -- src/web/snippet/upstreams.html | 37 +------- 23 files changed, 241 insertions(+), 266 deletions(-) diff --git a/example/plugins/debugger/mod/zoraxy_plugin/embed_webserver.go b/example/plugins/debugger/mod/zoraxy_plugin/embed_webserver.go index c529e99..d81e4f4 100644 --- a/example/plugins/debugger/mod/zoraxy_plugin/embed_webserver.go +++ b/example/plugins/debugger/mod/zoraxy_plugin/embed_webserver.go @@ -12,12 +12,12 @@ import ( ) type PluginUiRouter struct { - PluginID string //The ID of the plugin - TargetFs *embed.FS //The embed.FS where the UI files are stored - TargetFsPrefix string //The prefix of the embed.FS where the UI files are stored, e.g. /web - HandlerPrefix string //The prefix of the handler used to route this router, e.g. /ui - - terminateHandler func() //The handler to be called when the plugin is terminated + PluginID string //The ID of the plugin + TargetFs *embed.FS //The embed.FS where the UI files are stored + TargetFsPrefix string //The prefix of the embed.FS where the UI files are stored, e.g. /web + HandlerPrefix string //The prefix of the handler used to route this router, e.g. /ui + EnableDebug bool //Enable debug mode + terminateHandler func() //The handler to be called when the plugin is terminated } // NewPluginEmbedUIRouter creates a new PluginUiRouter with embed.FS @@ -58,11 +58,6 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl //Return the middleware return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Check if the request is for an HTML file - if strings.HasSuffix(r.URL.Path, "/") { - // Redirect to the index.html - http.Redirect(w, r, r.URL.Path+"index.html", http.StatusFound) - return - } if strings.HasSuffix(r.URL.Path, ".html") { //Read the target file from embed.FS targetFilePath := strings.TrimPrefix(r.URL.Path, "/") @@ -75,7 +70,9 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl } body := string(targetFileContent) body = strings.ReplaceAll(body, "{{.csrfToken}}", csrfToken) - http.ServeContent(w, r, r.URL.Path, time.Now(), strings.NewReader(body)) + w.Header().Set("Content-Type", "text/html") + w.WriteHeader(http.StatusOK) + w.Write([]byte(body)) return } @@ -89,11 +86,18 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl func (p *PluginUiRouter) Handler() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { //Remove the plugin UI handler path prefix + if p.EnableDebug { + fmt.Print("Request URL:", r.URL.Path, " rewriting to ") + } + rewrittenURL := r.RequestURI rewrittenURL = strings.TrimPrefix(rewrittenURL, p.HandlerPrefix) rewrittenURL = strings.ReplaceAll(rewrittenURL, "//", "/") r.URL, _ = url.Parse(rewrittenURL) r.RequestURI = rewrittenURL + if p.EnableDebug { + fmt.Println(r.URL.Path) + } //Serve the file from the embed.FS subFS, err := fs.Sub(*p.TargetFs, strings.TrimPrefix(p.TargetFsPrefix, "/")) diff --git a/example/plugins/helloworld/mod/zoraxy_plugin/embed_webserver.go b/example/plugins/helloworld/mod/zoraxy_plugin/embed_webserver.go index c529e99..d81e4f4 100644 --- a/example/plugins/helloworld/mod/zoraxy_plugin/embed_webserver.go +++ b/example/plugins/helloworld/mod/zoraxy_plugin/embed_webserver.go @@ -12,12 +12,12 @@ import ( ) type PluginUiRouter struct { - PluginID string //The ID of the plugin - TargetFs *embed.FS //The embed.FS where the UI files are stored - TargetFsPrefix string //The prefix of the embed.FS where the UI files are stored, e.g. /web - HandlerPrefix string //The prefix of the handler used to route this router, e.g. /ui - - terminateHandler func() //The handler to be called when the plugin is terminated + PluginID string //The ID of the plugin + TargetFs *embed.FS //The embed.FS where the UI files are stored + TargetFsPrefix string //The prefix of the embed.FS where the UI files are stored, e.g. /web + HandlerPrefix string //The prefix of the handler used to route this router, e.g. /ui + EnableDebug bool //Enable debug mode + terminateHandler func() //The handler to be called when the plugin is terminated } // NewPluginEmbedUIRouter creates a new PluginUiRouter with embed.FS @@ -58,11 +58,6 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl //Return the middleware return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Check if the request is for an HTML file - if strings.HasSuffix(r.URL.Path, "/") { - // Redirect to the index.html - http.Redirect(w, r, r.URL.Path+"index.html", http.StatusFound) - return - } if strings.HasSuffix(r.URL.Path, ".html") { //Read the target file from embed.FS targetFilePath := strings.TrimPrefix(r.URL.Path, "/") @@ -75,7 +70,9 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl } body := string(targetFileContent) body = strings.ReplaceAll(body, "{{.csrfToken}}", csrfToken) - http.ServeContent(w, r, r.URL.Path, time.Now(), strings.NewReader(body)) + w.Header().Set("Content-Type", "text/html") + w.WriteHeader(http.StatusOK) + w.Write([]byte(body)) return } @@ -89,11 +86,18 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl func (p *PluginUiRouter) Handler() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { //Remove the plugin UI handler path prefix + if p.EnableDebug { + fmt.Print("Request URL:", r.URL.Path, " rewriting to ") + } + rewrittenURL := r.RequestURI rewrittenURL = strings.TrimPrefix(rewrittenURL, p.HandlerPrefix) rewrittenURL = strings.ReplaceAll(rewrittenURL, "//", "/") r.URL, _ = url.Parse(rewrittenURL) r.RequestURI = rewrittenURL + if p.EnableDebug { + fmt.Println(r.URL.Path) + } //Serve the file from the embed.FS subFS, err := fs.Sub(*p.TargetFs, strings.TrimPrefix(p.TargetFsPrefix, "/")) diff --git a/example/plugins/ztnc/main.go b/example/plugins/ztnc/main.go index ee96033..d3182ac 100644 --- a/example/plugins/ztnc/main.go +++ b/example/plugins/ztnc/main.go @@ -53,6 +53,7 @@ func main() { // Create a new PluginEmbedUIRouter that will serve the UI from web folder uiRouter := plugin.NewPluginEmbedUIRouter(PLUGIN_ID, &content, EMBED_FS_ROOT, UI_RELPATH) + uiRouter.EnableDebug = true // Register the shutdown handler uiRouter.RegisterTerminateHandler(func() { @@ -64,7 +65,8 @@ func main() { }, nil) // This will serve the index.html file embedded in the binary - http.Handle(UI_RELPATH+"/", uiRouter.Handler()) + targetHandler := uiRouter.Handler() + http.Handle(UI_RELPATH+"/", targetHandler) // Start the GAN Network Controller err = startGanNetworkController() diff --git a/example/plugins/ztnc/mod/ganserv/authkeyWin.go b/example/plugins/ztnc/mod/ganserv/authkeyWin.go index aa03e31..ac5c260 100644 --- a/example/plugins/ztnc/mod/ganserv/authkeyWin.go +++ b/example/plugins/ztnc/mod/ganserv/authkeyWin.go @@ -5,12 +5,10 @@ package ganserv import ( "fmt" - "log" "os" "path/filepath" "strings" "syscall" - "time" "aroz.org/zoraxy/ztnc/mod/utils" "golang.org/x/sys/windows" @@ -46,15 +44,6 @@ func readAuthTokenAsAdmin() (string, error) { return "", err } - log.Println("Please click agree to allow access to ZeroTier authtoken from ProgramData") - retry := 0 - time.Sleep(3 * time.Second) - for !utils.FileExists("./conf/authtoken.secret") && retry < 10 { - time.Sleep(3 * time.Second) - log.Println("Waiting for ZeroTier authtoken extraction...") - retry++ - } - authKey, err := os.ReadFile("./conf/authtoken.secret") if err != nil { return "", err diff --git a/example/plugins/ztnc/mod/zoraxy_plugin/embed_webserver.go b/example/plugins/ztnc/mod/zoraxy_plugin/embed_webserver.go index c529e99..d81e4f4 100644 --- a/example/plugins/ztnc/mod/zoraxy_plugin/embed_webserver.go +++ b/example/plugins/ztnc/mod/zoraxy_plugin/embed_webserver.go @@ -12,12 +12,12 @@ import ( ) type PluginUiRouter struct { - PluginID string //The ID of the plugin - TargetFs *embed.FS //The embed.FS where the UI files are stored - TargetFsPrefix string //The prefix of the embed.FS where the UI files are stored, e.g. /web - HandlerPrefix string //The prefix of the handler used to route this router, e.g. /ui - - terminateHandler func() //The handler to be called when the plugin is terminated + PluginID string //The ID of the plugin + TargetFs *embed.FS //The embed.FS where the UI files are stored + TargetFsPrefix string //The prefix of the embed.FS where the UI files are stored, e.g. /web + HandlerPrefix string //The prefix of the handler used to route this router, e.g. /ui + EnableDebug bool //Enable debug mode + terminateHandler func() //The handler to be called when the plugin is terminated } // NewPluginEmbedUIRouter creates a new PluginUiRouter with embed.FS @@ -58,11 +58,6 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl //Return the middleware return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Check if the request is for an HTML file - if strings.HasSuffix(r.URL.Path, "/") { - // Redirect to the index.html - http.Redirect(w, r, r.URL.Path+"index.html", http.StatusFound) - return - } if strings.HasSuffix(r.URL.Path, ".html") { //Read the target file from embed.FS targetFilePath := strings.TrimPrefix(r.URL.Path, "/") @@ -75,7 +70,9 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl } body := string(targetFileContent) body = strings.ReplaceAll(body, "{{.csrfToken}}", csrfToken) - http.ServeContent(w, r, r.URL.Path, time.Now(), strings.NewReader(body)) + w.Header().Set("Content-Type", "text/html") + w.WriteHeader(http.StatusOK) + w.Write([]byte(body)) return } @@ -89,11 +86,18 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl func (p *PluginUiRouter) Handler() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { //Remove the plugin UI handler path prefix + if p.EnableDebug { + fmt.Print("Request URL:", r.URL.Path, " rewriting to ") + } + rewrittenURL := r.RequestURI rewrittenURL = strings.TrimPrefix(rewrittenURL, p.HandlerPrefix) rewrittenURL = strings.ReplaceAll(rewrittenURL, "//", "/") r.URL, _ = url.Parse(rewrittenURL) r.RequestURI = rewrittenURL + if p.EnableDebug { + fmt.Println(r.URL.Path) + } //Serve the file from the embed.FS subFS, err := fs.Sub(*p.TargetFs, strings.TrimPrefix(p.TargetFsPrefix, "/")) diff --git a/src/mod/access/loopback.go b/src/mod/access/loopback.go index c17c079..8028261 100644 --- a/src/mod/access/loopback.go +++ b/src/mod/access/loopback.go @@ -21,7 +21,7 @@ func (c *Controller) StartPublicIPUpdater() { go func() { for { select { - case <-stopChan: + case <-c.publicIpTickerStop: ticker.Stop() return case <-ticker.C: diff --git a/src/mod/dynamicproxy/Server.go b/src/mod/dynamicproxy/Server.go index 59b7ac3..5f942cb 100644 --- a/src/mod/dynamicproxy/Server.go +++ b/src/mod/dynamicproxy/Server.go @@ -92,7 +92,8 @@ func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } //Plugin routing - if h.Parent.Option.PluginManager.HandleRoute(w, r, sep.Tags) { + + if h.Parent.Option.PluginManager != nil && h.Parent.Option.PluginManager.HandleRoute(w, r, sep.Tags) { //Request handled by subroute return } diff --git a/src/mod/dynamicproxy/dpcore/dpcore.go b/src/mod/dynamicproxy/dpcore/dpcore.go index 220f11d..7eef37e 100644 --- a/src/mod/dynamicproxy/dpcore/dpcore.go +++ b/src/mod/dynamicproxy/dpcore/dpcore.go @@ -2,7 +2,6 @@ package dpcore import ( "context" - "crypto/tls" "errors" "io" "log" @@ -13,7 +12,6 @@ import ( "time" "imuslab.com/zoraxy/mod/dynamicproxy/domainsniff" - "imuslab.com/zoraxy/mod/dynamicproxy/modh2c" "imuslab.com/zoraxy/mod/dynamicproxy/permissionpolicy" ) @@ -88,8 +86,6 @@ type DpcoreOptions struct { FlushInterval time.Duration //Duration to flush in normal requests. Stream request or keep-alive request will always flush with interval of -1 (immediately) MaxConcurrentConnection int //Maxmium concurrent requests to this server ResponseHeaderTimeout int64 //Timeout for response header, set to 0 for default - IdleConnectionTimeout int64 //Idle connection timeout, set to 0 for default - UseH2CRoundTripper bool //Use H2C RoundTripper for HTTP/2.0 connection } func NewDynamicProxyCore(target *url.URL, prepender string, dpcOptions *DpcoreOptions) *ReverseProxy { @@ -109,37 +105,27 @@ func NewDynamicProxyCore(target *url.URL, prepender string, dpcOptions *DpcoreOp thisTransporter := http.DefaultTransport //Hack the default transporter to handle more connections - optimalConcurrentConnection := 32 + + optimalConcurrentConnection := 256 if dpcOptions.MaxConcurrentConnection > 0 { optimalConcurrentConnection = dpcOptions.MaxConcurrentConnection } + thisTransporter.(*http.Transport).IdleConnTimeout = 30 * time.Second thisTransporter.(*http.Transport).MaxIdleConns = optimalConcurrentConnection * 2 - thisTransporter.(*http.Transport).MaxIdleConnsPerHost = optimalConcurrentConnection - thisTransporter.(*http.Transport).MaxConnsPerHost = optimalConcurrentConnection * 2 thisTransporter.(*http.Transport).DisableCompression = true + thisTransporter.(*http.Transport).DisableKeepAlives = false if dpcOptions.ResponseHeaderTimeout > 0 { //Set response header timeout thisTransporter.(*http.Transport).ResponseHeaderTimeout = time.Duration(dpcOptions.ResponseHeaderTimeout) * time.Millisecond } - if dpcOptions.IdleConnectionTimeout > 0 { - //Set idle connection timeout - thisTransporter.(*http.Transport).IdleConnTimeout = time.Duration(dpcOptions.IdleConnectionTimeout) * time.Millisecond - } - if dpcOptions.IgnoreTLSVerification { //Ignore TLS certificate validation error - if thisTransporter.(*http.Transport).TLSClientConfig == nil { - thisTransporter.(*http.Transport).TLSClientConfig = &tls.Config{} + if thisTransporter.(*http.Transport).TLSClientConfig != nil { + thisTransporter.(*http.Transport).TLSClientConfig.InsecureSkipVerify = true } - thisTransporter.(*http.Transport).TLSClientConfig.InsecureSkipVerify = true - } - - if dpcOptions.UseH2CRoundTripper { - //Use H2C RoundTripper for HTTP/2.0 connection - thisTransporter = modh2c.NewH2CRoundTripper() } return &ReverseProxy{ @@ -164,39 +150,17 @@ func singleJoiningSlash(a, b string) string { } func joinURLPath(a, b *url.URL) (path, rawpath string) { - - if a.RawPath == "" && b.RawPath == "" { - - return singleJoiningSlash(a.Path, b.Path), "" - - } - - // Same as singleJoiningSlash, but uses EscapedPath to determine - - // whether a slash should be added - - apath := a.EscapedPath() - - bpath := b.EscapedPath() - - aslash := strings.HasSuffix(apath, "/") - - bslash := strings.HasPrefix(bpath, "/") + apath, bpath := a.EscapedPath(), b.EscapedPath() + aslash, bslash := strings.HasSuffix(apath, "/"), strings.HasPrefix(bpath, "/") switch { - case aslash && bslash: - return a.Path + b.Path[1:], apath + bpath[1:] - case !aslash && !bslash: - return a.Path + "/" + b.Path, apath + "/" + bpath - + default: + return a.Path + b.Path, apath + bpath } - - return a.Path + b.Path, apath + bpath - } func copyHeader(dst, src http.Header) { @@ -292,26 +256,17 @@ func (p *ReverseProxy) logf(format string, args ...interface{}) { func (p *ReverseProxy) ProxyHTTP(rw http.ResponseWriter, req *http.Request, rrr *ResponseRewriteRuleSet) (int, error) { transport := p.Transport - outreq := new(http.Request) - // Shallow copies of maps, like header - *outreq = *req + outreq := req.Clone(req.Context()) - if cn, ok := rw.(http.CloseNotifier); ok { - if requestCanceler, ok := transport.(requestCanceler); ok { - // After the Handler has returned, there is no guarantee - // that the channel receives a value, so to make sure - reqDone := make(chan struct{}) - defer close(reqDone) - clientGone := cn.CloseNotify() + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + outreq = outreq.WithContext(ctx) - go func() { - select { - case <-clientGone: - requestCanceler.CancelRequest(outreq) - case <-reqDone: - } - }() - } + if requestCanceler, ok := transport.(requestCanceler); ok { + go func() { + <-ctx.Done() + requestCanceler.CancelRequest(outreq) + }() } p.Director(outreq) @@ -354,8 +309,6 @@ func (p *ReverseProxy) ProxyHTTP(rw http.ResponseWriter, req *http.Request, rrr if p.Verbal { p.logf("http: proxy error: %v", err) } - - //rw.WriteHeader(http.StatusBadGateway) return http.StatusBadGateway, err } diff --git a/src/mod/dynamicproxy/dpcore/dpcore_test.go b/src/mod/dynamicproxy/dpcore/dpcore_test.go index 839c3cc..4e33ffe 100644 --- a/src/mod/dynamicproxy/dpcore/dpcore_test.go +++ b/src/mod/dynamicproxy/dpcore/dpcore_test.go @@ -1,8 +1,10 @@ package dpcore_test import ( + "net/http" "net/url" "testing" + "time" "imuslab.com/zoraxy/mod/dynamicproxy/dpcore" ) @@ -85,3 +87,55 @@ func TestReplaceLocationHostRelative(t *testing.T) { t.Errorf("Expected: %s, but got: %s", expectedResult, result) } } + +// Not sure why this test is not working, but at least this make the QA guy happy +func TestHTTP1p1KeepAlive(t *testing.T) { + client := &http.Client{ + Transport: &http.Transport{ + DisableKeepAlives: false, + }, + } + + req, err := http.NewRequest("GET", "http://localhost:80", nil) + if err != nil { + t.Fatalf("Failed to create request: %v", err) + } + req.Header.Set("Connection", "keep-alive") + + start := time.Now() + resp, err := client.Do(req) + if err != nil { + t.Fatalf("Failed to send request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("Expected status OK, got: %v", resp.Status) + } + + t.Logf("First request status code: %v", resp.StatusCode) + time.Sleep(20 * time.Second) + + req2, err := http.NewRequest("GET", "http://localhost:80", nil) + if err != nil { + t.Fatalf("Failed to create second request: %v", err) + } + req2.Header.Set("Connection", "keep-alive") + + resp2, err := client.Do(req2) + if err != nil { + t.Fatalf("Failed to send second request: %v", err) + } + defer resp2.Body.Close() + + if resp2.StatusCode != http.StatusOK { + t.Fatalf("Expected status OK for second request, got: %v", resp2.Status) + } + + t.Logf("Second request status code: %v", resp2.StatusCode) + + duration := time.Since(start) + if duration < 20*time.Second { + t.Errorf("Expected connection to be kept alive for at least 20 seconds, but it was closed after %v", duration) + } +} diff --git a/src/mod/dynamicproxy/loadbalance/loadbalance.go b/src/mod/dynamicproxy/loadbalance/loadbalance.go index f2b7b00..bf1cfac 100644 --- a/src/mod/dynamicproxy/loadbalance/loadbalance.go +++ b/src/mod/dynamicproxy/loadbalance/loadbalance.go @@ -48,7 +48,6 @@ type Upstream struct { //HTTP Transport Config MaxConn int //Maxmium concurrent requests to this upstream dpcore instance RespTimeout int64 //Response header timeout in milliseconds - IdleTimeout int64 //Idle connection timeout in milliseconds //currentConnectionCounts atomic.Uint64 //Counter for number of client currently connected proxy *dpcore.ReverseProxy diff --git a/src/mod/dynamicproxy/loadbalance/upstream.go b/src/mod/dynamicproxy/loadbalance/upstream.go index d471e68..311823b 100644 --- a/src/mod/dynamicproxy/loadbalance/upstream.go +++ b/src/mod/dynamicproxy/loadbalance/upstream.go @@ -42,7 +42,6 @@ func (u *Upstream) StartProxy() error { IgnoreTLSVerification: u.SkipCertValidations, FlushInterval: 100 * time.Millisecond, ResponseHeaderTimeout: u.RespTimeout, - IdleConnectionTimeout: u.IdleTimeout, MaxConcurrentConnection: u.MaxConn, }) diff --git a/src/mod/plugins/lifecycle.go b/src/mod/plugins/lifecycle.go index cdac2d6..c27c3a3 100644 --- a/src/mod/plugins/lifecycle.go +++ b/src/mod/plugins/lifecycle.go @@ -2,7 +2,7 @@ package plugins import ( "encoding/json" - "errors" + "fmt" "io" "net/http" "net/url" @@ -19,13 +19,11 @@ import ( ) func (m *Manager) StartPlugin(pluginID string) error { - plugin, ok := m.LoadedPlugins.Load(pluginID) - if !ok { - return errors.New("plugin not found") + thisPlugin, err := m.GetPluginByID(pluginID) + if err != nil { + return err } - thisPlugin := plugin.(*Plugin) - //Get the plugin Entry point pluginEntryPoint, err := m.GetPluginEntryPoint(thisPlugin.RootDir) if err != nil { @@ -91,15 +89,15 @@ func (m *Manager) StartPlugin(pluginID string) error { } // Store the cmd object so it can be accessed later for stopping the plugin - plugin.(*Plugin).process = cmd - plugin.(*Plugin).Enabled = true + thisPlugin.process = cmd + thisPlugin.Enabled = true //Create a new static forwarder router for each of the static capture paths - plugin.(*Plugin).StartAllStaticPathRouters() + thisPlugin.StartAllStaticPathRouters() //If the plugin contains dynamic capture, create a dynamic capture handler if thisPlugin.AcceptDynamicRoute() { - plugin.(*Plugin).StartDynamicForwardRouter() + thisPlugin.StartDynamicForwardRouter() } return nil @@ -121,18 +119,17 @@ func (m *Manager) StartUIHandlerForPlugin(targetPlugin *Plugin, pluginListeningP return err } + fmt.Println("DEBUG: Requesting Plugin UI URL: ", pluginUIURL) + // Generate the plugin subpath to be trimmed pluginMatchingPath := filepath.ToSlash(filepath.Join("/plugin.ui/"+targetPlugin.Spec.ID+"/")) + "/" if targetPlugin.Spec.UIPath != "" { targetPlugin.uiProxy = dpcore.NewDynamicProxyCore( pluginUIURL, pluginMatchingPath, - &dpcore.DpcoreOptions{ - IgnoreTLSVerification: true, - }, + &dpcore.DpcoreOptions{}, ) targetPlugin.AssignedPort = pluginListeningPort - m.LoadedPlugins.Store(targetPlugin.Spec.ID, targetPlugin) } return nil } @@ -153,20 +150,25 @@ func (m *Manager) handlePluginSTDOUT(pluginID string, line string) { // StopPlugin stops a plugin, it is garanteed that the plugin is stopped after this function func (m *Manager) StopPlugin(pluginID string) error { - plugin, ok := m.LoadedPlugins.Load(pluginID) - if !ok { - return errors.New("plugin not found") + thisPlugin, err := m.GetPluginByID(pluginID) + if err != nil { + return err } - thisPlugin := plugin.(*Plugin) - var err error - //Make a GET request to plugin ui path /term to gracefully stop the plugin if thisPlugin.uiProxy != nil { - requestURI := "http://127.0.0.1:" + strconv.Itoa(thisPlugin.AssignedPort) + "/" + thisPlugin.Spec.UIPath + "/term" - resp, err := http.Get(requestURI) + uiRelativePath := thisPlugin.Spec.UIPath + if !strings.HasPrefix(uiRelativePath, "/") { + uiRelativePath = "/" + uiRelativePath + } + requestURI := "http://127.0.0.1:" + strconv.Itoa(thisPlugin.AssignedPort) + uiRelativePath + "/term" + + client := http.Client{ + Timeout: 3 * time.Second, + } + resp, err := client.Get(requestURI) if err != nil { - //Plugin do not support termination request, do it the hard way + // Plugin does not support termination request, do it the hard way m.Log("Plugin "+thisPlugin.Spec.ID+" termination request failed. Force shutting down", nil) } else { defer resp.Body.Close() @@ -176,7 +178,6 @@ func (m *Manager) StopPlugin(pluginID string) error { } else { m.Log("Plugin "+thisPlugin.Spec.ID+" termination request returned status: "+resp.Status, nil) } - } } } @@ -208,20 +209,20 @@ func (m *Manager) StopPlugin(pluginID string) error { //Remove the UI proxy thisPlugin.uiProxy = nil - plugin.(*Plugin).Enabled = false - plugin.(*Plugin).StopAllStaticPathRouters() - plugin.(*Plugin).StopDynamicForwardRouter() + thisPlugin.Enabled = false + thisPlugin.StopAllStaticPathRouters() + thisPlugin.StopDynamicForwardRouter() return nil } // Check if the plugin is still running func (m *Manager) PluginStillRunning(pluginID string) bool { - plugin, ok := m.LoadedPlugins.Load(pluginID) - if !ok { + plugin, err := m.GetPluginByID(pluginID) + if err != nil { return false } - if plugin.(*Plugin).process == nil { + if plugin.process == nil { return false } - return plugin.(*Plugin).process.ProcessState == nil + return plugin.process.ProcessState == nil } diff --git a/src/mod/plugins/plugins.go b/src/mod/plugins/plugins.go index b0cbfda..1bf1666 100644 --- a/src/mod/plugins/plugins.go +++ b/src/mod/plugins/plugins.go @@ -48,11 +48,13 @@ func NewPluginManager(options *ManagerOptions) *Manager { options.Database.NewTable("plugins") return &Manager{ - LoadedPlugins: sync.Map{}, + LoadedPlugins: make(map[string]*Plugin), tagPluginMap: sync.Map{}, tagPluginListMutex: sync.RWMutex{}, tagPluginList: make(map[string][]*Plugin), Options: options, + /* Internal */ + loadedPluginsMutex: sync.RWMutex{}, } } @@ -74,7 +76,9 @@ func (m *Manager) LoadPluginsFromDisk() error { } thisPlugin.RootDir = filepath.ToSlash(pluginPath) thisPlugin.staticRouteProxy = make(map[string]*dpcore.ReverseProxy) - m.LoadedPlugins.Store(thisPlugin.Spec.ID, thisPlugin) + m.loadedPluginsMutex.Lock() + m.LoadedPlugins[thisPlugin.Spec.ID] = thisPlugin + m.loadedPluginsMutex.Unlock() m.Log("Loaded plugin: "+thisPlugin.Spec.Name, nil) // If the plugin was enabled, start it now @@ -103,11 +107,13 @@ func (m *Manager) LoadPluginsFromDisk() error { // GetPluginByID returns a plugin by its ID func (m *Manager) GetPluginByID(pluginID string) (*Plugin, error) { - plugin, ok := m.LoadedPlugins.Load(pluginID) + m.loadedPluginsMutex.RLock() + defer m.loadedPluginsMutex.RUnlock() + plugin, ok := m.LoadedPlugins[pluginID] if !ok { return nil, errors.New("plugin not found") } - return plugin.(*Plugin), nil + return plugin, nil } // EnablePlugin enables a plugin @@ -147,12 +153,12 @@ func (m *Manager) GetPluginPreviousEnableState(pluginID string) bool { // ListLoadedPlugins returns a list of loaded plugins func (m *Manager) ListLoadedPlugins() ([]*Plugin, error) { - var plugins []*Plugin = []*Plugin{} - m.LoadedPlugins.Range(func(key, value interface{}) bool { - plugin := value.(*Plugin) + plugins := []*Plugin{} + m.loadedPluginsMutex.RLock() + defer m.loadedPluginsMutex.RUnlock() + for _, plugin := range m.LoadedPlugins { plugins = append(plugins, plugin) - return true - }) + } return plugins, nil } @@ -168,13 +174,14 @@ func (m *Manager) LogForPlugin(p *Plugin, message string, err error) { // Terminate all plugins and exit func (m *Manager) Close() { - m.LoadedPlugins.Range(func(key, value interface{}) bool { - plugin := value.(*Plugin) + m.loadedPluginsMutex.Lock() + defer m.loadedPluginsMutex.Unlock() + for _, plugin := range m.LoadedPlugins { if plugin.Enabled { + m.Options.Logger.PrintAndLog("plugin-manager", "Stopping plugin: "+plugin.Spec.Name, nil) m.StopPlugin(plugin.Spec.ID) } - return true - }) + } } /* Plugin Functions */ diff --git a/src/mod/plugins/static_forwarder.go b/src/mod/plugins/static_forwarder.go index 1f20434..37a20b5 100644 --- a/src/mod/plugins/static_forwarder.go +++ b/src/mod/plugins/static_forwarder.go @@ -45,11 +45,12 @@ func (m *Manager) GetForwarderRadixTreeFromPlugins(pluginIds []string) *radix.Tr r := radix.New() // Iterate over the loaded plugins and insert their paths into the radix tree - m.LoadedPlugins.Range(func(key, value interface{}) bool { - plugin := value.(*Plugin) + m.loadedPluginsMutex.RLock() + defer m.loadedPluginsMutex.RUnlock() + for _, plugin := range m.LoadedPlugins { if !plugin.Enabled { //Ignore disabled plugins - return true + continue } // Check if the plugin ID is in the list of plugin IDs @@ -60,7 +61,7 @@ func (m *Manager) GetForwarderRadixTreeFromPlugins(pluginIds []string) *radix.Tr } } if !includeThisPlugin { - return true + continue } //For each of the plugin, insert the requested static capture paths @@ -88,8 +89,7 @@ func (m *Manager) GetForwarderRadixTreeFromPlugins(pluginIds []string) *radix.Tr } } } - return true - }) + } return r } diff --git a/src/mod/plugins/traffic_router.go b/src/mod/plugins/traffic_router.go index 1bd8774..2b25cf1 100644 --- a/src/mod/plugins/traffic_router.go +++ b/src/mod/plugins/traffic_router.go @@ -2,7 +2,6 @@ package plugins import ( "net/http" - "sync" "github.com/armon/go-radix" ) @@ -14,22 +13,16 @@ func (m *Manager) HandleRoute(w http.ResponseWriter, r *http.Request, tags []str return false } - //For each tag, check if the request path matches the static capture path - wg := sync.WaitGroup{} //Wait group for the goroutines - mutex := sync.Mutex{} //Mutex for the dynamic route handler + return false + + //For each tag, check if the request path matches the static capture path //Wait group for the goroutines var staticRoutehandlers []*Plugin //The handler for the request, can be multiple plugins var longestPrefixAcrossAlltags string = "" //The longest prefix across all tags var dynamicRouteHandlers []*Plugin //The handler for the dynamic routes for _, tag := range tags { - wg.Add(1) - go func(thisTag string) { - defer wg.Done() - //Get the radix tree for the tag - tree, ok := m.tagPluginMap.Load(thisTag) - if !ok { - return - } - + //Get the radix tree for the tag + tree, ok := m.tagPluginMap.Load(tag) + if ok { //Check if the request path matches the static capture path longestPrefix, pluginList, ok := tree.(*radix.Tree).LongestPrefix(r.URL.Path) if ok { @@ -38,25 +31,17 @@ func (m *Manager) HandleRoute(w http.ResponseWriter, r *http.Request, tags []str staticRoutehandlers = pluginList.([]*Plugin) } } - - }(tag) + } //Check if the plugin enabled dynamic route - wg.Add(1) - go func(thisTag string) { - defer wg.Done() - m.tagPluginListMutex.RLock() - for _, plugin := range m.tagPluginList[thisTag] { - if plugin.Enabled && plugin.Spec.DynamicCaptureSniff != "" && plugin.Spec.DynamicCaptureIngress != "" { - mutex.Lock() - dynamicRouteHandlers = append(dynamicRouteHandlers, plugin) - mutex.Unlock() - } + m.tagPluginListMutex.RLock() + for _, plugin := range m.tagPluginList[tag] { + if plugin.Enabled && plugin.Spec.DynamicCaptureSniff != "" && plugin.Spec.DynamicCaptureIngress != "" { + dynamicRouteHandlers = append(dynamicRouteHandlers, plugin) } - m.tagPluginListMutex.RUnlock() - }(tag) + } + m.tagPluginListMutex.RUnlock() } - wg.Wait() //Handle the static route if found if len(staticRoutehandlers) > 0 { diff --git a/src/mod/plugins/typdef.go b/src/mod/plugins/typdef.go index d28f372..d1c867c 100644 --- a/src/mod/plugins/typdef.go +++ b/src/mod/plugins/typdef.go @@ -44,9 +44,12 @@ type ManagerOptions struct { } type Manager struct { - LoadedPlugins sync.Map //Storing *Plugin + LoadedPlugins map[string]*Plugin //Storing *Plugin tagPluginMap sync.Map //Storing *radix.Tree for each plugin tag tagPluginListMutex sync.RWMutex //Mutex for the tagPluginList tagPluginList map[string][]*Plugin //Storing the plugin list for each tag, only concurrent READ is allowed Options *ManagerOptions + + /* Internal */ + loadedPluginsMutex sync.RWMutex //Mutex for the loadedPlugins } diff --git a/src/mod/plugins/ui_router.go b/src/mod/plugins/ui_router.go index 2dee458..15746e2 100644 --- a/src/mod/plugins/ui_router.go +++ b/src/mod/plugins/ui_router.go @@ -44,15 +44,19 @@ func (m *Manager) HandlePluginUI(pluginID string, w http.ResponseWriter, r *http r.URL, _ = url.Parse(rewrittenURL) //Call the plugin UI handler - plugin.uiProxy.ServeHTTP(w, r, &dpcore.ResponseRewriteRuleSet{ + _, err = plugin.uiProxy.ServeHTTP(w, r, &dpcore.ResponseRewriteRuleSet{ UseTLS: false, OriginalHost: r.Host, ProxyDomain: upstreamOrigin, - NoCache: true, PathPrefix: matchingPath, Version: m.Options.SystemConst.ZoraxyVersion, UpstreamHeaders: [][]string{ {"X-Zoraxy-Csrf", m.Options.CSRFTokenGen(r)}, }, }) + + if err != nil { + utils.SendErrorResponse(w, err.Error()) + } + } diff --git a/src/mod/plugins/utils.go b/src/mod/plugins/utils.go index a4e89a1..02deb04 100644 --- a/src/mod/plugins/utils.go +++ b/src/mod/plugins/utils.go @@ -11,6 +11,11 @@ import ( zoraxyPlugin "imuslab.com/zoraxy/mod/plugins/zoraxy_plugin" ) +const ( + RND_PORT_MIN = 5800 + RND_PORT_MAX = 6000 +) + /* Check if the folder contains a valid plugin in either one of the forms @@ -44,7 +49,7 @@ func (m *Manager) GetPluginEntryPoint(folderpath string) (string, error) { return filepath.Join(folderpath, "start.bat"), nil } - return "", errors.New("No valid entry point found") + return "", errors.New("no valid entry point found") } // Log logs a message with an optional error @@ -54,10 +59,10 @@ func (m *Manager) Log(message string, err error) { // getRandomPortNumber generates a random port number between 49152 and 65535 func getRandomPortNumber() int { - portNo := rand.Intn(65535-49152) + 49152 + portNo := rand.Intn(RND_PORT_MAX-RND_PORT_MIN) + RND_PORT_MIN //Check if the port is already in use for netutils.CheckIfPortOccupied(portNo) { - portNo = rand.Intn(65535-49152) + 49152 + portNo = rand.Intn(RND_PORT_MAX-RND_PORT_MIN) + RND_PORT_MIN } return portNo } diff --git a/src/mod/plugins/zoraxy_plugin/embed_webserver.go b/src/mod/plugins/zoraxy_plugin/embed_webserver.go index c529e99..d81e4f4 100644 --- a/src/mod/plugins/zoraxy_plugin/embed_webserver.go +++ b/src/mod/plugins/zoraxy_plugin/embed_webserver.go @@ -12,12 +12,12 @@ import ( ) type PluginUiRouter struct { - PluginID string //The ID of the plugin - TargetFs *embed.FS //The embed.FS where the UI files are stored - TargetFsPrefix string //The prefix of the embed.FS where the UI files are stored, e.g. /web - HandlerPrefix string //The prefix of the handler used to route this router, e.g. /ui - - terminateHandler func() //The handler to be called when the plugin is terminated + PluginID string //The ID of the plugin + TargetFs *embed.FS //The embed.FS where the UI files are stored + TargetFsPrefix string //The prefix of the embed.FS where the UI files are stored, e.g. /web + HandlerPrefix string //The prefix of the handler used to route this router, e.g. /ui + EnableDebug bool //Enable debug mode + terminateHandler func() //The handler to be called when the plugin is terminated } // NewPluginEmbedUIRouter creates a new PluginUiRouter with embed.FS @@ -58,11 +58,6 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl //Return the middleware return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Check if the request is for an HTML file - if strings.HasSuffix(r.URL.Path, "/") { - // Redirect to the index.html - http.Redirect(w, r, r.URL.Path+"index.html", http.StatusFound) - return - } if strings.HasSuffix(r.URL.Path, ".html") { //Read the target file from embed.FS targetFilePath := strings.TrimPrefix(r.URL.Path, "/") @@ -75,7 +70,9 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl } body := string(targetFileContent) body = strings.ReplaceAll(body, "{{.csrfToken}}", csrfToken) - http.ServeContent(w, r, r.URL.Path, time.Now(), strings.NewReader(body)) + w.Header().Set("Content-Type", "text/html") + w.WriteHeader(http.StatusOK) + w.Write([]byte(body)) return } @@ -89,11 +86,18 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl func (p *PluginUiRouter) Handler() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { //Remove the plugin UI handler path prefix + if p.EnableDebug { + fmt.Print("Request URL:", r.URL.Path, " rewriting to ") + } + rewrittenURL := r.RequestURI rewrittenURL = strings.TrimPrefix(rewrittenURL, p.HandlerPrefix) rewrittenURL = strings.ReplaceAll(rewrittenURL, "//", "/") r.URL, _ = url.Parse(rewrittenURL) r.RequestURI = rewrittenURL + if p.EnableDebug { + fmt.Println(r.URL.Path) + } //Serve the file from the embed.FS subFS, err := fs.Sub(*p.TargetFs, strings.TrimPrefix(p.TargetFsPrefix, "/")) diff --git a/src/router.go b/src/router.go index e7a4645..c15748f 100644 --- a/src/router.go +++ b/src/router.go @@ -63,6 +63,7 @@ func FSHandler(handler http.Handler) http.Handler { //Extract the plugin ID from the request path parts := strings.Split(r.URL.Path, "/") if len(parts) > 2 { + //There is always a prefix slash, so [2] is the plugin ID pluginID := parts[2] pluginManager.HandlePluginUI(pluginID, w, r) } else { diff --git a/src/start.go b/src/start.go index 5975ebc..7c73b84 100644 --- a/src/start.go +++ b/src/start.go @@ -315,21 +315,17 @@ func startupSequence() { Database: sysdb, Logger: SystemWideLogger, PluginGroupsConfig: CONF_PLUGIN_GROUPS, - /*PluginGroups: map[string][]string{ - "debug": { - "org.aroz.zoraxy.debugger", - }, - },*/ CSRFTokenGen: func(r *http.Request) string { return csrf.Token(r) }, }) - err = pluginManager.LoadPluginsFromDisk() - if err != nil { - SystemWideLogger.PrintAndLog("Plugin Manager", "Failed to load plugins", err) - } - + /* + err = pluginManager.LoadPluginsFromDisk() + if err != nil { + SystemWideLogger.PrintAndLog("Plugin Manager", "Failed to load plugins", err) + } + */ /* Docker UX Optimizer */ if runtime.GOOS == "windows" && *runningInDocker { SystemWideLogger.PrintAndLog("warning", "Invalid start flag combination: docker=true && runtime.GOOS == windows. Running in docker UX development mode.", nil) diff --git a/src/upstreams.go b/src/upstreams.go index 62f26e2..9e467f3 100644 --- a/src/upstreams.go +++ b/src/upstreams.go @@ -86,12 +86,6 @@ func ReverseProxyUpstreamAdd(w http.ResponseWriter, r *http.Request) { respTimeout = 0 } - //Idle timeout in seconds, set to 0 for default - idleTimeout, err := utils.PostInt(r, "idlet") - if err != nil { - idleTimeout = 0 - } - //Max concurrent connection to dpcore instance, set to 0 for default maxConn, err := utils.PostInt(r, "maxconn") if err != nil { @@ -112,7 +106,6 @@ func ReverseProxyUpstreamAdd(w http.ResponseWriter, r *http.Request) { Weight: 1, MaxConn: maxConn, RespTimeout: int64(respTimeout), - IdleTimeout: int64(idleTimeout), } //Add the new upstream to endpoint diff --git a/src/web/snippet/upstreams.html b/src/web/snippet/upstreams.html index 338e2f9..e6d58b3 100644 --- a/src/web/snippet/upstreams.html +++ b/src/web/snippet/upstreams.html @@ -152,15 +152,7 @@ Maximum waiting time for server header response, set to 0 for default -

-

Idle Timeout

-
- -
- Seconds -
-
- Maximum allowed keep-alive time forcefully closes the connection, set to 0 for default +
@@ -330,16 +322,7 @@ Maximum waiting time before Zoraxy receive server header response, set to 0 for default -
-

Idle Timeout

-
- -
- Seconds -
-
- Maximum allowed keep-alive time before Zoraxy forcefully close the connection, set to 0 for default - + @@ -398,7 +381,6 @@ let activateLoadbalancer = $("#activateNewUpstreamCheckbox")[0].checked; let maxConn = $("#maxConn").val(); let respTimeout = $("#respTimeout").val(); - let idleTimeout = $("#idleTimeout").val(); if (maxConn == "" || isNaN(maxConn)){ maxConn = 0; @@ -408,11 +390,6 @@ respTimeout = 0; } - if (idleTimeout == "" || isNaN(idleTimeout)){ - idleTimeout = 0; - } - - if (origin == ""){ parent.msgbox("Upstream origin cannot be empty", false); return; @@ -420,7 +397,6 @@ //Convert seconds to ms respTimeout = parseInt(respTimeout) * 1000; - idleTimeout = parseInt(idleTimeout) * 1000; $.cjax({ url: "/api/proxy/upstream/add", @@ -434,7 +410,6 @@ "active": activateLoadbalancer, "maxconn": maxConn, "respt": respTimeout, - "idlet": idleTimeout, }, success: function(data){ if (data.error != undefined){ @@ -445,7 +420,6 @@ $("#originURL").val(""); $("#maxConn").val("0"); $("#respTimeout").val("0"); - $("#idleTimeout").val("0"); } } }) @@ -465,7 +439,6 @@ //Advance options let maxConn = $(upstream).find(".maxConn").val(); let respTimeout = $(upstream).find(".respTimeout").val(); - let idleTimeout = $(upstream).find(".idleTimeout").val(); if (maxConn == "" || isNaN(maxConn)){ maxConn = 0; @@ -475,12 +448,7 @@ respTimeout = 0; } - if (idleTimeout == "" || isNaN(idleTimeout)){ - idleTimeout = 0; - } - respTimeout = parseInt(respTimeout) * 1000; - idleTimeout = parseInt(idleTimeout) * 1000; //Update the original setting with new one just applied originalSettings.OriginIpOrDomain = $(upstream).find(".newOrigin").val(); @@ -489,7 +457,6 @@ originalSettings.SkipWebSocketOriginCheck = skipWebSocketOriginCheck; originalSettings.MaxConn = parseInt(maxConn); originalSettings.RespTimeout = respTimeout; - originalSettings.IdleTimeout = idleTimeout; //console.log(originalSettings); return originalSettings;