我一直在尝试使用 Airflow 来安排 DAG。 DAG 之一包括一个从 s3 存储桶加载数据的任务。
出于上述目的,我需要设置 s3 连接。但是气流提供的用户界面并不那么直观( http://pythonhosted.org/airflow/configuration.html?highlight=connection#connections )。任何人都成功设置了 s3 连接,如果有的话,你们有什么最佳实践吗?
谢谢。
原文由 Nikhil Reddy 发布,翻译遵循 CC BY-SA 4.0 许可协议
编辑:此答案以 明文 形式存储您的密钥,这可能存在 安全风险,不建议这样做。最好的方法是将访问密钥和密钥放在登录/密码字段中,如下面其他答案所述。结束编辑
很难找到参考资料,但经过一番挖掘后,我能够让它发挥作用。
TLDR
创建具有以下属性的新连接:
连接 ID: my_conn_S3
连接类型: S3
额外的:
长版本,设置 UI 连接:
my_conn_S3
S3
{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
要使用此连接,您可以在下面找到一个简单的 S3 传感器测试。这个测试的想法是设置一个传感器来监视 S3 中的文件(T1 任务),一旦满足以下条件,它就会触发 bash 命令(T2 任务)。
测试
airflow webserver
。airflow scheduler
。dag 定义中的 schedule_interval 设置为 ‘@once’,以方便调试。
要再次运行它,请保留所有内容,删除存储桶中的文件,然后通过选择第一个任务(在图表视图中)并选择“清除”所有“过去”、“未来”、“上游”、“下游”来重试…. 活动。这应该会再次启动 DAG。
让我知道进展如何。
s3_dag_test.py ;
主要参考资料: