author     Bryan Strassner <bryan.strassner@gmail.com>   2018-11-28 18:45:27 -0600
committer  Bryan Strassner <strassner.bryan@gmail.com>   2018-12-05 12:37:14 -0600
commit     d6c72d19e6bcad116238fb9882c54b75510c5b70 (patch)
tree       da8a5f625a3b200b9b4d2428a4df6449f0c00f28
parent     03d7269b6ad9d7b734cb3a7bc1693139e639a9c5 (diff)
Update to Airflow 1.10.1 and restore sysout
Updates to Airflow 1.10.1; See (1), (2) for some notes
Related, and additionally: this change configures Airflow to restore
logging of workflow steps to a console/stdout logger, supporting the
desired ability to attach logging and monitoring to standard container
mechanisms. This does not change the behavior of also logging to the
Airflow-arranged log files for steps and DAG runs.
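The mechanism for the console logging described above is the
image-bundled log_config.py added by this patch, which registers a plain
stdout StreamHandler named 'py-console' and attaches it to the
'airflow.task' logger alongside the existing file handler. The following
is only an illustrative, self-contained sketch of that idea; the actual
configuration is the full images/airflow/config/log_config.py in the diff
below:

    import logging
    import logging.config

    # Sketch only: the bundled log_config.py keeps Airflow's file handlers
    # and adds a 'py-console' StreamHandler on sys.stdout for the
    # 'airflow.task' logger; only the console piece is shown here so the
    # snippet runs as-is.
    SKETCH_LOGGING_CONFIG = {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'airflow': {'format': '%(asctime)s %(levelname)s - %(message)s'},
        },
        'handlers': {
            'py-console': {
                'class': 'logging.StreamHandler',
                'formatter': 'airflow',
                'stream': 'ext://sys.stdout',
            },
        },
        'loggers': {
            'airflow.task': {
                # The real file uses ['task', 'py-console'] so the per-task
                # log files are still written as well.
                'handlers': ['py-console'],
                'level': 'INFO',
                'propagate': False,
            },
        },
    }

    logging.config.dictConfig(SKETCH_LOGGING_CONFIG)
    logging.getLogger('airflow.task').info('step output now also reaches stdout')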
A side effect of updating to 1.10.1 is a major decrease in resource
usage by the Airflow scheduler process (reducing from ~1 core fully
consumed to less than 5% of a core consumed; YMMV, but significant).
Additional downward adjustments to allocated resources, thread counts,
and polling frequency lead to a further significant overall reduction
in resource usage.
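For reference, these are the polling and concurrency values tuned by
this patch, all in charts/shipyard/values.yaml; they are listed flat
here with the corresponding airflow.cfg section as a comment, and the
chart's exact nesting is visible in the diff below:

    # [core]
    dag_concurrency: 8             # was 16
    max_active_runs_per_dag: 8     # was 16
    # [webserver]
    worker_refresh_interval: 120   # was 30
    log_fetch_timeout_sec: 20      # was 10
    # [celery]
    worker_concurrency: 4          # was 16
    sync_parallelism: 1            # new; 0 would mean max(1, cores - 1)
    # [scheduler]
    job_heartbeat_sec: 10          # was 5
    scheduler_heartbeat_sec: 10    # was 5
    min_file_process_interval: 10  # was 5
    use_job_schedule: "False"      # new; DAGs run only when triggered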
Airship note:
Because Airflow 1.10.0 and 1.10.1 use compatible versions of Celery and
compatible dag_run information, updating from 1.10.0 to 1.10.1 in place
is possible if airflow-worker pods are allowed to continue to run.
(1) https://github.com/apache/incubator-airflow/blob/master/UPDATING.md
(2) https://github.com/apache/incubator-airflow/releases/tag/1.10.1
Change-Id: I9b024e3996c528c7b74e2888191d48c7a45a1f04
Notes (review):
Code-Review+2: Scott Hussey <sthussey@att.com>
Code-Review+1: Nishant Kumar <nishant.e.kumar@ericsson.com>
Code-Review+2: Aaron Sheffield <ajs@sheffieldfamily.net>
Workflow+1: Aaron Sheffield <ajs@sheffieldfamily.net>
Verified+2: Zuul
Submitted-by: Zuul
Submitted-at: Fri, 07 Dec 2018 15:37:09 +0000
Reviewed-on: https://review.openstack.org/620758
Project: openstack/airship-shipyard
Branch: refs/heads/master
-rw-r--r--  charts/shipyard/values.yaml                     |  57
-rw-r--r--  images/airflow/Dockerfile                       |   4
-rw-r--r--  images/airflow/config/__init__.py               |   0
-rw-r--r--  images/airflow/config/log_config.py             | 261
-rw-r--r--  images/airflow/requirements.txt                 |   2
-rw-r--r--  src/bin/shipyard_airflow/test-requirements.txt  |   2
6 files changed, 305 insertions, 21 deletions
diff --git a/charts/shipyard/values.yaml b/charts/shipyard/values.yaml
index 60233b1..263f583 100644
--- a/charts/shipyard/values.yaml
+++ b/charts/shipyard/values.yaml
@@ -448,8 +448,9 @@ conf: | |||
448 | encrypt_s3_logs: "False" | 448 | encrypt_s3_logs: "False" |
449 | logging_level: "INFO" | 449 | logging_level: "INFO" |
450 | fab_logging_level: "WARN" | 450 | fab_logging_level: "WARN" |
451 | # TODO(bryan-strassner) Use this for custom log formatting! | 451 | # See image-bundled log_config.py. |
452 | logging_config_class: "" | 452 | # Adds console logging of task/step logs. |
453 | logging_config_class: log_config.LOGGING_CONFIG | ||
453 | # NOTE: Airflow 1.10 introduces extra newline characters between log | 454 | # NOTE: Airflow 1.10 introduces extra newline characters between log |
454 | # records. Version 1.10.1 should resolve this issue | 455 | # records. Version 1.10.1 should resolve this issue |
455 | # https://issues.apache.org/jira/browse/AIRFLOW-1917 | 456 | # https://issues.apache.org/jira/browse/AIRFLOW-1917 |
@@ -467,26 +468,29 @@ conf: | |||
467 | simple_log_format: "%%(asctime)s %%(levelname)s - %%(message)s" | 468 | simple_log_format: "%%(asctime)s %%(levelname)s - %%(message)s" |
468 | log_filename_template: "{{ ti.dag_id }}/{{ ti.task_id }}/{{ execution_date.strftime('%%Y-%%m-%%dT%%H:%%M:%%S') }}/{{ try_number }}.log" | 469 | log_filename_template: "{{ ti.dag_id }}/{{ ti.task_id }}/{{ execution_date.strftime('%%Y-%%m-%%dT%%H:%%M:%%S') }}/{{ try_number }}.log" |
469 | log_processor_filename_template: "{{ filename }}.log" | 470 | log_processor_filename_template: "{{ filename }}.log" |
471 | dag_processor_manager_log_location: /usr/local/airflow/logs/dag_processor_manager/dag_processor_manager.log | ||
470 | hostname_callable: "socket:getfqdn" | 472 | hostname_callable: "socket:getfqdn" |
471 | default_timezone: "utc" | 473 | default_timezone: "utc" |
472 | executor: "CeleryExecutor" | 474 | executor: "CeleryExecutor" |
473 | # sql_alchemy_conn is extracted from endpoints by the configmap template | 475 | # sql_alchemy_conn is extracted from endpoints by the configmap template |
476 | sql_engine_encoding: "utf-8" | ||
474 | sql_alchemy_pool_enabled: "True" | 477 | sql_alchemy_pool_enabled: "True" |
475 | sql_alchemy_pool_size: 5 | 478 | sql_alchemy_pool_size: 5 |
476 | sql_alchemy_pool_recycle: 1800 | 479 | sql_alchemy_pool_recycle: 1800 |
477 | sql_alchemy_reconnect_timeout: 30 | 480 | sql_alchemy_reconnect_timeout: 30 |
481 | sql_alchemy_schema: "" | ||
478 | parallelism: 32 | 482 | parallelism: 32 |
479 | dag_concurrency: 16 | 483 | dag_concurrency: 8 |
480 | dags_are_paused_at_creation: "False" | 484 | dags_are_paused_at_creation: "False" |
481 | non_pooled_task_slot_count: 128 | 485 | non_pooled_task_slot_count: 128 |
482 | max_active_runs_per_dag: 16 | 486 | max_active_runs_per_dag: 8 |
483 | load_examples: "False" | 487 | load_examples: "False" |
484 | plugins_folder: /usr/local/airflow/plugins | 488 | plugins_folder: /usr/local/airflow/plugins |
485 | fernet_key: fKp7omMJ4QlTxfZzVBSiyXVgeCK-6epRjGgMpEIsjvs= | 489 | fernet_key: fKp7omMJ4QlTxfZzVBSiyXVgeCK-6epRjGgMpEIsjvs= |
486 | donot_pickle: "False" | 490 | donot_pickle: "False" |
487 | dagbag_import_timeout: 30 | 491 | dagbag_import_timeout: 30 |
488 | # NOTE: Versions after 1.10 will change this to StandardTaskRunner | ||
489 | task_runner: "BashTaskRunner" | 492 | task_runner: "BashTaskRunner" |
493 | # task_runner: "StandardTaskRunner" -- coming soon? | ||
490 | default_impersonation: "" | 494 | default_impersonation: "" |
491 | security: "" | 495 | security: "" |
492 | secure_mode: "True" | 496 | secure_mode: "True" |
@@ -495,6 +499,7 @@ conf: | |||
495 | enable_xcom_pickling: "False" | 499 | enable_xcom_pickling: "False" |
496 | killed_task_cleanup_time: 60 | 500 | killed_task_cleanup_time: 60 |
497 | dag_run_conf_overrides_params: "False" | 501 | dag_run_conf_overrides_params: "False" |
502 | worker_precheck: "False" | ||
498 | cli: | 503 | cli: |
499 | api_client: airflow.api.client.local_client | 504 | api_client: airflow.api.client.local_client |
500 | # endpoint_url is extracted from endpoints by the configmap template | 505 | # endpoint_url is extracted from endpoints by the configmap template |
@@ -511,7 +516,7 @@ conf: | |||
511 | username: "" | 516 | username: "" |
512 | password: "" | 517 | password: "" |
513 | operators: | 518 | operators: |
514 | default_owner: "Airflow" | 519 | default_owner: "airflow" |
515 | default_cpus: 1 | 520 | default_cpus: 1 |
516 | default_ram: 512 | 521 | default_ram: 512 |
517 | default_disk: 512 | 522 | default_disk: 512 |
@@ -528,8 +533,8 @@ conf: | |||
528 | web_server_master_timeout: 120 | 533 | web_server_master_timeout: 120 |
529 | web_server_worker_timeout: 120 | 534 | web_server_worker_timeout: 120 |
530 | worker_refresh_batch_size: 1 | 535 | worker_refresh_batch_size: 1 |
531 | worker_refresh_interval: 30 | 536 | worker_refresh_interval: 120 |
532 | secret_key: "temporary_key" | 537 | secret_key: "{SECRET_KEY}" |
533 | workers: 4 | 538 | workers: 4 |
534 | worker_class: "sync" | 539 | worker_class: "sync" |
535 | access_logfile: "-" | 540 | access_logfile: "-" |
@@ -541,12 +546,13 @@ conf: | |||
541 | dag_default_view: "tree" | 546 | dag_default_view: "tree" |
542 | dag_orientation: "LR" | 547 | dag_orientation: "LR" |
543 | demo_mode: "False" | 548 | demo_mode: "False" |
544 | log_fetch_timeout_sec: 10 | 549 | log_fetch_timeout_sec: 20 |
545 | hide_paused_dags_by_default: "False" | 550 | hide_paused_dags_by_default: "False" |
546 | page_size: 100 | 551 | page_size: 100 |
547 | rbac: "False" | 552 | rbac: "False" |
548 | navbar_color: "#007A87" | 553 | navbar_color: "#007A87" |
549 | default_dag_run_display_number: 25 | 554 | default_dag_run_display_number: 25 |
555 | enable_proxy_fix: "False" | ||
550 | email: | 556 | email: |
551 | # Shipyard is not using this | 557 | # Shipyard is not using this |
552 | email_backend: airflow.utils.send_email_smtp | 558 | email_backend: airflow.utils.send_email_smtp |
@@ -561,14 +567,23 @@ conf: | |||
561 | smtp_mail_from: airflow@airflow.local | 567 | smtp_mail_from: airflow@airflow.local |
562 | celery: | 568 | celery: |
563 | celery_app_name: airflow.executors.celery_executor | 569 | celery_app_name: airflow.executors.celery_executor |
564 | worker_concurrency: 16 | 570 | # The concurrency that will be used when starting workers with the |
571 | # "airflow worker" command. This defines the number of task instances | ||
572 | # that a worker will take, so size up your workers based on the resources | ||
573 | # on your worker box and the nature of your tasks | ||
574 | worker_concurrency: 4 | ||
565 | worker_log_server_port: 8793 | 575 | worker_log_server_port: 8793 |
566 | # broker_url is extracted from endpoints by the configmap template | 576 | # broker_url is extracted from endpoints by the configmap template |
567 | # result_backend is extracted from endpoints by the configmap template | 577 | # result_backend is extracted from endpoints by the configmap template |
568 | flower_host: 0.0.0.0 | 578 | flower_host: 0.0.0.0 |
569 | flower_url_prefix: | 579 | flower_url_prefix: "" |
570 | flower_port: 5555 | 580 | flower_port: 5555 |
581 | flower_basic_auth: "" | ||
571 | default_queue: "default" | 582 | default_queue: "default" |
583 | # How many processes CeleryExecutor uses to sync task state. | ||
584 | # 0 means to use max(1, number of cores - 1) processes. | ||
585 | # set to 1 for low-volume use | ||
586 | sync_parallelism: 1 | ||
572 | celery_config_options: airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG | 587 | celery_config_options: airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG |
573 | # TODO: Enable this for security | 588 | # TODO: Enable this for security |
574 | ssl_active: "False" | 589 | ssl_active: "False" |
@@ -584,14 +599,17 @@ conf: | |||
584 | tls_cert: "" | 599 | tls_cert: "" |
585 | tls_key: "" | 600 | tls_key: "" |
586 | scheduler: | 601 | scheduler: |
587 | job_heartbeat_sec: 5 | 602 | # Task instances listen for external kill signal (when you clear tasks |
588 | scheduler_heartbeat_sec: 5 | 603 | # from the CLI or the UI), this defines the frequency at which they |
604 | # should listen (in seconds). | ||
605 | job_heartbeat_sec: 10 | ||
606 | # The scheduler constantly tries to trigger new tasks (look at the | ||
607 | # scheduler section in the docs for more information). This defines | ||
608 | # how often the scheduler should run (in seconds). | ||
609 | scheduler_heartbeat_sec: 10 | ||
589 | run_duration: -1 | 610 | run_duration: -1 |
590 | # Check for pending tasks no more than every 5 seconds | 611 | # Check for pending dag runs no more than every 10 seconds |
591 | min_file_process_interval: 5 | 612 | min_file_process_interval: 10 |
592 | # This is part of 1.10, but disabled in 1.10.1 (pending) See: | ||
593 | # https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#min_file_parsing_loop_time-config-option-temporarily-disabled | ||
594 | min_file_parsing_loop_time: 1 | ||
595 | dag_dir_list_interval: 300 | 613 | dag_dir_list_interval: 300 |
596 | # Stats for the scheduler are minimally useful - every 5 mins is enough | 614 | # Stats for the scheduler are minimally useful - every 5 mins is enough |
597 | print_stats_interval: 300 | 615 | print_stats_interval: 300 |
@@ -606,6 +624,9 @@ conf: | |||
606 | # Shipyard's use of Airflow is low volume. 1 Thread is probably enough. | 624 | # Shipyard's use of Airflow is low volume. 1 Thread is probably enough. |
607 | max_threads: 1 | 625 | max_threads: 1 |
608 | authenticate: "False" | 626 | authenticate: "False" |
627 | # Turn off scheduler use of cron intervals by setting this to False. | ||
628 | # DAGs submitted manually in the web UI or with trigger_dag will still run. | ||
629 | use_job_schedule: "False" | ||
609 | ldap: | 630 | ldap: |
610 | # Shipyard is not using this | 631 | # Shipyard is not using this |
611 | uri: "" | 632 | uri: "" |
diff --git a/images/airflow/Dockerfile b/images/airflow/Dockerfile
index f5ac4da..8652f15 100644
--- a/images/airflow/Dockerfile
+++ b/images/airflow/Dockerfile
@@ -91,9 +91,11 @@ RUN pip3 install -r /tmp/requirements.txt \ | |||
91 | && pip3 uninstall -y snakebite || true | 91 | && pip3 uninstall -y snakebite || true |
92 | 92 | ||
93 | # Copy scripts used in the container: | 93 | # Copy scripts used in the container: |
94 | # entrypoint.sh, airflow_start_service.sh and airflow_logrotate.sh | ||
95 | COPY images/airflow/script/*.sh ${AIRFLOW_HOME}/ | 94 | COPY images/airflow/script/*.sh ${AIRFLOW_HOME}/ |
96 | 95 | ||
96 | # Copy configuration (e.g. logging config for Airflow): | ||
97 | COPY images/airflow/config/*.py ${AIRFLOW_HOME}/config/ | ||
98 | |||
97 | # Change permissions | 99 | # Change permissions |
98 | RUN chown -R airflow: ${AIRFLOW_HOME} | 100 | RUN chown -R airflow: ${AIRFLOW_HOME} |
99 | 101 | ||
diff --git a/images/airflow/config/__init__.py b/images/airflow/config/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/images/airflow/config/__init__.py
diff --git a/images/airflow/config/log_config.py b/images/airflow/config/log_config.py
new file mode 100644
index 0000000..5afcbff
--- /dev/null
+++ b/images/airflow/config/log_config.py
@@ -0,0 +1,261 @@ | |||
1 | # -*- coding: utf-8 -*- | ||
2 | # | ||
3 | # Licensed to the Apache Software Foundation (ASF) under one | ||
4 | # or more contributor license agreements. See the NOTICE file | ||
5 | # distributed with this work for additional information | ||
6 | # regarding copyright ownership. The ASF licenses this file | ||
7 | # to you under the Apache License, Version 2.0 (the | ||
8 | # "License"); you may not use this file except in compliance | ||
9 | # with the License. You may obtain a copy of the License at | ||
10 | # | ||
11 | # http://www.apache.org/licenses/LICENSE-2.0 | ||
12 | # | ||
13 | # Unless required by applicable law or agreed to in writing, | ||
14 | # software distributed under the License is distributed on an | ||
15 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
16 | # KIND, either express or implied. See the License for the | ||
17 | # specific language governing permissions and limitations | ||
18 | # under the License. | ||
19 | # | ||
20 | # !Important Shipyard/Airflow usage note: | ||
21 | # | ||
22 | # This file is copied from: | ||
23 | # https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py | ||
24 | # as this is the recommended way to configure logging as of version 1.10.x | ||
25 | # | ||
26 | # See documentation here: | ||
27 | # https://airflow.readthedocs.io/en/stable/howto/write-logs.html#writing-logs-to-azure-blob-storage | ||
28 | # | ||
29 | # (We are not using azure blob storage at this time, but the included | ||
30 | # instructional steps are pertinent) | ||
31 | # | ||
32 | # Because this file is in the "plugins" directory, it should be referenced | ||
33 | # in the Helm chart's values.yaml as config.log_config.LOGGING_CONFIG | ||
34 | # as opposed to log_config.LOGGING_CONFIG in a new directory in the PYTHONPATH | ||
35 | # as noted in the linked instructions. | ||
36 | # | ||
37 | |||
38 | import os | ||
39 | |||
40 | from airflow import configuration as conf | ||
41 | from airflow.utils.file import mkdirs | ||
42 | |||
43 | # TODO: Logging format and level should be configured | ||
44 | # in this file instead of from airflow.cfg. Currently | ||
45 | # there are other log format and level configurations in | ||
46 | # settings.py and cli.py. Please see AIRFLOW-1455. | ||
47 | LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper() | ||
48 | |||
49 | # Flask appbuilder's info level log is very verbose, | ||
50 | # so it's set to 'WARN' by default. | ||
51 | FAB_LOG_LEVEL = conf.get('core', 'FAB_LOGGING_LEVEL').upper() | ||
52 | |||
53 | LOG_FORMAT = conf.get('core', 'LOG_FORMAT') | ||
54 | |||
55 | BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER') | ||
56 | |||
57 | PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY') | ||
58 | |||
59 | DAG_PROCESSOR_MANAGER_LOG_LOCATION = \ | ||
60 | conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION') | ||
61 | |||
62 | FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE') | ||
63 | |||
64 | PROCESSOR_FILENAME_TEMPLATE = conf.get('core', | ||
65 | 'LOG_PROCESSOR_FILENAME_TEMPLATE') | ||
66 | |||
67 | # Storage bucket url for remote logging | ||
68 | # s3 buckets should start with "s3://" | ||
69 | # gcs buckets should start with "gs://" | ||
70 | # wasb buckets should start with "wasb" | ||
71 | # just to help Airflow select correct handler | ||
72 | REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER') | ||
73 | |||
74 | ELASTICSEARCH_HOST = conf.get('elasticsearch', 'ELASTICSEARCH_HOST') | ||
75 | |||
76 | LOG_ID_TEMPLATE = conf.get('elasticsearch', 'ELASTICSEARCH_LOG_ID_TEMPLATE') | ||
77 | |||
78 | END_OF_LOG_MARK = conf.get('elasticsearch', 'ELASTICSEARCH_END_OF_LOG_MARK') | ||
79 | |||
80 | # NOTE: Modified for use by Shipyard/Airflow (rename to LOGGING_CONFIG): | ||
81 | LOGGING_CONFIG = { | ||
82 | 'version': 1, | ||
83 | 'disable_existing_loggers': False, | ||
84 | 'formatters': { | ||
85 | 'airflow': { | ||
86 | 'format': LOG_FORMAT, | ||
87 | }, | ||
88 | }, | ||
89 | 'handlers': { | ||
90 | # NOTE: Add a "raw" python console logger. Using 'console' results | ||
91 | # in a state of recursion. | ||
92 | 'py-console': { | ||
93 | 'class': 'logging.StreamHandler', | ||
94 | 'formatter': 'airflow', | ||
95 | 'stream': 'ext://sys.stdout' | ||
96 | }, | ||
97 | 'console': { | ||
98 | 'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler', | ||
99 | 'formatter': 'airflow', | ||
100 | 'stream': 'sys.stdout' | ||
101 | }, | ||
102 | 'task': { | ||
103 | 'class': 'airflow.utils.log.file_task_handler.FileTaskHandler', | ||
104 | 'formatter': 'airflow', | ||
105 | 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), | ||
106 | 'filename_template': FILENAME_TEMPLATE, | ||
107 | }, | ||
108 | 'processor': { | ||
109 | 'class': | ||
110 | 'airflow.utils.log.file_processor_handler.FileProcessorHandler', | ||
111 | 'formatter': | ||
112 | 'airflow', | ||
113 | 'base_log_folder': | ||
114 | os.path.expanduser(PROCESSOR_LOG_FOLDER), | ||
115 | 'filename_template': | ||
116 | PROCESSOR_FILENAME_TEMPLATE, | ||
117 | } | ||
118 | }, | ||
119 | 'loggers': { | ||
120 | 'airflow.processor': { | ||
121 | 'handlers': ['processor'], | ||
122 | 'level': LOG_LEVEL, | ||
123 | 'propagate': False, | ||
124 | }, | ||
125 | 'airflow.task': { | ||
126 | # NOTE: Modified for use by Shipyard/Airflow (add console logging) | ||
127 | # The supplied console logger cannot be used here, as it | ||
128 | # Leads to out-of-control memory usage | ||
129 | 'handlers': ['task', 'py-console'], | ||
130 | 'level': LOG_LEVEL, | ||
131 | 'propagate': False, | ||
132 | }, | ||
133 | 'flask_appbuilder': { | ||
134 | # NOTE: Modified this to be "handlers" | ||
135 | 'handlers': ['console'], | ||
136 | 'level': FAB_LOG_LEVEL, | ||
137 | 'propagate': True, | ||
138 | } | ||
139 | }, | ||
140 | 'root': { | ||
141 | 'handlers': ['console'], | ||
142 | 'level': LOG_LEVEL, | ||
143 | } | ||
144 | } | ||
145 | |||
146 | DEFAULT_DAG_PARSING_LOGGING_CONFIG = { | ||
147 | 'handlers': { | ||
148 | 'processor_manager': { | ||
149 | 'class': 'logging.handlers.RotatingFileHandler', | ||
150 | 'formatter': 'airflow', | ||
151 | 'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION, | ||
152 | 'mode': 'a', | ||
153 | 'maxBytes': 104857600, # 100MB | ||
154 | 'backupCount': 5 | ||
155 | } | ||
156 | }, | ||
157 | 'loggers': { | ||
158 | 'airflow.processor_manager': { | ||
159 | 'handlers': ['processor_manager'], | ||
160 | 'level': LOG_LEVEL, | ||
161 | 'propagate': False, | ||
162 | } | ||
163 | } | ||
164 | } | ||
165 | |||
166 | REMOTE_HANDLERS = { | ||
167 | 's3': { | ||
168 | 'task': { | ||
169 | 'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler', | ||
170 | 'formatter': 'airflow', | ||
171 | 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), | ||
172 | 's3_log_folder': REMOTE_BASE_LOG_FOLDER, | ||
173 | 'filename_template': FILENAME_TEMPLATE, | ||
174 | }, | ||
175 | 'processor': { | ||
176 | 'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler', | ||
177 | 'formatter': 'airflow', | ||
178 | 'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER), | ||
179 | 's3_log_folder': REMOTE_BASE_LOG_FOLDER, | ||
180 | 'filename_template': PROCESSOR_FILENAME_TEMPLATE, | ||
181 | }, | ||
182 | }, | ||
183 | 'gcs': { | ||
184 | 'task': { | ||
185 | 'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler', | ||
186 | 'formatter': 'airflow', | ||
187 | 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), | ||
188 | 'gcs_log_folder': REMOTE_BASE_LOG_FOLDER, | ||
189 | 'filename_template': FILENAME_TEMPLATE, | ||
190 | }, | ||
191 | 'processor': { | ||
192 | 'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler', | ||
193 | 'formatter': 'airflow', | ||
194 | 'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER), | ||
195 | 'gcs_log_folder': REMOTE_BASE_LOG_FOLDER, | ||
196 | 'filename_template': PROCESSOR_FILENAME_TEMPLATE, | ||
197 | }, | ||
198 | }, | ||
199 | 'wasb': { | ||
200 | 'task': { | ||
201 | 'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler', | ||
202 | 'formatter': 'airflow', | ||
203 | 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), | ||
204 | 'wasb_log_folder': REMOTE_BASE_LOG_FOLDER, | ||
205 | 'wasb_container': 'airflow-logs', | ||
206 | 'filename_template': FILENAME_TEMPLATE, | ||
207 | 'delete_local_copy': False, | ||
208 | }, | ||
209 | 'processor': { | ||
210 | 'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler', | ||
211 | 'formatter': 'airflow', | ||
212 | 'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER), | ||
213 | 'wasb_log_folder': REMOTE_BASE_LOG_FOLDER, | ||
214 | 'wasb_container': 'airflow-logs', | ||
215 | 'filename_template': PROCESSOR_FILENAME_TEMPLATE, | ||
216 | 'delete_local_copy': False, | ||
217 | }, | ||
218 | }, | ||
219 | 'elasticsearch': { | ||
220 | 'task': { | ||
221 | 'class': | ||
222 | 'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler', | ||
223 | 'formatter': 'airflow', | ||
224 | 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), | ||
225 | 'log_id_template': LOG_ID_TEMPLATE, | ||
226 | 'filename_template': FILENAME_TEMPLATE, | ||
227 | 'end_of_log_mark': END_OF_LOG_MARK, | ||
228 | 'host': ELASTICSEARCH_HOST, | ||
229 | }, | ||
230 | }, | ||
231 | } | ||
232 | |||
233 | # NOTE: Modified for use by Shipyard/Airflow to "getboolean" as existing | ||
234 | # code of conf.get would evaluate "False" as true. | ||
235 | REMOTE_LOGGING = conf.getboolean('core', 'remote_logging') | ||
236 | |||
237 | # Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is | ||
238 | # set. | ||
239 | # This is to avoid exceptions when initializing RotatingFileHandler multiple | ||
240 | # times in multiple processes. | ||
241 | if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True': | ||
242 | LOGGING_CONFIG['handlers'] \ | ||
243 | .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers']) | ||
244 | LOGGING_CONFIG['loggers'] \ | ||
245 | .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['loggers']) | ||
246 | |||
247 | # Manually create log directory for processor_manager handler as | ||
248 | # RotatingFileHandler will only create file but not the directory. | ||
249 | processor_manager_handler_config = DEFAULT_DAG_PARSING_LOGGING_CONFIG[ | ||
250 | 'handlers']['processor_manager'] | ||
251 | directory = os.path.dirname(processor_manager_handler_config['filename']) | ||
252 | mkdirs(directory, 0o755) | ||
253 | |||
254 | if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'): | ||
255 | LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3']) | ||
256 | elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'): | ||
257 | LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs']) | ||
258 | elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'): | ||
259 | LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb']) | ||
260 | elif REMOTE_LOGGING and ELASTICSEARCH_HOST: | ||
261 | LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch']) | ||
diff --git a/images/airflow/requirements.txt b/images/airflow/requirements.txt
index 5729903..3ec4246 100644
--- a/images/airflow/requirements.txt
+++ b/images/airflow/requirements.txt
@@ -18,7 +18,7 @@ ndg-httpsclient==0.5.1 | |||
18 | pyasn1==0.4.4 | 18 | pyasn1==0.4.4 |
19 | psycopg2==2.7.5 | 19 | psycopg2==2.7.5 |
20 | docker==3.5.0 | 20 | docker==3.5.0 |
21 | apache-airflow[crypto,celery,postgres,hive,hdfs,jdbc]==1.10.0 | 21 | apache-airflow[crypto,celery,postgres,hive,hdfs,jdbc]==1.10.1 |
22 | python-openstackclient==3.16.1 | 22 | python-openstackclient==3.16.1 |
23 | kubernetes>=6.0.0 | 23 | kubernetes>=6.0.0 |
24 | 24 | ||
diff --git a/src/bin/shipyard_airflow/test-requirements.txt b/src/bin/shipyard_airflow/test-requirements.txt
index 0e8884d..2947946 100644
--- a/src/bin/shipyard_airflow/test-requirements.txt
+++ b/src/bin/shipyard_airflow/test-requirements.txt
@@ -3,7 +3,7 @@ pytest==3.4 | |||
3 | pytest-cov==2.5.1 | 3 | pytest-cov==2.5.1 |
4 | responses==0.8.1 | 4 | responses==0.8.1 |
5 | testfixtures==5.1.1 | 5 | testfixtures==5.1.1 |
6 | apache-airflow[crypto,celery,postgres,hive,hdfs,jdbc]==1.10.0 | 6 | apache-airflow[crypto,celery,postgres,hive,hdfs,jdbc]==1.10.1 |
7 | 7 | ||
8 | # TODO(bryan-strassner) Pin to version for airflow when added to the | 8 | # TODO(bryan-strassner) Pin to version for airflow when added to the |
9 | # requirements.txt in the airflow images directory | 9 | # requirements.txt in the airflow images directory |