diff --git a/armada/api/controller/armada.py b/armada/api/controller/armada.py index 73a3e074..9dfb9d5d 100644 --- a/armada/api/controller/armada.py +++ b/armada/api/controller/armada.py @@ -22,6 +22,7 @@ from armada.common import policy from armada import exceptions from armada.handlers.armada import Armada from armada.handlers.document import ReferenceResolver +from armada.handlers.lock import lock_and_thread, LockException from armada.handlers.override import Override @@ -78,12 +79,15 @@ class Apply(api.BaseResource): except exceptions.ManifestException as e: self.return_error(resp, falcon.HTTP_400, message=str(e)) + except LockException as e: + self.return_error(resp, falcon.HTTP_409, message=str(e)) except Exception as e: self.logger.exception('Caught unexpected exception') err_message = 'Failed to apply manifest: {}'.format(e) self.error(req.context, err_message) self.return_error(resp, falcon.HTTP_500, message=err_message) + @lock_and_thread() def handle(self, req, documents, tiller): armada = Armada( documents, diff --git a/armada/api/controller/rollback.py b/armada/api/controller/rollback.py index 78bc933d..ba6ab0a6 100644 --- a/armada/api/controller/rollback.py +++ b/armada/api/controller/rollback.py @@ -19,6 +19,7 @@ from oslo_config import cfg from armada import api from armada.common import policy +from armada.handlers.lock import lock_and_thread, LockException CONF = cfg.CONF @@ -37,12 +38,15 @@ class Rollback(api.BaseResource): }) resp.content_type = 'application/json' resp.status = falcon.HTTP_200 + except LockException as e: + self.return_error(resp, falcon.HTTP_409, message=str(e)) except Exception as e: self.logger.exception('Caught unexpected exception') err_message = 'Failed to rollback release: {}'.format(e) self.error(req.context, err_message) self.return_error(resp, falcon.HTTP_500, message=err_message) + @lock_and_thread() def handle(self, req, release, tiller): dry_run = req.get_param_as_bool('dry_run') tiller.rollback_release( diff --git 
a/armada/api/controller/test.py b/armada/api/controller/test.py index 594b6ff7..e66bcc7e 100644 --- a/armada/api/controller/test.py +++ b/armada/api/controller/test.py @@ -21,6 +21,7 @@ from oslo_config import cfg from armada import api from armada.common import policy from armada import const +from armada.handlers.lock import lock_and_thread, LockException from armada.handlers.manifest import Manifest from armada.handlers.test import Test from armada.utils.release import release_prefixer @@ -36,24 +37,29 @@ class TestReleasesReleaseNameController(api.BaseResource): @policy.enforce('armada:test_release') def on_get(self, req, resp, release): - with self.get_tiller(req, resp) as tiller: - success = self.handle(req, release, tiller) + try: - if success: - msg = { - 'result': 'PASSED: {}'.format(release), - 'message': 'MESSAGE: Test Pass' - } - else: - msg = { - 'result': 'FAILED: {}'.format(release), - 'message': 'MESSAGE: Test Fail' - } + with self.get_tiller(req, resp) as tiller: + success = self.handle(req, release, tiller) - resp.body = json.dumps(msg) - resp.status = falcon.HTTP_200 - resp.content_type = 'application/json' + if success: + msg = { + 'result': 'PASSED: {}'.format(release), + 'message': 'MESSAGE: Test Pass' + } + else: + msg = { + 'result': 'FAILED: {}'.format(release), + 'message': 'MESSAGE: Test Fail' + } + resp.body = json.dumps(msg) + resp.status = falcon.HTTP_200 + resp.content_type = 'application/json' + except LockException as e: + self.return_error(resp, falcon.HTTP_409, message=str(e)) + + @lock_and_thread() def handle(self, req, release, tiller): cleanup = req.get_param_as_bool('cleanup') test_handler = Test({}, release, tiller, cleanup=cleanup) @@ -104,9 +110,13 @@ class TestReleasesManifestController(api.BaseResource): @policy.enforce('armada:test_manifest') def on_post(self, req, resp): # TODO(fmontei): Validation Content-Type is application/x-yaml. 
- with self.get_tiller(req, resp) as tiller: - return self.handle(req, resp, tiller) + try: + with self.get_tiller(req, resp) as tiller: + return self.handle(req, resp, tiller) + except LockException as e: + self.return_error(resp, falcon.HTTP_409, message=str(e)) + @lock_and_thread() def handle(self, req, resp, tiller): try: documents = self.req_yaml(req, default=[]) diff --git a/armada/cli/apply.py b/armada/cli/apply.py index ee56f29e..34276e49 100644 --- a/armada/cli/apply.py +++ b/armada/cli/apply.py @@ -21,6 +21,7 @@ from armada.cli import CliAction from armada.exceptions.source_exceptions import InvalidPathException from armada.handlers.armada import Armada from armada.handlers.document import ReferenceResolver +from armada.handlers.lock import lock_and_thread from armada.handlers.tiller import Tiller CONF = cfg.CONF @@ -234,6 +235,7 @@ class ApplyManifest(CliAction): manifest=documents, set=self.set, query=query) self.output(resp.get('message')) + @lock_and_thread() def handle(self, documents, tiller): armada = Armada( documents, diff --git a/armada/cli/delete.py b/armada/cli/delete.py index f36cfdfe..d142df18 100644 --- a/armada/cli/delete.py +++ b/armada/cli/delete.py @@ -20,6 +20,7 @@ from oslo_config import cfg from armada.cli import CliAction from armada import const from armada.handlers.chart_delete import ChartDelete +from armada.handlers.lock import lock_and_thread from armada.handlers.manifest import Manifest from armada.handlers.tiller import Tiller from armada.utils.release import release_prefixer @@ -97,6 +98,7 @@ class DeleteChartManifest(CliAction): bearer_token=self.bearer_token) as tiller: self.handle(tiller) + @lock_and_thread() def handle(self, tiller): known_release_names = [release[0] for release in tiller.list_charts()] diff --git a/armada/cli/rollback.py b/armada/cli/rollback.py index f7016865..504276c0 100644 --- a/armada/cli/rollback.py +++ b/armada/cli/rollback.py @@ -16,6 +16,7 @@ import click from oslo_config import cfg from 
armada.cli import CliAction +from armada.handlers.lock import lock_and_thread from armada.handlers.tiller import Tiller CONF = cfg.CONF @@ -120,6 +121,7 @@ class Rollback(CliAction): self.output(response) + @lock_and_thread() def handle(self, tiller): return tiller.rollback_release( self.release, diff --git a/armada/cli/test.py b/armada/cli/test.py index 315374a6..7e83b160 100644 --- a/armada/cli/test.py +++ b/armada/cli/test.py @@ -19,6 +19,7 @@ from oslo_config import cfg from armada.cli import CliAction from armada import const +from armada.handlers.lock import lock_and_thread from armada.handlers.manifest import Manifest from armada.handlers.test import Test from armada.handlers.tiller import Tiller @@ -116,6 +117,7 @@ class TestChartManifest(CliAction): self.handle(tiller) + @lock_and_thread() def handle(self, tiller): known_release_names = [release[0] for release in tiller.list_charts()] diff --git a/armada/conf/default.py b/armada/conf/default.py index ca4967e3..7af20271 100644 --- a/armada/conf/default.py +++ b/armada/conf/default.py @@ -81,6 +81,31 @@ path to the private key that includes the name of the key itself.""")), 'tiller_release_roles', default=['admin'], help=utils.fmt('IDs of approved API access roles.')), + cfg.IntOpt( + 'lock_acquire_timeout', + default=60, + min=0, + help=utils.fmt("""Time in seconds of how long armada will attempt to + acquire a lock before an exception is raised""")), + cfg.IntOpt( + 'lock_acquire_delay', + default=5, + min=0, + help=utils.fmt("""Time in seconds of how long to wait between attempts + to acquire a lock""")), + cfg.IntOpt( + 'lock_update_interval', + default=60, + min=0, + help=utils.fmt("""Time in seconds of how often armada will update the + lock while it is continuing to do work""")), + cfg.IntOpt( + 'lock_expiration', + default=600, + min=0, + help=utils.fmt("""Time in seconds of how much time needs to pass since + the last update of an existing lock before armada forcibly removes it + and tries to 
acquire its own lock""")), ] diff --git a/armada/handlers/k8s.py b/armada/handlers/k8s.py index 33f18598..ed745961 100644 --- a/armada/handlers/k8s.py +++ b/armada/handlers/k8s.py @@ -63,6 +63,8 @@ class K8s(object): self.client = client.CoreV1Api(api_client) self.batch_api = client.BatchV1Api(api_client) self.batch_v1beta1_api = client.BatchV1beta1Api(api_client) + self.custom_objects = client.CustomObjectsApi(api_client) + self.api_extensions = client.ApiextensionsV1beta1Api(api_client) self.extension_api = client.ExtensionsV1beta1Api(api_client) self.apps_v1_api = client.AppsV1Api(api_client) @@ -340,3 +342,79 @@ class K8s(object): 'using default %ss.', DEFAULT_K8S_TIMEOUT) timeout = DEFAULT_K8S_TIMEOUT return timeout + + def create_custom_resource_definition(self, crd): + """Creates a custom resource definition + + :param crd: custom resource definition to create + + :type crd: kubernetes.client.V1beta1CustomResourceDefinition + + :return: new custom resource definition + :rtype: kubernetes.client.V1beta1CustomResourceDefinition + """ + return self.api_extensions.create_custom_resource_definition(crd) + + def create_custom_resource(self, group, version, namespace, plural, body): + """Creates a custom resource + + :param group: the custom resource's group + :param version: the custom resource's version + :param namespace: the custom resource's namespace + :param plural: the custom resource's plural name + :param body: the data to go into the body of the custom resource + + :return: k8s client response + :rtype: object + """ + return self.custom_objects.create_namespaced_custom_object( + group, version, namespace, plural, body) + + def delete_custom_resource(self, group, version, namespace, plural, name, + body): + """Deletes a custom resource + + :param group: the custom resource's group + :param version: the custom resource's version + :param namespace: the custom resource's namespace + :param plural: the custom resource's plural name + :param name: the custom 
resource's full name + :param body: the data to go into the body of the custom resource + + :return: k8s client response + :rtype: object + """ + return self.custom_objects.delete_namespaced_custom_object( + group, version, namespace, plural, name, body) + + def read_custom_resource(self, group, version, namespace, plural, name): + """Gets information on a specified custom resource + + :param group: the custom resource's group + :param version: the custom resource's version + :param namespace: the custom resource's namespace + :param plural: the custom resource's plural name + :param name: the custom resource's full name + + :return: k8s client response + :rtype: object + """ + return self.custom_objects.get_namespaced_custom_object( + group, version, namespace, plural, name) + + def replace_custom_resource(self, group, version, namespace, plural, name, + body): + """Replaces a custom resource + + :param group: the custom resource's group + :param version: the custom resource's version + :param namespace: the custom resource's namespace + :param plural: the custom resource's plural name + :param name: the custom resource's full name + :param body: the data to go into the body of the custom resource + + :return: k8s client response + :rtype: object + """ + return self.custom_objects.replace_namespaced_custom_object( + group, version, namespace, plural, name, body) diff --git a/armada/handlers/lock.py b/armada/handlers/lock.py new file mode 100644 index 00000000..80510804 --- /dev/null +++ b/armada/handlers/lock.py @@ -0,0 +1,301 @@ +# Copyright 2019, AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import functools +import time +from datetime import datetime, timedelta +from concurrent.futures import ThreadPoolExecutor +from kubernetes import client +from kubernetes.client.rest import ApiException +from oslo_config import cfg +from oslo_log import log as logging + +from armada.handlers.k8s import K8s + +CONF = cfg.CONF + +TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" +# Lock configuration +LOCK_GROUP = "armada.process" +LOCK_VERSION = "v1" +LOCK_NAMESPACE = "kube-system" +LOCK_PLURAL = "locks" +LOCK_SINGULAR = "lock" + +LOG = logging.getLogger(__name__) + + +class LockException(Exception): + pass + + +def lock_and_thread(lock_name="lock"): + """This function creates a thread to execute the wrapped function after + acquiring a lock. While the thread is still running, this function + periodically updates the lock + + :param lock_name: name of the lock to create + """ + + def lock_decorator(func): + + @functools.wraps(func) + def func_wrapper(*args, **kwargs): + with Lock(lock_name) as lock: + pool = ThreadPoolExecutor(1) + future = pool.submit(func, *args, **kwargs) + start = time.time() + while not future.done(): + if time.time() - start > CONF.lock_update_interval: + lock.update_lock() + start = time.time() + time.sleep(1) + return future.result() + + return func_wrapper + + return lock_decorator + + +class Lock: + + def __init__(self, lock_name, additional_data=None): + """Creates a lock with the specified name and data. 
When a lock with + that name already exists then this will continuously attempt to acquire + it until: + * the attempt times out + * the lock is gone and this is able to acquire a new lock + * the existing lock expires, in which case this will forcibly + remove it and continue attempting to acquire the lock + + :param lock_name: name of the lock resource to be created, locks with + different names can coexist and won't conflict with each other + :param additional_data: dict of any additional data to be added to the + lock's `data` section + """ + self.expire_time = CONF.lock_expiration + self.timeout = CONF.lock_acquire_timeout + self.acquire_delay = CONF.lock_acquire_delay + self.lock_config = LockConfig( + name=lock_name, additional_data=additional_data) + + def _test_lock_ownership(self): + # If the uid of the current lock is the same as the one given when we + # created the lock, then it must be the one created by this program + lock = self.lock_config.get_lock() + if lock: + lock_uid = lock['metadata']['uid'] + current_uid = self.lock_config.metadata.get('uid', None) + return current_uid == lock_uid + # The lock must not exist + return False + + def lock_age(self): + lock = self.lock_config.get_lock() + if lock: + creation = lock['data']['lastUpdated'] + creation_time = datetime.strptime(creation, TIME_FORMAT) + return datetime.utcnow() - creation_time + # If no lock exists then a zero timedelta is returned so the lock is + # assuredly not old enough to be expired + return timedelta(seconds=0) + + def acquire_lock(self): + start = time.time() + LOG.info("Acquiring lock") + while (time.time() - start) < self.timeout: + try: + self.lock_config.create_lock() + return True + except ApiException as err: + if err.status == 404: + LOG.info("Lock Custom Resource Definition not found, " + "creating now") + self.lock_config.create_definition() + continue + elif err.status == 409: + # If the exception is 409 then there is already a lock, so + # we should continue with the rest of the logic + LOG.warn("There 
is already an existing lock") + else: + raise + if self._test_lock_ownership(): + # If there is already a lock that was created by this thread + # then we must have successfully acquired the lock + return True + else: + # There is a lock but it was not created by this thread, which + # means that the only way it should be removed is if the age + # of the lock exceeds the expire time in order to avoid + # removing another thread's lock while it is still working + if self.lock_age() > timedelta(seconds=self.expire_time): + LOG.info("Lock has exceeded expiry time, removing so " + "processing can continue") + self.release_lock() + continue + LOG.debug("Sleeping before attempting to acquire lock again") + time.sleep(self.acquire_delay) + raise LockException("Unable to acquire lock before timeout") + + def release_lock(self): + LOG.info("Releasing lock") + return self.lock_config.delete_lock() + + def update_lock(self): + LOG.debug("Updating lock") + self.lock_config.replace_lock() + + def __enter__(self): + self.acquire_lock() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.release_lock() + return False + + +class LockConfig: + + def __init__(self, name, additional_data=None): + self.name = name + data = additional_data or dict() + self.full_name = "{}.{}.{}".format(LOCK_PLURAL, LOCK_GROUP, self.name) + self.metadata = {'name': self.full_name} + self.body = { + 'kind': "Resource", + 'apiVersion': "{}/{}".format(LOCK_GROUP, LOCK_VERSION), + 'metadata': self.metadata, + 'data': data + } + self.delete_options = {} + + self.k8s = K8s() + + def create_lock(self): + """ Creates the Lock custom resource object + :return: the Lock custom resource object + :rtype: object + """ + self.body['data']['lastUpdated'] = \ + datetime.utcnow().strftime(TIME_FORMAT) + lock = self.k8s.create_custom_resource( + group=LOCK_GROUP, + version=LOCK_VERSION, + namespace=LOCK_NAMESPACE, + plural=LOCK_PLURAL, + body=self.body) + + self.metadata = lock.get('metadata', 
self.metadata) + return lock + + def get_lock(self): + """Retrieves the Lock custom resource object + + :return: the Lock custom resource object + :rtype: object + """ + try: + return self.k8s.read_custom_resource( + group=LOCK_GROUP, + version=LOCK_VERSION, + namespace=LOCK_NAMESPACE, + plural=LOCK_PLURAL, + name=self.full_name) + + except ApiException as err: + if err.status == 404: + return None + raise + + def delete_lock(self): + """Deletes the Lock custom resource + + :return: whether it was successfully deleted + :rtype: bool + """ + try: + self.k8s.delete_custom_resource( + group=LOCK_GROUP, + version=LOCK_VERSION, + namespace=LOCK_NAMESPACE, + plural=LOCK_PLURAL, + name=self.full_name, + body=self.delete_options) + + return True + except ApiException as err: + # If it returns 404 then something else deleted it + if err.status == 404: + return True + raise + + def replace_lock(self): + """Updates the Lock custom resource object with a new lastUpdated time + + :return: the Lock custom resource object + :rtype: object + """ + self.body['metadata']['resourceVersion'] = \ + self.metadata['resourceVersion'] + self.body['data']['lastUpdated'] = \ + datetime.utcnow().strftime(TIME_FORMAT) + lock = self.k8s.replace_custom_resource( + group=LOCK_GROUP, + version=LOCK_VERSION, + namespace=LOCK_NAMESPACE, + plural=LOCK_PLURAL, + name=self.full_name, + body=self.body) + + self.metadata = lock.get('metadata', self.metadata) + return lock + + def create_definition(self): + names = client.V1beta1CustomResourceDefinitionNames( + kind="Resource", plural=LOCK_PLURAL, singular=LOCK_SINGULAR) + metadata = client.V1ObjectMeta( + name="{}.{}".format(LOCK_PLURAL, LOCK_GROUP), + resource_version=LOCK_VERSION) + status = client.V1beta1CustomResourceDefinitionStatus( + accepted_names=names, + conditions=[], + stored_versions=[LOCK_VERSION]) + spec = client.V1beta1CustomResourceDefinitionSpec( + group=LOCK_GROUP, + names=names, + scope="Namespaced", + version=LOCK_VERSION) + crd = 
client.V1beta1CustomResourceDefinition( + spec=spec, + status=status, + metadata=metadata, + kind="CustomResourceDefinition") + try: + self.k8s.create_custom_resource_definition(crd) + except ValueError as err: + # Because of an issue with the Kubernetes code, the API server + # may return `null` for the required field `conditions` in + # kubernetes.client.V1beta1CustomResourceDefinitionStatus + # This causes validation to fail which will raise the subsequent + # ValueError even though the CRD was created successfully + # https://github.com/kubernetes-client/gen/issues/52 + # TODO if this is fixed upstream this should be removed + known_msg = "Invalid value for `conditions`, must not be `None`" + known_err = ValueError(known_msg) + if err.args != known_err.args: + raise + LOG.debug("Encountered known issue while creating CRD, continuing") + except ApiException as err: + # If a 409 is received then the definition already exists + if err.status != 409: + raise diff --git a/armada/tests/unit/api/test_armada_controller.py b/armada/tests/unit/api/test_armada_controller.py index 0985d69b..3b96782d 100644 --- a/armada/tests/unit/api/test_armada_controller.py +++ b/armada/tests/unit/api/test_armada_controller.py @@ -26,6 +26,8 @@ from armada.tests.unit.api import base CONF = cfg.CONF +@mock.patch.object(armada_api.Apply, 'handle', + armada_api.Apply.handle.__wrapped__) class ArmadaControllerTest(base.BaseControllerTest): @mock.patch.object(api, 'Tiller') diff --git a/armada/tests/unit/api/test_rollback_controller.py b/armada/tests/unit/api/test_rollback_controller.py index df43b912..e2b46a12 100644 --- a/armada/tests/unit/api/test_rollback_controller.py +++ b/armada/tests/unit/api/test_rollback_controller.py @@ -20,8 +20,11 @@ from armada import api from armada.common.policies import base as policy_base from armada.tests import test_utils from armada.tests.unit.api import base +from armada.api.controller import rollback +@mock.patch.object(rollback.Rollback, 'handle', + 
rollback.Rollback.handle.__wrapped__) class RollbackReleaseControllerTest(base.BaseControllerTest): @mock.patch.object(api, 'Tiller') diff --git a/armada/tests/unit/api/test_test_controller.py b/armada/tests/unit/api/test_test_controller.py index d5f7fa49..140e19e4 100644 --- a/armada/tests/unit/api/test_test_controller.py +++ b/armada/tests/unit/api/test_test_controller.py @@ -26,6 +26,8 @@ from armada.tests import test_utils from armada.tests.unit.api import base +@mock.patch.object(test.TestReleasesManifestController, 'handle', + test.TestReleasesManifestController.handle.__wrapped__) class TestReleasesManifestControllerTest(base.BaseControllerTest): @mock.patch.object(test, 'Manifest') @@ -57,6 +59,8 @@ class TestReleasesManifestControllerTest(base.BaseControllerTest): m_tiller.__exit__.assert_called() +@mock.patch.object(test.TestReleasesReleaseNameController, 'handle', + test.TestReleasesReleaseNameController.handle.__wrapped__) class TestReleasesReleaseNameControllerTest(base.BaseControllerTest): @mock.patch.object(test.Test, 'test_release_for_success') @@ -119,6 +123,8 @@ class TestReleasesReleaseNameControllerTest(base.BaseControllerTest): @test_utils.attr(type=['negative']) +@mock.patch.object(test.TestReleasesManifestController, 'handle', + test.TestReleasesManifestController.handle.__wrapped__) class TestReleasesManifestControllerNegativeTest(base.BaseControllerTest): @mock.patch.object(test, 'Manifest') diff --git a/armada/tests/unit/handlers/test_lock.py b/armada/tests/unit/handlers/test_lock.py new file mode 100644 index 00000000..65f3b8a1 --- /dev/null +++ b/armada/tests/unit/handlers/test_lock.py @@ -0,0 +1,232 @@ +# Copyright 2019, AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import copy +from datetime import datetime + +import mock +import testtools + +from kubernetes.client.rest import ApiException +from armada.handlers import lock + + +@mock.patch('armada.handlers.lock.K8s') +@mock.patch.object(lock.time, 'sleep', lambda x: True) +class LockTestCase(testtools.TestCase): + + def __init__(self, *args, **kwargs): + super(LockTestCase, self).__init__(*args, **kwargs) + self.resp = None + self.test_lock = None + self.mock_create = None + self.mock_read = None + self.mock_delete = None + self.mock_replace = None + self.mock_create_crd = None + + def setUp(self): + super(LockTestCase, self).setUp() + self_link = "/apis/armada.tiller/v1/namespaces/default/locks/"\ + "locks.armada.tiller.test" + self.resp = { + 'apiVersion': "armada.tiller/v1", + 'data': { + 'lastUpdated': "2019-01-22T16:20:14Z" + }, + 'metadata': { + 'resourceVersion': "95961", + 'generation': 1, + 'name': "locks.armada.process.test", + 'creationTimestamp': "2019-01-22T16:20:14Z", + 'uid': "9930c9a0-1e61-11e9-9e5a-0800276b7c7d", + 'clusterName': "", + 'namespace': "default", + 'selfLink': self_link + }, + 'kind': "Resource" + } + with mock.patch("armada.handlers.lock.K8s"): + self.test_lock = lock.Lock("test") + self.test_lock.timeout = 1 + self.test_lock.acquire_delay = 0.1 + self.test_lock.expire_time = 10 + + # Mocking the methods of self.k8s for the LockConfig + mock_k8s = self.test_lock.lock_config.k8s = mock.Mock() + self.mock_create = mock_k8s.create_custom_resource = mock.Mock() + self.mock_read = mock_k8s.read_custom_resource = mock.Mock() + 
self.mock_delete = mock_k8s.delete_custom_resource = mock.Mock() + self.mock_replace = mock_k8s.replace_custom_resource = mock.Mock() + self.mock_create_crd = mock_k8s.create_custom_resource_definition \ + = mock.Mock() + + def test_get_lock(self, _): + try: + # read needs to raise a 404 when the lock doesn't exist + self.mock_read.side_effect = ApiException(status=404) + mock_read = self.mock_read + resp = self.resp + + def update_get_and_set_return(*args, **kwargs): + # Once the lock is 'created' it should no longer raise err + mock_read.read_custom_resource.side_effect = None + mock_read.read_custom_resource.return_value = resp + # Set the mock_create return to return the new lock + return resp + + self.mock_create.side_effect = update_get_and_set_return + + self.test_lock.acquire_lock() + except lock.LockException: + self.fail("acquire_lock() raised LockException unexpectedly") + except ApiException: + self.fail("acquire_lock() raised ApiException unexpectedly") + try: + self.test_lock.release_lock() + except lock.LockException: + self.fail("release_lock() raised LockException unexpectedly") + except ApiException: + self.fail("acquire_lock() raised ApiException unexpectedly") + + @mock.patch('armada.handlers.lock.time', autospec=True) + def test_timeout_getting_lock(self, mock_time, _): + # The timestamp on the 'lock' will be new to avoid expiring + last_update = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ') + self.resp['data']['lastUpdated'] = str(last_update) + # Mocking time.time() so that acquire_lock() is run through once, and + # once the time is checked again the timeout will be reached + test_time = 1550510151.792119 + mock_time.time = mock.Mock() + + def set_time(): + nonlocal test_time + test_time += self.test_lock.timeout / 2 + return test_time + + mock_time.time.side_effect = set_time + + # Creating large expire time so the lock doesn't get overwritten + self.test_lock.expire_time = 60 + # Updating mocks so that there is always a 'lock' + 
self.mock_create.side_effect = ApiException(status=409) + self.mock_read.return_value = self.resp + + # It should fail to acquire the lock before the attempt times out + self.assertRaises(lock.LockException, self.test_lock.acquire_lock) + + def test_lock_expiration(self, _): + # Timestamp on the 'lock' is old to ensure lock is expired + self.resp['data']['lastUpdated'] = "2018-01-22T16:20:14Z" + + # When the lock already exists, Kubernetes responds with a 409 + self.mock_create.side_effect = ApiException(status=409) + # Getting the lock should return the 'lock' above + self.mock_read.return_value = self.resp + + # New return value of create should have a newer timestamp + new_resp = copy.deepcopy(self.resp) + new_time = str(datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')) + new_resp['metadata']['creationTimestamp'] = new_time + mock_create = self.mock_create + + def clear_side_effect(*args, **kwargs): + mock_create.side_effect = None + mock_create.return_value = new_resp + + # Once the lock is 'deleted' we need to stop create from raising err + self.mock_delete.side_effect = clear_side_effect + + try: + self.test_lock.acquire_lock() + except lock.LockException: + self.fail("acquire_lock() raised LockException unexpectedly") + + def test_custom_resource_definition_creation(self, _): + # When the crd doesn't exist yet, Kubernetes responds with a 404 when + # trying to create a lock + self.mock_create.side_effect = ApiException(status=404) + mock_create = self.mock_create + resp = self.resp + + def clear_side_effect(*args, **kwargs): + mock_create.side_effect = None + mock_create.return_value = resp + + # Once the definition is 'created' we need to stop raising err + self.mock_create_crd.side_effect = clear_side_effect + + try: + self.test_lock.acquire_lock() + except lock.LockException: + self.fail("acquire_lock() raised LockException unexpectedly") + + @mock.patch.object(lock.CONF, "lock_update_interval", 0.1) + 
@mock.patch('armada.handlers.lock.ThreadPoolExecutor') + @mock.patch('armada.handlers.lock.time', autospec=True) + def test_lock_decorator(self, mock_time, mock_thread, _): + # read needs to raise a 404 when the lock doesn't exist + self.mock_read.side_effect = ApiException(status=404) + mock_read = self.mock_read + resp = self.resp + + def update_get_and_set_return(*args, **kwargs): + # Once the lock is 'created' it should no longer raise err + mock_read.read_custom_resource.side_effect = None + mock_read.read_custom_resource.return_value = resp + # Set the mock_create return to return the new lock + return resp + + self.mock_create.side_effect = update_get_and_set_return + self.mock_replace.return_value = self.resp + + # Mocking the threading in lock_and_thread + mock_pool = mock_thread.return_value = mock.Mock() + mock_pool.submit = mock.Mock() + mock_future = mock_pool.submit.return_value = mock.Mock() + mock_future.done = mock.Mock() + # future.done() needs to return false so lock.update_lock() gets called + mock_future.done.return_value = False + + def clear_done(): + mock_future.done.return_value = True + mock_future.done.side_effect = None + + # After future.done() is called once it can be cleared and return True + mock_future.done.side_effect = clear_done + + # Mocking time.time() so it appears that more time has passed than + # CONF.lock_update_interval so update_lock() is run + # This also affects the acquire_lock() timeout check, which is why + # the lock_update_interval is mocked to be a low number + test_time = 1550510151.792119 + mock_time.time = mock.Mock() + + def set_time(): + nonlocal test_time + test_time += lock.CONF.lock_update_interval + 1 + return test_time + + mock_time.time.side_effect = set_time + + def func(): + return + + test_func_dec = lock.lock_and_thread()(func) + test_func_dec.lock = self.test_lock + try: + test_func_dec() + except lock.LockException: + self.fail("acquire_lock() raised LockException unexpectedly") + except 
ApiException: + self.fail("acquire_lock() raised ApiException unexpectedly")