8e076287f3
There have been significant code changes in Airflow from version 1.8.2 to version 1.9.0, as seen in [0]. In particular, it seems that we can no longer store the Drydock session as an XCom because of serialization errors [1]. It appears that io.TextIOWrapper (a wrapper that converts binary file-like objects to text file-like objects) cannot be serialized because it may hold external or runtime state, e.g. a file descriptor with a specific position in a file, that may not be available when the object is deserialized later.

Hence we are changing the logic to create a new session for each task instead of using XCom to store the session and reuse it across tasks.

[0] https://github.com/apache/incubator-airflow/blob/master/CHANGELOG.txt
[1] Exceptions seen in Airflow 1.9.0:

    [2018-01-18 16:37:23,394] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
    [2018-01-18 16:37:23,394] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/bin/airflow", line 27, in <module>
    [2018-01-18 16:37:23,394] {base_task_runner.py:98} INFO - Subtask: args.func(args)
    [2018-01-18 16:37:23,395] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 392, in run
    [2018-01-18 16:37:23,395] {base_task_runner.py:98} INFO - Subtask: pool=args.pool,
    [2018-01-18 16:37:23,395] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", line 50, in wrapper
    [2018-01-18 16:37:23,395] {base_task_runner.py:98} INFO - Subtask: result = func(*args, **kwargs)
    [2018-01-18 16:37:23,396] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 1497, in _run_raw_task
    [2018-01-18 16:37:23,396] {base_task_runner.py:98} INFO - Subtask: self.xcom_push(key=XCOM_RETURN_KEY, value=result)
    [2018-01-18 16:37:23,396] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 1817, in xcom_push
    [2018-01-18 16:37:23,396] {base_task_runner.py:98} INFO - Subtask: execution_date=execution_date or self.execution_date)
    [2018-01-18 16:37:23,397] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", line 50, in wrapper
    [2018-01-18 16:37:23,397] {base_task_runner.py:98} INFO - Subtask: result = func(*args, **kwargs)
    [2018-01-18 16:37:23,397] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 4103, in set
    [2018-01-18 16:37:23,397] {base_task_runner.py:98} INFO - Subtask: value = pickle.dumps(value)
    [2018-01-18 16:37:23,398] {base_task_runner.py:98} INFO - Subtask: TypeError: cannot serialize '_io.TextIOWrapper' object

Change-Id: I0fd686a91a86a36768a2caeed4b16a1dbbb040a3
conf
control
dags
db
plugins
README.md
__init__.py
errors.py
policy.py
shipyard.py
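The serialization failure described in the commit message can be reproduced outside Airflow with nothing but `pickle`. The following is a minimal sketch, not the actual Shipyard or Drydock code: `FakeSession`, `build_session`, `is_picklable`, and the host and port values are hypothetical names invented for illustration. It assumes the real session holds an open text stream somewhere in its state, which is what the traceback suggests.

```python
import io
import pickle


class FakeSession:
    """Hypothetical stand-in for a client session that holds an open text stream."""

    def __init__(self, host, port):
        self.host = host
        self.port = port
        # An open text stream carries runtime state and cannot be pickled.
        self.stream = io.TextIOWrapper(io.BytesIO())


def is_picklable(value):
    """Return True if pickle can serialize the value, False otherwise."""
    try:
        pickle.dumps(value)
        return True
    except (TypeError, pickle.PicklingError):
        return False


def build_session(params):
    """Create a fresh session from plain parameters (hypothetical helper)."""
    return FakeSession(params["host"], params["port"])


# Plain connection parameters are safe to pass between tasks.
params = {"host": "drydock.example", "port": 9000}

# Each task builds its own session instead of receiving a pickled one.
session = build_session(params)

print("params picklable: ", is_picklable(params))
print("session picklable:", is_picklable(session))
```

Under these assumptions, only the plain parameters would travel between tasks, and each task would call something like `build_session` at the start of its own run, which matches the approach the commit message describes.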
README.md
Shipyard Airflow
A Python REST workflow orchestrator
To run:
$ virtualenv -p python3 /var/tmp/shipyard
$ . /var/tmp/shipyard/bin/activate
$ python setup.py install
$ uwsgi --http :9000 -w shipyard_airflow.shipyard --callable shipyard -L