Skip to content

Commit 0d9c125

Browse files
author
Achille Roussel
committed
add files
1 parent 2a3d0db commit 0d9c125

File tree

15 files changed

+3112
-0
lines changed

15 files changed

+3112
-0
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,6 @@
1313

1414
# Dependency directories (remove the comment below to include it)
1515
# vendor/
16+
17+
# Emacs
18+
*~

README.md

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,52 @@
11
# kubectl-curl
2+
23
Kubectl plugin to run curl commands against kubernetes pods
4+
5+
## Motivation
6+
7+
Sending http requests to kubernetes pods is unnecessarily complicated, this plugin makes it easy.
8+
9+
## Installation
10+
11+
If `$GOPATH/bin` is in the `PATH`, the plugin can be installed with:
12+
```
13+
$ go install github.com/segmentio/kubectl-curl
14+
```
15+
16+
If it was installed properly, it will be visibile when listing kubectl plugins:
17+
```
18+
$ kubectl plugin list
19+
The following compatible plugins are available:
20+
21+
/.../kubectl-curl
22+
```
23+
24+
## Usage
25+
26+
```
27+
kubectl curl [options] URL [container]
28+
```
29+
30+
* In the URL, the host part must be the name of the pod to send the request to.
31+
* If no port number is specified, the request will be sent to a `http` port.
32+
* If there are multiple containers with a `http` port, the name of the container to send to the request to must be specified after the URL.
33+
34+
## Examples
35+
36+
This section records common use cases for this kubectl plugin.
37+
38+
### Collecting profiles of Go programs
39+
40+
```
41+
$ kubectl curl "http://{pod}/debug/pprof/profile?debug=1&seconds=10" > ./profile
42+
$ go tool pprof -http :6060 ./profile
43+
```
44+
45+
* Full documentation: [net/http/pprof](https://pkg.go.dev/net/http/pprof)
46+
47+
### Retrieving prometheus metrics
48+
49+
```
50+
$ kubectl curl http://{pod}/metrics
51+
...
52+
```

curl.go

Lines changed: 316 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"log"
8+
"math/rand"
9+
"net"
10+
"net/http"
11+
"net/url"
12+
"os"
13+
"os/signal"
14+
"strconv"
15+
"strings"
16+
"sync"
17+
"time"
18+
19+
"github.com/segmentio/kubectl-curl/curl"
20+
"github.com/spf13/pflag"
21+
corev1 "k8s.io/api/core/v1"
22+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23+
"k8s.io/apimachinery/pkg/util/runtime"
24+
"k8s.io/cli-runtime/pkg/genericclioptions"
25+
"k8s.io/client-go/kubernetes"
26+
_ "k8s.io/client-go/plugin/pkg/client/auth"
27+
"k8s.io/client-go/rest"
28+
"k8s.io/client-go/tools/portforward"
29+
"k8s.io/client-go/transport/spdy"
30+
)
31+
32+
var (
33+
curlOptions = curl.NewOptionSet()
34+
35+
help bool
36+
debug bool
37+
usage string
38+
flags *pflag.FlagSet
39+
config *genericclioptions.ConfigFlags
40+
)
41+
42+
func init() {
43+
runtime.ErrorHandlers = nil // disables default kubernetes error logging
44+
rand.Seed(time.Now().UnixNano())
45+
46+
log.SetOutput(os.Stderr)
47+
log.SetPrefix("* ")
48+
49+
flags = pflag.NewFlagSet("kubectl curl", pflag.ExitOnError)
50+
flags.BoolVarP(&help, "help", "h", false, "Prints the kubectl pprof help.")
51+
flags.BoolVarP(&debug, "debug", "", false, "Enable debug mode to print more details about the kubectl command execution.")
52+
53+
for _, opt := range curlOptions {
54+
name := strings.TrimPrefix(opt.Name, "--")
55+
short := strings.TrimPrefix(opt.Short, "-")
56+
57+
// Rewrite option names that conflict with the kubectl default options:
58+
switch name {
59+
case "user": // * Change curl's "--user" option to "--userinfo"
60+
name = "userinfo"
61+
}
62+
63+
switch short {
64+
case "n", "s":
65+
// Remove short names that conflict with the kubectl default options:
66+
// * "-n" conflicts between kubectl's "--namespace" and curl's "--netrc"
67+
// * "-s" conflicts between kubectl's "--server" and curl's "--silent"
68+
short = ""
69+
}
70+
71+
flags.VarP(opt.Value, name, short, opt.Help)
72+
}
73+
74+
config = genericclioptions.NewConfigFlags(false)
75+
config.AddFlags(flags)
76+
usage = flags.FlagUsages()
77+
}
78+
79+
func main() {
80+
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
81+
defer stop()
82+
83+
if err := run(ctx); err != nil {
84+
fmt.Fprintf(os.Stderr, "* ERROR: %s", err)
85+
os.Exit(1)
86+
}
87+
}
88+
89+
func run(ctx context.Context) error {
90+
flags.Parse(os.Args[1:])
91+
92+
if help {
93+
printUsage()
94+
return nil
95+
}
96+
97+
var stdout io.Writer
98+
var stderr io.Writer
99+
if debug {
100+
stdout = os.Stdout
101+
stderr = os.Stderr
102+
} else {
103+
log.SetOutput(io.Discard)
104+
}
105+
106+
var args = flags.Args()
107+
var query string
108+
var containerName string
109+
switch len(args) {
110+
case 2:
111+
query, containerName = args[0], args[1]
112+
case 1:
113+
query = args[0]
114+
default:
115+
return fmt.Errorf("too many arguments passed in the command line invocation of kubectl curl [URL] [container]")
116+
}
117+
118+
requestURL, err := url.Parse(query)
119+
if err != nil {
120+
return fmt.Errorf("malformed URL: %w", err)
121+
}
122+
switch requestURL.Scheme {
123+
case "http", "https":
124+
case "":
125+
return fmt.Errorf("missing scheme in query URL: %s", query)
126+
default:
127+
return fmt.Errorf("unsupposed scheme in query URL: %s", query)
128+
}
129+
130+
podName, podPort, err := net.SplitHostPort(requestURL.Host)
131+
if err != nil {
132+
podName = requestURL.Host
133+
podPort = ""
134+
}
135+
136+
kubeConfig := config.ToRawKubeConfigLoader()
137+
namespace, _, err := kubeConfig.Namespace()
138+
if err != nil {
139+
return err
140+
}
141+
restConfig, err := config.ToRESTConfig()
142+
if err != nil {
143+
return err
144+
}
145+
client, err := kubernetes.NewForConfig(restConfig)
146+
if err != nil {
147+
return err
148+
}
149+
150+
log.Printf("kubectl get -n %s pod/%s", namespace, podName)
151+
pod, err := client.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
152+
if err != nil {
153+
return err
154+
}
155+
if pod.Status.Phase != corev1.PodRunning {
156+
return fmt.Errorf("unable to forward port because pod is not running. Current status=%v", pod.Status.Phase)
157+
}
158+
159+
const minPort = 10200
160+
const maxPort = 16383
161+
localPort := rand.Int31n(maxPort-minPort) + minPort
162+
remotePort := int32(0)
163+
portName := requestURL.Scheme
164+
165+
if podPort != "" {
166+
p, err := strconv.ParseInt(podPort, 10, 32)
167+
if err != nil {
168+
portName = podPort
169+
} else {
170+
remotePort = int32(p)
171+
}
172+
}
173+
174+
if remotePort == 0 {
175+
selectedContainerName, selectedContainerPort, err := selectContainerPort(pod, containerName, portName)
176+
if err != nil {
177+
return err
178+
}
179+
containerName = selectedContainerName
180+
remotePort = selectedContainerPort.ContainerPort
181+
}
182+
183+
log.Printf("forwarding local port %d to port %d of %s", localPort, remotePort, containerName)
184+
ctx, cancel := context.WithCancel(ctx)
185+
defer cancel()
186+
187+
f, err := openPortForwarder(ctx, portForwarderConfig{
188+
config: restConfig,
189+
namespace: namespace,
190+
podName: podName,
191+
localPort: localPort,
192+
remotePort: remotePort,
193+
stdout: stdout,
194+
stderr: stderr,
195+
})
196+
if err != nil {
197+
return err
198+
}
199+
200+
wg := sync.WaitGroup{}
201+
defer wg.Wait()
202+
defer log.Printf("waiting for port forwarder to stop")
203+
204+
defer cancel()
205+
defer log.Printf("shutting down port forwarder")
206+
207+
wg.Add(1)
208+
go func() {
209+
defer wg.Done()
210+
defer f.Close()
211+
212+
if err := f.ForwardPorts(); err != nil {
213+
log.Print(err)
214+
}
215+
}()
216+
217+
log.Printf("waiting for port fowarding to be established")
218+
select {
219+
case <-f.Ready:
220+
case <-ctx.Done():
221+
return nil
222+
}
223+
224+
requestURL.Host = net.JoinHostPort("localhost", strconv.Itoa(int(localPort)))
225+
options := append(curlOptions,
226+
// The -s option is taken by -s,--server from the default kubectl
227+
// configuration. Force --silent because we don't really need to
228+
// print the dynamic progress view for the scenarios in which this
229+
// plugin is useful for.
230+
curl.Silent(true),
231+
)
232+
cmd := curl.Command(ctx, requestURL.String(), options...)
233+
cmd.Stdin = os.Stdin
234+
cmd.Stdout = os.Stdout
235+
cmd.Stderr = os.Stderr
236+
log.Printf("curl %s", strings.Join(cmd.Args[1:], "\n\t"))
237+
return cmd.Run()
238+
}
239+
240+
func selectContainerPort(pod *corev1.Pod, containerName, portName string) (selectedContainerName string, selectedContainerPort corev1.ContainerPort, err error) {
241+
for _, container := range pod.Spec.Containers {
242+
if containerName != "" && container.Name != containerName {
243+
continue
244+
}
245+
for _, port := range container.Ports {
246+
if port.Name != portName || port.Protocol != corev1.ProtocolTCP {
247+
continue
248+
}
249+
if selectedContainerPort.Name != "" {
250+
err = fmt.Errorf("pod %[1]s has multiple containers with a %[2]s port, use kubectl %[1]s [container] to specify which one to profile", pod.Name, portName)
251+
return
252+
}
253+
selectedContainerName = container.Name
254+
selectedContainerPort = port
255+
}
256+
}
257+
if selectedContainerPort.Name == "" {
258+
err = fmt.Errorf("pod %s had no containers exposing a %s port", pod.Name, portName)
259+
}
260+
return
261+
}
262+
263+
type portForwarderConfig struct {
264+
config *rest.Config
265+
namespace string
266+
podName string
267+
localPort int32
268+
remotePort int32
269+
stdout io.Writer
270+
stderr io.Writer
271+
}
272+
273+
func openPortForwarder(ctx context.Context, fwd portForwarderConfig) (*portforward.PortForwarder, error) {
274+
transport, upgrader, err := spdy.RoundTripperFor(fwd.config)
275+
if err != nil {
276+
return nil, err
277+
}
278+
279+
host := strings.TrimLeft(fwd.config.Host, "htps:/")
280+
path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", fwd.namespace, fwd.podName)
281+
282+
client := &http.Client{
283+
Transport: transport,
284+
}
285+
286+
dialer := spdy.NewDialer(upgrader, client, http.MethodPost, &url.URL{
287+
Scheme: "https",
288+
Host: host,
289+
Path: path,
290+
})
291+
292+
ports := []string{
293+
fmt.Sprintf("%d:%d", fwd.localPort, fwd.remotePort),
294+
}
295+
296+
if fwd.stdout == nil {
297+
fwd.stdout = io.Discard
298+
}
299+
300+
if fwd.stderr == nil {
301+
fwd.stderr = io.Discard
302+
}
303+
304+
return portforward.New(dialer, ports, ctx.Done(), make(chan struct{}), fwd.stdout, fwd.stderr)
305+
}
306+
307+
func printUsage() {
308+
fmt.Printf(`Run curl against kubernetes pods
309+
310+
Usage:
311+
kubectl curl [options] URL [container]
312+
313+
Options:
314+
%s
315+
`, usage)
316+
}

0 commit comments

Comments
 (0)