Skip to content

Commit e72abc8

Browse files
authored
Merge pull request #4237 from norio-nomura/fix-closing-grpc-stream-in-guestagent
pkg/portfwdserver: Close `stream` by returning from the handler method
2 parents dce57ae + 34fa305 commit e72abc8

File tree

4 files changed

+43
-35
lines changed

4 files changed

+43
-35
lines changed

.github/workflows/test.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ jobs:
177177
- name: Integration tests (WSL2, Windows host)
178178
run: |
179179
$env:PATH = "$pwd\_output\bin;" + 'C:\msys64\usr\bin;' + $env:PATH
180-
pacman -Sy --noconfirm openbsd-netcat diffutils socat
180+
pacman -Sy --noconfirm openbsd-netcat diffutils socat w3m
181181
$env:MSYS2_ENV_CONV_EXCL = 'HOME_HOST;HOME_GUEST;_LIMA_WINDOWS_EXTRA_PATH'
182182
$env:HOME_HOST = $(cygpath.exe "$env:USERPROFILE")
183183
$env:HOME_GUEST = "/mnt$env:HOME_HOST"
@@ -206,7 +206,7 @@ jobs:
206206
- name: Integration tests (QEMU, Windows host)
207207
run: |
208208
$env:PATH = "$pwd\_output\bin;" + 'C:\msys64\usr\bin;' + 'C:\Program Files\QEMU;' + $env:PATH
209-
pacman -Sy --noconfirm openbsd-netcat diffutils socat
209+
pacman -Sy --noconfirm openbsd-netcat diffutils socat w3m
210210
$env:MSYS2_ENV_CONV_EXCL = 'HOME_HOST;HOME_GUEST;_LIMA_WINDOWS_EXTRA_PATH'
211211
$env:HOME_HOST = $(cygpath.exe "$env:USERPROFILE")
212212
$env:HOME_GUEST = "$env:HOME_HOST"

hack/test-templates.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,7 @@ if [[ -n ${CHECKS["port-forwards"]} ]]; then
489489
limactl shell "$NAME" $sudo $CONTAINER_ENGINE rm -f nginx
490490
fi
491491
fi
492-
if [[ ${NAME} != "alpine"* && ${NAME} != "wsl2"* ]] && command -v w3m >/dev/null; then
492+
if [[ ${NAME} != "alpine"* ]] && command -v w3m >/dev/null; then
493493
INFO "Testing https://github.com/lima-vm/lima/issues/3685 ([gRPC portfwd] client connection is not closed immediately when server closed the connection)"
494494
# Skip the test on Alpine, as systemd-run is missing
495495
# Skip the test on WSL2, as port forwarding is half broken https://github.com/lima-vm/lima/pull/3686#issuecomment-3034842616

pkg/portfwd/client.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
func HandleTCPConnection(_ context.Context, dialContext func(ctx context.Context, network string, addr string) (net.Conn, error), conn net.Conn, guestAddr string) {
2222
proxy := tcpproxy.DialProxy{Addr: guestAddr, DialContext: dialContext}
2323
proxy.HandleConn(conn)
24+
logrus.Debugf("tcp proxy for guestAddr: %s closed", guestAddr)
2425
}
2526

2627
func HandleUDPConnection(ctx context.Context, dialContext func(ctx context.Context, network string, addr string) (net.Conn, error), conn net.PacketConn, guestAddr string) {
@@ -39,6 +40,7 @@ func HandleUDPConnection(ctx context.Context, dialContext func(ctx context.Conte
3940
}
4041
}()
4142
proxy.Run()
43+
logrus.Debugf("udp proxy for guestAddr: %s closed", guestAddr)
4244
}
4345

4446
func DialContextToGRPCTunnel(client *guestagentclient.GuestAgentClient) func(ctx context.Context, network, addr string) (net.Conn, error) {
@@ -97,6 +99,7 @@ func (g *GrpcClientRW) Read(p []byte) (n int, err error) {
9799
}
98100

99101
func (g *GrpcClientRW) Close() error {
102+
logrus.Debugf("closing GrpcClientRW for id: %s", g.id)
100103
return g.stream.CloseSend()
101104
}
102105

pkg/portfwdserver/server.go

Lines changed: 37 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,11 @@ import (
88
"errors"
99
"io"
1010
"net"
11-
"os"
12-
"strings"
1311
"time"
1412

1513
"github.com/containers/gvisor-tap-vsock/pkg/tcpproxy"
14+
"github.com/sirupsen/logrus"
1615

17-
"github.com/lima-vm/lima/v2/pkg/bicopy"
1816
"github.com/lima-vm/lima/v2/pkg/guestagent/api"
1917
)
2018

@@ -41,30 +39,30 @@ func (s *TunnelServer) Start(stream api.GuestService_TunnelServer) error {
4139
if err != nil {
4240
return err
4341
}
44-
rw := &GRPCServerRW{stream: stream, id: in.Id}
45-
46-
// FIXME: consolidate bicopy and tcpproxy into one
47-
//
48-
// The bicopy package does not seem to work with `w3m -dump`:
49-
// https://github.com/lima-vm/lima/issues/3685
50-
//
51-
// However, the tcpproxy package can't pass the CI for WSL2 (experimental):
52-
// https://github.com/lima-vm/lima/pull/3686#issuecomment-3034842616
53-
if wsl2, _ := seemsWSL2(); wsl2 {
54-
bicopy.Bicopy(rw, conn, nil)
55-
} else {
56-
proxy := tcpproxy.DialProxy{DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
57-
return conn, nil
58-
}}
59-
proxy.HandleConn(rw)
60-
}
42+
rw := &GRPCServerRW{stream: stream, id: in.Id, closeCh: make(chan any, 1)}
43+
go func() {
44+
<-ctx.Done()
45+
rw.Close()
46+
}()
47+
48+
proxy := tcpproxy.DialProxy{DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
49+
return conn, nil
50+
}}
51+
go proxy.HandleConn(rw)
52+
53+
// The stream will be closed when this function returns.
54+
// Wait here until rw.Close(), rw.CloseRead(), or rw.CloseWrite() is called.
55+
// We can't close rw.closeCh since the calling order of Close* methods is not guaranteed.
56+
<-rw.closeCh
57+
logrus.Debugf("closed GRPCServerRW for id: %s", in.Id)
6158

6259
return nil
6360
}
6461

6562
type GRPCServerRW struct {
66-
id string
67-
stream api.GuestService_TunnelServer
63+
id string
64+
stream api.GuestService_TunnelServer
65+
closeCh chan any
6866
}
6967

7068
var _ net.Conn = (*GRPCServerRW)(nil)
@@ -84,6 +82,23 @@ func (g *GRPCServerRW) Read(p []byte) (n int, err error) {
8482
}
8583

8684
func (g *GRPCServerRW) Close() error {
85+
logrus.Debugf("closing GRPCServerRW for id: %s", g.id)
86+
g.closeCh <- struct{}{}
87+
return nil
88+
}
89+
90+
// By adding CloseRead and CloseWrite methods, GRPCServerRW can work with
91+
// other than containers/gvisor-tap-vsock/pkg/tcpproxy, e.g., inetaf/tcpproxy, bicopy.Bicopy.
92+
93+
func (g *GRPCServerRW) CloseRead() error {
94+
logrus.Debugf("closing read GRPCServerRW for id: %s", g.id)
95+
g.closeCh <- struct{}{}
96+
return nil
97+
}
98+
99+
func (g *GRPCServerRW) CloseWrite() error {
100+
logrus.Debugf("closing write GRPCServerRW for id: %s", g.id)
101+
g.closeCh <- struct{}{}
87102
return nil
88103
}
89104

@@ -106,13 +121,3 @@ func (g *GRPCServerRW) SetReadDeadline(_ time.Time) error {
106121
func (g *GRPCServerRW) SetWriteDeadline(_ time.Time) error {
107122
return nil
108123
}
109-
110-
// seemsWSL2 returns whether lima.env contains LIMA_CIDATA_VMTYPE=wsl2 .
111-
// This is a temporary workaround and has to be removed.
112-
func seemsWSL2() (bool, error) {
113-
b, err := os.ReadFile("/mnt/lima-cidata/lima.env")
114-
if err != nil {
115-
return false, err
116-
}
117-
return strings.Contains(string(b), "LIMA_CIDATA_VMTYPE=wsl2"), nil
118-
}

0 commit comments

Comments
 (0)