diff --git a/src/mod/dynamicproxy/dpcore/dpcore.go b/src/mod/dynamicproxy/dpcore/dpcore.go index 1d9cc6a..e7b360b 100644 --- a/src/mod/dynamicproxy/dpcore/dpcore.go +++ b/src/mod/dynamicproxy/dpcore/dpcore.go @@ -1,6 +1,7 @@ package dpcore import ( + "context" "errors" "io" "log" @@ -8,12 +9,9 @@ import ( "net/http" "net/url" "strings" - "sync" "time" ) -var onExitFlushLoop func() - // ReverseProxy is an HTTP Handler that takes an incoming request and // sends it to another server, proxying the response back to the // client, support http, also support https tunnel using http.hijacker @@ -68,7 +66,12 @@ type requestCanceler interface { CancelRequest(req *http.Request) } -func NewDynamicProxyCore(target *url.URL, prepender string, ignoreTLSVerification bool) *ReverseProxy { +type DpcoreOptions struct { + IgnoreTLSVerification bool + FlushInterval time.Duration +} + +func NewDynamicProxyCore(target *url.URL, prepender string, dpcOptions *DpcoreOptions) *ReverseProxy { targetQuery := target.RawQuery director := func(req *http.Request) { req.URL.Scheme = target.Scheme @@ -80,10 +83,6 @@ func NewDynamicProxyCore(target *url.URL, prepender string, ignoreTLSVerificatio req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery } - if _, ok := req.Header["User-Agent"]; !ok { - req.Header.Set("User-Agent", "") - } - } //Hack the default transporter to handle more connections @@ -95,16 +94,17 @@ func NewDynamicProxyCore(target *url.URL, prepender string, ignoreTLSVerificatio thisTransporter.(*http.Transport).MaxConnsPerHost = optimalConcurrentConnection * 2 thisTransporter.(*http.Transport).DisableCompression = true - if ignoreTLSVerification { + if dpcOptions.IgnoreTLSVerification { //Ignore TLS certificate validation error thisTransporter.(*http.Transport).TLSClientConfig.InsecureSkipVerify = true } return &ReverseProxy{ - Director: director, - Prepender: prepender, - Verbal: false, - Transport: thisTransporter, + Director: director, + Prepender: prepender, + FlushInterval: dpcOptions.FlushInterval, + Verbal: false, + Transport: thisTransporter, } } @@ -178,64 +178,66 @@ var hopHeaders = []string{ //"Upgrade", } -func (p *ReverseProxy) copyResponse(dst io.Writer, src io.Reader) { - if p.FlushInterval != 0 { - if wf, ok := dst.(writeFlusher); ok { - mlw := &maxLatencyWriter{ - dst: wf, - latency: p.FlushInterval, - done: make(chan bool), - } - - go mlw.flushLoop() - defer mlw.stop() - dst = mlw +// Copy response from src to dst with given flush interval, reference from httputil.ReverseProxy +func (p *ReverseProxy) copyResponse(dst http.ResponseWriter, src io.Reader, flushInterval time.Duration) error { + var w io.Writer = dst + if flushInterval != 0 { + mlw := &maxLatencyWriter{ + dst: dst, + flush: http.NewResponseController(dst).Flush, + latency: flushInterval, } + + defer mlw.stop() + // set up initial timer so headers get flushed even if body writes are delayed + mlw.flushPending = true + mlw.t = time.AfterFunc(flushInterval, mlw.delayedFlush) + w = mlw } - io.Copy(dst, src) + var buf []byte + _, err := p.copyBuffer(w, src, buf) + return err + } -type writeFlusher interface { - io.Writer - http.Flusher -} +// Copy with given buffer size. Default to 64k +func (p *ReverseProxy) copyBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, error) { + if len(buf) == 0 { + buf = make([]byte, 64*1024) + } -type maxLatencyWriter struct { - dst writeFlusher - latency time.Duration - mu sync.Mutex - done chan bool -} - -func (m *maxLatencyWriter) Write(b []byte) (int, error) { - m.mu.Lock() - defer m.mu.Unlock() - return m.dst.Write(b) -} - -func (m *maxLatencyWriter) flushLoop() { - t := time.NewTicker(m.latency) - defer t.Stop() + var written int64 for { - select { - case <-m.done: - if onExitFlushLoop != nil { - onExitFlushLoop() + nr, rerr := src.Read(buf) + if rerr != nil && rerr != io.EOF && rerr != context.Canceled { + p.logf("dpcore read error during body copy: %v", rerr) + } + + if nr > 0 { + nw, werr := dst.Write(buf[:nr]) + if nw > 0 { + written += int64(nw) } - return - case <-t.C: - m.mu.Lock() - m.dst.Flush() - m.mu.Unlock() + + if werr != nil { + return written, werr + } + + if nr != nw { + return written, io.ErrShortWrite + } + } + + if rerr != nil { + if rerr == io.EOF { + rerr = nil + } + return written, rerr } } } -func (m *maxLatencyWriter) stop() { - m.done <- true -} - func (p *ReverseProxy) logf(format string, args ...interface{}) { if p.ErrorLog != nil { p.ErrorLog.Printf(format, args...) @@ -373,6 +375,12 @@ func (p *ReverseProxy) ProxyHTTP(rw http.ResponseWriter, req *http.Request, rrr // Remove hop-by-hop headers listed in the "Connection" header of the response, Remove hop-by-hop headers. removeHeaders(res.Header, rrr.NoCache) + //Remove the User-Agent header if exists + if _, ok := res.Header["User-Agent"]; ok { + //Server to client request should not contains a User-Agent header + res.Header.Del("User-Agent") + } + if p.ModifyResponse != nil { if err := p.ModifyResponse(res); err != nil { if p.Verbal { @@ -438,7 +446,10 @@ func (p *ReverseProxy) ProxyHTTP(rw http.ResponseWriter, req *http.Request, rrr } } - p.copyResponse(rw, res.Body) + //Get flush interval in real time and start copying the request + flushInterval := p.getFlushInterval(req, res) + p.copyResponse(rw, res.Body, flushInterval) + // close now, instead of defer, to populate res.Trailer res.Body.Close() copyHeader(rw.Header(), res.Trailer) diff --git a/src/mod/dynamicproxy/dpcore/flush.go b/src/mod/dynamicproxy/dpcore/flush.go new file mode 100644 index 0000000..d62e058 --- /dev/null +++ b/src/mod/dynamicproxy/dpcore/flush.go @@ -0,0 +1,38 @@ +package dpcore + +import ( + "mime" + "net/http" + "time" +) + +// Auto sniff of flush interval from header +func (p *ReverseProxy) getFlushInterval(req *http.Request, res *http.Response) time.Duration { + contentType := req.Header.Get("Content-Type") + if actualContentType, _, _ := mime.ParseMediaType(contentType); actualContentType == "text/event-stream" { + return -1 + } + + if req.ContentLength == -1 || p.isBidirectionalStream(req, res) { + return -1 + } + + //Cannot sniff anything. Use default value + return p.FlushInterval + +} + +// Check for bidirectional stream, copy from Caddy :D +func (p *ReverseProxy) isBidirectionalStream(req *http.Request, res *http.Response) bool { + // We have to check the encoding here; only flush headers with identity encoding. + // Non-identity encoding might combine with "encode" directive, and in that case, + // if body size larger than enc.MinLength, upper level encode handle might have + // Content-Encoding header to write. + // (see https://github.com/caddyserver/caddy/issues/3606 for use case) + ae := req.Header.Get("Accept-Encoding") + + return req.ProtoMajor == 2 && + res.ProtoMajor == 2 && + res.ContentLength == -1 && + (ae == "identity" || ae == "") +} diff --git a/src/mod/dynamicproxy/dpcore/maxLatencyWriter.go b/src/mod/dynamicproxy/dpcore/maxLatencyWriter.go new file mode 100644 index 0000000..fec8520 --- /dev/null +++ b/src/mod/dynamicproxy/dpcore/maxLatencyWriter.go @@ -0,0 +1,73 @@ +package dpcore + +/* + +Max Latency Writer + +This script implements a io writer with periodic flushing base on a ticker +Mostly based on httputil.ReverseProxy + +*/ + +import ( + "io" + "sync" + "time" +) + +type maxLatencyWriter struct { + dst io.Writer + flush func() error + latency time.Duration // non-zero; negative means to flush immediately + mu sync.Mutex // protects t, flushPending, and dst.Flush + t *time.Timer + flushPending bool +} + +func (m *maxLatencyWriter) Write(p []byte) (n int, err error) { + m.mu.Lock() + defer m.mu.Unlock() + n, err = m.dst.Write(p) + if m.latency < 0 { + //Flush immediately + m.flush() + return + } + + if m.flushPending { + //Flush in next tick cycle + return + } + + if m.t == nil { + m.t = time.AfterFunc(m.latency, m.delayedFlush) + } else { + m.t.Reset(m.latency) + } + + m.flushPending = true + return + +} + +func (m *maxLatencyWriter) delayedFlush() { + m.mu.Lock() + defer m.mu.Unlock() + if !m.flushPending { + // if stop was called but AfterFunc already started this goroutine + return + } + + m.flush() + m.flushPending = false +} + +func (m *maxLatencyWriter) stop() { + m.mu.Lock() + defer m.mu.Unlock() + + m.flushPending = false + if m.t != nil { + m.t.Stop() + } +} diff --git a/src/mod/dynamicproxy/router.go b/src/mod/dynamicproxy/router.go index 5f8c741..2c30143 100644 --- a/src/mod/dynamicproxy/router.go +++ b/src/mod/dynamicproxy/router.go @@ -42,7 +42,9 @@ func (router *Router) PrepareProxyRoute(endpoint *ProxyEndpoint) (*ProxyEndpoint } //Create the proxy routing handler - proxy := dpcore.NewDynamicProxyCore(path, "", endpoint.SkipCertValidations) + proxy := dpcore.NewDynamicProxyCore(path, "", &dpcore.DpcoreOptions{ + IgnoreTLSVerification: endpoint.SkipCertValidations, + }) endpoint.proxy = proxy endpoint.parent = router @@ -69,7 +71,9 @@ func (router *Router) PrepareProxyRoute(endpoint *ProxyEndpoint) (*ProxyEndpoint return nil, err } - proxy := dpcore.NewDynamicProxyCore(path, vdir.MatchingPath, vdir.SkipCertValidations) + proxy := dpcore.NewDynamicProxyCore(path, vdir.MatchingPath, &dpcore.DpcoreOptions{ + IgnoreTLSVerification: vdir.SkipCertValidations, + }) vdir.proxy = proxy vdir.parent = endpoint } diff --git a/src/mod/reverseproxy/reverse.go b/src/mod/reverseproxy/reverse.go index 6ded0ae..27d2c48 100644 --- a/src/mod/reverseproxy/reverse.go +++ b/src/mod/reverseproxy/reverse.go @@ -85,10 +85,6 @@ func NewReverseProxy(target *url.URL) *ReverseProxy { } else { req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery } - - if _, ok := req.Header["User-Agent"]; !ok { - req.Header.Set("User-Agent", "") - } } return &ReverseProxy{Director: director, Verbal: false}