From 4c71cc2c83fe2d19b69f61277edbe51eedd619ad Mon Sep 17 00:00:00 2001 From: Anthony Lin Date: Wed, 21 Mar 2018 08:50:51 +0000 Subject: [PATCH] Refactor Armada Operator We will move away from the usage of if/else block and will instead make use of inheritance (armada base operator) and task specific operators in our dags to execute the workflow. We also added timeout option for client read so that the CI/CD team can set other values for the execution of armada tasks (the default time out is currently set to 1hr for all tasks). Change-Id: I563fde76d91feae06a8a0298bc6eaf7cca1e66da --- shipyard_airflow/dags/armada_deploy_site.py | 23 +- shipyard_airflow/dags/deploy_site.py | 9 +- shipyard_airflow/dags/redeploy_server.py | 9 +- shipyard_airflow/dags/update_site.py | 9 +- shipyard_airflow/dags/validate_site_design.py | 5 +- .../plugins/armada_base_operator.py | 199 ++++++++++ .../plugins/armada_get_releases.py | 64 ++++ shipyard_airflow/plugins/armada_get_status.py | 67 ++++ shipyard_airflow/plugins/armada_operator.py | 361 ------------------ shipyard_airflow/plugins/armada_post_apply.py | 130 +++++++ .../plugins/armada_validate_design.py | 69 ++++ .../deployment_configuration_operator.py | 6 +- .../plugins/get_k8s_pod_port_ip.py | 18 +- shipyard_airflow/plugins/xcom_puller.py | 10 + .../schemas/deploymentConfiguration.yaml | 8 + tests/unit/control/test.conf | 11 - .../deploymentConfiguration_full_valid.yaml | 4 + 17 files changed, 585 insertions(+), 417 deletions(-) create mode 100644 shipyard_airflow/plugins/armada_base_operator.py create mode 100644 shipyard_airflow/plugins/armada_get_releases.py create mode 100644 shipyard_airflow/plugins/armada_get_status.py delete mode 100644 shipyard_airflow/plugins/armada_operator.py create mode 100644 shipyard_airflow/plugins/armada_post_apply.py create mode 100644 shipyard_airflow/plugins/armada_validate_design.py diff --git a/shipyard_airflow/dags/armada_deploy_site.py b/shipyard_airflow/dags/armada_deploy_site.py index 
7839f740..c301697c 100644 --- a/shipyard_airflow/dags/armada_deploy_site.py +++ b/shipyard_airflow/dags/armada_deploy_site.py @@ -1,4 +1,4 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,7 +13,9 @@ # limitations under the License. from airflow.models import DAG -from airflow.operators import ArmadaOperator +from airflow.operators import ArmadaGetReleasesOperator +from airflow.operators import ArmadaGetStatusOperator +from airflow.operators import ArmadaPostApplyOperator from config_path import config_path @@ -27,35 +29,32 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args): default_args=args) # Get Tiller Status - armada_status = ArmadaOperator( - task_id='armada_status', + armada_get_status = ArmadaGetStatusOperator( + task_id='armada_get_status', shipyard_conf=config_path, - action='armada_status', main_dag_name=parent_dag_name, sub_dag_name=child_dag_name, dag=dag) # Armada Apply - armada_apply = ArmadaOperator( - task_id='armada_apply', + armada_post_apply = ArmadaPostApplyOperator( + task_id='armada_post_apply', shipyard_conf=config_path, - action='armada_apply', main_dag_name=parent_dag_name, sub_dag_name=child_dag_name, retries=3, dag=dag) # Get Helm Releases - armada_get_releases = ArmadaOperator( + armada_get_releases = ArmadaGetReleasesOperator( task_id='armada_get_releases', shipyard_conf=config_path, - action='armada_get_releases', main_dag_name=parent_dag_name, sub_dag_name=child_dag_name, dag=dag) # Define dependencies - armada_apply.set_upstream(armada_status) - armada_get_releases.set_upstream(armada_apply) + armada_post_apply.set_upstream(armada_get_status) + armada_get_releases.set_upstream(armada_post_apply) return dag diff --git a/shipyard_airflow/dags/deploy_site.py 
b/shipyard_airflow/dags/deploy_site.py index fa40767f..0edfd877 100644 --- a/shipyard_airflow/dags/deploy_site.py +++ b/shipyard_airflow/dags/deploy_site.py @@ -47,8 +47,8 @@ action_xcom = step_factory.get_action_xcom() concurrency_check = step_factory.get_concurrency_check() preflight = step_factory.get_preflight() get_design_version = step_factory.get_get_design_version() -validate_site_design = step_factory.get_validate_site_design() deployment_configuration = step_factory.get_deployment_configuration() +validate_site_design = step_factory.get_validate_site_design() drydock_build = step_factory.get_drydock_build() armada_build = step_factory.get_armada_build() @@ -56,10 +56,7 @@ armada_build = step_factory.get_armada_build() concurrency_check.set_upstream(action_xcom) preflight.set_upstream(concurrency_check) get_design_version.set_upstream(preflight) -validate_site_design.set_upstream(get_design_version) deployment_configuration.set_upstream(get_design_version) -drydock_build.set_upstream([ - validate_site_design, - deployment_configuration -]) +validate_site_design.set_upstream(deployment_configuration) +drydock_build.set_upstream(validate_site_design) armada_build.set_upstream(drydock_build) diff --git a/shipyard_airflow/dags/redeploy_server.py b/shipyard_airflow/dags/redeploy_server.py index 32da66c0..d2fd6b65 100644 --- a/shipyard_airflow/dags/redeploy_server.py +++ b/shipyard_airflow/dags/redeploy_server.py @@ -48,8 +48,8 @@ action_xcom = step_factory.get_action_xcom() concurrency_check = step_factory.get_concurrency_check() preflight = step_factory.get_preflight() get_design_version = step_factory.get_get_design_version() -validate_site_design = step_factory.get_validate_site_design() deployment_configuration = step_factory.get_deployment_configuration() +validate_site_design = step_factory.get_validate_site_design() destroy_server = step_factory.get_destroy_server() drydock_build = step_factory.get_drydock_build() @@ -57,10 +57,7 @@ drydock_build = 
step_factory.get_drydock_build() concurrency_check.set_upstream(action_xcom) preflight.set_upstream(concurrency_check) get_design_version.set_upstream(preflight) -validate_site_design.set_upstream(get_design_version) deployment_configuration.set_upstream(get_design_version) -destroy_server.set_upstream([ - validate_site_design, - deployment_configuration -]) +validate_site_design.set_upstream(deployment_configuration) +destroy_server.set_upstream(validate_site_design) drydock_build.set_upstream(destroy_server) diff --git a/shipyard_airflow/dags/update_site.py b/shipyard_airflow/dags/update_site.py index 33764fcd..40ce4270 100644 --- a/shipyard_airflow/dags/update_site.py +++ b/shipyard_airflow/dags/update_site.py @@ -51,8 +51,8 @@ step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME, action_xcom = step_factory.get_action_xcom() concurrency_check = step_factory.get_concurrency_check() get_design_version = step_factory.get_get_design_version() -validate_site_design = step_factory.get_validate_site_design() deployment_configuration = step_factory.get_deployment_configuration() +validate_site_design = step_factory.get_validate_site_design() drydock_build = step_factory.get_drydock_build() armada_build = step_factory.get_armada_build() decide_airflow_upgrade = step_factory.get_decide_airflow_upgrade() @@ -62,12 +62,9 @@ skip_upgrade_airflow = step_factory.get_skip_upgrade_airflow() # DAG Wiring concurrency_check.set_upstream(action_xcom) get_design_version.set_upstream(concurrency_check) -validate_site_design.set_upstream(get_design_version) deployment_configuration.set_upstream(get_design_version) -drydock_build.set_upstream([ - validate_site_design, - deployment_configuration -]) +validate_site_design.set_upstream(deployment_configuration) +drydock_build.set_upstream(validate_site_design) armada_build.set_upstream(drydock_build) decide_airflow_upgrade.set_upstream(armada_build) decide_airflow_upgrade.set_downstream(upgrade_airflow) diff --git 
a/shipyard_airflow/dags/validate_site_design.py b/shipyard_airflow/dags/validate_site_design.py index 23aea500..4cbc5f17 100644 --- a/shipyard_airflow/dags/validate_site_design.py +++ b/shipyard_airflow/dags/validate_site_design.py @@ -13,7 +13,7 @@ # limitations under the License. from airflow.models import DAG -from airflow.operators import ArmadaOperator +from airflow.operators import ArmadaValidateDesignOperator from airflow.operators import DeckhandValidateSiteDesignOperator from airflow.operators import DrydockValidateDesignOperator @@ -44,10 +44,9 @@ def validate_site_design(parent_dag_name, child_dag_name, args): retries=3, dag=dag) - armada_validate_docs = ArmadaOperator( + armada_validate_docs = ArmadaValidateDesignOperator( task_id='armada_validate_site_design', shipyard_conf=config_path, - action='validate_site_design', main_dag_name=parent_dag_name, sub_dag_name=child_dag_name, retries=3, diff --git a/shipyard_airflow/plugins/armada_base_operator.py b/shipyard_airflow/plugins/armada_base_operator.py new file mode 100644 index 00000000..73012083 --- /dev/null +++ b/shipyard_airflow/plugins/armada_base_operator.py @@ -0,0 +1,199 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging +import os +from urllib.parse import urlparse + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.plugins_manager import AirflowPlugin +from airflow.utils.decorators import apply_defaults + +import armada.common.client as client +import armada.common.session as session +from get_k8s_pod_port_ip import get_pod_port_ip +from service_endpoint import ucp_service_endpoint +from service_token import shipyard_service_token +from xcom_puller import XcomPuller +from xcom_pusher import XcomPusher + + +class ArmadaBaseOperator(BaseOperator): + + """Armada Base Operator + + All armada related workflow operators will use the armada + base operator as the parent and inherit attributes and methods + from this class + + """ + + @apply_defaults + def __init__(self, + deckhand_design_ref=None, + deckhand_svc_type='deckhand', + armada_client=None, + armada_svc_endpoint=None, + armada_svc_type='armada', + main_dag_name=None, + query={}, + shipyard_conf=None, + sub_dag_name=None, + svc_session=None, + svc_token=None, + xcom_push=True, + *args, **kwargs): + """Initialization of ArmadaBaseOperator object. 
+ + :param deckhand_design_ref: A URI reference to the design documents + :param deckhand_svc_type: Deckhand Service Type + :param armada_client: An instance of armada client + :param armada_svc_endpoint: Armada Service Endpoint + :param armada_svc_type: Armada Service Type + :param main_dag_name: Parent Dag + :param query: A dictionary containing explicit query string parameters + :param shipyard_conf: Location of shipyard.conf + :param sub_dag_name: Child Dag + :param svc_session: Keystone Session + :param svc_token: Keystone Token + :param xcom_push: xcom usage + + The Armada operator assumes that prior steps have set xcoms for + the action and the deployment configuration + + """ + + super(ArmadaBaseOperator, self).__init__(*args, **kwargs) + self.deckhand_design_ref = deckhand_design_ref + self.deckhand_svc_type = deckhand_svc_type + self.armada_client = armada_client + self.armada_svc_endpoint = armada_svc_endpoint + self.armada_svc_type = armada_svc_type + self.main_dag_name = main_dag_name + self.query = query + self.shipyard_conf = shipyard_conf + self.sub_dag_name = sub_dag_name + self.svc_session = svc_session + self.svc_token = svc_token + self.xcom_push_flag = xcom_push + + def execute(self, context): + + # Execute armada base function + self.armada_base(context) + + # Execute child function + self.do_execute() + + @shipyard_service_token + def armada_base(self, context): + + # Define task_instance + task_instance = context['task_instance'] + + # Set up and retrieve values from xcom + self.xcom_puller = XcomPuller(self.main_dag_name, task_instance) + self.action_info = self.xcom_puller.get_action_info() + self.dc = self.xcom_puller.get_deployment_configuration() + + # Set up xcom_pusher to push values to xcom + self.xcom_pusher = XcomPusher(context['task_instance']) + + # Logs uuid of action performed by the Operator + logging.info("Armada Operator for action %s", self.action_info['id']) + + # Set up armada client + self.get_armada_client() + + # Get 
deckhand design reference url + self.get_deckhand_design_ref() + + def get_armada_client(self): + + # Retrieve Endpoint Information + self.armada_svc_endpoint = ucp_service_endpoint( + self, svc_type=self.armada_svc_type) + + logging.info("Armada endpoint is %s", self.armada_svc_endpoint) + + # Parse Armada Service Endpoint + armada_url = urlparse(self.armada_svc_endpoint) + + # Build a ArmadaSession with credentials and target host + # information. + logging.info("Build Armada Session") + a_session = session.ArmadaSession(host=armada_url.hostname, + port=armada_url.port, + scheme='http', + token=self.svc_token, + marker=None) + + # Raise Exception if we are not able to set up the session + if a_session: + logging.info("Successfully Set Up Armada Session") + else: + raise AirflowException("Failed to set up Armada Session!") + + # Use the ArmadaSession to build a ArmadaClient that can + # be used to make one or more API calls + logging.info("Create Armada Client") + self.armada_client = client.ArmadaClient(a_session) + + # Raise Exception if we are not able to build armada client + if self.armada_client: + logging.info("Successfully Set Up Armada client") + else: + raise AirflowException("Failed to set up Armada client!") + + def get_deckhand_design_ref(self): + + # Retrieve DeckHand Endpoint Information + deckhand_svc_endpoint = ucp_service_endpoint( + self, svc_type=self.deckhand_svc_type) + + logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint) + + # Retrieve last committed revision id + committed_revision_id = self.xcom_puller.get_design_version() + + # Form DeckHand Design Reference Path + # This URL will be used to retrieve the Site Design YAMLs + deckhand_path = "deckhand+" + deckhand_svc_endpoint + self.deckhand_design_ref = os.path.join(deckhand_path, + "revisions", + str(committed_revision_id), + "rendered-documents") + + if self.deckhand_design_ref: + logging.info("Design YAMLs will be retrieved from %s", + self.deckhand_design_ref) + else: + 
raise AirflowException("Unable to Retrieve Design Reference!") + + @get_pod_port_ip('tiller', namespace='kube-system') + def get_tiller_info(self, pods_ip_port={}): + + # Assign value to the 'query' dictionary so that we can pass + # it via the Armada Client + self.query['tiller_host'] = pods_ip_port['tiller']['ip'] + self.query['tiller_port'] = pods_ip_port['tiller']['port'] + + +class ArmadaBaseOperatorPlugin(AirflowPlugin): + + """Creates ArmadaBaseOperator in Airflow.""" + + name = 'armada_base_operator_plugin' + operators = [ArmadaBaseOperator] diff --git a/shipyard_airflow/plugins/armada_get_releases.py b/shipyard_airflow/plugins/armada_get_releases.py new file mode 100644 index 00000000..616debfe --- /dev/null +++ b/shipyard_airflow/plugins/armada_get_releases.py @@ -0,0 +1,64 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from airflow.exceptions import AirflowException +from airflow.plugins_manager import AirflowPlugin + +from armada_base_operator import ArmadaBaseOperator +from armada.exceptions import api_exceptions as errors + + +class ArmadaGetReleasesOperator(ArmadaBaseOperator): + + """Armada Get Releases Operator + + This operator will trigger armada to get the Helm charts releases + of the environment. 
+ + """ + + def do_execute(self): + + # Retrieve Tiller Information + self.get_tiller_info(pods_ip_port={}) + + # Retrieve read timeout + timeout = self.dc['armada.get_releases_timeout'] + + # Retrieve Armada Releases after deployment + logging.info("Retrieving Helm charts releases after deployment..") + + try: + armada_get_releases = self.armada_client.get_releases( + self.query, + timeout=timeout) + + except errors.ClientError as client_error: + raise AirflowException(client_error) + + if armada_get_releases: + logging.info("Successfully retrieved Helm charts releases") + logging.info(armada_get_releases) + else: + raise AirflowException("Failed to retrieve Helm charts releases!") + + +class ArmadaGetReleasesOperatorPlugin(AirflowPlugin): + + """Creates ArmadaGetReleasesOperator in Airflow.""" + + name = 'armada_get_releases_operator' + operators = [ArmadaGetReleasesOperator] diff --git a/shipyard_airflow/plugins/armada_get_status.py b/shipyard_airflow/plugins/armada_get_status.py new file mode 100644 index 00000000..9690a46d --- /dev/null +++ b/shipyard_airflow/plugins/armada_get_status.py @@ -0,0 +1,67 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging + +from airflow.exceptions import AirflowException +from airflow.plugins_manager import AirflowPlugin + +from armada_base_operator import ArmadaBaseOperator +from armada.exceptions import api_exceptions as errors + + +class ArmadaGetStatusOperator(ArmadaBaseOperator): + + """Armada Get Status Operator + + This operator will trigger armada to get the current status of + Tiller. Tiller needs to be in a healthy state before any site + deployment/update. + + """ + + def do_execute(self): + + # Retrieve Tiller Information + self.get_tiller_info(pods_ip_port={}) + + # Retrieve read timeout + timeout = self.dc['armada.get_status_timeout'] + + # Check State of Tiller + try: + armada_get_status = self.armada_client.get_status( + self.query, + timeout=timeout) + + except errors.ClientError as client_error: + raise AirflowException(client_error) + + # Tiller State will return boolean value, i.e. True/False + # Raise Exception if Tiller is unhealthy + if armada_get_status['tiller']['state']: + logging.info("Tiller is in running state") + logging.info("Tiller version is %s", + armada_get_status['tiller']['version']) + + else: + raise AirflowException("Please check Tiller!") + + +class ArmadaGetStatusOperatorPlugin(AirflowPlugin): + + """Creates ArmadaGetStatusOperator in Airflow.""" + + name = 'armada_get_status_operator' + operators = [ArmadaGetStatusOperator] diff --git a/shipyard_airflow/plugins/armada_operator.py b/shipyard_airflow/plugins/armada_operator.py deleted file mode 100644 index 6f22ef74..00000000 --- a/shipyard_airflow/plugins/armada_operator.py +++ /dev/null @@ -1,361 +0,0 @@ -# Copyright 2018 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import json -import logging -import os -import requests -from urllib.parse import urlparse - -from airflow.models import BaseOperator -from airflow.utils.decorators import apply_defaults -from airflow.plugins_manager import AirflowPlugin -from airflow.exceptions import AirflowException - -import armada.common.client as client -import armada.common.session as session -from get_k8s_pod_port_ip import get_pod_port_ip -from service_endpoint import ucp_service_endpoint -from service_token import shipyard_service_token -from xcom_puller import XcomPuller -from xcom_pusher import XcomPusher - - -class ArmadaOperator(BaseOperator): - """ - Supports interaction with Armada - :param action: Task to perform - :param main_dag_name: Parent Dag - :param shipyard_conf: Location of shipyard.conf - :param sub_dag_name: Child Dag - - The Armada operator assumes that prior steps have set xcoms for - the action and the deployment configuration - """ - - @apply_defaults - def __init__(self, - action=None, - main_dag_name=None, - shipyard_conf=None, - svc_token=None, - sub_dag_name=None, - xcom_push=True, - *args, **kwargs): - - super(ArmadaOperator, self).__init__(*args, **kwargs) - self.action = action - self.main_dag_name = main_dag_name - self.shipyard_conf = shipyard_conf - self.svc_token = svc_token - self.sub_dag_name = sub_dag_name - self.xcom_push_flag = xcom_push - - def execute(self, context): - # Initialize Variables - armada_client = None - design_ref = None - - # Define task_instance - task_instance = context['task_instance'] - - # Set up and retrieve values 
from xcom - self.xcom_puller = XcomPuller(self.main_dag_name, task_instance) - self.action_info = self.xcom_puller.get_action_info() - - # Logs uuid of action performed by the Operator - logging.info("Armada Operator for action %s", self.action_info['id']) - - # Retrieve Deckhand Design Reference - design_ref = self.get_deckhand_design_ref(context) - - if design_ref: - logging.info("Design YAMLs will be retrieved from %s", - design_ref) - else: - raise AirflowException("Unable to Retrieve Design Reference!") - - # Validate Site Design - if self.action == 'validate_site_design': - # Initialize variable - armada_svc_endpoint = None - site_design_validity = 'invalid' - - # Retrieve Endpoint Information - svc_type = 'armada' - armada_svc_endpoint = ucp_service_endpoint(self, - svc_type=svc_type) - - site_design_validity = self.armada_validate_site_design( - armada_svc_endpoint, design_ref) - - if site_design_validity == 'valid': - logging.info("Site Design has been successfully validated") - else: - raise AirflowException("Site Design Validation Failed!") - - return site_design_validity - - # Set up target manifest (only if not doing validate) - self.dc = self.xcom_puller.get_deployment_configuration() - self.target_manifest = self.dc['armada.manifest'] - - # Create Armada Client - # Retrieve Endpoint Information - svc_type = 'armada' - armada_svc_endpoint = ucp_service_endpoint(self, - svc_type=svc_type) - logging.info("Armada endpoint is %s", armada_svc_endpoint) - - # Set up Armada Client - armada_client = self.armada_session_client(armada_svc_endpoint) - - # Retrieve Tiller Information and assign to context 'query' - context['query'] = self.get_tiller_info(context) - - # Armada API Call - # Armada Status - if self.action == 'armada_status': - self.get_armada_status(context, armada_client) - - # Armada Apply - elif self.action == 'armada_apply': - self.armada_apply(context, armada_client, design_ref, - self.target_manifest) - - # Armada Get Releases - elif 
self.action == 'armada_get_releases': - self.armada_get_releases(context, armada_client) - - else: - logging.info('No Action to Perform') - - @shipyard_service_token - def armada_session_client(self, armada_svc_endpoint): - # Initialize Variables - armada_url = None - a_session = None - a_client = None - - # Parse Armada Service Endpoint - armada_url = urlparse(armada_svc_endpoint) - - # Build a ArmadaSession with credentials and target host - # information. - logging.info("Build Armada Session") - a_session = session.ArmadaSession(host=armada_url.hostname, - port=armada_url.port, - scheme='http', - token=self.svc_token, - marker=None) - - # Raise Exception if we are not able to get armada session - if a_session: - logging.info("Successfully Set Up Armada Session") - else: - raise AirflowException("Failed to set up Armada Session!") - - # Use session to build a ArmadaClient to make one or more - # API calls. The ArmadaSession will care for TCP connection - # pooling and header management - logging.info("Create Armada Client") - a_client = client.ArmadaClient(a_session) - - # Raise Exception if we are not able to build armada client - if a_client: - logging.info("Successfully Set Up Armada client") - else: - raise AirflowException("Failed to set up Armada client!") - - # Return Armada client for XCOM Usage - return a_client - - @get_pod_port_ip('tiller', namespace='kube-system') - def get_tiller_info(self, context, *args, **kwargs): - # Initialize Variable - query = {} - - # Get IP and port information of Pods from context - k8s_pods_ip_port = context['pods_ip_port'] - - # Assign value to the 'query' dictionary so that we can pass - # it via the Armada Client - query['tiller_host'] = k8s_pods_ip_port['tiller'].get('ip') - query['tiller_port'] = k8s_pods_ip_port['tiller'].get('port') - - return query - - def get_armada_status(self, context, armada_client): - # Check State of Tiller - armada_status = armada_client.get_status(context['query']) - - # Tiller State will 
return boolean value, i.e. True/False - # Raise Exception if Tiller is in a bad state - if armada_status['tiller']['state']: - logging.info("Tiller is in running state") - logging.info("Tiller version is %s", - armada_status['tiller']['version']) - else: - raise AirflowException("Please check Tiller!") - - def armada_apply(self, context, armada_client, design_ref, - target_manifest): - '''Run Armada Apply - ''' - # Initialize Variables - armada_manifest = None - armada_ref = design_ref - armada_post_apply = {} - override_values = [] - chart_set = [] - upgrade_airflow_worker = False - - # enhance the context's query entity with target_manifest - query = context.get('query', {}) - query['target_manifest'] = target_manifest - - # Execute Armada Apply to install the helm charts in sequence - logging.info("Armada Apply") - armada_post_apply = armada_client.post_apply(manifest=armada_manifest, - manifest_ref=armada_ref, - values=override_values, - set=chart_set, - query=query) - - # Search for Shipyard deployment in the list of chart upgrades - # NOTE: It is possible for the chart name to take on different - # values, e.g. 'aic-ucp-shipyard', 'ucp-shipyard'. Hence we - # will search for the word 'shipyard', which should exist as - # part of the name of the Shipyard Helm Chart. - for i in armada_post_apply['message']['upgrade']: - if 'shipyard' in i: - upgrade_airflow_worker = True - break - - # Create xcom key 'upgrade_airflow_worker' - # Value of key will depend on whether an upgrade has been - # performed on the Shipyard/Airflow Chart - self.xcom_pusher = XcomPusher(context['task_instance']) - - if upgrade_airflow_worker: - self.xcom_pusher.xcom_push(key='upgrade_airflow_worker', - value='true') - else: - self.xcom_pusher.xcom_push(key='upgrade_airflow_worker', - value='false') - - # We will expect Armada to return the releases that it is - # deploying. 
Note that if we try and deploy the same release - # twice, we will end up with empty response as nothing has - # changed. - if (armada_post_apply['message']['install'] or - armada_post_apply['message']['upgrade']): - logging.info("Armada Apply Successfully Executed") - logging.info(armada_post_apply) - else: - logging.warning("No new changes/updates were detected") - logging.info(armada_post_apply) - - def armada_get_releases(self, context, armada_client): - # Initialize Variables - armada_releases = {} - deckhand_svc_endpoint = None - - # Retrieve Armada Releases after deployment - logging.info("Retrieving Armada Releases after deployment..") - armada_releases = armada_client.get_releases(context['query']) - - if armada_releases: - logging.info("Retrieved current Armada Releases") - logging.info(armada_releases) - else: - raise AirflowException("Failed to retrieve Armada Releases") - - def get_deckhand_design_ref(self, context): - - # Retrieve DeckHand Endpoint Information - svc_type = 'deckhand' - deckhand_svc_endpoint = ucp_service_endpoint(self, - svc_type=svc_type) - logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint) - - # Retrieve revision_id from xcom - committed_revision_id = self.xcom_puller.get_design_version() - - # Form Design Reference Path that we will use to retrieve - # the Design YAMLs - deckhand_path = "deckhand+" + deckhand_svc_endpoint - deckhand_design_ref = os.path.join(deckhand_path, - "revisions", - str(committed_revision_id), - "rendered-documents") - - return deckhand_design_ref - - @shipyard_service_token - def armada_validate_site_design(self, armada_svc_endpoint, design_ref): - - # Form Validation Endpoint - validation_endpoint = os.path.join(armada_svc_endpoint, - 'validatedesign') - - logging.info("Validation Endpoint is %s", validation_endpoint) - - # Define Headers and Payload - headers = { - 'Content-Type': 'application/json', - 'X-Auth-Token': self.svc_token - } - - payload = { - 'rel': "design", - 'href': design_ref, 
- 'type': "application/x-yaml" - } - - # Requests Armada to validate site design - logging.info("Waiting for Armada to validate site design...") - - try: - design_validate_response = requests.post(validation_endpoint, - headers=headers, - data=json.dumps(payload)) - except requests.exceptions.RequestException as e: - raise AirflowException(e) - - # Convert response to string - validate_site_design = design_validate_response.text - - # Print response - logging.info("Retrieving Armada validate site design response...") - - try: - validate_site_design_dict = json.loads(validate_site_design) - logging.info(validate_site_design_dict) - except json.JSONDecodeError as e: - raise AirflowException(e) - - status = str(validate_site_design_dict.get('status', 'unspecified')) - # Check if site design is valid - logging.info("Armada Site Design valdation status " - "is: {}".format(status)) - if status.lower() == 'success': - return 'valid' - else: - return 'invalid' - - -class ArmadaOperatorPlugin(AirflowPlugin): - name = 'armada_operator_plugin' - operators = [ArmadaOperator] diff --git a/shipyard_airflow/plugins/armada_post_apply.py b/shipyard_airflow/plugins/armada_post_apply.py new file mode 100644 index 00000000..17c66b9a --- /dev/null +++ b/shipyard_airflow/plugins/armada_post_apply.py @@ -0,0 +1,130 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging + +from airflow.exceptions import AirflowException +from airflow.plugins_manager import AirflowPlugin + +from armada_base_operator import ArmadaBaseOperator +from armada.exceptions import api_exceptions as errors + + +class ArmadaPostApplyOperator(ArmadaBaseOperator): + + """Armada Post Apply Operator + + This operator will trigger armada to apply the manifest and + start a site deployment/update/upgrade. + + """ + + def do_execute(self): + + # Initialize Variables + armada_manifest = None + chart_set = [] + override_values = [] + upgrade_airflow_worker = False + + # Set up target manifest + self.dc = self.xcom_puller.get_deployment_configuration() + self.target_manifest = self.dc['armada.manifest'] + + # Retrieve Tiller Information + self.get_tiller_info(pods_ip_port={}) + + # Update query dict with information of target_manifest + self.query['target_manifest'] = self.target_manifest + + # Retrieve read timeout + timeout = self.dc['armada.post_apply_timeout'] + + # Execute Armada Apply to install the helm charts in sequence + logging.info("Armada Apply") + + try: + armada_post_apply = self.armada_client.post_apply( + manifest=armada_manifest, + manifest_ref=self.deckhand_design_ref, + values=override_values, + set=chart_set, + query=self.query, + timeout=timeout) + + except errors.ClientError as client_error: + # Set 'get_attempted_failed_install_upgrade' xcom to 'true' + self.xcom_pusher.xcom_push( + key='get_attempted_failed_install_upgrade', + value='true') + + raise AirflowException(client_error) + + # Retrieve xcom for 'get_attempted_failed_install_upgrade' + # NOTE: The key will only be set to 'true' if there was a failed + # attempt to upgrade or update the Helm charts. It does not hold + # any value by default. + if self.xcom_puller.get_attempted_failed_install_upgrade() == 'true': + # NOTE: It is possible for Armada to return a HTTP 500 response + # even though the Helm charts have been upgraded/updated. 
The + # workflow will treat the 'Armada Apply' task as a failed attempt + # in such situation and proceed to schedule and run the task for + # a second time (the default is 3 retries). As the relevant Helm + # Charts would have already been updated, we will get an empty + # list from Armada for that second retry. As a workaround, we will + # need to treat such response as a successful upgrade/update. + # A long term solution will be in place in the future. + if (not armada_post_apply['message']['install'] and + not armada_post_apply['message']['upgrade']): + upgrade_airflow_worker = True + + # Search for Shipyard deployment in the list of chart upgrades + # NOTE: It is possible for the chart name to take on different + # values, e.g. 'aic-ucp-shipyard', 'ucp-shipyard'. Hence we + # will search for the word 'shipyard', which should exist as + # part of the name of the Shipyard Helm Chart. + for i in armada_post_apply['message']['upgrade']: + if 'shipyard' in i: + upgrade_airflow_worker = True + break + + # Create xcom key 'upgrade_airflow_worker' + # Value of key will depend on whether an upgrade has been + # performed on the Shipyard/Airflow Chart + if upgrade_airflow_worker: + self.xcom_pusher.xcom_push(key='upgrade_airflow_worker', + value='true') + else: + self.xcom_pusher.xcom_push(key='upgrade_airflow_worker', + value='false') + + # We will expect Armada to return the releases that it is + # deploying. Note that if we try and deploy the same release + # twice, we will end up with empty response as nothing has + # changed. 
+ if (armada_post_apply['message']['install'] or + armada_post_apply['message']['upgrade']): + logging.info("Successfully Executed Armada Apply") + logging.info(armada_post_apply) + else: + logging.warning("No new changes/updates were detected!") + logging.info(armada_post_apply) + + +class ArmadaPostApplyOperatorPlugin(AirflowPlugin): + + """Creates ArmadaPostApplyOperator in Airflow.""" + + name = 'armada_post_apply_operator' + operators = [ArmadaPostApplyOperator] diff --git a/shipyard_airflow/plugins/armada_validate_design.py b/shipyard_airflow/plugins/armada_validate_design.py new file mode 100644 index 00000000..ac17d97b --- /dev/null +++ b/shipyard_airflow/plugins/armada_validate_design.py @@ -0,0 +1,69 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging + +from airflow.plugins_manager import AirflowPlugin +from airflow.exceptions import AirflowException + +from armada_base_operator import ArmadaBaseOperator +from armada.exceptions import api_exceptions as errors + + +class ArmadaValidateDesignOperator(ArmadaBaseOperator): + + """Armada Validate Design Operator + + This operator will trigger armada to validate the + site design + + """ + + def do_execute(self): + + # Requests Armada to validate site design + logging.info("Waiting for Armada to validate site design...") + + # Retrieve read timeout + timeout = self.dc['armada.validate_design_timeout'] + + # Validate Site Design + try: + post_validate = self.armada_client.post_validate( + manifest=self.deckhand_design_ref, + timeout=timeout) + + except errors.ClientError as client_error: + raise AirflowException(client_error) + + # Print results + logging.info("Retrieving Armada validate site design response...") + logging.info(post_validate) + + # Check if site design is valid + status = str(post_validate.get('status', 'unspecified')) + + if status.lower() == 'success': + logging.info("Site Design has been successfully validated") + else: + raise AirflowException("Site Design Validation Failed " + "with status: {}!".format(status)) + + +class ArmadaValidateDesignOperatorPlugin(AirflowPlugin): + + """Creates ArmadaValidateDesignOperator in Airflow.""" + + name = 'armada_validate_design_operator' + operators = [ArmadaValidateDesignOperator] diff --git a/shipyard_airflow/plugins/deployment_configuration_operator.py b/shipyard_airflow/plugins/deployment_configuration_operator.py index 5e15d924..8d27ca72 100644 --- a/shipyard_airflow/plugins/deployment_configuration_operator.py +++ b/shipyard_airflow/plugins/deployment_configuration_operator.py @@ -64,7 +64,11 @@ class DeploymentConfigurationOperator(BaseOperator): "kubernetes_provisioner.clear_labels_timeout": 1800, "kubernetes_provisioner.remove_etcd_timeout": 1800, 
"kubernetes_provisioner.etcd_ready_timeout": 600, - "armada.manifest": "full-site" + "armada.get_releases_timeout": 300, + "armada.get_status_timeout": 300, + "armada.manifest": "full-site", + "armada.post_apply_timeout": 1800, + "armada.validate_design_timeout": 600 } @apply_defaults diff --git a/shipyard_airflow/plugins/get_k8s_pod_port_ip.py b/shipyard_airflow/plugins/get_k8s_pod_port_ip.py index 2c6dd78f..d1c9e99a 100644 --- a/shipyard_airflow/plugins/get_k8s_pod_port_ip.py +++ b/shipyard_airflow/plugins/get_k8s_pod_port_ip.py @@ -22,26 +22,25 @@ from kubernetes import client, config def get_pod_port_ip(*pods, namespace): def get_k8s_pod_port_ip(func): @wraps(func) - def k8s_pod_port_ip_get(self, context, *args, **kwargs): + def k8s_pod_port_ip_get(self, pods_ip_port): """This function retrieves Kubernetes Pod Port and IP information. It can be used to retrieve information of single pod deployment and/or statefulsets. For instance, it can be used to retrieve the tiller pod IP and port information for usage in the Armada Operator. 
- :param context: Information on the current workflow + :param pods_ip_port: IP and port information of the pods Example:: from get_k8s_pod_port_ip import get_pod_port_ip @get_pod_port_ip('tiller', namespace='kube-system') - def get_pod_info(self, context, *args, **kwargs): - # Get IP and port information of Pods from context - k8s_pods_ip_port = context['pods_ip_port'] + def get_pod_info(self, pods_ip_port={}): + + tiller_ip = pods_ip_port['tiller']['ip'] + tiller_port = pods_ip_port['tiller']['port'] - tiller_ip = k8s_pods_ip_port['tiller'].get('ip') - tiller_port = k8s_pods_ip_port['tiller'].get('port') """ # Initialize variable k8s_pods = {} @@ -116,10 +115,7 @@ def get_pod_port_ip(*pods, namespace): if not pod_attr[pod_name]: raise AirflowException("Unable to locate", pod_name) - # Assign pods IP and ports information to context - context['pods_ip_port'] = k8s_pods - - return func(self, context, *args, **kwargs) + return func(self, pods_ip_port=k8s_pods) return k8s_pod_port_ip_get return get_k8s_pod_port_ip diff --git a/shipyard_airflow/plugins/xcom_puller.py b/shipyard_airflow/plugins/xcom_puller.py index 5b3ddef9..38a6ad45 100644 --- a/shipyard_airflow/plugins/xcom_puller.py +++ b/shipyard_airflow/plugins/xcom_puller.py @@ -82,3 +82,13 @@ class XcomPuller(object): return self._get_xcom(source_task=source_task, dag_id=source_dag, key=key) + + def get_attempted_failed_install_upgrade(self): + """Retrieve information on whether there was a failed attempt + of Armada Apply""" + source_task = 'armada_post_apply' + source_dag = 'armada_build' + key = 'attempted_failed_install_upgrade' + return self._get_xcom(source_task=source_task, + dag_id=source_dag, + key=key) diff --git a/shipyard_airflow/schemas/deploymentConfiguration.yaml b/shipyard_airflow/schemas/deploymentConfiguration.yaml index ce6ae1aa..91466f1a 100644 --- a/shipyard_airflow/schemas/deploymentConfiguration.yaml +++ b/shipyard_airflow/schemas/deploymentConfiguration.yaml @@ -66,8 +66,16 @@ data: 
armada: type: 'object' properties: + get_releases_timeout: + type: 'integer' + get_status_timeout: + type: 'integer' manifest: type: 'string' + post_apply_timeout: + type: 'integer' + validate_design_timeout: + type: 'integer' additionalProperties: false required: - manifest diff --git a/tests/unit/control/test.conf b/tests/unit/control/test.conf index b9986955..239d1cdd 100644 --- a/tests/unit/control/test.conf +++ b/tests/unit/control/test.conf @@ -10,18 +10,7 @@ airflow_api_read_timeout = 60 [deckhand] service_type = deckhand [drydock] -cluster_join_check_backoff_time = 120 -deploy_node_query_interval = 30 -deploy_node_task_timeout = 3600 -destroy_node_query_interval = 30 -destroy_node_task_timeout = 900 -prepare_node_query_interval = 30 -prepare_node_task_timeout = 1800 -prepare_site_query_interval = 10 -prepare_site_task_timeout = 300 service_type = physicalprovisioner -verify_site_query_interval = 10 -verify_site_task_timeout = 60 [keystone_authtoken] auth_section = keystone_authtoken auth_type = password diff --git a/tests/unit/yaml_samples/deploymentConfiguration_full_valid.yaml b/tests/unit/yaml_samples/deploymentConfiguration_full_valid.yaml index 7501b172..fbb1942a 100644 --- a/tests/unit/yaml_samples/deploymentConfiguration_full_valid.yaml +++ b/tests/unit/yaml_samples/deploymentConfiguration_full_valid.yaml @@ -28,4 +28,8 @@ data: remove_etcd_timeout: 1800 etcd_ready_timeout: 600 armada: + get_releases_timeout: 300 + get_status_timeout: 300 manifest: 'full-site' + post_apply_timeout: 1800 + validate_design_timeout: 600