Added dynamic capture capabilities to plugin API

- Added dynamic capture ingress and sniff endpoint
- Removed static capture example (API update)
This commit is contained in:
Toby Chui
2025-03-02 12:26:44 +08:00
parent 39d6d16c2a
commit 23d4df1ed7
25 changed files with 961 additions and 644 deletions

View File

@ -0,0 +1,111 @@
package plugins
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"github.com/google/uuid"
"imuslab.com/zoraxy/mod/dynamicproxy/dpcore"
"imuslab.com/zoraxy/mod/plugins/zoraxy_plugin"
)
// StartDynamicForwardRouter create and start a dynamic forward router for
// this plugin
func (p *Plugin) StartDynamicForwardRouter() error {
// Create a new dpcore object to forward the traffic to the plugin
targetURL, err := url.Parse("http://127.0.0.1:" + strconv.Itoa(p.AssignedPort) + p.Spec.DynamicCaptureIngress)
if err != nil {
fmt.Println("Failed to parse target URL: "+targetURL.String(), err)
return err
}
thisRouter := dpcore.NewDynamicProxyCore(targetURL, "", &dpcore.DpcoreOptions{})
p.dynamicRouteProxy = thisRouter
return nil
}
// StopDynamicForwardRouter stops the dynamic forward router for this plugin
func (p *Plugin) StopDynamicForwardRouter() {
if p.dynamicRouteProxy != nil {
p.dynamicRouteProxy = nil
}
}
// AcceptDynamicRoute returns whether this plugin accepts dynamic route
func (p *Plugin) AcceptDynamicRoute() bool {
return p.Spec.DynamicCaptureSniff != "" && p.Spec.DynamicCaptureIngress != ""
}
func (p *Plugin) HandleDynamicRoute(w http.ResponseWriter, r *http.Request) bool {
//Make sure p.Spec.DynamicCaptureSniff and p.Spec.DynamicCaptureIngress are not empty and start with /
if !p.AcceptDynamicRoute() {
return false
}
//Make sure the paths start with / and do not end with /
if !strings.HasPrefix(p.Spec.DynamicCaptureSniff, "/") {
p.Spec.DynamicCaptureSniff = "/" + p.Spec.DynamicCaptureSniff
}
p.Spec.DynamicCaptureSniff = strings.TrimSuffix(p.Spec.DynamicCaptureSniff, "/")
if !strings.HasPrefix(p.Spec.DynamicCaptureIngress, "/") {
p.Spec.DynamicCaptureIngress = "/" + p.Spec.DynamicCaptureIngress
}
p.Spec.DynamicCaptureIngress = strings.TrimSuffix(p.Spec.DynamicCaptureIngress, "/")
//Send the request to the sniff endpoint
sniffURL, err := url.Parse("http://127.0.0.1:" + strconv.Itoa(p.AssignedPort) + p.Spec.DynamicCaptureSniff + "/")
if err != nil {
//Error when parsing the sniff URL, let the next plugin handle the request
return false
}
// Create an instance of CustomRequest with the original request's data
forwardReq := zoraxy_plugin.EncodeForwardRequestPayload(r)
// Encode the custom request object into JSON
jsonData, err := json.Marshal(forwardReq)
if err != nil {
// Error when encoding the request, let the next plugin handle the request
return false
}
//Generate a unique request ID
uniqueRequestID := uuid.New().String()
req, err := http.NewRequest("POST", sniffURL.String(), bytes.NewBuffer(jsonData))
if err != nil {
// Error when creating the request, let the next plugin handle the request
return false
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Zoraxy-RequestID", uniqueRequestID)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
// Error when sending the request, let the next plugin handle the request
return false
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
// Sniff endpoint did not return OK, let the next plugin handle the request
return false
}
p.dynamicRouteProxy.ServeHTTP(w, r, &dpcore.ResponseRewriteRuleSet{
UseTLS: false,
OriginalHost: r.Host,
ProxyDomain: "127.0.0.1:" + strconv.Itoa(p.AssignedPort),
NoCache: true,
PathPrefix: p.Spec.DynamicCaptureIngress,
UpstreamHeaders: [][]string{
{"X-Zoraxy-RequestID", uniqueRequestID},
},
})
return true
}

View File

@ -46,6 +46,7 @@ func (m *Manager) StartPlugin(pluginID string) error {
}
js, _ := json.Marshal(pluginConfiguration)
//Start the plugin with given configuration
m.Log("Starting plugin "+thisPlugin.Spec.Name+" at :"+strconv.Itoa(pluginConfiguration.Port), nil)
cmd := exec.Command(absolutePath, "-configure="+string(js))
cmd.Dir = filepath.Dir(absolutePath)
@ -95,6 +96,12 @@ func (m *Manager) StartPlugin(pluginID string) error {
//Create a new static forwarder router for each of the static capture paths
plugin.(*Plugin).StartAllStaticPathRouters()
//If the plugin contains dynamic capture, create a dynamic capture handler
if thisPlugin.AcceptDynamicRoute() {
plugin.(*Plugin).StartDynamicForwardRouter()
}
return nil
}
@ -202,6 +209,7 @@ func (m *Manager) StopPlugin(pluginID string) error {
thisPlugin.uiProxy = nil
plugin.(*Plugin).Enabled = false
plugin.(*Plugin).StopAllStaticPathRouters()
plugin.(*Plugin).StopDynamicForwardRouter()
return nil
}

View File

@ -39,7 +39,8 @@ func NewPluginManager(options *ManagerOptions) *Manager {
return &Manager{
LoadedPlugins: sync.Map{},
TagPluginMap: sync.Map{},
tagPluginMap: sync.Map{},
tagPluginList: make(map[string][]*Plugin),
Options: options,
}
}
@ -76,7 +77,7 @@ func (m *Manager) LoadPluginsFromDisk() error {
}
//Generate the static forwarder radix tree
m.UpdateTagsToTree()
m.UpdateTagsToPluginMaps()
return nil
}
@ -98,7 +99,7 @@ func (m *Manager) EnablePlugin(pluginID string) error {
}
m.Options.Database.Write("plugins", pluginID, true)
//Generate the static forwarder radix tree
m.UpdateTagsToTree()
m.UpdateTagsToPluginMaps()
return nil
}
@ -110,7 +111,7 @@ func (m *Manager) DisablePlugin(pluginID string) error {
return err
}
//Generate the static forwarder radix tree
m.UpdateTagsToTree()
m.UpdateTagsToPluginMaps()
return nil
}
@ -184,11 +185,17 @@ func (m *Plugin) StartAllStaticPathRouters() {
}
}
// StopAllStaticPathRouters stops all static path routers
func (m *Plugin) StopAllStaticPathRouters() {
for path := range m.staticRouteProxy {
m.staticRouteProxy[path] = nil
delete(m.staticRouteProxy, path)
}
m.staticRouteProxy = make(map[string]*dpcore.ReverseProxy)
}
func (p *Plugin) HandleRoute(w http.ResponseWriter, r *http.Request, longestPrefix string) {
// HandleStaticRoute handles the request to the plugin via static path captures (static forwarder)
func (p *Plugin) HandleStaticRoute(w http.ResponseWriter, r *http.Request, longestPrefix string) {
longestPrefix = strings.TrimSuffix(longestPrefix, "/")
targetRouter := p.staticRouteProxy[longestPrefix]
if targetRouter == nil {

View File

@ -15,12 +15,25 @@ import (
request path registered when the plugin started
*/
func (m *Manager) UpdateTagsToTree() {
func (m *Manager) UpdateTagsToPluginMaps() {
//build the tag to plugin pointer sync.Map
m.TagPluginMap = sync.Map{}
m.tagPluginMap = sync.Map{}
for tag, pluginIds := range m.Options.PluginGroups {
tree := m.GetForwarderRadixTreeFromPlugins(pluginIds)
m.TagPluginMap.Store(tag, tree)
m.tagPluginMap.Store(tag, tree)
}
//build the plugin list for each tag
m.tagPluginList = make(map[string][]*Plugin)
for tag, pluginIds := range m.Options.PluginGroups {
for _, pluginId := range pluginIds {
plugin, err := m.GetPluginByID(pluginId)
if err != nil {
m.Log("Failed to get plugin by ID: "+pluginId, err)
continue
}
m.tagPluginList[tag] = append(m.tagPluginList[tag], plugin)
}
}
}
@ -61,8 +74,6 @@ func (m *Manager) GetForwarderRadixTreeFromPlugins(pluginIds []string) *radix.Tr
} else {
//The path has already been assigned to another plugin
pluginList, _ := r.Get(captureRule.CapturePath)
//pluginList = append(pluginList.([]*Plugin), plugin)
//r.Insert(captureRule.CapturePath, pluginList)
//Warn the path is already assigned to another plugin
if plugin.Spec.ID == pluginList.([]*Plugin)[0].Spec.ID {

View File

@ -1,47 +0,0 @@
package plugins
import (
"net/http"
"sync"
"github.com/armon/go-radix"
)
// HandleRoute handles the request to the plugin
// return true if the request is handled by the plugin
func (m *Manager) HandleRoute(w http.ResponseWriter, r *http.Request, tags []string) bool {
if len(tags) == 0 {
return false
}
//For each tag, check if the request path matches the static capture path
var wg sync.WaitGroup //Wait group for the goroutines
var handler []*Plugin //The handler for the request, can be multiple plugins
var longestPrefixAcrossAlltags string = "" //The longest prefix across all tags
for _, tag := range tags {
wg.Add(1)
go func(thisTag string) {
defer wg.Done()
//Get the radix tree for the tag
tree, ok := m.TagPluginMap.Load(thisTag)
if !ok {
return
}
//Check if the request path matches the static capture path
longestPrefix, pluginList, ok := tree.(*radix.Tree).LongestPrefix(r.URL.Path)
if ok {
if longestPrefix > longestPrefixAcrossAlltags {
longestPrefixAcrossAlltags = longestPrefix
handler = pluginList.([]*Plugin)
}
}
}(tag)
}
wg.Wait()
if len(handler) > 0 {
//Handle the request
handler[0].HandleRoute(w, r, longestPrefixAcrossAlltags)
return true
}
return false
}

View File

@ -0,0 +1,73 @@
package plugins
import (
"net/http"
"sync"
"github.com/armon/go-radix"
)
// HandleRoute handles the request to the plugin
// return true if the request is handled by the plugin
func (m *Manager) HandleRoute(w http.ResponseWriter, r *http.Request, tags []string) bool {
if len(tags) == 0 {
return false
}
//For each tag, check if the request path matches the static capture path
wg := sync.WaitGroup{} //Wait group for the goroutines
mutex := sync.Mutex{} //Mutex for the dynamic route handler
var staticRoutehandlers []*Plugin //The handler for the request, can be multiple plugins
var longestPrefixAcrossAlltags string = "" //The longest prefix across all tags
var dynamicRouteHandlers []*Plugin //The handler for the dynamic routes
for _, tag := range tags {
wg.Add(1)
go func(thisTag string) {
defer wg.Done()
//Get the radix tree for the tag
tree, ok := m.tagPluginMap.Load(thisTag)
if !ok {
return
}
//Check if the request path matches the static capture path
longestPrefix, pluginList, ok := tree.(*radix.Tree).LongestPrefix(r.URL.Path)
if ok {
if longestPrefix > longestPrefixAcrossAlltags {
longestPrefixAcrossAlltags = longestPrefix
staticRoutehandlers = pluginList.([]*Plugin)
}
}
}(tag)
//Check if the plugin enabled dynamic route
wg.Add(1)
go func(thisTag string) {
defer wg.Done()
for _, plugin := range m.tagPluginList[thisTag] {
if plugin.Enabled && plugin.Spec.DynamicCaptureSniff != "" && plugin.Spec.DynamicCaptureIngress != "" {
mutex.Lock()
dynamicRouteHandlers = append(dynamicRouteHandlers, plugin)
mutex.Unlock()
}
}
}(tag)
}
wg.Wait()
//Handle the static route if found
if len(staticRoutehandlers) > 0 {
//Handle the request
staticRoutehandlers[0].HandleStaticRoute(w, r, longestPrefixAcrossAlltags)
return true
}
//No static route handler found, check for dynamic route handler
for _, plugin := range dynamicRouteHandlers {
if plugin.HandleDynamicRoute(w, r) {
return true
}
}
return false
}

View File

@ -21,23 +21,27 @@ type Plugin struct {
Enabled bool //Whether the plugin is enabled
//Runtime
AssignedPort int //The assigned port for the plugin
uiProxy *dpcore.ReverseProxy //The reverse proxy for the plugin UI
staticRouteProxy map[string]*dpcore.ReverseProxy //Storing longest prefix => dpcore map for static route
process *exec.Cmd //The process of the plugin
AssignedPort int //The assigned port for the plugin
uiProxy *dpcore.ReverseProxy //The reverse proxy for the plugin UI
staticRouteProxy map[string]*dpcore.ReverseProxy //Storing longest prefix => dpcore map for static route
dynamicRouteProxy *dpcore.ReverseProxy //The reverse proxy for the dynamic route
process *exec.Cmd //The process of the plugin
}
type ManagerOptions struct {
PluginDir string //The directory where the plugins are stored
PluginGroups map[string][]string //The plugin groups,key is the tag name and the value is an array of plugin IDs
/* Runtime */
SystemConst *zoraxyPlugin.RuntimeConstantValue
Database *database.Database
Logger *logger.Logger
CSRFTokenGen func(*http.Request) string //The CSRF token generator function
CSRFTokenGen func(*http.Request) string `json:"-"` //The CSRF token generator function
Database *database.Database `json:"-"`
Logger *logger.Logger `json:"-"`
}
type Manager struct {
LoadedPlugins sync.Map //Storing *Plugin
TagPluginMap sync.Map //Storing *radix.Tree for each plugin tag
LoadedPlugins sync.Map //Storing *Plugin
tagPluginMap sync.Map //Storing *radix.Tree for each plugin tag
tagPluginList map[string][]*Plugin //Storing the plugin list for each tag, only concurrent READ is allowed
Options *ManagerOptions
}

View File

@ -0,0 +1,162 @@
package zoraxy_plugin
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
)
/*
Dynamic Path Handler
*/
type SniffResult int
const (
SniffResultAccpet SniffResult = iota // Forward the request to this plugin dynamic capture ingress
SniffResultSkip // Skip this plugin and let the next plugin handle the request
)
type SniffHandler func(*DynamicSniffForwardRequest) SniffResult
/*
RegisterDynamicSniffHandler registers a dynamic sniff handler for a path
You can decide to accept or skip the request based on the request header and paths
*/
func (p *PathRouter) RegisterDynamicSniffHandler(sniff_ingress string, mux *http.ServeMux, handler SniffHandler) {
if !strings.HasSuffix(sniff_ingress, "/") {
sniff_ingress = sniff_ingress + "/"
}
mux.Handle(sniff_ingress, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if p.enableDebugPrint {
fmt.Println("Request captured by dynamic sniff path: " + r.RequestURI)
}
// Decode the request payload
jsonBytes, err := io.ReadAll(r.Body)
if err != nil {
if p.enableDebugPrint {
fmt.Println("Error reading request body:", err)
}
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
payload, err := DecodeForwardRequestPayload(jsonBytes)
if err != nil {
if p.enableDebugPrint {
fmt.Println("Error decoding request payload:", err)
fmt.Print("Payload: ")
fmt.Println(string(jsonBytes))
}
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
// Get the forwarded request UUID
forwardUUID := r.Header.Get("X-Zoraxy-RequestID")
payload.requestUUID = forwardUUID
payload.rawRequest = r
sniffResult := handler(&payload)
if sniffResult == SniffResultAccpet {
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
} else {
w.WriteHeader(http.StatusNotImplemented)
w.Write([]byte("SKIP"))
}
}))
}
// RegisterDynamicCaptureHandle register the dynamic capture ingress path with a handler
func (p *PathRouter) RegisterDynamicCaptureHandle(capture_ingress string, mux *http.ServeMux, handlefunc func(http.ResponseWriter, *http.Request)) {
if !strings.HasSuffix(capture_ingress, "/") {
capture_ingress = capture_ingress + "/"
}
mux.Handle(capture_ingress, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if p.enableDebugPrint {
fmt.Println("Request captured by dynamic capture path: " + r.RequestURI)
}
rewrittenURL := r.RequestURI
rewrittenURL = strings.TrimPrefix(rewrittenURL, capture_ingress)
rewrittenURL = strings.ReplaceAll(rewrittenURL, "//", "/")
if rewrittenURL == "" {
rewrittenURL = "/"
}
if !strings.HasPrefix(rewrittenURL, "/") {
rewrittenURL = "/" + rewrittenURL
}
r.RequestURI = rewrittenURL
handlefunc(w, r)
}))
}
/*
Sniffing and forwarding
The following functions are here to help with
sniffing and forwarding requests to the dynamic
router.
*/
// A custom request object to be used in the dynamic sniffing
type DynamicSniffForwardRequest struct {
Method string `json:"method"`
Hostname string `json:"hostname"`
URL string `json:"url"`
Header map[string][]string `json:"header"`
RemoteAddr string `json:"remote_addr"`
Host string `json:"host"`
RequestURI string `json:"request_uri"`
Proto string `json:"proto"`
ProtoMajor int `json:"proto_major"`
ProtoMinor int `json:"proto_minor"`
/* Internal use */
rawRequest *http.Request `json:"-"`
requestUUID string `json:"-"`
}
// GetForwardRequestPayload returns a DynamicSniffForwardRequest object from an http.Request object
func EncodeForwardRequestPayload(r *http.Request) DynamicSniffForwardRequest {
return DynamicSniffForwardRequest{
Method: r.Method,
Hostname: r.Host,
URL: r.URL.String(),
Header: r.Header,
RemoteAddr: r.RemoteAddr,
Host: r.Host,
RequestURI: r.RequestURI,
Proto: r.Proto,
ProtoMajor: r.ProtoMajor,
ProtoMinor: r.ProtoMinor,
rawRequest: r,
}
}
// DecodeForwardRequestPayload decodes JSON bytes into a DynamicSniffForwardRequest object
func DecodeForwardRequestPayload(jsonBytes []byte) (DynamicSniffForwardRequest, error) {
var payload DynamicSniffForwardRequest
err := json.Unmarshal(jsonBytes, &payload)
if err != nil {
return DynamicSniffForwardRequest{}, err
}
return payload, nil
}
// GetRequest returns the original http.Request object, for debugging purposes
func (dsfr *DynamicSniffForwardRequest) GetRequest() *http.Request {
return dsfr.rawRequest
}
// GetRequestUUID returns the request UUID
// if this UUID is empty string, that might indicate the request
// is not coming from the dynamic router
func (dsfr *DynamicSniffForwardRequest) GetRequestUUID() string {
return dsfr.requestUUID
}

View File

@ -43,16 +43,18 @@ func (p *PathRouter) SetDebugPrintMode(enable bool) {
p.enableDebugPrint = enable
}
func (p *PathRouter) RegisterHandle(capture_ingress string, mux *http.ServeMux) {
// StartStaticCapture starts the static capture ingress
func (p *PathRouter) RegisterStaticCaptureHandle(capture_ingress string, mux *http.ServeMux) {
if !strings.HasSuffix(capture_ingress, "/") {
capture_ingress = capture_ingress + "/"
}
mux.Handle(capture_ingress, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
p.ServeHTTP(w, r)
p.staticCaptureServeHTTP(w, r)
}))
}
func (p *PathRouter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// staticCaptureServeHTTP serves the static capture path using user defined handler
func (p *PathRouter) staticCaptureServeHTTP(w http.ResponseWriter, r *http.Request) {
capturePath := r.Header.Get("X-Zoraxy-Capture")
if capturePath != "" {
if p.enableDebugPrint {

View File

@ -931,7 +931,11 @@ func RemoveProxyBasicAuthExceptionPaths(w http.ResponseWriter, r *http.Request)
// Report the current status of the reverse proxy server
func ReverseProxyStatus(w http.ResponseWriter, r *http.Request) {
js, _ := json.Marshal(dynamicProxyRouter)
js, err := json.Marshal(dynamicProxyRouter)
if err != nil {
utils.SendErrorResponse(w, "Unable to marshal status data")
return
}
utils.SendJSONResponse(w, string(js))
}