Updates for pep-8 compliance. Changed many files cosmetically

Added a file exclusion to tox.ini, and also excluded rule for
unused named variables.

excluded more things that are system or tox related.

Change-Id: I022b72cbe048e3fe1f70e6017038248e6a2e9538
This commit is contained in:
Bryan Strassner 2017-08-16 15:57:11 -05:00
parent a889877524
commit 6a7522bbb2
22 changed files with 181 additions and 140 deletions

View File

@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import falcon import falcon
import json
import requests import requests
from .base import BaseResource from .base import BaseResource
class GetDagStateResource(BaseResource): class GetDagStateResource(BaseResource):
authorized_roles = ['user'] authorized_roles = ['user']
@ -27,11 +27,14 @@ class GetDagStateResource(BaseResource):
if 'Error' in web_server_url: if 'Error' in web_server_url:
resp.status = falcon.HTTP_500 resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else: else:
req_url = '{}/admin/rest_api/api?api=dag_state&dag_id={}&execution_date={}'.format(web_server_url, dag_id, execution_date) req_url = ('{}/admin/rest_api/api?api=dag_state&dag_id={}'
'&execution_date={}'.format(web_server_url, dag_id,
execution_date))
response = requests.get(req_url).json() response = requests.get(req_url).json()
if response["output"]["stderr"]: if response["output"]["stderr"]:
resp.status = falcon.HTTP_400 resp.status = falcon.HTTP_400
resp.body = response["output"]["stderr"] resp.body = response["output"]["stderr"]
@ -39,4 +42,3 @@ class GetDagStateResource(BaseResource):
else: else:
resp.status = falcon.HTTP_200 resp.status = falcon.HTTP_200
resp.body = response["output"]["stdout"] resp.body = response["output"]["stdout"]

View File

@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import falcon import falcon
import json
import requests import requests
from .base import BaseResource from .base import BaseResource
class GetTaskStatusResource(BaseResource): class GetTaskStatusResource(BaseResource):
authorized_roles = ['user'] authorized_roles = ['user']
@ -27,11 +27,16 @@ class GetTaskStatusResource(BaseResource):
if 'Error' in web_server_url: if 'Error' in web_server_url:
resp.status = falcon.HTTP_500 resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else: else:
req_url = '{}/admin/rest_api/api?api=task_state&dag_id={}&task_id={}&execution_date={}'.format(web_server_url, dag_id, task_id, execution_date)
req_url = (
'{}/admin/rest_api/api?api=task_state&dag_id={}&task_id={}'
'&execution_date={}'.format(web_server_url, dag_id, task_id,
execution_date))
response = requests.get(req_url).json() response = requests.get(req_url).json()
if response["output"]["stderr"]: if response["output"]["stderr"]:
resp.status = falcon.HTTP_400 resp.status = falcon.HTTP_400
resp.body = response["output"]["stderr"] resp.body = response["output"]["stderr"]
@ -39,4 +44,3 @@ class GetTaskStatusResource(BaseResource):
else: else:
resp.status = falcon.HTTP_200 resp.status = falcon.HTTP_200
resp.body = response["output"]["stdout"] resp.body = response["output"]["stdout"]

View File

@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import falcon import falcon
import json
import requests import requests
from .base import BaseResource from .base import BaseResource
class GetAirflowVersionResource(BaseResource): class GetAirflowVersionResource(BaseResource):
authorized_roles = ['user'] authorized_roles = ['user']
@ -27,16 +27,18 @@ class GetAirflowVersionResource(BaseResource):
if 'Error' in web_server_url: if 'Error' in web_server_url:
resp.status = falcon.HTTP_500 resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else: else:
# Get Airflow Version # Get Airflow Version
req_url = '{}/admin/rest_api/api?api=version'.format(web_server_url) req_url = '{}/admin/rest_api/api?api=version'.format(
web_server_url)
response = requests.get(req_url).json() response = requests.get(req_url).json()
if response["output"]: if response["output"]:
resp.status = falcon.HTTP_200 resp.status = falcon.HTTP_200
resp.body = response["output"] resp.body = response["output"]
else: else:
self.return_error(resp, falcon.HTTP_400, 'Fail to Retrieve Airflow Version') self.return_error(resp, falcon.HTTP_400,
'Fail to Retrieve Airflow Version')
return return

View File

@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import falcon import falcon
import json
import requests import requests
from .base import BaseResource from .base import BaseResource
class ListDagsResource(BaseResource): class ListDagsResource(BaseResource):
authorized_roles = ['user'] authorized_roles = ['user']
@ -27,12 +27,14 @@ class ListDagsResource(BaseResource):
if 'Error' in web_server_url: if 'Error' in web_server_url:
resp.status = falcon.HTTP_500 resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else: else:
# List available dags # List available dags
req_url = '{}/admin/rest_api/api?api=list_dags'.format(web_server_url) req_url = '{}/admin/rest_api/api?api=list_dags'.format(
web_server_url)
response = requests.get(req_url).json() response = requests.get(req_url).json()
if response["output"]["stderr"]: if response["output"]["stderr"]:
resp.status = falcon.HTTP_400 resp.status = falcon.HTTP_400
resp.body = response["output"]["stderr"] resp.body = response["output"]["stderr"]
@ -40,4 +42,3 @@ class ListDagsResource(BaseResource):
else: else:
resp.status = falcon.HTTP_200 resp.status = falcon.HTTP_200
resp.body = response["output"]["stdout"] resp.body = response["output"]["stdout"]

View File

@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import falcon import falcon
import json
import requests import requests
from .base import BaseResource from .base import BaseResource
class ListTasksResource(BaseResource): class ListTasksResource(BaseResource):
authorized_roles = ['user'] authorized_roles = ['user']
@ -27,12 +27,14 @@ class ListTasksResource(BaseResource):
if 'Error' in web_server_url: if 'Error' in web_server_url:
resp.status = falcon.HTTP_500 resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else: else:
# Retrieve all tasks belonging to a particular Dag # Retrieve all tasks belonging to a particular Dag
req_url = '{}/admin/rest_api/api?api=list_tasks&dag_id={}'.format(web_server_url, dag_id) req_url = '{}/admin/rest_api/api?api=list_tasks&dag_id={}'.format(
web_server_url, dag_id)
response = requests.get(req_url).json() response = requests.get(req_url).json()
if response["output"]["stderr"]: if response["output"]["stderr"]:
resp.status = falcon.HTTP_400 resp.status = falcon.HTTP_400
resp.body = response["output"]["stderr"] resp.body = response["output"]["stderr"]
@ -40,4 +42,3 @@ class ListTasksResource(BaseResource):
else: else:
resp.status = falcon.HTTP_200 resp.status = falcon.HTTP_200
resp.body = response["output"]["stdout"] resp.body = response["output"]["stdout"]

View File

@ -12,12 +12,12 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import falcon import falcon
import json
import requests import requests
from dateutil.parser import parse from dateutil.parser import parse
from .base import BaseResource from .base import BaseResource
class TriggerDagRunResource(BaseResource): class TriggerDagRunResource(BaseResource):
authorized_roles = ['user'] authorized_roles = ['user']
@ -28,12 +28,15 @@ class TriggerDagRunResource(BaseResource):
if 'Error' in web_server_url: if 'Error' in web_server_url:
resp.status = falcon.HTTP_500 resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else: else:
req_url = '{}/admin/rest_api/api?api=trigger_dag&dag_id={}&run_id={}'.format(web_server_url, dag_id, run_id) req_url = ('{}/admin/rest_api/api?api=trigger_dag&dag_id={}'
'&run_id={}'.format(web_server_url, dag_id, run_id))
response = requests.get(req_url).json() response = requests.get(req_url).json()
# Returns error response if API call returns response code other than 200 # Returns error response if API call returns
# response code other than 200
if response["http_response_code"] != 200: if response["http_response_code"] != 200:
resp.status = falcon.HTTP_400 resp.status = falcon.HTTP_400
resp.body = response["output"] resp.body = response["output"]
@ -41,7 +44,7 @@ class TriggerDagRunResource(BaseResource):
else: else:
resp.status = falcon.HTTP_200 resp.status = falcon.HTTP_200
# Return time of execution so that we can use it to query dag/task status # Return time of execution so that we can use
# it to query dag/task status
dt = parse(response["response_time"]) dt = parse(response["response_time"])
resp.body = dt.strftime('%Y-%m-%dT%H:%M:%S') resp.body = dt.strftime('%Y-%m-%dT%H:%M:%S')

View File

@ -20,6 +20,7 @@ import logging
from dateutil.parser import parse from dateutil.parser import parse
from .base import BaseResource from .base import BaseResource
class TriggerDagRunPollResource(BaseResource): class TriggerDagRunPollResource(BaseResource):
authorized_roles = ['user'] authorized_roles = ['user']
@ -30,11 +31,13 @@ class TriggerDagRunPollResource(BaseResource):
if 'Error' in web_server_url: if 'Error' in web_server_url:
resp.status = falcon.HTTP_500 resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else: else:
req_url = '{}/admin/rest_api/api?api=trigger_dag&dag_id={}&run_id={}'.format(web_server_url, dag_id, run_id) req_url = ('{}/admin/rest_api/api?api=trigger_dag&dag_id={}'
'&run_id={}'.format(web_server_url, dag_id, run_id))
response = requests.get(req_url).json() response = requests.get(req_url).json()
if response["http_response_code"] != 200: if response["http_response_code"] != 200:
resp.status = falcon.HTTP_400 resp.status = falcon.HTTP_400
resp.body = response["output"] resp.body = response["output"]
@ -42,28 +45,35 @@ class TriggerDagRunPollResource(BaseResource):
else: else:
resp.status = falcon.HTTP_200 resp.status = falcon.HTTP_200
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Executing '" + dag_id + "' Dag...") logging.info("Executing '" + dag_id + "' Dag...")
# Retrieve time of execution so that we can use it to query dag/task status # Retrieve time of execution so that we
# can use it to query dag/task status
dt = parse(response["response_time"]) dt = parse(response["response_time"])
exec_date = dt.strftime('%Y-%m-%dT%H:%M:%S') exec_date = dt.strftime('%Y-%m-%dT%H:%M:%S')
url = '{}/admin/rest_api/api?api=dag_state&dag_id={}&execution_date={}'.format(web_server_url, dag_id, exec_date) url = ('{}/admin/rest_api/api'
'?api=dag_state&dag_id={}&execution_date={}'.format(
web_server_url, dag_id, exec_date))
# Back off for 5 seconds before querying the initial state # Back off for 5 seconds before querying the initial state
time.sleep( 5 ) time.sleep(5)
dag_state = requests.get(url).json() dag_state = requests.get(url).json()
# Remove newline character at the end of the response # Remove newline character at the end of the response
dag_state = dag_state["output"]["stdout"].encode('utf8').rstrip() dag_state = dag_state["output"]["stdout"].encode(
'utf8').rstrip()
while dag_state != 'success': while dag_state != 'success':
# Get current state # Get current state
dag_state = requests.get(url).json() dag_state = requests.get(url).json()
# Remove newline character at the end of the response # Remove newline character at the end of the response
dag_state = dag_state["output"]["stdout"].encode('utf8').rstrip() dag_state = dag_state["output"]["stdout"].encode(
'utf8').rstrip()
# Logs output of current dag state # Logs output of current dag state
logging.info('Current Dag State: ' + dag_state) logging.info('Current Dag State: ' + dag_state)
@ -71,11 +81,12 @@ class TriggerDagRunPollResource(BaseResource):
if dag_state == 'failed': if dag_state == 'failed':
resp.status = falcon.HTTP_500 resp.status = falcon.HTTP_500
logging.info('Dag Execution Failed') logging.info('Dag Execution Failed')
resp.body = json.dumps({'Error': 'Dag Execution Failed'}) resp.body = json.dumps({
'Error': 'Dag Execution Failed'
})
return return
# Wait for 20 seconds before doing a new query # Wait for 20 seconds before doing a new query
time.sleep( 20 ) time.sleep(20)
logging.info('Dag Successfully Executed') logging.info('Dag Successfully Executed')

View File

@ -13,34 +13,42 @@
# limitations under the License. # limitations under the License.
import falcon import falcon
import requests import requests
import json
from .base import BaseResource from .base import BaseResource
class DagRunResource(BaseResource): class DagRunResource(BaseResource):
authorized_roles = ['user'] authorized_roles = ['user']
def on_post(self, req, resp, dag_id, run_id=None, conf=None, execution_date=None): def on_post(self,
req,
resp,
dag_id,
run_id=None,
conf=None,
execution_date=None):
# Retrieve URL # Retrieve URL
web_server_url = self.retrieve_config('base', 'web_server') web_server_url = self.retrieve_config('base', 'web_server')
if 'Error' in web_server_url: if 'Error' in web_server_url:
resp.status = falcon.HTTP_500 resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else: else:
req_url = '{}/api/experimental/dags/{}/dag_runs'.format(web_server_url, dag_id) req_url = '{}/api/experimental/dags/{}/dag_runs'.format(
web_server_url, dag_id)
response = requests.post(req_url,
json={ response = requests.post(
"run_id": run_id, req_url,
"conf": conf, json={
"execution_date": execution_date, "run_id": run_id,
}) "conf": conf,
"execution_date": execution_date,
})
if response.ok: if response.ok:
resp.status = falcon.HTTP_200 resp.status = falcon.HTTP_200
else: else:
self.return_error(resp, falcon.HTTP_400, 'Fail to Execute Dag') self.return_error(resp, falcon.HTTP_400, 'Fail to Execute Dag')
return return

View File

@ -14,6 +14,7 @@
import falcon import falcon
import logging import logging
class AuthMiddleware(object): class AuthMiddleware(object):
# Authentication # Authentication
@ -35,8 +36,9 @@ class AuthMiddleware(object):
ctx = req.context ctx = req.context
if not resource.authorize_roles(ctx.roles): if not resource.authorize_roles(ctx.roles):
raise falcon.HTTPUnauthorized('Authentication required', raise falcon.HTTPUnauthorized(
('This resource requires an authorized role.')) 'Authentication required',
('This resource requires an authorized role.'))
# Return the username associated with an authenticated token or None # Return the username associated with an authenticated token or None
def validate_token(self, token): def validate_token(self, token):
@ -55,8 +57,8 @@ class AuthMiddleware(object):
elif username == 'admin': elif username == 'admin':
return ['user', 'admin'] return ['user', 'admin']
class ContextMiddleware(object):
class ContextMiddleware(object):
def process_request(self, req, resp): def process_request(self, req, resp):
ctx = req.context ctx = req.context
@ -70,8 +72,8 @@ class ContextMiddleware(object):
ext_marker = req.get_header('X-Context-Marker') ext_marker = req.get_header('X-Context-Marker')
ctx.set_external_marker(ext_marker if ext_marker is not None else '') ctx.set_external_marker(ext_marker if ext_marker is not None else '')
class LoggingMiddleware(object):
class LoggingMiddleware(object):
def __init__(self): def __init__(self):
self.logger = logging.getLogger('shipyard.control') self.logger = logging.getLogger('shipyard.control')
@ -86,4 +88,3 @@ class LoggingMiddleware(object):
resp.append_header('X-Shipyard-Req', ctx.request_id) resp.append_header('X-Shipyard-Req', ctx.request_id)
self.logger.info("%s - %s" % (req.uri, resp.status), extra=extra) self.logger.info("%s - %s" % (req.uri, resp.status), extra=extra)

View File

@ -15,6 +15,7 @@ import falcon
from .base import BaseResource from .base import BaseResource
class RegionsResource(BaseResource): class RegionsResource(BaseResource):
authorized_roles = ['user'] authorized_roles = ['user']
@ -22,10 +23,10 @@ class RegionsResource(BaseResource):
def on_get(self, req, resp): def on_get(self, req, resp):
resp.status = falcon.HTTP_200 resp.status = falcon.HTTP_200
class RegionResource(BaseResource): class RegionResource(BaseResource):
authorized_roles = ['user'] authorized_roles = ['user']
def on_get(self, req, resp, region_id): def on_get(self, req, resp, region_id):
resp.status = falcon.HTTP_200 resp.status = falcon.HTTP_200

View File

@ -17,6 +17,7 @@ import requests
from .base import BaseResource from .base import BaseResource
class TaskResource(BaseResource): class TaskResource(BaseResource):
authorized_roles = ['user'] authorized_roles = ['user']
@ -27,9 +28,11 @@ class TaskResource(BaseResource):
if 'Error' in web_server_url: if 'Error' in web_server_url:
resp.status = falcon.HTTP_500 resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else: else:
req_url = '{}/api/experimental/dags/{}/tasks/{}'.format(web_server_url, dag_id, task_id) req_url = '{}/api/experimental/dags/{}/tasks/{}'.format(
web_server_url, dag_id, task_id)
task_details = requests.get(req_url).json() task_details = requests.get(req_url).json()
if 'error' in task_details: if 'error' in task_details:
@ -39,4 +42,3 @@ class TaskResource(BaseResource):
else: else:
resp.status = falcon.HTTP_200 resp.status = falcon.HTTP_200
resp.body = json.dumps(task_details) resp.body = json.dumps(task_details)

View File

@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import airflow
from airflow.models import DAG from airflow.models import DAG
from airflow.operators import PlaceholderOperator from airflow.operators import PlaceholderOperator
from airflow.operators.dummy_operator import DummyOperator from airflow.operators.dummy_operator import DummyOperator
@ -48,6 +47,7 @@ def dag_concurrency_check_failure_handler(parent_dag_name, child_dag_name,
default_args=args, ) default_args=args, )
operator = DummyOperator( operator = DummyOperator(
task_id='dag_concurrency_check_failure_handler', dag=dag, ) task_id='dag_concurrency_check_failure_handler',
dag=dag, )
return dag return dag

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from datetime import datetime, timedelta from datetime import timedelta
import airflow import airflow
from airflow import DAG from airflow import DAG
@ -26,8 +26,6 @@ from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators import DeckhandOperator from airflow.operators import DeckhandOperator
from airflow.operators import PlaceholderOperator from airflow.operators import PlaceholderOperator
from airflow.utils.trigger_rule import TriggerRule from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta
''' '''
deploy_site is the top-level orchestration DAG for deploying a site using the deploy_site is the top-level orchestration DAG for deploying a site using the
Undercloud platform. Undercloud platform.
@ -57,30 +55,27 @@ default_args = {
dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None) dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None)
concurrency_check = SubDagOperator( concurrency_check = SubDagOperator(
subdag=dag_concurrency_check(PARENT_DAG_NAME, subdag=dag_concurrency_check(
DAG_CONCURRENCY_CHECK_DAG_NAME, PARENT_DAG_NAME, DAG_CONCURRENCY_CHECK_DAG_NAME, args=default_args),
args=default_args),
task_id=DAG_CONCURRENCY_CHECK_DAG_NAME, task_id=DAG_CONCURRENCY_CHECK_DAG_NAME,
dag=dag, ) dag=dag, )
concurrency_check_failure_handler = SubDagOperator( concurrency_check_failure_handler = SubDagOperator(
subdag=dag_concurrency_check_failure_handler( subdag=dag_concurrency_check_failure_handler(
PARENT_DAG_NAME, CONCURRENCY_FAILURE_DAG_NAME, PARENT_DAG_NAME, CONCURRENCY_FAILURE_DAG_NAME, args=default_args),
args=default_args),
task_id=CONCURRENCY_FAILURE_DAG_NAME, task_id=CONCURRENCY_FAILURE_DAG_NAME,
trigger_rule=TriggerRule.ONE_FAILED, trigger_rule=TriggerRule.ONE_FAILED,
dag=dag, ) dag=dag, )
preflight = SubDagOperator( preflight = SubDagOperator(
subdag=all_preflight_checks(PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME, subdag=all_preflight_checks(
args=default_args), PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME, args=default_args),
task_id=ALL_PREFLIGHT_CHECKS_DAG_NAME, task_id=ALL_PREFLIGHT_CHECKS_DAG_NAME,
dag=dag, ) dag=dag, )
preflight_failure = SubDagOperator( preflight_failure = SubDagOperator(
subdag=preflight_failure_handler(PARENT_DAG_NAME, subdag=preflight_failure_handler(
PREFLIGHT_FAILURE_DAG_NAME, PARENT_DAG_NAME, PREFLIGHT_FAILURE_DAG_NAME, args=default_args),
args=default_args),
task_id=PREFLIGHT_FAILURE_DAG_NAME, task_id=PREFLIGHT_FAILURE_DAG_NAME,
trigger_rule=TriggerRule.ONE_FAILED, trigger_rule=TriggerRule.ONE_FAILED,
dag=dag, ) dag=dag, )
@ -89,15 +84,14 @@ get_design_version = DeckhandOperator(
task_id=DECKHAND_GET_DESIGN_VERSION, dag=dag) task_id=DECKHAND_GET_DESIGN_VERSION, dag=dag)
validate_site_design = SubDagOperator( validate_site_design = SubDagOperator(
subdag=validate_site_design(PARENT_DAG_NAME, VALIDATE_SITE_DESIGN_DAG_NAME, subdag=validate_site_design(
args=default_args), PARENT_DAG_NAME, VALIDATE_SITE_DESIGN_DAG_NAME, args=default_args),
task_id=VALIDATE_SITE_DESIGN_DAG_NAME, task_id=VALIDATE_SITE_DESIGN_DAG_NAME,
dag=dag) dag=dag)
validate_site_design_failure = SubDagOperator( validate_site_design_failure = SubDagOperator(
subdag=validate_site_design_failure_handler( subdag=validate_site_design_failure_handler(
dag.dag_id, VALIDATION_FAILED_DAG_NAME, dag.dag_id, VALIDATION_FAILED_DAG_NAME, args=default_args),
args=default_args),
task_id=VALIDATION_FAILED_DAG_NAME, task_id=VALIDATION_FAILED_DAG_NAME,
trigger_rule=TriggerRule.ONE_FAILED, trigger_rule=TriggerRule.ONE_FAILED,
dag=dag) dag=dag)

View File

@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import airflow
from airflow.models import DAG from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator from airflow.operators.dummy_operator import DummyOperator
@ -52,7 +51,10 @@ def shipyard_preflight_check(parent_dag_name, child_dag_name, args):
return dag return dag
def deckhand_preflight_check(parent_dag_name, child_dag_name, args, ): def deckhand_preflight_check(
parent_dag_name,
child_dag_name,
args, ):
''' '''
Checks that deckhand is in a good state for Checks that deckhand is in a good state for
the purposes of the Undercloud Platform to proceed with processing the purposes of the Undercloud Platform to proceed with processing
@ -124,29 +126,26 @@ def all_preflight_checks(parent_dag_name, child_dag_name, args):
dag=dag, ) dag=dag, )
shipyard = SubDagOperator( shipyard = SubDagOperator(
subdag=shipyard_preflight_check(dag.dag_id, subdag=shipyard_preflight_check(
SHIPYARD_PREFLIGHT_CHECK_DAG_NAME, dag.dag_id, SHIPYARD_PREFLIGHT_CHECK_DAG_NAME, args),
args),
task_id=SHIPYARD_PREFLIGHT_CHECK_DAG_NAME, task_id=SHIPYARD_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, ) dag=dag, )
deckhand = SubDagOperator( deckhand = SubDagOperator(
subdag=deckhand_preflight_check(dag.dag_id, subdag=deckhand_preflight_check(
DECKHAND_PREFLIGHT_CHECK_DAG_NAME, dag.dag_id, DECKHAND_PREFLIGHT_CHECK_DAG_NAME, args),
args),
task_id=DECKHAND_PREFLIGHT_CHECK_DAG_NAME, task_id=DECKHAND_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, ) dag=dag, )
drydock = SubDagOperator( drydock = SubDagOperator(
subdag=drydock_preflight_check(dag.dag_id, subdag=drydock_preflight_check(dag.dag_id,
DRYDOCK_PREFLIGHT_CHECK_DAG_NAME, DRYDOCK_PREFLIGHT_CHECK_DAG_NAME, args),
args),
task_id=DRYDOCK_PREFLIGHT_CHECK_DAG_NAME, task_id=DRYDOCK_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, ) dag=dag, )
armada = SubDagOperator( armada = SubDagOperator(
subdag=armada_preflight_check( subdag=armada_preflight_check(dag.dag_id,
dag.dag_id, ARMADA_PREFLIGHT_CHECK_DAG_NAME, args), ARMADA_PREFLIGHT_CHECK_DAG_NAME, args),
task_id=ARMADA_PREFLIGHT_CHECK_DAG_NAME, task_id=ARMADA_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, ) dag=dag, )

View File

@ -18,7 +18,7 @@ import airflow
from airflow import DAG from airflow import DAG
from airflow.operators import TaskStateOperator from airflow.operators import TaskStateOperator
from airflow.operators.bash_operator import BashOperator from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta from datetime import timedelta
default_args = { default_args = {
'owner': 'airflow', 'owner': 'airflow',
@ -31,7 +31,9 @@ default_args = {
'retry_delay': timedelta(minutes=1), 'retry_delay': timedelta(minutes=1),
} }
dag = DAG('airflow_task_state', default_args=default_args, schedule_interval=None) dag = DAG('airflow_task_state',
default_args=default_args,
schedule_interval=None)
# Get Task State # Get Task State
t1 = TaskStateOperator( t1 = TaskStateOperator(
@ -44,9 +46,9 @@ t1 = TaskStateOperator(
# Use XCOM to Retrieve Task State # Use XCOM to Retrieve Task State
t2 = BashOperator( t2 = BashOperator(
task_id='pull', task_id='pull',
bash_command="echo {{ ti.xcom_pull(task_ids='airflow_task_state', key='task_state') }}", bash_command=("echo {{ ti.xcom_pull(task_ids='airflow_task_state',"
" key='task_state') }}"),
xcom_push=True, xcom_push=True,
dag=dag) dag=dag)
t2.set_upstream(t1) t2.set_upstream(t1)

View File

@ -18,8 +18,7 @@ import airflow
from airflow import DAG from airflow import DAG
from airflow.operators import OpenStackOperator from airflow.operators import OpenStackOperator
from airflow.operators.bash_operator import BashOperator from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta from datetime import timedelta
default_args = { default_args = {
'owner': 'airflow', 'owner': 'airflow',
@ -35,13 +34,10 @@ default_args = {
dag = DAG('openstack_cli', default_args=default_args, schedule_interval=None) dag = DAG('openstack_cli', default_args=default_args, schedule_interval=None)
# print_date # print_date
t1 = BashOperator( t1 = BashOperator(task_id='print_date', bash_command='date', dag=dag)
task_id='print_date',
bash_command='date',
dag=dag)
## Note that the openrc.sh file needs to be placed on a volume that can be # Note that the openrc.sh file needs to be placed on a volume that can be
## accessed by the containers # accessed by the containers
# openstack endpoint list # openstack endpoint list
t2 = OpenStackOperator( t2 = OpenStackOperator(
@ -75,4 +71,3 @@ t2.set_upstream(t1)
t3.set_upstream(t1) t3.set_upstream(t1)
t4.set_upstream(t1) t4.set_upstream(t1)
t5.set_upstream(t1) t5.set_upstream(t1)

View File

@ -12,11 +12,8 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import airflow
from airflow.models import DAG from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators import DeckhandOperator from airflow.operators import DeckhandOperator
from airflow.operators import PlaceholderOperator from airflow.operators import PlaceholderOperator
@ -47,11 +44,11 @@ def validate_site_design(parent_dag_name, child_dag_name, args):
deckhand_validate_docs = DeckhandOperator( deckhand_validate_docs = DeckhandOperator(
task_id='deckhand_validate_site_design', dag=dag) task_id='deckhand_validate_site_design', dag=dag)
#TODO () use the real operator here # TODO () use the real operator here
drydock_validate_docs = PlaceholderOperator( drydock_validate_docs = PlaceholderOperator(
task_id='drydock_validate_site_design', dag=dag) task_id='drydock_validate_site_design', dag=dag)
#TODO () use the real operator here # TODO () use the real operator here
armada_validate_docs = PlaceholderOperator( armada_validate_docs = PlaceholderOperator(
task_id='armada_validate_site_design', dag=dag) task_id='armada_validate_site_design', dag=dag)

View File

@ -14,14 +14,13 @@
import logging import logging
import subprocess import subprocess
import sys
import os
from airflow.exceptions import AirflowException from airflow.exceptions import AirflowException
from airflow.models import BaseOperator from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults from airflow.utils.decorators import apply_defaults
class TaskStateOperator(BaseOperator): class TaskStateOperator(BaseOperator):
""" """
Retrieve Task State Retrieve Task State
@ -29,38 +28,48 @@ class TaskStateOperator(BaseOperator):
:airflow_task_id: Task ID :airflow_task_id: Task ID
:airflow_execution_date: Task Execution Date :airflow_execution_date: Task Execution Date
""" """
@apply_defaults @apply_defaults
def __init__(self, def __init__(self,
airflow_command=None, airflow_command=None,
airflow_dag_id=None, airflow_dag_id=None,
airflow_task_id=None, airflow_task_id=None,
airflow_execution_date=None, airflow_execution_date=None,
*args, **kwargs): *args,
**kwargs):
super(TaskStateOperator, self).__init__(*args, **kwargs) super(TaskStateOperator, self).__init__(*args, **kwargs)
self.airflow_dag_id = airflow_dag_id self.airflow_dag_id = airflow_dag_id
self.airflow_task_id = airflow_task_id self.airflow_task_id = airflow_task_id
self.airflow_execution_date = airflow_execution_date self.airflow_execution_date = airflow_execution_date
self.airflow_command = ['airflow', 'task_state', airflow_dag_id, airflow_task_id, airflow_execution_date] self.airflow_command = [
'airflow', 'task_state', airflow_dag_id, airflow_task_id,
airflow_execution_date
]
def execute(self, context): def execute(self, context):
logging.info("Running Airflow Command: %s", self.airflow_command) logging.info("Running Airflow Command: %s", self.airflow_command)
# Execute Airflow CLI Command # Execute Airflow CLI Command
airflow_cli = subprocess.Popen(self.airflow_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) airflow_cli = subprocess.Popen(
self.airflow_command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
# Logs Output # Logs Output
# Filter out logging messages from standard output and keep only the relevant information # Filter out logging messages from standard output
# and keep only the relevant information
line = '' line = ''
for line in iter(airflow_cli.stdout.readline, b''): for line in iter(airflow_cli.stdout.readline, b''):
line = line.strip() line = line.strip()
if line.startswith( '[' ): if line.startswith('['):
pass pass
else: else:
logging.info(line) logging.info(line)
# Wait for child process to terminate. Set and return returncode attribute. # Wait for child process to terminate.
# Set and return returncode attribute.
airflow_cli.wait() airflow_cli.wait()
# Raise Execptions if Task State Command Fails # Raise Execptions if Task State Command Fails
@ -79,4 +88,3 @@ class TaskStateOperator(BaseOperator):
class TaskStatePlugin(AirflowPlugin): class TaskStatePlugin(AirflowPlugin):
name = "task_state_plugin" name = "task_state_plugin"
operators = [TaskStateOperator] operators = [TaskStateOperator]

View File

@ -23,7 +23,7 @@ class DeckhandOperator(BaseOperator):
Supports interaction with Deckhand. Supports interaction with Deckhand.
""" """
#TODO () remove this special coloring when the operator is done. # TODO () remove this special coloring when the operator is done.
ui_color = '#e8f7e4' ui_color = '#e8f7e4'
@apply_defaults @apply_defaults

View File

@ -14,26 +14,27 @@
import logging import logging
import subprocess import subprocess
import sys
import os
from airflow.exceptions import AirflowException from airflow.exceptions import AirflowException
from airflow.models import BaseOperator from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults from airflow.utils.decorators import apply_defaults
class OpenStackOperator(BaseOperator): class OpenStackOperator(BaseOperator):
""" """
Performs OpenStack CLI calls Performs OpenStack CLI calls
:openrc_file: Path of the openrc file :openrc_file: Path of the openrc file
:openstack_command: The OpenStack command to be executed :openstack_command: The OpenStack command to be executed
""" """
@apply_defaults @apply_defaults
def __init__(self, def __init__(self,
openrc_file, openrc_file,
openstack_command=None, openstack_command=None,
xcom_push=False, xcom_push=False,
*args, **kwargs): *args,
**kwargs):
super(OpenStackOperator, self).__init__(*args, **kwargs) super(OpenStackOperator, self).__init__(*args, **kwargs)
self.openrc_file = openrc_file self.openrc_file = openrc_file
@ -44,13 +45,17 @@ class OpenStackOperator(BaseOperator):
logging.info("Running OpenStack Command: %s", self.openstack_command) logging.info("Running OpenStack Command: %s", self.openstack_command)
# Emulate "source" in bash. Sets up environment variables. # Emulate "source" in bash. Sets up environment variables.
pipe = subprocess.Popen(". %s; env" % self.openrc_file, stdout=subprocess.PIPE, shell=True) pipe = subprocess.Popen(
". %s; env" % self.openrc_file, stdout=subprocess.PIPE, shell=True)
data = pipe.communicate()[0] data = pipe.communicate()[0]
os_env = dict((line.split("=", 1) for line in data.splitlines())) os_env = dict((line.split("=", 1) for line in data.splitlines()))
# Execute the OpenStack CLI Command
openstack_cli = subprocess.Popen(self.openstack_command, env=os_env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# Execute the OpenStack CLI Command
openstack_cli = subprocess.Popen(
self.openstack_command,
env=os_env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
# Logs Output # Logs Output
logging.info("Output:") logging.info("Output:")
@ -60,18 +65,15 @@ class OpenStackOperator(BaseOperator):
line = line.strip() line = line.strip()
logging.info(line) logging.info(line)
# Wait for child process to terminate.
# Wait for child process to terminate. Set and return returncode attribute. # Set and return returncode attribute.
openstack_cli.wait() openstack_cli.wait()
logging.info("Command exited with " logging.info("Command exited with "
"return code {0}".format(openstack_cli.returncode)) "return code {0}".format(openstack_cli.returncode))
# Raise Execptions if OpenStack Command Fails # Raise Execptions if OpenStack Command Fails
if openstack_cli.returncode: if openstack_cli.returncode:
raise AirflowException("OpenStack Command Failed") raise AirflowException("OpenStack Command Failed")
""" """
Push response to an XCom if xcom_push is True Push response to an XCom if xcom_push is True
""" """
@ -82,4 +84,3 @@ class OpenStackOperator(BaseOperator):
class OpenStackCliPlugin(AirflowPlugin): class OpenStackCliPlugin(AirflowPlugin):
name = "openstack_cli_plugin" name = "openstack_cli_plugin"
operators = [OpenStackOperator] operators = [OpenStackOperator]

View File

@ -14,6 +14,7 @@
import logging import logging
import shipyard_airflow.control.api as api import shipyard_airflow.control.api as api
def start_shipyard(): def start_shipyard():
# Setup root logger # Setup root logger
@ -28,7 +29,9 @@ def start_shipyard():
# Specalized format for API logging # Specalized format for API logging
logger = logging.getLogger('shipyard.control') logger = logging.getLogger('shipyard.control')
logger.propagate = False logger.propagate = False
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(user)s - %(req_id)s - %(external_ctx)s - %(message)s') formatter = logging.Formatter(
('%(asctime)s - %(levelname)s - %(user)s - %(req_id)s - '
'%(external_ctx)s - %(message)s'))
ch = logging.StreamHandler() ch = logging.StreamHandler()
ch.setFormatter(formatter) ch.setFormatter(formatter)
@ -36,5 +39,5 @@ def start_shipyard():
return api.start_api() return api.start_api()
shipyard = start_shipyard()
shipyard = start_shipyard()

View File

@ -14,4 +14,10 @@ commands=
commands = flake8 {posargs} commands = flake8 {posargs}
[flake8] [flake8]
ignore=E302,H306,D100,D101,D102 # NOTE(Bryan Strassner) ignoring F841 because of the airflow example pattern
# of naming variables even if they aren't used for DAGs and Operators.
# Doing so adds readability and context in this case.
ignore=E302,H306,D100,D101,D102,F841
# NOTE(Bryan Strassner) excluding 3rd party code that is brought into the
# codebase.
exclude=*plugins/rest_api_plugin.py,*lib/python*,*egg,.git*,*.md,.tox*