scala栈溢出的问题?

代码:

package DAO

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat_ws, expr}

object resultSaver extends java.io.Serializable {

  def saveResult(input: DataFrame, topicName: String, batchId: String): Unit = {

    // 提前将推荐结果转换为字符串
    val preparedData = input
      .select(
        col("userId"),
        expr("transform(recommendations, r -> concat('(', cast(r.itemId as string), ',', cast(r.rating as string), ')')) as recommendations")
      )
      .withColumn("recommendations", concat_ws(", ", col("recommendations")))
      .rdd
      .map(row => (row.getInt(0), row.getString(1)))

    preparedData.foreach { case (userId, recStr) =>
      // 保存推荐结果
      try {
        RedisDAO.hset(s"$topicName:$batchId", userId.toString, recStr)
      } catch {
        case e: Exception =>
          println(s"Failed to write Redis for user $userId", e.getMessage)
      }
      // 保存用户批次信息
      val userIdStr = userId.toString
      val oldBatchIds = RedisDAO.hget(userIdStr, topicName).getOrElse("")
      val newBatchIds = if (oldBatchIds.isEmpty) batchId else s"$oldBatchIds,$batchId"
      try {
        RedisDAO.hset(userIdStr, topicName, newBatchIds)
      } catch {
        case e: Exception =>
          println(s"Failed to write user batch for $userIdStr", e.getMessage)
      }
    }
  }
}

错误:

25/03/14 19:45:23 ERROR Executor: Exception in task 0.0 in stage 182.0 (TID 582)
java.lang.StackOverflowError
    at java.lang.reflect.InvocationTargetException.<init>(InvocationTargetException.java:72)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2053)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1634)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2053)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1634)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)

一直会报栈溢出
我尝试增大栈的大小:

sparkConfig.set("spark.executor.extraJavaOptions", "-Xss128m -XX:+UseG1GC") //避免stackoverflow 增大内存
    sparkConfig.set("spark.driver.extraJavaOptions", "-Xss128m -XX:+UseG1GC") //避免stackoverflow 增大内存

但是时好时坏
好像也不是代码的问题
我想根治这个问题
谢谢大神!!!

补充:
redisDAO

package DAO

import com.typesafe.config.{Config, ConfigFactory}
import redis.clients.jedis._
import scala.collection.JavaConverters._

object RedisDAO  {
  // 所有字段必须标记为 @transient 或延迟初始化
  @transient private var jedisPool: JedisPool = _
  @transient private lazy val config: Config = ConfigFactory.load().getConfig("redis")

  // 核心修复点:延迟加载配置并初始化连接池
  private def initPool(): Unit = {
    if (jedisPool == null || jedisPool.isClosed) {
      // 从本地重新加载配置,而非依赖序列化的 config 对象
      val host = config.getString("host")
      val port = config.getInt("port")
      val password = config.getString("password")
      val timeout = config.getInt("timeout")
      val maxTotal = config.getInt("pool.maxTotal")
      val maxIdle = config.getInt("pool.maxIdle")
      val minIdle = config.getInt("pool.minIdle")

      val poolConfig = new JedisPoolConfig()
      poolConfig.setMaxTotal(maxTotal)
      poolConfig.setMaxIdle(maxIdle)
      poolConfig.setMinIdle(minIdle)

      jedisPool = new JedisPool(
        poolConfig,
        host,
        port,
        timeout,
        if (password.isEmpty) null else password
      )
    }
  }

  def getJedis():Jedis = {
    val jedis = jedisPool.getResource
    jedis
  }

  private def withClient[T](block: Jedis => T): T = {
    initPool()  // 确保连接池已初始化
    val jedis = jedisPool.getResource
    try {
      block(jedis)
    } finally {
      if (jedis != null) jedis.close()
    }
  }

  // Key 操作
  def exists(key: String): Boolean = withClient(_.exists(key))

  def expire(key: String, seconds: Int): Long = withClient(_.expire(key, seconds))

  def ttl(key: String): Long = withClient(_.ttl(key))

  def del(keys: String*): Long = withClient(_.del(keys: _*))

  def keys(pattern: String): Set[String] = withClient(_.keys(pattern).asScala.toSet)

  // String 操作
  def get(key: String): Option[String] = withClient { jedis =>
    Option(jedis.get(key))
  }

  def set(key: String, value: String): String = withClient(_.set(key, value))

  def setex(key: String, seconds: Int, value: String): String =
    withClient(_.setex(key, seconds, value))

  def incr(key: String): Long = withClient(_.incr(key))

  def decr(key: String): Long = withClient(_.decr(key))

  // Hash 操作
  def hget(key: String, field: String): Option[String] = withClient { jedis =>
    Option(jedis.hget(key, field))
  }

  def hset(key: String, field: String, value: String): Long =
    withClient(_.hset(key, field, value))

  def hgetAll(key: String): Map[String, String] = withClient { jedis =>
    jedis.hgetAll(key).asScala.toMap
  }

  def hdel(key: String, fields: String*): Long =
    withClient(_.hdel(key, fields: _*))

  // List 操作
  def lpush(key: String, values: String*): Long =
    withClient(_.lpush(key, values: _*))

  def rpush(key: String, values: String*): Long =
    withClient(_.rpush(key, values: _*))

  def lrange(key: String, start: Long, end: Long): List[String] =
    withClient(_.lrange(key, start, end).asScala.toList)

  // Set 操作
  def sadd(key: String, members: String*): Long =
    withClient(_.sadd(key, members: _*))

  def smembers(key: String): Set[String] =
    withClient(_.smembers(key).asScala.toSet)

  def sismember(key: String, member: String): Boolean =
    withClient(_.sismember(key, member))

  // Sorted Set 操作
  def zadd(key: String, score: Double, member: String): Long =
    withClient(_.zadd(key, score, member))

  def zrange(key: String, start: Long, end: Long): List[String] =
    withClient(_.zrange(key, start, end).asScala.toList)

  // HyperLogLog 操作
  def pfadd(key: String, elements: String*): Long =
    withClient(_.pfadd(key, elements: _*))

  def pfcount(key: String): Long =
    withClient(_.pfcount(key))

  // 发布订阅
  def publish(channel: String, message: String): Long =
    withClient(_.publish(channel, message))

  // 事务支持
  def transaction(block: Transaction => Unit): List[Object] = withClient { jedis =>
    val tx = jedis.multi()
    block(tx)
    tx.exec().asScala.toList
  }

  // 管道操作
  def pipeline(block: Pipeline => Unit): List[Any] = withClient { jedis =>
    val pipe = jedis.pipelined()
    block(pipe)
    pipe.sync()
    pipe.syncAndReturnAll().asScala.toList
  }

  //  获取管道实例
  def getPipiline():Pipeline = withClient { jedis =>
    val pipe = jedis.pipelined()
    pipe
  }

  // 服务器相关
  def flushDB(): String = withClient(_.flushDB())

  // 安全关闭
  def close(): Unit = jedisPool.close()

  // 批量操作
  def mget(keys: String*): Map[String, String] = withClient { jedis =>
    val values = jedis.mget(keys: _*).asScala
    keys.zip(values).collect {
      case (k, v) if v != null => (k, v)
    }.toMap
  }
}
阅读 759
1 个回答
package DAO

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat_ws, expr}

object resultSaver extends java.io.Serializable {

  def saveResult(input: DataFrame, topicName: String, batchId: String): Unit = {

    // 提前将推荐结果转换为字符串
    val preparedData = input
      .select(
        col("userId").cast("string"),
        expr("transform(recommendations, r -> concat('(', cast(r.itemId as string), ',', cast(r.rating as string), ')')) as recommendations")
      )
      .withColumn("recommendations", concat_ws(", ", col("recommendations")))
    
    // 使用foreachPartition而不是foreach和rdd.map,减少序列化压力
    preparedData.foreachPartition { partition =>
      partition.foreach { row =>
        val userId = row.getString(0)
        val recStr = row.getString(1)
        
        // 保存推荐结果
        try {
          RedisDAO.hset(s"$topicName:$batchId", userId, recStr)
        } catch {
          case e: Exception =>
            println(s"Failed to write Redis for user $userId", e.getMessage)
        }
        
        // 保存用户批次信息
        val oldBatchIds = RedisDAO.hget(userId, topicName).getOrElse("")
        val newBatchIds = if (oldBatchIds.isEmpty) batchId else s"$oldBatchIds,$batchId"
        try {
          RedisDAO.hset(userId, topicName, newBatchIds)
        } catch {
          case e: Exception =>
            println(s"Failed to write user batch for $userId", e.getMessage)
        }
      }
    }
  }
}

试一下代码呢,优化了一下

package DAO

import com.typesafe.config.{Config, ConfigFactory}
import redis.clients.jedis._

object RedisDAO extends Serializable {
  @transient private var jedisPool: JedisPool = _
  
  private def getPool(): JedisPool = {
    if (jedisPool == null || jedisPool.isClosed) {
      // 直接读取配置,不依赖序列化的Config对象
      val config = ConfigFactory.load().getConfig("redis")
      val host = config.getString("host")
      val port = config.getInt("port")
      val password = config.getString("password")
      val timeout = config.getInt("timeout")
      
      val poolConfig = new JedisPoolConfig()
      poolConfig.setMaxTotal(config.getInt("pool.maxTotal"))
      poolConfig.setMaxIdle(config.getInt("pool.maxIdle"))
      poolConfig.setMinIdle(config.getInt("pool.minIdle"))
      
      jedisPool = new JedisPool(
        poolConfig,
        host,
        port,
        timeout,
        if (password.isEmpty) null else password
      )
    }
    jedisPool
  }
  
  // 其他方法保持不变,但使用getPool()来获取连接池实例
  // ...
}
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题