From 7e7739ce2cbe3156a8ecec854639052542d40dcf Mon Sep 17 00:00:00 2001 From: Scott Hussey Date: Tue, 26 Jun 2018 14:38:40 -0500 Subject: [PATCH] (fix) Guard against DB connection leaks - Use connection contexts to ensure database connections are released back to the pool. - Make database connection pool size configurable, defaults to 15 Change-Id: Id8011fbf45a1b3c87835ff5f47ebfa9334488319 --- drydock_provisioner/config.py | 4 + drydock_provisioner/statemgmt/state.py | 216 ++++++++++++------------- 2 files changed, 104 insertions(+), 116 deletions(-) diff --git a/drydock_provisioner/config.py b/drydock_provisioner/config.py index 51981a54..3b167ff3 100644 --- a/drydock_provisioner/config.py +++ b/drydock_provisioner/config.py @@ -91,6 +91,10 @@ class DrydockConfig(object): cfg.StrOpt( 'database_connect_string', help='The URI database connect string.'), + cfg.IntOpt( + 'pool_size', + default=15, + help='The SQLalchemy database connection pool size.'), ] # Options for the boot action framework diff --git a/drydock_provisioner/statemgmt/state.py b/drydock_provisioner/statemgmt/state.py index aa4c85c9..dc785f66 100644 --- a/drydock_provisioner/statemgmt/state.py +++ b/drydock_provisioner/statemgmt/state.py @@ -42,7 +42,8 @@ class DrydockState(object): def connect_db(self): """Connect the state manager to the persistent DB.""" self.db_engine = create_engine( - config.config_mgr.conf.database.database_connect_string) + config.config_mgr.conf.database.database_connect_string, + pool_size=config.config_mgr.conf.database.pool_size) self.db_metadata = MetaData(bind=self.db_engine) self.tasks_tbl = tables.Tasks(self.db_metadata) @@ -67,12 +68,11 @@ class DrydockState(object): 'build_data', ] - conn = self.db_engine.connect() - for t in table_names: - query_text = sql.text( - "TRUNCATE TABLE %s" % t).execution_options(autocommit=True) - conn.execute(query_text) - conn.close() + with self.db_engine.connect() as conn: + for t in table_names: + query_text = sql.text( + "TRUNCATE TABLE %s" % t).execution_options(autocommit=True) + conn.execute(query_text) def get_design_documents(self, design_ref): return ReferenceResolver.resolve_reference(design_ref) @@ -80,19 +80,17 @@ class DrydockState(object): def get_tasks(self): """Get all tasks in the database.""" try: - conn = self.db_engine.connect() - query = sql.select([self.tasks_tbl]) - rs = conn.execute(query) + with self.db_engine.connect() as conn: + query = sql.select([self.tasks_tbl]) + rs = conn.execute(query) - task_list = [objects.Task.from_db(dict(r)) for r in rs] + task_list = [objects.Task.from_db(dict(r)) for r in rs] - self._assemble_tasks(task_list=task_list) + self._assemble_tasks(task_list=task_list) - # add reference to this state manager to each task - for t in task_list: - t.statemgr = self - - conn.close() + # add reference to this state manager to each task + for t in task_list: + t.statemgr = self return task_list except Exception as ex: @@ -164,25 +162,24 @@ class DrydockState(object): :param allowed_actions: list of string action names """ try: - conn = self.db_engine.connect() - if allowed_actions is None: - query = self.tasks_tbl.select().where( - self.tasks_tbl.c.status == - hd_fields.TaskStatus.Queued).order_by( - self.tasks_tbl.c.created.asc()) - rs = conn.execute(query) - else: - query = sql.text("SELECT * FROM tasks WHERE " - "status = :queued_status AND " - "action = ANY(:actions) " - "ORDER BY created ASC") - rs = conn.execute( - query, - queued_status=hd_fields.TaskStatus.Queued, - actions=allowed_actions) + with self.db_engine.connect() as conn: + if allowed_actions is None: + query = self.tasks_tbl.select().where( + self.tasks_tbl.c.status == + hd_fields.TaskStatus.Queued).order_by( + self.tasks_tbl.c.created.asc()) + rs = conn.execute(query) + else: + query = sql.text("SELECT * FROM tasks WHERE " + "status = :queued_status AND " + "action = ANY(:actions) " + "ORDER BY created ASC") + rs = conn.execute( + query, + queued_status=hd_fields.TaskStatus.Queued, + actions=allowed_actions) - r = rs.first() - conn.close() + r = rs.first() if r is not None: task = objects.Task.from_db(dict(r)) @@ -203,12 +200,11 @@ class DrydockState(object): :param task_id: uuid.UUID of a task_id to query against """ try: - conn = self.db_engine.connect() - query = self.tasks_tbl.select().where( - self.tasks_tbl.c.task_id == task_id.bytes) - rs = conn.execute(query) - - r = rs.fetchone() + with self.db_engine.connect() as conn: + query = self.tasks_tbl.select().where( + self.tasks_tbl.c.task_id == task_id.bytes) + rs = conn.execute(query) + r = rs.fetchone() task = objects.Task.from_db(dict(r)) @@ -217,8 +213,6 @@ class DrydockState(object): self._assemble_tasks(task_list=[task]) task.statemgr = self - conn.close() - return task except Exception as ex: @@ -234,11 +228,10 @@ class DrydockState(object): :param msg: instance of objects.TaskStatusMessage """ try: - conn = self.db_engine.connect() - query = self.result_message_tbl.insert().values( - task_id=task_id.bytes, **(msg.to_db())) - conn.execute(query) - conn.close() + with self.db_engine.connect() as conn: + query = self.result_message_tbl.insert().values( + task_id=task_id.bytes, **(msg.to_db())) + conn.execute(query) return True except Exception as ex: self.logger.error("Error inserting result message for task %s: %s" @@ -253,24 +246,22 @@ class DrydockState(object): if task_list is None: return None - conn = self.db_engine.connect() - query = sql.select([ - self.result_message_tbl - ]).where(self.result_message_tbl.c.task_id == sql.bindparam( - 'task_id')).order_by(self.result_message_tbl.c.sequence.asc()) - query.compile(self.db_engine) + with self.db_engine.connect() as conn: + query = sql.select([ + self.result_message_tbl + ]).where(self.result_message_tbl.c.task_id == sql.bindparam( + 'task_id')).order_by(self.result_message_tbl.c.sequence.asc()) + query.compile(self.db_engine) - for t in task_list: - rs = conn.execute(query, task_id=t.task_id.bytes) - error_count = 0 - for r in rs: - msg = objects.TaskStatusMessage.from_db(dict(r)) - if msg.error: - error_count = error_count + 1 - t.result.message_list.append(msg) - t.result.error_count = error_count - - conn.close() + for t in task_list: + rs = conn.execute(query, task_id=t.task_id.bytes) + error_count = 0 + for r in rs: + msg = objects.TaskStatusMessage.from_db(dict(r)) + if msg.error: + error_count = error_count + 1 + t.result.message_list.append(msg) + t.result.error_count = error_count def post_task(self, task): """Insert a task into the database. @@ -280,11 +271,10 @@ class DrydockState(object): :param task: instance of objects.Task to insert into the database. """ try: - conn = self.db_engine.connect() - query = self.tasks_tbl.insert().values(**( - task.to_db(include_id=True))) - conn.execute(query) - conn.close() + with self.db_engine.connect() as conn: + query = self.tasks_tbl.insert().values(**( + task.to_db(include_id=True))) + conn.execute(query) return True except Exception as ex: self.logger.error( @@ -297,17 +287,15 @@ class DrydockState(object): :param task: objects.Task instance to reference for update values """ try: - conn = self.db_engine.connect() - query = self.tasks_tbl.update().where( - self.tasks_tbl.c.task_id == task.task_id.bytes).values(**( - task.to_db(include_id=False))) - rs = conn.execute(query) - if rs.rowcount == 1: - conn.close() - return True - else: - conn.close() - return False + with self.db_engine.connect() as conn: + query = self.tasks_tbl.update().where( + self.tasks_tbl.c.task_id == task.task_id.bytes).values(**( + task.to_db(include_id=False))) + rs = conn.execute(query) + if rs.rowcount == 1: + return True + else: + return False except Exception as ex: self.logger.error( "Error updating task %s: %s" % (str(task.task_id), str(ex))) @@ -325,17 +313,16 @@ class DrydockState(object): "WHERE task_id = :task_id").execution_options(autocommit=True) try: - conn = self.db_engine.connect() - rs = conn.execute( - query_string, - new_subtask=subtask_id.bytes, - task_id=task_id.bytes) - rc = rs.rowcount - conn.close() - if rc == 1: - return True - else: - return False + with self.db_engine.connect() as conn: + rs = conn.execute( + query_string, + new_subtask=subtask_id.bytes, + task_id=task_id.bytes) + rc = rs.rowcount + if rc == 1: + return True + else: + return False except Exception as ex: self.logger.error("Error appending subtask %s to task %s: %s" % (str(subtask_id), str(task_id), str(ex))) @@ -347,18 +334,17 @@ class DrydockState(object): :param leader_id: uuid.UUID ID of the leader """ try: - conn = self.db_engine.connect() - query = self.active_instance_tbl.update().where( - self.active_instance_tbl.c.identity == leader_id.bytes).values( - last_ping=datetime.utcnow()) - rs = conn.execute(query) - rc = rs.rowcount - conn.close() + with self.db_engine.connect() as conn: + query = self.active_instance_tbl.update().where( + self.active_instance_tbl.c.identity == leader_id.bytes).values( + last_ping=datetime.utcnow()) + rs = conn.execute(query) + rc = rs.rowcount - if rc == 1: - return True - else: - return False + if rc == 1: + return True + else: + return False except Exception as ex: self.logger.error("Error maintaining leadership: %s" % str(ex)) @@ -385,13 +371,12 @@ class DrydockState(object): autocommit=True) try: - conn = self.db_engine.connect() - conn.execute(query_string, instance_id=leader_id.bytes) - check_query = self.active_instance_tbl.select().where( - self.active_instance_tbl.c.identity == leader_id.bytes) - rs = conn.execute(check_query) - r = rs.fetchone() - conn.close() + with self.db_engine.connect() as conn: + conn.execute(query_string, instance_id=leader_id.bytes) + check_query = self.active_instance_tbl.select().where( + self.active_instance_tbl.c.identity == leader_id.bytes) + rs = conn.execute(check_query) + r = rs.fetchone() if r is not None: return True else: @@ -406,12 +391,11 @@ class DrydockState(object): :param leader_id: a uuid.UUID instance identifying the instance giving up leadership """ try: - conn = self.db_engine.connect() - query = self.active_instance_tbl.delete().where( - self.active_instance_tbl.c.identity == leader_id.bytes) - rs = conn.execute(query) - rc = rs.rowcount - conn.close() + with self.db_engine.connect() as conn: + query = self.active_instance_tbl.delete().where( + self.active_instance_tbl.c.identity == leader_id.bytes) + rs = conn.execute(query) + rc = rs.rowcount if rc == 1: return True