From 47cd7a25f47cda9f6d8dfac7f109879320af82a1 Mon Sep 17 00:00:00 2001 From: Anthony Lin Date: Wed, 18 Apr 2018 07:15:26 +0000 Subject: [PATCH] Align Operators with UcpBaseOperator All UCP Operators will inherit from the UcpBaseOperator [0] This patch set will align the rest of the Operators, i.e. Armada, Deckhand and Promenade Operators with the UcpBaseOperator It also updates the name of the shipyard container to be 'shipyard-api' instead of 'shipyard' [0] https://review.gerrithub.io/#/c/407736/ Change-Id: I516590c492e9bb5554161119dade278d74197374 --- .../templates/deployment-shipyard.yaml | 4 +- .../plugins/armada_base_operator.py | 62 ++++++------------- .../plugins/armada_get_releases.py | 12 ++-- shipyard_airflow/plugins/armada_get_status.py | 12 ++-- shipyard_airflow/plugins/armada_post_apply.py | 20 +++--- .../plugins/armada_validate_design.py | 17 +++-- .../plugins/deckhand_base_operator.py | 60 +++++------------- .../plugins/deckhand_get_design.py | 19 ++++-- .../plugins/deckhand_retrieve_rendered_doc.py | 13 ++-- .../plugins/deckhand_validate_site.py | 15 +++-- .../plugins/promenade_base_operator.py | 51 ++++----------- .../plugins/promenade_check_etcd.py | 7 ++- .../plugins/promenade_clear_labels.py | 9 +-- .../plugins/promenade_decommission_node.py | 9 +-- .../plugins/promenade_drain_node.py | 9 +-- .../plugins/promenade_shutdown_kubelet.py | 9 +-- shipyard_airflow/plugins/ucp_base_operator.py | 9 ++- 17 files changed, 153 insertions(+), 184 deletions(-) diff --git a/charts/shipyard/templates/deployment-shipyard.yaml b/charts/shipyard/templates/deployment-shipyard.yaml index 5de45438..0eaa33a8 100644 --- a/charts/shipyard/templates/deployment-shipyard.yaml +++ b/charts/shipyard/templates/deployment-shipyard.yaml @@ -24,7 +24,7 @@ apiVersion: apps/v1beta1 kind: Deployment metadata: - name: shipyard + name: shipyard-api spec: replicas: {{ .Values.pod.replicas.shipyard.api }} {{ tuple $envAll | include "helm-toolkit.snippets.kubernetes_upgrades_deployment" | indent 2 }} @@ -44,7 +44,7 @@ spec: initContainers: {{ tuple $envAll $dependencies $mounts_shipyard_init | include "helm-toolkit.snippets.kubernetes_entrypoint_init_container" | indent 8 }} containers: - - name: shipyard + - name: shipyard-api env: - name: 'SHIPYARD_API_WORKERS' value: {{ .Values.conf.uwsgi.workers | quote }} diff --git a/shipyard_airflow/plugins/armada_base_operator.py b/shipyard_airflow/plugins/armada_base_operator.py index c76050c9..a66415d9 100644 --- a/shipyard_airflow/plugins/armada_base_operator.py +++ b/shipyard_airflow/plugins/armada_base_operator.py @@ -11,13 +11,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import os from urllib.parse import urlparse from airflow.exceptions import AirflowException -from airflow.models import BaseOperator from airflow.plugins_manager import AirflowPlugin from airflow.utils.decorators import apply_defaults @@ -26,11 +24,13 @@ import armada.common.session as session from get_k8s_pod_port_ip import get_pod_port_ip from service_endpoint import ucp_service_endpoint from service_token import shipyard_service_token -from xcom_puller import XcomPuller +from ucp_base_operator import UcpBaseOperator from xcom_pusher import XcomPusher +LOG = logging.getLogger(__name__) -class ArmadaBaseOperator(BaseOperator): + +class ArmadaBaseOperator(UcpBaseOperator): """Armada Base Operator @@ -44,66 +44,42 @@ class ArmadaBaseOperator(BaseOperator): def __init__(self, armada_svc_type='armada', deckhand_svc_type='deckhand', - main_dag_name=None, query={}, - shipyard_conf=None, - sub_dag_name=None, svc_session=None, svc_token=None, - xcom_push=True, *args, **kwargs): """Initialization of ArmadaBaseOperator object. :param armada_svc_type: Armada Service Type :param deckhand_svc_type: Deckhand Service Type - :param main_dag_name: Parent Dag :param query: A dictionary containing explicit query string parameters - :param shipyard_conf: Location of shipyard.conf - :param sub_dag_name: Child Dag :param svc_session: Keystone Session :param svc_token: Keystone Token - :param xcom_push: xcom usage The Armada operator assumes that prior steps have set xcoms for the action and the deployment configuration """ - super(ArmadaBaseOperator, self).__init__(*args, **kwargs) + super(ArmadaBaseOperator, + self).__init__( + pod_selector_pattern=[{'pod_pattern': 'armada-api', + 'container': 'armada-api'}], + *args, **kwargs) self.armada_svc_type = armada_svc_type self.deckhand_svc_type = deckhand_svc_type - self.main_dag_name = main_dag_name self.query = query - self.shipyard_conf = shipyard_conf - self.sub_dag_name = sub_dag_name self.svc_session = svc_session self.svc_token = svc_token - self.xcom_push_flag = xcom_push - - def execute(self, context): - - # Execute armada base function - self.armada_base(context) - - # Exeute child function - self.do_execute() @shipyard_service_token - def armada_base(self, context): - - # Define task_instance - self.task_instance = context['task_instance'] - - # Set up and retrieve values from xcom - self.xcom_puller = XcomPuller(self.main_dag_name, self.task_instance) - self.action_info = self.xcom_puller.get_action_info() - self.dc = self.xcom_puller.get_deployment_configuration() + def run_base(self, context): # Set up xcom_pusher to push values to xcom self.xcom_pusher = XcomPusher(self.task_instance) # Logs uuid of action performed by the Operator - logging.info("Armada Operator for action %s", self.action_info['id']) + LOG.info("Armada Operator for action %s", self.action_info['id']) # Retrieve Endpoint Information armada_svc_endpoint = ucp_service_endpoint( @@ -128,14 +104,14 @@ class ArmadaBaseOperator(BaseOperator): @staticmethod def _init_armada_client(armada_svc_endpoint, svc_token): - logging.info("Armada endpoint is %s", armada_svc_endpoint) + LOG.info("Armada endpoint is %s", armada_svc_endpoint) # Parse Armada Service Endpoint armada_url = urlparse(armada_svc_endpoint) # Build a ArmadaSession with credentials and target host # information. - logging.info("Build Armada Session") + LOG.info("Build Armada Session") a_session = session.ArmadaSession(host=armada_url.hostname, port=armada_url.port, scheme='http', @@ -144,18 +120,18 @@ class ArmadaBaseOperator(BaseOperator): # Raise Exception if we are not able to set up the session if a_session: - logging.info("Successfully Set Up Armada Session") + LOG.info("Successfully Set Up Armada Session") else: raise AirflowException("Failed to set up Armada Session!") # Use the ArmadaSession to build a ArmadaClient that can # be used to make one or more API calls - logging.info("Create Armada Client") + LOG.info("Create Armada Client") _armada_client = client.ArmadaClient(a_session) # Raise Exception if we are not able to build armada client if _armada_client: - logging.info("Successfully Set Up Armada client") + LOG.info("Successfully Set Up Armada client") return _armada_client else: @@ -165,7 +141,7 @@ class ArmadaBaseOperator(BaseOperator): def _init_deckhand_design_ref(deckhand_svc_endpoint, committed_revision_id): - logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint) + LOG.info("Deckhand endpoint is %s", deckhand_svc_endpoint) # Form DeckHand Design Reference Path # This URL will be used to retrieve the Site Design YAMLs @@ -176,8 +152,8 @@ class ArmadaBaseOperator(BaseOperator): "rendered-documents") if _deckhand_design_ref: - logging.info("Design YAMLs will be retrieved from %s", - _deckhand_design_ref) + LOG.info("Design YAMLs will be retrieved from %s", + _deckhand_design_ref) return _deckhand_design_ref else: diff --git a/shipyard_airflow/plugins/armada_get_releases.py b/shipyard_airflow/plugins/armada_get_releases.py index 616debfe..b805c0b2 100644 --- a/shipyard_airflow/plugins/armada_get_releases.py +++ b/shipyard_airflow/plugins/armada_get_releases.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from airflow.exceptions import AirflowException @@ -20,6 +19,8 @@ from airflow.plugins_manager import AirflowPlugin from armada_base_operator import ArmadaBaseOperator from armada.exceptions import api_exceptions as errors +LOG = logging.getLogger(__name__) + class ArmadaGetReleasesOperator(ArmadaBaseOperator): @@ -39,7 +40,7 @@ class ArmadaGetReleasesOperator(ArmadaBaseOperator): timeout = self.dc['armada.get_releases_timeout'] # Retrieve Armada Releases after deployment - logging.info("Retrieving Helm charts releases after deployment..") + LOG.info("Retrieving Helm charts releases after deployment..") try: armada_get_releases = self.armada_client.get_releases( @@ -50,9 +51,12 @@ class ArmadaGetReleasesOperator(ArmadaBaseOperator): raise AirflowException(client_error) if armada_get_releases: - logging.info("Successfully retrieved Helm charts releases") - logging.info(armada_get_releases) + LOG.info("Successfully retrieved Helm charts releases") + LOG.info(armada_get_releases) else: + # Dump logs from Armada API pods + self.get_k8s_logs() + raise AirflowException("Failed to retrieve Helm charts releases!") diff --git a/shipyard_airflow/plugins/armada_get_status.py b/shipyard_airflow/plugins/armada_get_status.py index 9690a46d..1acba694 100644 --- a/shipyard_airflow/plugins/armada_get_status.py +++ b/shipyard_airflow/plugins/armada_get_status.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from airflow.exceptions import AirflowException @@ -20,6 +19,8 @@ from airflow.plugins_manager import AirflowPlugin from armada_base_operator import ArmadaBaseOperator from armada.exceptions import api_exceptions as errors +LOG = logging.getLogger(__name__) + class ArmadaGetStatusOperator(ArmadaBaseOperator): @@ -51,11 +52,14 @@ class ArmadaGetStatusOperator(ArmadaBaseOperator): # Tiller State will return boolean value, i.e. True/False # Raise Exception if Tiller is unhealthy if armada_get_status['tiller']['state']: - logging.info("Tiller is in running state") - logging.info("Tiller version is %s", - armada_get_status['tiller']['version']) + LOG.info("Tiller is in running state") + LOG.info("Tiller version is %s", + armada_get_status['tiller']['version']) else: + # Dump logs from Armada API pods + self.get_k8s_logs() + raise AirflowException("Please check Tiller!") diff --git a/shipyard_airflow/plugins/armada_post_apply.py b/shipyard_airflow/plugins/armada_post_apply.py index ff6a1a43..ad931561 100644 --- a/shipyard_airflow/plugins/armada_post_apply.py +++ b/shipyard_airflow/plugins/armada_post_apply.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from airflow.exceptions import AirflowException @@ -20,6 +19,8 @@ from airflow.plugins_manager import AirflowPlugin from armada_base_operator import ArmadaBaseOperator from armada.exceptions import api_exceptions as errors +LOG = logging.getLogger(__name__) + class ArmadaPostApplyOperator(ArmadaBaseOperator): @@ -52,7 +53,7 @@ class ArmadaPostApplyOperator(ArmadaBaseOperator): timeout = self.dc['armada.post_apply_timeout'] # Execute Armada Apply to install the helm charts in sequence - logging.info("Armada Apply") + LOG.info("Armada Apply") try: armada_post_apply = self.armada_client.post_apply( @@ -64,6 +65,9 @@ class ArmadaPostApplyOperator(ArmadaBaseOperator): timeout=timeout) except errors.ClientError as client_error: + # Dump logs from Armada API pods + self.get_k8s_logs() + raise AirflowException(client_error) # if this is a retry, assume that the airflow worker needs to be @@ -74,7 +78,7 @@ class ArmadaPostApplyOperator(ArmadaBaseOperator): # needs to. Problem with xcom is that it is cleared for the task # on retry, which means we can't use it as a flag reliably. if self.task_instance.try_number > 1: - logging.info( + LOG.info( "Airflow Worker will be upgraded because retry may obfuscate " "an upgrade of shipyard/airflow." ) @@ -87,7 +91,7 @@ class ArmadaPostApplyOperator(ArmadaBaseOperator): # part of the name of the Shipyard Helm Chart. for i in armada_post_apply['message']['upgrade']: if 'shipyard' in i: - logging.info( + LOG.info( "Shipyard was upgraded. Airflow worker must be " "restarted to reflect any workflow changes." ) @@ -110,11 +114,11 @@ class ArmadaPostApplyOperator(ArmadaBaseOperator): # changed. if (armada_post_apply['message']['install'] or armada_post_apply['message']['upgrade']): - logging.info("Successfully Executed Armada Apply") - logging.info(armada_post_apply) + LOG.info("Successfully Executed Armada Apply") + LOG.info(armada_post_apply) else: - logging.warning("No new changes/updates were detected!") - logging.info(armada_post_apply) + LOG.warning("No new changes/updates were detected!") + LOG.info(armada_post_apply) class ArmadaPostApplyOperatorPlugin(AirflowPlugin): diff --git a/shipyard_airflow/plugins/armada_validate_design.py b/shipyard_airflow/plugins/armada_validate_design.py index ac17d97b..c7672f46 100644 --- a/shipyard_airflow/plugins/armada_validate_design.py +++ b/shipyard_airflow/plugins/armada_validate_design.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from airflow.plugins_manager import AirflowPlugin @@ -20,6 +19,8 @@ from airflow.exceptions import AirflowException from armada_base_operator import ArmadaBaseOperator from armada.exceptions import api_exceptions as errors +LOG = logging.getLogger(__name__) + class ArmadaValidateDesignOperator(ArmadaBaseOperator): @@ -33,7 +34,7 @@ class ArmadaValidateDesignOperator(ArmadaBaseOperator): def do_execute(self): # Requests Armada to validate site design - logging.info("Waiting for Armada to validate site design...") + LOG.info("Waiting for Armada to validate site design...") # Retrieve read timeout timeout = self.dc['armada.validate_design_timeout'] @@ -45,18 +46,24 @@ class ArmadaValidateDesignOperator(ArmadaBaseOperator): timeout=timeout) except errors.ClientError as client_error: + # Dump logs from Armada API pods + self.get_k8s_logs() + raise AirflowException(client_error) # Print results - logging.info("Retrieving Armada validate site design response...") - logging.info(post_validate) + LOG.info("Retrieving Armada validate site design response...") + LOG.info(post_validate) # Check if site design is valid status = str(post_validate.get('status', 'unspecified')) if status.lower() == 'success': - logging.info("Site Design has been successfully validated") + LOG.info("Site Design has been successfully validated") else: + # Dump logs from Armada API pods + self.get_k8s_logs() + raise AirflowException("Site Design Validation Failed " "with status: {}!".format(status)) diff --git a/shipyard_airflow/plugins/deckhand_base_operator.py b/shipyard_airflow/plugins/deckhand_base_operator.py index 18899508..491c9394 100644 --- a/shipyard_airflow/plugins/deckhand_base_operator.py +++ b/shipyard_airflow/plugins/deckhand_base_operator.py @@ -11,11 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import configparser import logging -from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.plugins_manager import AirflowPlugin from airflow.exceptions import AirflowException @@ -23,10 +21,12 @@ from airflow.exceptions import AirflowException from deckhand.client import client as deckhand_client from service_endpoint import ucp_service_endpoint from service_token import shipyard_service_token -from xcom_puller import XcomPuller +from ucp_base_operator import UcpBaseOperator + +LOG = logging.getLogger(__name__) -class DeckhandBaseOperator(BaseOperator): +class DeckhandBaseOperator(UcpBaseOperator): """Deckhand Base Operator @@ -43,14 +43,10 @@ class DeckhandBaseOperator(BaseOperator): deckhand_client_read_timeout=None, deckhand_svc_endpoint=None, deckhand_svc_type='deckhand', - main_dag_name=None, revision_id=None, - shipyard_conf=None, - sub_dag_name=None, svc_session=None, svc_token=None, validation_read_timeout=None, - xcom_push=True, *args, **kwargs): """Initialization of DeckhandBaseOperator object. @@ -59,47 +55,30 @@ class DeckhandBaseOperator(BaseOperator): :param deckhand_client_read_timeout: Deckhand client connect timeout :param deckhand_svc_endpoint: Deckhand Service Endpoint :param deckhand_svc_type: Deckhand Service Type - :param main_dag_name: Parent Dag :param revision_id: Target revision for workflow - :param shipyard_conf: Path of shipyard.conf - :param sub_dag_name: Child Dag :param svc_session: Keystone Session :param svc_token: Keystone Token :param validation_read_timeout: Deckhand validation timeout - :param xcom_push: xcom usage """ - super(DeckhandBaseOperator, self).__init__(*args, **kwargs) + super(DeckhandBaseOperator, + self).__init__( + pod_selector_pattern=[{'pod_pattern': 'deckhand-api', + 'container': 'deckhand-api'}], + *args, **kwargs) self.committed_ver = committed_ver self.deckhandclient = deckhandclient self.deckhand_client_read_timeout = deckhand_client_read_timeout self.deckhand_svc_endpoint = deckhand_svc_endpoint self.deckhand_svc_type = deckhand_svc_type - self.main_dag_name = main_dag_name self.revision_id = revision_id - self.shipyard_conf = shipyard_conf - self.sub_dag_name = sub_dag_name self.svc_session = svc_session self.svc_token = svc_token self.validation_read_timeout = validation_read_timeout - self.xcom_push_flag = xcom_push - - def execute(self, context): - - # Execute deckhand base function - self.deckhand_base(context) - - # Exeute child function - self.do_execute() - - # Push last committed version to xcom for the - # 'get_design_version' subdag - if self.sub_dag_name == 'get_design_version': - return self.committed_ver @shipyard_service_token - def deckhand_base(self, context): + def run_base(self, context): # Read and parse shiyard.conf config = configparser.ConfigParser() @@ -112,26 +91,19 @@ class DeckhandBaseOperator(BaseOperator): self.validation_read_timeout = int(config.get( 'requests_config', 'validation_read_timeout')) - # Define task_instance - task_instance = context['task_instance'] - - # Set up and retrieve values from xcom - self.xcom_puller = XcomPuller(self.main_dag_name, task_instance) - self.action_info = self.xcom_puller.get_action_info() - # Logs uuid of Shipyard action - logging.info("Executing Shipyard Action %s", - self.action_info['id']) + LOG.info("Executing Shipyard Action %s", + self.action_info['id']) # Retrieve Endpoint Information self.deckhand_svc_endpoint = ucp_service_endpoint( self, svc_type=self.deckhand_svc_type) - logging.info("Deckhand endpoint is %s", - self.deckhand_svc_endpoint) + LOG.info("Deckhand endpoint is %s", + self.deckhand_svc_endpoint) # Set up DeckHand Client - logging.info("Setting up DeckHand Client...") + LOG.info("Setting up DeckHand Client...") # NOTE: The communication between the Airflow workers # and Deckhand happens via the 'internal' endpoint. @@ -155,7 +127,7 @@ class DeckhandBaseOperator(BaseOperator): self.revision_id = self.xcom_puller.get_design_version() if self.revision_id: - logging.info("Revision ID is %d", self.revision_id) + LOG.info("Revision ID is %d", self.revision_id) else: raise AirflowException('Failed to retrieve Revision ID!') diff --git a/shipyard_airflow/plugins/deckhand_get_design.py b/shipyard_airflow/plugins/deckhand_get_design.py index 329af20e..b7800cc8 100644 --- a/shipyard_airflow/plugins/deckhand_get_design.py +++ b/shipyard_airflow/plugins/deckhand_get_design.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import os import requests @@ -22,6 +21,8 @@ from airflow.exceptions import AirflowException from deckhand_base_operator import DeckhandBaseOperator +LOG = logging.getLogger(__name__) + class DeckhandGetDesignOperator(DeckhandBaseOperator): @@ -42,7 +43,7 @@ class DeckhandGetDesignOperator(DeckhandBaseOperator): 'revisions') # Retrieve Revision - logging.info("Retrieving revisions information...") + LOG.info("Retrieving revisions information...") try: query_params = {'tag': 'committed', 'sort': 'id', 'order': 'desc'} @@ -53,12 +54,15 @@ class DeckhandGetDesignOperator(DeckhandBaseOperator): timeout=self.deckhand_client_read_timeout).text) except requests.exceptions.RequestException as e: + # Dump logs from Deckhand pods + self.get_k8s_logs() + raise AirflowException(e) # Print info about revisions from DeckHand - logging.info("Revisions response: %s", revisions) - logging.info("The number of committed revisions is %s", - revisions['count']) + LOG.info("Revisions response: %s", revisions) + LOG.info("The number of committed revisions is %s", + revisions['count']) # Search for the latest committed version and save it as xcom. # Since the order : desc paramater above, this is index 0 if there @@ -66,10 +70,13 @@ class DeckhandGetDesignOperator(DeckhandBaseOperator): revision_list = revisions.get('results', []) if revision_list: self.committed_ver = revision_list[0].get('id') - logging.info("Latest committed revision is %d", self.committed_ver) + LOG.info("Latest committed revision is %d", self.committed_ver) # Error if we cannot resolve the committed version to use. if not self.committed_ver: + # Dump logs from Deckhand pods + self.get_k8s_logs() + raise AirflowException("No committed revision found in Deckhand!") diff --git a/shipyard_airflow/plugins/deckhand_retrieve_rendered_doc.py b/shipyard_airflow/plugins/deckhand_retrieve_rendered_doc.py index 0afd9f59..cffe3a37 100644 --- a/shipyard_airflow/plugins/deckhand_retrieve_rendered_doc.py +++ b/shipyard_airflow/plugins/deckhand_retrieve_rendered_doc.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from airflow.plugins_manager import AirflowPlugin @@ -19,6 +18,8 @@ from airflow.exceptions import AirflowException from deckhand_base_operator import DeckhandBaseOperator +LOG = logging.getLogger(__name__) + class DeckhandRetrieveRenderedDocOperator(DeckhandBaseOperator): @@ -30,17 +31,19 @@ class DeckhandRetrieveRenderedDocOperator(DeckhandBaseOperator): def do_execute(self): - logging.info("Retrieving Rendered Document...") + LOG.info("Retrieving Rendered Document...") # Retrieve Rendered Document try: - rendered_doc = self.deckhandclient.revisions.documents( + self.deckhandclient.revisions.documents( self.revision_id, rendered=True) - logging.info("Successfully Retrieved Rendered Document") - logging.info(rendered_doc) + LOG.info("Successfully Retrieved Rendered Document") except: + # Dump logs from Deckhand pods + self.get_k8s_logs() + raise AirflowException("Failed to Retrieve Rendered Document!") diff --git a/shipyard_airflow/plugins/deckhand_validate_site.py b/shipyard_airflow/plugins/deckhand_validate_site.py index 8b3a43a4..cc77c731 100644 --- a/shipyard_airflow/plugins/deckhand_validate_site.py +++ b/shipyard_airflow/plugins/deckhand_validate_site.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import os import requests @@ -22,6 +21,8 @@ from airflow.exceptions import AirflowException from deckhand_base_operator import DeckhandBaseOperator +LOG = logging.getLogger(__name__) + class DeckhandValidateSiteDesignOperator(DeckhandBaseOperator): @@ -43,7 +44,7 @@ class DeckhandValidateSiteDesignOperator(DeckhandBaseOperator): str(self.revision_id), 'validations') # Retrieve Validation list - logging.info("Retrieving validation list...") + LOG.info("Retrieving validation list...") try: retrieved_list = yaml.safe_load( @@ -52,14 +53,20 @@ class DeckhandValidateSiteDesignOperator(DeckhandBaseOperator): timeout=self.validation_read_timeout).text) except requests.exceptions.RequestException as e: + # Dump logs from Deckhand pods + self.get_k8s_logs() + raise AirflowException(e) if (any([str(v.get('status', 'unspecified')).lower() == 'failure' for v in retrieved_list.get('results', [])])): + # Dump logs from Deckhand pods + self.get_k8s_logs() + raise AirflowException("DeckHand Site Design Validation Failed!") else: - logging.info("Revision %d has been successfully validated", - self.revision_id) + LOG.info("Revision %d has been successfully validated", + self.revision_id) class DeckhandValidateSiteDesignOperatorPlugin(AirflowPlugin): diff --git a/shipyard_airflow/plugins/promenade_base_operator.py b/shipyard_airflow/plugins/promenade_base_operator.py index 1389f9e1..e4987acd 100644 --- a/shipyard_airflow/plugins/promenade_base_operator.py +++ b/shipyard_airflow/plugins/promenade_base_operator.py @@ -11,20 +11,20 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging -from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.plugins_manager import AirflowPlugin from airflow.exceptions import AirflowException from service_endpoint import ucp_service_endpoint from service_token import shipyard_service_token -from xcom_puller import XcomPuller +from ucp_base_operator import UcpBaseOperator + +LOG = logging.getLogger(__name__) -class PromenadeBaseOperator(BaseOperator): +class PromenadeBaseOperator(UcpBaseOperator): """Promenade Base Operator @@ -35,59 +35,36 @@ class PromenadeBaseOperator(BaseOperator): @apply_defaults def __init__(self, - main_dag_name=None, promenade_svc_endpoint=None, promenade_svc_type='kubernetesprovisioner', redeploy_server=None, - shipyard_conf=None, - sub_dag_name=None, svc_token=None, - xcom_push=True, *args, **kwargs): """Initialization of PromenadeBaseOperator object. - :param main_dag_name: Parent Dag :param promenade_svc_endpoint: Promenade Service Endpoint :param promenade_svc_type: Promenade Service Type :param redeploy_server: Server to be redeployed - :param shipyard_conf: Path of shipyard.conf - :param sub_dag_name: Child Dag :param svc_token: Keystone Token - :param xcom_push: xcom usage The Drydock operator assumes that prior steps have set xcoms for the action and the deployment configuration """ - super(PromenadeBaseOperator, self).__init__(*args, - **kwargs) - self.main_dag_name = main_dag_name + super(PromenadeBaseOperator, + self).__init__( + pod_selector_pattern=[{'pod_pattern': 'promenade-api', + 'container': 'promenade-api'}], + *args, **kwargs) self.promenade_svc_endpoint = promenade_svc_endpoint self.promenade_svc_type = promenade_svc_type self.redeploy_server = redeploy_server - self.shipyard_conf = shipyard_conf - self.sub_dag_name = sub_dag_name self.svc_token = svc_token - self.xcom_push_flag = xcom_push - - def execute(self, context): - # Execute promenade base function - self.promenade_base(context) - - # Exeute child function - self.do_execute() @shipyard_service_token - def promenade_base(self, context): - # Define task_instance - task_instance = context['task_instance'] - - # Set up and retrieve values from xcom - self.xcom_puller = XcomPuller(self.main_dag_name, task_instance) - self.action_info = self.xcom_puller.get_action_info() - self.dc = self.xcom_puller.get_deployment_configuration() + def run_base(self, context): # Logs uuid of Shipyard action - logging.info("Executing Shipyard Action %s", self.action_info['id']) + LOG.info("Executing Shipyard Action %s", self.action_info['id']) # Retrieve information of the server that we want to redeploy # if user executes the 'redeploy_server' dag @@ -96,8 +73,7 @@ class PromenadeBaseOperator(BaseOperator): 'server-name') if self.redeploy_server: - logging.info("Server to be redeployed is %s", - self.redeploy_server) + LOG.info("Server to be redeployed is %s", self.redeploy_server) else: raise AirflowException('%s was unable to retrieve the ' 'server to be redeployed.' @@ -107,8 +83,7 @@ class PromenadeBaseOperator(BaseOperator): self.promenade_svc_endpoint = ucp_service_endpoint( self, svc_type=self.promenade_svc_type) - logging.info("Promenade endpoint is %s", - self.promenade_svc_endpoint) + LOG.info("Promenade endpoint is %s", self.promenade_svc_endpoint) class PromenadeBaseOperatorPlugin(AirflowPlugin): diff --git a/shipyard_airflow/plugins/promenade_check_etcd.py b/shipyard_airflow/plugins/promenade_check_etcd.py index f0a406ac..944e7f2b 100644 --- a/shipyard_airflow/plugins/promenade_check_etcd.py +++ b/shipyard_airflow/plugins/promenade_check_etcd.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import time @@ -20,6 +19,8 @@ from airflow.exceptions import AirflowException from promenade_base_operator import PromenadeBaseOperator +LOG = logging.getLogger(__name__) + class PromenadeCheckEtcdOperator(PromenadeBaseOperator): @@ -37,13 +38,13 @@ class PromenadeCheckEtcdOperator(PromenadeBaseOperator): # TODO(bryan-strassner) use: # self.dc['kubernetes_provisioner.etcd_ready_timeout'] # self.dc['kubernetes_provisioner.remove_etcd_timeout'] - logging.info("Performing health check on etcd...") + LOG.info("Performing health check on etcd...") time.sleep(5) check_etcd = True if check_etcd: - logging.info("The etcd cluster is healthy and ready") + LOG.info("The etcd cluster is healthy and ready") else: raise AirflowException('Please check the state of etcd!') diff --git a/shipyard_airflow/plugins/promenade_clear_labels.py b/shipyard_airflow/plugins/promenade_clear_labels.py index a78a9f8d..1aee2669 100644 --- a/shipyard_airflow/plugins/promenade_clear_labels.py +++ b/shipyard_airflow/plugins/promenade_clear_labels.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import time @@ -20,6 +19,8 @@ from airflow.exceptions import AirflowException from promenade_base_operator import PromenadeBaseOperator +LOG = logging.getLogger(__name__) + class PromenadeClearLabelsOperator(PromenadeBaseOperator): @@ -37,14 +38,14 @@ class PromenadeClearLabelsOperator(PromenadeBaseOperator): # TODO(bryan-strassner) use: # self.dc['kubernetes_provisioner.clear_labels_timeout'] - logging.info("Removing labels on node...") + LOG.info("Removing labels on node...") time.sleep(5) labels_removed = True if labels_removed: - logging.info("Successfully removed labels on %s", - self.redeploy_server) + LOG.info("Successfully removed labels on %s", + self.redeploy_server) else: raise AirflowException('Failed to remove labels on %s!', self.redeploy_server) diff --git a/shipyard_airflow/plugins/promenade_decommission_node.py b/shipyard_airflow/plugins/promenade_decommission_node.py index 74664b31..8dc83172 100644 --- a/shipyard_airflow/plugins/promenade_decommission_node.py +++ b/shipyard_airflow/plugins/promenade_decommission_node.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import time @@ -20,6 +19,8 @@ from airflow.exceptions import AirflowException from promenade_base_operator import PromenadeBaseOperator +LOG = logging.getLogger(__name__) + class PromenadeDecommissionNodeOperator(PromenadeBaseOperator): @@ -33,14 +34,14 @@ class PromenadeDecommissionNodeOperator(PromenadeBaseOperator): def do_execute(self): # Placeholder function. Updates will be made when the Promenade # API is ready for consumption. - logging.info("Decommissioning node from Kubernetes cluster...") + LOG.info("Decommissioning node from Kubernetes cluster...") time.sleep(5) decommission_node = True if decommission_node: - logging.info("Succesfully decommissioned node %s", - self.redeploy_server) + LOG.info("Succesfully decommissioned node %s", + self.redeploy_server) else: raise AirflowException('Failed to decommission node %s!', self.redeploy_server) diff --git a/shipyard_airflow/plugins/promenade_drain_node.py b/shipyard_airflow/plugins/promenade_drain_node.py index 1822bdf7..47ced5ee 100644 --- a/shipyard_airflow/plugins/promenade_drain_node.py +++ b/shipyard_airflow/plugins/promenade_drain_node.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import time @@ -20,6 +19,8 @@ from airflow.exceptions import AirflowException from promenade_base_operator import PromenadeBaseOperator +LOG = logging.getLogger(__name__) + class PromenadeDrainNodeOperator(PromenadeBaseOperator): @@ -40,14 +41,14 @@ class PromenadeDrainNodeOperator(PromenadeBaseOperator): # self.dc['kubernetes_provisioner.drain_timeout'] # self.dc['kubernetes_provisioner.drain_grace_period'] - logging.info("Draining node...") + LOG.info("Draining node...") time.sleep(5) node_drained = True if node_drained: - logging.info("Node %s has been successfully drained", - self.redeploy_server) + LOG.info("Node %s has been successfully drained", + self.redeploy_server) else: raise AirflowException('Failed to drain %s!', self.redeploy_server) diff --git a/shipyard_airflow/plugins/promenade_shutdown_kubelet.py b/shipyard_airflow/plugins/promenade_shutdown_kubelet.py index 774c731c..0a880f59 100644 --- a/shipyard_airflow/plugins/promenade_shutdown_kubelet.py +++ b/shipyard_airflow/plugins/promenade_shutdown_kubelet.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import time @@ -20,6 +19,8 @@ from airflow.exceptions import AirflowException from promenade_base_operator import PromenadeBaseOperator +LOG = logging.getLogger(__name__) + class PromenadeShutdownKubeletOperator(PromenadeBaseOperator): @@ -33,14 +34,14 @@ class PromenadeShutdownKubeletOperator(PromenadeBaseOperator): def do_execute(self): # Placeholder function. Updates will be made when the Promenade # API is ready for consumption. - logging.info("Shutting down kubelet on node...") + LOG.info("Shutting down kubelet on node...") time.sleep(5) shutdown_kubelet = True if shutdown_kubelet: - logging.info("Successfully shut down kubelet on %s", - self.redeploy_server) + LOG.info("Successfully shut down kubelet on %s", + self.redeploy_server) else: raise AirflowException('Failed to shut down kubelet on %s!', self.redeploy_server) diff --git a/shipyard_airflow/plugins/ucp_base_operator.py b/shipyard_airflow/plugins/ucp_base_operator.py index 2a1a35a4..368f60db 100644 --- a/shipyard_airflow/plugins/ucp_base_operator.py +++ b/shipyard_airflow/plugins/ucp_base_operator.py @@ -86,6 +86,11 @@ class UcpBaseOperator(BaseOperator): # Exeute child function self.do_execute() + # Push last committed version to xcom for the + # 'get_design_version' subdag + if self.sub_dag_name == 'get_design_version': + return self.committed_ver + def ucp_base(self, context): LOG.info("Running UCP Base Operator...") @@ -98,10 +103,10 @@ class UcpBaseOperator(BaseOperator): self.ucp_namespace = config.get('k8s_logs', 'ucp_namespace') # Define task_instance - task_instance = context['task_instance'] + self.task_instance = context['task_instance'] # Set up and retrieve values from xcom - self.xcom_puller = XcomPuller(self.main_dag_name, task_instance) + self.xcom_puller = XcomPuller(self.main_dag_name, self.task_instance) self.action_info = self.xcom_puller.get_action_info() self.dc = self.xcom_puller.get_deployment_configuration()