From 99c2da745a096789b3197b808dc26f8cbef1e556 Mon Sep 17 00:00:00 2001 From: Sergiy Markin Date: Sat, 9 Sep 2023 19:39:21 +0000 Subject: [PATCH] Airflow fix This PS is mainly fixing SubDAGs timing issues when they started along with main DAG and not at the time the main DAG needs them. In Airflow 2.6.2 SubDAGs are deprecated in favor of TaskGroups. So in this PS all SubDAGS were replaced with TaskGroups. Also task level logging config was extended by adding py-console to obtain logs from tasks like it was configured in Airflow 1.10.5. Change-Id: I3f6d3961b1511e3b7cd2f7aab9810d033cfc14a3 --- .gitignore | 5 + charts/shipyard/values.yaml | 10 +- images/airflow/config/log_config.py | 474 ++++++++++-------- images/airflow/config/new_log_config.py | 4 - .../control/action/actions_id_api.py | 4 +- .../control/af_monitoring/workflows_api.py | 2 +- .../dags/armada_deploy_site.py | 43 +- .../dags/common_step_factory.py | 59 +-- .../shipyard_airflow/dags/dag_names.py | 1 + .../shipyard_airflow/dags/destroy_node.py | 3 +- .../dags/drydock_deploy_site.py | 63 ++- .../shipyard_airflow/dags/preflight_checks.py | 29 +- .../dags/validate_site_design.py | 74 ++- .../deckhand_create_site_action_tag.py | 2 +- .../gate/playbooks/airskiff-reduce-site.yaml | 3 +- 15 files changed, 410 insertions(+), 366 deletions(-) delete mode 100644 images/airflow/config/new_log_config.py diff --git a/.gitignore b/.gitignore index 3e5b7e98..930384cf 100644 --- a/.gitignore +++ b/.gitignore @@ -120,6 +120,10 @@ AUTHORS # vscode .vscode/ + +# pycharm +.idea + # tests airship-ucp-shipyard.values.yaml airflow-webserver.pid @@ -128,3 +132,4 @@ airflow.db latest src/bin/shipyard_airflow/shipyard_airflow/config src/bin/shipyard_airflow/shipyard_airflow/webserver_config.py +airflow-runtime diff --git a/charts/shipyard/values.yaml b/charts/shipyard/values.yaml index 621b61ca..32907f28 100644 --- a/charts/shipyard/values.yaml +++ b/charts/shipyard/values.yaml @@ -476,8 +476,8 @@ conf: - broker_url - result_backend - fernet_key - lazy_discover_providers: "False" - lazy_load_plugins: "False" + lazy_discover_providers: "True" + lazy_load_plugins: "True" hostname_callable: "socket:getfqdn" default_timezone: "utc" executor: "CeleryExecutor" @@ -526,7 +526,7 @@ conf: # See image-bundled log_config.py. # Adds console logging of task/step logs. # logging_config_class: log_config.LOGGING_CONFIG - logging_config_class: new_log_config.LOGGING_CONFIG + logging_config_class: log_config.DEFAULT_LOGGING_CONFIG # NOTE: Airflow 1.10 introduces extra newline characters between log # records. 
Version 1.10.1 should resolve this issue # https://issues.apache.org/jira/browse/AIRFLOW-1917 @@ -544,9 +544,9 @@ conf: log_filename_template: "{{ ti.dag_id }}/{{ ti.task_id }}/{{ execution_date.strftime('%%Y-%%m-%%dT%%H:%%M:%%S') }}/{{ try_number }}.log" log_processor_filename_template: "{{ filename }}.log" dag_processor_manager_log_location: /usr/local/airflow/logs/dag_processor_manager/dag_processor_manager.log - logging_level: "INFO" + logging_level: "DEBUG" fab_logging_level: "WARNING" - celery_logging_level: "INFO" + celery_logging_level: "DEBUG" base_log_folder: /usr/local/airflow/logs remote_logging: "False" remote_log_conn_id: "" diff --git a/images/airflow/config/log_config.py b/images/airflow/config/log_config.py index 5afcbff1..33bf4992 100644 --- a/images/airflow/config/log_config.py +++ b/images/airflow/config/log_config.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -16,246 +15,327 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# -# !Important Shipyard/Airflow usage note: -# -# This file is copied from: -# https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py -# as this is the recommended way to configure logging as of version 1.10.x -# -# See documentation here: -# https://airflow.readthedocs.io/en/stable/howto/write-logs.html#writing-logs-to-azure-blob-storage -# -# (We are not using azure blob storage at this time, but the included -# instructional steps are pertinent) -# -# Because this file is in the "plugins" directory, it should be referenced -# in the Helm chart's values.yaml as config.log_config.LOGGING_CONFIG -# as opposed to log_config.LOGGING_CONFIG in a new directory in the PYTHONPATH -# as noted in the linked instructions. -# +"""Airflow logging settings.""" +from __future__ import annotations import os +from pathlib import Path +from typing import Any +from urllib.parse import urlsplit -from airflow import configuration as conf -from airflow.utils.file import mkdirs +from airflow.configuration import conf +from airflow.exceptions import AirflowException + +LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper() -# TODO: Logging format and level should be configured -# in this file instead of from airflow.cfg. Currently -# there are other log format and level configurations in -# settings.py and cli.py. Please see AIRFLOW-1455. -LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper() # Flask appbuilder's info level log is very verbose, # so it's set to 'WARN' by default. 
-FAB_LOG_LEVEL = conf.get('core', 'FAB_LOGGING_LEVEL').upper() +FAB_LOG_LEVEL: str = conf.get_mandatory_value("logging", "FAB_LOGGING_LEVEL").upper() -LOG_FORMAT = conf.get('core', 'LOG_FORMAT') +LOG_FORMAT: str = conf.get_mandatory_value("logging", "LOG_FORMAT") +DAG_PROCESSOR_LOG_FORMAT: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_FORMAT") -BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER') +LOG_FORMATTER_CLASS: str = conf.get_mandatory_value( + "logging", "LOG_FORMATTER_CLASS", fallback="airflow.utils.log.timezone_aware.TimezoneAware" +) -PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY') +COLORED_LOG_FORMAT: str = conf.get_mandatory_value("logging", "COLORED_LOG_FORMAT") -DAG_PROCESSOR_MANAGER_LOG_LOCATION = \ - conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION') +COLORED_LOG: bool = conf.getboolean("logging", "COLORED_CONSOLE_LOG") -FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE') +COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value("logging", "COLORED_FORMATTER_CLASS") -PROCESSOR_FILENAME_TEMPLATE = conf.get('core', - 'LOG_PROCESSOR_FILENAME_TEMPLATE') +DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_TARGET") -# Storage bucket url for remote logging -# s3 buckets should start with "s3://" -# gcs buckets should start with "gs://" -# wasb buckets should start with "wasb" -# just to help Airflow select correct handler -REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER') +BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "BASE_LOG_FOLDER") -ELASTICSEARCH_HOST = conf.get('elasticsearch', 'ELASTICSEARCH_HOST') +PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value("scheduler", "CHILD_PROCESS_LOG_DIRECTORY") -LOG_ID_TEMPLATE = conf.get('elasticsearch', 'ELASTICSEARCH_LOG_ID_TEMPLATE') +DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get_mandatory_value( + "logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION" +) -END_OF_LOG_MARK = conf.get('elasticsearch', 'ELASTICSEARCH_END_OF_LOG_MARK') +# FILENAME_TEMPLATE only uses in Remote Logging Handlers since Airflow 2.3.3 +# All of these handlers inherited from FileTaskHandler and providing any value rather than None +# would raise deprecation warning. +FILENAME_TEMPLATE: str | None = None -# NOTE: Modified for use by Shipyard/Airflow (rename to LOGGING_CONFIG): -LOGGING_CONFIG = { - 'version': 1, - 'disable_existing_loggers': False, - 'formatters': { - 'airflow': { - 'format': LOG_FORMAT, +PROCESSOR_FILENAME_TEMPLATE: str = conf.get_mandatory_value("logging", "LOG_PROCESSOR_FILENAME_TEMPLATE") + +DEFAULT_LOGGING_CONFIG: dict[str, Any] = { + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "airflow": { + "format": LOG_FORMAT, + "class": LOG_FORMATTER_CLASS, + }, + "airflow_coloured": { + "format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT, + "class": COLORED_FORMATTER_CLASS if COLORED_LOG else LOG_FORMATTER_CLASS, + }, + "source_processor": { + "format": DAG_PROCESSOR_LOG_FORMAT, + "class": LOG_FORMATTER_CLASS, }, }, - 'handlers': { + "filters": { + "mask_secrets": { + "()": "airflow.utils.log.secrets_masker.SecretsMasker", + }, + }, + "handlers": { # NOTE: Add a "raw" python console logger. Using 'console' results # in a state of recursion. 
'py-console': { 'class': 'logging.StreamHandler', 'formatter': 'airflow', - 'stream': 'ext://sys.stdout' + 'stream': 'ext://sys.stdout', + "filters": ["mask_secrets"], }, - 'console': { - 'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler', - 'formatter': 'airflow', - 'stream': 'sys.stdout' + "console": { + "class": "airflow.utils.log.logging_mixin.RedirectStdHandler", + "formatter": "airflow_coloured", + "stream": "sys.stdout", + "filters": ["mask_secrets"], }, - 'task': { - 'class': 'airflow.utils.log.file_task_handler.FileTaskHandler', - 'formatter': 'airflow', - 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), - 'filename_template': FILENAME_TEMPLATE, + "task": { + "class": "airflow.utils.log.file_task_handler.FileTaskHandler", + "formatter": "airflow", + "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER), + "filters": ["mask_secrets"], + }, + "processor": { + "class": "airflow.utils.log.file_processor_handler.FileProcessorHandler", + "formatter": "airflow", + "base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER), + "filename_template": PROCESSOR_FILENAME_TEMPLATE, + "filters": ["mask_secrets"], + }, + "processor_to_stdout": { + "class": "airflow.utils.log.logging_mixin.RedirectStdHandler", + "formatter": "source_processor", + "stream": "sys.stdout", + "filters": ["mask_secrets"], }, - 'processor': { - 'class': - 'airflow.utils.log.file_processor_handler.FileProcessorHandler', - 'formatter': - 'airflow', - 'base_log_folder': - os.path.expanduser(PROCESSOR_LOG_FOLDER), - 'filename_template': - PROCESSOR_FILENAME_TEMPLATE, - } }, - 'loggers': { - 'airflow.processor': { - 'handlers': ['processor'], - 'level': LOG_LEVEL, - 'propagate': False, + "loggers": { + "airflow.processor": { + "handlers": ["processor_to_stdout" if DAG_PROCESSOR_LOG_TARGET == "stdout" else "processor"], + "level": LOG_LEVEL, + # Set to true here (and reset via set_context) so that if no file is configured we still get logs! + "propagate": True, }, - 'airflow.task': { + "airflow.task": { # NOTE: Modified for use by Shipyard/Airflow (add console logging) # The supplied console logger cannot be used here, as it # Leads to out-of-control memory usage 'handlers': ['task', 'py-console'], - 'level': LOG_LEVEL, - 'propagate': False, + "level": LOG_LEVEL, + # Set to true here (and reset via set_context) so that if no file is configured we still get logs! 
+ "propagate": True, + "filters": ["mask_secrets"], + }, + "flask_appbuilder": { + "handlers": ["console"], + "level": FAB_LOG_LEVEL, + "propagate": True, }, - 'flask_appbuilder': { - # NOTE: Modified this to be "handlers" - 'handlers': ['console'], - 'level': FAB_LOG_LEVEL, - 'propagate': True, - } }, - 'root': { - 'handlers': ['console'], - 'level': LOG_LEVEL, + "root": { + "handlers": ["console"], + "level": LOG_LEVEL, + "filters": ["mask_secrets"], + }, +} + +EXTRA_LOGGER_NAMES: str | None = conf.get("logging", "EXTRA_LOGGER_NAMES", fallback=None) +if EXTRA_LOGGER_NAMES: + new_loggers = { + logger_name.strip(): { + "handlers": ["console"], + "level": LOG_LEVEL, + "propagate": True, + } + for logger_name in EXTRA_LOGGER_NAMES.split(",") } -} + DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers) -DEFAULT_DAG_PARSING_LOGGING_CONFIG = { - 'handlers': { - 'processor_manager': { - 'class': 'logging.handlers.RotatingFileHandler', - 'formatter': 'airflow', - 'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION, - 'mode': 'a', - 'maxBytes': 104857600, # 100MB - 'backupCount': 5 +DEFAULT_DAG_PARSING_LOGGING_CONFIG: dict[str, dict[str, dict[str, Any]]] = { + "handlers": { + "processor_manager": { + "class": "airflow.utils.log.non_caching_file_handler.NonCachingRotatingFileHandler", + "formatter": "airflow", + "filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION, + "mode": "a", + "maxBytes": 104857600, # 100MB + "backupCount": 5, } }, - 'loggers': { - 'airflow.processor_manager': { - 'handlers': ['processor_manager'], - 'level': LOG_LEVEL, - 'propagate': False, + "loggers": { + "airflow.processor_manager": { + "handlers": ["processor_manager"], + "level": LOG_LEVEL, + "propagate": False, } - } -} - -REMOTE_HANDLERS = { - 's3': { - 'task': { - 'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler', - 'formatter': 'airflow', - 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), - 's3_log_folder': REMOTE_BASE_LOG_FOLDER, - 'filename_template': FILENAME_TEMPLATE, - }, - 'processor': { - 'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler', - 'formatter': 'airflow', - 'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER), - 's3_log_folder': REMOTE_BASE_LOG_FOLDER, - 'filename_template': PROCESSOR_FILENAME_TEMPLATE, - }, - }, - 'gcs': { - 'task': { - 'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler', - 'formatter': 'airflow', - 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), - 'gcs_log_folder': REMOTE_BASE_LOG_FOLDER, - 'filename_template': FILENAME_TEMPLATE, - }, - 'processor': { - 'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler', - 'formatter': 'airflow', - 'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER), - 'gcs_log_folder': REMOTE_BASE_LOG_FOLDER, - 'filename_template': PROCESSOR_FILENAME_TEMPLATE, - }, - }, - 'wasb': { - 'task': { - 'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler', - 'formatter': 'airflow', - 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), - 'wasb_log_folder': REMOTE_BASE_LOG_FOLDER, - 'wasb_container': 'airflow-logs', - 'filename_template': FILENAME_TEMPLATE, - 'delete_local_copy': False, - }, - 'processor': { - 'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler', - 'formatter': 'airflow', - 'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER), - 'wasb_log_folder': REMOTE_BASE_LOG_FOLDER, - 'wasb_container': 'airflow-logs', - 'filename_template': PROCESSOR_FILENAME_TEMPLATE, - 'delete_local_copy': False, - }, - }, - 'elasticsearch': { - 'task': { - 'class': - 
'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler', - 'formatter': 'airflow', - 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), - 'log_id_template': LOG_ID_TEMPLATE, - 'filename_template': FILENAME_TEMPLATE, - 'end_of_log_mark': END_OF_LOG_MARK, - 'host': ELASTICSEARCH_HOST, - }, }, } -# NOTE: Modified for use by Shipyard/Airflow to "getboolean" as existing -# code of conf.get would evaluate "False" as true. -REMOTE_LOGGING = conf.getboolean('core', 'remote_logging') +# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set. +# This is to avoid exceptions when initializing RotatingFileHandler multiple times +# in multiple processes. +if os.environ.get("CONFIG_PROCESSOR_MANAGER_LOGGER") == "True": + DEFAULT_LOGGING_CONFIG["handlers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"]) + DEFAULT_LOGGING_CONFIG["loggers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"]) -# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is -# set. -# This is to avoid exceptions when initializing RotatingFileHandler multiple -# times in multiple processes. -if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True': - LOGGING_CONFIG['handlers'] \ - .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers']) - LOGGING_CONFIG['loggers'] \ - .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['loggers']) + # Manually create log directory for processor_manager handler as RotatingFileHandler + # will only create file but not the directory. + processor_manager_handler_config: dict[str, Any] = DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"][ + "processor_manager" + ] + directory: str = os.path.dirname(processor_manager_handler_config["filename"]) + Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755) - # Manually create log directory for processor_manager handler as - # RotatingFileHandler will only create file but not the directory. 
- processor_manager_handler_config = DEFAULT_DAG_PARSING_LOGGING_CONFIG[ - 'handlers']['processor_manager'] - directory = os.path.dirname(processor_manager_handler_config['filename']) - mkdirs(directory, 0o755) +################## +# Remote logging # +################## -if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'): - LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3']) -elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'): - LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs']) -elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'): - LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb']) -elif REMOTE_LOGGING and ELASTICSEARCH_HOST: - LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch']) +REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging") + +if REMOTE_LOGGING: + + ELASTICSEARCH_HOST: str | None = conf.get("elasticsearch", "HOST") + + # Storage bucket URL for remote logging + # S3 buckets should start with "s3://" + # Cloudwatch log groups should start with "cloudwatch://" + # GCS buckets should start with "gs://" + # WASB buckets should start with "wasb" + # HDFS path should start with "hdfs://" + # just to help Airflow select correct handler + REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "REMOTE_BASE_LOG_FOLDER") + REMOTE_TASK_HANDLER_KWARGS = conf.getjson("logging", "REMOTE_TASK_HANDLER_KWARGS", fallback={}) + + if REMOTE_BASE_LOG_FOLDER.startswith("s3://"): + S3_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = { + "task": { + "class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler", + "formatter": "airflow", + "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)), + "s3_log_folder": REMOTE_BASE_LOG_FOLDER, + "filename_template": FILENAME_TEMPLATE, + }, + } + + DEFAULT_LOGGING_CONFIG["handlers"].update(S3_REMOTE_HANDLERS) + elif REMOTE_BASE_LOG_FOLDER.startswith("cloudwatch://"): + url_parts = urlsplit(REMOTE_BASE_LOG_FOLDER) + CLOUDWATCH_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = { + "task": { + "class": "airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler", + "formatter": "airflow", + "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)), + "log_group_arn": url_parts.netloc + url_parts.path, + "filename_template": FILENAME_TEMPLATE, + }, + } + + DEFAULT_LOGGING_CONFIG["handlers"].update(CLOUDWATCH_REMOTE_HANDLERS) + elif REMOTE_BASE_LOG_FOLDER.startswith("gs://"): + key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None) + GCS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = { + "task": { + "class": "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler", + "formatter": "airflow", + "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)), + "gcs_log_folder": REMOTE_BASE_LOG_FOLDER, + "filename_template": FILENAME_TEMPLATE, + "gcp_key_path": key_path, + }, + } + + DEFAULT_LOGGING_CONFIG["handlers"].update(GCS_REMOTE_HANDLERS) + elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"): + WASB_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = { + "task": { + "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler", + "formatter": "airflow", + "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)), + "wasb_log_folder": REMOTE_BASE_LOG_FOLDER, + "wasb_container": "airflow-logs", + "filename_template": FILENAME_TEMPLATE, + }, + } + + DEFAULT_LOGGING_CONFIG["handlers"].update(WASB_REMOTE_HANDLERS) + elif 
REMOTE_BASE_LOG_FOLDER.startswith("stackdriver://"): + key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None) + # stackdriver:///airflow-tasks => airflow-tasks + log_name = urlsplit(REMOTE_BASE_LOG_FOLDER).path[1:] + STACKDRIVER_REMOTE_HANDLERS = { + "task": { + "class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler", + "formatter": "airflow", + "name": log_name, + "gcp_key_path": key_path, + } + } + + DEFAULT_LOGGING_CONFIG["handlers"].update(STACKDRIVER_REMOTE_HANDLERS) + elif REMOTE_BASE_LOG_FOLDER.startswith("oss://"): + OSS_REMOTE_HANDLERS = { + "task": { + "class": "airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler", + "formatter": "airflow", + "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER), + "oss_log_folder": REMOTE_BASE_LOG_FOLDER, + "filename_template": FILENAME_TEMPLATE, + }, + } + DEFAULT_LOGGING_CONFIG["handlers"].update(OSS_REMOTE_HANDLERS) + elif REMOTE_BASE_LOG_FOLDER.startswith("hdfs://"): + HDFS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = { + "task": { + "class": "airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsTaskHandler", + "formatter": "airflow", + "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)), + "hdfs_log_folder": REMOTE_BASE_LOG_FOLDER, + "filename_template": FILENAME_TEMPLATE, + }, + } + DEFAULT_LOGGING_CONFIG["handlers"].update(HDFS_REMOTE_HANDLERS) + elif ELASTICSEARCH_HOST: + ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK") + ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend") + ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT") + ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT") + ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS") + ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD") + ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD") + + ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = { + "task": { + "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler", + "formatter": "airflow", + "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)), + "filename_template": FILENAME_TEMPLATE, + "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK, + "host": ELASTICSEARCH_HOST, + "frontend": ELASTICSEARCH_FRONTEND, + "write_stdout": ELASTICSEARCH_WRITE_STDOUT, + "json_format": ELASTICSEARCH_JSON_FORMAT, + "json_fields": ELASTICSEARCH_JSON_FIELDS, + "host_field": ELASTICSEARCH_HOST_FIELD, + "offset_field": ELASTICSEARCH_OFFSET_FIELD, + }, + } + + DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS) + else: + raise AirflowException( + "Incorrect remote log configuration. Please check the configuration of option 'host' in " + "section 'elasticsearch' if you are using Elasticsearch. In the other case, " + "'remote_base_log_folder' option in the 'logging' section." 
+ ) + DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(REMOTE_TASK_HANDLER_KWARGS) diff --git a/images/airflow/config/new_log_config.py b/images/airflow/config/new_log_config.py deleted file mode 100644 index 3394d32d..00000000 --- a/images/airflow/config/new_log_config.py +++ /dev/null @@ -1,4 +0,0 @@ -from copy import deepcopy -from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG - -LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG) \ No newline at end of file diff --git a/src/bin/shipyard_airflow/shipyard_airflow/control/action/actions_id_api.py b/src/bin/shipyard_airflow/shipyard_airflow/control/action/actions_id_api.py index ca7df80e..edfa4558 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/control/action/actions_id_api.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/control/action/actions_id_api.py @@ -89,12 +89,12 @@ class ActionsIdResource(BaseResource): action['notes'].append(note.view()) return action - def get_dag_run_by_id(self, dag_id, run_id): + def get_dag_run_by_id(self, dag_id, execution_date): """ Wrapper for call to the airflow db to get a dag_run :returns: a dag run dictionary """ - dag_run_list = self.get_dag_run_db(dag_id, run_id) + dag_run_list = self.get_dag_run_db(dag_id, execution_date) # should be only one result, return the first one if dag_run_list: return dag_run_list[0] diff --git a/src/bin/shipyard_airflow/shipyard_airflow/control/af_monitoring/workflows_api.py b/src/bin/shipyard_airflow/shipyard_airflow/control/af_monitoring/workflows_api.py index aa418ddc..9d368b31 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/control/af_monitoring/workflows_api.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/control/af_monitoring/workflows_api.py @@ -76,7 +76,7 @@ class WorkflowIdResource(BaseResource): """ Retrieve a workflow by id, :param helper: The WorkflowHelper constructed for this invocation - :param workflow_id: a string in {dag_id}__{run_id} format + :param workflow_id: a string in {dag_id}__{execution} format identifying a workflow :returns: a workflow detail dictionary including steps """ diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/armada_deploy_site.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/armada_deploy_site.py index 868edc4f..8a2dd0f3 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/armada_deploy_site.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/armada_deploy_site.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from airflow.models import DAG +from airflow.utils.task_group import TaskGroup try: from airflow.operators import ArmadaGetReleasesOperator @@ -26,30 +26,29 @@ except ImportError: from shipyard_airflow.dags.config_path import config_path -def deploy_site_armada(parent_dag_name, child_dag_name, args): +def deploy_site_armada(dag): ''' - Armada Subdag + Armada TaskGroup ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args) + with TaskGroup(group_id="armada_build", dag=dag) as armada_build: + """Generate the armada post_apply step - # Armada Apply - armada_post_apply = ArmadaPostApplyOperator( - task_id='armada_post_apply', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - retries=5, - dag=dag) + Armada post_apply does the deployment of helm charts + """ + armada_post_apply = ArmadaPostApplyOperator( + task_id='armada_post_apply', + shipyard_conf=config_path, + retries=5, + dag=dag) + """Generate the armada get_releases step - # Get Helm Releases - armada_get_releases = ArmadaGetReleasesOperator( - task_id='armada_get_releases', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - dag=dag) + Armada get_releases does the verification of releases of helm charts + """ + armada_get_releases = ArmadaGetReleasesOperator( + task_id='armada_get_releases', + shipyard_conf=config_path, + dag=dag) - # Define dependencies - armada_get_releases.set_upstream(armada_post_apply) + armada_post_apply >> armada_get_releases - return dag + return armada_build diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py index 3e964394..9424322c 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py @@ -125,19 +125,14 @@ class CommonStepFactory(object): on_failure_callback=step_failure_handler, dag=self.dag) - def get_preflight(self, task_id=dn.ALL_PREFLIGHT_CHECKS_DAG_NAME): + def get_preflight(self): """Generate the preflight step Preflight checks preconditions for running a DAG """ - return SubDagOperator( - subdag=all_preflight_checks( - self.parent_dag_name, - task_id, - args=self.default_args), - task_id=task_id, - on_failure_callback=step_failure_handler, - dag=self.dag) + return all_preflight_checks( + self.dag + ) def get_get_rendered_doc(self, task_id=dn.GET_RENDERED_DOC): """Generate the get deckhand rendered doc step @@ -152,23 +147,16 @@ class CommonStepFactory(object): on_failure_callback=step_failure_handler, dag=self.dag) - def get_validate_site_design(self, - targets=None, - task_id=dn.VALIDATE_SITE_DESIGN_DAG_NAME): + def get_validate_site_design(self, targets=None): """Generate the validate site design step Validation of the site design checks that the design to be used for a deployment passes checks before using it. 
""" - return SubDagOperator( - subdag=validate_site_design( - self.parent_dag_name, - task_id, - args=self.default_args, - targets=targets), - task_id=task_id, - on_failure_callback=step_failure_handler, - dag=self.dag) + return validate_site_design( + self.dag, + targets=targets + ) def get_deployment_configuration(self, task_id=dn.DEPLOYMENT_CONFIGURATION): @@ -184,21 +172,15 @@ class CommonStepFactory(object): on_failure_callback=step_failure_handler, dag=self.dag) - def get_drydock_build(self, task_id=dn.DRYDOCK_BUILD_DAG_NAME, - verify_nodes_exist=False): + def get_drydock_build(self, verify_nodes_exist=False): """Generate the drydock build step Drydock build does the hardware provisioning. """ - return SubDagOperator( - subdag=deploy_site_drydock( - self.parent_dag_name, - task_id, - args=self.default_args, - verify_nodes_exist=verify_nodes_exist), - task_id=task_id, - on_failure_callback=step_failure_handler, - dag=self.dag) + return deploy_site_drydock( + self.dag, + verify_nodes_exist=verify_nodes_exist + ) def get_relabel_nodes(self, task_id=dn.RELABEL_NODES_DAG_NAME): """Generate the relabel nodes step @@ -212,19 +194,14 @@ class CommonStepFactory(object): on_failure_callback=step_failure_handler, dag=self.dag) - def get_armada_build(self, task_id=dn.ARMADA_BUILD_DAG_NAME): + def get_armada_build(self): """Generate the armada build step Armada build does the deployment of helm charts """ - return SubDagOperator( - subdag=deploy_site_armada( - self.parent_dag_name, - task_id, - args=self.default_args), - task_id=task_id, - on_failure_callback=step_failure_handler, - dag=self.dag) + return deploy_site_armada( + self.dag + ) def get_armada_test_releases(self, task_id=dn.ARMADA_TEST_RELEASES): """Generate the armada_test_releases step diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py index 854f3491..a24e3fc6 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py @@ -35,6 +35,7 @@ DESTROY_SERVER = 'destroy_nodes' DEPLOYMENT_STATUS = 'deployment_status' FINAL_DEPLOYMENT_STATUS = 'final_deployment_status' + # Define a list of critical steps, used to determine successfulness of a # still-running DAG CRITICAL_DAG_STEPS = [ diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/destroy_node.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/destroy_node.py index 3926fbcb..6ac45d10 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/destroy_node.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/destroy_node.py @@ -47,7 +47,8 @@ def destroy_server(parent_dag_name, child_dag_name, args): """ dag = DAG( '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args) + default_args=args, + schedule_interval=None) # Drain Node promenade_drain_node = PromenadeDrainNodeOperator( diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/drydock_deploy_site.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/drydock_deploy_site.py index d4cf16ca..44c7d51b 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/drydock_deploy_site.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/drydock_deploy_site.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from airflow.models import DAG +from airflow.utils.task_group import TaskGroup try: from airflow.operators import DrydockNodesOperator @@ -32,46 +32,39 @@ except ImportError: from shipyard_airflow.dags.config_path import config_path -def deploy_site_drydock(parent_dag_name, child_dag_name, args, - verify_nodes_exist=False): +def deploy_site_drydock(dag, verify_nodes_exist=False): ''' - DryDock Subdag + DryDock TaskGroup ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args) + with TaskGroup(group_id="drydock_build", dag=dag) as drydock_build: - if verify_nodes_exist: - drydock_verify_nodes_exist = DrydockVerifyNodesExistOperator( - task_id='verify_nodes_exist', + if verify_nodes_exist: + drydock_verify_nodes_exist = DrydockVerifyNodesExistOperator( + task_id='verify_nodes_exist', + shipyard_conf=config_path, + dag=dag) + + drydock_verify_site = DrydockVerifySiteOperator( + task_id='verify_site', shipyard_conf=config_path, - main_dag_name=parent_dag_name, dag=dag) - drydock_verify_site = DrydockVerifySiteOperator( - task_id='verify_site', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - dag=dag) + drydock_prepare_site = DrydockPrepareSiteOperator( + task_id='prepare_site', + shipyard_conf=config_path, + dag=dag) - drydock_prepare_site = DrydockPrepareSiteOperator( - task_id='prepare_site', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - dag=dag) + drydock_nodes = DrydockNodesOperator( + task_id='prepare_and_deploy_nodes', + shipyard_conf=config_path, + dag=dag) - drydock_nodes = DrydockNodesOperator( - task_id='prepare_and_deploy_nodes', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - dag=dag) + # Define dependencies + drydock_prepare_site.set_upstream(drydock_verify_site) + if verify_nodes_exist: + drydock_verify_nodes_exist.set_upstream(drydock_prepare_site) + drydock_nodes.set_upstream(drydock_verify_nodes_exist) + else: + drydock_nodes.set_upstream(drydock_prepare_site) - # Define dependencies - drydock_prepare_site.set_upstream(drydock_verify_site) - if verify_nodes_exist: - drydock_verify_nodes_exist.set_upstream(drydock_prepare_site) - drydock_nodes.set_upstream(drydock_verify_nodes_exist) - else: - drydock_nodes.set_upstream(drydock_prepare_site) - - return dag + return drydock_build diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/preflight_checks.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/preflight_checks.py index 6b0c1e1f..fa9e0859 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/preflight_checks.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/preflight_checks.py @@ -11,7 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from airflow.models import DAG + +from airflow.utils.task_group import TaskGroup try: from airflow.operators import UcpHealthCheckOperator @@ -22,22 +23,18 @@ except ImportError: from shipyard_airflow.dags.config_path import config_path -def all_preflight_checks(parent_dag_name, child_dag_name, args): +def all_preflight_checks(dag): ''' Pre-Flight Checks Subdag ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args) + with TaskGroup(group_id="preflight", dag=dag) as preflight: + ''' + Check that all Airship components are in good state for the purposes + of the Undercloud Platform to proceed with processing. 
+ ''' + shipyard = UcpHealthCheckOperator( + task_id='ucp_preflight_check', + shipyard_conf=config_path, + dag=dag) - ''' - Check that all Airship components are in good state for the purposes - of the Undercloud Platform to proceed with processing. - ''' - shipyard = UcpHealthCheckOperator( - task_id='ucp_preflight_check', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - dag=dag) - - return dag + return preflight diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/validate_site_design.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/validate_site_design.py index 73391015..16d40b8b 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/validate_site_design.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/validate_site_design.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from airflow.models import DAG +from airflow.utils.task_group import TaskGroup try: from airflow.operators import ArmadaValidateDesignOperator @@ -35,53 +35,49 @@ BAREMETAL = 'baremetal' SOFTWARE = 'software' -def validate_site_design(parent_dag_name, child_dag_name, args, targets=None): - """Subdag to delegate design verification to the Airship components +def validate_site_design(dag, targets=None): + """TaskGroup to delegate design verification to the Airship components There is no wiring of steps - they all execute in parallel """ - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args) + with TaskGroup(group_id="validate_site_design", + dag=dag) as validate_site_design: - if targets is None: - targets = [BAREMETAL, SOFTWARE] + if targets is None: + targets = [BAREMETAL, SOFTWARE] - # Always add Deckhand validations - DeckhandValidateSiteDesignOperator( - task_id='deckhand_validate_site_design', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - retries=1, - dag=dag - ) - - if BAREMETAL in targets: - # Add Drydock and Promenade validations - DrydockValidateDesignOperator( - task_id='drydock_validate_site_design', + # Always add Deckhand validations + deckhand_validate_site_design = DeckhandValidateSiteDesignOperator( + task_id='deckhand_validate_site_design', shipyard_conf=config_path, - main_dag_name=parent_dag_name, retries=1, dag=dag ) - PromenadeValidateSiteDesignOperator( - task_id='promenade_validate_site_design', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - retries=1, - dag=dag - ) + if BAREMETAL in targets: + # Add Drydock and Promenade validations + drydock_validate_site_design = DrydockValidateDesignOperator( + task_id='drydock_validate_site_design', + shipyard_conf=config_path, + retries=1, + dag=dag + ) - if SOFTWARE in targets: - # Add Armada validations - ArmadaValidateDesignOperator( - task_id='armada_validate_site_design', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - retries=1, - dag=dag - ) + promenade_validate_site_design = \ + PromenadeValidateSiteDesignOperator( + task_id='promenade_validate_site_design', + shipyard_conf=config_path, + retries=1, + dag=dag + ) - return dag + if SOFTWARE in targets: + # Add Armada validations + armada_validate_site_design = ArmadaValidateDesignOperator( + task_id='armada_validate_site_design', + shipyard_conf=config_path, + retries=1, + dag=dag + ) + + return validate_site_design diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_create_site_action_tag.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_create_site_action_tag.py index 
75c83343..a00bf4d8 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_create_site_action_tag.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_create_site_action_tag.py @@ -116,7 +116,7 @@ class DeckhandCreateSiteActionTagOperator(DeckhandBaseOperator): def check_workflow_result(self): # Initialize Variables - task = ['armada_build'] + task = ['armada_build.armada_get_releases'] task_result = {} if self.main_dag_name in ['update_site', 'update_software']: diff --git a/tools/gate/playbooks/airskiff-reduce-site.yaml b/tools/gate/playbooks/airskiff-reduce-site.yaml index 7c80ed77..0df60169 100644 --- a/tools/gate/playbooks/airskiff-reduce-site.yaml +++ b/tools/gate/playbooks/airskiff-reduce-site.yaml @@ -17,7 +17,6 @@ - name: Overwrite Armada manifest shell: | git checkout v1.9 - mv tools/gate/manifests/full-site.yaml \ - type/skiff/manifests/full-site.yaml + mv tools/gate/manifests/full-site.yaml type/skiff/manifests/full-site.yaml args: chdir: "{{ zuul.projects['opendev.org/airship/treasuremap'].src_dir }}"
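
A minimal, self-contained sketch of the SubDAG-to-TaskGroup migration pattern this patch applies (the DAG id, task ids, operators and callables below are illustrative only and are not taken from the Shipyard code): a helper builds a TaskGroup wired to the parent DAG, and the parent references the group directly instead of wrapping a separate DAG in a SubDagOperator, so the grouped tasks are scheduled with the main DAG rather than as an independently started SubDAG.

    from datetime import datetime

    from airflow.models import DAG
    from airflow.operators.python import PythonOperator
    from airflow.utils.task_group import TaskGroup


    def build_example_group(dag):
        """Return a TaskGroup wired the same way the former SubDAG tasks were."""
        with TaskGroup(group_id="example_group", dag=dag) as group:
            step_one = PythonOperator(
                task_id="step_one",
                python_callable=lambda: print("step one"),
                dag=dag)
            step_two = PythonOperator(
                task_id="step_two",
                python_callable=lambda: print("step two"),
                dag=dag)

            # Dependencies stay inside the group, exactly as they did
            # between the tasks of the old SubDAG.
            step_one >> step_two
        return group


    with DAG(dag_id="example_parent",
             start_date=datetime(2023, 1, 1),
             schedule_interval=None) as dag:
        # The group is referenced like a single step of the parent DAG,
        # replacing the SubDagOperator wrapper; no child DAG or separate
        # default_args plumbing is needed.
        example_group = build_example_group(dag)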