Add notes support for Builddata output

Enhances the workflow to add notes that contain the builddata
information associated with the Drydock steps. As part of this
support, general notes support is added to all of the operators
that inherit from the UcpBaseOperator.

Storyboard References:
Story: 2002797
Story: 2002796

Change-Id: I5e1a54d6373c4a523e2d4fe87796da4358f22055
This commit is contained in:
Bryan Strassner 2018-10-10 14:11:02 -05:00
parent 5a9abc73dd
commit f5774206e5
6 changed files with 261 additions and 16 deletions

View File

@ -389,6 +389,11 @@ conf:
shipyard:
base:
web_server:
pool_size: 15
pool_pre_ping: true
pool_timeout: 30
pool_overflow: 10
connection_recycle: -1
profiler: false
shipyard:
service_type: shipyard
@ -413,6 +418,8 @@ conf:
deckhand_client_read_timeout: 300
validation_connect_timeout: 5
validation_read_timeout: 300
notes_connect_timeout: 5
notes_read_timeout: 10
airflow:
worker_endpoint_scheme: 'http'
worker_port: 8793

View File

@ -419,6 +419,7 @@ class DrydockBaseOperator(UcpBaseOperator):
)
pass
_report_task_info(task_id, task_result, task_status)
self._create_drydock_results_notes(task_id, task_result)
# for each child, report only the step info, do not add to overall
# success list.
@ -436,6 +437,97 @@ class DrydockBaseOperator(UcpBaseOperator):
# deduplicate and return
return set(success_nodes)
def _create_drydock_results_notes(self, dd_task_id, task_result):
"""Generate a note in the database with a url to the builddata
:param dd_task_id: the id of the Drydock task. Note that `self.task_id`
is the workflow task_id, not the same drydock task_id.
:param task_result: the task result object containing the info needed
to produce a note.
Example task result:
{
'status': 'success',
'kind': 'Status',
'failures': [],
'apiVersion': 'v1.0',
'metadata': {},
'details': {
'errorCount': 0,
'messageList': [{
'error': False,
'context': 'n2',
'context_type': 'node',
'extra': '{}',
'ts': '2018-10-12 16:09:53.778696',
'message': 'Acquiring node n2 for deployment'
}]
},
'successes': ['n2'],
'links': [{
'rel': 'detail_logs',
'href': 'http://drydock-api.ucp.svc.cluster.local:9000/api/...'
}],
'reason': None,
'message': None
}
"""
for msg in task_result.get('details', {}).get('messageList', []):
try:
if msg.get('message'):
error = msg.get('error', False)
msg_text = "{}:{}:{}{}".format(
msg.get('context_type', 'N/A'),
msg.get('context', 'N/A'),
msg.get('message'),
" (error)" if error else "")
self.notes_helper.make_step_note(
action_id=self.action_id,
step_id=self.task_id,
note_val=msg_text,
subject=dd_task_id,
sub_type="Task Message",
note_timestamp=msg.get('ts'),
verbosity=3)
except Exception as ex:
LOG.warn("Error while creating a task result note, "
"processing continues. Source info %s", msg)
LOG.exception(ex)
links = task_result.get('links', [])
for link in links:
try:
rel = link.get('rel')
href = link.get('href')
extra = _get_context_info_from_url(href)
if rel and href:
self.notes_helper.make_step_note(
action_id=self.action_id,
step_id=self.task_id,
note_val="{}{}".format(rel, extra),
subject=dd_task_id,
sub_type="Linked Task Info",
link_url=href,
is_auth_link=True,
verbosity=5)
except Exception as ex:
LOG.warn("Error while creating a link-based note, "
"processing continues. Source info: %s", link)
LOG.exception(ex)
def _get_context_info_from_url(url_string):
"""Examine a url for helpful info for use in a note
:param url_string: The url to examine
:returns: String of helpful information
Strings returned should include a leading space.
"""
if url_string.endswith("/builddata"):
return " - builddata"
# Other "helpful" patterns would show up here.
return ""
def gen_node_name_filter(node_names):
"""Generates a drydock compatible node filter using only node names

View File

@ -78,22 +78,60 @@ class DrydockNodesOperator(DrydockBaseOperator):
# All groups "complete" (as they're going to be). Report summary
dgm.report_group_summary()
dgm.report_node_summary()
self._gen_summary_notes(dgm)
if dgm.critical_groups_failed():
raise AirflowException(
"One or more deployment groups marked as critical have failed"
)
else:
LOG.info("All critical groups have met their success criteria")
# TODO (bryan-strassner) it is very possible that many nodes failed
# deployment, but all critical groups had enough success to
# continue processing. This will be non-obvious to the casual
# observer of the workflow. A likely enhancement is to allow
# notes be added to the shipyard action associated with this
# workflow that would be reported back to the end user doing a
# describe of the action. This will require new database structures
# to hold the notes, and a means to insert the notes. A shared
# functionality in the base ucp operator or a common module would
# be a reasonable way to support this.
def _gen_summary_notes(self, dgm):
    """Write step notes that summarize nodes and groups by stage

    Up to two notes are made through ``self.notes_helper``: one listing
    the nodes found in each of the reported stages, and one listing the
    deployment groups found in each stage. A note is only made when it
    would have content.

    :param dgm: The deployment group manager containing results
    """
    # Node summary: a "<stage>: <node>, <node>" fragment per non-empty stage
    node_fragments = []
    for stage in (Stage.NOT_STARTED, Stage.DEPLOYED, Stage.FAILED):
        stage_nodes = dgm.get_nodes(stage=stage)
        if not stage_nodes:
            continue
        node_fragments.append(
            "{}: {}".format(stage, ", ".join(stage_nodes)))
    if node_fragments:
        self.notes_helper.make_step_note(
            action_id=self.action_id,
            step_id=self.task_id,
            note_val="; ".join(node_fragments),
            subject=self.main_dag_name,
            sub_type="Node Deployment",
            verbosity=1)

    # Group summary: bucket the group labels under the stage they are in
    labels_by_stage = {}
    for group in dgm.group_list():
        bucket = labels_by_stage.setdefault(group.stage, [])
        critical_marker = "(critical)" if group.critical else ""
        bucket.append("{}{}".format(group.name, critical_marker))

    group_fragments = []
    for stage, labels in labels_by_stage.items():
        group_fragments.append("{}: {}".format(stage, ", ".join(labels)))
    if group_fragments:
        self.notes_helper.make_step_note(
            action_id=self.action_id,
            step_id=self.task_id,
            note_val="; ".join(group_fragments),
            subject=self.main_dag_name,
            sub_type="Deployment Groups",
            verbosity=1)
def _setup_configured_values(self):
"""Sets self.<name> values from the deployment configuration"""

View File

@ -21,12 +21,14 @@ from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
import sqlalchemy
try:
from deckhand_client_factory import DeckhandClientFactory
import service_endpoint
from get_k8s_logs import get_pod_logs
from get_k8s_logs import K8sLoggingException
from service_token import shipyard_service_token
from xcom_puller import XcomPuller
except ImportError:
from shipyard_airflow.plugins.deckhand_client_factory import \
@ -34,10 +36,20 @@ except ImportError:
from shipyard_airflow.plugins import service_endpoint
from shipyard_airflow.plugins.get_k8s_logs import get_pod_logs
from shipyard_airflow.plugins.get_k8s_logs import K8sLoggingException
from shipyard_airflow.plugins.service_token import shipyard_service_token
from shipyard_airflow.plugins.xcom_puller import XcomPuller
from shipyard_airflow.common.document_validators.document_validation_utils \
import DocumentValidationUtils
from shipyard_airflow.common.notes.notes import NotesManager
from shipyard_airflow.common.notes.notes_helper import NotesHelper
from shipyard_airflow.common.notes.storage_impl_db import \
ShipyardSQLNotesStorage
# Configuration sections
BASE = 'base'
K8S_LOGS = 'k8s_logs'
REQUESTS_CONFIG = 'requests_config'
LOG = logging.getLogger(__name__)
@ -89,16 +101,22 @@ class UcpBaseOperator(BaseOperator):
self.shipyard_conf = shipyard_conf
self.start_time = datetime.now()
self.xcom_push_flag = xcom_push
# lazy init field to hold a shipyard_db_engine
self._shipyard_db_engine = None
def execute(self, context):
# Setup values that depend on the shipyard configuration
self.doc_utils = _get_document_util(self.shipyard_conf)
self.endpoints = service_endpoint.ServiceEndpoints(self.shipyard_conf)
# Read and parse shipyard.conf
self.config = configparser.ConfigParser()
self.config.read(self.shipyard_conf)
# Execute Airship base function
self.ucp_base(context)
# Execute base function
# Execute base function for child operator
self.run_base(context)
if self.continue_processing:
@ -119,12 +137,13 @@ class UcpBaseOperator(BaseOperator):
LOG.info("Running Airship Base Operator...")
# Read and parse shiyard.conf
config = configparser.ConfigParser()
config.read(self.shipyard_conf)
# Configure the notes helper for this run of an operator
# establishes self.notes_helper
self._setup_notes_helper()
# Initialize variable
self.ucp_namespace = config.get('k8s_logs', 'ucp_namespace')
# Initialize variable that indicates the kubernetes namespace for the
# Airship components
self.ucp_namespace = self.config.get(K8S_LOGS, 'ucp_namespace')
# Define task_instance
self.task_instance = context['task_instance']
@ -137,6 +156,8 @@ class UcpBaseOperator(BaseOperator):
# Set up other common-use values
self.action_id = self.action_info['id']
# extract the `task` or `step` name for easy access
self.task_id = self.task_instance.task_id
self.revision_id = self.action_info['committed_rev_id']
self.action_params = self.action_info.get('parameters', {})
self.design_ref = self._deckhand_design_ref()
@ -250,6 +271,58 @@ class UcpBaseOperator(BaseOperator):
# broken. Raise an Airflow Exception.
raise AirflowException(ex)
def _get_shipyard_db_engine(self):
"""Lazy initialize an engine for the Shipyard database.
:returns: a SQLAlchemy engine for the Shipyard database.
Developer's Note: Initially the idea was to use the PostgresHook and
retrieve an engine from there as is done with the concurrency check,
but since we have easy access to a configuration file, this does
direct SQLAlchemy to get the engine. By using the config, the database
connection is not exposed as environment variables -- which is one way
that Airflow registers database connections for use by the dbApiHook
"""
if self._shipyard_db_engine is None:
connection_string = self.config.get(BASE, 'postgresql_db')
pool_size = self.config.getint(BASE, 'pool_size')
max_overflow = self.config.getint(BASE, 'pool_overflow')
pool_pre_ping = self.config.getboolean(BASE, 'pool_pre_ping')
pool_recycle = self.config.getint(BASE, 'connection_recycle')
pool_timeout = self.config.getint(BASE, 'pool_timeout')
self._shipyard_db_engine = sqlalchemy.create_engine(
connection_string, pool_size=pool_size,
max_overflow=max_overflow,
pool_pre_ping=pool_pre_ping,
pool_recycle=pool_recycle,
pool_timeout=pool_timeout
)
LOG.info("Initialized Shipyard database connection with pool "
"size: %d, max overflow: %d, pool pre ping: %s, pool "
"recycle: %d, and pool timeout: %d",
pool_size, max_overflow,
pool_pre_ping, pool_recycle,
pool_timeout)
return self._shipyard_db_engine
@shipyard_service_token
def _token_getter(self):
    """Return the Shipyard service token held in ``self.svc_token``

    This method is handed to the NotesManager as its ``get_token``
    callable by ``_setup_notes_helper``.

    NOTE(review): ``self.svc_token`` is presumably populated by the
    ``shipyard_service_token`` decorator before this body runs -- confirm
    against the decorator's implementation.

    :returns: the value of ``self.svc_token``
    """
    # Plain getter for a shipyard service token (not a Python generator)
    return self.svc_token
def _setup_notes_helper(self):
    """Setup a notes helper for use by all descendent operators

    Establishes ``self.notes_helper``, backed by the Shipyard database and
    configured with the notes timeouts from the ``requests_config``
    section of ``self.config``.
    """
    # ConfigParser.get() returns strings; read the timeouts as integers so
    # the NotesManager receives numbers rather than text such as "5".
    connect_timeout = self.config.getint(REQUESTS_CONFIG,
                                         'notes_connect_timeout')
    read_timeout = self.config.getint(REQUESTS_CONFIG,
                                      'notes_read_timeout')
    self.notes_helper = NotesHelper(
        NotesManager(
            # The bound method itself is passed, not its result.
            storage=ShipyardSQLNotesStorage(self._get_shipyard_db_engine),
            get_token=self._token_getter,
            connect_timeout=connect_timeout,
            read_timeout=read_timeout))
def _get_document_util(shipyard_conf):
"""Retrieve an instance of the DocumentValidationUtils"""

View File

@ -1,3 +1,16 @@
[base]
postgresql_db = postgresql+psycopg2://shipyard:changeme@postgresql.ucp:5432/shipyard
pool_size = 15
pool_pre_ping = true
pool_timeout = 30
pool_overflow = 10
connection_recycle = -1
profiler = false
[requests_config]
notes_connect_timeout = 5
notes_read_timeout = 10
[keystone_authtoken]
auth_section = keystone_authtoken
auth_type = password

View File

@ -29,6 +29,9 @@ from shipyard_airflow.common.deployment_group.deployment_group import (
from shipyard_airflow.common.deployment_group.deployment_group_manager import (
DeploymentGroupManager
)
from shipyard_airflow.common.notes.notes import NotesManager
from shipyard_airflow.common.notes.notes_helper import NotesHelper
from shipyard_airflow.common.notes.storage_impl_mem import MemoryNotesStorage
from shipyard_airflow.plugins.drydock_base_operator import (
gen_node_name_filter,
@ -213,6 +216,13 @@ def _gen_pe_func(mode, stand_alone=False):
else:
return _func_self
def get_notes_helper():
    """Build a NotesHelper backed by the in-memory storage module

    Intended for tests: the token getter always returns a fixed fake token.
    """
    manager = NotesManager(
        storage=MemoryNotesStorage(),
        get_token=lambda: "fake_token",
    )
    return NotesHelper(manager)
class TestDrydockNodesOperator:
def test_default_deployment_strategy(self):
@ -290,6 +300,10 @@ class TestDrydockNodesOperator:
DeploymentConfigurationOperator.config_keys_defaults
)
op.design_ref = {}
op.notes_helper = get_notes_helper()
op.action_id = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
op.task_id = "prepare_and_deploy_nodes"
op.do_execute()
assert get_dgm.call_count == 1
assert nl.call_count == 1
@ -312,6 +326,10 @@ class TestDrydockNodesOperator:
DeploymentConfigurationOperator.config_keys_defaults
)
op.design_ref = {}
op.notes_helper = get_notes_helper()
op.action_id = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
op.task_id = "prepare_and_deploy_nodes"
op.do_execute()
assert get_dgm.call_count == 1
@ -459,6 +477,10 @@ class TestDrydockNodesOperator:
DeploymentConfigurationOperator.config_keys_defaults
)
op.design_ref = {"a": "b"}
op.notes_helper = get_notes_helper()
op.action_id = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
op.task_id = "prepare_and_deploy_nodes"
op.do_execute()
assert "critical groups have met their success criteria" in caplog.text