Add Action API

This change introduces a large section of the API for the next major
version of Shipyard - the Action API. By interfacing with Airflow,
Shipyard will invoke workflows and allow for controlling and querying
the status of those workflows. Foundationally, this patchset introduces
much of the framework code for other APIs, including error handling
to a common output format, database interaction for persistence of
action information, and use of oslo_config for configuration support.

Add GET all actions primary code - db connection not yet impl
Update base classes to have more structure
Add POST actions framework
Add GET action by id
Add GET of validations and steps
Add control api
Add unit tests of action api methods
Re-Removed duplicate deps from test reqs
Add routes for API
Removed a lot of code better handled by falcon directly
Cleaned up error flows - handlers and defaults
Refactored existing airflow tests to match standard output format
Updated json validation to be more specific
Added basic start for alembic
Added alembic upgrade at startup
Added table creation definitions
Added base revision for alembic upgrade
Bug fixes - DB queries, airflow comm, logic issues, logging issues
Bug fixes - date formats and alignment of keys between systems
Exclusions to bandit / tox.ini
Resolved merge conflicts with integration of auth
Update to use oslo config and PBR
Update the context middleware to check uuid in a less contentious way
Removed routes and resources for regions endpoint - not used
Add auth policies for action api
Restructure exceptions to be a consistent class hierarchy and common handler
Add generation of config and policy examples
Update tests to init configs
Update database configs to not use env. vars
Removed examples directory, it was no longer accurate
Addressed/removed several TODOs - left some behind as well
Aligned input to DAGs with action: header
Retrieved all sub-steps for dags
Expanded step information
Refactored auth handling for better logging
Renamed create_actions policy to create_action
Removed some templated file comments in env.py generated by alembic
Updated inconsistent exception parameters
Updated to use ULID instead of UUID for action ids
Added action control audit code per review suggestion
Fixed correlation date between dags/actions by more string parsing

Change-Id: I2f9ea5250923f45456aa86826e344fc055bba762
This commit is contained in:
Bryan Strassner 2017-08-25 17:57:27 -05:00
parent c84e91bad1
commit 38e58cfd30
60 changed files with 3883 additions and 1844 deletions

3
.gitignore vendored

@@ -99,3 +99,6 @@ ENV/
# mypy
.mypy_cache/
# Generated bogus docs
ChangeLog

13
AUTHORS Normal file

@@ -0,0 +1,13 @@
Alan Meadows <alan.meadows@gmail.com>
Anthony Lin <anthony.jclin@gmail.com>
Bryan Strassner <bryan.strassner@gmail.com>
Felipe Monteiro <felipe.monteiro@att.com>
Mark Burnett <mark.m.burnett@gmail.com>
One-Fine-Day <vd789v@att.com>
Pete Birley <pete@port.direct>
Rodolfo <rp2723@att.com>
Scott Hussey <sh8121@att.com>
Stacey Fletcher <staceylynnfletcher@gmail.com>
Tin Lam <tin@irrational.io>
Vamsi Krishna Surapureddi <vamsi.skrishna@gmail.com>
eanylin <anthony.jclin@gmail.com>


@@ -60,9 +60,6 @@ COPY ./ /home/shipyard/shipyard
# Copy entrypoint.sh to /home/shipyard
COPY entrypoint.sh /home/shipyard/entrypoint.sh
# Copy shipyard.conf to /home/shipyard
COPY ./shipyard_airflow/control/shipyard.conf /home/shipyard/shipyard.conf
# Change permissions
RUN chown -R shipyard: /home/shipyard \
&& chmod +x /home/shipyard/entrypoint.sh

69
alembic.ini Normal file

@@ -0,0 +1,69 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = alembic
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# max length of characters to apply to the
# "slug" field
#truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat alembic/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# Not used: env.py overrides this with the shipyard configuration value
# (CONF.base.postgresql_db)
sqlalchemy.url = NOT_APPLICABLE
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
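The logging half of this file (the [loggers], [handlers], and [formatters] sections) is standard Python `logging.config.fileConfig` format, which is what the `fileConfig()` call in alembic's env.py consumes. A trimmed, stdlib-only check that this layout parses (the sqlalchemy and alembic loggers are omitted for brevity):

```python
import logging
import logging.config
import tempfile
import textwrap

# Root-logger-only subset of the alembic.ini logging sections above.
ini = textwrap.dedent("""\
    [loggers]
    keys = root

    [handlers]
    keys = console

    [formatters]
    keys = generic

    [logger_root]
    level = WARN
    handlers = console

    [handler_console]
    class = StreamHandler
    args = (sys.stderr,)
    level = NOTSET
    formatter = generic

    [formatter_generic]
    format = %(levelname)-5.5s [%(name)s] %(message)s
    datefmt = %H:%M:%S
    """)

with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as f:
    f.write(ini)
    path = f.name

# fileConfig evaluates the handler args (e.g. sys.stderr) itself.
logging.config.fileConfig(path, disable_existing_loggers=False)
print(logging.getLogger().level == logging.WARNING)  # True
```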

1
alembic/README Normal file

@@ -0,0 +1 @@
Generic single-database configuration.

81
alembic/env.py Normal file

@@ -0,0 +1,81 @@
from __future__ import with_statement
import os
from logging.config import fileConfig

from alembic import context
from oslo_config import cfg
from sqlalchemy import create_engine, pool

# this is the shipyard config object
CONF = cfg.CONF

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.attributes.get('configure_logger', True):
    fileConfig(config.config_file_name)

target_metadata = None


def get_url():
    """
    Returns the url to use instead of using the alembic configuration
    file
    """
    return CONF.base.postgresql_db


def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    url = get_url()
    # Default code: url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url, target_metadata=target_metadata, literal_binds=True)

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    connectable = create_engine(get_url())
    # Default/generated code:
    # connectable = engine_from_config(
    #     config.get_section(config.config_ini_section),
    #     prefix='sqlalchemy.',
    #     poolclass=pool.NullPool)

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

24
alembic/script.py.mako Normal file

@@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}


@@ -0,0 +1,82 @@
"""initial shipyard base
Revision ID: 51b92375e5c4
Revises:
Create Date: 2017-09-12 11:12:23.768269
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy import (types, func)
from sqlalchemy.dialects import postgresql as pg
# revision identifiers, used by Alembic.
revision = '51b92375e5c4'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
"""
Create the initial tables needed by shipyard
26 character IDs are ULIDs. See: https://github.com/mdipierro/ulid
"""
op.create_table(
'actions',
# ULID key for the action
sa.Column('id', types.String(26), primary_key=True),
# The name of the action invoked
sa.Column('name', types.String(50), nullable=False),
# The parameters passed by the user to the action
sa.Column('parameters', pg.JSONB, nullable=True),
# The DAG/workflow name used in airflow, if applicable
sa.Column('dag_id', sa.Text, nullable=True),
# The DAG/workflow execution time string from airflow, if applicable
sa.Column('dag_execution_date', sa.Text, nullable=True),
# The invoking user
sa.Column('user', sa.Text, nullable=False),
# Timestamp of when an action was invoked
sa.Column('datetime',
types.TIMESTAMP(timezone=True),
server_default=func.now()),
# The user provided or shipayrd generated context marker
sa.Column('context_marker', types.String(36), nullable=False)
)
op.create_table(
'preflight_validation_failures',
# ID (ULID) of the preflight validation failure
sa.Column('id', types.String(26), primary_key=True),
# The ID of action this failure is associated with
sa.Column('action_id', types.String(26), nullable=False),
# The common language name of the validation that failed
sa.Column('validation_name', sa.Text, nullable=True),
# The text indicating details of the failure
sa.Column('details', sa.Text, nullable=True),
)
op.create_table(
'action_command_audit',
# ID (ULID) of the audit
sa.Column('id', types.String(26), primary_key=True),
# The ID of action this audit record
sa.Column('action_id', types.String(26), nullable=False),
# The text indicating command invoked
sa.Column('command', sa.Text, nullable=False),
# The user that invoked the command
sa.Column('user', sa.Text, nullable=False),
# Timestamp of when the command was invoked
sa.Column('datetime',
types.TIMESTAMP(timezone=True),
server_default=func.now()),
)
def downgrade():
"""
Remove the database objects created by this revision
"""
op.drop_table('actions')
op.drop_table('preflight_validation_failures')
op.drop_table('action_command_audit')
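The upgrade above can be exercised without a PostgreSQL server by translating the `actions` table to SQLite. This is only an illustrative stand-in (JSONB becomes TEXT, the timestamp loses its timezone), and the inserted values ('deploy_site', the ids) are made up for the example:

```python
import json
import sqlite3

# SQLite stand-in for the PostgreSQL 'actions' table defined above.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE actions (
        id TEXT PRIMARY KEY,             -- 26-character ULID
        name TEXT NOT NULL,
        parameters TEXT,                 -- JSONB in PostgreSQL
        dag_id TEXT,
        dag_execution_date TEXT,
        "user" TEXT NOT NULL,
        datetime TEXT DEFAULT CURRENT_TIMESTAMP,
        context_marker TEXT NOT NULL
    )
""")

# Hypothetical action row; ids and action name are invented for illustration.
conn.execute(
    'INSERT INTO actions (id, name, parameters, "user", context_marker) '
    'VALUES (?, ?, ?, ?, ?)',
    ("01BTG2358E2M95XDFNRRM1PCVX", "deploy_site",
     json.dumps({"force": False}), "shipyard",
     "00000000-0000-0000-0000-000000000000"),
)

name, params = conn.execute(
    "SELECT name, parameters FROM actions").fetchone()
print(name, json.loads(params))  # deploy_site {'force': False}
```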


@@ -276,7 +276,7 @@ Returns the details for a step by id for the given action by Id.
* 200 OK
---
### /v1.0/actions/{action_id}/{control_verb}
### /v1.0/actions/{action_id}/control/{control_verb}
Allows for issuing DAG controls against an action.
#### Payload Structure


@@ -14,7 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# Start shipyard application
exec uwsgi --http :9000 -w shipyard_airflow.shipyard --callable shipyard --enable-threads -L
exec uwsgi \
--http :9000 \
--paste config:/etc/shipyard/api-paste.ini \
--enable-threads \
-L \
--pyargv "--config-file /etc/shipyard/shipyard.conf"


@@ -0,0 +1,27 @@
# Actions requiring admin authority
#"admin_required": "role:admin"
# List workflow actions invoked by users
# GET /api/v1.0/actions
#"workflow_orchestrator:list_actions": "rule:admin_required"
# Create a workflow action
# POST /api/v1.0/actions
#"workflow_orchestrator:create_action": "rule:admin_required"
# Retrieve an action by its id
# GET /api/v1.0/actions/{action_id}
#"workflow_orchestrator:get_action": "rule:admin_required"
# Retrieve an action step by its id
# GET /api/v1.0/actions/{action_id}/steps/{step_id}
#"workflow_orchestrator:get_action_step": "rule:admin_required"
# Retrieve an action validation by its id
# GET /api/v1.0/actions/{action_id}/validations/{validation_id}
#"workflow_orchestrator:get_action_validation": "rule:admin_required"
# Send a control to an action
# POST /api/v1.0/actions/{action_id}/control/{control_verb}
#"workflow_orchestrator:invoke_action_control": "rule:admin_required"


@@ -0,0 +1,310 @@
[DEFAULT]
[armada]
#
# From shipyard_airflow
#
# FQDN for the armada service (string value)
#host = armada-int.ucp
# Port for the armada service (integer value)
#port = 8000
[base]
#
# From shipyard_airflow
#
# The web server for Airflow (string value)
#web_server = http://localhost:32080
# The database for shipyard (string value)
#postgresql_db = postgresql+psycopg2://shipyard:changeme@postgresql.ucp:5432/shipyard
# The database for airflow (string value)
#postgresql_airflow_db = postgresql+psycopg2://shipyard:changeme@postgresql.ucp:5432/airflow
# The directory containing the alembic.ini file (string value)
#alembic_ini_path = /home/shipyard/shipyard
# Upgrade the database on startup (boolean value)
#upgrade_db = true
[deckhand]
#
# From shipyard_airflow
#
# FQDN for the deckhand service (string value)
#host = deckhand-int.ucp
# Port for the deckhand service (integer value)
#port = 80
[drydock]
#
# From shipyard_airflow
#
# FQDN for the drydock service (string value)
#host = drydock-int.ucp
# Port for the drydock service (integer value)
#port = 9000
# TEMPORARY: password for drydock (string value)
#token = bigboss
# TEMPORARY: location of drydock yaml file (string value)
#site_yaml = /usr/local/airflow/plugins/drydock.yaml
# TEMPORARY: location of promenade yaml file (string value)
#prom_yaml = /usr/local/airflow/plugins/promenade.yaml
[healthcheck]
#
# From shipyard_airflow
#
# Schema to perform health check with (string value)
#schema = http
# Health check standard endpoint (string value)
#endpoint = /api/v1.0/health
[keystone]
#
# From shipyard_airflow
#
# The url for OpenStack Authentication (string value)
#OS_AUTH_URL = http://keystone-api.ucp:80/v3
# OpenStack project name (string value)
#OS_PROJECT_NAME = service
# The OpenStack user domain name (string value)
#OS_USER_DOMAIN_NAME = Default
# The OpenStack username (string value)
#OS_USERNAME = shipyard
# The OpenStack password for the shipyard svc acct (string value)
#OS_PASSWORD = password
# The OpenStack region name (string value)
#OS_REGION_NAME = Regionone
# The OpenStack identity api version (integer value)
#OS_IDENTITY_API_VERSION = 3
[keystone_authtoken]
#
# From keystonemiddleware.auth_token
#
# Complete "public" Identity API endpoint. This endpoint should not be an
# "admin" endpoint, as it should be accessible by all end users.
# Unauthenticated clients are redirected to this endpoint to authenticate.
# Although this endpoint should ideally be unversioned, client support in the
# wild varies. If you're using a versioned v2 endpoint here, then this should
# *not* be the same endpoint the service user utilizes for validating tokens,
# because normal end users may not be able to reach that endpoint. (string
# value)
#auth_uri = <None>
# API version of the admin Identity API endpoint. (string value)
#auth_version = <None>
# Do not handle authorization requests within the middleware, but delegate the
# authorization decision to downstream WSGI components. (boolean value)
#delay_auth_decision = false
# Request timeout value for communicating with Identity API server. (integer
# value)
#http_connect_timeout = <None>
# How many times are we trying to reconnect when communicating with Identity
# API Server. (integer value)
#http_request_max_retries = 3
# Request environment key where the Swift cache object is stored. When
# auth_token middleware is deployed with a Swift cache, use this option to have
# the middleware share a caching backend with swift. Otherwise, use the
# ``memcached_servers`` option instead. (string value)
#cache = <None>
# Required if identity server requires client certificate (string value)
#certfile = <None>
# Required if identity server requires client certificate (string value)
#keyfile = <None>
# A PEM encoded Certificate Authority to use when verifying HTTPs connections.
# Defaults to system CAs. (string value)
#cafile = <None>
# Verify HTTPS connections. (boolean value)
#insecure = false
# The region in which the identity server can be found. (string value)
#region_name = <None>
# DEPRECATED: Directory used to cache files related to PKI tokens. This option
# has been deprecated in the Ocata release and will be removed in the P
# release. (string value)
# This option is deprecated for removal since Ocata.
# Its value may be silently ignored in the future.
# Reason: PKI token format is no longer supported.
#signing_dir = <None>
# Optionally specify a list of memcached server(s) to use for caching. If left
# undefined, tokens will instead be cached in-process. (list value)
# Deprecated group/name - [keystone_authtoken]/memcache_servers
#memcached_servers = <None>
# In order to prevent excessive effort spent validating tokens, the middleware
# caches previously-seen tokens for a configurable duration (in seconds). Set
# to -1 to disable caching completely. (integer value)
#token_cache_time = 300
# DEPRECATED: Determines the frequency at which the list of revoked tokens is
# retrieved from the Identity service (in seconds). A high number of revocation
# events combined with a low cache duration may significantly reduce
# performance. Only valid for PKI tokens. This option has been deprecated in
# the Ocata release and will be removed in the P release. (integer value)
# This option is deprecated for removal since Ocata.
# Its value may be silently ignored in the future.
# Reason: PKI token format is no longer supported.
#revocation_cache_time = 10
# (Optional) If defined, indicate whether token data should be authenticated or
# authenticated and encrypted. If MAC, token data is authenticated (with HMAC)
# in the cache. If ENCRYPT, token data is encrypted and authenticated in the
# cache. If the value is not one of these options or empty, auth_token will
# raise an exception on initialization. (string value)
# Allowed values: None, MAC, ENCRYPT
#memcache_security_strategy = None
# (Optional, mandatory if memcache_security_strategy is defined) This string is
# used for key derivation. (string value)
#memcache_secret_key = <None>
# (Optional) Number of seconds memcached server is considered dead before it is
# tried again. (integer value)
#memcache_pool_dead_retry = 300
# (Optional) Maximum total number of open connections to every memcached
# server. (integer value)
#memcache_pool_maxsize = 10
# (Optional) Socket timeout in seconds for communicating with a memcached
# server. (integer value)
#memcache_pool_socket_timeout = 3
# (Optional) Number of seconds a connection to memcached is held unused in the
# pool before it is closed. (integer value)
#memcache_pool_unused_timeout = 60
# (Optional) Number of seconds that an operation will wait to get a memcached
# client connection from the pool. (integer value)
#memcache_pool_conn_get_timeout = 10
# (Optional) Use the advanced (eventlet safe) memcached client pool. The
# advanced pool will only work under python 2.x. (boolean value)
#memcache_use_advanced_pool = false
# (Optional) Indicate whether to set the X-Service-Catalog header. If False,
# middleware will not ask for service catalog on token validation and will not
# set the X-Service-Catalog header. (boolean value)
#include_service_catalog = true
# Used to control the use and type of token binding. Can be set to: "disabled"
# to not check token binding. "permissive" (default) to validate binding
# information if the bind type is of a form known to the server and ignore it
# if not. "strict" like "permissive" but if the bind type is unknown the token
# will be rejected. "required" any form of token binding is needed to be
# allowed. Finally the name of a binding method that must be present in tokens.
# (string value)
#enforce_token_bind = permissive
# DEPRECATED: If true, the revocation list will be checked for cached tokens.
# This requires that PKI tokens are configured on the identity server. (boolean
# value)
# This option is deprecated for removal since Ocata.
# Its value may be silently ignored in the future.
# Reason: PKI token format is no longer supported.
#check_revocations_for_cached = false
# DEPRECATED: Hash algorithms to use for hashing PKI tokens. This may be a
# single algorithm or multiple. The algorithms are those supported by Python
# standard hashlib.new(). The hashes will be tried in the order given, so put
# the preferred one first for performance. The result of the first hash will be
# stored in the cache. This will typically be set to multiple values only while
# migrating from a less secure algorithm to a more secure one. Once all the old
# tokens are expired this option should be set to a single value for better
# performance. (list value)
# This option is deprecated for removal since Ocata.
# Its value may be silently ignored in the future.
# Reason: PKI token format is no longer supported.
#hash_algorithms = md5
# A choice of roles that must be present in a service token. Service tokens are
# allowed to request that an expired token can be used and so this check should
# tightly control that only actual services should be sending this token. Roles
# here are applied as an ANY check so any role in this list must be present.
# For backwards compatibility reasons this currently only affects the
# allow_expired check. (list value)
#service_token_roles = service
# For backwards compatibility reasons we must let valid service tokens pass
# that don't pass the service_token_roles check as valid. Setting this true
# will become the default in a future release and should be enabled if
# possible. (boolean value)
#service_token_roles_required = false
# Authentication type to load (string value)
# Deprecated group/name - [keystone_authtoken]/auth_plugin
#auth_type = <None>
# Config Section from which to load plugin specific options (string value)
#auth_section = <None>
[logging]
#
# From shipyard_airflow
#
# The default logging level for the root logger. ERROR=40, WARNING=30, INFO=20,
# DEBUG=10 (integer value)
#log_level = 10
[shipyard]
#
# From shipyard_airflow
#
# FQDN for the shipyard service (string value)
#host = shipyard-int.ucp
# Port for the shipyard service (integer value)
#port = 9000
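Shipyard loads this file through oslo.config (hence the generated "(string value)" annotations), but the [base] section can be read with plain configparser to show the defaults in action. The values below are copied from the sample above:

```python
import configparser

# Plain-configparser read of the [base] defaults; the real service
# registers these options with oslo.config instead.
sample = """
[base]
web_server = http://localhost:32080
postgresql_db = postgresql+psycopg2://shipyard:changeme@postgresql.ucp:5432/shipyard
upgrade_db = true
"""

cp = configparser.ConfigParser()
cp.read_string(sample)

print(cp.getboolean("base", "upgrade_db"))  # True
print(cp.get("base", "web_server"))         # http://localhost:32080
```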


@@ -1,60 +0,0 @@
# Shipyard Manifests
----
Shipyard manifests describe the payloads that the shipyard API will receive.
A complete manifest will consist of multiple YAML files assembled in some way. Each YAML file will follow the
Kubernetes style of artifact definition.
The high level expectation of what the data on this manifests will define is pictured here :
<img src="https://github.com/att-comdev/shipyard/examples/manifests/manifest_hierarchy.png" width="100">
----
## region_manifest.yaml
Region is the largest resource shipyard can understand.
A region manifest will need to define :
- Identity of the Region. Perhaps a name will suffice, but a UUID generated by shipyard might be applicable as well.
- Cloud : The type of cloud this region is running on. i.e. AIC, or AWS, or Google etc.
- deployOn : Whether the region UCP (undercloud) is being deployed on VMs or bare metal
----
## servers.yaml
----
## network.yaml
----
## hw_definition.yaml
----
## host_profile.yaml
----
## services.yaml
Will define high level needs for all the services that need to run above the undercloud
It relates to the files :
## core_services.yaml
## clcp_services.yaml
## onap_services.yaml
## cdp_services.yaml
----
## undercloud.yaml
This file will include the configuration aspects of the undercloud that are tunables.
Such as:
- Security
- RBAC definitions
- Certificates
- UCP tunables
- Kernel tunables, etc.
- Agent tunables


@@ -1,151 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################
#
# bootstrap_seed.yaml - Site server design definition for physical layer
#
####################
# version the schema in this file so consumers can rationally parse it
---
apiVersion: 'v1.0'
kind: HostProfile
metadata:
name: default
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 2/3 attributes. Primarily CIs used for configuring server interfaces
# No magic to this host_profile, it just provides a way to specify
# sitewide settings. If it is absent from a node's inheritance chain
# then these values will NOT be applied
spec:
# OOB (iLO, iDRAC, etc...) settings. Should prefer open standards such
# as IPMI over vendor-specific when possible.
oob:
type: ipmi
# OOB networking should be preconfigured, but we can include a network
# definition for validation or enhancement (DNS registration)
network: oob
account: admin
credential: admin
# Specify storage layout of base OS. Ceph out of scope
storage:
# How storage should be carved up: lvm (logical volumes), flat
# (single partition)
layout: lvm
# Info specific to the boot and root disk/partitions
bootdisk:
# Device will specify an alias defined in hwdefinition.yaml
device: primary_boot
# For LVM, the size of the partition added to VG as a PV
# For flat, the size of the partition formatted as ext4
root_size: 50g
# The /boot partition. If not specified, /boot will in root
boot_size: 2g
# Info for additional partitions. Need to balance between
# flexibility and complexity
partitions:
- name: logs
device: primary_boot
# Partition uuid if needed
part_uuid: 84db9664-f45e-11e6-823d-080027ef795a
size: 10g
# Optional, can carve up unformatted block devices
mountpoint: /var/log
fstype: ext4
mount_options: defaults
# Filesystem UUID or label can be specified. UUID recommended
fs_uuid: cdb74f1c-9e50-4e51-be1d-068b0e9ff69e
fs_label: logs
# Platform (Operating System) settings
platform:
image: ubuntu_16.04_hwe
kernel_params: default
# Additional metadata to apply to a node
metadata:
# Base URL of the introspection service - may go in curtin data
introspection_url: http://172.16.1.10:9090
---
apiVersion: 'v1.0'
kind: HostProfile
metadata:
name: k8-node
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 2/3 attributes. Primarily CIs used for configuring server interfaces
spec:
# host_profile inheritance allows for deduplication of common CIs
# Inheritance is additive for CIs that are lists of multiple items
# To remove an inherited list member, prefix the primary key value
# with '!'.
host_profile: defaults
# Hardware profile will map hardware specific details to the abstract
# names uses in the host profile as well as specify hardware specific
# configs. A viable model should be to build a host profile without a
# hardware_profile and then for each node inherit the host profile and
# specify a hardware_profile to map that node's hardware to the abstract
# settings of the host_profile
hardware_profile: HPGen9v3
# Network interfaces.
interfaces:
# Keyed on device_name
# pxe is a special marker indicating which device should be used for pxe boot
- device_name: pxe
# The network link attached to this
network_link: pxe
# Slaves will specify aliases from hwdefinition.yaml
slaves:
- prim_nic01
# Which networks will be configured on this interface
networks:
- name: pxe
- device_name: bond0
network_link: gp
# If multiple slaves are specified, but no bonding config
# is applied to the link, design validation will fail
slaves:
- prim_nic01
- prim_nic02
# If multiple networks are specified, but no trunking
# config is applied to the link, design validation will fail
networks:
- name: mgmt
- name: private
metadata:
# Explicit tag assignment
tags:
- 'test'
# MaaS supports key/value pairs. Not sure of the use yet
owner_data:
foo: bar
---
apiVersion: 'v1.0'
kind: HostProfile
metadata:
name: k8-node-public
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 2/3 attributes. Primarily CIs used for configuring server interfaces
spec:
host_profile: k8-node
interfaces:
- device_name: bond0
networks:
# This is additive, so adds a network to those defined in the host_profile
# inheritance chain
- name: public
---


@@ -1,58 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#############################################################################
#
# bootstrap_hwdefinition.yaml - Definitions of server hardware layout
#
#############################################################################
# version the schema in this file so consumers can rationally parse it
---
apiVersion: 'v1.0'
kind: HardwareProfile
metadata:
name: HPGen8v3
region: sitename
date: 17-FEB-2017
description: Sample hardware definition
author: Scott Hussey
spec:
# Vendor of the server chassis
vendor: HP
# Generation of the chassis model
generation: '8'
# Version of the chassis model within its generation - not version of the hardware definition
hw_version: '3'
# The certified version of the chassis BIOS
bios_version: '2.2.3'
# Mode of the default boot of hardware - bios, uefi
boot_mode: bios
# Protocol of boot of the hardware - pxe, usb, hdd
bootstrap_protocol: pxe
# Which interface to use for network booting within the OOB manager, not OS device
pxe_interface: 0
# Map hardware addresses to aliases/roles to allow a mix of hardware configs
# in a site to result in a consistent configuration
device_aliases:
pci:
- address: pci@0000:00:03.0
alias: prim_nic01
# type could identify expected hardware - used for hardware manifest validation
type: '82540EM Gigabit Ethernet Controller'
- address: pci@0000:00:04.0
alias: prim_nic02
type: '82540EM Gigabit Ethernet Controller'
scsi:
- address: scsi@2:0.0.0
alias: primary_boot
type: 'VBOX HARDDISK'

Binary file not shown.



@@ -1,230 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################
#
# network.yaml - Network information design definition for physical layer
#
####################
# version the schema in this file so consumers can rationally parse it
---
apiVersion: 'v1.0'
kind: NetworkLink
metadata:
name: oob
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 1 attributes. Primary key is 'name'. These settings will generally be things the switch and server have to agree on
spec:
bonding:
mode: none
mtu: 1500
linkspeed: 100full
trunking:
mode: none
default_network: oob
---
# pxe is a bit of 'magic' indicating the link config used when PXE booting
# a node. All other links indicate network configs applied when the node
# is deployed.
apiVersion: 'v1.0'
kind: NetworkLink
metadata:
name: pxe
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 1 attributes. Primary key is 'name'. These settings will generally be things the switch and server have to agree on
spec:
bonding:
mode: none
mtu: 1500
linkspeed: auto
# Is this link supporting multiple layer 2 networks?
# none is a port-based VLAN identified by default_network
# tagged is using 802.1q VLAN tagging. Untagged packets will default to default_network
trunking:
mode: none
# use name, will translate to VLAN ID
default_network: pxe
---
apiVersion: 'v1.0'
kind: NetworkLink
metadata:
name: gp
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 1 attributes. These CIs will generally be things the switch and server have to agree on
# pxe is a bit of 'magic' indicating the link config used when PXE booting
# a node. All other links indicate network configs applied when the node
# is deployed.
spec:
# If this link is a bond of physical links, how is it configured
# 802.3ad
# active-backup
# balance-rr
# Can add support for others down the road
bonding:
mode: 802.3ad
# For LACP (802.3ad) xmit hashing policy: layer2, layer2+3, layer3+4, encap3+4
hash: layer3+4
# 802.3ad specific options
peer_rate: slow
mon_rate: default
up_delay: default
down_delay: default
mtu: 9000
linkspeed: auto
# Is this link supporting multiple layer 2 networks?
trunking:
mode: tagged
default_network: mgmt
---
apiVersion: 'v1.0'
kind: Network
metadata:
name: oob
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 2/3 attributes. Primarily CIs used for configuring server interfaces
spec:
allocation: static
cidr: 172.16.100.0/24
ranges:
- type: static
start: 172.16.100.15
end: 172.16.100.254
dns:
domain: ilo.sitename.att.com
servers: 172.16.100.10
---
apiVersion: 'v1.0'
kind: Network
metadata:
name: pxe
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 2/3 attributes. Primarily CIs used for configuring server interfaces
spec:
# Layer 2 VLAN segment id, could support other segmentations. Optional
vlan_id: '99'
# How are addresses assigned?
allocation: dhcp
# MTU for this VLAN interface, if not specified it will be inherited from the link
mtu: 1500
# Network address
cidr: 172.16.0.0/24
# Describe IP address ranges
ranges:
- type: dhcp
start: 172.16.0.5
end: 172.16.0.254
# DNS settings for this network
dns:
# Domain addresses on this network will be registered under
domain: admin.sitename.att.com
# DNS servers that a server using this network as its default gateway should use
servers: 172.16.0.10
---
apiVersion: 'v1.0'
kind: Network
metadata:
name: mgmt
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 2/3 attributes. Primarily CIs used for configuring server interfaces
spec:
vlan_id: '100'
# How are addresses assigned?
allocation: static
# Allow MTU to be inherited from link the network rides on
mtu: 1500
# Network address
cidr: 172.16.1.0/24
# Describe IP address ranges
ranges:
- type: static
start: 172.16.1.15
end: 172.16.1.254
# Static routes to be added for this network
routes:
- subnet: 0.0.0.0/0
# A blank gateway would lead to a static route specifying
# only the interface as a source
gateway: 172.16.1.1
metric: 10
# DNS settings for this network
dns:
# Domain addresses on this network will be registered under
domain: mgmt.sitename.example.com
# DNS servers that a server using this network as its default gateway should use
servers: 172.16.1.9,172.16.1.10
---
apiVersion: 'v1.0'
kind: Network
metadata:
name: private
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 2/3 attributes. Primarily CIs used for configuring server interfaces
spec:
vlan_id: '101'
allocation: static
mtu: 9000
cidr: 172.16.2.0/24
# Describe IP address ranges
ranges:
# Type can be reserved (not used for baremetal), static (all explicit
# assignments should fall here), dhcp (will be used by a DHCP server on this network)
- type: static
start: 172.16.2.15
end: 172.16.2.254
dns:
domain: priv.sitename.example.com
servers: 172.16.2.9,172.16.2.10
---
apiVersion: 'v1.0'
kind: Network
metadata:
name: public
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 2/3 attributes. Primarily CIs used for configuring server interfaces
spec:
vlan_id: '102'
# How are addresses assigned?
allocation: static
# MTU size for the VLAN interface
mtu: 1500
cidr: 172.16.3.0/24
# Describe IP address ranges
ranges:
- type: static
start: 172.16.3.15
end: 172.16.3.254
routes:
- subnet: 0.0.0.0/0
gateway: 172.16.3.1
metric: 9
dns:
domain: sitename.example.com
servers: 8.8.8.8
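The `ranges` in these Network documents are expected to fall inside the declared `cidr`. A minimal sketch of that check, using only the Python standard library (`range_within_cidr` is a hypothetical helper, not part of the actual design validation):

```python
import ipaddress

def range_within_cidr(cidr, start, end):
    """Check that an address range falls entirely inside the network CIDR."""
    net = ipaddress.ip_network(cidr)
    lo = ipaddress.ip_address(start)
    hi = ipaddress.ip_address(end)
    return lo in net and hi in net and lo <= hi

# e.g. the static range of the 'mgmt' network above
assert range_within_cidr('172.16.1.0/24', '172.16.1.15', '172.16.1.254')
```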


@ -1,60 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################
#
# region_manifest.yaml - Region Manifest File , encapsulates the multiple files
#
####################
# version
---
#
# This describes the Global details of a Region
#
apiVersion: 'v1.0'
kind: Region
metadata:
name: sitename
date: 17-FEB-2017
description: Sample site design
author: sh8121@att.com
spec:
imports:
# Servers will include the list of Servers
# For Each Server it includes
# information such as :
# # OOB (iLO, iDRAC, etc...) settings. Should prefer open standards such
# as IPMI over vendor-specific when possible.
# oob:
# type: ipmi
# OOB networking should be preconfigured, but we can include a network
# definition for validation or enhancement (DNS registration)
# Specify storage layout of base OS. Ceph out of scope
# storage:
# How storage should be carved up: lvm (logical volumes), flat
# (single partition)
# Platform (Operating System) settings
# platform:
# Additional metadata to apply to a node
# metadata:
- 'servers.yaml'
- 'network.yaml'
- 'hwdefinition.yaml'
- 'hostprofile.yaml'


@ -1,420 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################
#
# bootstrap_seed.yaml - Site server design definition for physical layer
#
####################
# version the schema in this file so consumers can rationally parse it
---
apiVersion: 'v1.0'
kind: Region
metadata:
name: sitename
date: 17-FEB-2017
description: Sample site design
author: sh8121@att.com
spec:
# Not sure if we have site wide data that doesn't fall into another 'Kind'
---
apiVersion: 'v1.0'
kind: NetworkLink
metadata:
name: oob
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 1 attributes. Primary key is 'name'. These settings will generally be things the switch and server have to agree on
spec:
bonding:
mode: none
mtu: 1500
linkspeed: 100full
trunking:
mode: none
default_network: oob
---
# pxe is a bit of 'magic' indicating the link config used when PXE booting
# a node. All other links indicate network configs applied when the node
# is deployed.
apiVersion: 'v1.0'
kind: NetworkLink
metadata:
name: pxe
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 1 attributes. Primary key is 'name'. These settings will generally be things the switch and server have to agree on
spec:
bonding:
mode: none
mtu: 1500
linkspeed: auto
# Is this link supporting multiple layer 2 networks?
# none is a port-based VLAN identified by default_network
# tagged is using 802.1q VLAN tagging. Untagged packets will default to default_network
trunking:
mode: none
# use name, will translate to VLAN ID
default_network: pxe
---
apiVersion: 'v1.0'
kind: NetworkLink
metadata:
name: gp
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 1 attributes. These CIs will generally be things the switch and server have to agree on
# pxe is a bit of 'magic' indicating the link config used when PXE booting
# a node. All other links indicate network configs applied when the node
# is deployed.
spec:
# If this link is a bond of physical links, how is it configured
# 802.3ad
# active-backup
# balance-rr
# Can add support for others down the road
bonding:
mode: 802.3ad
# For LACP (802.3ad) xmit hashing policy: layer2, layer2+3, layer3+4, encap3+4
hash: layer3+4
# 802.3ad specific options
peer_rate: slow
mon_rate: default
up_delay: default
down_delay: default
mtu: 9000
linkspeed: auto
# Is this link supporting multiple layer 2 networks?
trunking:
mode: tagged
default_network: mgmt
---
apiVersion: 'v1.0'
kind: Network
metadata:
name: oob
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 2/3 attributes. Primarily CIs used for configuring server interfaces
spec:
allocation: static
cidr: 172.16.100.0/24
ranges:
- type: static
start: 172.16.100.15
end: 172.16.100.254
dns:
domain: ilo.sitename.att.com
servers: 172.16.100.10
---
apiVersion: 'v1.0'
kind: Network
metadata:
name: pxe
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 2/3 attributes. Primarily CIs used for configuring server interfaces
spec:
# Layer 2 VLAN segment id, could support other segmentations. Optional
vlan_id: '99'
# How are addresses assigned?
allocation: dhcp
# MTU for this VLAN interface, if not specified it will be inherited from the link
mtu: 1500
# Network address
cidr: 172.16.0.0/24
# Describe IP address ranges
ranges:
- type: dhcp
start: 172.16.0.5
end: 172.16.0.254
# DNS settings for this network
dns:
# Domain addresses on this network will be registered under
domain: admin.sitename.att.com
# DNS servers that a server using this network as its default gateway should use
servers: 172.16.0.10
---
apiVersion: 'v1.0'
kind: Network
metadata:
name: mgmt
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 2/3 attributes. Primarily CIs used for configuring server interfaces
spec:
vlan_id: '100'
# How are addresses assigned?
allocation: static
# Allow MTU to be inherited from link the network rides on
mtu: 1500
# Network address
cidr: 172.16.1.0/24
# Describe IP address ranges
ranges:
- type: static
start: 172.16.1.15
end: 172.16.1.254
# Static routes to be added for this network
routes:
- subnet: 0.0.0.0/0
# A blank gateway would lead to a static route specifying
# only the interface as a source
gateway: 172.16.1.1
metric: 10
# DNS settings for this network
dns:
# Domain addresses on this network will be registered under
domain: mgmt.sitename.example.com
# DNS servers that a server using this network as its default gateway should use
servers: 172.16.1.9,172.16.1.10
---
apiVersion: 'v1.0'
kind: Network
metadata:
name: private
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 2/3 attributes. Primarily CIs used for configuring server interfaces
spec:
vlan_id: '101'
allocation: static
mtu: 9000
cidr: 172.16.2.0/24
# Describe IP address ranges
ranges:
# Type can be reserved (not used for baremetal), static (all explicit
# assignments should fall here), dhcp (will be used by a DHCP server on this network)
- type: static
start: 172.16.2.15
end: 172.16.2.254
dns:
domain: priv.sitename.example.com
servers: 172.16.2.9,172.16.2.10
---
apiVersion: 'v1.0'
kind: Network
metadata:
name: public
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 2/3 attributes. Primarily CIs used for configuring server interfaces
spec:
vlan_id: '102'
# How are addresses assigned?
allocation: static
# MTU size for the VLAN interface
mtu: 1500
cidr: 172.16.3.0/24
# Describe IP address ranges
ranges:
- type: static
start: 172.16.3.15
end: 172.16.3.254
routes:
- subnet: 0.0.0.0/0
gateway: 172.16.3.1
metric: 9
dns:
domain: sitename.example.com
servers: 8.8.8.8
---
apiVersion: 'v1.0'
kind: HostProfile
metadata:
name: default
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 2/3 attributes. Primarily CIs used for configuring server interfaces
# No magic to this host_profile, it just provides a way to specify
# sitewide settings. If it is absent from a node's inheritance chain
# then these values will NOT be applied
spec:
# OOB (iLO, iDRAC, etc...) settings. Should prefer open standards such
# as IPMI over vendor-specific when possible.
oob:
type: ipmi
# OOB networking should be preconfigured, but we can include a network
# definition for validation or enhancement (DNS registration)
network: oob
account: admin
credential: admin
# Specify storage layout of base OS. Ceph out of scope
storage:
# How storage should be carved up: lvm (logical volumes), flat
# (single partition)
layout: lvm
# Info specific to the boot and root disk/partitions
bootdisk:
# Device will specify an alias defined in hwdefinition.yaml
device: primary_boot
# For LVM, the size of the partition added to VG as a PV
# For flat, the size of the partition formatted as ext4
root_size: 50g
# The /boot partition. If not specified, /boot will be in root
boot_size: 2g
# Info for additional partitions. Need to balance between
# flexibility and complexity
partitions:
- name: logs
device: primary_boot
# Partition uuid if needed
part_uuid: 84db9664-f45e-11e6-823d-080027ef795a
size: 10g
# Optional, can carve up unformatted block devices
mountpoint: /var/log
fstype: ext4
mount_options: defaults
# Filesystem UUID or label can be specified. UUID recommended
fs_uuid: cdb74f1c-9e50-4e51-be1d-068b0e9ff69e
fs_label: logs
# Platform (Operating System) settings
platform:
image: ubuntu_16.04_hwe
kernel_params: default
# Additional metadata to apply to a node
metadata:
# Base URL of the introspection service - may go in curtin data
introspection_url: http://172.16.1.10:9090
---
apiVersion: 'v1.0'
kind: HostProfile
metadata:
name: k8-node
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 2/3 attributes. Primarily CIs used for configuring server interfaces
spec:
# host_profile inheritance allows for deduplication of common CIs
# Inheritance is additive for CIs that are lists of multiple items
# To remove an inherited list member, prefix the primary key value
# with '!'.
host_profile: defaults
# Hardware profile will map hardware specific details to the abstract
# names uses in the host profile as well as specify hardware specific
# configs. A viable model should be to build a host profile without a
# hardware_profile and then for each node inherit the host profile and
# specify a hardware_profile to map that node's hardware to the abstract
# settings of the host_profile
hardware_profile: HPGen9v3
# Network interfaces.
interfaces:
# Keyed on device_name
# pxe is a special marker indicating which device should be used for pxe boot
- device_name: pxe
# The network link attached to this
network_link: pxe
# Slaves will specify aliases from hwdefinition.yaml
slaves:
- prim_nic01
# Which networks will be configured on this interface
networks:
- name: pxe
- device_name: bond0
network_link: gp
# If multiple slaves are specified, but no bonding config
# is applied to the link, design validation will fail
slaves:
- prim_nic01
- prim_nic02
# If multiple networks are specified, but no trunking
# config is applied to the link, design validation will fail
networks:
- name: mgmt
- name: private
metadata:
# Explicit tag assignment
tags:
- 'test'
# MaaS supports key/value pairs. Not sure of the use yet
owner_data:
foo: bar
---
apiVersion: 'v1.0'
kind: HostProfile
metadata:
name: k8-node-public
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 2/3 attributes. Primarily CIs used for configuring server interfaces
spec:
host_profile: k8-node
interfaces:
- device_name: bond0
networks:
# This is additive, so adds a network to those defined in the host_profile
# inheritance chain
- name: public
---
apiVersion: 'v1.0'
kind: BaremetalNode
metadata:
name: controller01
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 2/3 attributes. Primarily CIs used for configuring server interfaces
spec:
host_profile: k8-node-public
# the hostname for a server, could be used in multiple DNS domains to
# represent different interfaces
interfaces:
- device_name: bond0
networks:
# '!' prefix for the value of the primary key indicates a record should be removed
- name: '!private'
# Addresses assigned to network interfaces
addressing:
# Which network the address applies to. If a network appears in addressing
# that isn't assigned to an interface, design validation will fail
- network: pxe
# The address assigned. Either an explicit IPv4 or IPv6 address
# or dhcp or slaac
address: dhcp
- network: mgmt
address: 172.16.1.20
- network: public
address: 172.16.3.20
metadata:
tags:
- os_ctl
rack: rack01
---
apiVersion: 'v1.0'
kind: BaremetalNode
metadata:
name: compute01
region: sitename
date: 17-FEB-2017
author: sh8121@att.com
description: Describe layer 2/3 attributes. Primarily CIs used for configuring server interfaces
spec:
host_profile: k8-node
addressing:
- network: pxe
address: dhcp
- network: mgmt
address: 172.16.1.21
- network: private
address: 172.16.2.21
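The '!' removal convention described in the host profile comments amounts to a merge over lists keyed on their primary key, where inheritance is additive and a '!'-prefixed key deletes an inherited entry. A hypothetical illustration of that rule (not the actual implementation):

```python
def merge_named_list(inherited, overrides):
    """Merge lists of dicts keyed on 'name'. Inheritance is additive; an
    override whose name starts with '!' removes the inherited entry."""
    merged = {item['name']: item for item in inherited}
    for item in overrides:
        name = item['name']
        if name.startswith('!'):
            merged.pop(name[1:], None)   # e.g. '!private' removes 'private'
        else:
            merged[name] = item
    return list(merged.values())

# controller01 inherits mgmt/private/public through its profile chain,
# then removes 'private' with the '!private' entry shown above
inherited = [{'name': 'mgmt'}, {'name': 'private'}, {'name': 'public'}]
node_overrides = [{'name': '!private'}]
result = merge_named_list(inherited, node_overrides)
assert [n['name'] for n in result] == ['mgmt', 'public']
```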


@ -0,0 +1,5 @@
[DEFAULT]
output_file = etc/shipyard/shipyard.conf.sample
wrap_width = 79
namespace = shipyard_airflow
namespace = keystonemiddleware.auth_token


@ -0,0 +1,3 @@
[DEFAULT]
output_file = etc/shipyard/policy.yaml.sample
namespace = shipyard_airflow


@ -12,16 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
PasteDeploy==1.5.2
keystonemiddleware==4.17.0
falcon==1.2.0
python-dateutil==2.6.1
requests==2.18.4
uwsgi==2.0.15
alembic==0.9.5
configparser==3.5.0
python-openstackclient==3.11.0
SQLAlchemy==1.1.13
psycopg2==2.7.3.1
falcon==1.2.0
jsonschema==2.6.0
keystoneauth1==2.13.0
keystonemiddleware==4.17.0
oslo.config==4.11.0
oslo.policy==1.25.1
keystoneauth1==2.13.0
PasteDeploy==1.5.2
pbr!=2.1.0,>=2.0.0 # Apache-2.0
psycopg2==2.7.3.1
python-dateutil==2.6.1
python-openstackclient==3.11.0
requests==2.18.4
SQLAlchemy==1.1.13
ulid==1.1
uwsgi==2.0.15

setup.cfg (new file)

@ -0,0 +1,28 @@
[metadata]
name = shipyard
summary = Directed acyclic graph controller for Kubernetes and OpenStack control plane life cycle management
description-file = README.md
author = undercloud team
home-page = https://github.com/att-comdev/shipyard
classifier =
Intended Audience :: Information Technology
Intended Audience :: System Administrators
License :: OSI Approved :: Apache Software License
Operating System :: POSIX :: Linux
Programming Language :: Python
Programming Language :: Python :: 3
Programming Language :: Python :: 3.5
[files]
packages =
shipyard_airflow
[entry_points]
oslo.config.opts =
shipyard_airflow = shipyard_airflow.conf.opts:list_opts
oslo.policy.policies =
shipyard_airflow = shipyard_airflow.policy:list_policies
[build_sphinx]
warning-is-error = True


@ -11,28 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import setuptools
from setuptools import setup
setup(
name='shipyard_airflow',
version='0.1a1',
description='API for managing Airflow-based orchestration',
url='http://github.com/att-comdev/shipyard',
author='Anthony Lin - AT&T',
author_email='al498u@att.com',
license='Apache 2.0',
packages=['shipyard_airflow', 'shipyard_airflow.control'],
entry_points={
"oslo.policy.policies":
["shipyard = shipyard.common.policies:list_rules"],
"oslo.config.opts": ["shipyard = shipyard.conf.opts:list_opts"]
},
install_requires=[
'falcon',
'requests',
'configparser',
'uwsgi>1.4',
'python-dateutil',
'oslo.config',
])
setuptools.setup(
setup_requires=['pbr>=2.0.0'],
pbr=True
)


@ -1,17 +0,0 @@
import requests
from shipyard_airflow.errors import AirflowError
class AirflowClient(object):
def __init__(self, url):
self.url = url
def get(self):
response = requests.get(self.url).json()
# This gives us more freedom to handle the responses from airflow
if response["output"]["stderr"]:
raise AirflowError(response["output"]["stderr"])
else:
return response["output"]["stdout"]
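The deleted client's stderr/stdout contract can be shown in isolation: anything Airflow writes to stderr becomes an error, otherwise stdout is returned. In this sketch a plain dict stands in for the parsed JSON response and `RuntimeError` stands in for `AirflowError`:

```python
def extract_stdout(response):
    """Mirror the AirflowClient logic: surface airflow's stderr as an
    error, otherwise return its stdout."""
    output = response['output']
    if output['stderr']:
        raise RuntimeError(output['stderr'])  # stand-in for AirflowError
    return output['stdout']

assert extract_stdout({'output': {'stderr': '', 'stdout': 'ok'}}) == 'ok'
```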


@ -0,0 +1,250 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import keystoneauth1.loading as ks_loading
from oslo_config import cfg
from shipyard_airflow.conf.opts import ConfigSection
CONF = cfg.CONF
SECTIONS = [
ConfigSection(
name='base',
title='Base Configuration',
options=[
cfg.StrOpt(
'web_server',
default='http://localhost:32080',
help='The web server for Airflow'
),
cfg.StrOpt(
'postgresql_db',
default=(
'postgresql+psycopg2://shipyard:changeme'
'@postgresql.ucp:5432/shipyard'
),
help='The database for shipyard'
),
cfg.StrOpt(
'postgresql_airflow_db',
default=(
'postgresql+psycopg2://shipyard:changeme'
'@postgresql.ucp:5432/airflow'
),
help='The database for airflow'
),
cfg.StrOpt(
'alembic_ini_path',
default='/home/shipyard/shipyard',
help='The directory containing the alembic.ini file'
),
cfg.BoolOpt(
'upgrade_db',
default=True,
help='Upgrade the database on startup'
)
]
),
ConfigSection(
name='logging',
title='Logging Options',
options=[
cfg.IntOpt(
'log_level',
default=logging.DEBUG,
help=('The default logging level for the root logger. '
'ERROR=40, WARNING=30, INFO=20, DEBUG=10')
),
]
),
ConfigSection(
name='shipyard',
title='Shipyard connection info',
options=[
cfg.StrOpt(
'host',
default='shipyard-int.ucp',
help='FQDN for the shipyard service'
),
cfg.IntOpt(
'port',
default=9000,
help='Port for the shipyard service'
),
]
),
ConfigSection(
name='deckhand',
title='Deckhand connection info',
options=[
cfg.StrOpt(
'host',
default='deckhand-int.ucp',
help='FQDN for the deckhand service'
),
cfg.IntOpt(
'port',
default=80,
help='Port for the deckhand service'
),
]
),
ConfigSection(
name='armada',
title='Armada connection info',
options=[
cfg.StrOpt(
'host',
default='armada-int.ucp',
help='FQDN for the armada service'
),
cfg.IntOpt(
'port',
default=8000,
help='Port for the armada service'
),
]
),
ConfigSection(
name='drydock',
title='Drydock connection info',
options=[
cfg.StrOpt(
'host',
default='drydock-int.ucp',
help='FQDN for the drydock service'
),
cfg.IntOpt(
'port',
default=9000,
help='Port for the drydock service'
),
# TODO(Bryan Strassner) Remove this when integrated
cfg.StrOpt(
'token',
default='bigboss',
help='TEMPORARY: password for drydock'
),
# TODO(Bryan Strassner) Remove this when integrated
cfg.StrOpt(
'site_yaml',
default='/usr/local/airflow/plugins/drydock.yaml',
help='TEMPORARY: location of drydock yaml file'
),
# TODO(Bryan Strassner) Remove this when integrated
cfg.StrOpt(
'prom_yaml',
default='/usr/local/airflow/plugins/promenade.yaml',
help='TEMPORARY: location of promenade yaml file'
),
]
),
ConfigSection(
name='healthcheck',
title='Healthcheck connection info',
options=[
cfg.StrOpt(
'schema',
default='http',
help='Schema to perform health check with'
),
cfg.StrOpt(
'endpoint',
default='/api/v1.0/health',
help='Health check standard endpoint'
),
]
),
# TODO (Bryan Strassner) This section is in use by the operators we send
# to the airflow pod(s). Needs to be refactored out
# when those operators are updated.
ConfigSection(
name='keystone',
title='Keystone connection and credential information',
options=[
cfg.StrOpt(
'OS_AUTH_URL',
default='http://keystone-api.ucp:80/v3',
help='The url for OpenStack Authentication'
),
cfg.StrOpt(
'OS_PROJECT_NAME',
default='service',
help='OpenStack project name'
),
cfg.StrOpt(
'OS_USER_DOMAIN_NAME',
default='Default',
help='The OpenStack user domain name'
),
cfg.StrOpt(
'OS_USERNAME',
default='shipyard',
help='The OpenStack username'
),
cfg.StrOpt(
'OS_PASSWORD',
default='password',
help='The OpenStack password for the shipyard service account'
),
cfg.StrOpt(
'OS_REGION_NAME',
default='Regionone',
help='The OpenStack region name'
),
cfg.IntOpt(
'OS_IDENTITY_API_VERSION',
default=3,
help='The OpenStack identity api version'
),
]
),
]
def register_opts(conf):
"""
Registers all the sections in this module.
"""
for section in SECTIONS:
conf.register_group(
cfg.OptGroup(name=section.name,
title=section.title,
help=section.help))
conf.register_opts(section.options, group=section.name)
# TODO (Bryan Strassner) is there a better, more general way to do this,
# or is password enough? Probably need some guidance
# from someone with more experience in this space.
conf.register_opts(
ks_loading.get_auth_plugin_conf_options('password'),
group='keystone_authtoken'
)
def list_opts():
return {
section.name: section.options for section in SECTIONS
}
def parse_args(args=None, usage=None, default_config_files=None):
CONF(args=args,
project='shipyard',
usage=usage,
default_config_files=default_config_files)
register_opts(CONF)


@ -0,0 +1,89 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import importlib
import os
import pkgutil
LIST_OPTS_FUNC_NAME = "list_opts"
IGNORED_MODULES = ('opts', 'constants', 'utils')
CONFIG_PATH = 'shipyard_airflow.conf'
class ConfigSection(object):
"""
Defines a configuration section
"""
def __init__(self, name, title, options, help=None):
self.name = name
self.title = title
self.help = help
self.options = options
def _tupleize(dct):
"""Take the dict of options and convert to the 2-tuple format."""
return [(key, val) for key, val in dct.items()]
def list_opts():
"""Entry point used only in the context of sample file generation.
This is the single point of entry to generate the sample configuration
file. It collects all the necessary info from the other modules in this
package. It is assumed that:
* every other module in this package has a 'list_opts' function which
return a dict where
* the keys are strings which are the group names
* the value of each key is a list of config options for that group
* the {program}.conf package doesn't have further packages with config
options
"""
opts = collections.defaultdict(list)
module_names = _list_module_names()
imported_modules = _import_modules(module_names)
_append_config_options(imported_modules, opts)
return _tupleize(opts)
def _list_module_names():
module_names = []
package_path = os.path.dirname(os.path.abspath(__file__))
for _, modname, ispkg in pkgutil.iter_modules(path=[package_path]):
if modname in IGNORED_MODULES or ispkg:
continue
else:
module_names.append(modname)
return module_names
def _import_modules(module_names):
imported_modules = []
for modname in module_names:
mod = importlib.import_module(CONFIG_PATH + '.' + modname)
if not hasattr(mod, LIST_OPTS_FUNC_NAME):
msg = "The module '%s.%s' should have a '%s' "\
"function which returns the config options." % \
(CONFIG_PATH, modname, LIST_OPTS_FUNC_NAME)
raise Exception(msg)
else:
imported_modules.append(mod)
return imported_modules
def _append_config_options(imported_modules, config_options):
for mod in imported_modules:
configs = mod.list_opts()
for key, val in configs.items():
config_options[key].extend(val)
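The collection logic in this module reduces to a defaultdict merge followed by the 2-tuple conversion. The same behavior can be exercised with plain dicts standing in for the imported modules (a sketch of the mechanism, not the module itself):

```python
import collections

def tupleize(dct):
    """Convert {group: [opts]} into the 2-tuple list the generator expects."""
    return [(key, val) for key, val in dct.items()]

def aggregate(module_opts):
    """Merge each module's list_opts() result into one options map,
    extending the list for groups contributed by more than one module."""
    opts = collections.defaultdict(list)
    for mod in module_opts:          # stand-in: dicts, not real modules
        for group, options in mod.items():
            opts[group].extend(options)
    return tupleize(opts)

# two stand-in modules both contributing to the 'base' group
mods = [{'base': ['web_server']},
        {'base': ['upgrade_db'], 'logging': ['log_level']}]
assert aggregate(mods) == [('base', ['web_server', 'upgrade_db']),
                           ('logging', ['log_level'])]
```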


@ -1,202 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Single point of entry to generate the sample configuration file.
This module collects all the necessary info from the other modules in this
package. It is assumed that:
* Every other module in this package has a 'list_opts' function which
returns a dict where:
* The keys are strings which are the group names.
* The value of each key is a list of config options for that group.
* The conf package doesn't have further packages with config options.
* This module is only used in the context of sample file generation.
"""
import importlib
import os
import pkgutil
from oslo_config import cfg
import keystoneauth1.loading as loading
IGNORED_MODULES = ('shipyard', 'config')
if (os.path.exists('etc/shipyard/shipyard.conf')):
cfg.CONF(['--config-file', 'etc/shipyard/shipyard.conf'])
class ShipyardConfig(object):
"""
Initialize all the core options
"""
# Default options
options = [
cfg.IntOpt(
'poll_interval',
default=10,
help=[
'''Polling interval in seconds for checking subtask or
downstream status'''
]),
]
# Logging options
logging_options = [
cfg.StrOpt(
'log_level', default='INFO', help='Global log level for Shipyard'),
cfg.StrOpt(
'global_logger_name',
default='shipyard',
help='Logger name for the top-level logger'),
]
# Enabled plugins
plugin_options = [
cfg.MultiStrOpt(
'ingester',
default=['shipyard_airflow.ingester.plugins.yaml.YamlIngester'],
help='Module path string of a input ingester to enable'),
cfg.MultiStrOpt(
'oob_driver',
default=[
'shipyard_airflow.drivers.oob.pyghmi_driver.PyghmiDriver'
],
help='Module path string of a OOB driver to enable'),
cfg.StrOpt(
'node_driver',
default=[
'''shipyard_airflow.drivers.node.maasdriver.driver
.MaasNodeDriver'''
],
help='Module path string of the Node driver to enable'),
# TODO Network driver not yet implemented
cfg.StrOpt(
'network_driver',
default=None,
help='Module path string of the Network driver to enable'),
]
# Timeouts for various tasks specified in minutes
timeout_options = [
cfg.IntOpt(
'shipyard_timeout',
default=5,
help='Fallback timeout when a specific one is not configured'),
cfg.IntOpt(
'create_network_template',
default=2,
help='Timeout in minutes for creating site network templates'),
cfg.IntOpt(
'configure_user_credentials',
default=2,
help='Timeout in minutes for creating user credentials'),
cfg.IntOpt(
'identify_node',
default=10,
help='Timeout in minutes for initial node identification'),
cfg.IntOpt(
'configure_hardware',
default=30,
help=[
'''Timeout in minutes for node commissioning and
hardware configuration'''
]),
cfg.IntOpt(
'apply_node_networking',
default=5,
help='Timeout in minutes for configuring node networking'),
cfg.IntOpt(
'apply_node_platform',
default=5,
help='Timeout in minutes for configuring node platform'),
cfg.IntOpt(
'deploy_node',
default=45,
help='Timeout in minutes for deploying a node'),
]
def __init__(self):
self.conf = cfg.CONF
def register_options(self):
self.conf.register_opts(ShipyardConfig.options)
self.conf.register_opts(
ShipyardConfig.logging_options, group='logging')
self.conf.register_opts(ShipyardConfig.plugin_options, group='plugins')
self.conf.register_opts(
ShipyardConfig.timeout_options, group='timeouts')
self.conf.register_opts(
loading.get_auth_plugin_conf_options('password'),
group='keystone_authtoken')
config_mgr = ShipyardConfig()
def list_opts():
opts = {
'DEFAULT': ShipyardConfig.options,
'logging': ShipyardConfig.logging_options,
'plugins': ShipyardConfig.plugin_options,
'timeouts': ShipyardConfig.timeout_options
}
package_path = os.path.dirname(os.path.abspath(__file__))
parent_module = ".".join(__name__.split('.')[:-1])
module_names = _list_module_names(package_path, parent_module)
imported_modules = _import_modules(module_names)
_append_config_options(imported_modules, opts)
# Assume we'll use the password plugin,
# so include those options in the configuration template
opts['keystone_authtoken'] = loading.get_auth_plugin_conf_options(
'password')
return _tupleize(opts)
def _tupleize(d):
"""Convert a dict of options to the 2-tuple format."""
return [(key, value) for key, value in d.items()]
def _list_module_names(pkg_path, parent_module):
module_names = []
for _, module_name, ispkg in pkgutil.iter_modules(path=[pkg_path]):
if module_name in IGNORED_MODULES:
# Skip this module.
continue
elif ispkg:
module_names.extend(
_list_module_names(pkg_path + "/" + module_name,
parent_module + "." + module_name))
else:
module_names.append(parent_module + "." + module_name)
return module_names
def _import_modules(module_names):
imported_modules = []
for module_name in module_names:
module = importlib.import_module(module_name)
if hasattr(module, 'list_opts'):
print("Pulling options from module %s" % module.__name__)
imported_modules.append(module)
return imported_modules
def _append_config_options(imported_modules, config_options):
for module in imported_modules:
configs = module.list_opts()
for key, val in configs.items():
if key not in config_options:
config_options[key] = val
else:
config_options[key].extend(val)


@@ -1,13 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


@@ -0,0 +1,63 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Common methods for use by action api classes as necessary
"""
DAG_STATE_MAPPING = {
'QUEUED': 'Pending',
'RUNNING': 'Processing',
'SUCCESS': 'Complete',
'SHUTDOWN': 'Failed',
'FAILED': 'Failed',
'UP_FOR_RETRY': 'Processing',
'UPSTREAM_FAILED': 'Failed',
'SKIPPED': 'Failed',
'REMOVED': 'Failed',
'SCHEDULED': 'Pending',
'NONE': 'Pending',
'PAUSED': 'Paused'
}
def determine_lifecycle(dag_status=None):
"""
Convert a dag_status to an action_lifecycle value
"""
if dag_status is None:
dag_status = 'NONE'
return DAG_STATE_MAPPING.get(dag_status.upper())
def format_action_steps(action_id, steps):
"""
Converts a list of action step database records to the desired format
"""
if not steps:
return []
steps_response = []
for idx, step in enumerate(steps):
steps_response.append(format_step(action_id=action_id,
step=step,
index=idx + 1))
return steps_response
def format_step(action_id, step, index):
"""
reformat a step (dictionary) into a common response format
"""
return {
'url': '/actions/{}/steps/{}'.format(action_id, step.get('task_id')),
'state': step.get('state'),
'id': step.get('task_id'),
'index': index
}
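
The helpers above are small and pure, so their behavior is easy to pin down with a stand-alone sketch (the state mapping is abbreviated here to the values exercised, and `01BTG` is a made-up action id):

```python
# Abbreviated reproduction of the helpers so this snippet runs alone.
DAG_STATE_MAPPING = {'RUNNING': 'Processing', 'SUCCESS': 'Complete',
                     'NONE': 'Pending'}


def determine_lifecycle(dag_status=None):
    # No dag run yet is treated as 'NONE' -> 'Pending'
    if dag_status is None:
        dag_status = 'NONE'
    return DAG_STATE_MAPPING.get(dag_status.upper())


def format_step(action_id, step, index):
    # Reshape a task record into the common step response format.
    return {
        'url': '/actions/{}/steps/{}'.format(action_id, step.get('task_id')),
        'state': step.get('state'),
        'id': step.get('task_id'),
        'index': index,
    }


print(determine_lifecycle('running'))   # Processing
print(determine_lifecycle())            # Pending
print(format_step('01BTG', {'task_id': 'preflight', 'state': 'success'}, 1))
```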


@@ -0,0 +1,330 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
import falcon
import requests
from requests.exceptions import RequestException
from dateutil.parser import parse
from oslo_config import cfg
import ulid
from shipyard_airflow import policy
from shipyard_airflow.control.action_helper import (determine_lifecycle,
format_action_steps)
from shipyard_airflow.control.base import BaseResource
from shipyard_airflow.control.json_schemas import ACTION
from shipyard_airflow.db.db import AIRFLOW_DB, SHIPYARD_DB
from shipyard_airflow.errors import ApiError
CONF = cfg.CONF
# Mappings of actions to dags
SUPPORTED_ACTION_MAPPINGS = {
# action : dag, validation
'deploy_site': {
'dag': 'deploy_site',
'validator': None
},
'update_site': {
'dag': 'update_site',
'validator': None
},
'redeploy_server': {
'dag': 'redeploy_server',
# TODO (Bryan Strassner) This should have a validator method
# Needs to be revisited when defined
'validator': None
}
}
# /api/v1.0/actions
class ActionsResource(BaseResource):
"""
The actions resource represents the asynchronous invocations of Shipyard
"""
@policy.ApiEnforcer('workflow_orchestrator:list_actions')
def on_get(self, req, resp, **kwargs):
"""
Return actions that have been invoked through shipyard.
:returns: a json array of action entities
"""
resp.body = self.to_json(self.get_all_actions())
resp.status = falcon.HTTP_200
self.info(req.context, 'response data is %s' % resp.body)
@policy.ApiEnforcer('workflow_orchestrator:create_action')
def on_post(self, req, resp, **kwargs):
"""
Accept an action into shipyard
"""
input_action = self.req_json(req, validate_json_schema=ACTION)
action = self.create_action(action=input_action, context=req.context)
self.info(req.context, "Id %s generated for action %s " %
(action['id'], action['name']))
# respond with the action and location for checking status
resp.status = falcon.HTTP_201
resp.body = self.to_json(action)
# TODO (Bryan Strassner) figure out the right way to do this:
resp.location = '/api/v1.0/actions/{}'.format(action['id'])
def create_action(self, action, context):
# use uuid assigned for this request as the id of the action.
action['id'] = ulid.ulid()
# the invoking user
action['user'] = context.user
# add current timestamp (UTC) to the action.
action['timestamp'] = str(datetime.utcnow())
# validate that action is supported.
self.info(context, "Attempting action: %s" % action['name'])
if action['name'] not in SUPPORTED_ACTION_MAPPINGS:
raise ApiError(
title='Unable to start action',
description='Unsupported Action: {}'.format(action['name']))
dag = SUPPORTED_ACTION_MAPPINGS.get(action['name'])['dag']
action['dag_id'] = dag
# populate action parameters if they are not set
if 'parameters' not in action:
action['parameters'] = {}
# validate if there is any validation to do
validator = SUPPORTED_ACTION_MAPPINGS.get(action['name'])['validator']
if validator is not None:
# validators will raise ApiError if they are not validated.
validator(action)
# invoke airflow, get the dag's date
dag_execution_date = self.invoke_airflow_dag(
dag_id=dag, action=action, context=context)
# set values on the action
action['dag_execution_date'] = dag_execution_date
action['dag_status'] = 'SCHEDULED'
# context_marker is the uuid from the request context
action['context_marker'] = context.request_id
# insert the action into the shipyard db
self.insert_action(action=action)
self.audit_control_command_db({
'id': ulid.ulid(),
'action_id': action['id'],
'command': 'invoke',
'user': context.user
})
return action
def get_all_actions(self):
"""
Interacts with airflow and the shipyard database to return the list of
actions invoked through shipyard.
"""
# fetch actions from the shipyard db
all_actions = self.get_action_map()
# fetch the associated dags, steps from the airflow db
all_dag_runs = self.get_dag_run_map()
all_tasks = self.get_all_tasks_db()
# correlate the actions and dags into a list of action entites
actions = []
for action_id, action in all_actions.items():
dag_key = action['dag_id'] + action['dag_execution_date']
dag_key_id = action['dag_id']
dag_key_date = action['dag_execution_date']
# locate the dag run associated
dag_state = all_dag_runs.get(dag_key, {}).get('state', None)
# get the dag status from the dag run state
action['dag_status'] = dag_state
action['action_lifecycle'] = determine_lifecycle(dag_state)
# get the steps summary
action_tasks = [
step for step in all_tasks
if step['dag_id'].startswith(dag_key_id) and
step['execution_date'].strftime(
'%Y-%m-%dT%H:%M:%S') == dag_key_date
]
action['steps'] = format_action_steps(action_id, action_tasks)
actions.append(action)
return actions
def get_action_map(self):
"""
Maps an array of dictionaries to a dictionary of the same results by id
:returns: a dictionary of dictionaries keyed by action id
"""
return {action['id']: action for action in self.get_all_actions_db()}
def get_all_actions_db(self):
"""
Wrapper for call to the shipyard database to get all actions
:returns: a dictionary of dictionaries keyed by action id
"""
return SHIPYARD_DB.get_all_submitted_actions()
def get_dag_run_map(self):
"""
Maps an array of dag runs to a keyed dictionary
:returns: a dictionary of dictionaries keyed by dag_id and
execution_date
"""
return {
run['dag_id'] +
run['execution_date'].strftime('%Y-%m-%dT%H:%M:%S'): run
for run in self.get_all_dag_runs_db()
}
def get_all_dag_runs_db(self):
"""
Wrapper for call to the airflow db to get all dag runs
:returns: a dictionary of dictionaries keyed by dag_id and
execution_date
"""
return AIRFLOW_DB.get_all_dag_runs()
def get_all_tasks_db(self):
"""
Wrapper for call to the airflow db to get all tasks
:returns: a list of task dictionaries
"""
return AIRFLOW_DB.get_all_tasks()
def insert_action(self, action):
"""
Wrapper for call to the shipyard db to insert an action
"""
return SHIPYARD_DB.insert_action(action)
def audit_control_command_db(self, action_audit):
"""
Wrapper for the shipyard db call to record an audit of the
action control taken
"""
return SHIPYARD_DB.insert_action_command_audit(action_audit)
def invoke_airflow_dag(self, dag_id, action, context):
"""
Call airflow, and invoke a dag
:param dag_id: the name of the dag to invoke
:param action: the action structure to invoke the dag with
"""
# Retrieve URL
web_server_url = CONF.base.web_server
if 'Error' in web_server_url:
raise ApiError(
title='Unable to invoke workflow',
description=('Airflow URL not found by Shipyard. '
'Shipyard configuration is missing web_server '
'value'),
status=falcon.HTTP_503,
retry=True, )
else:
conf_value = {'action': action}
# "conf" - JSON string that gets pickled into the DagRun's
# conf attribute
req_url = ('{}admin/rest_api/api?api=trigger_dag&dag_id={}'
'&conf={}'.format(web_server_url,
dag_id, self.to_json(conf_value)))
try:
resp = requests.get(req_url, timeout=15)
self.info(context,
'Response code from Airflow trigger_dag: %s' %
resp.status_code)
resp.raise_for_status()
response = resp.json()
self.info(context,
'Response from Airflow trigger_dag: %s' %
response)
except (RequestException) as rex:
self.error(context, "Request to airflow failed: %s" % rex.args)
raise ApiError(
title='Unable to complete request to Airflow',
description=(
'Airflow could not be contacted properly by Shipyard.'
),
status=falcon.HTTP_503,
error_list=[{
'message': str(type(rex))
}],
retry=True, )
# Returns error response if API call returns
# response code other than 200
if response["http_response_code"] != 200:
raise ApiError(
title='Unable to invoke workflow',
description=(
'Airflow URL not found by Shipyard. '
'Shipyard configuration is missing web_server value'),
status=falcon.HTTP_503,
error_list=[{
'message': response['output']
}],
retry=True, )
else:
dag_time = self._exhume_date(dag_id,
response['output']['stdout'])
dag_execution_date = dag_time.strftime('%Y-%m-%dT%H:%M:%S')
return dag_execution_date
def _exhume_date(self, dag_id, log_string):
# we are unable to use the response time because that
# does not match the time when the dag was recorded.
# We have to parse the stdout returned to find the
# Created <DagRun {dag_id} @ {timestamp}
# e.g.
# ...- Created <DagRun deploy_site @ 2017-09-22 22:16:14: man...
# split on "Created <DagRun deploy_site @ ", then ': "
# should get to the desired date string.
#
# returns the date found in a date object
log_split = log_string.split('Created <DagRun {} @ '.format(dag_id))
if len(log_split) < 2:
raise ApiError(
title='Unable to determine if workflow has started',
description=(
'Airflow has not responded with parseable output. '
'Shipyard is unable to determine run timestamp'),
status=falcon.HTTP_500,
error_list=[{
'message': log_string
}],
retry=True,
)
else:
# everything before the ': ' should be a date/time
date_split = log_split[1].split(': ')[0]
try:
return parse(date_split, ignoretz=True)
except ValueError as valerr:
raise ApiError(
title='Unable to determine if workflow has started',
description=(
'Airflow has not responded with parseable output. '
'Shipyard is unable to determine run timestamp'),
status=falcon.HTTP_500,
error_list=[{
'message': 'value {} has caused {}'.format(date_split,
valerr)
}],
retry=True,
)
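
Because Airflow's trigger_dag response does not carry the DagRun's recorded execution time directly, `_exhume_date` scrapes it out of the returned stdout. A simplified stand-alone version of that parse, using stdlib `datetime.strptime` in place of dateutil and a fabricated log line:

```python
from datetime import datetime


def exhume_date(dag_id, log_string):
    # Everything between "Created <DagRun {dag_id} @ " and the next
    # ": " should be the run's timestamp.
    marker = 'Created <DagRun {} @ '.format(dag_id)
    parts = log_string.split(marker)
    if len(parts) < 2:
        raise ValueError('run timestamp not found in Airflow output')
    date_str = parts[1].split(': ')[0]
    return datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')


line = ('... - Created <DagRun deploy_site @ 2017-09-22 22:16:14: '
        'manually triggered ...')
print(exhume_date('deploy_site', line).strftime('%Y-%m-%dT%H:%M:%S'))
```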


@@ -0,0 +1,129 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import ulid
from shipyard_airflow import policy
from shipyard_airflow.control.base import BaseResource
from shipyard_airflow.db.db import AIRFLOW_DB, SHIPYARD_DB
from shipyard_airflow.db.errors import AirflowStateError
from shipyard_airflow.errors import ApiError
# /api/v1.0/actions/{action_id}/control/{control_verb}
class ActionsControlResource(BaseResource):
"""
The actions control resource allows for runtime control
"""
def __init__(self):
BaseResource.__init__(self)
self.controls = {
'pause': self.pause_dag,
'unpause': self.unpause_dag,
'stop': self.stop_dag
}
@policy.ApiEnforcer('workflow_orchestrator:invoke_action_control')
def on_post(self, req, resp, **kwargs):
"""
Returns that a control was received (202 response)
:returns: a no-body response
"""
self.handle_control(kwargs['action_id'],
kwargs['control_verb'],
req.context)
resp.status = falcon.HTTP_202
def handle_control(self, action_id, control_verb, context):
"""
Interacts with airflow to trigger a dag control
:returns: nothing
"""
action = self.get_action_db(action_id=action_id)
if action is None:
raise ApiError(
title='Action not found',
description='Unknown action {}'.format(action_id),
status=falcon.HTTP_404)
if control_verb in self.controls:
self.controls.get(control_verb)(
dag_id=action['dag_id'],
execution_date=action['dag_execution_date'])
self.audit_control_command_db({
'id': ulid.ulid(),
'action_id': action_id,
'command': control_verb,
'user': context.user
})
else:
raise ApiError(
title='Control not supported',
description='Unknown control {}'.format(control_verb),
status=falcon.HTTP_404)
def get_action_db(self, action_id):
"""
Wrapper for call to the shipyard database to get an action
:returns: a dictionary of action details.
"""
return SHIPYARD_DB.get_action_by_id(
action_id=action_id)
def audit_control_command_db(self, action_audit):
"""
Wrapper for the shipyard db call to record an audit of the
action control taken
"""
return SHIPYARD_DB.insert_action_command_audit(action_audit)
def pause_dag(self, dag_id, execution_date):
"""
Sets the pause flag on this dag/execution
"""
try:
AIRFLOW_DB.pause_dag_run(
dag_id=dag_id, execution_date=execution_date)
except AirflowStateError as state_error:
raise ApiError(
title='Unable to pause action',
description=state_error.message,
status=falcon.HTTP_409)
def unpause_dag(self, dag_id, execution_date):
"""
Clears the pause flag on this dag/execution
"""
try:
AIRFLOW_DB.unpause_dag_run(
dag_id=dag_id, execution_date=execution_date)
except AirflowStateError as state_error:
raise ApiError(
title='Unable to unpause action',
description=state_error.message,
status=falcon.HTTP_409)
def stop_dag(self, dag_id, execution_date):
"""
Sets the stop flag on this dag/execution
"""
try:
AIRFLOW_DB.stop_dag_run(
dag_id=dag_id, execution_date=execution_date)
except AirflowStateError as state_error:
raise ApiError(
title='Unable to stop action',
description=state_error.message,
status=falcon.HTTP_409)
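
The control endpoint maps the URL verb to a handler through a plain dispatch table, turning unknown verbs into a 404. A stand-alone sketch of that pattern, where the handlers just record calls instead of touching the Airflow db:

```python
# Records the control commands that were dispatched.
audit_log = []


def pause_dag(dag_id, execution_date):
    audit_log.append(('pause', dag_id, execution_date))


def unpause_dag(dag_id, execution_date):
    audit_log.append(('unpause', dag_id, execution_date))


controls = {'pause': pause_dag, 'unpause': unpause_dag}


def handle_control(control_verb, dag_id, execution_date):
    handler = controls.get(control_verb)
    if handler is None:
        # In the resource this raises an ApiError carrying HTTP 404.
        raise KeyError('Unknown control {}'.format(control_verb))
    handler(dag_id=dag_id, execution_date=execution_date)


handle_control('pause', 'deploy_site', '2017-09-22T22:16:14')
print(audit_log)
```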


@@ -0,0 +1,117 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
from shipyard_airflow import policy
from shipyard_airflow.control.action_helper import (determine_lifecycle,
format_action_steps)
from shipyard_airflow.control.base import BaseResource
from shipyard_airflow.db.db import AIRFLOW_DB, SHIPYARD_DB
from shipyard_airflow.errors import ApiError
# /api/v1.0/actions/{action_id}
class ActionsIdResource(BaseResource):
"""
The actions id resource represents a single asynchronous invocation of Shipyard
"""
@policy.ApiEnforcer('workflow_orchestrator:get_action')
def on_get(self, req, resp, **kwargs):
"""
Return the requested action that was invoked through shipyard.
:returns: a json object of the action entity
"""
resp.body = self.to_json(self.get_action(kwargs['action_id']))
resp.status = falcon.HTTP_200
def get_action(self, action_id):
"""
Interacts with airflow and the shipyard database to return the
requested action invoked through shipyard.
"""
# get the action from shipyard db
action = self.get_action_db(action_id=action_id)
if action is None:
raise ApiError(
title='Action not found',
description='Unknown Action: {}'.format(action_id),
status=falcon.HTTP_404)
# lookup the dag and tasks based on the associated dag_id,
# execution_date
dag_id = action['dag_id']
dag_execution_date = action['dag_execution_date']
dag = self.get_dag_run_by_id(dag_id, dag_execution_date)
steps = self.get_tasks_db(dag_id, dag_execution_date)
if dag is not None:
# put the values together into an "action" object
action['dag_status'] = dag['state']
action['action_lifecycle'] = determine_lifecycle(dag['state'])
action['steps'] = format_action_steps(action_id, steps)
action['validations'] = self.get_validations_db(action_id)
action['command_audit'] = self.get_action_command_audit_db(action_id)
return action
def get_dag_run_by_id(self, dag_id, execution_date):
"""
Wrapper for call to the airflow db to get a dag_run
:returns: a dag run dictionary
"""
dag_run_list = self.get_dag_run_db(dag_id, execution_date)
# should be only one result, return the first one
if dag_run_list:
return dag_run_list[0]
else:
return None
def get_action_db(self, action_id):
"""
Wrapper for call to the shipyard database to get an action
:returns: a dictionary of action details.
"""
return SHIPYARD_DB.get_action_by_id(
action_id=action_id)
def get_validations_db(self, action_id):
"""
Wrapper for call to the shipyard db to get validations associated with
an action
:returns: an array of dictionaries of validation details.
"""
return SHIPYARD_DB.get_validation_by_action_id(
action_id=action_id)
def get_tasks_db(self, dag_id, execution_date):
"""
Wrapper for call to the airflow db to get all tasks
:returns: a list of task dictionaries
"""
return AIRFLOW_DB.get_tasks_by_id(
dag_id=dag_id, execution_date=execution_date)
def get_dag_run_db(self, dag_id, execution_date):
"""
Wrapper for call to the airflow db to get a dag_run
:returns: a list of dag run dictionaries
"""
return AIRFLOW_DB.get_dag_runs_by_id(
dag_id=dag_id, execution_date=execution_date)
def get_action_command_audit_db(self, action_id):
"""
Wrapper for call to the shipyard db to get the history of
action command audit records
"""
return SHIPYARD_DB.get_command_audit_by_action_id(action_id)


@@ -0,0 +1,84 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
from shipyard_airflow import policy
from shipyard_airflow.control.base import BaseResource
from shipyard_airflow.db.db import AIRFLOW_DB, SHIPYARD_DB
from shipyard_airflow.errors import ApiError
# /api/v1.0/actions/{action_id}/steps/{step_id}
class ActionsStepsResource(BaseResource):
"""
The actions steps resource represents the steps of an action
"""
@policy.ApiEnforcer('workflow_orchestrator:get_action_step')
def on_get(self, req, resp, **kwargs):
"""
Return step details for an action step
:returns: a json object describing a step
"""
resp.body = self.to_json(
self.get_action_step(kwargs['action_id'], kwargs['step_id']))
resp.status = falcon.HTTP_200
def get_action_step(self, action_id, step_id):
"""
Interacts with airflow and the shipyard database to return the
requested step invoked through shipyard.
"""
action = self.get_action_db(action_id=action_id)
if action is None:
raise ApiError(
title='Action not found',
description='Unknown action {}'.format(action_id),
status=falcon.HTTP_404)
# resolve the ids for lookup of steps
dag_id = action['dag_id']
dag_execution_date = action['dag_execution_date']
# get the action steps from the airflow db
steps = self.get_tasks_db(dag_id, dag_execution_date)
for idx, step in enumerate(steps):
if step_id == step['task_id']:
# TODO (Bryan Strassner) more info about the step?
# like logs? Need requirements defined
step['index'] = idx + 1
return step
# if we didn't find it, 404
raise ApiError(
title='Step not found',
description='Unknown step {}'.format(step_id),
status=falcon.HTTP_404)
def get_action_db(self, action_id):
"""
Wrapper for call to the shipyard database to get an action
:returns: a dictionary of action details.
"""
return SHIPYARD_DB.get_action_by_id(
action_id=action_id)
def get_tasks_db(self, dag_id, execution_date):
"""
Wrapper for call to the airflow db to get all tasks for a dag run
:returns: a list of task dictionaries
"""
return AIRFLOW_DB.get_tasks_by_id(
dag_id=dag_id, execution_date=execution_date)
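
Step lookup in `get_action_step` is a linear scan of the dag run's tasks, attaching a 1-based index when a match is found. Sketched stand-alone (the step names are invented):

```python
def find_step(steps, step_id):
    # Scan tasks in order; attach a 1-based index to the match.
    for idx, step in enumerate(steps):
        if step['task_id'] == step_id:
            step['index'] = idx + 1
            return step
    return None  # the resource turns this into a 404 ApiError


steps = [{'task_id': 'preflight'}, {'task_id': 'deploy'}]
print(find_step(steps, 'deploy'))
```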


@@ -0,0 +1,77 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
from shipyard_airflow import policy
from shipyard_airflow.control.base import BaseResource
from shipyard_airflow.db.db import SHIPYARD_DB
from shipyard_airflow.errors import ApiError
# /api/v1.0/actions/{action_id}/validations/{validation_id}
class ActionsValidationsResource(BaseResource):
"""
The actions validations resource represents the validations of an action
"""
@policy.ApiEnforcer('workflow_orchestrator:get_action_validation')
def on_get(self, req, resp, **kwargs):
"""
Return validation details for an action validation
:returns: a json object describing a validation
"""
resp.body = self.to_json(
self.get_action_validation(kwargs['action_id'],
kwargs['validation_id']))
resp.status = falcon.HTTP_200
def get_action_validation(self, action_id, validation_id):
"""
Interacts with the shipyard database to return the requested
validation information
:returns: the validation dictionary object
"""
action = self.get_action_db(action_id=action_id)
if action is None:
raise ApiError(
title='Action not found',
description='Unknown action {}'.format(action_id),
status=falcon.HTTP_404)
validation = self.get_validation_db(validation_id=validation_id)
if validation is not None:
return validation
# if we didn't find it, 404
raise ApiError(
title='Validation not found',
description='Unknown validation {}'.format(validation_id),
status=falcon.HTTP_404)
def get_action_db(self, action_id):
"""
Wrapper for call to the shipyard database to get an action
:returns: a dictionary of action details.
"""
return SHIPYARD_DB.get_action_by_id(
action_id=action_id)
def get_validation_db(self, validation_id):
"""
Wrapper for call to the shipyard database to get a validation
:returns: a dictionary of validation details.
"""
return SHIPYARD_DB.get_validation_by_id(
validation_id=validation_id)


@@ -11,14 +11,26 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import falcon
from shipyard_airflow.errors import AppError
from .regions import RegionsResource, RegionResource
from .base import ShipyardRequest, BaseResource
from .middleware import AuthMiddleware, ContextMiddleware, LoggingMiddleware
from .health import HealthResource
from shipyard_airflow.control.actions_api import ActionsResource
from shipyard_airflow.control.actions_control_api import ActionsControlResource
from shipyard_airflow.control.actions_id_api import ActionsIdResource
from shipyard_airflow.control.actions_steps_id_api import ActionsStepsResource
from shipyard_airflow.control.actions_validations_id_api import \
ActionsValidationsResource
from shipyard_airflow.control.base import BaseResource, ShipyardRequest
from shipyard_airflow.control.health import HealthResource
from shipyard_airflow.control.middleware import (AuthMiddleware,
ContextMiddleware,
LoggingMiddleware)
from shipyard_airflow.errors import (AppError, default_error_serializer,
default_exception_handler)
LOG = logging.getLogger(__name__)
def start_api():
middlewares = [
@@ -34,23 +46,43 @@ def start_api():
# v1.0 of Shipyard API
v1_0_routes = [
# API for managing region data
('/regions', RegionsResource()),
('/regions/{region_id}', RegionResource()),
('/health', HealthResource()),
('/actions', ActionsResource()),
('/actions/{action_id}', ActionsIdResource()),
('/actions/{action_id}/control/{control_verb}',
ActionsControlResource()),
('/actions/{action_id}/steps/{step_id}',
ActionsStepsResource()),
('/actions/{action_id}/validations/{validation_id}',
ActionsValidationsResource()),
]
# Set up the 1.0 routes
route_v1_0_prefix = '/api/v1.0'
for path, res in v1_0_routes:
control_api.add_route('/api/v1.0' + path, res)
route = '{}{}'.format(route_v1_0_prefix, path)
LOG.info(
'Adding route: %s Handled by %s',
route,
res.__class__.__name__
)
control_api.add_route(route, res)
# Error handlers (FILO handling)
control_api.add_error_handler(Exception, default_exception_handler)
control_api.add_error_handler(AppError, AppError.handle)
# built-in error serializer
control_api.set_error_serializer(default_error_serializer)
return control_api
class VersionsResource(BaseResource):
authorized_roles = ['anyone']
"""
Lists the versions supported by this API
"""
def on_get(self, req, resp):
resp.body = json.dumps({
resp.body = self.to_json({
'v1.0': {
'path': '/api/v1.0',
'status': 'stable'


@@ -11,95 +11,89 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
import uuid
import json
import configparser
import os
import logging
import uuid
try:
from collections import OrderedDict
except ImportError:
OrderedDict = dict
import falcon
import falcon.request as request
import falcon.routing as routing
from shipyard_airflow.errors import (
AppError,
ERR_UNKNOWN,
)
from shipyard_airflow.control.json_schemas import validate_json
from shipyard_airflow.errors import InvalidFormatError
class BaseResource(object):
def __init__(self):
self.logger = logging.getLogger('shipyard.control')
def on_options(self, req, resp):
self_attrs = dir(self)
methods = ['GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'PATCH']
allowed_methods = []
for m in methods:
if 'on_' + m.lower() in self_attrs:
allowed_methods.append(m)
resp.headers['Allow'] = ','.join(allowed_methods)
def on_options(self, req, resp, **kwargs):
"""
Handle options requests
"""
method_map = routing.create_http_method_map(self)
for method in method_map:
if method_map.get(method).__name__ != 'method_not_allowed':
resp.append_header('Allow', method)
resp.status = falcon.HTTP_200
def to_json(self, body_dict):
return json.dumps(body_dict)
def on_success(self, res, message=None):
res.status = falcon.HTTP_200
response_dict = OrderedDict()
response_dict['type'] = 'success'
response_dict['message'] = message
res.body = self.to_json(response_dict)
# Error Handling
def return_error(self, resp, status_code, message="", retry=False):
"""
Write an error message body and throw a Falcon exception to trigger
an HTTP status
:param resp: Falcon response object to update
:param status_code: Falcon status_code constant
:param message: Optional error message to include in the body
:param retry: Optional flag whether client should retry the operation.
Can ignore if we rely solely on 4XX vs 5XX status codes
"""
resp.body = self.to_json(
{'type': 'error', 'message': message, 'retry': retry})
resp.content_type = 'application/json'
resp.status = status_code
# Get Config Data
def retrieve_config(self, section="", data=""):
# Shipyard config will be located at /etc/shipyard/shipyard.conf
path = '/etc/shipyard/shipyard.conf'
# Check that shipyard.conf exists
if os.path.isfile(path):
config = configparser.ConfigParser()
config.read(path)
# Retrieve data from shipyard.conf
query_data = config.get(section, data)
return query_data
else:
raise AppError(ERR_UNKNOWN, "Missing Configuration File")
def req_json(self, req, validate_json_schema=None):
"""
Reads and returns the input json message, optionally validates against
a provided jsonschema
:param req: the falcon request object
:param validate_json_schema: the optional jsonschema to use for
validation
"""
has_input = False
if ((req.content_length is not None and req.content_length != 0) and
(req.content_type is not None and
req.content_type.lower() == 'application/json')):
raw_body = req.stream.read(req.content_length or 0)
if raw_body is not None:
has_input = True
self.info(req.context, 'Input message body: %s' % raw_body)
else:
self.info(req.context, 'No message body specified')
if has_input:
# read the json and validate if necessary
try:
raw_body = raw_body.decode('utf-8')
json_body = json.loads(raw_body)
if validate_json_schema:
# raises an exception if it doesn't validate
validate_json(json_body, validate_json_schema)
return json_body
except json.JSONDecodeError as jex:
self.error(req.context, "Invalid JSON in request: \n%s" %
raw_body)
raise InvalidFormatError(
title='JSON could not be decoded',
description='%s: Invalid JSON in body: %s' %
(req.path, jex)
)
# No body passed as input. Fail validation if it was asked for
if validate_json_schema is not None:
raise InvalidFormatError(
title='Json body is required',
description='%s: Bad input, no body provided' %
(req.path)
)
return None
def to_json(self, body_dict):
"""
Thin wrapper around json.dumps, providing the default=str config
"""
return json.dumps(body_dict, default=str)
def error(self, ctx, msg):
self.log_error(ctx, logging.ERROR, msg)
def info(self, ctx, msg):
self.log_error(ctx, logging.INFO, msg)
def log_error(self, ctx, level, msg):
extra = {
'user': 'N/A',
'req_id': 'N/A',
'external_ctx': 'N/A'
}
def log_message(self, ctx, level, msg):
"""
Logs a message with context, and extra populated.
"""
extra = {'user': 'N/A', 'req_id': 'N/A', 'external_ctx': 'N/A'}
if ctx is not None:
extra = {
@ -108,8 +102,36 @@ class BaseResource(object):
'external_ctx': ctx.external_marker,
}
self.logger.log(level, msg, extra=extra)
def debug(self, ctx, msg):
"""
Debug logger for resources, incorporating context.
"""
self.log_message(ctx, logging.DEBUG, msg)
def info(self, ctx, msg):
"""
Info logger for resources, incorporating context.
"""
self.log_message(ctx, logging.INFO, msg)
def warn(self, ctx, msg):
"""
Warn logger for resources, incorporating context.
"""
self.log_message(ctx, logging.WARN, msg)
def error(self, ctx, msg):
"""
Error logger for resources, incorporating context.
"""
self.log_message(ctx, logging.ERROR, msg)
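The `extra` dict populated by these loggers feeds format fields such as `%(user)s`. A minimal stdlib sketch of that wiring (the format string and logger name are assumptions, not Shipyard's configured ones):

```python
import io
import logging

# Handler writing to a string buffer so the output can be inspected.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
# Formatter consuming the same extra keys the resource loggers populate
handler.setFormatter(logging.Formatter(
    '%(levelname)s %(user)s %(req_id)s %(external_ctx)s %(message)s'))
logger = logging.getLogger('sketch.control')
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

# Same default shape as the resource code: N/A until a context fills it
extra = {'user': 'N/A', 'req_id': '1234', 'external_ctx': 'abcd'}
logger.info('hello', extra=extra)
print(stream.getvalue().strip())  # INFO N/A 1234 abcd hello
```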
class ShipyardRequestContext(object):
"""
Context object for shipyard resource requests
"""
def __init__(self):
self.log_level = 'error'
self.user = None
@ -123,7 +145,6 @@ class ShipyardRequestContext(object):
self.project_domain_id = None # Domain owning project
self.is_admin_project = False
self.authenticated = False
self.request_id = str(uuid.uuid4())
def set_log_level(self, level):
if level in ['error', 'info', 'debug']:
@ -142,8 +163,7 @@ class ShipyardRequestContext(object):
self.roles.extend(roles)
def remove_role(self, role):
self.roles = [x for x in self.roles
if x != role]
self.roles = [x for x in self.roles if x != role]
def set_external_marker(self, marker):
self.external_marker = marker
@ -163,5 +183,6 @@ class ShipyardRequestContext(object):
return policy_dict
class ShipyardRequest(falcon.request.Request):
class ShipyardRequest(request.Request):
context_type = ShipyardRequestContext


@ -15,9 +15,14 @@ import falcon
from shipyard_airflow.control.base import BaseResource
class HealthResource(BaseResource):
# Return empty response/body to show
# that shipyard is healthy
class HealthResource(BaseResource):
"""
Return empty response/body to show
that shipyard is healthy
"""
def on_get(self, req, resp):
"""
It really does nothing right now. It may do more later
"""
resp.status = falcon.HTTP_204


@ -0,0 +1,126 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Contains the json schemas for the REST interface, and provides the functions
to validate against the schemas.
see: http://json-schema.org
see: https://pypi.python.org/pypi/jsonschema
"""
import json
import logging
from jsonschema import validate
from jsonschema.exceptions import FormatError, SchemaError, ValidationError
from shipyard_airflow.errors import AppError, InvalidFormatError
def validate_json(json_string, schema):
"""
invokes the validate function of jsonschema
"""
schema_dict = json.loads(schema)
schema_title = schema_dict['title']
try:
validate(json_string, schema_dict)
except ValidationError as err:
title = 'JSON validation failed: {}'.format(err.message)
description = 'Failed validator: {} : {}'.format(
err.validator,
err.validator_value
)
logging.error(title)
logging.error(description)
raise InvalidFormatError(
title=title,
description=description,
)
except SchemaError as err:
title = 'SchemaError: Unable to validate JSON: {}'.format(err)
description = 'Invalid Schema: {}'.format(schema_title)
logging.error(title)
logging.error(description)
raise AppError(
title=title,
description=description
)
except FormatError as err:
title = 'FormatError: Unable to validate JSON: {}'.format(err)
description = 'Invalid Format: {}'.format(schema_title)
logging.error(title)
logging.error(description)
raise AppError(
title=title,
description=description
)
# The action resource structure
ACTION = '''
{
"title": "Action schema",
"type" : "object",
"properties" : {
"id" : {"type" : "string"},
"name" : {"type" : "string"},
"parameters" : {"type" : "object"},
"user" : {"type" : "string"},
"time" : {"type" : "string"},
"actionStatus" : {
"enum" : [
"Pending",
"Validation Failed",
"Processing",
"Complete",
"Failed"
]
},
"dagStatus" : {"type" : "string"},
"validations" : {
"type" : "array",
"items" : {
"type" : "object",
"properties" : {
"id" : {"type" : "string"},
"status" : {
"enum" : [
"Passed",
"Failed",
"Pending"
]
}
}
}
},
"steps" : {
"type" : "array",
"items" : {
"type" : "object",
"properties" : {
"id" : {"type" : "string"},
"status" : {
"enum" : [
"Pending",
"Processing",
"Complete",
"Failed"
]
}
}
}
}
},
"required" : ["name"]
}
'''
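The ACTION schema above is enforced through jsonschema's `validate`. As an illustration of the two checks most relevant here, required fields and enum membership, a stdlib-only sketch (the real jsonschema library covers far more; `check_action` is a hypothetical helper, not Shipyard code):

```python
import json

# Trimmed copy of the ACTION schema, keeping only what the sketch checks
ACTION_SCHEMA = json.loads('''
{
    "title": "Action schema",
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "actionStatus": {
            "enum": ["Pending", "Validation Failed", "Processing",
                     "Complete", "Failed"]
        }
    },
    "required": ["name"]
}
''')

def check_action(doc, schema):
    """Tiny subset of jsonschema: required keys and enum membership."""
    errors = []
    for key in schema.get('required', []):
        if key not in doc:
            errors.append('missing required field: %s' % key)
    for key, spec in schema.get('properties', {}).items():
        if key in doc and 'enum' in spec and doc[key] not in spec['enum']:
            errors.append('invalid enum value for %s' % key)
    return errors

print(check_action({'name': 'deploy_site'}, ACTION_SCHEMA))
print(check_action({'actionStatus': 'Bogus'}, ACTION_SCHEMA))
```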


@ -12,8 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from uuid import UUID
from oslo_utils import uuidutils
from shipyard_airflow import policy
@ -73,28 +74,27 @@ class AuthMiddleware(object):
ctx.is_admin_project = False
self.logger.debug(
'Request from authenticated user %s with roles %s' %
(ctx.user, ','.join(ctx.roles)))
'Request from authenticated user %s with roles %s',
ctx.user, ','.join(ctx.roles)
)
else:
ctx.authenticated = False
class ContextMiddleware(object):
def __init__(self):
# Setup validation pattern for external marker
try:
uuid_value = uuidutils.generate_uuid(dashed=True)
UUID(uuid_value)
except:
self.logger.error('UUID generation fail')
"""
Handle looking at the X-Context-Marker header to see if it has a value
value is a UUID (or close enough). If not, generate one.
"""
def process_request(self, req, resp):
ctx = req.context
ext_marker = req.get_header('X-Context-Marker')
if ext_marker is not None and self.marker_re.fullmatch(ext_marker):
if ext_marker is not None and uuidutils.is_uuid_like(ext_marker):
# external passed in an ok context marker
ctx.set_external_marker(ext_marker)
else:
# use the request id
ctx.set_external_marker(ctx.request_id)
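`uuidutils.is_uuid_like` accepts canonical dashed UUIDs as well as dashless hex. A stdlib approximation of the marker fallback logic (a behavioral sketch, not the oslo_utils implementation):

```python
import uuid

def is_uuid_like(value):
    """Accept canonical or dashless UUID strings, as oslo_utils does."""
    try:
        # uuid.UUID tolerates missing dashes in hex input
        return (str(uuid.UUID(value)).replace('-', '') ==
                str(value).replace('-', '').lower())
    except (TypeError, ValueError, AttributeError):
        return False

marker = 'not-a-uuid'
request_id = str(uuid.uuid4())
# Fall back to the request id when the header is absent or malformed
external_marker = marker if is_uuid_like(marker) else request_id
print(external_marker == request_id)  # True
```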
class LoggingMiddleware(object):
@ -111,4 +111,11 @@ class LoggingMiddleware(object):
}
resp.append_header('X-Shipyard-Req', ctx.request_id)
self.logger.info("%s - %s" % (req.uri, resp.status), extra=extra)
self.logger.info('%s %s - %s',
req.method,
req.uri,
resp.status,
extra=extra)
self.logger.debug('Response body:\n%s',
resp.body,
extra=extra)


@ -1,320 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[base]
web_server=http://localhost:32080
postgresql_db = postgresql+psycopg2://postgresql.ucp:5432/shipyard
postgresql_airflow_db = postgresql+psycopg2://postgresql.ucp:5432/airflow
[shipyard]
host=shipyard-int.ucp
port=9000
[deckhand]
host=deckhand-api.ucp
port=80
[armada]
host=armada-api.ucp
port=8000
[drydock]
host=drydock-api.ucp
port=9000
token=bigboss
site_yaml=/usr/local/airflow/plugins/drydock.yaml
prom_yaml=/usr/local/airflow/plugins/promenade.yaml
[keystone]
OS_AUTH_URL=http://keystone-api.ucp:80/v3
OS_PROJECT_NAME=service
OS_USER_DOMAIN_NAME=Default
OS_USERNAME=shipyard
OS_PASSWORD=password
OS_REGION_NAME=RegionOne
OS_IDENTITY_API_VERSION=3
[healthcheck]
schema=http
endpoint=/api/v1.0/health
[keystone_authtoken]
#
# From keystonemiddleware.auth_token
#
# Complete "public" Identity API endpoint. This endpoint should not be an
# "admin" endpoint, as it should be accessible by all end users. Unauthenticated
# clients are redirected to this endpoint to authenticate. Although this
# endpoint should  ideally be unversioned, client support in the wild varies.
# If you're using a versioned v2 endpoint here, then this  should *not* be the
# same endpoint the service user utilizes  for validating tokens, because normal
# end users may not be  able to reach that endpoint. (string value)
# from .keystone_authtoken.keystonemiddleware.auth_token.auth_uri
auth_uri = http://keystone-api.openstack:80/v3
# API version of the admin Identity API endpoint. (string value)
# from .keystone_authtoken.keystonemiddleware.auth_token.auth_version
#auth_version = <None>
# Do not handle authorization requests within the middleware, but delegate the
# authorization decision to downstream WSGI components. (boolean value)
# from .keystone_authtoken.keystonemiddleware.auth_token.delay_auth_decision
delay_auth_decision = true
# Request timeout value for communicating with Identity API server. (integer
# value)
# from .keystone_authtoken.keystonemiddleware.auth_token.http_connect_timeout
#http_connect_timeout = <None>
# How many times are we trying to reconnect when communicating with Identity API
# Server. (integer value)
# from .keystone_authtoken.keystonemiddleware.auth_token.http_request_max_retries
#http_request_max_retries = 3
# Request environment key where the Swift cache object is stored. When
# auth_token middleware is deployed with a Swift cache, use this option to have
# the middleware share a caching backend with swift. Otherwise, use the
# ``memcached_servers`` option instead. (string value)
# from .keystone_authtoken.keystonemiddleware.auth_token.cache
#cache = <None>
# Required if identity server requires client certificate (string value)
# from .keystone_authtoken.keystonemiddleware.auth_token.certfile
#certfile = <None>
# Required if identity server requires client certificate (string value)
# from .keystone_authtoken.keystonemiddleware.auth_token.keyfile
#keyfile = <None>
# A PEM encoded Certificate Authority to use when verifying HTTPs connections.
# Defaults to system CAs. (string value)
# from .keystone_authtoken.keystonemiddleware.auth_token.cafile
#cafile = <None>
# Verify HTTPS connections. (boolean value)
# from .keystone_authtoken.keystonemiddleware.auth_token.insecure
#insecure = false
# The region in which the identity server can be found. (string value)
# from .keystone_authtoken.keystonemiddleware.auth_token.region_name
#region_name = <None>
# Directory used to cache files related to PKI tokens. (string value)
# from .keystone_authtoken.keystonemiddleware.auth_token.signing_dir
#signing_dir = <None>
# Optionally specify a list of memcached server(s) to use for caching. If left
# undefined, tokens will instead be cached in-process. (list value)
# Deprecated group/name - [keystone_authtoken]/memcache_servers
# from .keystone_authtoken.keystonemiddleware.auth_token.memcached_servers
#memcached_servers = <None>
# In order to prevent excessive effort spent validating tokens, the middleware
# caches previously-seen tokens for a configurable duration (in seconds). Set to
# -1 to disable caching completely. (integer value)
# from .keystone_authtoken.keystonemiddleware.auth_token.token_cache_time
#token_cache_time = 300
# Determines the frequency at which the list of revoked tokens is retrieved from
# the Identity service (in seconds). A high number of revocation events combined
# with a low cache duration may significantly reduce performance. Only valid for
# PKI tokens. (integer value)
# from .keystone_authtoken.keystonemiddleware.auth_token.revocation_cache_time
#revocation_cache_time = 10
# (Optional) If defined, indicate whether token data should be authenticated or
# authenticated and encrypted. If MAC, token data is authenticated (with HMAC)
# in the cache. If ENCRYPT, token data is encrypted and authenticated in the
# cache. If the value is not one of these options or empty, auth_token will
# raise an exception on initialization. (string value)
# Allowed values: None, MAC, ENCRYPT
# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_security_strategy
#memcache_security_strategy = None
# (Optional, mandatory if memcache_security_strategy is defined) This string is
# used for key derivation. (string value)
# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_secret_key
#memcache_secret_key = <None>
# (Optional) Number of seconds memcached server is considered dead before it is
# tried again. (integer value)
# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_pool_dead_retry
#memcache_pool_dead_retry = 300
# (Optional) Maximum total number of open connections to every memcached server.
# (integer value)
# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_pool_maxsize
#memcache_pool_maxsize = 10
# (Optional) Socket timeout in seconds for communicating with a memcached
# server. (integer value)
# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_pool_socket_timeout
#memcache_pool_socket_timeout = 3
# (Optional) Number of seconds a connection to memcached is held unused in the
# pool before it is closed. (integer value)
# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_pool_unused_timeout
#memcache_pool_unused_timeout = 60
# (Optional) Number of seconds that an operation will wait to get a memcached
# client connection from the pool. (integer value)
# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_pool_conn_get_timeout
#memcache_pool_conn_get_timeout = 10
# (Optional) Use the advanced (eventlet safe) memcached client pool. The
# advanced pool will only work under python 2.x. (boolean value)
# from .keystone_authtoken.keystonemiddleware.auth_token.memcache_use_advanced_pool
#memcache_use_advanced_pool = false
# (Optional) Indicate whether to set the X-Service-Catalog header. If False,
# middleware will not ask for service catalog on token validation and will not
# set the X-Service-Catalog header. (boolean value)
# from .keystone_authtoken.keystonemiddleware.auth_token.include_service_catalog
#include_service_catalog = true
# Used to control the use and type of token binding. Can be set to: "disabled"
# to not check token binding. "permissive" (default) to validate binding
# information if the bind type is of a form known to the server and ignore it if
# not. "strict" like "permissive" but if the bind type is unknown the token will
# be rejected. "required" any form of token binding is needed to be allowed.
# Finally the name of a binding method that must be present in tokens. (string
# value)
# from .keystone_authtoken.keystonemiddleware.auth_token.enforce_token_bind
#enforce_token_bind = permissive
# If true, the revocation list will be checked for cached tokens. This requires
# that PKI tokens are configured on the identity server. (boolean value)
# from .keystone_authtoken.keystonemiddleware.auth_token.check_revocations_for_cached
#check_revocations_for_cached = false
# Hash algorithms to use for hashing PKI tokens. This may be a single algorithm
# or multiple. The algorithms are those supported by Python standard
# hashlib.new(). The hashes will be tried in the order given, so put the
# preferred one first for performance. The result of the first hash will be
# stored in the cache. This will typically be set to multiple values only while
# migrating from a less secure algorithm to a more secure one. Once all the old
# tokens are expired this option should be set to a single value for better
# performance. (list value)
# from .keystone_authtoken.keystonemiddleware.auth_token.hash_algorithms
#hash_algorithms = md5
# Authentication type to load (string value)
# Deprecated group/name - [keystone_authtoken]/auth_plugin
# from .keystone_authtoken.keystonemiddleware.auth_token.auth_type
auth_type = password
# Config Section from which to load plugin specific options (string value)
# from .keystone_authtoken.keystonemiddleware.auth_token.auth_section
auth_section = keystone_authtoken
#
# From shipyard_orchestrator
#
# Authentication URL (string value)
# from .keystone_authtoken.shipyard_orchestrator.auth_url
auth_url = http://keystone-api.openstack:80/v3
# Domain ID to scope to (string value)
# from .keystone_authtoken.shipyard_orchestrator.domain_id
#domain_id = <None>
# Domain name to scope to (string value)
# from .keystone_authtoken.shipyard_orchestrator.domain_name
#domain_name = <None>
# Project ID to scope to (string value)
# Deprecated group/name - [keystone_authtoken]/tenant-id
# from .keystone_authtoken.shipyard_orchestrator.project_id
#project_id = <None>
# Project name to scope to (string value)
# Deprecated group/name - [keystone_authtoken]/tenant-name
# from .keystone_authtoken.shipyard_orchestrator.project_name
project_name = service
# Domain ID containing project (string value)
# from .keystone_authtoken.shipyard_orchestrator.project_domain_id
#project_domain_id = <None>
# Domain name containing project (string value)
# from .keystone_authtoken.shipyard_orchestrator.project_domain_name
project_domain_name = default
# Trust ID (string value)
# from .keystone_authtoken.shipyard_orchestrator.trust_id
#trust_id = <None>
# Optional domain ID to use with v3 and v2 parameters. It will be used for both
# the user and project domain in v3 and ignored in v2 authentication. (string
# value)
# from .keystone_authtoken.shipyard_orchestrator.default_domain_id
#default_domain_id = <None>
# Optional domain name to use with v3 API and v2 parameters. It will be used for
# both the user and project domain in v3 and ignored in v2 authentication.
# (string value)
# from .keystone_authtoken.shipyard_orchestrator.default_domain_name
#default_domain_name = <None>
# User id (string value)
# from .keystone_authtoken.shipyard_orchestrator.user_id
#user_id = <None>
# Username (string value)
# Deprecated group/name - [keystone_authtoken]/user-name
# from .keystone_authtoken.shipyard_orchestrator.username
username = shipyard
# User's domain id (string value)
# from .keystone_authtoken.shipyard_orchestrator.user_domain_id
#user_domain_id = <None>
# User's domain name (string value)
# from .keystone_authtoken.shipyard_orchestrator.user_domain_name
user_domain_name = default
# User's password (string value)
# from .keystone_authtoken.shipyard_orchestrator.password
password = password
[oslo_policy]
#
# From oslo.policy
#
# The file that defines policies. (string value)
# Deprecated group/name - [DEFAULT]/policy_file
# from .oslo_policy.oslo.policy.policy_file
#policy_file = policy.json
# Default rule. Enforced when a requested rule is not found. (string value)
# Deprecated group/name - [DEFAULT]/policy_default_rule
# from .oslo_policy.oslo.policy.policy_default_rule
#policy_default_rule = default
# Directories where policy configuration files are stored. They can be relative
# to any directory in the search path defined by the config_dir option, or
# absolute paths. The file defined by policy_file must exist for these
# directories to be searched.  Missing or empty directories are ignored. (multi
# valued)
# Deprecated group/name - [DEFAULT]/policy_dirs
# from .oslo_policy.oslo.policy.policy_dirs (multiopt)
#policy_dirs = policy.d


@ -0,0 +1,234 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Airflow database access - see db.py for instances to use
"""
import sqlalchemy
from oslo_config import cfg
from shipyard_airflow.db.common_db import DbAccess
from shipyard_airflow.db.errors import AirflowStateError
CONF = cfg.CONF
class AirflowDbAccess(DbAccess):
"""
Airflow database access
WARNING: This is a large set of assumptions based on the way airflow
arranges its database, which are subject to change with future airflow
releases - i.e. we're leveraging undocumented/non-exposed interfaces
for airflow to work around lack of API and feature functionality.
"""
SELECT_ALL_DAG_RUNS = sqlalchemy.sql.text('''
SELECT
"dag_id",
"execution_date",
"state",
"run_id",
"external_trigger",
"start_date",
"end_date"
FROM
dag_run
''')
SELECT_DAG_RUNS_BY_ID = sqlalchemy.sql.text('''
SELECT
"dag_id",
"execution_date",
"state",
"run_id",
"external_trigger",
"start_date",
"end_date"
FROM
dag_run
WHERE
dag_id = :dag_id
AND
execution_date = :execution_date
''')
SELECT_ALL_TASKS = sqlalchemy.sql.text('''
SELECT
"task_id",
"dag_id",
"execution_date",
"start_date",
"end_date",
"duration",
"state",
"try_number",
"operator",
"queued_dttm"
FROM
task_instance
ORDER BY
priority_weight desc,
start_date
''')
SELECT_TASKS_BY_ID = sqlalchemy.sql.text('''
SELECT
"task_id",
"dag_id",
"execution_date",
"start_date",
"end_date",
"duration",
"state",
"try_number",
"operator",
"queued_dttm"
FROM
task_instance
WHERE
dag_id LIKE :dag_id
AND
execution_date = :execution_date
ORDER BY
priority_weight desc,
start_date
''')
UPDATE_DAG_RUN_STATUS = sqlalchemy.sql.text('''
UPDATE
dag_run
SET
state = :state
WHERE
dag_id = :dag_id
AND
execution_date = :execution_date
''')
def __init__(self):
DbAccess.__init__(self)
def get_connection_string(self):
"""
Returns the connection string for this db connection
"""
return CONF.base.postgresql_airflow_db
def get_all_dag_runs(self):
"""
Retrieves all dag runs.
"""
return self.get_as_dict_array(AirflowDbAccess.SELECT_ALL_DAG_RUNS)
def get_dag_runs_by_id(self, dag_id, execution_date):
"""
Retrieves dag runs by dag id and execution date
"""
return self.get_as_dict_array(
AirflowDbAccess.SELECT_DAG_RUNS_BY_ID,
dag_id=dag_id,
execution_date=execution_date)
def get_all_tasks(self):
"""
Retrieves all tasks.
"""
return self.get_as_dict_array(AirflowDbAccess.SELECT_ALL_TASKS)
def get_tasks_by_id(self, dag_id, execution_date):
"""
Retrieves tasks by dag id and execution date
"""
return self.get_as_dict_array(
AirflowDbAccess.SELECT_TASKS_BY_ID,
dag_id=dag_id + '%',
execution_date=execution_date)
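The trailing `%` appended to `dag_id` lets one query sweep up a parent dag and its sub-dags (e.g. `deploy_site.preflight`). A sqlite3 sketch of that LIKE pattern (table contents and dag names are illustrative):

```python
import sqlite3

# In-memory stand-in for airflow's task_instance table.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE task_instance (task_id TEXT, dag_id TEXT)')
conn.executemany(
    'INSERT INTO task_instance VALUES (?, ?)',
    [('t1', 'deploy_site'),
     ('t2', 'deploy_site.preflight'),  # sub-dag of deploy_site
     ('t3', 'other_dag')])

# LIKE with a trailing % matches the parent dag and its sub-dags
rows = conn.execute(
    'SELECT task_id FROM task_instance WHERE dag_id LIKE ?',
    ('deploy_site' + '%',)).fetchall()
print(sorted(r[0] for r in rows))  # ['t1', 't2']
```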
def stop_dag_run(self, dag_id, execution_date):
"""
Triggers an update to set a dag_run to failed state
causing dag_run to be stopped
running -> failed
"""
self._control_dag_run(
dag_id=dag_id,
execution_date=execution_date,
expected_state='running',
desired_state='failed')
def pause_dag_run(self, dag_id, execution_date):
"""
Triggers an update to set a dag_run to paused state
causing dag_run to be paused
running -> paused
"""
self._control_dag_run(
dag_id=dag_id,
execution_date=execution_date,
expected_state='running',
desired_state='paused')
def unpause_dag_run(self, dag_id, execution_date):
"""
Triggers an update to set a dag_run to running state
causing dag_run to be unpaused
paused -> running
"""
self._control_dag_run(
dag_id=dag_id,
execution_date=execution_date,
expected_state='paused',
desired_state='running')
def check_dag_run_state(self, dag_id, execution_date, expected_state):
"""
Examines a dag_run's state. Raises an exception if it does not match
"""
dag_run_list = self.get_dag_runs_by_id(
dag_id=dag_id, execution_date=execution_date)
if dag_run_list:
dag_run = dag_run_list[0]
if dag_run['state'] != expected_state:
raise AirflowStateError(
message='dag_run state must be {}, but is {}'.format(
expected_state, dag_run['state']))
else:
# not found
raise AirflowStateError(message='dag_run does not exist')
def _control_dag_run(self, dag_id, execution_date, expected_state,
desired_state):
"""
checks a dag_run's state for expected state, and sets it to the
desired state
"""
self.check_dag_run_state(
dag_id=dag_id,
execution_date=execution_date,
expected_state=expected_state)
self._set_dag_run_state(
state=desired_state, dag_id=dag_id, execution_date=execution_date)
def _set_dag_run_state(self, state, dag_id, execution_date):
"""
Sets a dag run to the specified state.
WARNING: this assumes that airflow works by reading state from the
dag_run table dynamically, is not caching results, and doesn't
start to use the states we're using in a new way.
"""
self.perform_insert(
AirflowDbAccess.UPDATE_DAG_RUN_STATUS,
state=state,
dag_id=dag_id,
execution_date=execution_date)
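`_control_dag_run` is a check-then-set state machine: verify the dag_run is in the expected state, then write the desired one. A sketch of that guard with a plain dict standing in for the dag_run table (ids and dates are illustrative):

```python
class AirflowStateError(Exception):
    def __init__(self, message=''):
        self.message = message

# Stand-in for the dag_run table: (dag_id, execution_date) -> state
dag_runs = {('deploy_site', '2017-09-01'): 'running'}

def control_dag_run(dag_id, execution_date, expected_state, desired_state):
    """Only transition a dag_run that is currently in expected_state."""
    key = (dag_id, execution_date)
    current = dag_runs.get(key)
    if current is None:
        raise AirflowStateError('dag_run does not exist')
    if current != expected_state:
        raise AirflowStateError(
            'dag_run state must be %s, but is %s'
            % (expected_state, current))
    dag_runs[key] = desired_state

# stop: running -> failed
control_dag_run('deploy_site', '2017-09-01', 'running', 'failed')
print(dag_runs[('deploy_site', '2017-09-01')])  # failed
```

A later attempt to pause the same dag_run would now fail the guard, since it is no longer running.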


@ -0,0 +1,121 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sqlalchemy
from shipyard_airflow.errors import DatabaseError
LOG = logging.getLogger(__name__)
class DbAccess:
"""
Base class for simple database access
"""
def __init__(self):
self.engine = None
def get_connection_string(self):
"""
Override to return the connection string. This allows for
lazy initialization
"""
raise NotImplementedError()
def update_db(self):
"""
Unimplemented method for use in overriding to perform db updates
"""
LOG.info('No database version updates specified for %s',
self.__class__.__name__)
def get_engine(self):
"""
Returns the engine for airflow
"""
try:
connection_string = self.get_connection_string()
if connection_string is not None and self.engine is None:
self.engine = sqlalchemy.create_engine(connection_string)
if self.engine is None:
self._raise_invalid_db_config(
connection_string=connection_string
)
LOG.info('Connected with <%s>, returning engine',
connection_string)
return self.engine
except sqlalchemy.exc.ArgumentError as exc:
self._raise_invalid_db_config(
exception=exc,
connection_string=connection_string
)
def _raise_invalid_db_config(self,
connection_string,
exception=None):
"""
Common handler for an invalid DB connection
"""
LOG.error('Connection string <%s> prevents database operation',
connection_string)
if exception is not None:
LOG.error("Associated exception: %s", exception)
raise DatabaseError(
title='No database connection',
description='Invalid database configuration'
)
def get_as_dict_array(self, query, **kwargs):
"""
executes the supplied query and returns the array of dictionaries of
the row results
"""
LOG.info('Query: %s', query)
result_dict_list = []
if query is not None:
with self.get_engine().connect() as connection:
result_set = connection.execute(query, **kwargs)
result_dict_list = [dict(row) for row in result_set]
LOG.info('Result has %s rows', len(result_dict_list))
for dict_row in result_dict_list:
LOG.info('Result: %s', dict_row)
return result_dict_list
def perform_insert(self, query, **kwargs):
"""
Performs a parameterized insert
"""
self.perform_change_dml(query, **kwargs)
def perform_update(self, query, **kwargs):
"""
Performs a parameterized update
"""
self.perform_change_dml(query, **kwargs)
def perform_delete(self, query, **kwargs):
"""
Performs a parameterized delete
"""
self.perform_change_dml(query, **kwargs)
def perform_change_dml(self, query, **kwargs):
"""
Performs an update/insert/delete
"""
LOG.debug('Query: %s', query)
if query is not None:
with self.get_engine().connect() as connection:
connection.execute(query, **kwargs)
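`get_as_dict_array` converts each result row into a plain dict before returning. The same pattern can be sketched with stdlib sqlite3 and its `Row` factory (table contents are illustrative):

```python
import sqlite3

# Sketch of the rows-to-dicts conversion used by get_as_dict_array.
conn = sqlite3.connect(':memory:')
conn.row_factory = sqlite3.Row  # rows become mapping-like objects
conn.execute('CREATE TABLE actions (id TEXT, name TEXT)')
conn.execute("INSERT INTO actions VALUES ('01ABC', 'deploy_site')")

result_set = conn.execute('SELECT id, name FROM actions')
# dict(row) works because sqlite3.Row exposes keys() and __getitem__
result_dict_list = [dict(row) for row in result_set]
print(result_dict_list)  # [{'id': '01ABC', 'name': 'deploy_site'}]
```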


@ -11,17 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#############################################################################
#
# services.yaml - Definitions of server hardware layout
#
#############################################################################
# version the schema in this file so consumers can rationally parse it
---
#
# Is this where I include a list of files per service ?
#
#
# Assuming something like this works for the insertion
imports:
"""
The Application scope instances of db access classes
"""
from shipyard_airflow.db import airflow_db, shipyard_db
SHIPYARD_DB = shipyard_db.ShipyardDbAccess()
AIRFLOW_DB = airflow_db.AirflowDbAccess()


@ -11,19 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
from .base import BaseResource
from shipyard_airflow import policy
class RegionsResource(BaseResource):
@policy.ApiEnforcer('workflow_orchestrator:get_regions')
def on_get(self, req, resp):
resp.status = falcon.HTTP_200
class RegionResource(BaseResource):
@policy.ApiEnforcer('workflow_orchestrator:get_regions')
def on_get(self, req, resp, region_id):
resp.status = falcon.HTTP_200
class AirflowStateError(Exception):
def __init__(self, message=""):
"""
An error to convey that an attempt to modify airflow data cannot
be accomplished due to existing state.
:param message: Optional message for consumer
"""
self.message = message


@ -0,0 +1,254 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Shipyard database access - see db.py for instances to use
"""
import json
import logging
import os
import sqlalchemy
from alembic import command as alembic_command
from alembic.config import Config
from oslo_config import cfg
from shipyard_airflow.db.common_db import DbAccess
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class ShipyardDbAccess(DbAccess):
"""
Shipyard database access
"""
SELECT_ALL_ACTIONS = sqlalchemy.sql.text('''
SELECT
"id",
"name",
"parameters",
"dag_id",
"dag_execution_date",
"user",
"datetime",
"context_marker"
FROM
actions
''')
SELECT_ACTION_BY_ID = sqlalchemy.sql.text('''
SELECT
"id",
"name",
"parameters",
"dag_id",
"dag_execution_date",
"user",
"datetime",
"context_marker"
FROM
actions
WHERE
id = :action_id
''')
INSERT_ACTION = sqlalchemy.sql.text('''
INSERT INTO
actions (
"id",
"name",
"parameters",
"dag_id",
"dag_execution_date",
"user",
"datetime",
"context_marker"
)
VALUES (
:id,
:name,
:parameters,
:dag_id,
:dag_execution_date,
:user,
:timestamp,
:context_marker )
''')
SELECT_VALIDATIONS = sqlalchemy.sql.text('''
SELECT
"id",
"action_id",
"validation_name"
FROM
preflight_validation_failures
''')
SELECT_VALIDATION_BY_ID = sqlalchemy.sql.text('''
SELECT
"id",
"action_id",
"validation_name",
"details"
FROM
preflight_validation_failures
WHERE
id = :validation_id
''')
SELECT_VALIDATION_BY_ACTION_ID = sqlalchemy.sql.text('''
SELECT
"id",
"action_id",
"validation_name",
"details"
FROM
preflight_validation_failures
WHERE
action_id = :action_id
''')
SELECT_CMD_AUDIT_BY_ACTION_ID = sqlalchemy.sql.text('''
SELECT
"id",
"action_id",
"command",
"user",
"datetime"
FROM
action_command_audit
WHERE
action_id = :action_id
''')
INSERT_ACTION_COMMAND_AUDIT = sqlalchemy.sql.text('''
INSERT INTO
action_command_audit (
"id",
"action_id",
"command",
"user"
)
VALUES (
:id,
:action_id,
:command,
:user )
''')
def __init__(self):
DbAccess.__init__(self)
def get_connection_string(self):
"""
Returns the connection string for this db connection
"""
return CONF.base.postgresql_db
def update_db(self):
"""
Trigger Alembic to upgrade to the latest version of the DB
"""
try:
LOG.info("Checking for shipyard database upgrade")
cwd = os.getcwd()
os.chdir(CONF.base.alembic_ini_path)
config = Config('alembic.ini',
attributes={'configure_logger': False})
alembic_command.upgrade(config, 'head')
os.chdir(cwd)
except Exception as exception:
LOG.error('***\n'
'Failed Alembic DB upgrade. Check the config: %s\n'
'***',
exception)
# don't let things continue...
raise exception
def get_all_submitted_actions(self):
"""
Retrieves all actions.
"""
return self.get_as_dict_array(ShipyardDbAccess.SELECT_ALL_ACTIONS)
def get_action_by_id(self, action_id):
"""
Get a single action
:param action_id: the id of the action to retrieve
"""
actions_array = self.get_as_dict_array(
ShipyardDbAccess.SELECT_ACTION_BY_ID, action_id=action_id)
if actions_array:
return actions_array[0]
else:
# Not found
return None
def get_preflight_validation_fails(self):
"""
Retrieves the summary set of preflight validation failures
"""
return self.get_as_dict_array(ShipyardDbAccess.SELECT_VALIDATIONS)
def get_validation_by_id(self, validation_id):
"""
Retrieves a single validation for a given validation id
"""
validation_array = self.get_as_dict_array(
ShipyardDbAccess.SELECT_VALIDATION_BY_ID,
validation_id=validation_id)
if validation_array:
return validation_array[0]
else:
return None
def get_validation_by_action_id(self, action_id):
"""
Retrieves the validations for a given action id
"""
return self.get_as_dict_array(
ShipyardDbAccess.SELECT_VALIDATION_BY_ACTION_ID,
action_id=action_id)
def insert_action(self, action):
"""
Inserts a single action row
"""
self.perform_insert(ShipyardDbAccess.INSERT_ACTION,
id=action['id'],
name=action['name'],
parameters=json.dumps(action['parameters']),
dag_id=action['dag_id'],
dag_execution_date=action['dag_execution_date'],
user=action['user'],
timestamp=action['timestamp'],
context_marker=action['context_marker'])
def get_command_audit_by_action_id(self, action_id):
"""
Retrieves the action audit records for a given action id
"""
return self.get_as_dict_array(
ShipyardDbAccess.SELECT_CMD_AUDIT_BY_ACTION_ID,
action_id=action_id)
def insert_action_command_audit(self, ac_audit):
"""
Inserts a single action command audit
"""
self.perform_insert(ShipyardDbAccess.INSERT_ACTION_COMMAND_AUDIT,
id=ac_audit['id'],
action_id=ac_audit['action_id'],
command=ac_audit['command'],
user=ac_audit['user'])
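The named bound-parameter style used by the queries above (`:action_id`, `:id`, and so on) can be illustrated standalone with the stdlib `sqlite3` module, which accepts the same `:name` placeholder syntax. This is a sketch only — a two-column in-memory table, not the actual Shipyard schema:

```python
import sqlite3

# In-memory database standing in for the Shipyard PostgreSQL instance
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE actions (id TEXT, name TEXT)')

# Named parameters (:id, :name) are bound from a dict, mirroring the
# :action_id style bindings in the sqlalchemy.sql.text queries above
conn.execute('INSERT INTO actions (id, name) VALUES (:id, :name)',
             {'id': 'a1', 'name': 'deploy_site'})
row = conn.execute('SELECT name FROM actions WHERE id = :id',
                   {'id': 'a1'}).fetchone()
print(row[0])  # deploy_site
```

Binding values this way, rather than interpolating them into the SQL string, is what keeps these queries safe from injection.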

View File

@@ -1,49 +1,224 @@
# -*- coding: utf-8 -*-
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import traceback
import falcon
try:
from collections import OrderedDict
except ImportError:
OrderedDict = dict
ERR_UNKNOWN = {'status': falcon.HTTP_500, 'title': 'Internal Server Error'}
def get_version_from_request(req):
"""
Attempt to extract the api version string
"""
for part in req.path.split('/'):
if '.' in part and part.startswith('v'):
return part
return 'N/A'
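The parsing approach in `get_version_from_request` above — scan path segments for one that starts with `v` and contains a dot — can be checked standalone (a sketch operating on a plain path string rather than a Falcon request):

```python
# Standalone version of the path scan in get_version_from_request
def get_version_from_path(path):
    for part in path.split('/'):
        if '.' in part and part.startswith('v'):
            return part
    return 'N/A'

print(get_version_from_path('/api/v1.0/actions'))  # v1.0
print(get_version_from_path('/health'))            # N/A
```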
ERR_AIRFLOW_RESPONSE = {
'status': falcon.HTTP_400,
'title': 'Error response from Airflow'
}
# Standard error handler
def format_resp(req,
resp,
status_code,
message="",
reason="",
error_type="Unspecified Exception",
retry=False,
error_list=None):
"""
Write a error message body and throw a Falcon exception to trigger
an HTTP status
:param req: Falcon request object
:param resp: Falcon response object to update
:param status_code: Falcon status_code constant
:param message: Optional error message to include in the body
:param reason: Optional reason code to include in the body
:param retry: Optional flag whether client should retry the operation.
:param error_list: option list of errors
Can ignore if we rely solely on 4XX vs 5xx status codes
"""
if error_list is None:
error_list = [{'message': 'An error occurred, but was not specified'}]
error_response = {
'kind': 'status',
'apiVersion': get_version_from_request(req),
'metadata': {},
'status': 'Failure',
'message': message,
'reason': reason,
'details': {
'errorType': error_type,
'errorCount': len(error_list),
'errorList': error_list
},
'code': status_code
}
resp.body = json.dumps(error_response, default=str)
resp.content_type = 'application/json'
resp.status = status_code
def default_error_serializer(req, resp, exception):
"""
Writes the default error message body when the error is not otherwise handled
"""
format_resp(
req,
resp,
status_code=exception.status,
message=exception.description,
reason=exception.title,
error_type=exception.__class__.__name__,
error_list=[{'message': exception.description}]
)
def default_exception_handler(ex, req, resp, params):
"""
Catch-all exception handler for standardized output.
If this is a standard falcon HTTPError, rethrow it for handling
"""
if isinstance(ex, falcon.HTTPError):
# allow the falcon http errors to bubble up and get handled
raise ex
else:
# take care of the uncaught stuff
exc_string = traceback.format_exc()
logging.error('Unhandled exception being handled: \n%s', exc_string)
format_resp(
req,
resp,
falcon.HTTP_500,
error_type=ex.__class__.__name__,
message="Unhandled Exception raised: %s" % str(ex),
retry=True
)
class AppError(Exception):
def __init__(self, error=ERR_UNKNOWN, description=None):
self.error = error
self.error['description'] = description
@property
def title(self):
return self.error['title']
@property
def status(self):
return self.error['status']
@property
def description(self):
return self.error['description']
"""
Base error containing enough information to make a shipyard formatted error
"""
def __init__(self,
title='Internal Server Error',
description=None,
error_list=None,
status=falcon.HTTP_500,
retry=False):
"""
:param description: The internal error description
:param error_list: The list of errors
:param status: The desired falcon HTTP response code
:param title: The title of the error message
:param retry: Optional retry directive for the consumer
"""
self.title = title
self.description = description
self.error_list = massage_error_list(error_list, description)
self.status = status
self.retry = retry
@staticmethod
def handle(exception, req, res, error=None):
res.status = exception.status
meta = OrderedDict()
meta['message'] = exception.title
if exception.description:
meta['description'] = exception.description
res.body = json.dumps(meta)
def handle(ex, req, resp, params):
format_resp(
req,
resp,
ex.status,
message=ex.title,
reason=ex.description,
error_list=ex.error_list,
error_type=ex.__class__.__name__,
retry=ex.retry)
class AirflowError(AppError):
def __init__(self, description=None):
super().__init__(ERR_AIRFLOW_RESPONSE)
self.error['description'] = description
"""
An error to handle errors returned by the Airflow API
"""
def __init__(self, description=None, error_list=None):
super().__init__(
title='Error response from Airflow',
description=description,
error_list=error_list,
status=falcon.HTTP_400,
retry=False
)
class DatabaseError(AppError):
"""
An error to handle general api errors.
"""
def __init__(self,
description=None,
error_list=None,
status=falcon.HTTP_500,
title='Database Access Error',
retry=False):
super().__init__(
status=status,
title=title,
description=description,
error_list=error_list,
retry=retry
)
class ApiError(AppError):
"""
An error to handle general api errors.
"""
def __init__(self,
description="",
error_list=None,
status=falcon.HTTP_400,
title="",
retry=False):
super().__init__(
status=status,
title=title,
description=description,
error_list=error_list,
retry=retry
)
class InvalidFormatError(AppError):
"""
An exception to cover invalid input formatting
"""
def __init__(self, title, description="Not Specified", error_list=None):
super().__init__(
title=title,
description=description,
error_list=error_list,
status=falcon.HTTP_400,
retry=False
)
def massage_error_list(error_list, placeholder_description):
"""
Returns a best-effort attempt to make a nice error list
"""
output_error_list = []
if error_list:
for error in error_list:
if isinstance(error, dict) and error.get('message'):
output_error_list.append(error)
else:
# wrap bare strings (or message-less entries) in a dict
output_error_list.append({'message': error})
if not output_error_list:
output_error_list.append({'message': placeholder_description})
return output_error_list
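The normalization performed above can be sketched standalone: bare strings are wrapped as `{'message': ...}`, well-formed dicts pass through, and an empty input falls back to the placeholder description (names here are illustrative, not the Shipyard function itself):

```python
# Sketch of the error-list normalization used by the error handlers
def normalize_errors(error_list, placeholder):
    output = []
    for error in error_list or []:
        if isinstance(error, dict) and error.get('message'):
            output.append(error)   # already a {'message': ...} dict
        else:
            output.append({'message': error})  # wrap anything else
    return output or [{'message': placeholder}]

print(normalize_errors(['boom'], 'n/a'))  # [{'message': 'boom'}]
print(normalize_errors(None, 'n/a'))      # [{'message': 'n/a'}]
```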

View File

@@ -12,13 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import functools
import falcon
import logging
import falcon
from oslo_config import cfg
from oslo_policy import policy
from shipyard_airflow.errors import ApiError, AppError
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
policy_engine = None
@@ -26,6 +30,9 @@ class ShipyardPolicy(object):
"""
Initialize policy defaults
"""
RULE_ADMIN_REQUIRED = 'rule:admin_required'
# Base Policy
base_rules = [
policy.RuleDefault(
@@ -36,18 +43,61 @@ class ShipyardPolicy(object):
# Orchestrator Policy
task_rules = [
policy.DocumentedRuleDefault('workflow_orchestrator:get_regions',
'role:admin', 'Get region information', [{
'path':
'/api/v1.0/regions',
'method':
'GET'
}, {
'path':
'/api/v1.0/regions/{region_id}',
'method':
'GET'
}])
policy.DocumentedRuleDefault(
'workflow_orchestrator:list_actions',
RULE_ADMIN_REQUIRED,
'List workflow actions invoked by users',
[{
'path': '/api/v1.0/actions',
'method': 'GET'
}]
),
policy.DocumentedRuleDefault(
'workflow_orchestrator:create_action',
RULE_ADMIN_REQUIRED,
'Create a workflow action',
[{
'path': '/api/v1.0/actions',
'method': 'POST'
}]
),
policy.DocumentedRuleDefault(
'workflow_orchestrator:get_action',
RULE_ADMIN_REQUIRED,
'Retrieve an action by its id',
[{
'path': '/api/v1.0/actions/{action_id}',
'method': 'GET'
}]
),
policy.DocumentedRuleDefault(
'workflow_orchestrator:get_action_step',
RULE_ADMIN_REQUIRED,
'Retrieve an action step by its id',
[{
'path': '/api/v1.0/actions/{action_id}/steps/{step_id}',
'method': 'GET'
}]
),
policy.DocumentedRuleDefault(
'workflow_orchestrator:get_action_validation',
RULE_ADMIN_REQUIRED,
'Retrieve an action validation by its id',
[{
'path':
'/api/v1.0/actions/{action_id}/validations/{validation_id}',
'method': 'GET'
}]
),
policy.DocumentedRuleDefault(
'workflow_orchestrator:invoke_action_control',
RULE_ADMIN_REQUIRED,
'Send a control to an action',
[{
'path': '/api/v1.0/actions/{action_id}/control/{control_verb}',
'method': 'POST'
}]
),
]
# Regions Policy
@@ -61,7 +111,6 @@ class ShipyardPolicy(object):
def authorize(self, action, ctx):
target = {'project_id': ctx.project_id, 'user_id': ctx.user_id}
self.enforcer.authorize(action, target, ctx.to_policy_view())
return self.enforcer.authorize(action, target, ctx.to_policy_view())
@@ -72,44 +121,68 @@ class ApiEnforcer(object):
def __init__(self, action):
self.action = action
self.logger = logging.getLogger('shipyard.policy')
self.logger = LOG
def __call__(self, f):
@functools.wraps(f)
def secure_handler(slf, req, resp, *args, **kwargs):
ctx = req.context
policy_engine = ctx.policy_engine
self.logger.debug("Enforcing policy %s on request %s" %
(self.action, ctx.request_id))
policy_eng = ctx.policy_engine
slf.info(ctx, "Policy Engine: %s" % policy_eng.__class__.__name__)
# perform auth
slf.info(ctx, "Enforcing policy %s on request %s" %
(self.action, ctx.request_id))
# policy engine must be configured
if policy_eng is None:
slf.error(
ctx,
"Error-Policy engine required-action: %s" % self.action)
raise AppError(
title="Auth is not being handled by any policy engine",
status=falcon.HTTP_500,
retry=False
)
authorized = False
try:
if policy_engine is not None and policy_engine.authorize(
self.action, ctx):
return f(slf, req, resp, *args, **kwargs)
else:
if ctx.authenticated:
slf.info(ctx, "Error - Forbidden access - action: %s" %
self.action)
slf.return_error(
resp,
falcon.HTTP_403,
message="Forbidden",
retry=False)
else:
slf.info(ctx, "Error - Unauthenticated access")
slf.return_error(
resp,
falcon.HTTP_401,
message="Unauthenticated",
retry=False)
if policy_eng.authorize(self.action, ctx):
# authorized
slf.info(ctx, "Request is authorized")
authorized = True
except Exception:
slf.info(
# couldn't service the auth request
slf.error(
ctx,
"Error - Expectation Failed - action: %s" % self.action)
slf.return_error(
resp,
falcon.HTTP_417,
message="Expectation Failed",
retry=False)
raise ApiError(
title="Expectation Failed",
status=falcon.HTTP_417,
retry=False
)
if authorized:
return f(slf, req, resp, *args, **kwargs)
else:
slf.error(ctx,
"Auth check failed. Authenticated:%s" %
ctx.authenticated)
# raise the appropriate response exception
if ctx.authenticated:
slf.error(ctx,
"Error: Forbidden access - action: %s" %
self.action)
raise ApiError(
title="Forbidden",
status=falcon.HTTP_403,
description="Credentials do not permit access",
retry=False
)
else:
slf.error(ctx, "Error - Unauthenticated access")
raise ApiError(
title="Unauthenticated",
status=falcon.HTTP_401,
description="Credentials are not established",
retry=False
)
return secure_handler
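The decorator shape used by `ApiEnforcer` can be sketched standalone, without falcon or oslo.policy — the `authorize` callable and the exception class here are stand-ins for the policy engine and `ApiError`:

```python
import functools

class Forbidden(Exception):
    """Stand-in for the ApiError raised on a failed policy check."""

def enforce(action, authorize):
    """Wrap a handler so it only runs when authorize(action) passes."""
    def decorator(f):
        @functools.wraps(f)
        def secure_handler(*args, **kwargs):
            if not authorize(action):
                # mirrors ApiEnforcer raising ApiError on denial
                raise Forbidden(action)
            return f(*args, **kwargs)
        return secure_handler
    return decorator

# A permissive authorize callable for demonstration
@enforce('workflow_orchestrator:list_actions', lambda action: True)
def on_get():
    return 'ok'

print(on_get())  # ok
```

The real decorator additionally distinguishes unauthenticated (401) from forbidden (403) callers and converts policy-engine failures into a 417.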

View File

@@ -1,32 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from setuptools import setup
setup(name='shipyard_airflow',
version='0.1a1',
description='API for managing Airflow-based orchestration',
url='http://github.com/att-comdev/shipyard',
author='Anthony Lin - AT&T',
author_email='al498u@att.com',
license='Apache 2.0',
packages=['shipyard_airflow',
'shipyard_airflow.control'],
install_requires=[
'falcon',
'requests',
'configparser',
'uwsgi>1.4',
'python-dateutil'
])

View File

@@ -15,28 +15,30 @@ import logging
from oslo_config import cfg
from shipyard_airflow import policy
import shipyard_airflow.control.api as api
# We need to import config so the initializing code can run for oslo config
import shipyard_airflow.config as config # noqa: F401
from shipyard_airflow import policy
from shipyard_airflow.conf import config
from shipyard_airflow.db import db
CONF = cfg.CONF
def start_shipyard():
# Setup configuration parsing
cli_options = [
cfg.BoolOpt(
'debug', short='d', default=False, help='Enable debug logging'),
]
# Trigger configuration resolution.
config.parse_args()
# Setup root logger
logger = logging.getLogger('shipyard')
base_console_handler = logging.StreamHandler()
logger.setLevel('DEBUG')
ch = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[base_console_handler])
logging.getLogger().info("Setting logging level to: %s",
logging.getLevelName(CONF.logging.log_level))
logging.basicConfig(level=CONF.logging.log_level,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[base_console_handler])
# Specialized format for API logging
logger = logging.getLogger('shipyard.control')
@@ -45,14 +47,21 @@ def start_shipyard():
('%(asctime)s - %(levelname)s - %(user)s - %(req_id)s - '
'%(external_ctx)s - %(message)s'))
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(ch)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# Setup the RBAC policy enforcer
policy.policy_engine = policy.ShipyardPolicy()
policy.policy_engine.register_policy()
# Upgrade database
if CONF.base.upgrade_db:
# this is a reasonable place to put any online alembic upgrades
# desired. Currently only shipyard db is under shipyard control.
db.SHIPYARD_DB.update_db()
# Start the API
return api.start_api()

View File

@@ -0,0 +1,23 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from shipyard_airflow.conf import config
@pytest.fixture
def setup_config():
"""
Initialize shipyard config - this is needed so that CONF resolves.
"""
config.parse_args()

View File

@@ -0,0 +1,239 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
from datetime import datetime
from shipyard_airflow.control.actions_api import ActionsResource
from shipyard_airflow.control.base import ShipyardRequestContext
from shipyard_airflow.errors import ApiError
DATE_ONE = datetime(2017, 9, 13, 11, 13, 3, 57000)
DATE_TWO = datetime(2017, 9, 13, 11, 13, 5, 57000)
DATE_ONE_STR = DATE_ONE.strftime('%Y-%m-%dT%H:%M:%S')
DATE_TWO_STR = DATE_TWO.strftime('%Y-%m-%dT%H:%M:%S')
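The `DATE_ONE_STR`/`DATE_TWO_STR` constants above rely on `strftime` with an ISO-8601-style pattern; a quick standalone check of what that format produces:

```python
from datetime import datetime

# Same format string used for DATE_ONE_STR above; microseconds drop out
d = datetime(2017, 9, 13, 11, 13, 3, 57000)
print(d.strftime('%Y-%m-%dT%H:%M:%S'))  # 2017-09-13T11:13:03
```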
def actions_db():
"""
replaces the actual db call
"""
return [
{
'id': 'aaaaaa',
'name': 'dag_it',
'parameters': None,
'dag_id': 'did1',
'dag_execution_date': DATE_ONE_STR,
'user': 'robot1',
'timestamp': DATE_ONE,
'context_marker': '8-4-4-4-12a'
},
{
'id': 'bbbbbb',
'name': 'dag2',
'parameters': {
'p1': 'p1val'
},
'dag_id': 'did2',
'dag_execution_date': DATE_ONE_STR,
'user': 'robot2',
'timestamp': DATE_ONE,
'context_marker': '8-4-4-4-12b'
},
]
def dag_runs_db():
"""
replaces the actual db call
"""
return [
{
'dag_id': 'did2',
'execution_date': DATE_ONE,
'state': 'SUCCESS',
'run_id': '12345',
'external_trigger': 'something',
'start_date': DATE_ONE,
'end_date': DATE_TWO
},
{
'dag_id': 'did1',
'execution_date': DATE_ONE,
'state': 'FAILED',
'run_id': '99',
'external_trigger': 'something',
'start_date': DATE_ONE,
'end_date': DATE_ONE
},
]
def tasks_db():
"""
replaces the actual db call
"""
return [
{
'task_id': '1a',
'dag_id': 'did2',
'execution_date': DATE_ONE,
'state': 'SUCCESS',
'run_id': '12345',
'external_trigger': 'something',
'start_date': DATE_ONE,
'end_date': DATE_TWO,
'duration': '20mins',
'try_number': '1',
'operator': 'smooth',
'queued_dttm': DATE_TWO
},
{
'task_id': '1b',
'dag_id': 'did2',
'execution_date': DATE_ONE,
'state': 'SUCCESS',
'run_id': '12345',
'external_trigger': 'something',
'start_date': DATE_ONE,
'end_date': DATE_TWO,
'duration': '1minute',
'try_number': '1',
'operator': 'smooth',
'queued_dttm': DATE_TWO
},
{
'task_id': '1c',
'dag_id': 'did2',
'execution_date': DATE_ONE,
'state': 'SUCCESS',
'run_id': '12345',
'external_trigger': 'something',
'start_date': DATE_ONE,
'end_date': DATE_TWO,
'duration': '1day',
'try_number': '3',
'operator': 'smooth',
'queued_dttm': DATE_TWO
},
{
'task_id': '2a',
'dag_id': 'did1',
'execution_date': DATE_ONE,
'state': 'FAILED',
'start_date': DATE_ONE,
'end_date': DATE_ONE,
'duration': '1second',
'try_number': '2',
'operator': 'smooth',
'queued_dttm': DATE_TWO
},
]
def airflow_stub(**kwargs):
"""
asserts that the airflow invocation method was called with the right
parameters
"""
assert kwargs['dag_id']
assert kwargs['action']
print(kwargs)
return '2017-09-06 14:10:08.528402'
def insert_action_stub(**kwargs):
"""
asserts that the insert action was called with the right parameters
"""
assert kwargs['action']
def audit_control_command_db(action_audit):
"""
Stub for inserting the invoke record
"""
assert action_audit['command'] == 'invoke'
context = ShipyardRequestContext()
def test_get_all_actions():
"""
Tests the main response from get all actions
"""
action_resource = ActionsResource()
action_resource.get_all_actions_db = actions_db
action_resource.get_all_dag_runs_db = dag_runs_db
action_resource.get_all_tasks_db = tasks_db
os.environ['DB_CONN_AIRFLOW'] = 'nothing'
os.environ['DB_CONN_SHIPYARD'] = 'nothing'
result = action_resource.get_all_actions()
print(result)
assert len(result) == len(actions_db())
for action in result:
if action['name'] == 'dag_it':
assert len(action['steps']) == 1
assert action['dag_status'] == 'FAILED'
if action['name'] == 'dag2':
assert len(action['steps']) == 3
assert action['dag_status'] == 'SUCCESS'
def test_create_action():
action_resource = ActionsResource()
action_resource.get_all_actions_db = actions_db
action_resource.get_all_dag_runs_db = dag_runs_db
action_resource.get_all_tasks_db = tasks_db
action_resource.invoke_airflow_dag = airflow_stub
action_resource.insert_action = insert_action_stub
action_resource.audit_control_command_db = audit_control_command_db
# with invalid input. fail.
try:
action = action_resource.create_action(
action={'name': 'broken', 'parameters': {'a': 'aaa'}},
context=context
)
assert False, 'Should throw an ApiError'
except ApiError:
# expected
pass
# with valid input and some parameters
try:
action = action_resource.create_action(
action={'name': 'deploy_site', 'parameters': {'a': 'aaa'}},
context=context
)
assert action['timestamp']
assert action['id']
assert len(action['id']) == 26
assert action['dag_execution_date'] == '2017-09-06 14:10:08.528402'
assert action['dag_status'] == 'SCHEDULED'
except ApiError:
assert False, 'Should not raise an ApiError'
print(json.dumps(action, default=str))
# with valid input and no parameters
try:
action = action_resource.create_action(
action={'name': 'deploy_site'},
context=context
)
assert action['timestamp']
assert action['id']
assert len(action['id']) == 26
assert action['dag_execution_date'] == '2017-09-06 14:10:08.528402'
assert action['dag_status'] == 'SCHEDULED'
except ApiError:
assert False, 'Should not raise an ApiError'
print(json.dumps(action, default=str))

View File

@@ -0,0 +1,164 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from shipyard_airflow.control.actions_control_api import ActionsControlResource
from shipyard_airflow.control.base import ShipyardRequestContext
from shipyard_airflow.db.errors import AirflowStateError
from shipyard_airflow.db.db import AIRFLOW_DB
from shipyard_airflow.errors import ApiError
def actions_db(action_id):
"""
replaces the actual db call
"""
if action_id == 'not found':
return None
elif action_id == 'state error':
return {
'id': 'state error',
'name': 'dag_it',
'parameters': None,
'dag_id': 'state error',
'dag_execution_date': '2017-09-06 14:10:08.528402',
'user': 'robot1',
'timestamp': '2017-09-06 14:10:08.528402',
'context_marker': '8-4-4-4-12a'
}
else:
return {
'id': '59bb330a-9e64-49be-a586-d253bb67d443',
'name': 'dag_it',
'parameters': None,
'dag_id': 'did2',
'dag_execution_date': '2017-09-06 14:10:08.528402',
'user': 'robot1',
'timestamp': '2017-09-06 14:10:08.528402',
'context_marker': '8-4-4-4-12a'
}
def control_dag_run(dag_id,
execution_date,
expected_state,
desired_state):
if dag_id == 'state error':
raise AirflowStateError(message='test error')
else:
pass
def audit_control_command_db(action_audit):
pass
def test_get_action():
"""
Tests the action control responses (pause, unpause, stop)
"""
saved_control_dag_run = AIRFLOW_DB._control_dag_run
try:
action_resource = ActionsControlResource()
# stubs for db
action_resource.get_action_db = actions_db
action_resource.audit_control_command_db = audit_control_command_db
AIRFLOW_DB._control_dag_run = control_dag_run
# bad action
try:
action_resource.handle_control(
action_id='not found',
control_verb='meep',
context=ShipyardRequestContext()
)
assert False, "shouldn't find the action"
except ApiError as api_error:
assert api_error.title == 'Action not found'
assert api_error.status == '404 Not Found'
# bad control
try:
action_resource.handle_control(
action_id='59bb330a-9e64-49be-a586-d253bb67d443',
control_verb='meep',
context=ShipyardRequestContext()
)
assert False, 'meep is not a valid action'
except ApiError as api_error:
assert api_error.title == 'Control not supported'
assert api_error.status == '404 Not Found'
# success on each action - pause, unpause, stop
try:
action_resource.handle_control(
action_id='59bb330a-9e64-49be-a586-d253bb67d443',
control_verb='pause',
context=ShipyardRequestContext()
)
except ApiError as api_error:
assert False, 'Should not raise an ApiError'
try:
action_resource.handle_control(
action_id='59bb330a-9e64-49be-a586-d253bb67d443',
control_verb='unpause',
context=ShipyardRequestContext()
)
except ApiError as api_error:
assert False, 'Should not raise an ApiError'
try:
action_resource.handle_control(
action_id='59bb330a-9e64-49be-a586-d253bb67d443',
control_verb='stop',
context=ShipyardRequestContext()
)
except ApiError as api_error:
assert False, 'Should not raise an ApiError'
# pause state conflict
try:
action_resource.handle_control(
action_id='state error',
control_verb='pause',
context=ShipyardRequestContext()
)
assert False, 'should raise a conflicting state'
except ApiError as api_error:
assert api_error.title == 'Unable to pause action'
assert api_error.status == '409 Conflict'
# Unpause state conflict
try:
action_resource.handle_control(
action_id='state error',
control_verb='unpause',
context=ShipyardRequestContext()
)
assert False, 'should raise a conflicting state'
except ApiError as api_error:
assert api_error.title == 'Unable to unpause action'
assert api_error.status == '409 Conflict'
# Stop state conflict
try:
action_resource.handle_control(
action_id='state error',
control_verb='stop',
context=ShipyardRequestContext()
)
assert False, 'should raise a conflicting state'
except ApiError as api_error:
assert api_error.title == 'Unable to stop action'
assert api_error.status == '409 Conflict'
finally:
# modified class variable... replace it
AIRFLOW_DB._control_dag_run = saved_control_dag_run

View File

@@ -0,0 +1,152 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from datetime import datetime

from shipyard_airflow.control.actions_id_api import ActionsIdResource

DATE_ONE = datetime(2017, 9, 13, 11, 13, 3, 57000)
DATE_TWO = datetime(2017, 9, 13, 11, 13, 5, 57000)
DATE_ONE_STR = DATE_ONE.strftime('%Y-%m-%dT%H:%M:%S')
DATE_TWO_STR = DATE_TWO.strftime('%Y-%m-%dT%H:%M:%S')


def actions_db(action_id):
    """
    replaces the actual db call
    """
    return {
        'id': '12345678901234567890123456',
        'name': 'dag_it',
        'parameters': None,
        'dag_id': 'did2',
        'dag_execution_date': DATE_ONE_STR,
        'user': 'robot1',
        'timestamp': DATE_ONE,
        'context_marker': '8-4-4-4-12a'
    }


def dag_runs_db(dag_id, execution_date):
    """
    replaces the actual db call
    """
    return [{
        'dag_id': 'did2',
        'execution_date': DATE_ONE,
        'state': 'FAILED',
        'run_id': '99',
        'external_trigger': 'something',
        'start_date': DATE_ONE,
        'end_date': DATE_ONE
    }]


def tasks_db(dag_id, execution_date):
    """
    replaces the actual db call
    """
    return [
        {
            'task_id': '1a',
            'dag_id': 'did2',
            'execution_date': DATE_ONE,
            'state': 'SUCCESS',
            'run_id': '12345',
            'external_trigger': 'something',
            'start_date': DATE_ONE,
            'end_date': DATE_ONE,
            'duration': '20mins',
            'try_number': '1',
            'operator': 'smooth',
            'queued_dttm': DATE_ONE
        },
        {
            'task_id': '1b',
            'dag_id': 'did2',
            'execution_date': DATE_ONE,
            'state': 'SUCCESS',
            'run_id': '12345',
            'external_trigger': 'something',
            'start_date': DATE_TWO,
            'end_date': DATE_TWO,
            'duration': '1minute',
            'try_number': '1',
            'operator': 'smooth',
            'queued_dttm': DATE_ONE
        },
        {
            'task_id': '1c',
            'dag_id': 'did2',
            'execution_date': DATE_ONE,
            'state': 'FAILED',
            'run_id': '12345',
            'external_trigger': 'something',
            'start_date': DATE_TWO,
            'end_date': DATE_TWO,
            'duration': '1day',
            'try_number': '3',
            'operator': 'smooth',
            'queued_dttm': DATE_TWO
        }
    ]


def get_validations(action_id):
    """
    Stub to return validations
    """
    return [
        {
            'id': '43',
            'action_id': '12345678901234567890123456',
            'validation_name': 'It has shiny goodness',
            'details': 'This was not very shiny.'
        }
    ]


def get_ac_audit(action_id):
    """
    Stub to return command audit response
    """
    return [
        {
            'id': 'ABCDEFGHIJKLMNOPQRSTUVWXYZ',
            'action_id': '12345678901234567890123456',
            'command': 'PAUSE',
            'user': 'Operator 99',
            'datetime': DATE_ONE
        },
        {
            'id': 'ABCDEFGHIJKLMNOPQRSTUVWXYA',
            'action_id': '12345678901234567890123456',
            'command': 'UNPAUSE',
            'user': 'Operator 99',
            'datetime': DATE_TWO
        }
    ]


def test_get_action():
    """
    Tests the response from get action by id
    """
    action_resource = ActionsIdResource()
    # stubs for db
    action_resource.get_action_db = actions_db
    action_resource.get_dag_run_db = dag_runs_db
    action_resource.get_tasks_db = tasks_db
    action_resource.get_validations_db = get_validations
    action_resource.get_action_command_audit_db = get_ac_audit

    action = action_resource.get_action('12345678901234567890123456')
    print(json.dumps(action, default=str))
    # assert unconditionally; guarding these checks with an `if` on the
    # name would silently skip every assertion when the name mismatched
    assert action['name'] == 'dag_it'
    assert len(action['steps']) == 3
    assert action['dag_status'] == 'FAILED'
    assert len(action['command_audit']) == 2
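The test above injects its stubs by assigning plain functions to the resource instance's accessor attributes, with no mocking library. A minimal, self-contained sketch of that same pattern, using a hypothetical `WidgetResource` class purely for illustration:

```python
class WidgetResource:
    """Hypothetical stand-in for a resource class with a DB accessor."""

    def get_widget_db(self, widget_id):
        # The real implementation would query the database.
        raise RuntimeError('no database available in unit tests')

    def get_widget(self, widget_id):
        widget = self.get_widget_db(widget_id)
        widget['fetched'] = True
        return widget


def widget_db_stub(widget_id):
    """Replaces the actual db call, like actions_db above."""
    return {'id': widget_id, 'name': 'sprocket'}


def test_get_widget():
    resource = WidgetResource()
    # Assigning the stub to the instance attribute shadows the bound
    # method, so the method under test calls the stub instead of the DB.
    resource.get_widget_db = widget_db_stub
    widget = resource.get_widget('w1')
    assert widget['name'] == 'sprocket'
    assert widget['fetched'] is True
```

Because the stub is stored on the instance rather than the class, it is looked up before the class method and receives no `self`, which is why the stubs in these tests are written as module-level functions.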


@@ -0,0 +1,116 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from datetime import datetime

from shipyard_airflow.errors import ApiError
from shipyard_airflow.control.actions_steps_id_api import ActionsStepsResource

DATE_ONE = datetime(2017, 9, 13, 11, 13, 3, 57000)
DATE_TWO = datetime(2017, 9, 13, 11, 13, 5, 57000)
DATE_ONE_STR = DATE_ONE.strftime('%Y-%m-%dT%H:%M:%S')
DATE_TWO_STR = DATE_TWO.strftime('%Y-%m-%dT%H:%M:%S')


def actions_db(action_id):
    """
    replaces the actual db call
    """
    return {
        'id': '59bb330a-9e64-49be-a586-d253bb67d443',
        'name': 'dag_it',
        'parameters': None,
        'dag_id': 'did2',
        'dag_execution_date': DATE_ONE_STR,
        'user': 'robot1',
        'timestamp': DATE_ONE_STR,
        'context_marker': '8-4-4-4-12a'
    }


def tasks_db(dag_id, execution_date):
    """
    replaces the actual db call
    """
    return [
        {
            'task_id': '1a',
            'dag_id': 'did2',
            'execution_date': DATE_ONE,
            'state': 'SUCCESS',
            'run_id': '12345',
            'external_trigger': 'something',
            'start_date': DATE_ONE,
            'end_date': DATE_ONE,
            'duration': '20mins',
            'try_number': '1',
            'operator': 'smooth',
            'queued_dttm': DATE_ONE
        },
        {
            'task_id': '1b',
            'dag_id': 'did2',
            'execution_date': DATE_ONE,
            'state': 'SUCCESS',
            'run_id': '12345',
            'external_trigger': 'something',
            'start_date': DATE_TWO,
            'end_date': DATE_TWO,
            'duration': '1minute',
            'try_number': '1',
            'operator': 'smooth',
            'queued_dttm': DATE_ONE
        },
        {
            'task_id': '1c',
            'dag_id': 'did2',
            'execution_date': DATE_ONE,
            'state': 'FAILED',
            'run_id': '12345',
            'external_trigger': 'something',
            'start_date': DATE_TWO,
            'end_date': DATE_TWO,
            'duration': '1day',
            'try_number': '3',
            'operator': 'smooth',
            'queued_dttm': DATE_TWO
        }
    ]


def test_get_action_steps():
    """
    Tests the response from get action step by id
    """
    action_resource = ActionsStepsResource()
    # stubs for db
    action_resource.get_action_db = actions_db
    action_resource.get_tasks_db = tasks_db

    step = action_resource.get_action_step(
        '59bb330a-9e64-49be-a586-d253bb67d443',
        '1c'
    )
    assert step['index'] == 3
    assert step['try_number'] == '3'
    assert step['operator'] == 'smooth'
    print(json.dumps(step, default=str))

    try:
        step = action_resource.get_action_step(
            '59bb330a-9e64-49be-a586-d253bb67d443',
            'cheese'
        )
        assert False, 'should raise an ApiError'
    except ApiError as api_error:
        assert api_error.title == 'Step not found'
        assert api_error.status == '404 Not Found'


@@ -0,0 +1,87 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json

from shipyard_airflow.control.actions_validations_id_api import (
    ActionsValidationsResource
)
from shipyard_airflow.errors import ApiError


def actions_db(action_id):
    """
    replaces the actual db call
    """
    if action_id == 'error_it':
        return None
    else:
        return {
            'id': '59bb330a-9e64-49be-a586-d253bb67d443',
            'name': 'dag_it',
            'parameters': None,
            'dag_id': 'did2',
            'dag_execution_date': '2017-09-06 14:10:08.528402',
            'user': 'robot1',
            'timestamp': '2017-09-06 14:10:08.528402',
            'context_marker': '8-4-4-4-12a'
        }


def get_validations(validation_id):
    """
    Stub to return validations
    """
    if validation_id == '43':
        return {
            'id': '43',
            'action_id': '59bb330a-9e64-49be-a586-d253bb67d443',
            'validation_name': 'It has shiny goodness',
            'details': 'This was not very shiny.'
        }
    else:
        return None


def test_get_action_validation():
    """
    Tests the response from get action validation by id
    """
    action_resource = ActionsValidationsResource()
    # stubs for db
    action_resource.get_action_db = actions_db
    action_resource.get_validation_db = get_validations

    validation = action_resource.get_action_validation(
        action_id='59bb330a-9e64-49be-a586-d253bb67d443',
        validation_id='43'
    )
    print(json.dumps(validation, default=str))
    assert validation['action_id'] == '59bb330a-9e64-49be-a586-d253bb67d443'
    assert validation['validation_name'] == 'It has shiny goodness'

    try:
        validation = action_resource.get_action_validation(
            action_id='59bb330a-9e64-49be-a586-d253bb67d443',
            validation_id='not a chance'
        )
        assert False, 'should raise an ApiError'
    except ApiError as api_error:
        assert api_error.status == '404 Not Found'
        assert api_error.title == 'Validation not found'

    try:
        validation = action_resource.get_action_validation(
            action_id='error_it',
            validation_id='not a chance'
        )
        assert False, 'should raise an ApiError'
    except ApiError as api_error:
        assert api_error.status == '404 Not Found'
        assert api_error.title == 'Action not found'

tox.ini

@@ -14,13 +14,21 @@ commands=
 commands = flake8 {posargs}
 [testenv:bandit]
-commands = bandit -r shipyard_airflow -x tests -n 5
+# NOTE(Bryan Strassner) ignoring airflow plugin which uses a subexec
+# tests are not under the shipyard_airflow directory, not excluding those
+commands = bandit -r shipyard_airflow -x plugins/rest_api_plugin.py -n 5
+[testenv:genconfig]
+commands = oslo-config-generator --config-file=generator/config-generator.conf
+[testenv:genpolicy]
+commands = oslopolicy-sample-generator --config-file=generator/policy-generator.conf
 [flake8]
 # NOTE(Bryan Strassner) ignoring F841 because of the airflow example pattern
 # of naming variables even if they aren't used for DAGs and Operators.
 # Doing so adds readability and context in this case.
-ignore=E302,H306,D100,D101,D102,F841
-# NOTE(Bryan Strassner) excluding 3rd party code that is brought into the
+ignore = E302,H306,D100,D101,D102,F841
+# NOTE(Bryan Strassner) excluding 3rd party and generated code that is brought into the
 # codebase.
-exclude=*plugins/rest_api_plugin.py,*lib/python*,*egg,.git*,*.md,.tox*
+exclude = *plugins/rest_api_plugin.py,*lib/python*,*egg,.git*,*.md,.tox*,alembic/env.py,build/*