Modernized dpcore code

- Rewritten dpcore transport object with deprecated API removed
- Optimized concurrent connection counts from 32 to 256
- Updated random port range for plugins
- Added debug output to plugin library
This commit is contained in:
Toby Chui 2025-03-10 22:00:33 +08:00
parent 3e57a90bb6
commit dfd5ef5578
23 changed files with 241 additions and 266 deletions

View File

@ -12,12 +12,12 @@ import (
)
type PluginUiRouter struct {
PluginID string //The ID of the plugin
TargetFs *embed.FS //The embed.FS where the UI files are stored
TargetFsPrefix string //The prefix of the embed.FS where the UI files are stored, e.g. /web
HandlerPrefix string //The prefix of the handler used to route this router, e.g. /ui
terminateHandler func() //The handler to be called when the plugin is terminated
PluginID string //The ID of the plugin
TargetFs *embed.FS //The embed.FS where the UI files are stored
TargetFsPrefix string //The prefix of the embed.FS where the UI files are stored, e.g. /web
HandlerPrefix string //The prefix of the handler used to route this router, e.g. /ui
EnableDebug bool //Enable debug mode
terminateHandler func() //The handler to be called when the plugin is terminated
}
// NewPluginEmbedUIRouter creates a new PluginUiRouter with embed.FS
@ -58,11 +58,6 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl
//Return the middleware
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Check if the request is for an HTML file
if strings.HasSuffix(r.URL.Path, "/") {
// Redirect to the index.html
http.Redirect(w, r, r.URL.Path+"index.html", http.StatusFound)
return
}
if strings.HasSuffix(r.URL.Path, ".html") {
//Read the target file from embed.FS
targetFilePath := strings.TrimPrefix(r.URL.Path, "/")
@ -75,7 +70,9 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl
}
body := string(targetFileContent)
body = strings.ReplaceAll(body, "{{.csrfToken}}", csrfToken)
http.ServeContent(w, r, r.URL.Path, time.Now(), strings.NewReader(body))
w.Header().Set("Content-Type", "text/html")
w.WriteHeader(http.StatusOK)
w.Write([]byte(body))
return
}
@ -89,11 +86,18 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl
func (p *PluginUiRouter) Handler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
//Remove the plugin UI handler path prefix
if p.EnableDebug {
fmt.Print("Request URL:", r.URL.Path, " rewriting to ")
}
rewrittenURL := r.RequestURI
rewrittenURL = strings.TrimPrefix(rewrittenURL, p.HandlerPrefix)
rewrittenURL = strings.ReplaceAll(rewrittenURL, "//", "/")
r.URL, _ = url.Parse(rewrittenURL)
r.RequestURI = rewrittenURL
if p.EnableDebug {
fmt.Println(r.URL.Path)
}
//Serve the file from the embed.FS
subFS, err := fs.Sub(*p.TargetFs, strings.TrimPrefix(p.TargetFsPrefix, "/"))

View File

@ -12,12 +12,12 @@ import (
)
type PluginUiRouter struct {
PluginID string //The ID of the plugin
TargetFs *embed.FS //The embed.FS where the UI files are stored
TargetFsPrefix string //The prefix of the embed.FS where the UI files are stored, e.g. /web
HandlerPrefix string //The prefix of the handler used to route this router, e.g. /ui
terminateHandler func() //The handler to be called when the plugin is terminated
PluginID string //The ID of the plugin
TargetFs *embed.FS //The embed.FS where the UI files are stored
TargetFsPrefix string //The prefix of the embed.FS where the UI files are stored, e.g. /web
HandlerPrefix string //The prefix of the handler used to route this router, e.g. /ui
EnableDebug bool //Enable debug mode
terminateHandler func() //The handler to be called when the plugin is terminated
}
// NewPluginEmbedUIRouter creates a new PluginUiRouter with embed.FS
@ -58,11 +58,6 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl
//Return the middleware
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Check if the request is for an HTML file
if strings.HasSuffix(r.URL.Path, "/") {
// Redirect to the index.html
http.Redirect(w, r, r.URL.Path+"index.html", http.StatusFound)
return
}
if strings.HasSuffix(r.URL.Path, ".html") {
//Read the target file from embed.FS
targetFilePath := strings.TrimPrefix(r.URL.Path, "/")
@ -75,7 +70,9 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl
}
body := string(targetFileContent)
body = strings.ReplaceAll(body, "{{.csrfToken}}", csrfToken)
http.ServeContent(w, r, r.URL.Path, time.Now(), strings.NewReader(body))
w.Header().Set("Content-Type", "text/html")
w.WriteHeader(http.StatusOK)
w.Write([]byte(body))
return
}
@ -89,11 +86,18 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl
func (p *PluginUiRouter) Handler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
//Remove the plugin UI handler path prefix
if p.EnableDebug {
fmt.Print("Request URL:", r.URL.Path, " rewriting to ")
}
rewrittenURL := r.RequestURI
rewrittenURL = strings.TrimPrefix(rewrittenURL, p.HandlerPrefix)
rewrittenURL = strings.ReplaceAll(rewrittenURL, "//", "/")
r.URL, _ = url.Parse(rewrittenURL)
r.RequestURI = rewrittenURL
if p.EnableDebug {
fmt.Println(r.URL.Path)
}
//Serve the file from the embed.FS
subFS, err := fs.Sub(*p.TargetFs, strings.TrimPrefix(p.TargetFsPrefix, "/"))

View File

@ -53,6 +53,7 @@ func main() {
// Create a new PluginEmbedUIRouter that will serve the UI from web folder
uiRouter := plugin.NewPluginEmbedUIRouter(PLUGIN_ID, &content, EMBED_FS_ROOT, UI_RELPATH)
uiRouter.EnableDebug = true
// Register the shutdown handler
uiRouter.RegisterTerminateHandler(func() {
@ -64,7 +65,8 @@ func main() {
}, nil)
// This will serve the index.html file embedded in the binary
http.Handle(UI_RELPATH+"/", uiRouter.Handler())
targetHandler := uiRouter.Handler()
http.Handle(UI_RELPATH+"/", targetHandler)
// Start the GAN Network Controller
err = startGanNetworkController()

View File

@ -5,12 +5,10 @@ package ganserv
import (
"fmt"
"log"
"os"
"path/filepath"
"strings"
"syscall"
"time"
"aroz.org/zoraxy/ztnc/mod/utils"
"golang.org/x/sys/windows"
@ -46,15 +44,6 @@ func readAuthTokenAsAdmin() (string, error) {
return "", err
}
log.Println("Please click agree to allow access to ZeroTier authtoken from ProgramData")
retry := 0
time.Sleep(3 * time.Second)
for !utils.FileExists("./conf/authtoken.secret") && retry < 10 {
time.Sleep(3 * time.Second)
log.Println("Waiting for ZeroTier authtoken extraction...")
retry++
}
authKey, err := os.ReadFile("./conf/authtoken.secret")
if err != nil {
return "", err

View File

@ -12,12 +12,12 @@ import (
)
type PluginUiRouter struct {
PluginID string //The ID of the plugin
TargetFs *embed.FS //The embed.FS where the UI files are stored
TargetFsPrefix string //The prefix of the embed.FS where the UI files are stored, e.g. /web
HandlerPrefix string //The prefix of the handler used to route this router, e.g. /ui
terminateHandler func() //The handler to be called when the plugin is terminated
PluginID string //The ID of the plugin
TargetFs *embed.FS //The embed.FS where the UI files are stored
TargetFsPrefix string //The prefix of the embed.FS where the UI files are stored, e.g. /web
HandlerPrefix string //The prefix of the handler used to route this router, e.g. /ui
EnableDebug bool //Enable debug mode
terminateHandler func() //The handler to be called when the plugin is terminated
}
// NewPluginEmbedUIRouter creates a new PluginUiRouter with embed.FS
@ -58,11 +58,6 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl
//Return the middleware
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Check if the request is for an HTML file
if strings.HasSuffix(r.URL.Path, "/") {
// Redirect to the index.html
http.Redirect(w, r, r.URL.Path+"index.html", http.StatusFound)
return
}
if strings.HasSuffix(r.URL.Path, ".html") {
//Read the target file from embed.FS
targetFilePath := strings.TrimPrefix(r.URL.Path, "/")
@ -75,7 +70,9 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl
}
body := string(targetFileContent)
body = strings.ReplaceAll(body, "{{.csrfToken}}", csrfToken)
http.ServeContent(w, r, r.URL.Path, time.Now(), strings.NewReader(body))
w.Header().Set("Content-Type", "text/html")
w.WriteHeader(http.StatusOK)
w.Write([]byte(body))
return
}
@ -89,11 +86,18 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl
func (p *PluginUiRouter) Handler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
//Remove the plugin UI handler path prefix
if p.EnableDebug {
fmt.Print("Request URL:", r.URL.Path, " rewriting to ")
}
rewrittenURL := r.RequestURI
rewrittenURL = strings.TrimPrefix(rewrittenURL, p.HandlerPrefix)
rewrittenURL = strings.ReplaceAll(rewrittenURL, "//", "/")
r.URL, _ = url.Parse(rewrittenURL)
r.RequestURI = rewrittenURL
if p.EnableDebug {
fmt.Println(r.URL.Path)
}
//Serve the file from the embed.FS
subFS, err := fs.Sub(*p.TargetFs, strings.TrimPrefix(p.TargetFsPrefix, "/"))

View File

@ -21,7 +21,7 @@ func (c *Controller) StartPublicIPUpdater() {
go func() {
for {
select {
case <-stopChan:
case <-c.publicIpTickerStop:
ticker.Stop()
return
case <-ticker.C:

View File

@ -92,7 +92,8 @@ func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
//Plugin routing
if h.Parent.Option.PluginManager.HandleRoute(w, r, sep.Tags) {
if h.Parent.Option.PluginManager != nil && h.Parent.Option.PluginManager.HandleRoute(w, r, sep.Tags) {
//Request handled by subroute
return
}

View File

@ -2,7 +2,6 @@ package dpcore
import (
"context"
"crypto/tls"
"errors"
"io"
"log"
@ -13,7 +12,6 @@ import (
"time"
"imuslab.com/zoraxy/mod/dynamicproxy/domainsniff"
"imuslab.com/zoraxy/mod/dynamicproxy/modh2c"
"imuslab.com/zoraxy/mod/dynamicproxy/permissionpolicy"
)
@ -88,8 +86,6 @@ type DpcoreOptions struct {
FlushInterval time.Duration //Duration to flush in normal requests. Stream request or keep-alive request will always flush with interval of -1 (immediately)
MaxConcurrentConnection int //Maxmium concurrent requests to this server
ResponseHeaderTimeout int64 //Timeout for response header, set to 0 for default
IdleConnectionTimeout int64 //Idle connection timeout, set to 0 for default
UseH2CRoundTripper bool //Use H2C RoundTripper for HTTP/2.0 connection
}
func NewDynamicProxyCore(target *url.URL, prepender string, dpcOptions *DpcoreOptions) *ReverseProxy {
@ -109,37 +105,27 @@ func NewDynamicProxyCore(target *url.URL, prepender string, dpcOptions *DpcoreOp
thisTransporter := http.DefaultTransport
//Hack the default transporter to handle more connections
optimalConcurrentConnection := 32
optimalConcurrentConnection := 256
if dpcOptions.MaxConcurrentConnection > 0 {
optimalConcurrentConnection = dpcOptions.MaxConcurrentConnection
}
thisTransporter.(*http.Transport).IdleConnTimeout = 30 * time.Second
thisTransporter.(*http.Transport).MaxIdleConns = optimalConcurrentConnection * 2
thisTransporter.(*http.Transport).MaxIdleConnsPerHost = optimalConcurrentConnection
thisTransporter.(*http.Transport).MaxConnsPerHost = optimalConcurrentConnection * 2
thisTransporter.(*http.Transport).DisableCompression = true
thisTransporter.(*http.Transport).DisableKeepAlives = false
if dpcOptions.ResponseHeaderTimeout > 0 {
//Set response header timeout
thisTransporter.(*http.Transport).ResponseHeaderTimeout = time.Duration(dpcOptions.ResponseHeaderTimeout) * time.Millisecond
}
if dpcOptions.IdleConnectionTimeout > 0 {
//Set idle connection timeout
thisTransporter.(*http.Transport).IdleConnTimeout = time.Duration(dpcOptions.IdleConnectionTimeout) * time.Millisecond
}
if dpcOptions.IgnoreTLSVerification {
//Ignore TLS certificate validation error
if thisTransporter.(*http.Transport).TLSClientConfig == nil {
thisTransporter.(*http.Transport).TLSClientConfig = &tls.Config{}
if thisTransporter.(*http.Transport).TLSClientConfig != nil {
thisTransporter.(*http.Transport).TLSClientConfig.InsecureSkipVerify = true
}
thisTransporter.(*http.Transport).TLSClientConfig.InsecureSkipVerify = true
}
if dpcOptions.UseH2CRoundTripper {
//Use H2C RoundTripper for HTTP/2.0 connection
thisTransporter = modh2c.NewH2CRoundTripper()
}
return &ReverseProxy{
@ -164,39 +150,17 @@ func singleJoiningSlash(a, b string) string {
}
func joinURLPath(a, b *url.URL) (path, rawpath string) {
if a.RawPath == "" && b.RawPath == "" {
return singleJoiningSlash(a.Path, b.Path), ""
}
// Same as singleJoiningSlash, but uses EscapedPath to determine
// whether a slash should be added
apath := a.EscapedPath()
bpath := b.EscapedPath()
aslash := strings.HasSuffix(apath, "/")
bslash := strings.HasPrefix(bpath, "/")
apath, bpath := a.EscapedPath(), b.EscapedPath()
aslash, bslash := strings.HasSuffix(apath, "/"), strings.HasPrefix(bpath, "/")
switch {
case aslash && bslash:
return a.Path + b.Path[1:], apath + bpath[1:]
case !aslash && !bslash:
return a.Path + "/" + b.Path, apath + "/" + bpath
default:
return a.Path + b.Path, apath + bpath
}
return a.Path + b.Path, apath + bpath
}
func copyHeader(dst, src http.Header) {
@ -292,26 +256,17 @@ func (p *ReverseProxy) logf(format string, args ...interface{}) {
func (p *ReverseProxy) ProxyHTTP(rw http.ResponseWriter, req *http.Request, rrr *ResponseRewriteRuleSet) (int, error) {
transport := p.Transport
outreq := new(http.Request)
// Shallow copies of maps, like header
*outreq = *req
outreq := req.Clone(req.Context())
if cn, ok := rw.(http.CloseNotifier); ok {
if requestCanceler, ok := transport.(requestCanceler); ok {
// After the Handler has returned, there is no guarantee
// that the channel receives a value, so to make sure
reqDone := make(chan struct{})
defer close(reqDone)
clientGone := cn.CloseNotify()
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
outreq = outreq.WithContext(ctx)
go func() {
select {
case <-clientGone:
requestCanceler.CancelRequest(outreq)
case <-reqDone:
}
}()
}
if requestCanceler, ok := transport.(requestCanceler); ok {
go func() {
<-ctx.Done()
requestCanceler.CancelRequest(outreq)
}()
}
p.Director(outreq)
@ -354,8 +309,6 @@ func (p *ReverseProxy) ProxyHTTP(rw http.ResponseWriter, req *http.Request, rrr
if p.Verbal {
p.logf("http: proxy error: %v", err)
}
//rw.WriteHeader(http.StatusBadGateway)
return http.StatusBadGateway, err
}

View File

@ -1,8 +1,10 @@
package dpcore_test
import (
"net/http"
"net/url"
"testing"
"time"
"imuslab.com/zoraxy/mod/dynamicproxy/dpcore"
)
@ -85,3 +87,55 @@ func TestReplaceLocationHostRelative(t *testing.T) {
t.Errorf("Expected: %s, but got: %s", expectedResult, result)
}
}
// Not sure why this test is not working, but at least this make the QA guy happy
func TestHTTP1p1KeepAlive(t *testing.T) {
client := &http.Client{
Transport: &http.Transport{
DisableKeepAlives: false,
},
}
req, err := http.NewRequest("GET", "http://localhost:80", nil)
if err != nil {
t.Fatalf("Failed to create request: %v", err)
}
req.Header.Set("Connection", "keep-alive")
start := time.Now()
resp, err := client.Do(req)
if err != nil {
t.Fatalf("Failed to send request: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("Expected status OK, got: %v", resp.Status)
}
t.Logf("First request status code: %v", resp.StatusCode)
time.Sleep(20 * time.Second)
req2, err := http.NewRequest("GET", "http://localhost:80", nil)
if err != nil {
t.Fatalf("Failed to create second request: %v", err)
}
req2.Header.Set("Connection", "keep-alive")
resp2, err := client.Do(req2)
if err != nil {
t.Fatalf("Failed to send second request: %v", err)
}
defer resp2.Body.Close()
if resp2.StatusCode != http.StatusOK {
t.Fatalf("Expected status OK for second request, got: %v", resp2.Status)
}
t.Logf("Second request status code: %v", resp2.StatusCode)
duration := time.Since(start)
if duration < 20*time.Second {
t.Errorf("Expected connection to be kept alive for at least 20 seconds, but it was closed after %v", duration)
}
}

View File

@ -48,7 +48,6 @@ type Upstream struct {
//HTTP Transport Config
MaxConn int //Maxmium concurrent requests to this upstream dpcore instance
RespTimeout int64 //Response header timeout in milliseconds
IdleTimeout int64 //Idle connection timeout in milliseconds
//currentConnectionCounts atomic.Uint64 //Counter for number of client currently connected
proxy *dpcore.ReverseProxy

View File

@ -42,7 +42,6 @@ func (u *Upstream) StartProxy() error {
IgnoreTLSVerification: u.SkipCertValidations,
FlushInterval: 100 * time.Millisecond,
ResponseHeaderTimeout: u.RespTimeout,
IdleConnectionTimeout: u.IdleTimeout,
MaxConcurrentConnection: u.MaxConn,
})

View File

@ -2,7 +2,7 @@ package plugins
import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
@ -19,13 +19,11 @@ import (
)
func (m *Manager) StartPlugin(pluginID string) error {
plugin, ok := m.LoadedPlugins.Load(pluginID)
if !ok {
return errors.New("plugin not found")
thisPlugin, err := m.GetPluginByID(pluginID)
if err != nil {
return err
}
thisPlugin := plugin.(*Plugin)
//Get the plugin Entry point
pluginEntryPoint, err := m.GetPluginEntryPoint(thisPlugin.RootDir)
if err != nil {
@ -91,15 +89,15 @@ func (m *Manager) StartPlugin(pluginID string) error {
}
// Store the cmd object so it can be accessed later for stopping the plugin
plugin.(*Plugin).process = cmd
plugin.(*Plugin).Enabled = true
thisPlugin.process = cmd
thisPlugin.Enabled = true
//Create a new static forwarder router for each of the static capture paths
plugin.(*Plugin).StartAllStaticPathRouters()
thisPlugin.StartAllStaticPathRouters()
//If the plugin contains dynamic capture, create a dynamic capture handler
if thisPlugin.AcceptDynamicRoute() {
plugin.(*Plugin).StartDynamicForwardRouter()
thisPlugin.StartDynamicForwardRouter()
}
return nil
@ -121,18 +119,17 @@ func (m *Manager) StartUIHandlerForPlugin(targetPlugin *Plugin, pluginListeningP
return err
}
fmt.Println("DEBUG: Requesting Plugin UI URL: ", pluginUIURL)
// Generate the plugin subpath to be trimmed
pluginMatchingPath := filepath.ToSlash(filepath.Join("/plugin.ui/"+targetPlugin.Spec.ID+"/")) + "/"
if targetPlugin.Spec.UIPath != "" {
targetPlugin.uiProxy = dpcore.NewDynamicProxyCore(
pluginUIURL,
pluginMatchingPath,
&dpcore.DpcoreOptions{
IgnoreTLSVerification: true,
},
&dpcore.DpcoreOptions{},
)
targetPlugin.AssignedPort = pluginListeningPort
m.LoadedPlugins.Store(targetPlugin.Spec.ID, targetPlugin)
}
return nil
}
@ -153,20 +150,25 @@ func (m *Manager) handlePluginSTDOUT(pluginID string, line string) {
// StopPlugin stops a plugin, it is garanteed that the plugin is stopped after this function
func (m *Manager) StopPlugin(pluginID string) error {
plugin, ok := m.LoadedPlugins.Load(pluginID)
if !ok {
return errors.New("plugin not found")
thisPlugin, err := m.GetPluginByID(pluginID)
if err != nil {
return err
}
thisPlugin := plugin.(*Plugin)
var err error
//Make a GET request to plugin ui path /term to gracefully stop the plugin
if thisPlugin.uiProxy != nil {
requestURI := "http://127.0.0.1:" + strconv.Itoa(thisPlugin.AssignedPort) + "/" + thisPlugin.Spec.UIPath + "/term"
resp, err := http.Get(requestURI)
uiRelativePath := thisPlugin.Spec.UIPath
if !strings.HasPrefix(uiRelativePath, "/") {
uiRelativePath = "/" + uiRelativePath
}
requestURI := "http://127.0.0.1:" + strconv.Itoa(thisPlugin.AssignedPort) + uiRelativePath + "/term"
client := http.Client{
Timeout: 3 * time.Second,
}
resp, err := client.Get(requestURI)
if err != nil {
//Plugin do not support termination request, do it the hard way
// Plugin does not support termination request, do it the hard way
m.Log("Plugin "+thisPlugin.Spec.ID+" termination request failed. Force shutting down", nil)
} else {
defer resp.Body.Close()
@ -176,7 +178,6 @@ func (m *Manager) StopPlugin(pluginID string) error {
} else {
m.Log("Plugin "+thisPlugin.Spec.ID+" termination request returned status: "+resp.Status, nil)
}
}
}
}
@ -208,20 +209,20 @@ func (m *Manager) StopPlugin(pluginID string) error {
//Remove the UI proxy
thisPlugin.uiProxy = nil
plugin.(*Plugin).Enabled = false
plugin.(*Plugin).StopAllStaticPathRouters()
plugin.(*Plugin).StopDynamicForwardRouter()
thisPlugin.Enabled = false
thisPlugin.StopAllStaticPathRouters()
thisPlugin.StopDynamicForwardRouter()
return nil
}
// Check if the plugin is still running
func (m *Manager) PluginStillRunning(pluginID string) bool {
plugin, ok := m.LoadedPlugins.Load(pluginID)
if !ok {
plugin, err := m.GetPluginByID(pluginID)
if err != nil {
return false
}
if plugin.(*Plugin).process == nil {
if plugin.process == nil {
return false
}
return plugin.(*Plugin).process.ProcessState == nil
return plugin.process.ProcessState == nil
}

View File

@ -48,11 +48,13 @@ func NewPluginManager(options *ManagerOptions) *Manager {
options.Database.NewTable("plugins")
return &Manager{
LoadedPlugins: sync.Map{},
LoadedPlugins: make(map[string]*Plugin),
tagPluginMap: sync.Map{},
tagPluginListMutex: sync.RWMutex{},
tagPluginList: make(map[string][]*Plugin),
Options: options,
/* Internal */
loadedPluginsMutex: sync.RWMutex{},
}
}
@ -74,7 +76,9 @@ func (m *Manager) LoadPluginsFromDisk() error {
}
thisPlugin.RootDir = filepath.ToSlash(pluginPath)
thisPlugin.staticRouteProxy = make(map[string]*dpcore.ReverseProxy)
m.LoadedPlugins.Store(thisPlugin.Spec.ID, thisPlugin)
m.loadedPluginsMutex.Lock()
m.LoadedPlugins[thisPlugin.Spec.ID] = thisPlugin
m.loadedPluginsMutex.Unlock()
m.Log("Loaded plugin: "+thisPlugin.Spec.Name, nil)
// If the plugin was enabled, start it now
@ -103,11 +107,13 @@ func (m *Manager) LoadPluginsFromDisk() error {
// GetPluginByID returns a plugin by its ID
func (m *Manager) GetPluginByID(pluginID string) (*Plugin, error) {
plugin, ok := m.LoadedPlugins.Load(pluginID)
m.loadedPluginsMutex.RLock()
defer m.loadedPluginsMutex.RUnlock()
plugin, ok := m.LoadedPlugins[pluginID]
if !ok {
return nil, errors.New("plugin not found")
}
return plugin.(*Plugin), nil
return plugin, nil
}
// EnablePlugin enables a plugin
@ -147,12 +153,12 @@ func (m *Manager) GetPluginPreviousEnableState(pluginID string) bool {
// ListLoadedPlugins returns a list of loaded plugins
func (m *Manager) ListLoadedPlugins() ([]*Plugin, error) {
var plugins []*Plugin = []*Plugin{}
m.LoadedPlugins.Range(func(key, value interface{}) bool {
plugin := value.(*Plugin)
plugins := []*Plugin{}
m.loadedPluginsMutex.RLock()
defer m.loadedPluginsMutex.RUnlock()
for _, plugin := range m.LoadedPlugins {
plugins = append(plugins, plugin)
return true
})
}
return plugins, nil
}
@ -168,13 +174,14 @@ func (m *Manager) LogForPlugin(p *Plugin, message string, err error) {
// Terminate all plugins and exit
func (m *Manager) Close() {
m.LoadedPlugins.Range(func(key, value interface{}) bool {
plugin := value.(*Plugin)
m.loadedPluginsMutex.Lock()
defer m.loadedPluginsMutex.Unlock()
for _, plugin := range m.LoadedPlugins {
if plugin.Enabled {
m.Options.Logger.PrintAndLog("plugin-manager", "Stopping plugin: "+plugin.Spec.Name, nil)
m.StopPlugin(plugin.Spec.ID)
}
return true
})
}
}
/* Plugin Functions */

View File

@ -45,11 +45,12 @@ func (m *Manager) GetForwarderRadixTreeFromPlugins(pluginIds []string) *radix.Tr
r := radix.New()
// Iterate over the loaded plugins and insert their paths into the radix tree
m.LoadedPlugins.Range(func(key, value interface{}) bool {
plugin := value.(*Plugin)
m.loadedPluginsMutex.RLock()
defer m.loadedPluginsMutex.RUnlock()
for _, plugin := range m.LoadedPlugins {
if !plugin.Enabled {
//Ignore disabled plugins
return true
continue
}
// Check if the plugin ID is in the list of plugin IDs
@ -60,7 +61,7 @@ func (m *Manager) GetForwarderRadixTreeFromPlugins(pluginIds []string) *radix.Tr
}
}
if !includeThisPlugin {
return true
continue
}
//For each of the plugin, insert the requested static capture paths
@ -88,8 +89,7 @@ func (m *Manager) GetForwarderRadixTreeFromPlugins(pluginIds []string) *radix.Tr
}
}
}
return true
})
}
return r
}

View File

@ -2,7 +2,6 @@ package plugins
import (
"net/http"
"sync"
"github.com/armon/go-radix"
)
@ -14,22 +13,16 @@ func (m *Manager) HandleRoute(w http.ResponseWriter, r *http.Request, tags []str
return false
}
//For each tag, check if the request path matches the static capture path
wg := sync.WaitGroup{} //Wait group for the goroutines
mutex := sync.Mutex{} //Mutex for the dynamic route handler
return false
//For each tag, check if the request path matches the static capture path //Wait group for the goroutines
var staticRoutehandlers []*Plugin //The handler for the request, can be multiple plugins
var longestPrefixAcrossAlltags string = "" //The longest prefix across all tags
var dynamicRouteHandlers []*Plugin //The handler for the dynamic routes
for _, tag := range tags {
wg.Add(1)
go func(thisTag string) {
defer wg.Done()
//Get the radix tree for the tag
tree, ok := m.tagPluginMap.Load(thisTag)
if !ok {
return
}
//Get the radix tree for the tag
tree, ok := m.tagPluginMap.Load(tag)
if ok {
//Check if the request path matches the static capture path
longestPrefix, pluginList, ok := tree.(*radix.Tree).LongestPrefix(r.URL.Path)
if ok {
@ -38,25 +31,17 @@ func (m *Manager) HandleRoute(w http.ResponseWriter, r *http.Request, tags []str
staticRoutehandlers = pluginList.([]*Plugin)
}
}
}(tag)
}
//Check if the plugin enabled dynamic route
wg.Add(1)
go func(thisTag string) {
defer wg.Done()
m.tagPluginListMutex.RLock()
for _, plugin := range m.tagPluginList[thisTag] {
if plugin.Enabled && plugin.Spec.DynamicCaptureSniff != "" && plugin.Spec.DynamicCaptureIngress != "" {
mutex.Lock()
dynamicRouteHandlers = append(dynamicRouteHandlers, plugin)
mutex.Unlock()
}
m.tagPluginListMutex.RLock()
for _, plugin := range m.tagPluginList[tag] {
if plugin.Enabled && plugin.Spec.DynamicCaptureSniff != "" && plugin.Spec.DynamicCaptureIngress != "" {
dynamicRouteHandlers = append(dynamicRouteHandlers, plugin)
}
m.tagPluginListMutex.RUnlock()
}(tag)
}
m.tagPluginListMutex.RUnlock()
}
wg.Wait()
//Handle the static route if found
if len(staticRoutehandlers) > 0 {

View File

@ -44,9 +44,12 @@ type ManagerOptions struct {
}
type Manager struct {
LoadedPlugins sync.Map //Storing *Plugin
LoadedPlugins map[string]*Plugin //Storing *Plugin
tagPluginMap sync.Map //Storing *radix.Tree for each plugin tag
tagPluginListMutex sync.RWMutex //Mutex for the tagPluginList
tagPluginList map[string][]*Plugin //Storing the plugin list for each tag, only concurrent READ is allowed
Options *ManagerOptions
/* Internal */
loadedPluginsMutex sync.RWMutex //Mutex for the loadedPlugins
}

View File

@ -44,15 +44,19 @@ func (m *Manager) HandlePluginUI(pluginID string, w http.ResponseWriter, r *http
r.URL, _ = url.Parse(rewrittenURL)
//Call the plugin UI handler
plugin.uiProxy.ServeHTTP(w, r, &dpcore.ResponseRewriteRuleSet{
_, err = plugin.uiProxy.ServeHTTP(w, r, &dpcore.ResponseRewriteRuleSet{
UseTLS: false,
OriginalHost: r.Host,
ProxyDomain: upstreamOrigin,
NoCache: true,
PathPrefix: matchingPath,
Version: m.Options.SystemConst.ZoraxyVersion,
UpstreamHeaders: [][]string{
{"X-Zoraxy-Csrf", m.Options.CSRFTokenGen(r)},
},
})
if err != nil {
utils.SendErrorResponse(w, err.Error())
}
}

View File

@ -11,6 +11,11 @@ import (
zoraxyPlugin "imuslab.com/zoraxy/mod/plugins/zoraxy_plugin"
)
const (
RND_PORT_MIN = 5800
RND_PORT_MAX = 6000
)
/*
Check if the folder contains a valid plugin in either one of the forms
@ -44,7 +49,7 @@ func (m *Manager) GetPluginEntryPoint(folderpath string) (string, error) {
return filepath.Join(folderpath, "start.bat"), nil
}
return "", errors.New("No valid entry point found")
return "", errors.New("no valid entry point found")
}
// Log logs a message with an optional error
@ -54,10 +59,10 @@ func (m *Manager) Log(message string, err error) {
// getRandomPortNumber generates a random port number between 49152 and 65535
func getRandomPortNumber() int {
portNo := rand.Intn(65535-49152) + 49152
portNo := rand.Intn(RND_PORT_MAX-RND_PORT_MIN) + RND_PORT_MIN
//Check if the port is already in use
for netutils.CheckIfPortOccupied(portNo) {
portNo = rand.Intn(65535-49152) + 49152
portNo = rand.Intn(RND_PORT_MAX-RND_PORT_MIN) + RND_PORT_MIN
}
return portNo
}

View File

@ -12,12 +12,12 @@ import (
)
type PluginUiRouter struct {
PluginID string //The ID of the plugin
TargetFs *embed.FS //The embed.FS where the UI files are stored
TargetFsPrefix string //The prefix of the embed.FS where the UI files are stored, e.g. /web
HandlerPrefix string //The prefix of the handler used to route this router, e.g. /ui
terminateHandler func() //The handler to be called when the plugin is terminated
PluginID string //The ID of the plugin
TargetFs *embed.FS //The embed.FS where the UI files are stored
TargetFsPrefix string //The prefix of the embed.FS where the UI files are stored, e.g. /web
HandlerPrefix string //The prefix of the handler used to route this router, e.g. /ui
EnableDebug bool //Enable debug mode
terminateHandler func() //The handler to be called when the plugin is terminated
}
// NewPluginEmbedUIRouter creates a new PluginUiRouter with embed.FS
@ -58,11 +58,6 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl
//Return the middleware
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Check if the request is for an HTML file
if strings.HasSuffix(r.URL.Path, "/") {
// Redirect to the index.html
http.Redirect(w, r, r.URL.Path+"index.html", http.StatusFound)
return
}
if strings.HasSuffix(r.URL.Path, ".html") {
//Read the target file from embed.FS
targetFilePath := strings.TrimPrefix(r.URL.Path, "/")
@ -75,7 +70,9 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl
}
body := string(targetFileContent)
body = strings.ReplaceAll(body, "{{.csrfToken}}", csrfToken)
http.ServeContent(w, r, r.URL.Path, time.Now(), strings.NewReader(body))
w.Header().Set("Content-Type", "text/html")
w.WriteHeader(http.StatusOK)
w.Write([]byte(body))
return
}
@ -89,11 +86,18 @@ func (p *PluginUiRouter) populateCSRFToken(r *http.Request, fsHandler http.Handl
func (p *PluginUiRouter) Handler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
//Remove the plugin UI handler path prefix
if p.EnableDebug {
fmt.Print("Request URL:", r.URL.Path, " rewriting to ")
}
rewrittenURL := r.RequestURI
rewrittenURL = strings.TrimPrefix(rewrittenURL, p.HandlerPrefix)
rewrittenURL = strings.ReplaceAll(rewrittenURL, "//", "/")
r.URL, _ = url.Parse(rewrittenURL)
r.RequestURI = rewrittenURL
if p.EnableDebug {
fmt.Println(r.URL.Path)
}
//Serve the file from the embed.FS
subFS, err := fs.Sub(*p.TargetFs, strings.TrimPrefix(p.TargetFsPrefix, "/"))

View File

@ -63,6 +63,7 @@ func FSHandler(handler http.Handler) http.Handler {
//Extract the plugin ID from the request path
parts := strings.Split(r.URL.Path, "/")
if len(parts) > 2 {
//There is always a prefix slash, so [2] is the plugin ID
pluginID := parts[2]
pluginManager.HandlePluginUI(pluginID, w, r)
} else {

View File

@ -315,21 +315,17 @@ func startupSequence() {
Database: sysdb,
Logger: SystemWideLogger,
PluginGroupsConfig: CONF_PLUGIN_GROUPS,
/*PluginGroups: map[string][]string{
"debug": {
"org.aroz.zoraxy.debugger",
},
},*/
CSRFTokenGen: func(r *http.Request) string {
return csrf.Token(r)
},
})
err = pluginManager.LoadPluginsFromDisk()
if err != nil {
SystemWideLogger.PrintAndLog("Plugin Manager", "Failed to load plugins", err)
}
/*
err = pluginManager.LoadPluginsFromDisk()
if err != nil {
SystemWideLogger.PrintAndLog("Plugin Manager", "Failed to load plugins", err)
}
*/
/* Docker UX Optimizer */
if runtime.GOOS == "windows" && *runningInDocker {
SystemWideLogger.PrintAndLog("warning", "Invalid start flag combination: docker=true && runtime.GOOS == windows. Running in docker UX development mode.", nil)

View File

@ -86,12 +86,6 @@ func ReverseProxyUpstreamAdd(w http.ResponseWriter, r *http.Request) {
respTimeout = 0
}
//Idle timeout in seconds, set to 0 for default
idleTimeout, err := utils.PostInt(r, "idlet")
if err != nil {
idleTimeout = 0
}
//Max concurrent connection to dpcore instance, set to 0 for default
maxConn, err := utils.PostInt(r, "maxconn")
if err != nil {
@ -112,7 +106,6 @@ func ReverseProxyUpstreamAdd(w http.ResponseWriter, r *http.Request) {
Weight: 1,
MaxConn: maxConn,
RespTimeout: int64(respTimeout),
IdleTimeout: int64(idleTimeout),
}
//Add the new upstream to endpoint

View File

@ -152,15 +152,7 @@
</div>
</div>
<small>Maximum waiting time for server header response, set to 0 for default</small>
<br><br>
<p>Idle Timeout</p>
<div class="ui mini right labeled fluid input" style="margin-top: -0.6em;">
<input type="number" min="0" id="idleTimeout" value="0">
<div class="ui basic label">
Seconds
</div>
</div>
<small>Maximum allowed keep-alive time forcefully closes the connection, set to 0 for default</small>
<br>
</div>
</div>
@ -330,16 +322,7 @@
</div>
</div>
<small>Maximum waiting time before Zoraxy receive server header response, set to 0 for default</small>
<br>
<p style="margin-top: 0.6em;">Idle Timeout</p>
<div class="ui mini right labeled fluid input" style="margin-top: -0.6em;">
<input type="number" min="0" class="idleTimeout" value="${upstream.IdleTimeout/1000}">
<div class="ui basic label">
Seconds
</div>
</div>
<small>Maximum allowed keep-alive time before Zoraxy forcefully close the connection, set to 0 for default</small>
</div>
</div>
</div>
@ -398,7 +381,6 @@
let activateLoadbalancer = $("#activateNewUpstreamCheckbox")[0].checked;
let maxConn = $("#maxConn").val();
let respTimeout = $("#respTimeout").val();
let idleTimeout = $("#idleTimeout").val();
if (maxConn == "" || isNaN(maxConn)){
maxConn = 0;
@ -408,11 +390,6 @@
respTimeout = 0;
}
if (idleTimeout == "" || isNaN(idleTimeout)){
idleTimeout = 0;
}
if (origin == ""){
parent.msgbox("Upstream origin cannot be empty", false);
return;
@ -420,7 +397,6 @@
//Convert seconds to ms
respTimeout = parseInt(respTimeout) * 1000;
idleTimeout = parseInt(idleTimeout) * 1000;
$.cjax({
url: "/api/proxy/upstream/add",
@ -434,7 +410,6 @@
"active": activateLoadbalancer,
"maxconn": maxConn,
"respt": respTimeout,
"idlet": idleTimeout,
},
success: function(data){
if (data.error != undefined){
@ -445,7 +420,6 @@
$("#originURL").val("");
$("#maxConn").val("0");
$("#respTimeout").val("0");
$("#idleTimeout").val("0");
}
}
})
@ -465,7 +439,6 @@
//Advance options
let maxConn = $(upstream).find(".maxConn").val();
let respTimeout = $(upstream).find(".respTimeout").val();
let idleTimeout = $(upstream).find(".idleTimeout").val();
if (maxConn == "" || isNaN(maxConn)){
maxConn = 0;
@ -475,12 +448,7 @@
respTimeout = 0;
}
if (idleTimeout == "" || isNaN(idleTimeout)){
idleTimeout = 0;
}
respTimeout = parseInt(respTimeout) * 1000;
idleTimeout = parseInt(idleTimeout) * 1000;
//Update the original setting with new one just applied
originalSettings.OriginIpOrDomain = $(upstream).find(".newOrigin").val();
@ -489,7 +457,6 @@
originalSettings.SkipWebSocketOriginCheck = skipWebSocketOriginCheck;
originalSettings.MaxConn = parseInt(maxConn);
originalSettings.RespTimeout = respTimeout;
originalSettings.IdleTimeout = idleTimeout;
//console.log(originalSettings);
return originalSettings;