From e59fb314c1f14e2c8b6c0e2aabb82ce51762e246 Mon Sep 17 00:00:00 2001
From: Bryan Strassner
Date: Mon, 6 Aug 2018 20:42:23 -0500
Subject: [PATCH] Set ULID of action on DAG request

Sets the run_id for a DAG invoked in Airflow to the same ULID assigned
to it in Shipyard. While this ULID was already passed as a parameter to
the DAG being invoked, making it the run_id allows further correlation,
at a level that both Shipyard and the Airflow framework are aware of.

As part of making this change, fragility was uncovered in the
rest_api_plugin, which expedited the need to switch to the built-in,
but experimental, Airflow API to trigger a DAG (one of two API
endpoints provided - this is important later in this story). In any
case, the 3rd party rest_api_plugin was removed.

As a result of the rest_api_plugin being removed:

1) The simple helm test that checked the Airflow API was also removed,
   as it used the version endpoint of that plugin. Because the built-in
   API provides no version endpoint, nor any similar endpoint that can
   be called without being stateful, the helm test had no new place to
   look for something to call.

2) Some cleanup of exclusions and documentation was possible: test
   coverage omissions, security exclusions, and leftover documentation
   remnants.

Change-Id: I0b68496a8500408b776b4acc12888aa017c4c7d2
---
 .../templates/tests/test-airflow-api.yaml      |  42 -
 charts/shipyard/values.yaml                    |   1 -
 docs/source/_static/shipyard.conf.sample       |   2 +-
 docs/source/deployment-guide.rst               |   1 -
 src/bin/shipyard_airflow/.coveragerc           |   3 +-
 .../etc/shipyard/shipyard.conf.sample          |   2 +-
 .../shipyard_airflow/conf/config.py            |   2 +-
 .../control/action/actions_api.py              |  24 +-
 .../plugins/rest_api_plugin.py                 | 753 ------------------
 .../tests/unit/control/test_actions_api.py     |  23 +-
 src/bin/shipyard_airflow/tox.ini               |   4 +-
 11 files changed, 27 insertions(+), 830 deletions(-)
 delete mode 100644 charts/shipyard/templates/tests/test-airflow-api.yaml
 delete mode 100644 src/bin/shipyard_airflow/shipyard_airflow/plugins/rest_api_plugin.py

diff --git a/charts/shipyard/templates/tests/test-airflow-api.yaml b/charts/shipyard/templates/tests/test-airflow-api.yaml
deleted file mode 100644
index 6d817514..00000000
--- a/charts/shipyard/templates/tests/test-airflow-api.yaml
+++ /dev/null
@@ -1,42 +0,0 @@
-{{/*
-# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-*/}}
-{{/*
-Test Airflow for api functionality.
-*/}}
-{{- if .Values.manifests.test_airflow_api }}
-{{- $envAll := . }}
----
-apiVersion: v1
-kind: Pod
-metadata:
-  name: "{{ .Release.Name }}-airflow-api-test"
-  annotations:
-    "helm.sh/hook": "test-success"
-  labels:
-{{ tuple $envAll "airflow" "api-test" | include "helm-toolkit.snippets.kubernetes_metadata_labels" | indent 4 }}
-spec:
-  restartPolicy: Never
-  containers:
-    - name: "{{ .Release.Name }}-airflow-api-test"
-      env:
-        - name: 'AIRFLOW_URL'
-          value: {{ tuple "airflow_web" "internal" "airflow_web" . | include "helm-toolkit.endpoints.host_and_port_endpoint_uri_lookup" | quote }}
-      image: {{ .Values.images.tags.airflow }}
-      imagePullPolicy: {{ .Values.images.pull_policy }}
-{{ tuple . .Values.pod.resources.test.airflow | include "helm-toolkit.snippets.kubernetes_resources" | indent 6 }}
-      command: ["/bin/bash", "-c", "curl -v -X GET --fail ${AIRFLOW_URL}/admin/rest_api/api?api=version; exit $?"]
-...
-{{- end }}
diff --git a/charts/shipyard/values.yaml b/charts/shipyard/values.yaml
index 0ed5b875..eb41ca61 100644
--- a/charts/shipyard/values.yaml
+++ b/charts/shipyard/values.yaml
@@ -727,4 +727,3 @@ manifests:
   service_airflow_worker: true
   service_discovery_airflow_worker: true
   test_shipyard_api: true
-  test_airflow_api: true
diff --git a/docs/source/_static/shipyard.conf.sample b/docs/source/_static/shipyard.conf.sample
index 75d643aa..16f848f9 100644
--- a/docs/source/_static/shipyard.conf.sample
+++ b/docs/source/_static/shipyard.conf.sample
@@ -33,7 +33,7 @@
 #

 # The web server for Airflow (string value)
-#web_server = http://localhost:32080/
+#web_server = http://localhost:8080/

 # Seconds to wait to connect to the airflow api (integer value)
 #airflow_api_connect_timeout = 5
diff --git a/docs/source/deployment-guide.rst b/docs/source/deployment-guide.rst
index 73d128c8..7367bfff 100644
--- a/docs/source/deployment-guide.rst
+++ b/docs/source/deployment-guide.rst
@@ -104,4 +104,3 @@ Post Deployment

 .. _airship-in-a-bottle: https://git.airshipit.org/cgit/airship-in-a-bottle
 .. _dev_minimal: https://git.airshipit.org/cgit/airship-in-a-bottle/tree/manifests/dev_minimal
-.. _rest_api_plugin: https://git.airshipit.org/cgit/airship-shipyard/tree/src/bin/shipyard_airflow/shipyard_airflow/plugins/rest_api_plugin.py
diff --git a/src/bin/shipyard_airflow/.coveragerc b/src/bin/shipyard_airflow/.coveragerc
index b9455419..dd42a9fe 100644
--- a/src/bin/shipyard_airflow/.coveragerc
+++ b/src/bin/shipyard_airflow/.coveragerc
@@ -1,4 +1,3 @@
 [run]
 # omit third party code and unit tests
-omit =
-    shipyard_airflow/plugins/rest_api_plugin.py
+omit = []
diff --git a/src/bin/shipyard_airflow/etc/shipyard/shipyard.conf.sample b/src/bin/shipyard_airflow/etc/shipyard/shipyard.conf.sample
index 75d643aa..16f848f9 100644
--- a/src/bin/shipyard_airflow/etc/shipyard/shipyard.conf.sample
+++ b/src/bin/shipyard_airflow/etc/shipyard/shipyard.conf.sample
@@ -33,7 +33,7 @@
 #

 # The web server for Airflow (string value)
-#web_server = http://localhost:32080/
+#web_server = http://localhost:8080/

 # Seconds to wait to connect to the airflow api (integer value)
 #airflow_api_connect_timeout = 5
diff --git a/src/bin/shipyard_airflow/shipyard_airflow/conf/config.py b/src/bin/shipyard_airflow/shipyard_airflow/conf/config.py
index f1df08b3..73328c8e 100644
--- a/src/bin/shipyard_airflow/shipyard_airflow/conf/config.py
+++ b/src/bin/shipyard_airflow/shipyard_airflow/conf/config.py
@@ -29,7 +29,7 @@ SECTIONS = [
         options=[
             cfg.StrOpt(
                 'web_server',
-                default='http://localhost:32080/',
+                default='http://localhost:8080/',
                 help='The web server for Airflow'
             ),
             cfg.IntOpt(
diff --git a/src/bin/shipyard_airflow/shipyard_airflow/control/action/actions_api.py b/src/bin/shipyard_airflow/shipyard_airflow/control/action/actions_api.py
index 0ddaf59b..484102be 100644
--- a/src/bin/shipyard_airflow/shipyard_airflow/control/action/actions_api.py
+++ b/src/bin/shipyard_airflow/shipyard_airflow/control/action/actions_api.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 from datetime import datetime
 import logging
+import os

 import falcon
 import requests
@@ -254,6 +255,8 @@ class ActionsResource(BaseResource):
         # Retrieve URL
         web_server_url = CONF.base.web_server
+        api_path = 'api/experimental/dags/{}/dag_runs'
+        req_url = os.path.join(web_server_url, api_path.format(dag_id))
         c_timeout = CONF.base.airflow_api_connect_timeout
         r_timeout = CONF.base.airflow_api_read_timeout
@@ -266,15 +269,16 @@ class ActionsResource(BaseResource):
                 status=falcon.HTTP_503, retry=True,
             )
         else:
-            conf_value = {'action': action}
+            # No cache please
+            headers = {'Cache-Control': 'no-cache'}
             # "conf" - JSON string that gets pickled into the DagRun's
-            # conf attribute
-            req_url = ('{}admin/rest_api/api?api=trigger_dag&dag_id={}'
-                       '&conf={}'.format(web_server_url,
-                                         dag_id, self.to_json(conf_value)))
-
+            # conf attribute. The conf is passed as a string of escaped
+            # json inside the json payload accepted by the API.
+            conf_value = self.to_json({'action': action})
+            payload = {'run_id': action['id'], 'conf': conf_value}
             try:
-                resp = requests.get(req_url, timeout=(c_timeout, r_timeout))
+                resp = requests.post(req_url, timeout=(c_timeout, r_timeout),
+                                     headers=headers, json=payload)
                 LOG.info('Response code from Airflow trigger_dag: %s',
                          resp.status_code)
                 # any 4xx/5xx will be HTTPError, which are RequestException
@@ -295,16 +299,14 @@
                 retry=True,
             )
         dag_time = self._exhume_date(dag_id,
-                                     response['output']['stdout'])
+                                     response['message'])
         dag_execution_date = dag_time.strftime('%Y-%m-%dT%H:%M:%S')
         return dag_execution_date

     def _exhume_date(self, dag_id, log_string):
-        # TODO(bryan-strassner) refactor this to an airflow api client module
-
         # we are unable to use the response time because that
         # does not match the time when the dag was recorded.
-        # We have to parse the stdout returned to find the
+        # We have to parse the returned message to find the
         # Created
diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/rest_api_plugin.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/rest_api_plugin.py
deleted file mode 100644
--- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/rest_api_plugin.py
+++ /dev/null
-        if len(missing_required_arguments) > 0:
-            logging.warning("Missing required arguments: " + str(missing_required_arguments))
-            return REST_API_Response_Util.get_400_error_response(base_response, "The argument(s) " + str(missing_required_arguments) + " are required")
-
-        # Check to make sure that the DAG you're referring to, already exists.
-        dag_bag = self.get_dagbag()
-        if dag_id is not None and dag_id not in dag_bag.dags:
-            logging.info("DAG_ID '" + str(dag_id) + "' was not found in the DagBag list '" + str(dag_bag.dags) + "'")
-            return REST_API_Response_Util.get_400_error_response(base_response, "The DAG ID '" + str(dag_id) + "' does not exist")
-
-        # Deciding which function to use based off the API object that was requested. Some functions are custom and need to be manually routed to.
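Back to the _exhume_date() change above: the execution date now has to be
mined out of the experimental API's response message rather than out of
CLI stdout. A minimal sketch of that parse, assuming an Airflow 1.x
message shaped like "Created <DagRun <dag_id> @ <timestamp>: ...>" (an
assumption here; the exact format belongs to Airflow and is truncated in
the hunk above):

```python
import re
from datetime import datetime


def exhume_date(dag_id, message):
    """Pull the DagRun creation time out of a trigger response message.

    Assumes a message like
    'Created <DagRun deploy_site @ 2018-08-06 20:42:23: <run_id>, ...>'.
    """
    match = re.search(r'@ (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})', message)
    if match is None:
        raise ValueError('No date found in response for dag %s' % dag_id)
    return datetime.strptime(match.group(1), '%Y-%m-%d %H:%M:%S')


# Example run against a hypothetical message:
msg = ('Created <DagRun deploy_site @ 2018-08-06 20:42:23: '
       'manual, externally triggered: True>')
print(exhume_date('deploy_site', msg).strftime('%Y-%m-%dT%H:%M:%S'))
```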
- if api == "version": - final_response = self.version(base_response) - elif api == "rest_api_plugin_version": - final_response = self.rest_api_plugin_version(base_response) - elif api == "deploy_dag": - final_response = self.deploy_dag(base_response) - elif api == "refresh_dag": - final_response = self.refresh_dag(base_response) - else: - final_response = self.execute_cli(base_response, api_metadata) - - return final_response - - # General execution of a CLI command - # A command will be assembled and then passed to the OS as a commandline function and the results will be returned - def execute_cli(self, base_response, api_metadata): - logging.info("Executing cli function") - - # getting the largest cli_end_position in the api_metadata object so that the cli function can be assembled - largest_end_argument_value = 0 - for argument in api_metadata.get("arguments", []): - if argument.get("cli_end_position") is not None and argument["cli_end_position"] > largest_end_argument_value: - largest_end_argument_value = argument["cli_end_position"] - - # starting to create the airflow_cmd function - airflow_cmd_split = ["airflow", api_metadata["name"]] - - # appending arguments to the airflow_cmd_split array and setting arguments aside in the end_arguments array to be appended onto the end of airflow_cmd_split - end_arguments = [0] * largest_end_argument_value - for argument in api_metadata["arguments"]: - argument_name = argument["name"] - argument_value = request.args.get(argument_name) - logging.info("argument_name: " + str(argument_name) + ", argument_value: " + str(argument_value)) - if argument_value is not None: - # if the argument should be appended onto the end, find the position and add it to the end_arguments array - if "cli_end_position" in argument: - logging.info("argument['cli_end_position']: " + str(argument['cli_end_position'])) - end_arguments[argument["cli_end_position"]-1] = argument_value - else: - airflow_cmd_split.extend(["--" + argument_name]) - if argument["form_input_type"] is not "checkbox": - airflow_cmd_split.extend(argument_value.split(" ")) - else: - logging.warning("argument_value is null") - - # appending fixed arguments that should always be provided to the APIs - for fixed_argument in api_metadata.get("fixed_arguments", []): - fixed_argument_name = fixed_argument["name"] - fixed_argument_value = fixed_argument.get("fixed_value") - logging.info("fixed_argument_name: " + str(fixed_argument_name) + ", fixed_argument_value: " + str(fixed_argument_value)) - if fixed_argument_value is not None: - airflow_cmd_split.extend(["--" + fixed_argument_name]) - if fixed_argument_value: - airflow_cmd_split.extend(fixed_argument_value.split(" ")) - - # appending the end_arguments to the very end - airflow_cmd_split.extend(end_arguments) - - # joining all the individual arguments and components into a single string - airflow_cmd = " ".join(airflow_cmd_split) - - logging.info("airflow_cmd array: " + str(airflow_cmd_split)) - logging.info("airflow_cmd: " + str(airflow_cmd)) - - output = self.execute_cli_command(airflow_cmd_split) - - # if desired, filter out the loading messages to reduce the noise in the output - if filter_loading_messages_in_cli_response: - logging.info("Filtering Loading Messages from the CLI Response") - output = self.filter_loading_messages(output) - - return REST_API_Response_Util.get_200_response(base_response=base_response, output=output, airflow_cmd=airflow_cmd) - - # Custom function for the version API - def version(self, base_response): - 
logging.info("Executing custom 'version' function") - return REST_API_Response_Util.get_200_response(base_response, airflow_version) - - # Custom Function for the deploy_dag API - def deploy_dag(self, base_response): - logging.info("Executing custom 'deploy_dag' function") - - if 'dag_file' not in request.files or request.files['dag_file'].filename == '': # check if the post request has the file part - logging.warning("The dag_file argument wasn't provided") - return REST_API_Response_Util.get_400_error_response(base_response, "dag_file should be provided") - dag_file = request.files['dag_file'] - - force = True if request.form.get('force') is not None else False - logging.info("deploy_dag force upload: " + str(force)) - - pause = True if request.form.get('pause') is not None else False - logging.info("deploy_dag in pause state: " + str(pause)) - - unpause = True if request.form.get('unpause') is not None else False - logging.info("deploy_dag in unpause state: " + str(unpause)) - - # make sure that the dag_file is a python script - if dag_file and dag_file.filename.endswith(".py"): - save_file_path = os.path.join(airflow_dags_folder, dag_file.filename) - - # Check if the file already exists. - if os.path.isfile(save_file_path) and not force: - logging.warning("File to upload already exists") - return REST_API_Response_Util.get_400_error_response(base_response, "The file '" + save_file_path + "' already exists on host '" + hostname + "'.") - - logging.info("Saving file to '" + save_file_path + "'") - dag_file.save(save_file_path) - - else: - logging.warning("deploy_dag file is not a python file. It does not end with a .py.") - return REST_API_Response_Util.get_400_error_response(base_response, "dag_file is not a *.py file") - - warning = None - # if both the pause and unpause options are provided then skip the pausing and unpausing phase - if not (pause and unpause): - if pause or unpause: - try: - # import the DAG file that was uploaded so that we can get the DAG_ID to execute the command to pause or unpause it - import imp - dag_file = imp.load_source('module.name', save_file_path) - dag_id = dag_file.dag.dag_id - - # run the pause or unpause cli command - airflow_cmd_split = [] - if pause: - airflow_cmd_split = ["airflow", "pause", dag_id] - if unpause: - airflow_cmd_split = ["airflow", "unpause", dag_id] - cli_output = self.execute_cli_command(airflow_cmd_split) - except Exception as e: - warning = "Failed to set the state (pause, unpause) of the DAG: " + str(e) - logging.warning(warning) - else: - warning = "Both options pause and unpause were given. Skipping setting the state (pause, unpause) of the DAG." 
-            logging.warning(warning)
-
-        return REST_API_Response_Util.get_200_response(base_response=base_response, output="DAG File [{}] has been uploaded".format(dag_file), warning=warning)
-
-    # Custom Function for the refresh_dag API
-    # This will call the direct function corresponding to the web endpoint '/admin/airflow/refresh' that already exists in Airflow
-    def refresh_dag(self, base_response):
-        logging.info("Executing custom 'refresh_dag' function")
-        dag_id = request.args.get('dag_id')
-        logging.info("dag_id to refresh: '" + str(dag_id) + "'")
-        if self.is_arg_not_provided(dag_id):
-            return REST_API_Response_Util.get_400_error_response(base_response, "dag_id should be provided")
-        elif " " in dag_id:
-            return REST_API_Response_Util.get_400_error_response(base_response, "dag_id contains spaces and is therefore an illegal argument")
-
-        try:
-            from airflow.www.views import Airflow
-            # NOTE: The request argument 'dag_id' is required for the refresh() function to get the dag_id
-            refresh_result = Airflow().refresh()
-            logging.info("Refresh Result: " + str(refresh_result))
-        except Exception as e:
-            error_message = "An error occurred while trying to Refresh the DAG '" + str(dag_id) + "': " + str(e)
-            logging.error(error_message)
-            return REST_API_Response_Util.get_500_error_response(base_response, error_message)
-
-        return REST_API_Response_Util.get_200_response(base_response=base_response, output="DAG [{}] is now fresh as a daisy".format(dag_id))
-
-    # General execution of the airflow command passed to it and returns the response
-    @staticmethod
-    def execute_cli_command(airflow_cmd_split):
-        logging.info("Executing CLI Command")
-
-        # There is a need to handle this case separately as the current implementation
-        # breaks the JSON string into multiple parts in a List and this cause the Airflow
-        # command to stop working properly
-        #
-        # The idea here is to handle the JSON string separately and to make use of the
-        # fact that the command will take on the following pattern:
-        #
-        # `airflow trigger_dag --config '{"message": "Hello World"}' test_1`
-        #
-        # , where test_1 is the name of the Dag
-        #
-        if airflow_cmd_split[2] == '--conf':
-            # Initialize list x and extract the JSON string
-            # from the airflow_cmd_split List
-            x = []
-            for i in range(3, len(airflow_cmd_split) - 1):
-                x.append(airflow_cmd_split[i])
-
-            # Initialize list y
-            y = [None] * 5
-
-            # Assign values to list y
-            for j in range(0, 3):
-                y[j] = airflow_cmd_split[j]
-
-            # Create string using list x and assigns to y[3]
-            y[3] = " ".join(x)
-
-            # Dag name will always be airflow_cmd_split[-1]
-            y[4] = airflow_cmd_split[-1]
-
-            # Assigns updated values to airflow_cmd_split
-            airflow_cmd_split = y
-
-        logging.info(airflow_cmd_split)
-
-        process = subprocess.Popen(airflow_cmd_split, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-        process.wait()
-        return REST_API.collect_process_output(process)
-
-    # gets and empty object that has all the fields a CLI function would have in it.
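The --conf special-casing in execute_cli_command() above is a concrete
example of the fragility the commit message cites: joining the argument
list into a string and re-splitting on spaces shreds any JSON conf that
contains spaces, and the five-token rebuild assumes --conf always sits at
index 2. A minimal illustration of the failure mode (not Shipyard code;
the ULID is hypothetical):

```python
# Joining a CLI argument list and re-splitting it on spaces destroys a
# JSON --conf value; this is the case the plugin had to special-case.
cmd = ['airflow', 'trigger_dag', '--conf',
       '{"action": {"id": "01CAZ9CPRZ"}}', 'deploy_site']
rejoined = ' '.join(cmd).split(' ')
print(rejoined)
# ['airflow', 'trigger_dag', '--conf', '{"action":', '{"id":',
#  '"01CAZ9CPRZ"}}', 'deploy_site']
# The repair step also assumes '--conf' is the third token, so any extra
# flag ahead of it would break the rebuild entirely.
```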
-    @staticmethod
-    def get_empty_process_output():
-        return {
-            "stderr": "",
-            "stdin": "",
-            "stdout": ""
-        }
-
-    # Get the output of the CLI process and package it in a dict
-    @staticmethod
-    def collect_process_output(process):
-        output = REST_API.get_empty_process_output()
-        if process.stderr is not None:
-            output["stderr"] = ""
-            for line in process.stderr.readlines():
-                output["stderr"] += str(line)
-        if process.stdin is not None:
-            output["stdin"] = ""
-            for line in process.stdin.readlines():
-                output["stdin"] += str(line)
-        if process.stdout is not None:
-            output["stdout"] = ""
-            for line in process.stdout.readlines():
-                output["stdout"] += str(line)
-        logging.info("RestAPI Output: " + str(output))
-        return output
-
-    # Filtering out logging statements from the standard output
-    # Content like:
-    #
-    # [2017-04-19 10:04:34,927] {__init__.py:36} INFO - Using executor CeleryExecutor
-    # [2017-04-19 10:04:35,926] {models.py:154} INFO - Filling up the DagBag from /Users/...
-    @staticmethod
-    def filter_loading_messages(output):
-        stdout = output["stdout"]
-        new_stdout_array = stdout.split("\n")
-        content_to_remove_greatest_index = 0
-        for index, content in enumerate(new_stdout_array):
-            if content.startswith("["):
-                content_to_remove_greatest_index = index
-        content_to_remove_greatest_index += 1
-        if len(new_stdout_array) > content_to_remove_greatest_index:
-            new_stdout_array = new_stdout_array[content_to_remove_greatest_index:]
-        output["stdout"] = "\n".join(new_stdout_array)
-        return output
-
-# Creating View to be used by Plugin
-rest_api_view = REST_API(category="Admin", name="REST API Plugin")
-
-# Creating Blueprint
-rest_api_bp = Blueprint(
-    "rest_api_bp",
-    __name__,
-    template_folder='templates',
-    static_folder='static',
-    static_url_path='/static/'
-)
-
-
-# Creating the REST_API_Plugin which extends the AirflowPlugin so its imported into Airflow
-class REST_API_Plugin(AirflowPlugin):
-    name = "rest_api"
-    operators = []
-    flask_blueprints = [rest_api_bp]
-    hooks = []
-    executors = []
-    admin_views = [rest_api_view]
-    menu_links = []
diff --git a/src/bin/shipyard_airflow/tests/unit/control/test_actions_api.py b/src/bin/shipyard_airflow/tests/unit/control/test_actions_api.py
index 8aa3a68a..6d662d36 100644
--- a/src/bin/shipyard_airflow/tests/unit/control/test_actions_api.py
+++ b/src/bin/shipyard_airflow/tests/unit/control/test_actions_api.py
@@ -450,20 +450,16 @@ def test_audit_control_command_db(mock_insert_action_audit):
 def test_invoke_airflow_dag_success(mock_info, mock_exhume_date):
     act_resource = ActionsResource()
     dag_id = 'test_dag_id'
-    action = 'test_action'
+    action = {'id': '123', 'user': 'unittester'}
     CONF = cfg.CONF
     web_server_url = CONF.base.web_server
     conf_value = {'action': action}
     log_string = 'Created
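Taken end to end, the correlation the commit message promises reduces to
one invariant: the Airflow DagRun's run_id equals the Shipyard action's
ULID. A pytest-style sketch of that invariant (build_trigger_payload is a
hypothetical helper mirroring the payload assembly in actions_api.py, not
Shipyard's API; the '123' id stands in for a real ULID, as in the updated
unit test above):

```python
import json


def build_trigger_payload(action):
    """Mirror the payload actions_api.py now sends to Airflow."""
    return {'run_id': action['id'],
            'conf': json.dumps({'action': action})}


def test_run_id_matches_action_ulid():
    action = {'id': '123', 'user': 'unittester'}
    payload = build_trigger_payload(action)
    # The DagRun created by Airflow carries the action's ULID as its
    # run_id, so either system can look the other up by that one value.
    assert payload['run_id'] == action['id']
    assert json.loads(payload['conf'])['action']['user'] == 'unittester'
```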