diff --git a/armada/handlers/armada.py b/armada/handlers/armada.py index 22e6e079..66b20e7b 100644 --- a/armada/handlers/armada.py +++ b/armada/handlers/armada.py @@ -56,7 +56,9 @@ class Armada(object): tiller_port=None, tiller_namespace=None, values=None, - target_manifest=None): + target_manifest=None, + k8s_wait_attempts=1, + k8s_wait_attempt_sleep=1): ''' Initialize the Armada engine and establish a connection to Tiller. @@ -75,6 +77,10 @@ class Armada(object): ``CONF.tiller_namespace``. :param str target_manifest: The target manifest to run. Useful for specifying which manifest to run when multiple are available. + :param int k8s_wait_attempts: The number of times to attempt waiting + for pods to become ready. + :param int k8s_wait_attempt_sleep: The time in seconds to sleep + between attempts. ''' tiller_port = tiller_port or CONF.tiller_port tiller_namespace = tiller_namespace or CONF.tiller_namespace @@ -92,6 +98,8 @@ class Armada(object): self.values = values self.documents = documents self.target_manifest = target_manifest + self.k8s_wait_attempts = k8s_wait_attempts + self.k8s_wait_attempt_sleep = k8s_wait_attempt_sleep self.manifest = self.get_armada_manifest() def get_armada_manifest(self): @@ -353,6 +361,8 @@ class Armada(object): release=prefix_chart, labels=wait_values.get('labels', ''), namespace=chart.namespace, + k8s_wait_attempts=self.k8s_wait_attempts, + k8s_wait_attempt_sleep=self.k8s_wait_attempt_sleep, timeout=wait_values.get('timeout', DEFAULT_TIMEOUT) ) @@ -378,7 +388,10 @@ class Armada(object): release=prefix_chart, labels=wait_values.get('labels', ''), namespace=chart.namespace, - timeout=wait_values.get('timeout', 3600)) + k8s_wait_attempts=self.k8s_wait_attempts, + k8s_wait_attempt_sleep=self.k8s_wait_attempt_sleep, + timeout=wait_values.get('timeout', DEFAULT_TIMEOUT) + ) msg['install'].append(prefix_chart) @@ -396,7 +409,11 @@ class Armada(object): else: LOG.info("FAILED: %s", prefix_chart) - self.tiller.k8s.wait_until_ready(timeout=chart_timeout) + # TODO(MarshM) does this need release/labels/namespace? + self.tiller.k8s.wait_until_ready( + k8s_wait_attempts=self.k8s_wait_attempts, + k8s_wait_attempt_sleep=self.k8s_wait_attempt_sleep, + timeout=chart_timeout) LOG.info("Performing Post-Flight Operations") self.post_flight_ops() diff --git a/armada/handlers/k8s.py b/armada/handlers/k8s.py index 672de56e..428ac1fb 100644 --- a/armada/handlers/k8s.py +++ b/armada/handlers/k8s.py @@ -25,12 +25,8 @@ from oslo_log import log as logging from armada.utils.release import label_selectors from armada.exceptions import k8s_exceptions as exceptions - -LOG = logging.getLogger(__name__) - CONF = cfg.CONF - -READY_PHASES = {'Running', 'Succeeded'} +LOG = logging.getLogger(__name__) class K8s(object): @@ -67,7 +63,8 @@ class K8s(object): name=name, namespace=namespace, body=body, propagation_policy=propagation_policy) except ApiException as e: - LOG.error("Exception when deleting a job: %s", e) + LOG.error("Exception when deleting job: name=%s, namespace=%s: %s", + name, namespace, e) def get_namespace_job(self, namespace="default", label_selector=''): ''' @@ -79,13 +76,15 @@ class K8s(object): return self.batch_api.list_namespaced_job( namespace, label_selector=label_selector) except ApiException as e: - LOG.error("Exception getting a job: %s", e) + LOG.error("Exception getting a job: namespace=%s, label=%s: %s", + namespace, label_selector, e) def create_job_action(self, name, namespace="default"): ''' :params name - name of the job :params namespace - name of pod that job ''' + # TODO(MarshM) this does nothing? LOG.debug(" %s in namespace: %s", name, namespace) def get_namespace_pod(self, namespace="default", label_selector=''): @@ -99,6 +98,7 @@ class K8s(object): return self.client.list_namespaced_pod( namespace, label_selector=label_selector) + # TODO(MarshM) unused? def get_all_pods(self, label_selector=''): ''' :params label_selector - filters Pods by label @@ -210,20 +210,31 @@ class K8s(object): namespace='default', labels='', timeout=300, - sleep=15, - required_successes=3, - inter_success_wait=10): + k8s_wait_attempts=1, + k8s_wait_attempt_sleep=1): ''' - :param release - part of namespace - :param timeout - time before disconnecting stream + Wait until all pods become ready given the filters provided by + ``release``, ``labels`` and ``namespace``. + + :param release: chart release + :param namespace: the namespace used to filter which pods to wait on + :param labels: the labels used to filter which pods to wait on + :param timeout: time before disconnecting ``Watch`` stream + :param k8s_wait_attempts: The number of times to attempt waiting + for pods to become ready (minimum 1). + :param k8s_wait_attempt_sleep: The time in seconds to sleep + between attempts (minimum 1). ''' - label_selector = '' + # NOTE(MarshM) 'release' is currently unused + label_selector = label_selectors(labels) if labels else '' - if labels: - label_selector = label_selectors(labels) + wait_attempts = (k8s_wait_attempts if k8s_wait_attempts >= 1 else 1) + sleep_time = (k8s_wait_attempt_sleep if k8s_wait_attempt_sleep >= 1 + else 1) - LOG.debug("Wait on %s (%s) for %s sec", namespace, label_selector, - timeout) + LOG.debug("Wait on %s (%s) for %s sec (k8s wait %s times, sleep %ss)", + namespace, label_selector, timeout, + wait_attempts, sleep_time) deadline = time.time() + timeout @@ -231,12 +242,13 @@ class K8s(object): # modification, in case new pods appear after our watch exits. successes = 0 - while successes < required_successes: + while successes < wait_attempts: deadline_remaining = int(deadline - time.time()) if deadline_remaining <= 0: return False - timed_out, modified_pods, unready_pods = self.wait_one_time( - label_selector, timeout=deadline_remaining) + timed_out, modified_pods, unready_pods = self._wait_one_time( + namespace=namespace, label_selector=label_selector, + timeout=deadline_remaining) if timed_out: LOG.info('Timed out waiting for pods: %s', unready_pods) @@ -251,17 +263,19 @@ class K8s(object): LOG.debug('Found no modified pods this attempt. successes=%d', successes) - time.sleep(inter_success_wait) + time.sleep(sleep_time) return True - def wait_one_time(self, label_selector='', timeout=100): - LOG.debug('Starting to wait: label_selector=%s, timeout=%s', - label_selector, timeout) + def _wait_one_time(self, namespace, label_selector, timeout=100): + LOG.debug('Starting to wait: namespace=%s, label_selector=%s, ' + 'timeout=%s', namespace, label_selector, timeout) ready_pods = {} modified_pods = set() w = watch.Watch() first_event = True + + # TODO(MarshM) still need to filter pods on namespace for event in w.stream(self.client.list_pod_for_all_namespaces, label_selector=label_selector, timeout_seconds=timeout): @@ -277,33 +291,37 @@ class K8s(object): event_type = event['type'].upper() pod_name = event['object'].metadata.name + LOG.debug('Watch event for pod %s namespace=%s label_selector=%s', + pod_name, namespace, label_selector) if event_type in {'ADDED', 'MODIFIED'}: status = event['object'].status - is_ready = status.phase in READY_PHASES + pod_phase = status.phase - if is_ready: - LOG.debug('Pod %s (%s) is_ready=%s', pod_name, event_type, - is_ready) + pod_ready = True + if (pod_phase == 'Succeeded' or + (pod_phase == 'Running' and + self._get_pod_condition(status.conditions, + 'Ready') == 'True')): + LOG.debug('Pod %s is ready!', pod_name) else: - container_statuses = status.container_statuses - conditions = status.conditions - LOG.debug('Pod %s (%s) is_ready=%s container_statuses=%s ' - 'conditions=%s', pod_name, event_type, is_ready, - container_statuses, conditions) + pod_ready = False + LOG.debug('Pod %s not ready: conditions=[%s] ' + 'container_statuses=[%s] ', pod_name, + status.conditions, status.container_statuses) - ready_pods[pod_name] = is_ready + ready_pods[pod_name] = pod_ready if event_type == 'MODIFIED': modified_pods.add(pod_name) elif event_type == 'DELETED': - LOG.debug('Removing pod %s from tracking', pod_name) + LOG.debug('Pod %s: removed from tracking', pod_name) ready_pods.pop(pod_name) elif event_type == 'ERROR': - LOG.error('Got error event for pod: %s', - event['object'].to_dict()) + LOG.error('Pod %s: Got error event %s', + pod_name, event['object'].to_dict()) raise exceptions.KubernetesErrorEventException( 'Got error event for pod: %s' % event['object']) @@ -321,3 +339,8 @@ class K8s(object): # (unlikely) or in the case of the watch timing out. return (not all(ready_pods.values()), modified_pods, [name for name, ready in ready_pods.items() if not ready]) + + def _get_pod_condition(self, pod_conditions, condition_type): + for pc in pod_conditions: + if pc.type == condition_type: + return pc.status diff --git a/armada/handlers/tiller.py b/armada/handlers/tiller.py index 7a18ba7f..acdc7bd6 100644 --- a/armada/handlers/tiller.py +++ b/armada/handlers/tiller.py @@ -226,6 +226,7 @@ class Tiller(object): action_type = action.get("type") if "job" in action_type: LOG.info("Creating %s in namespace: %s", name, namespace) + # TODO(MarshM) create_job_action does nothing but LOG.debug self.k8s.create_job_action(name, action_type) continue except Exception: @@ -239,6 +240,7 @@ class Tiller(object): action_type = action.get("type") if "job" in action_type: LOG.info("Creating %s in namespace: %s", name, namespace) + # TODO(MarshM) create_job_action does nothing but LOG.debug self.k8s.create_job_action(name, action_type) continue except Exception: