在下spark新手,最近一直在学习,用pyspark跑了一些例子,都没有问题,但是运行ml例子中的random_forest_example.py的时候却出现如下错误:
- py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
- org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 3, localhost): java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(Unknown Source)
at java.net.SocketOutputStream.write(Unknown Source)
at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
at java.io.BufferedOutputStream.flush(Unknown Source)
at java.io.DataOutputStream.flush(Unknown Source)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:251)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772)
at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
运行的代码如下:
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.mllib.evaluation import MulticlassMetrics, RegressionMetrics
from pyspark.mllib.util import MLUtils
from pyspark.sql import Row, SQLContext
"""
A simple example demonstrating a RandomForest Classification/Regression Pipeline.
Run with:
bin/spark-submit examples/src/main/python/ml/random_forest_example.py
"""
def testClassification(train, test):
# Train a RandomForest model.
# Setting featureSubsetStrategy="auto" lets the algorithm choose.
# Note: Use larger numTrees in practice.
rf = RandomForestClassifier(labelCol="indexedLabel", numTrees=3, maxDepth=4)
model = rf.fit(train)
predictionAndLabels = model.transform(test).select("prediction", "indexedLabel") \
.map(lambda x: (x.prediction, x.indexedLabel))
metrics = MulticlassMetrics(predictionAndLabels)
print("weighted f-measure %.3f" % metrics.weightedFMeasure())
print("precision %s" % metrics.precision())
print("recall %s" % metrics.recall())
def testRegression(train, test):
# Train a RandomForest model.
# Note: Use larger numTrees in practice.
rf = RandomForestRegressor(labelCol="indexedLabel", numTrees=3, maxDepth=4)
model = rf.fit(train)
predictionAndLabels = model.transform(test).select("prediction", "indexedLabel") \
.map(lambda x: (x.prediction, x.indexedLabel))
metrics = RegressionMetrics(predictionAndLabels)
print("rmse %.3f" % metrics.rootMeanSquaredError)
print("r2 %.3f" % metrics.r2)
print("mae %.3f" % metrics.meanAbsoluteError)
if __name__ == "__main__":
if len(sys.argv) > 1:
print("Usage: random_forest_example", file=sys.stderr)
exit(1)
sc = SparkContext(appName="PythonRandomForestExample")
sqlContext = SQLContext(sc)
# Load and parse the data file into a dataframe.
df = MLUtils.loadLibSVMFile(sc, "D:\spark-1.4.0\examples\src\main\python\ml\sample_libsvm_data.txt").toDF()
# Map labels into an indexed column of labels in [0, numLabels)
stringIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
si_model = stringIndexer.fit(df)
td = si_model.transform(df)
[train, test] = td.randomSplit([0.7, 0.3])
testClassification(train, test)
testRegression(train, test)
sc.stop()
逐行运行发现问题出在
# Load and parse the data file into a dataframe.
df = MLUtils.loadLibSVMFile(sc, "D:\spark-1.4.0\examples\src\main\python\ml\sample_libsvm_data.txt").toDF()
中的toDF()方法上,loadLibSVMFile没有问题,得到的是RDD,而且可以collect得到数组,只是toDF()这一步出现了问题。在下新手不知道问题出在哪里,还请各位帮忙。
我也遇到这样的问题 你解决了么!??