spark2.1 对临时表做cache,感觉不起作用

问题描述

因为需要对DF创建的临时表,进行多次查询,所以对这个临时表做了缓存,但是感觉不起作用?
数据量比较大,100亿左右,所以需要优化效率这个问题;

问题出现的环境背景及自己尝试过哪些方法

在此之前还尝试使用多线程的方式重复提交spark任务,也对临时表做了缓存,但是好像也不起作用

相关代码

// 请把代码文本粘贴到下方(请勿用图片代替代码)
这个是正常的方式提交spark任务,代码如下:

val data: DataFrame = sparkSession.read.parquet("XXX") 

data.createOrReplaceTempView("table_info")
sparkSession.catalog.cacheTable("table_info")

// 这里只是为了替换sql,生成不同的sql
val featureArray: ArrayBuffer[String] = StrsUtils.generateFeatures("type,`from`,page,value,source")

for(i <- 0 to 3) {

    val all_type = featureArray(i)

    val sql_merge =
        s"""
          | SELECT
          |      appid, soft_version, id, event_type, type, `from`, page, value, source,
          |      count(distinct cuid) as uv,
          |      sum(pv) AS pv,
          |      sum(duration) AS duration
          | FROM(
          |      SELECT cuid, appid, id, event_type, soft_version, duration, pv, event_day, $all_type
          |      FROM table_info
          | )tmp
          | GROUP BY appid, soft_version, id, event_type, type, `from`, page, value, source
          |
        """.stripMargin

    logger.info("merge_sql: " + sql_merge)
    sparkSession.sql(sql_merge).repartition(1).write.mode("overwrite").parquet(s"XXX/result/$event_day/" + i )
}

//清理cache
sparkSession.catalog.clearCache()
sparkSession.stop()

你期待的结果是什么?实际看到的错误信息又是什么?

clipboard.png
这个是第一次循环的Stage;
正在执行的Stage的DAG图如下:

clipboard.png
这个Stage的时间大概3~4分钟,正常第一次循环的时候应该是会进行缓存,第二次,第三次循环的时候会直接从缓存中查询,但是后面的DAG图和第一次的一样,而且时间也差不多,所以觉得可能是对临时表做cache没有起作用。

阅读 3.8k
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进