Skip to content

Commit bffc7d0

Browse files
authored
Fix memory leak in resampler (#322)
* Fix memory leak in resampler. Optimize it. * Switch to tparse for formatting test output. * Fix flaky media tests.
1 parent 9f51498 commit bffc7d0

File tree

6 files changed

+225
-69
lines changed

6 files changed

+225
-69
lines changed

.github/workflows/test.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ jobs:
3737
with:
3838
go-version: ">=1.24.1"
3939

40-
- name: Set up gotestfmt
41-
run: go install github.com/gotesttools/gotestfmt/v2/cmd/gotestfmt@v2.5.0
40+
- name: Set up tparse
41+
run: go install github.com/mfridman/tparse@v0.17.0
4242

4343
- name: Static Check
4444
uses: dominikh/staticcheck-action@v1
@@ -50,7 +50,7 @@ jobs:
5050
- name: Test
5151
run: |
5252
set -euo pipefail
53-
go test -coverpkg=./... -race -json -v -coverprofile=coverage.out -covermode=atomic ./pkg/... 2>&1 | tee /tmp/gotest.log | gotestfmt
53+
go test -coverpkg=./... -race -json -v -coverprofile=coverage.out -covermode=atomic ./pkg/... 2>&1 | tee /tmp/gotest.log | tparse
5454
5555
- name: Pull LiveKit Docker image
5656
run: |
@@ -60,7 +60,7 @@ jobs:
6060
- name: Test E2E
6161
run: |
6262
set -euo pipefail
63-
go test -coverpkg=./... -json -v -coverprofile=coverage2.out -covermode=atomic ./test/integration/... 2>&1 | tee -a /tmp/gotest.log | gotestfmt
63+
go test -coverpkg=./... -json -v -coverprofile=coverage2.out -covermode=atomic ./test/integration/... 2>&1 | tee -a /tmp/gotest.log | tparse
6464
6565
- name: Upload test log
6666
uses: actions/upload-artifact@v4

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ require (
2727
github.com/sirupsen/logrus v1.9.3
2828
github.com/stretchr/testify v1.10.0
2929
github.com/urfave/cli/v2 v2.27.2
30-
github.com/zaf/resample v1.5.0
3130
golang.org/x/exp v0.0.0-20250207012021-f9890c6ad9f3
3231
google.golang.org/protobuf v1.36.5
3332
gopkg.in/hraban/opus.v2 v2.0.0-20230925203106-0188a62cb302

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,8 +260,6 @@ github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBi
260260
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
261261
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
262262
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
263-
github.com/zaf/resample v1.5.0 h1:c3yumHrV1cJoED8ZY2Ai3cehS8s0mJSroA9/vMaUcho=
264-
github.com/zaf/resample v1.5.0/go.mod h1:e4yWalfgRccQrnZSrkIxTqmMCOPhTi1xvYpNpRIB13k=
265263
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
266264
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
267265
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=

pkg/media/resample_soxr.go

Lines changed: 141 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,21 @@
1616

1717
package media
1818

19+
/*
20+
#cgo pkg-config: soxr
21+
#include <stdlib.h>
22+
#include <soxr.h>
23+
*/
24+
import "C"
25+
1926
import (
20-
"bytes"
21-
"encoding/binary"
27+
"errors"
2228
"fmt"
2329
"runtime"
30+
"slices"
2431
"sync"
25-
26-
"github.com/zaf/resample"
32+
"sync/atomic"
33+
"unsafe"
2734
)
2835

2936
func resampleSize(dstSampleRate, srcSampleRate int, srcSize int) int {
@@ -38,26 +45,12 @@ func resampleSize(dstSampleRate, srcSampleRate int, srcSize int) int {
3845
}
3946

4047
func resampleBuffer(dst PCM16Sample, dstSampleRate int, src PCM16Sample, srcSampleRate int) PCM16Sample {
41-
inbuf := make([]byte, len(src)*2)
42-
outbuf := bytes.NewBuffer(nil)
43-
outbuf.Grow(resampleSize(dstSampleRate, srcSampleRate, len(src)))
44-
r, err := resample.New(outbuf, float64(srcSampleRate), float64(dstSampleRate), 1, resample.I16, resample.Quick)
45-
if err != nil {
46-
panic(err)
47-
}
48-
for i, v := range src {
49-
binary.LittleEndian.PutUint16(inbuf[i*2:], uint16(v))
50-
}
51-
_, err = r.Write(inbuf)
52-
_ = r.Close()
48+
w := newResampleWriter(NewPCM16BufferWriter(&dst, dstSampleRate), srcSampleRate)
49+
err := w.WriteSample(src)
50+
_ = w.Close()
5351
if err != nil {
5452
panic(err)
5553
}
56-
buf := outbuf.Bytes()
57-
for i := range len(buf) / 2 {
58-
v := int16(binary.LittleEndian.Uint16(buf[i*2:]))
59-
dst = append(dst, v)
60-
}
6154
return dst
6255
}
6356

@@ -68,30 +61,26 @@ func newResampleWriter(w WriteCloser[PCM16Sample], sampleRate int) WriteCloser[P
6861
w: w,
6962
srcRate: srcRate,
7063
dstRate: dstRate,
71-
buffer: 0, // set larger buffer for better resampler quality
64+
buffer: 0, // set larger buffer for better resampler quality (see below)
7265
}
73-
quality := resample.Quick
66+
quality := int(C.SOXR_QQ)
7467
if srcRate > dstRate {
7568
if float64(srcRate)/float64(dstRate) > 3 {
76-
quality = resample.LowQ
69+
quality = int(C.SOXR_LQ)
7770
}
7871
}
7972
var err error
80-
r.r, err = resample.New(r, float64(srcRate), float64(dstRate), 1, resample.I16, quality)
73+
r.r, err = newSoxr(dstRate, srcRate, quality)
8174
if err != nil {
8275
panic(err)
8376
}
84-
runtime.AddCleanup(r, func(rr *resample.Resampler) {
85-
_ = rr.Close()
86-
}, r.r)
8777
return r
8878
}
8979

9080
type resampleWriter struct {
9181
mu sync.Mutex
9282
w WriteCloser[PCM16Sample]
93-
r *resample.Resampler
94-
inbuf []byte
83+
r *soxrResampler
9584
srcRate int
9685
dstRate int
9786
dstFrame int
@@ -113,9 +102,21 @@ func (w *resampleWriter) SampleRate() int {
113102
func (w *resampleWriter) Close() error {
114103
w.mu.Lock()
115104
defer w.mu.Unlock()
116-
_ = w.r.Close() // flush resampler's internal buffer
117-
_ = w.flush(0) // flush our own PCM frame buffer
118-
return w.w.Close()
105+
// Flush soxr buffer to our buffer.
106+
var err error
107+
w.buf, _, err = w.r.Resample(w.buf, nil)
108+
if err != nil {
109+
return err
110+
}
111+
// Close soxr resampler.
112+
_ = w.r.Close()
113+
// Flush our own PCM frame buffer to the underlying writer.
114+
_ = w.flush(0)
115+
err2 := w.w.Close()
116+
if err2 != nil {
117+
err = err2
118+
}
119+
return err
119120
}
120121

121122
func (w *resampleWriter) flush(minSize int) error {
@@ -147,37 +148,121 @@ func (w *resampleWriter) WriteSample(data PCM16Sample) error {
147148
}
148149
w.mu.Lock()
149150
defer w.mu.Unlock()
150-
if sz := len(data) * 2; cap(w.inbuf) < sz {
151-
w.inbuf = make([]byte, sz)
152-
} else {
153-
w.inbuf = w.inbuf[:sz]
154-
}
155-
for i, v := range data {
156-
binary.LittleEndian.PutUint16(w.inbuf[i*2:], uint16(v))
157-
}
158-
left := w.inbuf
159-
// Write converted input to the resampler's buffer.
160-
// It will call our own Write method which collects data into a frame buffer.
161-
for len(left) > 0 {
162-
n, err := w.r.Write(left)
163-
if err != nil {
164-
return err
165-
}
166-
left = left[n:]
151+
var err error
152+
// Write input to the resampler buffer.
153+
w.buf, _, err = w.r.Resample(w.buf, data)
154+
if err != nil {
155+
return err
167156
}
168157
// Resampler will likely return a short buffer in the first run. In that case, we emit no samples on the first call.
169158
// This will cause a one frame delay for each resampler. Flushing the sampler, however will lead to frame
170159
// discontinuity, and thus - distortions on the frame boundaries.
171-
172160
dstFrame := resampleSize(w.dstRate, w.srcRate, len(data))
173161
w.dstFrame = max(w.dstFrame, dstFrame)
174162
return w.flush(w.dstFrame * (1 + w.buffer))
175163
}
176164

177-
func (w *resampleWriter) Write(data []byte) (int, error) {
178-
for i := range len(data) / 2 {
179-
v := int16(binary.LittleEndian.Uint16(data[i*2:]))
180-
w.buf = append(w.buf, v)
165+
type soxrResampler struct {
166+
ptr C.soxr_t
167+
srcRate int
168+
dstRate int
169+
maxIn int
170+
done *atomic.Bool
171+
}
172+
173+
func soxrErr(e C.soxr_error_t) error {
174+
if e == nil {
175+
return nil
176+
}
177+
defer C.free(unsafe.Pointer(e))
178+
estr := C.GoString(e)
179+
switch estr {
180+
case "", "0":
181+
return nil
182+
}
183+
return errors.New(estr)
184+
}
185+
186+
func newSoxr(dstRate, srcRate int, quality int) (*soxrResampler, error) {
187+
ic := C.soxr_io_spec(C.SOXR_INT16_I, C.SOXR_INT16_I)
188+
qc := C.soxr_quality_spec(C.ulong(quality), 0)
189+
rc := C.soxr_runtime_spec(1) // 1 thread
190+
var e C.soxr_error_t
191+
p := C.soxr_create(C.double(srcRate), C.double(dstRate), 1, &e, &ic, &qc, &rc)
192+
err := soxrErr(e)
193+
if err != nil {
194+
return nil, err
195+
}
196+
// This variable helps avoid double-free on the soxr resampler ptr. See soxrCleanup.
197+
done := new(atomic.Bool)
198+
r := &soxrResampler{
199+
ptr: p,
200+
dstRate: dstRate,
201+
srcRate: srcRate,
202+
done: done,
203+
}
204+
runtime.AddCleanup(r, func(p C.soxr_t) {
205+
soxrCleanup(done, p)
206+
}, p)
207+
return r, nil
208+
}
209+
210+
func soxrCleanup(done *atomic.Bool, p C.soxr_t) {
211+
if done.CompareAndSwap(false, true) {
212+
C.soxr_delete(p)
213+
}
214+
}
215+
216+
func (r *soxrResampler) Close() error {
217+
if r.ptr == nil {
218+
return nil
219+
}
220+
soxrCleanup(r.done, r.ptr)
221+
r.ptr = nil
222+
return nil
223+
}
224+
225+
func (r *soxrResampler) Resample(out PCM16Sample, in PCM16Sample) (PCM16Sample, int, error) {
226+
if r.ptr == nil || r.done.Load() {
227+
return out, 0, errors.New("resampler is closed")
228+
}
229+
r.maxIn = max(r.maxIn, len(in))
230+
dstN := (len(in) * r.dstRate) / r.srcRate
231+
if dstN == 0 {
232+
dstN = max(
233+
(r.maxIn*r.dstRate)/r.srcRate,
234+
cap(out)-len(out),
235+
1024,
236+
)
237+
}
238+
// Make sure output has space for new samples. Length is still unchanged.
239+
out = slices.Grow(out, dstN)
240+
// Slice for the unused capacity, which we will write into.
241+
dst := out[len(out) : len(out)+dstN]
242+
total := 0
243+
// Always call at least once (for flush to work), thus not considering len(in) here.
244+
for len(dst) > 0 {
245+
var read, done C.size_t
246+
var e C.soxr_error_t
247+
if len(in) != 0 {
248+
e = C.soxr_process(r.ptr, C.soxr_in_t(unsafe.Pointer(&in[0])), C.size_t(len(in)), &read, C.soxr_out_t(unsafe.Pointer(&dst[0])), C.size_t(len(dst)), &done)
249+
} else {
250+
// Flush, no input.
251+
e = C.soxr_process(r.ptr, nil, 0, nil, C.soxr_out_t(unsafe.Pointer(&dst[0])), C.size_t(len(dst)), &done)
252+
read = 0
253+
}
254+
err := soxrErr(e)
255+
if err != nil {
256+
return out, 0, err
257+
}
258+
total += int(done)
259+
dst = dst[done:]
260+
in = in[read:]
261+
if len(in) == 0 {
262+
break
263+
}
181264
}
182-
return len(data), nil
265+
// Finally adjust the length to cover written data.
266+
out = out[:len(out)+total]
267+
return out, total, nil
183268
}

pkg/media/resample_test.go

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ import (
2020
"encoding/hex"
2121
"fmt"
2222
"os"
23+
"runtime"
2324
"strconv"
25+
"strings"
2426
"sync"
2527
"testing"
2628

@@ -85,8 +87,7 @@ func TestResample(t *testing.T) {
8587
Buffer int
8688
Bumps []int
8789
}{
88-
{
89-
Rate: 16000, Skip: 0},
90+
{Rate: 16000, Skip: 0},
9091
{
9192
Rate: 8000, Skip: 2,
9293
Buffer: 3,
@@ -175,3 +176,69 @@ func writePCM16sWebm(t testing.TB, path string, rate int, buf []media.PCM16Sampl
175176
require.NoError(t, err)
176177
}
177178
}
179+
180+
func memstats(pid int) int64 {
181+
data, err := os.ReadFile(fmt.Sprintf("/proc/%d/statm", pid))
182+
if err != nil {
183+
panic(err)
184+
}
185+
fields := strings.Fields(string(data))
186+
v, err := strconv.ParseInt(fields[1], 10, 64)
187+
if err != nil {
188+
panic(err)
189+
}
190+
return v
191+
}
192+
193+
func TestResampleLeak(t *testing.T) {
194+
if runtime.GOOS == "windows" {
195+
t.Skip("windows is not supported for this test")
196+
}
197+
pid := os.Getpid()
198+
199+
const (
200+
srcFile = "resample.src.s16le"
201+
srcFileWebm = "resample.src.mka"
202+
srcRate = res.SampleRate
203+
)
204+
srcFrames := res.ReadOggAudioFile(testdata.TestAudioOgg)
205+
gotSrc := writePCM16s(t, srcFile, srcFrames)
206+
writePCM16sWebm(t, srcFileWebm, srcRate, srcFrames)
207+
require.True(t, gotSrc == "c774af95" || gotSrc == "b04a6a1f")
208+
209+
const (
210+
runs = 300
211+
pagesThreshold = 100
212+
)
213+
startMem := memstats(pid)
214+
captureStats := func(run int) {
215+
if t.Failed() {
216+
return
217+
}
218+
runtime.GC()
219+
220+
endMem := memstats(pid)
221+
diff := endMem - startMem
222+
if diff > pagesThreshold {
223+
t.Fatalf("resampler memory leak detected (%d Kb / %d runs)", diff, run)
224+
}
225+
}
226+
defer captureStats(runs)
227+
228+
for run := range runs {
229+
func() {
230+
defer captureStats(run + 1)
231+
var dstFrames []media.PCM16Sample
232+
dst := media.NewPCM16FrameWriter(&dstFrames, 16000)
233+
r := media.ResampleWriter(dst, srcRate)
234+
// This test the cleanup function, so intentionally avoid Close.
235+
//defer r.Close()
236+
totalSamples := 0
237+
for _, src := range srcFrames {
238+
err := r.WriteSample(src)
239+
require.NoError(t, err)
240+
totalSamples += len(src)
241+
}
242+
}()
243+
}
244+
}

0 commit comments

Comments
 (0)