From 48920224cc3f70bf1414839de65837fffbfebb68 Mon Sep 17 00:00:00 2001 From: Michael Beaver Date: Tue, 22 Jan 2019 09:09:47 -0600 Subject: [PATCH] Support in Armada for locking Tiller This creates a new mechanism in Armada to enable functions to only be run once across multiple instances of Armada working with the same Kubernetes cluster. This is accomplished by utilizing custom resources via the Kubernetes API. This also introduces new config defaults that can be used to configure the lock timeout, expiration, and update interval. Some notes on how the lock works: * Functions to be locked can add the new decorator * The optional name parameter can be used to create multiple types of locks which can coexist * If the lock is unable to be acquired before the timeout, a new exception is raised * The lock is updated regularly while the decorated function is still running * If a lock already exists, it will only be overwritten if the duration since its last update is longer than the expiration time For now, this locking method is being used for components that require write access to Tiller so that simultaneous write operations are avoided. 
Change-Id: Iee07da9a233ee2e2a54c6bc4881185388b377c05 --- armada/api/controller/armada.py | 4 + armada/api/controller/rollback.py | 4 + armada/api/controller/test.py | 44 ++- armada/cli/apply.py | 2 + armada/cli/delete.py | 2 + armada/cli/rollback.py | 2 + armada/cli/test.py | 2 + armada/conf/default.py | 25 ++ armada/handlers/k8s.py | 78 +++++ armada/handlers/lock.py | 301 ++++++++++++++++++ .../tests/unit/api/test_armada_controller.py | 2 + .../unit/api/test_rollback_controller.py | 3 + armada/tests/unit/api/test_test_controller.py | 6 + armada/tests/unit/handlers/test_lock.py | 232 ++++++++++++++ 14 files changed, 690 insertions(+), 17 deletions(-) create mode 100644 armada/handlers/lock.py create mode 100644 armada/tests/unit/handlers/test_lock.py diff --git a/armada/api/controller/armada.py b/armada/api/controller/armada.py index 73a3e074..9dfb9d5d 100644 --- a/armada/api/controller/armada.py +++ b/armada/api/controller/armada.py @@ -22,6 +22,7 @@ from armada.common import policy from armada import exceptions from armada.handlers.armada import Armada from armada.handlers.document import ReferenceResolver +from armada.handlers.lock import lock_and_thread, LockException from armada.handlers.override import Override @@ -78,12 +79,15 @@ class Apply(api.BaseResource): except exceptions.ManifestException as e: self.return_error(resp, falcon.HTTP_400, message=str(e)) + except LockException as e: + self.return_error(resp, falcon.HTTP_409, message=str(e)) except Exception as e: self.logger.exception('Caught unexpected exception') err_message = 'Failed to apply manifest: {}'.format(e) self.error(req.context, err_message) self.return_error(resp, falcon.HTTP_500, message=err_message) + @lock_and_thread() def handle(self, req, documents, tiller): armada = Armada( documents, diff --git a/armada/api/controller/rollback.py b/armada/api/controller/rollback.py index 78bc933d..ba6ab0a6 100644 --- a/armada/api/controller/rollback.py +++ b/armada/api/controller/rollback.py @@ -19,6 
+19,7 @@ from oslo_config import cfg from armada import api from armada.common import policy +from armada.handlers.lock import lock_and_thread, LockException CONF = cfg.CONF @@ -37,12 +38,15 @@ class Rollback(api.BaseResource): }) resp.content_type = 'application/json' resp.status = falcon.HTTP_200 + except LockException as e: + self.return_error(resp, falcon.HTTP_409, message=str(e)) except Exception as e: self.logger.exception('Caught unexpected exception') err_message = 'Failed to rollback release: {}'.format(e) self.error(req.context, err_message) self.return_error(resp, falcon.HTTP_500, message=err_message) + @lock_and_thread() def handle(self, req, release, tiller): dry_run = req.get_param_as_bool('dry_run') tiller.rollback_release( diff --git a/armada/api/controller/test.py b/armada/api/controller/test.py index 594b6ff7..e66bcc7e 100644 --- a/armada/api/controller/test.py +++ b/armada/api/controller/test.py @@ -21,6 +21,7 @@ from oslo_config import cfg from armada import api from armada.common import policy from armada import const +from armada.handlers.lock import lock_and_thread, LockException from armada.handlers.manifest import Manifest from armada.handlers.test import Test from armada.utils.release import release_prefixer @@ -36,24 +37,29 @@ class TestReleasesReleaseNameController(api.BaseResource): @policy.enforce('armada:test_release') def on_get(self, req, resp, release): - with self.get_tiller(req, resp) as tiller: - success = self.handle(req, release, tiller) + try: - if success: - msg = { - 'result': 'PASSED: {}'.format(release), - 'message': 'MESSAGE: Test Pass' - } - else: - msg = { - 'result': 'FAILED: {}'.format(release), - 'message': 'MESSAGE: Test Fail' - } + with self.get_tiller(req, resp) as tiller: + success = self.handle(req, release, tiller) - resp.body = json.dumps(msg) - resp.status = falcon.HTTP_200 - resp.content_type = 'application/json' + if success: + msg = { + 'result': 'PASSED: {}'.format(release), + 'message': 'MESSAGE: Test 
Pass' + } + else: + msg = { + 'result': 'FAILED: {}'.format(release), + 'message': 'MESSAGE: Test Fail' + } + resp.body = json.dumps(msg) + resp.status = falcon.HTTP_200 + resp.content_type = 'application/json' + except LockException as e: + self.return_error(resp, falcon.HTTP_409, message=str(e)) + + @lock_and_thread() def handle(self, req, release, tiller): cleanup = req.get_param_as_bool('cleanup') test_handler = Test({}, release, tiller, cleanup=cleanup) @@ -104,9 +110,13 @@ class TestReleasesManifestController(api.BaseResource): @policy.enforce('armada:test_manifest') def on_post(self, req, resp): # TODO(fmontei): Validation Content-Type is application/x-yaml. - with self.get_tiller(req, resp) as tiller: - return self.handle(req, resp, tiller) + try: + with self.get_tiller(req, resp) as tiller: + return self.handle(req, resp, tiller) + except LockException as e: + self.return_error(resp, falcon.HTTP_409, message=str(e)) + @lock_and_thread() def handle(self, req, resp, tiller): try: documents = self.req_yaml(req, default=[]) diff --git a/armada/cli/apply.py b/armada/cli/apply.py index ee56f29e..34276e49 100644 --- a/armada/cli/apply.py +++ b/armada/cli/apply.py @@ -21,6 +21,7 @@ from armada.cli import CliAction from armada.exceptions.source_exceptions import InvalidPathException from armada.handlers.armada import Armada from armada.handlers.document import ReferenceResolver +from armada.handlers.lock import lock_and_thread from armada.handlers.tiller import Tiller CONF = cfg.CONF @@ -234,6 +235,7 @@ class ApplyManifest(CliAction): manifest=documents, set=self.set, query=query) self.output(resp.get('message')) + @lock_and_thread() def handle(self, documents, tiller): armada = Armada( documents, diff --git a/armada/cli/delete.py b/armada/cli/delete.py index f36cfdfe..d142df18 100644 --- a/armada/cli/delete.py +++ b/armada/cli/delete.py @@ -20,6 +20,7 @@ from oslo_config import cfg from armada.cli import CliAction from armada import const from 
armada.handlers.chart_delete import ChartDelete +from armada.handlers.lock import lock_and_thread from armada.handlers.manifest import Manifest from armada.handlers.tiller import Tiller from armada.utils.release import release_prefixer @@ -97,6 +98,7 @@ class DeleteChartManifest(CliAction): bearer_token=self.bearer_token) as tiller: self.handle(tiller) + @lock_and_thread() def handle(self, tiller): known_release_names = [release[0] for release in tiller.list_charts()] diff --git a/armada/cli/rollback.py b/armada/cli/rollback.py index f7016865..504276c0 100644 --- a/armada/cli/rollback.py +++ b/armada/cli/rollback.py @@ -16,6 +16,7 @@ import click from oslo_config import cfg from armada.cli import CliAction +from armada.handlers.lock import lock_and_thread from armada.handlers.tiller import Tiller CONF = cfg.CONF @@ -120,6 +121,7 @@ class Rollback(CliAction): self.output(response) + @lock_and_thread() def handle(self, tiller): return tiller.rollback_release( self.release, diff --git a/armada/cli/test.py b/armada/cli/test.py index 315374a6..7e83b160 100644 --- a/armada/cli/test.py +++ b/armada/cli/test.py @@ -19,6 +19,7 @@ from oslo_config import cfg from armada.cli import CliAction from armada import const +from armada.handlers.lock import lock_and_thread from armada.handlers.manifest import Manifest from armada.handlers.test import Test from armada.handlers.tiller import Tiller @@ -116,6 +117,7 @@ class TestChartManifest(CliAction): self.handle(tiller) + @lock_and_thread() def handle(self, tiller): known_release_names = [release[0] for release in tiller.list_charts()] diff --git a/armada/conf/default.py b/armada/conf/default.py index ca4967e3..7af20271 100644 --- a/armada/conf/default.py +++ b/armada/conf/default.py @@ -81,6 +81,31 @@ path to the private key that includes the name of the key itself.""")), 'tiller_release_roles', default=['admin'], help=utils.fmt('IDs of approved API access roles.')), + cfg.IntOpt( + 'lock_acquire_timeout', + default=60, + min=0, + 
help=utils.fmt("""Time in seconds of how long armada will attempt to + acquire a lock before an exception is raised""")), + cfg.IntOpt( + 'lock_acquire_delay', + default=5, + min=0, + help=utils.fmt("""Time in seconds of how long to wait between attempts + to acquire a lock""")), + cfg.IntOpt( + 'lock_update_interval', + default=60, + min=0, + help=utils.fmt("""Time in seconds of how often armada will update the + lock while it is continuing to do work""")), + cfg.IntOpt( + 'lock_expiration', + default=600, + min=0, + help=utils.fmt("""Time in seconds of how much time needs to pass since + the last update of an existing lock before armada forcibly removes it + and tries to acquire its own lock""")), ] diff --git a/armada/handlers/k8s.py b/armada/handlers/k8s.py index 33f18598..ed745961 100644 --- a/armada/handlers/k8s.py +++ b/armada/handlers/k8s.py @@ -63,6 +63,8 @@ class K8s(object): self.client = client.CoreV1Api(api_client) self.batch_api = client.BatchV1Api(api_client) self.batch_v1beta1_api = client.BatchV1beta1Api(api_client) + self.custom_objects = client.CustomObjectsApi(api_client) + self.api_extensions = client.ApiextensionsV1beta1Api(api_client) self.extension_api = client.ExtensionsV1beta1Api(api_client) self.apps_v1_api = client.AppsV1Api(api_client) @@ -340,3 +342,79 @@ class K8s(object): 'using default %ss.', DEFAULT_K8S_TIMEOUT) timeout = DEFAULT_K8S_TIMEOUT return timeout + + def create_custom_resource_definition(self, crd): + """Creates a custom resource definition + + :param crd: custom resource definition to create + + :type crd: kubernetes.client.V1beta1CustomResourceDefinition + + :return: new custom resource definition + :rtype: kubernetes.client.V1beta1CustomResourceDefinition + """ + return self.api_extensions.create_custom_resource_definition(crd) + + def create_custom_resource(self, group, version, namespace, plural, body): + """Creates a custom resource + + :param group: the custom resource's group + :param version: the custom 
resource's version + :param namespace: the custom resource's namespace + :param plural: the custom resource's plural name + :param body: the data to go into the body of the custom resource + + :return: k8s client response + :rtype: object + """ + return self.custom_objects.create_namespaced_custom_object( + group, version, namespace, plural, body) + + def delete_custom_resource(self, group, version, namespace, plural, name, + body): + """Deletes a custom resource + + :param group: the custom resource's group + :param version: the custom resource's version + :param namespace: the custom resource's namespace + :param plural: the custom resource's plural name + :param name: the custom resource's full name + :param body: the data to go into the body of the custom resource + + :return: k8s client response + :rtype: object + """ + return self.custom_objects.delete_namespaced_custom_object( + group, version, namespace, plural, name, body) + + def read_custom_resource(self, group, version, namespace, plural, name): + """Gets information on a specified custom resource + + :param group: the custom resource's group + :param version: the custom resource's version + :param namespace: the custom resource's namespace + :param plural: the custom resource's plural name + :param name: the custom resource's full name + + :return: k8s client response + :rtype: object + """ + return self.custom_objects.get_namespaced_custom_object( + group, version, namespace, plural, name) + + def replace_custom_resource(self, group, version, namespace, plural, name, + body): + """Replaces a custom resource + + :param group: the custom resource's group + :param version: the custom resource's version + :param namespace: the custom resource's namespace + :param plural: the custom resource's plural name + :param name: the custom resource's full name + :param body: the data to go into the body of the custom resource + + :return: k8s client response + :rtype: object + """ + return 
self.custom_objects.replace_namespaced_custom_object( + group, version, namespace, plural, name, body) diff --git a/armada/handlers/lock.py b/armada/handlers/lock.py new file mode 100644 index 00000000..80510804 --- /dev/null +++ b/armada/handlers/lock.py @@ -0,0 +1,301 @@ +# Copyright 2019, AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import functools +import time +from datetime import datetime, timedelta +from concurrent.futures import ThreadPoolExecutor +from kubernetes import client +from kubernetes.client.rest import ApiException +from oslo_config import cfg +from oslo_log import log as logging + +from armada.handlers.k8s import K8s + +CONF = cfg.CONF + +TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" +# Lock configuration +LOCK_GROUP = "armada.process" +LOCK_VERSION = "v1" +LOCK_NAMESPACE = "kube-system" +LOCK_PLURAL = "locks" +LOCK_SINGULAR = "lock" + +LOG = logging.getLogger(__name__) + + +class LockException(Exception): + pass + + +def lock_and_thread(lock_name="lock"): + """This function creates a thread to execute the wrapped function after + acquiring a lock. 
While the thread is still running, this function + periodically updates the lock + + :param lock_name: name of the lock to create + """ + + def lock_decorator(func): + + @functools.wraps(func) + def func_wrapper(*args, **kwargs): + with Lock(lock_name) as lock: + pool = ThreadPoolExecutor(1) + future = pool.submit(func, *args, **kwargs) + start = time.time() + while not future.done(): + if time.time() - start > CONF.lock_update_interval: + lock.update_lock() + start = time.time() + time.sleep(1) + return future.result() + + return func_wrapper + + return lock_decorator + + +class Lock: + + def __init__(self, lock_name, additional_data=None): + """Creates a lock with the specified name and data. When a lock with + that name already exists then this will continuously attempt to acquire + it until: + * the attempt times out + * the lock is gone this is able to acquire a new lock + * the existing lock expires, in which case this will forcibly + remove it and continue attempting to acquire the lock + + :param lock_name: name of the lock resource to be created, locks with + different names can coexist and won't conflict with each other + :param additional_data: dict of any additional data to be added to the + lock's `data` section + """ + self.expire_time = CONF.lock_expiration + self.timeout = CONF.lock_acquire_timeout + self.acquire_delay = CONF.lock_acquire_delay + self.lock_config = LockConfig( + name=lock_name, additional_data=additional_data) + + def _test_lock_ownership(self): + # If the uid of the current lock is the same as the one given when we + # created the lock, then it must be the one created by this program + lock = self.lock_config.get_lock() + if lock: + lock_uid = lock['metadata']['uid'] + current_uid = self.lock_config.metadata.get('uid', None) + return current_uid == lock_uid + # The lock must not exist + return False + + def lock_age(self): + lock = self.lock_config.get_lock() + if lock: + creation = lock['data']['lastUpdated'] + creation_time = 
datetime.strptime(creation, TIME_FORMAT) + return datetime.utcnow() - creation_time + # If no lock exists then 0 is returned so the lock is assuredly not old + # enough to be expired + return 0 + + def acquire_lock(self): + start = time.time() + LOG.info("Acquiring lock") + while (time.time() - start) < self.timeout: + try: + self.lock_config.create_lock() + return True + except ApiException as err: + if err.status == 404: + LOG.info("Lock Custom Resource Definition not found, " + "creating now") + self.lock_config.create_definition() + continue + elif err.status == 409: + # If the exception is 409 then there is already a lock, so + # we should continue with the rest of the logic + LOG.warn("There is already an existing lock") + else: + raise + if self._test_lock_ownership(): + # If there is already a lock that was created by this thread + # then we must have successfully acquired the lock + return True + else: + # There is a lock but it was not created by this thread, which + # means that the only way it should be removed is if the age + # of the lock exceeds the expire time in order to avoid + # removing another thread's lock while it is still working + if self.lock_age() > timedelta(seconds=self.expire_time): + LOG.info("Lock has exceeded expiry time, removing so" + "processing can continue") + self.release_lock() + continue + LOG.debug("Sleeping before attempting to acquire lock again") + time.sleep(self.acquire_delay) + raise LockException("Unable to acquire lock before timeout") + + def release_lock(self): + LOG.info("Releasing lock") + return self.lock_config.delete_lock() + + def update_lock(self): + LOG.debug("Updating lock") + self.lock_config.replace_lock() + + def __enter__(self): + self.acquire_lock() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.release_lock() + return False + + +class LockConfig: + + def __init__(self, name, additional_data=None): + self.name = name + data = additional_data or dict() + self.full_name = 
"{}.{}.{}".format(LOCK_PLURAL, LOCK_GROUP, self.name) + self.metadata = {'name': self.full_name} + self.body = { + 'kind': "Resource", + 'apiVersion': "{}/{}".format(LOCK_GROUP, LOCK_VERSION), + 'metadata': self.metadata, + 'data': data + } + self.delete_options = {} + + self.k8s = K8s() + + def create_lock(self): + """ Creates the Lock custom resource object + :return: the Lock custom resource object + :rtype: object + """ + self.body['data']['lastUpdated'] = \ + datetime.utcnow().strftime(TIME_FORMAT) + lock = self.k8s.create_custom_resource( + group=LOCK_GROUP, + version=LOCK_VERSION, + namespace=LOCK_NAMESPACE, + plural=LOCK_PLURAL, + body=self.body) + + self.metadata = lock.get('metadata', self.metadata) + return lock + + def get_lock(self): + """Retrieves the Lock custom resource object + + :return: the Lock custom resource object + :rtype: object + """ + try: + return self.k8s.read_custom_resource( + group=LOCK_GROUP, + version=LOCK_VERSION, + namespace=LOCK_NAMESPACE, + plural=LOCK_PLURAL, + name=self.full_name) + + except ApiException as err: + if err.status == 404: + return None + raise + + def delete_lock(self): + """Deletes the Lock custom resource + + :return: whether it was successfully deleted + :rtype: bool + """ + try: + self.k8s.delete_custom_resource( + group=LOCK_GROUP, + version=LOCK_VERSION, + namespace=LOCK_NAMESPACE, + plural=LOCK_PLURAL, + name=self.full_name, + body=self.delete_options) + + return True + except ApiException as err: + # If it returns 404 then something else deleted it + if err.status == 404: + return True + raise + + def replace_lock(self): + """Updates the Lock custom resource object with a new lastUpdated time + + :return: the Lock custom resource object + :rtype: object + """ + self.body['metadata']['resourceVersion'] = \ + self.metadata['resourceVersion'] + self.body['data']['lastUpdated'] = \ + datetime.utcnow().strftime(TIME_FORMAT) + lock = self.k8s.replace_custom_resource( + group=LOCK_GROUP, + version=LOCK_VERSION, 
+ namespace=LOCK_NAMESPACE, + plural=LOCK_PLURAL, + name=self.full_name, + body=self.body) + + self.metadata = lock.get('metadata', self.metadata) + return lock + + def create_definition(self): + names = client.V1beta1CustomResourceDefinitionNames( + kind="Resource", plural=LOCK_PLURAL, singular=LOCK_SINGULAR) + metadata = client.V1ObjectMeta( + name="{}.{}".format(LOCK_PLURAL, LOCK_GROUP), + resource_version=LOCK_VERSION) + status = client.V1beta1CustomResourceDefinitionStatus( + accepted_names=names, + conditions=[], + stored_versions=[LOCK_VERSION]) + spec = client.V1beta1CustomResourceDefinitionSpec( + group=LOCK_GROUP, + names=names, + scope="Namespaced", + version=LOCK_VERSION) + crd = client.V1beta1CustomResourceDefinition( + spec=spec, + status=status, + metadata=metadata, + kind="CustomResourceDefinition") + try: + self.k8s.create_custom_resource_definition(crd) + except ValueError as err: + # Because of an issue with the Kubernetes code, the API server + # may return `null` for the required field `conditions` in + # kubernetes.client.V1beta1CustomResourceDefinitionStatus + # This causes validation to fail which will raise the subsequent + # ValueError even though the CRD was created successfully + # https://github.com/kubernetes-client/gen/issues/52 + # TODO if this is fixed upstream this should be removed + known_msg = "Invalid value for `conditions`, must not be `None`" + known_err = ValueError(known_msg) + if err.args != known_err.args: + raise + LOG.debug("Encountered known issue while creating CRD, continuing") + except ApiException as err: + # If a 409 is received then the definition already exists + if err.status != 409: + raise diff --git a/armada/tests/unit/api/test_armada_controller.py b/armada/tests/unit/api/test_armada_controller.py index 0985d69b..3b96782d 100644 --- a/armada/tests/unit/api/test_armada_controller.py +++ b/armada/tests/unit/api/test_armada_controller.py @@ -26,6 +26,8 @@ from armada.tests.unit.api import base CONF = cfg.CONF 
+@mock.patch.object(armada_api.Apply, 'handle', + armada_api.Apply.handle.__wrapped__) class ArmadaControllerTest(base.BaseControllerTest): @mock.patch.object(api, 'Tiller') diff --git a/armada/tests/unit/api/test_rollback_controller.py b/armada/tests/unit/api/test_rollback_controller.py index df43b912..e2b46a12 100644 --- a/armada/tests/unit/api/test_rollback_controller.py +++ b/armada/tests/unit/api/test_rollback_controller.py @@ -20,8 +20,11 @@ from armada import api from armada.common.policies import base as policy_base from armada.tests import test_utils from armada.tests.unit.api import base +from armada.api.controller import rollback +@mock.patch.object(rollback.Rollback, 'handle', + rollback.Rollback.handle.__wrapped__) class RollbackReleaseControllerTest(base.BaseControllerTest): @mock.patch.object(api, 'Tiller') diff --git a/armada/tests/unit/api/test_test_controller.py b/armada/tests/unit/api/test_test_controller.py index d5f7fa49..140e19e4 100644 --- a/armada/tests/unit/api/test_test_controller.py +++ b/armada/tests/unit/api/test_test_controller.py @@ -26,6 +26,8 @@ from armada.tests import test_utils from armada.tests.unit.api import base +@mock.patch.object(test.TestReleasesManifestController, 'handle', + test.TestReleasesManifestController.handle.__wrapped__) class TestReleasesManifestControllerTest(base.BaseControllerTest): @mock.patch.object(test, 'Manifest') @@ -57,6 +59,8 @@ class TestReleasesManifestControllerTest(base.BaseControllerTest): m_tiller.__exit__.assert_called() +@mock.patch.object(test.TestReleasesReleaseNameController, 'handle', + test.TestReleasesReleaseNameController.handle.__wrapped__) class TestReleasesReleaseNameControllerTest(base.BaseControllerTest): @mock.patch.object(test.Test, 'test_release_for_success') @@ -119,6 +123,8 @@ class TestReleasesReleaseNameControllerTest(base.BaseControllerTest): @test_utils.attr(type=['negative']) +@mock.patch.object(test.TestReleasesManifestController, 'handle', + 
test.TestReleasesManifestController.handle.__wrapped__) class TestReleasesManifestControllerNegativeTest(base.BaseControllerTest): @mock.patch.object(test, 'Manifest') diff --git a/armada/tests/unit/handlers/test_lock.py b/armada/tests/unit/handlers/test_lock.py new file mode 100644 index 00000000..65f3b8a1 --- /dev/null +++ b/armada/tests/unit/handlers/test_lock.py @@ -0,0 +1,232 @@ +# Copyright 2019, AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import copy +from datetime import datetime + +import mock +import testtools + +from kubernetes.client.rest import ApiException +from armada.handlers import lock + + +@mock.patch('armada.handlers.lock.K8s') +@mock.patch.object(lock.time, 'sleep', lambda x: True) +class LockTestCase(testtools.TestCase): + + def __init__(self, *args, **kwargs): + super(LockTestCase, self).__init__(*args, **kwargs) + self.resp = None + self.test_lock = None + self.mock_create = None + self.mock_read = None + self.mock_delete = None + self.mock_replace = None + self.mock_create_crd = None + + def setUp(self): + super(LockTestCase, self).setUp() + self_link = "/apis/armada.tiller/v1/namespaces/default/locks/"\ + "locks.armada.tiller.test" + self.resp = { + 'apiVersion': "armada.tiller/v1", + 'data': { + 'lastUpdated': "2019-01-22T16:20:14Z" + }, + 'metadata': { + 'resourceVersion': "95961", + 'generation': 1, + 'name': "locks.armada.process.test", + 'creationTimestamp': "2019-01-22T16:20:14Z", + 'uid': 
"9930c9a0-1e61-11e9-9e5a-0800276b7c7d", + 'clusterName': "", + 'namespace': "default", + 'selfLink': self_link + }, + 'kind': "Resource" + } + with mock.patch("armada.handlers.lock.K8s"): + self.test_lock = lock.Lock("test") + self.test_lock.timeout = 1 + self.test_lock.acquire_delay = 0.1 + self.test_lock.expire_time = 10 + + # Mocking the methods of self.k8s for the LockConfig + mock_k8s = self.test_lock.lock_config.k8s = mock.Mock() + self.mock_create = mock_k8s.create_custom_resource = mock.Mock() + self.mock_read = mock_k8s.read_custom_resource = mock.Mock() + self.mock_delete = mock_k8s.delete_custom_resource = mock.Mock() + self.mock_replace = mock_k8s.replace_custom_resource = mock.Mock() + self.mock_create_crd = mock_k8s.create_custom_resource_definition \ + = mock.Mock() + + def test_get_lock(self, _): + try: + # read needs to raise a 404 when the lock doesn't exist + self.mock_read.side_effect = ApiException(status=404) + mock_read = self.mock_read + resp = self.resp + + def update_get_and_set_return(*args, **kwargs): + # Once the lock is 'created' it should no longer raise err + mock_read.read_custom_resource.side_effect = None + mock_read.read_custom_resource.return_value = resp + # Set the mock_create return to return the new lock + return resp + + self.mock_create.side_effect = update_get_and_set_return + + self.test_lock.acquire_lock() + except lock.LockException: + self.fail("acquire_lock() raised LockException unexpectedly") + except ApiException: + self.fail("acquire_lock() raised ApiException unexpectedly") + try: + self.test_lock.release_lock() + except lock.LockException: + self.fail("release_lock() raised LockException unexpectedly") + except ApiException: + self.fail("acquire_lock() raised ApiException unexpectedly") + + @mock.patch('armada.handlers.lock.time', autospec=True) + def test_timeout_getting_lock(self, mock_time, _): + # The timestamp on the 'lock' will be new to avoid expiring + last_update = 
datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ') + self.resp['data']['lastUpdated'] = str(last_update) + # Mocking time.time() so that acquire_lock() is run through once, and + # once the time is checked again the timeout will be reached + test_time = 1550510151.792119 + mock_time.time = mock.Mock() + + def set_time(): + nonlocal test_time + test_time += self.test_lock.timeout / 2 + return test_time + + mock_time.time.side_effect = set_time + + # Creating large expire time so the lock doesn't get overwritten + self.test_lock.expire_time = 60 + # Updating mocks so that there is always a 'lock' + self.mock_create.side_effect = ApiException(status=409) + self.mock_read.return_value = self.resp + + # It should fail to acquire the lock before the attempt times out + self.assertRaises(lock.LockException, self.test_lock.acquire_lock) + + def test_lock_expiration(self, _): + # Timestamp on the 'lock' is old to ensure lock is expired + self.resp['data']['lastUpdated'] = "2018-01-22T16:20:14Z" + + # When the lock already exists, Kubernetes responds with a 409 + self.mock_create.side_effect = ApiException(status=409) + # Getting the lock should return the 'lock' above + self.mock_read.return_value = self.resp + + # New return value of create should have a newer timestamp + new_resp = copy.deepcopy(self.resp) + new_time = str(datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')) + new_resp['metadata']['creationTimestamp'] = new_time + mock_create = self.mock_create + + def clear_side_effect(*args, **kwargs): + mock_create.side_effect = None + mock_create.return_value = new_resp + + # Once the lock is 'deleted' we need to stop create from raising err + self.mock_delete.side_effect = clear_side_effect + + try: + self.test_lock.acquire_lock() + except lock.LockException: + self.fail("acquire_lock() raised LockException unexpectedly") + + def test_custom_resource_definition_creation(self, _): + # When the crd doesn't exist yet, Kubernetes responds with a 404 when + # trying to create 
a lock + self.mock_create.side_effect = ApiException(status=404) + mock_create = self.mock_create + resp = self.resp + + def clear_side_effect(*args, **kwargs): + mock_create.side_effect = None + mock_create.return_value = resp + + # Once the definition is 'created' we need to stop raising err + self.mock_create_crd.side_effect = clear_side_effect + + try: + self.test_lock.acquire_lock() + except lock.LockException: + self.fail("acquire_lock() raised LockException unexpectedly") + + @mock.patch.object(lock.CONF, "lock_update_interval", 0.1) + @mock.patch('armada.handlers.lock.ThreadPoolExecutor') + @mock.patch('armada.handlers.lock.time', autospec=True) + def test_lock_decorator(self, mock_time, mock_thread, _): + # read needs to raise a 404 when the lock doesn't exist + self.mock_read.side_effect = ApiException(status=404) + mock_read = self.mock_read + resp = self.resp + + def update_get_and_set_return(*args, **kwargs): + # Once the lock is 'created' it should no longer raise err + mock_read.read_custom_resource.side_effect = None + mock_read.read_custom_resource.return_value = resp + # Set the mock_create return to return the new lock + return resp + + self.mock_create.side_effect = update_get_and_set_return + self.mock_replace.return_value = self.resp + + # Mocking the threading in lock_and_thread + mock_pool = mock_thread.return_value = mock.Mock() + mock_pool.submit = mock.Mock() + mock_future = mock_pool.submit.return_value = mock.Mock() + mock_future.done = mock.Mock() + # future.done() needs to return false so lock.update_lock() gets called + mock_future.done.return_value = False + + def clear_done(): + mock_future.done.return_value = True + mock_future.done.side_effect = None + + # After future.done() is called once it can be cleared and return True + mock_future.done.side_effect = clear_done + + # Mocking time.time() so it appears that more time has passed than + # CONF.lock_update_interval so update_lock() is run + # This also affects the acquire_lock() 
timeout check, which is why + # the lock_update_interval is mocked to be a low number + test_time = 1550510151.792119 + mock_time.time = mock.Mock() + + def set_time(): + nonlocal test_time + test_time += lock.CONF.lock_update_interval + 1 + return test_time + + mock_time.time.side_effect = set_time + + def func(): + return + + test_func_dec = lock.lock_and_thread()(func) + test_func_dec.lock = self.test_lock + try: + test_func_dec() + except lock.LockException: + self.fail("acquire_lock() raised LockException unexpectedly") + except ApiException: + self.fail("acquire_lock() raised ApiException unexpectedly")