From 6a7522bbb2f59056eac97fcb0b790830d70d0f80 Mon Sep 17 00:00:00 2001 From: Bryan Strassner Date: Wed, 16 Aug 2017 15:57:11 -0500 Subject: [PATCH] Updates for pep-8 compliance. Changed many files cosmetically Added a file exclusion to tox.ini, and also excluded rule for unused named variables. excluded more things that are system or tox related. Change-Id: I022b72cbe048e3fe1f70e6017038248e6a2e9538 --- shipyard_airflow/control/airflow_dag_state.py | 12 ++++--- .../control/airflow_get_task_status.py | 14 +++++--- .../control/airflow_get_version.py | 14 ++++---- shipyard_airflow/control/airflow_list_dags.py | 11 +++--- .../control/airflow_list_tasks.py | 11 +++--- .../control/airflow_trigger_dag.py | 15 ++++---- .../control/airflow_trigger_dag_poll.py | 35 ++++++++++++------- shipyard_airflow/control/dag_runs.py | 32 ++++++++++------- shipyard_airflow/control/middleware.py | 11 +++--- shipyard_airflow/control/regions.py | 3 +- shipyard_airflow/control/tasks.py | 8 +++-- .../dags/dag_concurrency_check.py | 4 +-- shipyard_airflow/dags/deploy_site.py | 28 ++++++--------- shipyard_airflow/dags/preflight_checks.py | 23 ++++++------ .../samples/airflow_task_state_operators.py | 10 +++--- .../dags/samples/openstack_api_call.py | 13 +++---- shipyard_airflow/dags/validate_site_design.py | 7 ++-- .../plugins/airflow_task_state_operators.py | 26 +++++++++----- shipyard_airflow/plugins/deckhand_operator.py | 2 +- .../plugins/openstack_operators.py | 27 +++++++------- shipyard_airflow/shipyard.py | 7 ++-- tox.ini | 8 ++++- 22 files changed, 181 insertions(+), 140 deletions(-) diff --git a/shipyard_airflow/control/airflow_dag_state.py b/shipyard_airflow/control/airflow_dag_state.py index 3d6d467a..6392ee7a 100644 --- a/shipyard_airflow/control/airflow_dag_state.py +++ b/shipyard_airflow/control/airflow_dag_state.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. import falcon -import json import requests from .base import BaseResource + class GetDagStateResource(BaseResource): authorized_roles = ['user'] @@ -27,11 +27,14 @@ class GetDagStateResource(BaseResource): if 'Error' in web_server_url: resp.status = falcon.HTTP_500 - raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") + raise falcon.HTTPInternalServerError("Internal Server Error", + "Missing Configuration File") else: - req_url = '{}/admin/rest_api/api?api=dag_state&dag_id={}&execution_date={}'.format(web_server_url, dag_id, execution_date) + req_url = ('{}/admin/rest_api/api?api=dag_state&dag_id={}' + '&execution_date={}'.format(web_server_url, dag_id, + execution_date)) response = requests.get(req_url).json() - + if response["output"]["stderr"]: resp.status = falcon.HTTP_400 resp.body = response["output"]["stderr"] @@ -39,4 +42,3 @@ class GetDagStateResource(BaseResource): else: resp.status = falcon.HTTP_200 resp.body = response["output"]["stdout"] - diff --git a/shipyard_airflow/control/airflow_get_task_status.py b/shipyard_airflow/control/airflow_get_task_status.py index d768504f..1c404242 100644 --- a/shipyard_airflow/control/airflow_get_task_status.py +++ b/shipyard_airflow/control/airflow_get_task_status.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. import falcon -import json import requests from .base import BaseResource + class GetTaskStatusResource(BaseResource): authorized_roles = ['user'] @@ -27,11 +27,16 @@ class GetTaskStatusResource(BaseResource): if 'Error' in web_server_url: resp.status = falcon.HTTP_500 - raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") + raise falcon.HTTPInternalServerError("Internal Server Error", + "Missing Configuration File") else: - req_url = '{}/admin/rest_api/api?api=task_state&dag_id={}&task_id={}&execution_date={}'.format(web_server_url, dag_id, task_id, execution_date) + + req_url = ( + '{}/admin/rest_api/api?api=task_state&dag_id={}&task_id={}' + '&execution_date={}'.format(web_server_url, dag_id, task_id, + execution_date)) response = requests.get(req_url).json() - + if response["output"]["stderr"]: resp.status = falcon.HTTP_400 resp.body = response["output"]["stderr"] @@ -39,4 +44,3 @@ class GetTaskStatusResource(BaseResource): else: resp.status = falcon.HTTP_200 resp.body = response["output"]["stdout"] - diff --git a/shipyard_airflow/control/airflow_get_version.py b/shipyard_airflow/control/airflow_get_version.py index 07abed0a..2b111913 100644 --- a/shipyard_airflow/control/airflow_get_version.py +++ b/shipyard_airflow/control/airflow_get_version.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. import falcon -import json import requests from .base import BaseResource + class GetAirflowVersionResource(BaseResource): authorized_roles = ['user'] @@ -27,16 +27,18 @@ class GetAirflowVersionResource(BaseResource): if 'Error' in web_server_url: resp.status = falcon.HTTP_500 - raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") + raise falcon.HTTPInternalServerError("Internal Server Error", + "Missing Configuration File") else: # Get Airflow Version - req_url = '{}/admin/rest_api/api?api=version'.format(web_server_url) + req_url = '{}/admin/rest_api/api?api=version'.format( + web_server_url) response = requests.get(req_url).json() - + if response["output"]: resp.status = falcon.HTTP_200 resp.body = response["output"] else: - self.return_error(resp, falcon.HTTP_400, 'Fail to Retrieve Airflow Version') + self.return_error(resp, falcon.HTTP_400, + 'Fail to Retrieve Airflow Version') return - diff --git a/shipyard_airflow/control/airflow_list_dags.py b/shipyard_airflow/control/airflow_list_dags.py index ee2fc8c7..fee190ba 100644 --- a/shipyard_airflow/control/airflow_list_dags.py +++ b/shipyard_airflow/control/airflow_list_dags.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. import falcon -import json import requests from .base import BaseResource + class ListDagsResource(BaseResource): authorized_roles = ['user'] @@ -27,12 +27,14 @@ class ListDagsResource(BaseResource): if 'Error' in web_server_url: resp.status = falcon.HTTP_500 - raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") + raise falcon.HTTPInternalServerError("Internal Server Error", + "Missing Configuration File") else: # List available dags - req_url = '{}/admin/rest_api/api?api=list_dags'.format(web_server_url) + req_url = '{}/admin/rest_api/api?api=list_dags'.format( + web_server_url) response = requests.get(req_url).json() - + if response["output"]["stderr"]: resp.status = falcon.HTTP_400 resp.body = response["output"]["stderr"] @@ -40,4 +42,3 @@ class ListDagsResource(BaseResource): else: resp.status = falcon.HTTP_200 resp.body = response["output"]["stdout"] - diff --git a/shipyard_airflow/control/airflow_list_tasks.py b/shipyard_airflow/control/airflow_list_tasks.py index b8b848a6..eb139915 100644 --- a/shipyard_airflow/control/airflow_list_tasks.py +++ b/shipyard_airflow/control/airflow_list_tasks.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. import falcon -import json import requests from .base import BaseResource + class ListTasksResource(BaseResource): authorized_roles = ['user'] @@ -27,12 +27,14 @@ class ListTasksResource(BaseResource): if 'Error' in web_server_url: resp.status = falcon.HTTP_500 - raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") + raise falcon.HTTPInternalServerError("Internal Server Error", + "Missing Configuration File") else: # Retrieve all tasks belonging to a particular Dag - req_url = '{}/admin/rest_api/api?api=list_tasks&dag_id={}'.format(web_server_url, dag_id) + req_url = '{}/admin/rest_api/api?api=list_tasks&dag_id={}'.format( + web_server_url, dag_id) response = requests.get(req_url).json() - + if response["output"]["stderr"]: resp.status = falcon.HTTP_400 resp.body = response["output"]["stderr"] @@ -40,4 +42,3 @@ class ListTasksResource(BaseResource): else: resp.status = falcon.HTTP_200 resp.body = response["output"]["stdout"] - diff --git a/shipyard_airflow/control/airflow_trigger_dag.py b/shipyard_airflow/control/airflow_trigger_dag.py index 1e860aa9..3acf9d92 100644 --- a/shipyard_airflow/control/airflow_trigger_dag.py +++ b/shipyard_airflow/control/airflow_trigger_dag.py @@ -12,12 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. import falcon -import json import requests from dateutil.parser import parse from .base import BaseResource + class TriggerDagRunResource(BaseResource): authorized_roles = ['user'] @@ -28,12 +28,15 @@ class TriggerDagRunResource(BaseResource): if 'Error' in web_server_url: resp.status = falcon.HTTP_500 - raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") + raise falcon.HTTPInternalServerError("Internal Server Error", + "Missing Configuration File") else: - req_url = '{}/admin/rest_api/api?api=trigger_dag&dag_id={}&run_id={}'.format(web_server_url, dag_id, run_id) + req_url = ('{}/admin/rest_api/api?api=trigger_dag&dag_id={}' + '&run_id={}'.format(web_server_url, dag_id, run_id)) response = requests.get(req_url).json() - # Returns error response if API call returns response code other than 200 + # Returns error response if API call returns + # response code other than 200 if response["http_response_code"] != 200: resp.status = falcon.HTTP_400 resp.body = response["output"] @@ -41,7 +44,7 @@ class TriggerDagRunResource(BaseResource): else: resp.status = falcon.HTTP_200 - # Return time of execution so that we can use it to query dag/task status + # Return time of execution so that we can use + # it to query dag/task status dt = parse(response["response_time"]) resp.body = dt.strftime('%Y-%m-%dT%H:%M:%S') - diff --git a/shipyard_airflow/control/airflow_trigger_dag_poll.py b/shipyard_airflow/control/airflow_trigger_dag_poll.py index 05db6997..36765f3f 100644 --- a/shipyard_airflow/control/airflow_trigger_dag_poll.py +++ b/shipyard_airflow/control/airflow_trigger_dag_poll.py @@ -20,6 +20,7 @@ import logging from dateutil.parser import parse from .base import BaseResource + class TriggerDagRunPollResource(BaseResource): authorized_roles = ['user'] @@ -30,11 +31,13 @@ class TriggerDagRunPollResource(BaseResource): if 'Error' in web_server_url: resp.status = falcon.HTTP_500 - raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") + raise falcon.HTTPInternalServerError("Internal Server Error", + "Missing Configuration File") else: - req_url = '{}/admin/rest_api/api?api=trigger_dag&dag_id={}&run_id={}'.format(web_server_url, dag_id, run_id) + req_url = ('{}/admin/rest_api/api?api=trigger_dag&dag_id={}' + '&run_id={}'.format(web_server_url, dag_id, run_id)) response = requests.get(req_url).json() - + if response["http_response_code"] != 200: resp.status = falcon.HTTP_400 resp.body = response["output"] @@ -42,28 +45,35 @@ class TriggerDagRunPollResource(BaseResource): else: resp.status = falcon.HTTP_200 - logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s') logging.info("Executing '" + dag_id + "' Dag...") - # Retrieve time of execution so that we can use it to query dag/task status + # Retrieve time of execution so that we + # can use it to query dag/task status dt = parse(response["response_time"]) exec_date = dt.strftime('%Y-%m-%dT%H:%M:%S') - url = '{}/admin/rest_api/api?api=dag_state&dag_id={}&execution_date={}'.format(web_server_url, dag_id, exec_date) + url = ('{}/admin/rest_api/api' + '?api=dag_state&dag_id={}&execution_date={}'.format( + web_server_url, dag_id, exec_date)) # Back off for 5 seconds before querying the initial state - time.sleep( 5 ) + time.sleep(5) dag_state = requests.get(url).json() # Remove newline character at the end of the response - dag_state = dag_state["output"]["stdout"].encode('utf8').rstrip() + dag_state = dag_state["output"]["stdout"].encode( + 'utf8').rstrip() while dag_state != 'success': # Get current state dag_state = requests.get(url).json() # Remove newline character at the end of the response - dag_state = dag_state["output"]["stdout"].encode('utf8').rstrip() + dag_state = dag_state["output"]["stdout"].encode( + 'utf8').rstrip() # Logs output of current dag state logging.info('Current Dag State: ' + dag_state) @@ -71,11 +81,12 @@ class TriggerDagRunPollResource(BaseResource): if dag_state == 'failed': resp.status = falcon.HTTP_500 logging.info('Dag Execution Failed') - resp.body = json.dumps({'Error': 'Dag Execution Failed'}) + resp.body = json.dumps({ + 'Error': 'Dag Execution Failed' + }) return # Wait for 20 seconds before doing a new query - time.sleep( 20 ) + time.sleep(20) logging.info('Dag Successfully Executed') - diff --git a/shipyard_airflow/control/dag_runs.py b/shipyard_airflow/control/dag_runs.py index ee480634..3d992b7c 100644 --- a/shipyard_airflow/control/dag_runs.py +++ b/shipyard_airflow/control/dag_runs.py @@ -13,34 +13,42 @@ # limitations under the License. import falcon import requests -import json from .base import BaseResource + class DagRunResource(BaseResource): authorized_roles = ['user'] - def on_post(self, req, resp, dag_id, run_id=None, conf=None, execution_date=None): + def on_post(self, + req, + resp, + dag_id, + run_id=None, + conf=None, + execution_date=None): # Retrieve URL web_server_url = self.retrieve_config('base', 'web_server') if 'Error' in web_server_url: resp.status = falcon.HTTP_500 - raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") + raise falcon.HTTPInternalServerError("Internal Server Error", + "Missing Configuration File") else: - req_url = '{}/api/experimental/dags/{}/dag_runs'.format(web_server_url, dag_id) - - response = requests.post(req_url, - json={ - "run_id": run_id, - "conf": conf, - "execution_date": execution_date, - }) + req_url = '{}/api/experimental/dags/{}/dag_runs'.format( + web_server_url, dag_id) + + response = requests.post( + req_url, + json={ + "run_id": run_id, + "conf": conf, + "execution_date": execution_date, + }) if response.ok: resp.status = falcon.HTTP_200 else: self.return_error(resp, falcon.HTTP_400, 'Fail to Execute Dag') return - diff --git a/shipyard_airflow/control/middleware.py b/shipyard_airflow/control/middleware.py index c4db2c76..28941686 100644 --- a/shipyard_airflow/control/middleware.py +++ b/shipyard_airflow/control/middleware.py @@ -14,6 +14,7 @@ import falcon import logging + class AuthMiddleware(object): # Authentication @@ -35,8 +36,9 @@ class AuthMiddleware(object): ctx = req.context if not resource.authorize_roles(ctx.roles): - raise falcon.HTTPUnauthorized('Authentication required', - ('This resource requires an authorized role.')) + raise falcon.HTTPUnauthorized( + 'Authentication required', + ('This resource requires an authorized role.')) # Return the username associated with an authenticated token or None def validate_token(self, token): @@ -55,8 +57,8 @@ class AuthMiddleware(object): elif username == 'admin': return ['user', 'admin'] -class ContextMiddleware(object): +class ContextMiddleware(object): def process_request(self, req, resp): ctx = req.context @@ -70,8 +72,8 @@ class ContextMiddleware(object): ext_marker = req.get_header('X-Context-Marker') ctx.set_external_marker(ext_marker if ext_marker is not None else '') -class LoggingMiddleware(object): +class LoggingMiddleware(object): def __init__(self): self.logger = logging.getLogger('shipyard.control') @@ -86,4 +88,3 @@ class LoggingMiddleware(object): resp.append_header('X-Shipyard-Req', ctx.request_id) self.logger.info("%s - %s" % (req.uri, resp.status), extra=extra) - diff --git a/shipyard_airflow/control/regions.py b/shipyard_airflow/control/regions.py index ac6d970a..667a2cae 100644 --- a/shipyard_airflow/control/regions.py +++ b/shipyard_airflow/control/regions.py @@ -15,6 +15,7 @@ import falcon from .base import BaseResource + class RegionsResource(BaseResource): authorized_roles = ['user'] @@ -22,10 +23,10 @@ class RegionsResource(BaseResource): def on_get(self, req, resp): resp.status = falcon.HTTP_200 + class RegionResource(BaseResource): authorized_roles = ['user'] def on_get(self, req, resp, region_id): resp.status = falcon.HTTP_200 - diff --git a/shipyard_airflow/control/tasks.py b/shipyard_airflow/control/tasks.py index e67b79e6..eccdbb8f 100644 --- a/shipyard_airflow/control/tasks.py +++ b/shipyard_airflow/control/tasks.py @@ -17,6 +17,7 @@ import requests from .base import BaseResource + class TaskResource(BaseResource): authorized_roles = ['user'] @@ -27,9 +28,11 @@ class TaskResource(BaseResource): if 'Error' in web_server_url: resp.status = falcon.HTTP_500 - raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") + raise falcon.HTTPInternalServerError("Internal Server Error", + "Missing Configuration File") else: - req_url = '{}/api/experimental/dags/{}/tasks/{}'.format(web_server_url, dag_id, task_id) + req_url = '{}/api/experimental/dags/{}/tasks/{}'.format( + web_server_url, dag_id, task_id) task_details = requests.get(req_url).json() if 'error' in task_details: @@ -39,4 +42,3 @@ class TaskResource(BaseResource): else: resp.status = falcon.HTTP_200 resp.body = json.dumps(task_details) - diff --git a/shipyard_airflow/dags/dag_concurrency_check.py b/shipyard_airflow/dags/dag_concurrency_check.py index 7a2558f2..744eb1c3 100644 --- a/shipyard_airflow/dags/dag_concurrency_check.py +++ b/shipyard_airflow/dags/dag_concurrency_check.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import airflow from airflow.models import DAG from airflow.operators import PlaceholderOperator from airflow.operators.dummy_operator import DummyOperator @@ -48,6 +47,7 @@ def dag_concurrency_check_failure_handler(parent_dag_name, child_dag_name, default_args=args, ) operator = DummyOperator( - task_id='dag_concurrency_check_failure_handler', dag=dag, ) + task_id='dag_concurrency_check_failure_handler', + dag=dag, ) return dag diff --git a/shipyard_airflow/dags/deploy_site.py b/shipyard_airflow/dags/deploy_site.py index 4e748a30..53bb97a3 100644 --- a/shipyard_airflow/dags/deploy_site.py +++ b/shipyard_airflow/dags/deploy_site.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime, timedelta +from datetime import timedelta import airflow from airflow import DAG @@ -26,8 +26,6 @@ from airflow.operators.subdag_operator import SubDagOperator from airflow.operators import DeckhandOperator from airflow.operators import PlaceholderOperator from airflow.utils.trigger_rule import TriggerRule -from datetime import datetime, timedelta - ''' deploy_site is the top-level orchestration DAG for deploying a site using the Undercloud platform. @@ -57,30 +55,27 @@ default_args = { dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None) concurrency_check = SubDagOperator( - subdag=dag_concurrency_check(PARENT_DAG_NAME, - DAG_CONCURRENCY_CHECK_DAG_NAME, - args=default_args), + subdag=dag_concurrency_check( + PARENT_DAG_NAME, DAG_CONCURRENCY_CHECK_DAG_NAME, args=default_args), task_id=DAG_CONCURRENCY_CHECK_DAG_NAME, dag=dag, ) concurrency_check_failure_handler = SubDagOperator( subdag=dag_concurrency_check_failure_handler( - PARENT_DAG_NAME, CONCURRENCY_FAILURE_DAG_NAME, - args=default_args), + PARENT_DAG_NAME, CONCURRENCY_FAILURE_DAG_NAME, args=default_args), task_id=CONCURRENCY_FAILURE_DAG_NAME, trigger_rule=TriggerRule.ONE_FAILED, dag=dag, ) preflight = SubDagOperator( - subdag=all_preflight_checks(PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME, - args=default_args), + subdag=all_preflight_checks( + PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME, args=default_args), task_id=ALL_PREFLIGHT_CHECKS_DAG_NAME, dag=dag, ) preflight_failure = SubDagOperator( - subdag=preflight_failure_handler(PARENT_DAG_NAME, - PREFLIGHT_FAILURE_DAG_NAME, - args=default_args), + subdag=preflight_failure_handler( + PARENT_DAG_NAME, PREFLIGHT_FAILURE_DAG_NAME, args=default_args), task_id=PREFLIGHT_FAILURE_DAG_NAME, trigger_rule=TriggerRule.ONE_FAILED, dag=dag, ) @@ -89,15 +84,14 @@ get_design_version = DeckhandOperator( task_id=DECKHAND_GET_DESIGN_VERSION, dag=dag) validate_site_design = SubDagOperator( - subdag=validate_site_design(PARENT_DAG_NAME, VALIDATE_SITE_DESIGN_DAG_NAME, - args=default_args), + subdag=validate_site_design( + PARENT_DAG_NAME, VALIDATE_SITE_DESIGN_DAG_NAME, args=default_args), task_id=VALIDATE_SITE_DESIGN_DAG_NAME, dag=dag) validate_site_design_failure = SubDagOperator( subdag=validate_site_design_failure_handler( - dag.dag_id, VALIDATION_FAILED_DAG_NAME, - args=default_args), + dag.dag_id, VALIDATION_FAILED_DAG_NAME, args=default_args), task_id=VALIDATION_FAILED_DAG_NAME, trigger_rule=TriggerRule.ONE_FAILED, dag=dag) diff --git a/shipyard_airflow/dags/preflight_checks.py b/shipyard_airflow/dags/preflight_checks.py index dcd351c5..d076e630 100644 --- a/shipyard_airflow/dags/preflight_checks.py +++ b/shipyard_airflow/dags/preflight_checks.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import airflow from airflow.models import DAG from airflow.operators.subdag_operator import SubDagOperator from airflow.operators.dummy_operator import DummyOperator @@ -52,7 +51,10 @@ def shipyard_preflight_check(parent_dag_name, child_dag_name, args): return dag -def deckhand_preflight_check(parent_dag_name, child_dag_name, args, ): +def deckhand_preflight_check( + parent_dag_name, + child_dag_name, + args, ): ''' Checks that deckhand is in a good state for the purposes of the Undercloud Platform to proceed with processing @@ -124,29 +126,26 @@ def all_preflight_checks(parent_dag_name, child_dag_name, args): dag=dag, ) shipyard = SubDagOperator( - subdag=shipyard_preflight_check(dag.dag_id, - SHIPYARD_PREFLIGHT_CHECK_DAG_NAME, - args), + subdag=shipyard_preflight_check( + dag.dag_id, SHIPYARD_PREFLIGHT_CHECK_DAG_NAME, args), task_id=SHIPYARD_PREFLIGHT_CHECK_DAG_NAME, dag=dag, ) deckhand = SubDagOperator( - subdag=deckhand_preflight_check(dag.dag_id, - DECKHAND_PREFLIGHT_CHECK_DAG_NAME, - args), + subdag=deckhand_preflight_check( + dag.dag_id, DECKHAND_PREFLIGHT_CHECK_DAG_NAME, args), task_id=DECKHAND_PREFLIGHT_CHECK_DAG_NAME, dag=dag, ) drydock = SubDagOperator( subdag=drydock_preflight_check(dag.dag_id, - DRYDOCK_PREFLIGHT_CHECK_DAG_NAME, - args), + DRYDOCK_PREFLIGHT_CHECK_DAG_NAME, args), task_id=DRYDOCK_PREFLIGHT_CHECK_DAG_NAME, dag=dag, ) armada = SubDagOperator( - subdag=armada_preflight_check( - dag.dag_id, ARMADA_PREFLIGHT_CHECK_DAG_NAME, args), + subdag=armada_preflight_check(dag.dag_id, + ARMADA_PREFLIGHT_CHECK_DAG_NAME, args), task_id=ARMADA_PREFLIGHT_CHECK_DAG_NAME, dag=dag, ) diff --git a/shipyard_airflow/dags/samples/airflow_task_state_operators.py b/shipyard_airflow/dags/samples/airflow_task_state_operators.py index 7a462cdc..b90c6560 100644 --- a/shipyard_airflow/dags/samples/airflow_task_state_operators.py +++ b/shipyard_airflow/dags/samples/airflow_task_state_operators.py @@ -18,7 +18,7 @@ import airflow from airflow import DAG from airflow.operators import TaskStateOperator from airflow.operators.bash_operator import BashOperator -from datetime import datetime, timedelta +from datetime import timedelta default_args = { 'owner': 'airflow', @@ -31,7 +31,9 @@ default_args = { 'retry_delay': timedelta(minutes=1), } -dag = DAG('airflow_task_state', default_args=default_args, schedule_interval=None) +dag = DAG('airflow_task_state', + default_args=default_args, + schedule_interval=None) # Get Task State t1 = TaskStateOperator( @@ -44,9 +46,9 @@ t1 = TaskStateOperator( # Use XCOM to Retrieve Task State t2 = BashOperator( task_id='pull', - bash_command="echo {{ ti.xcom_pull(task_ids='airflow_task_state', key='task_state') }}", + bash_command=("echo {{ ti.xcom_pull(task_ids='airflow_task_state'," + " key='task_state') }}"), xcom_push=True, dag=dag) t2.set_upstream(t1) - diff --git a/shipyard_airflow/dags/samples/openstack_api_call.py b/shipyard_airflow/dags/samples/openstack_api_call.py index f7abfd59..8a936771 100644 --- a/shipyard_airflow/dags/samples/openstack_api_call.py +++ b/shipyard_airflow/dags/samples/openstack_api_call.py @@ -18,8 +18,7 @@ import airflow from airflow import DAG from airflow.operators import OpenStackOperator from airflow.operators.bash_operator import BashOperator -from datetime import datetime, timedelta - +from datetime import timedelta default_args = { 'owner': 'airflow', @@ -35,13 +34,10 @@ default_args = { dag = DAG('openstack_cli', default_args=default_args, schedule_interval=None) # print_date -t1 = BashOperator( - task_id='print_date', - bash_command='date', - dag=dag) +t1 = BashOperator(task_id='print_date', bash_command='date', dag=dag) -## Note that the openrc.sh file needs to be placed on a volume that can be -## accessed by the containers +# Note that the openrc.sh file needs to be placed on a volume that can be +# accessed by the containers # openstack endpoint list t2 = OpenStackOperator( @@ -75,4 +71,3 @@ t2.set_upstream(t1) t3.set_upstream(t1) t4.set_upstream(t1) t5.set_upstream(t1) - diff --git a/shipyard_airflow/dags/validate_site_design.py b/shipyard_airflow/dags/validate_site_design.py index 0a117548..0e1efaeb 100644 --- a/shipyard_airflow/dags/validate_site_design.py +++ b/shipyard_airflow/dags/validate_site_design.py @@ -12,11 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import airflow from airflow.models import DAG from airflow.operators.dummy_operator import DummyOperator -from airflow.operators.subdag_operator import SubDagOperator -from airflow.utils.trigger_rule import TriggerRule from airflow.operators import DeckhandOperator from airflow.operators import PlaceholderOperator @@ -47,11 +44,11 @@ def validate_site_design(parent_dag_name, child_dag_name, args): deckhand_validate_docs = DeckhandOperator( task_id='deckhand_validate_site_design', dag=dag) - #TODO () use the real operator here + # TODO () use the real operator here drydock_validate_docs = PlaceholderOperator( task_id='drydock_validate_site_design', dag=dag) - #TODO () use the real operator here + # TODO () use the real operator here armada_validate_docs = PlaceholderOperator( task_id='armada_validate_site_design', dag=dag) diff --git a/shipyard_airflow/plugins/airflow_task_state_operators.py b/shipyard_airflow/plugins/airflow_task_state_operators.py index 43dbb187..56cb84b5 100644 --- a/shipyard_airflow/plugins/airflow_task_state_operators.py +++ b/shipyard_airflow/plugins/airflow_task_state_operators.py @@ -14,14 +14,13 @@ import logging import subprocess -import sys -import os from airflow.exceptions import AirflowException from airflow.models import BaseOperator from airflow.plugins_manager import AirflowPlugin from airflow.utils.decorators import apply_defaults + class TaskStateOperator(BaseOperator): """ Retrieve Task State @@ -29,38 +28,48 @@ class TaskStateOperator(BaseOperator): :airflow_task_id: Task ID :airflow_execution_date: Task Execution Date """ + @apply_defaults def __init__(self, airflow_command=None, airflow_dag_id=None, airflow_task_id=None, airflow_execution_date=None, - *args, **kwargs): + *args, + **kwargs): super(TaskStateOperator, self).__init__(*args, **kwargs) self.airflow_dag_id = airflow_dag_id self.airflow_task_id = airflow_task_id self.airflow_execution_date = airflow_execution_date - self.airflow_command = ['airflow', 'task_state', airflow_dag_id, airflow_task_id, airflow_execution_date] + self.airflow_command = [ + 'airflow', 'task_state', airflow_dag_id, airflow_task_id, + airflow_execution_date + ] def execute(self, context): logging.info("Running Airflow Command: %s", self.airflow_command) # Execute Airflow CLI Command - airflow_cli = subprocess.Popen(self.airflow_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + airflow_cli = subprocess.Popen( + self.airflow_command, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) # Logs Output - # Filter out logging messages from standard output and keep only the relevant information + # Filter out logging messages from standard output + # and keep only the relevant information line = '' for line in iter(airflow_cli.stdout.readline, b''): line = line.strip() - if line.startswith( '[' ): + if line.startswith('['): pass else: logging.info(line) - # Wait for child process to terminate. Set and return returncode attribute. + # Wait for child process to terminate. + # Set and return returncode attribute. airflow_cli.wait() # Raise Execptions if Task State Command Fails @@ -79,4 +88,3 @@ class TaskStateOperator(BaseOperator): class TaskStatePlugin(AirflowPlugin): name = "task_state_plugin" operators = [TaskStateOperator] - diff --git a/shipyard_airflow/plugins/deckhand_operator.py b/shipyard_airflow/plugins/deckhand_operator.py index e4fdee18..85aa4004 100644 --- a/shipyard_airflow/plugins/deckhand_operator.py +++ b/shipyard_airflow/plugins/deckhand_operator.py @@ -23,7 +23,7 @@ class DeckhandOperator(BaseOperator): Supports interaction with Deckhand. """ - #TODO () remove this special coloring when the operator is done. + # TODO () remove this special coloring when the operator is done. ui_color = '#e8f7e4' @apply_defaults diff --git a/shipyard_airflow/plugins/openstack_operators.py b/shipyard_airflow/plugins/openstack_operators.py index 6575c9e4..caea688c 100644 --- a/shipyard_airflow/plugins/openstack_operators.py +++ b/shipyard_airflow/plugins/openstack_operators.py @@ -14,26 +14,27 @@ import logging import subprocess -import sys -import os from airflow.exceptions import AirflowException from airflow.models import BaseOperator from airflow.plugins_manager import AirflowPlugin from airflow.utils.decorators import apply_defaults + class OpenStackOperator(BaseOperator): """ Performs OpenStack CLI calls :openrc_file: Path of the openrc file :openstack_command: The OpenStack command to be executed """ + @apply_defaults def __init__(self, openrc_file, openstack_command=None, xcom_push=False, - *args, **kwargs): + *args, + **kwargs): super(OpenStackOperator, self).__init__(*args, **kwargs) self.openrc_file = openrc_file @@ -44,13 +45,17 @@ class OpenStackOperator(BaseOperator): logging.info("Running OpenStack Command: %s", self.openstack_command) # Emulate "source" in bash. Sets up environment variables. - pipe = subprocess.Popen(". %s; env" % self.openrc_file, stdout=subprocess.PIPE, shell=True) + pipe = subprocess.Popen( + ". %s; env" % self.openrc_file, stdout=subprocess.PIPE, shell=True) data = pipe.communicate()[0] os_env = dict((line.split("=", 1) for line in data.splitlines())) - - # Execute the OpenStack CLI Command - openstack_cli = subprocess.Popen(self.openstack_command, env=os_env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + # Execute the OpenStack CLI Command + openstack_cli = subprocess.Popen( + self.openstack_command, + env=os_env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) # Logs Output logging.info("Output:") @@ -60,18 +65,15 @@ class OpenStackOperator(BaseOperator): line = line.strip() logging.info(line) - - # Wait for child process to terminate. Set and return returncode attribute. + # Wait for child process to terminate. + # Set and return returncode attribute. openstack_cli.wait() logging.info("Command exited with " "return code {0}".format(openstack_cli.returncode)) - # Raise Execptions if OpenStack Command Fails if openstack_cli.returncode: raise AirflowException("OpenStack Command Failed") - - """ Push response to an XCom if xcom_push is True """ @@ -82,4 +84,3 @@ class OpenStackOperator(BaseOperator): class OpenStackCliPlugin(AirflowPlugin): name = "openstack_cli_plugin" operators = [OpenStackOperator] - diff --git a/shipyard_airflow/shipyard.py b/shipyard_airflow/shipyard.py index 6716dc19..f4d8f950 100644 --- a/shipyard_airflow/shipyard.py +++ b/shipyard_airflow/shipyard.py @@ -14,6 +14,7 @@ import logging import shipyard_airflow.control.api as api + def start_shipyard(): # Setup root logger @@ -28,7 +29,9 @@ def start_shipyard(): # Specalized format for API logging logger = logging.getLogger('shipyard.control') logger.propagate = False - formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(user)s - %(req_id)s - %(external_ctx)s - %(message)s') + formatter = logging.Formatter( + ('%(asctime)s - %(levelname)s - %(user)s - %(req_id)s - ' + '%(external_ctx)s - %(message)s')) ch = logging.StreamHandler() ch.setFormatter(formatter) @@ -36,5 +39,5 @@ def start_shipyard(): return api.start_api() -shipyard = start_shipyard() +shipyard = start_shipyard() diff --git a/tox.ini b/tox.ini index fb104908..5ae783bc 100644 --- a/tox.ini +++ b/tox.ini @@ -14,4 +14,10 @@ commands= commands = flake8 {posargs} [flake8] -ignore=E302,H306,D100,D101,D102 \ No newline at end of file +# NOTE(Bryan Strassner) ignoring F841 because of the airflow example pattern +# of naming variables even if they aren't used for DAGs and Operators. +# Doing so adds readability and context in this case. +ignore=E302,H306,D100,D101,D102,F841 +# NOTE(Bryan Strassner) excluding 3rd party code that is brought into the +# codebase. +exclude=*plugins/rest_api_plugin.py,*lib/python*,*egg,.git*,*.md,.tox* \ No newline at end of file