如何使用 Pandas DataFrame 对数据库表的现有行执行更新?

新手上路,请多包涵

我正在尝试查询 MySql 数据库表的一个子集,将结果提供给 Pandas DataFrame,更改一些数据,然后将更新的行写回同一个表。我的表大小是 ~1MM 行,我要更改的行数将相对较小(<50,000),因此带回整个表并执行 df.to_sql(tablename,engine, if_exists='replace') 不是一个可行的选择。有没有一种直接的方法来更新已更改的行,而无需遍历 DataFrame 中的每一行?

我知道这个项目,它试图模拟一个“upsert”工作流程,但它似乎只完成了插入新的非重复行的任务,而不是更新现有行的部分内容:

GitHub Pandas-to_sql-更新插入

这是我试图在更大范围内完成的工作的框架:

 import pandas as pd
from sqlalchemy import create_engine
import threading

#Get sample data
d = {'A' : [1, 2, 3, 4], 'B' : [4, 3, 2, 1]}
df = pd.DataFrame(d)

engine = create_engine(SQLALCHEMY_DATABASE_URI)

#Create a table with a unique constraint on A.
engine.execute("""DROP TABLE IF EXISTS test_upsert """)
engine.execute("""CREATE TABLE test_upsert (
                  A INTEGER,
                  B INTEGER,
                  PRIMARY KEY (A))
                  """)

#Insert data using pandas.to_sql
df.to_sql('test_upsert', engine, if_exists='append', index=False)

#Alter row where 'A' == 2
df_in_db.loc[df_in_db['A'] == 2, 'B'] = 6

现在我想写 df_in_db 回到我的 'test_upsert' 反映更新数据的表。

这个 SO 问题非常相似,其中一条评论建议使用“sqlalchemy 表类”来执行任务。

使用 sqlalchemy 表类更新表

如果这是最好的(唯一的?)实现方式,谁能扩展我将如何针对我上面的特定案例实现它?

原文由 D Clancy 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 1.9k
2 个回答

我认为最简单的方法是:

首先删除那些将被“更新”的行。这可以在一个循环中完成,但它对于更大的数据集(5K+ 行)不是很有效,所以我将 DF 的这个片段保存到一个临时的 MySQL 表中:

 # assuming we have already changed values in the rows and saved those changed rows in a separate DF: `x`
x = df[mask]  # `mask` should help us to find changed rows...

# make sure `x` DF has a Primary Key column as index
x = x.set_index('a')

# dump a slice with changed rows to temporary MySQL table
x.to_sql('my_tmp', engine, if_exists='replace', index=True)

conn = engine.connect()
trans = conn.begin()

try:
    # delete those rows that we are going to "upsert"
    engine.execute('delete from test_upsert where a in (select a from my_tmp)')
    trans.commit()

    # insert changed rows
    x.to_sql('test_upsert', engine, if_exists='append', index=True)
except:
    trans.rollback()
    raise

PS 我没有测试这段代码,所以它可能有一些小错误,但它应该给你一个想法……

原文由 MaxU - stop russian terror 发布,翻译遵循 CC BY-SA 3.0 许可协议

使用 Panda 的 to_sql “method” arg 和 sqlalchemy 的 mysql insert on_duplicate_key_update 功能的 MySQL 特定解决方案:

 def create_method(meta):
    def method(table, conn, keys, data_iter):
        sql_table = db.Table(table.name, meta, autoload=True)
        insert_stmt = db.dialects.mysql.insert(sql_table).values([dict(zip(keys, data)) for data in data_iter])
        upsert_stmt = insert_stmt.on_duplicate_key_update({x.name: x for x in insert_stmt.inserted})
        conn.execute(upsert_stmt)

    return method

engine = db.create_engine(...)
conn = engine.connect()
with conn.begin():
    meta = db.MetaData(conn)
    method = create_method(meta)
    df.to_sql(table_name, conn, if_exists='append', method=method)

原文由 patrick 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题