Random Forest
Random Forest provides two categories of model interfaces: the ML Classification API and the ML Regression API.
Model interface category | Function interface
---|---
ML Classification API | def fit(dataset: Dataset[_]): RandomForestClassificationModel
| def fit(dataset: Dataset[_], paramMap: ParamMap): RandomForestClassificationModel
| def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[RandomForestClassificationModel]
| def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): RandomForestClassificationModel
ML Regression API | def fit(dataset: Dataset[_]): RandomForestRegressionModel
| def fit(dataset: Dataset[_], paramMap: ParamMap): RandomForestRegressionModel
| def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[RandomForestRegressionModel]
| def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): RandomForestRegressionModel
ML Classification API
- Input and output
- Package: org.apache.spark.ml.classification
- Class: RandomForestClassifier
- Method: fit
- Input: Dataset[_], the training samples; required fields are listed below (a short sketch follows the table).

Parameter name | Value type | Default value | Description
---|---|---|---
labelCol | Double | label | Prediction label
featuresCol | Vector | features | Feature vector
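As a quick illustration of these required fields, here is a minimal sketch that builds such a Dataset by hand. It assumes an existing SparkSession named spark, and the rows are made-up toy values, not data from this document:

```scala
import org.apache.spark.ml.linalg.Vectors

// Hypothetical toy rows: a Double "label" and a Vector "features",
// matching the default labelCol/featuresCol column names.
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (1.0, Vectors.dense(0.0, 1.3, 0.2))
)).toDF("label", "features")
```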
- Input: paramMap, paramMaps, firstParamPair, otherParamPairs; model parameters for the fit interfaces, described as follows.

Parameter name | Value type | Example | Description
---|---|---|---
paramMap | ParamMap | ParamMap(A.c -> b) | Assigns value b to parameter c of model A
paramMaps | Array[ParamMap] | Array[ParamMap](n) | Forms a list of n ParamMap model parameter maps
firstParamPair | ParamPair | ParamPair(A.c, b) | Assigns value b to parameter c of model A
otherParamPairs | ParamPair | ParamPair(A.e, f) | Assigns value f to parameter e of model A
- Parameters of the optimized native algorithm (a chained-setter sketch follows the list):
```scala
def setCheckpointInterval(value: Int): RandomForestClassifier.this.type
def setFeatureSubsetStrategy(value: String): RandomForestClassifier.this.type
def setFeaturesCol(value: String): RandomForestClassifier
def setImpurity(value: String): RandomForestClassifier.this.type
def setLabelCol(value: String): RandomForestClassifier
def setMaxBins(value: Int): RandomForestClassifier.this.type
def setMaxDepth(value: Int): RandomForestClassifier.this.type
def setMinInfoGain(value: Double): RandomForestClassifier.this.type
def setMinInstancesPerNode(value: Int): RandomForestClassifier.this.type
def setNumTrees(value: Int): RandomForestClassifier.this.type
def setPredictionCol(value: String): RandomForestClassifier
def setProbabilityCol(value: String): RandomForestClassifier
def setRawPredictionCol(value: String): RandomForestClassifier
def setSeed(value: Long): RandomForestClassifier.this.type
def setSubsamplingRate(value: Double): RandomForestClassifier.this.type
def setThresholds(value: Array[Double]): RandomForestClassifier
```
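For orientation, a minimal sketch of chaining these setters on a classifier; the parameter values are illustrative only, not recommendations from this document:

```scala
import org.apache.spark.ml.classification.RandomForestClassifier

// Every setter below appears in the list above; values are arbitrary examples.
val rfc = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(20)
  .setMaxDepth(5)
  .setMaxBins(32)
  .setFeatureSubsetStrategy("auto")
  .setImpurity("gini")
  .setSubsamplingRate(0.8)
  .setSeed(42L)
```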
- New algorithm parameters
Parameter name | Spark conf parameter name | Meaning | Value type
---|---|---|---
numTrainingDataCopies | spark.boostkit.ml.rf.numTrainingDataCopies | Number of copies of the training data | Int; default 1; must be >= 1
broadcastVariables | spark.boostkit.ml.rf.broadcastVariables | Whether to broadcast variables with a large memory footprint | Boolean; default false
numPartsPerTrainingDataCopy | spark.boostkit.ml.rf.numPartsPerTrainingDataCopy | Number of partitions per copy of the training data | Int; must be >= 0; default 0 (0 means no repartitioning)
binnedFeaturesDataType | spark.boostkit.ml.rf.binnedFeaturesDataType | Storage format of the binned features in the training samples | String; supported formats: array, fasthashmap; default array
Example of the parameters and the fit interfaces:

```scala
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.param.{ParamMap, ParamPair}

val rf = new RandomForestClassifier() // define the random forest classifier

// Parameters for def fit(dataset: Dataset[_], paramMap: ParamMap)
val paramMap = ParamMap(rf.maxDepth -> maxDepth).put(rf.numTrees, numTrees)

// Parameters for def fit(dataset: Dataset[_], paramMaps: Array[ParamMap])
val paramMaps = new Array[ParamMap](2)
for (i <- paramMaps.indices) {
  paramMaps(i) = ParamMap(rf.maxDepth -> maxDepth).put(rf.numTrees, numTrees)
} // populate paramMaps

// Parameters for def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*)
val firstParamPair = ParamPair(rf.maxDepth, maxDepth1)
val otherParamPairs_1st = ParamPair(rf.maxDepth, maxDepth2)
val otherParamPairs_2nd = ParamPair(rf.numTrees, numTrees)

// Call each fit interface
model = rf.fit(trainingData)
model = rf.fit(trainingData, paramMap)
models = rf.fit(trainingData, paramMaps)
model = rf.fit(trainingData, firstParamPair, otherParamPairs_1st, otherParamPairs_2nd)
```
- Example code for setting the new parameters:
```scala
// spark: SparkSession
spark.conf.set("spark.boostkit.ml.rf.binnedFeaturesDataType", binnedFeaturesType)
spark.conf.set("spark.boostkit.ml.rf.numTrainingDataCopies", numCopiesInput)
spark.conf.set("spark.boostkit.ml.rf.numPartsPerTrainingDataCopy", pt)
spark.conf.set("spark.boostkit.ml.rf.broadcastVariables", bcVariables)
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
```
- Output: RandomForestClassificationModel, the random forest classification model; output fields at prediction time:

Parameter name | Value type | Default value | Description
---|---|---|---
predictionCol | Double | prediction | Predicted label
- Usage example for fit(dataset: Dataset[_]): RandomForestClassificationModel:
```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a RandomForest model.
val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(10)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

// Chain indexers and forest in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))

// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${(1.0 - accuracy)}")

val rfModel = model.stages(2).asInstanceOf[RandomForestClassificationModel]
println(s"Learned classification forest model:\n ${rfModel.toDebugString}")
```
- Sample result:
```
+--------------+-----+--------------------+
|predictedLabel|label|            features|
+--------------+-----+--------------------+
|           1.0|  1.0|(47236,[270,439,5...|
|           1.0|  1.0|(47236,[3023,6093...|
|          -1.0| -1.0|(47236,[270,391,4...|
|          -1.0| -1.0|(47236,[3718,3723...|
|           1.0|  1.0|(47236,[729,760,1...|
+--------------+-----+--------------------+
only showing top 5 rows

Test Error = 0.06476632743800015
```
ML Regression API
- Input and output
- Package: org.apache.spark.ml.regression
- Class: RandomForestRegressor
- Method: fit
- Input: Dataset[_], the training samples; required fields are listed below (a short sketch follows the table).

Parameter name | Value type | Default value | Description
---|---|---|---
labelCol | Double | label | Prediction label
featuresCol | Vector | features | Feature vector
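The required fields mirror the classification API; the practical difference is that the label column holds a continuous target. A minimal sketch, again assuming a SparkSession named spark and made-up toy values:

```scala
import org.apache.spark.ml.linalg.Vectors

// Hypothetical toy rows: a continuous Double "label" and a Vector "features".
val training = spark.createDataFrame(Seq(
  (0.3, Vectors.dense(0.0, 1.1, 0.1)),
  (1.7, Vectors.dense(2.0, 1.0, -1.0))
)).toDF("label", "features")
```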
- Input: paramMap, paramMaps, firstParamPair, otherParamPairs; model parameters for the fit interfaces, described as follows.

Parameter name | Value type | Example | Description
---|---|---|---
paramMap | ParamMap | ParamMap(A.c -> b) | Assigns value b to parameter c of model A
paramMaps | Array[ParamMap] | Array[ParamMap](n) | Forms a list of n ParamMap model parameter maps
firstParamPair | ParamPair | ParamPair(A.c, b) | Assigns value b to parameter c of model A
otherParamPairs | ParamPair | ParamPair(A.e, f) | Assigns value f to parameter e of model A
- Parameters of the optimized native algorithm (a chained-setter sketch follows the list):
```scala
def setCheckpointInterval(value: Int): RandomForestRegressor.this.type
def setFeatureSubsetStrategy(value: String): RandomForestRegressor.this.type
def setFeaturesCol(value: String): RandomForestRegressor
def setImpurity(value: String): RandomForestRegressor.this.type
def setLabelCol(value: String): RandomForestRegressor
def setMaxBins(value: Int): RandomForestRegressor.this.type
def setMaxDepth(value: Int): RandomForestRegressor.this.type
def setMinInfoGain(value: Double): RandomForestRegressor.this.type
def setMinInstancesPerNode(value: Int): RandomForestRegressor.this.type
def setNumTrees(value: Int): RandomForestRegressor.this.type
def setPredictionCol(value: String): RandomForestRegressor
def setSeed(value: Long): RandomForestRegressor.this.type
def setSubsamplingRate(value: Double): RandomForestRegressor.this.type
```
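As with the classifier, these setters chain. A minimal sketch with illustrative values; note that for regression the supported impurity is "variance":

```scala
import org.apache.spark.ml.regression.RandomForestRegressor

// Every setter below appears in the list above; values are arbitrary examples.
val rfr = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(20)
  .setMaxDepth(5)
  .setImpurity("variance") // impurity measure for regression
  .setSubsamplingRate(0.8)
  .setSeed(42L)
```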
- New algorithm parameters
Parameter name | Spark conf parameter name | Meaning | Value type
---|---|---|---
numTrainingDataCopies | spark.boostkit.ml.rf.numTrainingDataCopies | Number of copies of the training data | Int; default 1; must be >= 1
broadcastVariables | spark.boostkit.ml.rf.broadcastVariables | Whether to broadcast variables with a large memory footprint | Boolean; default false
numPartsPerTrainingDataCopy | spark.boostkit.ml.rf.numPartsPerTrainingDataCopy | Number of partitions per copy of the training data | Int; must be >= 0; default 0 (0 means no repartitioning)
binnedFeaturesDataType | spark.boostkit.ml.rf.binnedFeaturesDataType | Storage format of the binned features in the training samples | String; supported formats: array, fasthashmap; default array
Example of the parameters and the fit interfaces:
```scala
import org.apache.spark.ml.param.{ParamMap, ParamPair}
import org.apache.spark.ml.regression.RandomForestRegressor

val rf = new RandomForestRegressor() // define the regression model

// Parameters for def fit(dataset: Dataset[_], paramMap: ParamMap)
val paramMap = ParamMap(rf.maxDepth -> maxDepth).put(rf.numTrees, numTrees)

// Parameters for def fit(dataset: Dataset[_], paramMaps: Array[ParamMap])
val paramMaps = new Array[ParamMap](2)
for (i <- paramMaps.indices) {
  paramMaps(i) = ParamMap(rf.maxDepth -> maxDepth).put(rf.numTrees, numTrees)
} // populate paramMaps

// Parameters for def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*)
val firstParamPair = ParamPair(rf.maxDepth, maxDepth1)
val otherParamPairs_1st = ParamPair(rf.maxDepth, maxDepth2)
val otherParamPairs_2nd = ParamPair(rf.numTrees, numTrees)

// Call each fit interface
model = rf.fit(trainingData)
model = rf.fit(trainingData, paramMap)
models = rf.fit(trainingData, paramMaps)
model = rf.fit(trainingData, firstParamPair, otherParamPairs_1st, otherParamPairs_2nd)
```
- Example code for setting the new parameters:
```scala
// spark: SparkSession
spark.conf.set("spark.boostkit.ml.rf.binnedFeaturesDataType", binnedFeaturesType)
spark.conf.set("spark.boostkit.ml.rf.numTrainingDataCopies", numCopiesInput)
spark.conf.set("spark.boostkit.ml.rf.numPartsPerTrainingDataCopy", pt)
spark.conf.set("spark.boostkit.ml.rf.broadcastVariables", bcVariables)
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
```
- Output: RandomForestRegressionModel or Seq[RandomForestRegressionModel], the random forest regression model; output fields at prediction time:

Parameter name | Value type | Default value | Description
---|---|---|---
predictionCol | Double | prediction | Predicted label
- Usage example for fit(dataset: Dataset[_]): RandomForestRegressionModel:
```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a RandomForest model.
val rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")

// Chain indexer and forest in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(featureIndexer, rf))

// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

val rfModel = model.stages(1).asInstanceOf[RandomForestRegressionModel]
println(s"Learned regression forest model:\n ${rfModel.toDebugString}")
```
- Sample result:

```
+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|      0.51|  0.3|(1000,[0,1,2,3,4,...|
+----------+-----+--------------------+

root mean squared error = 0.21000000000000002
```