diff --git a/setup.py b/setup.py index f5cf46a3..85e81e05 100644 --- a/setup.py +++ b/setup.py @@ -26,6 +26,8 @@ setup(name='shipyard_airflow', install_requires=[ 'falcon', 'requests', + 'configparser', 'uwsgi>1.4' ] ) + diff --git a/shipyard_airflow/control/api.py b/shipyard_airflow/control/api.py index 8913b86c..4a004aa8 100644 --- a/shipyard_airflow/control/api.py +++ b/shipyard_airflow/control/api.py @@ -18,10 +18,10 @@ from .regions import RegionsResource, RegionResource from .base import ShipyardRequest, BaseResource from .tasks import TaskResource from .dag_runs import DagRunResource - from .middleware import AuthMiddleware, ContextMiddleware, LoggingMiddleware def start_api(): + control_api = falcon.API(request_type=ShipyardRequest, middleware=[AuthMiddleware(), ContextMiddleware(), LoggingMiddleware()]) @@ -51,3 +51,4 @@ class VersionsResource(BaseResource): 'status': 'stable' }}) resp.status = falcon.HTTP_200 + diff --git a/shipyard_airflow/control/base.py b/shipyard_airflow/control/base.py index bb7e051f..94945f2b 100644 --- a/shipyard_airflow/control/base.py +++ b/shipyard_airflow/control/base.py @@ -13,6 +13,8 @@ # limitations under the License. import falcon.request as request import uuid +import json +import ConfigParser class BaseResource(object): @@ -40,6 +42,33 @@ class BaseResource(object): else: return True + # Error Handling + def return_error(self, resp, status_code, message="", retry=False): + """ + Write a error message body and throw a Falcon exception to trigger an HTTP status + + :param resp: Falcon response object to update + :param status_code: Falcon status_code constant + :param message: Optional error message to include in the body + :param retry: Optional flag whether client should retry the operation. Can ignore if we rely solely on 4XX vs 5xx status codes + """ + resp.body = json.dumps({'type': 'error', 'message': message, 'retry': retry}) + resp.content_type = 'application/json' + resp.status = status_code + + # Get Config Data + def retrieve_config(self, resp, section="", variable=""): + config = ConfigParser.ConfigParser() + + # The current assumption is that shipyard.conf will be placed in a fixed path + # within the shipyard container - Path TBD + config.read('/home/ubuntu/att-comdev/shipyard/shipyard_airflow/control/shipyard.conf') + + # Retrieve data from shipyard.conf + query_data = config.get(section, variable) + + return query_data + class ShipyardRequestContext(object): @@ -72,3 +101,4 @@ class ShipyardRequestContext(object): class ShipyardRequest(request.Request): context_type = ShipyardRequestContext + diff --git a/shipyard_airflow/control/dag_runs.py b/shipyard_airflow/control/dag_runs.py index e34c13ca..4fcaca84 100644 --- a/shipyard_airflow/control/dag_runs.py +++ b/shipyard_airflow/control/dag_runs.py @@ -13,6 +13,7 @@ # limitations under the License. import falcon import requests +import json from .base import BaseResource @@ -21,19 +22,21 @@ class DagRunResource(BaseResource): authorized_roles = ['user'] def on_post(self, req, resp, dag_id, run_id=None, conf=None, execution_date=None): - req_url = 'http://localhost:32080/api/experimental/dags/{}/dag_runs'.format(dag_id) + # Retrieve URL + web_server_url = self.retrieve_config(resp, 'BASE', 'WEB_SERVER') + req_url = '{}/api/experimental/dags/{}/dag_runs'.format(web_server_url, dag_id) + response = requests.post(req_url, json={ "run_id": run_id, "conf": conf, "execution_date": execution_date, }) - if not response.ok: - raise IOError() - else: + + if response.ok: resp.status = falcon.HTTP_200 + else: + self.return_error(resp, falcon.HTTP_400, 'Fail to Execute Dag') + return - data = response.json() - - return data['message'] diff --git a/shipyard_airflow/control/middleware.py b/shipyard_airflow/control/middleware.py index 019053e2..c4db2c76 100644 --- a/shipyard_airflow/control/middleware.py +++ b/shipyard_airflow/control/middleware.py @@ -85,4 +85,5 @@ class LoggingMiddleware(object): } resp.append_header('X-Shipyard-Req', ctx.request_id) - self.logger.info("%s - %s" % (req.uri, resp.status), extra=extra) \ No newline at end of file + self.logger.info("%s - %s" % (req.uri, resp.status), extra=extra) + diff --git a/shipyard_airflow/control/regions.py b/shipyard_airflow/control/regions.py index 32808528..ac6d970a 100644 --- a/shipyard_airflow/control/regions.py +++ b/shipyard_airflow/control/regions.py @@ -27,4 +27,5 @@ class RegionResource(BaseResource): authorized_roles = ['user'] def on_get(self, req, resp, region_id): - resp.status = falcon.HTTP_200 \ No newline at end of file + resp.status = falcon.HTTP_200 + diff --git a/shipyard_airflow/control/shipyard.conf b/shipyard_airflow/control/shipyard.conf new file mode 100644 index 00000000..6dadc48c --- /dev/null +++ b/shipyard_airflow/control/shipyard.conf @@ -0,0 +1,3 @@ +[BASE] +WEB_SERVER=http://localhost:32080 + diff --git a/shipyard_airflow/control/tasks.py b/shipyard_airflow/control/tasks.py index 27a02347..4584c54c 100644 --- a/shipyard_airflow/control/tasks.py +++ b/shipyard_airflow/control/tasks.py @@ -22,7 +22,17 @@ class TaskResource(BaseResource): authorized_roles = ['user'] def on_get(self, req, resp, dag_id, task_id): - resp.status = falcon.HTTP_200 - req_url = 'http://localhost:32080/api/experimental/dags/{}/tasks/{}'.format(dag_id, task_id) + # Retrieve URL + web_server_url = self.retrieve_config(resp, 'BASE', 'WEB_SERVER') + + req_url = '{}/api/experimental/dags/{}/tasks/{}'.format(web_server_url, dag_id, task_id) task_details = requests.get(req_url).json() - resp.body = json.dumps(task_details) + + if 'error' in task_details: + resp.status = falcon.HTTP_400 + resp.body = json.dumps(task_details) + return + else: + resp.status = falcon.HTTP_200 + resp.body = json.dumps(task_details) + diff --git a/shipyard_airflow/setup.py b/shipyard_airflow/setup.py index f5cf46a3..85e81e05 100644 --- a/shipyard_airflow/setup.py +++ b/shipyard_airflow/setup.py @@ -26,6 +26,8 @@ setup(name='shipyard_airflow', install_requires=[ 'falcon', 'requests', + 'configparser', 'uwsgi>1.4' ] ) + diff --git a/shipyard_airflow/shipyard.py b/shipyard_airflow/shipyard.py index 5f95458b..887028b3 100644 --- a/shipyard_airflow/shipyard.py +++ b/shipyard_airflow/shipyard.py @@ -36,4 +36,5 @@ def start_shipyard(): return api.start_api() -shipyard = start_shipyard() \ No newline at end of file +shipyard = start_shipyard() +