Merge "Set ULID of action on DAG request"

Zuul 2018-08-15 16:35:39 +00:00 committed by Gerrit Code Review
commit 18ed6674d2
11 changed files with 27 additions and 830 deletions
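The net effect of the change: instead of calling the removed rest_api_plugin with a GET, Shipyard now POSTs to Airflow's experimental dag_runs endpoint and sets the DagRun run_id to the action's ULID. Roughly, the new request looks like the sketch below; all values are illustrative placeholders (the real ones come from Shipyard's configuration and the action document), so treat this as a non-authoritative outline of the diff that follows.

import os
import requests

web_server_url = 'http://localhost:8080/'                  # new default from this change
dag_id = 'deploy_site'                                      # illustrative DAG id
action = {'id': '01BTG32JW87G0YKA1K29TKNAFX'}               # illustrative action; 'id' is the ULID

req_url = os.path.join(web_server_url, 'api/experimental/dags/{}/dag_runs'.format(dag_id))
payload = {
    'run_id': action['id'],   # the action ULID rides along as the DagRun run_id
    'conf': '{"action": {"id": "01BTG32JW87G0YKA1K29TKNAFX"}}',  # escaped JSON string inside the JSON body
}
# Timeouts are configurable (airflow_api_connect_timeout plus a read timeout); (5, 60) is only a guess here.
resp = requests.post(req_url, headers={'Cache-Control': 'no-cache'}, json=payload, timeout=(5, 60))
print(resp.status_code, resp.json().get('message'))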

View File

@@ -1,42 +0,0 @@
{{/*
# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
*/}}
{{/*
Test Airflow for API functionality.
*/}}
{{- if .Values.manifests.test_airflow_api }}
{{- $envAll := . }}
---
apiVersion: v1
kind: Pod
metadata:
name: "{{ .Release.Name }}-airflow-api-test"
annotations:
"helm.sh/hook": "test-success"
labels:
{{ tuple $envAll "airflow" "api-test" | include "helm-toolkit.snippets.kubernetes_metadata_labels" | indent 4 }}
spec:
restartPolicy: Never
containers:
- name: "{{ .Release.Name }}-airflow-api-test"
env:
- name: 'AIRFLOW_URL'
value: {{ tuple "airflow_web" "internal" "airflow_web" . | include "helm-toolkit.endpoints.host_and_port_endpoint_uri_lookup" | quote }}
image: {{ .Values.images.tags.airflow }}
imagePullPolicy: {{ .Values.images.pull_policy }}
{{ tuple . .Values.pod.resources.test.airflow | include "helm-toolkit.snippets.kubernetes_resources" | indent 6 }}
command: ["/bin/bash", "-c", "curl -v -X GET --fail ${AIRFLOW_URL}/admin/rest_api/api?api=version; exit $?"]
...
{{- end }}

View File

@@ -726,4 +726,3 @@ manifests:
service_airflow_worker: true
service_discovery_airflow_worker: true
test_shipyard_api: true
test_airflow_api: true

View File

@@ -33,7 +33,7 @@
#
# The web server for Airflow (string value)
#web_server = http://localhost:32080/
#web_server = http://localhost:8080/
# Seconds to wait to connect to the airflow api (integer value)
#airflow_api_connect_timeout = 5

View File

@@ -104,4 +104,3 @@ Post Deployment
.. _airship-in-a-bottle: https://git.airshipit.org/cgit/airship-in-a-bottle
.. _dev_minimal: https://git.airshipit.org/cgit/airship-in-a-bottle/tree/manifests/dev_minimal
.. _rest_api_plugin: https://git.airshipit.org/cgit/airship-shipyard/tree/src/bin/shipyard_airflow/shipyard_airflow/plugins/rest_api_plugin.py

View File

@@ -1,4 +1,3 @@
[run]
# omit third party code and unit tests
omit =
shipyard_airflow/plugins/rest_api_plugin.py
omit = []

View File

@@ -33,7 +33,7 @@
#
# The web server for Airflow (string value)
#web_server = http://localhost:32080/
#web_server = http://localhost:8080/
# Seconds to wait to connect to the airflow api (integer value)
#airflow_api_connect_timeout = 5

View File

@@ -29,7 +29,7 @@ SECTIONS = [
options=[
cfg.StrOpt(
'web_server',
default='http://localhost:32080/',
default='http://localhost:8080/',
help='The web server for Airflow'
),
cfg.IntOpt(

View File

@@ -13,6 +13,7 @@
# limitations under the License.
from datetime import datetime
import logging
import os
import falcon
import requests
@@ -254,6 +255,8 @@ class ActionsResource(BaseResource):
# Retrieve URL
web_server_url = CONF.base.web_server
api_path = 'api/experimental/dags/{}/dag_runs'
req_url = os.path.join(web_server_url, api_path.format(dag_id))
c_timeout = CONF.base.airflow_api_connect_timeout
r_timeout = CONF.base.airflow_api_read_timeout
@@ -266,15 +269,16 @@
status=falcon.HTTP_503,
retry=True, )
else:
conf_value = {'action': action}
# No cache please
headers = {'Cache-Control': 'no-cache'}
# "conf" - JSON string that gets pickled into the DagRun's
# conf attribute
req_url = ('{}admin/rest_api/api?api=trigger_dag&dag_id={}'
'&conf={}'.format(web_server_url,
dag_id, self.to_json(conf_value)))
# conf attribute. The conf is passed as a string of escaped
# json inside the json payload accepted by the API.
conf_value = self.to_json({'action': action})
payload = {'run_id': action['id'], 'conf': conf_value}
try:
resp = requests.get(req_url, timeout=(c_timeout, r_timeout))
resp = requests.post(req_url, timeout=(c_timeout, r_timeout),
headers=headers, json=payload)
LOG.info('Response code from Airflow trigger_dag: %s',
resp.status_code)
# any 4xx/5xx will be HTTPError, which are RequestException
@@ -295,16 +299,14 @@
retry=True, )
dag_time = self._exhume_date(dag_id,
response['output']['stdout'])
response['message'])
dag_execution_date = dag_time.strftime('%Y-%m-%dT%H:%M:%S')
return dag_execution_date
def _exhume_date(self, dag_id, log_string):
# TODO(bryan-strassner) refactor this to an airflow api client module
# we are unable to use the response time because that
# does not match the time when the dag was recorded.
# We have to parse the stdout returned to find the
# We have to parse the returned message to find the
# Created <DagRun {dag_id} @ {timestamp}
# e.g.
# ...- Created <DagRun deploy_site @ 2017-09-22 22:16:14: man...
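The execution date still has to be dug out of the trigger response: the real _exhume_date implementation is not shown in this hunk, but the comments above describe parsing the returned message for the "Created <DagRun ..." text. A rough sketch of that parsing, assuming a plain regex over the message is sufficient:

import re
from datetime import datetime

def exhume_date_sketch(dag_id, log_string):
    # Illustrative only: find "Created <DagRun <dag_id> @ <timestamp>" in the message text.
    pattern = r'Created <DagRun {} @ (\d{{4}}-\d{{2}}-\d{{2}} \d{{2}}:\d{{2}}:\d{{2}})'.format(
        re.escape(dag_id))
    match = re.search(pattern, log_string)
    if match is None:
        return None
    return datetime.strptime(match.group(1), '%Y-%m-%d %H:%M:%S')

# exhume_date_sketch('deploy_site',
#                    '...- Created <DagRun deploy_site @ 2017-09-22 22:16:14: man...')
# -> datetime(2017, 9, 22, 22, 16, 14), later rendered as 2017-09-22T22:16:14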

View File

@@ -1,753 +0,0 @@
# Original Source Code: https://github.com/teamclairvoyant/airflow-rest-api-plugin
# Date when airflow-rest-api-plugin source codes were downloaded: 2017-06-27
# Kept the majority of the code and removed what is not needed for our purpose
from airflow.models import DagBag, DagModel
from airflow.plugins_manager import AirflowPlugin
from airflow import configuration
from airflow.www.app import csrf
from flask import Blueprint, request, jsonify
from flask_admin import BaseView, expose
from datetime import datetime
import airflow
import logging
import subprocess
import os
import socket
"""
CLIs this REST API exposes are defined here: http://airflow.incubator.apache.org/cli.html
"""
# Location of the REST Endpoint
# Note: Changing this will only affect where the messages are posted to on the web interface and will not change where the endpoint actually resides
rest_api_endpoint = "/admin/rest_api/api"
# Getting Versions and Global variables
hostname = socket.gethostname()
airflow_version = airflow.__version__
# Getting configurations from airflow.cfg file
airflow_webserver_base_url = configuration.get('webserver', 'BASE_URL')
airflow_base_log_folder = configuration.get('core', 'BASE_LOG_FOLDER')
airflow_dags_folder = configuration.get('core', 'DAGS_FOLDER')
log_loading = configuration.getboolean("rest_api_plugin", "LOG_LOADING") if configuration.has_option("rest_api_plugin", "LOG_LOADING") else False
filter_loading_messages_in_cli_response = configuration.getboolean("rest_api_plugin", "FILTER_LOADING_MESSAGES_IN_CLI_RESPONSE") if configuration.has_option("rest_api_plugin", "FILTER_LOADING_MESSAGES_IN_CLI_RESPONSE") else True
# Using UTF-8 Encoding so that response messages don't have any characters in them that can't be handled
os.environ['PYTHONIOENCODING'] = 'utf-8'
"""
Metadata that defines a single API:
{
"name": "{string}", # Name of the API (cli command to be executed)
"description": "{string}", # Description of the API
"airflow_version": "{string}", # Version the API was available in to allow people to better determine if the API is available. (to be displayed on the Admin page)
"http_method": "{string}", # HTTP method to use when calling the function. (Default: GET) (Optional)
# removed "background_mode": {boolean}
"arguments": [ # List of arguments that can be provided to the API
{
"name": "{string}", # Name of the argument
"description": "{string}", # Description of the argument
"form_input_type": "{string}", # Type of input to use on the Admin page for the argument
"required": {boolean}, # Whether the argument is required upon submission
"cli_end_position": {int} # In the case with a CLI command that the arguments value should be appended on to the end (for example: airflow trigger_dag some_dag_id), this is the position that the argument should be provided in the CLI command. (Optional)
}
],
"fixed_arguments": [ # List of arguments that will always be used by the API endpoint and can't be changed
{
"name": "{string}", # Name of the argument
"description": "{string}", # Description of the argument
"fixed_value": "{string}" # Fixed value that will always be used
}
],
"post_arguments": [ # List of arguments that can be provided in the POST body to the API
{
"name": "{string}", # Name of the argument
"description": "{string}", # Description of the argument
"form_input_type": "{string}", # Type of input to use on the Admin page for the argument
"required": {boolean}, # Whether the argument is required upon submission
}
]
},
"""
# Metadata about the APIs and how to call them. Representing them like this allows us to dynamically generate the APIs
# in the admin page and dynamically execute them. This also allows us to easily add new ones.
# API Object definition is described in the comment block above.
apis_metadata = [
{
"name": "version",
"description": "Displays the version of Airflow you're using",
"airflow_version": "1.0.0 or greater",
"http_method": "GET",
"arguments": []
},
{
"name": "render",
"description": "Render a task instance's template(s)",
"airflow_version": "1.7.0 or greater",
"http_method": "GET",
"arguments": [
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
{"name": "task_id", "description": "The id of the task", "form_input_type": "text", "required": True, "cli_end_position": 2},
{"name": "execution_date", "description": "The execution date of the DAG (Example: 2017-01-02T03:04:05)", "form_input_type": "text", "required": True, "cli_end_position": 3},
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False}
]
},
{
"name": "variables",
"description": "CRUD operations on variables",
"airflow_version": "1.7.1 or greater",
"http_method": "GET",
"arguments": [
{"name": "set", "description": "Set a variable. Expected input in the form: KEY VALUE.", "form_input_type": "text", "required": False},
{"name": "get", "description": "Get value of a variable", "form_input_type": "text", "required": False},
{"name": "json", "description": "Deserialize JSON variable", "form_input_type": "checkbox", "required": False},
{"name": "default", "description": "Default value returned if variable does not exist", "form_input_type": "text", "required": False},
{"name": "import", "description": "Import variables from JSON file", "form_input_type": "text", "required": False},
{"name": "export", "description": "Export variables to JSON file", "form_input_type": "text", "required": False},
{"name": "delete", "description": "Delete a variable", "form_input_type": "text", "required": False}
]
},
{
"name": "connections",
"description": "List/Add/Delete connections",
"airflow_version": "1.8.0 or greater",
"http_method": "GET",
"arguments": [
{"name": "list", "description": "List all connections", "form_input_type": "checkbox", "required": False},
{"name": "add", "description": "Add a connection", "form_input_type": "checkbox", "required": False},
{"name": "delete", "description": "Delete a connection", "form_input_type": "checkbox", "required": False},
{"name": "conn_id", "description": "Connection id, required to add/delete a connection", "form_input_type": "text", "required": False},
{"name": "conn_uri", "description": "Connection URI, required to add a connection", "form_input_type": "text", "required": False},
{"name": "conn_extra", "description": "Connection 'Extra' field, optional when adding a connection", "form_input_type": "text", "required": False}
]
},
{
"name": "pause",
"description": "Pauses a DAG",
"airflow_version": "1.7.0 or greater",
"http_method": "GET",
"arguments": [
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False}
]
},
{
"name": "unpause",
"description": "Unpauses a DAG",
"airflow_version": "1.7.0 or greater",
"http_method": "GET",
"arguments": [
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False}
]
},
{
"name": "task_failed_deps",
"description": "Returns the unmet dependencies for a task instance from the perspective of the scheduler. In other words, why a task instance doesn't get scheduled and then queued by the scheduler, and then run by an executor).",
"airflow_version": "1.8.0 or greater",
"http_method": "GET",
"arguments": [
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
{"name": "task_id", "description": "The id of the task", "form_input_type": "text", "required": True, "cli_end_position": 2},
{"name": "execution_date", "description": "The execution date of the DAG (Example: 2017-01-02T03:04:05)", "form_input_type": "text", "required": True, "cli_end_position": 3},
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False}
]
},
{
"name": "trigger_dag",
"description": "Trigger a DAG run",
"airflow_version": "1.6.0 or greater",
"http_method": "GET",
"arguments": [
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False},
{"name": "run_id", "description": "Helps to identify this run", "form_input_type": "text", "required": False},
{"name": "conf", "description": "JSON string that gets pickled into the DagRun's conf attribute", "form_input_type": "text", "required": False},
{"name": "exec_date", "description": "The execution date of the DAG", "form_input_type": "text", "required": False}
]
},
{
"name": "test",
"description": "Test a task instance. This will run a task without checking for dependencies or recording it's state in the database.",
"airflow_version": "0.1 or greater",
"http_method": "GET",
"arguments": [
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
{"name": "task_id", "description": "The id of the task", "form_input_type": "text", "required": True, "cli_end_position": 2},
{"name": "execution_date", "description": "The execution date of the DAG (Example: 2017-01-02T03:04:05)", "form_input_type": "text", "required": True, "cli_end_position": 3},
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False},
{"name": "dry_run", "description": "Perform a dry run", "form_input_type": "checkbox", "required": False},
{"name": "task_params", "description": "Sends a JSON params dict to the task", "form_input_type": "text", "required": False}
]
},
{
"name": "dag_state",
"description": "Get the status of a dag run",
"airflow_version": "1.8.0 or greater",
"http_method": "GET",
"arguments": [
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
{"name": "execution_date", "description": "The execution date of the DAG (Example: 2017-01-02T03:04:05)", "form_input_type": "text", "required": True, "cli_end_position": 2},
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False}
]
},
{
"name": "run",
"description": "Run a single task instance",
"airflow_version": "1.0.0 or greater",
"http_method": "GET",
"arguments": [
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
{"name": "task_id", "description": "The id of the task", "form_input_type": "text", "required": True, "cli_end_position": 2},
{"name": "execution_date", "description": "The execution date of the DAG (Example: 2017-01-02T03:04:05)", "form_input_type": "text", "required": True, "cli_end_position": 3},
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False},
{"name": "mark_success", "description": "Mark jobs as succeeded without running them", "form_input_type": "checkbox", "required": False},
{"name": "force", "description": "Ignore previous task instance state, rerun regardless if task already succeede", "form_input_type": "checkbox", "required": False},
{"name": "pool", "description": "Resource pool to use", "form_input_type": "text", "required": False},
{"name": "cfg_path", "description": "Path to config file to use instead of airflow.cfg", "form_input_type": "text", "required": False},
{"name": "local", "description": "Run the task using the LocalExecutor", "form_input_type": "checkbox", "required": False},
{"name": "ignore_all_dependencies", "description": "Ignores all non-critical dependencies, including ignore_ti_state and ignore_task_depsstore_true", "form_input_type": "checkbox", "required": False},
{"name": "ignore_dependencies", "description": "Ignore task-specific dependencies, e.g. upstream, depends_on_past, and retry delay dependencies", "form_input_type": "checkbox", "required": False},
{"name": "ignore_depends_on_past", "description": "Ignore depends_on_past dependencies (but respect upstream dependencies)", "form_input_type": "checkbox", "required": False},
{"name": "ship_dag", "description": "Pickles (serializes) the DAG and ships it to the worker", "form_input_type": "checkbox", "required": False},
{"name": "pickle", "description": "Serialized pickle object of the entire dag (used internally)", "form_input_type": "text", "required": False},
]
},
{
"name": "list_tasks",
"description": "List the tasks within a DAG",
"airflow_version": "0.1 or greater",
"http_method": "GET",
"arguments": [
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
{"name": "tree", "description": "Tree view", "form_input_type": "checkbox", "required": False},
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False}
]
},
{
"name": "backfill",
"description": "Run subsections of a DAG for a specified date range",
"airflow_version": "0.1 or greater",
"http_method": "GET",
"arguments": [
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
{"name": "task_regex", "description": "The regex to filter specific task_ids to backfill (optional)", "form_input_type": "text", "required": False},
{"name": "start_date", "description": "Override start_date YYYY-MM-DD. Either this or the end_date needs to be provided.", "form_input_type": "text", "required": False},
{"name": "end_date", "description": "Override end_date YYYY-MM-DD. Either this or the start_date needs to be provided.", "form_input_type": "text", "required": False},
{"name": "mark_success", "description": "Mark jobs as succeeded without running them", "form_input_type": "checkbox", "required": False},
{"name": "local", "description": "Run the task using the LocalExecutor", "form_input_type": "checkbox", "required": False},
{"name": "donot_pickle", "description": "Do not attempt to pickle the DAG object to send over to the workers, just tell the workers to run their version of the code.", "form_input_type": "checkbox", "required": False},
{"name": "include_adhoc", "description": "Include dags with the adhoc argument.", "form_input_type": "checkbox", "required": False},
{"name": "ignore_dependencies", "description": "Ignore task-specific dependencies, e.g. upstream, depends_on_past, and retry delay dependencies", "form_input_type": "checkbox", "required": False},
{"name": "ignore_first_depends_on_past", "description": "Ignores depends_on_past dependencies for the first set of tasks only (subsequent executions in the backfill DO respect depends_on_past).", "form_input_type": "checkbox", "required": False},
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False},
{"name": "pool", "description": "Resource pool to use", "form_input_type": "text", "required": False},
{"name": "dry_run", "description": "Perform a dry run", "form_input_type": "checkbox", "required": False}
]
},
{
"name": "list_dags",
"description": "List all the DAGs",
"airflow_version": "0.1 or greater",
"http_method": "GET",
"arguments": [
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False},
{"name": "report", "description": "Show DagBag loading report", "form_input_type": "checkbox", "required": False}
]
},
{
"name": "task_state",
"description": "Get the status of a task instance",
"airflow_version": "1.0.0 or greater",
"http_method": "GET",
"arguments": [
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
{"name": "task_id", "description": "The id of the task", "form_input_type": "text", "required": True, "cli_end_position": 2},
{"name": "execution_date", "description": "The execution date of the DAG (Example: 2017-01-02T03:04:05)", "form_input_type": "text", "required": True, "cli_end_position": 3},
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False}
]
},
{
"name": "pool",
"description": "CRUD operations on pools",
"airflow_version": "1.8.0 or greater",
"http_method": "GET",
"arguments": [
{"name": "set", "description": "Set pool slot count and description, respectively. Expected input in the form: NAME SLOT_COUNT POOL_DESCRIPTION.", "form_input_type": "text", "required": False},
{"name": "get", "description": "Get pool info", "form_input_type": "text", "required": False},
{"name": "delete", "description": "Delete a pool", "form_input_type": "text", "required": False}
]
},
# removed retrieval of logs from this interface: it uses a background-mode
# call, we don't use it, and dropping it removes a security vulnerability
{
"name": "clear",
"description": "Clear a set of task instance, as if they never ran",
"airflow_version": "0.1 or greater",
"http_method": "GET",
"arguments": [
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
{"name": "task_regex", "description": "The regex to filter specific task_ids to backfill (optional)", "form_input_type": "text", "required": False},
{"name": "start_date", "description": "Override start_date YYYY-MM-DD", "form_input_type": "text", "required": False},
{"name": "end_date", "description": "Override end_date YYYY-MM-DD", "form_input_type": "text", "required": False},
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False},
{"name": "upstream", "description": "Include upstream tasks", "form_input_type": "checkbox", "required": False},
{"name": "downstream", "description": "Include downstream tasks", "form_input_type": "checkbox", "required": False},
{"name": "only_failed", "description": "Only failed jobs", "form_input_type": "checkbox", "required": False},
{"name": "only_running", "description": "Only running jobs", "form_input_type": "checkbox", "required": False},
{"name": "exclude_subdags", "description": "Exclude subdags", "form_input_type": "checkbox", "required": False}
],
"fixed_arguments": [
{"name": "no_confirm", "description": "Do not request confirmation", "fixed_value": ""}
],
},
{
"name": "deploy_dag",
"description": "Deploy a new DAG File to the DAGs directory",
"airflow_version": "None - Custom API",
"http_method": "POST",
"post_body_description": "dag_file - POST Body Element - REQUIRED",
"form_enctype": "multipart/form-data",
"arguments": [],
"post_arguments": [
{"name": "dag_file", "description": "Python file to upload and deploy", "form_input_type": "file", "required": True},
{"name": "force", "description": "Whether to forcefully upload the file if the file already exists or not", "form_input_type": "checkbox", "required": False},
{"name": "pause", "description": "The DAG will be forced to be paused when created and override the 'dags_are_paused_at_creation' config.", "form_input_type": "checkbox", "required": False},
{"name": "unpause", "description": "The DAG will be forced to be unpaused when created and override the 'dags_are_paused_at_creation' config.", "form_input_type": "checkbox", "required": False}
]
},
{
"name": "refresh_dag",
"description": "Refresh a DAG in the Web Server",
"airflow_version": "None - Custom API",
"http_method": "GET",
"arguments": [
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True}
]
}
]
# Utility for creating the REST Responses
class REST_API_Response_Util():
# Gets the Base Response object with all required response fields included. To be used at the beginning of the REST Call.
@staticmethod
def get_base_response(status="OK", http_response_code=200, call_time=datetime.now(), include_arguments=True):
base_response = {"status": status, "http_response_code": http_response_code, "call_time": call_time}
if include_arguments:
base_response["arguments"] = request.args
base_response["post_arguments"] = request.form
return base_response
# Finalize the Base Response with additional data
@staticmethod
def _get_final_response(base_response, output=None, airflow_cmd=None, http_response_code=None, warning=None):
final_response = base_response
final_response["response_time"] = datetime.now()
if output:
final_response["output"] = output
if airflow_cmd:
final_response["airflow_cmd"] = airflow_cmd
if http_response_code:
final_response["http_response_code"] = http_response_code
if warning:
final_response["warning"] = warning
return jsonify(final_response)
# Set the Base Response as a 200 HTTP Response object
@staticmethod
def get_200_response(base_response, output=None, airflow_cmd=None, warning=None):
logging.info("Returning a 200 Response Code with response '" + str(output) + "'")
return REST_API_Response_Util._get_final_response(base_response=base_response, output=output, airflow_cmd=airflow_cmd, warning=warning)
# Set the Base Response and an Error
@staticmethod
def _get_error_response(base_response, error_code, output=None):
base_response["status"] = "ERROR"
return REST_API_Response_Util._get_final_response(base_response=base_response, output=output, http_response_code=error_code), error_code
# Set the Base Response as a 400 HTTP Response object
@staticmethod
def get_400_error_response(base_response, output=None):
logging.warning("Returning a 400 Response Code with response '" + str(output) + "'")
return REST_API_Response_Util._get_error_response(base_response, 400, output)
# Set the Base Response as a 403 HTTP Response object
@staticmethod
def get_403_error_response(base_response, output=None):
logging.warning("Returning a 403 Response Code with response '" + str(output) + "'")
return REST_API_Response_Util._get_error_response(base_response, 403, output)
# Set the Base Response as a 500 HTTP Response object
@staticmethod
def get_500_error_response(base_response, output=None):
logging.warning("Returning a 500 Response Code with response '" + str(output) + "'")
return REST_API_Response_Util._get_error_response(base_response, 500, output)
# REST_API View which extends the flask_admin BaseView
class REST_API(BaseView):
# Checks a string object to see if it is none or empty so we can determine if an argument (passed to the rest api) is provided
@staticmethod
def is_arg_not_provided(arg):
return arg is None or arg == ""
# Get the DagBag which has a list of all the current Dags
@staticmethod
def get_dagbag():
return DagBag()
# '/' Endpoint where the Admin page is which allows you to view the APIs available and trigger them
@expose('/')
def index(self):
logging.info("REST_API.index() called")
# get the information that we want to display on the page regarding the dags that are available
dagbag = self.get_dagbag()
dags = []
for dag_id in dagbag.dags:
orm_dag = DagModel.get_current(dag_id)
dags.append({
"dag_id": dag_id,
"is_active": (not orm_dag.is_paused) if orm_dag is not None else False
})
return self.render("rest_api_plugin/index.html",
dags=dags,
airflow_webserver_base_url=airflow_webserver_base_url,
rest_api_endpoint=rest_api_endpoint,
apis_metadata=apis_metadata,
airflow_version=airflow_version,
)
# '/api' REST Endpoint where API requests should all come in
@csrf.exempt # Exempt the CSRF token
@expose('/api', methods=["GET", "POST"])
def api(self):
base_response = REST_API_Response_Util.get_base_response()
# Get the api that you want to execute
api = request.args.get('api')
if api is not None:
api = api.strip().lower()
logging.info("REST_API.api() called (api: " + str(api) + ")")
# Validate that the API is provided
if self.is_arg_not_provided(api):
logging.warning("api argument not provided")
return REST_API_Response_Util.get_400_error_response(base_response, "API should be provided")
# Get the api_metadata entry from the apis list that corresponds to the api we want to run.
api_metadata = None
for test_api_metadata in apis_metadata:
if test_api_metadata["name"] == api:
api_metadata = test_api_metadata
if api_metadata is None:
logging.info("api '" + str(api) + "' was not found in the apis list in the REST API Plugin")
return REST_API_Response_Util.get_400_error_response(base_response, "API '" + str(api) + "' was not found")
# check if all the required arguments are provided
missing_required_arguments = []
dag_id = None
for argument in api_metadata["arguments"]:
argument_name = argument["name"]
argument_value = request.args.get(argument_name)
if argument["required"]:
if self.is_arg_not_provided(argument_value):
missing_required_arguments.append(argument_name)
if argument_name == "dag_id" and argument_value is not None:
dag_id = argument_value.strip()
if len(missing_required_arguments) > 0:
logging.warning("Missing required arguments: " + str(missing_required_arguments))
return REST_API_Response_Util.get_400_error_response(base_response, "The argument(s) " + str(missing_required_arguments) + " are required")
# Check to make sure that the DAG you're referring to already exists.
dag_bag = self.get_dagbag()
if dag_id is not None and dag_id not in dag_bag.dags:
logging.info("DAG_ID '" + str(dag_id) + "' was not found in the DagBag list '" + str(dag_bag.dags) + "'")
return REST_API_Response_Util.get_400_error_response(base_response, "The DAG ID '" + str(dag_id) + "' does not exist")
# Deciding which function to use based on the API object that was requested. Some functions are custom and need to be manually routed to.
if api == "version":
final_response = self.version(base_response)
elif api == "rest_api_plugin_version":
final_response = self.rest_api_plugin_version(base_response)
elif api == "deploy_dag":
final_response = self.deploy_dag(base_response)
elif api == "refresh_dag":
final_response = self.refresh_dag(base_response)
else:
final_response = self.execute_cli(base_response, api_metadata)
return final_response
# General execution of a CLI command
# A command will be assembled and then passed to the OS as a commandline function and the results will be returned
def execute_cli(self, base_response, api_metadata):
logging.info("Executing cli function")
# getting the largest cli_end_position in the api_metadata object so that the cli function can be assembled
largest_end_argument_value = 0
for argument in api_metadata.get("arguments", []):
if argument.get("cli_end_position") is not None and argument["cli_end_position"] > largest_end_argument_value:
largest_end_argument_value = argument["cli_end_position"]
# starting to create the airflow_cmd function
airflow_cmd_split = ["airflow", api_metadata["name"]]
# appending arguments to the airflow_cmd_split array and setting arguments aside in the end_arguments array to be appended onto the end of airflow_cmd_split
end_arguments = [0] * largest_end_argument_value
for argument in api_metadata["arguments"]:
argument_name = argument["name"]
argument_value = request.args.get(argument_name)
logging.info("argument_name: " + str(argument_name) + ", argument_value: " + str(argument_value))
if argument_value is not None:
# if the argument should be appended onto the end, find the position and add it to the end_arguments array
if "cli_end_position" in argument:
logging.info("argument['cli_end_position']: " + str(argument['cli_end_position']))
end_arguments[argument["cli_end_position"]-1] = argument_value
else:
airflow_cmd_split.extend(["--" + argument_name])
if argument["form_input_type"] is not "checkbox":
airflow_cmd_split.extend(argument_value.split(" "))
else:
logging.warning("argument_value is null")
# appending fixed arguments that should always be provided to the APIs
for fixed_argument in api_metadata.get("fixed_arguments", []):
fixed_argument_name = fixed_argument["name"]
fixed_argument_value = fixed_argument.get("fixed_value")
logging.info("fixed_argument_name: " + str(fixed_argument_name) + ", fixed_argument_value: " + str(fixed_argument_value))
if fixed_argument_value is not None:
airflow_cmd_split.extend(["--" + fixed_argument_name])
if fixed_argument_value:
airflow_cmd_split.extend(fixed_argument_value.split(" "))
# appending the end_arguments to the very end
airflow_cmd_split.extend(end_arguments)
# joining all the individual arguments and components into a single string
airflow_cmd = " ".join(airflow_cmd_split)
logging.info("airflow_cmd array: " + str(airflow_cmd_split))
logging.info("airflow_cmd: " + str(airflow_cmd))
output = self.execute_cli_command(airflow_cmd_split)
# if desired, filter out the loading messages to reduce the noise in the output
if filter_loading_messages_in_cli_response:
logging.info("Filtering Loading Messages from the CLI Response")
output = self.filter_loading_messages(output)
return REST_API_Response_Util.get_200_response(base_response=base_response, output=output, airflow_cmd=airflow_cmd)
# Custom function for the version API
def version(self, base_response):
logging.info("Executing custom 'version' function")
return REST_API_Response_Util.get_200_response(base_response, airflow_version)
# Custom Function for the deploy_dag API
def deploy_dag(self, base_response):
logging.info("Executing custom 'deploy_dag' function")
if 'dag_file' not in request.files or request.files['dag_file'].filename == '': # check if the post request has the file part
logging.warning("The dag_file argument wasn't provided")
return REST_API_Response_Util.get_400_error_response(base_response, "dag_file should be provided")
dag_file = request.files['dag_file']
force = True if request.form.get('force') is not None else False
logging.info("deploy_dag force upload: " + str(force))
pause = True if request.form.get('pause') is not None else False
logging.info("deploy_dag in pause state: " + str(pause))
unpause = True if request.form.get('unpause') is not None else False
logging.info("deploy_dag in unpause state: " + str(unpause))
# make sure that the dag_file is a python script
if dag_file and dag_file.filename.endswith(".py"):
save_file_path = os.path.join(airflow_dags_folder, dag_file.filename)
# Check if the file already exists.
if os.path.isfile(save_file_path) and not force:
logging.warning("File to upload already exists")
return REST_API_Response_Util.get_400_error_response(base_response, "The file '" + save_file_path + "' already exists on host '" + hostname + "'.")
logging.info("Saving file to '" + save_file_path + "'")
dag_file.save(save_file_path)
else:
logging.warning("deploy_dag file is not a python file. It does not end with a .py.")
return REST_API_Response_Util.get_400_error_response(base_response, "dag_file is not a *.py file")
warning = None
# if both the pause and unpause options are provided then skip the pausing and unpausing phase
if not (pause and unpause):
if pause or unpause:
try:
# import the DAG file that was uploaded so that we can get the DAG_ID to execute the command to pause or unpause it
import imp
dag_file = imp.load_source('module.name', save_file_path)
dag_id = dag_file.dag.dag_id
# run the pause or unpause cli command
airflow_cmd_split = []
if pause:
airflow_cmd_split = ["airflow", "pause", dag_id]
if unpause:
airflow_cmd_split = ["airflow", "unpause", dag_id]
cli_output = self.execute_cli_command(airflow_cmd_split)
except Exception as e:
warning = "Failed to set the state (pause, unpause) of the DAG: " + str(e)
logging.warning(warning)
else:
warning = "Both options pause and unpause were given. Skipping setting the state (pause, unpause) of the DAG."
logging.warning(warning)
return REST_API_Response_Util.get_200_response(base_response=base_response, output="DAG File [{}] has been uploaded".format(dag_file), warning=warning)
# Custom Function for the refresh_dag API
# This will call the direct function corresponding to the web endpoint '/admin/airflow/refresh' that already exists in Airflow
def refresh_dag(self, base_response):
logging.info("Executing custom 'refresh_dag' function")
dag_id = request.args.get('dag_id')
logging.info("dag_id to refresh: '" + str(dag_id) + "'")
if self.is_arg_not_provided(dag_id):
return REST_API_Response_Util.get_400_error_response(base_response, "dag_id should be provided")
elif " " in dag_id:
return REST_API_Response_Util.get_400_error_response(base_response, "dag_id contains spaces and is therefore an illegal argument")
try:
from airflow.www.views import Airflow
# NOTE: The request argument 'dag_id' is required for the refresh() function to get the dag_id
refresh_result = Airflow().refresh()
logging.info("Refresh Result: " + str(refresh_result))
except Exception as e:
error_message = "An error occurred while trying to Refresh the DAG '" + str(dag_id) + "': " + str(e)
logging.error(error_message)
return REST_API_Response_Util.get_500_error_response(base_response, error_message)
return REST_API_Response_Util.get_200_response(base_response=base_response, output="DAG [{}] is now fresh as a daisy".format(dag_id))
# General execution of the airflow command passed to it and returns the response
@staticmethod
def execute_cli_command(airflow_cmd_split):
logging.info("Executing CLI Command")
# There is a need to handle this case separately as the current implementation
# breaks the JSON string into multiple parts in a List and this causes the Airflow
# command to stop working properly
#
# The idea here is to handle the JSON string separately and to make use of the
# fact that the command will take on the following pattern:
#
# `airflow trigger_dag --conf '{"message": "Hello World"}' test_1`
#
# , where test_1 is the name of the Dag
#
if airflow_cmd_split[2] == '--conf':
# Initialize list x and extract the JSON string
# from the airflow_cmd_split List
x = []
for i in range(3, len(airflow_cmd_split) - 1):
x.append(airflow_cmd_split[i])
# Initialize list y
y = [None] * 5
# Assign values to list y
for j in range(0, 3):
y[j] = airflow_cmd_split[j]
# Create string using list x and assigns to y[3]
y[3] = " ".join(x)
# Dag name will always be airflow_cmd_split[-1]
y[4] = airflow_cmd_split[-1]
# Assigns updated values to airflow_cmd_split
airflow_cmd_split = y
logging.info(airflow_cmd_split)
process = subprocess.Popen(airflow_cmd_split, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
process.wait()
return REST_API.collect_process_output(process)
# Gets an empty object that has all the fields a CLI function's output would have in it.
@staticmethod
def get_empty_process_output():
return {
"stderr": "",
"stdin": "",
"stdout": ""
}
# Get the output of the CLI process and package it in a dict
@staticmethod
def collect_process_output(process):
output = REST_API.get_empty_process_output()
if process.stderr is not None:
output["stderr"] = ""
for line in process.stderr.readlines():
output["stderr"] += str(line)
if process.stdin is not None:
output["stdin"] = ""
for line in process.stdin.readlines():
output["stdin"] += str(line)
if process.stdout is not None:
output["stdout"] = ""
for line in process.stdout.readlines():
output["stdout"] += str(line)
logging.info("RestAPI Output: " + str(output))
return output
# Filtering out logging statements from the standard output
# Content like:
#
# [2017-04-19 10:04:34,927] {__init__.py:36} INFO - Using executor CeleryExecutor
# [2017-04-19 10:04:35,926] {models.py:154} INFO - Filling up the DagBag from /Users/...
@staticmethod
def filter_loading_messages(output):
stdout = output["stdout"]
new_stdout_array = stdout.split("\n")
content_to_remove_greatest_index = 0
for index, content in enumerate(new_stdout_array):
if content.startswith("["):
content_to_remove_greatest_index = index
content_to_remove_greatest_index += 1
if len(new_stdout_array) > content_to_remove_greatest_index:
new_stdout_array = new_stdout_array[content_to_remove_greatest_index:]
output["stdout"] = "\n".join(new_stdout_array)
return output
# Creating View to be used by Plugin
rest_api_view = REST_API(category="Admin", name="REST API Plugin")
# Creating Blueprint
rest_api_bp = Blueprint(
"rest_api_bp",
__name__,
template_folder='templates',
static_folder='static',
static_url_path='/static/'
)
# Creating the REST_API_Plugin which extends the AirflowPlugin so it's imported into Airflow
class REST_API_Plugin(AirflowPlugin):
name = "rest_api"
operators = []
flask_blueprints = [rest_api_bp]
hooks = []
executors = []
admin_views = [rest_api_view]
menu_links = []

View File

@@ -450,20 +450,16 @@ def test_audit_control_command_db(mock_insert_action_audit):
def test_invoke_airflow_dag_success(mock_info, mock_exhume_date):
act_resource = ActionsResource()
dag_id = 'test_dag_id'
action = 'test_action'
action = {'id': '123', 'user': 'unittester'}
CONF = cfg.CONF
web_server_url = CONF.base.web_server
conf_value = {'action': action}
log_string = 'Created <DagRun deploy_site @ 2017-09-22 22:16:14: man'
responses.add(
method='GET',
url='{}admin/rest_api/api?api=trigger_dag&dag_id={}&conf={}'.format(
web_server_url, dag_id, act_resource.to_json(conf_value)),
body=json.dumps({
'output': {
'stdout': log_string
}
}),
method='POST',
url='{}api/experimental/dags/{}/dag_runs'.format(
web_server_url, dag_id),
body=json.dumps({'message': log_string}),
status=200,
content_type='application/json')
@@ -477,14 +473,13 @@ def test_invoke_airflow_dag_success(mock_info, mock_exhume_date):
def test_invoke_airflow_dag_errors(mock_info):
act_resource = ActionsResource()
dag_id = 'test_dag_id'
action = 'test_action'
action = {'id': '123', 'user': 'unittester'}
web_server_url = CONF.base.web_server
conf_value = {'action': action}
responses.add(
method='GET',
url='{}admin/rest_api/api?api=trigger_dag&dag_id={}'
'&conf={}'.format(web_server_url, dag_id,
act_resource.to_json(conf_value)),
method='POST',
url='{}api/experimental/dags/{}/dag_runs'.format(
web_server_url, dag_id),
body=json.dumps({
"error": "not found"
}),

View File

@@ -35,11 +35,9 @@ commands = flake8 {posargs}
[testenv:bandit]
skipsdist=True
# NOTE(Bryan Strassner) ignoring airflow plugin which uses a subexec
commands =
bandit \
-r shipyard_airflow \
-x plugins/rest_api_plugin.py \
-n 5
[testenv:genconfig]
@@ -66,4 +64,4 @@ filename = *.py
ignore = F841, H101, H201, H210, H238, H301, H304, H306, H401, H403, H404, H405
# NOTE(Bryan Strassner) excluding 3rd party and generated code that is brought into the
# codebase.
exclude = .venv,.git,.tox,build,dist,*plugins/rest_api_plugin.py,*lib/python*,*egg,alembic/env.py,docs
exclude = .venv,.git,.tox,build,dist,*lib/python*,*egg,alembic/env.py,docs