我正在尝试查询 MySql 数据库表的一个子集,将结果提供给 Pandas DataFrame,更改一些数据,然后将更新的行写回同一个表。我的表大小是 ~1MM 行,我要更改的行数将相对较小(<50,000),因此带回整个表并执行 df.to_sql(tablename,engine, if_exists='replace')
不是一个可行的选择。有没有一种直接的方法来更新已更改的行,而无需遍历 DataFrame 中的每一行?
我知道这个项目,它试图模拟一个“upsert”工作流程,但它似乎只完成了插入新的非重复行的任务,而不是更新现有行的部分内容:
这是我试图在更大范围内完成的工作的框架:
import pandas as pd
from sqlalchemy import create_engine
import threading
#Get sample data
d = {'A' : [1, 2, 3, 4], 'B' : [4, 3, 2, 1]}
df = pd.DataFrame(d)
engine = create_engine(SQLALCHEMY_DATABASE_URI)
#Create a table with a unique constraint on A.
engine.execute("""DROP TABLE IF EXISTS test_upsert """)
engine.execute("""CREATE TABLE test_upsert (
A INTEGER,
B INTEGER,
PRIMARY KEY (A))
""")
#Insert data using pandas.to_sql
df.to_sql('test_upsert', engine, if_exists='append', index=False)
#Alter row where 'A' == 2
df_in_db.loc[df_in_db['A'] == 2, 'B'] = 6
现在我想写 df_in_db
回到我的 'test_upsert'
反映更新数据的表。
这个 SO 问题非常相似,其中一条评论建议使用“sqlalchemy 表类”来执行任务。
如果这是最好的(唯一的?)实现方式,谁能扩展我将如何针对我上面的特定案例实现它?
原文由 D Clancy 发布,翻译遵循 CC BY-SA 4.0 许可协议
我认为最简单的方法是:
首先删除那些将被“更新”的行。这可以在一个循环中完成,但它对于更大的数据集(5K+ 行)不是很有效,所以我将 DF 的这个片段保存到一个临时的 MySQL 表中:
PS 我没有测试这段代码,所以它可能有一些小错误,但它应该给你一个想法……