@@ -5,11 +5,17 @@ use std::sync::OnceLock;
55use anyhow:: { anyhow, bail, Context , Error , Result } ;
66use bollard;
77use futures:: TryFutureExt ;
8+ use k8s_openapi:: api:: {
9+ apps:: v1:: Deployment ,
10+ core:: v1:: { Pod , Service } ,
11+ networking:: v1:: Ingress ,
12+ } ;
813use kube:: {
914 self ,
10- api:: { DynamicObject , GroupVersionKind , TypeMeta } ,
15+ api:: { DynamicObject , GroupVersionKind , Patch , PatchParams } ,
1116 core:: ResourceExt ,
12- discovery:: { ApiCapabilities , ApiResource , Discovery , Scope } ,
17+ discovery:: { ApiCapabilities , ApiResource } ,
18+ runtime:: { conditions, wait:: await_condition} ,
1319} ;
1420use s3;
1521use simplelog:: * ;
@@ -201,3 +207,141 @@ pub async fn kube_api_for(
201207 Ok ( kube:: Api :: default_namespaced_with ( client, & resource) )
202208 }
203209}
210+
211+ /// Apply multi-document manifest file, return created resources
212+ pub async fn apply_manifest_yaml (
213+ client : & kube:: Client ,
214+ manifest : & str ,
215+ ) -> Result < Vec < DynamicObject > > {
216+ // set ourself as the owner for managed fields
217+ // https://kubernetes.io/docs/reference/using-api/server-side-apply/#managers
218+ let pp = PatchParams :: apply ( "beavercds" ) . force ( ) ;
219+
220+ let mut results = vec ! [ ] ;
221+
222+ // this manifest has multiple documents (crds, deployment)
223+ for yaml in multidoc_deserialize ( manifest) ? {
224+ let obj: DynamicObject = serde_yml:: from_value ( yaml) ?;
225+ debug ! (
226+ "applying resource {} {}" ,
227+ obj. types. clone( ) . unwrap_or_default( ) . kind,
228+ obj. name_any( )
229+ ) ;
230+
231+ let obj_api = kube_api_for ( & obj, client. clone ( ) ) . await ?;
232+ match obj_api
233+ // patch is idempotent and will create if not present
234+ . patch ( & obj. name_any ( ) , & pp, & Patch :: Apply ( & obj) )
235+ . await
236+ {
237+ Ok ( d) => {
238+ results. push ( d) ;
239+ Ok ( ( ) )
240+ }
241+ // if error is from cluster api, mark it as such
242+ Err ( kube:: Error :: Api ( ae) ) => {
243+ // Err(kube::Error::Api(ae).into())
244+ Err ( anyhow ! ( ae) . context ( "error from cluster when deploying" ) )
245+ }
246+ // other errors could be anything
247+ Err ( e) => Err ( anyhow ! ( e) ) . context ( "unknown error when deploying" ) ,
248+ } ?;
249+ }
250+
251+ Ok ( results)
252+ }
253+
254+ /// Deserialize multi-document yaml string into a Vec of the documents
255+ fn multidoc_deserialize ( data : & str ) -> Result < Vec < serde_yml:: Value > > {
256+ use serde:: Deserialize ;
257+
258+ let mut docs = vec ! [ ] ;
259+ for de in serde_yml:: Deserializer :: from_str ( data) {
260+ match serde_yml:: Value :: deserialize ( de) ? {
261+ // discard any empty documents (e.g. from trailing ---)
262+ serde_yml:: Value :: Null => ( ) ,
263+ not_null => docs. push ( not_null) ,
264+ } ;
265+ }
266+ Ok ( docs)
267+
268+ // // deserialize all chunks
269+ // serde_yml::Deserializer::from_str(data)
270+ // .map(serde_yml::Value::deserialize)
271+ // // discard any empty documents (e.g. from trailing ---)
272+ // .filter_ok(|val| val != &serde_yml::Value::Null)
273+ // // coerce errors to Anyhow
274+ // .map(|r| r.map_err(|e| e.into()))
275+ // .collect()
276+ }
277+
278+ /// Check the status of the passed object and wait for it to become ready.
279+ ///
280+ /// This function does not provide a timeout. Callers will need to wrap this with a timeout instead.
281+ pub async fn wait_for_status ( client : & kube:: Client , object : & DynamicObject ) -> Result < ( ) > {
282+ debug ! (
283+ "waiting for ok status for {} {}" ,
284+ object. types. clone( ) . unwrap_or_default( ) . kind,
285+ object. name_any( )
286+ ) ;
287+
288+ // handle each separate object type differently
289+ match object. types . clone ( ) . unwrap_or_default ( ) . kind . as_str ( ) {
290+ // wait for Pod to become running
291+ "Pod" => {
292+ let api = kube:: Api :: namespaced ( client. clone ( ) , & object. namespace ( ) . unwrap ( ) ) ;
293+ let x = await_condition ( api, & object. name_any ( ) , conditions:: is_pod_running ( ) ) . await ?;
294+ }
295+
296+ // wait for Deployment to complete rollout
297+ "Deployment" => {
298+ let api = kube:: Api :: namespaced ( client. clone ( ) , & object. namespace ( ) . unwrap ( ) ) ;
299+ await_condition (
300+ api,
301+ & object. name_any ( ) ,
302+ conditions:: is_deployment_completed ( ) ,
303+ )
304+ . await ?;
305+ }
306+
307+ // wait for Ingress to get IP from ingress controller
308+ "Ingress" => {
309+ let api = kube:: Api :: namespaced ( client. clone ( ) , & object. namespace ( ) . unwrap ( ) ) ;
310+ await_condition (
311+ api,
312+ & object. name_any ( ) ,
313+ conditions:: is_ingress_provisioned ( ) ,
314+ )
315+ . await ?;
316+ }
317+
318+ // wait for LoadBalancer service to get IP
319+ "Service" => {
320+ let api = kube:: Api :: namespaced ( client. clone ( ) , & object. namespace ( ) . unwrap ( ) ) ;
321+ let svc: Service = api. get ( & object. name_any ( ) ) . await ?;
322+
323+ // we only care about checking LoadBalancer-type services, return Ok
324+ // for any non-LB services
325+ //
326+ // TODO: do we care about NodePorts? don't need to check any atm
327+ if svc. spec . unwrap_or_default ( ) . type_ != Some ( "LoadBalancer" . to_string ( ) ) {
328+ trace ! (
329+ "not checking status for internal service {}" ,
330+ object. name_any( )
331+ ) ;
332+ return Ok ( ( ) ) ;
333+ }
334+
335+ await_condition (
336+ api,
337+ & object. name_any ( ) ,
338+ conditions:: is_service_loadbalancer_provisioned ( ) ,
339+ )
340+ . await ?;
341+ }
342+
343+ other => trace ! ( "not checking status for resource type {other}" ) ,
344+ } ;
345+
346+ Ok ( ( ) )
347+ }
0 commit comments