DRYD47 - Task persistence and refactor

Refactor the task model and add database persistence for tasks.

- Document task schema
- Use Alembic for database creation
- Use SQLalchemy for database abstraction
- Update Task status and result enumerations

Change-Id: I247b88f293144a0bdf891958e19711d975c729ba
This commit is contained in:
Scott Hussey 2017-09-20 16:46:46 -05:00
parent 1998b16d5a
commit e042811c76
14 changed files with 628 additions and 123 deletions

68
alembic.ini Normal file
View File

@ -0,0 +1,68 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = alembic
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# max length of characters to apply to the
# "slug" field
#truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat alembic/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = driver://user:pass@localhost/dbname
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

1
alembic/README Normal file
View File

@ -0,0 +1 @@
Generic single-database configuration.

77
alembic/env.py Normal file
View File

@ -0,0 +1,77 @@
"""Alembic database creation and upgrades."""
import os
import sys
from alembic import context
from sqlalchemy import engine_from_config, pool
from logging.config import fileConfig
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    NOTE: offline mode is deliberately disabled for Drydock; the function
    returns immediately and everything below the ``return`` is unreachable,
    kept only as a template should offline support be re-enabled.
    """
    return  # We don't support offline
    # --- unreachable: standard alembic offline boilerplate ---
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url, target_metadata=target_metadata, literal_binds=True)
    with context.begin_transaction():
        context.run_migrations()
def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    The connection URL is taken from the DRYDOCK_DB_URL environment
    variable rather than alembic.ini, so credentials never live in the
    repository; a missing variable raises KeyError immediately.
    """
    connection_url = os.environ['DRYDOCK_DB_URL']

    # Build the engine from the [alembic] ini section, letting the
    # environment-supplied URL override any sqlalchemy.url placeholder.
    engine = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix='sqlalchemy.',
        poolclass=pool.NullPool,
        url=connection_url)

    with engine.connect() as conn:
        context.configure(connection=conn, target_metadata=target_metadata)
        with context.begin_transaction():
            context.run_migrations()
# Module-level entry point: alembic executes env.py directly, so the
# migration mode is selected as soon as the module is imported/run.
# Offline mode is a stub (see run_migrations_offline) and returns
# immediately without running migrations.
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

24
alembic/script.py.mako Normal file
View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,31 @@
"""create base database tables
Revision ID: 9593a123e7c5
Revises:
Create Date: 2017-09-21 14:56:13.866443
"""
# revision identifiers, used by Alembic.
revision = '9593a123e7c5'
down_revision = None
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
from drydock_provisioner.statemgmt.db import tables
def upgrade():
    """Create the base Drydock tables.

    Table names and column lists come from the declarative
    ``__tablename__`` / ``__schema__`` class attributes on each table
    definition, so this migration stays in sync with the model module.
    """
    base_tables = (
        tables.Tasks,
        tables.ResultMessage,
        tables.ActiveInstance,
        tables.BuildData,
    )
    for tbl in base_tables:
        op.create_table(tbl.__tablename__, *tbl.__schema__)
def downgrade():
    """Drop the base Drydock tables created by upgrade().

    Tables carry no foreign-key constraints in this revision, so drop
    order does not matter; the upgrade() creation order is mirrored for
    readability.
    """
    base_tables = (
        tables.Tasks,
        tables.ResultMessage,
        tables.ActiveInstance,
        tables.BuildData,
    )
    for tbl in base_tables:
        op.drop_table(tbl.__tablename__)

145
docs/task.rst Normal file
View File

@ -0,0 +1,145 @@
Tasks
=====
Tasks are requests for Drydock to perform an action asynchronously. Depending on the
action being requested, tasks could take seconds to hours to complete. When a task is
created, an identifier is generated and returned. That identifier can be used to poll
the task API for task status and results.
Task Document Schema
--------------------
This document can be posted to the Drydock task API to create a new task.::
{
"action": "validate_design|verify_site|prepare_site|verify_node|prepare_node|deploy_node|destroy_node",
"design_ref": "http_uri|deckhand_uri|file_uri",
"node_filter": {
"filter_set_type": "intersection|union",
"filter_set": [
{
"filter_type": "intersection|union",
"node_names": [],
"node_tags": [],
"node_labels": {},
"rack_names": [],
"rack_labels": {},
}
]
}
}
The filter is computed by taking the set of all defined nodes. Each filter in the filter set is applied
by either finding the union or intersection of filtering the full set of nodes by the attribute values
specified. The result set of each filter is then combined as either an intersection or union with that result
being the final set the task is executed against.
Assuming you have a node inventory of::
[
{
"name": "a",
"labels": {
"type": "physical",
"color": "blue"
}
},
{
"name": "b",
"labels": {
"type": "virtual",
"color": "yellow"
}
},
{
"name": "c",
"labels": {
"type": "physical",
"color": "yellow"
}
    }
]
Example::
"filter_set": [
{
"filter_type": "intersection",
"node_labels": {
"color": "yellow",
"type": "physical"
}
},
{
"filter_type": "intersection",
"node_names": ["a"]
}
],
"filter_set_type": "union"
The above filter set results in a set ``a`` and ``c``.
Task Status Schema
------------------
When querying the state of an existing task, the below document will be returned::
{
"Kind": "Task",
"apiVersion": "v1",
"task_id": "uuid",
"action": "validate_design|verify_site|prepare_site|verify_node|prepare_node|deploy_node|destroy_node",
"design_ref": "http_uri|deckhand_uri|file_uri",
"parent_task_id": "uuid",
"subtask_id_list": ["uuid","uuid",...],
"status": "requested|queued|running|terminating|complete|terminated",
"node_filter": {
"filter_set_type": "intersection|union",
"filter_set": [
{
"filter_type": "intersection|union",
"node_names": [],
"node_tags": [],
"node_labels": {},
"rack_names": [],
"rack_labels": {},
}
]
},
"created": iso8601 UTC timestamp,
"created_by": "user",
"updated": iso8601 UTC timestamp,
"terminated": iso8601 UTC timestamp,
"terminated_by": "user",
"result": Status object
}
The Status object is based on the UCP standardized response format::
{
"Kind": "Status",
"apiVersion": "v1",
"metadata": {},
"message": "Drydock Task ...",
"reason": "Failure reason",
"status": "failure|success|partial_success|incomplete",
"details": {
"errorCount": 0,
"messageList": [
StatusMessage
]
}
}
The StatusMessage object will change based on the context of the message, but will at a minimum
consist of the below::
{
"message": "Textual description",
"error": true|false,
"context_type": "site|network|node",
"context": "site_name|network_name|node_name",
"ts": iso8601 UTC timestamp,
}

View File

@ -80,9 +80,8 @@ class ActionResult(BaseDrydockEnum):
Success = 'success'
PartialSuccess = 'partial_success'
Failure = 'failure'
DependentFailure = 'dependent_failure'
ALL = (Incomplete, Success, PartialSuccess, Failure, DependentFailure)
ALL = (Incomplete, Success, PartialSuccess, Failure)
class ActionResultField(fields.BaseEnumField):
@ -90,17 +89,14 @@ class ActionResultField(fields.BaseEnumField):
class TaskStatus(BaseDrydockEnum):
Created = 'created'
Waiting = 'waiting'
Requested = 'requested'
Queued = 'queued'
Running = 'running'
Stopping = 'stopping'
Terminating = 'terminating'
Terminated = 'terminated'
Errored = 'errored'
Complete = 'complete'
Stopped = 'stopped'
ALL = (Created, Waiting, Running, Stopping, Terminated, Errored, Complete,
Stopped)
ALL = (Requested, Queued, Running, Terminating, Terminated, Complete)
class TaskStatusField(fields.BaseEnumField):

View File

@ -11,9 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
"""Models for representing asynchronous tasks."""
from threading import Lock
import uuid
import datetime
import drydock_provisioner.error as errors
@ -21,23 +22,42 @@ import drydock_provisioner.objects.fields as hd_fields
class Task(object):
def __init__(self, **kwargs):
self.task_id = uuid.uuid4()
self.status = hd_fields.TaskStatus.Created
self.terminate = False
self.subtasks = []
self.lock_id = None
self.result = hd_fields.ActionResult.Incomplete
self.result_detail = None
self.action = kwargs.get('action', hd_fields.OrchestratorAction.Noop)
"""Asynchronous Task.
self.parent_task_id = kwargs.get('parent_task_id', '')
:param action: The enumerated action value to be executed
:param design_ref: A reference URI to the design data describing the context
of this task
:param parent_task_id: Optional UUID4 ID of the parent task to this task
:param node_filter: Optional instance of TaskNodeFilter limiting the set of nodes
this task will impact
"""
def __init__(self, **kwargs):
context = kwargs.get('context', None)
self.task_id = uuid.uuid4()
self.status = hd_fields.TaskStatus.Requested
self.subtask_id_list = []
self.result = TaskStatus()
self.action = kwargs.get('action', hd_fields.OrchestratorAction.Noop)
self.design_ref = kwargs.get('design_ref', None)
self.parent_task_id = kwargs.get('parent_task_id', None)
self.created = datetime.utcnow()
self.node_filter = kwargs.get('node_filter', None)
self.created_by = None
self.updated = None
self.terminated = None
self.terminated_by = None
self.context = context
if context is not None:
self.created_by = context.user
def get_id(self):
return self.task_id
def terminate_task(self):
self.terminate = True
self.set_Status(hd_fields.TaskStatus.Terminating)
def set_status(self, status):
self.status = status
@ -45,74 +65,113 @@ class Task(object):
def get_status(self):
return self.status
def set_result(self, result):
self.result = result
def get_result(self):
return self.result
def set_result_detail(self, detail):
self.result_detail = detail
def get_result_detail(self):
return self.result_detail
def add_result_message(self, **kwargs):
"""Add a message to result details."""
self.result.add_message(**kwargs)
def register_subtask(self, subtask_id):
if self.terminate:
raise errors.OrchestratorError("Cannot add subtask for parent" \
if self.status in [hd_fields.TaskStatus.Terminating]:
raise errors.OrchestratorError("Cannot add subtask for parent"
" marked for termination")
self.subtasks.append(subtask_id)
self.subtask_id_list.append(subtask_id)
def get_subtasks(self):
return self.subtasks
return self.subtask_id_list
def add_status_msg(self, **kwargs):
self.result.add_status_msg(**kwargs)
def to_dict(self):
return {
'Kind': 'Task',
'apiVersion': 'v1',
'task_id': str(self.task_id),
'action': self.action,
'parent_task': str(self.parent_task_id),
'design_ref': self.design_ref,
'status': self.status,
'result': self.result,
'result_detail': self.result_detail,
'subtasks': [str(x) for x in self.subtasks],
'result': self.result.to_dict(),
'node_filter': self.node_filter.to_dict(),
'subtask_id_list': [str(x) for x in self.subtask_id_list],
'created': self.created,
'created_by': self.created_by,
'updated': self.updated,
'terminated': self.terminated,
'terminated_by': self.terminated_by,
}
class OrchestratorTask(Task):
def __init__(self, design_id=None, **kwargs):
super(OrchestratorTask, self).__init__(**kwargs)
class TaskStatus(object):
"""Status/Result of this task's execution."""
self.design_id = design_id
def __init__(self):
self.details = {
'errorCount': 0,
'messageList': []
}
if self.action in [
hd_fields.OrchestratorAction.VerifyNode,
hd_fields.OrchestratorAction.PrepareNode,
hd_fields.OrchestratorAction.DeployNode,
hd_fields.OrchestratorAction.DestroyNode
]:
self.node_filter = kwargs.get('node_filter', None)
self.message = None
self.reason = None
self.status = hd_fields.ActionResult.Incomplete
def set_message(self, msg):
self.message = msg
def set_reason(self, reason):
self.reason = reason
def set_status(self, status):
self.status = status
def add_status_msg(self, msg=None, error=None, ctx_type=None, ctx=None, **kwargs):
if msg is None or error is None or ctx_type is None or ctx is None:
raise ValueError('Status message requires fields: msg, error, ctx_type, ctx')
new_msg = TaskStatusMessage(msg, error, ctx_type, ctx, **kwargs)
self.details.messageList.append(new_msg)
if error:
self.details.errorCount = self.details.errorCount + 1
def to_dict(self):
_dict = super(OrchestratorTask, self).to_dict()
_dict['design_id'] = self.design_id
_dict['node_filter'] = getattr(self, 'node_filter', None)
return _dict
return {
'Kind': 'Status',
'apiVersion': 'v1',
'metadata': {},
'message': self.message,
'reason': self.reason,
'status': self.status,
'details': {
'errorCount': self.details.errorCount,
'messageList': [x.to_dict() for x in self.details.messageList],
}
}
class DriverTask(Task):
def __init__(self, task_scope={}, **kwargs):
super(DriverTask, self).__init__(**kwargs)
class TaskStatusMessage(object):
"""Message describing an action or error from executing a Task."""
self.design_id = kwargs.get('design_id', None)
self.node_list = task_scope.get('node_names', [])
def __init__(self, msg, error, ctx_type, ctx, **kwargs):
self.message = msg
self.error = error
self.ctx_type = ctx_type
self.ctx = ctx
self.ts = datetime.utcnow()
self.extra = kwargs
def to_dict(self):
_dict = super(DriverTask, self).to_dict()
_dict = {
'message': self.message,
'error': self.error,
'context_type': self.ctx_type,
'context': self.ctx,
'ts': self.ts,
}
_dict['design_id'] = self.design_id
_dict['node_list'] = self.node_list
_dict.update(self.extra)
return _dict

View File

@ -0,0 +1,90 @@
"""Definitions for Drydock database tables."""
from sqlalchemy import Table, Column, MetaData
from sqlalchemy.types import Boolean, DateTime, String, Integer, JSON, BLOB
from sqlalchemy.dialects import postgresql as pg
metadata = MetaData()
class Tasks(Table):
    """Table for persisting Tasks.

    One row per task. Subtask linkage is stored both as a parent pointer
    (``parent_task_id``) and a denormalized child list (``subtask_id_list``).
    """

    __tablename__ = 'tasks'

    # Schema is exposed as a class attribute so Alembic migrations can call
    # op.create_table(__tablename__, *__schema__) without instantiating
    # this class.
    __schema__ = [
        Column('task_id', pg.BYTEA(16), primary_key=True),  # UUID stored as 16 raw bytes
        Column('parent_task_id', pg.BYTEA(16)),
        Column('subtask_id_list', pg.ARRAY(pg.BYTEA(16))),
        Column('result_status', String(32)),
        Column('result_error_count', Integer),
        Column('status', String(32)),
        Column('created', DateTime),
        Column('created_by', String(16)),
        Column('updated', DateTime),
        Column('design_ref', String(128)),
        Column('request_context', pg.JSON),
        Column('action', String(32)),
        Column('terminated', DateTime),
        Column('terminated_by', String(16))
    ]

    def __init__(self):
        # NOTE(review): a Column object can be attached to only one Table,
        # so this class should be instantiated at most once per process —
        # confirm callers do not construct it repeatedly.
        super().__init__(
            Tasks.__tablename__,
            metadata,
            *Tasks.__schema__)
class ResultMessage(Table):
    """Table for tracking result/status messages.

    Messages are attached to a task via ``task_id`` and ordered by the
    auto-incrementing ``sequence`` column; the pair forms the composite
    primary key.
    """

    __tablename__ = 'result_message'

    # Exposed as a class attribute for use by Alembic migrations
    # (op.create_table(__tablename__, *__schema__)).
    __schema__ = [
        Column('task_id', pg.BYTEA(16), primary_key=True),
        Column('sequence', Integer, autoincrement='auto', primary_key=True),
        Column('message', String(128)),
        Column('error', Boolean),
        Column('extra', pg.JSON)  # context-dependent extra fields of the message
    ]

    def __init__(self):
        # NOTE(review): Columns bind to a single Table; instantiate once only.
        super().__init__(
            ResultMessage.__tablename__,
            metadata,
            *ResultMessage.__schema__)
class ActiveInstance(Table):
    """Table to organize multiple orchestrator instances.

    Acts as a small semaphore table so that when several Drydock instances
    run, only a single orchestrator executes tasks; liveness is tracked via
    ``last_ping``.
    """

    __tablename__ = 'active_instance'

    # Exposed as a class attribute for use by Alembic migrations.
    __schema__ = [
        Column('identity', pg.BYTEA(16), primary_key=True),
        Column('last_ping', DateTime)
    ]

    def __init__(self):
        # NOTE(review): Columns bind to a single Table; instantiate once only.
        super().__init__(
            ActiveInstance.__tablename__,
            metadata,
            *ActiveInstance.__schema__)
class BuildData(Table):
    """Table persisting node build data.

    Keyed by node name, with ``task_id`` pointing at the task that produced
    the build record.
    """

    __tablename__ = 'build_data'

    # Exposed as a class attribute for use by Alembic migrations.
    __schema__ = [
        Column('node_name', String(16), primary_key=True),
        Column('task_id', pg.BYTEA(16)),
        Column('message', String(128)),
    ]

    def __init__(self):
        # NOTE(review): Columns bind to a single Table; instantiate once only.
        super().__init__(
            BuildData.__tablename__,
            metadata,
            *BuildData.__schema__)

View File

@ -1,47 +0,0 @@
# Statemgmt #
Statemgmt is the interface to the persistence store
for holding site design data as well as build status data
/drydock - Base namespace for drydock data
## As Designed ##
Serialization of Drydock internal model as ingested. Not externally writable.
/drydock/design
/drydock/design/base - The base site design used for the first deployment
/drydock/design/[changeID] - A change layer on top of the base site design. Chrono ordered
## As Built ##
Serialization of Drydock internal model as rendered to effective implementation including build status. Not externally writable.
/drydock/build
/drydock/build/[datestamp] - A point-in-time view of what was deployed with deployment results
## Tasks ##
Management of task state for the internal orchestrator
/drydock/tasks
## Node data ##
Per-node data that can drive introspection as well as accept updates from nodes
/drydock/nodedata
/drydock/nodedata/[nodename] - Per-node data, can be seeded with Ingested metadata but can be updated during by orchestrator or node
## Service data ##
Per-service data (may not be needed)
/drydock/servicedata
/drydock/servicedata/[servicename]
## Global data ##
Generic k:v store that can be produced/consumed by non-Drydock services via the Drydock API
/drydock/globaldata

View File

@ -0,0 +1,58 @@
======================================
Statemgmt - Persisted State Management
======================================
Statemgmt is the interface to the persistence store for managing task data and build
data for nodes. Currently Drydock only supports a Postgres database backend. This module
will also resolve design references to retrieve the design data from the specified
external reference.
Tables
======
tasks
-----
The ``tasks`` table stores all tasks - Queued, Running, Complete. The orchestrator
will source all tasks from this table.
result_message
--------------
The ``result_message`` table is used for storing all of the detailed messages produced
while executing a task. These are sequenced and attached to the task when serializing
a task.
build_data
----------
The ``build_data`` table is used for storing the build history and details of nodes
in the site. When a node is destroyed and redeployed, the history will persist showing
that transition.
active_instance
---------------
``active_instance`` is a small semaphore table so that multiple instances of Drydock
can organize and ensure only a single orchestrator instance is executing tasks.
Design References
=================
Rather than Drydock storing design data internally, it instead supports a URI-based
reference scheme. The URI should specify the driver and transport details required to
source the data. Once the data is retrieved by the driver, it will be sent to an
ingester for translation into the internal Drydock object model.
Example design reference URI: ``deckhand+https://deckhand-api.ucp.svc.cluster.local:8443/e50b4d74-9fab-11e7-b7cc-080027ef795a``
Current Drivers
---------------
Drydock currently can resolve design references to simple ``file://`` and ``http://`` endpoints
with no authentication required. Local files must provide absolute paths.
Planned Drivers
---------------
There is planned support for ``https://`` and ``deckhand+https://`` endpoints.

View File

@ -1,17 +1,20 @@
PyYAML===3.12
pyghmi===1.0.18
PyYAML==3.12
pyghmi==1.0.18
netaddr
falcon
oslo.versionedobjects===1.23.0
oslo.versionedobjects==1.23.0
requests
oauthlib
uwsgi===2.0.15
bson===0.4.7
oslo.config===3.16.0
click===6.7
uwsgi==2.0.15
bson==0.4.7
oslo.config==3.16.0
click==6.7
PasteDeploy==1.5.2
keystonemiddleware===4.9.1
oslo.policy===1.22.1
iso8601===0.1.11
keystoneauth1===2.13.0
PTable==0.9.2
keystonemiddleware==4.9.1
oslo.policy==1.22.1
iso8601==0.1.11
keystoneauth1==2.13.0
alembic==0.8.2
sqlalchemy==1.1.14
psycopg2==2.7.3.1

View File

@ -41,8 +41,8 @@ setup(
'drydock_provisioner.drivers.node.maasdriver.models',
'drydock_provisioner.control', 'drydock_provisioner.cli',
'drydock_provisioner.cli.design', 'drydock_provisioner.cli.part',
'drydock_provisioner.cli.task', 'drydock_provisioner.cli.node',
'drydock_provisioner.drydock_client'
'drydock_provisioner.cli.task', 'drydock_provisioner.drydock_client',
'drydock_provisioner.statemgmt.db','drydock_provisioner.cli.node'
],
entry_points={
'oslo.config.opts':