def check_node_status(time_out, interval, expected_nodes):
    """Poll Kubernetes until the expected nodes are ready or time runs out.

    The cluster is queried once immediately and then again after every
    ``interval`` seconds, ``round(time_out / interval) + 1`` times at most.
    Each expected node that reports a ready status is dropped from the
    watch list; whatever remains when polling stops is returned. Nodes
    that never show up in the cluster response are therefore reported as
    not ready.

    :param time_out: Seconds to wait for the nodes to become ready. Any
        value accepted by ``int()``; values below 1 are treated as 1.
    :param interval: Seconds to sleep between two polls. Any value accepted
        by ``int()``; values below 1 are treated as 1.
    :param expected_nodes: Iterable of node names that are expected to be
        ready. When it is empty or None no lookup is made at all.
    :returns: List of the expected node names that were not seen in ready
        state before the time out. Empty when every node is ready.

    Example::

        # Time out is 15 minutes, the cluster is polled every 60 seconds
        check_node_status(900, 60, expected_nodes=['a', 'b', 'c'])
    """
    if not expected_nodes:
        # Nothing to wait for, so do not query the cluster at all.
        return []

    # Work on a copy so the caller's collection is never mutated.
    not_ready_node_list = list(expected_nodes)

    # Coerce before clamping: comparing a str such as "60" with 1 raises
    # TypeError. Clamping to 1 prevents division by zero and negative sleeps.
    interval = max(int(interval), 1)
    time_out = max(int(time_out), 1)

    # Number of sleeps that fit in the time out. One more check than that
    # is made, because the first check has no sleep ahead of it.
    end_range = round(time_out / interval)

    for i in range(end_range + 1):
        logging.info("Remaining expected nodes to join cluster: [%s]",
                     ", ".join(not_ready_node_list))

        # Fresh snapshot of the cluster; None when the lookup failed.
        ret = _get_all_k8s_node_status()

        # A failed lookup or a response without items is treated as an
        # empty snapshot so that polling continues.
        for item in getattr(ret, 'items', None) or ():
            try:
                node_name, summary_status, summary_message = (
                    _summarize_node(item))
            except (AttributeError, IndexError, TypeError):
                # Any issue with the response object, move on to next item
                logging.warning("Malformed node status response object. "
                                "Processing continues with the next item",
                                exc_info=True)
                continue

            # Only nodes that are still being waited for are of interest.
            if node_name not in not_ready_node_list:
                continue

            if summary_status != 'True':
                logging.info("Node %s is not ready. Status is: %s",
                             node_name, summary_message)
            else:
                not_ready_node_list.remove(node_name)
                logging.info("Node %s is in ready state", node_name)

        if not not_ready_node_list:
            # No more nodes to wait for, all of them are ready.
            logging.info("All expected nodes are in ready state")
            break

        if i == end_range:
            # There are remaining nodes and the time out has elapsed.
            logging.info("Timed Out! Nodes [%s] did not reach ready state",
                         ", ".join(not_ready_node_list))
            break

        # There are nodes remaining and time remaining: back off and check
        # again in the next iteration.
        logging.info("Waiting %d seconds for next check of cluster status",
                     interval)
        time.sleep(interval)

    # Return the nodes that are not ready.
    return not_ready_node_list


def _summarize_node(node):
    """Return ``(name, status, message)`` for one node of a node list.

    The condition whose type is ``Ready`` is preferred. When no condition
    carries that type the last condition of the list is used instead.

    :param node: One item of the node list response.
    :raises AttributeError, IndexError, TypeError: if the item does not
        have the expected shape (missing fields, empty or missing
        conditions).
    """
    node_name = node.metadata.name
    conditions = node.status.conditions
    # NOTE(review): assumes the node readiness condition is typed 'Ready',
    # as in the Kubernetes node status documentation - confirm.
    summary = next(
        (cond for cond in conditions
         if getattr(cond, 'type', None) == 'Ready'),
        None)
    if summary is None:
        summary = conditions[-1]
    return node_name, summary.status, summary.message


def _get_all_k8s_node_status():
    """Invoke Kubernetes and return the node list response object.

    Uses the 'in_cluster_config'. Any error raised while configuring the
    client or listing the nodes is logged and swallowed so that the caller
    can keep polling.

    :returns: The response of ``list_node``, or None if the lookup failed.
    """
    try:
        config.load_incluster_config()
        v1 = client.CoreV1Api()
        return v1.list_node(watch=False)
    except Exception:
        # Log some diagnostics and return None.
        logging.warning("There was an error retrieving the cluster status",
                        exc_info=True)
        return None
# Copyright 2018 AT&T Intellectual Property.  All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for check_k8s_node_status functions"""
import mock

from shipyard_airflow.plugins.check_k8s_node_status import (
    check_node_status
)

# Dotted paths of the objects that are replaced in the module under test.
_MODULE = "shipyard_airflow.plugins.check_k8s_node_status"
_GET_STATUS = _MODULE + "._get_all_k8s_node_status"
_SLEEP = _MODULE + ".time.sleep"


class MockNodeStatus:
    """A fake response object used to simulate a k8s node status item

    The object exposes ``metadata.name`` and a single entry in
    ``status.conditions`` carrying the given status and message.
    """
    def __init__(self, name, status, message):
        self.metadata = mock.MagicMock()
        self.metadata.name = name
        self.item = mock.MagicMock()
        self.item.status = status
        self.item.message = message
        self.status = mock.MagicMock()
        self.status.conditions = [self.item]


class MalformedNodeStatus:
    """A malformed response object used to simulate a k8s node status item

    Accepts a name, if the name field should be formed correctly. The
    conditions are always a plain string instead of a list of conditions.
    """
    def __init__(self, name=None):
        if name:
            self.metadata = mock.MagicMock()
            self.metadata.name = name

        self.status = mock.MagicMock()
        self.status.conditions = "broken"


def gen_check_node_status(response_dict):
    """Generate a function that will return the requested response dict

    Each call of the generated function returns a response object whose
    ``items`` are looked up by the number of the call ('0', '1', ...). Once
    there is no entry for the current call number the 'final' entry is
    returned.

    :param response_dict: the set of responses to return
    """
    class _StatefulResponder:
        def __init__(self, res_dict=response_dict):
            self.res_dict = res_dict
            self.invocation = 0

        def responder(self):
            ret = mock.MagicMock()
            key = str(self.invocation)
            if key not in self.res_dict:
                key = 'final'
            ret.items = self.res_dict.get(key)
            self.invocation += 1
            return ret

    return _StatefulResponder().responder


# Node names used in these tests will be represented by a letter and number
# E.g. a1, a2, a3, b1, b2, etc.

# The following dictionaries are sequences of response objects that are
# returned from the _get_all_k8s_node_status substitute method.

# Successful single invocation response. 3 nodes, a1, b1, c1, all ready on the
# first pass
INV_SEQ_A = {
    'final': [
        MockNodeStatus('a1', 'True', 'Ready'),
        MockNodeStatus('b1', 'True', 'Ready'),
        MockNodeStatus('c1', 'True', 'Ready'),
    ]
}


# Successful invocation response. 3 nodes, a1, b1, c1, ready after three
# passes
INV_SEQ_B = {
    '0': [
    ],
    '1': [
        MockNodeStatus('c1', 'True', 'Ready'),
    ],
    '2': [
        MockNodeStatus('c1', 'True', 'Ready'),
    ],
    'final': [
        MockNodeStatus('a1', 'True', 'Ready'),
        MockNodeStatus('b1', 'True', 'Ready'),
        MockNodeStatus('c1', 'True', 'Ready'),
    ]
}

# Successful invocation response. 3 nodes, a1, b1, c1, ready after three
# passes with non-ready nodes appearing along the way
INV_SEQ_C = {
    '0': [
        MockNodeStatus('a1', 'False', 'Not Ready'),
    ],
    '1': [
        MockNodeStatus('a1', 'False', 'Not Ready'),
        MockNodeStatus('b1', 'False', 'Not Ready'),
        MockNodeStatus('c1', 'True', 'Ready'),
    ],
    '2': [
        MockNodeStatus('a1', 'False', 'Not Ready'),
        MockNodeStatus('b1', 'True', 'Ready'),
        MockNodeStatus('c1', 'True', 'Ready'),
    ],
    'final': [
        MockNodeStatus('a1', 'True', 'Ready'),
        MockNodeStatus('b1', 'True', 'Ready'),
        MockNodeStatus('c1', 'True', 'Ready'),
    ]
}

# Malformed invocation response on first try.
# Successful node c1
INV_SEQ_D = {
    'final': [
        MalformedNodeStatus('a1'),
        MalformedNodeStatus(),
        MockNodeStatus('c1', 'True', 'Ready'),
    ],
}


# The waits between two checks are not under test, so time.sleep is replaced
# for every test of the class to keep the run time short.
@mock.patch(_SLEEP, new=lambda _seconds: None)
class TestCheckK8sNodeStatus:

    @mock.patch(_GET_STATUS, new=gen_check_node_status(INV_SEQ_A))
    def test_check_node_status_all_success(self):
        """Assert that check_node_status completes successfully

        Simple case - all nodes ready when response has all values
        (set input)
        """
        not_found_nodes = check_node_status(10, 1, {'a1', 'b1', 'c1'})
        assert not not_found_nodes

    @mock.patch(_GET_STATUS, new=gen_check_node_status(INV_SEQ_A))
    def test_check_node_status_simple_failure(self):
        """Assert that check_node_status completes successfully with failures

        Some nodes successful, but looking for some that never show up.
        (list input)
        """
        not_found_nodes = check_node_status(1, 1, ['a1', 'b1', 'c1', 'z1'])
        assert not_found_nodes == ['z1']

    @mock.patch(_GET_STATUS, new=gen_check_node_status(INV_SEQ_B))
    def test_check_node_status_all_success_4th(self):
        """Assert that check_node_status completes successfully

        All nodes ready on 4th iteration
        """
        not_found_nodes = check_node_status(3, 1, {'a1', 'b1', 'c1'})
        assert not not_found_nodes

    @mock.patch(_GET_STATUS, new=gen_check_node_status(INV_SEQ_B))
    def test_check_node_status_timeout_before_4th(self):
        """Assert that check_node_status completes successfully with failures

        Some nodes not ready before timeout (before 4th iteration)
        """
        not_found_nodes = check_node_status(2, 1, {'a1', 'b1', 'c1'})
        assert 'a1' in not_found_nodes
        assert 'b1' in not_found_nodes
        assert 'c1' not in not_found_nodes

    @mock.patch(_GET_STATUS, new=gen_check_node_status(INV_SEQ_C))
    def test_check_node_status_success_changing_status(self):
        """Assert that check_node_status completes successfully

        Nodes go from not ready to ready
        """
        not_found_nodes = check_node_status(30, 1, {'a1', 'b1', 'c1'})
        assert not not_found_nodes

    def test_check_node_status_no_interest(self):
        """Assert that check_node_status completes successfully

        Returns empty array because nothing was requested to look for.
        """
        not_found_nodes = check_node_status(3, 1, expected_nodes=None)
        assert not not_found_nodes
        not_found_nodes = check_node_status(3, 1, [])
        assert not not_found_nodes
        not_found_nodes = check_node_status(3, 1, set())
        assert not not_found_nodes

    @mock.patch(_GET_STATUS, new=gen_check_node_status(INV_SEQ_D))
    def test_check_node_status_malformed(self):
        """Assert that check_node_status completes successfully with failures

        Malformed response items are skipped, so the nodes they stand for
        are reported as not found. The well formed ready node is found.
        """
        not_found_nodes = check_node_status(1, 1, {'a1', 'b1', 'c1'})
        assert 'a1' in not_found_nodes
        assert 'b1' in not_found_nodes
        assert 'c1' not in not_found_nodes

    def test_check_node_status_error_not_connected_to_k8s(self):
        """Assert that check_node_status completes successfully with failures

        No nodes found because of errors doing lookup.
        """
        # NOTE(review): the real lookup is used here; presumably it fails
        # because the tests do not run inside a cluster - confirm.
        not_found_nodes = check_node_status(1, 1, {'a1', 'b1', 'c1'})
        assert 'a1' in not_found_nodes
        assert 'b1' in not_found_nodes
        assert 'c1' in not_found_nodes

    @mock.patch(_GET_STATUS, new=gen_check_node_status(INV_SEQ_A))
    def test_check_node_status_bad_intervals(self):
        """Assert that check_node_status completes successfully

        With bogus timeout and interval values
        """
        not_found_nodes = check_node_status(-1, -1, {'a1', 'b1', 'c1'})
        assert not not_found_nodes

        not_found_nodes = check_node_status(1, 5, {'a1', 'b1', 'z1'})
        assert not_found_nodes == ['z1']