从 SQL 查询创建 Spark 数据框

新手上路,请多包涵

我确定这是一个简单的 SQLContext 问题,但我在 Spark 文档或 Stackoverflow 中找不到任何答案

我想从 MySQL 上的 SQL 查询创建 Spark Dataframe

例如,我有一个复杂的 MySQL 查询,例如

SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...

我想要一个包含 X、Y 和 Z 列的数据框

我想出了如何将整个表加载到 Spark 中,我可以将它们全部加载,然后在那里进行连接和选择。然而,这是非常低效的。我只想加载我的 SQL 查询生成的表。

这是我当前的代码近似值,它不起作用。 Mysql-connector 有一个选项“dbtable”,可用于加载整个表。我希望有某种方法可以指定查询

  val df = sqlContext.format("jdbc").
    option("url", "jdbc:mysql://localhost:3306/local_content").
    option("driver", "com.mysql.jdbc.Driver").
    option("useUnicode", "true").
    option("continueBatchOnError","true").
    option("useSSL", "false").
    option("user", "root").
    option("password", "").
    sql(
"""
select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
join DialogLine as dl on dl.DialogID=d.DialogID
join DialogLineWordInstanceMatch as dlwim o n dlwim.DialogLineID=dl.DialogLineID
join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
join WordRoot as wr on wr.WordRootID=wi.WordRootID
where d.InSite=1 and dl.Active=1
limit 100
"""
    ).load()

原文由 opus111 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 539
2 个回答

我在这里找到了 通过 Spark SQL 进行批量数据迁移

dbname 参数可以是使用别名括在括号中的任何查询。所以就我而言,我需要这样做:

 val query = """
  (select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
    join DialogLine as dl on dl.DialogID=d.DialogID
    join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID=dl.DialogLineID
    join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
    join WordRoot as wr on wr.WordRootID=wi.WordRootID
    where d.InSite=1 and dl.Active=1
    limit 100) foo
"""

val df = sqlContext.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306/local_content").
  option("driver", "com.mysql.jdbc.Driver").
  option("useUnicode", "true").
  option("continueBatchOnError","true").
  option("useSSL", "false").
  option("user", "root").
  option("password", "").
  option("dbtable",query).
  load()

正如预期的那样,将每个表作为自己的 Dataframe 加载并在 Spark 中加入它们的效率非常低。

原文由 opus111 发布,翻译遵循 CC BY-SA 4.0 许可协议

TL;DR: 只需在您的数据库中创建一个视图。

详细信息: 我的 postgres 数据库中有一个表 t_city,我在其上创建了一个视图:

 create view v_city_3500 as
    select asciiname, country, population, elevation
    from t_city
    where elevation>3500
    and population>100000

select * from v_city_3500;

 asciiname | country | population | elevation
-----------+---------+------------+-----------
 Potosi    | BO      |     141251 |      3967
 Oruro     | BO      |     208684 |      3936
 La Paz    | BO      |     812799 |      3782
 Lhasa     | CN      |     118721 |      3651
 Puno      | PE      |     116552 |      3825
 Juliaca   | PE      |     245675 |      3834

在火花壳中:

 val sx= new org.apache.spark.sql.SQLContext(sc)

var props=new java.util.Properties()
props.setProperty("driver", "org.postgresql.Driver" )
val url="jdbc:postgresql://buya/dmn?user=dmn&password=dmn"

val city_df=sx.read.jdbc(url=url,table="t_city",props)
val city_3500_df=sx.read.jdbc(url=url,table="v_city_3500",props)

结果:

 city_df.count()
Long = 145725

city_3500_df.count()
Long = 6

原文由 wmoco_6725 发布,翻译遵循 CC BY-SA 3.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题
logo
Stack Overflow 翻译
子站问答
访问
宣传栏