Scala(Spark)栈溢出(java.lang.StackOverflowError)的问题?

代码:

package DAO

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat_ws, expr}

/**
 * Persists per-user recommendation results to Redis through `RedisDAO`.
 *
 * NOTE(review): the StackOverflowError reported with this code is raised while
 * an executor deserializes a task (nested `ObjectInputStream.readObject` calls),
 * which suggests the plan/lineage behind `input` is very deep before it reaches
 * this method. Presumably it has to be truncated upstream (e.g. checkpointed) —
 * confirm against the code that builds `input`.
 */
object resultSaver extends java.io.Serializable {

  /**
   * Writes one Redis hash entry per user with its formatted recommendations and
   * records `batchId` in the user's per-topic batch list.
   *
   * All Redis failures are logged and skipped so that one bad record does not
   * fail the whole Spark task.
   *
   * @param input     DataFrame with an integer `userId` column and a
   *                  `recommendations` array whose elements expose `itemId`
   *                  and `rating`
   * @param topicName topic name; used as the prefix of the result hash key and
   *                  as the field name inside each user's hash
   * @param batchId   identifier of the batch being saved
   */
  def saveResult(input: DataFrame, topicName: String, batchId: String): Unit = {
    val resultKey = s"$topicName:$batchId"

    // Render the recommendations as "(itemId,rating), (itemId,rating), ..."
    // inside Spark SQL, so only plain values reach the RDD closure.
    val preparedData = input
      .select(
        col("userId"),
        expr("transform(recommendations, r -> concat('(', cast(r.itemId as string), ',', cast(r.rating as string), ')')) as recommendations")
      )
      .withColumn("recommendations", concat_ws(", ", col("recommendations")))
      .rdd
      .map(row => (row.getInt(0), row.getString(1)))

    preparedData.foreach { case (userId, recStr) =>
      val userIdStr = userId.toString

      // Save the recommendation result for this user.
      try {
        RedisDAO.hset(resultKey, userIdStr, recStr)
      } catch {
        case e: Exception =>
          // Single interpolated string: println(a, b) would print a tuple.
          println(s"Failed to write Redis for user $userIdStr: ${e.getMessage}")
      }

      // Record the batch id for this user. The read is inside the try as well,
      // so a failed lookup is logged instead of failing the task.
      try {
        val oldBatchIds = RedisDAO.hget(userIdStr, topicName).getOrElse("")
        // Spark may re-run a task; do not append the same batch id twice.
        val alreadyRecorded = oldBatchIds.nonEmpty && oldBatchIds.split(",").contains(batchId)
        if (!alreadyRecorded) {
          val newBatchIds = if (oldBatchIds.isEmpty) batchId else s"$oldBatchIds,$batchId"
          RedisDAO.hset(userIdStr, topicName, newBatchIds)
        }
      } catch {
        case e: Exception =>
          println(s"Failed to write user batch for $userIdStr: ${e.getMessage}")
      }
    }
  }
}

错误:

25/03/14 19:45:23 ERROR Executor: Exception in task 0.0 in stage 182.0 (TID 582)
java.lang.StackOverflowError
    at java.lang.reflect.InvocationTargetException.<init>(InvocationTargetException.java:72)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2053)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1634)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2053)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1634)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)

运行时一直会报栈溢出(java.lang.StackOverflowError)。
我尝试通过 -Xss 增大线程栈的大小:

// Attempted workaround for the StackOverflowError: pass -Xss128m (JVM thread
// stack size, not heap memory) and enable G1 GC on both executors and driver.
// NOTE(review): when the driver JVM is already running (client mode), setting
// spark.driver.extraJavaOptions here presumably has no effect — confirm against
// the Spark configuration docs and pass it via spark-submit instead if so.
sparkConfig.set("spark.executor.extraJavaOptions", "-Xss128m -XX:+UseG1GC") // avoid StackOverflowError by enlarging the thread stack
    sparkConfig.set("spark.driver.extraJavaOptions", "-Xss128m -XX:+UseG1GC") // avoid StackOverflowError by enlarging the thread stack

但是效果时好时坏,栈溢出仍然会偶尔出现。
看起来好像也不是这段代码本身的问题。
我想根治这个问题,
谢谢大神!!!

阅读 397
1 个回答
package DAO

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat_ws, expr}

/**
 * Persists per-user recommendation results to Redis through `RedisDAO`.
 *
 * NOTE(review): the StackOverflowError this answer responds to is raised while
 * an executor deserializes a task, which suggests a very deep plan/lineage
 * behind `input`. Presumably that has to be truncated upstream (e.g.
 * checkpointed); switching to foreachPartition alone may not remove it — confirm.
 */
object resultSaver extends java.io.Serializable {

  /**
   * Writes one Redis hash entry per user with its formatted recommendations and
   * records `batchId` in the user's per-topic batch list.
   *
   * All Redis failures are logged and skipped so that one bad record does not
   * fail the whole Spark task.
   *
   * @param input     DataFrame with a `userId` column (cast to string here) and
   *                  a `recommendations` array whose elements expose `itemId`
   *                  and `rating`
   * @param topicName topic name; used as the prefix of the result hash key and
   *                  as the field name inside each user's hash
   * @param batchId   identifier of the batch being saved
   */
  def saveResult(input: DataFrame, topicName: String, batchId: String): Unit = {
    val resultKey = s"$topicName:$batchId"

    // Render the recommendations as "(itemId,rating), (itemId,rating), ..."
    // inside Spark SQL, so each row carries two plain strings.
    val preparedData = input
      .select(
        col("userId").cast("string"),
        expr("transform(recommendations, r -> concat('(', cast(r.itemId as string), ',', cast(r.rating as string), ')')) as recommendations")
      )
      .withColumn("recommendations", concat_ws(", ", col("recommendations")))

    // Iterate partition by partition directly on the DataFrame instead of going
    // through rdd.map + foreach. The explicit parameter type selects the
    // Scala-function overload of foreachPartition; an untyped lambda is
    // ambiguous with the Java ForeachPartitionFunction overload on Scala 2.12.
    preparedData.foreachPartition { (partition: Iterator[org.apache.spark.sql.Row]) =>
      partition.foreach { row =>
        val userId = row.getString(0)
        val recStr = row.getString(1)

        // Save the recommendation result for this user.
        try {
          RedisDAO.hset(resultKey, userId, recStr)
        } catch {
          case e: Exception =>
            // Single interpolated string: println(a, b) would print a tuple.
            println(s"Failed to write Redis for user $userId: ${e.getMessage}")
        }

        // Record the batch id for this user. The read is inside the try as
        // well, so a failed lookup is logged instead of failing the task.
        try {
          val oldBatchIds = RedisDAO.hget(userId, topicName).getOrElse("")
          // Spark may re-run a task; do not append the same batch id twice.
          val alreadyRecorded = oldBatchIds.nonEmpty && oldBatchIds.split(",").contains(batchId)
          if (!alreadyRecorded) {
            val newBatchIds = if (oldBatchIds.isEmpty) batchId else s"$oldBatchIds,$batchId"
            RedisDAO.hset(userId, topicName, newBatchIds)
          }
        } catch {
          case e: Exception =>
            println(s"Failed to write user batch for $userId: ${e.getMessage}")
        }
      }
    }
  }
}

试一下上面这段代码,我做了一些优化:用 foreachPartition 代替 rdd.map + foreach,并在 select 时就把 userId 转成字符串。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题
宣传栏