diff --git a/charts/shipyard/values.yaml b/charts/shipyard/values.yaml
index 60233b13..263f5839 100644
--- a/charts/shipyard/values.yaml
+++ b/charts/shipyard/values.yaml
@@ -448,8 +448,9 @@ conf:
       encrypt_s3_logs: "False"
       logging_level: "INFO"
       fab_logging_level: "WARN"
-      # TODO(bryan-strassner) Use this for custom log formatting!
-      logging_config_class: ""
+      # See image-bundled log_config.py.
+      # Adds console logging of task/step logs.
+      logging_config_class: log_config.LOGGING_CONFIG
       # NOTE: Airflow 1.10 introduces extra newline characters between log
       # records. Version 1.10.1 should resolve this issue
       # https://issues.apache.org/jira/browse/AIRFLOW-1917
@@ -467,26 +468,29 @@ conf:
       simple_log_format: "%%(asctime)s %%(levelname)s - %%(message)s"
       log_filename_template: "{{ ti.dag_id }}/{{ ti.task_id }}/{{ execution_date.strftime('%%Y-%%m-%%dT%%H:%%M:%%S') }}/{{ try_number }}.log"
       log_processor_filename_template: "{{ filename }}.log"
+      dag_processor_manager_log_location: /usr/local/airflow/logs/dag_processor_manager/dag_processor_manager.log
       hostname_callable: "socket:getfqdn"
       default_timezone: "utc"
       executor: "CeleryExecutor"
       # sql_alchemy_conn is extracted from endpoints by the configmap template
+      sql_engine_encoding: "utf-8"
       sql_alchemy_pool_enabled: "True"
       sql_alchemy_pool_size: 5
       sql_alchemy_pool_recycle: 1800
       sql_alchemy_reconnect_timeout: 30
+      sql_alchemy_schema: ""
       parallelism: 32
-      dag_concurrency: 16
+      dag_concurrency: 8
       dags_are_paused_at_creation: "False"
       non_pooled_task_slot_count: 128
-      max_active_runs_per_dag: 16
+      max_active_runs_per_dag: 8
       load_examples: "False"
       plugins_folder: /usr/local/airflow/plugins
       fernet_key: fKp7omMJ4QlTxfZzVBSiyXVgeCK-6epRjGgMpEIsjvs=
       donot_pickle: "False"
       dagbag_import_timeout: 30
-      # NOTE: Versions after 1.10 will change this to StandardTaskRunner
       task_runner: "BashTaskRunner"
+      # task_runner: "StandardTaskRunner" -- coming soon?
       default_impersonation: ""
       security: ""
       secure_mode: "True"
@@ -495,6 +499,7 @@ conf:
       enable_xcom_pickling: "False"
       killed_task_cleanup_time: 60
       dag_run_conf_overrides_params: "False"
+      worker_precheck: "False"
     cli:
       api_client: airflow.api.client.local_client
       # endpoint_url is extracted from endpoints by the configmap template
@@ -511,7 +516,7 @@ conf:
       username: ""
       password: ""
     operators:
-      default_owner: "Airflow"
+      default_owner: "airflow"
       default_cpus: 1
       default_ram: 512
       default_disk: 512
@@ -528,8 +533,8 @@ conf:
       web_server_master_timeout: 120
       web_server_worker_timeout: 120
       worker_refresh_batch_size: 1
-      worker_refresh_interval: 30
-      secret_key: "temporary_key"
+      worker_refresh_interval: 120
+      secret_key: "{SECRET_KEY}"
       workers: 4
       worker_class: "sync"
       access_logfile: "-"
@@ -541,12 +546,13 @@ conf:
       dag_default_view: "tree"
       dag_orientation: "LR"
       demo_mode: "False"
-      log_fetch_timeout_sec: 10
+      log_fetch_timeout_sec: 20
       hide_paused_dags_by_default: "False"
       page_size: 100
       rbac: "False"
       navbar_color: "#007A87"
       default_dag_run_display_number: 25
+      enable_proxy_fix: "False"
     email:
       # Shipyard is not using this
       email_backend: airflow.utils.send_email_smtp
@@ -561,14 +567,23 @@ conf:
       smtp_mail_from: airflow@airflow.local
     celery:
       celery_app_name: airflow.executors.celery_executor
-      worker_concurrency: 16
+      # The concurrency that will be used when starting workers with the
+      # "airflow worker" command. This defines the number of task instances
+      # that a worker will take, so size up your workers based on the resources
+      # on your worker box and the nature of your tasks
+      worker_concurrency: 4
       worker_log_server_port: 8793
       # broker_url is extracted from endpoints by the configmap template
       # result_backend is extracted from endpoints by the configmap template
       flower_host: 0.0.0.0
-      flower_url_prefix:
+      flower_url_prefix: ""
       flower_port: 5555
+      flower_basic_auth: ""
       default_queue: "default"
+      # How many processes CeleryExecutor uses to sync task state.
+      # 0 means to use max(1, number of cores - 1) processes.
+      # set to 1 for low-volume use
+      sync_parallelism: 1
       celery_config_options: airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
       # TODO: Enable this for security
       ssl_active: "False"
@@ -584,14 +599,17 @@ conf:
       tls_cert: ""
       tls_key: ""
     scheduler:
-      job_heartbeat_sec: 5
-      scheduler_heartbeat_sec: 5
+      # Task instances listen for external kill signal (when you clear tasks
+      # from the CLI or the UI), this defines the frequency at which they
+      # should listen (in seconds).
+      job_heartbeat_sec: 10
+      # The scheduler constantly tries to trigger new tasks (look at the
+      # scheduler section in the docs for more information). This defines
+      # how often the scheduler should run (in seconds).
+      scheduler_heartbeat_sec: 10
       run_duration: -1
-      # Check for pending tasks no more than every 5 seconds
-      min_file_process_interval: 5
-      # This is part of 1.10, but disabled in 1.10.1 (pending) See:
-      # https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#min_file_parsing_loop_time-config-option-temporarily-disabled
-      min_file_parsing_loop_time: 1
+      # Check for pending dag runs no more than every 10 seconds
+      min_file_process_interval: 10
       dag_dir_list_interval: 300
       # Stats for the scheduler are minimally useful - every 5 mins is enough
       print_stats_interval: 300
@@ -606,6 +624,9 @@ conf:
       # Shipyard's use of Airflow is low volume. 1 Thread is probably enough.
       max_threads: 1
       authenticate: "False"
+      # Turn off scheduler use of cron intervals by setting this to False.
+      # DAGs submitted manually in the web UI or with trigger_dag will still run.
+      use_job_schedule: "False"
     ldap:
       # Shipyard is not using this
       uri: ""
diff --git a/images/airflow/Dockerfile b/images/airflow/Dockerfile
index f5ac4da9..8652f158 100644
--- a/images/airflow/Dockerfile
+++ b/images/airflow/Dockerfile
@@ -91,9 +91,11 @@ RUN pip3 install -r /tmp/requirements.txt \
     && pip3 uninstall -y snakebite || true

 # Copy scripts used in the container:
-# entrypoint.sh, airflow_start_service.sh and airflow_logrotate.sh
 COPY images/airflow/script/*.sh ${AIRFLOW_HOME}/

+# Copy configuration (e.g. logging config for Airflow):
+COPY images/airflow/config/*.py ${AIRFLOW_HOME}/config/
+
 # Change permissions
 RUN chown -R airflow: ${AIRFLOW_HOME}

diff --git a/images/airflow/config/__init__.py b/images/airflow/config/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/images/airflow/config/log_config.py b/images/airflow/config/log_config.py
new file mode 100644
index 00000000..5afcbff1
--- /dev/null
+++ b/images/airflow/config/log_config.py
@@ -0,0 +1,261 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# !Important Shipyard/Airflow usage note:
+#
+# This file is copied from:
+# https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py
+# as this is the recommended way to configure logging as of version 1.10.x
+#
+# See documentation here:
+# https://airflow.readthedocs.io/en/stable/howto/write-logs.html#writing-logs-to-azure-blob-storage
+#
+# (We are not using azure blob storage at this time, but the included
+# instructional steps are pertinent)
+#
+# Because the image copies this file into ${AIRFLOW_HOME}/config/ (a
+# directory on Airflow's PYTHONPATH), it is referenced in the Helm chart's
+# values.yaml as log_config.LOGGING_CONFIG, matching the "new directory in
+# the PYTHONPATH" approach described in the linked instructions.
+#
+
+import os
+
+from airflow import configuration as conf
+from airflow.utils.file import mkdirs
+
+# TODO: Logging format and level should be configured
+# in this file instead of from airflow.cfg. Currently
+# there are other log format and level configurations in
+# settings.py and cli.py. Please see AIRFLOW-1455.
+LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
+
+# Flask appbuilder's info level log is very verbose,
+# so it's set to 'WARN' by default.
+FAB_LOG_LEVEL = conf.get('core', 'FAB_LOGGING_LEVEL').upper()
+
+LOG_FORMAT = conf.get('core', 'LOG_FORMAT')
+
+BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
+
+PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')
+
+DAG_PROCESSOR_MANAGER_LOG_LOCATION = \
+    conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')
+
+FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE')
+
+PROCESSOR_FILENAME_TEMPLATE = conf.get('core',
+                                       'LOG_PROCESSOR_FILENAME_TEMPLATE')
+
+# Storage bucket url for remote logging
+# s3 buckets should start with "s3://"
+# gcs buckets should start with "gs://"
+# wasb buckets should start with "wasb"
+# just to help Airflow select correct handler
+REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
+
+ELASTICSEARCH_HOST = conf.get('elasticsearch', 'ELASTICSEARCH_HOST')
+
+LOG_ID_TEMPLATE = conf.get('elasticsearch', 'ELASTICSEARCH_LOG_ID_TEMPLATE')
+
+END_OF_LOG_MARK = conf.get('elasticsearch', 'ELASTICSEARCH_END_OF_LOG_MARK')
+
+# NOTE: Modified for use by Shipyard/Airflow (rename to LOGGING_CONFIG):
+LOGGING_CONFIG = {
+    'version': 1,
+    'disable_existing_loggers': False,
+    'formatters': {
+        'airflow': {
+            'format': LOG_FORMAT,
+        },
+    },
+    'handlers': {
+        # NOTE: Add a "raw" python console logger. Using 'console' results
+        # in a state of recursion.
+        'py-console': {
+            'class': 'logging.StreamHandler',
+            'formatter': 'airflow',
+            'stream': 'ext://sys.stdout'
+        },
+        'console': {
+            'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
+            'formatter': 'airflow',
+            'stream': 'sys.stdout'
+        },
+        'task': {
+            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'filename_template': FILENAME_TEMPLATE,
+        },
+        'processor': {
+            'class':
+            'airflow.utils.log.file_processor_handler.FileProcessorHandler',
+            'formatter':
+            'airflow',
+            'base_log_folder':
+            os.path.expanduser(PROCESSOR_LOG_FOLDER),
+            'filename_template':
+            PROCESSOR_FILENAME_TEMPLATE,
+        }
+    },
+    'loggers': {
+        'airflow.processor': {
+            'handlers': ['processor'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        },
+        'airflow.task': {
+            # NOTE: Modified for use by Shipyard/Airflow (add console logging)
+            # The supplied console logger cannot be used here, as it
+            # leads to out-of-control memory usage
+            'handlers': ['task', 'py-console'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        },
+        'flask_appbuilder': {
+            # NOTE: Modified this to be "handlers"
+            'handlers': ['console'],
+            'level': FAB_LOG_LEVEL,
+            'propagate': True,
+        }
+    },
+    'root': {
+        'handlers': ['console'],
+        'level': LOG_LEVEL,
+    }
+}
+
+DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
+    'handlers': {
+        'processor_manager': {
+            'class': 'logging.handlers.RotatingFileHandler',
+            'formatter': 'airflow',
+            'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
+            'mode': 'a',
+            'maxBytes': 104857600,  # 100MB
+            'backupCount': 5
+        }
+    },
+    'loggers': {
+        'airflow.processor_manager': {
+            'handlers': ['processor_manager'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        }
+    }
+}
+
+REMOTE_HANDLERS = {
+    's3': {
+        'task': {
+            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            's3_log_folder': REMOTE_BASE_LOG_FOLDER,
+            'filename_template': FILENAME_TEMPLATE,
+        },
+        'processor': {
+            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
+            's3_log_folder': REMOTE_BASE_LOG_FOLDER,
+            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
+        },
+    },
+    'gcs': {
+        'task': {
+            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
+            'filename_template': FILENAME_TEMPLATE,
+        },
+        'processor': {
+            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
+            'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
+            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
+        },
+    },
+    'wasb': {
+        'task': {
+            'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
+            'wasb_container': 'airflow-logs',
+            'filename_template': FILENAME_TEMPLATE,
+            'delete_local_copy': False,
+        },
+        'processor': {
+            'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
+            'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
+            'wasb_container': 'airflow-logs',
+            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
+            'delete_local_copy': False,
+        },
+    },
+    'elasticsearch': {
+        'task': {
+            'class':
+            'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'log_id_template': LOG_ID_TEMPLATE,
+            'filename_template': FILENAME_TEMPLATE,
+            'end_of_log_mark': END_OF_LOG_MARK,
+            'host': ELASTICSEARCH_HOST,
+        },
+    },
+}
+
+# NOTE: Modified for use by Shipyard/Airflow to use "getboolean", as the
+# existing conf.get call would evaluate the string "False" as truthy.
+REMOTE_LOGGING = conf.getboolean('core', 'remote_logging')
+
+# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is
+# set.
+# This is to avoid exceptions when initializing RotatingFileHandler multiple
+# times in multiple processes.
+if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True':
+    LOGGING_CONFIG['handlers'] \
+        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'])
+    LOGGING_CONFIG['loggers'] \
+        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['loggers'])
+
+    # Manually create log directory for processor_manager handler as
+    # RotatingFileHandler will only create file but not the directory.
+    processor_manager_handler_config = DEFAULT_DAG_PARSING_LOGGING_CONFIG[
+        'handlers']['processor_manager']
+    directory = os.path.dirname(processor_manager_handler_config['filename'])
+    mkdirs(directory, 0o755)
+
+if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
+    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
+elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
+    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
+elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
+    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
+elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
+    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])
diff --git a/images/airflow/requirements.txt b/images/airflow/requirements.txt
index 57299038..3ec42468 100644
--- a/images/airflow/requirements.txt
+++ b/images/airflow/requirements.txt
@@ -18,7 +18,7 @@ ndg-httpsclient==0.5.1
 pyasn1==0.4.4
 psycopg2==2.7.5
 docker==3.5.0
-apache-airflow[crypto,celery,postgres,hive,hdfs,jdbc]==1.10.0
+apache-airflow[crypto,celery,postgres,hive,hdfs,jdbc]==1.10.1
 python-openstackclient==3.16.1
 kubernetes>=6.0.0

diff --git a/src/bin/shipyard_airflow/test-requirements.txt b/src/bin/shipyard_airflow/test-requirements.txt
index 0e8884d6..29479463 100644
--- a/src/bin/shipyard_airflow/test-requirements.txt
+++ b/src/bin/shipyard_airflow/test-requirements.txt
@@ -3,7 +3,7 @@ pytest==3.4
 pytest-cov==2.5.1
 responses==0.8.1
 testfixtures==5.1.1
-apache-airflow[crypto,celery,postgres,hive,hdfs,jdbc]==1.10.0
+apache-airflow[crypto,celery,postgres,hive,hdfs,jdbc]==1.10.1
 # TODO(bryan-strassner) Pin to version for airflow when added to the
 # requirements.txt in the airflow images directory
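
The LOGGING_CONFIG dict built by log_config.py is ultimately consumed by Python's standard logging.config.dictConfig machinery when Airflow resolves logging_config_class. The standalone sketch below is not part of the patch; it uses an illustrative stand-in config with the same py-console StreamHandler shape to show how such a dict can be sanity-checked locally before being baked into the Airflow image.

# sketch_log_config_check.py -- illustrative only; uses stdlib logging, not Airflow.
import logging
import logging.config

# Minimal stand-in mirroring the py-console wiring from log_config.py above.
SKETCH_LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow': {'format': '%(asctime)s %(levelname)s - %(message)s'},
    },
    'handlers': {
        # Plain StreamHandler to stdout, like the 'py-console' handler.
        'py-console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow',
            'stream': 'ext://sys.stdout',
        },
    },
    'loggers': {
        'airflow.task': {
            'handlers': ['py-console'],
            'level': 'INFO',
            'propagate': False,
        },
    },
}

if __name__ == '__main__':
    # dictConfig raises ValueError on a malformed schema, which makes this a
    # quick local check of the config's structure.
    logging.config.dictConfig(SKETCH_LOGGING_CONFIG)
    logging.getLogger('airflow.task').info('py-console handler is wired up')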