|
6 | 6 | "os" |
7 | 7 | "os/exec" |
8 | 8 | "strings" |
| 9 | + "sync" |
9 | 10 |
|
10 | 11 | logutil "github.com/docker/infrakit/pkg/log" |
11 | 12 | "github.com/docker/infrakit/pkg/template" |
@@ -34,6 +35,28 @@ type Builder struct { |
34 | 35 | context interface{} |
35 | 36 | rendered string // rendered command string |
36 | 37 | cmd *exec.Cmd |
| 38 | + stdout io.Writer |
| 39 | + stderr io.Writer |
| 40 | + stdin io.Reader |
| 41 | + wg sync.WaitGroup |
| 42 | +} |
| 43 | + |
| 44 | +// WithStdin sets the stdin reader |
| 45 | +func (b *Builder) WithStdin(r io.Reader) *Builder { |
| 46 | + b.stdin = r |
| 47 | + return b |
| 48 | +} |
| 49 | + |
| 50 | +// WithStdout sets the stdout writer |
| 51 | +func (b *Builder) WithStdout(w io.Writer) *Builder { |
| 52 | + b.stdout = w |
| 53 | + return b |
| 54 | +} |
| 55 | + |
| 56 | +// WithStderr sets the stderr writer |
| 57 | +func (b *Builder) WithStderr(w io.Writer) *Builder { |
| 58 | + b.stdout = w |
| 59 | + return b |
37 | 60 | } |
38 | 61 |
|
39 | 62 | // WithArg sets the arg key, value pair that can be accessed via the 'arg' function |
@@ -80,120 +103,94 @@ func (b *Builder) WithContext(context interface{}) *Builder { |
80 | 103 | return b |
81 | 104 | } |
82 | 105 |
|
83 | | -// Step is something you do with the processes streams |
84 | | -type Step func(stdin io.WriteCloser, stdout io.ReadCloser, stderr io.ReadCloser) error |
85 | | - |
86 | | -// Thenable is a fluent builder for chaining tasks |
87 | | -type Thenable struct { |
88 | | - steps []Step |
89 | | -} |
| 106 | +var noop = func() error { return nil } |
90 | 107 |
|
91 | | -// Do creates a thenable |
92 | | -func Do(f Step) *Thenable { |
93 | | - return &Thenable{ |
94 | | - steps: []Step{f}, |
95 | | - } |
96 | | -} |
97 | | - |
98 | | -// Then adds another step |
99 | | -func (t *Thenable) Then(then Step) *Thenable { |
100 | | - t.steps = append(t.steps, then) |
101 | | - return t |
102 | | -} |
103 | | - |
104 | | -// Done returns the final function |
105 | | -func (t *Thenable) Done() Step { |
106 | | - all := t.steps |
107 | | - return func(stdin io.WriteCloser, stdout, stderr io.ReadCloser) error { |
108 | | - for _, next := range all { |
109 | | - if err := next(stdin, stdout, stderr); err != nil { |
110 | | - return err |
111 | | - } |
112 | | - } |
113 | | - return nil |
114 | | - } |
115 | | -} |
116 | | - |
117 | | -// SendInput is a convenience function for writing to the exec process's stdin. When the function completes, the |
118 | | -// stdin is closed. |
119 | | -func SendInput(f func(io.WriteCloser) error) Step { |
120 | | - return func(stdin io.WriteCloser, stdout, stderr io.ReadCloser) error { |
121 | | - defer stdin.Close() |
122 | | - return f(stdin) |
123 | | - } |
124 | | -} |
125 | | - |
126 | | -// RedirectStdout sends stdout to given writer |
127 | | -func RedirectStdout(out io.Writer) Step { |
128 | | - return func(stdin io.WriteCloser, stdout, stderr io.ReadCloser) error { |
129 | | - _, err := io.Copy(out, stdout) |
130 | | - return err |
131 | | - } |
132 | | -} |
133 | | - |
134 | | -// RedirectStderr sends stdout to given writer |
135 | | -func RedirectStderr(out io.Writer) Step { |
136 | | - return func(stdin io.WriteCloser, stdout, stderr io.ReadCloser) error { |
137 | | - _, err := io.Copy(out, stderr) |
138 | | - return err |
139 | | - } |
140 | | -} |
141 | | - |
142 | | -// MergeOutput combines the stdout and stderr into the given stream |
143 | | -func MergeOutput(out io.Writer) Step { |
144 | | - return func(stdin io.WriteCloser, stdout, stderr io.ReadCloser) error { |
145 | | - _, err := io.Copy(out, io.MultiReader(stdout, stderr)) |
146 | | - return err |
147 | | - } |
148 | | -} |
149 | | - |
150 | | -// StartWithStreams starts the the process and then calls the function which allows |
151 | | -// the streams to be wired. Calling the provided function blocks. |
152 | | -func (b *Builder) StartWithStreams(f Step, args ...interface{}) error { |
| 108 | +// StartWithHandlers starts the cmd non blocking and calls the given handlers to process input / output |
| 109 | +func (b *Builder) StartWithHandlers(stdinFunc func(io.Writer) error, |
| 110 | + stdoutFunc func(io.Reader) error, |
| 111 | + stderrFunc func(io.Reader) error, |
| 112 | + args ...interface{}) error { |
153 | 113 |
|
154 | 114 | if err := b.Prepare(args...); err != nil { |
155 | 115 | return err |
156 | 116 | } |
157 | 117 |
|
158 | | - run := func() error { return nil } |
159 | | - if f != nil { |
| 118 | + // There's a race between the input/output streams reads and cmd.wait() which |
| 119 | + // will close the pipes even while others are trying to read. |
| 120 | + // So we need to ensure that all the input/output are done before actually waiting |
| 121 | + // on the cmd to complete. |
| 122 | + // To do so, we use a waitgroup |
| 123 | + |
| 124 | + handleInput := noop |
| 125 | + if stdinFunc != nil { |
160 | 126 | pIn, err := b.cmd.StdinPipe() |
161 | 127 | if err != nil { |
162 | 128 | return err |
163 | 129 | } |
164 | 130 |
|
| 131 | + handleInput = func() error { |
| 132 | + defer func() { |
| 133 | + pIn.Close() |
| 134 | + b.wg.Done() |
| 135 | + }() |
| 136 | + return stdinFunc(pIn) |
| 137 | + } |
| 138 | + b.wg.Add(1) |
| 139 | + } |
| 140 | + |
| 141 | + handleStdout := noop |
| 142 | + if stdoutFunc != nil { |
165 | 143 | pOut, err := b.cmd.StdoutPipe() |
166 | 144 | if err != nil { |
167 | 145 | return err |
168 | 146 | } |
169 | | - |
| 147 | + handleStdout = func() error { |
| 148 | + defer func() { |
| 149 | + pOut.Close() |
| 150 | + b.wg.Done() |
| 151 | + }() |
| 152 | + return stdoutFunc(pOut) |
| 153 | + } |
| 154 | + b.wg.Add(1) |
| 155 | + } |
| 156 | + handleStderr := noop |
| 157 | + if stderrFunc != nil { |
170 | 158 | pErr, err := b.cmd.StderrPipe() |
171 | 159 | if err != nil { |
172 | 160 | return err |
173 | 161 | } |
174 | | - |
175 | | - run = func() error { |
176 | | - return f(pIn, pOut, pErr) |
| 162 | + handleStderr = func() error { |
| 163 | + defer func() { |
| 164 | + pErr.Close() |
| 165 | + b.wg.Done() |
| 166 | + }() |
| 167 | + return stderrFunc(pErr) |
177 | 168 | } |
| 169 | + b.wg.Add(1) |
178 | 170 | } |
179 | 171 |
|
180 | 172 | if err := b.cmd.Start(); err != nil { |
181 | 173 | return err |
182 | 174 | } |
183 | 175 |
|
184 | | - return run() |
| 176 | + go handleStdout() |
| 177 | + go handleStderr() |
| 178 | + go handleInput() |
| 179 | + |
| 180 | + return nil |
185 | 181 | } |
186 | 182 |
|
187 | 183 | // Start does a Cmd.Start on the command |
188 | 184 | func (b *Builder) Start(args ...interface{}) error { |
189 | 185 | if err := b.Prepare(args...); err != nil { |
190 | 186 | return err |
191 | 187 | } |
192 | | - return b.StartWithStreams(nil, args...) |
| 188 | + return b.StartWithHandlers(nil, nil, nil, args...) |
193 | 189 | } |
194 | 190 |
|
195 | 191 | // Wait waits for the command to complete |
196 | 192 | func (b *Builder) Wait() error { |
| 193 | + b.wg.Wait() |
197 | 194 | return b.cmd.Wait() |
198 | 195 | } |
199 | 196 |
|
@@ -260,6 +257,15 @@ func (b *Builder) Prepare(args ...interface{}) error { |
260 | 257 | if b.inheritEnvs { |
261 | 258 | b.cmd.Env = append(os.Environ(), b.envs...) |
262 | 259 | } |
| 260 | + if b.stdin != nil { |
| 261 | + b.cmd.Stdin = b.stdin |
| 262 | + } |
| 263 | + if b.stdout != nil { |
| 264 | + b.cmd.Stdout = b.stdout |
| 265 | + } |
| 266 | + if b.stderr != nil { |
| 267 | + b.cmd.Stderr = b.stderr |
| 268 | + } |
263 | 269 | return nil |
264 | 270 | } |
265 | 271 |
|
|
0 commit comments