diff --git a/charts/shipyard/values.yaml b/charts/shipyard/values.yaml index 87e20ef7..7e6a1720 100644 --- a/charts/shipyard/values.yaml +++ b/charts/shipyard/values.yaml @@ -380,10 +380,15 @@ conf: auth_version: v3 memcache_security_strategy: ENCRYPT requests_config: + airflow_log_connect_timeout: 5 + airflow_log_read_timeout: 300 deckhand_client_connect_timeout: 5 deckhand_client_read_timeout: 300 validation_connect_timeout: 5 validation_read_timeout: 300 + airflow: + worker_endpoint_scheme: 'http' + worker_port: 8793 airflow_config_file: path: /usr/local/airflow/airflow.cfg airflow: diff --git a/etc/shipyard/policy.yaml.sample b/etc/shipyard/policy.yaml.sample index 3d924a92..3de17a82 100644 --- a/etc/shipyard/policy.yaml.sample +++ b/etc/shipyard/policy.yaml.sample @@ -17,6 +17,10 @@ # GET /api/v1.0/actions/{action_id}/steps/{step_id} #"workflow_orchestrator:get_action_step": "rule:admin_required" +# Retreive the logs of an action step by its id +# GET /api/v1.0/actions/{action_id}/steps/{step_id}/logs +#"workflow_orchestrator:get_action_step_logs": "rule:admin_required" + # Retreive an action validation by its id # GET /api/v1.0/actions/{action_id}/validations/{validation_id} #"workflow_orchestrator:get_action_validation": "rule:admin_required" diff --git a/etc/shipyard/shipyard.conf.sample b/etc/shipyard/shipyard.conf.sample index 67fbb874..a21f2ba6 100644 --- a/etc/shipyard/shipyard.conf.sample +++ b/etc/shipyard/shipyard.conf.sample @@ -1,6 +1,19 @@ [DEFAULT] +[airflow] + +# +# From shipyard_airflow +# + +# Airflow worker url scheme (string value) +#worker_endpoint_scheme = 'http' + +# Airflow worker port (integer value) +#worker_port = 8793 + + [armada] # @@ -253,6 +266,12 @@ # From shipyard_airflow # +# Airflow logs retrieval connect timeout (in seconds) (integer value) +#airflow_log_connect_timeout = 5 + +# Airflow logs retrieval timeout (in seconds) (integer value) +#airflow_log_read_timeout = 300 + # Deckhand client connect timeout (in seconds) (integer value) #deckhand_client_connect_timeout = 5 diff --git a/shipyard_airflow/conf/config.py b/shipyard_airflow/conf/config.py index 26bede24..fbb25ba8 100644 --- a/shipyard_airflow/conf/config.py +++ b/shipyard_airflow/conf/config.py @@ -150,6 +150,16 @@ SECTIONS = [ name='requests_config', title='Requests Configuration', options=[ + cfg.IntOpt( + 'airflow_log_connect_timeout', + default=5, + help='Airflow logs retrieval connect timeout (in seconds)' + ), + cfg.IntOpt( + 'airflow_log_read_timeout', + default=300, + help='Airflow logs retrieval timeout (in seconds)' + ), cfg.IntOpt( 'deckhand_client_connect_timeout', default=5, @@ -175,6 +185,22 @@ SECTIONS = [ ), ] ), + ConfigSection( + name='airflow', + title='Airflow connection info', + options=[ + cfg.StrOpt( + 'worker_endpoint_scheme', + default='http', + help='Airflow worker url scheme' + ), + cfg.IntOpt( + 'worker_port', + default=8793, + help='Airflow worker port' + ), + ] + ), ] diff --git a/shipyard_airflow/control/action/action_helper.py b/shipyard_airflow/control/action/action_helper.py index 0dfca79c..02bea2a5 100644 --- a/shipyard_airflow/control/action/action_helper.py +++ b/shipyard_airflow/control/action/action_helper.py @@ -12,6 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. """ Common methods for use by action api classes as necessary """ +from datetime import datetime +import falcon +import logging + +from shipyard_airflow.db.db import AIRFLOW_DB, SHIPYARD_DB +from shipyard_airflow.errors import ApiError + + +LOG = logging.getLogger(__name__) + DAG_STATE_MAPPING = { 'QUEUED': 'Pending', @@ -59,3 +69,159 @@ def format_step(action_id, step, index): 'id': step.get('task_id'), 'index': index } + + +class ActionsHelper(object): + """ + A helper class for Shipyard actions + """ + def __init__(self, action_id): + """Initialization of ActionsHelper object. + :param action_id: Shipyard action ID + + """ + self.action_id = action_id + self.action = self._get_action_info() + + LOG.debug("Initialized ActionHelper for action_id: %s", self.action_id) + + def _get_action_info(self): + """ + :returns: Action Information + """ + # Retrieve information related to the action id + action = self._get_action_db(self.action_id) + + if not action: + raise ApiError( + title='Action Not Found!', + description='Unknown Action {}'.format(self.action_id), + status=falcon.HTTP_404) + else: + return action + + def _get_dag_info(self): + """ + returns: Dag Information + """ + # Retrieve 'dag_id' and 'dag_execution_date' + dag_id = self.action.get('dag_id') + dag_execution_date = self.action.get('dag_execution_date') + + if not dag_id: + raise ApiError( + title='Dag ID Not Found!', + description='Unable to retrieve Dag ID', + status=falcon.HTTP_404) + elif not dag_execution_date: + raise ApiError( + title='Execution Date Not Found!', + description='Unable to retrieve Execution Date', + status=falcon.HTTP_404) + else: + return dag_id, dag_execution_date + + def _get_all_steps(self): + """ + returns: All steps for the action ID + """ + # Retrieve dag_id and dag_execution_date + dag_id, dag_execution_date = self._get_dag_info() + + # Retrieve the action steps + steps = self._get_tasks_db(dag_id, dag_execution_date) + + if not steps: + raise ApiError( + title='Steps Not Found!', + description='Unable to retrieve Information on Steps', + status=falcon.HTTP_404) + else: + LOG.debug("%s steps found for action %s", + len(steps), self.action_id) + LOG.debug("The steps for action %s are as follows:", + self.action_id) + LOG.debug(steps) + + return steps + + def get_step(self, step_id): + """ + returns: Step + """ + # Retrieve step. Note that we will get a list and it will + # be the content of step[0] + step_list = [x for x in + self._get_all_steps() + if step_id == x['task_id']] + + if not step_list: + raise ApiError( + title='Step Not Found!', + description='Unable to retrieve Step', + status=falcon.HTTP_404) + else: + step = step_list[0] + LOG.debug("Step Located:") + LOG.debug(step) + + return step + + @staticmethod + def get_formatted_dag_execution_date(step): + """ + :returns: dag_execution_date + """ + strftime = datetime.strftime + return step['execution_date'].strftime("%Y-%m-%dT%H:%M:%S") + + @staticmethod + def parse_action_id(**kwargs): + """ + Retreive action_id from the input parameters + Action_id must be a 26 character ulid. + """ + action_id = kwargs.get('action_id') + if action_id is None or not len(action_id) == 26: + raise ApiError( + title='Invalid Action ID!', + description='An invalid action ID was specified', + status=falcon.HTTP_400) + + LOG.debug("Action ID parsed: %s", action_id) + + return action_id + + @staticmethod + def parse_step_id(**kwargs): + """ + Retreive step_id from the input parameters + """ + step_id = kwargs.get('step_id') + if step_id is None: + raise ApiError( + title='Missing Step ID!', + description='Missing Step ID', + status=falcon.HTTP_400) + + LOG.debug("Step ID parsed: %s", step_id) + + return step_id + + @staticmethod + def _get_action_db(action_id): + """ + Wrapper for call to the shipyard database to get an action + :returns: a dictionary of action details. + """ + return SHIPYARD_DB.get_action_by_id( + action_id=action_id) + + @staticmethod + def _get_tasks_db(dag_id, execution_date): + """ + Wrapper for call to the airflow db to get all tasks for a dag run + :returns: a list of task dictionaries + """ + return AIRFLOW_DB.get_tasks_by_id( + dag_id=dag_id, execution_date=execution_date) diff --git a/shipyard_airflow/control/action/actions_steps_id_logs_api.py b/shipyard_airflow/control/action/actions_steps_id_logs_api.py new file mode 100644 index 00000000..eb3a0275 --- /dev/null +++ b/shipyard_airflow/control/action/actions_steps_id_logs_api.py @@ -0,0 +1,133 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import falcon +import logging +import os +import requests + +from oslo_config import cfg + +from shipyard_airflow import policy +from shipyard_airflow.control.action.action_helper import ActionsHelper +from shipyard_airflow.control.base import BaseResource + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + + +class ActionsStepsLogsResource(BaseResource): + """ + The actions steps logs resource retrieves the logs for a particular + step of an action. By default, it will retrieve the logs from the + last attempt. Note that a workflow step can retry multiple times with + the names of the logs as 1.log, 2.log, 3.log, etc. + + """ + @policy.ApiEnforcer('workflow_orchestrator:get_action_step_logs') + def on_get(self, req, resp, **kwargs): + """ + Returns the logs of an action step + :returns: logs of an action step + """ + # We will set the kwarg to 'try_number' as 'try' is a + # reserved keyword + try_number = req.get_param_as_int('try', + required=False) + + # Parse kwargs + action_id = ActionsHelper.parse_action_id(**kwargs) + step_id = ActionsHelper.parse_step_id(**kwargs) + + # Retrieve logs for the action step + resp.body = self.get_action_step_logs(action_id, + step_id, + try_number) + + resp.status = falcon.HTTP_200 + + def get_action_step_logs(self, action_id, step_id, try_number=None): + """ + Retrieve Airflow Logs + """ + # Set up actions helper + self.actions_helper = ActionsHelper(action_id=action_id) + + # Retrieve step + step = self.actions_helper.get_step(step_id) + + # Retrieve Dag ID + dag_id = step['dag_id'] + + # Generate Log Endpoint + log_endpoint = self.generate_log_endpoint(step, + dag_id, + step_id, + try_number) + + LOG.debug("Log endpoint url is: %s", log_endpoint) + + return self.retrieve_logs(log_endpoint) + + def generate_log_endpoint(self, step, dag_id, step_id, try_number): + """ + Retrieve Log Endpoint + """ + # Construct worker pod URL + scheme = CONF.airflow.worker_endpoint_scheme + worker_pod_fqdn = step['hostname'] + worker_pod_port = CONF.airflow.worker_port + worker_pod_url = "{}://{}:{}".format(scheme, + worker_pod_fqdn, + str(worker_pod_port)) + + # Define log_file + if try_number: + log_file = str(try_number) + '.log' + else: + log_file = str(step['try_number']) + '.log' + + # Define dag_execution_date + dag_execution_date = ( + self.actions_helper.get_formatted_dag_execution_date(step)) + + # Form logs query endpoint + log_endpoint = os.path.join(worker_pod_url, + 'log', + dag_id, + step_id, + dag_execution_date, + log_file) + + return log_endpoint + + @staticmethod + def retrieve_logs(log_endpoint): + """ + Retrieve Logs + """ + try: + LOG.debug("Retrieving Airflow logs...") + + response = requests.get( + log_endpoint, + timeout=( + CONF.requests_config.airflow_log_connect_timeout, + CONF.requests_config.airflow_log_read_timeout)) + + return response.text + + except requests.exceptions.RequestException as e: + LOG.info(e) + LOG.info("Unable to retrieve requested logs") + return [] diff --git a/shipyard_airflow/control/api.py b/shipyard_airflow/control/api.py index 86c7a6ca..7f204cfa 100644 --- a/shipyard_airflow/control/api.py +++ b/shipyard_airflow/control/api.py @@ -20,6 +20,8 @@ from shipyard_airflow.control.action.actions_control_api import \ from shipyard_airflow.control.action.actions_id_api import ActionsIdResource from shipyard_airflow.control.action.actions_steps_id_api import \ ActionsStepsResource +from shipyard_airflow.control.action.actions_steps_id_logs_api import \ + ActionsStepsLogsResource from shipyard_airflow.control.action.actions_validations_id_api import \ ActionsValidationsResource from shipyard_airflow.control.af_monitoring.workflows_api import ( @@ -65,6 +67,8 @@ def start_api(): ActionsControlResource()), ('/actions/{action_id}/steps/{step_id}', ActionsStepsResource()), + ('/actions/{action_id}/steps/{step_id}/logs', + ActionsStepsLogsResource()), ('/actions/{action_id}/validations/{validation_id}', ActionsValidationsResource()), ('/configdocs', ConfigDocsStatusResource()), diff --git a/shipyard_airflow/db/airflow_db.py b/shipyard_airflow/db/airflow_db.py index 900260b1..d0b357d2 100644 --- a/shipyard_airflow/db/airflow_db.py +++ b/shipyard_airflow/db/airflow_db.py @@ -35,26 +35,30 @@ class AirflowDbAccess(DbAccess): SELECT_ALL_DAG_RUNS = sqlalchemy.sql.text(''' SELECT + "id", "dag_id", "execution_date", "state", "run_id", "external_trigger", - "start_date", - "end_date" + "conf", + "end_date", + "start_date" FROM dag_run ''') SELECT_DAG_RUNS_BY_ID = sqlalchemy.sql.text(''' SELECT + "id", "dag_id", "execution_date", "state", "run_id", "external_trigger", - "start_date", - "end_date" + "conf", + "end_date", + "start_date" FROM dag_run WHERE @@ -67,13 +71,15 @@ class AirflowDbAccess(DbAccess): # used to merge into this query. SELECT_DAG_RUNS_LIKE_ID = sqlalchemy.sql.text(''' SELECT + "id", "dag_id", "execution_date", "state", "run_id", "external_trigger", - "start_date", - "end_date" + "conf", + "end_date", + "start_date" FROM dag_run WHERE @@ -92,8 +98,16 @@ class AirflowDbAccess(DbAccess): "duration", "state", "try_number", + "hostname", + "unixname", + "job_id", + "pool", + "queue", + "priority_weight", "operator", - "queued_dttm" + "queued_dttm", + "pid", + "max_tries" FROM task_instance ORDER BY @@ -113,8 +127,16 @@ class AirflowDbAccess(DbAccess): "duration", "state", "try_number", + "hostname", + "unixname", + "job_id", + "pool", + "queue", + "priority_weight", "operator", - "queued_dttm" + "queued_dttm", + "pid", + "max_tries" FROM task_instance WHERE diff --git a/shipyard_airflow/policy.py b/shipyard_airflow/policy.py index bcfc6106..4a22bf8e 100644 --- a/shipyard_airflow/policy.py +++ b/shipyard_airflow/policy.py @@ -64,7 +64,7 @@ class ShipyardPolicy(object): policy.DocumentedRuleDefault( 'workflow_orchestrator:get_action', RULE_ADMIN_REQUIRED, - 'Retreive an action by its id', + 'Retrieve an action by its id', [{ 'path': '/api/v1.0/actions/{action_id}', 'method': 'GET' @@ -73,16 +73,25 @@ class ShipyardPolicy(object): policy.DocumentedRuleDefault( 'workflow_orchestrator:get_action_step', RULE_ADMIN_REQUIRED, - 'Retreive an action step by its id', + 'Retrieve an action step by its id', [{ 'path': '/api/v1.0/actions/{action_id}/steps/{step_id}', 'method': 'GET' }] ), + policy.DocumentedRuleDefault( + 'workflow_orchestrator:get_action_step_logs', + RULE_ADMIN_REQUIRED, + 'Retrieve logs of an action step by its id', + [{ + 'path': '/api/v1.0/actions/{action_id}/steps/{step_id}/logs', + 'method': 'GET' + }] + ), policy.DocumentedRuleDefault( 'workflow_orchestrator:get_action_validation', RULE_ADMIN_REQUIRED, - 'Retreive an action validation by its id', + 'Retrieve an action validation by its id', [{ 'path': '/api/v1.0/actions/{action_id}/validations/{validation_id}', diff --git a/tests/unit/control/test.conf b/tests/unit/control/test.conf index 239d1cdd..22ea897f 100644 --- a/tests/unit/control/test.conf +++ b/tests/unit/control/test.conf @@ -1,3 +1,6 @@ +[airflow] +worker_endpoint_scheme = http +worker_port = 8793 [armada] service_type = armada [base] @@ -27,10 +30,12 @@ project_name = service user_domain_name = default username = shipyard [requests_config] -deckhand_client_connect_timeout=5 -deckhand_client_read_timeout=300 -validation_connect_timeout=5 -validation_read_timeout=300 +airflow_log_connect_timeout = 5 +airflow_log_read_timeout = 300 +deckhand_client_connect_timeout = 5 +deckhand_client_read_timeout = 300 +validation_connect_timeout = 5 +validation_read_timeout = 300 [shipyard] service_type = shipyard [logging] diff --git a/tests/unit/control/test_actions_steps_id_logs_api.py b/tests/unit/control/test_actions_steps_id_logs_api.py new file mode 100644 index 00000000..7c22bc80 --- /dev/null +++ b/tests/unit/control/test_actions_steps_id_logs_api.py @@ -0,0 +1,196 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import mock +from datetime import datetime +from mock import patch + +from shipyard_airflow.control.action.actions_steps_id_logs_api import \ + ActionsStepsLogsResource +from tests.unit.control import common + +# Define Global Variables +DATE_ONE = datetime(2018, 4, 5, 16, 29, 11) +DATE_TWO = datetime(2018, 4, 5, 16, 29, 17, 630472) +DATE_THREE = datetime(2018, 4, 5, 16, 29, 21, 984263) +DATE_FOUR = datetime(2018, 4, 5, 16, 29, 13, 698703) +DATE_ONE_STR = DATE_ONE.strftime('%Y-%m-%d %H:%M:%S') +DATE_TWO_STR = DATE_TWO.strftime('%Y-%m-%d %H:%M:%S.%f') +DATE_THREE_STR = DATE_THREE.strftime('%Y-%m-%d %H:%M:%S.%f') +DATE_FOUR_STR = DATE_FOUR.strftime('%Y-%m-%d %H:%M:%S.%f') + +EXECUTION_DATE_STR = DATE_ONE.strftime('%Y-%m-%dT%H:%M:%S') + +KWARGS = { + 'action_id': '01C9VVQSCFS7V9QB5GBS3WFVSE', + 'step_id': 'action_xcom', + 'try_number': 2 +} + +ACTIONS_DB = { + 'id': '01C9VVQSCFS7V9QB5GBS3WFVSE', + 'name': 'deploy_site', + 'parameters': {}, + 'dag_id': 'deploy_site', + 'dag_execution_date': DATE_ONE_STR, + 'user': 'shipyard', + 'timestamp': DATE_ONE_STR, + 'context_marker': '00c6d78e-fb18-4f55-8a1a-b659a516850d' +} + +TASK_INSTANCE_DB = [ + { + 'task_id': 'action_xcom', + 'dag_id': 'deploy_site', + 'execution_date': DATE_ONE, + 'start_date': DATE_TWO, + 'end_date': DATE_THREE, + 'duration': '4.353791', + 'state': 'success', + 'try_number': '1', + 'hostname': 'airflow-worker-0.ucp.svc.cluster.local', + 'unixname': 'airflow', + 'job_id': '2', + 'pool': 'default', + 'priority_weight': '8', + 'operator': 'PythonOperator', + 'queued_dttm': DATE_FOUR, + 'pid': '290', + 'max_tries': '0' + }, { + 'task_id': 'dag_concurrency_check', + 'dag_id': 'deploy_site', + 'execution_date': DATE_ONE, + 'start_date': DATE_TWO, + 'end_date': DATE_THREE, + 'duration': '4.034112', + 'state': 'success', + 'try_number': '1', + 'hostname': 'airflow-worker-1.ucp.svc.cluster.local', + 'unixname': 'airflow', + 'job_id': '3', + 'pool': 'default', + 'priority_weight': '7', + 'operator': 'ConcurrencyCheckOperator', + 'queued_dttm': DATE_FOUR, + 'pid': '324', + 'max_tries': '0' + }, { + 'task_id': 'k8s_preflight_check', + 'dag_id': 'deploy_site', + 'execution_date': DATE_ONE, + 'start_date': DATE_TWO, + 'end_date': DATE_THREE, + 'duration': '4.452571', + 'state': 'failed', + 'try_number': '1', + 'hostname': 'airflow-worker-0.ucp.svc.cluster.local', + 'unixname': 'airflow', + 'job_id': '7', + 'pool': 'default', + 'priority_weight': '1', + 'operator': 'K8sHealthCheckOperator', + 'queued_dttm': DATE_FOUR, + 'pid': '394', + 'max_tries': '0' + } +] + +XCOM_RUN_LOGS = """ +Running on host airflow-worker-0.airflow-worker-discovery.ucp.svc.cluster.local +Dependencies all met for TaskInstance: deploy_site.action_xcom +Dependencies all met for TaskInstance: deploy_site.action_xcom +INFO - +-------------------------------------------------------------------------------- +Starting attempt 1 of 1 +-------------------------------------------------------------------------------- + +Executing Task(PythonOperator): action_xcom +Running: ['bash', '-c', 'airflow run deploy_site action_xcom \ +2018-04-11T07:30:37 --job_id 2 --raw -sd DAGS_FOLDER/deploy_site.py'] +Running on host airflow-worker-0.airflow-worker-discovery.ucp.svc.cluster.local +Subtask: [2018-04-11 07:30:43,944] {{python_operator.py:90}} \ +INFO - Done. Returned value was: None +""" + + +class TestActionsStepsLogsEndpoint(): + @patch.object(ActionsStepsLogsResource, 'get_action_step_logs', + common.str_responder) + def test_on_get(self, api_client): + """Validate the on_get method returns 200 on success""" + # Define Endpoint + endpoint = "/api/v1.0/actions/{}/steps/{}/logs".format( + '01C9VVQSCFS7V9QB5GBS3WFVSE', + 'action_xcom') + + result = api_client.simulate_get(endpoint, + headers=common.AUTH_HEADERS) + assert result.status_code == 200 + + @patch('shipyard_airflow.control.action.action_helper.ActionsHelper', + autospec=True) + def test_generate_log_endpoint(self, mock_actions_helper): + """Tests log endpoint generation""" + action_logs_resource = ActionsStepsLogsResource() + + mock_actions_helper.get_formatted_dag_execution_date.return_value = ( + EXECUTION_DATE_STR) + action_logs_resource.actions_helper = mock_actions_helper + + # Define variables + action_id = ACTIONS_DB['name'] + step = TASK_INSTANCE_DB[0] + step_id = TASK_INSTANCE_DB[0]['task_id'] + try_number = '2' + required_endpoint = ( + 'http://airflow-worker-0.ucp.svc.cluster.local:8793/' + 'log/deploy_site/action_xcom/2018-04-05T16:29:11/2.log') + + log_endpoint = action_logs_resource.generate_log_endpoint( + step, action_id, step_id, try_number) + + assert log_endpoint == required_endpoint + + mock_actions_helper.get_formatted_dag_execution_date.\ + assert_called_once_with(step) + + def _mock_response( + self, + status=200, + text="TEXT"): + + mock_resp = mock.Mock() + + # set status code and content + mock_resp.status_code = status + mock_resp.text = text + + return mock_resp + + @mock.patch('requests.get') + def test_retrieve_logs(self, mock_get): + """Tests log retrieval""" + action_logs_resource = ActionsStepsLogsResource() + + # Define variables + log_endpoint = ( + 'http://airflow-worker-0.ucp.svc.cluster.local:8793/' + 'log/deploy_site/action_xcom/2018-04-05T16:29:11/2.log') + + mock_resp = self._mock_response(text=XCOM_RUN_LOGS) + mock_get.return_value = mock_resp + + result = action_logs_resource.retrieve_logs(log_endpoint) + + assert result == XCOM_RUN_LOGS diff --git a/tools/resources/shipyard.conf b/tools/resources/shipyard.conf index 55bea973..02d4ea7b 100644 --- a/tools/resources/shipyard.conf +++ b/tools/resources/shipyard.conf @@ -1,5 +1,8 @@ # A fake configuration file with default/fake values specified for use in # basic testing of the Shipyard image +[airflow] +worker_endpoint_scheme = http +worker_port = 8793 [armada] service_type = armada [base] @@ -38,9 +41,11 @@ project_name = service user_domain_name = default username = shipyard [requests_config] -deckhand_client_connect_timeout=5 -deckhand_client_read_timeout=300 -validation_connect_timeout=5 -validation_read_timeout=300 +airflow_log_connect_timeout = 5 +airflow_log_read_timeout = 300 +deckhand_client_connect_timeout = 5 +deckhand_client_read_timeout = 300 +validation_connect_timeout = 5 +validation_read_timeout = 300 [shipyard] service_type = shipyard