Refactor Armada Operator

We will move away from the use of if/else blocks and will instead
make use of inheritance (an Armada base operator) and task-specific
operators in our DAGs to execute the workflow.

We also added a timeout option for client reads so that the CI/CD team
can set other values for the execution of Armada tasks (the default
timeout is currently set to 1 hr for all tasks).

Change-Id: I563fde76d91feae06a8a0298bc6eaf7cca1e66da
This commit is contained in:
Anthony Lin 2018-03-21 08:50:51 +00:00
parent 29463d5aa2
commit 4c71cc2c83
17 changed files with 585 additions and 417 deletions

View File

@ -1,4 +1,4 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -13,7 +13,9 @@
# limitations under the License.
from airflow.models import DAG
from airflow.operators import ArmadaOperator
from airflow.operators import ArmadaGetReleasesOperator
from airflow.operators import ArmadaGetStatusOperator
from airflow.operators import ArmadaPostApplyOperator
from config_path import config_path
@ -27,35 +29,32 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args):
default_args=args)
# Get Tiller Status
armada_status = ArmadaOperator(
task_id='armada_status',
armada_get_status = ArmadaGetStatusOperator(
task_id='armada_get_status',
shipyard_conf=config_path,
action='armada_status',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Armada Apply
armada_apply = ArmadaOperator(
task_id='armada_apply',
armada_post_apply = ArmadaPostApplyOperator(
task_id='armada_post_apply',
shipyard_conf=config_path,
action='armada_apply',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
retries=3,
dag=dag)
# Get Helm Releases
armada_get_releases = ArmadaOperator(
armada_get_releases = ArmadaGetReleasesOperator(
task_id='armada_get_releases',
shipyard_conf=config_path,
action='armada_get_releases',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Define dependencies
armada_apply.set_upstream(armada_status)
armada_get_releases.set_upstream(armada_apply)
armada_post_apply.set_upstream(armada_get_status)
armada_get_releases.set_upstream(armada_post_apply)
return dag

View File

@ -47,8 +47,8 @@ action_xcom = step_factory.get_action_xcom()
concurrency_check = step_factory.get_concurrency_check()
preflight = step_factory.get_preflight()
get_design_version = step_factory.get_get_design_version()
validate_site_design = step_factory.get_validate_site_design()
deployment_configuration = step_factory.get_deployment_configuration()
validate_site_design = step_factory.get_validate_site_design()
drydock_build = step_factory.get_drydock_build()
armada_build = step_factory.get_armada_build()
@ -56,10 +56,7 @@ armada_build = step_factory.get_armada_build()
concurrency_check.set_upstream(action_xcom)
preflight.set_upstream(concurrency_check)
get_design_version.set_upstream(preflight)
validate_site_design.set_upstream(get_design_version)
deployment_configuration.set_upstream(get_design_version)
drydock_build.set_upstream([
validate_site_design,
deployment_configuration
])
validate_site_design.set_upstream(deployment_configuration)
drydock_build.set_upstream(validate_site_design)
armada_build.set_upstream(drydock_build)

View File

@ -48,8 +48,8 @@ action_xcom = step_factory.get_action_xcom()
concurrency_check = step_factory.get_concurrency_check()
preflight = step_factory.get_preflight()
get_design_version = step_factory.get_get_design_version()
validate_site_design = step_factory.get_validate_site_design()
deployment_configuration = step_factory.get_deployment_configuration()
validate_site_design = step_factory.get_validate_site_design()
destroy_server = step_factory.get_destroy_server()
drydock_build = step_factory.get_drydock_build()
@ -57,10 +57,7 @@ drydock_build = step_factory.get_drydock_build()
concurrency_check.set_upstream(action_xcom)
preflight.set_upstream(concurrency_check)
get_design_version.set_upstream(preflight)
validate_site_design.set_upstream(get_design_version)
deployment_configuration.set_upstream(get_design_version)
destroy_server.set_upstream([
validate_site_design,
deployment_configuration
])
validate_site_design.set_upstream(deployment_configuration)
destroy_server.set_upstream(validate_site_design)
drydock_build.set_upstream(destroy_server)

View File

@ -51,8 +51,8 @@ step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME,
action_xcom = step_factory.get_action_xcom()
concurrency_check = step_factory.get_concurrency_check()
get_design_version = step_factory.get_get_design_version()
validate_site_design = step_factory.get_validate_site_design()
deployment_configuration = step_factory.get_deployment_configuration()
validate_site_design = step_factory.get_validate_site_design()
drydock_build = step_factory.get_drydock_build()
armada_build = step_factory.get_armada_build()
decide_airflow_upgrade = step_factory.get_decide_airflow_upgrade()
@ -62,12 +62,9 @@ skip_upgrade_airflow = step_factory.get_skip_upgrade_airflow()
# DAG Wiring
concurrency_check.set_upstream(action_xcom)
get_design_version.set_upstream(concurrency_check)
validate_site_design.set_upstream(get_design_version)
deployment_configuration.set_upstream(get_design_version)
drydock_build.set_upstream([
validate_site_design,
deployment_configuration
])
validate_site_design.set_upstream(deployment_configuration)
drydock_build.set_upstream(validate_site_design)
armada_build.set_upstream(drydock_build)
decide_airflow_upgrade.set_upstream(armada_build)
decide_airflow_upgrade.set_downstream(upgrade_airflow)

View File

@ -13,7 +13,7 @@
# limitations under the License.
from airflow.models import DAG
from airflow.operators import ArmadaOperator
from airflow.operators import ArmadaValidateDesignOperator
from airflow.operators import DeckhandValidateSiteDesignOperator
from airflow.operators import DrydockValidateDesignOperator
@ -44,10 +44,9 @@ def validate_site_design(parent_dag_name, child_dag_name, args):
retries=3,
dag=dag)
armada_validate_docs = ArmadaOperator(
armada_validate_docs = ArmadaValidateDesignOperator(
task_id='armada_validate_site_design',
shipyard_conf=config_path,
action='validate_site_design',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
retries=3,

View File

@ -0,0 +1,199 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from urllib.parse import urlparse
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
import armada.common.client as client
import armada.common.session as session
from get_k8s_pod_port_ip import get_pod_port_ip
from service_endpoint import ucp_service_endpoint
from service_token import shipyard_service_token
from xcom_puller import XcomPuller
from xcom_pusher import XcomPusher
class ArmadaBaseOperator(BaseOperator):
    """Armada Base Operator

    All armada related workflow operators will use the armada
    base operator as the parent and inherit attributes and methods
    from this class. ``execute`` runs the shared set-up in
    ``armada_base`` and then hands over to ``do_execute``, which each
    concrete operator overrides with its task-specific work.
    """

    @apply_defaults
    def __init__(self,
                 deckhand_design_ref=None,
                 deckhand_svc_type='deckhand',
                 armada_client=None,
                 armada_svc_endpoint=None,
                 armada_svc_type='armada',
                 main_dag_name=None,
                 query=None,
                 shipyard_conf=None,
                 sub_dag_name=None,
                 svc_session=None,
                 svc_token=None,
                 xcom_push=True,
                 *args, **kwargs):
        """Initialization of ArmadaBaseOperator object.

        :param deckhand_design_ref: A URI reference to the design documents
        :param deckhand_svc_type: Deckhand Service Type
        :param armada_client: An instance of armada client
        :param armada_svc_endpoint: Armada Service Endpoint
        :param armada_svc_type: Armada Service Type
        :param main_dag_name: Parent Dag
        :param query: A dictionary containing explicit query string
                      parameters. When omitted, every operator instance
                      gets its own empty dictionary.
        :param shipyard_conf: Location of shipyard.conf
        :param sub_dag_name: Child Dag
        :param svc_session: Keystone Session
        :param svc_token: Keystone Token
        :param xcom_push: xcom usage

        The Armada operator assumes that prior steps have set xcoms for
        the action and the deployment configuration
        """
        super(ArmadaBaseOperator, self).__init__(*args, **kwargs)
        self.deckhand_design_ref = deckhand_design_ref
        self.deckhand_svc_type = deckhand_svc_type
        self.armada_client = armada_client
        self.armada_svc_endpoint = armada_svc_endpoint
        self.armada_svc_type = armada_svc_type
        self.main_dag_name = main_dag_name
        # The query dict is written to later (tiller host/port, target
        # manifest). A shared ``{}`` default would leak those values
        # between unrelated operator instances, so build a fresh dict
        # whenever the caller did not supply one.
        self.query = {} if query is None else query
        self.shipyard_conf = shipyard_conf
        self.sub_dag_name = sub_dag_name
        self.svc_session = svc_session
        self.svc_token = svc_token
        self.xcom_push_flag = xcom_push

    def execute(self, context):
        """Run the shared Armada set-up followed by the child's task.

        :param context: Airflow task context; must hold 'task_instance'
        """
        # Execute armada base function
        self.armada_base(context)

        # Execute child function
        self.do_execute()

    def do_execute(self):
        """Task-specific step; concrete operators must override this.

        :raises NotImplementedError: always, when not overridden
        """
        raise NotImplementedError(
            "{} must implement do_execute()".format(type(self).__name__))

    # NOTE(review): presumably this decorator is what populates
    # self.svc_token before the body runs - confirm in service_token.
    @shipyard_service_token
    def armada_base(self, context):
        """Prepare everything the task-specific operators rely on.

        Sets ``xcom_puller``, ``action_info``, ``dc`` (deployment
        configuration), ``xcom_pusher``, ``armada_client`` and
        ``deckhand_design_ref`` on the instance.

        :param context: Airflow task context; must hold 'task_instance'
        """
        # Define task_instance
        task_instance = context['task_instance']

        # Set up and retrieve values from xcom
        self.xcom_puller = XcomPuller(self.main_dag_name, task_instance)
        self.action_info = self.xcom_puller.get_action_info()
        self.dc = self.xcom_puller.get_deployment_configuration()

        # Set up xcom_pusher to push values to xcom
        self.xcom_pusher = XcomPusher(task_instance)

        # Logs uuid of action performed by the Operator
        logging.info("Armada Operator for action %s", self.action_info['id'])

        # Set up armada client
        self.get_armada_client()

        # Get deckhand design reference url
        self.get_deckhand_design_ref()

    def get_armada_client(self):
        """Build an Armada session and client for the Armada endpoint.

        Stores the endpoint in ``self.armada_svc_endpoint`` and the
        client in ``self.armada_client``.

        :raises AirflowException: if the session or client is falsy
        """
        # Retrieve Endpoint Information
        self.armada_svc_endpoint = ucp_service_endpoint(
            self, svc_type=self.armada_svc_type)
        logging.info("Armada endpoint is %s", self.armada_svc_endpoint)

        # Parse Armada Service Endpoint
        armada_url = urlparse(self.armada_svc_endpoint)

        # Build a ArmadaSession with credentials and target host
        # information.
        # NOTE(review): the scheme is fixed to 'http' regardless of the
        # scheme in the endpoint URL - confirm this is intended.
        logging.info("Build Armada Session")
        a_session = session.ArmadaSession(host=armada_url.hostname,
                                          port=armada_url.port,
                                          scheme='http',
                                          token=self.svc_token,
                                          marker=None)

        # Raise Exception if we are not able to set up the session
        if a_session:
            logging.info("Successfully Set Up Armada Session")
        else:
            raise AirflowException("Failed to set up Armada Session!")

        # Use the ArmadaSession to build a ArmadaClient that can
        # be used to make one or more API calls
        logging.info("Create Armada Client")
        self.armada_client = client.ArmadaClient(a_session)

        # Raise Exception if we are not able to build armada client
        if self.armada_client:
            logging.info("Successfully Set Up Armada client")
        else:
            raise AirflowException("Failed to set up Armada client!")

    def get_deckhand_design_ref(self):
        """Compose the Deckhand design reference for the design version.

        The result, stored in ``self.deckhand_design_ref``, has the form
        ``deckhand+<endpoint>/revisions/<id>/rendered-documents``.

        :raises AirflowException: if the composed reference is empty
        """
        # Retrieve DeckHand Endpoint Information
        deckhand_svc_endpoint = ucp_service_endpoint(
            self, svc_type=self.deckhand_svc_type)
        logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint)

        # Retrieve last committed revision id
        committed_revision_id = self.xcom_puller.get_design_version()

        # Form DeckHand Design Reference Path
        # This URL will be used to retrieve the Site Design YAMLs
        deckhand_path = "deckhand+" + deckhand_svc_endpoint
        self.deckhand_design_ref = os.path.join(deckhand_path,
                                                "revisions",
                                                str(committed_revision_id),
                                                "rendered-documents")

        if self.deckhand_design_ref:
            logging.info("Design YAMLs will be retrieved from %s",
                         self.deckhand_design_ref)
        else:
            raise AirflowException("Unable to Retrieve Design Reference!")

    @get_pod_port_ip('tiller', namespace='kube-system')
    def get_tiller_info(self, pods_ip_port=None):
        """Copy the Tiller pod IP and port into ``self.query``.

        :param pods_ip_port: Mapping with a 'tiller' entry that holds
                             'ip' and 'port'.
                             NOTE(review): presumably filled in by the
                             get_pod_port_ip decorator - confirm.
        :raises KeyError: if the 'tiller' entry is missing
        """
        if pods_ip_port is None:
            pods_ip_port = {}

        # Assign value to the 'query' dictionary so that we can pass
        # it via the Armada Client
        self.query['tiller_host'] = pods_ip_port['tiller']['ip']
        self.query['tiller_port'] = pods_ip_port['tiller']['port']
class ArmadaBaseOperatorPlugin(AirflowPlugin):
    """Airflow plugin that registers ArmadaBaseOperator."""

    # Plugin identifier
    name = 'armada_base_operator_plugin'
    # Operators contributed by this plugin
    operators = [ArmadaBaseOperator]

View File

@ -0,0 +1,64 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from airflow.exceptions import AirflowException
from airflow.plugins_manager import AirflowPlugin
from armada_base_operator import ArmadaBaseOperator
from armada.exceptions import api_exceptions as errors
class ArmadaGetReleasesOperator(ArmadaBaseOperator):
    """Armada Get Releases Operator

    Asks armada for the Helm charts releases of the environment and
    logs them.
    """

    def do_execute(self):
        """Fetch the Helm charts releases through the armada client.

        :raises AirflowException: on an armada client error, or when
                                  armada returns an empty result
        """
        # Fill self.query with the tiller host and port
        self.get_tiller_info(pods_ip_port={})

        # Client read timeout comes from the deployment configuration
        read_timeout = self.dc['armada.get_releases_timeout']

        logging.info("Retrieving Helm charts releases after deployment..")

        try:
            releases = self.armada_client.get_releases(self.query,
                                                       timeout=read_timeout)
        except errors.ClientError as client_error:
            raise AirflowException(client_error)

        # An empty response is treated as a failure
        if not releases:
            raise AirflowException("Failed to retrieve Helm charts releases!")

        logging.info("Successfully retrieved Helm charts releases")
        logging.info(releases)
class ArmadaGetReleasesOperatorPlugin(AirflowPlugin):
    """Airflow plugin that registers ArmadaGetReleasesOperator."""

    # Plugin identifier
    name = 'armada_get_releases_operator'
    # Operators contributed by this plugin
    operators = [ArmadaGetReleasesOperator]

View File

@ -0,0 +1,67 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from airflow.exceptions import AirflowException
from airflow.plugins_manager import AirflowPlugin
from armada_base_operator import ArmadaBaseOperator
from armada.exceptions import api_exceptions as errors
class ArmadaGetStatusOperator(ArmadaBaseOperator):
    """Armada Get Status Operator

    Asks armada for the current status of Tiller. Tiller needs to be
    in a healthy state before any site deployment/update.
    """

    def do_execute(self):
        """Check the Tiller state reported by armada.

        :raises AirflowException: on an armada client error, or when
                                  the reported Tiller state is falsy
        """
        # Fill self.query with the tiller host and port
        self.get_tiller_info(pods_ip_port={})

        # Client read timeout comes from the deployment configuration
        read_timeout = self.dc['armada.get_status_timeout']

        try:
            status = self.armada_client.get_status(self.query,
                                                   timeout=read_timeout)
        except errors.ClientError as client_error:
            raise AirflowException(client_error)

        tiller = status['tiller']

        # The state is a boolean value, i.e. True/False; bail out
        # early when Tiller is unhealthy
        if not tiller['state']:
            raise AirflowException("Please check Tiller!")

        logging.info("Tiller is in running state")
        logging.info("Tiller version is %s", tiller['version'])
class ArmadaGetStatusOperatorPlugin(AirflowPlugin):
    """Airflow plugin that registers ArmadaGetStatusOperator."""

    # Plugin identifier
    name = 'armada_get_status_operator'
    # Operators contributed by this plugin
    operators = [ArmadaGetStatusOperator]

View File

@ -1,361 +0,0 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import os
import requests
from urllib.parse import urlparse
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
import armada.common.client as client
import armada.common.session as session
from get_k8s_pod_port_ip import get_pod_port_ip
from service_endpoint import ucp_service_endpoint
from service_token import shipyard_service_token
from xcom_puller import XcomPuller
from xcom_pusher import XcomPusher
class ArmadaOperator(BaseOperator):
"""
Supports interaction with Armada
A single operator whose behaviour is selected by ``action``:
'validate_site_design', 'armada_status', 'armada_apply' or
'armada_get_releases'. Any other value only logs
'No Action to Perform'.
:param action: Task to perform
:param main_dag_name: Parent Dag
:param shipyard_conf: Location of shipyard.conf
:param svc_token: Token sent to Armada
:param sub_dag_name: Child Dag
:param xcom_push: Stored as ``xcom_push_flag``
The Armada operator assumes that prior steps have set xcoms for
the action and the deployment configuration
"""
@apply_defaults
def __init__(self,
action=None,
main_dag_name=None,
shipyard_conf=None,
svc_token=None,
sub_dag_name=None,
xcom_push=True,
*args, **kwargs):
"""Store the operator settings on the instance."""
super(ArmadaOperator, self).__init__(*args, **kwargs)
self.action = action
self.main_dag_name = main_dag_name
self.shipyard_conf = shipyard_conf
self.svc_token = svc_token
self.sub_dag_name = sub_dag_name
self.xcom_push_flag = xcom_push
def execute(self, context):
"""Dispatch on ``self.action``.
Returns 'valid' for a successful 'validate_site_design' action;
every other path returns None.
Raises AirflowException when the design reference is empty or
when site design validation fails.
"""
# Initialize Variables
armada_client = None
design_ref = None
# Define task_instance
task_instance = context['task_instance']
# Set up and retrieve values from xcom
self.xcom_puller = XcomPuller(self.main_dag_name, task_instance)
self.action_info = self.xcom_puller.get_action_info()
# Logs uuid of action performed by the Operator
logging.info("Armada Operator for action %s", self.action_info['id'])
# Retrieve Deckhand Design Reference
design_ref = self.get_deckhand_design_ref(context)
if design_ref:
logging.info("Design YAMLs will be retrieved from %s",
design_ref)
else:
raise AirflowException("Unable to Retrieve Design Reference!")
# Validate Site Design
# This branch returns early, so no Armada client is built for it
if self.action == 'validate_site_design':
# Initialize variable
armada_svc_endpoint = None
site_design_validity = 'invalid'
# Retrieve Endpoint Information
svc_type = 'armada'
armada_svc_endpoint = ucp_service_endpoint(self,
svc_type=svc_type)
site_design_validity = self.armada_validate_site_design(
armada_svc_endpoint, design_ref)
if site_design_validity == 'valid':
logging.info("Site Design has been successfully validated")
else:
raise AirflowException("Site Design Validation Failed!")
return site_design_validity
# Set up target manifest (only if not doing validate)
self.dc = self.xcom_puller.get_deployment_configuration()
self.target_manifest = self.dc['armada.manifest']
# Create Armada Client
# Retrieve Endpoint Information
svc_type = 'armada'
armada_svc_endpoint = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("Armada endpoint is %s", armada_svc_endpoint)
# Set up Armada Client
armada_client = self.armada_session_client(armada_svc_endpoint)
# Retrieve Tiller Information and assign to context 'query'
context['query'] = self.get_tiller_info(context)
# Armada API Call
# Armada Status
if self.action == 'armada_status':
self.get_armada_status(context, armada_client)
# Armada Apply
elif self.action == 'armada_apply':
self.armada_apply(context, armada_client, design_ref,
self.target_manifest)
# Armada Get Releases
elif self.action == 'armada_get_releases':
self.armada_get_releases(context, armada_client)
else:
# Unknown actions are not an error; they are only logged
logging.info('No Action to Perform')
# NOTE(review): presumably this decorator populates self.svc_token
# before the body runs - confirm in service_token.
@shipyard_service_token
def armada_session_client(self, armada_svc_endpoint):
"""Build an ArmadaSession for the endpoint and return an ArmadaClient.
Raises AirflowException if the session or the client is falsy.
"""
# Initialize Variables
armada_url = None
a_session = None
a_client = None
# Parse Armada Service Endpoint
armada_url = urlparse(armada_svc_endpoint)
# Build a ArmadaSession with credentials and target host
# information.
# The scheme is fixed to 'http'; the endpoint's own scheme is not used
logging.info("Build Armada Session")
a_session = session.ArmadaSession(host=armada_url.hostname,
port=armada_url.port,
scheme='http',
token=self.svc_token,
marker=None)
# Raise Exception if we are not able to get armada session
if a_session:
logging.info("Successfully Set Up Armada Session")
else:
raise AirflowException("Failed to set up Armada Session!")
# Use session to build a ArmadaClient to make one or more
# API calls. The ArmadaSession will care for TCP connection
# pooling and header management
logging.info("Create Armada Client")
a_client = client.ArmadaClient(a_session)
# Raise Exception if we are not able to build armada client
if a_client:
logging.info("Successfully Set Up Armada client")
else:
raise AirflowException("Failed to set up Armada client!")
# Return Armada client for XCOM Usage
return a_client
# NOTE(review): presumably this decorator stores the pod data under
# context['pods_ip_port'] before the body runs - confirm.
@get_pod_port_ip('tiller', namespace='kube-system')
def get_tiller_info(self, context, *args, **kwargs):
"""Return a query dict holding 'tiller_host' and 'tiller_port'.
The values are read from context['pods_ip_port']['tiller'].
"""
# Initialize Variable
query = {}
# Get IP and port information of Pods from context
k8s_pods_ip_port = context['pods_ip_port']
# Assign value to the 'query' dictionary so that we can pass
# it via the Armada Client
# .get() yields None (no KeyError) when 'ip' or 'port' is absent
query['tiller_host'] = k8s_pods_ip_port['tiller'].get('ip')
query['tiller_port'] = k8s_pods_ip_port['tiller'].get('port')
return query
def get_armada_status(self, context, armada_client):
"""Raise AirflowException unless Armada reports a truthy Tiller state."""
# Check State of Tiller
armada_status = armada_client.get_status(context['query'])
# Tiller State will return boolean value, i.e. True/False
# Raise Exception if Tiller is in a bad state
if armada_status['tiller']['state']:
logging.info("Tiller is in running state")
logging.info("Tiller version is %s",
armada_status['tiller']['version'])
else:
raise AirflowException("Please check Tiller!")
def armada_apply(self, context, armada_client, design_ref,
target_manifest):
'''Run Armada Apply
Posts the apply request with design_ref as manifest_ref and
target_manifest added to the query, then pushes the xcom key
'upgrade_airflow_worker' ('true' or 'false') and logs the response.
'''
# Initialize Variables
armada_manifest = None
armada_ref = design_ref
armada_post_apply = {}
override_values = []
chart_set = []
upgrade_airflow_worker = False
# enhance the context's query entity with target_manifest
query = context.get('query', {})
query['target_manifest'] = target_manifest
# Execute Armada Apply to install the helm charts in sequence
logging.info("Armada Apply")
armada_post_apply = armada_client.post_apply(manifest=armada_manifest,
manifest_ref=armada_ref,
values=override_values,
set=chart_set,
query=query)
# Search for Shipyard deployment in the list of chart upgrades
# NOTE: It is possible for the chart name to take on different
# values, e.g. 'aic-ucp-shipyard', 'ucp-shipyard'. Hence we
# will search for the word 'shipyard', which should exist as
# part of the name of the Shipyard Helm Chart.
for i in armada_post_apply['message']['upgrade']:
if 'shipyard' in i:
upgrade_airflow_worker = True
break
# Create xcom key 'upgrade_airflow_worker'
# Value of key will depend on whether an upgrade has been
# performed on the Shipyard/Airflow Chart
self.xcom_pusher = XcomPusher(context['task_instance'])
if upgrade_airflow_worker:
self.xcom_pusher.xcom_push(key='upgrade_airflow_worker',
value='true')
else:
self.xcom_pusher.xcom_push(key='upgrade_airflow_worker',
value='false')
# We will expect Armada to return the releases that it is
# deploying. Note that if we try and deploy the same release
# twice, we will end up with empty response as nothing has
# changed.
if (armada_post_apply['message']['install'] or
armada_post_apply['message']['upgrade']):
logging.info("Armada Apply Successfully Executed")
logging.info(armada_post_apply)
else:
# An empty result is only a warning, not a failure
logging.warning("No new changes/updates were detected")
logging.info(armada_post_apply)
def armada_get_releases(self, context, armada_client):
"""Log the Armada releases; raise AirflowException if none come back."""
# Initialize Variables
armada_releases = {}
# deckhand_svc_endpoint is assigned here but never read in this method
deckhand_svc_endpoint = None
# Retrieve Armada Releases after deployment
logging.info("Retrieving Armada Releases after deployment..")
armada_releases = armada_client.get_releases(context['query'])
if armada_releases:
logging.info("Retrieved current Armada Releases")
logging.info(armada_releases)
else:
raise AirflowException("Failed to retrieve Armada Releases")
def get_deckhand_design_ref(self, context):
"""Return the Deckhand design reference for the design version.
The result has the form
'deckhand+<endpoint>/revisions/<id>/rendered-documents'.
"""
# Retrieve DeckHand Endpoint Information
svc_type = 'deckhand'
deckhand_svc_endpoint = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
# Retrieve revision_id from xcom
committed_revision_id = self.xcom_puller.get_design_version()
# Form Design Reference Path that we will use to retrieve
# the Design YAMLs
deckhand_path = "deckhand+" + deckhand_svc_endpoint
deckhand_design_ref = os.path.join(deckhand_path,
"revisions",
str(committed_revision_id),
"rendered-documents")
return deckhand_design_ref
# NOTE(review): presumably this decorator populates self.svc_token
# before the body runs - confirm in service_token.
@shipyard_service_token
def armada_validate_site_design(self, armada_svc_endpoint, design_ref):
"""POST the design reference to Armada's 'validatedesign' endpoint.
Returns 'valid' when the response 'status' equals 'success'
(case-insensitive), otherwise 'invalid'.
Raises AirflowException on a request error or a non-JSON response.
"""
# Form Validation Endpoint
validation_endpoint = os.path.join(armada_svc_endpoint,
'validatedesign')
logging.info("Validation Endpoint is %s", validation_endpoint)
# Define Headers and Payload
headers = {
'Content-Type': 'application/json',
'X-Auth-Token': self.svc_token
}
payload = {
'rel': "design",
'href': design_ref,
'type': "application/x-yaml"
}
# Requests Armada to validate site design
# No timeout is passed to requests.post here
logging.info("Waiting for Armada to validate site design...")
try:
design_validate_response = requests.post(validation_endpoint,
headers=headers,
data=json.dumps(payload))
except requests.exceptions.RequestException as e:
raise AirflowException(e)
# Convert response to string
validate_site_design = design_validate_response.text
# Print response
logging.info("Retrieving Armada validate site design response...")
try:
validate_site_design_dict = json.loads(validate_site_design)
logging.info(validate_site_design_dict)
except json.JSONDecodeError as e:
raise AirflowException(e)
# A response without a 'status' key is reported as 'unspecified'
status = str(validate_site_design_dict.get('status', 'unspecified'))
# Check if site design is valid
logging.info("Armada Site Design valdation status "
"is: {}".format(status))
if status.lower() == 'success':
return 'valid'
else:
return 'invalid'
class ArmadaOperatorPlugin(AirflowPlugin):
"""Creates ArmadaOperator in Airflow."""
# Plugin identifier
name = 'armada_operator_plugin'
# Operators contributed by this plugin
operators = [ArmadaOperator]

View File

@ -0,0 +1,130 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from airflow.exceptions import AirflowException
from airflow.plugins_manager import AirflowPlugin
from armada_base_operator import ArmadaBaseOperator
from armada.exceptions import api_exceptions as errors
class ArmadaPostApplyOperator(ArmadaBaseOperator):
    """Armada Post Apply Operator

    Asks armada to apply the target manifest, which starts a site
    deployment/update/upgrade, and records through xcom whether the
    airflow worker needs to be upgraded afterwards.
    """

    def do_execute(self):
        """Run armada apply and push the 'upgrade_airflow_worker' xcom.

        :raises AirflowException: on an armada client error; the xcom
                                  'get_attempted_failed_install_upgrade'
                                  is set to 'true' first
        """
        # Set up target manifest
        self.dc = self.xcom_puller.get_deployment_configuration()
        self.target_manifest = self.dc['armada.manifest']

        # Fill self.query with the tiller host and port, then add the
        # manifest that armada should apply
        self.get_tiller_info(pods_ip_port={})
        self.query['target_manifest'] = self.target_manifest

        # Client read timeout comes from the deployment configuration
        read_timeout = self.dc['armada.post_apply_timeout']

        # Execute Armada Apply to install the helm charts in sequence
        logging.info("Armada Apply")

        try:
            apply_result = self.armada_client.post_apply(
                manifest=None,
                manifest_ref=self.deckhand_design_ref,
                values=[],
                set=[],
                query=self.query,
                timeout=read_timeout)
        except errors.ClientError as client_error:
            # Remember the failed attempt so that a retry can tell it
            # is running after an error
            self.xcom_pusher.xcom_push(
                key='get_attempted_failed_install_upgrade',
                value='true')

            raise AirflowException(client_error)

        # NOTE: The key will only be set to 'true' if there was a failed
        # attempt to upgrade or update the Helm charts. It does not hold
        # any value by default.
        retried_after_failure = (
            self.xcom_puller.get_attempted_failed_install_upgrade() == 'true')

        message = apply_result['message']
        upgrade_airflow_worker = False

        # NOTE: It is possible for Armada to return a HTTP 500 response
        # even though the Helm charts have been upgraded/updated. The
        # workflow will treat the 'Armada Apply' task as a failed attempt
        # in such situation and proceed to schedule and run the task for
        # a second time (the default is 3 retries). As the relevant Helm
        # Charts would have already been updated, we will get an empty
        # list from Armada for that second retry. As a workaround, we will
        # need to treat such response as a successful upgrade/update.
        # A long term solution will be in place in the future.
        if retried_after_failure:
            if not (message['install'] or message['upgrade']):
                upgrade_airflow_worker = True

        # Search for Shipyard deployment in the list of chart upgrades
        # NOTE: It is possible for the chart name to take on different
        # values, e.g. 'aic-ucp-shipyard', 'ucp-shipyard'. Hence we
        # will search for the word 'shipyard', which should exist as
        # part of the name of the Shipyard Helm Chart.
        if any('shipyard' in chart for chart in message['upgrade']):
            upgrade_airflow_worker = True

        # Create xcom key 'upgrade_airflow_worker'
        # Value of key will depend on whether an upgrade has been
        # performed on the Shipyard/Airflow Chart
        self.xcom_pusher.xcom_push(
            key='upgrade_airflow_worker',
            value='true' if upgrade_airflow_worker else 'false')

        # We will expect Armada to return the releases that it is
        # deploying. Note that if we try and deploy the same release
        # twice, we will end up with empty response as nothing has
        # changed.
        if message['install'] or message['upgrade']:
            logging.info("Successfully Executed Armada Apply")
        else:
            logging.warning("No new changes/updates were detected!")

        logging.info(apply_result)
class ArmadaPostApplyOperatorPlugin(AirflowPlugin):
    """Airflow plugin that registers ArmadaPostApplyOperator."""

    # Plugin identifier
    name = 'armada_post_apply_operator'
    # Operators contributed by this plugin
    operators = [ArmadaPostApplyOperator]

View File

@ -0,0 +1,69 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from armada_base_operator import ArmadaBaseOperator
from armada.exceptions import api_exceptions as errors
class ArmadaValidateDesignOperator(ArmadaBaseOperator):
    """Operator that asks Armada to validate the site design.

    The design referenced by ``self.deckhand_design_ref`` is posted to
    Armada for validation; the task fails unless the reported status is
    ``success``.
    """

    def do_execute(self):
        """Post the site design to Armada and check the reported status.

        Raises:
            AirflowException: if the Armada client raises a
                ``ClientError``, or if the ``status`` field of the
                response is anything other than ``success`` (compared
                case-insensitively; a missing field is reported as
                ``unspecified``).
        """
        logging.info("Waiting for Armada to validate site design...")

        # Client read timeout for the validation call, taken from self.dc
        read_timeout = self.dc['armada.validate_design_timeout']

        try:
            response = self.armada_client.post_validate(
                manifest=self.deckhand_design_ref,
                timeout=read_timeout)
        except errors.ClientError as client_error:
            # Surface client failures to Airflow as a task failure
            raise AirflowException(client_error)

        # Log the raw response for troubleshooting
        logging.info("Retrieving Armada validate site design response...")
        logging.info(response)

        # Anything other than 'success' is treated as a failed validation
        outcome = str(response.get('status', 'unspecified'))
        if outcome.lower() != 'success':
            raise AirflowException("Site Design Validation Failed "
                                   "with status: {}!".format(outcome))

        logging.info("Site Design has been successfully validated")
class ArmadaValidateDesignOperatorPlugin(AirflowPlugin):
    """Airflow plugin that registers ArmadaValidateDesignOperator."""

    # Identifier under which this plugin is declared
    name = 'armada_validate_design_operator'

    # Operator classes contributed by this plugin
    operators = [ArmadaValidateDesignOperator]

View File

@ -64,7 +64,11 @@ class DeploymentConfigurationOperator(BaseOperator):
"kubernetes_provisioner.clear_labels_timeout": 1800,
"kubernetes_provisioner.remove_etcd_timeout": 1800,
"kubernetes_provisioner.etcd_ready_timeout": 600,
"armada.manifest": "full-site"
"armada.get_releases_timeout": 300,
"armada.get_status_timeout": 300,
"armada.manifest": "full-site",
"armada.post_apply_timeout": 1800,
"armada.validate_design_timeout": 600
}
@apply_defaults

View File

@ -22,26 +22,25 @@ from kubernetes import client, config
def get_pod_port_ip(*pods, namespace):
def get_k8s_pod_port_ip(func):
@wraps(func)
def k8s_pod_port_ip_get(self, context, *args, **kwargs):
def k8s_pod_port_ip_get(self, pods_ip_port):
"""This function retrieves Kubernetes Pod Port and IP
information. It can be used to retrieve information of
single pod deployment and/or statefulsets. For instance,
it can be used to retrieve the tiller pod IP and port
information for usage in the Armada Operator.
:param context: Information on the current workflow
:param pods_ip_port: IP and port information of the pods
Example::
from get_k8s_pod_port_ip import get_pod_port_ip
@get_pod_port_ip('tiller', namespace='kube-system')
def get_pod_info(self, context, *args, **kwargs):
# Get IP and port information of Pods from context
k8s_pods_ip_port = context['pods_ip_port']
def get_pod_info(self, pods_ip_port={}):
tiller_ip = pods_ip_port['tiller']['ip']
tiller_port = pods_ip_port['tiller']['port']
tiller_ip = k8s_pods_ip_port['tiller'].get('ip')
tiller_port = k8s_pods_ip_port['tiller'].get('port')
"""
# Initialize variable
k8s_pods = {}
@ -116,10 +115,7 @@ def get_pod_port_ip(*pods, namespace):
if not pod_attr[pod_name]:
raise AirflowException("Unable to locate", pod_name)
# Assign pods IP and ports information to context
context['pods_ip_port'] = k8s_pods
return func(self, context, *args, **kwargs)
return func(self, pods_ip_port=k8s_pods)
return k8s_pod_port_ip_get
return get_k8s_pod_port_ip

View File

@ -82,3 +82,13 @@ class XcomPuller(object):
return self._get_xcom(source_task=source_task,
dag_id=source_dag,
key=key)
def get_attempted_failed_install_upgrade(self):
    """Fetch the 'attempted_failed_install_upgrade' xcom value.

    The value is looked up under the 'armada_post_apply' task of the
    'armada_build' dag and tells whether there was a failed attempt
    of Armada Apply.
    """
    return self._get_xcom(source_task='armada_post_apply',
                          dag_id='armada_build',
                          key='attempted_failed_install_upgrade')

View File

@ -66,8 +66,16 @@ data:
armada:
type: 'object'
properties:
get_releases_timeout:
type: 'integer'
get_status_timeout:
type: 'integer'
manifest:
type: 'string'
post_apply_timeout:
type: 'integer'
validate_design_timeout:
type: 'integer'
additionalProperties: false
required:
- manifest

View File

@ -10,18 +10,7 @@ airflow_api_read_timeout = 60
[deckhand]
service_type = deckhand
[drydock]
cluster_join_check_backoff_time = 120
deploy_node_query_interval = 30
deploy_node_task_timeout = 3600
destroy_node_query_interval = 30
destroy_node_task_timeout = 900
prepare_node_query_interval = 30
prepare_node_task_timeout = 1800
prepare_site_query_interval = 10
prepare_site_task_timeout = 300
service_type = physicalprovisioner
verify_site_query_interval = 10
verify_site_task_timeout = 60
[keystone_authtoken]
auth_section = keystone_authtoken
auth_type = password

View File

@ -28,4 +28,8 @@ data:
remove_etcd_timeout: 1800
etcd_ready_timeout: 600
armada:
get_releases_timeout: 300
get_status_timeout: 300
manifest: 'full-site'
post_apply_timeout: 1800
validate_design_timeout: 600