How to Create a DataFrame from AWS Athena Using the Boto3 get_query_results Method


I am using AWS Athena to query raw data from S3. Since Athena writes the query output to an S3 output bucket, I used to do:

 df = pd.read_csv(OutputLocation)

But this seems like an expensive way to do it. Recently I noticed that the boto3 method get_query_results returns a complex dictionary of results.

 client = boto3.client('athena')
response = client.get_query_results(
    QueryExecutionId=res['QueryExecutionId']
)

I am facing two main problems:

  1. How can I format the results of get_query_results into a pandas DataFrame?
  2. get_query_results only returns 1000 rows. How can I use it to get two million rows?

Originally posted by Niv Cohen, translated under the CC BY-SA 4.0 license

2 Answers

get_query_results only returns 1000 rows. How can I use it to get two million rows into a Pandas dataframe?

If you try adding:

 client.get_query_results(QueryExecutionId=res['QueryExecutionId'], MaxResults=2000)

you will receive the following error:

An error occurred (InvalidRequestException) when calling the GetQueryResults operation: MaxResults is more than maximum allowed length 1000.
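
That said, the 1000-row cap applies per call: GetQueryResults can be paged through with boto3's built-in paginator (or by passing NextToken yourself). A minimal sketch of building a DataFrame that way; the helper name results_to_df is my own, and looping page by page will be slow for millions of rows:

 import boto3
import pandas as pd

def results_to_df(client, query_execution_id):
    # Walk every page of GetQueryResults; each page holds at most 1000 rows.
    paginator = client.get_paginator('get_query_results')
    rows = []
    for page in paginator.paginate(QueryExecutionId=query_execution_id):
        for row in page['ResultSet']['Rows']:
            # Each cell is a dict like {'VarCharValue': '...'}; the key is
            # absent for NULL values, hence .get().
            rows.append([cell.get('VarCharValue') for cell in row['Data']])
    # For SELECT queries, the first row contains the column headers.
    return pd.DataFrame(rows[1:], columns=rows[0])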

You can get millions of rows if you fetch the file directly from the S3 bucket (into a Pandas DataFrame in the following example):

 def obtain_data_from_s3(self):
    # Read the CSV that Athena wrote to the output bucket straight into pandas.
    self.resource = boto3.resource('s3',
                                   region_name=self.region_name,
                                   aws_access_key_id=self.aws_access_key_id,
                                   aws_secret_access_key=self.aws_secret_access_key)

    response = self.resource \
        .Bucket(self.bucket) \
        .Object(key=self.folder + self.filename + '.csv') \
        .get()

    return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')

self.filename can be:

 self.filename = response['QueryExecutionId'] + ".csv"

because Athena names the output file after the QueryExecutionId. Rather than reconstructing the key by hand, the exact output path can also be read back from the API once the query has finished, as sketched below.
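
A minimal sketch, assuming client is the boto3 Athena client and res is the start_query_execution response:

 execution = client.get_query_execution(QueryExecutionId=res['QueryExecutionId'])
# OutputLocation is e.g. 's3://my_bucket/my_folder/<QueryExecutionId>.csv'
output_location = execution['QueryExecution']['ResultConfiguration']['OutputLocation']

Below is all my code; it takes a query and returns a DataFrame with all the rows and columns.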

 import time
import boto3
import pandas as pd
import io

class QueryAthena:

    def __init__(self, query, database):
        # Replace the placeholders below with your own bucket, region and
        # credentials (or drop the keys to use the default credential chain).
        self.database = database
        self.folder = 'my_folder/'
        self.bucket = 'my_bucket'
        self.s3_input = 's3://' + self.bucket + '/my_folder_input'
        self.s3_output = 's3://' + self.bucket + '/' + self.folder
        self.region_name = 'us-east-1'
        self.aws_access_key_id = "my_aws_access_key_id"
        self.aws_secret_access_key = "my_aws_secret_access_key"
        self.query = query

    def load_conf(self, q):
        # Start the query and remember its QueryExecutionId, which Athena
        # also uses to name the CSV it writes to s3_output.
        response = None
        try:
            self.client = boto3.client('athena',
                                       region_name=self.region_name,
                                       aws_access_key_id=self.aws_access_key_id,
                                       aws_secret_access_key=self.aws_secret_access_key)
            response = self.client.start_query_execution(
                QueryString=q,
                QueryExecutionContext={
                    'Database': self.database
                },
                ResultConfiguration={
                    'OutputLocation': self.s3_output,
                }
            )
            self.filename = response['QueryExecutionId']
            print('Execution ID: ' + response['QueryExecutionId'])

        except Exception as e:
            print(e)
        return response

    def run_query(self):
        res = self.load_conf(self.query)
        try:
            # Poll the execution state until the query leaves QUEUED/RUNNING.
            query_status = None
            while query_status in (None, 'QUEUED', 'RUNNING'):
                query_status = self.client.get_query_execution(
                    QueryExecutionId=res['QueryExecutionId']
                )['QueryExecution']['Status']['State']
                print(query_status)
                if query_status in ('FAILED', 'CANCELLED'):
                    raise Exception('Athena query with the string "{}" failed or was cancelled'.format(self.query))
                time.sleep(10)
            print('Query "{}" finished.'.format(self.query))

            return self.obtain_data()

        except Exception as e:
            print(e)

    def obtain_data(self):
        # Fetch the CSV that Athena wrote for this execution and load it
        # into a DataFrame.
        try:
            self.resource = boto3.resource('s3',
                                           region_name=self.region_name,
                                           aws_access_key_id=self.aws_access_key_id,
                                           aws_secret_access_key=self.aws_secret_access_key)

            response = self.resource \
                .Bucket(self.bucket) \
                .Object(key=self.folder + self.filename + '.csv') \
                .get()

            return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')
        except Exception as e:
            print(e)

if __name__ == "__main__":
    query = "SELECT * FROM bucket.folder"
    qa = QueryAthena(query=query, database='myAthenaDb')
    dataframe = qa.run_query()

Originally posted by Eric Bellet, translated under the CC BY-SA 4.0 license

You can use AWS SDK for pandas to create a pandas DataFrame that queries directly through Athena.

 import awswrangler as wr
df = wr.athena.read_sql_query(sql="SELECT * FROM <table_name_in_Athena>", database="<database_name>")
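
For very large results it can help to stream the query result in chunks instead of materializing a single DataFrame. A minimal sketch using the library's ctas_approach and chunksize parameters (process_chunk is a hypothetical placeholder):

 import awswrangler as wr

# ctas_approach=False reads the plain CSV result; the default (True) stages a
# CTAS temp table in Parquet, which is usually faster for big queries.
chunks = wr.athena.read_sql_query(
    sql="SELECT * FROM <table_name_in_Athena>",
    database="<database_name>",
    ctas_approach=False,
    chunksize=100_000,  # yields an iterator of DataFrames
)
for chunk in chunks:
    process_chunk(chunk)  # hypothetical: handle each batch of rows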

You can find more information here.

Originally posted by Cem, translated under the CC BY-SA 4.0 license
