Refactor UCP Health Check Operator
There have been significant changes to the Shipyard code base since the last major update to the UCP Health Check Operator. This patch set is meant to align its implementation with the rest of the Operators. It removes the usage of 'urlopen', which can be a security risk, and makes use of the Python 'requests' module instead. We are also adding 'timeout' parameters to the other Operators that use 'requests.get', as failure to do so can cause the Operator(s) to hang indefinitely. The default timeout has been set to 30 seconds. As the 'requests' documentation notes, nearly all production code should use this parameter in nearly all requests. Change-Id: I1205aab38ff120cd239c236dc9bdffd1660c9afb
This commit is contained in:
parent
4de8c00830
commit
768981df44
|
@ -329,9 +329,6 @@ conf:
|
|||
deploy_node_query_interval: 30
|
||||
deploy_node_task_timeout: 3600
|
||||
cluster_join_check_backoff_time: 120
|
||||
healthcheck:
|
||||
schema: http
|
||||
endpoint: /api/v1.0/health
|
||||
keystone_authtoken:
|
||||
delay_auth_decision: true
|
||||
auth_type: password
|
||||
|
|
|
@ -85,18 +85,6 @@
|
|||
# Backoff time (in seconds) before checking cluster join (integer value)
|
||||
#cluster_join_check_backoff_time = 120
|
||||
|
||||
[healthcheck]
|
||||
|
||||
#
|
||||
# From shipyard_airflow
|
||||
#
|
||||
|
||||
# Schema to perform health check with (string value)
|
||||
#schema = http
|
||||
|
||||
# Health check standard endpoint (string value)
|
||||
#endpoint = /api/v1.0/health
|
||||
|
||||
|
||||
[keystone_authtoken]
|
||||
|
||||
|
|
|
@ -177,22 +177,6 @@ SECTIONS = [
|
|||
),
|
||||
]
|
||||
),
|
||||
ConfigSection(
|
||||
name='healthcheck',
|
||||
title='Healthcheck connection info',
|
||||
options=[
|
||||
cfg.StrOpt(
|
||||
'schema',
|
||||
default='http',
|
||||
help='Schema to perform health check with'
|
||||
),
|
||||
cfg.StrOpt(
|
||||
'endpoint',
|
||||
default='/api/v1.0/health',
|
||||
help='Health check standard endpoint'
|
||||
),
|
||||
]
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
|
|
|
@ -42,43 +42,12 @@ def all_preflight_checks(parent_dag_name, child_dag_name, args):
|
|||
dag=dag)
|
||||
|
||||
'''
|
||||
Checks that shipyard is in a good state for the purposes of the
|
||||
Undercloud Platform to proceed with processing
|
||||
Check that all UCP components are in good state for the purposes
|
||||
of the Undercloud Platform to proceed with processing
|
||||
'''
|
||||
shipyard = UcpHealthCheckOperator(
|
||||
task_id='shipyard_preflight_check',
|
||||
task_id='ucp_preflight_check',
|
||||
shipyard_conf=config_path,
|
||||
ucp_node='shipyard',
|
||||
dag=dag)
|
||||
|
||||
'''
|
||||
Checks that deckhand is in a good state for the purposes of the
|
||||
Undercloud Platform to proceed with processing
|
||||
'''
|
||||
deckhand = UcpHealthCheckOperator(
|
||||
task_id='deckhand_preflight_check',
|
||||
shipyard_conf=config_path,
|
||||
ucp_node='deckhand',
|
||||
dag=dag)
|
||||
|
||||
'''
|
||||
Checks that drydock is in a good state for the purposes of the
|
||||
Undercloud Platform to proceed with processing
|
||||
'''
|
||||
drydock = UcpHealthCheckOperator(
|
||||
task_id='drydock_preflight_check',
|
||||
shipyard_conf=config_path,
|
||||
ucp_node='drydock',
|
||||
dag=dag)
|
||||
|
||||
'''
|
||||
Checks that armada is in a good state for the purposes of the
|
||||
Undercloud Platform to proceed with processing
|
||||
'''
|
||||
armada = UcpHealthCheckOperator(
|
||||
task_id='armada_preflight_check',
|
||||
shipyard_conf=config_path,
|
||||
ucp_node='armada',
|
||||
dag=dag)
|
||||
|
||||
return dag
|
||||
|
|
|
@ -280,7 +280,7 @@ class ArmadaOperator(BaseOperator):
|
|||
# TODO: We will implement the new approach when Armada and DeckHand
|
||||
# integration is completed.
|
||||
try:
|
||||
armada_manifest = requests.get(design_ref).text
|
||||
armada_manifest = requests.get(design_ref, timeout=30).text
|
||||
except requests.exceptions.RequestException as e:
|
||||
raise AirflowException(e)
|
||||
|
||||
|
|
|
@ -122,7 +122,7 @@ class DeckhandOperator(BaseOperator):
|
|||
query_params = {'tag': 'committed', 'sort': 'id', 'order': 'desc'}
|
||||
revisions = yaml.safe_load(requests.get(
|
||||
revision_endpoint, headers=x_auth_token,
|
||||
params=query_params).text)
|
||||
params=query_params, timeout=30).text)
|
||||
except requests.exceptions.RequestException as e:
|
||||
raise AirflowException(e)
|
||||
|
||||
|
@ -163,7 +163,8 @@ class DeckhandOperator(BaseOperator):
|
|||
|
||||
try:
|
||||
retrieved_list = yaml.safe_load(
|
||||
requests.get(validation_endpoint, headers=x_auth_token).text)
|
||||
requests.get(validation_endpoint, headers=x_auth_token,
|
||||
timeout=30).text)
|
||||
except requests.exceptions.RequestException as e:
|
||||
raise AirflowException(e)
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
|
@ -13,65 +13,73 @@
|
|||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import configparser
|
||||
import os
|
||||
import requests
|
||||
|
||||
from airflow.exceptions import AirflowException
|
||||
from airflow.models import BaseOperator
|
||||
from airflow.plugins_manager import AirflowPlugin
|
||||
from airflow.utils.decorators import apply_defaults
|
||||
from urllib.request import urlopen
|
||||
from urllib.error import URLError, HTTPError
|
||||
from socket import timeout
|
||||
|
||||
from service_endpoint import ucp_service_endpoint
|
||||
|
||||
|
||||
class UcpHealthCheckOperator(BaseOperator):
|
||||
"""
|
||||
UCP Health Checks
|
||||
:shipyard_conf: Location of shipyard.conf
|
||||
:ucp_node: ucp node to perform health check on
|
||||
"""
|
||||
|
||||
@apply_defaults
|
||||
def __init__(self,
|
||||
shipyard_conf,
|
||||
ucp_node,
|
||||
*args,
|
||||
**kwargs):
|
||||
|
||||
super(UcpHealthCheckOperator, self).__init__(*args, **kwargs)
|
||||
self.shipyard_conf = shipyard_conf
|
||||
self.ucp_node = ucp_node
|
||||
|
||||
def execute(self, context):
|
||||
logging.info("Performing Health Check on %s", self.ucp_node)
|
||||
|
||||
# Read and parse shipyard.conf
|
||||
config = configparser.ConfigParser()
|
||||
config.read(self.shipyard_conf)
|
||||
# Initialize variable
|
||||
# TODO: Include Promenade when its API endpoint is ready
|
||||
ucp_components = [
|
||||
'armada',
|
||||
'deckhand',
|
||||
'physicalprovisioner',
|
||||
'shipyard']
|
||||
|
||||
# Construct Health Check API Endpoint
|
||||
schema = config.get('healthcheck', 'schema')
|
||||
endpoint = config.get('healthcheck', 'endpoint')
|
||||
host = config.get(self.ucp_node, 'host')
|
||||
port = config.get(self.ucp_node, 'port')
|
||||
# Loop through various UCP Components
|
||||
for i in ucp_components:
|
||||
|
||||
url = schema + '://' + host + ':' + port + endpoint
|
||||
# Define context 'svc_type'
|
||||
context['svc_type'] = i
|
||||
|
||||
try:
|
||||
# Set health check timeout to 30 seconds
|
||||
# using nosec since the urls are taken from our own configuration
|
||||
# files only, never external.
|
||||
req = urlopen(url, timeout=30).read().decode('utf-8') # nosec
|
||||
except (HTTPError, URLError) as error:
|
||||
# Raise Exception for HTTP/URL Error
|
||||
logging.error('Error Encountered: %s', error)
|
||||
raise AirflowException("HTTP/URL Error Encountered")
|
||||
except timeout:
|
||||
# Raise Exception for Timeout
|
||||
logging.error('Health Check Timed Out for %s', self.ucp_node)
|
||||
raise AirflowException("Health Check Timed Out")
|
||||
else:
|
||||
logging.info("%s is alive and healthy", self.ucp_node)
|
||||
# Retrieve Endpoint Information
|
||||
context['svc_endpoint'] = ucp_service_endpoint(self, context)
|
||||
logging.info("%s endpoint is %s", i, context['svc_endpoint'])
|
||||
|
||||
# Construct Health Check Endpoint
|
||||
healthcheck_endpoint = os.path.join(context['svc_endpoint'],
|
||||
'health')
|
||||
|
||||
logging.info("%s healthcheck endpoint is %s", i,
|
||||
healthcheck_endpoint)
|
||||
|
||||
try:
|
||||
logging.info("Performing Health Check on %s", i)
|
||||
|
||||
# Set health check timeout to 30 seconds
|
||||
req = requests.get(healthcheck_endpoint, timeout=30)
|
||||
except requests.exceptions.RequestException as e:
|
||||
raise AirflowException(e)
|
||||
|
||||
# UCP Component will return empty response/body to show that
|
||||
# it is healthy
|
||||
if req.status_code == 204:
|
||||
logging.info("%s is alive and healthy", i)
|
||||
else:
|
||||
logging.error(req.text)
|
||||
raise AirflowException("Invalid Response!")
|
||||
|
||||
|
||||
class UcpHealthCheckPlugin(AirflowPlugin):
|
||||
|
|
|
@ -18,9 +18,6 @@ prepare_site_task_timeout = 120
|
|||
service_type = physicalprovisioner
|
||||
verify_site_query_interval = 10
|
||||
verify_site_task_timeout = 60
|
||||
[healthcheck]
|
||||
endpoint = /api/v1.0/health
|
||||
schema = http
|
||||
[keystone_authtoken]
|
||||
auth_section = keystone_authtoken
|
||||
auth_type = password
|
||||
|
|
|
@ -20,9 +20,6 @@ prepare_site_task_timeout = 120
|
|||
service_type = physicalprovisioner
|
||||
verify_site_query_interval = 10
|
||||
verify_site_task_timeout = 60
|
||||
[healthcheck]
|
||||
endpoint = /api/v1.0/health
|
||||
schema = http
|
||||
[keystone_authtoken]
|
||||
auth_section = keystone_authtoken
|
||||
auth_type = password
|
||||
|
@ -39,4 +36,4 @@ project_name = service
|
|||
user_domain_name = default
|
||||
username = shipyard
|
||||
[shipyard]
|
||||
service_type = shipyard
|
||||
service_type = shipyard
|
||||
|
|
Loading…
Reference in New Issue