Updated dpcore

- Added immediate flush for buffer (i.e. no chunked encoding)
- Added support for stream mode proxy sniffing logic
This commit is contained in:
Toby Chui 2024-04-01 21:36:11 +08:00
parent 8ce6471be5
commit 05daeded37
5 changed files with 188 additions and 66 deletions

View File

@ -1,6 +1,7 @@
package dpcore
import (
"context"
"errors"
"io"
"log"
@ -8,12 +9,9 @@ import (
"net/http"
"net/url"
"strings"
"sync"
"time"
)
var onExitFlushLoop func()
// ReverseProxy is an HTTP Handler that takes an incoming request and
// sends it to another server, proxying the response back to the
// client, support http, also support https tunnel using http.hijacker
@ -68,7 +66,12 @@ type requestCanceler interface {
CancelRequest(req *http.Request)
}
func NewDynamicProxyCore(target *url.URL, prepender string, ignoreTLSVerification bool) *ReverseProxy {
type DpcoreOptions struct {
IgnoreTLSVerification bool
FlushInterval time.Duration
}
func NewDynamicProxyCore(target *url.URL, prepender string, dpcOptions *DpcoreOptions) *ReverseProxy {
targetQuery := target.RawQuery
director := func(req *http.Request) {
req.URL.Scheme = target.Scheme
@ -80,10 +83,6 @@ func NewDynamicProxyCore(target *url.URL, prepender string, ignoreTLSVerificatio
req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
}
if _, ok := req.Header["User-Agent"]; !ok {
req.Header.Set("User-Agent", "")
}
}
//Hack the default transporter to handle more connections
@ -95,16 +94,17 @@ func NewDynamicProxyCore(target *url.URL, prepender string, ignoreTLSVerificatio
thisTransporter.(*http.Transport).MaxConnsPerHost = optimalConcurrentConnection * 2
thisTransporter.(*http.Transport).DisableCompression = true
if ignoreTLSVerification {
if dpcOptions.IgnoreTLSVerification {
//Ignore TLS certificate validation error
thisTransporter.(*http.Transport).TLSClientConfig.InsecureSkipVerify = true
}
return &ReverseProxy{
Director: director,
Prepender: prepender,
Verbal: false,
Transport: thisTransporter,
Director: director,
Prepender: prepender,
FlushInterval: dpcOptions.FlushInterval,
Verbal: false,
Transport: thisTransporter,
}
}
@ -178,64 +178,66 @@ var hopHeaders = []string{
//"Upgrade",
}
func (p *ReverseProxy) copyResponse(dst io.Writer, src io.Reader) {
if p.FlushInterval != 0 {
if wf, ok := dst.(writeFlusher); ok {
mlw := &maxLatencyWriter{
dst: wf,
latency: p.FlushInterval,
done: make(chan bool),
}
go mlw.flushLoop()
defer mlw.stop()
dst = mlw
// Copy response from src to dst with given flush interval, reference from httputil.ReverseProxy
func (p *ReverseProxy) copyResponse(dst http.ResponseWriter, src io.Reader, flushInterval time.Duration) error {
var w io.Writer = dst
if flushInterval != 0 {
mlw := &maxLatencyWriter{
dst: dst,
flush: http.NewResponseController(dst).Flush,
latency: flushInterval,
}
defer mlw.stop()
// set up initial timer so headers get flushed even if body writes are delayed
mlw.flushPending = true
mlw.t = time.AfterFunc(flushInterval, mlw.delayedFlush)
w = mlw
}
io.Copy(dst, src)
var buf []byte
_, err := p.copyBuffer(w, src, buf)
return err
}
type writeFlusher interface {
io.Writer
http.Flusher
}
// Copy with given buffer size. Default to 64k
func (p *ReverseProxy) copyBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, error) {
if len(buf) == 0 {
buf = make([]byte, 64*1024)
}
type maxLatencyWriter struct {
dst writeFlusher
latency time.Duration
mu sync.Mutex
done chan bool
}
func (m *maxLatencyWriter) Write(b []byte) (int, error) {
m.mu.Lock()
defer m.mu.Unlock()
return m.dst.Write(b)
}
func (m *maxLatencyWriter) flushLoop() {
t := time.NewTicker(m.latency)
defer t.Stop()
var written int64
for {
select {
case <-m.done:
if onExitFlushLoop != nil {
onExitFlushLoop()
nr, rerr := src.Read(buf)
if rerr != nil && rerr != io.EOF && rerr != context.Canceled {
p.logf("dpcore read error during body copy: %v", rerr)
}
if nr > 0 {
nw, werr := dst.Write(buf[:nr])
if nw > 0 {
written += int64(nw)
}
return
case <-t.C:
m.mu.Lock()
m.dst.Flush()
m.mu.Unlock()
if werr != nil {
return written, werr
}
if nr != nw {
return written, io.ErrShortWrite
}
}
if rerr != nil {
if rerr == io.EOF {
rerr = nil
}
return written, rerr
}
}
}
func (m *maxLatencyWriter) stop() {
m.done <- true
}
func (p *ReverseProxy) logf(format string, args ...interface{}) {
if p.ErrorLog != nil {
p.ErrorLog.Printf(format, args...)
@ -373,6 +375,12 @@ func (p *ReverseProxy) ProxyHTTP(rw http.ResponseWriter, req *http.Request, rrr
// Remove hop-by-hop headers listed in the "Connection" header of the response, Remove hop-by-hop headers.
removeHeaders(res.Header, rrr.NoCache)
//Remove the User-Agent header if exists
if _, ok := res.Header["User-Agent"]; ok {
//Server to client request should not contains a User-Agent header
res.Header.Del("User-Agent")
}
if p.ModifyResponse != nil {
if err := p.ModifyResponse(res); err != nil {
if p.Verbal {
@ -438,7 +446,10 @@ func (p *ReverseProxy) ProxyHTTP(rw http.ResponseWriter, req *http.Request, rrr
}
}
p.copyResponse(rw, res.Body)
//Get flush interval in real time and start copying the request
flushInterval := p.getFlushInterval(req, res)
p.copyResponse(rw, res.Body, flushInterval)
// close now, instead of defer, to populate res.Trailer
res.Body.Close()
copyHeader(rw.Header(), res.Trailer)

View File

@ -0,0 +1,38 @@
package dpcore
import (
"mime"
"net/http"
"time"
)
// Auto sniff of flush interval from header
func (p *ReverseProxy) getFlushInterval(req *http.Request, res *http.Response) time.Duration {
contentType := req.Header.Get("Content-Type")
if actualContentType, _, _ := mime.ParseMediaType(contentType); actualContentType == "text/event-stream" {
return -1
}
if req.ContentLength == -1 || p.isBidirectionalStream(req, res) {
return -1
}
//Cannot sniff anything. Use default value
return p.FlushInterval
}
// Check for bidirectional stream, copy from Caddy :D
func (p *ReverseProxy) isBidirectionalStream(req *http.Request, res *http.Response) bool {
// We have to check the encoding here; only flush headers with identity encoding.
// Non-identity encoding might combine with "encode" directive, and in that case,
// if body size larger than enc.MinLength, upper level encode handle might have
// Content-Encoding header to write.
// (see https://github.com/caddyserver/caddy/issues/3606 for use case)
ae := req.Header.Get("Accept-Encoding")
return req.ProtoMajor == 2 &&
res.ProtoMajor == 2 &&
res.ContentLength == -1 &&
(ae == "identity" || ae == "")
}

View File

@ -0,0 +1,73 @@
package dpcore
/*
Max Latency Writer
This script implements a io writer with periodic flushing base on a ticker
Mostly based on httputil.ReverseProxy
*/
import (
"io"
"sync"
"time"
)
type maxLatencyWriter struct {
dst io.Writer
flush func() error
latency time.Duration // non-zero; negative means to flush immediately
mu sync.Mutex // protects t, flushPending, and dst.Flush
t *time.Timer
flushPending bool
}
func (m *maxLatencyWriter) Write(p []byte) (n int, err error) {
m.mu.Lock()
defer m.mu.Unlock()
n, err = m.dst.Write(p)
if m.latency < 0 {
//Flush immediately
m.flush()
return
}
if m.flushPending {
//Flush in next tick cycle
return
}
if m.t == nil {
m.t = time.AfterFunc(m.latency, m.delayedFlush)
} else {
m.t.Reset(m.latency)
}
m.flushPending = true
return
}
func (m *maxLatencyWriter) delayedFlush() {
m.mu.Lock()
defer m.mu.Unlock()
if !m.flushPending {
// if stop was called but AfterFunc already started this goroutine
return
}
m.flush()
m.flushPending = false
}
func (m *maxLatencyWriter) stop() {
m.mu.Lock()
defer m.mu.Unlock()
m.flushPending = false
if m.t != nil {
m.t.Stop()
}
}

View File

@ -42,7 +42,9 @@ func (router *Router) PrepareProxyRoute(endpoint *ProxyEndpoint) (*ProxyEndpoint
}
//Create the proxy routing handler
proxy := dpcore.NewDynamicProxyCore(path, "", endpoint.SkipCertValidations)
proxy := dpcore.NewDynamicProxyCore(path, "", &dpcore.DpcoreOptions{
IgnoreTLSVerification: endpoint.SkipCertValidations,
})
endpoint.proxy = proxy
endpoint.parent = router
@ -69,7 +71,9 @@ func (router *Router) PrepareProxyRoute(endpoint *ProxyEndpoint) (*ProxyEndpoint
return nil, err
}
proxy := dpcore.NewDynamicProxyCore(path, vdir.MatchingPath, vdir.SkipCertValidations)
proxy := dpcore.NewDynamicProxyCore(path, vdir.MatchingPath, &dpcore.DpcoreOptions{
IgnoreTLSVerification: vdir.SkipCertValidations,
})
vdir.proxy = proxy
vdir.parent = endpoint
}

View File

@ -85,10 +85,6 @@ func NewReverseProxy(target *url.URL) *ReverseProxy {
} else {
req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
}
if _, ok := req.Header["User-Agent"]; !ok {
req.Header.Set("User-Agent", "")
}
}
return &ReverseProxy{Director: director, Verbal: false}