diff --git a/charts/shipyard/templates/deployment-shipyard.yaml b/charts/shipyard/templates/deployment-shipyard.yaml index 5de45438..0eaa33a8 100644 --- a/charts/shipyard/templates/deployment-shipyard.yaml +++ b/charts/shipyard/templates/deployment-shipyard.yaml @@ -24,7 +24,7 @@ apiVersion: apps/v1beta1 kind: Deployment metadata: - name: shipyard + name: shipyard-api spec: replicas: {{ .Values.pod.replicas.shipyard.api }} {{ tuple $envAll | include "helm-toolkit.snippets.kubernetes_upgrades_deployment" | indent 2 }} @@ -44,7 +44,7 @@ spec: initContainers: {{ tuple $envAll $dependencies $mounts_shipyard_init | include "helm-toolkit.snippets.kubernetes_entrypoint_init_container" | indent 8 }} containers: - - name: shipyard + - name: shipyard-api env: - name: 'SHIPYARD_API_WORKERS' value: {{ .Values.conf.uwsgi.workers | quote }} diff --git a/shipyard_airflow/plugins/armada_base_operator.py b/shipyard_airflow/plugins/armada_base_operator.py index c76050c9..a66415d9 100644 --- a/shipyard_airflow/plugins/armada_base_operator.py +++ b/shipyard_airflow/plugins/armada_base_operator.py @@ -11,13 +11,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import os from urllib.parse import urlparse from airflow.exceptions import AirflowException -from airflow.models import BaseOperator from airflow.plugins_manager import AirflowPlugin from airflow.utils.decorators import apply_defaults @@ -26,11 +24,13 @@ import armada.common.session as session from get_k8s_pod_port_ip import get_pod_port_ip from service_endpoint import ucp_service_endpoint from service_token import shipyard_service_token -from xcom_puller import XcomPuller +from ucp_base_operator import UcpBaseOperator from xcom_pusher import XcomPusher +LOG = logging.getLogger(__name__) -class ArmadaBaseOperator(BaseOperator): + +class ArmadaBaseOperator(UcpBaseOperator): """Armada Base Operator @@ -44,66 +44,42 @@ class ArmadaBaseOperator(BaseOperator): def __init__(self, armada_svc_type='armada', deckhand_svc_type='deckhand', - main_dag_name=None, query={}, - shipyard_conf=None, - sub_dag_name=None, svc_session=None, svc_token=None, - xcom_push=True, *args, **kwargs): """Initialization of ArmadaBaseOperator object. :param armada_svc_type: Armada Service Type :param deckhand_svc_type: Deckhand Service Type - :param main_dag_name: Parent Dag :param query: A dictionary containing explicit query string parameters - :param shipyard_conf: Location of shipyard.conf - :param sub_dag_name: Child Dag :param svc_session: Keystone Session :param svc_token: Keystone Token - :param xcom_push: xcom usage The Armada operator assumes that prior steps have set xcoms for the action and the deployment configuration """ - super(ArmadaBaseOperator, self).__init__(*args, **kwargs) + super(ArmadaBaseOperator, + self).__init__( + pod_selector_pattern=[{'pod_pattern': 'armada-api', + 'container': 'armada-api'}], + *args, **kwargs) self.armada_svc_type = armada_svc_type self.deckhand_svc_type = deckhand_svc_type - self.main_dag_name = main_dag_name self.query = query - self.shipyard_conf = shipyard_conf - self.sub_dag_name = sub_dag_name self.svc_session = svc_session self.svc_token = svc_token - self.xcom_push_flag = xcom_push - - def execute(self, context): - - # Execute armada base function - self.armada_base(context) - - # Exeute child function - self.do_execute() @shipyard_service_token - def armada_base(self, context): - - # Define task_instance - self.task_instance = context['task_instance'] - - # Set up and retrieve values from xcom - self.xcom_puller = XcomPuller(self.main_dag_name, self.task_instance) - self.action_info = self.xcom_puller.get_action_info() - self.dc = self.xcom_puller.get_deployment_configuration() + def run_base(self, context): # Set up xcom_pusher to push values to xcom self.xcom_pusher = XcomPusher(self.task_instance) # Logs uuid of action performed by the Operator - logging.info("Armada Operator for action %s", self.action_info['id']) + LOG.info("Armada Operator for action %s", self.action_info['id']) # Retrieve Endpoint Information armada_svc_endpoint = ucp_service_endpoint( @@ -128,14 +104,14 @@ class ArmadaBaseOperator(BaseOperator): @staticmethod def _init_armada_client(armada_svc_endpoint, svc_token): - logging.info("Armada endpoint is %s", armada_svc_endpoint) + LOG.info("Armada endpoint is %s", armada_svc_endpoint) # Parse Armada Service Endpoint armada_url = urlparse(armada_svc_endpoint) # Build a ArmadaSession with credentials and target host # information. - logging.info("Build Armada Session") + LOG.info("Build Armada Session") a_session = session.ArmadaSession(host=armada_url.hostname, port=armada_url.port, scheme='http', @@ -144,18 +120,18 @@ class ArmadaBaseOperator(BaseOperator): # Raise Exception if we are not able to set up the session if a_session: - logging.info("Successfully Set Up Armada Session") + LOG.info("Successfully Set Up Armada Session") else: raise AirflowException("Failed to set up Armada Session!") # Use the ArmadaSession to build a ArmadaClient that can # be used to make one or more API calls - logging.info("Create Armada Client") + LOG.info("Create Armada Client") _armada_client = client.ArmadaClient(a_session) # Raise Exception if we are not able to build armada client if _armada_client: - logging.info("Successfully Set Up Armada client") + LOG.info("Successfully Set Up Armada client") return _armada_client else: @@ -165,7 +141,7 @@ class ArmadaBaseOperator(BaseOperator): def _init_deckhand_design_ref(deckhand_svc_endpoint, committed_revision_id): - logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint) + LOG.info("Deckhand endpoint is %s", deckhand_svc_endpoint) # Form DeckHand Design Reference Path # This URL will be used to retrieve the Site Design YAMLs @@ -176,8 +152,8 @@ class ArmadaBaseOperator(BaseOperator): "rendered-documents") if _deckhand_design_ref: - logging.info("Design YAMLs will be retrieved from %s", - _deckhand_design_ref) + LOG.info("Design YAMLs will be retrieved from %s", + _deckhand_design_ref) return _deckhand_design_ref else: diff --git a/shipyard_airflow/plugins/armada_get_releases.py b/shipyard_airflow/plugins/armada_get_releases.py index 616debfe..b805c0b2 100644 --- a/shipyard_airflow/plugins/armada_get_releases.py +++ b/shipyard_airflow/plugins/armada_get_releases.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from airflow.exceptions import AirflowException @@ -20,6 +19,8 @@ from airflow.plugins_manager import AirflowPlugin from armada_base_operator import ArmadaBaseOperator from armada.exceptions import api_exceptions as errors +LOG = logging.getLogger(__name__) + class ArmadaGetReleasesOperator(ArmadaBaseOperator): @@ -39,7 +40,7 @@ class ArmadaGetReleasesOperator(ArmadaBaseOperator): timeout = self.dc['armada.get_releases_timeout'] # Retrieve Armada Releases after deployment - logging.info("Retrieving Helm charts releases after deployment..") + LOG.info("Retrieving Helm charts releases after deployment..") try: armada_get_releases = self.armada_client.get_releases( @@ -50,9 +51,12 @@ class ArmadaGetReleasesOperator(ArmadaBaseOperator): raise AirflowException(client_error) if armada_get_releases: - logging.info("Successfully retrieved Helm charts releases") - logging.info(armada_get_releases) + LOG.info("Successfully retrieved Helm charts releases") + LOG.info(armada_get_releases) else: + # Dump logs from Armada API pods + self.get_k8s_logs() + raise AirflowException("Failed to retrieve Helm charts releases!") diff --git a/shipyard_airflow/plugins/armada_get_status.py b/shipyard_airflow/plugins/armada_get_status.py index 9690a46d..1acba694 100644 --- a/shipyard_airflow/plugins/armada_get_status.py +++ b/shipyard_airflow/plugins/armada_get_status.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from airflow.exceptions import AirflowException @@ -20,6 +19,8 @@ from airflow.plugins_manager import AirflowPlugin from armada_base_operator import ArmadaBaseOperator from armada.exceptions import api_exceptions as errors +LOG = logging.getLogger(__name__) + class ArmadaGetStatusOperator(ArmadaBaseOperator): @@ -51,11 +52,14 @@ class ArmadaGetStatusOperator(ArmadaBaseOperator): # Tiller State will return boolean value, i.e. True/False # Raise Exception if Tiller is unhealthy if armada_get_status['tiller']['state']: - logging.info("Tiller is in running state") - logging.info("Tiller version is %s", - armada_get_status['tiller']['version']) + LOG.info("Tiller is in running state") + LOG.info("Tiller version is %s", + armada_get_status['tiller']['version']) else: + # Dump logs from Armada API pods + self.get_k8s_logs() + raise AirflowException("Please check Tiller!") diff --git a/shipyard_airflow/plugins/armada_post_apply.py b/shipyard_airflow/plugins/armada_post_apply.py index ff6a1a43..ad931561 100644 --- a/shipyard_airflow/plugins/armada_post_apply.py +++ b/shipyard_airflow/plugins/armada_post_apply.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from airflow.exceptions import AirflowException @@ -20,6 +19,8 @@ from airflow.plugins_manager import AirflowPlugin from armada_base_operator import ArmadaBaseOperator from armada.exceptions import api_exceptions as errors +LOG = logging.getLogger(__name__) + class ArmadaPostApplyOperator(ArmadaBaseOperator): @@ -52,7 +53,7 @@ class ArmadaPostApplyOperator(ArmadaBaseOperator): timeout = self.dc['armada.post_apply_timeout'] # Execute Armada Apply to install the helm charts in sequence - logging.info("Armada Apply") + LOG.info("Armada Apply") try: armada_post_apply = self.armada_client.post_apply( @@ -64,6 +65,9 @@ class ArmadaPostApplyOperator(ArmadaBaseOperator): timeout=timeout) except errors.ClientError as client_error: + # Dump logs from Armada API pods + self.get_k8s_logs() + raise AirflowException(client_error) # if this is a retry, assume that the airflow worker needs to be @@ -74,7 +78,7 @@ class ArmadaPostApplyOperator(ArmadaBaseOperator): # needs to. Problem with xcom is that it is cleared for the task # on retry, which means we can't use it as a flag reliably. if self.task_instance.try_number > 1: - logging.info( + LOG.info( "Airflow Worker will be upgraded because retry may obfuscate " "an upgrade of shipyard/airflow." ) @@ -87,7 +91,7 @@ class ArmadaPostApplyOperator(ArmadaBaseOperator): # part of the name of the Shipyard Helm Chart. for i in armada_post_apply['message']['upgrade']: if 'shipyard' in i: - logging.info( + LOG.info( "Shipyard was upgraded. Airflow worker must be " "restarted to reflect any workflow changes." ) @@ -110,11 +114,11 @@ class ArmadaPostApplyOperator(ArmadaBaseOperator): # changed. if (armada_post_apply['message']['install'] or armada_post_apply['message']['upgrade']): - logging.info("Successfully Executed Armada Apply") - logging.info(armada_post_apply) + LOG.info("Successfully Executed Armada Apply") + LOG.info(armada_post_apply) else: - logging.warning("No new changes/updates were detected!") - logging.info(armada_post_apply) + LOG.warning("No new changes/updates were detected!") + LOG.info(armada_post_apply) class ArmadaPostApplyOperatorPlugin(AirflowPlugin): diff --git a/shipyard_airflow/plugins/armada_validate_design.py b/shipyard_airflow/plugins/armada_validate_design.py index ac17d97b..c7672f46 100644 --- a/shipyard_airflow/plugins/armada_validate_design.py +++ b/shipyard_airflow/plugins/armada_validate_design.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from airflow.plugins_manager import AirflowPlugin @@ -20,6 +19,8 @@ from airflow.exceptions import AirflowException from armada_base_operator import ArmadaBaseOperator from armada.exceptions import api_exceptions as errors +LOG = logging.getLogger(__name__) + class ArmadaValidateDesignOperator(ArmadaBaseOperator): @@ -33,7 +34,7 @@ class ArmadaValidateDesignOperator(ArmadaBaseOperator): def do_execute(self): # Requests Armada to validate site design - logging.info("Waiting for Armada to validate site design...") + LOG.info("Waiting for Armada to validate site design...") # Retrieve read timeout timeout = self.dc['armada.validate_design_timeout'] @@ -45,18 +46,24 @@ class ArmadaValidateDesignOperator(ArmadaBaseOperator): timeout=timeout) except errors.ClientError as client_error: + # Dump logs from Armada API pods + self.get_k8s_logs() + raise AirflowException(client_error) # Print results - logging.info("Retrieving Armada validate site design response...") - logging.info(post_validate) + LOG.info("Retrieving Armada validate site design response...") + LOG.info(post_validate) # Check if site design is valid status = str(post_validate.get('status', 'unspecified')) if status.lower() == 'success': - logging.info("Site Design has been successfully validated") + LOG.info("Site Design has been successfully validated") else: + # Dump logs from Armada API pods + self.get_k8s_logs() + raise AirflowException("Site Design Validation Failed " "with status: {}!".format(status)) diff --git a/shipyard_airflow/plugins/deckhand_base_operator.py b/shipyard_airflow/plugins/deckhand_base_operator.py index 18899508..491c9394 100644 --- a/shipyard_airflow/plugins/deckhand_base_operator.py +++ b/shipyard_airflow/plugins/deckhand_base_operator.py @@ -11,11 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import configparser import logging -from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.plugins_manager import AirflowPlugin from airflow.exceptions import AirflowException @@ -23,10 +21,12 @@ from airflow.exceptions import AirflowException from deckhand.client import client as deckhand_client from service_endpoint import ucp_service_endpoint from service_token import shipyard_service_token -from xcom_puller import XcomPuller +from ucp_base_operator import UcpBaseOperator + +LOG = logging.getLogger(__name__) -class DeckhandBaseOperator(BaseOperator): +class DeckhandBaseOperator(UcpBaseOperator): """Deckhand Base Operator @@ -43,14 +43,10 @@ class DeckhandBaseOperator(BaseOperator): deckhand_client_read_timeout=None, deckhand_svc_endpoint=None, deckhand_svc_type='deckhand', - main_dag_name=None, revision_id=None, - shipyard_conf=None, - sub_dag_name=None, svc_session=None, svc_token=None, validation_read_timeout=None, - xcom_push=True, *args, **kwargs): """Initialization of DeckhandBaseOperator object. @@ -59,47 +55,30 @@ class DeckhandBaseOperator(BaseOperator): :param deckhand_client_read_timeout: Deckhand client connect timeout :param deckhand_svc_endpoint: Deckhand Service Endpoint :param deckhand_svc_type: Deckhand Service Type - :param main_dag_name: Parent Dag :param revision_id: Target revision for workflow - :param shipyard_conf: Path of shipyard.conf - :param sub_dag_name: Child Dag :param svc_session: Keystone Session :param svc_token: Keystone Token :param validation_read_timeout: Deckhand validation timeout - :param xcom_push: xcom usage """ - super(DeckhandBaseOperator, self).__init__(*args, **kwargs) + super(DeckhandBaseOperator, + self).__init__( + pod_selector_pattern=[{'pod_pattern': 'deckhand-api', + 'container': 'deckhand-api'}], + *args, **kwargs) self.committed_ver = committed_ver self.deckhandclient = deckhandclient self.deckhand_client_read_timeout = deckhand_client_read_timeout self.deckhand_svc_endpoint = deckhand_svc_endpoint self.deckhand_svc_type = deckhand_svc_type - self.main_dag_name = main_dag_name self.revision_id = revision_id - self.shipyard_conf = shipyard_conf - self.sub_dag_name = sub_dag_name self.svc_session = svc_session self.svc_token = svc_token self.validation_read_timeout = validation_read_timeout - self.xcom_push_flag = xcom_push - - def execute(self, context): - - # Execute deckhand base function - self.deckhand_base(context) - - # Exeute child function - self.do_execute() - - # Push last committed version to xcom for the - # 'get_design_version' subdag - if self.sub_dag_name == 'get_design_version': - return self.committed_ver @shipyard_service_token - def deckhand_base(self, context): + def run_base(self, context): # Read and parse shiyard.conf config = configparser.ConfigParser() @@ -112,26 +91,19 @@ class DeckhandBaseOperator(BaseOperator): self.validation_read_timeout = int(config.get( 'requests_config', 'validation_read_timeout')) - # Define task_instance - task_instance = context['task_instance'] - - # Set up and retrieve values from xcom - self.xcom_puller = XcomPuller(self.main_dag_name, task_instance) - self.action_info = self.xcom_puller.get_action_info() - # Logs uuid of Shipyard action - logging.info("Executing Shipyard Action %s", - self.action_info['id']) + LOG.info("Executing Shipyard Action %s", + self.action_info['id']) # Retrieve Endpoint Information self.deckhand_svc_endpoint = ucp_service_endpoint( self, svc_type=self.deckhand_svc_type) - logging.info("Deckhand endpoint is %s", - self.deckhand_svc_endpoint) + LOG.info("Deckhand endpoint is %s", + self.deckhand_svc_endpoint) # Set up DeckHand Client - logging.info("Setting up DeckHand Client...") + LOG.info("Setting up DeckHand Client...") # NOTE: The communication between the Airflow workers # and Deckhand happens via the 'internal' endpoint. @@ -155,7 +127,7 @@ class DeckhandBaseOperator(BaseOperator): self.revision_id = self.xcom_puller.get_design_version() if self.revision_id: - logging.info("Revision ID is %d", self.revision_id) + LOG.info("Revision ID is %d", self.revision_id) else: raise AirflowException('Failed to retrieve Revision ID!') diff --git a/shipyard_airflow/plugins/deckhand_get_design.py b/shipyard_airflow/plugins/deckhand_get_design.py index 329af20e..b7800cc8 100644 --- a/shipyard_airflow/plugins/deckhand_get_design.py +++ b/shipyard_airflow/plugins/deckhand_get_design.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import os import requests @@ -22,6 +21,8 @@ from airflow.exceptions import AirflowException from deckhand_base_operator import DeckhandBaseOperator +LOG = logging.getLogger(__name__) + class DeckhandGetDesignOperator(DeckhandBaseOperator): @@ -42,7 +43,7 @@ class DeckhandGetDesignOperator(DeckhandBaseOperator): 'revisions') # Retrieve Revision - logging.info("Retrieving revisions information...") + LOG.info("Retrieving revisions information...") try: query_params = {'tag': 'committed', 'sort': 'id', 'order': 'desc'} @@ -53,12 +54,15 @@ class DeckhandGetDesignOperator(DeckhandBaseOperator): timeout=self.deckhand_client_read_timeout).text) except requests.exceptions.RequestException as e: + # Dump logs from Deckhand pods + self.get_k8s_logs() + raise AirflowException(e) # Print info about revisions from DeckHand - logging.info("Revisions response: %s", revisions) - logging.info("The number of committed revisions is %s", - revisions['count']) + LOG.info("Revisions response: %s", revisions) + LOG.info("The number of committed revisions is %s", + revisions['count']) # Search for the latest committed version and save it as xcom. # Since the order : desc paramater above, this is index 0 if there @@ -66,10 +70,13 @@ class DeckhandGetDesignOperator(DeckhandBaseOperator): revision_list = revisions.get('results', []) if revision_list: self.committed_ver = revision_list[0].get('id') - logging.info("Latest committed revision is %d", self.committed_ver) + LOG.info("Latest committed revision is %d", self.committed_ver) # Error if we cannot resolve the committed version to use. if not self.committed_ver: + # Dump logs from Deckhand pods + self.get_k8s_logs() + raise AirflowException("No committed revision found in Deckhand!") diff --git a/shipyard_airflow/plugins/deckhand_retrieve_rendered_doc.py b/shipyard_airflow/plugins/deckhand_retrieve_rendered_doc.py index 0afd9f59..cffe3a37 100644 --- a/shipyard_airflow/plugins/deckhand_retrieve_rendered_doc.py +++ b/shipyard_airflow/plugins/deckhand_retrieve_rendered_doc.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from airflow.plugins_manager import AirflowPlugin @@ -19,6 +18,8 @@ from airflow.exceptions import AirflowException from deckhand_base_operator import DeckhandBaseOperator +LOG = logging.getLogger(__name__) + class DeckhandRetrieveRenderedDocOperator(DeckhandBaseOperator): @@ -30,17 +31,19 @@ class DeckhandRetrieveRenderedDocOperator(DeckhandBaseOperator): def do_execute(self): - logging.info("Retrieving Rendered Document...") + LOG.info("Retrieving Rendered Document...") # Retrieve Rendered Document try: - rendered_doc = self.deckhandclient.revisions.documents( + self.deckhandclient.revisions.documents( self.revision_id, rendered=True) - logging.info("Successfully Retrieved Rendered Document") - logging.info(rendered_doc) + LOG.info("Successfully Retrieved Rendered Document") except: + # Dump logs from Deckhand pods + self.get_k8s_logs() + raise AirflowException("Failed to Retrieve Rendered Document!") diff --git a/shipyard_airflow/plugins/deckhand_validate_site.py b/shipyard_airflow/plugins/deckhand_validate_site.py index 8b3a43a4..cc77c731 100644 --- a/shipyard_airflow/plugins/deckhand_validate_site.py +++ b/shipyard_airflow/plugins/deckhand_validate_site.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import os import requests @@ -22,6 +21,8 @@ from airflow.exceptions import AirflowException from deckhand_base_operator import DeckhandBaseOperator +LOG = logging.getLogger(__name__) + class DeckhandValidateSiteDesignOperator(DeckhandBaseOperator): @@ -43,7 +44,7 @@ class DeckhandValidateSiteDesignOperator(DeckhandBaseOperator): str(self.revision_id), 'validations') # Retrieve Validation list - logging.info("Retrieving validation list...") + LOG.info("Retrieving validation list...") try: retrieved_list = yaml.safe_load( @@ -52,14 +53,20 @@ class DeckhandValidateSiteDesignOperator(DeckhandBaseOperator): timeout=self.validation_read_timeout).text) except requests.exceptions.RequestException as e: + # Dump logs from Deckhand pods + self.get_k8s_logs() + raise AirflowException(e) if (any([str(v.get('status', 'unspecified')).lower() == 'failure' for v in retrieved_list.get('results', [])])): + # Dump logs from Deckhand pods + self.get_k8s_logs() + raise AirflowException("DeckHand Site Design Validation Failed!") else: - logging.info("Revision %d has been successfully validated", - self.revision_id) + LOG.info("Revision %d has been successfully validated", + self.revision_id) class DeckhandValidateSiteDesignOperatorPlugin(AirflowPlugin): diff --git a/shipyard_airflow/plugins/promenade_base_operator.py b/shipyard_airflow/plugins/promenade_base_operator.py index 1389f9e1..e4987acd 100644 --- a/shipyard_airflow/plugins/promenade_base_operator.py +++ b/shipyard_airflow/plugins/promenade_base_operator.py @@ -11,20 +11,20 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging -from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.plugins_manager import AirflowPlugin from airflow.exceptions import AirflowException from service_endpoint import ucp_service_endpoint from service_token import shipyard_service_token -from xcom_puller import XcomPuller +from ucp_base_operator import UcpBaseOperator + +LOG = logging.getLogger(__name__) -class PromenadeBaseOperator(BaseOperator): +class PromenadeBaseOperator(UcpBaseOperator): """Promenade Base Operator @@ -35,59 +35,36 @@ class PromenadeBaseOperator(BaseOperator): @apply_defaults def __init__(self, - main_dag_name=None, promenade_svc_endpoint=None, promenade_svc_type='kubernetesprovisioner', redeploy_server=None, - shipyard_conf=None, - sub_dag_name=None, svc_token=None, - xcom_push=True, *args, **kwargs): """Initialization of PromenadeBaseOperator object. - :param main_dag_name: Parent Dag :param promenade_svc_endpoint: Promenade Service Endpoint :param promenade_svc_type: Promenade Service Type :param redeploy_server: Server to be redeployed - :param shipyard_conf: Path of shipyard.conf - :param sub_dag_name: Child Dag :param svc_token: Keystone Token - :param xcom_push: xcom usage The Drydock operator assumes that prior steps have set xcoms for the action and the deployment configuration """ - super(PromenadeBaseOperator, self).__init__(*args, - **kwargs) - self.main_dag_name = main_dag_name + super(PromenadeBaseOperator, + self).__init__( + pod_selector_pattern=[{'pod_pattern': 'promenade-api', + 'container': 'promenade-api'}], + *args, **kwargs) self.promenade_svc_endpoint = promenade_svc_endpoint self.promenade_svc_type = promenade_svc_type self.redeploy_server = redeploy_server - self.shipyard_conf = shipyard_conf - self.sub_dag_name = sub_dag_name self.svc_token = svc_token - self.xcom_push_flag = xcom_push - - def execute(self, context): - # Execute promenade base function - self.promenade_base(context) - - # Exeute child function - self.do_execute() @shipyard_service_token - def promenade_base(self, context): - # Define task_instance - task_instance = context['task_instance'] - - # Set up and retrieve values from xcom - self.xcom_puller = XcomPuller(self.main_dag_name, task_instance) - self.action_info = self.xcom_puller.get_action_info() - self.dc = self.xcom_puller.get_deployment_configuration() + def run_base(self, context): # Logs uuid of Shipyard action - logging.info("Executing Shipyard Action %s", self.action_info['id']) + LOG.info("Executing Shipyard Action %s", self.action_info['id']) # Retrieve information of the server that we want to redeploy # if user executes the 'redeploy_server' dag @@ -96,8 +73,7 @@ class PromenadeBaseOperator(BaseOperator): 'server-name') if self.redeploy_server: - logging.info("Server to be redeployed is %s", - self.redeploy_server) + LOG.info("Server to be redeployed is %s", self.redeploy_server) else: raise AirflowException('%s was unable to retrieve the ' 'server to be redeployed.' @@ -107,8 +83,7 @@ class PromenadeBaseOperator(BaseOperator): self.promenade_svc_endpoint = ucp_service_endpoint( self, svc_type=self.promenade_svc_type) - logging.info("Promenade endpoint is %s", - self.promenade_svc_endpoint) + LOG.info("Promenade endpoint is %s", self.promenade_svc_endpoint) class PromenadeBaseOperatorPlugin(AirflowPlugin): diff --git a/shipyard_airflow/plugins/promenade_check_etcd.py b/shipyard_airflow/plugins/promenade_check_etcd.py index f0a406ac..944e7f2b 100644 --- a/shipyard_airflow/plugins/promenade_check_etcd.py +++ b/shipyard_airflow/plugins/promenade_check_etcd.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import time @@ -20,6 +19,8 @@ from airflow.exceptions import AirflowException from promenade_base_operator import PromenadeBaseOperator +LOG = logging.getLogger(__name__) + class PromenadeCheckEtcdOperator(PromenadeBaseOperator): @@ -37,13 +38,13 @@ class PromenadeCheckEtcdOperator(PromenadeBaseOperator): # TODO(bryan-strassner) use: # self.dc['kubernetes_provisioner.etcd_ready_timeout'] # self.dc['kubernetes_provisioner.remove_etcd_timeout'] - logging.info("Performing health check on etcd...") + LOG.info("Performing health check on etcd...") time.sleep(5) check_etcd = True if check_etcd: - logging.info("The etcd cluster is healthy and ready") + LOG.info("The etcd cluster is healthy and ready") else: raise AirflowException('Please check the state of etcd!') diff --git a/shipyard_airflow/plugins/promenade_clear_labels.py b/shipyard_airflow/plugins/promenade_clear_labels.py index a78a9f8d..1aee2669 100644 --- a/shipyard_airflow/plugins/promenade_clear_labels.py +++ b/shipyard_airflow/plugins/promenade_clear_labels.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import time @@ -20,6 +19,8 @@ from airflow.exceptions import AirflowException from promenade_base_operator import PromenadeBaseOperator +LOG = logging.getLogger(__name__) + class PromenadeClearLabelsOperator(PromenadeBaseOperator): @@ -37,14 +38,14 @@ class PromenadeClearLabelsOperator(PromenadeBaseOperator): # TODO(bryan-strassner) use: # self.dc['kubernetes_provisioner.clear_labels_timeout'] - logging.info("Removing labels on node...") + LOG.info("Removing labels on node...") time.sleep(5) labels_removed = True if labels_removed: - logging.info("Successfully removed labels on %s", - self.redeploy_server) + LOG.info("Successfully removed labels on %s", + self.redeploy_server) else: raise AirflowException('Failed to remove labels on %s!', self.redeploy_server) diff --git a/shipyard_airflow/plugins/promenade_decommission_node.py b/shipyard_airflow/plugins/promenade_decommission_node.py index 74664b31..8dc83172 100644 --- a/shipyard_airflow/plugins/promenade_decommission_node.py +++ b/shipyard_airflow/plugins/promenade_decommission_node.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import time @@ -20,6 +19,8 @@ from airflow.exceptions import AirflowException from promenade_base_operator import PromenadeBaseOperator +LOG = logging.getLogger(__name__) + class PromenadeDecommissionNodeOperator(PromenadeBaseOperator): @@ -33,14 +34,14 @@ class PromenadeDecommissionNodeOperator(PromenadeBaseOperator): def do_execute(self): # Placeholder function. Updates will be made when the Promenade # API is ready for consumption. - logging.info("Decommissioning node from Kubernetes cluster...") + LOG.info("Decommissioning node from Kubernetes cluster...") time.sleep(5) decommission_node = True if decommission_node: - logging.info("Succesfully decommissioned node %s", - self.redeploy_server) + LOG.info("Succesfully decommissioned node %s", + self.redeploy_server) else: raise AirflowException('Failed to decommission node %s!', self.redeploy_server) diff --git a/shipyard_airflow/plugins/promenade_drain_node.py b/shipyard_airflow/plugins/promenade_drain_node.py index 1822bdf7..47ced5ee 100644 --- a/shipyard_airflow/plugins/promenade_drain_node.py +++ b/shipyard_airflow/plugins/promenade_drain_node.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import time @@ -20,6 +19,8 @@ from airflow.exceptions import AirflowException from promenade_base_operator import PromenadeBaseOperator +LOG = logging.getLogger(__name__) + class PromenadeDrainNodeOperator(PromenadeBaseOperator): @@ -40,14 +41,14 @@ class PromenadeDrainNodeOperator(PromenadeBaseOperator): # self.dc['kubernetes_provisioner.drain_timeout'] # self.dc['kubernetes_provisioner.drain_grace_period'] - logging.info("Draining node...") + LOG.info("Draining node...") time.sleep(5) node_drained = True if node_drained: - logging.info("Node %s has been successfully drained", - self.redeploy_server) + LOG.info("Node %s has been successfully drained", + self.redeploy_server) else: raise AirflowException('Failed to drain %s!', self.redeploy_server) diff --git a/shipyard_airflow/plugins/promenade_shutdown_kubelet.py b/shipyard_airflow/plugins/promenade_shutdown_kubelet.py index 774c731c..0a880f59 100644 --- a/shipyard_airflow/plugins/promenade_shutdown_kubelet.py +++ b/shipyard_airflow/plugins/promenade_shutdown_kubelet.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import time @@ -20,6 +19,8 @@ from airflow.exceptions import AirflowException from promenade_base_operator import PromenadeBaseOperator +LOG = logging.getLogger(__name__) + class PromenadeShutdownKubeletOperator(PromenadeBaseOperator): @@ -33,14 +34,14 @@ class PromenadeShutdownKubeletOperator(PromenadeBaseOperator): def do_execute(self): # Placeholder function. Updates will be made when the Promenade # API is ready for consumption. - logging.info("Shutting down kubelet on node...") + LOG.info("Shutting down kubelet on node...") time.sleep(5) shutdown_kubelet = True if shutdown_kubelet: - logging.info("Successfully shut down kubelet on %s", - self.redeploy_server) + LOG.info("Successfully shut down kubelet on %s", + self.redeploy_server) else: raise AirflowException('Failed to shut down kubelet on %s!', self.redeploy_server) diff --git a/shipyard_airflow/plugins/ucp_base_operator.py b/shipyard_airflow/plugins/ucp_base_operator.py index 2a1a35a4..368f60db 100644 --- a/shipyard_airflow/plugins/ucp_base_operator.py +++ b/shipyard_airflow/plugins/ucp_base_operator.py @@ -86,6 +86,11 @@ class UcpBaseOperator(BaseOperator): # Exeute child function self.do_execute() + # Push last committed version to xcom for the + # 'get_design_version' subdag + if self.sub_dag_name == 'get_design_version': + return self.committed_ver + def ucp_base(self, context): LOG.info("Running UCP Base Operator...") @@ -98,10 +103,10 @@ class UcpBaseOperator(BaseOperator): self.ucp_namespace = config.get('k8s_logs', 'ucp_namespace') # Define task_instance - task_instance = context['task_instance'] + self.task_instance = context['task_instance'] # Set up and retrieve values from xcom - self.xcom_puller = XcomPuller(self.main_dag_name, task_instance) + self.xcom_puller = XcomPuller(self.main_dag_name, self.task_instance) self.action_info = self.xcom_puller.get_action_info() self.dc = self.xcom_puller.get_deployment_configuration()