Redeploy Server - Dags & Operators

This patch set updates the required dags and operators
for the redeploy server workflow. It also introduces the
Promenade Operator.

Note that many of the required functionalities in DryDock
and Promenade are being worked on and are not ready at the
moment. As such, this patch set is mainly providing the
skeleton framework for the redeploy server workflow. The
dags and relevant Operators will be updated at a later date
when the features and functionalities are ready for usage.

Change-Id: I4baae76ea9d8cde9c2b0bab3feac896d01400868
This commit is contained in:
Anthony Lin 2018-01-22 05:44:32 +00:00
parent 313c44d054
commit 3d88cf9e33
10 changed files with 393 additions and 30 deletions

View File

@ -328,6 +328,8 @@ conf:
prepare_node_task_timeout: 1800
deploy_node_query_interval: 30
deploy_node_task_timeout: 3600
destroy_node_query_interval: 30
destroy_node_task_timeout: 900
cluster_join_check_backoff_time: 120
keystone_authtoken:
delay_auth_decision: true

View File

@ -82,6 +82,12 @@
# Time out (in seconds) for deploy_node task (integer value)
#deploy_node_task_timeout = 3600
# Query interval (in seconds) for destroy_node task (integer value)
#destroy_node_query_interval = 30
# Time out (in seconds) for destroy_node task (integer value)
#destroy_node_task_timeout = 900
# Backoff time (in seconds) before checking cluster join (integer value)
#cluster_join_check_backoff_time = 120

View File

@ -170,6 +170,16 @@ SECTIONS = [
default=3600,
help='Time out (in seconds) for deploy_node task'
),
cfg.IntOpt(
'destroy_node_query_interval',
default=30,
help='Query interval (in seconds) for destroy_node task'
),
cfg.IntOpt(
'destroy_node_task_timeout',
default=900,
help='Time out (in seconds) for destroy_node task'
),
cfg.IntOpt(
'cluster_join_check_backoff_time',
default=120,

View File

@ -0,0 +1,95 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.models import DAG
from airflow.operators import DryDockOperator
from airflow.operators import PromenadeOperator
# Location of shiyard.conf
# Note that the shipyard.conf file needs to be placed on a volume
# that can be accessed by the containers
config_path = '/usr/local/airflow/plugins/shipyard.conf'
def destroy_server(parent_dag_name, child_dag_name, args):
'''
Tear Down Node
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args)
# Drain Node
promenade_drain_node = PromenadeOperator(
task_id='promenade_drain_node',
shipyard_conf=config_path,
action='promenade_drain_node',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Remove Labels
promenade_remove_labels = PromenadeOperator(
task_id='promenade_remove_labels',
shipyard_conf=config_path,
action='promenade_remove_labels',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Stop Kubelet
promenade_stop_kubelet = PromenadeOperator(
task_id='promenade_stop_kubelet',
shipyard_conf=config_path,
action='promenade_stop_kubelet',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# ETCD Sanity Check
promenade_check_etcd = PromenadeOperator(
task_id='promenade_check_etcd',
shipyard_conf=config_path,
action='promenade_check_etcd',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Power down and destroy node using DryDock
drydock_destroy_node = DryDockOperator(
task_id='destroy_node',
shipyard_conf=config_path,
action='destroy_node',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Delete node from cluster using Promenade
promenade_delete_node = PromenadeOperator(
task_id='promenade_delete_node',
shipyard_conf=config_path,
action='promenade_delete_node',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Define dependencies
promenade_remove_labels.set_upstream(promenade_drain_node)
promenade_stop_kubelet.set_upstream(promenade_remove_labels)
promenade_check_etcd.set_upstream(promenade_stop_kubelet)
drydock_destroy_node.set_upstream(promenade_check_etcd)
promenade_delete_node.set_upstream(drydock_destroy_node)
return dag

View File

@ -14,24 +14,28 @@
from datetime import timedelta
import airflow
from airflow import DAG
import failure_handlers
from airflow import DAG
from airflow.operators import ConcurrencyCheckOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from deckhand_get_design import get_design_deckhand
from destroy_node import destroy_server
from drydock_deploy_site import deploy_site_drydock
from preflight_checks import all_preflight_checks
from validate_site_design import validate_site_design
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators import ConcurrencyCheckOperator
from airflow.operators import DeckhandOperator
from airflow.operators import PlaceholderOperator
from airflow.operators.python_operator import PythonOperator
"""
redeploy_server is the top-level orchestration DAG for redeploying a
server using the Undercloud platform.
"""
PARENT_DAG_NAME = 'redeploy_server'
DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check'
ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight'
DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check'
DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version'
DESTROY_SERVER_DAG_NAME = 'destroy_server'
DRYDOCK_BUILD_DAG_NAME = 'drydock_build'
PARENT_DAG_NAME = 'redeploy_server'
VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design'
default_args = {
@ -43,7 +47,7 @@ default_args = {
'email_on_retry': False,
'provide_context': True,
'retries': 0,
'retry_delay': timedelta(minutes=1),
'retry_delay': timedelta(seconds=30),
}
dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None)
@ -73,9 +77,11 @@ preflight = SubDagOperator(
PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME, args=default_args),
task_id=ALL_PREFLIGHT_CHECKS_DAG_NAME,
on_failure_callback=failure_handlers.step_failure_handler,
dag=dag, )
dag=dag)
get_design_version = DeckhandOperator(
get_design_version = SubDagOperator(
subdag=get_design_deckhand(
PARENT_DAG_NAME, DECKHAND_GET_DESIGN_VERSION, args=default_args),
task_id=DECKHAND_GET_DESIGN_VERSION,
on_failure_callback=failure_handlers.step_failure_handler,
dag=dag)
@ -87,23 +93,17 @@ validate_site_design = SubDagOperator(
on_failure_callback=failure_handlers.step_failure_handler,
dag=dag)
site_evacuation = PlaceholderOperator(
task_id='site_evacuation',
destroy_server = SubDagOperator(
subdag=destroy_server(
PARENT_DAG_NAME, DESTROY_SERVER_DAG_NAME, args=default_args),
task_id=DESTROY_SERVER_DAG_NAME,
on_failure_callback=failure_handlers.step_failure_handler,
dag=dag)
drydock_rebuild = PlaceholderOperator(
task_id='drydock_rebuild',
on_failure_callback=failure_handlers.step_failure_handler,
dag=dag)
query_node_status = PlaceholderOperator(
task_id='redeployed_node_status',
on_failure_callback=failure_handlers.step_failure_handler,
dag=dag)
armada_rebuild = PlaceholderOperator(
task_id='armada_rebuild',
drydock_build = SubDagOperator(
subdag=deploy_site_drydock(
PARENT_DAG_NAME, DRYDOCK_BUILD_DAG_NAME, args=default_args),
task_id=DRYDOCK_BUILD_DAG_NAME,
on_failure_callback=failure_handlers.step_failure_handler,
dag=dag)
@ -112,7 +112,5 @@ concurrency_check.set_upstream(action_xcom)
preflight.set_upstream(concurrency_check)
get_design_version.set_upstream(preflight)
validate_site_design.set_upstream(get_design_version)
site_evacuation.set_upstream(validate_site_design)
drydock_rebuild.set_upstream(site_evacuation)
query_node_status.set_upstream(drydock_rebuild)
armada_rebuild.set_upstream(query_node_status)
destroy_server.set_upstream(validate_site_design)
drydock_build.set_upstream(destroy_server)

View File

@ -56,6 +56,7 @@ class DeckhandOperator(BaseOperator):
def execute(self, context):
# Initialize Variables
deckhand_design_version = None
redeploy_server = None
# Define task_instance
task_instance = context['task_instance']
@ -71,6 +72,17 @@ class DeckhandOperator(BaseOperator):
# Logs uuid of action performed by the Operator
logging.info("DeckHand Operator for action %s", workflow_info['id'])
# Retrieve information of the server that we want to redeploy if user
# executes the 'redeploy_server' dag
if workflow_info['dag_id'] == 'redeploy_server':
redeploy_server = workflow_info['parameters'].get('server-name')
if redeploy_server:
logging.info("Server to be redeployed is %s", redeploy_server)
else:
raise AirflowException('Unable to retrieve information of '
'node to be redeployed!')
# Retrieve Endpoint Information
svc_type = 'deckhand'
context['svc_endpoint'] = ucp_service_endpoint(self,

View File

@ -68,6 +68,8 @@ class DryDockOperator(BaseOperator):
self.xcom_push_flag = xcom_push
def execute(self, context):
# Initialize Variable
redeploy_server = None
# Placeholder definition
# TODO: Need to decide how to pass the required value from Shipyard to
@ -88,6 +90,19 @@ class DryDockOperator(BaseOperator):
# Logs uuid of action performed by the Operator
logging.info("DryDock Operator for action %s", workflow_info['id'])
# Retrieve information of the server that we want to redeploy if user
# executes the 'redeploy_server' dag
# Set node filter to be the server that we want to redeploy
if workflow_info['dag_id'] == 'redeploy_server':
redeploy_server = workflow_info['parameters'].get('server-name')
if redeploy_server:
logging.info("Server to be redeployed is %s", redeploy_server)
self.node_filter = redeploy_server
else:
raise AirflowException('Unable to retrieve information of '
'node to be redeployed!')
# Retrieve Deckhand Design Reference
self.design_ref = self.get_deckhand_design_ref(context)
@ -188,6 +203,26 @@ class DryDockOperator(BaseOperator):
# polling interval to 30 seconds.
check_node_status(1800, 30)
# Create Task for destroy_node
# NOTE: This is a PlaceHolder function. The 'destroy_node'
# functionalities in DryDock is being worked on and is not
# ready at the moment.
elif self.action == 'destroy_node':
# Default settings for 'destroy_node' execution is to query
# the task every 30 seconds and to time out after 900 seconds
query_interval = config.get('drydock',
'destroy_node_query_interval')
task_timeout = config.get('drydock', 'destroy_node_task_timeout')
logging.info("Destroying node %s from cluster...", redeploy_server)
time.sleep(30)
logging.info("Successfully deleted node %s", redeploy_server)
# TODO: Uncomment when the function to destroy/delete node is
# ready for consumption in Drydock
# self.drydock_action(drydock_client, context, self.action,
# query_interval, task_timeout)
# Do not perform any action
else:
logging.info('No Action to Perform')
@ -235,7 +270,7 @@ class DryDockOperator(BaseOperator):
# Trigger DryDock to execute task and retrieve task ID
task_id = self.drydock_perform_task(drydock_client, context,
action, None)
action, self.node_filter)
logging.info('Task ID is %s', task_id)

View File

@ -0,0 +1,201 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from service_endpoint import ucp_service_endpoint
from service_token import shipyard_service_token
class PromenadeOperator(BaseOperator):
"""
Supports interaction with Promenade
:param action: Task to perform
:param main_dag_name: Parent Dag
:param shipyard_conf: Location of shipyard.conf
:param sub_dag_name: Child Dag
"""
@apply_defaults
def __init__(self,
action=None,
main_dag_name=None,
shipyard_conf=None,
sub_dag_name=None,
workflow_info={},
xcom_push=True,
*args, **kwargs):
super(PromenadeOperator, self).__init__(*args, **kwargs)
self.action = action
self.main_dag_name = main_dag_name
self.shipyard_conf = shipyard_conf
self.sub_dag_name = sub_dag_name
self.workflow_info = workflow_info
self.xcom_push_flag = xcom_push
def execute(self, context):
# Initialize Variables
check_etcd = False
delete_node = False
labels_removed = False
node_drained = False
redeploy_server = None
stop_kubelet = False
# Define task_instance
task_instance = context['task_instance']
# Extract information related to current workflow
# The workflow_info variable will be a dictionary
# that contains information about the workflow such
# as action_id, name and other related parameters
workflow_info = task_instance.xcom_pull(
task_ids='action_xcom', key='action',
dag_id=self.main_dag_name)
# Logs uuid of action performed by the Operator
logging.info("Promenade Operator for action %s", workflow_info['id'])
# Retrieve information of the server that we want to redeploy if user
# executes the 'redeploy_server' dag
if workflow_info['dag_id'] == 'redeploy_server':
redeploy_server = workflow_info['parameters'].get('server-name')
if redeploy_server:
logging.info("Server to be redeployed is %s", redeploy_server)
else:
raise AirflowException('Unable to retrieve information of '
'node to be redeployed!')
# Retrieve Endpoint Information
svc_type = 'kubernetesprovisioner'
context['svc_endpoint'] = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("Promenade endpoint is %s", context['svc_endpoint'])
# Promenade API Call
# Drain node using Promenade
if self.action == 'promenade_drain_node':
node_drained = self.promenade_drain_node(context,
redeploy_server)
if node_drained:
logging.info("Node %s has been successfully drained",
redeploy_server)
else:
raise AirflowException('Failed to drain %s!',
redeploy_server)
# Remove labels using Promenade
elif self.action == 'promenade_remove_labels':
labels_removed = self.promenade_drain_node(context,
redeploy_server)
if labels_removed:
logging.info("Successfully removed labels on %s",
redeploy_server)
else:
raise AirflowException('Failed to remove labels on %s!',
redeploy_server)
# Stops kubelet on node using Promenade
elif self.action == 'promenade_stop_kubelet':
stop_kubelet = self.promenade_stop_kubelet(context,
redeploy_server)
if stop_kubelet:
logging.info("Successfully stopped kubelet on %s",
redeploy_server)
else:
raise AirflowException('Failed to stopped kubelet on %s!',
redeploy_server)
# Performs etcd sanity check using Promenade
elif self.action == 'promenade_check_etcd':
check_etcd = self.promenade_check_etcd(context)
if check_etcd:
logging.info("The etcd cluster is healthy and ready")
else:
raise AirflowException('Please check the state of etcd!')
# Delete node from cluster using Promenade
elif self.action == 'promenade_delete_node':
delete_node = self.promenade_delete_node(context,
redeploy_server)
if delete_node:
logging.info("Succesfully deleted node %s from cluster",
redeploy_server)
else:
raise AirflowException('Failed to node %s from cluster!',
redeploy_server)
# No action to perform
else:
logging.info('No Action to Perform')
@shipyard_service_token
def promenade_drain_node(self, context, redeploy_server):
# Placeholder function. Updates will be made when the Promenade
# API is ready for consumption.
logging.info("Draining node...")
return True
@shipyard_service_token
def promenade_remove_labels(self, context, redeploy_server):
# Placeholder function. Updates will be made when the Promenade
# API is ready for consumption.
logging.info("Removing labels on node...")
return True
@shipyard_service_token
def promenade_stop_kubelet(self, context, redeploy_server):
# Placeholder function. Updates will be made when the Promenade
# API is ready for consumption.
logging.info("Stopping kubelet on node...")
return True
@shipyard_service_token
def promenade_check_etcd(self, context):
# Placeholder function. Updates will be made when the Promenade
# API is ready for consumption.
logging.info("Performing health check on etcd...")
return True
@shipyard_service_token
def promenade_delete_node(self, context, redeploy_server):
# Placeholder function. Updates will be made when the Promenade
# API is ready for consumption.
logging.info("Deleting node from cluster...")
time.sleep(30)
logging.info("Successfully deleted node %s", redeploy_server)
return True
class PromenadeOperatorPlugin(AirflowPlugin):
name = 'promenade_operator_plugin'
operators = [PromenadeOperator]

View File

@ -11,6 +11,8 @@ service_type = deckhand
cluster_join_check_backoff_time = 120
deploy_node_query_interval = 30
deploy_node_task_timeout = 3600
destroy_node_query_interval = 30
destroy_node_task_timeout = 900
prepare_node_query_interval = 30
prepare_node_task_timeout = 1800
prepare_site_query_interval = 10

View File

@ -13,6 +13,8 @@ service_type = deckhand
cluster_join_check_backoff_time = 120
deploy_node_query_interval = 30
deploy_node_task_timeout = 3600
destroy_node_query_interval = 30
destroy_node_task_timeout = 900
prepare_node_query_interval = 30
prepare_node_task_timeout = 1800
prepare_site_query_interval = 10