diff --git a/requirements.txt b/requirements.txt index 1657a362..bc67a50a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,27 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +PasteDeploy==1.5.2 +keystonemiddleware==4.17.0 falcon==1.2.0 python-dateutil==2.6.1 requests==2.18.4 -uwsgi===2.0.15 +uwsgi==2.0.15 configparser==3.5.0 python-openstackclient==3.11.0 SQLAlchemy==1.1.13 psycopg2==2.7.3.1 -PasteDeploy==1.5.2 -keystonemiddleware===4.9.1 +oslo.config==4.11.0 +oslo.policy==1.25.1 +keystoneauth1==2.13.0 diff --git a/setup.py b/setup.py index 630bbde7..f6d4d9b8 100644 --- a/setup.py +++ b/setup.py @@ -14,19 +14,25 @@ from setuptools import setup -setup(name='shipyard_airflow', - version='0.1a1', - description='API for managing Airflow-based orchestration', - url='http://github.com/att-comdev/shipyard', - author='Anthony Lin - AT&T', - author_email='al498u@att.com', - license='Apache 2.0', - packages=['shipyard_airflow', - 'shipyard_airflow.control'], - install_requires=[ - 'falcon', - 'requests', - 'configparser', - 'uwsgi>1.4', - 'python-dateutil' - ]) +setup( + name='shipyard_airflow', + version='0.1a1', + description='API for managing Airflow-based orchestration', + url='http://github.com/att-comdev/shipyard', + author='Anthony Lin - AT&T', + author_email='al498u@att.com', + license='Apache 2.0', + packages=['shipyard_airflow', 'shipyard_airflow.control'], + entry_points={ + "oslo.policy.policies": + ["shipyard = shipyard.common.policies:list_rules"], + "oslo.config.opts": ["shipyard = shipyard.conf.opts:list_opts"] + }, + install_requires=[ + 'falcon', + 'requests', + 'configparser', + 'uwsgi>1.4', + 'python-dateutil', + 'oslo.config', + ]) diff --git a/shipyard_airflow/config.py b/shipyard_airflow/config.py new file mode 100644 index 00000000..820027dc --- /dev/null +++ b/shipyard_airflow/config.py @@ -0,0 +1,202 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Single point of entry to generate the sample configuration file. +This module collects all the necessary info from the other modules in this +package. It is assumed that: +* Every other module in this package has a 'list_opts' function which + returns a dict where: + * The keys are strings which are the group names. + * The value of each key is a list of config options for that group. +* The conf package doesn't have further packages with config options. +* This module is only used in the context of sample file generation. +""" +import importlib +import os +import pkgutil + +from oslo_config import cfg +import keystoneauth1.loading as loading + +IGNORED_MODULES = ('shipyard', 'config') + +if (os.path.exists('etc/shipyard/shipyard.conf')): + cfg.CONF(['--config-file', 'etc/shipyard/shipyard.conf']) + +class ShipyardConfig(object): + """ + Initialize all the core options + """ + # Default options + options = [ + cfg.IntOpt( + 'poll_interval', + default=10, + help=[ + '''Polling interval in seconds for checking subtask or + downstream status''' + ]), + ] + + # Logging options + logging_options = [ + cfg.StrOpt( + 'log_level', default='INFO', help='Global log level for Shipyard'), + cfg.StrOpt( + 'global_logger_name', + default='shipyard', + help='Logger name for the top-level logger'), + ] + + # Enabled plugins + plugin_options = [ + cfg.MultiStrOpt( + 'ingester', + default=['shipyard_airflow.ingester.plugins.yaml.YamlIngester'], + help='Module path string of a input ingester to enable'), + cfg.MultiStrOpt( + 'oob_driver', + default=[ + 'shipyard_airflow.drivers.oob.pyghmi_driver.PyghmiDriver' + ], + help='Module path string of a OOB driver to enable'), + cfg.StrOpt( + 'node_driver', + default=[ + '''shipyard_airflow.drivers.node.maasdriver.driver + .MaasNodeDriver''' + ], + help='Module path string of the Node driver to enable'), + # TODO Network driver not yet implemented + cfg.StrOpt( + 'network_driver', + default=None, + help='Module path string of the Network driver enable'), + ] + + # Timeouts for various tasks specified in minutes + timeout_options = [ + cfg.IntOpt( + 'shipyard_timeout', + default=5, + help='Fallback timeout when a specific one is not configured'), + cfg.IntOpt( + 'create_network_template', + default=2, + help='Timeout in minutes for creating site network templates'), + cfg.IntOpt( + 'configure_user_credentials', + default=2, + help='Timeout in minutes for creating user credentials'), + cfg.IntOpt( + 'identify_node', + default=10, + help='Timeout in minutes for initial node identification'), + cfg.IntOpt( + 'configure_hardware', + default=30, + help=[ + '''Timeout in minutes for node commissioning and + hardware configuration''' + ]), + cfg.IntOpt( + 'apply_node_networking', + default=5, + help='Timeout in minutes for configuring node networking'), + cfg.IntOpt( + 'apply_node_platform', + default=5, + help='Timeout in minutes for configuring node platform'), + cfg.IntOpt( + 'deploy_node', + default=45, + help='Timeout in minutes for deploying a node'), + ] + + def __init__(self): + self.conf = cfg.CONF + + def register_options(self): + self.conf.register_opts(ShipyardConfig.options) + self.conf.register_opts( + ShipyardConfig.logging_options, group='logging') + self.conf.register_opts(ShipyardConfig.plugin_options, group='plugins') + self.conf.register_opts( + ShipyardConfig.timeout_options, group='timeouts') + self.conf.register_opts( + loading.get_auth_plugin_conf_options('password'), + group='keystone_authtoken') + + +config_mgr = ShipyardConfig() + + +def list_opts(): + opts = { + 'DEFAULT': ShipyardConfig.options, + 'logging': ShipyardConfig.logging_options, + 'plugins': ShipyardConfig.plugin_options, + 'timeouts': ShipyardConfig.timeout_options + } + + package_path = os.path.dirname(os.path.abspath(__file__)) + parent_module = ".".join(__name__.split('.')[:-1]) + module_names = _list_module_names(package_path, parent_module) + imported_modules = _import_modules(module_names) + _append_config_options(imported_modules, opts) + # Assume we'll use the password plugin, + # so include those options in the configuration template + opts['keystone_authtoken'] = loading.get_auth_plugin_conf_options( + 'password') + return _tupleize(opts) + + +def _tupleize(d): + """Convert a dict of options to the 2-tuple format.""" + return [(key, value) for key, value in d.items()] + + +def _list_module_names(pkg_path, parent_module): + module_names = [] + for _, module_name, ispkg in pkgutil.iter_modules(path=[pkg_path]): + if module_name in IGNORED_MODULES: + # Skip this module. + continue + elif ispkg: + module_names.extend( + _list_module_names(pkg_path + "/" + module_name, + parent_module + "." + module_name)) + else: + module_names.append(parent_module + "." + module_name) + return module_names + + +def _import_modules(module_names): + imported_modules = [] + for module_name in module_names: + module = importlib.import_module(module_name) + if hasattr(module, 'list_opts'): + print("Pulling options from module %s" % module.__name__) + imported_modules.append(module) + return imported_modules + + +def _append_config_options(imported_modules, config_options): + for module in imported_modules: + configs = module.list_opts() + for key, val in configs.items(): + if key not in config_options: + config_options[key] = val + else: + config_options[key].extend(val) diff --git a/shipyard_airflow/control/airflow_connections.py b/shipyard_airflow/control/airflow_connections.py deleted file mode 100644 index 4dc633d4..00000000 --- a/shipyard_airflow/control/airflow_connections.py +++ /dev/null @@ -1,78 +0,0 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from urllib.parse import urlunsplit -from falcon import HTTPInvalidParam - -from .base import BaseResource -from shipyard_airflow.airflow_client import AirflowClient - -# We need to be able to add/delete connections so that we can create/delete -# connection endpoints that Airflow needs to connect to -class AirflowAddConnectionResource(BaseResource): - - authorized_roles = ['user'] - - def on_get(self, req, resp, action, conn_id, protocol, host, port): - web_server_url = self.retrieve_config('base', 'web_server') - if action != 'add': - raise HTTPInvalidParam( - 'Invalid Paremeters for Adding Airflow Connection', 'action') - - # Concatenate to form the connection URL - netloc = ''.join([host, ':', port]) - url = (protocol, netloc, '', '', '') - conn_uri = urlunsplit(url) - # Form the request URL towards Airflow - req_url = ('{}/admin/rest_api/api?api=connections&add=true&conn_id' - '={}&conn_uri={}'.format(web_server_url, conn_id, conn_uri)) - - airflow_client = AirflowClient(req_url) - self.on_success(resp, airflow_client.get()) - - -# Delete a particular connection endpoint -class AirflowDeleteConnectionResource(BaseResource): - - authorized_roles = ['user'] - - def on_get(self, req, resp, action, conn_id): - # Retrieve URL - web_server_url = self.retrieve_config('base', 'web_server') - if action != 'delete': - raise HTTPInvalidParam( - 'Invalid Paremeters for Deleting Airflow Connection', 'action') - - # Form the request URL towards Airflow - req_url = ('{}/admin/rest_api/api?api=connections&delete=true&conn_id' - '={}'.format(web_server_url, conn_id)) - airflow_client = AirflowClient(req_url) - self.on_success(resp, airflow_client.get()) - - -# List all current connection endpoints -class AirflowListConnectionsResource(BaseResource): - - authorized_roles = ['user'] - - def on_get(self, req, resp, action): - web_server_url = self.retrieve_config('base', 'web_server') - if action != 'list': - raise HTTPInvalidParam( - 'Invalid Paremeters for listing Airflow Connections', 'action') - - req_url = '{}/admin/rest_api/api?api=connections&list=true'.format( - web_server_url) - - airflow_client = AirflowClient(req_url) - self.on_success(resp, airflow_client.get()) diff --git a/shipyard_airflow/control/airflow_dag_state.py b/shipyard_airflow/control/airflow_dag_state.py deleted file mode 100644 index 6392ee7a..00000000 --- a/shipyard_airflow/control/airflow_dag_state.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import falcon -import requests - -from .base import BaseResource - - -class GetDagStateResource(BaseResource): - - authorized_roles = ['user'] - - def on_get(self, req, resp, dag_id, execution_date): - # Retrieve URL - web_server_url = self.retrieve_config('base', 'web_server') - - if 'Error' in web_server_url: - resp.status = falcon.HTTP_500 - raise falcon.HTTPInternalServerError("Internal Server Error", - "Missing Configuration File") - else: - req_url = ('{}/admin/rest_api/api?api=dag_state&dag_id={}' - '&execution_date={}'.format(web_server_url, dag_id, - execution_date)) - response = requests.get(req_url).json() - - if response["output"]["stderr"]: - resp.status = falcon.HTTP_400 - resp.body = response["output"]["stderr"] - return - else: - resp.status = falcon.HTTP_200 - resp.body = response["output"]["stdout"] diff --git a/shipyard_airflow/control/airflow_get_task_status.py b/shipyard_airflow/control/airflow_get_task_status.py deleted file mode 100644 index 1c404242..00000000 --- a/shipyard_airflow/control/airflow_get_task_status.py +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import falcon -import requests - -from .base import BaseResource - - -class GetTaskStatusResource(BaseResource): - - authorized_roles = ['user'] - - def on_get(self, req, resp, dag_id, task_id, execution_date): - # Retrieve URL - web_server_url = self.retrieve_config('base', 'web_server') - - if 'Error' in web_server_url: - resp.status = falcon.HTTP_500 - raise falcon.HTTPInternalServerError("Internal Server Error", - "Missing Configuration File") - else: - - req_url = ( - '{}/admin/rest_api/api?api=task_state&dag_id={}&task_id={}' - '&execution_date={}'.format(web_server_url, dag_id, task_id, - execution_date)) - response = requests.get(req_url).json() - - if response["output"]["stderr"]: - resp.status = falcon.HTTP_400 - resp.body = response["output"]["stderr"] - return - else: - resp.status = falcon.HTTP_200 - resp.body = response["output"]["stdout"] diff --git a/shipyard_airflow/control/airflow_get_version.py b/shipyard_airflow/control/airflow_get_version.py deleted file mode 100644 index 2b111913..00000000 --- a/shipyard_airflow/control/airflow_get_version.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import falcon -import requests - -from .base import BaseResource - - -class GetAirflowVersionResource(BaseResource): - - authorized_roles = ['user'] - - def on_get(self, req, resp): - # Retrieve URL - web_server_url = self.retrieve_config('base', 'web_server') - - if 'Error' in web_server_url: - resp.status = falcon.HTTP_500 - raise falcon.HTTPInternalServerError("Internal Server Error", - "Missing Configuration File") - else: - # Get Airflow Version - req_url = '{}/admin/rest_api/api?api=version'.format( - web_server_url) - response = requests.get(req_url).json() - - if response["output"]: - resp.status = falcon.HTTP_200 - resp.body = response["output"] - else: - self.return_error(resp, falcon.HTTP_400, - 'Fail to Retrieve Airflow Version') - return diff --git a/shipyard_airflow/control/airflow_list_dags.py b/shipyard_airflow/control/airflow_list_dags.py deleted file mode 100644 index fee190ba..00000000 --- a/shipyard_airflow/control/airflow_list_dags.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import falcon -import requests - -from .base import BaseResource - - -class ListDagsResource(BaseResource): - - authorized_roles = ['user'] - - def on_get(self, req, resp): - # Retrieve URL - web_server_url = self.retrieve_config('base', 'web_server') - - if 'Error' in web_server_url: - resp.status = falcon.HTTP_500 - raise falcon.HTTPInternalServerError("Internal Server Error", - "Missing Configuration File") - else: - # List available dags - req_url = '{}/admin/rest_api/api?api=list_dags'.format( - web_server_url) - response = requests.get(req_url).json() - - if response["output"]["stderr"]: - resp.status = falcon.HTTP_400 - resp.body = response["output"]["stderr"] - return - else: - resp.status = falcon.HTTP_200 - resp.body = response["output"]["stdout"] diff --git a/shipyard_airflow/control/airflow_list_tasks.py b/shipyard_airflow/control/airflow_list_tasks.py deleted file mode 100644 index eb139915..00000000 --- a/shipyard_airflow/control/airflow_list_tasks.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import falcon -import requests - -from .base import BaseResource - - -class ListTasksResource(BaseResource): - - authorized_roles = ['user'] - - def on_get(self, req, resp, dag_id): - # Retrieve URL - web_server_url = self.retrieve_config('base', 'web_server') - - if 'Error' in web_server_url: - resp.status = falcon.HTTP_500 - raise falcon.HTTPInternalServerError("Internal Server Error", - "Missing Configuration File") - else: - # Retrieve all tasks belonging to a particular Dag - req_url = '{}/admin/rest_api/api?api=list_tasks&dag_id={}'.format( - web_server_url, dag_id) - response = requests.get(req_url).json() - - if response["output"]["stderr"]: - resp.status = falcon.HTTP_400 - resp.body = response["output"]["stderr"] - return - else: - resp.status = falcon.HTTP_200 - resp.body = response["output"]["stdout"] diff --git a/shipyard_airflow/control/airflow_trigger_dag.py b/shipyard_airflow/control/airflow_trigger_dag.py deleted file mode 100644 index c176f6cc..00000000 --- a/shipyard_airflow/control/airflow_trigger_dag.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import falcon -import requests - -from dateutil.parser import parse -from .base import BaseResource - - -class TriggerDagRunResource(BaseResource): - - authorized_roles = ['user'] - - def on_get(self, req, resp, dag_id, conf): - # Retrieve URL - web_server_url = self.retrieve_config('base', 'web_server') - - if 'Error' in web_server_url: - resp.status = falcon.HTTP_500 - raise falcon.HTTPInternalServerError("Internal Server Error", - "Missing Configuration File") - else: - # "conf" - JSON string that gets pickled into the DagRun's - # conf attribute - req_url = ('{}/admin/rest_api/api?api=trigger_dag&dag_id={}' - '&conf={}'.format(web_server_url, dag_id, conf)) - - response = requests.get(req_url).json() - - # Returns error response if API call returns - # response code other than 200 - if response["http_response_code"] != 200: - resp.status = falcon.HTTP_400 - resp.body = response["output"] - return - else: - # Returns 201 if action is created successfully - resp.status = falcon.HTTP_201 - - # Return time of execution so that we can use - # it to query dag/task status - dt = parse(response["response_time"]) - resp.body = dt.strftime('%Y-%m-%dT%H:%M:%S') diff --git a/shipyard_airflow/control/airflow_trigger_dag_poll.py b/shipyard_airflow/control/airflow_trigger_dag_poll.py deleted file mode 100644 index 36765f3f..00000000 --- a/shipyard_airflow/control/airflow_trigger_dag_poll.py +++ /dev/null @@ -1,92 +0,0 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import falcon -import json -import requests -import time -import logging - -from dateutil.parser import parse -from .base import BaseResource - - -class TriggerDagRunPollResource(BaseResource): - - authorized_roles = ['user'] - - def on_get(self, req, resp, dag_id, run_id): - # Retrieve URL - web_server_url = self.retrieve_config('base', 'web_server') - - if 'Error' in web_server_url: - resp.status = falcon.HTTP_500 - raise falcon.HTTPInternalServerError("Internal Server Error", - "Missing Configuration File") - else: - req_url = ('{}/admin/rest_api/api?api=trigger_dag&dag_id={}' - '&run_id={}'.format(web_server_url, dag_id, run_id)) - response = requests.get(req_url).json() - - if response["http_response_code"] != 200: - resp.status = falcon.HTTP_400 - resp.body = response["output"] - return - else: - resp.status = falcon.HTTP_200 - - logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s') - logging.info("Executing '" + dag_id + "' Dag...") - - # Retrieve time of execution so that we - # can use it to query dag/task status - dt = parse(response["response_time"]) - exec_date = dt.strftime('%Y-%m-%dT%H:%M:%S') - - url = ('{}/admin/rest_api/api' - '?api=dag_state&dag_id={}&execution_date={}'.format( - web_server_url, dag_id, exec_date)) - - # Back off for 5 seconds before querying the initial state - time.sleep(5) - dag_state = requests.get(url).json() - - # Remove newline character at the end of the response - dag_state = dag_state["output"]["stdout"].encode( - 'utf8').rstrip() - - while dag_state != 'success': - # Get current state - dag_state = requests.get(url).json() - - # Remove newline character at the end of the response - dag_state = dag_state["output"]["stdout"].encode( - 'utf8').rstrip() - - # Logs output of current dag state - logging.info('Current Dag State: ' + dag_state) - - if dag_state == 'failed': - resp.status = falcon.HTTP_500 - logging.info('Dag Execution Failed') - resp.body = json.dumps({ - 'Error': 'Dag Execution Failed' - }) - return - - # Wait for 20 seconds before doing a new query - time.sleep(20) - - logging.info('Dag Successfully Executed') diff --git a/shipyard_airflow/control/api-paste.ini b/shipyard_airflow/control/api-paste.ini new file mode 100644 index 00000000..bd607ad5 --- /dev/null +++ b/shipyard_airflow/control/api-paste.ini @@ -0,0 +1,25 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#PasteDeploy Configuration File +#Used to configure uWSGI middleware pipeline + +[app:shipyard-api] +paste.app_factory = shipyard_airflow.shipyard:paste_start_shipyard + +[pipeline:main] +pipeline = authtoken shipyard-api + +[filter:authtoken] +paste.filter_factory = keystonemiddleware.auth_token:filter_factory diff --git a/shipyard_airflow/control/api.py b/shipyard_airflow/control/api.py index f7577e92..c115685a 100644 --- a/shipyard_airflow/control/api.py +++ b/shipyard_airflow/control/api.py @@ -11,28 +11,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import falcon import json +import falcon +from shipyard_airflow.errors import AppError from .regions import RegionsResource, RegionResource from .base import ShipyardRequest, BaseResource -from .tasks import TaskResource -from .dag_runs import DagRunResource -from .airflow_get_task_status import GetTaskStatusResource -from .airflow_list_tasks import ListTasksResource -from .airflow_list_dags import ListDagsResource -from .airflow_dag_state import GetDagStateResource -from .airflow_trigger_dag import TriggerDagRunResource -from .airflow_trigger_dag_poll import TriggerDagRunPollResource -from .airflow_connections import AirflowAddConnectionResource -from .airflow_connections import AirflowDeleteConnectionResource -from .airflow_connections import AirflowListConnectionsResource -from .airflow_get_version import GetAirflowVersionResource from .middleware import AuthMiddleware, ContextMiddleware, LoggingMiddleware -from shipyard_airflow.errors import AppError from .health import HealthResource - def start_api(): middlewares = [ AuthMiddleware(), @@ -49,24 +36,6 @@ def start_api(): # API for managing region data ('/regions', RegionsResource()), ('/regions/{region_id}', RegionResource()), - ('/dags/{dag_id}/tasks/{task_id}', TaskResource()), - ('/dags/{dag_id}/dag_runs', DagRunResource()), - ('/list_dags', ListDagsResource()), - ('/task_state/dags/{dag_id}/tasks/{task_id}/execution_date/' - '{execution_date}', GetTaskStatusResource()), - ('/dag_state/dags/{dag_id}/execution_date/{execution_date}', - GetDagStateResource()), - ('/list_tasks/dags/{dag_id}', ListTasksResource()), - ('/trigger_dag/dags/{dag_id}/conf/{conf}', - TriggerDagRunResource()), - ('/trigger_dag/dags/{dag_id}/run_id/{run_id}/poll', - TriggerDagRunPollResource()), - ('/connections/{action}/conn_id/{conn_id}/protocol/{protocol}' - '/host/{host}/port/{port}', AirflowAddConnectionResource()), - ('/connections/{action}/conn_id/{conn_id}', - AirflowDeleteConnectionResource()), - ('/connections/{action}', AirflowListConnectionsResource()), - ('/airflow/version', GetAirflowVersionResource()), ('/health', HealthResource()), ] diff --git a/shipyard_airflow/control/base.py b/shipyard_airflow/control/base.py index 574695f3..3832af88 100644 --- a/shipyard_airflow/control/base.py +++ b/shipyard_airflow/control/base.py @@ -12,11 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. import falcon -import falcon.request as request import uuid import json import configparser import os +import logging + try: from collections import OrderedDict except ImportError: @@ -30,8 +31,6 @@ from shipyard_airflow.errors import ( class BaseResource(object): - authorized_roles = [] - def on_options(self, req, resp): self_attrs = dir(self) methods = ['GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'PATCH'] @@ -44,16 +43,6 @@ class BaseResource(object): resp.headers['Allow'] = ','.join(allowed_methods) resp.status = falcon.HTTP_200 - # By default, no one is authorized to use a resource - def authorize_roles(self, role_list): - authorized = set(self.authorized_roles) - applied = set(role_list) - - if authorized.isdisjoint(applied): - return False - else: - return True - def to_json(self, body_dict): return json.dumps(body_dict) @@ -99,6 +88,25 @@ class BaseResource(object): else: raise AppError(ERR_UNKNOWN, "Missing Configuration File") + def error(self, ctx, msg): + self.log_error(ctx, logging.ERROR, msg) + + def info(self, ctx, msg): + self.log_error(ctx, logging.INFO, msg) + + def log_error(self, ctx, level, msg): + extra = { + 'user': 'N/A', + 'req_id': 'N/A', + 'external_ctx': 'N/A' + } + + if ctx is not None: + extra = { + 'user': ctx.user, + 'req_id': ctx.request_id, + 'external_ctx': ctx.external_marker, + } class ShipyardRequestContext(object): @@ -108,6 +116,14 @@ class ShipyardRequestContext(object): self.roles = ['anyone'] self.request_id = str(uuid.uuid4()) self.external_marker = None + self.project_id = None + self.user_id = None # User ID (UUID) + self.policy_engine = None + self.user_domain_id = None # Domain owning user + self.project_domain_id = None # Domain owning project + self.is_admin_project = False + self.authenticated = False + self.request_id = str(uuid.uuid4()) def set_log_level(self, level): if level in ['error', 'info', 'debug']: @@ -116,6 +132,9 @@ class ShipyardRequestContext(object): def set_user(self, user): self.user = user + def set_project(self, project): + self.project = project + def add_role(self, role): self.roles.append(role) @@ -127,7 +146,22 @@ class ShipyardRequestContext(object): if x != role] def set_external_marker(self, marker): - self.external_marker = str(marker)[:32] + self.external_marker = marker -class ShipyardRequest(request.Request): + def set_policy_engine(self, engine): + self.policy_engine = engine + + def to_policy_view(self): + policy_dict = {} + + policy_dict['user_id'] = self.user_id + policy_dict['user_domain_id'] = self.user_domain_id + policy_dict['project_id'] = self.project_id + policy_dict['project_domain_id'] = self.project_domain_id + policy_dict['roles'] = self.roles + policy_dict['is_admin_project'] = self.is_admin_project + + return policy_dict + +class ShipyardRequest(falcon.request.Request): context_type = ShipyardRequestContext diff --git a/shipyard_airflow/control/dag_runs.py b/shipyard_airflow/control/dag_runs.py deleted file mode 100644 index 3d992b7c..00000000 --- a/shipyard_airflow/control/dag_runs.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import falcon -import requests - -from .base import BaseResource - - -class DagRunResource(BaseResource): - - authorized_roles = ['user'] - - def on_post(self, - req, - resp, - dag_id, - run_id=None, - conf=None, - execution_date=None): - # Retrieve URL - web_server_url = self.retrieve_config('base', 'web_server') - - if 'Error' in web_server_url: - resp.status = falcon.HTTP_500 - raise falcon.HTTPInternalServerError("Internal Server Error", - "Missing Configuration File") - else: - req_url = '{}/api/experimental/dags/{}/dag_runs'.format( - web_server_url, dag_id) - - response = requests.post( - req_url, - json={ - "run_id": run_id, - "conf": conf, - "execution_date": execution_date, - }) - - if response.ok: - resp.status = falcon.HTTP_200 - else: - self.return_error(resp, falcon.HTTP_400, 'Fail to Execute Dag') - return diff --git a/shipyard_airflow/control/health.py b/shipyard_airflow/control/health.py index a59b6270..e9680cf3 100644 --- a/shipyard_airflow/control/health.py +++ b/shipyard_airflow/control/health.py @@ -15,11 +15,8 @@ import falcon from shipyard_airflow.control.base import BaseResource - class HealthResource(BaseResource): - authorized_roles = ['anyone'] - # Return empty response/body to show # that shipyard is healthy def on_get(self, req, resp): diff --git a/shipyard_airflow/control/middleware.py b/shipyard_airflow/control/middleware.py index 28941686..8c3066c4 100644 --- a/shipyard_airflow/control/middleware.py +++ b/shipyard_airflow/control/middleware.py @@ -11,66 +11,90 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import falcon import logging +from uuid import UUID +from oslo_utils import uuidutils +from shipyard_airflow import policy class AuthMiddleware(object): + def __init__(self): + self.logger = logging.getLogger('shipyard') # Authentication def process_request(self, req, resp): ctx = req.context - token = req.get_header('X-Auth-Token') + ctx.set_policy_engine(policy.policy_engine) - user = self.validate_token(token) + for k, v in req.headers.items(): + self.logger.debug("Request with header %s: %s" % (k, v)) - if user is not None: - ctx.set_user(user) - user_roles = self.role_list(user) - ctx.add_roles(user_roles) + auth_status = req.get_header( + 'X-SERVICE-IDENTITY-STATUS') # will be set to Confirmed or Invalid + service = True + + if auth_status is None: + auth_status = req.get_header('X-IDENTITY-STATUS') + service = False + + if auth_status == 'Confirmed': + # Process account and roles + ctx.authenticated = True + # User Identity, unique within owning domain + ctx.user = req.get_header( + 'X-SERVICE-USER-NAME') if service else req.get_header( + 'X-USER-NAME') + # Identity-service managed unique identifier + ctx.user_id = req.get_header( + 'X-SERVICE-USER-ID') if service else req.get_header( + 'X-USER-ID') + # Identity service managed unique identifier of owning domain of + # user name + ctx.user_domain_id = req.get_header( + 'X-SERVICE-USER-DOMAIN-ID') if service else req.get_header( + 'X-USER-DOMAIN-ID') + # Identity service managed unique identifier + ctx.project_id = req.get_header( + 'X-SERVICE-PROJECT-ID') if service else req.get_header( + 'X-PROJECT-ID') + # Name of owning domain of project + ctx.project_domain_id = req.get_header( + 'X-SERVICE-PROJECT-DOMAIN-ID') if service else req.get_header( + 'X-PROJECT-DOMAIN-NAME') + if service: + # comma delimieted list of case-sensitive role names + ctx.add_roles(req.get_header('X-SERVICE-ROLES').split(',')) + else: + ctx.add_roles(req.get_header('X-ROLES').split(',')) + + if req.get_header('X-IS-ADMIN-PROJECT') == 'True': + ctx.is_admin_project = True + else: + ctx.is_admin_project = False + + self.logger.debug( + 'Request from authenticated user %s with roles %s' % + (ctx.user, ','.join(ctx.roles))) else: - ctx.add_role('anyone') - - # Authorization - def process_resource(self, req, resp, resource, params): - ctx = req.context - - if not resource.authorize_roles(ctx.roles): - raise falcon.HTTPUnauthorized( - 'Authentication required', - ('This resource requires an authorized role.')) - - # Return the username associated with an authenticated token or None - def validate_token(self, token): - if token == '10': - return 'shipyard' - elif token == 'admin': - return 'admin' - else: - return None - - # Return the list of roles assigned to the username - # Roles need to be an enum - def role_list(self, username): - if username == 'shipyard': - return ['user'] - elif username == 'admin': - return ['user', 'admin'] + ctx.authenticated = False class ContextMiddleware(object): + def __init__(self): + # Setup validation pattern for external marker + try: + uuid_value = uuidutils.generate_uuid(dashed=True) + UUID(uuid_value) + except: + self.logger.error('UUID generation fail') + def process_request(self, req, resp): ctx = req.context - requested_logging = req.get_header('X-Log-Level') - - if requested_logging == 'DEBUG' and 'admin' in ctx.roles: - ctx.set_log_level('debug') - elif requested_logging == 'INFO': - ctx.set_log_level('info') - ext_marker = req.get_header('X-Context-Marker') - ctx.set_external_marker(ext_marker if ext_marker is not None else '') + + if ext_marker is not None and self.marker_re.fullmatch(ext_marker): + ctx.set_external_marker(ext_marker) class LoggingMiddleware(object): diff --git a/shipyard_airflow/control/regions.py b/shipyard_airflow/control/regions.py index 667a2cae..2209805c 100644 --- a/shipyard_airflow/control/regions.py +++ b/shipyard_airflow/control/regions.py @@ -14,19 +14,16 @@ import falcon from .base import BaseResource - +from shipyard_airflow import policy class RegionsResource(BaseResource): - authorized_roles = ['user'] - + @policy.ApiEnforcer('workflow_orchestrator:get_regions') def on_get(self, req, resp): resp.status = falcon.HTTP_200 - class RegionResource(BaseResource): - authorized_roles = ['user'] - + @policy.ApiEnforcer('workflow_orchestrator:get_regions') def on_get(self, req, resp, region_id): resp.status = falcon.HTTP_200 diff --git a/shipyard_airflow/control/shipyard.conf b/shipyard_airflow/control/shipyard.conf deleted file mode 100644 index 762129ad..00000000 --- a/shipyard_airflow/control/shipyard.conf +++ /dev/null @@ -1,36 +0,0 @@ -[base] -web_server=http://localhost:32080 -postgresql_db = postgresql+psycopg2://postgresql.ucp:5432/shipyard -postgresql_airflow_db = postgresql+psycopg2://postgresql.ucp:5432/airflow - -[shipyard] -host=shipyard-int.ucp -port=9000 - -[deckhand] -host=deckhand-api.ucp -port=80 - -[armada] -host=armada-api.ucp -port=8000 - -[drydock] -host=drydock-api.ucp -port=9000 -token=bigboss -site_yaml=/usr/local/airflow/plugins/drydock.yaml -prom_yaml=/usr/local/airflow/plugins/promenade.yaml - -[keystone] -OS_AUTH_URL=http://keystone-api.ucp:80/v3 -OS_PROJECT_NAME=service -OS_USER_DOMAIN_NAME=Default -OS_USERNAME=shipyard -OS_PASSWORD=password -OS_REGION_NAME=RegionOne -OS_IDENTITY_API_VERSION=3 - -[healthcheck] -schema=http -endpoint=/api/v1.0/health diff --git a/shipyard_airflow/control/shipyard.conf.example b/shipyard_airflow/control/shipyard.conf.example new file mode 100644 index 00000000..2b78f549 --- /dev/null +++ b/shipyard_airflow/control/shipyard.conf.example @@ -0,0 +1,320 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +#     http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +[base] +web_server=http://localhost:32080 +postgresql_db = postgresql+psycopg2://postgresql.ucp:5432/shipyard +postgresql_airflow_db = postgresql+psycopg2://postgresql.ucp:5432/airflow + +[shipyard] +host=shipyard-int.ucp +port=9000 + +[deckhand] +host=deckhand-api.ucp +port=80 + +[armada] +host=armada-api.ucp +port=8000 + +[drydock] +host=drydock-api.ucp +port=9000 +token=bigboss +site_yaml=/usr/local/airflow/plugins/drydock.yaml +prom_yaml=/usr/local/airflow/plugins/promenade.yaml + +[keystone] +OS_AUTH_URL=http://keystone-api.ucp:80/v3 +OS_PROJECT_NAME=service +OS_USER_DOMAIN_NAME=Default +OS_USERNAME=shipyard +OS_PASSWORD=password +OS_REGION_NAME=RegionOne +OS_IDENTITY_API_VERSION=3 + +[healthcheck] +schema=http +endpoint=/api/v1.0/health + +[keystone_authtoken] + +# +# From keystonemiddleware.auth_token +# + +# Complete "public" Identity API endpoint. This endpoint should not be an +# "admin" endpoint, as it should be accessible by all end users. Unauthenticated +# clients are redirected to this endpoint to authenticate. Although this +# endpoint should  ideally be unversioned, client support in the wild varies. +# If you're using a versioned v2 endpoint here, then this  should *not* be the +# same endpoint the service user utilizes  for validating tokens, because normal +# end users may not be  able to reach that endpoint. (string value) +# from .keystone_authtoken.keystonemiddleware.auth_token.auth_uri +auth_uri = http://keystone-api.openstack:80/v3 + +# API version of the admin Identity API endpoint. (string value) +# from .keystone_authtoken.keystonemiddleware.auth_token.auth_version +#auth_version = + +# Do not handle authorization requests within the middleware, but delegate the +# authorization decision to downstream WSGI components. (boolean value) +# from .keystone_authtoken.keystonemiddleware.auth_token.delay_auth_decision +delay_auth_decision = true + +# Request timeout value for communicating with Identity API server. (integer +# value) +# from .keystone_authtoken.keystonemiddleware.auth_token.http_connect_timeout +#http_connect_timeout = + +# How many times are we trying to reconnect when communicating with Identity API +# Server. (integer value) +# from .keystone_authtoken.keystonemiddleware.auth_token.http_request_max_retries +#http_request_max_retries = 3 + +# Request environment key where the Swift cache object is stored. When +# auth_token middleware is deployed with a Swift cache, use this option to have +# the middleware share a caching backend with swift. Otherwise, use the +# ``memcached_servers`` option instead. (string value) +# from .keystone_authtoken.keystonemiddleware.auth_token.cache +#cache = + +# Required if identity server requires client certificate (string value) +# from .keystone_authtoken.keystonemiddleware.auth_token.certfile +#certfile = + +# Required if identity server requires client certificate (string value) +# from .keystone_authtoken.keystonemiddleware.auth_token.keyfile +#keyfile = + +# A PEM encoded Certificate Authority to use when verifying HTTPs connections. +# Defaults to system CAs. (string value) +# from .keystone_authtoken.keystonemiddleware.auth_token.cafile +#cafile = + +# Verify HTTPS connections. (boolean value) +# from .keystone_authtoken.keystonemiddleware.auth_token.insecure +#insecure = false + +# The region in which the identity server can be found. (string value) +# from .keystone_authtoken.keystonemiddleware.auth_token.region_name +#region_name = + +# Directory used to cache files related to PKI tokens. (string value) +# from .keystone_authtoken.keystonemiddleware.auth_token.signing_dir +#signing_dir = + +# Optionally specify a list of memcached server(s) to use for caching. If left +# undefined, tokens will instead be cached in-process. (list value) +# Deprecated group/name - [keystone_authtoken]/memcache_servers +# from .keystone_authtoken.keystonemiddleware.auth_token.memcached_servers +#memcached_servers = + +# In order to prevent excessive effort spent validating tokens, the middleware +# caches previously-seen tokens for a configurable duration (in seconds). Set to +# -1 to disable caching completely. (integer value) +# from .keystone_authtoken.keystonemiddleware.auth_token.token_cache_time +#token_cache_time = 300 + +# Determines the frequency at which the list of revoked tokens is retrieved from +# the Identity service (in seconds). A high number of revocation events combined +# with a low cache duration may significantly reduce performance. Only valid for +# PKI tokens. (integer value) +# from .keystone_authtoken.keystonemiddleware.auth_token.revocation_cache_time +#revocation_cache_time = 10 + +# (Optional) If defined, indicate whether token data should be authenticated or +# authenticated and encrypted. If MAC, token data is authenticated (with HMAC) +# in the cache. If ENCRYPT, token data is encrypted and authenticated in the +# cache. If the value is not one of these options or empty, auth_token will +# raise an exception on initialization. (string value) +# Allowed values: None, MAC, ENCRYPT +# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_security_strategy +#memcache_security_strategy = None + +# (Optional, mandatory if memcache_security_strategy is defined) This string is +# used for key derivation. (string value) +# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_secret_key +#memcache_secret_key = + +# (Optional) Number of seconds memcached server is considered dead before it is +# tried again. (integer value) +# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_pool_dead_retry +#memcache_pool_dead_retry = 300 + +# (Optional) Maximum total number of open connections to every memcached server. +# (integer value) +# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_pool_maxsize +#memcache_pool_maxsize = 10 + +# (Optional) Socket timeout in seconds for communicating with a memcached +# server. (integer value) +# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_pool_socket_timeout +#memcache_pool_socket_timeout = 3 + +# (Optional) Number of seconds a connection to memcached is held unused in the +# pool before it is closed. (integer value) +# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_pool_unused_timeout +#memcache_pool_unused_timeout = 60 + +# (Optional) Number of seconds that an operation will wait to get a memcached +# client connection from the pool. (integer value) +# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_pool_conn_get_timeout +#memcache_pool_conn_get_timeout = 10 + +# (Optional) Use the advanced (eventlet safe) memcached client pool. The +# advanced pool will only work under python 2.x. (boolean value) +# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_use_advanced_pool +#memcache_use_advanced_pool = false + +# (Optional) Indicate whether to set the X-Service-Catalog header. If False, +# middleware will not ask for service catalog on token validation and will not +# set the X-Service-Catalog header. (boolean value) +# from .keystone_authtoken.keystonemiddleware.auth_token.include_service_catalog +#include_service_catalog = true + +# Used to control the use and type of token binding. Can be set to: "disabled" +# to not check token binding. "permissive" (default) to validate binding +# information if the bind type is of a form known to the server and ignore it if +# not. "strict" like "permissive" but if the bind type is unknown the token will +# be rejected. "required" any form of token binding is needed to be allowed. +# Finally the name of a binding method that must be present in tokens. (string +# value) +# from .keystone_authtoken.keystonemiddleware.auth_token.enforce_token_bind +#enforce_token_bind = permissive + +# If true, the revocation list will be checked for cached tokens. This requires +# that PKI tokens are configured on the identity server. (boolean value) +# from .keystone_authtoken.keystonemiddleware.auth_token.check_revocations_for_cached +#check_revocations_for_cached = false + +# Hash algorithms to use for hashing PKI tokens. This may be a single algorithm +# or multiple. The algorithms are those supported by Python standard +# hashlib.new(). The hashes will be tried in the order given, so put the +# preferred one first for performance. The result of the first hash will be +# stored in the cache. This will typically be set to multiple values only while +# migrating from a less secure algorithm to a more secure one. Once all the old +# tokens are expired this option should be set to a single value for better +# performance. (list value) +# from .keystone_authtoken.keystonemiddleware.auth_token.hash_algorithms +#hash_algorithms = md5 + +# Authentication type to load (string value) +# Deprecated group/name - [keystone_authtoken]/auth_plugin +# from .keystone_authtoken.keystonemiddleware.auth_token.auth_type +auth_type = password + +# Config Section from which to load plugin specific options (string value) +# from .keystone_authtoken.keystonemiddleware.auth_token.auth_section +auth_section = keystone_authtoken + + + +# +# From shipyard_orchestrator +# + +# Authentication URL (string value) +# from .keystone_authtoken.shipyard_orchestrator.auth_url +auth_url = http://keystone-api.openstack:80/v3 + +# Domain ID to scope to (string value) +# from .keystone_authtoken.shipyard_orchestrator.domain_id +#domain_id = + +# Domain name to scope to (string value) +# from .keystone_authtoken.shipyard_orchestrator.domain_name +#domain_name = + +# Project ID to scope to (string value) +# Deprecated group/name - [keystone_authtoken]/tenant-id +# from .keystone_authtoken.shipyard_orchestrator.project_id +#project_id = + +# Project name to scope to (string value) +# Deprecated group/name - [keystone_authtoken]/tenant-name +# from .keystone_authtoken.shipyard_orchestrator.project_name +project_name = service + +# Domain ID containing project (string value) +# from .keystone_authtoken.shipyard_orchestrator.project_domain_id +#project_domain_id = + +# Domain name containing project (string value) +# from .keystone_authtoken.shipyard_orchestrator.project_domain_name +project_domain_name = default + +# Trust ID (string value) +# from .keystone_authtoken.shipyard_orchestrator.trust_id +#trust_id = + +# Optional domain ID to use with v3 and v2 parameters. It will be used for both +# the user and project domain in v3 and ignored in v2 authentication. (string +# value) +# from .keystone_authtoken.shipyard_orchestrator.default_domain_id +#default_domain_id = + +# Optional domain name to use with v3 API and v2 parameters. It will be used for +# both the user and project domain in v3 and ignored in v2 authentication. +# (string value) +# from .keystone_authtoken.shipyard_orchestrator.default_domain_name +#default_domain_name = + +# User id (string value) +# from .keystone_authtoken.shipyard_orchestrator.user_id +#user_id = + +# Username (string value) +# Deprecated group/name - [keystone_authtoken]/user-name +# from .keystone_authtoken.shipyard_orchestrator.username +username = shipyard + +# User's domain id (string value) +# from .keystone_authtoken.shipyard_orchestrator.user_domain_id +#user_domain_id = + +# User's domain name (string value) +# from .keystone_authtoken.shipyard_orchestrator.user_domain_name +user_domain_name = default + +# User's password (string value) +# from .keystone_authtoken.shipyard_orchestrator.password +password = password + + +[oslo_policy] + +# +# From oslo.policy +# + +# The file that defines policies. (string value) +# Deprecated group/name - [DEFAULT]/policy_file +# from .oslo_policy.oslo.policy.policy_file +#policy_file = policy.json + +# Default rule. Enforced when a requested rule is not found. (string value) +# Deprecated group/name - [DEFAULT]/policy_default_rule +# from .oslo_policy.oslo.policy.policy_default_rule +#policy_default_rule = default + +# Directories where policy configuration files are stored. They can be relative +# to any directory in the search path defined by the config_dir option, or +# absolute paths. The file defined by policy_file must exist for these +# directories to be searched.  Missing or empty directories are ignored. (multi +# valued) +# Deprecated group/name - [DEFAULT]/policy_dirs +# from .oslo_policy.oslo.policy.policy_dirs (multiopt) +#policy_dirs = policy.d diff --git a/shipyard_airflow/control/tasks.py b/shipyard_airflow/control/tasks.py deleted file mode 100644 index eccdbb8f..00000000 --- a/shipyard_airflow/control/tasks.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import falcon -import json -import requests - -from .base import BaseResource - - -class TaskResource(BaseResource): - - authorized_roles = ['user'] - - def on_get(self, req, resp, dag_id, task_id): - # Retrieve URL - web_server_url = self.retrieve_config('base', 'web_server') - - if 'Error' in web_server_url: - resp.status = falcon.HTTP_500 - raise falcon.HTTPInternalServerError("Internal Server Error", - "Missing Configuration File") - else: - req_url = '{}/api/experimental/dags/{}/tasks/{}'.format( - web_server_url, dag_id, task_id) - task_details = requests.get(req_url).json() - - if 'error' in task_details: - resp.status = falcon.HTTP_400 - resp.body = json.dumps(task_details) - return - else: - resp.status = falcon.HTTP_200 - resp.body = json.dumps(task_details) diff --git a/shipyard_airflow/errors.py b/shipyard_airflow/errors.py index 5174ad00..deb7bacd 100644 --- a/shipyard_airflow/errors.py +++ b/shipyard_airflow/errors.py @@ -8,10 +8,7 @@ try: except ImportError: OrderedDict = dict -ERR_UNKNOWN = { - 'status': falcon.HTTP_500, - 'title': 'Internal Server Error' -} +ERR_UNKNOWN = {'status': falcon.HTTP_500, 'title': 'Internal Server Error'} ERR_AIRFLOW_RESPONSE = { 'status': falcon.HTTP_400, diff --git a/shipyard_airflow/policy.py b/shipyard_airflow/policy.py new file mode 100644 index 00000000..ab0d3f38 --- /dev/null +++ b/shipyard_airflow/policy.py @@ -0,0 +1,122 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import logging +import functools +import falcon + +from oslo_config import cfg +from oslo_policy import policy + +policy_engine = None + + +class ShipyardPolicy(object): + """ + Initialize policy defaults + """ + # Base Policy + base_rules = [ + policy.RuleDefault( + 'admin_required', + 'role:admin', + description='Actions requiring admin authority'), + ] + + # Orchestrator Policy + task_rules = [ + policy.DocumentedRuleDefault('workflow_orchestrator:get_regions', + 'role:admin', 'Get region information', [{ + 'path': + '/api/v1.0/regions', + 'method': + 'GET' + }, { + 'path': + '/api/v1.0/regions/{region_id}', + 'method': + 'GET' + }]) + ] + + # Regions Policy + + def __init__(self): + self.enforcer = policy.Enforcer(cfg.CONF) + + def register_policy(self): + self.enforcer.register_defaults(ShipyardPolicy.base_rules) + self.enforcer.register_defaults(ShipyardPolicy.task_rules) + + def authorize(self, action, ctx): + target = {'project_id': ctx.project_id, 'user_id': ctx.user_id} + self.enforcer.authorize(action, target, ctx.to_policy_view()) + return self.enforcer.authorize(action, target, ctx.to_policy_view()) + + +class ApiEnforcer(object): + """ + A decorator class for enforcing RBAC policies + """ + + def __init__(self, action): + self.action = action + self.logger = logging.getLogger('shipyard.policy') + + def __call__(self, f): + @functools.wraps(f) + def secure_handler(slf, req, resp, *args, **kwargs): + ctx = req.context + policy_engine = ctx.policy_engine + self.logger.debug("Enforcing policy %s on request %s" % + (self.action, ctx.request_id)) + try: + if policy_engine is not None and policy_engine.authorize( + self.action, ctx): + return f(slf, req, resp, *args, **kwargs) + else: + if ctx.authenticated: + slf.info(ctx, "Error - Forbidden access - action: %s" % + self.action) + slf.return_error( + resp, + falcon.HTTP_403, + message="Forbidden", + retry=False) + else: + slf.info(ctx, "Error - Unauthenticated access") + slf.return_error( + resp, + falcon.HTTP_401, + message="Unauthenticated", + retry=False) + except: + slf.info( + ctx, + "Error - Expectation Failed - action: %s" % self.action) + slf.return_error( + resp, + falcon.HTTP_417, + message="Expectation Failed", + retry=False) + + return secure_handler + + +def list_policies(): + default_policy = [] + default_policy.extend(ShipyardPolicy.base_rules) + default_policy.extend(ShipyardPolicy.task_rules) + + return default_policy diff --git a/shipyard_airflow/shipyard.py b/shipyard_airflow/shipyard.py old mode 100644 new mode 100755 index f4d8f950..95f5df7c --- a/shipyard_airflow/shipyard.py +++ b/shipyard_airflow/shipyard.py @@ -12,11 +12,23 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging + +from oslo_config import cfg + +from shipyard_airflow import policy import shipyard_airflow.control.api as api +# We need to import config so the initializing code can run for oslo config +import shipyard_airflow.config as config # noqa: F401 def start_shipyard(): + # Setup configuration parsing + cli_options = [ + cfg.BoolOpt( + 'debug', short='d', default=False, help='Enable debug logging'), + ] + # Setup root logger logger = logging.getLogger('shipyard') @@ -37,7 +49,16 @@ def start_shipyard(): ch.setFormatter(formatter) logger.addHandler(ch) + # Setup the RBAC policy enforcer + policy.policy_engine = policy.ShipyardPolicy() + policy.policy_engine.register_policy() + return api.start_api() +# Initialization compatible with PasteDeploy +def paste_start_shipyard(global_conf, **kwargs): + return shipyard + + shipyard = start_shipyard() diff --git a/test-requirements.txt b/test-requirements.txt index 29d61759..fd27e23c 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -8,4 +8,4 @@ apache-airflow[crypto,celery,postgres,hive,hdfs,jdbc]==1.8.1 flake8==3.3.0 # Security scanning -bandit>=1.1.0 # Apache-2.0 \ No newline at end of file +bandit>=1.1.0 # Apache-2.0 diff --git a/tests/unit/control/test_airflow_connections.py b/tests/unit/control/test_airflow_connections.py deleted file mode 100644 index 9a6e3530..00000000 --- a/tests/unit/control/test_airflow_connections.py +++ /dev/null @@ -1,267 +0,0 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the 'License'); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an 'AS IS' BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import falcon -from falcon import testing -import mock - -from shipyard_airflow.control import api -from shipyard_airflow.control.airflow_connections import ( - AirflowAddConnectionResource, - AirflowDeleteConnectionResource, - AirflowListConnectionsResource, -) - - -class BaseTesting(testing.TestCase): - def setUp(self): - super().setUp() - self.app = api.start_api() - self.conn_id = 1 - self.protocol = 'http' - self.host = '10.0.0.1' - self.port = '3000' - - @property - def _headers(self): - return { - 'X-Auth-Token': '10' - } - - -class AirflowAddConnectionResourceTestCase(BaseTesting): - def setUp(self): - super().setUp() - self.action = 'add' - - @property - def _url(self): - return ('/api/v1.0/connections/{}/conn_id/{}/' - 'protocol/{}/host/{}/port/{}'.format( - self.action, self.conn_id, - self.protocol, self.host, self.port)) - - def test_on_get_missing_config_file(self): - doc = { - 'description': 'Missing Configuration File', - 'message': 'Internal Server Error' - } - result = self.simulate_get(self._url, headers=self._headers) - assert result.json == doc - assert result.status == falcon.HTTP_500 - - @mock.patch.object(AirflowAddConnectionResource, 'retrieve_config') - def test_on_get_invalid_action(self, mock_config): - self.action = 'invalid_action' - doc = { - 'title': 'Invalid parameter', - 'description': ('The "action" parameter is invalid.' - ' Invalid Paremeters for Adding Airflow' - ' Connection') - } - mock_config.return_value = 'some_url' - result = self.simulate_get(self._url, headers=self._headers) - assert result.json == doc - assert result.status == falcon.HTTP_400 - mock_config.assert_called_once_with('base', 'web_server') - - @mock.patch('shipyard_airflow.airflow_client.requests') - @mock.patch.object(AirflowAddConnectionResource, 'retrieve_config') - def test_on_get_airflow_error(self, mock_config, mock_requests): - doc = { - 'message': 'Error response from Airflow', - 'description': "can't add connections in airflow" - } - mock_response = { - 'output': { - 'stderr': "can't add connections in airflow" - } - } - mock_requests.get.return_value.json.return_value = mock_response - mock_config.return_value = 'some_url' - result = self.simulate_get(self._url, headers=self._headers) - assert result.json == doc - assert result.status == falcon.HTTP_400 - mock_config.assert_called_once_with('base', 'web_server') - - @mock.patch('shipyard_airflow.airflow_client.requests') - @mock.patch.object(AirflowAddConnectionResource, 'retrieve_config') - def test_on_get_airflow_success(self, mock_config, mock_requests): - doc = { - 'type': 'success', - 'message': 'Airflow Success', - } - mock_response = { - 'output': { - 'stderr': None, - 'stdout': 'Airflow Success' - } - } - mock_requests.get.return_value.json.return_value = mock_response - mock_config.return_value = 'some_url' - result = self.simulate_get(self._url, headers=self._headers) - assert result.json == doc - assert result.status == falcon.HTTP_200 - mock_config.assert_called_once_with('base', 'web_server') - - -class AirflowDeleteConnectionResource(BaseTesting): - def setUp(self): - self.action = 'delete' - super().setUp() - - @property - def _url(self): - return '/api/v1.0/connections/{}/conn_id/{}'.format( - self.action, self.conn_id) - - def test_on_get_missing_config_file(self): - doc = { - 'description': 'Missing Configuration File', - 'message': 'Internal Server Error' - } - result = self.simulate_get(self._url, headers=self._headers) - assert result.json == doc - assert result.status == falcon.HTTP_500 - - @mock.patch.object(AirflowDeleteConnectionResource, 'retrieve_config') - def test_on_get_invalid_action(self, mock_config): - self.action = 'invalid_action' - doc = { - 'title': 'Invalid parameter', - 'description': ('The "action" parameter is invalid.' - ' Invalid Paremeters for Deleting Airflow' - ' Connection') - } - mock_config.return_value = 'some_url' - result = self.simulate_get(self._url, headers=self._headers) - assert result.json == doc - assert result.status == falcon.HTTP_400 - mock_config.assert_called_once_with('base', 'web_server') - - @mock.patch('shipyard_airflow.airflow_client.requests') - @mock.patch.object(AirflowDeleteConnectionResource, 'retrieve_config') - def test_on_get_airflow_error(self, mock_config, mock_requests): - doc = { - 'message': 'Error response from Airflow', - 'description': "can't delete connections in airflow" - } - mock_response = { - 'output': { - 'stderr': "can't delete connections in airflow" - } - } - mock_requests.get.return_value.json.return_value = mock_response - mock_config.return_value = 'some_url' - result = self.simulate_get(self._url, headers=self._headers) - - assert result.json == doc - assert result.status == falcon.HTTP_400 - mock_config.assert_called_once_with('base', 'web_server') - - @mock.patch('shipyard_airflow.airflow_client.requests') - @mock.patch.object(AirflowDeleteConnectionResource, 'retrieve_config') - def test_on_get_airflow_success(self, mock_config, mock_requests): - doc = { - 'type': 'success', - 'message': 'Airflow Success', - } - mock_response = { - 'output': { - 'stderr': None, - 'stdout': 'Airflow Success' - } - } - mock_requests.get.return_value.json.return_value = mock_response - mock_config.return_value = 'some_url' - result = self.simulate_get(self._url, headers=self._headers) - assert result.json == doc - assert result.status == falcon.HTTP_200 - mock_config.assert_called_once_with('base', 'web_server') - - -class AirflowListConnectionsResource(BaseTesting): - def setUp(self): - self.action = 'list' - super().setUp() - - @property - def _url(self): - return '/api/v1.0/connections/{}'.format(self.action) - - def test_on_get_missing_config_file(self): - doc = { - 'description': 'Missing Configuration File', - 'message': 'Internal Server Error' - } - result = self.simulate_get(self._url, headers=self._headers) - assert result.json == doc - assert result.status == falcon.HTTP_500 - - @mock.patch.object(AirflowListConnectionsResource, 'retrieve_config') - def test_on_get_invalid_action(self, mock_config): - self.action = 'invalid_action' - doc = { - 'title': 'Invalid parameter', - 'description': ('The "action" parameter is invalid.' - ' Invalid Paremeters for listing Airflow' - ' Connections') - } - mock_config.return_value = 'some_url' - result = self.simulate_get(self._url, headers=self._headers) - - assert result.json == doc - assert result.status == falcon.HTTP_400 - mock_config.assert_called_once_with('base', 'web_server') - - @mock.patch('shipyard_airflow.airflow_client.requests') - @mock.patch.object(AirflowListConnectionsResource, 'retrieve_config') - def test_on_get_airflow_error(self, mock_config, mock_requests): - doc = { - 'message': 'Error response from Airflow', - 'description': "can't list connections in airlfow" - } - mock_response = { - 'output': { - 'stderr': "can't list connections in airlfow" - } - } - mock_requests.get.return_value.json.return_value = mock_response - mock_config.return_value = 'some_url' - result = self.simulate_get(self._url, headers=self._headers) - - assert result.json == doc - assert result.status == falcon.HTTP_400 - mock_config.assert_called_once_with('base', 'web_server') - - @mock.patch('shipyard_airflow.airflow_client.requests') - @mock.patch.object(AirflowListConnectionsResource, 'retrieve_config') - def test_on_get_airflow_success(self, mock_config, mock_requests): - doc = { - 'type': 'success', - 'message': 'Airflow Success', - } - mock_response = { - 'output': { - 'stderr': None, - 'stdout': 'Airflow Success' - } - } - mock_requests.get.return_value.json.return_value = mock_response - mock_config.return_value = 'some_url' - result = self.simulate_get(self._url, headers=self._headers) - - assert result.json == doc - assert result.status == falcon.HTTP_200 - mock_config.assert_called_once_with('base', 'web_server')