Setting up S3 logging in Airflow

I'm new to this, so please bear with me.

I am using docker-compose to set up a scalable Airflow cluster. I based my approach on this Dockerfile: https://hub.docker.com/r/puckel/docker-airflow/

My problem is getting the logs set up to write to / read from S3. When a DAG has completed, I get an error like this:

*** Log file isn't local.
*** Fetching here: http://ea43d4d49f35:8793/log/xxxxxxx/2017-06-26T11:00:00
*** Failed to fetch log file from worker.

*** Reading remote logs...
Could not read logs from s3://buckets/xxxxxxx/airflow/logs/xxxxxxx/2017-06-26T11:00:00

I set up a new section in the airflow.cfg file like this:

[MyS3Conn]
aws_access_key_id = xxxxxxx
aws_secret_access_key = xxxxxxx
aws_default_region = xxxxxxx

and then, in airflow.cfg:

 remote_base_log_folder = s3://buckets/xxxx/airflow/logs
remote_log_conn_id = MyS3Conn

Did I set this up properly and there is just a bug somewhere? Is there a recipe for success here that I am missing?

- Update

I tried exporting the connection in both URI and JSON formats and neither seemed to work. I then exported aws_access_key_id and aws_secret_access_key and Airflow started picking them up. Now I get this error in the worker logs:

6/30/2017 6:05:59 PM INFO:root:Using connection to: s3
6/30/2017 6:06:00 PM ERROR:root:Could not read logs from s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PM ERROR:root:Could not write logs to s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PM Logging into: /usr/local/airflow/logs/xxxxx/2017-06-30T23:45:00

- Update

I also found this link: https://www.mail-archive.com/dev@airflow.incubator.apache.org/msg00462.html

I then logged into one of my worker machines (separate from the webserver and scheduler) and ran this piece of code in Python:

import airflow
s3 = airflow.hooks.S3Hook('s3_conn')
s3.load_string('test', airflow.conf.get('core', 'remote_base_log_folder'))

I receive this error:

 boto.exception.S3ResponseError: S3ResponseError: 403 Forbidden

I have tried exporting a few different types of AIRFLOW_CONN_ envs, as explained in the connections section of https://airflow.incubator.apache.org/concepts.html and in other answers to this question, such as:

 s3://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@S3

{"aws_account_id":"<xxxxx>","role_arn":"arn:aws:iam::<xxxx>:role/<xxxxx>"}

{"aws_access_key_id":"<xxxxx>","aws_secret_access_key":"<xxxxx>"}

I also exported AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, with no success.

These credentials are stored in a database, so once I add them in the UI the workers should pick them up, but for some reason they are not able to write/read logs.
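
As a sanity check, something like the following (a minimal sketch, assuming the Airflow 1.8/1.9 import path and the s3_conn conn id from my test above) shows what credentials a worker actually resolves for that connection:

    # Minimal sketch: inspect the connection a worker resolves for "s3_conn".
    # Import path is for Airflow 1.8/1.9.
    from airflow.hooks.base_hook import BaseHook

    conn = BaseHook.get_connection('s3_conn')
    # Depending on how the connection was defined, the key id ends up in
    # login or extra, and the secret in password or extra.
    print(conn.conn_type, conn.login, conn.extra)

One thing to watch with the AIRFLOW_CONN_ URI form: any special characters in the secret key (such as / or +) have to be URL-encoded, otherwise the parsed credentials will not match what AWS expects.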

Originally posted by JackStat; translation licensed under CC BY-SA 4.0.

2 Answers

You need to set up the S3 connection through the Airflow UI. To do this, go to the Admin -> Connections tab in the Airflow UI and create a new row for your S3 connection.

An example configuration would be:

 Conn Id: my_conn_S3

Conn Type: S3

Extra: {"aws_access_key_id":"your_aws_key_id", "aws_secret_access_key": "your_aws_secret_key"}
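
To confirm the connection actually works before wiring up remote logging, a quick check along these lines can be run from a worker (a rough sketch; the import path is the Airflow 1.9/1.10 one, and my-log-bucket is a placeholder for your log bucket):

    # Rough sketch: verify that the UI-defined connection "my_conn_S3" can see
    # and write to the log bucket. Import path is for Airflow 1.9/1.10.
    from airflow.hooks.S3_hook import S3Hook

    hook = S3Hook(aws_conn_id='my_conn_S3')
    print(hook.check_for_bucket('my-log-bucket'))      # True if the credentials can reach the bucket
    hook.load_string('connection test',
                     key='airflow-logs/connection_test.txt',
                     bucket_name='my-log-bucket',
                     replace=True)                      # raises if write access is missing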

Originally posted by Him; translation licensed under CC BY-SA 4.0.

UPDATE: Airflow 1.10 makes logging a lot easier.

For S3 logging, set up the connection hook as per the answer above,

and then simply add the following to airflow.cfg:

     [core]
    # Airflow can store logs remotely in AWS S3. Users must supply a remote
    # location URL (starting with either 's3://...') and an Airflow connection
    # id that provides access to the storage location.
    remote_base_log_folder = s3://my-bucket/path/to/logs
    remote_log_conn_id = MyS3Conn
    # Use server-side encryption for logs stored in S3
    encrypt_s3_logs = False
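
Depending on the exact 1.10 release you may also need remote_logging = True in the same [core] section. To check that the conn id referenced by remote_log_conn_id can really write under remote_base_log_folder, a hedged sketch like this can be run on a worker (import paths are the Airflow 1.10 ones; the test key name is made up):

    # Hedged sketch: confirm the configured connection can write under the
    # remote log folder. parse_s3_url splits "s3://bucket/prefix" for us.
    from airflow import configuration as conf
    from airflow.hooks.S3_hook import S3Hook

    base = conf.get('core', 'remote_base_log_folder')    # e.g. s3://my-bucket/path/to/logs
    conn_id = conf.get('core', 'remote_log_conn_id')     # e.g. MyS3Conn

    hook = S3Hook(aws_conn_id=conn_id)
    bucket, prefix = hook.parse_s3_url(base)
    hook.load_string('remote logging write test',
                     key=prefix.rstrip('/') + '/write_test.txt',
                     bucket_name=bucket,
                     replace=True)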

For GCS logging:

  1. First install the gcp_api package, like so: pip install apache-airflow[gcp_api].

  2. Set up the connection hook as per the answer above.

  3. Add the following to airflow.cfg:

     [core]
    # Airflow can store logs remotely in Google Cloud Storage. Users must supply
    # a remote location URL (starting with 'gs://...') and an Airflow connection
    # id that provides access to the storage location.
    remote_logging = True
    remote_base_log_folder = gs://my-bucket/path/to/logs
    remote_log_conn_id = MyGCSConn
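
As with S3, it is worth checking the connection itself before chasing log errors. A hedged sketch (the contrib import path is the Airflow 1.10 one; MyGCSConn, my-bucket and path/to/logs mirror the placeholders above):

    # Hedged sketch: confirm the GCS connection can list objects under the
    # remote log prefix. Import path is Airflow 1.10 contrib.
    from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

    hook = GoogleCloudStorageHook(google_cloud_storage_conn_id='MyGCSConn')
    print(hook.list('my-bucket', prefix='path/to/logs/'))   # object names, or [] if empty but readable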


NOTE: As of Airflow 1.9, remote logging has been significantly altered. If you are using 1.9, read on.

Reference here

Complete steps:

  1. Create a directory to store configs and place it on PYTHONPATH. One example is $AIRFLOW_HOME/config

  2. Create empty files called $AIRFLOW_HOME/config/log_config.py and $AIRFLOW_HOME/config/__init__.py

  3. Copy the contents of airflow/config_templates/airflow_local_settings.py into the log_config.py file you just created in the step above.

  4. Customize the following portions of the template (a combined log_config.py sketch appears after these steps):

     #Add this variable to the top of the file. Note the trailing slash.
    S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/'

    Rename DEFAULT_LOGGING_CONFIG to LOGGING_CONFIG
    LOGGING_CONFIG = ...

    Add an S3TaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
    's3.task': {
        'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
        'formatter': 'airflow.task',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        's3_log_folder': S3_LOG_FOLDER,
        'filename_template': FILENAME_TEMPLATE,
    },

     Update the airflow.task and airflow.task_runner blocks to be 's3.task' instead of 'file.task'.
    'loggers': {
        'airflow.task': {
            'handlers': ['s3.task'],
            ...
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            ...
        },
        'airflow': {
            'handlers': ['console'],
            ...
        },
    }

  5. Make sure the S3 connection hook has been defined in Airflow, as per the answer above. The hook should have read and write access to the S3 bucket defined above in S3_LOG_FOLDER.

  6. Update $AIRFLOW_HOME/airflow.cfg to contain:

     task_log_reader = s3.task
    logging_config_class = log_config.LOGGING_CONFIG
    remote_log_conn_id = <name of the s3 platform hook>

  7. Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution.

  8. Verify that logs are showing up for newly executed tasks in the bucket you have defined.

  9. Verify that the S3 storage viewer is working in the UI. Pull up a newly executed task, and verify that you see something like:

     *** Reading remote log from s3://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
    [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
    [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
    [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
    [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
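
For reference, steps 3-4 can also be expressed by patching the imported defaults instead of editing a full copy of the template. This is an untested sketch against the 1.9-era module and names used above (the bucket placeholder is the same one from step 4):

    # Untested sketch of $AIRFLOW_HOME/config/log_config.py for Airflow 1.9,
    # patching the shipped defaults rather than editing a full copy of them.
    import os
    from copy import deepcopy

    from airflow import configuration as conf
    from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

    # Same placeholder as in step 4 above. Note the trailing slash.
    S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/'

    BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
    FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'

    LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)

    # The S3 task handler from step 4
    LOGGING_CONFIG['handlers']['s3.task'] = {
        'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
        'formatter': 'airflow.task',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        's3_log_folder': S3_LOG_FOLDER,
        'filename_template': FILENAME_TEMPLATE,
    }

    # Route task logs through the S3 handler instead of file.task
    LOGGING_CONFIG['loggers']['airflow.task']['handlers'] = ['s3.task']
    LOGGING_CONFIG['loggers']['airflow.task_runner']['handlers'] = ['s3.task']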

Airflow 2.4.2

Follow the steps above, but paste this into log_config.py instead:

 # -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow import configuration as conf
from copy import deepcopy

S3_LOG_FOLDER = 's3://your/s3/log/folder'

LOG_LEVEL = conf.get('logging', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('logging', 'log_format')

BASE_LOG_FOLDER = conf.get('logging', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)

# Attach formatters to loggers (airflow.task, airflow.processor)
LOGGING_CONFIG['formatters']['airflow.task'] = { 'format': LOG_FORMAT }
LOGGING_CONFIG['formatters']['airflow.processor'] = { 'format': LOG_FORMAT }

# Add an S3 task handler
LOGGING_CONFIG['handlers']['s3.task'] = {
    'class': 'airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler',
    'formatter': 'airflow.task',
    'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
    's3_log_folder': S3_LOG_FOLDER,
    'filename_template': FILENAME_TEMPLATE
}

# Specify handler for airflow.task
LOGGING_CONFIG['loggers']['airflow.task']['handlers'] = ['task', 's3.task']
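
Before restarting the scheduler, a small sanity check (assuming log_config.py sits on PYTHONPATH, e.g. under $AIRFLOW_HOME/config, and that airflow.cfg points at it with logging_config_class = log_config.LOGGING_CONFIG, which lives in the [logging] section in Airflow 2.x) confirms the module imports cleanly and the handler is wired up:

    # Small sanity check: import the custom config and confirm the S3 handler is
    # registered and attached to the task logger (names match the module above).
    from log_config import LOGGING_CONFIG

    assert 's3.task' in LOGGING_CONFIG['handlers']
    print(LOGGING_CONFIG['loggers']['airflow.task']['handlers'])   # expect ['task', 's3.task']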

Originally posted by Arne Huang; translation licensed under CC BY-SA 4.0.
