diff --git a/armada/handlers/armada.py b/armada/handlers/armada.py index 4dfa6c5e..6365ae41 100644 --- a/armada/handlers/armada.py +++ b/armada/handlers/armada.py @@ -13,6 +13,7 @@ # limitations under the License. from concurrent.futures import ThreadPoolExecutor, as_completed +import time from oslo_config import cfg from oslo_log import log as logging @@ -180,10 +181,31 @@ class Armada(object): for chart in cg_charts } + while True: + LOG.debug("Thread stats - num of threads: %s", + len(future_to_chart)) + stats = [] + for fut, chrt in future_to_chart.items(): + LOG.debug("chart: %s R:%s D:%s C:%s", + chrt['metadata']['name'], + fut.running(), fut.done(), + fut.cancelled()) + if fut.done() or fut.cancelled(): + stats.append(True) + else: + stats.append(False) + + if all(stats): + LOG.debug("Looks like all charts finished") + break + time.sleep(10) + for future in as_completed(future_to_chart): chart = future_to_chart[future] + LOG.debug("Yielded %s", chart['metadata']['name']) handle_result(chart, future.result) + LOG.debug("Thread pool part finished") if failures: LOG.error('Chart deploy(s) failed: %s', failures) raise armada_exceptions.ChartDeployException(failures) diff --git a/armada/handlers/chart_deploy.py b/armada/handlers/chart_deploy.py index b0303616..0520a02b 100644 --- a/armada/handlers/chart_deploy.py +++ b/armada/handlers/chart_deploy.py @@ -129,7 +129,7 @@ class ChartDeploy(object): if not self.disable_update_post and upgrade_post: LOG.warning( - 'Post upgrade actions are ignored by Armada' + 'Post upgrade actions are ignored by Armada ' 'and will not affect deployment.') LOG.info('Checking for updates to chart release inputs.') @@ -160,7 +160,7 @@ class ChartDeploy(object): timeout=timer, force=force) - LOG.info('Upgrade completed') + LOG.info('Upgrade completed for release %s', release_id) result['upgrade'] = release_id deploy = upgrade @@ -236,7 +236,9 @@ class ChartDeploy(object): with metrics.CHART_DEPLOY.get_context(wait_timeout, manifest_name, chart_name, action.get_label_value()): + LOG.info('Calling deploy fn for release=%s', release_id) deploy() + LOG.info('Deploy fn called for release=%s', release_id) # Wait timer = int(round(deadline - time.time())) @@ -251,11 +253,15 @@ class ChartDeploy(object): run_test = test_handler.test_enabled and ( just_deployed or not last_test_passed) + LOG.info('Test handler created for release=%s', release_id) if run_test: with metrics.CHART_TEST.get_context(test_handler.timeout, manifest_name, chart_name): + LOG.info('Running tests for release=%s', release_id) self._test_chart(test_handler) + LOG.info('Tests completed for release=%s', release_id) + LOG.info('Exec function completed for release=%s', release_id) return result def purge_release( diff --git a/armada/handlers/helm.py b/armada/handlers/helm.py index 77661803..013733e2 100644 --- a/armada/handlers/helm.py +++ b/armada/handlers/helm.py @@ -61,8 +61,10 @@ class Helm(object): stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout) except subprocess.CalledProcessError as e: + LOG.info('Exception command=%s', command) raise HelmCommandException(e) + LOG.info('Command finished=%s', command) if json: return JSON.loads(result.stdout) return result.stdout diff --git a/armada/handlers/k8s.py b/armada/handlers/k8s.py index f9317176..80f9cb6e 100644 --- a/armada/handlers/k8s.py +++ b/armada/handlers/k8s.py @@ -21,6 +21,8 @@ from kubernetes import watch from kubernetes.client.rest import ApiException from oslo_config import cfg from oslo_log import log as logging +from retry import retry +import urllib3.exceptions from armada.const import DEFAULT_K8S_TIMEOUT from armada.exceptions import k8s_exceptions as exceptions @@ -118,6 +120,11 @@ class K8s(object): self.client.list_namespaced_pod, self.client.delete_namespaced_pod, "pod", name, namespace, propagation_policy, timeout) + @retry( + exceptions=( + urllib3.exceptions.ProtocolError, + urllib3.exceptions.MaxRetryError), + delay=1, logger=LOG) def _delete_item_action( self, list_func, @@ -237,12 +244,15 @@ class K8s(object): This will return a list of objects req namespace ''' - - return self.client.list_namespaced_pod(namespace, **kwargs) + LOG.debug("Get namespace pod called with=%s", kwargs) + res = self.client.list_namespaced_pod(namespace, **kwargs) + LOG.debug("Get namespace pod 1 result with=%s , result=%s", + kwargs, res) + return res def get_namespace_deployment(self, namespace='default', **kwargs): ''' - :param namespace: namespace of target deamonset + :param namespace: namespace of target daemonset :param labels: specify targeted deployment ''' return self.apps_v1_api.list_namespaced_deployment(namespace, **kwargs) @@ -309,6 +319,16 @@ class K8s(object): pod_base_name = base_pod_pattern.match(old_pod_name).group(1) + self._wait_for_pod_redeployment( + namespace, base_pod_pattern, pod_base_name) + + @retry( + exceptions=( + urllib3.exceptions.ProtocolError, + urllib3.exceptions.MaxRetryError), + delay=1, logger=LOG) + def _wait_for_pod_redeployment( + self, namespace, base_pod_pattern, pod_base_name): new_pod_name = '' w = watch.Watch() @@ -329,30 +349,6 @@ class K8s(object): LOG.info('New pod %s deployed', new_pod_name) w.stop() - def wait_get_completed_podphase( - self, release, timeout=DEFAULT_K8S_TIMEOUT): - ''' - :param release: part of namespace - :param timeout: time before disconnecting stream - ''' - timeout = self._check_timeout(timeout) - - w = watch.Watch() - found_events = False - for event in w.stream(self.client.list_pod_for_all_namespaces, - timeout_seconds=timeout): - resource_name = event['object'].metadata.name - - if release in resource_name: - found_events = True - pod_state = event['object'].status.phase - if pod_state == 'Succeeded': - w.stop() - break - - if not found_events: - LOG.warn('Saw no test events for release %s', release) - def _check_timeout(self, timeout): if timeout <= 0: LOG.warn( @@ -434,5 +430,20 @@ class K8s(object): :return: k8s client response :rtype: object """ + try: + return self._replace_custom_resource( + group, version, namespace, plural, name, body) + except Exception as e: + LOG.error( + "Exception caught while trying to replace the resource %s", e) + raise e + + @retry( + exceptions=( + urllib3.exceptions.ProtocolError, + urllib3.exceptions.MaxRetryError), + delay=1, logger=LOG) + def _replace_custom_resource( + self, group, version, namespace, plural, name, body): return self.custom_objects.replace_namespaced_custom_object( group, version, namespace, plural, name, body) diff --git a/armada/handlers/lock.py b/armada/handlers/lock.py index ee084856..5cffb2ef 100644 --- a/armada/handlers/lock.py +++ b/armada/handlers/lock.py @@ -175,6 +175,7 @@ class Lock: def update_lock(self): LOG.debug("Updating lock") self.lock_config.replace_lock() + LOG.debug("Lock updated") def __enter__(self): self.acquire_lock() diff --git a/armada/handlers/wait.py b/armada/handlers/wait.py index 5fcfa14f..04b5c94d 100644 --- a/armada/handlers/wait.py +++ b/armada/handlers/wait.py @@ -322,7 +322,7 @@ class ResourceWait(ABC): exceptions=( urllib3.exceptions.ProtocolError, urllib3.exceptions.MaxRetryError), - delay=1) + delay=1, logger=LOG) def _wait(self, deadline): ''' Waits for resources to become ready. @@ -340,8 +340,16 @@ class ResourceWait(ABC): LOG.error(error) raise k8s_exceptions.KubernetesWatchTimeoutException(error) + LOG.debug( + 'Call watch resource completions type=%s ns=%s labels=%s timeo=%s', + self.resource_type, self.chart_wait.release_id.namespace, + self.label_selector, deadline_remaining) timed_out, modified, unready, found_resources = ( self._watch_resource_completions(timeout=deadline_remaining)) + LOG.debug( + 'Watch res completions return type=%s ns=%s labels=%s timeo=%s', + self.resource_type, self.chart_wait.release_id.namespace, + self.label_selector, deadline_remaining) if (not found_resources) and not self.required: return None @@ -404,56 +412,90 @@ class ResourceWait(ABC): # Only watch new events. kwargs['resource_version'] = resource_list.metadata.resource_version + LOG.debug( + 'Starting 1 to wait on: namespace=%s, resource type=%s, ' + 'label_selector=(%s), timeout=%s kwargs=%s', + self.chart_wait.release_id.namespace, self.resource_type, + self.label_selector, timeout, kwargs) w = watch.Watch() - for event in w.stream(self.get_resources, **kwargs): - event_type = event['type'].upper() - resource = event['object'] - resource_name = resource.metadata.name - resource_version = resource.metadata.resource_version + try: + for event in w.stream(self.get_resources, **kwargs): + event_type = event['type'].upper() + resource = event['object'] + LOG.info("Event arrived type - %s, label %s", + event_type, self.label_selector) + timeout_v = kwargs.get('timeout_seconds') + if isinstance(resource, dict): + LOG.info("Event is broken - res dict type label: %s", + self.label_selector) + if resource.get('metadata', None) is not None: + resource_name = resource['metadata'].get('name', "") + resource_version = resource['metadata'].get( + 'resource_version', "") + else: + resource_name = resource.metadata.name + resource_version = resource.metadata.resource_version - # Skip resources that should be excluded from wait operations - if not self.include_resource(resource): - continue + if resource_name == "" or resource_version == "": + LOG.info("Skipping broken event %s %s", + event_type, self.label_selector) + continue - msg = ( - 'Watch event: type=%s, name=%s, namespace=%s, ' - 'resource_version=%s') - LOG.debug( - msg, event_type, resource_name, - self.chart_wait.release_id.namespace, resource_version) + # Skip resources that should be excluded from wait operations + if not self.include_resource(resource): + continue - if event_type in {'ADDED', 'MODIFIED'}: - found_resources = True - resource_ready = self.handle_resource(resource) - ready[resource_name] = resource_ready + msg = ( + 'Watch event: type=%s, name=%s, namespace=%s, ' + 'resource_version=%s timeout=%s') + LOG.debug( + msg, event_type, resource_name, + self.chart_wait.release_id.namespace, resource_version, + timeout_v) - if event_type == 'MODIFIED': - modified.add(resource_name) + if event_type in {'ADDED', 'MODIFIED'}: + found_resources = True + resource_ready = self.handle_resource(resource) + ready[resource_name] = resource_ready - elif event_type == 'DELETED': - LOG.debug('Resource %s: removed from tracking', resource_name) - ready.pop(resource_name) + if event_type == 'MODIFIED': + modified.add(resource_name) - elif event_type == 'ERROR': - LOG.error( - 'Resource %s: Got error event %s', resource_name, - event['object'].to_dict()) - raise k8s_exceptions.KubernetesErrorEventException( - 'Got error event for resource: %s' % event['object']) + elif event_type == 'DELETED': + LOG.debug('Resource %s: removed from tracking', + resource_name) + ready.pop(resource_name) - else: - LOG.error( - 'Unrecognized event type (%s) for resource: %s', - event_type, event['object']) - raise ( - k8s_exceptions. - KubernetesUnknownStreamingEventTypeException( - 'Got unknown event type (%s) for resource: %s' % - (event_type, event['object']))) + elif event_type == 'ERROR': + LOG.error( + 'Resource %s: Got error event %s', resource_name, + event['object'].to_dict()) + raise k8s_exceptions.KubernetesErrorEventException( + 'Got error event for resource: %s' % event['object']) - if all(ready.values()): - return (False, modified, [], found_resources) + else: + LOG.error( + 'Unrecognized event type (%s) for resource: %s', + event_type, event['object']) + raise ( + k8s_exceptions. + KubernetesUnknownStreamingEventTypeException( + 'Got unknown event type (%s) for resource: %s' % + (event_type, event['object']))) + + if all(ready.values()): + return (False, modified, [], found_resources) + + except Exception as e: + LOG.error("Watch !!! Exception caught %s", e) + raise e + + LOG.debug( + 'Finished 1 to wait on: namespace=%s, resource type=%s, ' + 'label_selector=(%s), timeout=%s kwargs=%s', + self.chart_wait.release_id.namespace, self.resource_type, + self.label_selector, timeout, kwargs) return ( True, modified, @@ -470,7 +512,7 @@ class PodWait(ResourceWait): def __init__(self, resource_type, chart_wait, labels, **kwargs): super(PodWait, self).__init__( resource_type, chart_wait, labels, - chart_wait.k8s.client.list_namespaced_pod, **kwargs) + chart_wait.k8s.get_namespace_pod, **kwargs) def get_exclude_reason(self, resource): pod = resource