代码:
package DAO
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat_ws, expr}
object resultSaver extends java.io.Serializable {
def saveResult(input: DataFrame, topicName: String, batchId: String): Unit = {
// 提前将推荐结果转换为字符串
val preparedData = input
.select(
col("userId"),
expr("transform(recommendations, r -> concat('(', cast(r.itemId as string), ',', cast(r.rating as string), ')')) as recommendations")
)
.withColumn("recommendations", concat_ws(", ", col("recommendations")))
.rdd
.map(row => (row.getInt(0), row.getString(1)))
preparedData.foreach { case (userId, recStr) =>
// 保存推荐结果
try {
RedisDAO.hset(s"$topicName:$batchId", userId.toString, recStr)
} catch {
case e: Exception =>
println(s"Failed to write Redis for user $userId", e.getMessage)
}
// 保存用户批次信息
val userIdStr = userId.toString
val oldBatchIds = RedisDAO.hget(userIdStr, topicName).getOrElse("")
val newBatchIds = if (oldBatchIds.isEmpty) batchId else s"$oldBatchIds,$batchId"
try {
RedisDAO.hset(userIdStr, topicName, newBatchIds)
} catch {
case e: Exception =>
println(s"Failed to write user batch for $userIdStr", e.getMessage)
}
}
}
}
错误:
25/03/14 19:45:23 ERROR Executor: Exception in task 0.0 in stage 182.0 (TID 582)
java.lang.StackOverflowError
at java.lang.reflect.InvocationTargetException.<init>(InvocationTargetException.java:72)
at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2053)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1634)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2256)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2053)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1634)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2365)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2289)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2147)
一直会报栈溢出
我尝试增大栈的大小:
sparkConfig.set("spark.executor.extraJavaOptions", "-Xss128m -XX:+UseG1GC") //避免stackoverflow 增大内存
sparkConfig.set("spark.driver.extraJavaOptions", "-Xss128m -XX:+UseG1GC") //避免stackoverflow 增大内存
但是时好时坏
好像也不是代码的问题
我想根治这个问题
谢谢大神!!!
补充:
redisDAO
package DAO
import com.typesafe.config.{Config, ConfigFactory}
import redis.clients.jedis._
import scala.collection.JavaConverters._
object RedisDAO {
// 所有字段必须标记为 @transient 或延迟初始化
@transient private var jedisPool: JedisPool = _
@transient private lazy val config: Config = ConfigFactory.load().getConfig("redis")
// 核心修复点:延迟加载配置并初始化连接池
private def initPool(): Unit = {
if (jedisPool == null || jedisPool.isClosed) {
// 从本地重新加载配置,而非依赖序列化的 config 对象
val host = config.getString("host")
val port = config.getInt("port")
val password = config.getString("password")
val timeout = config.getInt("timeout")
val maxTotal = config.getInt("pool.maxTotal")
val maxIdle = config.getInt("pool.maxIdle")
val minIdle = config.getInt("pool.minIdle")
val poolConfig = new JedisPoolConfig()
poolConfig.setMaxTotal(maxTotal)
poolConfig.setMaxIdle(maxIdle)
poolConfig.setMinIdle(minIdle)
jedisPool = new JedisPool(
poolConfig,
host,
port,
timeout,
if (password.isEmpty) null else password
)
}
}
def getJedis():Jedis = {
val jedis = jedisPool.getResource
jedis
}
private def withClient[T](block: Jedis => T): T = {
initPool() // 确保连接池已初始化
val jedis = jedisPool.getResource
try {
block(jedis)
} finally {
if (jedis != null) jedis.close()
}
}
// Key 操作
def exists(key: String): Boolean = withClient(_.exists(key))
def expire(key: String, seconds: Int): Long = withClient(_.expire(key, seconds))
def ttl(key: String): Long = withClient(_.ttl(key))
def del(keys: String*): Long = withClient(_.del(keys: _*))
def keys(pattern: String): Set[String] = withClient(_.keys(pattern).asScala.toSet)
// String 操作
def get(key: String): Option[String] = withClient { jedis =>
Option(jedis.get(key))
}
def set(key: String, value: String): String = withClient(_.set(key, value))
def setex(key: String, seconds: Int, value: String): String =
withClient(_.setex(key, seconds, value))
def incr(key: String): Long = withClient(_.incr(key))
def decr(key: String): Long = withClient(_.decr(key))
// Hash 操作
def hget(key: String, field: String): Option[String] = withClient { jedis =>
Option(jedis.hget(key, field))
}
def hset(key: String, field: String, value: String): Long =
withClient(_.hset(key, field, value))
def hgetAll(key: String): Map[String, String] = withClient { jedis =>
jedis.hgetAll(key).asScala.toMap
}
def hdel(key: String, fields: String*): Long =
withClient(_.hdel(key, fields: _*))
// List 操作
def lpush(key: String, values: String*): Long =
withClient(_.lpush(key, values: _*))
def rpush(key: String, values: String*): Long =
withClient(_.rpush(key, values: _*))
def lrange(key: String, start: Long, end: Long): List[String] =
withClient(_.lrange(key, start, end).asScala.toList)
// Set 操作
def sadd(key: String, members: String*): Long =
withClient(_.sadd(key, members: _*))
def smembers(key: String): Set[String] =
withClient(_.smembers(key).asScala.toSet)
def sismember(key: String, member: String): Boolean =
withClient(_.sismember(key, member))
// Sorted Set 操作
def zadd(key: String, score: Double, member: String): Long =
withClient(_.zadd(key, score, member))
def zrange(key: String, start: Long, end: Long): List[String] =
withClient(_.zrange(key, start, end).asScala.toList)
// HyperLogLog 操作
def pfadd(key: String, elements: String*): Long =
withClient(_.pfadd(key, elements: _*))
def pfcount(key: String): Long =
withClient(_.pfcount(key))
// 发布订阅
def publish(channel: String, message: String): Long =
withClient(_.publish(channel, message))
// 事务支持
def transaction(block: Transaction => Unit): List[Object] = withClient { jedis =>
val tx = jedis.multi()
block(tx)
tx.exec().asScala.toList
}
// 管道操作
def pipeline(block: Pipeline => Unit): List[Any] = withClient { jedis =>
val pipe = jedis.pipelined()
block(pipe)
pipe.sync()
pipe.syncAndReturnAll().asScala.toList
}
// 获取管道实例
def getPipiline():Pipeline = withClient { jedis =>
val pipe = jedis.pipelined()
pipe
}
// 服务器相关
def flushDB(): String = withClient(_.flushDB())
// 安全关闭
def close(): Unit = jedisPool.close()
// 批量操作
def mget(keys: String*): Map[String, String] = withClient { jedis =>
val values = jedis.mget(keys: _*).asScala
keys.zip(values).collect {
case (k, v) if v != null => (k, v)
}.toMap
}
}
试一下代码呢,优化了一下