Align Operators with UcpBaseOperator

All UCP Operators will inherit from the UcpBaseOperator [0]

This patch set aligns the remaining Operators, i.e. the Armada,
Deckhand and Promenade Operators, with the UcpBaseOperator

It also updates the name of the shipyard container to be
'shipyard-api' instead of 'shipyard'

[0] https://review.gerrithub.io/#/c/407736/

Change-Id: I516590c492e9bb5554161119dade278d74197374
Anthony Lin 2018-04-18 07:15:26 +00:00
parent 91b60ac595
commit 47cd7a25f4
17 changed files with 153 additions and 184 deletions
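
For context, the pattern this change applies to each operator family is sketched
below. This is an illustrative outline only, not code from the patch set: the
ExampleBaseOperator name and the 'example-api' pod pattern are placeholders,
while UcpBaseOperator, pod_selector_pattern, run_base, do_execute and
get_k8s_logs come from the diffs that follow.

    import logging

    from airflow.utils.decorators import apply_defaults

    from ucp_base_operator import UcpBaseOperator

    LOG = logging.getLogger(__name__)


    class ExampleBaseOperator(UcpBaseOperator):
        """Hypothetical service base operator aligned with UcpBaseOperator."""

        @apply_defaults
        def __init__(self, example_svc_type='example', *args, **kwargs):
            # Hand the base class a pod selector so it can dump the right
            # pods' logs via get_k8s_logs() when a step fails
            super(ExampleBaseOperator, self).__init__(
                pod_selector_pattern=[{'pod_pattern': 'example-api',
                                       'container': 'example-api'}],
                *args, **kwargs)
            self.example_svc_type = example_svc_type

        def run_base(self, context):
            # Service-specific setup only; main_dag_name/sub_dag_name,
            # shipyard.conf parsing, xcom pulling and task_instance wiring
            # are handled once by UcpBaseOperator
            LOG.info("Setting up the example service client...")

        # Leaf operators subclass this and implement do_execute()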

View File

@ -24,7 +24,7 @@
apiVersion: apps/v1beta1
kind: Deployment
metadata:
name: shipyard
name: shipyard-api
spec:
replicas: {{ .Values.pod.replicas.shipyard.api }}
{{ tuple $envAll | include "helm-toolkit.snippets.kubernetes_upgrades_deployment" | indent 2 }}
@ -44,7 +44,7 @@ spec:
initContainers:
{{ tuple $envAll $dependencies $mounts_shipyard_init | include "helm-toolkit.snippets.kubernetes_entrypoint_init_container" | indent 8 }}
containers:
- name: shipyard
- name: shipyard-api
env:
- name: 'SHIPYARD_API_WORKERS'
value: {{ .Values.conf.uwsgi.workers | quote }}

View File

@ -11,13 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from urllib.parse import urlparse
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
@ -26,11 +24,13 @@ import armada.common.session as session
from get_k8s_pod_port_ip import get_pod_port_ip
from service_endpoint import ucp_service_endpoint
from service_token import shipyard_service_token
from xcom_puller import XcomPuller
from ucp_base_operator import UcpBaseOperator
from xcom_pusher import XcomPusher
LOG = logging.getLogger(__name__)
class ArmadaBaseOperator(BaseOperator):
class ArmadaBaseOperator(UcpBaseOperator):
"""Armada Base Operator
@ -44,66 +44,42 @@ class ArmadaBaseOperator(BaseOperator):
def __init__(self,
armada_svc_type='armada',
deckhand_svc_type='deckhand',
main_dag_name=None,
query={},
shipyard_conf=None,
sub_dag_name=None,
svc_session=None,
svc_token=None,
xcom_push=True,
*args, **kwargs):
"""Initialization of ArmadaBaseOperator object.
:param armada_svc_type: Armada Service Type
:param deckhand_svc_type: Deckhand Service Type
:param main_dag_name: Parent Dag
:param query: A dictionary containing explicit query string parameters
:param shipyard_conf: Location of shipyard.conf
:param sub_dag_name: Child Dag
:param svc_session: Keystone Session
:param svc_token: Keystone Token
:param xcom_push: xcom usage
The Armada operator assumes that prior steps have set xcoms for
the action and the deployment configuration
"""
super(ArmadaBaseOperator, self).__init__(*args, **kwargs)
super(ArmadaBaseOperator,
self).__init__(
pod_selector_pattern=[{'pod_pattern': 'armada-api',
'container': 'armada-api'}],
*args, **kwargs)
self.armada_svc_type = armada_svc_type
self.deckhand_svc_type = deckhand_svc_type
self.main_dag_name = main_dag_name
self.query = query
self.shipyard_conf = shipyard_conf
self.sub_dag_name = sub_dag_name
self.svc_session = svc_session
self.svc_token = svc_token
self.xcom_push_flag = xcom_push
def execute(self, context):
# Execute armada base function
self.armada_base(context)
# Execute child function
self.do_execute()
@shipyard_service_token
def armada_base(self, context):
# Define task_instance
self.task_instance = context['task_instance']
# Set up and retrieve values from xcom
self.xcom_puller = XcomPuller(self.main_dag_name, self.task_instance)
self.action_info = self.xcom_puller.get_action_info()
self.dc = self.xcom_puller.get_deployment_configuration()
def run_base(self, context):
# Set up xcom_pusher to push values to xcom
self.xcom_pusher = XcomPusher(self.task_instance)
# Logs uuid of action performed by the Operator
logging.info("Armada Operator for action %s", self.action_info['id'])
LOG.info("Armada Operator for action %s", self.action_info['id'])
# Retrieve Endpoint Information
armada_svc_endpoint = ucp_service_endpoint(
@ -128,14 +104,14 @@ class ArmadaBaseOperator(BaseOperator):
@staticmethod
def _init_armada_client(armada_svc_endpoint, svc_token):
logging.info("Armada endpoint is %s", armada_svc_endpoint)
LOG.info("Armada endpoint is %s", armada_svc_endpoint)
# Parse Armada Service Endpoint
armada_url = urlparse(armada_svc_endpoint)
# Build an ArmadaSession with credentials and target host
# information.
logging.info("Build Armada Session")
LOG.info("Build Armada Session")
a_session = session.ArmadaSession(host=armada_url.hostname,
port=armada_url.port,
scheme='http',
@ -144,18 +120,18 @@ class ArmadaBaseOperator(BaseOperator):
# Raise Exception if we are not able to set up the session
if a_session:
logging.info("Successfully Set Up Armada Session")
LOG.info("Successfully Set Up Armada Session")
else:
raise AirflowException("Failed to set up Armada Session!")
# Use the ArmadaSession to build an ArmadaClient that can
# be used to make one or more API calls
logging.info("Create Armada Client")
LOG.info("Create Armada Client")
_armada_client = client.ArmadaClient(a_session)
# Raise Exception if we are not able to build armada client
if _armada_client:
logging.info("Successfully Set Up Armada client")
LOG.info("Successfully Set Up Armada client")
return _armada_client
else:
@ -165,7 +141,7 @@ class ArmadaBaseOperator(BaseOperator):
def _init_deckhand_design_ref(deckhand_svc_endpoint,
committed_revision_id):
logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
LOG.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
# Form DeckHand Design Reference Path
# This URL will be used to retrieve the Site Design YAMLs
@ -176,8 +152,8 @@ class ArmadaBaseOperator(BaseOperator):
"rendered-documents")
if _deckhand_design_ref:
logging.info("Design YAMLs will be retrieved from %s",
_deckhand_design_ref)
LOG.info("Design YAMLs will be retrieved from %s",
_deckhand_design_ref)
return _deckhand_design_ref
else:

View File

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from airflow.exceptions import AirflowException
@ -20,6 +19,8 @@ from airflow.plugins_manager import AirflowPlugin
from armada_base_operator import ArmadaBaseOperator
from armada.exceptions import api_exceptions as errors
LOG = logging.getLogger(__name__)
class ArmadaGetReleasesOperator(ArmadaBaseOperator):
@ -39,7 +40,7 @@ class ArmadaGetReleasesOperator(ArmadaBaseOperator):
timeout = self.dc['armada.get_releases_timeout']
# Retrieve Armada Releases after deployment
logging.info("Retrieving Helm charts releases after deployment..")
LOG.info("Retrieving Helm charts releases after deployment..")
try:
armada_get_releases = self.armada_client.get_releases(
@ -50,9 +51,12 @@ class ArmadaGetReleasesOperator(ArmadaBaseOperator):
raise AirflowException(client_error)
if armada_get_releases:
logging.info("Successfully retrieved Helm charts releases")
logging.info(armada_get_releases)
LOG.info("Successfully retrieved Helm charts releases")
LOG.info(armada_get_releases)
else:
# Dump logs from Armada API pods
self.get_k8s_logs()
raise AirflowException("Failed to retrieve Helm charts releases!")

View File

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from airflow.exceptions import AirflowException
@ -20,6 +19,8 @@ from airflow.plugins_manager import AirflowPlugin
from armada_base_operator import ArmadaBaseOperator
from armada.exceptions import api_exceptions as errors
LOG = logging.getLogger(__name__)
class ArmadaGetStatusOperator(ArmadaBaseOperator):
@ -51,11 +52,14 @@ class ArmadaGetStatusOperator(ArmadaBaseOperator):
# Tiller State will return a boolean value, i.e. True/False
# Raise Exception if Tiller is unhealthy
if armada_get_status['tiller']['state']:
logging.info("Tiller is in running state")
logging.info("Tiller version is %s",
armada_get_status['tiller']['version'])
LOG.info("Tiller is in running state")
LOG.info("Tiller version is %s",
armada_get_status['tiller']['version'])
else:
# Dump logs from Armada API pods
self.get_k8s_logs()
raise AirflowException("Please check Tiller!")

View File

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from airflow.exceptions import AirflowException
@ -20,6 +19,8 @@ from airflow.plugins_manager import AirflowPlugin
from armada_base_operator import ArmadaBaseOperator
from armada.exceptions import api_exceptions as errors
LOG = logging.getLogger(__name__)
class ArmadaPostApplyOperator(ArmadaBaseOperator):
@ -52,7 +53,7 @@ class ArmadaPostApplyOperator(ArmadaBaseOperator):
timeout = self.dc['armada.post_apply_timeout']
# Execute Armada Apply to install the helm charts in sequence
logging.info("Armada Apply")
LOG.info("Armada Apply")
try:
armada_post_apply = self.armada_client.post_apply(
@ -64,6 +65,9 @@ class ArmadaPostApplyOperator(ArmadaBaseOperator):
timeout=timeout)
except errors.ClientError as client_error:
# Dump logs from Armada API pods
self.get_k8s_logs()
raise AirflowException(client_error)
# if this is a retry, assume that the airflow worker needs to be
@ -74,7 +78,7 @@ class ArmadaPostApplyOperator(ArmadaBaseOperator):
# needs to. Problem with xcom is that it is cleared for the task
# on retry, which means we can't use it as a flag reliably.
if self.task_instance.try_number > 1:
logging.info(
LOG.info(
"Airflow Worker will be upgraded because retry may obfuscate "
"an upgrade of shipyard/airflow."
)
@ -87,7 +91,7 @@ class ArmadaPostApplyOperator(ArmadaBaseOperator):
# part of the name of the Shipyard Helm Chart.
for i in armada_post_apply['message']['upgrade']:
if 'shipyard' in i:
logging.info(
LOG.info(
"Shipyard was upgraded. Airflow worker must be "
"restarted to reflect any workflow changes."
)
@ -110,11 +114,11 @@ class ArmadaPostApplyOperator(ArmadaBaseOperator):
# changed.
if (armada_post_apply['message']['install'] or
armada_post_apply['message']['upgrade']):
logging.info("Successfully Executed Armada Apply")
logging.info(armada_post_apply)
LOG.info("Successfully Executed Armada Apply")
LOG.info(armada_post_apply)
else:
logging.warning("No new changes/updates were detected!")
logging.info(armada_post_apply)
LOG.warning("No new changes/updates were detected!")
LOG.info(armada_post_apply)
class ArmadaPostApplyOperatorPlugin(AirflowPlugin):

View File

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from airflow.plugins_manager import AirflowPlugin
@ -20,6 +19,8 @@ from airflow.exceptions import AirflowException
from armada_base_operator import ArmadaBaseOperator
from armada.exceptions import api_exceptions as errors
LOG = logging.getLogger(__name__)
class ArmadaValidateDesignOperator(ArmadaBaseOperator):
@ -33,7 +34,7 @@ class ArmadaValidateDesignOperator(ArmadaBaseOperator):
def do_execute(self):
# Requests Armada to validate site design
logging.info("Waiting for Armada to validate site design...")
LOG.info("Waiting for Armada to validate site design...")
# Retrieve read timeout
timeout = self.dc['armada.validate_design_timeout']
@ -45,18 +46,24 @@ class ArmadaValidateDesignOperator(ArmadaBaseOperator):
timeout=timeout)
except errors.ClientError as client_error:
# Dump logs from Armada API pods
self.get_k8s_logs()
raise AirflowException(client_error)
# Print results
logging.info("Retrieving Armada validate site design response...")
logging.info(post_validate)
LOG.info("Retrieving Armada validate site design response...")
LOG.info(post_validate)
# Check if site design is valid
status = str(post_validate.get('status', 'unspecified'))
if status.lower() == 'success':
logging.info("Site Design has been successfully validated")
LOG.info("Site Design has been successfully validated")
else:
# Dump logs from Armada API pods
self.get_k8s_logs()
raise AirflowException("Site Design Validation Failed "
"with status: {}!".format(status))

View File

@ -11,11 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import configparser
import logging
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
@ -23,10 +21,12 @@ from airflow.exceptions import AirflowException
from deckhand.client import client as deckhand_client
from service_endpoint import ucp_service_endpoint
from service_token import shipyard_service_token
from xcom_puller import XcomPuller
from ucp_base_operator import UcpBaseOperator
LOG = logging.getLogger(__name__)
class DeckhandBaseOperator(BaseOperator):
class DeckhandBaseOperator(UcpBaseOperator):
"""Deckhand Base Operator
@ -43,14 +43,10 @@ class DeckhandBaseOperator(BaseOperator):
deckhand_client_read_timeout=None,
deckhand_svc_endpoint=None,
deckhand_svc_type='deckhand',
main_dag_name=None,
revision_id=None,
shipyard_conf=None,
sub_dag_name=None,
svc_session=None,
svc_token=None,
validation_read_timeout=None,
xcom_push=True,
*args, **kwargs):
"""Initialization of DeckhandBaseOperator object.
@ -59,47 +55,30 @@ class DeckhandBaseOperator(BaseOperator):
:param deckhand_client_read_timeout: Deckhand client read timeout
:param deckhand_svc_endpoint: Deckhand Service Endpoint
:param deckhand_svc_type: Deckhand Service Type
:param main_dag_name: Parent Dag
:param revision_id: Target revision for workflow
:param shipyard_conf: Path of shipyard.conf
:param sub_dag_name: Child Dag
:param svc_session: Keystone Session
:param svc_token: Keystone Token
:param validation_read_timeout: Deckhand validation timeout
:param xcom_push: xcom usage
"""
super(DeckhandBaseOperator, self).__init__(*args, **kwargs)
super(DeckhandBaseOperator,
self).__init__(
pod_selector_pattern=[{'pod_pattern': 'deckhand-api',
'container': 'deckhand-api'}],
*args, **kwargs)
self.committed_ver = committed_ver
self.deckhandclient = deckhandclient
self.deckhand_client_read_timeout = deckhand_client_read_timeout
self.deckhand_svc_endpoint = deckhand_svc_endpoint
self.deckhand_svc_type = deckhand_svc_type
self.main_dag_name = main_dag_name
self.revision_id = revision_id
self.shipyard_conf = shipyard_conf
self.sub_dag_name = sub_dag_name
self.svc_session = svc_session
self.svc_token = svc_token
self.validation_read_timeout = validation_read_timeout
self.xcom_push_flag = xcom_push
def execute(self, context):
# Execute deckhand base function
self.deckhand_base(context)
# Execute child function
self.do_execute()
# Push last committed version to xcom for the
# 'get_design_version' subdag
if self.sub_dag_name == 'get_design_version':
return self.committed_ver
@shipyard_service_token
def deckhand_base(self, context):
def run_base(self, context):
# Read and parse shipyard.conf
config = configparser.ConfigParser()
@ -112,26 +91,19 @@ class DeckhandBaseOperator(BaseOperator):
self.validation_read_timeout = int(config.get(
'requests_config', 'validation_read_timeout'))
# Define task_instance
task_instance = context['task_instance']
# Set up and retrieve values from xcom
self.xcom_puller = XcomPuller(self.main_dag_name, task_instance)
self.action_info = self.xcom_puller.get_action_info()
# Logs uuid of Shipyard action
logging.info("Executing Shipyard Action %s",
self.action_info['id'])
LOG.info("Executing Shipyard Action %s",
self.action_info['id'])
# Retrieve Endpoint Information
self.deckhand_svc_endpoint = ucp_service_endpoint(
self, svc_type=self.deckhand_svc_type)
logging.info("Deckhand endpoint is %s",
self.deckhand_svc_endpoint)
LOG.info("Deckhand endpoint is %s",
self.deckhand_svc_endpoint)
# Set up DeckHand Client
logging.info("Setting up DeckHand Client...")
LOG.info("Setting up DeckHand Client...")
# NOTE: The communication between the Airflow workers
# and Deckhand happens via the 'internal' endpoint.
@ -155,7 +127,7 @@ class DeckhandBaseOperator(BaseOperator):
self.revision_id = self.xcom_puller.get_design_version()
if self.revision_id:
logging.info("Revision ID is %d", self.revision_id)
LOG.info("Revision ID is %d", self.revision_id)
else:
raise AirflowException('Failed to retrieve Revision ID!')

View File

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import requests
@ -22,6 +21,8 @@ from airflow.exceptions import AirflowException
from deckhand_base_operator import DeckhandBaseOperator
LOG = logging.getLogger(__name__)
class DeckhandGetDesignOperator(DeckhandBaseOperator):
@ -42,7 +43,7 @@ class DeckhandGetDesignOperator(DeckhandBaseOperator):
'revisions')
# Retrieve Revision
logging.info("Retrieving revisions information...")
LOG.info("Retrieving revisions information...")
try:
query_params = {'tag': 'committed', 'sort': 'id', 'order': 'desc'}
@ -53,12 +54,15 @@ class DeckhandGetDesignOperator(DeckhandBaseOperator):
timeout=self.deckhand_client_read_timeout).text)
except requests.exceptions.RequestException as e:
# Dump logs from Deckhand pods
self.get_k8s_logs()
raise AirflowException(e)
# Print info about revisions from DeckHand
logging.info("Revisions response: %s", revisions)
logging.info("The number of committed revisions is %s",
revisions['count'])
LOG.info("Revisions response: %s", revisions)
LOG.info("The number of committed revisions is %s",
revisions['count'])
# Search for the latest committed version and save it as xcom.
# Since the 'order: desc' parameter above, this is index 0 if there
@ -66,10 +70,13 @@ class DeckhandGetDesignOperator(DeckhandBaseOperator):
revision_list = revisions.get('results', [])
if revision_list:
self.committed_ver = revision_list[0].get('id')
logging.info("Latest committed revision is %d", self.committed_ver)
LOG.info("Latest committed revision is %d", self.committed_ver)
# Error if we cannot resolve the committed version to use.
if not self.committed_ver:
# Dump logs from Deckhand pods
self.get_k8s_logs()
raise AirflowException("No committed revision found in Deckhand!")

View File

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from airflow.plugins_manager import AirflowPlugin
@ -19,6 +18,8 @@ from airflow.exceptions import AirflowException
from deckhand_base_operator import DeckhandBaseOperator
LOG = logging.getLogger(__name__)
class DeckhandRetrieveRenderedDocOperator(DeckhandBaseOperator):
@ -30,17 +31,19 @@ class DeckhandRetrieveRenderedDocOperator(DeckhandBaseOperator):
def do_execute(self):
logging.info("Retrieving Rendered Document...")
LOG.info("Retrieving Rendered Document...")
# Retrieve Rendered Document
try:
rendered_doc = self.deckhandclient.revisions.documents(
self.deckhandclient.revisions.documents(
self.revision_id, rendered=True)
logging.info("Successfully Retrieved Rendered Document")
logging.info(rendered_doc)
LOG.info("Successfully Retrieved Rendered Document")
except:
# Dump logs from Deckhand pods
self.get_k8s_logs()
raise AirflowException("Failed to Retrieve Rendered Document!")

View File

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import requests
@ -22,6 +21,8 @@ from airflow.exceptions import AirflowException
from deckhand_base_operator import DeckhandBaseOperator
LOG = logging.getLogger(__name__)
class DeckhandValidateSiteDesignOperator(DeckhandBaseOperator):
@ -43,7 +44,7 @@ class DeckhandValidateSiteDesignOperator(DeckhandBaseOperator):
str(self.revision_id),
'validations')
# Retrieve Validation list
logging.info("Retrieving validation list...")
LOG.info("Retrieving validation list...")
try:
retrieved_list = yaml.safe_load(
@ -52,14 +53,20 @@ class DeckhandValidateSiteDesignOperator(DeckhandBaseOperator):
timeout=self.validation_read_timeout).text)
except requests.exceptions.RequestException as e:
# Dump logs from Deckhand pods
self.get_k8s_logs()
raise AirflowException(e)
if (any([str(v.get('status', 'unspecified')).lower() == 'failure'
for v in retrieved_list.get('results', [])])):
# Dump logs from Deckhand pods
self.get_k8s_logs()
raise AirflowException("DeckHand Site Design Validation Failed!")
else:
logging.info("Revision %d has been successfully validated",
self.revision_id)
LOG.info("Revision %d has been successfully validated",
self.revision_id)
class DeckhandValidateSiteDesignOperatorPlugin(AirflowPlugin):

View File

@ -11,20 +11,20 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from service_endpoint import ucp_service_endpoint
from service_token import shipyard_service_token
from xcom_puller import XcomPuller
from ucp_base_operator import UcpBaseOperator
LOG = logging.getLogger(__name__)
class PromenadeBaseOperator(BaseOperator):
class PromenadeBaseOperator(UcpBaseOperator):
"""Promenade Base Operator
@ -35,59 +35,36 @@ class PromenadeBaseOperator(BaseOperator):
@apply_defaults
def __init__(self,
main_dag_name=None,
promenade_svc_endpoint=None,
promenade_svc_type='kubernetesprovisioner',
redeploy_server=None,
shipyard_conf=None,
sub_dag_name=None,
svc_token=None,
xcom_push=True,
*args, **kwargs):
"""Initialization of PromenadeBaseOperator object.
:param main_dag_name: Parent Dag
:param promenade_svc_endpoint: Promenade Service Endpoint
:param promenade_svc_type: Promenade Service Type
:param redeploy_server: Server to be redeployed
:param shipyard_conf: Path of shipyard.conf
:param sub_dag_name: Child Dag
:param svc_token: Keystone Token
:param xcom_push: xcom usage
The Promenade operator assumes that prior steps have set xcoms for
the action and the deployment configuration
"""
super(PromenadeBaseOperator, self).__init__(*args,
**kwargs)
self.main_dag_name = main_dag_name
super(PromenadeBaseOperator,
self).__init__(
pod_selector_pattern=[{'pod_pattern': 'promenade-api',
'container': 'promenade-api'}],
*args, **kwargs)
self.promenade_svc_endpoint = promenade_svc_endpoint
self.promenade_svc_type = promenade_svc_type
self.redeploy_server = redeploy_server
self.shipyard_conf = shipyard_conf
self.sub_dag_name = sub_dag_name
self.svc_token = svc_token
self.xcom_push_flag = xcom_push
def execute(self, context):
# Execute promenade base function
self.promenade_base(context)
# Execute child function
self.do_execute()
@shipyard_service_token
def promenade_base(self, context):
# Define task_instance
task_instance = context['task_instance']
# Set up and retrieve values from xcom
self.xcom_puller = XcomPuller(self.main_dag_name, task_instance)
self.action_info = self.xcom_puller.get_action_info()
self.dc = self.xcom_puller.get_deployment_configuration()
def run_base(self, context):
# Logs uuid of Shipyard action
logging.info("Executing Shipyard Action %s", self.action_info['id'])
LOG.info("Executing Shipyard Action %s", self.action_info['id'])
# Retrieve information of the server that we want to redeploy
# if user executes the 'redeploy_server' dag
@ -96,8 +73,7 @@ class PromenadeBaseOperator(BaseOperator):
'server-name')
if self.redeploy_server:
logging.info("Server to be redeployed is %s",
self.redeploy_server)
LOG.info("Server to be redeployed is %s", self.redeploy_server)
else:
raise AirflowException('%s was unable to retrieve the '
'server to be redeployed.'
@ -107,8 +83,7 @@ class PromenadeBaseOperator(BaseOperator):
self.promenade_svc_endpoint = ucp_service_endpoint(
self, svc_type=self.promenade_svc_type)
logging.info("Promenade endpoint is %s",
self.promenade_svc_endpoint)
LOG.info("Promenade endpoint is %s", self.promenade_svc_endpoint)
class PromenadeBaseOperatorPlugin(AirflowPlugin):

View File

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
@ -20,6 +19,8 @@ from airflow.exceptions import AirflowException
from promenade_base_operator import PromenadeBaseOperator
LOG = logging.getLogger(__name__)
class PromenadeCheckEtcdOperator(PromenadeBaseOperator):
@ -37,13 +38,13 @@ class PromenadeCheckEtcdOperator(PromenadeBaseOperator):
# TODO(bryan-strassner) use:
# self.dc['kubernetes_provisioner.etcd_ready_timeout']
# self.dc['kubernetes_provisioner.remove_etcd_timeout']
logging.info("Performing health check on etcd...")
LOG.info("Performing health check on etcd...")
time.sleep(5)
check_etcd = True
if check_etcd:
logging.info("The etcd cluster is healthy and ready")
LOG.info("The etcd cluster is healthy and ready")
else:
raise AirflowException('Please check the state of etcd!')

View File

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
@ -20,6 +19,8 @@ from airflow.exceptions import AirflowException
from promenade_base_operator import PromenadeBaseOperator
LOG = logging.getLogger(__name__)
class PromenadeClearLabelsOperator(PromenadeBaseOperator):
@ -37,14 +38,14 @@ class PromenadeClearLabelsOperator(PromenadeBaseOperator):
# TODO(bryan-strassner) use:
# self.dc['kubernetes_provisioner.clear_labels_timeout']
logging.info("Removing labels on node...")
LOG.info("Removing labels on node...")
time.sleep(5)
labels_removed = True
if labels_removed:
logging.info("Successfully removed labels on %s",
self.redeploy_server)
LOG.info("Successfully removed labels on %s",
self.redeploy_server)
else:
raise AirflowException('Failed to remove labels on %s!',
self.redeploy_server)

View File

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
@ -20,6 +19,8 @@ from airflow.exceptions import AirflowException
from promenade_base_operator import PromenadeBaseOperator
LOG = logging.getLogger(__name__)
class PromenadeDecommissionNodeOperator(PromenadeBaseOperator):
@ -33,14 +34,14 @@ class PromenadeDecommissionNodeOperator(PromenadeBaseOperator):
def do_execute(self):
# Placeholder function. Updates will be made when the Promenade
# API is ready for consumption.
logging.info("Decommissioning node from Kubernetes cluster...")
LOG.info("Decommissioning node from Kubernetes cluster...")
time.sleep(5)
decommission_node = True
if decommission_node:
logging.info("Succesfully decommissioned node %s",
self.redeploy_server)
LOG.info("Succesfully decommissioned node %s",
self.redeploy_server)
else:
raise AirflowException('Failed to decommission node %s!',
self.redeploy_server)

View File

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
@ -20,6 +19,8 @@ from airflow.exceptions import AirflowException
from promenade_base_operator import PromenadeBaseOperator
LOG = logging.getLogger(__name__)
class PromenadeDrainNodeOperator(PromenadeBaseOperator):
@ -40,14 +41,14 @@ class PromenadeDrainNodeOperator(PromenadeBaseOperator):
# self.dc['kubernetes_provisioner.drain_timeout']
# self.dc['kubernetes_provisioner.drain_grace_period']
logging.info("Draining node...")
LOG.info("Draining node...")
time.sleep(5)
node_drained = True
if node_drained:
logging.info("Node %s has been successfully drained",
self.redeploy_server)
LOG.info("Node %s has been successfully drained",
self.redeploy_server)
else:
raise AirflowException('Failed to drain %s!',
self.redeploy_server)

View File

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
@ -20,6 +19,8 @@ from airflow.exceptions import AirflowException
from promenade_base_operator import PromenadeBaseOperator
LOG = logging.getLogger(__name__)
class PromenadeShutdownKubeletOperator(PromenadeBaseOperator):
@ -33,14 +34,14 @@ class PromenadeShutdownKubeletOperator(PromenadeBaseOperator):
def do_execute(self):
# Placeholder function. Updates will be made when the Promenade
# API is ready for consumption.
logging.info("Shutting down kubelet on node...")
LOG.info("Shutting down kubelet on node...")
time.sleep(5)
shutdown_kubelet = True
if shutdown_kubelet:
logging.info("Successfully shut down kubelet on %s",
self.redeploy_server)
LOG.info("Successfully shut down kubelet on %s",
self.redeploy_server)
else:
raise AirflowException('Failed to shut down kubelet on %s!',
self.redeploy_server)

View File

@ -86,6 +86,11 @@ class UcpBaseOperator(BaseOperator):
# Execute child function
self.do_execute()
# Push last committed version to xcom for the
# 'get_design_version' subdag
if self.sub_dag_name == 'get_design_version':
return self.committed_ver
def ucp_base(self, context):
LOG.info("Running UCP Base Operator...")
@ -98,10 +103,10 @@ class UcpBaseOperator(BaseOperator):
self.ucp_namespace = config.get('k8s_logs', 'ucp_namespace')
# Define task_instance
task_instance = context['task_instance']
self.task_instance = context['task_instance']
# Set up and retrieve values from xcom
self.xcom_puller = XcomPuller(self.main_dag_name, task_instance)
self.xcom_puller = XcomPuller(self.main_dag_name, self.task_instance)
self.action_info = self.xcom_puller.get_action_info()
self.dc = self.xcom_puller.get_deployment_configuration()
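
Taken together with the hunks above, the base operator's execute path after this
change reads roughly as sketched below. This is a reconstruction from the visible
diff context, not the full file; only the lines shown in the hunks are known and
the rest is inferred.

    def execute(self, context):
        # Common UCP setup: shipyard.conf parsing, task_instance and
        # xcom_puller wiring, action info and deployment configuration
        self.ucp_base(context)

        # Service-specific setup implemented by the Armada, Deckhand and
        # Promenade base operators in this patch set
        self.run_base(context)

        # Execute child function
        self.do_execute()

        # Push last committed version to xcom for the
        # 'get_design_version' subdag
        if self.sub_dag_name == 'get_design_version':
            return self.committed_ver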