How do I iterate over a DataFrame in Spark SQL and use each id as a condition to query another DataFrame?

亓国梁

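I use Spark SQL over JDBC to connect to Oracle and query two tables: a project list and a project attendance detail table, related by project id. Because the attendance data is very large, the attendance table is partitioned by project id, so the project id must be the first condition in any query; otherwise the query is very slow.
So I need to iterate over the project list, take each project id, and use it as the condition for querying attendance, then do some statistical processing.
However, doing it this way throws a NullPointerException...
I have searched for a long time without finding a good approach. Any help would be appreciated.
Spark 2.4.6
Scala 2.11
Here is my code: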

    // Load the project list
    val projectDf: DataFrame = spark.read
      .format("jdbc")
      .option("driver", oracleDriver)
      .option("url", oracleUrl)
      .option("user", oracleUser)
      .option("password", oraclePwd)
      .option("numPartitions", 10)
      .option("lowerBound", 1)
      .option("upperBound", 400000)
      .option("partitionColumn", "rid")
      .option("dbtable",
        """(
          |SELECT
          |rownum as rid,
          |PROJECTID,
          |INPUTDATE
          |FROM T_PROJECT_LIST
          |and STARTDATE IS NOT NULL
          |)""".stripMargin)
      .load()

// Iterate over the project list and query attendance by project id
projectDf.foreach{
      record=>{
        println(getKq(record.getString(1)).count())
      }
    }

// Method that queries attendance by project id, defined outside the main method
  def getKq(projectid:String): DataFrame ={
    val kqDf: DataFrame = spark.read
      .format("jdbc")
      .option("driver", oracleDriver)
      .option("url", oracleUrl)
      .option("user", oracleUser)
      .option("password", oraclePwd)
      .option("dbtable",
        s"""(
           |SELECT
           |AR_DATETIME,
           |AR_MODE,
           |CARDNO,
           |PROJECTID
           |FROM t_kq
           |where projectid='$projectid'
           |)""".stripMargin)
      .load()
    kqDf
  }

Here is the error output:


22/03/17 11:15:50 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 12)
java.lang.NullPointerException
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.<init>(HashAggregateExec.scala:87)
    at org.apache.spark.sql.execution.aggregate.AggUtils$.createAggregate(AggUtils.scala:41)
    at org.apache.spark.sql.execution.aggregate.AggUtils$.planAggregateWithoutDistinct(AggUtils.scala:92)
    at org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:419)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:73)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:69)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:78)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:78)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2835)
    at com.shengbang.sparko2o.MonthKq$$anonfun$main$1.apply(MonthKq.scala:130)
    at com.shengbang.sparko2o.MonthKq$$anonfun$main$1.apply(MonthKq.scala:128)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
22/03/17 11:15:50 ERROR TaskSetManager: Task 0 in stage 5.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 12, localhost, executor driver): java.lang.NullPointerException
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.<init>(HashAggregateExec.scala:87)
    at org.apache.spark.sql.execution.aggregate.AggUtils$.createAggregate(AggUtils.scala:41)
    at org.apache.spark.sql.execution.aggregate.AggUtils$.planAggregateWithoutDistinct(AggUtils.scala:92)
    at org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:419)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:73)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:69)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:78)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:78)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2835)
    at com.shengbang.sparko2o.MonthKq$$anonfun$main$1.apply(MonthKq.scala:130)
    at com.shengbang.sparko2o.MonthKq$$anonfun$main$1.apply(MonthKq.scala:128)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:972)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:970)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:970)
    at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2722)
    at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2722)
    at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2722)
    at org.apache.spark.sql.Dataset$$anonfun$withNewRDDExecutionId$1.apply(Dataset.scala:3355)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3351)
    at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2721)
    at com.shengbang.sparko2o.MonthKq$.main(MonthKq.scala:127)
    at com.shengbang.sparko2o.MonthKq.main(MonthKq.scala)
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.<init>(HashAggregateExec.scala:87)
    at org.apache.spark.sql.execution.aggregate.AggUtils$.createAggregate(AggUtils.scala:41)
    at org.apache.spark.sql.execution.aggregate.AggUtils$.planAggregateWithoutDistinct(AggUtils.scala:92)
    at org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:419)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:73)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:69)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:78)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:78)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2835)
    at com.shengbang.sparko2o.MonthKq$$anonfun$main$1.apply(MonthKq.scala:130)
    at com.shengbang.sparko2o.MonthKq$$anonfun$main$1.apply(MonthKq.scala:128)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Process finished with exit code 1
1 answer

First, "and STARTDATE IS NOT NULL" should probably be written as "where STARTDATE IS NOT NULL".
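As a minimal sketch of this first point, the project-list subquery from the question could be written with WHERE in place of the stray "and". The object and value names below are hypothetical; the table and column names are taken from the question.

```scala
object ProjectListQuery {
  // Same subquery as in the question, with the filter introduced by WHERE.
  // The result is meant to be passed as the "dbtable" option of the JDBC reader.
  val projectListSubquery: String =
    """(
      |SELECT
      |rownum as rid,
      |PROJECTID,
      |INPUTDATE
      |FROM T_PROJECT_LIST
      |WHERE STARTDATE IS NOT NULL
      |)""".stripMargin
}
```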

Second, why not use a join?

select
  rownum as rid,
  T_PROJECT_LIST.PROJECTID,
  INPUTDATE,
  AR_DATETIME,
  AR_MODE,
  CARDNO
FROM T_PROJECT_LIST
JOIN t_kq
  ON t_kq.projectid = T_PROJECT_LIST.PROJECTID
where STARTDATE IS NOT NULL
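One way the join suggestion might look from Spark is sketched below: the join is sent to Oracle as the JDBC "dbtable" subquery, and the per-project row count the question was after becomes a single groupBy. This is only a sketch under assumptions: the object and method names are hypothetical, the connection settings are passed in as parameters standing in for the question's oracleDriver, oracleUrl, oracleUser, and oraclePwd, and it has not been checked against the real tables.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object MonthKqJoinSketch {

  // Subquery sent to Oracle: the join on PROJECTID runs in the database,
  // so Spark does not need to loop over project ids itself.
  val joinedSubquery: String =
    """(
      |SELECT
      |  p.PROJECTID,
      |  p.INPUTDATE,
      |  k.AR_DATETIME,
      |  k.AR_MODE,
      |  k.CARDNO
      |FROM T_PROJECT_LIST p
      |JOIN t_kq k ON k.PROJECTID = p.PROJECTID
      |WHERE p.STARTDATE IS NOT NULL
      |)""".stripMargin

  // Load the joined rows through the JDBC source, on the driver.
  def loadJoined(spark: SparkSession, driver: String, url: String,
                 user: String, password: String): DataFrame =
    spark.read
      .format("jdbc")
      .option("driver", driver)
      .option("url", url)
      .option("user", user)
      .option("password", password)
      .option("dbtable", joinedSubquery)
      .load()

  // Attendance rows per project, computed in one distributed aggregation
  // instead of one count() call per project id.
  def countPerProject(joined: DataFrame): DataFrame =
    joined.groupBy("PROJECTID").count()
}
```

The stack trace in the question shows count() being called from inside a task started by foreach, that is, on an executor thread. DataFrame operations can only be started from the driver, which is the likely source of the NullPointerException. If a per-project loop is still wanted, the ids could first be brought to the driver with collect() and getKq called from there, although with a large project list that means one JDBC query per project.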
