Harden 'get_k8s_pod_port_ip' Operator

We note that 'list_pod_for_all_namespaces' can return results
in any order. This means that we can end up with inconsistent
results if we have search string that matches the partial names
of 2 or more pods.

This is especially true when the 'get_k8s_pod_port_ip' Operator
only selects the first pod with the name that contains the
search string. Hence there is a need to update the Operator to
avoid getting into such situation.

We will also require namespace to be passed in to avoid any
ambiguity.

Change-Id: I6b93d85ea25b5a30d8a115d78bfa4a51198d6bcb
This commit is contained in:
Anthony Lin 2018-03-08 07:13:33 +00:00
parent 58b1936178
commit 6b5d76eece
2 changed files with 32 additions and 17 deletions

View File

@ -183,8 +183,8 @@ class ArmadaOperator(BaseOperator):
# Return Armada client for XCOM Usage
return a_client
@get_pod_port_ip('tiller')
def get_tiller_info(self, context, *args):
@get_pod_port_ip('tiller', namespace='kube-system')
def get_tiller_info(self, context, *args, **kwargs):
# Initialize Variable
query = {}

View File

@ -1,4 +1,4 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -19,14 +19,15 @@ from airflow.exceptions import AirflowException
from kubernetes import client, config
def get_pod_port_ip(*pods):
def get_pod_port_ip(*pods, namespace):
def get_k8s_pod_port_ip(func):
@wraps(func)
def k8s_pod_port_ip_get(self, context, *args):
def k8s_pod_port_ip_get(self, context, *args, **kwargs):
"""This function retrieves Kubernetes Pod Port and IP
information. It can be used in different areas. For
instance, we can use this to retrieve the tiller pod
IP and port information for usage in the Armada Operator
information. It can be used to retrieve information of
single pod deployment and/or statefulsets. For instance,
it can be used to retrieve the tiller pod IP and port
information for usage in the Armada Operator.
:param context: Information on the current workflow
@ -34,16 +35,13 @@ def get_pod_port_ip(*pods):
from get_k8s_pod_port_ip import get_pod_port_ip
@get_pod_port_ip('tiller', 'drydock')
def get_pod_info(self, context, *args):
@get_pod_port_ip('tiller', namespace='kube-system')
def get_pod_info(self, context, *args, **kwargs):
# Get IP and port information of Pods from context
k8s_pods_ip_port = context['pods_ip_port']
tiller_ip = k8s_pods_ip_port['tiller'].get('ip')
drydock_ip = k8s_pods_ip_port['drydock'].get('ip')
tiller_port = k8s_pods_ip_port['tiller'].get('port')
drydock_port = k8s_pods_ip_port['drydock'].get('port')
"""
# Initialize variable
k8s_pods = {}
@ -55,12 +53,18 @@ def get_pod_port_ip(*pods):
pod_attr = {}
pod_attr[pod_name] = {}
# Initialize/Reset counter
count = 0
# Make use of kubernetes client to retrieve pod IP
# and port information
# Note that we should use 'in_cluster_config'
# Note that we will only search for pods in the namespace
# that was specified in the request
config.load_incluster_config()
v1 = client.CoreV1Api()
ret = v1.list_pod_for_all_namespaces(watch=False)
ret = v1.list_namespaced_pod(namespace=namespace,
watch=False)
# Loop through items to extract port and IP information
# of the pod
@ -89,12 +93,23 @@ def get_pod_port_ip(*pods):
pod_attr[pod_name]['port'])
except:
pod_attr[pod_name]['port'] = 'None'
logging.info("%s Port is None", pod_name)
logging.warning("%s Port is None", pod_name)
# Update k8s_pods with new entry
k8s_pods.update(pod_attr)
break
# It is possible for different pods to have the same
# partial names. This means that we can end up with
# inconsistent results depending on how the pods were
# ordered in the results for 'list_namespaced_pod'.
# Hence an exception should be raised when the function
# returns results for 2 or more pods.
if count > 0:
raise AirflowException(
"Pod search string is not unique!")
# Step counter
count += 1
# Raise Execptions if the pod does not exits in the
# Kubernetes cluster
@ -104,7 +119,7 @@ def get_pod_port_ip(*pods):
# Assign pods IP and ports information to context
context['pods_ip_port'] = k8s_pods
return func(self, context, *args)
return func(self, context, *args, **kwargs)
return k8s_pod_port_ip_get
return get_k8s_pod_port_ip