From 72195191356165ec9e1fad9198b3c7ca38508472 Mon Sep 17 00:00:00 2001 From: Anthony Lin Date: Mon, 12 Mar 2018 09:49:33 +0000 Subject: [PATCH] Add Airflow Worker Upgrade Workflow MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch set is meant to create a workflow that will allow us to upgrade the airflow worker without causing disruption to the current running workflow. Note that we will set the update strategy for airflow worker to 'OnDelete'. The 'OnDelete' update strategy implements the legacy (1.6 and prior) behavior. When we select this update strategy, the statefulSet controller will not automatically update Pods when a modification is made to the StatefulSet’s '.spec.template field'. This strategy can be selected by setting the '.spec.template.updateStrategy.type' to 'OnDelete'. Refer to [0] for more information. [0] https://kubernetes.io/docs/tutorials/stateful-application/basic-stateful-set/#creating-a-statefulset Change-Id: I1f6c3564b7fba6abe422b86e36818eb2cd3454ea --- .../templates/statefulset-airflow-worker.yaml | 9 ++ images/airflow/Dockerfile | 4 +- images/airflow/script/airflow_logrotate.sh | 2 +- .../airflow/script/airflow_start_service.sh | 0 images/airflow/script/entrypoint.sh | 0 .../airflow/script/upgrade_airflow_worker.sh | 98 +++++++++++++++++++ shipyard_airflow/dags/common_step_factory.py | 68 +++++++++++++ shipyard_airflow/dags/dag_names.py | 3 + shipyard_airflow/dags/update_site.py | 6 ++ shipyard_airflow/plugins/armada_operator.py | 32 +++++- shipyard_airflow/plugins/xcom_pusher.py | 42 ++++++++ 11 files changed, 258 insertions(+), 6 deletions(-) mode change 100644 => 100755 images/airflow/script/airflow_logrotate.sh mode change 100644 => 100755 images/airflow/script/airflow_start_service.sh mode change 100644 => 100755 images/airflow/script/entrypoint.sh create mode 100755 images/airflow/script/upgrade_airflow_worker.sh create mode 100644 shipyard_airflow/plugins/xcom_pusher.py diff --git a/charts/shipyard/templates/statefulset-airflow-worker.yaml b/charts/shipyard/templates/statefulset-airflow-worker.yaml index e78dcbd0..7033d294 100644 --- a/charts/shipyard/templates/statefulset-airflow-worker.yaml +++ b/charts/shipyard/templates/statefulset-airflow-worker.yaml @@ -33,6 +33,7 @@ rules: - endpoints - pods verbs: + - delete - get - list - watch @@ -57,6 +58,14 @@ metadata: spec: serviceName: {{ tuple "airflow_worker" "discovery" $envAll | include "helm-toolkit.endpoints.hostname_short_endpoint_lookup" }} podManagementPolicy: "Parallel" + # NOTE: We are using 'OnDelete' strategy instead of 'RollingUpdate' + # so that the upgrade of airflow worker will only start after the + # completion of the 'update_site' workflow (the worker pods will get + # deleted by the workflow at the very end, after everything is completed). + # This will ensure availability of airflow worker during update/upgrade + # and prevent any disruption to the workflow. + updateStrategy: + type: OnDelete replicas: {{ .Values.pod.replicas.airflow.worker }} template: metadata: diff --git a/images/airflow/Dockerfile b/images/airflow/Dockerfile index 0d88db65..7295ff79 100644 --- a/images/airflow/Dockerfile +++ b/images/airflow/Dockerfile @@ -93,10 +93,10 @@ RUN curl -L -o /usr/local/bin/kubectl \ COPY script/entrypoint.sh ${AIRFLOW_HOME}/entrypoint.sh COPY script/airflow_start_service.sh ${AIRFLOW_HOME}/airflow_start_service.sh COPY script/airflow_logrotate.sh ${AIRFLOW_HOME}/airflow_logrotate.sh +COPY script/upgrade_airflow_worker.sh ${AIRFLOW_HOME}/upgrade_airflow_worker.sh # Change permissions -RUN chown -R airflow: ${AIRFLOW_HOME} \ - && chmod +x ${AIRFLOW_HOME}/entrypoint.sh +RUN chown -R airflow: ${AIRFLOW_HOME} # Set work directory USER airflow diff --git a/images/airflow/script/airflow_logrotate.sh b/images/airflow/script/airflow_logrotate.sh old mode 100644 new mode 100755 index eda69d8d..fee7f1f0 --- a/images/airflow/script/airflow_logrotate.sh +++ b/images/airflow/script/airflow_logrotate.sh @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -set -x +set -ex while true; do diff --git a/images/airflow/script/airflow_start_service.sh b/images/airflow/script/airflow_start_service.sh old mode 100644 new mode 100755 diff --git a/images/airflow/script/entrypoint.sh b/images/airflow/script/entrypoint.sh old mode 100644 new mode 100755 diff --git a/images/airflow/script/upgrade_airflow_worker.sh b/images/airflow/script/upgrade_airflow_worker.sh new file mode 100755 index 00000000..cde1238c --- /dev/null +++ b/images/airflow/script/upgrade_airflow_worker.sh @@ -0,0 +1,98 @@ +#!/bin/bash +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -ex + +# NOTE: We are directing all the output of the script to /dev/null in order +# to complete the workflow. Hence it will be useful to create a log file to +# track the progress of the script for troubleshooting purpose. +check_timeout_counter() { + + # Check total elapsed time + # The default time out is set to 5 minutes + if [[ $counter -ge $max_count ]]; then + echo -e "Update Site Workflow Status Check Timed Out!" >> /usr/local/airflow/upgrade_airflow_worker.log + return 1 + fi +} + +# Define Variables +# +# Allow user to optionally pass in custom query time and max count as $4 +# and $5 respectively +# Default query time is 30 seconds +# Default max_count for 5 minutes time out is 60*5/30 = 10 +# +# NOTE: Dag ID will take value of $1 +# +# NOTE: $2 will look like '2018-03-13' while $3 will look like '05:10:19' +# The execution date that we need to pass into the Airflow CLI will need +# to take the form of '2018-03-13T05:10:19'. Hence we will need to concatenate +# $2 and $3 together to form the dag_execution_date. +# +dag_id=$1 +dag_execution_date="$2T$3" +query_time=${4:-30} +max_count=${5:-10} + +# Initialize dag_state to "running" state +# Dag can be in "running", "success", "failed", "skipped" or "up for retry" state +dag_state="running" + +# Initialize counter to 1 +counter=1 + +echo -e "Checking Dag State..." +while true; +do + # Set current working directory to be the directory where the shell script + # is located. In this way we will be able to import the modules that are + # required for our custom Operators. + cd "${0%/*}" + + # Get current state of dag using Airflow CLI + check_dag_state=`airflow dag_state ${dag_id} ${dag_execution_date}` + echo -e ${check_dag_state} >> /usr/local/airflow/upgrade_airflow_worker.log + + # We will need to extract the last word in the 'check_dag_state' + # string variable as that will contain the status of the dag run + dag_state=`echo ${check_dag_state} | awk '{print $NF}'` + echo -e ${dag_state} >> /usr/local/airflow/upgrade_airflow_worker.log + + if [[ $dag_state == "success" ]]; then + echo -e "\nWorkflow has completed" >> /usr/local/airflow/upgrade_airflow_worker.log + echo -e "\n" >> /usr/local/airflow/upgrade_airflow_worker.log + echo -e "Proceeding to upgrade Airflow Worker..." >> /usr/local/airflow/upgrade_airflow_worker.log + echo -e "Deleting Airflow Worker Pods..." >> /usr/local/airflow/upgrade_airflow_worker.log + + for i in `kubectl get pods -n ucp | grep -i airflow-worker | awk '{print $1}'`; do + # Delete Airflow Worker pod so that they will respawn with the new + # configurations and/or images + kubectl delete pod $i -n ucp + done + + echo -e "Airflow Worker Pods Deleted!" >> /usr/local/airflow/upgrade_airflow_worker.log + + return 0 + fi + + echo -e "Workflow is in" $dag_state "state\n" >> /usr/local/airflow/upgrade_airflow_worker.log + echo -e "Back Off for $query_time seconds...\n" >> /usr/local/airflow/upgrade_airflow_worker.log + sleep $query_time + + # Step counter and check the timeout counter + ((counter++)) + check_timeout_counter +done diff --git a/shipyard_airflow/dags/common_step_factory.py b/shipyard_airflow/dags/common_step_factory.py index 5fab959f..b192b0db 100644 --- a/shipyard_airflow/dags/common_step_factory.py +++ b/shipyard_airflow/dags/common_step_factory.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. from airflow.operators import ConcurrencyCheckOperator +from airflow.operators.bash_operator import BashOperator +from airflow.operators.python_operator import BranchPythonOperator from airflow.operators.python_operator import PythonOperator from airflow.operators.subdag_operator import SubDagOperator @@ -173,3 +175,69 @@ class CommonStepFactory(object): task_id=task_id, on_failure_callback=step_failure_handler, dag=self.dag) + + def get_decide_airflow_upgrade(self, task_id=dn.DECIDE_AIRFLOW_UPGRADE): + """Generate the decide_airflow_upgrade step + + Step responsible for deciding whether to branch to the path + to upgrade airflow worker + """ + def upgrade_airflow_check(**kwargs): + """upgrade_airflow_check function + + Defines a function to decide whether to upgrade airflow + worker. The decision will be based on the xcom value that + is retrieved from the 'armada_apply' task + """ + # DAG ID will be parent + subdag name + dag_id = self.parent_dag_name + '.' + dn.ARMADA_BUILD_DAG_NAME + + # Check if Shipyard/Airflow were upgraded by the workflow + upgrade_airflow = kwargs['ti'].xcom_pull( + key='upgrade_airflow_worker', + task_ids='armada_apply', + dag_id=dag_id) + + # Go to the branch to upgrade Airflow worker if the Shipyard + # chart were upgraded/modified + if upgrade_airflow == "true": + return "upgrade_airflow" + else: + return "skip_upgrade_airflow" + + return BranchPythonOperator(task_id=task_id, + python_callable=upgrade_airflow_check, + trigger_rule="all_done", + dag=self.dag) + + def get_upgrade_airflow(self, task_id=dn.UPGRADE_AIRFLOW): + """Generate the upgrade_airflow step + + Step responsible for upgrading airflow worker. Step will + execute the upgrade script in the background and direct + output to null so that 'nohup.out' will not be created. + Note that this is done intentionally so that the upgrade + of airflow worker will only start after the completion of + the 'update_site' workflow. This will ensure availability + of airflow worker during update/upgrade and prevent any + disruption to the workflow. Note that dag_id and execution + date are required for proper execution of the script. + """ + return BashOperator(task_id=task_id, + bash_command=( + "nohup " + "/usr/local/airflow/upgrade_airflow_worker.sh " + "{{ ti.dag_id }} {{ ti.execution_date }} " + ">/dev/null 2>&1 &"), + dag=self.dag) + + def get_skip_upgrade_airflow(self, task_id=dn.SKIP_UPGRADE_AIRFLOW): + """Generate the skip_upgrade_airflow step + + Step will print a message stating that we do not need to + upgrade the airflow worker + """ + return BashOperator(task_id=task_id, + bash_command=( + "echo 'Airflow Worker Upgrade Not Required'"), + dag=self.dag) diff --git a/shipyard_airflow/dags/dag_names.py b/shipyard_airflow/dags/dag_names.py index b1bcad88..1e48e3c7 100644 --- a/shipyard_airflow/dags/dag_names.py +++ b/shipyard_airflow/dags/dag_names.py @@ -24,3 +24,6 @@ DESTROY_SERVER_DAG_NAME = 'destroy_server' # Steps ACTION_XCOM = 'action_xcom' +DECIDE_AIRFLOW_UPGRADE = 'decide_airflow_upgrade' +UPGRADE_AIRFLOW = 'upgrade_airflow' +SKIP_UPGRADE_AIRFLOW = 'skip_upgrade_airflow' diff --git a/shipyard_airflow/dags/update_site.py b/shipyard_airflow/dags/update_site.py index 13dad988..33764fcd 100644 --- a/shipyard_airflow/dags/update_site.py +++ b/shipyard_airflow/dags/update_site.py @@ -55,6 +55,9 @@ validate_site_design = step_factory.get_validate_site_design() deployment_configuration = step_factory.get_deployment_configuration() drydock_build = step_factory.get_drydock_build() armada_build = step_factory.get_armada_build() +decide_airflow_upgrade = step_factory.get_decide_airflow_upgrade() +upgrade_airflow = step_factory.get_upgrade_airflow() +skip_upgrade_airflow = step_factory.get_skip_upgrade_airflow() # DAG Wiring concurrency_check.set_upstream(action_xcom) @@ -66,3 +69,6 @@ drydock_build.set_upstream([ deployment_configuration ]) armada_build.set_upstream(drydock_build) +decide_airflow_upgrade.set_upstream(armada_build) +decide_airflow_upgrade.set_downstream(upgrade_airflow) +decide_airflow_upgrade.set_downstream(skip_upgrade_airflow) diff --git a/shipyard_airflow/plugins/armada_operator.py b/shipyard_airflow/plugins/armada_operator.py index fc0b94c8..1033de2e 100644 --- a/shipyard_airflow/plugins/armada_operator.py +++ b/shipyard_airflow/plugins/armada_operator.py @@ -1,4 +1,4 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ from get_k8s_pod_port_ip import get_pod_port_ip from service_endpoint import ucp_service_endpoint from service_token import shipyard_service_token from xcom_puller import XcomPuller +from xcom_pusher import XcomPusher class ArmadaOperator(BaseOperator): @@ -39,7 +40,7 @@ class ArmadaOperator(BaseOperator): :param shipyard_conf: Location of shipyard.conf :param sub_dag_name: Child Dag - The Drydock operator assumes that prior steps have set xcoms for + The Armada operator assumes that prior steps have set xcoms for the action and the deployment configuration """ @@ -218,6 +219,8 @@ class ArmadaOperator(BaseOperator): armada_post_apply = {} override_values = [] chart_set = [] + upgrade_airflow_worker = False + # enhance the context's query entity with target_manifest query = context.get('query', {}) query['target_manifest'] = target_manifest @@ -230,11 +233,34 @@ class ArmadaOperator(BaseOperator): set=chart_set, query=query) + # Search for Shipyard deployment in the list of chart upgrades + # NOTE: It is possible for the chart name to take on different + # values, e.g. 'aic-ucp-shipyard', 'ucp-shipyard'. Hence we + # will search for the word 'shipyard', which should exist as + # part of the name of the Shipyard Helm Chart. + for i in armada_post_apply['message']['upgrade']: + if 'shipyard' in i: + upgrade_airflow_worker = True + break + + # Create xcom key 'upgrade_airflow_worker' + # Value of key will depend on whether an upgrade has been + # performed on the Shipyard/Airflow Chart + self.xcom_pusher = XcomPusher(context['task_instance']) + + if upgrade_airflow_worker: + self.xcom_pusher.xcom_push(key='upgrade_airflow_worker', + value='true') + else: + self.xcom_pusher.xcom_push(key='upgrade_airflow_worker', + value='false') + # We will expect Armada to return the releases that it is # deploying. Note that if we try and deploy the same release # twice, we will end up with empty response as nothing has # changed. - if armada_post_apply['message']['install']: + if (armada_post_apply['message']['install'] or + armada_post_apply['message']['upgrade']): logging.info("Armada Apply Successfully Executed") logging.info(armada_post_apply) else: diff --git a/shipyard_airflow/plugins/xcom_pusher.py b/shipyard_airflow/plugins/xcom_pusher.py new file mode 100644 index 00000000..4aed9814 --- /dev/null +++ b/shipyard_airflow/plugins/xcom_pusher.py @@ -0,0 +1,42 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from airflow.exceptions import AirflowException + +LOG = logging.getLogger(__name__) + + +class XcomPusher(object): + """XcomPusher pushes xcom value + + Create specific key with value and stores as xcom. + """ + + def __init__(self, task_instance): + self.ti = task_instance + + def xcom_push(self, key=None, value=None): + """Push a particular xcom value""" + + LOG.info("Pushing xcom from %s.%s with key %s and value %s", + self.ti.dag_id, + self.ti.task_id, + key, + value) + + try: + self.ti.xcom_push(key=key, value=value) + except: + raise AirflowException("Xcom push failed!")