Adding debug output
Signed-off-by: Ruslan Aliev <raliev@mirantis.com> Change-Id: I16df43dac83a3c7f97e679629dbc160900933d04
This commit is contained in:
parent
275dc4506f
commit
dccfc4c47e
|
@ -13,6 +13,7 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
|
import time
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
@ -180,10 +181,31 @@ class Armada(object):
|
||||||
for chart in cg_charts
|
for chart in cg_charts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
while True:
|
||||||
|
LOG.debug("Thread stats - num of threads: %s",
|
||||||
|
len(future_to_chart))
|
||||||
|
stats = []
|
||||||
|
for fut, chrt in future_to_chart.items():
|
||||||
|
LOG.debug("chart: %s R:%s D:%s C:%s",
|
||||||
|
chrt['metadata']['name'],
|
||||||
|
fut.running(), fut.done(),
|
||||||
|
fut.cancelled())
|
||||||
|
if fut.done() or fut.cancelled():
|
||||||
|
stats.append(True)
|
||||||
|
else:
|
||||||
|
stats.append(False)
|
||||||
|
|
||||||
|
if all(stats):
|
||||||
|
LOG.debug("Looks like all charts finished")
|
||||||
|
break
|
||||||
|
time.sleep(10)
|
||||||
|
|
||||||
for future in as_completed(future_to_chart):
|
for future in as_completed(future_to_chart):
|
||||||
chart = future_to_chart[future]
|
chart = future_to_chart[future]
|
||||||
|
LOG.debug("Yielded %s", chart['metadata']['name'])
|
||||||
handle_result(chart, future.result)
|
handle_result(chart, future.result)
|
||||||
|
|
||||||
|
LOG.debug("Thread pool part finished")
|
||||||
if failures:
|
if failures:
|
||||||
LOG.error('Chart deploy(s) failed: %s', failures)
|
LOG.error('Chart deploy(s) failed: %s', failures)
|
||||||
raise armada_exceptions.ChartDeployException(failures)
|
raise armada_exceptions.ChartDeployException(failures)
|
||||||
|
|
|
@ -129,7 +129,7 @@ class ChartDeploy(object):
|
||||||
|
|
||||||
if not self.disable_update_post and upgrade_post:
|
if not self.disable_update_post and upgrade_post:
|
||||||
LOG.warning(
|
LOG.warning(
|
||||||
'Post upgrade actions are ignored by Armada'
|
'Post upgrade actions are ignored by Armada '
|
||||||
'and will not affect deployment.')
|
'and will not affect deployment.')
|
||||||
|
|
||||||
LOG.info('Checking for updates to chart release inputs.')
|
LOG.info('Checking for updates to chart release inputs.')
|
||||||
|
@ -160,7 +160,7 @@ class ChartDeploy(object):
|
||||||
timeout=timer,
|
timeout=timer,
|
||||||
force=force)
|
force=force)
|
||||||
|
|
||||||
LOG.info('Upgrade completed')
|
LOG.info('Upgrade completed for release %s', release_id)
|
||||||
result['upgrade'] = release_id
|
result['upgrade'] = release_id
|
||||||
|
|
||||||
deploy = upgrade
|
deploy = upgrade
|
||||||
|
@ -236,7 +236,9 @@ class ChartDeploy(object):
|
||||||
with metrics.CHART_DEPLOY.get_context(wait_timeout, manifest_name,
|
with metrics.CHART_DEPLOY.get_context(wait_timeout, manifest_name,
|
||||||
chart_name,
|
chart_name,
|
||||||
action.get_label_value()):
|
action.get_label_value()):
|
||||||
|
LOG.info('Calling deploy fn for release=%s', release_id)
|
||||||
deploy()
|
deploy()
|
||||||
|
LOG.info('Deploy fn called for release=%s', release_id)
|
||||||
|
|
||||||
# Wait
|
# Wait
|
||||||
timer = int(round(deadline - time.time()))
|
timer = int(round(deadline - time.time()))
|
||||||
|
@ -251,11 +253,15 @@ class ChartDeploy(object):
|
||||||
|
|
||||||
run_test = test_handler.test_enabled and (
|
run_test = test_handler.test_enabled and (
|
||||||
just_deployed or not last_test_passed)
|
just_deployed or not last_test_passed)
|
||||||
|
LOG.info('Test handler created for release=%s', release_id)
|
||||||
if run_test:
|
if run_test:
|
||||||
with metrics.CHART_TEST.get_context(test_handler.timeout,
|
with metrics.CHART_TEST.get_context(test_handler.timeout,
|
||||||
manifest_name, chart_name):
|
manifest_name, chart_name):
|
||||||
|
LOG.info('Running tests for release=%s', release_id)
|
||||||
self._test_chart(test_handler)
|
self._test_chart(test_handler)
|
||||||
|
LOG.info('Tests completed for release=%s', release_id)
|
||||||
|
|
||||||
|
LOG.info('Exec function completed for release=%s', release_id)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def purge_release(
|
def purge_release(
|
||||||
|
|
|
@ -61,8 +61,10 @@ class Helm(object):
|
||||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
||||||
timeout=timeout)
|
timeout=timeout)
|
||||||
except subprocess.CalledProcessError as e:
|
except subprocess.CalledProcessError as e:
|
||||||
|
LOG.info('Exception command=%s', command)
|
||||||
raise HelmCommandException(e)
|
raise HelmCommandException(e)
|
||||||
|
|
||||||
|
LOG.info('Command finished=%s', command)
|
||||||
if json:
|
if json:
|
||||||
return JSON.loads(result.stdout)
|
return JSON.loads(result.stdout)
|
||||||
return result.stdout
|
return result.stdout
|
||||||
|
|
|
@ -21,6 +21,8 @@ from kubernetes import watch
|
||||||
from kubernetes.client.rest import ApiException
|
from kubernetes.client.rest import ApiException
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
from retry import retry
|
||||||
|
import urllib3.exceptions
|
||||||
|
|
||||||
from armada.const import DEFAULT_K8S_TIMEOUT
|
from armada.const import DEFAULT_K8S_TIMEOUT
|
||||||
from armada.exceptions import k8s_exceptions as exceptions
|
from armada.exceptions import k8s_exceptions as exceptions
|
||||||
|
@ -118,6 +120,11 @@ class K8s(object):
|
||||||
self.client.list_namespaced_pod, self.client.delete_namespaced_pod,
|
self.client.list_namespaced_pod, self.client.delete_namespaced_pod,
|
||||||
"pod", name, namespace, propagation_policy, timeout)
|
"pod", name, namespace, propagation_policy, timeout)
|
||||||
|
|
||||||
|
@retry(
|
||||||
|
exceptions=(
|
||||||
|
urllib3.exceptions.ProtocolError,
|
||||||
|
urllib3.exceptions.MaxRetryError),
|
||||||
|
delay=1, logger=LOG)
|
||||||
def _delete_item_action(
|
def _delete_item_action(
|
||||||
self,
|
self,
|
||||||
list_func,
|
list_func,
|
||||||
|
@ -237,15 +244,38 @@ class K8s(object):
|
||||||
|
|
||||||
This will return a list of objects req namespace
|
This will return a list of objects req namespace
|
||||||
'''
|
'''
|
||||||
|
LOG.debug("Get namespace pod called with=%s", kwargs)
|
||||||
return self.client.list_namespaced_pod(namespace, **kwargs)
|
try:
|
||||||
|
res = self.client.list_namespaced_pod(namespace, **kwargs)
|
||||||
|
LOG.debug("Get namespace pod 1 result with=%s , result type=%s",
|
||||||
|
kwargs, res.status if 'watch' in kwargs else type(res))
|
||||||
|
try:
|
||||||
|
LOG.debug("HIDDEN")
|
||||||
|
LOG.debug("HTTP Responce gt %s %s %s", res.status, res.headers,
|
||||||
|
res.data)
|
||||||
|
except Exception as e:
|
||||||
|
LOG.debug("Can not print exception - %s", e)
|
||||||
|
LOG.debug("HIDDEN 2 Returning result args %s", kwargs)
|
||||||
|
return res
|
||||||
|
except Exception as e:
|
||||||
|
LOG.error("Exception caught at list ns pod %s", e)
|
||||||
|
raise e
|
||||||
|
|
||||||
def get_namespace_deployment(self, namespace='default', **kwargs):
|
def get_namespace_deployment(self, namespace='default', **kwargs):
|
||||||
'''
|
'''
|
||||||
:param namespace: namespace of target deamonset
|
:param namespace: namespace of target daemonset
|
||||||
:param labels: specify targeted deployment
|
:param labels: specify targeted deployment
|
||||||
'''
|
'''
|
||||||
return self.apps_v1_api.list_namespaced_deployment(namespace, **kwargs)
|
LOG.debug("Get namespace deployment called with=%s", kwargs)
|
||||||
|
res = self.apps_v1_api.list_namespaced_deployment(namespace, **kwargs)
|
||||||
|
LOG.debug("Get namespace deployment 1 result with=%s , result type=%s",
|
||||||
|
kwargs, type(res))
|
||||||
|
try:
|
||||||
|
LOG.debug("HTTPResponce dep got %s %s %s", res.status, res.headers,
|
||||||
|
res.data)
|
||||||
|
except Exception as e:
|
||||||
|
LOG.debug("Can not print dep exception - %s", e)
|
||||||
|
return res
|
||||||
|
|
||||||
def get_namespace_stateful_set(self, namespace='default', **kwargs):
|
def get_namespace_stateful_set(self, namespace='default', **kwargs):
|
||||||
'''
|
'''
|
||||||
|
@ -309,6 +339,16 @@ class K8s(object):
|
||||||
|
|
||||||
pod_base_name = base_pod_pattern.match(old_pod_name).group(1)
|
pod_base_name = base_pod_pattern.match(old_pod_name).group(1)
|
||||||
|
|
||||||
|
self._wait_for_pod_redeployment(
|
||||||
|
namespace, base_pod_pattern, pod_base_name)
|
||||||
|
|
||||||
|
@retry(
|
||||||
|
exceptions=(
|
||||||
|
urllib3.exceptions.ProtocolError,
|
||||||
|
urllib3.exceptions.MaxRetryError),
|
||||||
|
delay=1, logger=LOG)
|
||||||
|
def _wait_for_pod_redeployment(
|
||||||
|
self, namespace, base_pod_pattern, pod_base_name):
|
||||||
new_pod_name = ''
|
new_pod_name = ''
|
||||||
|
|
||||||
w = watch.Watch()
|
w = watch.Watch()
|
||||||
|
@ -329,30 +369,6 @@ class K8s(object):
|
||||||
LOG.info('New pod %s deployed', new_pod_name)
|
LOG.info('New pod %s deployed', new_pod_name)
|
||||||
w.stop()
|
w.stop()
|
||||||
|
|
||||||
def wait_get_completed_podphase(
|
|
||||||
self, release, timeout=DEFAULT_K8S_TIMEOUT):
|
|
||||||
'''
|
|
||||||
:param release: part of namespace
|
|
||||||
:param timeout: time before disconnecting stream
|
|
||||||
'''
|
|
||||||
timeout = self._check_timeout(timeout)
|
|
||||||
|
|
||||||
w = watch.Watch()
|
|
||||||
found_events = False
|
|
||||||
for event in w.stream(self.client.list_pod_for_all_namespaces,
|
|
||||||
timeout_seconds=timeout):
|
|
||||||
resource_name = event['object'].metadata.name
|
|
||||||
|
|
||||||
if release in resource_name:
|
|
||||||
found_events = True
|
|
||||||
pod_state = event['object'].status.phase
|
|
||||||
if pod_state == 'Succeeded':
|
|
||||||
w.stop()
|
|
||||||
break
|
|
||||||
|
|
||||||
if not found_events:
|
|
||||||
LOG.warn('Saw no test events for release %s', release)
|
|
||||||
|
|
||||||
def _check_timeout(self, timeout):
|
def _check_timeout(self, timeout):
|
||||||
if timeout <= 0:
|
if timeout <= 0:
|
||||||
LOG.warn(
|
LOG.warn(
|
||||||
|
@ -434,5 +450,20 @@ class K8s(object):
|
||||||
:return: k8s client response
|
:return: k8s client response
|
||||||
:rtype: object
|
:rtype: object
|
||||||
"""
|
"""
|
||||||
|
try:
|
||||||
|
return self._replace_custom_resource(
|
||||||
|
group, version, namespace, plural, name, body)
|
||||||
|
except Exception as e:
|
||||||
|
LOG.error(
|
||||||
|
"Exception caught while trying to replace the resource %s", e)
|
||||||
|
raise e
|
||||||
|
|
||||||
|
@retry(
|
||||||
|
exceptions=(
|
||||||
|
urllib3.exceptions.ProtocolError,
|
||||||
|
urllib3.exceptions.MaxRetryError),
|
||||||
|
delay=1, logger=LOG)
|
||||||
|
def _replace_custom_resource(
|
||||||
|
self, group, version, namespace, plural, name, body):
|
||||||
return self.custom_objects.replace_namespaced_custom_object(
|
return self.custom_objects.replace_namespaced_custom_object(
|
||||||
group, version, namespace, plural, name, body)
|
group, version, namespace, plural, name, body)
|
||||||
|
|
|
@ -175,6 +175,7 @@ class Lock:
|
||||||
def update_lock(self):
|
def update_lock(self):
|
||||||
LOG.debug("Updating lock")
|
LOG.debug("Updating lock")
|
||||||
self.lock_config.replace_lock()
|
self.lock_config.replace_lock()
|
||||||
|
LOG.debug("Lock updated")
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
self.acquire_lock()
|
self.acquire_lock()
|
||||||
|
|
|
@ -17,8 +17,11 @@ import collections
|
||||||
import copy
|
import copy
|
||||||
import math
|
import math
|
||||||
import re
|
import re
|
||||||
|
import sys
|
||||||
import time
|
import time
|
||||||
|
import logging as std_logging
|
||||||
|
|
||||||
|
from kubernetes import client
|
||||||
from kubernetes import watch
|
from kubernetes import watch
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from retry import retry
|
from retry import retry
|
||||||
|
@ -37,12 +40,140 @@ LOG = logging.getLogger(__name__)
|
||||||
ROLLING_UPDATE_STRATEGY_TYPE = 'RollingUpdate'
|
ROLLING_UPDATE_STRATEGY_TYPE = 'RollingUpdate'
|
||||||
ASYNC_UPDATE_NOT_ALLOWED_MSG = 'Async update not allowed: '
|
ASYNC_UPDATE_NOT_ALLOWED_MSG = 'Async update not allowed: '
|
||||||
|
|
||||||
|
PY2 = sys.version_info[0] == 2
|
||||||
|
if PY2:
|
||||||
|
import httplib
|
||||||
|
HTTP_STATUS_GONE = httplib.GONE
|
||||||
|
else:
|
||||||
|
import http
|
||||||
|
HTTP_STATUS_GONE = http.HTTPStatus.GONE
|
||||||
|
|
||||||
|
requests_log = std_logging.getLogger("requests.packages.urllib3")
|
||||||
|
requests_log.setLevel(std_logging.DEBUG)
|
||||||
|
requests_log.propagate = True
|
||||||
|
|
||||||
|
|
||||||
def get_wait_labels(chart):
|
def get_wait_labels(chart):
|
||||||
wait_config = chart.get('wait', {})
|
wait_config = chart.get('wait', {})
|
||||||
return wait_config.get('labels', {})
|
return wait_config.get('labels', {})
|
||||||
|
|
||||||
|
|
||||||
|
def iter_resp_lines(resp):
|
||||||
|
try:
|
||||||
|
prev = ""
|
||||||
|
for seg in resp.stream(amt=None, decode_content=False):
|
||||||
|
if isinstance(seg, bytes):
|
||||||
|
seg = seg.decode('utf8')
|
||||||
|
seg = prev + seg
|
||||||
|
lines = seg.split("\n")
|
||||||
|
if not seg.endswith("\n"):
|
||||||
|
prev = lines[-1]
|
||||||
|
lines = lines[:-1]
|
||||||
|
else:
|
||||||
|
prev = ""
|
||||||
|
for line in lines:
|
||||||
|
if line:
|
||||||
|
yield line
|
||||||
|
except Exception as e:
|
||||||
|
LOG.info("iter_resp_lines exception caught %s", e)
|
||||||
|
raise e
|
||||||
|
|
||||||
|
|
||||||
|
class Watcher(watch.Watch):
|
||||||
|
def stream(self, func, *args, **kwargs):
|
||||||
|
"""Watch an API resource and stream the result back via a generator.
|
||||||
|
Note that watching an API resource can expire. The method tries to
|
||||||
|
resume automatically once from the last result, but if that last result
|
||||||
|
is too old as well, an `ApiException` exception will be thrown with
|
||||||
|
``code`` 410. In that case you have to recover yourself, probably
|
||||||
|
by listing the API resource to obtain the latest state and then
|
||||||
|
watching from that state on by setting ``resource_version`` to
|
||||||
|
one returned from listing.
|
||||||
|
:param func: The API function pointer. Any parameter to the function
|
||||||
|
can be passed after this parameter.
|
||||||
|
:return: Event object with these keys:
|
||||||
|
'type': The type of event such as "ADDED", "DELETED", etc.
|
||||||
|
'raw_object': a dict representing the watched object.
|
||||||
|
'object': A model representation of raw_object. The name of
|
||||||
|
model will be determined based on
|
||||||
|
the func's doc string. If it cannot be determined,
|
||||||
|
'object' value will be the same as 'raw_object'.
|
||||||
|
Example:
|
||||||
|
v1 = kubernetes.client.CoreV1Api()
|
||||||
|
watch = kubernetes.watch.Watch()
|
||||||
|
for e in watch.stream(v1.list_namespace, resource_version=1127):
|
||||||
|
type = e['type']
|
||||||
|
object = e['object'] # object is one of type return_type
|
||||||
|
raw_object = e['raw_object'] # raw_object is a dict
|
||||||
|
...
|
||||||
|
if should_stop:
|
||||||
|
watch.stop()
|
||||||
|
"""
|
||||||
|
|
||||||
|
self._stop = False
|
||||||
|
return_type = self.get_return_type(func)
|
||||||
|
watch_arg = self.get_watch_argument_name(func)
|
||||||
|
kwargs[watch_arg] = True
|
||||||
|
kwargs['_preload_content'] = False
|
||||||
|
#kwargs['_request_timeout'] = 5
|
||||||
|
if 'resource_version' in kwargs:
|
||||||
|
self.resource_version = kwargs['resource_version']
|
||||||
|
|
||||||
|
# Do not attempt retries if user specifies a timeout.
|
||||||
|
# We want to ensure we are returning within that timeout.
|
||||||
|
disable_retries = ('timeout_seconds' in kwargs)
|
||||||
|
retry_after_410 = False
|
||||||
|
while True:
|
||||||
|
LOG.debug("Calling watch func with kwargs %s", kwargs)
|
||||||
|
#if kwargs.get("label_selector", "") == "release_group=clcp-ucp-prometheus-openstack-exporter":
|
||||||
|
# breakpoint()
|
||||||
|
resp = func(*args, **kwargs)
|
||||||
|
LOG.debug("Called watch func with kwargs %s, status", kwargs,
|
||||||
|
resp.status)
|
||||||
|
try:
|
||||||
|
for line in iter_resp_lines(resp):
|
||||||
|
LOG.debug("Returned a line kwargs %s", kwargs)
|
||||||
|
# unmarshal when we are receiving events from watch,
|
||||||
|
# return raw string when we are streaming log
|
||||||
|
if watch_arg == "watch":
|
||||||
|
event = self.unmarshal_event(line, return_type)
|
||||||
|
if isinstance(event, dict) \
|
||||||
|
and event['type'] == 'ERROR':
|
||||||
|
obj = event['raw_object']
|
||||||
|
# Current request expired, let's retry, (if enabled)
|
||||||
|
# but only if we have not already retried.
|
||||||
|
LOG.debug("Request expired kwargs %s", kwargs)
|
||||||
|
if not disable_retries and not retry_after_410 and \
|
||||||
|
obj['code'] == HTTP_STATUS_GONE:
|
||||||
|
retry_after_410 = True
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
LOG.debug("Raising exception kwargs %s", kwargs)
|
||||||
|
reason = "%s: %s" % (
|
||||||
|
obj['reason'], obj['message'])
|
||||||
|
raise client.rest.ApiException(
|
||||||
|
status=obj['code'], reason=reason)
|
||||||
|
else:
|
||||||
|
retry_after_410 = False
|
||||||
|
LOG.debug("Yielding event kwargs %s", kwargs)
|
||||||
|
yield event
|
||||||
|
else:
|
||||||
|
yield line
|
||||||
|
if self._stop:
|
||||||
|
break
|
||||||
|
finally:
|
||||||
|
LOG.debug("Finally block kwargs %s", kwargs)
|
||||||
|
resp.close()
|
||||||
|
resp.release_conn()
|
||||||
|
if self.resource_version is not None:
|
||||||
|
kwargs['resource_version'] = self.resource_version
|
||||||
|
else:
|
||||||
|
self._stop = True
|
||||||
|
|
||||||
|
if self._stop or disable_retries:
|
||||||
|
LOG.debug("Selfstop or disable rets kwargs %s", kwargs)
|
||||||
|
break
|
||||||
|
|
||||||
# TODO: Validate this object up front in armada validate flow.
|
# TODO: Validate this object up front in armada validate flow.
|
||||||
class ChartWait():
|
class ChartWait():
|
||||||
def __init__(
|
def __init__(
|
||||||
|
@ -322,7 +453,7 @@ class ResourceWait(ABC):
|
||||||
exceptions=(
|
exceptions=(
|
||||||
urllib3.exceptions.ProtocolError,
|
urllib3.exceptions.ProtocolError,
|
||||||
urllib3.exceptions.MaxRetryError),
|
urllib3.exceptions.MaxRetryError),
|
||||||
delay=1)
|
delay=1, logger=LOG)
|
||||||
def _wait(self, deadline):
|
def _wait(self, deadline):
|
||||||
'''
|
'''
|
||||||
Waits for resources to become ready.
|
Waits for resources to become ready.
|
||||||
|
@ -340,8 +471,16 @@ class ResourceWait(ABC):
|
||||||
LOG.error(error)
|
LOG.error(error)
|
||||||
raise k8s_exceptions.KubernetesWatchTimeoutException(error)
|
raise k8s_exceptions.KubernetesWatchTimeoutException(error)
|
||||||
|
|
||||||
|
LOG.debug(
|
||||||
|
'Call watch resource completions type=%s ns=%s labels=%s timeo=%s',
|
||||||
|
self.resource_type, self.chart_wait.release_id.namespace,
|
||||||
|
self.label_selector, deadline_remaining)
|
||||||
timed_out, modified, unready, found_resources = (
|
timed_out, modified, unready, found_resources = (
|
||||||
self._watch_resource_completions(timeout=deadline_remaining))
|
self._watch_resource_completions(timeout=deadline_remaining))
|
||||||
|
LOG.debug(
|
||||||
|
'Watch res completions return type=%s ns=%s labels=%s timeo=%s',
|
||||||
|
self.resource_type, self.chart_wait.release_id.namespace,
|
||||||
|
self.label_selector, deadline_remaining)
|
||||||
|
|
||||||
if (not found_resources) and not self.required:
|
if (not found_resources) and not self.required:
|
||||||
return None
|
return None
|
||||||
|
@ -404,56 +543,92 @@ class ResourceWait(ABC):
|
||||||
|
|
||||||
# Only watch new events.
|
# Only watch new events.
|
||||||
kwargs['resource_version'] = resource_list.metadata.resource_version
|
kwargs['resource_version'] = resource_list.metadata.resource_version
|
||||||
|
LOG.debug(
|
||||||
|
'Starting 1 to wait on: namespace=%s, resource type=%s, '
|
||||||
|
'label_selector=(%s), timeout=%s kwargs=%s',
|
||||||
|
self.chart_wait.release_id.namespace, self.resource_type,
|
||||||
|
self.label_selector, timeout, kwargs)
|
||||||
|
|
||||||
w = watch.Watch()
|
w = Watcher()
|
||||||
for event in w.stream(self.get_resources, **kwargs):
|
try:
|
||||||
event_type = event['type'].upper()
|
for event in w.stream(self.get_resources, **kwargs):
|
||||||
resource = event['object']
|
event_type = event['type'].upper()
|
||||||
resource_name = resource.metadata.name
|
resource = event['object']
|
||||||
resource_version = resource.metadata.resource_version
|
LOG.info("Event arrived type - %s, label %s",
|
||||||
|
event_type, self.label_selector)
|
||||||
|
timeout_v = kwargs.get('timeout_seconds')
|
||||||
|
if isinstance(resource, dict):
|
||||||
|
LOG.info("Event is broken - res dict type label: %s %s",
|
||||||
|
self.label_selector, event)
|
||||||
|
raise urllib3.exceptions.ProtocolError(
|
||||||
|
"Invalid event for selector %s", self.label_selector)
|
||||||
|
|
||||||
# Skip resources that should be excluded from wait operations
|
resource_name = resource.metadata.name
|
||||||
if not self.include_resource(resource):
|
resource_version = resource.metadata.resource_version
|
||||||
continue
|
|
||||||
|
|
||||||
msg = (
|
if resource_name == "" or resource_version == "":
|
||||||
'Watch event: type=%s, name=%s, namespace=%s, '
|
LOG.info("Skipping broken event %s %s '%s' '%s'",
|
||||||
'resource_version=%s')
|
event_type, self.label_selector,
|
||||||
LOG.debug(
|
resource_name, resource_version)
|
||||||
msg, event_type, resource_name,
|
raise urllib3.exceptions.ProtocolError(
|
||||||
self.chart_wait.release_id.namespace, resource_version)
|
"Invalid event for selector %s", self.label_selector)
|
||||||
|
|
||||||
if event_type in {'ADDED', 'MODIFIED'}:
|
# Skip resources that should be excluded from wait operations
|
||||||
found_resources = True
|
if not self.include_resource(resource):
|
||||||
resource_ready = self.handle_resource(resource)
|
continue
|
||||||
ready[resource_name] = resource_ready
|
|
||||||
|
|
||||||
if event_type == 'MODIFIED':
|
msg = (
|
||||||
modified.add(resource_name)
|
'Watch event: type=%s, name=%s, namespace=%s, '
|
||||||
|
'resource_version=%s timeout=%s')
|
||||||
|
LOG.debug(
|
||||||
|
msg, event_type, resource_name,
|
||||||
|
self.chart_wait.release_id.namespace, resource_version,
|
||||||
|
timeout_v)
|
||||||
|
|
||||||
elif event_type == 'DELETED':
|
if event_type in {'ADDED', 'MODIFIED'}:
|
||||||
LOG.debug('Resource %s: removed from tracking', resource_name)
|
found_resources = True
|
||||||
ready.pop(resource_name)
|
resource_ready = self.handle_resource(resource)
|
||||||
|
ready[resource_name] = resource_ready
|
||||||
|
|
||||||
elif event_type == 'ERROR':
|
if event_type == 'MODIFIED':
|
||||||
LOG.error(
|
modified.add(resource_name)
|
||||||
'Resource %s: Got error event %s', resource_name,
|
|
||||||
event['object'].to_dict())
|
|
||||||
raise k8s_exceptions.KubernetesErrorEventException(
|
|
||||||
'Got error event for resource: %s' % event['object'])
|
|
||||||
|
|
||||||
else:
|
elif event_type == 'DELETED':
|
||||||
LOG.error(
|
LOG.debug('Resource %s: removed from tracking',
|
||||||
'Unrecognized event type (%s) for resource: %s',
|
resource_name)
|
||||||
event_type, event['object'])
|
ready.pop(resource_name)
|
||||||
raise (
|
|
||||||
k8s_exceptions.
|
|
||||||
KubernetesUnknownStreamingEventTypeException(
|
|
||||||
'Got unknown event type (%s) for resource: %s' %
|
|
||||||
(event_type, event['object'])))
|
|
||||||
|
|
||||||
if all(ready.values()):
|
elif event_type == 'ERROR':
|
||||||
return (False, modified, [], found_resources)
|
LOG.error(
|
||||||
|
'Resource %s: Got error event %s', resource_name,
|
||||||
|
event['object'].to_dict())
|
||||||
|
raise k8s_exceptions.KubernetesErrorEventException(
|
||||||
|
'Got error event for resource: %s' % event['object'])
|
||||||
|
|
||||||
|
else:
|
||||||
|
LOG.error(
|
||||||
|
'Unrecognized event type (%s) for resource: %s',
|
||||||
|
event_type, event['object'])
|
||||||
|
raise (
|
||||||
|
k8s_exceptions.
|
||||||
|
KubernetesUnknownStreamingEventTypeException(
|
||||||
|
'Got unknown event type (%s) for resource: %s' %
|
||||||
|
(event_type, event['object'])))
|
||||||
|
|
||||||
|
if all(ready.values()):
|
||||||
|
LOG.info("Returning from stream %s %s",
|
||||||
|
self.resource_type, self.label_selector)
|
||||||
|
return (False, modified, [], found_resources)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
LOG.error("Watch !!! Exception caught %s", e)
|
||||||
|
raise e
|
||||||
|
|
||||||
|
LOG.debug(
|
||||||
|
'Finished 1 to wait on: namespace=%s, resource type=%s, '
|
||||||
|
'label_selector=(%s), timeout=%s kwargs=%s',
|
||||||
|
self.chart_wait.release_id.namespace, self.resource_type,
|
||||||
|
self.label_selector, timeout, kwargs)
|
||||||
|
|
||||||
return (
|
return (
|
||||||
True, modified,
|
True, modified,
|
||||||
|
@ -511,6 +686,7 @@ class PodWait(ResourceWait):
|
||||||
if cond and cond.status == 'True':
|
if cond and cond.status == 'True':
|
||||||
return ("Pod {} ready".format(name), True)
|
return ("Pod {} ready".format(name), True)
|
||||||
|
|
||||||
|
LOG.debug("Pod is not ready yet: %s, statuses: %s", name, status)
|
||||||
msg = "Waiting for pod {} to be ready..."
|
msg = "Waiting for pod {} to be ready..."
|
||||||
return (msg.format(name), False)
|
return (msg.format(name), False)
|
||||||
|
|
||||||
|
|
|
@ -12,6 +12,10 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import signal
|
||||||
|
import sys
|
||||||
|
import pdb
|
||||||
|
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
import click
|
import click
|
||||||
|
@ -27,6 +31,10 @@ from armada.common.session import ArmadaSession
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
|
||||||
|
def signal_handler(sig, frame):
|
||||||
|
pdb.set_trace()
|
||||||
|
|
||||||
|
|
||||||
@click.group()
|
@click.group()
|
||||||
@click.option('--debug', help="Enable debug logging", is_flag=True)
|
@click.option('--debug', help="Enable debug logging", is_flag=True)
|
||||||
@click.option(
|
@click.option(
|
||||||
|
@ -79,6 +87,8 @@ def main(ctx, debug, api, url, token):
|
||||||
log.setup(CONF, 'armada')
|
log.setup(CONF, 'armada')
|
||||||
|
|
||||||
|
|
||||||
|
signal.signal(signal.SIGUSR1, signal_handler)
|
||||||
|
|
||||||
main.add_command(apply_create)
|
main.add_command(apply_create)
|
||||||
main.add_command(test_charts)
|
main.add_command(test_charts)
|
||||||
main.add_command(validate_manifest)
|
main.add_command(validate_manifest)
|
||||||
|
|
Loading…
Reference in New Issue