diff --git a/charts/shipyard/templates/statefulset-airflow-worker.yaml b/charts/shipyard/templates/statefulset-airflow-worker.yaml
index e78dcbd0..7033d294 100644
--- a/charts/shipyard/templates/statefulset-airflow-worker.yaml
+++ b/charts/shipyard/templates/statefulset-airflow-worker.yaml
@@ -33,6 +33,7 @@ rules:
       - endpoints
       - pods
     verbs:
+      - delete
       - get
       - list
       - watch
@@ -57,6 +58,14 @@ metadata:
 spec:
   serviceName: {{ tuple "airflow_worker" "discovery" $envAll | include "helm-toolkit.endpoints.hostname_short_endpoint_lookup" }}
   podManagementPolicy: "Parallel"
+  # NOTE: We use the 'OnDelete' strategy instead of 'RollingUpdate' so
+  # that the upgrade of the airflow worker only starts after completion
+  # of the 'update_site' workflow (the workflow deletes the worker pods
+  # at the very end, after everything else is done). This keeps the
+  # airflow worker available during the update/upgrade and prevents any
+  # disruption to the workflow.
+  updateStrategy:
+    type: OnDelete
   replicas: {{ .Values.pod.replicas.airflow.worker }}
   template:
     metadata:
diff --git a/images/airflow/Dockerfile b/images/airflow/Dockerfile
index 0d88db65..7295ff79 100644
--- a/images/airflow/Dockerfile
+++ b/images/airflow/Dockerfile
@@ -93,10 +93,10 @@ RUN curl -L -o /usr/local/bin/kubectl \
 COPY script/entrypoint.sh ${AIRFLOW_HOME}/entrypoint.sh
 COPY script/airflow_start_service.sh ${AIRFLOW_HOME}/airflow_start_service.sh
 COPY script/airflow_logrotate.sh ${AIRFLOW_HOME}/airflow_logrotate.sh
+COPY script/upgrade_airflow_worker.sh ${AIRFLOW_HOME}/upgrade_airflow_worker.sh

 # Change permissions
-RUN chown -R airflow: ${AIRFLOW_HOME} \
-    && chmod +x ${AIRFLOW_HOME}/entrypoint.sh
+RUN chown -R airflow: ${AIRFLOW_HOME}

 # Set work directory
 USER airflow
diff --git a/images/airflow/script/airflow_logrotate.sh b/images/airflow/script/airflow_logrotate.sh
old mode 100644
new mode 100755
index eda69d8d..fee7f1f0
--- a/images/airflow/script/airflow_logrotate.sh
+++ b/images/airflow/script/airflow_logrotate.sh
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-set -x
+set -ex

 while true;
 do
diff --git a/images/airflow/script/airflow_start_service.sh b/images/airflow/script/airflow_start_service.sh
old mode 100644
new mode 100755
diff --git a/images/airflow/script/entrypoint.sh b/images/airflow/script/entrypoint.sh
old mode 100644
new mode 100755
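
NOTE: The `delete` verb added to the worker's pod rules above is what lets the
upgrade script (introduced below) remove its peer worker pods once the workflow
finishes. As a rough illustration only — not part of this change, and assuming
the Python `kubernetes` client library is available in the image — the kubectl
loop in upgrade_airflow_worker.sh is equivalent to:

    # Illustrative sketch: delete airflow-worker pods so the StatefulSet's
    # 'OnDelete' strategy respawns them with the new image/configuration.
    from kubernetes import client, config

    config.load_incluster_config()  # we are running inside a pod
    v1 = client.CoreV1Api()
    for pod in v1.list_namespaced_pod('ucp').items:
        if 'airflow-worker' in pod.metadata.name:
            v1.delete_namespaced_pod(pod.metadata.name, 'ucp')
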
diff --git a/images/airflow/script/upgrade_airflow_worker.sh b/images/airflow/script/upgrade_airflow_worker.sh
new file mode 100755
index 00000000..cde1238c
--- /dev/null
+++ b/images/airflow/script/upgrade_airflow_worker.sh
@@ -0,0 +1,98 @@
+#!/bin/bash
+# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -ex
+
+# NOTE: All output of this script is directed to /dev/null by the
+# workflow step that launches it, so we append progress to a log file
+# to allow the script to be tracked for troubleshooting purposes.
+
+check_timeout_counter() {
+
+    # Check total elapsed time
+    # The default timeout is 5 minutes
+    if [[ $counter -ge $max_count ]]; then
+        echo -e "Update Site Workflow Status Check Timed Out!" >> /usr/local/airflow/upgrade_airflow_worker.log
+        exit 1
+    fi
+}
+
+# Define Variables
+#
+# Allow the user to optionally pass in a custom query time and max
+# count as $4 and $5 respectively
+# The default query time is 30 seconds
+# The default max_count for a 5 minute timeout is 60*5/30 = 10
+#
+# NOTE: The Dag ID takes the value of $1
+#
+# NOTE: $2 will look like '2018-03-13' while $3 will look like
+# '05:10:19'. The execution date passed to the Airflow CLI needs to
+# take the form '2018-03-13T05:10:19', so we concatenate $2 and $3 to
+# form dag_execution_date.
+#
+dag_id=$1
+dag_execution_date="$2T$3"
+query_time=${4:-30}
+max_count=${5:-10}
+
+# Initialize dag_state to the "running" state
+# A dag can be in the "running", "success", "failed", "skipped" or
+# "up for retry" state
+dag_state="running"
+
+# Initialize counter to 1
+counter=1
+
+echo -e "Checking Dag State..."
+while true;
+do
+    # Set the current working directory to the directory where this
+    # script is located, so that we can import the modules required
+    # by our custom Operators.
+    cd "${0%/*}"
+
+    # Get the current state of the dag using the Airflow CLI
+    check_dag_state=$(airflow dag_state ${dag_id} ${dag_execution_date})
+    echo -e ${check_dag_state} >> /usr/local/airflow/upgrade_airflow_worker.log
+
+    # Extract the last word of the 'check_dag_state' string, as it
+    # contains the status of the dag run
+    dag_state=$(echo ${check_dag_state} | awk '{print $NF}')
+    echo -e ${dag_state} >> /usr/local/airflow/upgrade_airflow_worker.log
+
+    if [[ $dag_state == "success" ]]; then
+        echo -e "\nWorkflow has completed" >> /usr/local/airflow/upgrade_airflow_worker.log
+        echo -e "\n" >> /usr/local/airflow/upgrade_airflow_worker.log
+        echo -e "Proceeding to upgrade Airflow Worker..." >> /usr/local/airflow/upgrade_airflow_worker.log
+        echo -e "Deleting Airflow Worker Pods..." >> /usr/local/airflow/upgrade_airflow_worker.log
+
+        for i in $(kubectl get pods -n ucp | grep -i airflow-worker | awk '{print $1}'); do
+            # Delete the Airflow Worker pods so that they respawn with
+            # the new configurations and/or images
+            kubectl delete pod $i -n ucp
+        done
+
+        echo -e "Airflow Worker Pods Deleted!" >> /usr/local/airflow/upgrade_airflow_worker.log
+
+        exit 0
+    fi
+
+    echo -e "Workflow is in" $dag_state "state\n" >> /usr/local/airflow/upgrade_airflow_worker.log
+    echo -e "Back off for $query_time seconds...\n" >> /usr/local/airflow/upgrade_airflow_worker.log
+    sleep $query_time
+
+    # Step the counter and check it against the timeout
+    ((counter++))
+    check_timeout_counter
+done
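
NOTE: One non-obvious detail worth spelling out: as the script's own comment
notes, the execution date arrives as two positional arguments ($2 and $3)
because the templated value is rendered with a space, which the shell splits
into two words. A small illustration in plain Python (not part of the patch):

    # Why the script receives the execution date as $2 and $3: the rendered
    # "{{ ti.execution_date }}" contains a space, so the shell word-splits it.
    from datetime import datetime

    execution_date = datetime(2018, 3, 13, 5, 10, 19)
    print(execution_date)                         # 2018-03-13 05:10:19 -> $2 and $3
    print(str(execution_date).replace(' ', 'T'))  # 2018-03-13T05:10:19 -> Airflow CLI form
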
diff --git a/shipyard_airflow/dags/common_step_factory.py b/shipyard_airflow/dags/common_step_factory.py
index 5fab959f..b192b0db 100644
--- a/shipyard_airflow/dags/common_step_factory.py
+++ b/shipyard_airflow/dags/common_step_factory.py
@@ -12,6 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from airflow.operators import ConcurrencyCheckOperator
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.python_operator import BranchPythonOperator
 from airflow.operators.python_operator import PythonOperator
 from airflow.operators.subdag_operator import SubDagOperator

@@ -173,3 +175,69 @@ class CommonStepFactory(object):
             task_id=task_id,
             on_failure_callback=step_failure_handler,
             dag=self.dag)
+
+    def get_decide_airflow_upgrade(self, task_id=dn.DECIDE_AIRFLOW_UPGRADE):
+        """Generate the decide_airflow_upgrade step
+
+        Step responsible for deciding whether to branch to the path
+        that upgrades the airflow worker
+        """
+        def upgrade_airflow_check(**kwargs):
+            """upgrade_airflow_check function
+
+            Decides whether to upgrade the airflow worker. The
+            decision is based on the xcom value retrieved from the
+            'armada_apply' task
+            """
+            # The DAG ID is the parent dag name + the subdag name
+            dag_id = self.parent_dag_name + '.' + dn.ARMADA_BUILD_DAG_NAME
+
+            # Check if Shipyard/Airflow were upgraded by the workflow
+            upgrade_airflow = kwargs['ti'].xcom_pull(
+                key='upgrade_airflow_worker',
+                task_ids='armada_apply',
+                dag_id=dag_id)
+
+            # Take the branch that upgrades the Airflow worker if the
+            # Shipyard chart was upgraded/modified
+            if upgrade_airflow == "true":
+                return "upgrade_airflow"
+            else:
+                return "skip_upgrade_airflow"
+
+        return BranchPythonOperator(task_id=task_id,
+                                    python_callable=upgrade_airflow_check,
+                                    trigger_rule="all_done",
+                                    dag=self.dag)
+
+    def get_upgrade_airflow(self, task_id=dn.UPGRADE_AIRFLOW):
+        """Generate the upgrade_airflow step
+
+        Step responsible for upgrading the airflow worker. The step
+        executes the upgrade script in the background and directs its
+        output to /dev/null so that 'nohup.out' is not created. This
+        is intentional: the upgrade of the airflow worker then only
+        starts after the completion of the 'update_site' workflow,
+        which keeps the airflow worker available during the
+        update/upgrade and prevents any disruption to the workflow.
+        Note that the dag_id and execution date are required for
+        proper execution of the script.
+        """
+        return BashOperator(task_id=task_id,
+                            bash_command=(
+                                "nohup "
+                                "/usr/local/airflow/upgrade_airflow_worker.sh "
+                                "{{ ti.dag_id }} {{ ti.execution_date }} "
+                                ">/dev/null 2>&1 &"),
+                            dag=self.dag)
+
+    def get_skip_upgrade_airflow(self, task_id=dn.SKIP_UPGRADE_AIRFLOW):
+        """Generate the skip_upgrade_airflow step
+
+        Step prints a message stating that an upgrade of the airflow
+        worker is not required
+        """
+        return BashOperator(task_id=task_id,
+                            bash_command=(
+                                "echo 'Airflow Worker Upgrade Not Required'"),
+                            dag=self.dag)
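
NOTE: Since both new paths hang off a BranchPythonOperator, only the task
whose id the callable returns will run; the other downstream task is marked
skipped. A self-contained sketch of that behavior (the demo DAG and task
names are hypothetical, not part of the patch):

    # Minimal branching demo: only 'path_a' executes, 'path_b' is skipped.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import BranchPythonOperator

    demo_dag = DAG('branch_demo', start_date=datetime(2018, 3, 13),
                   schedule_interval=None)

    decide = BranchPythonOperator(task_id='decide',
                                  python_callable=lambda: 'path_a',
                                  dag=demo_dag)

    path_a = BashOperator(task_id='path_a', bash_command='echo chosen',
                          dag=demo_dag)
    path_b = BashOperator(task_id='path_b', bash_command='echo skipped',
                          dag=demo_dag)

    decide.set_downstream(path_a)
    decide.set_downstream(path_b)
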
diff --git a/shipyard_airflow/dags/dag_names.py b/shipyard_airflow/dags/dag_names.py
index b1bcad88..1e48e3c7 100644
--- a/shipyard_airflow/dags/dag_names.py
+++ b/shipyard_airflow/dags/dag_names.py
@@ -24,3 +24,6 @@ DESTROY_SERVER_DAG_NAME = 'destroy_server'

 # Steps
 ACTION_XCOM = 'action_xcom'
+DECIDE_AIRFLOW_UPGRADE = 'decide_airflow_upgrade'
+UPGRADE_AIRFLOW = 'upgrade_airflow'
+SKIP_UPGRADE_AIRFLOW = 'skip_upgrade_airflow'
diff --git a/shipyard_airflow/dags/update_site.py b/shipyard_airflow/dags/update_site.py
index 13dad988..33764fcd 100644
--- a/shipyard_airflow/dags/update_site.py
+++ b/shipyard_airflow/dags/update_site.py
@@ -55,6 +55,9 @@ validate_site_design = step_factory.get_validate_site_design()
 deployment_configuration = step_factory.get_deployment_configuration()
 drydock_build = step_factory.get_drydock_build()
 armada_build = step_factory.get_armada_build()
+decide_airflow_upgrade = step_factory.get_decide_airflow_upgrade()
+upgrade_airflow = step_factory.get_upgrade_airflow()
+skip_upgrade_airflow = step_factory.get_skip_upgrade_airflow()

 # DAG Wiring
 concurrency_check.set_upstream(action_xcom)
@@ -66,3 +69,6 @@ drydock_build.set_upstream([
     deployment_configuration
 ])
 armada_build.set_upstream(drydock_build)
+decide_airflow_upgrade.set_upstream(armada_build)
+decide_airflow_upgrade.set_downstream(upgrade_airflow)
+decide_airflow_upgrade.set_downstream(skip_upgrade_airflow)
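
NOTE: For readers more used to Airflow's bitshift composition, the three new
wiring calls above are equivalent to the following (illustrative rewrite only;
the patch keeps the set_upstream/set_downstream style already used in this
file):

    # Equivalent DAG wiring using bitshift syntax:
    armada_build >> decide_airflow_upgrade
    decide_airflow_upgrade >> upgrade_airflow
    decide_airflow_upgrade >> skip_upgrade_airflow
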
@@ -29,6 +29,7 @@ from get_k8s_pod_port_ip import get_pod_port_ip
 from service_endpoint import ucp_service_endpoint
 from service_token import shipyard_service_token
 from xcom_puller import XcomPuller
+from xcom_pusher import XcomPusher


 class ArmadaOperator(BaseOperator):
@@ -39,7 +40,7 @@ class ArmadaOperator(BaseOperator):
     :param shipyard_conf: Location of shipyard.conf
     :param sub_dag_name: Child Dag

-    The Drydock operator assumes that prior steps have set xcoms for
+    The Armada operator assumes that prior steps have set xcoms for
     the action and the deployment configuration
     """

@@ -218,6 +219,8 @@ class ArmadaOperator(BaseOperator):
         armada_post_apply = {}
         override_values = []
         chart_set = []
+        upgrade_airflow_worker = False
+
         # enhance the context's query entity with target_manifest
         query = context.get('query', {})
         query['target_manifest'] = target_manifest
@@ -230,11 +233,34 @@ class ArmadaOperator(BaseOperator):
             set=chart_set,
             query=query)

+        # Search for the Shipyard deployment in the list of chart
+        # upgrades
+        # NOTE: The chart name can take on different values, e.g.
+        # 'aic-ucp-shipyard' or 'ucp-shipyard'. Hence we search for
+        # the word 'shipyard', which should exist as part of the name
+        # of the Shipyard Helm Chart.
+        for i in armada_post_apply['message']['upgrade']:
+            if 'shipyard' in i:
+                upgrade_airflow_worker = True
+                break
+
+        # Create the xcom key 'upgrade_airflow_worker'
+        # The value of the key depends on whether an upgrade has been
+        # performed on the Shipyard/Airflow chart
+        self.xcom_pusher = XcomPusher(context['task_instance'])
+
+        if upgrade_airflow_worker:
+            self.xcom_pusher.xcom_push(key='upgrade_airflow_worker',
+                                       value='true')
+        else:
+            self.xcom_pusher.xcom_push(key='upgrade_airflow_worker',
+                                       value='false')
+
         # We will expect Armada to return the releases that it is
         # deploying. Note that if we try and deploy the same release
         # twice, we will end up with empty response as nothing has
         # changed.
-        if armada_post_apply['message']['install']:
+        if (armada_post_apply['message']['install'] or
+                armada_post_apply['message']['upgrade']):
             logging.info("Armada Apply Successfully Executed")
             logging.info(armada_post_apply)
         else:
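
NOTE: To make the new checks concrete, this is the response shape they assume
from Armada, with hypothetical release names (illustrative only, not actual
Armada output):

    # Hypothetical 'armada apply' result illustrating the fields read above.
    armada_post_apply = {
        'message': {
            'install': [],                    # no new releases installed
            'upgrade': ['aic-ucp-shipyard'],  # the Shipyard chart was upgraded
        }
    }

    # The search loop above reduces to this membership test:
    upgrade_airflow_worker = any(
        'shipyard' in release
        for release in armada_post_apply['message']['upgrade'])
    assert upgrade_airflow_worker

diff --git a/shipyard_airflow/plugins/xcom_pusher.py b/shipyard_airflow/plugins/xcom_pusher.py
new file mode 100644
index 00000000..4aed9814
--- /dev/null
+++ b/shipyard_airflow/plugins/xcom_pusher.py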
@@ -0,0 +1,42 @@
+# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from airflow.exceptions import AirflowException
+
+LOG = logging.getLogger(__name__)
+
+
+class XcomPusher(object):
+    """XcomPusher pushes an xcom value
+
+    Stores a specific key with a value as an xcom.
+    """
+
+    def __init__(self, task_instance):
+        self.ti = task_instance
+
+    def xcom_push(self, key=None, value=None):
+        """Push a particular xcom value"""
+
+        LOG.info("Pushing xcom from %s.%s with key %s and value %s",
+                 self.ti.dag_id,
+                 self.ti.task_id,
+                 key,
+                 value)
+
+        try:
+            self.ti.xcom_push(key=key, value=value)
+        except Exception:
+            raise AirflowException("Xcom push failed!")
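
NOTE: A minimal usage sketch, mirroring how ArmadaOperator drives this class
earlier in the patch (the context dict is the one Airflow passes into an
operator's execute method):

    # Hypothetical call site inside an operator's execute(self, context):
    xcom_pusher = XcomPusher(context['task_instance'])
    xcom_pusher.xcom_push(key='upgrade_airflow_worker', value='true')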