Wait for pods to become ready in a specific namespace.

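Also tweak the wait retry/timing logic: the fixed ``sleep``, ``required_successes`` and ``inter_success_wait`` settings are replaced by the configurable ``k8s_wait_attempts`` and ``k8s_wait_attempt_sleep`` parameters.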

Change-Id: I1f92de896e2d4b80ffe744b936bc47816be8b404
This commit is contained in:
Felipe Monteiro 2018-02-21 15:00:59 +00:00 committed by Marshall Margenau
parent 35b426db32
commit 4fc77ddb27
3 changed files with 82 additions and 40 deletions

View File

@ -56,7 +56,9 @@ class Armada(object):
tiller_port=None,
tiller_namespace=None,
values=None,
target_manifest=None):
target_manifest=None,
k8s_wait_attempts=1,
k8s_wait_attempt_sleep=1):
'''
Initialize the Armada engine and establish a connection to Tiller.
@ -75,6 +77,10 @@ class Armada(object):
``CONF.tiller_namespace``.
:param str target_manifest: The target manifest to run. Useful for
specifying which manifest to run when multiple are available.
:param int k8s_wait_attempts: The number of times to attempt waiting
for pods to become ready.
:param int k8s_wait_attempt_sleep: The time in seconds to sleep
between attempts.
'''
tiller_port = tiller_port or CONF.tiller_port
tiller_namespace = tiller_namespace or CONF.tiller_namespace
@ -92,6 +98,8 @@ class Armada(object):
self.values = values
self.documents = documents
self.target_manifest = target_manifest
self.k8s_wait_attempts = k8s_wait_attempts
self.k8s_wait_attempt_sleep = k8s_wait_attempt_sleep
self.manifest = self.get_armada_manifest()
def get_armada_manifest(self):
@ -353,6 +361,8 @@ class Armada(object):
release=prefix_chart,
labels=wait_values.get('labels', ''),
namespace=chart.namespace,
k8s_wait_attempts=self.k8s_wait_attempts,
k8s_wait_attempt_sleep=self.k8s_wait_attempt_sleep,
timeout=wait_values.get('timeout', DEFAULT_TIMEOUT)
)
@ -378,7 +388,10 @@ class Armada(object):
release=prefix_chart,
labels=wait_values.get('labels', ''),
namespace=chart.namespace,
timeout=wait_values.get('timeout', 3600))
k8s_wait_attempts=self.k8s_wait_attempts,
k8s_wait_attempt_sleep=self.k8s_wait_attempt_sleep,
timeout=wait_values.get('timeout', DEFAULT_TIMEOUT)
)
msg['install'].append(prefix_chart)
@ -396,7 +409,11 @@ class Armada(object):
else:
LOG.info("FAILED: %s", prefix_chart)
self.tiller.k8s.wait_until_ready(timeout=chart_timeout)
# TODO(MarshM) does this need release/labels/namespace?
self.tiller.k8s.wait_until_ready(
k8s_wait_attempts=self.k8s_wait_attempts,
k8s_wait_attempt_sleep=self.k8s_wait_attempt_sleep,
timeout=chart_timeout)
LOG.info("Performing Post-Flight Operations")
self.post_flight_ops()

View File

@ -25,12 +25,8 @@ from oslo_log import log as logging
from armada.utils.release import label_selectors
from armada.exceptions import k8s_exceptions as exceptions
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
READY_PHASES = {'Running', 'Succeeded'}
LOG = logging.getLogger(__name__)
class K8s(object):
@ -67,7 +63,8 @@ class K8s(object):
name=name, namespace=namespace, body=body,
propagation_policy=propagation_policy)
except ApiException as e:
LOG.error("Exception when deleting a job: %s", e)
LOG.error("Exception when deleting job: name=%s, namespace=%s: %s",
name, namespace, e)
def get_namespace_job(self, namespace="default", label_selector=''):
'''
@ -79,13 +76,15 @@ class K8s(object):
return self.batch_api.list_namespaced_job(
namespace, label_selector=label_selector)
except ApiException as e:
LOG.error("Exception getting a job: %s", e)
LOG.error("Exception getting a job: namespace=%s, label=%s: %s",
namespace, label_selector, e)
def create_job_action(self, name, namespace="default"):
    '''
    Emit a debug log entry for a job action.

    Only logging happens here; no Kubernetes API call is made.

    :param name: name of the job
    :param namespace: value logged as the namespace of the job
    '''
    # TODO(MarshM) this does nothing?
    message = " %s in namespace: %s"
    LOG.debug(message, name, namespace)
def get_namespace_pod(self, namespace="default", label_selector=''):
@ -99,6 +98,7 @@ class K8s(object):
return self.client.list_namespaced_pod(
namespace, label_selector=label_selector)
# TODO(MarshM) unused?
def get_all_pods(self, label_selector=''):
'''
:params label_selector - filters Pods by label
@ -210,20 +210,31 @@ class K8s(object):
namespace='default',
labels='',
timeout=300,
sleep=15,
required_successes=3,
inter_success_wait=10):
k8s_wait_attempts=1,
k8s_wait_attempt_sleep=1):
'''
:param release - part of namespace
:param timeout - time before disconnecting stream
Wait until all pods become ready given the filters provided by
``release``, ``labels`` and ``namespace``.
:param release: chart release
:param namespace: the namespace used to filter which pods to wait on
:param labels: the labels used to filter which pods to wait on
:param timeout: time before disconnecting ``Watch`` stream
:param k8s_wait_attempts: The number of times to attempt waiting
for pods to become ready (minimum 1).
:param k8s_wait_attempt_sleep: The time in seconds to sleep
between attempts (minimum 1).
'''
label_selector = ''
# NOTE(MarshM) 'release' is currently unused
label_selector = label_selectors(labels) if labels else ''
if labels:
label_selector = label_selectors(labels)
wait_attempts = (k8s_wait_attempts if k8s_wait_attempts >= 1 else 1)
sleep_time = (k8s_wait_attempt_sleep if k8s_wait_attempt_sleep >= 1
else 1)
LOG.debug("Wait on %s (%s) for %s sec", namespace, label_selector,
timeout)
LOG.debug("Wait on %s (%s) for %s sec (k8s wait %s times, sleep %ss)",
namespace, label_selector, timeout,
wait_attempts, sleep_time)
deadline = time.time() + timeout
@ -231,12 +242,13 @@ class K8s(object):
# modification, in case new pods appear after our watch exits.
successes = 0
while successes < required_successes:
while successes < wait_attempts:
deadline_remaining = int(deadline - time.time())
if deadline_remaining <= 0:
return False
timed_out, modified_pods, unready_pods = self.wait_one_time(
label_selector, timeout=deadline_remaining)
timed_out, modified_pods, unready_pods = self._wait_one_time(
namespace=namespace, label_selector=label_selector,
timeout=deadline_remaining)
if timed_out:
LOG.info('Timed out waiting for pods: %s', unready_pods)
@ -251,17 +263,19 @@ class K8s(object):
LOG.debug('Found no modified pods this attempt. successes=%d',
successes)
time.sleep(inter_success_wait)
time.sleep(sleep_time)
return True
def wait_one_time(self, label_selector='', timeout=100):
LOG.debug('Starting to wait: label_selector=%s, timeout=%s',
label_selector, timeout)
def _wait_one_time(self, namespace, label_selector, timeout=100):
LOG.debug('Starting to wait: namespace=%s, label_selector=%s, '
'timeout=%s', namespace, label_selector, timeout)
ready_pods = {}
modified_pods = set()
w = watch.Watch()
first_event = True
# TODO(MarshM) still need to filter pods on namespace
for event in w.stream(self.client.list_pod_for_all_namespaces,
label_selector=label_selector,
timeout_seconds=timeout):
@ -277,33 +291,37 @@ class K8s(object):
event_type = event['type'].upper()
pod_name = event['object'].metadata.name
LOG.debug('Watch event for pod %s namespace=%s label_selector=%s',
pod_name, namespace, label_selector)
if event_type in {'ADDED', 'MODIFIED'}:
status = event['object'].status
is_ready = status.phase in READY_PHASES
pod_phase = status.phase
if is_ready:
LOG.debug('Pod %s (%s) is_ready=%s', pod_name, event_type,
is_ready)
pod_ready = True
if (pod_phase == 'Succeeded' or
(pod_phase == 'Running' and
self._get_pod_condition(status.conditions,
'Ready') == 'True')):
LOG.debug('Pod %s is ready!', pod_name)
else:
container_statuses = status.container_statuses
conditions = status.conditions
LOG.debug('Pod %s (%s) is_ready=%s container_statuses=%s '
'conditions=%s', pod_name, event_type, is_ready,
container_statuses, conditions)
pod_ready = False
LOG.debug('Pod %s not ready: conditions=[%s] '
'container_statuses=[%s] ', pod_name,
status.conditions, status.container_statuses)
ready_pods[pod_name] = is_ready
ready_pods[pod_name] = pod_ready
if event_type == 'MODIFIED':
modified_pods.add(pod_name)
elif event_type == 'DELETED':
LOG.debug('Removing pod %s from tracking', pod_name)
LOG.debug('Pod %s: removed from tracking', pod_name)
ready_pods.pop(pod_name)
elif event_type == 'ERROR':
LOG.error('Got error event for pod: %s',
event['object'].to_dict())
LOG.error('Pod %s: Got error event %s',
pod_name, event['object'].to_dict())
raise exceptions.KubernetesErrorEventException(
'Got error event for pod: %s' % event['object'])
@ -321,3 +339,8 @@ class K8s(object):
# (unlikely) or in the case of the watch timing out.
return (not all(ready_pods.values()), modified_pods,
[name for name, ready in ready_pods.items() if not ready])
def _get_pod_condition(self, pod_conditions, condition_type):
for pc in pod_conditions:
if pc.type == condition_type:
return pc.status

View File

@ -226,6 +226,7 @@ class Tiller(object):
action_type = action.get("type")
if "job" in action_type:
LOG.info("Creating %s in namespace: %s", name, namespace)
# TODO(MarshM) create_job_action does nothing but LOG.debug
self.k8s.create_job_action(name, action_type)
continue
except Exception:
@ -239,6 +240,7 @@ class Tiller(object):
action_type = action.get("type")
if "job" in action_type:
LOG.info("Creating %s in namespace: %s", name, namespace)
# TODO(MarshM) create_job_action does nothing but LOG.debug
self.k8s.create_job_action(name, action_type)
continue
except Exception: