在 Airflow 中设置 S3 以记录日志

2025-03-17 09:11:00
admin
原创
56
摘要:问题描述:我正在使用 docker-compose 设置可扩展的 airflow 集群。我的方法基于这个 Dockerfile https://hub.docker.com/r/puckel/docker-airflow/我的问题是设置日志以从 s3 写入/读取。当 dag 完成时,我收到如下错误*** Lo...

问题描述:

我正在使用 docker-compose 设置可扩展的 airflow 集群。我的方法基于这个 Dockerfile https://hub.docker.com/r/puckel/docker-airflow/

我的问题是设置日志以从 s3 写入/读取。当 dag 完成时,我收到如下错误

*** Log file isn't local.
*** Fetching here: http://ea43d4d49f35:8793/log/xxxxxxx/2017-06-26T11:00:00
*** Failed to fetch log file from worker.

*** Reading remote logs...
Could not read logs from s3://buckets/xxxxxxx/airflow/logs/xxxxxxx/2017-06-
26T11:00:00

我在文件中设置了一个新部分,airflow.cfg如下所示

[MyS3Conn]
aws_access_key_id = xxxxxxx
aws_secret_access_key = xxxxxxx
aws_default_region = xxxxxxx

然后在远程日志部分指定 s3 路径airflow.cfg

remote_base_log_folder = s3://buckets/xxxx/airflow/logs
remote_log_conn_id = MyS3Conn

我是否正确设置了它,但有一个错误?我是否遗漏了成功的秘诀?

  • 更新

我尝试以 URI 和 JSON 格式导出,但似乎都不起作用。然后我导出了 aws_access_key_id 和 aws_secret_access_key,然后 airflow 开始拾取它。现在我在工作日志中收到了他的错误

6/30/2017 6:05:59 PMINFO:root:Using connection to: s3
6/30/2017 6:06:00 PMERROR:root:Could not read logs from s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMERROR:root:Could not write logs to s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMLogging into: /usr/local/airflow/logs/xxxxx/2017-06-30T23:45:00
  • 更新

我也发现了这个链接
https://www.mail-archive.com/ dev@airflow.incubator.apache.org /msg00462.html

然后我进入我的一台工作机器(与网络服务器和调度程序分开),用 Python 运行了这段代码

import airflow
s3 = airflow.hooks.S3Hook('s3_conn')
s3.load_string('test', airflow.conf.get('core', 'remote_base_log_folder'))

我收到此错误。

boto.exception.S3ResponseError: S3ResponseError: 403 Forbidden

我尝试导出几种不同类型的AIRFLOW_CONN_环境,如连接部分https://airflow.incubator.apache.org/concepts.html中以及该问题的其他答案中所述。

s3://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@S3

{"aws_account_id":"<xxxxx>","role_arn":"arn:aws:iam::<xxxx>:role/<xxxxx>"}

{"aws_access_key_id":"<xxxxx>","aws_secret_access_key":"<xxxxx>"}

我还导出了 AWS_ACCESS_KEY_ID 和 AWS_SECRET_ACCESS_KEY,但没有成功。

这些凭证被存储在数据库中,所以一旦我将它们添加到 UI 中,工作人员就会获取它们,但由于某种原因,他们无法写入/读取日志。


解决方案 1:

更新 Airflow 1.10 使日志记录变得更加容易。

对于 s3 日志记录,请按照上述答案设置连接挂钩

然后只需将以下内容添加到 airflow.cfg 中

    [core]
    # Airflow can store logs remotely in AWS S3. Users must supply a remote
    # location URL (starting with either 's3://...') and an Airflow connection
    # id that provides access to the storage location.
    remote_base_log_folder = s3://my-bucket/path/to/logs
    remote_log_conn_id = MyS3Conn
    # Use server-side encryption for logs stored in S3
    encrypt_s3_logs = False

对于 gcs 日志记录,

  1. 首先安装 gcp_api 包,如下所示:pip install apache-airflow[gcp_api]。

  2. 按照上述答案设置连接挂钩

  3. 将以下内容添加到 airflow.cfg

 [core]
 # Airflow can store logs remotely in AWS S3. Users must supply a remote
 # location URL (starting with either 's3://...') and an Airflow connection
 # id that provides access to the storage location.
 remote_logging = True
 remote_base_log_folder = gs://my-bucket/path/to/logs
 remote_log_conn_id = MyGCSConn

注意:从 Airflow 1.9 开始,远程日志记录已发生重大改变。如果您使用的是 1.9,请继续阅读。

参考这里

完整说明:

  1. 创建一个目录来存储配置,并将其放置在可以在 PYTHONPATH 中找到的位置。一个例子是 $AIRFLOW_HOME/config

  2. 创建名为 $AIRFLOW_HOME/config/log_config.py 和 $AIRFLOW_HOME/config/__init__.py 的空文件

  3. 将airflow/config_templates/airflow_local_settings.py的内容复制到上面步骤中刚刚创建的 log_config.py 文件中。

  4. 自定义模板的以下部分:

 #Add this variable to the top of the file. Note the trailing slash.
 S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/'

 Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG
 LOGGING_CONFIG = ...

 Add a S3TaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
 's3.task': {
     'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
     'formatter': 'airflow.task',
     'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
     's3_log_folder': S3_LOG_FOLDER,
     'filename_template': FILENAME_TEMPLATE,
 },

  Update the airflow.task and airflow.task_runner blocks to be 's3.task' instead >of 'file.task'.
 'loggers': {
     'airflow.task': {
         'handlers': ['s3.task'],
         ...
     },
     'airflow.task_runner': {
         'handlers': ['s3.task'],
         ...
     },
     'airflow': {
         'handlers': ['console'],
         ...
     },
 }
  1. 确保已在 Airflow 中定义了 s3 连接钩子,如上述答案所示。该钩子应该具有对上面在 S3_LOG_FOLDER 中定义的 s3 存储桶的读写访问权限。

  2. 更新 $AIRFLOW_HOME/airflow.cfg 以包含:

 task_log_reader = s3.task
 logging_config_class = log_config.LOGGING_CONFIG
 remote_log_conn_id = <name of the s3 platform hook>
  1. 重新启动 Airflow 网络服务器和调度程序,并触发(或等待)新任务执行。

  2. 验证您定义的存储桶中新执行的任务的日志是否显示。

  3. 验证 s3 存储查看器是否在 UI 中工作。拉出一个新执行的任务,并验证您是否看到类似以下内容:

 *** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
 [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
 [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
 [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
 [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py

气流 2.4.2

按照上述步骤操作,但将其粘贴到log_config.py

# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow import configuration as conf
from copy import deepcopy


S3_LOG_FOLDER = 's3://your/s3/log/folder'

LOG_LEVEL = conf.get('logging', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('logging', 'log_format')

BASE_LOG_FOLDER = conf.get('logging', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'


LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)


# Attach formatters to loggers (airflow.task, airflow.processor)
LOGGING_CONFIG['formatters']['airflow.task'] = { 'format': LOG_FORMAT }
LOGGING_CONFIG['formatters']['airflow.processor'] = { 'format': LOG_FORMAT }

# Add an S3 task handler
LOGGING_CONFIG['handlers']['s3.task'] = {
    'class': 'airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler',
    'formatter': 'airflow.task',
    'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
    's3_log_folder': S3_LOG_FOLDER,
    'filename_template': FILENAME_TEMPLATE
}

# Specify handler for airflow.task
LOGGING_CONFIG['loggers']['airflow.task']['handlers'] = ['task', 's3.task']

解决方案 2:

您需要通过 Airflow UI 设置 S3 连接。为此,您需要转到Airflow UI 上的“管理”->“连接”选项卡,然后为您的 S3 连接创建一个新行。

示例配置如下:

Conn Id: my_conn_S3

Conn Type: S3

Extra: {"aws_access_key_id":"your_aws_key_id", "aws_secret_access_key": "your_aws_secret_key"}

解决方案 3:

(截至 Airflow 1.10.2 更新)

如果您不使用管理 UI,这里有一个解决方案。

我的 Airflow 不在持久服务器上运行...(它每天都会在 Heroku 上的 Docker 容器中重新启动。)我知道我错过了很多很棒的功能,但在我的最小设置中,我从未接触过管理 UI 或 cfg 文件。相反,我必须在 bash 脚本中设置特定于 Airflow 的环境变量,这会覆盖 .cfg 文件。

Apache-Airflow[s3]

首先,您需要s3安装子包以将您的 Airflow 日志写入 S3。(boto3对于您的 DAG 中的 Python 作业来说效果很好,但S3Hook依赖于 s3 子包。)

另外注意一点:conda install还不能处理这个问题,所以我必须这么做pip install apache-airflow[s3]

环境变量

在 bash 脚本中,我设置了这些变量。从这些说明core开始,但使用环境变量的命名约定,我这样做:AIRFLOW__{SECTION}__{KEY}

export AIRFLOW__CORE__REMOTE_LOGGING=True
export AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucket/key
export AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_uri
export AIRFLOW__CORE__ENCRYPT_S3_LOGS=False

S3 连接 ID

以上s3_uri是我编写的连接 ID。在 Airflow 中,它对应于另一个环境变量。AIRFLOW_CONN_S3_URI该变量的值是您的 S3 路径,必须是 URI 形式。即

s3://access_key:secret_key@bucket/key

无论您如何处理其他敏感环境变量,请存储此变量。

通过此配置,Airflow 将能够将您的日志写入 S3。它们将遵循的路径s3://bucket/key/dag/task_id/timestamp/1.log


关于从 Airflow 1.8 升级到 Airflow 1.10 的附录

我最近将生产管道从 Airflow 1.8 升级到 1.9,然后又升级到 1.10。好消息是变化非常小;其余工作只是弄清楚软件包安装的细微差别(与有关 S3 日志的原始问题无关)。

(1)首先,我需要将 Airflow 1.9 升级到 Python 3.6。

airflow(2) 从 1.9开始,软件包名称从 更改为。您还可能会在 中apache-airflow遇到此问题pip install

(3)软件包psutil必须属于 Airflow 的特定版本范围。您在执行时可能会遇到这种情况pip install apache-airflow

(4)Airflow 1.9+ 需要 python3-dev 标头。

(5)实质性修改如下:export AIRFLOW__CORE__REMOTE_LOGGING=True现在是必需的。并且

(6) 日志在 S3 中的路径略有不同,我在答案中对其进行了更新:s3://bucket/key/dag/task_id/timestamp/1.log

但就是这样!日志在 1.9 中不起作用,所以我建议直接升级到 1.10,因为它已经可用了。

解决方案 4:

要使用最新的 Airflow 更新来完成 Arne 的回答,您不需要将其设置task_log_reader为默认值以外的其他值:task

就像您遵循默认日志模板airflow/config_templates/airflow_local_settings.py一样,您可以看到,由于此提交(请注意处理程序的名称更改为's3': {'task'...而不是s3.task),因此远程文件夹上的值(REMOTE_BASE_LOG_FOLDER)将用正确的处理程序替换该处理程序:

REMOTE_LOGGING = conf.get('core', 'remote_logging')

if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])

有关如何登录 S3 或从 S3 读取数据的更多详细信息:https://github.com/apache/incubator-airflow/blob/master/docs/howto/write-logs.rst#writing-logs-to-amazon-s3

解决方案 5:

呼!继续消灭气流漏洞的动机是将其作为一堆 python 文件来应对 XD 这是我在这方面的经验apache-airflow==1.9.0

首先,根本没必要尝试
airflow connections ..........--conn_extra等等。

只需将您的设置airflow.cfg为:

remote_logging = True
remote_base_log_folder = s3://dev-s3-main-ew2-dmg-immutable-potns/logs/airflow-logs/
encrypt_s3_logs = False

# Logging level
logging_level = INFO
fab_logging_level = WARN

# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class = log_config.LOGGING_CONFIG
remote_log_conn_id = s3://<ACCESS-KEY>:<SECRET-ID>@<MY-S3-BUCKET>/<MY>/<SUB>/<FOLDER>/

保留上述文件$AIRFLOW_HOME/config/__ init __.py$AIRFLOW_HOME/config/log_config.py

我的问题是缺少“boto3”包,我可以通过以下方式解决:

vi /usr/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py
then >> import traceback

并且包含以下内容的行:

无法创建连接 ID 为“%s”的 S3Hook。' '请确保已安装 airflow[s3] 并且 ' 'S3 连接存在。

做得好traceback.print_exc(),它开始抱怨想念 boto3!

安装它后生活又变得美好了!

解决方案 6:

对于遵循上述答案中非常有用的说明的任何人来说,这只是一个旁注:如果您偶然发现这个问题:“ModuleNotFoundError:没有名为‘airflow.utils.log.logging_mixin.RedirectStdHandler’的模块” ,如此处所述(使用 airflow 1.9 时发生),修复很简单 - 使用这个基本模板: https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/config_templates/airflow_local_settings.py (并遵循上述答案中的所有其他说明)

主分支中当前的模板incubator-airflow/airflow/config_templates/airflow_local_settings.py包含对类“airflow.utils.log.s3_task_handler.S3TaskHandler”的引用,而该类在 apache-airflow==1.9.0 python 包中不存在。希望这对您有所帮助!

解决方案 7:

让它与 kube 中的 Airflow 1.10 一起工作。我有以下环境变量集:

AIRFLOW_CONN_LOGS_S3=s3://id:secret_uri_encoded@S3
AIRFLOW__CORE__REMOTE_LOGGING=True
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://xxxx/logs
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=logs_s3

解决方案 8:

对于 airflow 2.3.4,使用 Docker,我也遇到了记录到 s3 的问题。

最初我遇到了一些权限错误(尽管我的 IAM 角色设置得很好),然后在稍微改变配置后,我能够在正确的位置写入文件,但无法读取(回退到本地日志)。

无论如何,经过多次努力、调试、反复试验后,以下是对我有用的方法:

为 s3 定义一个连接(假设您的地区也是eu-west-1):

通过 UI,在这种情况下您需要设置:

  • 连接 ID:(my-conn或您喜欢的任何名称),

  • 连接类型:(Amazon Web Services这是我做的一个更改,s3对我来说不起作用),

  • 额外的:{"region_name": "eu-west-1", "endpoint_url": "https://s3.eu-west-1.amazonaws.com"}

或者通过 CLI:

airflow connections add my-conn --conn-type aws --conn-extra '{"region_name": "eu-west-1", "endpoint_url": "https://s3.eu-west-1.amazonaws.com"}'

至于气流配置,我已经在所有进程中设置了它们:

...
export AIRFLOW__LOGGING__REMOTE_LOGGING=True
export AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=s3://my-bucket/path/to/log/folder
export AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID=my-conn
...

部署后,我仍然收到类似的错误Falling back to local log...,但最终文件被加载并显示(几次刷新后)。

但它现在似乎运行正常 :)

相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   2482  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1533  
  PLM(产品生命周期管理)项目对于企业优化产品研发流程、提升产品质量以及增强市场竞争力具有至关重要的意义。然而,在项目推进过程中,范围蔓延是一个常见且棘手的问题,它可能导致项目进度延迟、成本超支以及质量下降等一系列不良后果。因此,有效避免PLM项目范围蔓延成为项目成功的关键因素之一。以下将详细阐述三大管控策略,助力企业...
plm系统   0  
  PLM(产品生命周期管理)项目管理在企业产品研发与管理过程中扮演着至关重要的角色。随着市场竞争的加剧和产品复杂度的提升,PLM项目面临着诸多风险。准确量化风险优先级并采取有效措施应对,是确保项目成功的关键。五维评估矩阵作为一种有效的风险评估工具,能帮助项目管理者全面、系统地评估风险,为决策提供有力支持。五维评估矩阵概述...
免费plm软件   0  
  引言PLM(产品生命周期管理)开发流程对于企业产品的全生命周期管控至关重要。它涵盖了从产品概念设计到退役的各个阶段,直接影响着产品质量、开发周期以及企业的市场竞争力。在当今快速发展的科技环境下,客户对产品质量的要求日益提高,市场竞争也愈发激烈,这就使得优化PLM开发流程成为企业的必然选择。缺陷管理工具和六西格玛方法作为...
plm产品全生命周期管理   0  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用