Set ULID of action on DAG request
Sets the run_id for a DAG invoked in Airflow to the same ULID assigned to it in Shipyard. While this was already happening as a parameter to the DAG being invoked, by making it the run_id, further correlation is possible, at a level that both Shipyard and the Airflow framework are aware. As part of making this change, fragility was uncovered in the rest_api_plugin that expedited the need to switch to the built-in, but experimental airflow API to trigger a dag (one of two API endpoints provided - this is important later in this story). In any case, the 3rd party rest_api_plugin was removed. As a result of the rest_api_plugin being removed: 1) the simpleton helm test to check the api of airflow was also removed (it used the version endpoint of this plugin). As the built-in api provides no version endpoint or similarly accessible-without-being-stateful endpoint, the helm test had no new place to look for something to call. 2) Some clean up of exclusions and documentation was possible - test coverage, security exclusions, left over documentation remnants Change-Id: I0b68496a8500408b776b4acc12888aa017c4c7d2
This commit is contained in:
parent
f32dc97272
commit
e59fb314c1
|
@ -1,42 +0,0 @@
|
||||||
{{/*
|
|
||||||
# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
*/}}
|
|
||||||
{{/*
|
|
||||||
Test Airflow for api functionality.
|
|
||||||
*/}}
|
|
||||||
{{- if .Values.manifests.test_airflow_api }}
|
|
||||||
{{- $envAll := . }}
|
|
||||||
---
|
|
||||||
apiVersion: v1
|
|
||||||
kind: Pod
|
|
||||||
metadata:
|
|
||||||
name: "{{ .Release.Name }}-airflow-api-test"
|
|
||||||
annotations:
|
|
||||||
"helm.sh/hook": "test-success"
|
|
||||||
labels:
|
|
||||||
{{ tuple $envAll "airflow" "api-test" | include "helm-toolkit.snippets.kubernetes_metadata_labels" | indent 4 }}
|
|
||||||
spec:
|
|
||||||
restartPolicy: Never
|
|
||||||
containers:
|
|
||||||
- name: "{{ .Release.Name }}-airflow-api-test"
|
|
||||||
env:
|
|
||||||
- name: 'AIRFLOW_URL'
|
|
||||||
value: {{ tuple "airflow_web" "internal" "airflow_web" . | include "helm-toolkit.endpoints.host_and_port_endpoint_uri_lookup" | quote }}
|
|
||||||
image: {{ .Values.images.tags.airflow }}
|
|
||||||
imagePullPolicy: {{ .Values.images.pull_policy }}
|
|
||||||
{{ tuple . .Values.pod.resources.test.airflow | include "helm-toolkit.snippets.kubernetes_resources" | indent 6 }}
|
|
||||||
command: ["/bin/bash", "-c", "curl -v -X GET --fail ${AIRFLOW_URL}/admin/rest_api/api?api=version; exit $?"]
|
|
||||||
...
|
|
||||||
{{- end }}
|
|
|
@ -727,4 +727,3 @@ manifests:
|
||||||
service_airflow_worker: true
|
service_airflow_worker: true
|
||||||
service_discovery_airflow_worker: true
|
service_discovery_airflow_worker: true
|
||||||
test_shipyard_api: true
|
test_shipyard_api: true
|
||||||
test_airflow_api: true
|
|
||||||
|
|
|
@ -33,7 +33,7 @@
|
||||||
#
|
#
|
||||||
|
|
||||||
# The web server for Airflow (string value)
|
# The web server for Airflow (string value)
|
||||||
#web_server = http://localhost:32080/
|
#web_server = http://localhost:8080/
|
||||||
|
|
||||||
# Seconds to wait to connect to the airflow api (integer value)
|
# Seconds to wait to connect to the airflow api (integer value)
|
||||||
#airflow_api_connect_timeout = 5
|
#airflow_api_connect_timeout = 5
|
||||||
|
|
|
@ -104,4 +104,3 @@ Post Deployment
|
||||||
|
|
||||||
.. _airship-in-a-bottle: https://git.airshipit.org/cgit/airship-in-a-bottle
|
.. _airship-in-a-bottle: https://git.airshipit.org/cgit/airship-in-a-bottle
|
||||||
.. _dev_minimal: https://git.airshipit.org/cgit/airship-in-a-bottle/tree/manifests/dev_minimal
|
.. _dev_minimal: https://git.airshipit.org/cgit/airship-in-a-bottle/tree/manifests/dev_minimal
|
||||||
.. _rest_api_plugin: https://git.airshipit.org/cgit/airship-shipyard/tree/src/bin/shipyard_airflow/shipyard_airflow/plugins/rest_api_plugin.py
|
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
[run]
|
[run]
|
||||||
# omit third party code and unit tests
|
# omit third party code and unit tests
|
||||||
omit =
|
omit = []
|
||||||
shipyard_airflow/plugins/rest_api_plugin.py
|
|
||||||
|
|
|
@ -33,7 +33,7 @@
|
||||||
#
|
#
|
||||||
|
|
||||||
# The web server for Airflow (string value)
|
# The web server for Airflow (string value)
|
||||||
#web_server = http://localhost:32080/
|
#web_server = http://localhost:8080/
|
||||||
|
|
||||||
# Seconds to wait to connect to the airflow api (integer value)
|
# Seconds to wait to connect to the airflow api (integer value)
|
||||||
#airflow_api_connect_timeout = 5
|
#airflow_api_connect_timeout = 5
|
||||||
|
|
|
@ -29,7 +29,7 @@ SECTIONS = [
|
||||||
options=[
|
options=[
|
||||||
cfg.StrOpt(
|
cfg.StrOpt(
|
||||||
'web_server',
|
'web_server',
|
||||||
default='http://localhost:32080/',
|
default='http://localhost:8080/',
|
||||||
help='The web server for Airflow'
|
help='The web server for Airflow'
|
||||||
),
|
),
|
||||||
cfg.IntOpt(
|
cfg.IntOpt(
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
|
|
||||||
import falcon
|
import falcon
|
||||||
import requests
|
import requests
|
||||||
|
@ -254,6 +255,8 @@ class ActionsResource(BaseResource):
|
||||||
|
|
||||||
# Retrieve URL
|
# Retrieve URL
|
||||||
web_server_url = CONF.base.web_server
|
web_server_url = CONF.base.web_server
|
||||||
|
api_path = 'api/experimental/dags/{}/dag_runs'
|
||||||
|
req_url = os.path.join(web_server_url, api_path.format(dag_id))
|
||||||
c_timeout = CONF.base.airflow_api_connect_timeout
|
c_timeout = CONF.base.airflow_api_connect_timeout
|
||||||
r_timeout = CONF.base.airflow_api_read_timeout
|
r_timeout = CONF.base.airflow_api_read_timeout
|
||||||
|
|
||||||
|
@ -266,15 +269,16 @@ class ActionsResource(BaseResource):
|
||||||
status=falcon.HTTP_503,
|
status=falcon.HTTP_503,
|
||||||
retry=True, )
|
retry=True, )
|
||||||
else:
|
else:
|
||||||
conf_value = {'action': action}
|
# No cache please
|
||||||
|
headers = {'Cache-Control': 'no-cache'}
|
||||||
# "conf" - JSON string that gets pickled into the DagRun's
|
# "conf" - JSON string that gets pickled into the DagRun's
|
||||||
# conf attribute
|
# conf attribute. The conf is passed as as a string of escaped
|
||||||
req_url = ('{}admin/rest_api/api?api=trigger_dag&dag_id={}'
|
# json inside the json payload accepted by the API.
|
||||||
'&conf={}'.format(web_server_url,
|
conf_value = self.to_json({'action': action})
|
||||||
dag_id, self.to_json(conf_value)))
|
payload = {'run_id': action['id'], 'conf': conf_value}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
resp = requests.get(req_url, timeout=(c_timeout, r_timeout))
|
resp = requests.post(req_url, timeout=(c_timeout, r_timeout),
|
||||||
|
headers=headers, json=payload)
|
||||||
LOG.info('Response code from Airflow trigger_dag: %s',
|
LOG.info('Response code from Airflow trigger_dag: %s',
|
||||||
resp.status_code)
|
resp.status_code)
|
||||||
# any 4xx/5xx will be HTTPError, which are RequestException
|
# any 4xx/5xx will be HTTPError, which are RequestException
|
||||||
|
@ -295,16 +299,14 @@ class ActionsResource(BaseResource):
|
||||||
retry=True, )
|
retry=True, )
|
||||||
|
|
||||||
dag_time = self._exhume_date(dag_id,
|
dag_time = self._exhume_date(dag_id,
|
||||||
response['output']['stdout'])
|
response['message'])
|
||||||
dag_execution_date = dag_time.strftime('%Y-%m-%dT%H:%M:%S')
|
dag_execution_date = dag_time.strftime('%Y-%m-%dT%H:%M:%S')
|
||||||
return dag_execution_date
|
return dag_execution_date
|
||||||
|
|
||||||
def _exhume_date(self, dag_id, log_string):
|
def _exhume_date(self, dag_id, log_string):
|
||||||
# TODO(bryan-strassner) refactor this to an airflow api client module
|
|
||||||
|
|
||||||
# we are unable to use the response time because that
|
# we are unable to use the response time because that
|
||||||
# does not match the time when the dag was recorded.
|
# does not match the time when the dag was recorded.
|
||||||
# We have to parse the stdout returned to find the
|
# We have to parse the returned message to find the
|
||||||
# Created <DagRun {dag_id} @ {timestamp}
|
# Created <DagRun {dag_id} @ {timestamp}
|
||||||
# e.g.
|
# e.g.
|
||||||
# ...- Created <DagRun deploy_site @ 2017-09-22 22:16:14: man...
|
# ...- Created <DagRun deploy_site @ 2017-09-22 22:16:14: man...
|
||||||
|
|
|
@ -1,753 +0,0 @@
|
||||||
# Original Source Code: https://github.com/teamclairvoyant/airflow-rest-api-plugin
|
|
||||||
# Date when airflow-rest-api-plugin source codes were downloaded: 2017-06-27
|
|
||||||
# Kept majority of the codes and removed those that are not needed for our purpose
|
|
||||||
|
|
||||||
from airflow.models import DagBag, DagModel
|
|
||||||
from airflow.plugins_manager import AirflowPlugin
|
|
||||||
from airflow import configuration
|
|
||||||
from airflow.www.app import csrf
|
|
||||||
|
|
||||||
from flask import Blueprint, request, jsonify
|
|
||||||
from flask_admin import BaseView, expose
|
|
||||||
|
|
||||||
from datetime import datetime
|
|
||||||
import airflow
|
|
||||||
import logging
|
|
||||||
import subprocess
|
|
||||||
import os
|
|
||||||
import socket
|
|
||||||
|
|
||||||
"""
|
|
||||||
CLIs this REST API exposes are Defined here: http://airflow.incubator.apache.org/cli.html
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Location of the REST Endpoint
|
|
||||||
# Note: Changing this will only effect where the messages are posted to on the web interface and will not change where the endpoint actually resides
|
|
||||||
rest_api_endpoint = "/admin/rest_api/api"
|
|
||||||
|
|
||||||
# Getting Versions and Global variables
|
|
||||||
hostname = socket.gethostname()
|
|
||||||
airflow_version = airflow.__version__
|
|
||||||
|
|
||||||
# Getting configurations from airflow.cfg file
|
|
||||||
airflow_webserver_base_url = configuration.get('webserver', 'BASE_URL')
|
|
||||||
airflow_base_log_folder = configuration.get('core', 'BASE_LOG_FOLDER')
|
|
||||||
airflow_dags_folder = configuration.get('core', 'DAGS_FOLDER')
|
|
||||||
log_loading = configuration.getboolean("rest_api_plugin", "LOG_LOADING") if configuration.has_option("rest_api_plugin", "LOG_LOADING") else False
|
|
||||||
filter_loading_messages_in_cli_response = configuration.getboolean("rest_api_plugin", "FILTER_LOADING_MESSAGES_IN_CLI_RESPONSE") if configuration.has_option("rest_api_plugin", "FILTER_LOADING_MESSAGES_IN_CLI_RESPONSE") else True
|
|
||||||
|
|
||||||
# Using UTF-8 Encoding so that response messages don't have any characters in them that can't be handled
|
|
||||||
os.environ['PYTHONIOENCODING'] = 'utf-8'
|
|
||||||
|
|
||||||
|
|
||||||
"""
|
|
||||||
Metadata that defines a single API:
|
|
||||||
{
|
|
||||||
"name": "{string}", # Name of the API (cli command to be executed)
|
|
||||||
"description": "{string}", # Description of the API
|
|
||||||
"airflow_version": "{string}", # Version the API was available in to allow people to better determine if the API is available. (to be displayed on the Admin page)
|
|
||||||
"http_method": "{string}", # HTTP method to use when calling the function. (Default: GET) (Optional)
|
|
||||||
# removed "background_mode": {boolean}
|
|
||||||
"arguments": [ # List of arguments that can be provided to the API
|
|
||||||
{
|
|
||||||
"name": "{string}", # Name of the argument
|
|
||||||
"description": "{string}", # Description of the argument
|
|
||||||
"form_input_type": "{string}", # Type of input to use on the Admin page for the argument
|
|
||||||
"required": {boolean}, # Whether the argument is required upon submission
|
|
||||||
"cli_end_position": {int} # In the case with a CLI command that the arguments value should be appended on to the end (for example: airflow trigger_dag some_dag_id), this is the position that the argument should be provided in the CLI command. (Optional)
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"fixed_arguments": [ # List of arguments that will always be used by the API endpoint and can't be changed
|
|
||||||
{
|
|
||||||
"name": "{string}", # Name of the argument
|
|
||||||
"description": "{string}", # Description of the argument
|
|
||||||
"fixed_value": "{string}" # Fixed value that will always be used
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"post_arguments": [ # List of arguments that can be provided in the POST body to the API
|
|
||||||
{
|
|
||||||
"name": "{string}", # Name of the argument
|
|
||||||
"description": "{string}", # Description of the argument
|
|
||||||
"form_input_type": "{string}", # Type of input to use on the Admin page for the argument
|
|
||||||
"required": {boolean}, # Whether the argument is required upon submission
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Metadata about the APIs and how to call them. Representing them like this allows us to dynamically generate the APIs
|
|
||||||
# in the admin page and dynamically execute them. This also allows us to easily add new ones.
|
|
||||||
# API Object definition is described in the comment block above.
|
|
||||||
apis_metadata = [
|
|
||||||
{
|
|
||||||
"name": "version",
|
|
||||||
"description": "Displays the version of Airflow you're using",
|
|
||||||
"airflow_version": "1.0.0 or greater",
|
|
||||||
"http_method": "GET",
|
|
||||||
"arguments": []
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "render",
|
|
||||||
"description": "Render a task instance's template(s)",
|
|
||||||
"airflow_version": "1.7.0 or greater",
|
|
||||||
"http_method": "GET",
|
|
||||||
"arguments": [
|
|
||||||
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
|
|
||||||
{"name": "task_id", "description": "The id of the task", "form_input_type": "text", "required": True, "cli_end_position": 2},
|
|
||||||
{"name": "execution_date", "description": "The execution date of the DAG (Example: 2017-01-02T03:04:05)", "form_input_type": "text", "required": True, "cli_end_position": 3},
|
|
||||||
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "variables",
|
|
||||||
"description": "CRUD operations on variables",
|
|
||||||
"airflow_version": "1.7.1 or greater",
|
|
||||||
"http_method": "GET",
|
|
||||||
"arguments": [
|
|
||||||
{"name": "set", "description": "Set a variable. Expected input in the form: KEY VALUE.", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "get", "description": "Get value of a variable", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "json", "description": "Deserialize JSON variable", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "default", "description": "Default value returned if variable does not exist", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "import", "description": "Import variables from JSON file", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "export", "description": "Export variables to JSON file", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "delete", "description": "Delete a variable", "form_input_type": "text", "required": False}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "connections",
|
|
||||||
"description": "List/Add/Delete connections",
|
|
||||||
"airflow_version": "1.8.0 or greater",
|
|
||||||
"http_method": "GET",
|
|
||||||
"arguments": [
|
|
||||||
{"name": "list", "description": "List all connections", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "add", "description": "Add a connection", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "delete", "description": "Delete a connection", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "conn_id", "description": "Connection id, required to add/delete a connection", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "conn_uri", "description": "Connection URI, required to add a connection", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "conn_extra", "description": "Connection 'Extra' field, optional when adding a connection", "form_input_type": "text", "required": False}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "pause",
|
|
||||||
"description": "Pauses a DAG",
|
|
||||||
"airflow_version": "1.7.0 or greater",
|
|
||||||
"http_method": "GET",
|
|
||||||
"arguments": [
|
|
||||||
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
|
|
||||||
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "unpause",
|
|
||||||
"description": "Unpauses a DAG",
|
|
||||||
"airflow_version": "1.7.0 or greater",
|
|
||||||
"http_method": "GET",
|
|
||||||
"arguments": [
|
|
||||||
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
|
|
||||||
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "task_failed_deps",
|
|
||||||
"description": "Returns the unmet dependencies for a task instance from the perspective of the scheduler. In other words, why a task instance doesn't get scheduled and then queued by the scheduler, and then run by an executor).",
|
|
||||||
"airflow_version": "1.8.0 or greater",
|
|
||||||
"http_method": "GET",
|
|
||||||
"arguments": [
|
|
||||||
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
|
|
||||||
{"name": "task_id", "description": "The id of the task", "form_input_type": "text", "required": True, "cli_end_position": 2},
|
|
||||||
{"name": "execution_date", "description": "The execution date of the DAG (Example: 2017-01-02T03:04:05)", "form_input_type": "text", "required": True, "cli_end_position": 3},
|
|
||||||
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "trigger_dag",
|
|
||||||
"description": "Trigger a DAG run",
|
|
||||||
"airflow_version": "1.6.0 or greater",
|
|
||||||
"http_method": "GET",
|
|
||||||
"arguments": [
|
|
||||||
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
|
|
||||||
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "run_id", "description": "Helps to identify this run", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "conf", "description": "JSON string that gets pickled into the DagRun's conf attribute", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "exec_date", "description": "The execution date of the DAG", "form_input_type": "text", "required": False}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "test",
|
|
||||||
"description": "Test a task instance. This will run a task without checking for dependencies or recording it's state in the database.",
|
|
||||||
"airflow_version": "0.1 or greater",
|
|
||||||
"http_method": "GET",
|
|
||||||
"arguments": [
|
|
||||||
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
|
|
||||||
{"name": "task_id", "description": "The id of the task", "form_input_type": "text", "required": True, "cli_end_position": 2},
|
|
||||||
{"name": "execution_date", "description": "The execution date of the DAG (Example: 2017-01-02T03:04:05)", "form_input_type": "text", "required": True, "cli_end_position": 3},
|
|
||||||
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "dry_run", "description": "Perform a dry run", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "task_params", "description": "Sends a JSON params dict to the task", "form_input_type": "text", "required": False}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "dag_state",
|
|
||||||
"description": "Get the status of a dag run",
|
|
||||||
"airflow_version": "1.8.0 or greater",
|
|
||||||
"http_method": "GET",
|
|
||||||
"arguments": [
|
|
||||||
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
|
|
||||||
{"name": "execution_date", "description": "The execution date of the DAG (Example: 2017-01-02T03:04:05)", "form_input_type": "text", "required": True, "cli_end_position": 2},
|
|
||||||
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "run",
|
|
||||||
"description": "Run a single task instance",
|
|
||||||
"airflow_version": "1.0.0 or greater",
|
|
||||||
"http_method": "GET",
|
|
||||||
"arguments": [
|
|
||||||
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
|
|
||||||
{"name": "task_id", "description": "The id of the task", "form_input_type": "text", "required": True, "cli_end_position": 2},
|
|
||||||
{"name": "execution_date", "description": "The execution date of the DAG (Example: 2017-01-02T03:04:05)", "form_input_type": "text", "required": True, "cli_end_position": 3},
|
|
||||||
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "mark_success", "description": "Mark jobs as succeeded without running them", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "force", "description": "Ignore previous task instance state, rerun regardless if task already succeede", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "pool", "description": "Resource pool to use", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "cfg_path", "description": "Path to config file to use instead of airflow.cfg", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "local", "description": "Run the task using the LocalExecutor", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "ignore_all_dependencies", "description": "Ignores all non-critical dependencies, including ignore_ti_state and ignore_task_depsstore_true", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "ignore_dependencies", "description": "Ignore task-specific dependencies, e.g. upstream, depends_on_past, and retry delay dependencies", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "ignore_depends_on_past", "description": "Ignore depends_on_past dependencies (but respect upstream dependencies)", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "ship_dag", "description": "Pickles (serializes) the DAG and ships it to the worker", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "pickle", "description": "Serialized pickle object of the entire dag (used internally)", "form_input_type": "text", "required": False},
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "list_tasks",
|
|
||||||
"description": "List the tasks within a DAG",
|
|
||||||
"airflow_version": "0.1 or greater",
|
|
||||||
"http_method": "GET",
|
|
||||||
"arguments": [
|
|
||||||
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
|
|
||||||
{"name": "tree", "description": "Tree view", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "backfill",
|
|
||||||
"description": "Run subsections of a DAG for a specified date range",
|
|
||||||
"airflow_version": "0.1 or greater",
|
|
||||||
"http_method": "GET",
|
|
||||||
"arguments": [
|
|
||||||
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
|
|
||||||
{"name": "task_regex", "description": "The regex to filter specific task_ids to backfill (optional)", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "start_date", "description": "Override start_date YYYY-MM-DD. Either this or the end_date needs to be provided.", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "end_date", "description": "Override end_date YYYY-MM-DD. Either this or the start_date needs to be provided.", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "mark_success", "description": "Mark jobs as succeeded without running them", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "local", "description": "Run the task using the LocalExecutor", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "donot_pickle", "description": "Do not attempt to pickle the DAG object to send over to the workers, just tell the workers to run their version of the code.", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "include_adhoc", "description": "Include dags with the adhoc argument.", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "ignore_dependencies", "description": "Ignore task-specific dependencies, e.g. upstream, depends_on_past, and retry delay dependencies", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "ignore_first_depends_on_past", "description": "Ignores depends_on_past dependencies for the first set of tasks only (subsequent executions in the backfill DO respect depends_on_past).", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "pool", "description": "Resource pool to use", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "dry_run", "description": "Perform a dry run", "form_input_type": "checkbox", "required": False}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "list_dags",
|
|
||||||
"description": "List all the DAGs",
|
|
||||||
"airflow_version": "0.1 or greater",
|
|
||||||
"http_method": "GET",
|
|
||||||
"arguments": [
|
|
||||||
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "report", "description": "Show DagBag loading report", "form_input_type": "checkbox", "required": False}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "task_state",
|
|
||||||
"description": "Get the status of a task instance",
|
|
||||||
"airflow_version": "1.0.0 or greater",
|
|
||||||
"http_method": "GET",
|
|
||||||
"arguments": [
|
|
||||||
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
|
|
||||||
{"name": "task_id", "description": "The id of the task", "form_input_type": "text", "required": True, "cli_end_position": 2},
|
|
||||||
{"name": "execution_date", "description": "The execution date of the DAG (Example: 2017-01-02T03:04:05)", "form_input_type": "text", "required": True, "cli_end_position": 3},
|
|
||||||
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "pool",
|
|
||||||
"description": "CRUD operations on pools",
|
|
||||||
"airflow_version": "1.8.0 or greater",
|
|
||||||
"http_method": "GET",
|
|
||||||
"arguments": [
|
|
||||||
{"name": "set", "description": "Set pool slot count and description, respectively. Expected input in the form: NAME SLOT_COUNT POOL_DESCRIPTION.", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "get", "description": "Get pool info", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "delete", "description": "Delete a pool", "form_input_type": "text", "required": False}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
# removed retrieval of logs from this interface, as it uses a background
|
|
||||||
# mode call, we don't use it, and lets us remove a security vulnerability
|
|
||||||
{
|
|
||||||
"name": "clear",
|
|
||||||
"description": "Clear a set of task instance, as if they never ran",
|
|
||||||
"airflow_version": "0.1 or greater",
|
|
||||||
"http_method": "GET",
|
|
||||||
"arguments": [
|
|
||||||
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True, "cli_end_position": 1},
|
|
||||||
{"name": "task_regex", "description": "The regex to filter specific task_ids to backfill (optional)", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "start_date", "description": "Override start_date YYYY-MM-DD", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "end_date", "description": "Override end_date YYYY-MM-DD", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "subdir", "description": "File location or directory from which to look for the dag", "form_input_type": "text", "required": False},
|
|
||||||
{"name": "upstream", "description": "Include upstream tasks", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "downstream", "description": "Include downstream tasks", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "only_failed", "description": "Only failed jobs", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "only_running", "description": "Only running jobs", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "exclude_subdags", "description": "Exclude subdags", "form_input_type": "checkbox", "required": False}
|
|
||||||
],
|
|
||||||
"fixed_arguments": [
|
|
||||||
{"name": "no_confirm", "description": "Do not request confirmation", "fixed_value": ""}
|
|
||||||
],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "deploy_dag",
|
|
||||||
"description": "Deploy a new DAG File to the DAGs directory",
|
|
||||||
"airflow_version": "None - Custom API",
|
|
||||||
"http_method": "POST",
|
|
||||||
"post_body_description": "dag_file - POST Body Element - REQUIRED",
|
|
||||||
"form_enctype": "multipart/form-data",
|
|
||||||
"arguments": [],
|
|
||||||
"post_arguments": [
|
|
||||||
{"name": "dag_file", "description": "Python file to upload and deploy", "form_input_type": "file", "required": True},
|
|
||||||
{"name": "force", "description": "Whether to forcefully upload the file if the file already exists or not", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "pause", "description": "The DAG will be forced to be paused when created and override the 'dags_are_paused_at_creation' config.", "form_input_type": "checkbox", "required": False},
|
|
||||||
{"name": "unpause", "description": "The DAG will be forced to be unpaused when created and override the 'dags_are_paused_at_creation' config.", "form_input_type": "checkbox", "required": False}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "refresh_dag",
|
|
||||||
"description": "Refresh a DAG in the Web Server",
|
|
||||||
"airflow_version": "None - Custom API",
|
|
||||||
"http_method": "GET",
|
|
||||||
"arguments": [
|
|
||||||
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
# Utility for creating the REST Responses
|
|
||||||
class REST_API_Response_Util():
|
|
||||||
|
|
||||||
# Gets the Base Response object with all required response fields included. To be used at the beginning of the REST Call.
|
|
||||||
@staticmethod
|
|
||||||
def get_base_response(status="OK", http_response_code=200, call_time=datetime.now(), include_arguments=True):
|
|
||||||
base_response = {"status": status, "http_response_code": http_response_code, "call_time": call_time}
|
|
||||||
if include_arguments:
|
|
||||||
base_response["arguments"] = request.args
|
|
||||||
base_response["post_arguments"] = request.form
|
|
||||||
return base_response
|
|
||||||
|
|
||||||
# Finalize the Base Response with additional data
|
|
||||||
@staticmethod
|
|
||||||
def _get_final_response(base_response, output=None, airflow_cmd=None, http_response_code=None, warning=None):
|
|
||||||
final_response = base_response
|
|
||||||
final_response["response_time"] = datetime.now()
|
|
||||||
if output:
|
|
||||||
final_response["output"] = output
|
|
||||||
if airflow_cmd:
|
|
||||||
final_response["airflow_cmd"] = airflow_cmd
|
|
||||||
if http_response_code:
|
|
||||||
final_response["http_response_code"] = http_response_code
|
|
||||||
if warning:
|
|
||||||
final_response["warning"] = warning
|
|
||||||
return jsonify(final_response)
|
|
||||||
|
|
||||||
# Set the Base Response as a 200 HTTP Response object
|
|
||||||
@staticmethod
|
|
||||||
def get_200_response(base_response, output=None, airflow_cmd=None, warning=None):
|
|
||||||
logging.info("Returning a 200 Response Code with response '" + str(output) + "'")
|
|
||||||
return REST_API_Response_Util._get_final_response(base_response=base_response, output=output, airflow_cmd=airflow_cmd, warning=warning)
|
|
||||||
|
|
||||||
# Set the Base Response and an Error
|
|
||||||
@staticmethod
|
|
||||||
def _get_error_response(base_response, error_code, output=None):
|
|
||||||
base_response["status"] = "ERROR"
|
|
||||||
return REST_API_Response_Util._get_final_response(base_response=base_response, output=output, http_response_code=error_code), error_code
|
|
||||||
|
|
||||||
# Set the Base Response as a 400 HTTP Response object
|
|
||||||
@staticmethod
|
|
||||||
def get_400_error_response(base_response, output=None):
|
|
||||||
logging.warning("Returning a 400 Response Code with response '" + str(output) + "'")
|
|
||||||
return REST_API_Response_Util._get_error_response(base_response, 400, output)
|
|
||||||
|
|
||||||
# Set the Base Response as a 403 HTTP Response object
|
|
||||||
@staticmethod
|
|
||||||
def get_403_error_response(base_response, output=None):
|
|
||||||
logging.warning("Returning a 403 Response Code with response '" + str(output) + "'")
|
|
||||||
return REST_API_Response_Util._get_error_response(base_response, 403, output)
|
|
||||||
|
|
||||||
# Set the Base Response as a 500 HTTP Response object
|
|
||||||
@staticmethod
|
|
||||||
def get_500_error_response(base_response, output=None):
|
|
||||||
logging.warning("Returning a 500 Response Code with response '" + str(output) + "'")
|
|
||||||
return REST_API_Response_Util._get_error_response(base_response, 500, output)
|
|
||||||
|
|
||||||
|
|
||||||
# REST_API View which extends the flask_admin BaseView
|
|
||||||
class REST_API(BaseView):
|
|
||||||
|
|
||||||
# Checks a string object to see if it is none or empty so we can determine if an argument (passed to the rest api) is provided
|
|
||||||
@staticmethod
|
|
||||||
def is_arg_not_provided(arg):
|
|
||||||
return arg is None or arg == ""
|
|
||||||
|
|
||||||
# Get the DagBag which has a list of all the current Dags
|
|
||||||
@staticmethod
|
|
||||||
def get_dagbag():
|
|
||||||
return DagBag()
|
|
||||||
|
|
||||||
# '/' Endpoint where the Admin page is which allows you to view the APIs available and trigger them
|
|
||||||
@expose('/')
|
|
||||||
def index(self):
|
|
||||||
logging.info("REST_API.index() called")
|
|
||||||
|
|
||||||
# get the information that we want to display on the page regarding the dags that are available
|
|
||||||
dagbag = self.get_dagbag()
|
|
||||||
dags = []
|
|
||||||
for dag_id in dagbag.dags:
|
|
||||||
orm_dag = DagModel.get_current(dag_id)
|
|
||||||
dags.append({
|
|
||||||
"dag_id": dag_id,
|
|
||||||
"is_active": (not orm_dag.is_paused) if orm_dag is not None else False
|
|
||||||
})
|
|
||||||
|
|
||||||
return self.render("rest_api_plugin/index.html",
|
|
||||||
dags=dags,
|
|
||||||
airflow_webserver_base_url=airflow_webserver_base_url,
|
|
||||||
rest_api_endpoint=rest_api_endpoint,
|
|
||||||
apis_metadata=apis_metadata,
|
|
||||||
airflow_version=airflow_version,
|
|
||||||
)
|
|
||||||
|
|
||||||
# '/api' REST Endpoint where API requests should all come in
|
|
||||||
@csrf.exempt # Exempt the CSRF token
|
|
||||||
@expose('/api', methods=["GET", "POST"])
|
|
||||||
def api(self):
|
|
||||||
base_response = REST_API_Response_Util.get_base_response()
|
|
||||||
|
|
||||||
# Get the api that you want to execute
|
|
||||||
api = request.args.get('api')
|
|
||||||
if api is not None:
|
|
||||||
api = api.strip().lower()
|
|
||||||
logging.info("REST_API.api() called (api: " + str(api) + ")")
|
|
||||||
|
|
||||||
# Validate that the API is provided
|
|
||||||
if self.is_arg_not_provided(api):
|
|
||||||
logging.warning("api argument not provided")
|
|
||||||
return REST_API_Response_Util.get_400_error_response(base_response, "API should be provided")
|
|
||||||
|
|
||||||
# Get the api_metadata from the api object list that correcsponds to the api we want to run to get the metadata.
|
|
||||||
api_metadata = None
|
|
||||||
for test_api_metadata in apis_metadata:
|
|
||||||
if test_api_metadata["name"] == api:
|
|
||||||
api_metadata = test_api_metadata
|
|
||||||
if api_metadata is None:
|
|
||||||
logging.info("api '" + str(api) + "' was not found in the apis list in the REST API Plugin")
|
|
||||||
return REST_API_Response_Util.get_400_error_response(base_response, "API '" + str(api) + "' was not found")
|
|
||||||
|
|
||||||
# check if all the required arguments are provided
|
|
||||||
missing_required_arguments = []
|
|
||||||
dag_id = None
|
|
||||||
for argument in api_metadata["arguments"]:
|
|
||||||
argument_name = argument["name"]
|
|
||||||
argument_value = request.args.get(argument_name)
|
|
||||||
if argument["required"]:
|
|
||||||
if self.is_arg_not_provided(argument_value):
|
|
||||||
missing_required_arguments.append(argument_name)
|
|
||||||
if argument_name == "dag_id" and argument_value is not None:
|
|
||||||
dag_id = argument_value.strip()
|
|
||||||
if len(missing_required_arguments) > 0:
|
|
||||||
logging.warning("Missing required arguments: " + str(missing_required_arguments))
|
|
||||||
return REST_API_Response_Util.get_400_error_response(base_response, "The argument(s) " + str(missing_required_arguments) + " are required")
|
|
||||||
|
|
||||||
# Check to make sure that the DAG you're referring to, already exists.
|
|
||||||
dag_bag = self.get_dagbag()
|
|
||||||
if dag_id is not None and dag_id not in dag_bag.dags:
|
|
||||||
logging.info("DAG_ID '" + str(dag_id) + "' was not found in the DagBag list '" + str(dag_bag.dags) + "'")
|
|
||||||
return REST_API_Response_Util.get_400_error_response(base_response, "The DAG ID '" + str(dag_id) + "' does not exist")
|
|
||||||
|
|
||||||
# Deciding which function to use based off the API object that was requested. Some functions are custom and need to be manually routed to.
|
|
||||||
if api == "version":
|
|
||||||
final_response = self.version(base_response)
|
|
||||||
elif api == "rest_api_plugin_version":
|
|
||||||
final_response = self.rest_api_plugin_version(base_response)
|
|
||||||
elif api == "deploy_dag":
|
|
||||||
final_response = self.deploy_dag(base_response)
|
|
||||||
elif api == "refresh_dag":
|
|
||||||
final_response = self.refresh_dag(base_response)
|
|
||||||
else:
|
|
||||||
final_response = self.execute_cli(base_response, api_metadata)
|
|
||||||
|
|
||||||
return final_response
|
|
||||||
|
|
||||||
# General execution of a CLI command
|
|
||||||
# A command will be assembled and then passed to the OS as a commandline function and the results will be returned
|
|
||||||
def execute_cli(self, base_response, api_metadata):
|
|
||||||
logging.info("Executing cli function")
|
|
||||||
|
|
||||||
# getting the largest cli_end_position in the api_metadata object so that the cli function can be assembled
|
|
||||||
largest_end_argument_value = 0
|
|
||||||
for argument in api_metadata.get("arguments", []):
|
|
||||||
if argument.get("cli_end_position") is not None and argument["cli_end_position"] > largest_end_argument_value:
|
|
||||||
largest_end_argument_value = argument["cli_end_position"]
|
|
||||||
|
|
||||||
# starting to create the airflow_cmd function
|
|
||||||
airflow_cmd_split = ["airflow", api_metadata["name"]]
|
|
||||||
|
|
||||||
# appending arguments to the airflow_cmd_split array and setting arguments aside in the end_arguments array to be appended onto the end of airflow_cmd_split
|
|
||||||
end_arguments = [0] * largest_end_argument_value
|
|
||||||
for argument in api_metadata["arguments"]:
|
|
||||||
argument_name = argument["name"]
|
|
||||||
argument_value = request.args.get(argument_name)
|
|
||||||
logging.info("argument_name: " + str(argument_name) + ", argument_value: " + str(argument_value))
|
|
||||||
if argument_value is not None:
|
|
||||||
# if the argument should be appended onto the end, find the position and add it to the end_arguments array
|
|
||||||
if "cli_end_position" in argument:
|
|
||||||
logging.info("argument['cli_end_position']: " + str(argument['cli_end_position']))
|
|
||||||
end_arguments[argument["cli_end_position"]-1] = argument_value
|
|
||||||
else:
|
|
||||||
airflow_cmd_split.extend(["--" + argument_name])
|
|
||||||
if argument["form_input_type"] is not "checkbox":
|
|
||||||
airflow_cmd_split.extend(argument_value.split(" "))
|
|
||||||
else:
|
|
||||||
logging.warning("argument_value is null")
|
|
||||||
|
|
||||||
# appending fixed arguments that should always be provided to the APIs
|
|
||||||
for fixed_argument in api_metadata.get("fixed_arguments", []):
|
|
||||||
fixed_argument_name = fixed_argument["name"]
|
|
||||||
fixed_argument_value = fixed_argument.get("fixed_value")
|
|
||||||
logging.info("fixed_argument_name: " + str(fixed_argument_name) + ", fixed_argument_value: " + str(fixed_argument_value))
|
|
||||||
if fixed_argument_value is not None:
|
|
||||||
airflow_cmd_split.extend(["--" + fixed_argument_name])
|
|
||||||
if fixed_argument_value:
|
|
||||||
airflow_cmd_split.extend(fixed_argument_value.split(" "))
|
|
||||||
|
|
||||||
# appending the end_arguments to the very end
|
|
||||||
airflow_cmd_split.extend(end_arguments)
|
|
||||||
|
|
||||||
# joining all the individual arguments and components into a single string
|
|
||||||
airflow_cmd = " ".join(airflow_cmd_split)
|
|
||||||
|
|
||||||
logging.info("airflow_cmd array: " + str(airflow_cmd_split))
|
|
||||||
logging.info("airflow_cmd: " + str(airflow_cmd))
|
|
||||||
|
|
||||||
output = self.execute_cli_command(airflow_cmd_split)
|
|
||||||
|
|
||||||
# if desired, filter out the loading messages to reduce the noise in the output
|
|
||||||
if filter_loading_messages_in_cli_response:
|
|
||||||
logging.info("Filtering Loading Messages from the CLI Response")
|
|
||||||
output = self.filter_loading_messages(output)
|
|
||||||
|
|
||||||
return REST_API_Response_Util.get_200_response(base_response=base_response, output=output, airflow_cmd=airflow_cmd)
|
|
||||||
|
|
||||||
# Custom function for the version API
|
|
||||||
def version(self, base_response):
|
|
||||||
logging.info("Executing custom 'version' function")
|
|
||||||
return REST_API_Response_Util.get_200_response(base_response, airflow_version)
|
|
||||||
|
|
||||||
# Custom Function for the deploy_dag API
|
|
||||||
def deploy_dag(self, base_response):
|
|
||||||
logging.info("Executing custom 'deploy_dag' function")
|
|
||||||
|
|
||||||
if 'dag_file' not in request.files or request.files['dag_file'].filename == '': # check if the post request has the file part
|
|
||||||
logging.warning("The dag_file argument wasn't provided")
|
|
||||||
return REST_API_Response_Util.get_400_error_response(base_response, "dag_file should be provided")
|
|
||||||
dag_file = request.files['dag_file']
|
|
||||||
|
|
||||||
force = True if request.form.get('force') is not None else False
|
|
||||||
logging.info("deploy_dag force upload: " + str(force))
|
|
||||||
|
|
||||||
pause = True if request.form.get('pause') is not None else False
|
|
||||||
logging.info("deploy_dag in pause state: " + str(pause))
|
|
||||||
|
|
||||||
unpause = True if request.form.get('unpause') is not None else False
|
|
||||||
logging.info("deploy_dag in unpause state: " + str(unpause))
|
|
||||||
|
|
||||||
# make sure that the dag_file is a python script
|
|
||||||
if dag_file and dag_file.filename.endswith(".py"):
|
|
||||||
save_file_path = os.path.join(airflow_dags_folder, dag_file.filename)
|
|
||||||
|
|
||||||
# Check if the file already exists.
|
|
||||||
if os.path.isfile(save_file_path) and not force:
|
|
||||||
logging.warning("File to upload already exists")
|
|
||||||
return REST_API_Response_Util.get_400_error_response(base_response, "The file '" + save_file_path + "' already exists on host '" + hostname + "'.")
|
|
||||||
|
|
||||||
logging.info("Saving file to '" + save_file_path + "'")
|
|
||||||
dag_file.save(save_file_path)
|
|
||||||
|
|
||||||
else:
|
|
||||||
logging.warning("deploy_dag file is not a python file. It does not end with a .py.")
|
|
||||||
return REST_API_Response_Util.get_400_error_response(base_response, "dag_file is not a *.py file")
|
|
||||||
|
|
||||||
warning = None
|
|
||||||
# if both the pause and unpause options are provided then skip the pausing and unpausing phase
|
|
||||||
if not (pause and unpause):
|
|
||||||
if pause or unpause:
|
|
||||||
try:
|
|
||||||
# import the DAG file that was uploaded so that we can get the DAG_ID to execute the command to pause or unpause it
|
|
||||||
import imp
|
|
||||||
dag_file = imp.load_source('module.name', save_file_path)
|
|
||||||
dag_id = dag_file.dag.dag_id
|
|
||||||
|
|
||||||
# run the pause or unpause cli command
|
|
||||||
airflow_cmd_split = []
|
|
||||||
if pause:
|
|
||||||
airflow_cmd_split = ["airflow", "pause", dag_id]
|
|
||||||
if unpause:
|
|
||||||
airflow_cmd_split = ["airflow", "unpause", dag_id]
|
|
||||||
cli_output = self.execute_cli_command(airflow_cmd_split)
|
|
||||||
except Exception as e:
|
|
||||||
warning = "Failed to set the state (pause, unpause) of the DAG: " + str(e)
|
|
||||||
logging.warning(warning)
|
|
||||||
else:
|
|
||||||
warning = "Both options pause and unpause were given. Skipping setting the state (pause, unpause) of the DAG."
|
|
||||||
logging.warning(warning)
|
|
||||||
|
|
||||||
return REST_API_Response_Util.get_200_response(base_response=base_response, output="DAG File [{}] has been uploaded".format(dag_file), warning=warning)
|
|
||||||
|
|
||||||
# Custom Function for the refresh_dag API
|
|
||||||
# This will call the direct function corresponding to the web endpoint '/admin/airflow/refresh' that already exists in Airflow
|
|
||||||
def refresh_dag(self, base_response):
|
|
||||||
logging.info("Executing custom 'refresh_dag' function")
|
|
||||||
dag_id = request.args.get('dag_id')
|
|
||||||
logging.info("dag_id to refresh: '" + str(dag_id) + "'")
|
|
||||||
if self.is_arg_not_provided(dag_id):
|
|
||||||
return REST_API_Response_Util.get_400_error_response(base_response, "dag_id should be provided")
|
|
||||||
elif " " in dag_id:
|
|
||||||
return REST_API_Response_Util.get_400_error_response(base_response, "dag_id contains spaces and is therefore an illegal argument")
|
|
||||||
|
|
||||||
try:
|
|
||||||
from airflow.www.views import Airflow
|
|
||||||
# NOTE: The request argument 'dag_id' is required for the refresh() function to get the dag_id
|
|
||||||
refresh_result = Airflow().refresh()
|
|
||||||
logging.info("Refresh Result: " + str(refresh_result))
|
|
||||||
except Exception as e:
|
|
||||||
error_message = "An error occurred while trying to Refresh the DAG '" + str(dag_id) + "': " + str(e)
|
|
||||||
logging.error(error_message)
|
|
||||||
return REST_API_Response_Util.get_500_error_response(base_response, error_message)
|
|
||||||
|
|
||||||
return REST_API_Response_Util.get_200_response(base_response=base_response, output="DAG [{}] is now fresh as a daisy".format(dag_id))
|
|
||||||
|
|
||||||
# General execution of the airflow command passed to it and returns the response
|
|
||||||
@staticmethod
|
|
||||||
def execute_cli_command(airflow_cmd_split):
|
|
||||||
logging.info("Executing CLI Command")
|
|
||||||
|
|
||||||
# There is a need to handle this case separately as the current implementation
|
|
||||||
# breaks the JSON string into multiple parts in a List and this cause the Airflow
|
|
||||||
# command to stop working properly
|
|
||||||
#
|
|
||||||
# The idea here is to handle the JSON string separately and to make use of the
|
|
||||||
# fact that the command will take on the following pattern:
|
|
||||||
#
|
|
||||||
# `airflow trigger_dag --config '{"message": "Hello World"}' test_1`
|
|
||||||
#
|
|
||||||
# , where test_1 is the name of the Dag
|
|
||||||
#
|
|
||||||
if airflow_cmd_split[2] == '--conf':
|
|
||||||
# Initialize list x and extract the JSON string
|
|
||||||
# from the airflow_cmd_split List
|
|
||||||
x = []
|
|
||||||
for i in range(3, len(airflow_cmd_split) - 1):
|
|
||||||
x.append(airflow_cmd_split[i])
|
|
||||||
|
|
||||||
# Initialize list y
|
|
||||||
y = [None] * 5
|
|
||||||
|
|
||||||
# Assign values to list y
|
|
||||||
for j in range(0, 3):
|
|
||||||
y[j] = airflow_cmd_split[j]
|
|
||||||
|
|
||||||
# Create string using list x and assigns to y[3]
|
|
||||||
y[3] = " ".join(x)
|
|
||||||
|
|
||||||
# Dag name will always be airflow_cmd_split[-1]
|
|
||||||
y[4] = airflow_cmd_split[-1]
|
|
||||||
|
|
||||||
# Assigns updated values to airflow_cmd_split
|
|
||||||
airflow_cmd_split = y
|
|
||||||
|
|
||||||
logging.info(airflow_cmd_split)
|
|
||||||
|
|
||||||
process = subprocess.Popen(airflow_cmd_split, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
||||||
process.wait()
|
|
||||||
return REST_API.collect_process_output(process)
|
|
||||||
|
|
||||||
# gets and empty object that has all the fields a CLI function would have in it.
|
|
||||||
@staticmethod
|
|
||||||
def get_empty_process_output():
|
|
||||||
return {
|
|
||||||
"stderr": "",
|
|
||||||
"stdin": "",
|
|
||||||
"stdout": ""
|
|
||||||
}
|
|
||||||
|
|
||||||
# Get the output of the CLI process and package it in a dict
|
|
||||||
@staticmethod
|
|
||||||
def collect_process_output(process):
|
|
||||||
output = REST_API.get_empty_process_output()
|
|
||||||
if process.stderr is not None:
|
|
||||||
output["stderr"] = ""
|
|
||||||
for line in process.stderr.readlines():
|
|
||||||
output["stderr"] += str(line)
|
|
||||||
if process.stdin is not None:
|
|
||||||
output["stdin"] = ""
|
|
||||||
for line in process.stdin.readlines():
|
|
||||||
output["stdin"] += str(line)
|
|
||||||
if process.stdout is not None:
|
|
||||||
output["stdout"] = ""
|
|
||||||
for line in process.stdout.readlines():
|
|
||||||
output["stdout"] += str(line)
|
|
||||||
logging.info("RestAPI Output: " + str(output))
|
|
||||||
return output
|
|
||||||
|
|
||||||
# Filtering out logging statements from the standard output
|
|
||||||
# Content like:
|
|
||||||
#
|
|
||||||
# [2017-04-19 10:04:34,927] {__init__.py:36} INFO - Using executor CeleryExecutor
|
|
||||||
# [2017-04-19 10:04:35,926] {models.py:154} INFO - Filling up the DagBag from /Users/...
|
|
||||||
@staticmethod
|
|
||||||
def filter_loading_messages(output):
|
|
||||||
stdout = output["stdout"]
|
|
||||||
new_stdout_array = stdout.split("\n")
|
|
||||||
content_to_remove_greatest_index = 0
|
|
||||||
for index, content in enumerate(new_stdout_array):
|
|
||||||
if content.startswith("["):
|
|
||||||
content_to_remove_greatest_index = index
|
|
||||||
content_to_remove_greatest_index += 1
|
|
||||||
if len(new_stdout_array) > content_to_remove_greatest_index:
|
|
||||||
new_stdout_array = new_stdout_array[content_to_remove_greatest_index:]
|
|
||||||
output["stdout"] = "\n".join(new_stdout_array)
|
|
||||||
return output
|
|
||||||
|
|
||||||
# Creating View to be used by Plugin
|
|
||||||
rest_api_view = REST_API(category="Admin", name="REST API Plugin")
|
|
||||||
|
|
||||||
# Creating Blueprint
|
|
||||||
rest_api_bp = Blueprint(
|
|
||||||
"rest_api_bp",
|
|
||||||
__name__,
|
|
||||||
template_folder='templates',
|
|
||||||
static_folder='static',
|
|
||||||
static_url_path='/static/'
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# Creating the REST_API_Plugin which extends the AirflowPlugin so its imported into Airflow
|
|
||||||
class REST_API_Plugin(AirflowPlugin):
|
|
||||||
name = "rest_api"
|
|
||||||
operators = []
|
|
||||||
flask_blueprints = [rest_api_bp]
|
|
||||||
hooks = []
|
|
||||||
executors = []
|
|
||||||
admin_views = [rest_api_view]
|
|
||||||
menu_links = []
|
|
|
@ -450,20 +450,16 @@ def test_audit_control_command_db(mock_insert_action_audit):
|
||||||
def test_invoke_airflow_dag_success(mock_info, mock_exhume_date):
|
def test_invoke_airflow_dag_success(mock_info, mock_exhume_date):
|
||||||
act_resource = ActionsResource()
|
act_resource = ActionsResource()
|
||||||
dag_id = 'test_dag_id'
|
dag_id = 'test_dag_id'
|
||||||
action = 'test_action'
|
action = {'id': '123', 'user': 'unittester'}
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
web_server_url = CONF.base.web_server
|
web_server_url = CONF.base.web_server
|
||||||
conf_value = {'action': action}
|
conf_value = {'action': action}
|
||||||
log_string = 'Created <DagRun deploy_site @ 2017-09-22 22:16:14: man'
|
log_string = 'Created <DagRun deploy_site @ 2017-09-22 22:16:14: man'
|
||||||
responses.add(
|
responses.add(
|
||||||
method='GET',
|
method='POST',
|
||||||
url='{}admin/rest_api/api?api=trigger_dag&dag_id={}&conf={}'.format(
|
url='{}api/experimental/dags/{}/dag_runs'.format(
|
||||||
web_server_url, dag_id, act_resource.to_json(conf_value)),
|
web_server_url, dag_id),
|
||||||
body=json.dumps({
|
body=json.dumps({'message': log_string}),
|
||||||
'output': {
|
|
||||||
'stdout': log_string
|
|
||||||
}
|
|
||||||
}),
|
|
||||||
status=200,
|
status=200,
|
||||||
content_type='application/json')
|
content_type='application/json')
|
||||||
|
|
||||||
|
@ -477,14 +473,13 @@ def test_invoke_airflow_dag_success(mock_info, mock_exhume_date):
|
||||||
def test_invoke_airflow_dag_errors(mock_info):
|
def test_invoke_airflow_dag_errors(mock_info):
|
||||||
act_resource = ActionsResource()
|
act_resource = ActionsResource()
|
||||||
dag_id = 'test_dag_id'
|
dag_id = 'test_dag_id'
|
||||||
action = 'test_action'
|
action = {'id': '123', 'user': 'unittester'}
|
||||||
web_server_url = CONF.base.web_server
|
web_server_url = CONF.base.web_server
|
||||||
conf_value = {'action': action}
|
conf_value = {'action': action}
|
||||||
responses.add(
|
responses.add(
|
||||||
method='GET',
|
method='POST',
|
||||||
url='{}admin/rest_api/api?api=trigger_dag&dag_id={}'
|
url='{}api/experimental/dags/{}/dag_runs'.format(
|
||||||
'&conf={}'.format(web_server_url, dag_id,
|
web_server_url, dag_id),
|
||||||
act_resource.to_json(conf_value)),
|
|
||||||
body=json.dumps({
|
body=json.dumps({
|
||||||
"error": "not found"
|
"error": "not found"
|
||||||
}),
|
}),
|
||||||
|
|
|
@ -35,11 +35,9 @@ commands = flake8 {posargs}
|
||||||
|
|
||||||
[testenv:bandit]
|
[testenv:bandit]
|
||||||
skipsdist=True
|
skipsdist=True
|
||||||
# NOTE(Bryan Strassner) ignoring airflow plugin which uses a subexec
|
|
||||||
commands =
|
commands =
|
||||||
bandit \
|
bandit \
|
||||||
-r shipyard_airflow \
|
-r shipyard_airflow \
|
||||||
-x plugins/rest_api_plugin.py \
|
|
||||||
-n 5
|
-n 5
|
||||||
|
|
||||||
[testenv:genconfig]
|
[testenv:genconfig]
|
||||||
|
@ -66,4 +64,4 @@ filename = *.py
|
||||||
ignore = F841, H101, H201, H210, H238, H301, H304, H306, H401, H403, H404, H405
|
ignore = F841, H101, H201, H210, H238, H301, H304, H306, H401, H403, H404, H405
|
||||||
# NOTE(Bryan Strassner) excluding 3rd party and generated code that is brought into the
|
# NOTE(Bryan Strassner) excluding 3rd party and generated code that is brought into the
|
||||||
# codebase.
|
# codebase.
|
||||||
exclude = .venv,.git,.tox,build,dist,*plugins/rest_api_plugin.py,*lib/python*,*egg,alembic/env.py,docs
|
exclude = .venv,.git,.tox,build,dist,*lib/python*,*egg,alembic/env.py,docs
|
||||||
|
|
Loading…
Reference in New Issue