Merge "plugins: Add DAG and operator to test site"

This commit is contained in:
Zuul 2018-10-05 15:46:40 +00:00 committed by Gerrit Code Review
commit 2776423da1
5 changed files with 264 additions and 0 deletions

View File

@ -19,6 +19,7 @@ from airflow.operators.subdag_operator import SubDagOperator
try:
# Operators are loaded from being registered to airflow.operators
# in a deployed fashion
from airflow.operators import ArmadaTestReleasesOperator
from airflow.operators import ConcurrencyCheckOperator
from airflow.operators import DeckhandRetrieveRenderedDocOperator
from airflow.operators import DeploymentConfigurationOperator
@ -27,6 +28,8 @@ try:
from airflow.operators import DrydockRelabelNodesOperator
except ImportError:
# for local testing, they are loaded from their source directory
from shipyard_airflow.plugins.armada_test_releases import \
ArmadaTestReleasesOperator
from shipyard_airflow.plugins.concurrency_check_operator import \
ConcurrencyCheckOperator
from shipyard_airflow.plugins.deckhand_retrieve_rendered_doc import \
@ -219,6 +222,19 @@ class CommonStepFactory(object):
on_failure_callback=step_failure_handler,
dag=self.dag)
def get_armada_test_releases(self, task_id=dn.ARMADA_TEST_RELEASES):
"""Generate the armada_test_releases step
Armada invokes Helm tests for all deployed releases or a targeted
release specified by the "release" parameter.
"""
return ArmadaTestReleasesOperator(
shipyard_conf=config_path,
main_dag_name=self.parent_dag_name,
task_id=task_id,
on_failure_callback=step_failure_handler,
dag=self.dag)
def get_unguarded_destroy_servers(self, task_id=dn.DESTROY_SERVER):
"""Generates an unguarded destroy server step.

View File

@ -22,6 +22,7 @@ RELABEL_NODES_DAG_NAME = 'relabel_nodes'
# Steps
ACTION_XCOM = 'action_xcom'
ARMADA_TEST_RELEASES = 'armada_test_releases'
CONCURRENCY_CHECK = 'dag_concurrency_check'
CREATE_ACTION_TAG = 'create_action_tag'
DECIDE_AIRFLOW_UPGRADE = 'decide_airflow_upgrade'

View File

@ -0,0 +1,58 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import timedelta
import airflow
from airflow import DAG
try:
from common_step_factory import CommonStepFactory
except ImportError:
from shipyard_airflow.dags.common_step_factory import CommonStepFactory
"""test site"""
PARENT_DAG_NAME = 'test_site'
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(1),
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'provide_context': True,
'retries': 0,
'retry_delay': timedelta(seconds=30),
}
dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None)
step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME,
dag=dag,
default_args=default_args,
action_type='site')
action_xcom = step_factory.get_action_xcom()
preflight = step_factory.get_preflight()
deployment_configuration = step_factory.get_deployment_configuration()
test_releases = step_factory.get_armada_test_releases()
# DAG Wiring
preflight.set_upstream(action_xcom)
deployment_configuration.set_upstream(action_xcom)
test_releases.set_upstream([
deployment_configuration,
preflight
])

View File

@ -0,0 +1,81 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from airflow.exceptions import AirflowException
from airflow.plugins_manager import AirflowPlugin
try:
from armada_base_operator import ArmadaBaseOperator
except ImportError:
from shipyard_airflow.plugins.armada_base_operator import \
ArmadaBaseOperator
from armada.exceptions import api_exceptions as errors
LOG = logging.getLogger(__name__)
class ArmadaTestReleasesOperator(ArmadaBaseOperator):
"""Armada Test Releases Operator
Invoke the Helm test of every deployed release or a targeted release
specified by the "release" parameter.
"""
def do_execute(self):
# Retrieve cleanup flag from action params
cleanup = self.action_params.get('cleanup')
if cleanup:
self.query['cleanup'] = cleanup
release = self.action_params.get('release')
if release:
# Invoke Helm tests for specified release
self._test_release(release)
else:
# Invoke Helm tests for all deployed releases
# TODO(@drewwalters96): Support execution of tests in parallel.
for release_list in self.get_releases().values():
for release in release_list:
self._test_release(release)
def _test_release(self, release):
"""Invoke Helm tests on a specified release
Invokes Helm tests on a specified release using the Armada client
and logs all test results.
"""
LOG.info("Invoking Helm tests for release '{}'".format(release))
try:
armada_test_release = self.armada_client.get_test_release(
release=release,
query=self.query,
timeout=None)
except errors.ClientError as client_error:
raise AirflowException(client_error)
if armada_test_release:
LOG.info("Successfully executed Helm tests for release "
"'{}'".format(release))
LOG.info(armada_test_release)
else:
# Dump logs from Armada API pods
self.get_k8s_logs()
raise AirflowException("Failed to execute Helms test for "
"release '{}'!".format(release))
class ArmadaTestReleasesOperatorPlugin(AirflowPlugin):
"""Creates ArmadaTestReleasesOperator in Airflow."""
name = 'armada_test_releases_operator'
operators = [ArmadaTestReleasesOperator]

View File

@ -0,0 +1,108 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests ArmadaTestReleasesOperator functionality"""
import os
from unittest import mock
from airflow.exceptions import AirflowException
import pytest
from shipyard_airflow.plugins.armada_base_operator import \
ArmadaBaseOperator
from shipyard_airflow.plugins.armada_test_releases import \
ArmadaTestReleasesOperator
from shipyard_airflow.plugins.ucp_base_operator import \
UcpBaseOperator
CONF_FILE = os.path.join(os.path.dirname(__file__), 'test.conf')
ACTION_PARAMS = {
'cleanup': True,
'release': 'glance'
}
RELEASES = {
'ucp': ['armada', 'deckhand', 'shipyard'],
'openstack': ['glance', 'heat', 'horizon', 'keystone']
}
class TestArmadaTestReleasesOperator:
@mock.patch('shipyard_airflow.plugins.armada_test_releases.LOG.info')
@mock.patch.object(ArmadaBaseOperator, 'armada_client', create=True)
@mock.patch.object(ArmadaBaseOperator, 'get_releases',
return_value=RELEASES)
@mock.patch.object(ArmadaBaseOperator, 'get_tiller_info')
def test_do_execute(self, mock_tiller_info, mock_releases, mock_client,
mock_logs):
op = ArmadaTestReleasesOperator(main_dag_name='main',
shipyard_conf=CONF_FILE,
task_id='t1')
op.action_params = dict()
op.do_execute()
# Verify Armada client called to test every release
calls = list()
for release_list in RELEASES.values():
for release in release_list:
calls.append(mock.call(
release=release,
query=dict(),
timeout=None))
mock_client.get_test_release.assert_has_calls(calls, any_order=True)
# Verify test results logged
mock_logs.assert_called_with(mock_client.get_test_release.return_value)
@mock.patch('shipyard_airflow.plugins.armada_test_releases.LOG.info')
@mock.patch.object(ArmadaBaseOperator, 'armada_client', create=True)
@mock.patch.object(ArmadaBaseOperator, 'get_tiller_info')
def test_do_execute_with_params(self, mock_tiller, mock_client, mock_logs):
op = ArmadaTestReleasesOperator(main_dag_name='main',
shipyard_conf=CONF_FILE,
task_id='t1')
op.action_params = ACTION_PARAMS
op.do_execute()
# Verify Armada client called for single release with action params
cleanup = ACTION_PARAMS['cleanup']
release = ACTION_PARAMS['release']
mock_client.get_test_release.assert_called_once_with(
release=release,
query=dict(cleanup=cleanup),
timeout=None)
# Verify test results logged
mock_logs.assert_called_with(mock_client.get_test_release.return_value)
@mock.patch.object(ArmadaBaseOperator, 'armada_client', create=True)
@mock.patch.object(ArmadaBaseOperator, 'get_releases',
return_value=RELEASES)
@mock.patch.object(ArmadaBaseOperator, 'get_tiller_info')
@mock.patch.object(UcpBaseOperator, 'get_k8s_logs')
def test_do_execute_fail(self, mock_k8s_logs, mock_tiller_info,
mock_releases, mock_client):
mock_client.get_test_release.return_value = None
op = ArmadaTestReleasesOperator(main_dag_name='main',
shipyard_conf=CONF_FILE,
task_id='t1')
op.action_params = dict()
# Verify errors logged to pods
with pytest.raises(AirflowException):
op.do_execute()
mock_k8s_logs.assert_called_once()