From 8e076287f32bb0da3050dd793151112a01ce66f3 Mon Sep 17 00:00:00 2001 From: Anthony Lin Date: Tue, 16 Jan 2018 17:20:06 +0000 Subject: [PATCH] Update Operators There have been significant code changes in Airflow from version 1.8.2 to version 1.9.0 as seen in [0]. In particular, it seems that we are not able to store the drydock session as xcom now due to serialization errors [1]. It appears that io.TextIOWrapper (which is a wrapper that converts binary file-like objects to text file-like objects) can't be serialized because it may hold external/runtime state, e.g. a file descriptor that has a specific position in a file that may not be available when it is deserialized at a later stage in time. Hence we are making changes to the logic such that we will create a new session for each new task instead of using xcom to store the session and re-use it across tasks. [0] https://github.com/apache/incubator-airflow/blob/master/CHANGELOG.txt [1] Exceptions Seen in Airflow 1.9.0 [2018-01-18 16:37:23,394] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last): [2018-01-18 16:37:23,394] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/bin/airflow", line 27, in <module> [2018-01-18 16:37:23,394] {base_task_runner.py:98} INFO - Subtask: args.func(args) [2018-01-18 16:37:23,395] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 392, in run [2018-01-18 16:37:23,395] {base_task_runner.py:98} INFO - Subtask: pool=args.pool, [2018-01-18 16:37:23,395] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", line 50, in wrapper [2018-01-18 16:37:23,395] {base_task_runner.py:98} INFO - Subtask: result = func(*args, **kwargs) [2018-01-18 16:37:23,396] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 1497, in _run_raw_task [2018-01-18 16:37:23,396] {base_task_runner.py:98} INFO - Subtask: 
self.xcom_push(key=XCOM_RETURN_KEY, value=result) [2018-01-18 16:37:23,396] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 1817, in xcom_push [2018-01-18 16:37:23,396] {base_task_runner.py:98} INFO - Subtask: execution_date=execution_date or self.execution_date) [2018-01-18 16:37:23,397] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", line 50, in wrapper [2018-01-18 16:37:23,397] {base_task_runner.py:98} INFO - Subtask: result = func(*args, **kwargs) [2018-01-18 16:37:23,397] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 4103, in set [2018-01-18 16:37:23,397] {base_task_runner.py:98} INFO - Subtask: value = pickle.dumps(value) [2018-01-18 16:37:23,398] {base_task_runner.py:98} INFO - Subtask: TypeError: cannot serialize '_io.TextIOWrapper' object Change-Id: I0fd686a91a86a36768a2caeed4b16a1dbbb040a3 --- shipyard_airflow/dags/armada_deploy_site.py | 10 ------- shipyard_airflow/dags/drydock_deploy_site.py | 9 ------- shipyard_airflow/plugins/armada_operator.py | 26 +++++++------------ shipyard_airflow/plugins/drydock_operators.py | 26 +++++++------------ 4 files changed, 18 insertions(+), 53 deletions(-) diff --git a/shipyard_airflow/dags/armada_deploy_site.py b/shipyard_airflow/dags/armada_deploy_site.py index 7007b1b7..e4f82171 100644 --- a/shipyard_airflow/dags/armada_deploy_site.py +++ b/shipyard_airflow/dags/armada_deploy_site.py @@ -29,15 +29,6 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args): '{}.{}'.format(parent_dag_name, child_dag_name), default_args=args) - # Create Armada Client - armada_client = ArmadaOperator( - task_id='create_armada_client', - shipyard_conf=config_path, - action='create_armada_client', - main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, - dag=dag) - # Get Tiller Status armada_status = ArmadaOperator( 
task_id='armada_status', @@ -67,7 +58,6 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args): dag=dag) # Define dependencies - armada_status.set_upstream(armada_client) armada_apply.set_upstream(armada_status) armada_get_releases.set_upstream(armada_apply) diff --git a/shipyard_airflow/dags/drydock_deploy_site.py b/shipyard_airflow/dags/drydock_deploy_site.py index 728938fc..0dd9bae8 100644 --- a/shipyard_airflow/dags/drydock_deploy_site.py +++ b/shipyard_airflow/dags/drydock_deploy_site.py @@ -30,14 +30,6 @@ def deploy_site_drydock(parent_dag_name, child_dag_name, args): '{}.{}'.format(parent_dag_name, child_dag_name), default_args=args) - drydock_client = DryDockOperator( - task_id='create_drydock_client', - shipyard_conf=config_path, - action='create_drydock_client', - main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, - dag=dag) - drydock_verify_site = DryDockOperator( task_id='verify_site', shipyard_conf=config_path, @@ -71,7 +63,6 @@ def deploy_site_drydock(parent_dag_name, child_dag_name, args): dag=dag) # Define dependencies - drydock_verify_site.set_upstream(drydock_client) drydock_prepare_site.set_upstream(drydock_verify_site) drydock_prepare_nodes.set_upstream(drydock_prepare_site) drydock_deploy_nodes.set_upstream(drydock_prepare_nodes) diff --git a/shipyard_airflow/plugins/armada_operator.py b/shipyard_airflow/plugins/armada_operator.py index a2f5c1fd..6d0f2553 100644 --- a/shipyard_airflow/plugins/armada_operator.py +++ b/shipyard_airflow/plugins/armada_operator.py @@ -76,19 +76,6 @@ class ArmadaOperator(BaseOperator): # Logs uuid of action performed by the Operator logging.info("Armada Operator for action %s", workflow_info['id']) - # Create Armada Client - if self.action == 'create_armada_client': - # Retrieve Endpoint Information - svc_type = 'armada' - context['svc_endpoint'] = ucp_service_endpoint(self, - svc_type=svc_type) - logging.info("Armada endpoint is %s", context['svc_endpoint']) - - # Set up Armada Client - 
session_client = self.armada_session_client(context) - - return session_client - # Retrieve Deckhand Design Reference design_ref = self.get_deckhand_design_ref(context) @@ -118,10 +105,15 @@ class ArmadaOperator(BaseOperator): return site_design_validity - # Retrieve armada_client via XCOM so as to perform other tasks - armada_client = task_instance.xcom_pull( - task_ids='create_armada_client', - dag_id=self.main_dag_name + '.' + self.sub_dag_name) + # Create Armada Client + # Retrieve Endpoint Information + svc_type = 'armada' + context['svc_endpoint'] = ucp_service_endpoint(self, + svc_type=svc_type) + logging.info("Armada endpoint is %s", context['svc_endpoint']) + + # Set up Armada Client + armada_client = self.armada_session_client(context) # Retrieve Tiller Information and assign to context 'query' context['query'] = self.get_tiller_info(context) diff --git a/shipyard_airflow/plugins/drydock_operators.py b/shipyard_airflow/plugins/drydock_operators.py index 3a27bc92..d71983ca 100644 --- a/shipyard_airflow/plugins/drydock_operators.py +++ b/shipyard_airflow/plugins/drydock_operators.py @@ -88,19 +88,6 @@ class DryDockOperator(BaseOperator): # Logs uuid of action performed by the Operator logging.info("DryDock Operator for action %s", workflow_info['id']) - # DrydockClient - if self.action == 'create_drydock_client': - # Retrieve Endpoint Information - svc_type = 'physicalprovisioner' - context['svc_endpoint'] = ucp_service_endpoint(self, - svc_type=svc_type) - logging.info("DryDock endpoint is %s", context['svc_endpoint']) - - # Set up DryDock Client - drydock_client = self.drydock_session_client(context) - - return drydock_client - # Retrieve Deckhand Design Reference self.design_ref = self.get_deckhand_design_ref(context) @@ -124,10 +111,15 @@ class DryDockOperator(BaseOperator): return site_design_validity - # Retrieve drydock_client via XCOM so as to perform other tasks - drydock_client = task_instance.xcom_pull( - task_ids='create_drydock_client', - 
dag_id=self.main_dag_name + '.' + self.sub_dag_name) + # DrydockClient + # Retrieve Endpoint Information + svc_type = 'physicalprovisioner' + context['svc_endpoint'] = ucp_service_endpoint(self, + svc_type=svc_type) + logging.info("DryDock endpoint is %s", context['svc_endpoint']) + + # Set up DryDock Client + drydock_client = self.drydock_session_client(context) # Read shipyard.conf config = configparser.ConfigParser()