#!/usr/bin/python3
# coding: utf-8
# __________ ________________________________________________ #
# kube_apply - apply Yaml similar to kubectl apply -f file.yaml #
# #
# (C) 2019 Hermann Vosseler <Ichthyostega@web.de> #
# This is OpenSource software; licensed under Apache License v2+ #
# ############################################################### #
'''
Utility for the official Kubernetes python client: apply Yaml data.
While still limited to some degree, this utility attempts to provide
functionality similar to `kubectl apply -f`
- load and parse Yaml
- try to figure out the object type and API to use
- figure out if the resource already exists, in which case
it needs to be patched or replaced altogether.
- otherwise just create a new resource.
Based on inspiration from `kubernetes/utils/create_from_yaml.py`
@since: 2/2019
@author: Ichthyostega
'''
import re
import yaml
import logging
import kubernetes.client
def runUsageExample():
    ''' demonstrate usage by creating a simple Pod through default client.
        Enables DEBUG logging and then applies a minimal single-container
        Pod manifest via `fromYaml`.
    '''
    logging.basicConfig(level=logging.DEBUG)
    #
    # To address a specific cluster, configure the client beforehand, e.g.
    #   KUBECONFIG = '/path/to/special/kubecfg.yaml'
    #   import kubernetes.config
    #   client = kubernetes.config.new_client_from_config(config_file=KUBECONFIG)
    #   # --or alternatively--
    #   kubernetes.config.load_kube_config(config_file=KUBECONFIG)
    #
    # NOTE: the nesting of the manifest below is significant. Without it
    #       `metadata` parses as null and `name`/`labels` end up as top-level keys.
    fromYaml('''
kind: Pod
apiVersion: v1
metadata:
  name: dummy-pod
  labels:
    blow: job
spec:
  containers:
  - name: sleepr
    image: busybox
    command:
    - /bin/sh
    - -c
    - sleep 24000
''')
def fromYaml(rawData, client=None, **kwargs):
    ''' invoke the K8s API to create or replace objects given as YAML spec.
        The input may hold several YAML documents (separated by `---`); each
        non-empty document is applied in order of appearance.
        @param rawData: either a string or an opened input stream with a
                        YAML formatted spec, as you'd use for `kubectl apply -f`
        @param client: (optional) preconfigured client environment to use for invocation
        @param kwargs: (optional) further arguments to pass to the create/replace call
        @return: list with the response object of the Kubernetes API call
                 for each applied document (empty list if there was nothing to apply)
    '''
    responses = []
    # safe_load_all: manifests are plain data, so never allow the YAML input
    # to construct arbitrary Python objects (and newer PyYAML versions
    # refuse a plain load_all() call without an explicit Loader anyway).
    for obj in yaml.safe_load_all(rawData):
        if obj is None:
            # empty document, e.g. caused by a leading or trailing '---'
            continue
        responses.append(createOrUpdateOrReplace(obj, client, **kwargs))
    return responses
def createOrUpdateOrReplace(obj, client=None, **kwargs):
    ''' invoke the K8s API to create or replace a kubernetes object.
        The first attempt is to create(insert) this object; when this is rejected because
        of an existing object with same name, we attempt to patch this existing object.
        As a last resort, if even the patch is rejected, we *delete* the existing object
        and recreate from scratch.
        @param obj: complete object specification, including API version and metadata.
        @param client: (optional) preconfigured client environment to use for invocation
        @param kwargs: (optional) further arguments to pass to the create/replace call
        @return: response object from Kubernetes API call
        @raise kubernetes.client.rest.ApiException: when creation fails for any reason other
               than 'Conflict', or patching fails for any reason other than 'Unprocessable Entity'
        @raise RuntimeError: when the final delete-and-recreate attempt fails;
               the triggering exception is attached as explicit cause.
    '''
    k8sApi = findK8sApi(obj, client)

    # Stage 1: plain creation -- succeeds unless the object already exists
    try:
        res = invokeApi(k8sApi, 'create', obj, **kwargs)
        logging.debug('K8s: %s created -> uid=%s', describe(obj), res.metadata.uid)
        return res
    except kubernetes.client.rest.ApiException as createEx:
        if createEx.reason != 'Conflict':
            raise

    # Stage 2: asking for forgiveness... patch the existing object
    try:
        res = invokeApi(k8sApi, 'patch', obj, **kwargs)
        logging.debug('K8s: %s PATCHED -> uid=%s', describe(obj), res.metadata.uid)
        return res
    except kubernetes.client.rest.ApiException as patchEx:
        if patchEx.reason != 'Unprocessable Entity':
            raise

    # Stage 3: second attempt... delete the existing object and re-insert
    # NOTE(review): if the server completes the deletion asynchronously, the
    #               immediate re-creation might be rejected -- confirm.
    try:
        logging.debug('K8s: patching %s FAILED. Attempting deletion and recreation...', describe(obj))
        invokeApi(k8sApi, 'delete', obj, **kwargs)
        logging.debug('K8s: %s DELETED...', describe(obj))
        res = invokeApi(k8sApi, 'create', obj, **kwargs)
        logging.debug('K8s: %s CREATED -> uid=%s', describe(obj), res.metadata.uid)
        return res
    except Exception as ex:
        message = 'K8s: FAILURE updating %s. Exception: %s' % (describe(obj), ex)
        logging.error(message)
        raise RuntimeError(message) from ex
def patchObject(obj, client=None, **kwargs):
    ''' invoke the K8s API to patch an existing kubernetes object.
        @param obj: object specification, including API version, kind and metadata.
        @param client: (optional) preconfigured client environment to use for invocation
        @param kwargs: (optional) further arguments to pass to the patch call
        @return: response object from Kubernetes API call
        @raise RuntimeError: when the API rejects the patch as 'Unprocessable Entity';
               the original ApiException is attached as explicit cause.
        @raise kubernetes.client.rest.ApiException: any other API failure is passed through.
    '''
    k8sApi = findK8sApi(obj, client)
    try:
        res = invokeApi(k8sApi, 'patch', obj, **kwargs)
        logging.debug('K8s: %s PATCHED -> uid=%s', describe(obj), res.metadata.uid)
        return res
    except kubernetes.client.rest.ApiException as apiEx:
        if apiEx.reason != 'Unprocessable Entity':
            raise
        message = 'K8s: patch for %s rejected. Exception: %s' % (describe(obj), apiEx)
        logging.error(message)
        raise RuntimeError(message) from apiEx
def deleteObject(obj, client=None, **kwargs):
    ''' invoke the K8s API to delete a kubernetes object.
        @param obj: object specification, including API version, kind and metadata.
        @param client: (optional) preconfigured client environment to use for invocation
        @param kwargs: (optional) further arguments to pass to the delete call
        @return: True if the object was deleted,
                 False if the API reported it as 'Not Found'
        @raise RuntimeError: on any other API failure;
               the original ApiException is attached as explicit cause.
    '''
    k8sApi = findK8sApi(obj, client)
    try:
        res = invokeApi(k8sApi, 'delete', obj, **kwargs)
    except kubernetes.client.rest.ApiException as apiEx:
        if apiEx.reason == 'Not Found':
            logging.warning('K8s: %s does not exist (anymore).', describe(obj))
            return False
        message = 'K8s: deleting %s FAILED. Exception: %s' % (describe(obj), apiEx)
        logging.error(message)
        raise RuntimeError(message) from apiEx
    # The uid is only used for logging, so look it up defensively: a response
    # lacking `details` (or `details.uid`) must not turn a successful
    # deletion into a failure.
    # NOTE(review): presumably some delete calls return the deleted resource
    #               instead of a status object -- confirm for the client version in use.
    details = getattr(res, 'details', None)
    uid = getattr(details, 'uid', None) or '?'
    logging.debug('K8s: %s DELETED. uid was: %s', describe(obj), uid)
    return True
def findK8sApi(obj, client=None):
    ''' Derive the name of the API class from the `apiVersion` of the given
        object spec and instantiate that class from `kubernetes.client`.
        @param obj: object spec; only its `apiVersion` entry is inspected
        @param client: (optional) preconfigured client environment to use for invocation
        @return: a client instance wired to the appropriate API
    '''
    group, _, version = obj['apiVersion'].partition('/')
    if version == '':
        # no slash given: the value is just a version, belonging to the 'core' group
        group, version = 'core', group

    # Cut off at the last '.k8s.io', then camel-case-join the dot separated
    # parts, e.g. rbac.authorization.k8s.io -> RbacAuthorization
    stem = group.rsplit('.k8s.io', 1)[0]
    prefix = ''
    for part in stem.split('.'):
        prefix += part.capitalize()

    apiName = '%s%sApi' % (prefix, version.capitalize())
    apiClass = getattr(kubernetes.client, apiName)
    return apiClass(client)
def invokeApi(k8sApi, action, obj, **args):
    ''' find a suitable function and perform the actual API invocation.
        The function name is built from the action and the snake_case form of the
        object's kind; if the API object has no such function, the `namespaced`
        variant is used and the namespace is passed along.
        @param k8sApi: client object for the invocation, wired to correct API version
        @param action: either 'create' (to inject a new object) or 'replace','patch','delete'
        @param obj: the full object spec to be passed into the API invocation
        @param args: (optional) extraneous arguments to pass
        @return: response object from Kubernetes API call
    '''
    # transform ActionType from Yaml into action_type for swagger API
    kind = camel2snake(obj['kind'])
    # determine namespace to place the object in, supply default
    # (also covers a missing/empty metadata section and an empty namespace entry)
    try:
        namespace = obj['metadata']['namespace'] or 'default'
    except (KeyError, TypeError):
        namespace = 'default'

    functionName = '%s_%s' % (action, kind)
    if hasattr(k8sApi, functionName):
        # namespace agnostic API
        function = getattr(k8sApi, functionName)
    else:
        functionName = '%s_namespaced_%s' % (action, kind)
        function = getattr(k8sApi, functionName)
        args['namespace'] = namespace

    # Decide based on the action, not on the complete function name: the
    # latter also contains the kind, which could accidentally match.
    if 'create' not in action:
        # all operations on an existing object address it by name
        args['name'] = obj['metadata']['name']
    if 'delete' in action:
        # deletion is passed a fresh options object as body, not the object spec
        from kubernetes.client.models.v1_delete_options import V1DeleteOptions
        obj = V1DeleteOptions()

    return function(body=obj, **args)
def describe(obj):
    ''' build a short label for log messages from the object's kind and name.
        @param obj: object spec providing `kind` and `metadata.name`
        @return: string of the form: Kind 'name'
    '''
    kind = obj['kind']
    name = obj['metadata']['name']
    return "%s '%s'" % (kind, name)
def camel2snake(string):
    ''' convert a CamelCase identifier into lower-case snake_case.
        Two substitution passes insert the underscores: first in front of each
        capitalised word (upper-case letter followed by lower-case letters),
        then between a lower-case letter or digit and a following upper-case letter.
        @param string: the identifier to convert, e.g. `ConfigMap`
        @return: the converted identifier, e.g. `config_map`
    '''
    for pattern in ('(.)([A-Z][a-z]+)', '([a-z0-9])([A-Z])'):
        string = re.sub(pattern, r'\1_\2', string)
    return string.lower()
# When executed as a script (not imported), run the built-in usage demonstration.
if __name__ == '__main__':
    runUsageExample()
4条答案
按热度按时间csga3l581#
我检查了 pkg/kubectl/cmd/apply.go 中的代码,我认为以下代码行显示了运行 kubectl apply -f 时的幕后操作:
下面是 helper.Patch 的代码:
x4shl7ld2#
这个API的设计并不令人信服,因为它迫使我们在客户端重新实现这些基本的东西...
不管怎样,这是我在Python中重新发明六角轮的尝试...
Python模块 kube_apply
用法类似于
kube_apply.fromYaml(myStuff)
文件:
kube_apply.py
pqwbnv8z3#
你可以安装 kubectl 二进制文件,然后从Python程序中调用它。一旦server-side apply准备就绪,这个问题就会变得简单一些,因为届时您实际上可以直接访问一个k8s API端点(不过它听起来仍然不是与资源类型无关的,即您仍然必须针对具体资源使用 PATCH /api/v1/some-k8s-resource ,而使用 kubectl apply 时,您可以传入一个异构的资源列表)。
wn9m85ua4#
2023服务器端应用解决方案
在较新版本的Kubernetes中,您可以使用服务器端应用。此方法的优点是只需要一个API请求,并允许服务器合并资源。
下面是一个API请求示例:
您可以通过提供 --server-side 参数,从命令行让kubectl使用SSA(服务器端应用)。你可以在这里阅读更多。