From a9d55ab052bb4ee578ee0ef3befa7bb2f4d7d749 Mon Sep 17 00:00:00 2001 From: Sean Eagan Date: Tue, 4 Sep 2018 15:06:52 -0500 Subject: [PATCH] Clean up and refactor wait logic This patchset changes the wait logic as follows: - Move wait logic to own module - Add framework for waiting on arbitrary resource types - Unify pod and job wait logic using above framework - Pass resource_version to k8s watch API for cleaner event tracking - Only sleep for `k8s_wait_attempt_sleep` when successes not met - Update to use k8s apps_v1 API where applicable - Allow passing kwargs to k8s APIs - Logging cleanups This is in preparation for adding wait logic for other types of resources and new wait configurations. Change-Id: I92e12fe5e0dc8e79c5dd5379799623cf3f471082 --- armada/exceptions/armada_exceptions.py | 11 + armada/handlers/armada.py | 30 ++- armada/handlers/k8s.py | 292 +++--------------------- armada/handlers/tiller.py | 19 +- armada/handlers/wait.py | 299 +++++++++++++++++++++++++ 5 files changed, 373 insertions(+), 278 deletions(-) create mode 100644 armada/handlers/wait.py diff --git a/armada/exceptions/armada_exceptions.py b/armada/exceptions/armada_exceptions.py index 074c98a7..c2f996f7 100644 --- a/armada/exceptions/armada_exceptions.py +++ b/armada/exceptions/armada_exceptions.py @@ -66,3 +66,14 @@ class InvalidOverrideValuesYamlException(ArmadaException): 'Armada encountered invalid values.yaml in helm chart: %s' % chart_description) super(InvalidValuesYamlException, self).__init__(self._message) + + +class InvalidWaitTypeException(ArmadaException): + ''' + Exception that occurs when Armada encounters an invalid wait type. + ''' + + def __init__(self, wait_type): + self._message = ( + 'Armada encountered invalid wait type: %s' % wait_type) + super(InvalidWaitTypeException, self).__init__(self._message) diff --git a/armada/handlers/armada.py b/armada/handlers/armada.py index e7b1d85f..ef8fb14a 100644 --- a/armada/handlers/armada.py +++ b/armada/handlers/armada.py @@ -31,6 +31,7 @@ from armada.handlers.override import Override from armada.handlers.release_diff import ReleaseDiff from armada.handlers.test import test_release_for_success from armada.handlers.tiller import Tiller +from armada.handlers.wait import get_wait_for from armada.utils.release import release_prefixer from armada.utils import source @@ -561,13 +562,28 @@ class Armada(object): timeout) return - self.tiller.k8s.wait_until_ready( - release=release_name, - labels=wait_labels, - namespace=namespace, - k8s_wait_attempts=self.k8s_wait_attempts, - k8s_wait_attempt_sleep=self.k8s_wait_attempt_sleep, - timeout=timeout) + LOG.info('Waiting for release=%s', release_name) + + waits = [ + get_wait_for('job', self.tiller.k8s, skip_if_none_found=True), + get_wait_for('pod', self.tiller.k8s) + ] + deadline = time.time() + timeout + deadline_remaining = timeout + for wait in waits: + if deadline_remaining <= 0: + reason = ( + 'Timeout expired waiting for release=%s' % release_name) + LOG.error(reason) + raise armada_exceptions.ArmadaTimeoutException(reason) + + wait.wait( + labels=wait_labels, + namespace=namespace, + k8s_wait_attempts=self.k8s_wait_attempts, + k8s_wait_attempt_sleep=self.k8s_wait_attempt_sleep, + timeout=timeout) + deadline_remaining = int(round(deadline - time.time())) def _test_chart(self, release_name, timeout, cleanup): if self.dry_run: diff --git a/armada/handlers/k8s.py b/armada/handlers/k8s.py index 94642294..0413a2e3 100644 --- a/armada/handlers/k8s.py +++ b/armada/handlers/k8s.py @@ -13,7 +13,6 @@ # limitations 
under the License. import re -import time from kubernetes import client from kubernetes import config @@ -23,7 +22,6 @@ from oslo_config import cfg from oslo_log import log as logging from armada.const import DEFAULT_K8S_TIMEOUT -from armada.utils.release import label_selectors from armada.exceptions import k8s_exceptions as exceptions CONF = cfg.CONF @@ -48,6 +46,7 @@ class K8s(object): self.batch_api = client.BatchV1Api() self.batch_v1beta1_api = client.BatchV1beta1Api() self.extension_api = client.ExtensionsV1beta1Api() + self.apps_v1_api = client.AppsV1Api() def delete_job_action(self, name, @@ -176,20 +175,19 @@ class K8s(object): object_type_description, name, namespace) raise e - def get_namespace_job(self, namespace="default", label_selector=''): + def get_namespace_job(self, namespace="default", **kwargs): ''' :param label_selector: labels of the jobs :param namespace: namespace of the jobs ''' try: - return self.batch_api.list_namespaced_job( - namespace, label_selector=label_selector) + return self.batch_api.list_namespaced_job(namespace, **kwargs) except ApiException as e: LOG.error("Exception getting jobs: namespace=%s, label=%s: %s", - namespace, label_selector, e) + namespace, kwargs.get('label_selector', ''), e) - def get_namespace_cron_job(self, namespace="default", label_selector=''): + def get_namespace_cron_job(self, namespace="default", **kwargs): ''' :param label_selector: labels of the cron jobs :param namespace: namespace of the cron jobs @@ -197,13 +195,13 @@ class K8s(object): try: return self.batch_v1beta1_api.list_namespaced_cron_job( - namespace, label_selector=label_selector) + namespace, **kwargs) except ApiException as e: LOG.error( "Exception getting cron jobs: namespace=%s, label=%s: %s", - namespace, label_selector, e) + namespace, kwargs.get('label_selector', ''), e) - def get_namespace_pod(self, namespace="default", label_selector=''): + def get_namespace_pod(self, namespace="default", **kwargs): ''' :param namespace: namespace of the Pod :param label_selector: filters Pods by label @@ -211,16 +209,29 @@ class K8s(object): This will return a list of objects req namespace ''' - return self.client.list_namespaced_pod( - namespace, label_selector=label_selector) + return self.client.list_namespaced_pod(namespace, **kwargs) - def get_namespace_daemonset(self, namespace='default', label=''): + def get_namespace_deployment(self, namespace='default', **kwargs): + ''' + :param namespace: namespace of target deamonset + :param labels: specify targeted deployment + ''' + return self.apps_v1_api.list_namespaced_deployment(namespace, **kwargs) + + def get_namespace_stateful_set(self, namespace='default', **kwargs): + ''' + :param namespace: namespace of target stateful set + :param labels: specify targeted stateful set + ''' + return self.apps_v1_api.list_namespaced_stateful_set( + namespace, **kwargs) + + def get_namespace_daemon_set(self, namespace='default', **kwargs): ''' :param namespace: namespace of target deamonset :param labels: specify targeted daemonset ''' - return self.extension_api.list_namespaced_daemon_set( - namespace, label_selector=label) + return self.apps_v1_api.list_namespaced_daemon_set(namespace, **kwargs) def create_daemon_action(self, namespace, template): ''' @@ -229,8 +240,7 @@ class K8s(object): ''' # we might need to load something here - self.extension_api.create_namespaced_daemon_set( - namespace, body=template) + self.apps_v1_api.create_namespaced_daemon_set(namespace, body=template) def delete_daemon_action(self, name, 
namespace="default", body=None): ''' @@ -242,7 +252,7 @@ class K8s(object): if body is None: body = client.V1DeleteOptions() - return self.extension_api.delete_namespaced_daemon_set( + return self.apps_v1_api.delete_namespaced_daemon_set( name, namespace, body) def wait_for_pod_redeployment(self, old_pod_name, namespace): @@ -293,9 +303,9 @@ class K8s(object): for event in w.stream( self.client.list_pod_for_all_namespaces, timeout_seconds=timeout): - pod_name = event['object'].metadata.name + resource_name = event['object'].metadata.name - if release in pod_name: + if release in resource_name: found_events = True pod_state = event['object'].status.phase if pod_state == 'Succeeded': @@ -305,195 +315,6 @@ class K8s(object): if not found_events: LOG.warn('Saw no test events for release %s', release) - def wait_until_ready(self, - release=None, - namespace='', - labels='', - timeout=DEFAULT_K8S_TIMEOUT, - k8s_wait_attempts=1, - k8s_wait_attempt_sleep=1): - ''' - Wait until all pods become ready given the filters provided by - ``release``, ``labels`` and ``namespace``. - - :param release: chart release - :param namespace: the namespace used to filter which pods to wait on - :param labels: the labels used to filter which pods to wait on - :param timeout: time before disconnecting ``Watch`` stream - :param k8s_wait_attempts: The number of times to attempt waiting - for pods to become ready (minimum 1). - :param k8s_wait_attempt_sleep: The time in seconds to sleep - between attempts (minimum 1). - ''' - timeout = self._check_timeout(timeout) - - # NOTE(MarshM) 'release' is currently unused - label_selector = label_selectors(labels) if labels else '' - - wait_attempts = (k8s_wait_attempts if k8s_wait_attempts >= 1 else 1) - sleep_time = (k8s_wait_attempt_sleep - if k8s_wait_attempt_sleep >= 1 else 1) - - LOG.debug( - "Wait on namespace=(%s) labels=(%s) for %s sec " - "(k8s wait %s times, sleep %ss)", namespace, label_selector, - timeout, wait_attempts, sleep_time) - - if not namespace: - # This shouldn't be reachable - LOG.warn('"namespace" not specified, waiting across all available ' - 'namespaces is likely to cause unintended consequences.') - if not label_selector: - LOG.warn('"label_selector" not specified, waiting with no labels ' - 'may cause unintended consequences.') - - # Track the overall deadline for timing out during waits - deadline = time.time() + timeout - - # First, we should watch for jobs before checking pods, as a job can - # still be running even after its current pods look healthy or have - # been removed and are pending reschedule - found_jobs = self.get_namespace_job(namespace, label_selector) - if len(found_jobs.items): - self._watch_job_completion(namespace, label_selector, timeout) - - # NOTE(mark-burnett): Attempt to wait multiple times without - # modification, in case new pods appear after our watch exits. - - successes = 0 - while successes < wait_attempts: - deadline_remaining = int(round(deadline - time.time())) - if deadline_remaining <= 0: - LOG.info('Timed out while waiting for pods.') - raise exceptions.KubernetesWatchTimeoutException( - 'Timed out while waiting on namespace=(%s) labels=(%s)' % - (namespace, label_selector)) - - timed_out, modified_pods, unready_pods, found_events = ( - self._watch_pod_completions( - namespace=namespace, - label_selector=label_selector, - timeout=deadline_remaining)) - - if not found_events: - LOG.warn( - 'Saw no install/update events for release=%s, ' - 'namespace=%s, labels=(%s). 
Are the labels correct?', - release, namespace, label_selector) - - if timed_out: - LOG.info('Timed out waiting for pods: %s', - sorted(unready_pods)) - raise exceptions.KubernetesWatchTimeoutException( - 'Timed out while waiting on namespace=(%s) labels=(%s)' % - (namespace, label_selector)) - - if modified_pods: - successes = 0 - LOG.debug('Continuing to wait, found modified pods: %s', - sorted(modified_pods)) - else: - successes += 1 - LOG.debug('Found no modified pods this attempt. successes=%d', - successes) - - time.sleep(sleep_time) - - return True - - def _watch_pod_completions(self, namespace, label_selector, timeout=100): - ''' - Watch and wait for pod completions. - Returns lists of pods in various conditions for the calling function - to handle. - ''' - LOG.debug( - 'Starting to wait on pods: namespace=%s, label_selector=(%s), ' - 'timeout=%s', namespace, label_selector, timeout) - ready_pods = {} - modified_pods = set() - w = watch.Watch() - first_event = True - found_events = False - - # Watch across specific namespace, or all - kwargs = { - 'label_selector': label_selector, - 'timeout_seconds': timeout, - } - if namespace: - func_to_call = self.client.list_namespaced_pod - kwargs['namespace'] = namespace - else: - func_to_call = self.client.list_pod_for_all_namespaces - - for event in w.stream(func_to_call, **kwargs): - if first_event: - pod_list = func_to_call(**kwargs) - for pod in pod_list.items: - LOG.debug('Setting up to wait for pod %s namespace=%s', - pod.metadata.name, pod.metadata.namespace) - ready_pods[pod.metadata.name] = False - first_event = False - - event_type = event['type'].upper() - pod_name = event['object'].metadata.name - LOG.debug('Watch event for pod %s namespace=%s label_selector=%s', - pod_name, namespace, label_selector) - - if event_type in {'ADDED', 'MODIFIED'}: - found_events = True - status = event['object'].status - pod_phase = status.phase - - pod_ready = True - if (pod_phase == 'Succeeded' or - (pod_phase == 'Running' and self._get_pod_condition( - status.conditions, 'Ready') == 'True')): - LOG.debug('Pod %s is ready!', pod_name) - else: - pod_ready = False - LOG.debug( - 'Pod %s not ready: conditions:\n%s\n' - 'container_statuses:\n%s', pod_name, status.conditions, - status.container_statuses) - - ready_pods[pod_name] = pod_ready - - if event_type == 'MODIFIED': - modified_pods.add(pod_name) - - elif event_type == 'DELETED': - LOG.debug('Pod %s: removed from tracking', pod_name) - ready_pods.pop(pod_name) - - elif event_type == 'ERROR': - LOG.error('Pod %s: Got error event %s', pod_name, - event['object'].to_dict()) - raise exceptions.KubernetesErrorEventException( - 'Got error event for pod: %s' % event['object']) - - else: - LOG.error('Unrecognized event type (%s) for pod: %s', - event_type, event['object']) - raise exceptions.KubernetesUnknownStreamingEventTypeException( - 'Got unknown event type (%s) for pod: %s' % - (event_type, event['object'])) - - if all(ready_pods.values()): - return (False, modified_pods, [], found_events) - - # NOTE(mark-burnett): This path is reachable if there are no pods - # (unlikely) or in the case of the watch timing out. 
- return (not all(ready_pods.values()), modified_pods, - [name for name, ready in ready_pods.items() if not ready], - found_events) - - def _get_pod_condition(self, pod_conditions, condition_type): - for pc in pod_conditions: - if pc.type == condition_type: - return pc.status - def _check_timeout(self, timeout): if timeout <= 0: LOG.warn( @@ -501,56 +322,3 @@ class K8s(object): 'using default %ss.', DEFAULT_K8S_TIMEOUT) timeout = DEFAULT_K8S_TIMEOUT return timeout - - def _watch_job_completion(self, namespace, label_selector, timeout): - ''' - Watch and wait for job completion. - Returns when conditions are met, or raises a timeout exception. - ''' - try: - timeout = self._check_timeout(timeout) - - ready_jobs = {} - w = watch.Watch() - for event in w.stream( - self.batch_api.list_namespaced_job, - namespace=namespace, - label_selector=label_selector, - timeout_seconds=timeout): - - job_name = event['object'].metadata.name - LOG.debug('Watch event %s on job %s', event['type'].upper(), - job_name) - - # Track the expected and actual number of completed pods - # See: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/ # noqa - expected = event['object'].spec.completions - completed = event['object'].status.succeeded - - if expected != completed: - ready_jobs[job_name] = False - else: - ready_jobs[job_name] = True - LOG.debug( - 'Job %s complete (spec.completions=%s, ' - 'status.succeeded=%s)', job_name, expected, completed) - - if all(ready_jobs.values()): - return True - - except ApiException as e: - LOG.exception( - "Exception when watching jobs: namespace=%s, labels=(%s)", - namespace, label_selector) - raise e - - if not ready_jobs: - LOG.warn( - 'Saw no job events for namespace=%s, labels=(%s). ' - 'Are the labels correct?', namespace, label_selector) - return False - - err_msg = ('Reached timeout while waiting for job completions: ' - 'namespace=%s, labels=(%s)' % (namespace, label_selector)) - LOG.error(err_msg) - raise exceptions.KubernetesWatchTimeoutException(err_msg) diff --git a/armada/handlers/tiller.py b/armada/handlers/tiller.py index 4342af13..3526aaf2 100644 --- a/armada/handlers/tiller.py +++ b/armada/handlers/tiller.py @@ -137,8 +137,8 @@ class Tiller(object): ''' pods = None namespace = self._get_tiller_namespace() - pods = self.k8s.get_namespace_pod(namespace, - CONF.tiller_pod_labels).items + pods = self.k8s.get_namespace_pod( + namespace, label_selector=CONF.tiller_pod_labels).items # No Tiller pods found if not pods: raise ex.TillerPodNotFoundException(CONF.tiller_pod_labels) @@ -581,7 +581,8 @@ class Tiller(object): handled = False if resource_type == 'job': - get_jobs = self.k8s.get_namespace_job(namespace, label_selector) + get_jobs = self.k8s.get_namespace_job( + namespace, label_selector=label_selector) for jb in get_jobs.items: jb_name = jb.metadata.name @@ -598,8 +599,8 @@ class Tiller(object): handled = True if resource_type == 'cronjob' or resource_type == 'job': - get_jobs = self.k8s.get_namespace_cron_job(namespace, - label_selector) + get_jobs = self.k8s.get_namespace_cron_job( + namespace, label_selector=label_selector) for jb in get_jobs.items: jb_name = jb.metadata.name @@ -622,8 +623,8 @@ class Tiller(object): handled = True if resource_type == 'pod': - release_pods = self.k8s.get_namespace_pod(namespace, - label_selector) + release_pods = self.k8s.get_namespace_pod( + namespace, label_selector=label_selector) for pod in release_pods.items: pod_name = pod.metadata.name @@ -668,8 +669,8 @@ class Tiller(object): if 
resource_labels is not None: label_selector = label_selectors(resource_labels) - get_daemonset = self.k8s.get_namespace_daemonset( - namespace=namespace, label=label_selector) + get_daemonset = self.k8s.get_namespace_daemon_set( + namespace, label_selector=label_selector) for ds in get_daemonset.items: ds_name = ds.metadata.name diff --git a/armada/handlers/wait.py b/armada/handlers/wait.py new file mode 100644 index 00000000..a5207634 --- /dev/null +++ b/armada/handlers/wait.py @@ -0,0 +1,299 @@ +# Copyright 2018 The Armada Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +import time + +from oslo_log import log as logging + +from armada.utils.release import label_selectors +from armada.const import DEFAULT_K8S_TIMEOUT +from armada.exceptions import k8s_exceptions +from armada.exceptions import armada_exceptions +from kubernetes import watch + +LOG = logging.getLogger(__name__) + +ROLLING_UPDATE_STRATEGY_TYPE = 'RollingUpdate' + + +def get_wait_for(resource_type, k8s, **kwargs): + + if resource_type == 'pod': + return PodWait(resource_type, k8s, **kwargs) + elif resource_type == 'job': + return JobWait(resource_type, k8s, **kwargs) + + # TODO: Also validate this up front in armada validate flow. + raise armada_exceptions.InvalidWaitTypeException(resource_type) + + +class Wait(ABC): + + def __init__(self, + resource_type, + k8s, + get_resources, + skip_if_none_found=False): + self.resource_type = resource_type + self.k8s = k8s + self.get_resources = get_resources + self.skip_if_none_found = skip_if_none_found + + @abstractmethod + def is_resource_ready(self, resource): + ''' + :param resource: resource to check readiness of. + :returns: 3-tuple of (status message, ready bool, error message). + ''' + pass + + def handle_resource(self, resource): + resource_name = resource.metadata.name + + message, resource_ready, err = self.is_resource_ready(resource) + + if err: + # TODO: Handle error + pass + elif resource_ready: + LOG.debug('Resource %s is ready!', resource_name) + else: + LOG.debug('Resource %s not ready: %s', resource_name, message) + + return resource_ready + + def wait(self, + labels, + namespace, + timeout=DEFAULT_K8S_TIMEOUT, + k8s_wait_attempts=1, + k8s_wait_attempt_sleep=1): + ''' + Wait until all resources become ready given the filters provided by + ``labels`` and ``namespace``. + + :param namespace: namespace of resources to wait on + :param labels: labels of resources to wait on + :param timeout: time before disconnecting ``Watch`` stream + :param k8s_wait_attempts: number of times to attempt waiting + for resources to become ready (minimum 1). + :param k8s_wait_attempt_sleep: time in seconds to sleep + between attempts (minimum 1). 
+ ''' + + label_selector = label_selectors(labels) if labels else '' + + wait_attempts = (k8s_wait_attempts if k8s_wait_attempts >= 1 else 1) + sleep_time = (k8s_wait_attempt_sleep + if k8s_wait_attempt_sleep >= 1 else 1) + + LOG.info( + "Waiting for resource type=%s, namespace=%s labels=%s for %ss " + "(k8s wait %s times, sleep %ss)", self.resource_type, namespace, + label_selector, timeout, wait_attempts, sleep_time) + + if not label_selector: + LOG.warn('"label_selector" not specified, waiting with no labels ' + 'may cause unintended consequences.') + + # Track the overall deadline for timing out during waits + deadline = time.time() + timeout + + # NOTE(mark-burnett): Attempt to wait multiple times without + # modification, in case new resources appear after our watch exits. + + successes = 0 + while True: + deadline_remaining = int(round(deadline - time.time())) + if deadline_remaining <= 0: + LOG.info('Timed out while waiting for resources.') + raise k8s_exceptions.KubernetesWatchTimeoutException( + 'Timed out while waiting on namespace=(%s) labels=(%s)' % + (namespace, label_selector)) + + timed_out, modified, unready, found_resources = ( + self._watch_resource_completions( + namespace=namespace, + label_selector=label_selector, + timeout=deadline_remaining)) + + if not found_resources: + if self.skip_if_none_found: + return + else: + LOG.warn( + 'Saw no resources for ' + 'resource type=%s, namespace=%s, labels=(%s). Are the ' + 'labels correct?', self.resource_type, namespace, + label_selector) + + # TODO(seaneagan): Should probably fail here even when resources + # were not found, at least once we have an option to ignore + # wait timeouts. + if timed_out and found_resources: + LOG.info('Timed out waiting for resources: %s', + sorted(unready)) + raise k8s_exceptions.KubernetesWatchTimeoutException( + 'Timed out while waiting on namespace=(%s) labels=(%s)' % + (namespace, label_selector)) + + if modified: + successes = 0 + LOG.debug('Found modified resources: %s', sorted(modified)) + else: + successes += 1 + LOG.debug('Found no modified resources.') + + if successes >= wait_attempts: + break + + LOG.debug( + 'Continuing to wait: {} consecutive attempts without ' + 'modified resources of {} required.', successes, wait_attempts) + + time.sleep(sleep_time) + + def _watch_resource_completions(self, + namespace, + label_selector, + timeout=100): + ''' + Watch and wait for resource completions. + Returns lists of resources in various conditions for the calling + function to handle. + ''' + LOG.debug( + 'Starting to wait on: namespace=%s, resource type=%s, ' + 'label_selector=(%s), timeout=%s', namespace, self.resource_type, + label_selector, timeout) + ready = {} + modified = set() + found_resources = False + + kwargs = { + 'namespace': namespace, + 'label_selector': label_selector, + 'timeout_seconds': timeout + } + + resource_list = self.get_resources(**kwargs) + for resource in resource_list.items: + ready[resource.metadata.name] = self.handle_resource(resource) + if not resource_list.items: + if self.skip_if_none_found: + msg = 'Skipping wait, no %s resources found.' + LOG.debug(msg, self.resource_type) + return (False, modified, [], found_resources) + else: + found_resources = True + if all(ready.values()): + return (False, modified, [], found_resources) + + # Only watch new events. 
+ kwargs['resource_version'] = resource_list.metadata.resource_version + + w = watch.Watch() + for event in w.stream(self.get_resources, **kwargs): + event_type = event['type'].upper() + resource = event['object'] + resource_name = resource.metadata.name + resource_version = resource.metadata.resource_version + msg = ('Watch event: type=%s, name=%s, namespace=%s,' + 'resource_version=%s') + LOG.debug(msg, event_type, resource_name, namespace, + resource_version) + + if event_type in {'ADDED', 'MODIFIED'}: + found_resources = True + resource_ready = self.handle_resource(resource) + ready[resource_name] = resource_ready + + if event_type == 'MODIFIED': + modified.add(resource_name) + + elif event_type == 'DELETED': + LOG.debug('Resource %s: removed from tracking', resource_name) + ready.pop(resource_name) + + elif event_type == 'ERROR': + LOG.error('Resource %s: Got error event %s', resource_name, + event['object'].to_dict()) + raise k8s_exceptions.KubernetesErrorEventException( + 'Got error event for resource: %s' % event['object']) + + else: + LOG.error('Unrecognized event type (%s) for resource: %s', + event_type, event['object']) + raise (k8s_exceptions. + KubernetesUnknownStreamingEventTypeException( + 'Got unknown event type (%s) for resource: %s' % + (event_type, event['object']))) + + if all(ready.values()): + return (False, modified, [], found_resources) + + return (True, modified, + [name for name, is_ready in ready.items() if not is_ready], + found_resources) + + def _get_resource_condition(self, resource_conditions, condition_type): + for pc in resource_conditions: + if pc.type == condition_type: + return pc + + +class PodWait(Wait): + + def __init__(self, resource_type, k8s, **kwargs): + super(PodWait, self).__init__(resource_type, k8s, + k8s.client.list_namespaced_pod, **kwargs) + + def is_resource_ready(self, resource): + pod = resource + name = pod.metadata.name + + status = pod.status + phase = status.phase + + if phase == 'Succeeded': + return ("Pod {} succeeded\n".format(name), True, None) + + if phase == 'Running': + cond = self._get_resource_condition(status.conditions, 'Ready') + if cond and cond.status == 'True': + return ("Pod {} ready\n".format(name), True, None) + + msg = "Waiting for pod {} to be ready...\n" + return (msg.format(name), False, None) + + +class JobWait(Wait): + + def __init__(self, resource_type, k8s, **kwargs): + super(JobWait, self).__init__( + resource_type, k8s, k8s.batch_api.list_namespaced_job, **kwargs) + + def is_resource_ready(self, resource): + job = resource + name = job.metadata.name + + expected = job.spec.completions + completed = job.status.succeeded + + if expected != completed: + msg = "Waiting for job {} to be successfully completed...\n" + return (msg.format(name), False, None) + msg = "job {} successfully completed\n" + return (msg.format(name), True, None)
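
As a reference for how the new module is driven, below is a minimal usage sketch in Python mirroring the call sites this patch adds to armada/handlers/armada.py. The standalone Tiller() construction and the label, namespace, and timeout values are illustrative assumptions only and are not part of this patch:

    from armada.handlers.tiller import Tiller
    from armada.handlers.wait import get_wait_for

    # Illustrative setup only: Armada normally constructs and owns the Tiller
    # handler (and its embedded K8s client) itself.
    tiller = Tiller()

    # Jobs are waited on first (and skipped when none match the labels),
    # then pods, matching the ordering used in armada/handlers/armada.py.
    waits = [
        get_wait_for('job', tiller.k8s, skip_if_none_found=True),
        get_wait_for('pod', tiller.k8s),
    ]

    # Hypothetical filter values; real values come from the chart release.
    wait_labels = {'release_group': 'example-release'}

    for wait in waits:
        wait.wait(
            labels=wait_labels,
            namespace='example-namespace',
            k8s_wait_attempts=1,
            k8s_wait_attempt_sleep=1,
            timeout=900)

    # Any other resource type passed to get_wait_for() raises
    # InvalidWaitTypeException until additional resource types are wired in.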