// kube/lib.rs

//! Kube is an umbrella-crate for interacting with [Kubernetes](http://kubernetes.io) in Rust.
//!
//! # Overview
//!
//! Kube contains a Kubernetes client, a controller runtime, a custom resource derive, and various tooling
//! required for building applications or controllers that interact with Kubernetes.
//!
//! The main modules are:
//!
//! - [`client`] with the Kubernetes [`Client`] and its layers
//! - [`config`] for cluster [`Config`]
//! - [`api`] with the generic Kubernetes [`Api`]
//! - [`derive`](kube_derive) with the [`CustomResource`] / [`Resource`](kube_derive::Resource) derive for building controller types
//! - [`runtime`] with a [`Controller`](crate::runtime::Controller) / [`watcher`](crate::runtime::watcher()) / [`reflector`](crate::runtime::reflector::reflector) / [`Store`](crate::runtime::reflector::Store)
//! - [`core`] with generics from `apimachinery`
//!
//! You can use each of these as you need with the help of the [exported features](https://kube.rs/features/).
//!
//! # Using the Client
//! ```no_run
//! use futures::{StreamExt, TryStreamExt};
//! use kube::{Client, api::{Api, ResourceExt, ListParams, PostParams}};
//! use k8s_openapi::api::core::v1::Pod;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     // Infer the runtime environment and try to create a Kubernetes Client
//!     let client = Client::try_default().await?;
//!
//!     // Read pods in the configured namespace into the typed interface from k8s-openapi
//!     let pods: Api<Pod> = Api::default_namespaced(client);
//!     for p in pods.list(&ListParams::default()).await? {
//!         println!("found pod {}", p.name_any());
//!     }
//!     Ok(())
//! }
//! ```
//!
//! For details, see:
//!
//! - [`Client`](crate::client) for the extensible Kubernetes client
//! - [`Api`] for the generic api methods available on Kubernetes resources
//! - [k8s-openapi](https://docs.rs/k8s-openapi/*/k8s_openapi/) for documentation about the generated Kubernetes types
//!
//! # Using the Runtime with the Derive macro
//!
//! ```no_run
//! use schemars::JsonSchema;
//! use serde::{Deserialize, Serialize};
//! use serde_json::json;
//! use futures::{StreamExt, TryStreamExt};
//! use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
//! use kube::{
//!     api::{Api, DeleteParams, PatchParams, Patch, ResourceExt},
//!     core::CustomResourceExt,
//!     Client, CustomResource,
//!     runtime::{watcher, WatchStreamExt, wait::{conditions, await_condition}},
//! };
//!
//! // Our custom resource
//! #[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
//! #[kube(group = "clux.dev", version = "v1", kind = "Foo", namespaced)]
//! pub struct FooSpec {
//!     info: String,
//!     #[schemars(length(min = 3))]
//!     name: String,
//!     replicas: i32,
//! }
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let client = Client::try_default().await?;
//!     let crds: Api<CustomResourceDefinition> = Api::all(client.clone());
//!
//!     // Apply the CRD so users can create Foo instances in Kubernetes
//!     crds.patch("foos.clux.dev",
//!         &PatchParams::apply("my_manager"),
//!         &Patch::Apply(Foo::crd())
//!     ).await?;
//!
//!     // Wait for the CRD to be ready
//!     tokio::time::timeout(
//!         std::time::Duration::from_secs(10),
//!         await_condition(crds, "foos.clux.dev", conditions::is_crd_established())
//!     ).await?;
//!
//!     // Watch for changes to foos in the configured namespace
//!     let foos: Api<Foo> = Api::default_namespaced(client.clone());
//!     let wc = watcher::Config::default();
//!     let mut apply_stream = watcher(foos, wc).applied_objects().boxed();
//!     while let Some(f) = apply_stream.try_next().await? {
//!         println!("saw apply to {}", f.name_any());
//!     }
//!     Ok(())
//! }
//! ```
//!
//! For details, see:
//!
//! - [`CustomResource`] for documentation on how to configure custom resources
//! - [`runtime::watcher`](crate::runtime::watcher()) for how long-running watches work and why you want to use this over [`Api::watch`](crate::Api::watch)
//! - [`runtime`] for abstractions that help with more complicated Kubernetes applications
//!
//! # Examples
//! A large list of complete, runnable examples with explanations are available in the [examples folder](https://github.com/kube-rs/kube/tree/main/examples).
// Turn on the `doc_cfg` feature only when compiled with `--cfg docsrs`, so the
// `doc(cfg(...))` annotations used below are only active in that configuration.
#![cfg_attr(docsrs, feature(doc_cfg))]

/// Gates every item passed to it behind the `client` feature.
///
/// Each item is emitted only when `feature = "client"` is enabled, and when
/// building with `--cfg docsrs` it is additionally tagged with the matching
/// `doc(cfg(...))` attribute.
macro_rules! cfg_client {
    ($($item:item)*) => {
        $(
            #[cfg_attr(docsrs, doc(cfg(feature = "client")))]
            #[cfg(feature = "client")]
            $item
        )*
    }
}
/// Gates every item passed to it behind the `config` feature.
///
/// Each item is emitted only when `feature = "config"` is enabled, and when
/// building with `--cfg docsrs` it is additionally tagged with the matching
/// `doc(cfg(...))` attribute.
macro_rules! cfg_config {
    ($($item:item)*) => {
        $(
            #[cfg_attr(docsrs, doc(cfg(feature = "config")))]
            #[cfg(feature = "config")]
            $item
        )*
    }
}

/// Gates every item passed to it behind `config` OR `client`.
///
/// Each item is emitted when either feature is enabled, and when building with
/// `--cfg docsrs` it is additionally tagged with the matching `doc(cfg(...))`
/// attribute.
macro_rules! cfg_error {
    ($($item:item)*) => {
        $(
            #[cfg_attr(docsrs, doc(cfg(any(feature = "config", feature = "client"))))]
            #[cfg(any(feature = "config", feature = "client"))]
            $item
        )*
    }
}

137cfg_client! {
138    pub use kube_client::api;
139    pub use kube_client::discovery;
140    pub use kube_client::client;
141
142    #[doc(inline)]
143    pub use api::Api;
144    #[doc(inline)]
145    pub use client::Client;
146    #[doc(inline)]
147    pub use discovery::Discovery;
148}
149
150cfg_config! {
151    pub use kube_client::config;
152    #[doc(inline)]
153    pub use config::Config;
154}
155
156cfg_error! {
157    pub use kube_client::error;
158    #[doc(inline)] pub use error::Error;
159    /// Convient alias for `Result<T, Error>`
160    pub type Result<T, E = Error> = std::result::Result<T, E>;
161}
162
163#[cfg(feature = "derive")]
164#[cfg_attr(docsrs, doc(cfg(feature = "derive")))]
165pub use kube_derive::CustomResource;
166
167#[cfg(feature = "derive")]
168#[cfg_attr(docsrs, doc(cfg(feature = "derive")))]
169pub use kube_derive::Resource;
170
171#[cfg(feature = "derive")]
172#[cfg_attr(docsrs, doc(cfg(feature = "derive")))]
173pub use kube_derive::CELSchema;
174
175#[cfg(feature = "runtime")]
176#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
177#[doc(inline)]
178pub use kube_runtime as runtime;
179
180pub use crate::core::{CustomResourceExt, Resource, ResourceExt};
181#[doc(inline)] pub use kube_core as core;
182
// Mock tests for the runtime
// Only compiled for test builds that enable both `derive` and `runtime`.
#[cfg(test)]
#[cfg(all(feature = "derive", feature = "runtime"))]
mod mock_tests;

188pub mod prelude {
189    //! A prelude for kube. Reduces the number of duplicated imports.
190    //!
191    //! This prelude is similar to the standard library's prelude in that you'll
192    //! almost always want to import its entire contents, but unlike the
193    //! standard library's prelude you'll have to do so manually:
194    //!
195    //! ```
196    //! use kube::prelude::*;
197    //! ```
198    //!
199    //! The prelude may grow over time as additional items see ubiquitous use.
200
201    #[cfg(feature = "client")]
202    #[allow(unreachable_pub)]
203    pub use crate::client::ConfigExt as _;
204
205    #[cfg(feature = "unstable-client")] pub use crate::client::scope::NamespacedRef;
206
207    #[allow(unreachable_pub)] pub use crate::core::PartialObjectMetaExt as _;
208    #[allow(unreachable_pub)] pub use crate::core::SelectorExt as _;
209    pub use crate::{core::crd::CustomResourceExt as _, Resource as _, ResourceExt as _};
210
211    #[cfg(feature = "runtime")] pub use crate::runtime::utils::WatchStreamExt as _;
212}
213
// Tests that require a cluster and the complete feature set
// Can be run with `cargo test -p kube --lib --features=runtime,derive -- --ignored`
#[cfg(test)]
#[cfg(all(feature = "derive", feature = "client"))]
mod test {
    use crate::{
        api::{DeleteParams, Patch, PatchParams},
        Api, Client, CustomResourceExt, Resource, ResourceExt,
    };
    use k8s_openapi::{
        api::core::v1::ConfigMap,
        apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition,
    };
    use kube_derive::CustomResource;
    use schemars::JsonSchema;
    use serde::{Deserialize, Serialize};

    /// Spec of the `Foo` custom resource used by the tests below.
    ///
    /// The derive generates the `Foo` kind in group `clux.dev`, version `v1`,
    /// with a `FooStatus` status and a scale subresource wired to the two
    /// `replicas` fields.
    #[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
    #[kube(group = "clux.dev", version = "v1", kind = "Foo", namespaced)]
    #[kube(status = "FooStatus")]
    #[kube(scale(
        spec_replicas_path = ".spec.replicas",
        status_replicas_path = ".status.replicas"
    ))]
    #[kube(crates(kube_core = "crate::core"))] // for dev-dep test structure
    pub struct FooSpec {
        name: String,
        info: Option<String>,
        replicas: isize,
    }

    /// Status of the `Foo` custom resource.
    #[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
    pub struct FooStatus {
        is_bad: bool,
        replicas: isize,
    }

    /// The derived `Foo` type and a `DynamicObject` built from the same GVK
    /// must resolve to the same resource url.
    #[tokio::test]
    #[ignore = "needs kubeconfig"]
    async fn custom_resource_generates_correct_core_structs() {
        use crate::core::{ApiResource, DynamicObject, GroupVersionKind};
        let client = Client::try_default().await.unwrap();

        let gvk = GroupVersionKind::gvk("clux.dev", "v1", "Foo");
        let api_resource = ApiResource::from_gvk(&gvk);
        let a1: Api<DynamicObject> = Api::namespaced_with(client.clone(), "myns", &api_resource);
        let a2: Api<Foo> = Api::namespaced(client, "myns");

        // make sure they return the same url_path through their impls
        assert_eq!(a1.resource_url(), a2.resource_url());
    }

    /// Applies the `Foo` CRD, then exercises apply patches and the `status` /
    /// `scale` subresources on a `baz` instance before cleaning up.
    #[tokio::test]
    #[ignore = "needs cluster (creates + patches foo crd)"]
    #[cfg(all(feature = "derive", feature = "runtime"))]
    async fn derived_resource_queriable_and_has_subresources() -> Result<(), Box<dyn std::error::Error>> {
        use crate::runtime::wait::{await_condition, conditions};

        use serde_json::json;
        let client = Client::try_default().await?;
        let ssapply = PatchParams::apply("kube").force();
        let crds: Api<CustomResourceDefinition> = Api::all(client.clone());
        // Server-side apply CRD and wait for it to get ready
        crds.patch("foos.clux.dev", &ssapply, &Patch::Apply(Foo::crd()))
            .await?;
        let establish = await_condition(crds.clone(), "foos.clux.dev", conditions::is_crd_established());
        // First `?` surfaces the timeout, second `?` surfaces a failure of the wait itself
        let _crd = tokio::time::timeout(std::time::Duration::from_secs(10), establish).await??;
        // Use it
        let foos: Api<Foo> = Api::default_namespaced(client.clone());
        // Apply from generated struct
        {
            let foo = Foo::new("baz", FooSpec {
                name: "baz".into(),
                info: Some("old baz".into()),
                replicas: 1,
            });
            let o = foos.patch("baz", &ssapply, &Patch::Apply(&foo)).await?;
            assert_eq!(o.spec.name, "baz");
            let oref = o.object_ref(&());
            assert_eq!(oref.name.unwrap(), "baz");
            assert_eq!(oref.uid, o.uid());
        }
        // Apply from partial json!
        {
            let patch = json!({
                "apiVersion": "clux.dev/v1",
                "kind": "Foo",
                "spec": {
                    "name": "foo",
                    "replicas": 2
                }
            });
            let o = foos.patch("baz", &ssapply, &Patch::Apply(patch)).await?;
            assert_eq!(o.spec.replicas, 2, "patching spec updated spec.replicas");
        }
        // check subresource
        {
            let scale = foos.get_scale("baz").await?;
            assert_eq!(scale.spec.unwrap().replicas, Some(2));
            let status = foos.get_status("baz").await?;
            assert!(status.status.is_none(), "nothing has set status");
        }
        // set status subresource
        {
            let fs = json!({"status": FooStatus { is_bad: false, replicas: 1 }});
            let o = foos
                .patch_status("baz", &Default::default(), &Patch::Merge(&fs))
                .await?;
            assert!(o.status.is_some(), "status set after patch_status");
        }
        // set scale subresource
        {
            let fs = json!({"spec": { "replicas": 3 }});
            let o = foos
                .patch_scale("baz", &Default::default(), &Patch::Merge(&fs))
                .await?;
            assert_eq!(o.status.unwrap().replicas, 1, "scale replicas got patched");
            let linked_replicas = o.spec.unwrap().replicas.unwrap();
            assert_eq!(linked_replicas, 3, "patch_scale updates linked spec.replicas");
        }

        // cleanup
        foos.delete_collection(&DeleteParams::default(), &Default::default())
            .await?;
        crds.delete("foos.clux.dev", &DeleteParams::default()).await?;
        Ok(())
    }

    /// Lists pods through a hand-written `Object<PodSpecSimple, NotUsed>` type and
    /// checks that the resulting list can be iterated both mutably and by value.
    #[tokio::test]
    #[ignore = "needs cluster (lists pods)"]
    async fn custom_serialized_objects_are_queryable_and_iterable() -> Result<(), Box<dyn std::error::Error>>
    {
        use crate::core::{
            object::{HasSpec, HasStatus, NotUsed, Object},
            ApiResource,
        };
        use k8s_openapi::api::core::v1::Pod;
        #[derive(Clone, Deserialize, Debug)]
        struct PodSpecSimple {
            containers: Vec<ContainerSimple>,
        }
        #[derive(Clone, Deserialize, Debug)]
        struct ContainerSimple {
            #[allow(dead_code)]
            image: String,
        }
        type PodSimple = Object<PodSpecSimple, NotUsed>;

        // use known type information from pod (can also use discovery for this)
        let ar = ApiResource::erase::<Pod>(&());

        let client = Client::try_default().await?;
        let api: Api<PodSimple> = Api::default_namespaced_with(client, &ar);
        let mut list = api.list(&Default::default()).await?;
        // check we can mutably iterate over ObjectList
        for pod in &mut list {
            pod.spec_mut().containers = vec![];
            *pod.status_mut() = None;
            pod.annotations_mut()
                .entry("kube-seen".to_string())
                .or_insert_with(|| "yes".to_string());
            pod.labels_mut()
                .entry("kube.rs".to_string())
                .or_insert_with(|| "hello".to_string());
            pod.finalizers_mut().push("kube-finalizer".to_string());
            pod.managed_fields_mut().clear();
            // NB: we are **not** pushing these back upstream - (Api::apply or Api::replace needed for it)
        }
        // check we can iterate over ObjectList normally - and check the mutations worked
        for pod in list {
            assert!(pod.annotations().contains_key("kube-seen"));
            assert!(pod.labels().contains_key("kube.rs"));
            assert!(pod.finalizers().iter().any(|f| f == "kube-finalizer"));
            assert!(pod.spec().containers.is_empty());
            assert!(pod.managed_fields().is_empty());
        }
        Ok(())
    }

    /// Installs a throwaway `TestCr` CRD and checks that it is found by pinned
    /// discovery as well as by a (nearly) full discovery run.
    #[tokio::test]
    #[ignore = "needs cluster (fetches api resources, and lists all)"]
    // Imports `crate::runtime` below, so it needs the `runtime` feature
    // (the enclosing module already requires `derive`).
    #[cfg(feature = "runtime")]
    async fn derived_resources_discoverable() -> Result<(), Box<dyn std::error::Error>> {
        use crate::{
            core::{DynamicObject, GroupVersion, GroupVersionKind},
            discovery::{self, verbs, ApiGroup, Discovery, Scope},
            runtime::wait::{await_condition, conditions, Condition},
        };

        #[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
        #[kube(group = "kube.rs", version = "v1", kind = "TestCr", namespaced)]
        #[kube(crates(kube_core = "crate::core"))] // for dev-dep test structure
        struct TestCrSpec {}

        let client = Client::try_default().await?;

        // make sure the crd is installed
        let crds: Api<CustomResourceDefinition> = Api::all(client.clone());
        let ssapply = PatchParams::apply("kube").force();
        crds.patch("testcrs.kube.rs", &ssapply, &Patch::Apply(TestCr::crd()))
            .await?;
        let establish = await_condition(crds.clone(), "testcrs.kube.rs", conditions::is_crd_established());
        let crd = tokio::time::timeout(std::time::Duration::from_secs(10), establish).await??;
        assert!(conditions::is_crd_established().matches_object(crd.as_ref()));
        tokio::time::sleep(std::time::Duration::from_secs(2)).await; // Established condition is actually not enough for api discovery :(

        // create partial information for it to discover
        let gvk = GroupVersionKind::gvk("kube.rs", "v1", "TestCr");
        let gv = GroupVersion::gv("kube.rs", "v1");

        // discover by both (recommended kind on groupversion) and (pinned gvk) and they should equal
        let apigroup = discovery::oneshot::pinned_group(&client, &gv).await?;
        let (ar1, caps1) = apigroup.recommended_kind("TestCr").unwrap();
        let (ar2, caps2) = discovery::pinned_kind(&client, &gvk).await?;
        assert_eq!(caps1.operations.len(), caps2.operations.len(), "unequal caps");
        assert_eq!(ar1, ar2, "unequal apiresource");
        assert_eq!(DynamicObject::api_version(&ar2), "kube.rs/v1", "unequal dynver");

        // run (almost) full discovery
        let discovery = Discovery::new(client.clone())
            // skip something in discovery (clux.dev crd being mutated in other tests)
            .exclude(&["rbac.authorization.k8s.io", "clux.dev"])
            .run()
            .await?;

        // check our custom resource first by resolving within groups
        assert!(discovery.has_group("kube.rs"), "missing group kube.rs");
        let (ar, _caps) = discovery.resolve_gvk(&gvk).unwrap();
        assert_eq!(ar.group, gvk.group, "unexpected discovered group");
        assert_eq!(ar.version, gvk.version, "unexcepted discovered ver");
        assert_eq!(ar.kind, gvk.kind, "unexpected discovered kind");

        // check all non-excluded groups that are iterable
        let mut groups = discovery.groups_alphabetical().into_iter();
        let firstgroup = groups.next().unwrap();
        assert_eq!(firstgroup.name(), ApiGroup::CORE_GROUP, "core not first");
        for group in groups {
            for (ar, caps) in group.recommended_resources() {
                if !caps.supports_operation(verbs::LIST) {
                    continue;
                }
                let api: Api<DynamicObject> = if caps.scope == Scope::Namespaced {
                    Api::default_namespaced_with(client.clone(), &ar)
                } else {
                    Api::all_with(client.clone(), &ar)
                };
                api.list(&Default::default()).await?;
            }
        }

        // cleanup
        crds.delete("testcrs.kube.rs", &DeleteParams::default()).await?;
        Ok(())
    }

    /// Creates a short-lived busybox pod, waits on built-in and custom
    /// conditions for it, then deletes it and waits for the deletion to finish.
    #[tokio::test]
    #[ignore = "needs cluster (will create await a pod)"]
    #[cfg(feature = "runtime")]
    async fn pod_can_await_conditions() -> Result<(), Box<dyn std::error::Error>> {
        use crate::{
            api::{DeleteParams, PostParams},
            runtime::wait::{await_condition, conditions, delete::delete_and_finalize, Condition},
            Api, Client,
        };
        use k8s_openapi::api::core::v1::Pod;
        use std::time::Duration;
        use tokio::time::timeout;

        let client = Client::try_default().await?;
        let pods: Api<Pod> = Api::default_namespaced(client);

        // create busybox pod that's alive for at most 20s
        let data: Pod = serde_json::from_value(serde_json::json!({
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": "busybox-kube4",
                "labels": { "app": "kube-rs-test" },
            },
            "spec": {
                "terminationGracePeriodSeconds": 1,
                "restartPolicy": "Never",
                "containers": [{
                  "name": "busybox",
                  "image": "busybox:1.34.1",
                  "command": ["sh", "-c", "sleep 20"],
                }],
            }
        }))?;

        let pp = PostParams::default();
        assert_eq!(
            data.name_unchecked(),
            pods.create(&pp, &data).await?.name_unchecked()
        );

        // Watch its phase for a few seconds
        let is_running = await_condition(pods.clone(), "busybox-kube4", conditions::is_pod_running());
        // `??`: fail on the timeout and on an error from the wait itself
        let _ = timeout(Duration::from_secs(15), is_running).await??;

        // Verify we can get it
        let pod = pods.get("busybox-kube4").await?;
        assert_eq!(pod.spec.as_ref().unwrap().containers[0].name, "busybox");

        // Wait for a more complicated condition: ContainersReady AND Initialized
        // TODO: remove these once we can write these functions generically
        fn is_each_container_ready() -> impl Condition<Pod> {
            |obj: Option<&Pod>| {
                // true only if a `ContainersReady` condition exists and its status is "True"
                obj.and_then(|pod| pod.status.as_ref())
                    .and_then(|status| status.conditions.as_ref())
                    .and_then(|conds| conds.iter().find(|c| c.type_ == "ContainersReady"))
                    .is_some_and(|pcond| pcond.status == "True")
            }
        }
        let is_fully_ready = await_condition(
            pods.clone(),
            "busybox-kube4",
            conditions::is_pod_running().and(is_each_container_ready()),
        );
        let _ = timeout(Duration::from_secs(10), is_fully_ready).await??;

        // Delete it - and wait for deletion to complete
        let dp = DeleteParams::default();
        delete_and_finalize(pods.clone(), "busybox-kube4", &dp).await?;

        // verify it is properly gone
        assert!(pods.get("busybox-kube4").await.is_err());

        Ok(())
    }

    /// `Api::get_opt` must report a missing object as `None` rather than an error.
    #[tokio::test]
    #[ignore = "needs cluster (lists cms)"]
    async fn api_get_opt_handles_404() -> Result<(), Box<dyn std::error::Error>> {
        let client = Client::try_default().await?;
        let api = Api::<ConfigMap>::default_namespaced(client);
        assert_eq!(
            api.get_opt("this-cm-does-not-exist-ajklisdhfqkljwhreq").await?,
            None
        );
        Ok(())
    }
}