问题描述
因为需要对DF创建的临时表,进行多次查询,所以对这个临时表做了缓存,但是感觉不起作用?
数据量比较大,100亿左右,所以需要优化效率这个问题;
问题出现的环境背景及自己尝试过哪些方法
在此之前还尝试使用多线程的方式重复提交spark任务,也对临时表做了缓存,但是好像也不起作用
相关代码
// 请把代码文本粘贴到下方(请勿用图片代替代码)
这个是正常的方式提交spark任务,代码如下:
val data: DataFrame = sparkSession.read.parquet("XXX")
data.createOrReplaceTempView("table_info")
sparkSession.catalog.cacheTable("table_info")
// 这里只是为了替换sql,生成不同的sql
val featureArray: ArrayBuffer[String] = StrsUtils.generateFeatures("type,`from`,page,value,source")
for(i <- 0 to 3) {
val all_type = featureArray(i)
val sql_merge =
s"""
| SELECT
| appid, soft_version, id, event_type, type, `from`, page, value, source,
| count(distinct cuid) as uv,
| sum(pv) AS pv,
| sum(duration) AS duration
| FROM(
| SELECT cuid, appid, id, event_type, soft_version, duration, pv, event_day, $all_type
| FROM table_info
| )tmp
| GROUP BY appid, soft_version, id, event_type, type, `from`, page, value, source
|
""".stripMargin
logger.info("merge_sql: " + sql_merge)
sparkSession.sql(sql_merge).repartition(1).write.mode("overwrite").parquet(s"XXX/result/$event_day/" + i )
}
//清理cache
sparkSession.catalog.clearCache()
sparkSession.stop()
你期待的结果是什么?实际看到的错误信息又是什么?
这个是第一次循环的Stage;
正在执行的Stage的DAG图如下:
这个Stage的时间大概3~4分钟,正常第一次循环的时候应该是会进行缓存,第二次,第三次循环的时候会直接从缓存中查询,但是后面的DAG图和第一次的一样,而且时间也差不多,所以觉得可能是对临时表做cache没有起作用。