38
loading...
This website collects cookies to deliver better user experience
kube-apiserver
, without splitting an internal and external interface, we can interact with the cluster and any other system to integrate both from the same application (Controller) and even use custom resources to describe our unique operations, known as the Operator Pattern.
k8s.io/client-go
project.
k8s.io/client-go/kubernetes.Clientset
, which is a typed client. What that means is that this interface provides exclusive methods for each resource on Kubernetes (think of Pods, Deployments, Services, everything!) and operation (Create, Get, List, Watch, Update, Patch and Delete). It is obvious why you should, whenever possible, prefer to use this client.k8s.io/client-go/dynamic.Interface
, the dynamic client, will enter the game. This client has a twofold purpose:go.mod
.k8s.io/client-go
project and see how we can leverage it.
The code below assumes that it is running inside a Kubernetes cluster.
// newClient builds a dynamic Kubernetes client from the in-cluster
// configuration. It returns the first error reported while loading that
// configuration or while constructing the client from it.
func newClient() (dynamic.Interface, error) {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}

	client, err := dynamic.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}

	return client, nil
}
CoreV1().Pod
. Instead, you need to first provide a schema.GroupVersionResource
, which is a Golang type that provides the necessary information to construct an HTTP request to the cluster API Server.
var monboDBResource = schema.GroupVersionResource{Group: "mongodbcommunity.mongodb.com", Version: "v1", Resource: "mongodbcommunity"}
// ListMongoDB returns the items produced by a List call on the dynamic client
// for monboDBResource, scoped to namespace. Any error from the List call is
// returned unchanged together with a nil slice.
func ListMongoDB(ctx context.Context, client dynamic.Interface, namespace string) ([]unstructured.Unstructured, error) {
	// Equivalent request:
	// GET /apis/mongodbcommunity.mongodb.com/v1/namespaces/{namespace}/mongodbcommunity/
	resources := client.Resource(monboDBResource).Namespace(namespace)

	result, err := resources.List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	return result.Items, nil
}
.Namespace(namespace)
is mandatory, even if you pass an empty string to list across all namespaces.
unstructured.Unstructured
. This is a special type that encapsulates an arbitrary JSON while also complying with standard Kubernetes interfaces like runtime.Object
, but most importantly it provides a set of helpers on the unstructured
package to manipulate this data.
// ScaleMongoDB changes the number of members by the given proportion,
// which must be 0 (keep the current member count) or 1 (double it).
//
// It reads spec.members from the named resource, multiplies it by
// (1 + proportion) and writes the result back with a JSON Patch "replace"
// operation on /spec/members. An error is returned when proportion is greater
// than 1, when spec.members is missing or unstructured.NestedInt64 rejects it,
// or when the Get or Patch request fails.
func ScaleMongoDB(ctx context.Context, client dynamic.Interface, name string, namespace string, proportion uint) error {
	// proportion is unsigned, so 0 and 1 are the only values that pass this guard.
	if proportion > 1 {
		return fmt.Errorf("proportion should be between 0 <= proportion <= 1")
	}

	mongoDBClient := client.Resource(monboDBResource).Namespace(namespace)

	mdb, err := mongoDBClient.Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return err
	}

	members, found, err := unstructured.NestedInt64(mdb.UnstructuredContent(), "spec", "members")
	if err != nil {
		return err
	}
	if !found {
		return fmt.Errorf("members field not found on MongoDB spec")
	}

	// Stay in int64: converting to int would narrow the value on 32-bit platforms.
	scaled := members * (1 + int64(proportion))

	patch := []interface{}{
		map[string]interface{}{
			"op":    "replace",
			"path":  "/spec/members",
			"value": scaled,
		},
	}
	payload, err := json.Marshal(patch)
	if err != nil {
		return err
	}

	_, err = mongoDBClient.Patch(ctx, name, types.JSONPatchType, payload, metav1.PatchOptions{})
	return err
}
unstructured.NestedInt64
to access only the field that we are interested in, keeping our coupling to the MongoDB CRD to a minimum while also being able to manipulate the resource data with type safety.unstructured
package has lots of helpers like this, not only for reading but also for writing to any field on the resource.
schema.GroupVersionResource
and handle the unstructured.Unstructured
result.k8s.io/client-go
, that runs a handler when changes are detected, created from a typed client. Luckily the dynamic
package also provides an Informer component that we can use.PersistentVolumeClaims
:package main
import (
"fmt"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"time"
)
// maxRetries is the requeue count below which runWorker re-adds a failed key
// to the queue; at or above it the key is dropped and the error is reported.
const maxRetries = 3

// monboDBResource is the group/version/resource handed to the dynamic client
// and informer factory to address the MongoDB community custom resource.
var monboDBResource = schema.GroupVersionResource{
	Group:    "mongodbcommunity.mongodb.com",
	Version:  "v1",
	Resource: "mongodbcommunity",
}
// MongoDBController bundles an informer on monboDBResource with the work queue
// its event handler feeds and the channel used to stop both.
type MongoDBController struct {
	// informer delivers resource events; Run starts it and waits for its cache to sync.
	informer cache.SharedIndexInformer
	// stopper is closed by Stop and is passed to the informer and the worker loop.
	stopper chan struct{}
	// queue holds the keys enqueued by the informer's event handler until runWorker takes them.
	queue workqueue.RateLimitingInterface
}
// NewMongoDBController creates a controller whose informer watches
// monboDBResource through the given dynamic client. Only delete events are
// handled: the key of each deleted object is added to the controller's queue.
// The returned error is always nil.
func NewMongoDBController(client dynamic.Interface) (*MongoDBController, error) {
	factory := dynamicinformer.NewDynamicSharedInformerFactory(client, 0)

	controller := &MongoDBController{
		informer: factory.ForResource(monboDBResource).Informer(),
		stopper:  make(chan struct{}),
		queue:    workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
	}

	onDelete := func(obj interface{}) {
		key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
		if err != nil {
			// An object without a usable key is dropped rather than queued.
			return
		}
		controller.queue.Add(key)
	}
	controller.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: onDelete,
	})

	return controller, nil
}
// Stop signals the controller to shut down by closing its stop channel, which
// is the channel Run hands to the informer and the worker loop. Because it
// closes a channel, calling Stop a second time panics.
func (m *MongoDBController) Stop() {
	close(m.stopper)
}
// Run starts the informer, waits for its cache to sync and then processes
// queued keys until Stop is called. It blocks for that whole time; on return
// the queue is shut down. If the cache never syncs, the error is reported
// through utilruntime.HandleError and Run returns without starting the worker.
func (m *MongoDBController) Run() {
	defer utilruntime.HandleCrash()
	// Shutting the queue down makes queue.Get report quit, which is what lets
	// a worker blocked on an empty queue return.
	defer m.queue.ShutDown()

	go m.informer.Run(m.stopper)

	// wait for the caches to synchronize before starting the worker
	if !cache.WaitForCacheSync(m.stopper, m.informer.HasSynced) {
		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return
	}

	// The worker runs in its own goroutine. Running it inline would leave Run
	// stuck inside queue.Get after Stop, because the deferred ShutDown that
	// unblocks Get only executes once Run itself returns.
	// wait.Until restarts the worker one second after it returns.
	go wait.Until(m.runWorker, time.Second, m.stopper)

	<-m.stopper
}
// runWorker takes keys from the queue one at a time and passes each to
// processItem, returning only when the queue reports that it is shutting down.
// A key that fails is re-added with rate limiting while it has been requeued
// fewer than maxRetries times; after that it is forgotten and the error is
// handed to utilruntime.HandleError.
func (m *MongoDBController) runWorker() {
	for {
		item, shutdown := m.queue.Get()
		if shutdown {
			return
		}

		err := m.processItem(item.(string))
		switch {
		case err == nil:
			m.queue.Forget(item)
		case m.queue.NumRequeues(item) < maxRetries:
			m.queue.AddRateLimited(item)
		default:
			m.queue.Forget(item)
			utilruntime.HandleError(err)
		}

		// Done is called for every key, whatever the outcome above.
		m.queue.Done(item)
	}
}
// processItem handles one key taken from the queue. It is currently a
// placeholder: it performs no work and always returns nil.
func (m *MongoDBController) processItem(mongodb string) error {
	// Clean up PVCs
	return nil
}
dynamic
package provides an equivalent fake client that allows for stubbing objects and asserting actions performed using it.package main
import (
"context"
"k8s.io/apimachinery/pkg/runtime"
dynamicfake "k8s.io/client-go/dynamic/fake"
)
// TestDynamicClient seeds a fake dynamic client with one MongoDBCommunity
// object, runs NotifyMongoDBs against it and asserts that a "list" action on
// the mongodbcommunity resource in the default namespace was recorded.
func TestDynamicClient(t *testing.T) {
	// Setup an Object as mock on the client
	// Write it like its YAML manifest
	mdb := &unstructured.Unstructured{}
	mdb.SetUnstructuredContent(map[string]interface{}{
		"apiVersion": "mongodbcommunity.mongodb.com/v1",
		"kind":       "MongoDBCommunity",
		"metadata": map[string]interface{}{
			"name":      "mongodb-test",
			"namespace": "default",
		},
		"spec": map[string]interface{}{
			// Integers in unstructured content must be int64: a plain int is
			// not a JSON-compatible value for deep copies, and helpers such as
			// unstructured.NestedInt64 would reject it.
			"members": int64(3),
		},
	})

	dynamicClient := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme(), mdb)

	// Run any logic that depend on the dynamic client
	NotifyMongoDBs(context.Background(), dynamicClient)

	AssertActions(t, dynamicClient.Actions(), []ExpectedAction{
		{
			Verb:      "list",
			Namespace: "default",
			Resource:  "mongodbcommunity",
		},
	})
}
unstructured.Unstructured
type we can create stub Kubernetes objects using the same syntax as in YAML, but with maps.
dynamicClient.Actions()
to see all operations that were performed by our code. However, manually asserting these actions on every test often leads to unreadable code and brittle assertions.
AssertActions
that verifies that every expected action can be found in the performed actions. An important note is that this function does not perform an exact list match, i.e. if a delete operation was performed using the client the test would not break; the only condition for the AssertActions
to fail is if the list operation provided on the expected list isn’t found. One could change the asserting function or make a sibling function that validates only if the expected actions were performed.type ExpectedAction struct {
// Verb selects the kind of action to look for; AssertExpectedAction
// recognizes "get", "list", "watch", "create", "update", "delete" and "patch".
Verb string
// Name is compared with the action's name for "get", "delete" and "patch".
Name string
// Namespace and Resource are compared with the action's namespace and
// resource name for every verb.
Namespace string
Resource string
// Patch action
// PatchType and PatchPayload are only checked for "patch": the payload is
// JSON-encoded and compared byte-for-byte with the recorded patch.
PatchType types.PatchType
PatchPayload []map[string]interface{}
}
// AssertActions fails the test unless every entry of expected is matched by at
// least one action in got (see AssertExpectedAction). It is not an exact list
// comparison: additional actions in got are ignored. The test also fails
// immediately when fewer actions were recorded than are expected.
func AssertActions(t *testing.T, got []kubetesting.Action, expected []ExpectedAction) {
	// Report failures at the caller's line instead of inside this helper.
	t.Helper()

	if len(expected) > len(got) {
		t.Fatalf("executed actions too short, expected %d, got %d", len(expected), len(got))
	}

	for i, expectedAction := range expected {
		if !AssertExpectedAction(got, expectedAction) {
			t.Fatalf("action %d (%+v) does not match any of the got actions", i, expectedAction)
		}
	}
}
// AssertExpectedAction reports whether at least one action in got matches
// expectedAction. An action matches when its verb, namespace and resource
// equal the expected ones and the verb-specific details agree: the name for
// "get" and "delete"; the name, patch type and JSON-encoded payload for
// "patch". Verbs other than get, list, watch, create, update, delete and
// patch never match.
func AssertExpectedAction(got []kubetesting.Action, expectedAction ExpectedAction) bool {
	// The encoded expected patch is the same for every candidate, so build it once.
	var wantPatch []byte
	if expectedAction.Verb == "patch" {
		encoded, err := json.Marshal(expectedAction.PatchPayload)
		if err != nil {
			return false
		}
		wantPatch = encoded
	}

	for _, gotAction := range got {
		// The verb must be compared explicitly: several action interfaces share
		// a method set (create/update, get/delete), so a type assertion alone
		// cannot tell those verbs apart.
		if gotAction.GetVerb() != expectedAction.Verb {
			continue
		}
		if !validateNamespaceAndResource(gotAction, expectedAction) {
			continue
		}
		if matchesVerbDetails(gotAction, expectedAction, wantPatch) {
			return true
		}
	}
	return false
}

// matchesVerbDetails checks the parts of an action that are specific to the
// expected verb; wantPatch is the JSON-encoded expected payload and is only
// consulted for "patch".
func matchesVerbDetails(action kubetesting.Action, expected ExpectedAction, wantPatch []byte) bool {
	switch expected.Verb {
	case "get":
		getAction, ok := action.(kubetesting.GetAction)
		return ok && getAction.GetName() == expected.Name
	case "list":
		_, ok := action.(kubetesting.ListAction)
		return ok
	case "watch":
		_, ok := action.(kubetesting.WatchAction)
		return ok
	case "create":
		_, ok := action.(kubetesting.CreateAction)
		return ok
	case "update":
		_, ok := action.(kubetesting.UpdateAction)
		return ok
	case "delete":
		deleteAction, ok := action.(kubetesting.DeleteAction)
		return ok && deleteAction.GetName() == expected.Name
	case "patch":
		patchAction, ok := action.(kubetesting.PatchAction)
		if !ok {
			return false
		}
		return patchAction.GetName() == expected.Name &&
			patchAction.GetPatchType() == expected.PatchType &&
			bytes.Equal(patchAction.GetPatch(), wantPatch)
	default:
		return false
	}
}
// validateNamespaceAndResource reports whether the action's namespace and the
// Resource field of its group/version/resource both equal the expected values.
func validateNamespaceAndResource(action kubetesting.Action, expectedAction ExpectedAction) bool {
	if action.GetNamespace() != expectedAction.Namespace {
		return false
	}
	return action.GetResource().Resource == expectedAction.Resource
}
k8s.io/client-go
but also others, like the sigs.k8s.io/controller-runtime
project and the Kubernetes Reference API documentation.