Fix 404 on log retrieval for retries of a step

Retries of a step may execute on different workers.
Each worker only exposes the logs that were generated by that worker.
When assembling the log-retrieval URL for a step, we must include
the retry number in the step lookup so that the step record — and
its worker hostname — for the correct attempt is used.

Updated ActionsHelper.get_step to use try_number.
The test is included.

Change-Id: I2f8cf9ed70ce344f53ecdcc4edbec924ba6a00c9
This commit is contained in:
skovaleff 2018-09-10 19:36:26 -07:00
parent e022b0178d
commit 9f453dd22b
3 changed files with 151 additions and 3 deletions

View File

@ -64,7 +64,7 @@ class ActionsStepsLogsResource(BaseResource):
self.actions_helper = ActionsHelper(action_id=action_id)
# Retrieve step
step = self.actions_helper.get_step(step_id)
step = self.actions_helper.get_step(step_id, try_number)
# Retrieve Dag ID
dag_id = step['dag_id']

View File

@ -145,15 +145,23 @@ class ActionsHelper(object):
return steps
def get_step(self, step_id):
def get_step(self, step_id, try_number=None):
"""
:param step_id: Step ID - task_id in db
:param try_number: Number of try - try_number in db
returns: Step
"""
# Retrieve step. Note that we will get a list and it will
# be the content of step[0]
step_list = [x for x in
self._get_all_steps()
if step_id == x['task_id']]
if step_id == x['task_id'] and
(try_number is None or try_number == x['try_number'])
]
# try_number is needed to get correct task from correct worker
# the worker host for request URL
# is referenced in correct task's 'hostname' field
if not step_list:
raise ApiError(

View File

@ -14,6 +14,7 @@
""" Tests for the action_helper.py module """
from shipyard_airflow.control.helpers import action_helper
import yaml
def test_determine_lifecycle():
@ -28,3 +29,142 @@ def test_determine_lifecycle():
for status_pair in dag_statuses:
assert(status_pair['expected'] ==
action_helper.determine_lifecycle(status_pair['input']))
def test_get_step():
    """Exercise ActionsHelper.get_step with and without try_number.

    Retries of a step may execute on different workers, and each worker
    only serves the logs it generated, so get_step must honor try_number
    and return the row whose 'hostname' field references the worker that
    ran that particular attempt.  try_number=None (or omitted) keeps the
    pre-existing behavior of matching on task_id alone.
    """
    # Set up actions helper
    action_id = '01CPV581B0CM8C9CA0CFRNVPPY'  # id in db
    actions = yaml.safe_load("""
---
- id: 01CPV581B0CM8C9CA0CFRNVPPY
  name: update_software
  parameters: {}
  dag_id: update_software
  dag_execution_date: 2018-09-07T23:18:04
  user: admin
  datetime: 2018-09-07 23:18:04.38546+00
  context_marker: 10447c79-b02c-4dfd-a9e8-1362842f029d
...
""")
    # Two attempts of armada_get_status ran on different workers
    # (try_number 1 on worker-0, try_number 2 on worker-1), which is the
    # scenario the try_number parameter exists to disambiguate.
    tasks = yaml.safe_load("""
---
- task_id: armada_get_status
  dag_id: update_software.armada_build
  execution_date: 2018-09-07 23:18:04
  start_date: 2018-09-07 23:18:55.950298
  end_date: 2018-09-07 23:18:58.159597
  duration: 2.209299
  state: success
  try_number: 1
  hostname: airflow-worker-0.airflow-worker-discovery.ucp.svc.cluster.local
  unixname: airflow
  job_id: 11
  pool:
  queue: default
  priority_weight: 3
  operator: ArmadaGetStatusOperator
  queued_dttm:
  pid: 249
  max_tries: 0
- task_id: armada_get_status
  dag_id: update_software.armada_build
  execution_date: 2018-09-07 23:18:04
  start_date: 2018-09-07 23:18:55.950298
  end_date: 2018-09-07 23:18:58.159597
  duration: 2.209299
  state: success
  try_number: 2
  hostname: airflow-worker-1.airflow-worker-discovery.ucp.svc.cluster.local
  unixname: airflow
  job_id: 12
  pool:
  queue: default
  priority_weight: 3
  operator: ArmadaGetStatusOperator
  queued_dttm:
  pid: 249
  max_tries: 0
- task_id: armada_post_apply
  dag_id: update_software.armada_build
  execution_date: 2018-09-07 23:18:04
  start_date: 2018-09-07 23:48:25.884615
  end_date: 2018-09-07 23:48:50.552757
  duration: 24.668142
  state: success
  try_number: 2
  hostname: airflow-worker-0.airflow-worker-discovery.ucp.svc.cluster.local
  unixname: airflow
  job_id: 13
  pool:
  queue: default
  priority_weight: 2
  operator: ArmadaPostApplyOperator
  queued_dttm:
  pid: 329
  max_tries: 3
- task_id: armada_get_releases
  dag_id: update_software.armada_build
  execution_date: 2018-09-07 23:18:04
  start_date: 2018-09-07 23:48:59.024696
  end_date: 2018-09-07 23:49:01.471963
  duration: 2.447267
  state: success
  try_number: 1
  hostname: airflow-worker-0.airflow-worker-discovery.ucp.svc.cluster.local
  unixname: airflow
  job_id: 14
  pool:
  queue: default
  priority_weight: 1
  operator: ArmadaGetReleasesOperator
  queued_dttm:
  pid: 365
  max_tries: 0
- task_id: armada_build
  dag_id: update_software
  execution_date: 2018-09-07 23:18:04
  start_date: 2018-09-07 23:18:47.447987
  end_date: 2018-09-07 23:49:02.397515
  duration: 1814.949528
  state: success
  try_number: 1
  hostname: airflow-worker-0.airflow-worker-discovery.ucp.svc.cluster.local
  unixname: airflow
  job_id: 9
  pool:
  queue: default
  priority_weight: 5
  operator: SubDagOperator
  queued_dttm: 2018-09-07 23:18:45.772501
  pid: 221
  max_tries: 0
...
""")
    # Stub the DB accessors at class level so no real database is needed;
    # save the originals so they can be restored and the stubs do not
    # leak into other tests in the session.
    orig_get_action_db = action_helper.ActionsHelper._get_action_db
    orig_get_tasks_db = action_helper.ActionsHelper._get_tasks_db
    action_helper.ActionsHelper._get_action_db = (
        lambda self, action_id: actions[0])
    action_helper.ActionsHelper._get_tasks_db = (
        lambda self, dag_id, execution_date: tasks)
    try:
        actions_helper = action_helper.ActionsHelper(action_id=action_id)
        # Retrieve step
        step_id = 'armada_get_status'  # task_id in db
        # test backward compatibility with no additional param
        step = actions_helper.get_step(step_id)
        assert step['hostname'].startswith('airflow-worker-0')
        # test explicit None
        step = actions_helper.get_step(step_id, None)
        assert step['hostname'].startswith('airflow-worker-0')
        # test try_number associated with 0 worker
        step = actions_helper.get_step(step_id, 1)
        assert step['hostname'].startswith('airflow-worker-0')
        # test try_number associated with 1 worker
        step = actions_helper.get_step(step_id, 2)
        assert step['hostname'].startswith('airflow-worker-1')
    finally:
        # Restore the real accessors regardless of assertion outcome.
        action_helper.ActionsHelper._get_action_db = orig_get_action_db
        action_helper.ActionsHelper._get_tasks_db = orig_get_tasks_db