src/runtime/mprof.go: 25 changes (10 additions & 15 deletions)
@@ -1261,7 +1261,7 @@ func goroutineProfileWithLabels(p []profilerecord.StackRecord, labels []unsafe.P

//go:linkname pprof_goroutineLeakProfileWithLabels
func pprof_goroutineLeakProfileWithLabels(p []profilerecord.StackRecord, labels []unsafe.Pointer) (n int, ok bool) {
return goroutineLeakProfileWithLabelsConcurrent(p, labels)
return goroutineLeakProfileWithLabels(p, labels)
}

// labels may be nil. If labels is non-nil, it must have the same length as p.
@@ -1323,38 +1323,33 @@ func goroutineLeakProfileWithLabelsConcurrent(p []profilerecord.StackRecord, lab
return work.goroutineLeak.count, false
}

// Use the same semaphore as goroutineProfileWithLabelsConcurrent,
// because ultimately we still use goroutine profiles.
semacquire(&goroutineProfile.sema)

// Unlike in goroutineProfileWithLabelsConcurrent, we don't need to
// save the current goroutine stack, because it is obviously not leaked.

pcbuf := makeProfStack() // see saveg() for explanation

// Prepare a profile large enough to store all leaked goroutines.
n = work.goroutineLeak.count

if n > len(p) {
// There's not enough space in p to store the whole profile, so (per the
// contract of runtime.GoroutineProfile) we're not allowed to write to p
// at all and must return n, false.
semrelease(&goroutineProfile.sema)
// There's not enough space in p to store the whole profile, so
// we're not allowed to write to p at all and must return n, false.
return n, false
}
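The comments above rely on the same contract as the public runtime.GoroutineProfile API: when the destination slice is too small, nothing is written and the call reports the required size with ok == false, so the caller grows the slice and retries. A minimal caller-side sketch of that pattern (ordinary user code, not part of this change):

package main

import (
	"fmt"
	"runtime"
)

func main() {
	// Start from the current goroutine count and add headroom, since
	// goroutines may start between the size query and the profile call.
	n := runtime.NumGoroutine() + 10
	for {
		p := make([]runtime.StackRecord, n)
		m, ok := runtime.GoroutineProfile(p)
		if ok {
			// The profile fit: p[:m] holds one stack record per goroutine.
			fmt.Println("captured", m, "goroutine stacks")
			return
		}
		// Not enough space: per the contract, p was left untouched and m is
		// the number of records needed, so retry with a larger slice.
		n = m + 10
	}
}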

// Visit each leaked goroutine and try to record its stack.
var offset int
forEachGRace(func(gp1 *g) {
if readgstatus(gp1) == _Gleaked {
doRecordGoroutineProfile(gp1, pcbuf)
if readgstatus(gp1)&^_Gscan == _Gleaked {
systemstack(func() { saveg(^uintptr(0), ^uintptr(0), gp1, &p[offset], pcbuf) })
if labels != nil {
labels[offset] = gp1.labels
}
offset++
}
})

if raceenabled {
raceacquire(unsafe.Pointer(&labelSync))
}

semrelease(&goroutineProfile.sema)
return n, true
}
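The status check in the loop above compares readgstatus(gp1)&^_Gscan against _Gleaked: Go's bit-clear operator masks off the scan bit so that a leaked goroutine whose stack is currently being scanned by the GC still matches. A standalone sketch of that masking pattern, with made-up constants standing in for the runtime's unexported status values:

package main

import "fmt"

// Hypothetical stand-ins for the runtime's internal status values; the real
// constants (_Gleaked, _Gscan) are unexported and live in package runtime.
const (
	gLeaked uint32 = 9
	gScan   uint32 = 0x1000
)

// isLeaked reports whether status denotes a leaked goroutine, ignoring the
// scan bit that the GC may have set while scanning the goroutine's stack.
func isLeaked(status uint32) bool {
	return status&^gScan == gLeaked
}

func main() {
	fmt.Println(isLeaked(gLeaked))         // true
	fmt.Println(isLeaked(gLeaked | gScan)) // true: the scan bit is masked off
	fmt.Println(isLeaked(4))               // false: some other status
}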

src/runtime/pprof/pprof.go: 34 changes (34 additions & 0 deletions)
@@ -228,6 +228,31 @@ var mutexProfile = &Profile{
write: writeMutex,
}

// goroutineLeakProfileLock ensures that the goroutine leak profile writer observes the
// leaked goroutines discovered during the goroutine leak detection GC cycle
// that was triggered by the profile request.
// This prevents a race condition between the garbage collector and the profile writer
// when multiple profile requests are issued concurrently: the status of leaked goroutines
// is reset to _Gwaiting at the beginning of a leak detection cycle, which may lead the
// profile writer of another concurrent request to produce an incomplete profile.
//
// Example trace:
//
// G1 | GC | G2
// ----------------------+-----------------------------+---------------------
// Request profile | . | .
// . | . | Request profile
// . | [G1] Resets leaked g status | .
// . | [G1] Leaks detected | .
// . | <New cycle> | .
// . | [G2] Resets leaked g status | .
// Write profile | . | .
// . | [G2] Leaks detected | .
// . | . | Write profile
// ----------------------+-----------------------------+---------------------
// Incomplete profile |+++++++++++++++++++++++++++++| Complete profile
var goroutineLeakProfileLock sync.Mutex

func lockProfiles() {
profiles.mu.Lock()
if profiles.m == nil {
@@ -763,6 +788,15 @@ func writeGoroutine(w io.Writer, debug int) error {
// writeGoroutineLeak first invokes a GC cycle that performs goroutine leak detection.
// It then writes the goroutine profile, filtering for leaked goroutines.
func writeGoroutineLeak(w io.Writer, debug int) error {
// Acquire the goroutine leak detection lock and release
// it after the goroutine leak profile is written.
//
// While the critical section is long, this is needed to prevent
// a race condition between the garbage collector and the goroutine
// leak profile writer when multiple profile requests are issued concurrently.
goroutineLeakProfileLock.Lock()
defer goroutineLeakProfileLock.Unlock()

// Run the GC with leak detection first so that leaked goroutines
// may transition to the leaked state.
runtime_goroutineLeakGC()
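The lock declared above matters when two clients request the goroutine leak profile at the same time, as in the example trace: each request triggers a leak-detection GC cycle, and without serialization one writer could observe goroutine statuses that a newer cycle has already reset. A rough reproduction of that access pattern from user code, assuming the profile is registered under the name "goroutineleak" (the name the tests below use):

package main

import (
	"io"
	"runtime/pprof"
	"sync"
)

func main() {
	// Assumed profile name, taken from the tests in this change.
	prof := pprof.Lookup("goroutineleak")
	if prof == nil {
		return // toolchain without goroutine leak profiling
	}
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each WriteTo runs a leak-detection GC cycle and then writes the
			// profile; goroutineLeakProfileLock serializes the two requests so
			// neither observes a half-reset set of leaked goroutines.
			_ = prof.WriteTo(io.Discard, 1)
		}()
	}
	wg.Wait()
}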
src/runtime/pprof/pprof_test.go: 210 changes (209 additions & 1 deletion)
@@ -774,7 +774,11 @@ func TestMorestack(t *testing.T) {
for {
go func() {
growstack1()
c <- true
// NOTE(vsaioc): This goroutine may leak without this select.
select {
case c <- true:
case <-time.After(duration):
}
}()
select {
case <-t:
@@ -1565,6 +1569,210 @@ func containsCountsLabels(prof *profile.Profile, countLabels map[int64]map[strin
return true
}

func goroutineLeakExample() {
<-make(chan struct{})
panic("unreachable")
}

func TestGoroutineLeakProfileConcurrency(t *testing.T) {
const leakCount = 3

testenv.MustHaveParallelism(t)
regexLeakCount := regexp.MustCompile("goroutineleak profile: total ")
whiteSpace := regexp.MustCompile("\\s+")

// Regular goroutine profile. Used to check that there is no interference between
// the two profile types.
goroutineProf := Lookup("goroutine")
goroutineLeakProf := goroutineLeakProfile

// Check that the profile output contains the expected leaked goroutine frame.
includesLeak := func(t *testing.T, name, s string) {
if !strings.Contains(s, "runtime/pprof.goroutineLeakExample") {
t.Errorf("%s profile does not contain expected leaked goroutine (runtime/pprof.goroutineLeakExample): %s", name, s)
}
}

checkFrame := func(i int, j int, locations []*profile.Location, expectedFunctionName string) {
if len(locations) <= i {
t.Errorf("leaked goroutine stack locations out of range at %d of %d", i+1, len(locations))
return
}
location := locations[i]
if len(location.Line) <= j {
t.Errorf("leaked goroutine stack location lines out of range at %d of %d", j+1, len(location.Line))
return
}
if location.Line[j].Function.Name != expectedFunctionName {
t.Errorf("leaked goroutine stack expected %s as the location[%d].Line[%d] but found %s (%s:%d)", expectedFunctionName, i, j, location.Line[j].Function.Name, location.Line[j].Function.Filename, location.Line[j].Line)
}
}

// We use this helper to count the total number of leaked goroutines in the profile.
//
// NOTE(vsaioc): This value should match the number of leaks produced in this test,
// but other tests could also leak goroutines, in which case we would have a mismatch
// when bulk-running tests.
//
// The two mismatching outcomes are therefore:
// - More leaks than expected, which is a correctness issue with other tests.
// In this case, this test effectively checks other tests for
// goroutine leaks during bulk executions (e.g., running all.bash).
//
// - Fewer leaks than expected; this is an unfortunate symptom of scheduling
// non-determinism, which may occur once in a blue moon. We make
// a best-effort attempt to allow the expected leaks to occur, by yielding
// the main thread, but it is never a guarantee.
countLeaks := func(t *testing.T, number int, s string) {
// Strip the profile header
parts := regexLeakCount.Split(s, -1)
if len(parts) < 2 {
t.Fatalf("goroutineleak profile does not contain 'goroutineleak profile: total ': %s\nparts: %v", s, parts)
return
}

parts = whiteSpace.Split(parts[1], -1)

count, err := strconv.ParseInt(parts[0], 10, 64)
if err != nil {
t.Fatalf("goroutineleak profile count is not a number: %s\nerror: %v", s, err)
}

// Check that the total number of leaked goroutines is exactly the expected number.
if count != int64(number) {
t.Errorf("goroutineleak profile does not contain exactly %d leaked goroutines: %d", number, count)
}
}

checkLeakStack := func(t *testing.T) func(pc uintptr, locations []*profile.Location, _ map[string][]string) {
return func(pc uintptr, locations []*profile.Location, _ map[string][]string) {
if pc != leakCount {
t.Errorf("expected %d leaked goroutines with specific stack configurations, but found %d", leakCount, pc)
return
}
switch len(locations) {
case 4:
// We expect a receive operation. This is the typical stack.
checkFrame(0, 0, locations, "runtime.gopark")
checkFrame(1, 0, locations, "runtime.chanrecv")
checkFrame(2, 0, locations, "runtime.chanrecv1")
switch len(locations[3].Line) {
case 2:
// Running `go func() { goroutineLeakExample() }()` will produce a stack with 2 lines.
// The anonymous function will have the call to goroutineLeakExample inlined.
checkFrame(3, 1, locations, "runtime/pprof.TestGoroutineLeakProfileConcurrency.func5")
fallthrough
case 1:
// Running `go goroutineLeakExample()` will produce a stack with 1 line.
checkFrame(3, 0, locations, "runtime/pprof.goroutineLeakExample")
default:
t.Errorf("leaked goroutine stack location expected 1 or 2 lines in the 4th location but found %d", len(locations[3].Line))
return
}
default:
message := fmt.Sprintf("leaked goroutine stack expected 4 or 5 locations but found %d", len(locations))
for _, location := range locations {
for _, line := range location.Line {
message += fmt.Sprintf("\n%s:%d", line.Function.Name, line.Line)
}
}
t.Errorf("%s", message)
}
}
}
// Leak some goroutines that will feature in the goroutine leak profile
for i := 0; i < leakCount; i++ {
go goroutineLeakExample()
go func() {
// Leak another goroutine that will feature a slightly different stack.
// This includes the frame runtime/pprof.TestGoroutineLeakProfileConcurrency.func1.
goroutineLeakExample()
panic("unreachable")
}()
// Yield several times to allow the goroutines to leak.
runtime.Gosched()
runtime.Gosched()
}

// Give all goroutines a chance to leak.
time.Sleep(time.Second)

t.Run("profile contains leak", func(t *testing.T) {
var w strings.Builder
goroutineLeakProf.WriteTo(&w, 0)
parseProfile(t, []byte(w.String()), checkLeakStack(t))
})

t.Run("leak persists between sequential profiling runs", func(t *testing.T) {
for i := 0; i < 2; i++ {
var w strings.Builder
goroutineLeakProf.WriteTo(&w, 0)
parseProfile(t, []byte(w.String()), checkLeakStack(t))
}
})

// Concurrent calls to the goroutine leak profiler should not trigger data races
// or corruption.
t.Run("overlapping profile requests", func(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.Add(1)
Do(ctx, Labels("i", fmt.Sprint(i)), func(context.Context) {
go func() {
defer wg.Done()
for ctx.Err() == nil {
var w strings.Builder
goroutineLeakProf.WriteTo(&w, 1)
countLeaks(t, 2*leakCount, w.String())
includesLeak(t, "goroutineleak", w.String())
}
}()
})
}
wg.Wait()
})

// Concurrent calls to the goroutine leak profiler should not trigger data races
// or corruption, or interfere with regular goroutine profiles.
t.Run("overlapping goroutine and goroutine leak profile requests", func(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.Add(2)
Do(ctx, Labels("i", fmt.Sprint(i)), func(context.Context) {
go func() {
defer wg.Done()
for ctx.Err() == nil {
var w strings.Builder
goroutineLeakProf.WriteTo(&w, 1)
countLeaks(t, 2*leakCount, w.String())
includesLeak(t, "goroutineleak", w.String())
}
}()
go func() {
defer wg.Done()
for ctx.Err() == nil {
var w strings.Builder
goroutineProf.WriteTo(&w, 1)
// The regular goroutine profile should see the leaked
// goroutines. We simply check that the goroutine leak
// profile does not corrupt the goroutine profile state.
includesLeak(t, "goroutine", w.String())
}
}()
})
}
wg.Wait()
})
}

func TestGoroutineProfileConcurrency(t *testing.T) {
testenv.MustHaveParallelism(t)
