diff --git a/etc/shipyard/policy.yaml.sample b/etc/shipyard/policy.yaml.sample index 721635d5..3d924a92 100644 --- a/etc/shipyard/policy.yaml.sample +++ b/etc/shipyard/policy.yaml.sample @@ -42,3 +42,12 @@ # GET /api/v1.0/renderedconfigdocs #"workflow_orchestrator:get_renderedconfigdocs": "rule:admin_required" +# Retrieve the list of workflows (DAGs) that have been invoked in +# Airflow, whether via Shipyard or scheduled +# GET /api/v1.0/workflows +#"workflow_orchestrator:list_workflows": "rule:admin_required" + +# Retrieve the detailed information for a workflow (DAG) from Airflow +# GET /api/v1.0/workflows/{id} +#"workflow_orchestrator:get_workflow": "rule:admin_required" + diff --git a/etc/shipyard/shipyard.conf.sample b/etc/shipyard/shipyard.conf.sample index 62f7b883..e2e5423a 100644 --- a/etc/shipyard/shipyard.conf.sample +++ b/etc/shipyard/shipyard.conf.sample @@ -58,6 +58,30 @@ # (string value) #service_type = physicalprovisioner +# Query interval (in seconds) for verify_site task (integer value) +#verify_site_query_interval = 10 + +# Time out (in seconds) for verify_site task (integer value) +#verify_site_task_timeout = 60 + +# Query interval (in seconds) for prepare_site task (integer value) +#prepare_site_query_interval = 10 + +# Time out (in seconds) for prepare_site task (integer value) +#prepare_site_task_timeout = 120 + +# Query interval (in seconds) for prepare_node task (integer value) +#prepare_node_query_interval = 30 + +# Time out (in seconds) for prepare_node task (integer value) +#prepare_node_task_timeout = 1800 + +# Query interval (in seconds) for deploy_node task (integer value) +#deploy_node_query_interval = 30 + +# Time out (in seconds) for deploy_node task (integer value) +#deploy_node_task_timeout = 3600 + [healthcheck] diff --git a/shipyard_airflow/conf/config.py b/shipyard_airflow/conf/config.py index b78bb425..66bcc9cc 100644 --- a/shipyard_airflow/conf/config.py +++ b/shipyard_airflow/conf/config.py @@ -29,7 +29,7 @@ SECTIONS = [ options=[ cfg.StrOpt( 'web_server', - default='http://localhost:32080', + default='http://localhost:32080/', help='The web server for Airflow' ), cfg.StrOpt( diff --git a/shipyard_airflow/control/action/actions_api.py b/shipyard_airflow/control/action/actions_api.py index f9cefd35..d25a793e 100644 --- a/shipyard_airflow/control/action/actions_api.py +++ b/shipyard_airflow/control/action/actions_api.py @@ -100,7 +100,6 @@ class ActionsResource(BaseResource): # populate action parameters if they are not set if 'parameters' not in action: action['parameters'] = {} - # validate if there is any validation to do validator = SUPPORTED_ACTION_MAPPINGS.get(action['name'])['validator'] if validator is not None: @@ -284,13 +283,13 @@ class ActionsResource(BaseResource): raise ApiError( title='Unable to determine if workflow has started', description=( - 'Airflow has not responded with parseable output. ', + 'Airflow has not responded with parseable output. ' 'Shipyard is unable to determine run timestamp'), status=falcon.HTTP_500, error_list=[{ 'message': log_string }], - retry=True, + retry=True ) else: # everything before the ': ' should be a date/time @@ -300,9 +299,9 @@ class ActionsResource(BaseResource): except ValueError as valerr: raise ApiError( title='Unable to determine if workflow has started', - description=( - 'Airflow has not responded with parseable output. ', - 'Shipyard is unable to determine run timestamp'), + description=('Airflow has not responded with parseable ' + 'output. Shipyard is unable to determine run ' + 'timestamp'), status=falcon.HTTP_500, error_list=[{ 'message': 'value {} has caused {}'.format(date_split, diff --git a/tests/unit/control/test_actions_api.py b/tests/unit/control/test_actions_api.py index a9dd1759..a8f6abda 100644 --- a/tests/unit/control/test_actions_api.py +++ b/tests/unit/control/test_actions_api.py @@ -11,19 +11,60 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging + import json import os from datetime import datetime +import mock +from mock import patch +from oslo_config import cfg +import pytest +import falcon +from falcon import testing +import responses + +from shipyard_airflow.control.action import actions_api from shipyard_airflow.control.action.actions_api import ActionsResource from shipyard_airflow.control.base import ShipyardRequestContext from shipyard_airflow.errors import ApiError +from shipyard_airflow.policy import ShipyardPolicy DATE_ONE = datetime(2017, 9, 13, 11, 13, 3, 57000) DATE_TWO = datetime(2017, 9, 13, 11, 13, 5, 57000) DATE_ONE_STR = DATE_ONE.strftime('%Y-%m-%dT%H:%M:%S') DATE_TWO_STR = DATE_TWO.strftime('%Y-%m-%dT%H:%M:%S') +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + + +def create_req(ctx, body): + '''creates a falcon request''' + env = testing.create_environ( + path='/', + query_string='', + protocol='HTTP/1.1', + scheme='http', + host='falconframework.org', + port=None, + headers={'Content-Type': 'application/json'}, + app='', + body=body, + method='POST', + wsgierrors=None, + file_wrapper=None) + req = falcon.Request(env) + req.context = ctx + return req + + +def create_resp(): + '''creates a falcon response''' + resp = falcon.Response() + return resp + def actions_db(): """ @@ -150,7 +191,6 @@ def airflow_stub(**kwargs): """ assert kwargs['dag_id'] assert kwargs['action'] - print(kwargs) return '2017-09-06 14:10:08.528402' @@ -168,9 +208,68 @@ def audit_control_command_db(action_audit): assert action_audit['command'] == 'invoke' +@pytest.fixture(scope='function') +def conf_fixture(request): + def set_override(name, override, group): + CONF = cfg.CONF + CONF.set_override(name, override, group=group) + request.addfinalizer(CONF.clear_override(name, group=group)) + + return set_override + + context = ShipyardRequestContext() +@mock.patch.object(ShipyardPolicy, 'authorize', return_value=True) +@mock.patch.object( + ActionsResource, + 'get_all_actions', + return_value={'id': 'test_id', + 'name': 'test_name'}) +def test_on_get(mock_get_all_actions, mock_authorize): + act_resource = ActionsResource() + context.policy_engine = ShipyardPolicy() + req = create_req(context, None) + resp = create_resp() + act_resource.on_get(req, resp) + mock_authorize.assert_called_once_with( + 'workflow_orchestrator:list_actions', context) + mock_get_all_actions.assert_called_once() + assert resp.body is not None + assert resp.status == '200 OK' + + +@mock.patch.object(ShipyardPolicy, 'authorize', return_value=True) +@mock.patch.object( + ActionsResource, + 'create_action', + return_value={'id': 'test_id', + 'name': 'test_name'}) +@patch('logging.Logger.info') +def test_on_post(mock_info, mock_create_action, mock_authorize): + act_resource = ActionsResource() + context.policy_engine = ShipyardPolicy() + json_body = json.dumps({ + 'user': "test_user", + 'req_id': "test_req_id", + 'external_ctx': "test_ext_ctx", + 'name': "test_name" + }).encode('utf-8') + req = create_req(context, json_body) + resp = create_resp() + act_resource.on_post(req, resp) + mock_authorize.assert_called_once_with( + 'workflow_orchestrator:create_action', context) + mock_create_action.assert_called_once_with( + action=json.loads(json_body.decode('utf-8')), context=context) + mock_info.assert_called_with("Id %s generated for action %s", 'test_id', + 'test_name') + assert resp.status == '201 Created' + assert resp.body is not None + assert '/api/v1.0/actions/' in resp.location + + def test_get_all_actions(): """ Tests the main response from get all actions @@ -182,7 +281,6 @@ def test_get_all_actions(): os.environ['DB_CONN_AIRFLOW'] = 'nothing' os.environ['DB_CONN_SHIPYARD'] = 'nothing' result = action_resource.get_all_actions() - print(result) assert len(result) == len(actions_db()) for action in result: if action['name'] == 'dag_it': @@ -205,9 +303,11 @@ def test_create_action(): # with invalid input. fail. try: action = action_resource.create_action( - action={'name': 'broken', 'parameters': {'a': 'aaa'}}, - context=context - ) + action={'name': 'broken', + 'parameters': { + 'a': 'aaa' + }}, + context=context) assert False, 'Should throw an ApiError' except ApiError: # expected @@ -216,9 +316,11 @@ def test_create_action(): # with valid input and some parameters try: action = action_resource.create_action( - action={'name': 'deploy_site', 'parameters': {'a': 'aaa'}}, - context=context - ) + action={'name': 'deploy_site', + 'parameters': { + 'a': 'aaa' + }}, + context=context) assert action['timestamp'] assert action['id'] assert len(action['id']) == 26 @@ -226,14 +328,11 @@ def test_create_action(): assert action['dag_status'] == 'SCHEDULED' except ApiError: assert False, 'Should not raise an ApiError' - print(json.dumps(action, default=str)) # with valid input and no parameters try: action = action_resource.create_action( - action={'name': 'deploy_site'}, - context=context - ) + action={'name': 'deploy_site'}, context=context) assert action['timestamp'] assert action['id'] assert len(action['id']) == 26 @@ -241,4 +340,133 @@ def test_create_action(): assert action['dag_status'] == 'SCHEDULED' except ApiError: assert False, 'Should not raise an ApiError' - print(json.dumps(action, default=str)) + + +@patch('shipyard_airflow.db.shipyard_db.ShipyardDbAccess.' + 'get_all_submitted_actions') +def test_get_all_actions_db(mock_get_all_submitted_actions): + act_resource = ActionsResource() + act_resource.get_all_actions_db() + mock_get_all_submitted_actions.assert_called() + + +@patch('shipyard_airflow.db.airflow_db.AirflowDbAccess.get_all_dag_runs') +def test_get_all_dag_runs_db(mock_get_all_dag_runs): + act_resource = ActionsResource() + act_resource.get_all_dag_runs_db() + mock_get_all_dag_runs.assert_called() + + +@patch('shipyard_airflow.db.airflow_db.AirflowDbAccess.get_all_tasks') +def test_get_all_tasks_db(mock_get_all_tasks): + act_resource = ActionsResource() + act_resource.get_all_tasks_db() + mock_get_all_tasks.assert_called() + + +@patch('shipyard_airflow.db.shipyard_db.ShipyardDbAccess.insert_action') +def test_insert_action(mock_insert_action): + act_resource = ActionsResource() + action = 'test_action' + act_resource.insert_action(action) + mock_insert_action.assert_called_with(action) + + +@patch('shipyard_airflow.db.shipyard_db.ShipyardDbAccess.' + 'insert_action_command_audit') +def test_audit_control_command_db(mock_insert_action_audit): + act_resource = ActionsResource() + action_audit = 'test_action_audit' + act_resource.audit_control_command_db(action_audit) + mock_insert_action_audit.assert_called_with(action_audit) + + +@responses.activate +@mock.patch.object( + ActionsResource, + '_exhume_date', + return_value=datetime(2017, 9, 22, 22, 16, 14)) +@patch('logging.Logger.info') +def test_invoke_airflow_dag_success(mock_info, mock_exhume_date): + act_resource = ActionsResource() + dag_id = 'test_dag_id' + action = 'test_action' + CONF = cfg.CONF + web_server_url = CONF.base.web_server + conf_value = {'action': action} + log_string = 'Created