Implement the inquiry API for Shipyard.

This allows for querying and browsing of all workflows that have been
invoked via Airflow, whether through the Shipyard front door or not.

Processing for this set of endpoints has been placed in a subdirectory
of the control directory. A later refactoring will move the other API
resources and helpers to their own respective directories.

Change-Id: I7242fba80ad745ef5bcd41d7d07a320dfebb1dd7
Bryan Strassner 2017-10-09 10:02:15 -04:00
parent 90b292f209
commit 2168b3f00b
13 changed files with 768 additions and 16 deletions

View File

@@ -604,12 +604,6 @@ The resource that represents DAGs (workflows) in airflow
#### Entity Structure
A list of objects representing the DAGs that have run in airflow.
```
[
{TBD},
...
]
```
#### GET /v1.0/workflows
Queries airflow for DAGs that are running or have run (successfully or
@@ -621,18 +615,141 @@ optional, a boundary in the past within which to retrieve results. Default is
##### Responses
* 200 OK
##### Example
Note the workflow_id values; these can be used for drilldown.
```
curl -D - -X GET $URL/api/v1.0/workflows -H "X-Auth-Token:$TOKEN"
HTTP/1.1 200 OK
content-type: application/json; charset=UTF-8
x-shipyard-req: 3ab4ccc6-b956-4c7a-9ae6-183c562d8297
[
{
"execution_date": "2017-10-09 21:18:56",
"end_date": null,
"workflow_id": "deploy_site__2017-10-09T21:18:56.000000",
"start_date": "2017-10-09 21:18:56.685999",
"external_trigger": true,
"dag_id": "deploy_site",
"state": "failed",
"run_id": "manual__2017-10-09T21:18:56"
},
{
"execution_date": "2017-10-09 21:19:03",
"end_date": null,
"workflow_id": "deploy_site__2017-10-09T21:19:03.000000",
"start_date": "2017-10-09 21:19:03.361522",
"external_trigger": true,
"dag_id": "deploy_site",
"state": "failed",
"run_id": "manual__2017-10-09T21:19:03"
}
...
]
```
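The optional since parameter described above narrows this window. A sketch
of such a request (the timestamp value is illustrative; the parameter name
matches the since query parameter read by the API handler):
```
curl -D - -X GET "$URL/api/v1.0/workflows?since=2017-10-01T00:00:00.000000" \
  -H "X-Auth-Token:$TOKEN"
```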
---
### /v1.0/workflows/{id}
### /v1.0/workflows/{workflow_id}
#### Entity Structure
An object representing the information available from airflow regarding a
DAG's execution.
```
{ TBD }
```
#### GET /v1.0/workflows/{id}
Further details of a particular scheduled DAG's output
Further details of a particular workflow's steps. All steps of all sub-dags
will be included in the list of steps, as well as a section indicating the
sub-dags for this parent workflow.
##### Responses
* 200 OK
##### Example
Note that sub_dags can be queried to restrict results to only that
sub-dag's steps, e.g. by using this as {workflow_id}:
deploy_site.preflight.armada_preflight_check__2017-10-09T21:19:03.000000
```
curl -D - \
-X GET $URL/api/v1.0/workflows/deploy_site__2017-10-09T21:19:03.000000 \
-H "X-Auth-Token:$TOKEN"
HTTP/1.1 200 OK
content-type: application/json; charset=UTF-8
x-shipyard-req: 98d71530-816a-4692-9df2-68f22c057467
{
"execution_date": "2017-10-09 21:19:03",
"end_date": null,
"workflow_id": "deploy_site__2017-10-09T21:19:03.000000",
"start_date": "2017-10-09 21:19:03.361522",
"external_trigger": true,
"steps": [
{
"end_date": "2017-10-09 21:19:14.916220",
"task_id": "action_xcom",
"start_date": "2017-10-09 21:19:14.798053",
"duration": 0.118167,
"queued_dttm": "2017-10-09 21:19:08.432582",
"try_number": 1,
"state": "success",
"operator": "PythonOperator",
"dag_id": "deploy_site",
"execution_date": "2017-10-09 21:19:03"
},
{
"end_date": "2017-10-09 21:19:25.283785",
"task_id": "dag_concurrency_check",
"start_date": "2017-10-09 21:19:25.181492",
"duration": 0.102293,
"queued_dttm": "2017-10-09 21:19:19.283132",
"try_number": 1,
"state": "success",
"operator": "ConcurrencyCheckOperator",
"dag_id": "deploy_site",
"execution_date": "2017-10-09 21:19:03"
},
{
"end_date": "2017-10-09 21:20:05.394677",
"task_id": "preflight",
"start_date": "2017-10-09 21:19:34.994775",
"duration": 30.399902,
"queued_dttm": "2017-10-09 21:19:28.449848",
"try_number": 1,
"state": "failed",
"operator": "SubDagOperator",
"dag_id": "deploy_site",
"execution_date": "2017-10-09 21:19:03"
},
...
],
"dag_id": "deploy_site",
"state": "failed",
"run_id": "manual__2017-10-09T21:19:03",
"sub_dags": [
{
"execution_date": "2017-10-09 21:19:03",
"end_date": null,
"workflow_id": "deploy_site.preflight__2017-10-09T21:19:03.000000",
"start_date": "2017-10-09 21:19:35.082479",
"external_trigger": false,
"dag_id": "deploy_site.preflight",
"state": "failed",
"run_id": "backfill_2017-10-09T21:19:03"
},
...,
{
"execution_date": "2017-10-09 21:19:03",
"end_date": null,
"workflow_id": "deploy_site.preflight.armada_preflight_check__2017-10-09T21:19:03.000000",
"start_date": "2017-10-09 21:19:48.265023",
"external_trigger": false,
"dag_id": "deploy_site.preflight.armada_preflight_check",
"state": "failed",
"run_id": "backfill_2017-10-09T21:19:03"
}
]
}
```
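The same drill-down can be scripted. A minimal client-side sketch, assuming
the requests library and the URL/TOKEN placeholders from the curl examples
above (nothing here is part of the change itself):
```
# Illustrative drill-down from the workflow list to per-workflow steps.
import os
import requests

url = os.environ['URL']                         # Shipyard API root
headers = {'X-Auth-Token': os.environ['TOKEN']}

# List recent workflows; 'since' is the optional ISO 8601 boundary.
workflows = requests.get(
    url + '/api/v1.0/workflows',
    params={'since': '2017-10-01T00:00:00.000000'},
    headers=headers
).json()

# Each workflow_id can be used to fetch that workflow's steps and sub_dags.
for wf in workflows:
    detail = requests.get(
        url + '/api/v1.0/workflows/' + wf['workflow_id'],
        headers=headers
    ).json()
    print(wf['workflow_id'], wf['state'], len(detail.get('steps', [])))
```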

View File

@@ -13,6 +13,7 @@
# limitations under the License.
alembic==0.9.5
arrow==0.10.0
configparser==3.5.0
falcon==1.2.0
jsonschema==2.6.0

View File

@@ -173,7 +173,7 @@ class ActionsResource(BaseResource):
def get_all_actions_db(self):
"""
Wrapper for call to the shipyard database to get all actions
:returns: a dictionary of dictionaries keyed by action id
:returns: a list of dictionaries keyed by action id
"""
return SHIPYARD_DB.get_all_submitted_actions()
@@ -192,8 +192,7 @@ class ActionsResource(BaseResource):
def get_all_dag_runs_db(self):
"""
Wrapper for call to the airflow db to get all dag runs
:returns: a dictionary of dictionaries keyed by dag_id and
execution_date
:returns: a list of dictionaries representing dag runs in airflow
"""
return AIRFLOW_DB.get_all_dag_runs()

View File

@@ -0,0 +1,188 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helper class for workflows/inquiry API classes, encapsulating data access
"""
import logging
import arrow
from arrow.parser import ParserError
from shipyard_airflow.db.db import AIRFLOW_DB
LOG = logging.getLogger(__name__)
class WorkflowHelper(object):
"""
WorkflowHelper provides a layer to represent encapsulation of data
access for the Airflow Monitoring API.
A new WorkflowHelper is intended to be used for each invocation
of the service, so as to keep the context_marker associated
with the invoking user.
"""
def __init__(self, context_marker):
""" Sets up this helper with the supplied context marker
:param context_marker: a UUID marker used for correlation
"""
self.context_marker = context_marker
@staticmethod
def _add_id_to_run(dag_run):
        # generates the workflow_id value for the dag_run and returns it
time_string = None
if dag_run.get('execution_date') is not None:
dag_exec_tm = arrow.get(dag_run.get('execution_date'))
time_string = (dag_exec_tm.format('YYYY-MM-DDTHH:mm:ss.SSSSSS'))
dag_run['workflow_id'] = '{}__{}'.format(
dag_run.get('dag_id'),
time_string
)
return dag_run
@staticmethod
def _split_workflow_id_to_dag_run(workflow_id):
# translates standard {dag_id}__{execution_date} format to a
# dictionary:
# {'dag_id': <dag_id>, 'execution_date':<execution_date> }
if '__' in workflow_id:
dag_id, execution_date = workflow_id.rsplit('__', 1)
return {
'dag_id': dag_id,
'execution_date': execution_date
}
return {}
@staticmethod
def validate_workflow_id(workflow_id):
"""
checks that the workflow_id contains the separator '__' to pass
        minimal identification as a valid workflow_id:
{dag_id}__{execution_date}
"""
return workflow_id is not None and '__' in workflow_id
@staticmethod
def _get_threshold_date(since_iso8601=None):
# generates the threshold date from the input. Defaults to
# 30 days prior to UTC now.
threshold_date = None
if since_iso8601 is None:
threshold_date = arrow.utcnow().shift(days=-30)
else:
try:
threshold_date = arrow.get(since_iso8601)
except ParserError as parser_err:
LOG.error(
'Unable to parse date from %s. Error: %s, defaulting to '
'now minus 30 days',
since_iso8601,
str(parser_err)
)
threshold_date = arrow.utcnow().shift(days=-30)
return threshold_date.naive
def get_workflow_list(self, since_iso8601=None):
"""
Returns all top level workflows, filtering by the supplied
since_iso8601 value.
:param since_iso8601: An iso8601 format specifying a date
boundary. Optional, defaults to 30 days prior
:returns: a list of workflow dictionaries
"""
threshold_date = WorkflowHelper._get_threshold_date(
since_iso8601=since_iso8601
)
dag_runs = self._get_all_dag_runs_db()
        # include only dags that meet the date criteria and are not subdags;
        # by convention, subdags are named parent.child
return [
WorkflowHelper._add_id_to_run(run) for run in dag_runs
if ('.' not in run.get('dag_id') and
arrow.get(run.get('execution_date')).naive >= threshold_date)
]
def get_workflow(self, workflow_id):
"""
        Retrieves the workflow (DAG) and associated steps from airflow
        using a key derived from workflow_id
:returns: the workflow dictionary or {} if it cannot be found
"""
dag_info = WorkflowHelper._split_workflow_id_to_dag_run(workflow_id)
# input should have been validated previously.
if not dag_info:
return {}
workflow_list = self._get_dag_run_like_id_db(**dag_info)
if not workflow_list:
return {}
        # workflow list contains the parent and all child dags; sorting by
        # dag_id puts the parent first (children are named parent.child)
workflow_list.sort(key=lambda workflow: workflow.get('dag_id'))
# add the workflow id to each one
for workflow_item in workflow_list:
WorkflowHelper._add_id_to_run(workflow_item)
# extract the parent workflow, and add the rest as children.
workflow = workflow_list[0]
workflow['sub_dags'] = workflow_list[1:]
workflow['steps'] = self._get_tasks_list(workflow_id)
return workflow
def _get_tasks_list(self, workflow_id):
"""
Returns the task list for a specified dag run derived from the
workflow_id (dag_id, execution_date)
"""
# NOTE (bryan-strassner) This currently is just a simple passthrough,
# but is anticipated to do more manipulation of
# the data in the future.
dag_info = WorkflowHelper._split_workflow_id_to_dag_run(workflow_id)
# input should have been validated previously.
if not dag_info:
return []
return self._get_tasks_by_id_db(**dag_info)
def _get_all_dag_runs_db(self):
"""
        Wrapper for call to the airflow database to get all dag runs
:returns: a list of dag_runs dictionaries
"""
return AIRFLOW_DB.get_all_dag_runs()
def _get_dag_run_like_id_db(self, dag_id, execution_date):
"""
        Wrapper for call to the airflow database to get a dag run and
        its sub-dag runs by id
"""
return AIRFLOW_DB.get_dag_runs_like_id(
dag_id=dag_id, execution_date=execution_date
)
def _get_tasks_by_id_db(self, dag_id, execution_date):
"""
Wrapper for call to the airflow db to get all tasks
:returns: a list of task dictionaries
"""
return AIRFLOW_DB.get_tasks_by_id(
dag_id=dag_id,
execution_date=execution_date
)
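As an illustration of the id handling above, a short round trip through the
static helpers defined in this file (the values are examples only):
```
# Round trip between dag_run fields and the derived workflow_id.
from shipyard_airflow.control.af_monitoring.workflow_helper import (
    WorkflowHelper
)

run = {'dag_id': 'deploy_site',
       'execution_date': '2017-10-09T21:19:03.000000'}

# _add_id_to_run composes {dag_id}__{execution_date}
tagged = WorkflowHelper._add_id_to_run(dict(run))
assert tagged['workflow_id'] == 'deploy_site__2017-10-09T21:19:03.000000'

# _split_workflow_id_to_dag_run splits on the last '__'
parts = WorkflowHelper._split_workflow_id_to_dag_run(tagged['workflow_id'])
assert parts == {'dag_id': 'deploy_site',
                 'execution_date': '2017-10-09T21:19:03.000000'}

# validate_workflow_id only requires the '__' separator to be present
assert WorkflowHelper.validate_workflow_id(tagged['workflow_id'])
assert not WorkflowHelper.validate_workflow_id('deploy_site')
```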

View File

@@ -0,0 +1,104 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
from oslo_config import cfg
from shipyard_airflow import policy
from shipyard_airflow.control.base import BaseResource
from shipyard_airflow.control.af_monitoring.workflow_helper import (
WorkflowHelper
)
from shipyard_airflow.errors import ApiError
CONF = cfg.CONF
class WorkflowResource(BaseResource):
"""
API handler for /workflows invocations
/api/v1.0/workflows
"""
@policy.ApiEnforcer('workflow_orchestrator:list_workflows')
def on_get(self, req, resp):
"""
        Return workflows (DAGs) that have been invoked in Airflow.
:returns: a json array of workflow entities
"""
since_date = req.params.get('since')
helper = WorkflowHelper(req.context.external_marker)
resp.body = self.to_json(
self.get_all_workflows(helper=helper, since_date=since_date)
)
resp.status = falcon.HTTP_200
self.info(req.context, 'response data is %s' % resp.body)
def get_all_workflows(self, helper, since_date=None):
"""
Retrieve all workflows from airflow that have occurred,
using the since_date (iso8601) as a boundary.
:param helper: The WorkflowHelper constructed for this invocation
:param since_date: a Date string in iso8601 or None
:returns: a list of workflow dictionaries
"""
return helper.get_workflow_list(since_iso8601=since_date)
class WorkflowIdResource(BaseResource):
"""
API handler for /workflows/{workflow_id} invocations
/api/v1/workflows/{workflow_id}
"""
@policy.ApiEnforcer('workflow_orchestrator:get_workflow')
def on_get(self, req, resp, workflow_id):
"""
Retrieve the step details of workflows invoked in Airflow.
:returns: a json object of a workflow entity
"""
helper = WorkflowHelper(req.context.external_marker)
resp.body = self.to_json(
self.get_workflow_detail(helper=helper, workflow_id=workflow_id)
)
resp.status = falcon.HTTP_200
self.info(req.context, 'response data is %s' % resp.body)
def get_workflow_detail(self, helper, workflow_id):
"""
        Retrieve a workflow by id.
        :param helper: The WorkflowHelper constructed for this invocation
        :param workflow_id: a string in {dag_id}__{execution_date} format
                            identifying a workflow
:returns: a workflow detail dictionary including steps
"""
if not WorkflowHelper.validate_workflow_id(workflow_id):
raise ApiError(
title='Invalid Workflow ID specified',
                description=(
                    'Workflow id must use {dag id}__{execution date} format'
                ),
status=falcon.HTTP_400,
retry=False,
)
workflow = helper.get_workflow(workflow_id=workflow_id)
        if not workflow:  # get_workflow returns {} when nothing is found
raise ApiError(
title='Workflow not found',
                description=(
                    'A Workflow with id {} was not found'.format(workflow_id)
                ),
status=falcon.HTTP_404,
retry=False,
)
return workflow

View File

@@ -21,6 +21,8 @@ from shipyard_airflow.control.actions_id_api import ActionsIdResource
from shipyard_airflow.control.actions_steps_id_api import ActionsStepsResource
from shipyard_airflow.control.actions_validations_id_api import \
ActionsValidationsResource
from shipyard_airflow.control.af_monitoring.workflows_api import \
WorkflowIdResource, WorkflowResource
from shipyard_airflow.control.base import BaseResource, ShipyardRequest
from shipyard_airflow.control.configdocs_api import (CommitConfigDocsResource,
ConfigDocsResource)
@@ -62,6 +64,8 @@ def start_api():
('/configdocs/{collection_id}', ConfigDocsResource()),
('/commitconfigdocs', CommitConfigDocsResource()),
('/renderedconfigdocs', RenderedConfigDocsResource()),
('/workflows', WorkflowResource()),
('/workflows/{workflow_id}', WorkflowIdResource()),
]
# Set up the 1.0 routes
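For context, a hedged sketch of how (path, resource) tuples like these might
be registered with falcon under the versioned prefix; the actual start_api
wiring is outside this hunk, and register_v1_routes is a hypothetical helper:
```
import falcon

def register_v1_routes(api, v1_0_routes):
    # Prefix each relative path with the versioned API root.
    for path, resource in v1_0_routes:
        api.add_route('/api/v1.0' + path, resource)

# e.g. register_v1_routes(falcon.API(), [('/workflows', WorkflowResource())])
```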

View File

@@ -63,6 +63,25 @@ class AirflowDbAccess(DbAccess):
execution_date = :execution_date
''')
# The like parameter must have '%' appropriately applied to the args
# used to merge into this query.
SELECT_DAG_RUNS_LIKE_ID = sqlalchemy.sql.text('''
SELECT
"dag_id",
"execution_date",
"state",
"run_id",
"external_trigger",
"start_date",
"end_date"
FROM
dag_run
WHERE
dag_id LIKE :dag_id
AND
execution_date = :execution_date
''')
SELECT_ALL_TASKS = sqlalchemy.sql.text('''
SELECT
"task_id",
@@ -82,6 +101,8 @@ class AirflowDbAccess(DbAccess):
start_date
''')
# The like parameter must have '%' appropriately applied to the args
# used to merge into this query.
SELECT_TASKS_BY_ID = sqlalchemy.sql.text('''
SELECT
"task_id",
@@ -140,6 +161,16 @@ class AirflowDbAccess(DbAccess):
dag_id=dag_id,
execution_date=execution_date)
def get_dag_runs_like_id(self, dag_id, execution_date):
"""
        Retrieves dag runs for the parent and child dags, by the parent
        dag id and execution date
"""
return self.get_as_dict_array(
AirflowDbAccess.SELECT_DAG_RUNS_LIKE_ID,
dag_id=dag_id + '%',
execution_date=execution_date)
def get_all_tasks(self):
"""
Retrieves all tasks.
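To illustrate the matching used by get_dag_runs_like_id: appending '%' to
the parent dag_id makes the LIKE clause match the parent run and every
dotted sub-dag run. A minimal Python analogue of that pattern (illustration
only, not the SQL itself):
```
# dag_id LIKE 'deploy_site%' keeps the parent and its dotted sub-dags.
dag_ids = ['deploy_site',
           'deploy_site.preflight',
           'deploy_site.preflight.armada_preflight_check',
           'update_site']

pattern = 'deploy_site'  # the query appends '%' to this
matches = [d for d in dag_ids if d.startswith(pattern)]
assert matches == dag_ids[:3]  # 'update_site' is excluded
```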

View File

@@ -72,7 +72,7 @@ def format_error_resp(req,
# message list as well. In both cases, if the error flag is not
# set, set it appropriately.
if error_list is None:
error_list = [{'message': 'An error ocurred, but was not specified',
error_list = [{'message': 'An error occurred, but was not specified',
'error': True}]
else:
for error_item in error_list:

View File

@@ -136,6 +136,26 @@ class ShipyardPolicy(object):
'method': 'GET'
}]
),
policy.DocumentedRuleDefault(
'workflow_orchestrator:list_workflows',
RULE_ADMIN_REQUIRED,
('Retrieve the list of workflows (DAGs) that have been invoked '
'in Airflow, whether via Shipyard or scheduled'),
[{
'path': '/api/v1.0/workflows',
'method': 'GET'
}]
),
policy.DocumentedRuleDefault(
'workflow_orchestrator:get_workflow',
RULE_ADMIN_REQUIRED,
('Retrieve the detailed information for a workflow (DAG) from '
'Airflow'),
[{
'path': '/api/v1.0/workflows/{id}',
'method': 'GET'
}]
),
]
# Regions Policy
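These entries are standard oslo.policy DocumentedRuleDefault registrations.
A minimal sketch of how an enforcer consumes one of them (the
RULE_ADMIN_REQUIRED value and the credential contents here are assumptions
made for the sketch):
```
from oslo_config import cfg
from oslo_policy import policy

RULE_ADMIN_REQUIRED = 'role:admin'  # assumption for this sketch

rules = [
    policy.DocumentedRuleDefault(
        'workflow_orchestrator:list_workflows',
        RULE_ADMIN_REQUIRED,
        'Retrieve the list of workflows (DAGs) invoked in Airflow',
        [{'path': '/api/v1.0/workflows', 'method': 'GET'}]
    ),
]

enforcer = policy.Enforcer(cfg.CONF)
enforcer.register_defaults(rules)
# do_raise=True raises PolicyNotAuthorized when the check fails.
enforcer.authorize('workflow_orchestrator:list_workflows',
                   target={}, creds={'roles': ['admin']}, do_raise=True)
```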

View File

@@ -0,0 +1,65 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import patch
import pytest
from shipyard_airflow.control.af_monitoring.workflow_helper import (
WorkflowHelper
)
from shipyard_airflow.control.af_monitoring.workflows_api import (
WorkflowIdResource,
WorkflowResource
)
from shipyard_airflow.errors import ApiError
def test_get_all_workflows():
"""
test that get_all_workflows invokes the helper properly
"""
wr = WorkflowResource()
with patch.object(WorkflowHelper, 'get_workflow_list') as mock_method:
helper = WorkflowHelper('')
wr.get_all_workflows(helper, None)
mock_method.assert_called_once_with(since_iso8601=None)
def test_get_workflow_detail():
"""
test that get_workflow_detail properly invokes the helper
"""
wir = WorkflowIdResource()
with patch.object(WorkflowHelper, 'get_workflow') as mock_method:
helper = WorkflowHelper('')
wir.get_workflow_detail(helper,
'deploy_site__1972-04-03T10:00:00.20123')
mock_method.assert_called_once_with(
workflow_id='deploy_site__1972-04-03T10:00:00.20123'
)
with patch.object(WorkflowHelper, 'get_workflow') as mock_method:
helper = WorkflowHelper('')
with pytest.raises(ApiError):
wir.get_workflow_detail(helper, None)
with pytest.raises(ApiError):
wir.get_workflow_detail(helper, 'this is a bad id')
with pytest.raises(ApiError):
wir.get_workflow_detail(helper, 'dag_idTexecution_date')
with pytest.raises(ApiError):
wir.get_workflow_detail(helper, 'dag_id_execution_date')
wir.get_workflow_detail(helper, 'dag_id__execution_date')
mock_method.assert_called_once_with(workflow_id='dag_id__execution_date')

View File

@@ -0,0 +1,223 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
import arrow
from shipyard_airflow.control.af_monitoring.workflow_helper import (
WorkflowHelper
)
def test__add_id_to_run():
"""
static method _add_id_to_run
"""
assert WorkflowHelper._add_id_to_run(
{'dag_id': 'aardvark', 'execution_date': '1924-04-12T05:34:01.222220'}
)['workflow_id'] == 'aardvark__1924-04-12T05:34:01.222220'
assert WorkflowHelper._add_id_to_run(
{}
)['workflow_id'] == 'None__None'
def test__split_workflow_id_to_dag_run():
"""
static method _split_workflow_id_to_dag_run
"""
assert WorkflowHelper._split_workflow_id_to_dag_run(
'aardvark__1924-04-12T05:34:01.222220'
)['dag_id'] == 'aardvark'
assert WorkflowHelper._split_workflow_id_to_dag_run(
'aar__dvark__1924-04-12T05:34:01.222220'
)['dag_id'] == 'aar__dvark'
assert WorkflowHelper._split_workflow_id_to_dag_run(
'aardvark__1924-04-12T05:34:01.222220'
)['execution_date'] == '1924-04-12T05:34:01.222220'
assert WorkflowHelper._split_workflow_id_to_dag_run(
'aar__dvark__1924-04-12T05:34:01.222220'
)['execution_date'] == '1924-04-12T05:34:01.222220'
def test_validate_workflow_id():
"""
static method validate_workflow_id
"""
assert WorkflowHelper.validate_workflow_id(
'aar__dvark__1924-04-12T05:34:01.222220'
)
assert not WorkflowHelper.validate_workflow_id(
'aardvark_1924-04-12T05:34:01.222220'
)
assert not WorkflowHelper.validate_workflow_id(
None
)
def test__get_threshold_date():
"""
static method _get_threshold_date
"""
assert (
arrow.utcnow().shift(days=-30).naive <=
WorkflowHelper._get_threshold_date('turnip') <=
arrow.utcnow().shift(days=-30).naive
)
assert (
arrow.utcnow().shift(days=-30).naive <=
WorkflowHelper._get_threshold_date(None) <=
arrow.utcnow().shift(days=-30).naive
)
assert (
WorkflowHelper._get_threshold_date('2017-10-09T10:00:00.000000') ==
arrow.get('2017-10-09T10:00:00.00000').naive
)
DATE_ONE = datetime(2017, 9, 13, 11, 13, 3, 57000)
DATE_TWO = datetime(2017, 9, 13, 11, 13, 5, 57000)
DAG_RUN_1 = {
'dag_id': 'did2',
'execution_date': DATE_ONE,
'state': 'FAILED',
'run_id': '99',
'external_trigger': 'something',
'start_date': DATE_ONE,
'end_date': DATE_ONE
}
def test_get_workflow_list():
"""
Tests the get_workflow_list method
"""
helper = WorkflowHelper('')
helper._get_all_dag_runs_db = lambda: [DAG_RUN_1, DAG_RUN_1, DAG_RUN_1]
# Time includes items
dag_list = helper.get_workflow_list(
since_iso8601='2017-09-13T11:12:00.000000'
)
assert DAG_RUN_1 in dag_list
assert len(dag_list) == 3
# Time excludes items
dag_list = helper.get_workflow_list(
since_iso8601='2017-10-01T11:12:00.000000'
)
assert DAG_RUN_1 not in dag_list
assert len(dag_list) == 0
TASK_LIST = [
{
'task_id': '1a',
'dag_id': 'did2',
'execution_date': DATE_ONE,
'state': 'SUCCESS',
'run_id': '12345',
'external_trigger': 'something',
'start_date': DATE_ONE,
'end_date': DATE_ONE,
'duration': '20mins',
'try_number': '1',
'operator': 'smooth',
'queued_dttm': DATE_ONE
},
{
'task_id': '1b',
'dag_id': 'did2',
'execution_date': DATE_ONE,
'state': 'SUCCESS',
'run_id': '12345',
'external_trigger': 'something',
'start_date': DATE_TWO,
'end_date': DATE_TWO,
'duration': '1minute',
'try_number': '1',
'operator': 'smooth',
'queued_dttm': DATE_ONE
},
{
'task_id': '1c',
'dag_id': 'did2',
'execution_date': DATE_ONE,
'state': 'FAILED',
'run_id': '12345',
'external_trigger': 'something',
'start_date': DATE_TWO,
'end_date': DATE_TWO,
'duration': '1day',
'try_number': '3',
'operator': 'smooth',
'queued_dttm': DATE_TWO
}
]
def test_get_workflow():
"""
Tests the get_workflow method
"""
helper = WorkflowHelper('')
helper._get_dag_run_like_id_db = lambda dag_id, execution_date: [DAG_RUN_1]
helper._get_tasks_by_id_db = lambda dag_id, execution_date: TASK_LIST
dag_detail = helper.get_workflow(
workflow_id='dag_id__1957-03-14T12:12:12.000000'
)
assert dag_detail['dag_id'] == 'did2'
assert len(dag_detail['steps']) == 3
dag_detail = helper.get_workflow(
workflow_id='NOTHING'
)
assert dag_detail == {}
DAG_RUN_SUB = {
'dag_id': 'did2.didnt',
'execution_date': DATE_ONE,
'state': 'FAILED',
'run_id': '99',
'external_trigger': 'something',
'start_date': DATE_ONE,
'end_date': DATE_ONE
}
def test_get_workflow_subords():
"""
    Tests the get_workflow method when sub-dags are present
"""
helper = WorkflowHelper('')
helper._get_dag_run_like_id_db = (
lambda dag_id, execution_date: [DAG_RUN_SUB, DAG_RUN_1]
)
helper._get_tasks_by_id_db = lambda dag_id, execution_date: TASK_LIST
dag_detail = helper.get_workflow(
workflow_id='dag_id__1957-03-14T12:12:12.000000'
)
assert dag_detail['dag_id'] == 'did2'
assert len(dag_detail['sub_dags']) == 1
assert dag_detail['sub_dags'][0]['dag_id'] == 'did2.didnt'
assert len(dag_detail['steps']) == 3