diff --git a/armada/handlers/k8s.py b/armada/handlers/k8s.py
index fa82167f..94642294 100644
--- a/armada/handlers/k8s.py
+++ b/armada/handlers/k8s.py
@@ -55,15 +55,17 @@ class K8s(object):
                           propagation_policy='Foreground',
                           timeout=DEFAULT_K8S_TIMEOUT):
         '''
+        Delete a job from a namespace (see _delete_item_action).
+
         :param name: name of job
-        :param namespace: namespace of job
+        :param namespace: namespace
         :param propagation_policy: The Kubernetes propagation_policy to apply
-            to the delete. Default 'Foreground' means that child pods to the
-            job will be deleted before the job is marked as deleted.
+            to the delete.
+        :param timeout: The timeout to wait for the delete to complete
         '''
-        self._delete_job_action(self.batch_api.list_namespaced_job,
-                                self.batch_api.delete_namespaced_job, "job",
-                                name, namespace, propagation_policy, timeout)
+        self._delete_item_action(self.batch_api.list_namespaced_job,
+                                 self.batch_api.delete_namespaced_job, "job",
+                                 name, namespace, propagation_policy, timeout)
 
     def delete_cron_job_action(self,
                                name,
@@ -71,30 +73,69 @@ class K8s(object):
                                propagation_policy='Foreground',
                                timeout=DEFAULT_K8S_TIMEOUT):
         '''
+        Delete a cron job from a namespace (see _delete_item_action).
+
         :param name: name of cron job
-        :param namespace: namespace of cron job
+        :param namespace: namespace
         :param propagation_policy: The Kubernetes propagation_policy to apply
-            to the delete. Default 'Foreground' means that child pods of the
-            cron job will be deleted before the cron job is marked as deleted.
+            to the delete.
+        :param timeout: The timeout to wait for the delete to complete
         '''
-        self._delete_job_action(
+        self._delete_item_action(
             self.batch_v1beta1_api.list_namespaced_cron_job,
             self.batch_v1beta1_api.delete_namespaced_cron_job, "cron job",
             name, namespace, propagation_policy, timeout)
 
-    def _delete_job_action(self,
-                           list_func,
-                           delete_func,
-                           job_type_description,
-                           name,
-                           namespace="default",
-                           propagation_policy='Foreground',
-                           timeout=DEFAULT_K8S_TIMEOUT):
+    def delete_pod_action(self,
+                          name,
+                          namespace="default",
+                          propagation_policy='Foreground',
+                          timeout=DEFAULT_K8S_TIMEOUT):
+        '''
+        Delete a pod from a namespace (see _delete_item_action).
+
+        :param name: name of pod
+        :param namespace: namespace
+        :param propagation_policy: The Kubernetes propagation_policy to apply
+            to the delete.
+        :param timeout: The timeout to wait for the delete to complete
+        '''
+        self._delete_item_action(self.client.list_namespaced_pod,
+                                 self.client.delete_namespaced_pod, "pod",
+                                 name, namespace, propagation_policy, timeout)
+
+    def _delete_item_action(self,
+                            list_func,
+                            delete_func,
+                            object_type_description,
+                            name,
+                            namespace="default",
+                            propagation_policy='Foreground',
+                            timeout=DEFAULT_K8S_TIMEOUT):
+        '''
+        Delete an object (job, cron job, or pod) from Kubernetes and wait
+        for it to be fully removed before returning, raising a timeout
+        exception if the delete does not complete in time.
+
+        :param list_func: The callback function to list the specified object
+            type
+        :param delete_func: The callback function to delete the specified
+            object type
+        :param object_type_description: The type of object to delete, one of
+            `job`, `cron job`, or `pod`
+        :param name: The name of the object to delete
+        :param namespace: The namespace of the object
+        :param propagation_policy: The Kubernetes propagation_policy to apply
+            to the delete. Default 'Foreground' means that child objects
+            will be deleted before the given object is marked as deleted.
+            See: https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/#controlling-how-the-garbage-collector-deletes-dependents # noqa
+        :param timeout: The timeout to wait for the delete to complete
+        '''
         try:
             timeout = self._check_timeout(timeout)
 
             LOG.debug('Watching to delete %s %s, Wait timeout=%s',
-                      job_type_description, name, timeout)
+                      object_type_description, name, timeout)
             body = client.V1DeleteOptions()
             w = watch.Watch()
             issue_delete = True
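The three public delete actions above now share a single delete-and-watch code path. As an illustrative sketch only (not part of the patch; it assumes a K8s() handler can be constructed without arguments, and 'my-pod' is a placeholder name), deleting a pod and blocking until it is gone looks like:

    k8s = K8s()
    # Issues the delete, then watches the pod list until a DELETED event
    # for this name arrives or the timeout expires.
    k8s.delete_pod_action('my-pod', namespace='default', timeout=300)

    # Equivalent to wiring the CoreV1Api list/delete pair into the shared
    # helper directly:
    k8s._delete_item_action(k8s.client.list_namespaced_pod,
                            k8s.client.delete_namespaced_pod, 'pod',
                            'my-pod', 'default', 'Foreground', 300)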
@@ -110,29 +151,29 @@ class K8s(object):
                     issue_delete = False
 
                 event_type = event['type'].upper()
-                job_name = event['object'].metadata.name
-                LOG.debug('Watch event %s on %s', event_type, job_name)
+                item_name = event['object'].metadata.name
+                LOG.debug('Watch event %s on %s', event_type, item_name)
 
-                if job_name == name:
+                if item_name == name:
                     found_events = True
                     if event_type == 'DELETED':
                         LOG.info('Successfully deleted %s %s',
-                                 job_type_description, job_name)
+                                 object_type_description, item_name)
                         return
 
             if not found_events:
                 LOG.warn('Saw no delete events for %s %s in namespace=%s',
-                         job_type_description, name, namespace)
+                         object_type_description, name, namespace)
 
             err_msg = ('Reached timeout while waiting to delete %s: '
-                       'name=%s, namespace=%s' % (job_type_description, name,
-                                                  namespace))
+                       'name=%s, namespace=%s' % (object_type_description,
+                                                  name, namespace))
             LOG.error(err_msg)
             raise exceptions.KubernetesWatchTimeoutException(err_msg)
         except ApiException as e:
             LOG.exception("Exception when deleting %s: name=%s, namespace=%s",
-                          job_type_description, name, namespace)
+                          object_type_description, name, namespace)
             raise e
 
     def get_namespace_job(self, namespace="default", label_selector=''):
@@ -204,19 +245,6 @@ class K8s(object):
         return self.extension_api.delete_namespaced_daemon_set(
             name, namespace, body)
 
-    def delete_namespace_pod(self, name, namespace="default", body=None):
-        '''
-        :param name: name of the Pod
-        :param namespace: namespace of the Pod
-        :param body: V1DeleteOptions
-
-        Deletes pod by name and returns V1Status object
-        '''
-        if body is None:
-            body = client.V1DeleteOptions()
-
-        return self.client.delete_namespaced_pod(name, namespace, body)
-
     def wait_for_pod_redeployment(self, old_pod_name, namespace):
         '''
         :param old_pod_name: name of pods
@@ -319,8 +347,16 @@ class K8s(object):
             LOG.warn('"label_selector" not specified, waiting with no labels '
                      'may cause unintended consequences.')
 
+        # Track the overall deadline for timing out during waits
         deadline = time.time() + timeout
 
+        # First, we should watch for jobs before checking pods, as a job can
+        # still be running even after its current pods look healthy or have
+        # been removed and are pending reschedule
+        found_jobs = self.get_namespace_job(namespace, label_selector)
+        if len(found_jobs.items):
+            self._watch_job_completion(namespace, label_selector, timeout)
+
         # NOTE(mark-burnett): Attempt to wait multiple times without
         # modification, in case new pods appear after our watch exits.
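Both the job wait added above and the pod wait below are built on the kubernetes Python client's watch API. A minimal self-contained sketch of that pattern (namespace, label selector, and timeout are placeholder values, not taken from the patch):

    from kubernetes import client, config, watch

    config.load_kube_config()
    batch_api = client.BatchV1Api()

    # stream() yields dicts with 'type' (ADDED/MODIFIED/DELETED) and
    # 'object' (the deserialized resource), and returns once
    # timeout_seconds elapses if the loop has not exited earlier.
    w = watch.Watch()
    for event in w.stream(batch_api.list_namespaced_job,
                          namespace='default',
                          label_selector='release_name=my-release',
                          timeout_seconds=60):
        print(event['type'], event['object'].metadata.name)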
@@ -328,9 +364,13 @@ class K8s(object):
         while successes < wait_attempts:
             deadline_remaining = int(round(deadline - time.time()))
             if deadline_remaining <= 0:
-                return False
+                LOG.info('Timed out while waiting for pods.')
+                raise exceptions.KubernetesWatchTimeoutException(
+                    'Timed out while waiting on namespace=(%s) labels=(%s)' %
+                    (namespace, label_selector))
+
             timed_out, modified_pods, unready_pods, found_events = (
-                self._wait_one_time(
+                self._watch_pod_completions(
                     namespace=namespace,
                     label_selector=label_selector,
                     timeout=deadline_remaining))
@@ -338,8 +378,8 @@
             if not found_events:
                 LOG.warn(
                     'Saw no install/update events for release=%s, '
-                    'namespace=%s, labels=(%s)', release, namespace,
-                    label_selector)
+                    'namespace=%s, labels=(%s). Are the labels correct?',
+                    release, namespace, label_selector)
 
             if timed_out:
                 LOG.info('Timed out waiting for pods: %s',
@@ -347,7 +387,6 @@
                 raise exceptions.KubernetesWatchTimeoutException(
                     'Timed out while waiting on namespace=(%s) labels=(%s)' %
                     (namespace, label_selector))
-                return False
 
             if modified_pods:
                 successes = 0
@@ -362,9 +401,14 @@
         return True
 
-    def _wait_one_time(self, namespace, label_selector, timeout=100):
+    def _watch_pod_completions(self, namespace, label_selector, timeout=100):
+        '''
+        Watch and wait for pod completions.
+        Returns lists of pods in various conditions for the calling function
+        to handle.
+        '''
         LOG.debug(
-            'Starting to wait: namespace=%s, label_selector=(%s), '
+            'Starting to wait on pods: namespace=%s, label_selector=(%s), '
             'timeout=%s', namespace, label_selector, timeout)
         ready_pods = {}
         modified_pods = set()
@@ -457,3 +501,56 @@
                       'using default %ss.', DEFAULT_K8S_TIMEOUT)
             timeout = DEFAULT_K8S_TIMEOUT
         return timeout
+
+    def _watch_job_completion(self, namespace, label_selector, timeout):
+        '''
+        Watch and wait for job completion.
+        Returns when conditions are met, or raises a timeout exception.
+        '''
+        try:
+            timeout = self._check_timeout(timeout)
+
+            ready_jobs = {}
+            w = watch.Watch()
+            for event in w.stream(
+                    self.batch_api.list_namespaced_job,
+                    namespace=namespace,
+                    label_selector=label_selector,
+                    timeout_seconds=timeout):
+
+                job_name = event['object'].metadata.name
+                LOG.debug('Watch event %s on job %s', event['type'].upper(),
+                          job_name)
+
+                # Track the expected and actual number of completed pods
+                # See: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/ # noqa
+                expected = event['object'].spec.completions
+                completed = event['object'].status.succeeded
+
+                if expected != completed:
+                    ready_jobs[job_name] = False
+                else:
+                    ready_jobs[job_name] = True
+                    LOG.debug(
+                        'Job %s complete (spec.completions=%s, '
+                        'status.succeeded=%s)', job_name, expected, completed)
+
+                if all(ready_jobs.values()):
+                    return True
+
+        except ApiException as e:
+            LOG.exception(
+                "Exception when watching jobs: namespace=%s, labels=(%s)",
+                namespace, label_selector)
+            raise e
+
+        if not ready_jobs:
+            LOG.warn(
+                'Saw no job events for namespace=%s, labels=(%s). '
+                'Are the labels correct?', namespace, label_selector)
+            return False
+
+        err_msg = ('Reached timeout while waiting for job completions: '
+                   'namespace=%s, labels=(%s)' % (namespace, label_selector))
+        LOG.error(err_msg)
+        raise exceptions.KubernetesWatchTimeoutException(err_msg)
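The completion test above compares spec.completions with status.succeeded. Both fields are optional in the Job API, which is worth keeping in mind when reading the equality check; a rough sketch of the predicate, assuming V1Job objects from the kubernetes Python client:

    def job_is_complete(job):
        # spec.completions may be None for work-queue style jobs, and
        # status.succeeded is None until the first pod succeeds, so such
        # jobs compare unequal and are treated as not yet complete.
        return job.status.succeeded == job.spec.completions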
diff --git a/armada/handlers/tiller.py b/armada/handlers/tiller.py
index 4391c082..cf5fe811 100644
--- a/armada/handlers/tiller.py
+++ b/armada/handlers/tiller.py
@@ -265,8 +265,13 @@ class Tiller(object):
                 action_type = action.get('type')
                 labels = action.get('labels', None)
 
-                self.delete_resources(release_name, name, action_type,
-                                      labels, namespace, timeout)
+                self.delete_resources(
+                    release_name,
+                    name,
+                    action_type,
+                    labels,
+                    namespace,
+                    timeout=timeout)
         except Exception:
             LOG.warn("PRE: Could not delete anything, please check yaml")
             raise ex.PreUpdateJobDeleteException(name, namespace)
@@ -631,7 +636,7 @@ class Tiller(object):
                 LOG.info("Deleting pod %s in namespace: %s", pod_name,
                          namespace)
 
-                self.k8s.delete_namespace_pod(pod_name, namespace)
+                self.k8s.delete_pod_action(pod_name, namespace)
                 if wait:
                     self.k8s.wait_for_pod_redeployment(pod_name, namespace)
                 handled = True
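One behavioral consequence of the k8s.py hunks: the pod-wait loop now raises KubernetesWatchTimeoutException on timeout where it previously returned False, so callers must handle an exception instead of testing a return value. A hedged sketch of the caller side (the method name wait_until_ready and its keyword arguments are assumptions based on the surrounding variables; the enclosing method is not named in this diff):

    # 'exceptions' is whatever module k8s.py imports under that name; the
    # exact import path is not shown in this diff.
    try:
        k8s.wait_until_ready(release='my-release',
                             namespace='default',
                             label_selector='release_name=my-release',
                             timeout=600)
    except exceptions.KubernetesWatchTimeoutException:
        # Convert to an Armada install/upgrade failure or propagate.
        raise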