[chore]: Bump golang.org/x/net from 0.25.0 to 0.26.0 (#2986)

Bumps [golang.org/x/net](https://github.com/golang/net) from 0.25.0 to 0.26.0.
- [Commits](https://github.com/golang/net/compare/v0.25.0...v0.26.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
This commit is contained in:
dependabot[bot] 2024-06-10 07:40:53 +00:00 committed by GitHub
parent 83ee766e34
commit 3babb6c0d2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 217 additions and 577 deletions

2
go.mod
View file

@ -74,7 +74,7 @@ require (
go.uber.org/automaxprocs v1.5.3 go.uber.org/automaxprocs v1.5.3
golang.org/x/crypto v0.24.0 golang.org/x/crypto v0.24.0
golang.org/x/image v0.16.0 golang.org/x/image v0.16.0
golang.org/x/net v0.25.0 golang.org/x/net v0.26.0
golang.org/x/oauth2 v0.20.0 golang.org/x/oauth2 v0.20.0
golang.org/x/text v0.16.0 golang.org/x/text v0.16.0
gopkg.in/mcuadros/go-syslog.v2 v2.3.0 gopkg.in/mcuadros/go-syslog.v2 v2.3.0

4
go.sum
View file

@ -754,8 +754,8 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=

View file

@ -17,6 +17,7 @@
import ( import (
"bufio" "bufio"
"context"
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"io" "io"
@ -26,6 +27,7 @@
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time"
"golang.org/x/net/http/httpguts" "golang.org/x/net/http/httpguts"
) )
@ -210,12 +212,6 @@ type stringWriter interface {
WriteString(s string) (n int, err error) WriteString(s string) (n int, err error)
} }
// A gate lets two goroutines coordinate their activities.
type gate chan struct{}
func (g gate) Done() { g <- struct{}{} }
func (g gate) Wait() { <-g }
// A closeWaiter is like a sync.WaitGroup but only goes 1 to 0 (open to closed). // A closeWaiter is like a sync.WaitGroup but only goes 1 to 0 (open to closed).
type closeWaiter chan struct{} type closeWaiter chan struct{}
@ -383,3 +379,14 @@ func validPseudoPath(v string) bool {
// makes that struct also non-comparable, and generally doesn't add // makes that struct also non-comparable, and generally doesn't add
// any size (as long as it's first). // any size (as long as it's first).
type incomparable [0]func() type incomparable [0]func()
// synctestGroupInterface is the methods of synctestGroup used by Server and Transport.
// It's defined as an interface here to let us keep synctestGroup entirely test-only
// and not a part of non-test builds.
type synctestGroupInterface interface {
Join()
Now() time.Time
NewTimer(d time.Duration) timer
AfterFunc(d time.Duration, f func()) timer
ContextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc)
}

View file

@ -154,6 +154,39 @@ type Server struct {
// so that we don't embed a Mutex in this struct, which will make the // so that we don't embed a Mutex in this struct, which will make the
// struct non-copyable, which might break some callers. // struct non-copyable, which might break some callers.
state *serverInternalState state *serverInternalState
// Synchronization group used for testing.
// Outside of tests, this is nil.
group synctestGroupInterface
}
func (s *Server) markNewGoroutine() {
if s.group != nil {
s.group.Join()
}
}
func (s *Server) now() time.Time {
if s.group != nil {
return s.group.Now()
}
return time.Now()
}
// newTimer creates a new time.Timer, or a synthetic timer in tests.
func (s *Server) newTimer(d time.Duration) timer {
if s.group != nil {
return s.group.NewTimer(d)
}
return timeTimer{time.NewTimer(d)}
}
// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests.
func (s *Server) afterFunc(d time.Duration, f func()) timer {
if s.group != nil {
return s.group.AfterFunc(d, f)
}
return timeTimer{time.AfterFunc(d, f)}
} }
func (s *Server) initialConnRecvWindowSize() int32 { func (s *Server) initialConnRecvWindowSize() int32 {
@ -400,6 +433,10 @@ func (o *ServeConnOpts) handler() http.Handler {
// //
// The opts parameter is optional. If nil, default values are used. // The opts parameter is optional. If nil, default values are used.
func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) { func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
s.serveConn(c, opts, nil)
}
func (s *Server) serveConn(c net.Conn, opts *ServeConnOpts, newf func(*serverConn)) {
baseCtx, cancel := serverConnBaseContext(c, opts) baseCtx, cancel := serverConnBaseContext(c, opts)
defer cancel() defer cancel()
@ -426,6 +463,9 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
pushEnabled: true, pushEnabled: true,
sawClientPreface: opts.SawClientPreface, sawClientPreface: opts.SawClientPreface,
} }
if newf != nil {
newf(sc)
}
s.state.registerConn(sc) s.state.registerConn(sc)
defer s.state.unregisterConn(sc) defer s.state.unregisterConn(sc)
@ -599,8 +639,8 @@ type serverConn struct {
inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
needToSendGoAway bool // we need to schedule a GOAWAY frame write needToSendGoAway bool // we need to schedule a GOAWAY frame write
goAwayCode ErrCode goAwayCode ErrCode
shutdownTimer *time.Timer // nil until used shutdownTimer timer // nil until used
idleTimer *time.Timer // nil if unused idleTimer timer // nil if unused
// Owned by the writeFrameAsync goroutine: // Owned by the writeFrameAsync goroutine:
headerWriteBuf bytes.Buffer headerWriteBuf bytes.Buffer
@ -649,12 +689,12 @@ type stream struct {
flow outflow // limits writing from Handler to client flow outflow // limits writing from Handler to client
inflow inflow // what the client is allowed to POST/etc to us inflow inflow // what the client is allowed to POST/etc to us
state streamState state streamState
resetQueued bool // RST_STREAM queued for write; set by sc.resetStream resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
gotTrailerHeader bool // HEADER frame for trailers was seen gotTrailerHeader bool // HEADER frame for trailers was seen
wroteHeaders bool // whether we wrote headers (not status 100) wroteHeaders bool // whether we wrote headers (not status 100)
readDeadline *time.Timer // nil if unused readDeadline timer // nil if unused
writeDeadline *time.Timer // nil if unused writeDeadline timer // nil if unused
closeErr error // set before cw is closed closeErr error // set before cw is closed
trailer http.Header // accumulated trailers trailer http.Header // accumulated trailers
reqTrailer http.Header // handler's Request.Trailer reqTrailer http.Header // handler's Request.Trailer
@ -811,8 +851,9 @@ type readFrameResult struct {
// consumer is done with the frame. // consumer is done with the frame.
// It's run on its own goroutine. // It's run on its own goroutine.
func (sc *serverConn) readFrames() { func (sc *serverConn) readFrames() {
gate := make(gate) sc.srv.markNewGoroutine()
gateDone := gate.Done gate := make(chan struct{})
gateDone := func() { gate <- struct{}{} }
for { for {
f, err := sc.framer.ReadFrame() f, err := sc.framer.ReadFrame()
select { select {
@ -843,6 +884,7 @@ type frameWriteResult struct {
// At most one goroutine can be running writeFrameAsync at a time per // At most one goroutine can be running writeFrameAsync at a time per
// serverConn. // serverConn.
func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest, wd *writeData) { func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest, wd *writeData) {
sc.srv.markNewGoroutine()
var err error var err error
if wd == nil { if wd == nil {
err = wr.write.writeFrame(sc) err = wr.write.writeFrame(sc)
@ -922,13 +964,13 @@ func (sc *serverConn) serve() {
sc.setConnState(http.StateIdle) sc.setConnState(http.StateIdle)
if sc.srv.IdleTimeout > 0 { if sc.srv.IdleTimeout > 0 {
sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer) sc.idleTimer = sc.srv.afterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
defer sc.idleTimer.Stop() defer sc.idleTimer.Stop()
} }
go sc.readFrames() // closed by defer sc.conn.Close above go sc.readFrames() // closed by defer sc.conn.Close above
settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer) settingsTimer := sc.srv.afterFunc(firstSettingsTimeout, sc.onSettingsTimer)
defer settingsTimer.Stop() defer settingsTimer.Stop()
loopNum := 0 loopNum := 0
@ -1057,10 +1099,10 @@ func (sc *serverConn) readPreface() error {
errc <- nil errc <- nil
} }
}() }()
timer := time.NewTimer(prefaceTimeout) // TODO: configurable on *Server? timer := sc.srv.newTimer(prefaceTimeout) // TODO: configurable on *Server?
defer timer.Stop() defer timer.Stop()
select { select {
case <-timer.C: case <-timer.C():
return errPrefaceTimeout return errPrefaceTimeout
case err := <-errc: case err := <-errc:
if err == nil { if err == nil {
@ -1425,7 +1467,7 @@ func (sc *serverConn) goAway(code ErrCode) {
func (sc *serverConn) shutDownIn(d time.Duration) { func (sc *serverConn) shutDownIn(d time.Duration) {
sc.serveG.check() sc.serveG.check()
sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer) sc.shutdownTimer = sc.srv.afterFunc(d, sc.onShutdownTimer)
} }
func (sc *serverConn) resetStream(se StreamError) { func (sc *serverConn) resetStream(se StreamError) {
@ -1639,7 +1681,7 @@ func (sc *serverConn) closeStream(st *stream, err error) {
delete(sc.streams, st.id) delete(sc.streams, st.id)
if len(sc.streams) == 0 { if len(sc.streams) == 0 {
sc.setConnState(http.StateIdle) sc.setConnState(http.StateIdle)
if sc.srv.IdleTimeout > 0 { if sc.srv.IdleTimeout > 0 && sc.idleTimer != nil {
sc.idleTimer.Reset(sc.srv.IdleTimeout) sc.idleTimer.Reset(sc.srv.IdleTimeout)
} }
if h1ServerKeepAlivesDisabled(sc.hs) { if h1ServerKeepAlivesDisabled(sc.hs) {
@ -1661,6 +1703,7 @@ func (sc *serverConn) closeStream(st *stream, err error) {
} }
} }
st.closeErr = err st.closeErr = err
st.cancelCtx()
st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
sc.writeSched.CloseStream(st.id) sc.writeSched.CloseStream(st.id)
} }
@ -2021,7 +2064,7 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
// (in Go 1.8), though. That's a more sane option anyway. // (in Go 1.8), though. That's a more sane option anyway.
if sc.hs.ReadTimeout > 0 { if sc.hs.ReadTimeout > 0 {
sc.conn.SetReadDeadline(time.Time{}) sc.conn.SetReadDeadline(time.Time{})
st.readDeadline = time.AfterFunc(sc.hs.ReadTimeout, st.onReadTimeout) st.readDeadline = sc.srv.afterFunc(sc.hs.ReadTimeout, st.onReadTimeout)
} }
return sc.scheduleHandler(id, rw, req, handler) return sc.scheduleHandler(id, rw, req, handler)
@ -2119,7 +2162,7 @@ func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream
st.flow.add(sc.initialStreamSendWindowSize) st.flow.add(sc.initialStreamSendWindowSize)
st.inflow.init(sc.srv.initialStreamRecvWindowSize()) st.inflow.init(sc.srv.initialStreamRecvWindowSize())
if sc.hs.WriteTimeout > 0 { if sc.hs.WriteTimeout > 0 {
st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout) st.writeDeadline = sc.srv.afterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
} }
sc.streams[id] = st sc.streams[id] = st
@ -2343,6 +2386,7 @@ func (sc *serverConn) handlerDone() {
// Run on its own goroutine. // Run on its own goroutine.
func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) { func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) {
sc.srv.markNewGoroutine()
defer sc.sendServeMsg(handlerDoneMsg) defer sc.sendServeMsg(handlerDoneMsg)
didPanic := true didPanic := true
defer func() { defer func() {
@ -2639,7 +2683,7 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
var date string var date string
if _, ok := rws.snapHeader["Date"]; !ok { if _, ok := rws.snapHeader["Date"]; !ok {
// TODO(bradfitz): be faster here, like net/http? measure. // TODO(bradfitz): be faster here, like net/http? measure.
date = time.Now().UTC().Format(http.TimeFormat) date = rws.conn.srv.now().UTC().Format(http.TimeFormat)
} }
for _, v := range rws.snapHeader["Trailer"] { for _, v := range rws.snapHeader["Trailer"] {
@ -2761,7 +2805,7 @@ func (rws *responseWriterState) promoteUndeclaredTrailers() {
func (w *responseWriter) SetReadDeadline(deadline time.Time) error { func (w *responseWriter) SetReadDeadline(deadline time.Time) error {
st := w.rws.stream st := w.rws.stream
if !deadline.IsZero() && deadline.Before(time.Now()) { if !deadline.IsZero() && deadline.Before(w.rws.conn.srv.now()) {
// If we're setting a deadline in the past, reset the stream immediately // If we're setting a deadline in the past, reset the stream immediately
// so writes after SetWriteDeadline returns will fail. // so writes after SetWriteDeadline returns will fail.
st.onReadTimeout() st.onReadTimeout()
@ -2777,9 +2821,9 @@ func (w *responseWriter) SetReadDeadline(deadline time.Time) error {
if deadline.IsZero() { if deadline.IsZero() {
st.readDeadline = nil st.readDeadline = nil
} else if st.readDeadline == nil { } else if st.readDeadline == nil {
st.readDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onReadTimeout) st.readDeadline = sc.srv.afterFunc(deadline.Sub(sc.srv.now()), st.onReadTimeout)
} else { } else {
st.readDeadline.Reset(deadline.Sub(time.Now())) st.readDeadline.Reset(deadline.Sub(sc.srv.now()))
} }
}) })
return nil return nil
@ -2787,7 +2831,7 @@ func (w *responseWriter) SetReadDeadline(deadline time.Time) error {
func (w *responseWriter) SetWriteDeadline(deadline time.Time) error { func (w *responseWriter) SetWriteDeadline(deadline time.Time) error {
st := w.rws.stream st := w.rws.stream
if !deadline.IsZero() && deadline.Before(time.Now()) { if !deadline.IsZero() && deadline.Before(w.rws.conn.srv.now()) {
// If we're setting a deadline in the past, reset the stream immediately // If we're setting a deadline in the past, reset the stream immediately
// so writes after SetWriteDeadline returns will fail. // so writes after SetWriteDeadline returns will fail.
st.onWriteTimeout() st.onWriteTimeout()
@ -2803,9 +2847,9 @@ func (w *responseWriter) SetWriteDeadline(deadline time.Time) error {
if deadline.IsZero() { if deadline.IsZero() {
st.writeDeadline = nil st.writeDeadline = nil
} else if st.writeDeadline == nil { } else if st.writeDeadline == nil {
st.writeDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onWriteTimeout) st.writeDeadline = sc.srv.afterFunc(deadline.Sub(sc.srv.now()), st.onWriteTimeout)
} else { } else {
st.writeDeadline.Reset(deadline.Sub(time.Now())) st.writeDeadline.Reset(deadline.Sub(sc.srv.now()))
} }
}) })
return nil return nil

View file

@ -1,331 +0,0 @@
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package http2
import (
"context"
"sync"
"time"
)
// testSyncHooks coordinates goroutines in tests.
//
// For example, a call to ClientConn.RoundTrip involves several goroutines, including:
// - the goroutine running RoundTrip;
// - the clientStream.doRequest goroutine, which writes the request; and
// - the clientStream.readLoop goroutine, which reads the response.
//
// Using testSyncHooks, a test can start a RoundTrip and identify when all these goroutines
// are blocked waiting for some condition such as reading the Request.Body or waiting for
// flow control to become available.
//
// The testSyncHooks also manage timers and synthetic time in tests.
// This permits us to, for example, start a request and cause it to time out waiting for
// response headers without resorting to time.Sleep calls.
type testSyncHooks struct {
// active/inactive act as a mutex and condition variable.
//
// - neither chan contains a value: testSyncHooks is locked.
// - active contains a value: unlocked, and at least one goroutine is not blocked
// - inactive contains a value: unlocked, and all goroutines are blocked
active chan struct{}
inactive chan struct{}
// goroutine counts
total int // total goroutines
condwait map[*sync.Cond]int // blocked in sync.Cond.Wait
blocked []*testBlockedGoroutine // otherwise blocked
// fake time
now time.Time
timers []*fakeTimer
// Transport testing: Report various events.
newclientconn func(*ClientConn)
newstream func(*clientStream)
}
// testBlockedGoroutine is a blocked goroutine.
type testBlockedGoroutine struct {
f func() bool // blocked until f returns true
ch chan struct{} // closed when unblocked
}
func newTestSyncHooks() *testSyncHooks {
h := &testSyncHooks{
active: make(chan struct{}, 1),
inactive: make(chan struct{}, 1),
condwait: map[*sync.Cond]int{},
}
h.inactive <- struct{}{}
h.now = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
return h
}
// lock acquires the testSyncHooks mutex.
func (h *testSyncHooks) lock() {
select {
case <-h.active:
case <-h.inactive:
}
}
// waitInactive waits for all goroutines to become inactive.
func (h *testSyncHooks) waitInactive() {
for {
<-h.inactive
if !h.unlock() {
break
}
}
}
// unlock releases the testSyncHooks mutex.
// It reports whether any goroutines are active.
func (h *testSyncHooks) unlock() (active bool) {
// Look for a blocked goroutine which can be unblocked.
blocked := h.blocked[:0]
unblocked := false
for _, b := range h.blocked {
if !unblocked && b.f() {
unblocked = true
close(b.ch)
} else {
blocked = append(blocked, b)
}
}
h.blocked = blocked
// Count goroutines blocked on condition variables.
condwait := 0
for _, count := range h.condwait {
condwait += count
}
if h.total > condwait+len(blocked) {
h.active <- struct{}{}
return true
} else {
h.inactive <- struct{}{}
return false
}
}
// goRun starts a new goroutine.
func (h *testSyncHooks) goRun(f func()) {
h.lock()
h.total++
h.unlock()
go func() {
defer func() {
h.lock()
h.total--
h.unlock()
}()
f()
}()
}
// blockUntil indicates that a goroutine is blocked waiting for some condition to become true.
// It waits until f returns true before proceeding.
//
// Example usage:
//
// h.blockUntil(func() bool {
// // Is the context done yet?
// select {
// case <-ctx.Done():
// default:
// return false
// }
// return true
// })
// // Wait for the context to become done.
// <-ctx.Done()
//
// The function f passed to blockUntil must be non-blocking and idempotent.
func (h *testSyncHooks) blockUntil(f func() bool) {
if f() {
return
}
ch := make(chan struct{})
h.lock()
h.blocked = append(h.blocked, &testBlockedGoroutine{
f: f,
ch: ch,
})
h.unlock()
<-ch
}
// broadcast is sync.Cond.Broadcast.
func (h *testSyncHooks) condBroadcast(cond *sync.Cond) {
h.lock()
delete(h.condwait, cond)
h.unlock()
cond.Broadcast()
}
// broadcast is sync.Cond.Wait.
func (h *testSyncHooks) condWait(cond *sync.Cond) {
h.lock()
h.condwait[cond]++
h.unlock()
}
// newTimer creates a new fake timer.
func (h *testSyncHooks) newTimer(d time.Duration) timer {
h.lock()
defer h.unlock()
t := &fakeTimer{
hooks: h,
when: h.now.Add(d),
c: make(chan time.Time),
}
h.timers = append(h.timers, t)
return t
}
// afterFunc creates a new fake AfterFunc timer.
func (h *testSyncHooks) afterFunc(d time.Duration, f func()) timer {
h.lock()
defer h.unlock()
t := &fakeTimer{
hooks: h,
when: h.now.Add(d),
f: f,
}
h.timers = append(h.timers, t)
return t
}
func (h *testSyncHooks) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(ctx)
t := h.afterFunc(d, cancel)
return ctx, func() {
t.Stop()
cancel()
}
}
func (h *testSyncHooks) timeUntilEvent() time.Duration {
h.lock()
defer h.unlock()
var next time.Time
for _, t := range h.timers {
if next.IsZero() || t.when.Before(next) {
next = t.when
}
}
if d := next.Sub(h.now); d > 0 {
return d
}
return 0
}
// advance advances time and causes synthetic timers to fire.
func (h *testSyncHooks) advance(d time.Duration) {
h.lock()
defer h.unlock()
h.now = h.now.Add(d)
timers := h.timers[:0]
for _, t := range h.timers {
t := t // remove after go.mod depends on go1.22
t.mu.Lock()
switch {
case t.when.After(h.now):
timers = append(timers, t)
case t.when.IsZero():
// stopped timer
default:
t.when = time.Time{}
if t.c != nil {
close(t.c)
}
if t.f != nil {
h.total++
go func() {
defer func() {
h.lock()
h.total--
h.unlock()
}()
t.f()
}()
}
}
t.mu.Unlock()
}
h.timers = timers
}
// A timer wraps a time.Timer, or a synthetic equivalent in tests.
// Unlike time.Timer, timer is single-use: The timer channel is closed when the timer expires.
type timer interface {
C() <-chan time.Time
Stop() bool
Reset(d time.Duration) bool
}
// timeTimer implements timer using real time.
type timeTimer struct {
t *time.Timer
c chan time.Time
}
// newTimeTimer creates a new timer using real time.
func newTimeTimer(d time.Duration) timer {
ch := make(chan time.Time)
t := time.AfterFunc(d, func() {
close(ch)
})
return &timeTimer{t, ch}
}
// newTimeAfterFunc creates an AfterFunc timer using real time.
func newTimeAfterFunc(d time.Duration, f func()) timer {
return &timeTimer{
t: time.AfterFunc(d, f),
}
}
func (t timeTimer) C() <-chan time.Time { return t.c }
func (t timeTimer) Stop() bool { return t.t.Stop() }
func (t timeTimer) Reset(d time.Duration) bool { return t.t.Reset(d) }
// fakeTimer implements timer using fake time.
type fakeTimer struct {
hooks *testSyncHooks
mu sync.Mutex
when time.Time // when the timer will fire
c chan time.Time // closed when the timer fires; mutually exclusive with f
f func() // called when the timer fires; mutually exclusive with c
}
func (t *fakeTimer) C() <-chan time.Time { return t.c }
func (t *fakeTimer) Stop() bool {
t.mu.Lock()
defer t.mu.Unlock()
stopped := t.when.IsZero()
t.when = time.Time{}
return stopped
}
func (t *fakeTimer) Reset(d time.Duration) bool {
if t.c != nil || t.f == nil {
panic("fakeTimer only supports Reset on AfterFunc timers")
}
t.mu.Lock()
defer t.mu.Unlock()
t.hooks.lock()
defer t.hooks.unlock()
active := !t.when.IsZero()
t.when = t.hooks.now.Add(d)
if !active {
t.hooks.timers = append(t.hooks.timers, t)
}
return active
}

20
vendor/golang.org/x/net/http2/timer.go generated vendored Normal file
View file

@ -0,0 +1,20 @@
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package http2
import "time"
// A timer is a time.Timer, as an interface which can be replaced in tests.
type timer = interface {
C() <-chan time.Time
Reset(d time.Duration) bool
Stop() bool
}
// timeTimer adapts a time.Timer to the timer interface.
type timeTimer struct {
*time.Timer
}
func (t timeTimer) C() <-chan time.Time { return t.Timer.C }

View file

@ -185,7 +185,45 @@ type Transport struct {
connPoolOnce sync.Once connPoolOnce sync.Once
connPoolOrDef ClientConnPool // non-nil version of ConnPool connPoolOrDef ClientConnPool // non-nil version of ConnPool
syncHooks *testSyncHooks *transportTestHooks
}
// Hook points used for testing.
// Outside of tests, t.transportTestHooks is nil and these all have minimal implementations.
// Inside tests, see the testSyncHooks function docs.
type transportTestHooks struct {
newclientconn func(*ClientConn)
group synctestGroupInterface
}
func (t *Transport) markNewGoroutine() {
if t != nil && t.transportTestHooks != nil {
t.transportTestHooks.group.Join()
}
}
// newTimer creates a new time.Timer, or a synthetic timer in tests.
func (t *Transport) newTimer(d time.Duration) timer {
if t.transportTestHooks != nil {
return t.transportTestHooks.group.NewTimer(d)
}
return timeTimer{time.NewTimer(d)}
}
// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests.
func (t *Transport) afterFunc(d time.Duration, f func()) timer {
if t.transportTestHooks != nil {
return t.transportTestHooks.group.AfterFunc(d, f)
}
return timeTimer{time.AfterFunc(d, f)}
}
func (t *Transport) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
if t.transportTestHooks != nil {
return t.transportTestHooks.group.ContextWithTimeout(ctx, d)
}
return context.WithTimeout(ctx, d)
} }
func (t *Transport) maxHeaderListSize() uint32 { func (t *Transport) maxHeaderListSize() uint32 {
@ -352,60 +390,6 @@ type ClientConn struct {
werr error // first write error that has occurred werr error // first write error that has occurred
hbuf bytes.Buffer // HPACK encoder writes into this hbuf bytes.Buffer // HPACK encoder writes into this
henc *hpack.Encoder henc *hpack.Encoder
syncHooks *testSyncHooks // can be nil
}
// Hook points used for testing.
// Outside of tests, cc.syncHooks is nil and these all have minimal implementations.
// Inside tests, see the testSyncHooks function docs.
// goRun starts a new goroutine.
func (cc *ClientConn) goRun(f func()) {
if cc.syncHooks != nil {
cc.syncHooks.goRun(f)
return
}
go f()
}
// condBroadcast is cc.cond.Broadcast.
func (cc *ClientConn) condBroadcast() {
if cc.syncHooks != nil {
cc.syncHooks.condBroadcast(cc.cond)
}
cc.cond.Broadcast()
}
// condWait is cc.cond.Wait.
func (cc *ClientConn) condWait() {
if cc.syncHooks != nil {
cc.syncHooks.condWait(cc.cond)
}
cc.cond.Wait()
}
// newTimer creates a new time.Timer, or a synthetic timer in tests.
func (cc *ClientConn) newTimer(d time.Duration) timer {
if cc.syncHooks != nil {
return cc.syncHooks.newTimer(d)
}
return newTimeTimer(d)
}
// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests.
func (cc *ClientConn) afterFunc(d time.Duration, f func()) timer {
if cc.syncHooks != nil {
return cc.syncHooks.afterFunc(d, f)
}
return newTimeAfterFunc(d, f)
}
func (cc *ClientConn) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
if cc.syncHooks != nil {
return cc.syncHooks.contextWithTimeout(ctx, d)
}
return context.WithTimeout(ctx, d)
} }
// clientStream is the state for a single HTTP/2 stream. One of these // clientStream is the state for a single HTTP/2 stream. One of these
@ -487,7 +471,7 @@ func (cs *clientStream) abortStreamLocked(err error) {
// TODO(dneil): Clean up tests where cs.cc.cond is nil. // TODO(dneil): Clean up tests where cs.cc.cond is nil.
if cs.cc.cond != nil { if cs.cc.cond != nil {
// Wake up writeRequestBody if it is waiting on flow control. // Wake up writeRequestBody if it is waiting on flow control.
cs.cc.condBroadcast() cs.cc.cond.Broadcast()
} }
} }
@ -497,7 +481,7 @@ func (cs *clientStream) abortRequestBodyWrite() {
defer cc.mu.Unlock() defer cc.mu.Unlock()
if cs.reqBody != nil && cs.reqBodyClosed == nil { if cs.reqBody != nil && cs.reqBodyClosed == nil {
cs.closeReqBodyLocked() cs.closeReqBodyLocked()
cc.condBroadcast() cc.cond.Broadcast()
} }
} }
@ -507,10 +491,11 @@ func (cs *clientStream) closeReqBodyLocked() {
} }
cs.reqBodyClosed = make(chan struct{}) cs.reqBodyClosed = make(chan struct{})
reqBodyClosed := cs.reqBodyClosed reqBodyClosed := cs.reqBodyClosed
cs.cc.goRun(func() { go func() {
cs.cc.t.markNewGoroutine()
cs.reqBody.Close() cs.reqBody.Close()
close(reqBodyClosed) close(reqBodyClosed)
}) }()
} }
type stickyErrWriter struct { type stickyErrWriter struct {
@ -626,21 +611,7 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
backoff := float64(uint(1) << (uint(retry) - 1)) backoff := float64(uint(1) << (uint(retry) - 1))
backoff += backoff * (0.1 * mathrand.Float64()) backoff += backoff * (0.1 * mathrand.Float64())
d := time.Second * time.Duration(backoff) d := time.Second * time.Duration(backoff)
var tm timer tm := t.newTimer(d)
if t.syncHooks != nil {
tm = t.syncHooks.newTimer(d)
t.syncHooks.blockUntil(func() bool {
select {
case <-tm.C():
case <-req.Context().Done():
default:
return false
}
return true
})
} else {
tm = newTimeTimer(d)
}
select { select {
case <-tm.C(): case <-tm.C():
t.vlogf("RoundTrip retrying after failure: %v", roundTripErr) t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
@ -725,8 +696,8 @@ func canRetryError(err error) bool {
} }
func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) { func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) {
if t.syncHooks != nil { if t.transportTestHooks != nil {
return t.newClientConn(nil, singleUse, t.syncHooks) return t.newClientConn(nil, singleUse)
} }
host, _, err := net.SplitHostPort(addr) host, _, err := net.SplitHostPort(addr)
if err != nil { if err != nil {
@ -736,7 +707,7 @@ func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse b
if err != nil { if err != nil {
return nil, err return nil, err
} }
return t.newClientConn(tconn, singleUse, nil) return t.newClientConn(tconn, singleUse)
} }
func (t *Transport) newTLSConfig(host string) *tls.Config { func (t *Transport) newTLSConfig(host string) *tls.Config {
@ -802,10 +773,10 @@ func (t *Transport) maxEncoderHeaderTableSize() uint32 {
} }
func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) { func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
return t.newClientConn(c, t.disableKeepAlives(), nil) return t.newClientConn(c, t.disableKeepAlives())
} }
func (t *Transport) newClientConn(c net.Conn, singleUse bool, hooks *testSyncHooks) (*ClientConn, error) { func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
cc := &ClientConn{ cc := &ClientConn{
t: t, t: t,
tconn: c, tconn: c,
@ -820,16 +791,12 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool, hooks *testSyncHoo
wantSettingsAck: true, wantSettingsAck: true,
pings: make(map[[8]byte]chan struct{}), pings: make(map[[8]byte]chan struct{}),
reqHeaderMu: make(chan struct{}, 1), reqHeaderMu: make(chan struct{}, 1),
syncHooks: hooks,
} }
if hooks != nil { if t.transportTestHooks != nil {
hooks.newclientconn(cc) t.markNewGoroutine()
t.transportTestHooks.newclientconn(cc)
c = cc.tconn c = cc.tconn
} }
if d := t.idleConnTimeout(); d != 0 {
cc.idleTimeout = d
cc.idleTimer = cc.afterFunc(d, cc.onIdleTimeout)
}
if VerboseLogs { if VerboseLogs {
t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr()) t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
} }
@ -893,7 +860,13 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool, hooks *testSyncHoo
return nil, cc.werr return nil, cc.werr
} }
cc.goRun(cc.readLoop) // Start the idle timer after the connection is fully initialized.
if d := t.idleConnTimeout(); d != 0 {
cc.idleTimeout = d
cc.idleTimer = t.afterFunc(d, cc.onIdleTimeout)
}
go cc.readLoop()
return cc, nil return cc, nil
} }
@ -901,7 +874,7 @@ func (cc *ClientConn) healthCheck() {
pingTimeout := cc.t.pingTimeout() pingTimeout := cc.t.pingTimeout()
// We don't need to periodically ping in the health check, because the readLoop of ClientConn will // We don't need to periodically ping in the health check, because the readLoop of ClientConn will
// trigger the healthCheck again if there is no frame received. // trigger the healthCheck again if there is no frame received.
ctx, cancel := cc.contextWithTimeout(context.Background(), pingTimeout) ctx, cancel := cc.t.contextWithTimeout(context.Background(), pingTimeout)
defer cancel() defer cancel()
cc.vlogf("http2: Transport sending health check") cc.vlogf("http2: Transport sending health check")
err := cc.Ping(ctx) err := cc.Ping(ctx)
@ -1144,7 +1117,8 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error {
// Wait for all in-flight streams to complete or connection to close // Wait for all in-flight streams to complete or connection to close
done := make(chan struct{}) done := make(chan struct{})
cancelled := false // guarded by cc.mu cancelled := false // guarded by cc.mu
cc.goRun(func() { go func() {
cc.t.markNewGoroutine()
cc.mu.Lock() cc.mu.Lock()
defer cc.mu.Unlock() defer cc.mu.Unlock()
for { for {
@ -1156,9 +1130,9 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error {
if cancelled { if cancelled {
break break
} }
cc.condWait() cc.cond.Wait()
} }
}) }()
shutdownEnterWaitStateHook() shutdownEnterWaitStateHook()
select { select {
case <-done: case <-done:
@ -1168,7 +1142,7 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error {
cc.mu.Lock() cc.mu.Lock()
// Free the goroutine above // Free the goroutine above
cancelled = true cancelled = true
cc.condBroadcast() cc.cond.Broadcast()
cc.mu.Unlock() cc.mu.Unlock()
return ctx.Err() return ctx.Err()
} }
@ -1206,7 +1180,7 @@ func (cc *ClientConn) closeForError(err error) {
for _, cs := range cc.streams { for _, cs := range cc.streams {
cs.abortStreamLocked(err) cs.abortStreamLocked(err)
} }
cc.condBroadcast() cc.cond.Broadcast()
cc.mu.Unlock() cc.mu.Unlock()
cc.closeConn() cc.closeConn()
} }
@ -1321,23 +1295,30 @@ func (cc *ClientConn) roundTrip(req *http.Request, streamf func(*clientStream))
respHeaderRecv: make(chan struct{}), respHeaderRecv: make(chan struct{}),
donec: make(chan struct{}), donec: make(chan struct{}),
} }
cc.goRun(func() {
cs.doRequest(req) // TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere?
}) if !cc.t.disableCompression() &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
!cs.isHead {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: https://zlib.net/zlib_faq.html#faq39
//
// Note that we don't request this for HEAD requests,
// due to a bug in nginx:
// http://trac.nginx.org/nginx/ticket/358
// https://golang.org/issue/5522
//
// We don't request gzip if the request is for a range, since
// auto-decoding a portion of a gzipped document will just fail
// anyway. See https://golang.org/issue/8923
cs.requestedGzip = true
}
go cs.doRequest(req, streamf)
waitDone := func() error { waitDone := func() error {
if cc.syncHooks != nil {
cc.syncHooks.blockUntil(func() bool {
select {
case <-cs.donec:
case <-ctx.Done():
case <-cs.reqCancel:
default:
return false
}
return true
})
}
select { select {
case <-cs.donec: case <-cs.donec:
return nil return nil
@ -1398,24 +1379,7 @@ func (cc *ClientConn) roundTrip(req *http.Request, streamf func(*clientStream))
return err return err
} }
if streamf != nil {
streamf(cs)
}
for { for {
if cc.syncHooks != nil {
cc.syncHooks.blockUntil(func() bool {
select {
case <-cs.respHeaderRecv:
case <-cs.abort:
case <-ctx.Done():
case <-cs.reqCancel:
default:
return false
}
return true
})
}
select { select {
case <-cs.respHeaderRecv: case <-cs.respHeaderRecv:
return handleResponseHeaders() return handleResponseHeaders()
@ -1445,8 +1409,9 @@ func (cc *ClientConn) roundTrip(req *http.Request, streamf func(*clientStream))
// doRequest runs for the duration of the request lifetime. // doRequest runs for the duration of the request lifetime.
// //
// It sends the request and performs post-request cleanup (closing Request.Body, etc.). // It sends the request and performs post-request cleanup (closing Request.Body, etc.).
func (cs *clientStream) doRequest(req *http.Request) { func (cs *clientStream) doRequest(req *http.Request, streamf func(*clientStream)) {
err := cs.writeRequest(req) cs.cc.t.markNewGoroutine()
err := cs.writeRequest(req, streamf)
cs.cleanupWriteRequest(err) cs.cleanupWriteRequest(err)
} }
@ -1457,7 +1422,7 @@ func (cs *clientStream) doRequest(req *http.Request) {
// //
// It returns non-nil if the request ends otherwise. // It returns non-nil if the request ends otherwise.
// If the returned error is StreamError, the error Code may be used in resetting the stream. // If the returned error is StreamError, the error Code may be used in resetting the stream.
func (cs *clientStream) writeRequest(req *http.Request) (err error) { func (cs *clientStream) writeRequest(req *http.Request, streamf func(*clientStream)) (err error) {
cc := cs.cc cc := cs.cc
ctx := cs.ctx ctx := cs.ctx
@ -1471,21 +1436,6 @@ func (cs *clientStream) writeRequest(req *http.Request) (err error) {
if cc.reqHeaderMu == nil { if cc.reqHeaderMu == nil {
panic("RoundTrip on uninitialized ClientConn") // for tests panic("RoundTrip on uninitialized ClientConn") // for tests
} }
var newStreamHook func(*clientStream)
if cc.syncHooks != nil {
newStreamHook = cc.syncHooks.newstream
cc.syncHooks.blockUntil(func() bool {
select {
case cc.reqHeaderMu <- struct{}{}:
<-cc.reqHeaderMu
case <-cs.reqCancel:
case <-ctx.Done():
default:
return false
}
return true
})
}
select { select {
case cc.reqHeaderMu <- struct{}{}: case cc.reqHeaderMu <- struct{}{}:
case <-cs.reqCancel: case <-cs.reqCancel:
@ -1510,28 +1460,8 @@ func (cs *clientStream) writeRequest(req *http.Request) (err error) {
} }
cc.mu.Unlock() cc.mu.Unlock()
if newStreamHook != nil { if streamf != nil {
newStreamHook(cs) streamf(cs)
}
// TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere?
if !cc.t.disableCompression() &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
!cs.isHead {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: https://zlib.net/zlib_faq.html#faq39
//
// Note that we don't request this for HEAD requests,
// due to a bug in nginx:
// http://trac.nginx.org/nginx/ticket/358
// https://golang.org/issue/5522
//
// We don't request gzip if the request is for a range, since
// auto-decoding a portion of a gzipped document will just fail
// anyway. See https://golang.org/issue/8923
cs.requestedGzip = true
} }
continueTimeout := cc.t.expectContinueTimeout() continueTimeout := cc.t.expectContinueTimeout()
@ -1594,7 +1524,7 @@ func (cs *clientStream) writeRequest(req *http.Request) (err error) {
var respHeaderTimer <-chan time.Time var respHeaderTimer <-chan time.Time
var respHeaderRecv chan struct{} var respHeaderRecv chan struct{}
if d := cc.responseHeaderTimeout(); d != 0 { if d := cc.responseHeaderTimeout(); d != 0 {
timer := cc.newTimer(d) timer := cc.t.newTimer(d)
defer timer.Stop() defer timer.Stop()
respHeaderTimer = timer.C() respHeaderTimer = timer.C()
respHeaderRecv = cs.respHeaderRecv respHeaderRecv = cs.respHeaderRecv
@ -1603,21 +1533,6 @@ func (cs *clientStream) writeRequest(req *http.Request) (err error) {
// or until the request is aborted (via context, error, or otherwise), // or until the request is aborted (via context, error, or otherwise),
// whichever comes first. // whichever comes first.
for { for {
if cc.syncHooks != nil {
cc.syncHooks.blockUntil(func() bool {
select {
case <-cs.peerClosed:
case <-respHeaderTimer:
case <-respHeaderRecv:
case <-cs.abort:
case <-ctx.Done():
case <-cs.reqCancel:
default:
return false
}
return true
})
}
select { select {
case <-cs.peerClosed: case <-cs.peerClosed:
return nil return nil
@ -1766,7 +1681,7 @@ func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
return nil return nil
} }
cc.pendingRequests++ cc.pendingRequests++
cc.condWait() cc.cond.Wait()
cc.pendingRequests-- cc.pendingRequests--
select { select {
case <-cs.abort: case <-cs.abort:
@ -2028,7 +1943,7 @@ func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error)
cs.flow.take(take) cs.flow.take(take)
return take, nil return take, nil
} }
cc.condWait() cc.cond.Wait()
} }
} }
@ -2311,7 +2226,7 @@ func (cc *ClientConn) forgetStreamID(id uint32) {
} }
// Wake up writeRequestBody via clientStream.awaitFlowControl and // Wake up writeRequestBody via clientStream.awaitFlowControl and
// wake up RoundTrip if there is a pending request. // wake up RoundTrip if there is a pending request.
cc.condBroadcast() cc.cond.Broadcast()
closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 { if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
@ -2333,6 +2248,7 @@ type clientConnReadLoop struct {
// readLoop runs in its own goroutine and reads and dispatches frames. // readLoop runs in its own goroutine and reads and dispatches frames.
func (cc *ClientConn) readLoop() { func (cc *ClientConn) readLoop() {
cc.t.markNewGoroutine()
rl := &clientConnReadLoop{cc: cc} rl := &clientConnReadLoop{cc: cc}
defer rl.cleanup() defer rl.cleanup()
cc.readerErr = rl.run() cc.readerErr = rl.run()
@ -2399,7 +2315,7 @@ func (rl *clientConnReadLoop) cleanup() {
cs.abortStreamLocked(err) cs.abortStreamLocked(err)
} }
} }
cc.condBroadcast() cc.cond.Broadcast()
cc.mu.Unlock() cc.mu.Unlock()
} }
@ -2436,7 +2352,7 @@ func (rl *clientConnReadLoop) run() error {
readIdleTimeout := cc.t.ReadIdleTimeout readIdleTimeout := cc.t.ReadIdleTimeout
var t timer var t timer
if readIdleTimeout != 0 { if readIdleTimeout != 0 {
t = cc.afterFunc(readIdleTimeout, cc.healthCheck) t = cc.t.afterFunc(readIdleTimeout, cc.healthCheck)
} }
for { for {
f, err := cc.fr.ReadFrame() f, err := cc.fr.ReadFrame()
@ -3034,7 +2950,7 @@ func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
for _, cs := range cc.streams { for _, cs := range cc.streams {
cs.flow.add(delta) cs.flow.add(delta)
} }
cc.condBroadcast() cc.cond.Broadcast()
cc.initialWindowSize = s.Val cc.initialWindowSize = s.Val
case SettingHeaderTableSize: case SettingHeaderTableSize:
@ -3089,7 +3005,7 @@ func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
return ConnectionError(ErrCodeFlowControl) return ConnectionError(ErrCodeFlowControl)
} }
cc.condBroadcast() cc.cond.Broadcast()
return nil return nil
} }
@ -3133,7 +3049,8 @@ func (cc *ClientConn) Ping(ctx context.Context) error {
} }
var pingError error var pingError error
errc := make(chan struct{}) errc := make(chan struct{})
cc.goRun(func() { go func() {
cc.t.markNewGoroutine()
cc.wmu.Lock() cc.wmu.Lock()
defer cc.wmu.Unlock() defer cc.wmu.Unlock()
if pingError = cc.fr.WritePing(false, p); pingError != nil { if pingError = cc.fr.WritePing(false, p); pingError != nil {
@ -3144,20 +3061,7 @@ func (cc *ClientConn) Ping(ctx context.Context) error {
close(errc) close(errc)
return return
} }
}) }()
if cc.syncHooks != nil {
cc.syncHooks.blockUntil(func() bool {
select {
case <-c:
case <-errc:
case <-ctx.Done():
case <-cc.readerDone:
default:
return false
}
return true
})
}
select { select {
case <-c: case <-c:
return nil return nil

View file

@ -443,8 +443,8 @@ func (ws *priorityWriteScheduler) addClosedOrIdleNode(list *[]*priorityNode, max
} }
func (ws *priorityWriteScheduler) removeNode(n *priorityNode) { func (ws *priorityWriteScheduler) removeNode(n *priorityNode) {
for k := n.kids; k != nil; k = k.next { for n.kids != nil {
k.setParent(n.parent) n.kids.setParent(n.parent)
} }
n.setParent(nil) n.setParent(nil)
delete(ws.nodes, n.id) delete(ws.nodes, n.id)

View file

@ -137,9 +137,7 @@ func (p *PerHost) AddNetwork(net *net.IPNet) {
// AddZone specifies a DNS suffix that will use the bypass proxy. A zone of // AddZone specifies a DNS suffix that will use the bypass proxy. A zone of
// "example.com" matches "example.com" and all of its subdomains. // "example.com" matches "example.com" and all of its subdomains.
func (p *PerHost) AddZone(zone string) { func (p *PerHost) AddZone(zone string) {
if strings.HasSuffix(zone, ".") { zone = strings.TrimSuffix(zone, ".")
zone = zone[:len(zone)-1]
}
if !strings.HasPrefix(zone, ".") { if !strings.HasPrefix(zone, ".") {
zone = "." + zone zone = "." + zone
} }
@ -148,8 +146,6 @@ func (p *PerHost) AddZone(zone string) {
// AddHost specifies a host name that will use the bypass proxy. // AddHost specifies a host name that will use the bypass proxy.
func (p *PerHost) AddHost(host string) { func (p *PerHost) AddHost(host string) {
if strings.HasSuffix(host, ".") { host = strings.TrimSuffix(host, ".")
host = host[:len(host)-1]
}
p.bypassHosts = append(p.bypassHosts, host) p.bypassHosts = append(p.bypassHosts, host)
} }

2
vendor/modules.txt vendored
View file

@ -1086,7 +1086,7 @@ golang.org/x/image/webp
golang.org/x/mod/internal/lazyregexp golang.org/x/mod/internal/lazyregexp
golang.org/x/mod/module golang.org/x/mod/module
golang.org/x/mod/semver golang.org/x/mod/semver
# golang.org/x/net v0.25.0 # golang.org/x/net v0.26.0
## explicit; go 1.18 ## explicit; go 1.18
golang.org/x/net/bpf golang.org/x/net/bpf
golang.org/x/net/context golang.org/x/net/context