diff --git a/.gitignore b/.gitignore index 3e5b7e98..930384cf 100644 --- a/.gitignore +++ b/.gitignore @@ -120,6 +120,10 @@ AUTHORS # vscode .vscode/ + +# pycharm +.idea + # tests airship-ucp-shipyard.values.yaml airflow-webserver.pid @@ -128,3 +132,4 @@ airflow.db latest src/bin/shipyard_airflow/shipyard_airflow/config src/bin/shipyard_airflow/shipyard_airflow/webserver_config.py +airflow-runtime diff --git a/charts/shipyard/values.yaml b/charts/shipyard/values.yaml index 621b61ca..32907f28 100644 --- a/charts/shipyard/values.yaml +++ b/charts/shipyard/values.yaml @@ -476,8 +476,8 @@ conf: - broker_url - result_backend - fernet_key - lazy_discover_providers: "False" - lazy_load_plugins: "False" + lazy_discover_providers: "True" + lazy_load_plugins: "True" hostname_callable: "socket:getfqdn" default_timezone: "utc" executor: "CeleryExecutor" @@ -526,7 +526,7 @@ conf: # See image-bundled log_config.py. # Adds console logging of task/step logs. # logging_config_class: log_config.LOGGING_CONFIG - logging_config_class: new_log_config.LOGGING_CONFIG + logging_config_class: log_config.DEFAULT_LOGGING_CONFIG # NOTE: Airflow 1.10 introduces extra newline characters between log # records. Version 1.10.1 should resolve this issue # https://issues.apache.org/jira/browse/AIRFLOW-1917 @@ -544,9 +544,9 @@ conf: log_filename_template: "{{ ti.dag_id }}/{{ ti.task_id }}/{{ execution_date.strftime('%%Y-%%m-%%dT%%H:%%M:%%S') }}/{{ try_number }}.log" log_processor_filename_template: "{{ filename }}.log" dag_processor_manager_log_location: /usr/local/airflow/logs/dag_processor_manager/dag_processor_manager.log - logging_level: "INFO" + logging_level: "DEBUG" fab_logging_level: "WARNING" - celery_logging_level: "INFO" + celery_logging_level: "DEBUG" base_log_folder: /usr/local/airflow/logs remote_logging: "False" remote_log_conn_id: "" diff --git a/images/airflow/config/log_config.py b/images/airflow/config/log_config.py index 5afcbff1..33bf4992 100644 --- a/images/airflow/config/log_config.py +++ b/images/airflow/config/log_config.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -16,246 +15,327 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# -# !Important Shipyard/Airflow usage note: -# -# This file is copied from: -# https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py -# as this is the recommended way to configure logging as of version 1.10.x -# -# See documentation here: -# https://airflow.readthedocs.io/en/stable/howto/write-logs.html#writing-logs-to-azure-blob-storage -# -# (We are not using azure blob storage at this time, but the included -# instructional steps are pertinent) -# -# Because this file is in the "plugins" directory, it should be referenced -# in the Helm chart's values.yaml as config.log_config.LOGGING_CONFIG -# as opposed to log_config.LOGGING_CONFIG in a new directory in the PYTHONPATH -# as noted in the linked instructions. 
-# +"""Airflow logging settings.""" +from __future__ import annotations import os +from pathlib import Path +from typing import Any +from urllib.parse import urlsplit -from airflow import configuration as conf -from airflow.utils.file import mkdirs +from airflow.configuration import conf +from airflow.exceptions import AirflowException + +LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper() -# TODO: Logging format and level should be configured -# in this file instead of from airflow.cfg. Currently -# there are other log format and level configurations in -# settings.py and cli.py. Please see AIRFLOW-1455. -LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper() # Flask appbuilder's info level log is very verbose, # so it's set to 'WARN' by default. -FAB_LOG_LEVEL = conf.get('core', 'FAB_LOGGING_LEVEL').upper() +FAB_LOG_LEVEL: str = conf.get_mandatory_value("logging", "FAB_LOGGING_LEVEL").upper() -LOG_FORMAT = conf.get('core', 'LOG_FORMAT') +LOG_FORMAT: str = conf.get_mandatory_value("logging", "LOG_FORMAT") +DAG_PROCESSOR_LOG_FORMAT: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_FORMAT") -BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER') +LOG_FORMATTER_CLASS: str = conf.get_mandatory_value( + "logging", "LOG_FORMATTER_CLASS", fallback="airflow.utils.log.timezone_aware.TimezoneAware" +) -PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY') +COLORED_LOG_FORMAT: str = conf.get_mandatory_value("logging", "COLORED_LOG_FORMAT") -DAG_PROCESSOR_MANAGER_LOG_LOCATION = \ - conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION') +COLORED_LOG: bool = conf.getboolean("logging", "COLORED_CONSOLE_LOG") -FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE') +COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value("logging", "COLORED_FORMATTER_CLASS") -PROCESSOR_FILENAME_TEMPLATE = conf.get('core', - 'LOG_PROCESSOR_FILENAME_TEMPLATE') +DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_TARGET") -# Storage bucket url for remote logging -# s3 buckets should start with "s3://" -# gcs buckets should start with "gs://" -# wasb buckets should start with "wasb" -# just to help Airflow select correct handler -REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER') +BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "BASE_LOG_FOLDER") -ELASTICSEARCH_HOST = conf.get('elasticsearch', 'ELASTICSEARCH_HOST') +PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value("scheduler", "CHILD_PROCESS_LOG_DIRECTORY") -LOG_ID_TEMPLATE = conf.get('elasticsearch', 'ELASTICSEARCH_LOG_ID_TEMPLATE') +DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get_mandatory_value( + "logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION" +) -END_OF_LOG_MARK = conf.get('elasticsearch', 'ELASTICSEARCH_END_OF_LOG_MARK') +# FILENAME_TEMPLATE only uses in Remote Logging Handlers since Airflow 2.3.3 +# All of these handlers inherited from FileTaskHandler and providing any value rather than None +# would raise deprecation warning. 
+FILENAME_TEMPLATE: str | None = None -# NOTE: Modified for use by Shipyard/Airflow (rename to LOGGING_CONFIG): -LOGGING_CONFIG = { - 'version': 1, - 'disable_existing_loggers': False, - 'formatters': { - 'airflow': { - 'format': LOG_FORMAT, +PROCESSOR_FILENAME_TEMPLATE: str = conf.get_mandatory_value("logging", "LOG_PROCESSOR_FILENAME_TEMPLATE") + +DEFAULT_LOGGING_CONFIG: dict[str, Any] = { + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "airflow": { + "format": LOG_FORMAT, + "class": LOG_FORMATTER_CLASS, + }, + "airflow_coloured": { + "format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT, + "class": COLORED_FORMATTER_CLASS if COLORED_LOG else LOG_FORMATTER_CLASS, + }, + "source_processor": { + "format": DAG_PROCESSOR_LOG_FORMAT, + "class": LOG_FORMATTER_CLASS, }, }, - 'handlers': { + "filters": { + "mask_secrets": { + "()": "airflow.utils.log.secrets_masker.SecretsMasker", + }, + }, + "handlers": { # NOTE: Add a "raw" python console logger. Using 'console' results # in a state of recursion. 'py-console': { 'class': 'logging.StreamHandler', 'formatter': 'airflow', - 'stream': 'ext://sys.stdout' + 'stream': 'ext://sys.stdout', + "filters": ["mask_secrets"], }, - 'console': { - 'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler', - 'formatter': 'airflow', - 'stream': 'sys.stdout' + "console": { + "class": "airflow.utils.log.logging_mixin.RedirectStdHandler", + "formatter": "airflow_coloured", + "stream": "sys.stdout", + "filters": ["mask_secrets"], }, - 'task': { - 'class': 'airflow.utils.log.file_task_handler.FileTaskHandler', - 'formatter': 'airflow', - 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), - 'filename_template': FILENAME_TEMPLATE, + "task": { + "class": "airflow.utils.log.file_task_handler.FileTaskHandler", + "formatter": "airflow", + "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER), + "filters": ["mask_secrets"], + }, + "processor": { + "class": "airflow.utils.log.file_processor_handler.FileProcessorHandler", + "formatter": "airflow", + "base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER), + "filename_template": PROCESSOR_FILENAME_TEMPLATE, + "filters": ["mask_secrets"], + }, + "processor_to_stdout": { + "class": "airflow.utils.log.logging_mixin.RedirectStdHandler", + "formatter": "source_processor", + "stream": "sys.stdout", + "filters": ["mask_secrets"], }, - 'processor': { - 'class': - 'airflow.utils.log.file_processor_handler.FileProcessorHandler', - 'formatter': - 'airflow', - 'base_log_folder': - os.path.expanduser(PROCESSOR_LOG_FOLDER), - 'filename_template': - PROCESSOR_FILENAME_TEMPLATE, - } }, - 'loggers': { - 'airflow.processor': { - 'handlers': ['processor'], - 'level': LOG_LEVEL, - 'propagate': False, + "loggers": { + "airflow.processor": { + "handlers": ["processor_to_stdout" if DAG_PROCESSOR_LOG_TARGET == "stdout" else "processor"], + "level": LOG_LEVEL, + # Set to true here (and reset via set_context) so that if no file is configured we still get logs! + "propagate": True, }, - 'airflow.task': { + "airflow.task": { # NOTE: Modified for use by Shipyard/Airflow (add console logging) # The supplied console logger cannot be used here, as it # Leads to out-of-control memory usage 'handlers': ['task', 'py-console'], - 'level': LOG_LEVEL, - 'propagate': False, + "level": LOG_LEVEL, + # Set to true here (and reset via set_context) so that if no file is configured we still get logs! 
+ "propagate": True, + "filters": ["mask_secrets"], + }, + "flask_appbuilder": { + "handlers": ["console"], + "level": FAB_LOG_LEVEL, + "propagate": True, }, - 'flask_appbuilder': { - # NOTE: Modified this to be "handlers" - 'handlers': ['console'], - 'level': FAB_LOG_LEVEL, - 'propagate': True, - } }, - 'root': { - 'handlers': ['console'], - 'level': LOG_LEVEL, + "root": { + "handlers": ["console"], + "level": LOG_LEVEL, + "filters": ["mask_secrets"], + }, +} + +EXTRA_LOGGER_NAMES: str | None = conf.get("logging", "EXTRA_LOGGER_NAMES", fallback=None) +if EXTRA_LOGGER_NAMES: + new_loggers = { + logger_name.strip(): { + "handlers": ["console"], + "level": LOG_LEVEL, + "propagate": True, + } + for logger_name in EXTRA_LOGGER_NAMES.split(",") } -} + DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers) -DEFAULT_DAG_PARSING_LOGGING_CONFIG = { - 'handlers': { - 'processor_manager': { - 'class': 'logging.handlers.RotatingFileHandler', - 'formatter': 'airflow', - 'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION, - 'mode': 'a', - 'maxBytes': 104857600, # 100MB - 'backupCount': 5 +DEFAULT_DAG_PARSING_LOGGING_CONFIG: dict[str, dict[str, dict[str, Any]]] = { + "handlers": { + "processor_manager": { + "class": "airflow.utils.log.non_caching_file_handler.NonCachingRotatingFileHandler", + "formatter": "airflow", + "filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION, + "mode": "a", + "maxBytes": 104857600, # 100MB + "backupCount": 5, } }, - 'loggers': { - 'airflow.processor_manager': { - 'handlers': ['processor_manager'], - 'level': LOG_LEVEL, - 'propagate': False, + "loggers": { + "airflow.processor_manager": { + "handlers": ["processor_manager"], + "level": LOG_LEVEL, + "propagate": False, } - } -} - -REMOTE_HANDLERS = { - 's3': { - 'task': { - 'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler', - 'formatter': 'airflow', - 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), - 's3_log_folder': REMOTE_BASE_LOG_FOLDER, - 'filename_template': FILENAME_TEMPLATE, - }, - 'processor': { - 'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler', - 'formatter': 'airflow', - 'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER), - 's3_log_folder': REMOTE_BASE_LOG_FOLDER, - 'filename_template': PROCESSOR_FILENAME_TEMPLATE, - }, - }, - 'gcs': { - 'task': { - 'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler', - 'formatter': 'airflow', - 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), - 'gcs_log_folder': REMOTE_BASE_LOG_FOLDER, - 'filename_template': FILENAME_TEMPLATE, - }, - 'processor': { - 'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler', - 'formatter': 'airflow', - 'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER), - 'gcs_log_folder': REMOTE_BASE_LOG_FOLDER, - 'filename_template': PROCESSOR_FILENAME_TEMPLATE, - }, - }, - 'wasb': { - 'task': { - 'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler', - 'formatter': 'airflow', - 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), - 'wasb_log_folder': REMOTE_BASE_LOG_FOLDER, - 'wasb_container': 'airflow-logs', - 'filename_template': FILENAME_TEMPLATE, - 'delete_local_copy': False, - }, - 'processor': { - 'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler', - 'formatter': 'airflow', - 'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER), - 'wasb_log_folder': REMOTE_BASE_LOG_FOLDER, - 'wasb_container': 'airflow-logs', - 'filename_template': PROCESSOR_FILENAME_TEMPLATE, - 'delete_local_copy': False, - }, - }, - 'elasticsearch': { - 'task': { - 'class': - 
'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler', - 'formatter': 'airflow', - 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), - 'log_id_template': LOG_ID_TEMPLATE, - 'filename_template': FILENAME_TEMPLATE, - 'end_of_log_mark': END_OF_LOG_MARK, - 'host': ELASTICSEARCH_HOST, - }, }, } -# NOTE: Modified for use by Shipyard/Airflow to "getboolean" as existing -# code of conf.get would evaluate "False" as true. -REMOTE_LOGGING = conf.getboolean('core', 'remote_logging') +# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set. +# This is to avoid exceptions when initializing RotatingFileHandler multiple times +# in multiple processes. +if os.environ.get("CONFIG_PROCESSOR_MANAGER_LOGGER") == "True": + DEFAULT_LOGGING_CONFIG["handlers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"]) + DEFAULT_LOGGING_CONFIG["loggers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"]) -# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is -# set. -# This is to avoid exceptions when initializing RotatingFileHandler multiple -# times in multiple processes. -if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True': - LOGGING_CONFIG['handlers'] \ - .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers']) - LOGGING_CONFIG['loggers'] \ - .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['loggers']) + # Manually create log directory for processor_manager handler as RotatingFileHandler + # will only create file but not the directory. + processor_manager_handler_config: dict[str, Any] = DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"][ + "processor_manager" + ] + directory: str = os.path.dirname(processor_manager_handler_config["filename"]) + Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755) - # Manually create log directory for processor_manager handler as - # RotatingFileHandler will only create file but not the directory. 
- processor_manager_handler_config = DEFAULT_DAG_PARSING_LOGGING_CONFIG[ - 'handlers']['processor_manager'] - directory = os.path.dirname(processor_manager_handler_config['filename']) - mkdirs(directory, 0o755) +################## +# Remote logging # +################## -if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'): - LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3']) -elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'): - LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs']) -elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'): - LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb']) -elif REMOTE_LOGGING and ELASTICSEARCH_HOST: - LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch']) +REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging") + +if REMOTE_LOGGING: + + ELASTICSEARCH_HOST: str | None = conf.get("elasticsearch", "HOST") + + # Storage bucket URL for remote logging + # S3 buckets should start with "s3://" + # Cloudwatch log groups should start with "cloudwatch://" + # GCS buckets should start with "gs://" + # WASB buckets should start with "wasb" + # HDFS path should start with "hdfs://" + # just to help Airflow select correct handler + REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "REMOTE_BASE_LOG_FOLDER") + REMOTE_TASK_HANDLER_KWARGS = conf.getjson("logging", "REMOTE_TASK_HANDLER_KWARGS", fallback={}) + + if REMOTE_BASE_LOG_FOLDER.startswith("s3://"): + S3_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = { + "task": { + "class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler", + "formatter": "airflow", + "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)), + "s3_log_folder": REMOTE_BASE_LOG_FOLDER, + "filename_template": FILENAME_TEMPLATE, + }, + } + + DEFAULT_LOGGING_CONFIG["handlers"].update(S3_REMOTE_HANDLERS) + elif REMOTE_BASE_LOG_FOLDER.startswith("cloudwatch://"): + url_parts = urlsplit(REMOTE_BASE_LOG_FOLDER) + CLOUDWATCH_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = { + "task": { + "class": "airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler", + "formatter": "airflow", + "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)), + "log_group_arn": url_parts.netloc + url_parts.path, + "filename_template": FILENAME_TEMPLATE, + }, + } + + DEFAULT_LOGGING_CONFIG["handlers"].update(CLOUDWATCH_REMOTE_HANDLERS) + elif REMOTE_BASE_LOG_FOLDER.startswith("gs://"): + key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None) + GCS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = { + "task": { + "class": "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler", + "formatter": "airflow", + "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)), + "gcs_log_folder": REMOTE_BASE_LOG_FOLDER, + "filename_template": FILENAME_TEMPLATE, + "gcp_key_path": key_path, + }, + } + + DEFAULT_LOGGING_CONFIG["handlers"].update(GCS_REMOTE_HANDLERS) + elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"): + WASB_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = { + "task": { + "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler", + "formatter": "airflow", + "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)), + "wasb_log_folder": REMOTE_BASE_LOG_FOLDER, + "wasb_container": "airflow-logs", + "filename_template": FILENAME_TEMPLATE, + }, + } + + DEFAULT_LOGGING_CONFIG["handlers"].update(WASB_REMOTE_HANDLERS) + elif 
REMOTE_BASE_LOG_FOLDER.startswith("stackdriver://"): + key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None) + # stackdriver:///airflow-tasks => airflow-tasks + log_name = urlsplit(REMOTE_BASE_LOG_FOLDER).path[1:] + STACKDRIVER_REMOTE_HANDLERS = { + "task": { + "class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler", + "formatter": "airflow", + "name": log_name, + "gcp_key_path": key_path, + } + } + + DEFAULT_LOGGING_CONFIG["handlers"].update(STACKDRIVER_REMOTE_HANDLERS) + elif REMOTE_BASE_LOG_FOLDER.startswith("oss://"): + OSS_REMOTE_HANDLERS = { + "task": { + "class": "airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler", + "formatter": "airflow", + "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER), + "oss_log_folder": REMOTE_BASE_LOG_FOLDER, + "filename_template": FILENAME_TEMPLATE, + }, + } + DEFAULT_LOGGING_CONFIG["handlers"].update(OSS_REMOTE_HANDLERS) + elif REMOTE_BASE_LOG_FOLDER.startswith("hdfs://"): + HDFS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = { + "task": { + "class": "airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsTaskHandler", + "formatter": "airflow", + "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)), + "hdfs_log_folder": REMOTE_BASE_LOG_FOLDER, + "filename_template": FILENAME_TEMPLATE, + }, + } + DEFAULT_LOGGING_CONFIG["handlers"].update(HDFS_REMOTE_HANDLERS) + elif ELASTICSEARCH_HOST: + ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK") + ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend") + ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT") + ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT") + ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS") + ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD") + ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD") + + ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = { + "task": { + "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler", + "formatter": "airflow", + "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)), + "filename_template": FILENAME_TEMPLATE, + "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK, + "host": ELASTICSEARCH_HOST, + "frontend": ELASTICSEARCH_FRONTEND, + "write_stdout": ELASTICSEARCH_WRITE_STDOUT, + "json_format": ELASTICSEARCH_JSON_FORMAT, + "json_fields": ELASTICSEARCH_JSON_FIELDS, + "host_field": ELASTICSEARCH_HOST_FIELD, + "offset_field": ELASTICSEARCH_OFFSET_FIELD, + }, + } + + DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS) + else: + raise AirflowException( + "Incorrect remote log configuration. Please check the configuration of option 'host' in " + "section 'elasticsearch' if you are using Elasticsearch. In the other case, " + "'remote_base_log_folder' option in the 'logging' section." 
+ ) + DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(REMOTE_TASK_HANDLER_KWARGS) diff --git a/images/airflow/config/new_log_config.py b/images/airflow/config/new_log_config.py deleted file mode 100644 index 3394d32d..00000000 --- a/images/airflow/config/new_log_config.py +++ /dev/null @@ -1,4 +0,0 @@ -from copy import deepcopy -from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG - -LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG) \ No newline at end of file diff --git a/src/bin/shipyard_airflow/shipyard_airflow/control/action/actions_id_api.py b/src/bin/shipyard_airflow/shipyard_airflow/control/action/actions_id_api.py index ca7df80e..edfa4558 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/control/action/actions_id_api.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/control/action/actions_id_api.py @@ -89,12 +89,12 @@ class ActionsIdResource(BaseResource): action['notes'].append(note.view()) return action - def get_dag_run_by_id(self, dag_id, run_id): + def get_dag_run_by_id(self, dag_id, execution_date): """ Wrapper for call to the airflow db to get a dag_run :returns: a dag run dictionary """ - dag_run_list = self.get_dag_run_db(dag_id, run_id) + dag_run_list = self.get_dag_run_db(dag_id, execution_date) # should be only one result, return the first one if dag_run_list: return dag_run_list[0] diff --git a/src/bin/shipyard_airflow/shipyard_airflow/control/af_monitoring/workflows_api.py b/src/bin/shipyard_airflow/shipyard_airflow/control/af_monitoring/workflows_api.py index aa418ddc..9d368b31 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/control/af_monitoring/workflows_api.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/control/af_monitoring/workflows_api.py @@ -76,7 +76,7 @@ class WorkflowIdResource(BaseResource): """ Retrieve a workflow by id, :param helper: The WorkflowHelper constructed for this invocation - :param workflow_id: a string in {dag_id}__{run_id} format + :param workflow_id: a string in {dag_id}__{execution} format identifying a workflow :returns: a workflow detail dictionary including steps """ diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/armada_deploy_site.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/armada_deploy_site.py index 868edc4f..8a2dd0f3 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/armada_deploy_site.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/armada_deploy_site.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from airflow.models import DAG +from airflow.utils.task_group import TaskGroup try: from airflow.operators import ArmadaGetReleasesOperator @@ -26,30 +26,29 @@ except ImportError: from shipyard_airflow.dags.config_path import config_path -def deploy_site_armada(parent_dag_name, child_dag_name, args): +def deploy_site_armada(dag): ''' - Armada Subdag + Armada TaskGroup ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args) + with TaskGroup(group_id="armada_build", dag=dag) as armada_build: + """Generate the armada post_apply step - # Armada Apply - armada_post_apply = ArmadaPostApplyOperator( - task_id='armada_post_apply', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - retries=5, - dag=dag) + Armada post_apply does the deployment of helm charts + """ + armada_post_apply = ArmadaPostApplyOperator( + task_id='armada_post_apply', + shipyard_conf=config_path, + retries=5, + dag=dag) + """Generate the armada get_releases step - # Get Helm Releases - armada_get_releases = ArmadaGetReleasesOperator( - task_id='armada_get_releases', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - dag=dag) + Armada get_releases does the verification of releases of helm charts + """ + armada_get_releases = ArmadaGetReleasesOperator( + task_id='armada_get_releases', + shipyard_conf=config_path, + dag=dag) - # Define dependencies - armada_get_releases.set_upstream(armada_post_apply) + armada_post_apply >> armada_get_releases - return dag + return armada_build diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py index 3e964394..9424322c 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py @@ -125,19 +125,14 @@ class CommonStepFactory(object): on_failure_callback=step_failure_handler, dag=self.dag) - def get_preflight(self, task_id=dn.ALL_PREFLIGHT_CHECKS_DAG_NAME): + def get_preflight(self): """Generate the preflight step Preflight checks preconditions for running a DAG """ - return SubDagOperator( - subdag=all_preflight_checks( - self.parent_dag_name, - task_id, - args=self.default_args), - task_id=task_id, - on_failure_callback=step_failure_handler, - dag=self.dag) + return all_preflight_checks( + self.dag + ) def get_get_rendered_doc(self, task_id=dn.GET_RENDERED_DOC): """Generate the get deckhand rendered doc step @@ -152,23 +147,16 @@ class CommonStepFactory(object): on_failure_callback=step_failure_handler, dag=self.dag) - def get_validate_site_design(self, - targets=None, - task_id=dn.VALIDATE_SITE_DESIGN_DAG_NAME): + def get_validate_site_design(self, targets=None): """Generate the validate site design step Validation of the site design checks that the design to be used for a deployment passes checks before using it. 
""" - return SubDagOperator( - subdag=validate_site_design( - self.parent_dag_name, - task_id, - args=self.default_args, - targets=targets), - task_id=task_id, - on_failure_callback=step_failure_handler, - dag=self.dag) + return validate_site_design( + self.dag, + targets=targets + ) def get_deployment_configuration(self, task_id=dn.DEPLOYMENT_CONFIGURATION): @@ -184,21 +172,15 @@ class CommonStepFactory(object): on_failure_callback=step_failure_handler, dag=self.dag) - def get_drydock_build(self, task_id=dn.DRYDOCK_BUILD_DAG_NAME, - verify_nodes_exist=False): + def get_drydock_build(self, verify_nodes_exist=False): """Generate the drydock build step Drydock build does the hardware provisioning. """ - return SubDagOperator( - subdag=deploy_site_drydock( - self.parent_dag_name, - task_id, - args=self.default_args, - verify_nodes_exist=verify_nodes_exist), - task_id=task_id, - on_failure_callback=step_failure_handler, - dag=self.dag) + return deploy_site_drydock( + self.dag, + verify_nodes_exist=verify_nodes_exist + ) def get_relabel_nodes(self, task_id=dn.RELABEL_NODES_DAG_NAME): """Generate the relabel nodes step @@ -212,19 +194,14 @@ class CommonStepFactory(object): on_failure_callback=step_failure_handler, dag=self.dag) - def get_armada_build(self, task_id=dn.ARMADA_BUILD_DAG_NAME): + def get_armada_build(self): """Generate the armada build step Armada build does the deployment of helm charts """ - return SubDagOperator( - subdag=deploy_site_armada( - self.parent_dag_name, - task_id, - args=self.default_args), - task_id=task_id, - on_failure_callback=step_failure_handler, - dag=self.dag) + return deploy_site_armada( + self.dag + ) def get_armada_test_releases(self, task_id=dn.ARMADA_TEST_RELEASES): """Generate the armada_test_releases step diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py index 854f3491..a24e3fc6 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py @@ -35,6 +35,7 @@ DESTROY_SERVER = 'destroy_nodes' DEPLOYMENT_STATUS = 'deployment_status' FINAL_DEPLOYMENT_STATUS = 'final_deployment_status' + # Define a list of critical steps, used to determine successfulness of a # still-running DAG CRITICAL_DAG_STEPS = [ diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/destroy_node.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/destroy_node.py index 3926fbcb..6ac45d10 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/destroy_node.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/destroy_node.py @@ -47,7 +47,8 @@ def destroy_server(parent_dag_name, child_dag_name, args): """ dag = DAG( '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args) + default_args=args, + schedule_interval=None) # Drain Node promenade_drain_node = PromenadeDrainNodeOperator( diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/drydock_deploy_site.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/drydock_deploy_site.py index d4cf16ca..44c7d51b 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/drydock_deploy_site.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/drydock_deploy_site.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from airflow.models import DAG +from airflow.utils.task_group import TaskGroup try: from airflow.operators import DrydockNodesOperator @@ -32,46 +32,39 @@ except ImportError: from shipyard_airflow.dags.config_path import config_path -def deploy_site_drydock(parent_dag_name, child_dag_name, args, - verify_nodes_exist=False): +def deploy_site_drydock(dag, verify_nodes_exist=False): ''' - DryDock Subdag + DryDock TaskGroup ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args) + with TaskGroup(group_id="drydock_build", dag=dag) as drydock_build: - if verify_nodes_exist: - drydock_verify_nodes_exist = DrydockVerifyNodesExistOperator( - task_id='verify_nodes_exist', + if verify_nodes_exist: + drydock_verify_nodes_exist = DrydockVerifyNodesExistOperator( + task_id='verify_nodes_exist', + shipyard_conf=config_path, + dag=dag) + + drydock_verify_site = DrydockVerifySiteOperator( + task_id='verify_site', shipyard_conf=config_path, - main_dag_name=parent_dag_name, dag=dag) - drydock_verify_site = DrydockVerifySiteOperator( - task_id='verify_site', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - dag=dag) + drydock_prepare_site = DrydockPrepareSiteOperator( + task_id='prepare_site', + shipyard_conf=config_path, + dag=dag) - drydock_prepare_site = DrydockPrepareSiteOperator( - task_id='prepare_site', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - dag=dag) + drydock_nodes = DrydockNodesOperator( + task_id='prepare_and_deploy_nodes', + shipyard_conf=config_path, + dag=dag) - drydock_nodes = DrydockNodesOperator( - task_id='prepare_and_deploy_nodes', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - dag=dag) + # Define dependencies + drydock_prepare_site.set_upstream(drydock_verify_site) + if verify_nodes_exist: + drydock_verify_nodes_exist.set_upstream(drydock_prepare_site) + drydock_nodes.set_upstream(drydock_verify_nodes_exist) + else: + drydock_nodes.set_upstream(drydock_prepare_site) - # Define dependencies - drydock_prepare_site.set_upstream(drydock_verify_site) - if verify_nodes_exist: - drydock_verify_nodes_exist.set_upstream(drydock_prepare_site) - drydock_nodes.set_upstream(drydock_verify_nodes_exist) - else: - drydock_nodes.set_upstream(drydock_prepare_site) - - return dag + return drydock_build diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/preflight_checks.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/preflight_checks.py index 6b0c1e1f..fa9e0859 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/preflight_checks.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/preflight_checks.py @@ -11,7 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from airflow.models import DAG + +from airflow.utils.task_group import TaskGroup try: from airflow.operators import UcpHealthCheckOperator @@ -22,22 +23,18 @@ except ImportError: from shipyard_airflow.dags.config_path import config_path -def all_preflight_checks(parent_dag_name, child_dag_name, args): +def all_preflight_checks(dag): ''' Pre-Flight Checks Subdag ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args) + with TaskGroup(group_id="preflight", dag=dag) as preflight: + ''' + Check that all Airship components are in good state for the purposes + of the Undercloud Platform to proceed with processing. 
+ ''' + shipyard = UcpHealthCheckOperator( + task_id='ucp_preflight_check', + shipyard_conf=config_path, + dag=dag) - ''' - Check that all Airship components are in good state for the purposes - of the Undercloud Platform to proceed with processing. - ''' - shipyard = UcpHealthCheckOperator( - task_id='ucp_preflight_check', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - dag=dag) - - return dag + return preflight diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/validate_site_design.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/validate_site_design.py index 73391015..16d40b8b 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/validate_site_design.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/validate_site_design.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from airflow.models import DAG +from airflow.utils.task_group import TaskGroup try: from airflow.operators import ArmadaValidateDesignOperator @@ -35,53 +35,49 @@ BAREMETAL = 'baremetal' SOFTWARE = 'software' -def validate_site_design(parent_dag_name, child_dag_name, args, targets=None): - """Subdag to delegate design verification to the Airship components +def validate_site_design(dag, targets=None): + """TaskGroup to delegate design verification to the Airship components There is no wiring of steps - they all execute in parallel """ - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args) + with TaskGroup(group_id="validate_site_design", + dag=dag) as validate_site_design: - if targets is None: - targets = [BAREMETAL, SOFTWARE] + if targets is None: + targets = [BAREMETAL, SOFTWARE] - # Always add Deckhand validations - DeckhandValidateSiteDesignOperator( - task_id='deckhand_validate_site_design', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - retries=1, - dag=dag - ) - - if BAREMETAL in targets: - # Add Drydock and Promenade validations - DrydockValidateDesignOperator( - task_id='drydock_validate_site_design', + # Always add Deckhand validations + deckhand_validate_site_design = DeckhandValidateSiteDesignOperator( + task_id='deckhand_validate_site_design', shipyard_conf=config_path, - main_dag_name=parent_dag_name, retries=1, dag=dag ) - PromenadeValidateSiteDesignOperator( - task_id='promenade_validate_site_design', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - retries=1, - dag=dag - ) + if BAREMETAL in targets: + # Add Drydock and Promenade validations + drydock_validate_site_design = DrydockValidateDesignOperator( + task_id='drydock_validate_site_design', + shipyard_conf=config_path, + retries=1, + dag=dag + ) - if SOFTWARE in targets: - # Add Armada validations - ArmadaValidateDesignOperator( - task_id='armada_validate_site_design', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - retries=1, - dag=dag - ) + promenade_validate_site_design = \ + PromenadeValidateSiteDesignOperator( + task_id='promenade_validate_site_design', + shipyard_conf=config_path, + retries=1, + dag=dag + ) - return dag + if SOFTWARE in targets: + # Add Armada validations + armada_validate_site_design = ArmadaValidateDesignOperator( + task_id='armada_validate_site_design', + shipyard_conf=config_path, + retries=1, + dag=dag + ) + + return validate_site_design diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_create_site_action_tag.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_create_site_action_tag.py index 
75c83343..a00bf4d8 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_create_site_action_tag.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_create_site_action_tag.py @@ -116,7 +116,7 @@ class DeckhandCreateSiteActionTagOperator(DeckhandBaseOperator): def check_workflow_result(self): # Initialize Variables - task = ['armada_build'] + task = ['armada_build.armada_get_releases'] task_result = {} if self.main_dag_name in ['update_site', 'update_software']: diff --git a/tools/gate/playbooks/airskiff-reduce-site.yaml b/tools/gate/playbooks/airskiff-reduce-site.yaml index 7c80ed77..0df60169 100644 --- a/tools/gate/playbooks/airskiff-reduce-site.yaml +++ b/tools/gate/playbooks/airskiff-reduce-site.yaml @@ -17,7 +17,6 @@ - name: Overwrite Armada manifest shell: | git checkout v1.9 - mv tools/gate/manifests/full-site.yaml \ - type/skiff/manifests/full-site.yaml + mv tools/gate/manifests/full-site.yaml type/skiff/manifests/full-site.yaml args: chdir: "{{ zuul.projects['opendev.org/airship/treasuremap'].src_dir }}"
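
For reference, a minimal sketch of how the logging_config_class: log_config.DEFAULT_LOGGING_CONFIG value set in charts/shipyard/values.yaml gets consumed: Airflow resolves the dotted path and passes the resulting dict to logging.config.dictConfig. The helper below is a simplified stand-in for Airflow's own configure_logging()/import_string machinery, and it assumes an Airflow 2.x environment with images/airflow/config on PYTHONPATH so that log_config is importable.

import logging.config
from importlib import import_module


def load_logging_config(dotted_path: str) -> dict:
    # Resolve "module.ATTRIBUTE" to the object it names, similar to what
    # airflow.utils.module_loading.import_string() does at startup.
    module_path, _, attr_name = dotted_path.rpartition(".")
    return getattr(import_module(module_path), attr_name)


# The dotted path the chart sets for logging_config_class.
LOGGING_CONFIG = load_logging_config("log_config.DEFAULT_LOGGING_CONFIG")
logging.config.dictConfig(LOGGING_CONFIG)

# Per the config above, "airflow.task" now logs through both the file-based
# "task" handler and the Shipyard-added "py-console" stream handler.
logging.getLogger("airflow.task").info("logging configured")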
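
Likewise, a minimal sketch of the SubDagOperator-to-TaskGroup pattern the DAG modules above adopt, with EmptyOperator standing in for the Shipyard operators (ArmadaPostApplyOperator, ArmadaGetReleasesOperator, UcpHealthCheckOperator); it assumes Airflow 2.3 or newer.

from datetime import datetime

from airflow.models import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup


def deploy_site_armada(dag):
    """Build the armada_build TaskGroup inside the caller's DAG."""
    with TaskGroup(group_id="armada_build", dag=dag) as armada_build:
        # Stand-ins for the real Armada operators.
        armada_post_apply = EmptyOperator(task_id="armada_post_apply", dag=dag)
        armada_get_releases = EmptyOperator(task_id="armada_get_releases", dag=dag)
        armada_post_apply >> armada_get_releases
    return armada_build


with DAG(
    dag_id="deploy_site_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    preflight = EmptyOperator(task_id="preflight_check")
    armada_build = deploy_site_armada(dag)
    preflight >> armada_build

Because a TaskGroup's tasks remain in the parent DAG, their IDs are namespaced as <group_id>.<task_id>; that is why DeckhandCreateSiteActionTagOperator now checks 'armada_build.armada_get_releases', and why the common_step_factory methods return the group itself instead of wrapping a nested DAG in a SubDagOperator.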