diff --git a/.gitignore b/.gitignore
index 3e5b7e98..94cb9589 100644
--- a/.gitignore
+++ b/.gitignore
@@ -128,3 +128,4 @@ airflow.db
 latest
 src/bin/shipyard_airflow/shipyard_airflow/config
 src/bin/shipyard_airflow/shipyard_airflow/webserver_config.py
+airflow-runtime
diff --git a/charts/shipyard/values.yaml b/charts/shipyard/values.yaml
index 621b61ca..dea95b0f 100644
--- a/charts/shipyard/values.yaml
+++ b/charts/shipyard/values.yaml
@@ -476,8 +476,8 @@ conf:
         - broker_url
         - result_backend
         - fernet_key
-      lazy_discover_providers: "False"
-      lazy_load_plugins: "False"
+      lazy_discover_providers: "True"
+      lazy_load_plugins: "True"
       hostname_callable: "socket:getfqdn"
       default_timezone: "utc"
       executor: "CeleryExecutor"
@@ -526,7 +526,7 @@ conf:
       # See image-bundled log_config.py.
      # Adds console logging of task/step logs.
       # logging_config_class: log_config.LOGGING_CONFIG
-      logging_config_class: new_log_config.LOGGING_CONFIG
+      logging_config_class: new_log_config.DEFAULT_LOGGING_CONFIG
       # NOTE: Airflow 1.10 introduces extra newline characters between log
       # records. Version 1.10.1 should resolve this issue
       # https://issues.apache.org/jira/browse/AIRFLOW-1917
@@ -544,9 +544,9 @@ conf:
       log_filename_template: "{{ ti.dag_id }}/{{ ti.task_id }}/{{ execution_date.strftime('%%Y-%%m-%%dT%%H:%%M:%%S') }}/{{ try_number }}.log"
       log_processor_filename_template: "{{ filename }}.log"
       dag_processor_manager_log_location: /usr/local/airflow/logs/dag_processor_manager/dag_processor_manager.log
-      logging_level: "INFO"
+      logging_level: "DEBUG"
       fab_logging_level: "WARNING"
-      celery_logging_level: "INFO"
+      celery_logging_level: "DEBUG"
       base_log_folder: /usr/local/airflow/logs
       remote_logging: "False"
       remote_log_conn_id: ""
diff --git a/images/airflow/config/new_log_config.py b/images/airflow/config/new_log_config.py
index 3394d32d..553d0f17 100644
--- a/images/airflow/config/new_log_config.py
+++ b/images/airflow/config/new_log_config.py
@@ -1,4 +1,341 @@
-from copy import deepcopy
-from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Airflow logging settings."""
+from __future__ import annotations
 
-LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
\ No newline at end of file
+import os
+from pathlib import Path
+from typing import Any
+from urllib.parse import urlsplit
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowException
+
+LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()
+
+
+# Flask appbuilder's info level log is very verbose,
+# so it's set to 'WARN' by default.
+FAB_LOG_LEVEL: str = conf.get_mandatory_value("logging", "FAB_LOGGING_LEVEL").upper()
+
+LOG_FORMAT: str = conf.get_mandatory_value("logging", "LOG_FORMAT")
+DAG_PROCESSOR_LOG_FORMAT: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_FORMAT")
+
+LOG_FORMATTER_CLASS: str = conf.get_mandatory_value(
+    "logging", "LOG_FORMATTER_CLASS", fallback="airflow.utils.log.timezone_aware.TimezoneAware"
+)
+
+COLORED_LOG_FORMAT: str = conf.get_mandatory_value("logging", "COLORED_LOG_FORMAT")
+
+COLORED_LOG: bool = conf.getboolean("logging", "COLORED_CONSOLE_LOG")
+
+COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value("logging", "COLORED_FORMATTER_CLASS")
+
+DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_TARGET")
+
+BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "BASE_LOG_FOLDER")
+
+PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value("scheduler", "CHILD_PROCESS_LOG_DIRECTORY")
+
+DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get_mandatory_value(
+    "logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION"
+)
+
+# FILENAME_TEMPLATE is only used by Remote Logging Handlers since Airflow 2.3.3.
+# All of these handlers inherit from FileTaskHandler, and providing any value other than None
+# would raise a deprecation warning.
+FILENAME_TEMPLATE: str | None = None
+
+PROCESSOR_FILENAME_TEMPLATE: str = conf.get_mandatory_value("logging", "LOG_PROCESSOR_FILENAME_TEMPLATE")
+
+DEFAULT_LOGGING_CONFIG: dict[str, Any] = {
+    "version": 1,
+    "disable_existing_loggers": False,
+    "formatters": {
+        "airflow": {
+            "format": LOG_FORMAT,
+            "class": LOG_FORMATTER_CLASS,
+        },
+        "airflow_coloured": {
+            "format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
+            "class": COLORED_FORMATTER_CLASS if COLORED_LOG else LOG_FORMATTER_CLASS,
+        },
+        "source_processor": {
+            "format": DAG_PROCESSOR_LOG_FORMAT,
+            "class": LOG_FORMATTER_CLASS,
+        },
+    },
+    "filters": {
+        "mask_secrets": {
+            "()": "airflow.utils.log.secrets_masker.SecretsMasker",
+        },
+    },
+    "handlers": {
+        # NOTE: Add a "raw" python console logger. Using 'console' results
+        # in a state of recursion.
+        'py-console': {
+            'class': 'logging.StreamHandler',
+            'formatter': 'airflow',
+            'stream': 'ext://sys.stdout',
+            "filters": ["mask_secrets"],
+        },
+        "console": {
+            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
+            "formatter": "airflow_coloured",
+            "stream": "sys.stdout",
+            "filters": ["mask_secrets"],
+        },
+        "task": {
+            "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
+            "formatter": "airflow",
+            "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
+            "filters": ["mask_secrets"],
+        },
+        "processor": {
+            "class": "airflow.utils.log.file_processor_handler.FileProcessorHandler",
+            "formatter": "airflow",
+            "base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER),
+            "filename_template": PROCESSOR_FILENAME_TEMPLATE,
+            "filters": ["mask_secrets"],
+        },
+        "processor_to_stdout": {
+            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
+            "formatter": "source_processor",
+            "stream": "sys.stdout",
+            "filters": ["mask_secrets"],
+        },
+    },
+    "loggers": {
+        "airflow.processor": {
+            "handlers": ["processor_to_stdout" if DAG_PROCESSOR_LOG_TARGET == "stdout" else "processor"],
+            "level": LOG_LEVEL,
+            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
+            "propagate": True,
+        },
+        "airflow.task": {
+            # NOTE: Modified for use by Shipyard/Airflow (adds console logging).
+            # The supplied console logger cannot be used here, as it
+            # leads to out-of-control memory usage.
+            'handlers': ['task', 'py-console'],
+            "level": LOG_LEVEL,
+            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
+            "propagate": True,
+            "filters": ["mask_secrets"],
+        },
+        "flask_appbuilder": {
+            "handlers": ["console"],
+            "level": FAB_LOG_LEVEL,
+            "propagate": True,
+        },
+    },
+    "root": {
+        "handlers": ["console"],
+        "level": LOG_LEVEL,
+        "filters": ["mask_secrets"],
+    },
+}
+
+EXTRA_LOGGER_NAMES: str | None = conf.get("logging", "EXTRA_LOGGER_NAMES", fallback=None)
+if EXTRA_LOGGER_NAMES:
+    new_loggers = {
+        logger_name.strip(): {
+            "handlers": ["console"],
+            "level": LOG_LEVEL,
+            "propagate": True,
+        }
+        for logger_name in EXTRA_LOGGER_NAMES.split(",")
+    }
+    DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers)
+
+DEFAULT_DAG_PARSING_LOGGING_CONFIG: dict[str, dict[str, dict[str, Any]]] = {
+    "handlers": {
+        "processor_manager": {
+            "class": "airflow.utils.log.non_caching_file_handler.NonCachingRotatingFileHandler",
+            "formatter": "airflow",
+            "filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION,
+            "mode": "a",
+            "maxBytes": 104857600,  # 100MB
+            "backupCount": 5,
+        }
+    },
+    "loggers": {
+        "airflow.processor_manager": {
+            "handlers": ["processor_manager"],
+            "level": LOG_LEVEL,
+            "propagate": False,
+        }
+    },
+}
+
+# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
+# This is to avoid exceptions when initializing RotatingFileHandler multiple times
+# in multiple processes.
+if os.environ.get("CONFIG_PROCESSOR_MANAGER_LOGGER") == "True":
+    DEFAULT_LOGGING_CONFIG["handlers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"])
+    DEFAULT_LOGGING_CONFIG["loggers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"])
+
+    # Manually create log directory for processor_manager handler as RotatingFileHandler
+    # will only create the file but not the directory.
+    processor_manager_handler_config: dict[str, Any] = DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"][
+        "processor_manager"
+    ]
+    directory: str = os.path.dirname(processor_manager_handler_config["filename"])
+    Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755)
+
+##################
+# Remote logging #
+##################
+
+REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")
+
+if REMOTE_LOGGING:
+
+    ELASTICSEARCH_HOST: str | None = conf.get("elasticsearch", "HOST")
+
+    # Storage bucket URL for remote logging
+    # S3 buckets should start with "s3://"
+    # Cloudwatch log groups should start with "cloudwatch://"
+    # GCS buckets should start with "gs://"
+    # WASB buckets should start with "wasb"
+    # HDFS path should start with "hdfs://"
+    # just to help Airflow select correct handler
+    REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "REMOTE_BASE_LOG_FOLDER")
+    REMOTE_TASK_HANDLER_KWARGS = conf.getjson("logging", "REMOTE_TASK_HANDLER_KWARGS", fallback={})
+
+    if REMOTE_BASE_LOG_FOLDER.startswith("s3://"):
+        S3_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
+            "task": {
+                "class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
+                "formatter": "airflow",
+                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
+                "s3_log_folder": REMOTE_BASE_LOG_FOLDER,
+                "filename_template": FILENAME_TEMPLATE,
+            },
+        }
+
+        DEFAULT_LOGGING_CONFIG["handlers"].update(S3_REMOTE_HANDLERS)
+    elif REMOTE_BASE_LOG_FOLDER.startswith("cloudwatch://"):
+        url_parts = urlsplit(REMOTE_BASE_LOG_FOLDER)
+        CLOUDWATCH_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
+            "task": {
+                "class": "airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler",
+                "formatter": "airflow",
+                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
+                "log_group_arn": url_parts.netloc + url_parts.path,
+                "filename_template": FILENAME_TEMPLATE,
+            },
+        }
+
+        DEFAULT_LOGGING_CONFIG["handlers"].update(CLOUDWATCH_REMOTE_HANDLERS)
+    elif REMOTE_BASE_LOG_FOLDER.startswith("gs://"):
+        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
+        GCS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
+            "task": {
+                "class": "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler",
+                "formatter": "airflow",
+                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
+                "gcs_log_folder": REMOTE_BASE_LOG_FOLDER,
+                "filename_template": FILENAME_TEMPLATE,
+                "gcp_key_path": key_path,
+            },
+        }
+
+        DEFAULT_LOGGING_CONFIG["handlers"].update(GCS_REMOTE_HANDLERS)
+    elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
+        WASB_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
+            "task": {
+                "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",
+                "formatter": "airflow",
+                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
+                "wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
+                "wasb_container": "airflow-logs",
+                "filename_template": FILENAME_TEMPLATE,
+            },
+        }
+
+        DEFAULT_LOGGING_CONFIG["handlers"].update(WASB_REMOTE_HANDLERS)
+    elif REMOTE_BASE_LOG_FOLDER.startswith("stackdriver://"):
+        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
+        # stackdriver:///airflow-tasks => airflow-tasks
+        log_name = urlsplit(REMOTE_BASE_LOG_FOLDER).path[1:]
+        STACKDRIVER_REMOTE_HANDLERS = {
+            "task": {
+                "class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
+                "formatter": "airflow",
+                "name": log_name,
+                "gcp_key_path": key_path,
+            }
+        }
+
+        DEFAULT_LOGGING_CONFIG["handlers"].update(STACKDRIVER_REMOTE_HANDLERS)
+    elif REMOTE_BASE_LOG_FOLDER.startswith("oss://"):
+        OSS_REMOTE_HANDLERS = {
+            "task": {
+                "class": "airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler",
+                "formatter": "airflow",
+                "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
+                "oss_log_folder": REMOTE_BASE_LOG_FOLDER,
+                "filename_template": FILENAME_TEMPLATE,
+            },
+        }
+        DEFAULT_LOGGING_CONFIG["handlers"].update(OSS_REMOTE_HANDLERS)
+    elif REMOTE_BASE_LOG_FOLDER.startswith("hdfs://"):
+        HDFS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
+            "task": {
+                "class": "airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsTaskHandler",
+                "formatter": "airflow",
+                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
+                "hdfs_log_folder": REMOTE_BASE_LOG_FOLDER,
+                "filename_template": FILENAME_TEMPLATE,
+            },
+        }
+        DEFAULT_LOGGING_CONFIG["handlers"].update(HDFS_REMOTE_HANDLERS)
+    elif ELASTICSEARCH_HOST:
+        ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
+        ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
+        ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
+        ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
+        ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
+        ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
+        ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")
+
+        ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
+            "task": {
+                "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
+                "formatter": "airflow",
+                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
+                "filename_template": FILENAME_TEMPLATE,
+                "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
+                "host": ELASTICSEARCH_HOST,
+                "frontend": ELASTICSEARCH_FRONTEND,
+                "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
+                "json_format": ELASTICSEARCH_JSON_FORMAT,
+                "json_fields": ELASTICSEARCH_JSON_FIELDS,
+                "host_field": ELASTICSEARCH_HOST_FIELD,
+                "offset_field": ELASTICSEARCH_OFFSET_FIELD,
+            },
+        }
+
+        DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS)
+    else:
+        raise AirflowException(
+            "Incorrect remote log configuration. Please check the configuration of option 'host' in "
+            "section 'elasticsearch' if you are using Elasticsearch. In the other case, "
+            "'remote_base_log_folder' option in the 'logging' section."
+        )
+    DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(REMOTE_TASK_HANDLER_KWARGS)
\ No newline at end of file
diff --git a/src/bin/shipyard_airflow/shipyard_airflow/control/action/actions_id_api.py b/src/bin/shipyard_airflow/shipyard_airflow/control/action/actions_id_api.py
index ca7df80e..edfa4558 100644
--- a/src/bin/shipyard_airflow/shipyard_airflow/control/action/actions_id_api.py
+++ b/src/bin/shipyard_airflow/shipyard_airflow/control/action/actions_id_api.py
@@ -89,12 +89,12 @@ class ActionsIdResource(BaseResource):
             action['notes'].append(note.view())
         return action
 
-    def get_dag_run_by_id(self, dag_id, run_id):
+    def get_dag_run_by_id(self, dag_id, execution_date):
         """
         Wrapper for call to the airflow db to get a dag_run
         :returns: a dag run dictionary
         """
-        dag_run_list = self.get_dag_run_db(dag_id, run_id)
+        dag_run_list = self.get_dag_run_db(dag_id, execution_date)
         # should be only one result, return the first one
         if dag_run_list:
             return dag_run_list[0]
diff --git a/src/bin/shipyard_airflow/shipyard_airflow/control/af_monitoring/workflows_api.py b/src/bin/shipyard_airflow/shipyard_airflow/control/af_monitoring/workflows_api.py
index aa418ddc..9d368b31 100644
--- a/src/bin/shipyard_airflow/shipyard_airflow/control/af_monitoring/workflows_api.py
+++ b/src/bin/shipyard_airflow/shipyard_airflow/control/af_monitoring/workflows_api.py
@@ -76,7 +76,7 @@ class WorkflowIdResource(BaseResource):
         """
         Retrieve a workflow by id,
         :param helper: The WorkflowHelper constructed for this invocation
-        :param workflow_id: a string in {dag_id}__{run_id} format
+        :param workflow_id: a string in {dag_id}__{execution} format
                             identifying a workflow
         :returns: a workflow detail dictionary including steps
         """
diff --git a/tools/gate/playbooks/airskiff-deploy.yaml b/tools/gate/playbooks/airskiff-deploy.yaml
index f89455c4..68c007c3 100644
--- a/tools/gate/playbooks/airskiff-deploy.yaml
+++ b/tools/gate/playbooks/airskiff-deploy.yaml
@@ -21,6 +21,9 @@
     - ensure-python
     - ensure-pip
     - ensure-docker
+    - role: add-authorized-keys
+      public_keys:
+        - public_key: ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDA7eM8WFJrqQmki8rR0O3QBHyl8xq42jb1RduwuRwjWoGYJI5cX7Fx+7VR4A9ITCoiqxKS8DMfgKbt5jKC6SmvMALULZsnYlthB34KywurgxsW6fgp68DHWQ7J4CCBhoIpl0W3JW7s6b0vHLhab59r0E+AYemBVuWUqbFEy8nDAHcQv1S/2o1udhmljIN7c2ogO4KAJ7Lge0BoIP9ps4u6AVwyQZixp4anU9DHGNA/UQj4M5UyuALj5buEAuATBe9Vqj4sOvZjObPJAGPUrNRrGEWAFk+lSZHRzKXo0eeWtPqoh5UN9UDb5Pocg1krncMIZwjHKovlD1z/O1y91aY5LM1wxm/7aaIiX8eCihyVZaOuDCLF7WDT2SMs7ABcotX2MDtVQTrNNV3MmMAScFNDflzPKszd7cdjLl6PBq8bvPxmCkLmnitPTGOoh9d8i+JlbINvgx1pguYrpeciIyreCO1rjTW3MgB0tyoMEa31V+7HrauBMeNnE68YTqLTIB0= smarkin@mirantis.com
 
   tasks:
 
diff --git a/tools/gate/playbooks/airskiff-reduce-site.yaml b/tools/gate/playbooks/airskiff-reduce-site.yaml
index 7c80ed77..41eaee46 100644
--- a/tools/gate/playbooks/airskiff-reduce-site.yaml
+++ b/tools/gate/playbooks/airskiff-reduce-site.yaml
@@ -17,7 +17,7 @@
     - name: Overwrite Armada manifest
       shell: |
         git checkout v1.9
-        mv tools/gate/manifests/full-site.yaml \
-          type/skiff/manifests/full-site.yaml
+        # commented out to force deploy more openstack components
+        # mv tools/gate/manifests/full-site.yaml type/skiff/manifests/full-site.yaml
       args:
         chdir: "{{ zuul.projects['opendev.org/airship/treasuremap'].src_dir }}"
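A quick way to sanity-check the logging module this patch adds is to import it and feed the resulting dict to logging.config.dictConfig, since the chart now points logging_config_class at new_log_config.DEFAULT_LOGGING_CONFIG. The sketch below is illustrative only and not part of the patch; it assumes it runs inside the built airflow image, where airflow and its providers are installed, a valid airflow.cfg is present (the module reads [logging] options at import time), and new_log_config.py is on PYTHONPATH.

import logging.config

import new_log_config  # module added by this patch

cfg = new_log_config.DEFAULT_LOGGING_CONFIG

# Shipyard-specific additions: the raw stdout handler and its wiring into airflow.task.
assert "py-console" in cfg["handlers"]
assert "py-console" in cfg["loggers"]["airflow.task"]["handlers"]

# dictConfig() raises if any handler/formatter dotted path in the config is invalid.
logging.config.dictConfig(cfg)
print("logging config loaded; handlers:", sorted(cfg["handlers"]))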