Updates for PEP-8 compliance. Changed many files cosmetically.

Added a file exclusion to tox.ini, and also an exclusion for the
rule that flags unused named variables (F841).

Excluded more things that are system- or tox-related.

Change-Id: I022b72cbe048e3fe1f70e6017038248e6a2e9538
Bryan Strassner 2017-08-16 15:57:11 -05:00
parent a889877524
commit 6a7522bbb2
22 changed files with 181 additions and 140 deletions

View File

@@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import json
import requests
from .base import BaseResource
class GetDagStateResource(BaseResource):
authorized_roles = ['user']
@@ -27,11 +27,14 @@ class GetDagStateResource(BaseResource):
if 'Error' in web_server_url:
resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else:
req_url = '{}/admin/rest_api/api?api=dag_state&dag_id={}&execution_date={}'.format(web_server_url, dag_id, execution_date)
req_url = ('{}/admin/rest_api/api?api=dag_state&dag_id={}'
'&execution_date={}'.format(web_server_url, dag_id,
execution_date))
response = requests.get(req_url).json()
if response["output"]["stderr"]:
resp.status = falcon.HTTP_400
resp.body = response["output"]["stderr"]
@@ -39,4 +42,3 @@ class GetDagStateResource(BaseResource):
else:
resp.status = falcon.HTTP_200
resp.body = response["output"]["stdout"]
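
The wrapped req_url above relies on implicit string literal concatenation: adjacent literals inside parentheses are joined at compile time, so the URL is byte-for-byte identical to the long form. A minimal sketch, with hypothetical values for web_server_url, dag_id, and execution_date:

    # Adjacent string literals are concatenated before .format() runs,
    # so both spellings build the same request URL.
    web_server_url = 'http://localhost:8080'   # hypothetical
    dag_id = 'deploy_site'                     # hypothetical
    execution_date = '2017-08-16T15:57:11'     # hypothetical
    long_form = '{}/admin/rest_api/api?api=dag_state&dag_id={}&execution_date={}'.format(
        web_server_url, dag_id, execution_date)
    wrapped = ('{}/admin/rest_api/api?api=dag_state&dag_id={}'
               '&execution_date={}'.format(web_server_url, dag_id,
                                           execution_date))
    assert long_form == wrapped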

View File

@@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import json
import requests
from .base import BaseResource
class GetTaskStatusResource(BaseResource):
authorized_roles = ['user']
@@ -27,11 +27,16 @@ class GetTaskStatusResource(BaseResource):
if 'Error' in web_server_url:
resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else:
req_url = '{}/admin/rest_api/api?api=task_state&dag_id={}&task_id={}&execution_date={}'.format(web_server_url, dag_id, task_id, execution_date)
req_url = (
'{}/admin/rest_api/api?api=task_state&dag_id={}&task_id={}'
'&execution_date={}'.format(web_server_url, dag_id, task_id,
execution_date))
response = requests.get(req_url).json()
if response["output"]["stderr"]:
resp.status = falcon.HTTP_400
resp.body = response["output"]["stderr"]
@@ -39,4 +44,3 @@ class GetTaskStatusResource(BaseResource):
else:
resp.status = falcon.HTTP_200
resp.body = response["output"]["stdout"]

View File

@@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import json
import requests
from .base import BaseResource
class GetAirflowVersionResource(BaseResource):
authorized_roles = ['user']
@@ -27,16 +27,18 @@ class GetAirflowVersionResource(BaseResource):
if 'Error' in web_server_url:
resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else:
# Get Airflow Version
req_url = '{}/admin/rest_api/api?api=version'.format(web_server_url)
req_url = '{}/admin/rest_api/api?api=version'.format(
web_server_url)
response = requests.get(req_url).json()
if response["output"]:
resp.status = falcon.HTTP_200
resp.body = response["output"]
else:
self.return_error(resp, falcon.HTTP_400, 'Fail to Retrieve Airflow Version')
self.return_error(resp, falcon.HTTP_400,
'Fail to Retrieve Airflow Version')
return
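
The version lookup only needed a hanging indent: the argument drops to the next line, indented one level past the opening parenthesis. A small sketch with a hypothetical base URL:

    # E501 fix via hanging indent; behavior is unchanged.
    base = 'http://localhost:8080'   # hypothetical
    req_url = '{}/admin/rest_api/api?api=version'.format(
        base)
    assert req_url == base + '/admin/rest_api/api?api=version'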

View File

@@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import json
import requests
from .base import BaseResource
class ListDagsResource(BaseResource):
authorized_roles = ['user']
@@ -27,12 +27,14 @@ class ListDagsResource(BaseResource):
if 'Error' in web_server_url:
resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else:
# List available dags
req_url = '{}/admin/rest_api/api?api=list_dags'.format(web_server_url)
req_url = '{}/admin/rest_api/api?api=list_dags'.format(
web_server_url)
response = requests.get(req_url).json()
if response["output"]["stderr"]:
resp.status = falcon.HTTP_400
resp.body = response["output"]["stderr"]
@@ -40,4 +42,3 @@ class ListDagsResource(BaseResource):
else:
resp.status = falcon.HTTP_200
resp.body = response["output"]["stdout"]

View File

@@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import json
import requests
from .base import BaseResource
class ListTasksResource(BaseResource):
authorized_roles = ['user']
@@ -27,12 +27,14 @@ class ListTasksResource(BaseResource):
if 'Error' in web_server_url:
resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else:
# Retrieve all tasks belonging to a particular Dag
req_url = '{}/admin/rest_api/api?api=list_tasks&dag_id={}'.format(web_server_url, dag_id)
req_url = '{}/admin/rest_api/api?api=list_tasks&dag_id={}'.format(
web_server_url, dag_id)
response = requests.get(req_url).json()
if response["output"]["stderr"]:
resp.status = falcon.HTTP_400
resp.body = response["output"]["stderr"]
@@ -40,4 +42,3 @@ class ListTasksResource(BaseResource):
else:
resp.status = falcon.HTTP_200
resp.body = response["output"]["stdout"]

View File

@@ -12,12 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import json
import requests
from dateutil.parser import parse
from .base import BaseResource
class TriggerDagRunResource(BaseResource):
authorized_roles = ['user']
@@ -28,12 +28,15 @@ class TriggerDagRunResource(BaseResource):
if 'Error' in web_server_url:
resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else:
req_url = '{}/admin/rest_api/api?api=trigger_dag&dag_id={}&run_id={}'.format(web_server_url, dag_id, run_id)
req_url = ('{}/admin/rest_api/api?api=trigger_dag&dag_id={}'
'&run_id={}'.format(web_server_url, dag_id, run_id))
response = requests.get(req_url).json()
# Returns error response if API call returns response code other than 200
# Returns error response if API call returns
# response code other than 200
if response["http_response_code"] != 200:
resp.status = falcon.HTTP_400
resp.body = response["output"]
@@ -41,7 +44,7 @@ class TriggerDagRunResource(BaseResource):
else:
resp.status = falcon.HTTP_200
# Return time of execution so that we can use it to query dag/task status
# Return time of execution so that we can use
# it to query dag/task status
dt = parse(response["response_time"])
resp.body = dt.strftime('%Y-%m-%dT%H:%M:%S')
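
The response_time round trip shown above is plain dateutil plus strftime. A self-contained sketch with a hypothetical timestamp:

    from dateutil.parser import parse

    # parse() accepts most common timestamp formats; strftime() then
    # rewrites the value into the form the dag_state query expects.
    dt = parse('2017-08-16 15:57:11')
    assert dt.strftime('%Y-%m-%dT%H:%M:%S') == '2017-08-16T15:57:11'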

View File

@@ -20,6 +20,7 @@ import logging
from dateutil.parser import parse
from .base import BaseResource
class TriggerDagRunPollResource(BaseResource):
authorized_roles = ['user']
@@ -30,11 +31,13 @@ class TriggerDagRunPollResource(BaseResource):
if 'Error' in web_server_url:
resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else:
req_url = '{}/admin/rest_api/api?api=trigger_dag&dag_id={}&run_id={}'.format(web_server_url, dag_id, run_id)
req_url = ('{}/admin/rest_api/api?api=trigger_dag&dag_id={}'
'&run_id={}'.format(web_server_url, dag_id, run_id))
response = requests.get(req_url).json()
if response["http_response_code"] != 200:
resp.status = falcon.HTTP_400
resp.body = response["output"]
@@ -42,28 +45,35 @@ class TriggerDagRunPollResource(BaseResource):
else:
resp.status = falcon.HTTP_200
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Executing '" + dag_id + "' Dag...")
# Retrieve time of execution so that we can use it to query dag/task status
# Retrieve time of execution so that we
# can use it to query dag/task status
dt = parse(response["response_time"])
exec_date = dt.strftime('%Y-%m-%dT%H:%M:%S')
url = '{}/admin/rest_api/api?api=dag_state&dag_id={}&execution_date={}'.format(web_server_url, dag_id, exec_date)
url = ('{}/admin/rest_api/api'
'?api=dag_state&dag_id={}&execution_date={}'.format(
web_server_url, dag_id, exec_date))
# Back off for 5 seconds before querying the initial state
time.sleep( 5 )
time.sleep(5)
dag_state = requests.get(url).json()
# Remove newline character at the end of the response
dag_state = dag_state["output"]["stdout"].encode('utf8').rstrip()
dag_state = dag_state["output"]["stdout"].encode(
'utf8').rstrip()
while dag_state != 'success':
# Get current state
dag_state = requests.get(url).json()
# Remove newline character at the end of the response
dag_state = dag_state["output"]["stdout"].encode('utf8').rstrip()
dag_state = dag_state["output"]["stdout"].encode(
'utf8').rstrip()
# Logs output of current dag state
logging.info('Current Dag State: ' + dag_state)
@@ -71,11 +81,12 @@ class TriggerDagRunPollResource(BaseResource):
if dag_state == 'failed':
resp.status = falcon.HTTP_500
logging.info('Dag Execution Failed')
resp.body = json.dumps({'Error': 'Dag Execution Failed'})
resp.body = json.dumps({
'Error': 'Dag Execution Failed'
})
return
# Wait for 20 seconds before doing a new query
time.sleep( 20 )
time.sleep(20)
logging.info('Dag Successfully Executed')
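
Besides the line wraps, time.sleep( 5 ) becomes time.sleep(5) because pycodestyle flags whitespace just inside parentheses (E201/E202). The poll loop itself reduces to this shape, with a stub standing in for the Airflow REST call:

    import time

    def current_state():
        # Stand-in for requests.get(url).json() against the Airflow API;
        # hypothetical, returns a terminal state immediately.
        return 'success'

    time.sleep(5)              # back off before the first query
    while current_state() != 'success':
        time.sleep(20)         # wait between queries
    print('Dag Successfully Executed')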

View File

@@ -13,34 +13,42 @@
# limitations under the License.
import falcon
import requests
import json
from .base import BaseResource
class DagRunResource(BaseResource):
authorized_roles = ['user']
def on_post(self, req, resp, dag_id, run_id=None, conf=None, execution_date=None):
def on_post(self,
req,
resp,
dag_id,
run_id=None,
conf=None,
execution_date=None):
# Retrieve URL
web_server_url = self.retrieve_config('base', 'web_server')
if 'Error' in web_server_url:
resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else:
req_url = '{}/api/experimental/dags/{}/dag_runs'.format(web_server_url, dag_id)
response = requests.post(req_url,
json={
"run_id": run_id,
"conf": conf,
"execution_date": execution_date,
})
req_url = '{}/api/experimental/dags/{}/dag_runs'.format(
web_server_url, dag_id)
response = requests.post(
req_url,
json={
"run_id": run_id,
"conf": conf,
"execution_date": execution_date,
})
if response.ok:
resp.status = falcon.HTTP_200
else:
self.return_error(resp, falcon.HTTP_400, 'Fail to Execute Dag')
return
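
The reflowed block still makes a single call to Airflow's experimental dag_runs endpoint, passing the run parameters as JSON. A minimal sketch with hypothetical values (it needs a reachable Airflow webserver to actually succeed):

    import requests

    req_url = '{}/api/experimental/dags/{}/dag_runs'.format(
        'http://localhost:8080', 'deploy_site')   # hypothetical
    response = requests.post(
        req_url,
        json={
            "run_id": 'manual_001',               # hypothetical
            "conf": None,
            "execution_date": None,
        })
    # response.ok is True for any 2xx status code.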

View File

@@ -14,6 +14,7 @@
import falcon
import logging
class AuthMiddleware(object):
# Authentication
@@ -35,8 +36,9 @@ class AuthMiddleware(object):
ctx = req.context
if not resource.authorize_roles(ctx.roles):
raise falcon.HTTPUnauthorized('Authentication required',
('This resource requires an authorized role.'))
raise falcon.HTTPUnauthorized(
'Authentication required',
('This resource requires an authorized role.'))
# Return the username associated with an authenticated token or None
def validate_token(self, token):
@@ -55,8 +57,8 @@ class AuthMiddleware(object):
elif username == 'admin':
return ['user', 'admin']
class ContextMiddleware(object):
class ContextMiddleware(object):
def process_request(self, req, resp):
ctx = req.context
@@ -70,8 +72,8 @@ class ContextMiddleware(object):
ext_marker = req.get_header('X-Context-Marker')
ctx.set_external_marker(ext_marker if ext_marker is not None else '')
class LoggingMiddleware(object):
class LoggingMiddleware(object):
def __init__(self):
self.logger = logging.getLogger('shipyard.control')
@@ -86,4 +88,3 @@ class LoggingMiddleware(object):
resp.append_header('X-Shipyard-Req', ctx.request_id)
self.logger.info("%s - %s" % (req.uri, resp.status), extra=extra)
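
The duplicated class lines above are only spacing fixes: pycodestyle's E302 wants two blank lines before every top-level class or function (the rule stays in this repo's ignore list, but the spacing is normalized anyway). In miniature:

    class First(object):
        pass


    class Second(object):   # two blank lines above satisfy E302
        pass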

View File

@@ -15,6 +15,7 @@ import falcon
from .base import BaseResource
class RegionsResource(BaseResource):
authorized_roles = ['user']
@@ -22,10 +23,10 @@ class RegionsResource(BaseResource):
def on_get(self, req, resp):
resp.status = falcon.HTTP_200
class RegionResource(BaseResource):
authorized_roles = ['user']
def on_get(self, req, resp, region_id):
resp.status = falcon.HTTP_200

View File

@@ -17,6 +17,7 @@ import requests
from .base import BaseResource
class TaskResource(BaseResource):
authorized_roles = ['user']
@@ -27,9 +28,11 @@ class TaskResource(BaseResource):
if 'Error' in web_server_url:
resp.status = falcon.HTTP_500
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else:
req_url = '{}/api/experimental/dags/{}/tasks/{}'.format(web_server_url, dag_id, task_id)
req_url = '{}/api/experimental/dags/{}/tasks/{}'.format(
web_server_url, dag_id, task_id)
task_details = requests.get(req_url).json()
if 'error' in task_details:
@@ -39,4 +42,3 @@ class TaskResource(BaseResource):
else:
resp.status = falcon.HTTP_200
resp.body = json.dumps(task_details)

View File

@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import airflow
from airflow.models import DAG
from airflow.operators import PlaceholderOperator
from airflow.operators.dummy_operator import DummyOperator
@@ -48,6 +47,7 @@ def dag_concurrency_check_failure_handler(parent_dag_name, child_dag_name,
default_args=args, )
operator = DummyOperator(
task_id='dag_concurrency_check_failure_handler', dag=dag, )
task_id='dag_concurrency_check_failure_handler',
dag=dag, )
return dag
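
The one-line removal in this hunk appears to drop an import that nothing in the module references, which flake8 reports as F401. A sketch of the pattern, assuming an Airflow environment:

    # flake8 flags the first import as F401 ("imported but unused")
    # because only DAG is referenced below.
    import airflow
    from airflow.models import DAG

    dag = DAG('example_dag')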

View File

@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime, timedelta
from datetime import timedelta
import airflow
from airflow import DAG
@@ -26,8 +26,6 @@ from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators import DeckhandOperator
from airflow.operators import PlaceholderOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta
'''
deploy_site is the top-level orchestration DAG for deploying a site using the
Undercloud platform.
@@ -57,30 +55,27 @@ default_args = {
dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None)
concurrency_check = SubDagOperator(
subdag=dag_concurrency_check(PARENT_DAG_NAME,
DAG_CONCURRENCY_CHECK_DAG_NAME,
args=default_args),
subdag=dag_concurrency_check(
PARENT_DAG_NAME, DAG_CONCURRENCY_CHECK_DAG_NAME, args=default_args),
task_id=DAG_CONCURRENCY_CHECK_DAG_NAME,
dag=dag, )
concurrency_check_failure_handler = SubDagOperator(
subdag=dag_concurrency_check_failure_handler(
PARENT_DAG_NAME, CONCURRENCY_FAILURE_DAG_NAME,
args=default_args),
PARENT_DAG_NAME, CONCURRENCY_FAILURE_DAG_NAME, args=default_args),
task_id=CONCURRENCY_FAILURE_DAG_NAME,
trigger_rule=TriggerRule.ONE_FAILED,
dag=dag, )
preflight = SubDagOperator(
subdag=all_preflight_checks(PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME,
args=default_args),
subdag=all_preflight_checks(
PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME, args=default_args),
task_id=ALL_PREFLIGHT_CHECKS_DAG_NAME,
dag=dag, )
preflight_failure = SubDagOperator(
subdag=preflight_failure_handler(PARENT_DAG_NAME,
PREFLIGHT_FAILURE_DAG_NAME,
args=default_args),
subdag=preflight_failure_handler(
PARENT_DAG_NAME, PREFLIGHT_FAILURE_DAG_NAME, args=default_args),
task_id=PREFLIGHT_FAILURE_DAG_NAME,
trigger_rule=TriggerRule.ONE_FAILED,
dag=dag, )
@@ -89,15 +84,14 @@ get_design_version = DeckhandOperator(
task_id=DECKHAND_GET_DESIGN_VERSION, dag=dag)
validate_site_design = SubDagOperator(
subdag=validate_site_design(PARENT_DAG_NAME, VALIDATE_SITE_DESIGN_DAG_NAME,
args=default_args),
subdag=validate_site_design(
PARENT_DAG_NAME, VALIDATE_SITE_DESIGN_DAG_NAME, args=default_args),
task_id=VALIDATE_SITE_DESIGN_DAG_NAME,
dag=dag)
validate_site_design_failure = SubDagOperator(
subdag=validate_site_design_failure_handler(
dag.dag_id, VALIDATION_FAILED_DAG_NAME,
args=default_args),
dag.dag_id, VALIDATION_FAILED_DAG_NAME, args=default_args),
task_id=VALIDATION_FAILED_DAG_NAME,
trigger_rule=TriggerRule.ONE_FAILED,
dag=dag)

View File

@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import airflow
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
@@ -52,7 +51,10 @@ def shipyard_preflight_check(parent_dag_name, child_dag_name, args):
return dag
def deckhand_preflight_check(parent_dag_name, child_dag_name, args, ):
def deckhand_preflight_check(
parent_dag_name,
child_dag_name,
args, ):
'''
Checks that deckhand is in a good state for
the purposes of the Undercloud Platform to proceed with processing
@@ -124,29 +126,26 @@ def all_preflight_checks(parent_dag_name, child_dag_name, args):
dag=dag, )
shipyard = SubDagOperator(
subdag=shipyard_preflight_check(dag.dag_id,
SHIPYARD_PREFLIGHT_CHECK_DAG_NAME,
args),
subdag=shipyard_preflight_check(
dag.dag_id, SHIPYARD_PREFLIGHT_CHECK_DAG_NAME, args),
task_id=SHIPYARD_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, )
deckhand = SubDagOperator(
subdag=deckhand_preflight_check(dag.dag_id,
DECKHAND_PREFLIGHT_CHECK_DAG_NAME,
args),
subdag=deckhand_preflight_check(
dag.dag_id, DECKHAND_PREFLIGHT_CHECK_DAG_NAME, args),
task_id=DECKHAND_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, )
drydock = SubDagOperator(
subdag=drydock_preflight_check(dag.dag_id,
DRYDOCK_PREFLIGHT_CHECK_DAG_NAME,
args),
DRYDOCK_PREFLIGHT_CHECK_DAG_NAME, args),
task_id=DRYDOCK_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, )
armada = SubDagOperator(
subdag=armada_preflight_check(
dag.dag_id, ARMADA_PREFLIGHT_CHECK_DAG_NAME, args),
subdag=armada_preflight_check(dag.dag_id,
ARMADA_PREFLIGHT_CHECK_DAG_NAME, args),
task_id=ARMADA_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, )

View File

@@ -18,7 +18,7 @@ import airflow
from airflow import DAG
from airflow.operators import TaskStateOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from datetime import timedelta
default_args = {
'owner': 'airflow',
@@ -31,7 +31,9 @@ default_args = {
'retry_delay': timedelta(minutes=1),
}
dag = DAG('airflow_task_state', default_args=default_args, schedule_interval=None)
dag = DAG('airflow_task_state',
default_args=default_args,
schedule_interval=None)
# Get Task State
t1 = TaskStateOperator(
@@ -44,9 +46,9 @@ t1 = TaskStateOperator(
# Use XCOM to Retrieve Task State
t2 = BashOperator(
task_id='pull',
bash_command="echo {{ ti.xcom_pull(task_ids='airflow_task_state', key='task_state') }}",
bash_command=("echo {{ ti.xcom_pull(task_ids='airflow_task_state',"
" key='task_state') }}"),
xcom_push=True,
dag=dag)
t2.set_upstream(t1)

View File

@@ -18,8 +18,7 @@ import airflow
from airflow import DAG
from airflow.operators import OpenStackOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from datetime import timedelta
default_args = {
'owner': 'airflow',
@@ -35,13 +34,10 @@ default_args = {
dag = DAG('openstack_cli', default_args=default_args, schedule_interval=None)
# print_date
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t1 = BashOperator(task_id='print_date', bash_command='date', dag=dag)
## Note that the openrc.sh file needs to be placed on a volume that can be
## accessed by the containers
# Note that the openrc.sh file needs to be placed on a volume that can be
# accessed by the containers
# openstack endpoint list
t2 = OpenStackOperator(
@@ -75,4 +71,3 @@ t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream(t1)
t5.set_upstream(t1)
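
The doubled comment markers above are normalized because pycodestyle expects a block comment to start with a single '#' followed by a space (E265). For example:

    ## This spelling trips pycodestyle E265.
    # This spelling passes: one hash, one space.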

View File

@@ -12,11 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators import DeckhandOperator
from airflow.operators import PlaceholderOperator
@@ -47,11 +44,11 @@ def validate_site_design(parent_dag_name, child_dag_name, args):
deckhand_validate_docs = DeckhandOperator(
task_id='deckhand_validate_site_design', dag=dag)
#TODO () use the real operator here
# TODO () use the real operator here
drydock_validate_docs = PlaceholderOperator(
task_id='drydock_validate_site_design', dag=dag)
#TODO () use the real operator here
# TODO () use the real operator here
armada_validate_docs = PlaceholderOperator(
task_id='armada_validate_site_design', dag=dag)

View File

@@ -14,14 +14,13 @@
import logging
import subprocess
import sys
import os
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
class TaskStateOperator(BaseOperator):
"""
Retrieve Task State
@@ -29,38 +28,48 @@ class TaskStateOperator(BaseOperator):
:airflow_task_id: Task ID
:airflow_execution_date: Task Execution Date
"""
@apply_defaults
def __init__(self,
airflow_command=None,
airflow_dag_id=None,
airflow_task_id=None,
airflow_execution_date=None,
*args, **kwargs):
*args,
**kwargs):
super(TaskStateOperator, self).__init__(*args, **kwargs)
self.airflow_dag_id = airflow_dag_id
self.airflow_task_id = airflow_task_id
self.airflow_execution_date = airflow_execution_date
self.airflow_command = ['airflow', 'task_state', airflow_dag_id, airflow_task_id, airflow_execution_date]
self.airflow_command = [
'airflow', 'task_state', airflow_dag_id, airflow_task_id,
airflow_execution_date
]
def execute(self, context):
logging.info("Running Airflow Command: %s", self.airflow_command)
# Execute Airflow CLI Command
airflow_cli = subprocess.Popen(self.airflow_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
airflow_cli = subprocess.Popen(
self.airflow_command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
# Logs Output
# Filter out logging messages from standard output and keep only the relevant information
# Filter out logging messages from standard output
# and keep only the relevant information
line = ''
for line in iter(airflow_cli.stdout.readline, b''):
line = line.strip()
if line.startswith( '[' ):
if line.startswith('['):
pass
else:
logging.info(line)
# Wait for child process to terminate. Set and return returncode attribute.
# Wait for child process to terminate.
# Set and return returncode attribute.
airflow_cli.wait()
# Raise Exceptions if Task State Command Fails
@@ -79,4 +88,3 @@ class TaskStateOperator(BaseOperator):
class TaskStatePlugin(AirflowPlugin):
name = "task_state_plugin"
operators = [TaskStateOperator]
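
Long literals and calls in this file get matching treatment: the list opens its bracket, indents its elements, and closes on its own line, while subprocess.Popen takes one keyword per line. A condensed, runnable sketch with a harmless command substituted for the airflow CLI:

    import subprocess

    # Hypothetical command standing in for
    # ['airflow', 'task_state', dag_id, task_id, execution_date].
    command = [
        'echo', 'task_state', 'example_dag', 'example_task',
        '2017-08-16T15:57:11'
    ]
    proc = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)
    proc.wait()
    print(proc.returncode)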

View File

@@ -23,7 +23,7 @@ class DeckhandOperator(BaseOperator):
Supports interaction with Deckhand.
"""
#TODO () remove this special coloring when the operator is done.
# TODO () remove this special coloring when the operator is done.
ui_color = '#e8f7e4'
@apply_defaults

View File

@@ -14,26 +14,27 @@
import logging
import subprocess
import sys
import os
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
class OpenStackOperator(BaseOperator):
"""
Performs OpenStack CLI calls
:openrc_file: Path of the openrc file
:openstack_command: The OpenStack command to be executed
"""
@apply_defaults
def __init__(self,
openrc_file,
openstack_command=None,
xcom_push=False,
*args, **kwargs):
*args,
**kwargs):
super(OpenStackOperator, self).__init__(*args, **kwargs)
self.openrc_file = openrc_file
@@ -44,13 +45,17 @@ class OpenStackOperator(BaseOperator):
logging.info("Running OpenStack Command: %s", self.openstack_command)
# Emulate "source" in bash. Sets up environment variables.
pipe = subprocess.Popen(". %s; env" % self.openrc_file, stdout=subprocess.PIPE, shell=True)
pipe = subprocess.Popen(
". %s; env" % self.openrc_file, stdout=subprocess.PIPE, shell=True)
data = pipe.communicate()[0]
os_env = dict((line.split("=", 1) for line in data.splitlines()))
# Execute the OpenStack CLI Command
openstack_cli = subprocess.Popen(self.openstack_command, env=os_env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# Execute the OpenStack CLI Command
openstack_cli = subprocess.Popen(
self.openstack_command,
env=os_env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
# Logs Output
logging.info("Output:")
@@ -60,18 +65,15 @@ class OpenStackOperator(BaseOperator):
line = line.strip()
logging.info(line)
# Wait for child process to terminate. Set and return returncode attribute.
# Wait for child process to terminate.
# Set and return returncode attribute.
openstack_cli.wait()
logging.info("Command exited with "
"return code {0}".format(openstack_cli.returncode))
# Raise Exceptions if OpenStack Command Fails
if openstack_cli.returncode:
raise AirflowException("OpenStack Command Failed")
"""
Push response to an XCom if xcom_push is True
"""
@@ -82,4 +84,3 @@ class OpenStackOperator(BaseOperator):
class OpenStackCliPlugin(AirflowPlugin):
name = "openstack_cli_plugin"
operators = [OpenStackOperator]

View File

@@ -14,6 +14,7 @@
import logging
import shipyard_airflow.control.api as api
def start_shipyard():
# Setup root logger
@@ -28,7 +29,9 @@ def start_shipyard():
# Specalized format for API logging
logger = logging.getLogger('shipyard.control')
logger.propagate = False
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(user)s - %(req_id)s - %(external_ctx)s - %(message)s')
formatter = logging.Formatter(
('%(asctime)s - %(levelname)s - %(user)s - %(req_id)s - '
'%(external_ctx)s - %(message)s'))
ch = logging.StreamHandler()
ch.setFormatter(formatter)
@@ -36,5 +39,5 @@ def start_shipyard():
return api.start_api()
shipyard = start_shipyard()
shipyard = start_shipyard()

View File

@@ -14,4 +14,10 @@ commands=
commands = flake8 {posargs}
[flake8]
ignore=E302,H306,D100,D101,D102
# NOTE(Bryan Strassner) ignoring F841 because of the airflow example pattern
# of naming variables even if they aren't used for DAGs and Operators.
# Doing so adds readability and context in this case.
ignore=E302,H306,D100,D101,D102,F841
# NOTE(Bryan Strassner) excluding 3rd party code that is brought into the
# codebase.
exclude=*plugins/rest_api_plugin.py,*lib/python*,*egg,.git*,*.md,.tox*
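
F841 is pyflakes' "local variable is assigned to but never used" check. The DAG factory functions in this commit assign every operator to a name purely for readability, which is the pattern the NOTE defends; a minimal sketch, assuming an Airflow environment:

    from airflow.models import DAG
    from airflow.operators.dummy_operator import DummyOperator

    def failure_handler(parent_dag_name, child_dag_name, args):
        dag = DAG('{}.{}'.format(parent_dag_name, child_dag_name),
                  default_args=args)
        # flake8 would report F841 for 'operator' (assigned, never read);
        # keeping the name documents the task, so F841 is ignored.
        operator = DummyOperator(task_id='placeholder', dag=dag)
        return dag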