Add EnableLogging to Stream Proxy for log control

- Add `EnableLogging` to control TCP/UDP Connection logs to reduce log latency.
- Add `Enable Logging` Option in Stream Proxy rule.
- Update Stream Proxy UI.
This commit is contained in:
jemmy1794
2025-07-03 08:54:11 +08:00
parent 2d611a559a
commit 6b3b89f7bf
6 changed files with 70 additions and 24 deletions

View File

@@ -31,16 +31,16 @@ func isValidPort(port string) bool {
return true
}
func connCopy(conn1 net.Conn, conn2 net.Conn, wg *sync.WaitGroup, accumulator *atomic.Int64) {
func (c *ProxyRelayInstance) connCopy(conn1 net.Conn, conn2 net.Conn, wg *sync.WaitGroup, accumulator *atomic.Int64) {
n, err := io.Copy(conn1, conn2)
if err != nil {
return
}
accumulator.Add(n) //Add to accumulator
conn1.Close()
log.Println("[←]", "close the connect at local:["+conn1.LocalAddr().String()+"] and remote:["+conn1.RemoteAddr().String()+"]")
c.LogMsg("[←] close the connect at local:["+conn1.LocalAddr().String()+"] and remote:["+conn1.RemoteAddr().String()+"]", nil)
//conn2.Close()
//log.Println("[←]", "close the connect at local:["+conn2.LocalAddr().String()+"] and remote:["+conn2.RemoteAddr().String()+"]")
//c.LogMsg("[←] close the connect at local:["+conn2.LocalAddr().String()+"] and remote:["+conn2.RemoteAddr().String()+"]", nil)
wg.Done()
}
@@ -61,14 +61,16 @@ func writeProxyProtocolHeaderV1(dst net.Conn, src net.Conn) error {
return err
}
func forward(conn1 net.Conn, conn2 net.Conn, aTob *atomic.Int64, bToa *atomic.Int64) {
log.Printf("[+] start transmit. [%s],[%s] <-> [%s],[%s] \n", conn1.LocalAddr().String(), conn1.RemoteAddr().String(), conn2.LocalAddr().String(), conn2.RemoteAddr().String())
func (c *ProxyRelayInstance) forward(conn1 net.Conn, conn2 net.Conn, aTob *atomic.Int64, bToa *atomic.Int64) {
msg := fmt.Sprintf("[+] start transmit. [%s],[%s] <-> [%s],[%s]",
conn1.LocalAddr().String(), conn1.RemoteAddr().String(),
conn2.LocalAddr().String(), conn2.RemoteAddr().String())
c.LogMsg(msg, nil)
var wg sync.WaitGroup
// wait tow goroutines
wg.Add(2)
go connCopy(conn1, conn2, &wg, aTob)
go connCopy(conn2, conn1, &wg, bToa)
//blocking when the wg is locked
go c.connCopy(conn1, conn2, &wg, aTob)
go c.connCopy(conn2, conn1, &wg, bToa)
wg.Wait()
}
@@ -78,18 +80,18 @@ func (c *ProxyRelayInstance) accept(listener net.Listener) (net.Conn, error) {
return nil, err
}
//Check if connection in blacklist or whitelist
// Check if connection in blacklist or whitelist
if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
if !c.parent.Options.AccessControlHandler(conn) {
time.Sleep(300 * time.Millisecond)
conn.Close()
log.Println("[x]", "Connection from "+addr.IP.String()+" rejected by access control policy")
c.LogMsg("[x] Connection from "+addr.IP.String()+" rejected by access control policy", nil)
return nil, errors.New("Connection from " + addr.IP.String() + " rejected by access control policy")
}
}
log.Println("[√]", "accept a new client. remote address:["+conn.RemoteAddr().String()+"], local address:["+conn.LocalAddr().String()+"]")
return conn, err
c.LogMsg("[√] accept a new client. remote address:["+conn.RemoteAddr().String()+"], local address:["+conn.LocalAddr().String()+"]", nil)
return conn, nil
}
func startListener(address string) (net.Listener, error) {
@@ -130,7 +132,7 @@ func (c *ProxyRelayInstance) Port2host(allowPort string, targetAddress string, s
//Start stop handler
go func() {
<-stopChan
log.Println("[x]", "Received stop signal. Exiting Port to Host forwarder")
c.LogMsg("[x] Received stop signal. Exiting Port to Host forwarder", nil)
server.Close()
}()
@@ -147,32 +149,32 @@ func (c *ProxyRelayInstance) Port2host(allowPort string, targetAddress string, s
}
go func(targetAddress string) {
log.Println("[+]", "start connect host:["+targetAddress+"]")
c.LogMsg("[+] start connect host:["+targetAddress+"]", nil)
target, err := net.Dial("tcp", targetAddress)
if err != nil {
// temporarily unavailable, don't use fatal.
log.Println("[x]", "connect target address ["+targetAddress+"] faild. retry in ", c.Timeout, "seconds. ")
c.LogMsg("[x] connect target address ["+targetAddress+"] failed. retry in "+strconv.Itoa(c.Timeout)+" seconds.", nil)
conn.Close()
log.Println("[←]", "close the connect at local:["+conn.LocalAddr().String()+"] and remote:["+conn.RemoteAddr().String()+"]")
c.LogMsg("[←] close the connect at local:["+conn.LocalAddr().String()+"] and remote:["+conn.RemoteAddr().String()+"]", nil)
time.Sleep(time.Duration(c.Timeout) * time.Second)
return
}
log.Println("[→]", "connect target address ["+targetAddress+"] success.")
c.LogMsg("[→] connect target address ["+targetAddress+"] success.", nil)
if c.UseProxyProtocol {
log.Println("[+]", "write proxy protocol header to target address ["+targetAddress+"]")
c.LogMsg("[+] write proxy protocol header to target address ["+targetAddress+"]", nil)
err = writeProxyProtocolHeaderV1(target, conn)
if err != nil {
log.Println("[x]", "Write proxy protocol header faild: ", err)
c.LogMsg("[x] Write proxy protocol header failed: "+err.Error(), nil)
target.Close()
conn.Close()
log.Println("[←]", "close the connect at local:["+conn.LocalAddr().String()+"] and remote:["+conn.RemoteAddr().String()+"]")
c.LogMsg("[←] close the connect at local:["+conn.LocalAddr().String()+"] and remote:["+conn.RemoteAddr().String()+"]", nil)
time.Sleep(time.Duration(c.Timeout) * time.Second)
return
}
}
forward(target, conn, &c.aTobAccumulatedByteTransfer, &c.bToaAccumulatedByteTransfer)
c.forward(target, conn, &c.aTobAccumulatedByteTransfer, &c.bToaAccumulatedByteTransfer)
}(targetAddress)
}
}