RF
RF provides two groups of model interfaces: the ML Classification API and the ML Regression API.
Model interface category | Function interface
---|---
ML Classification API | def fit(dataset: Dataset[_]): RandomForestClassificationModel
| def fit(dataset: Dataset[_], paramMap: ParamMap): RandomForestClassificationModel
| def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[RandomForestClassificationModel]
| def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): RandomForestClassificationModel
ML Regression API | def fit(dataset: Dataset[_]): RandomForestRegressionModel
| def fit(dataset: Dataset[_], paramMap: ParamMap): RandomForestRegressionModel
| def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[RandomForestRegressionModel]
| def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): RandomForestRegressionModel
ML Classification API
- Function description: trains a random forest classification model on the input dataset.
- Input and output
- Package: org.apache.spark.ml.classification
- Class: RandomForestClassifier
- Method: fit
- Input: Dataset[_], the training samples; the required fields are listed below.

Param name | Type(s) | Default | Description
---|---|---|---
labelCol | Double | "label" | Label to predict
featuresCol | Vector | "features" | Feature vector
- Input: paramMap, paramMaps, firstParamPair, and otherParamPairs, the model parameters of the fit interfaces, described below.

Param name | Type(s) | Example | Description
---|---|---|---
paramMap | ParamMap | ParamMap(A.c -> b) | Assigns the value b to parameter c of model A
paramMaps | Array[ParamMap] | Array[ParamMap](n) | A list of n ParamMap parameter maps
firstParamPair | ParamPair | ParamPair(A.c, b) | Assigns the value b to parameter c of model A
otherParamPairs | ParamPair | ParamPair(A.e, f) | Assigns the value f to parameter e of model A
- Algorithm parameters (setter methods):
def setCheckpointInterval(value: Int): RandomForestClassifier.this.type
def setFeatureSubsetStrategy(value: String): RandomForestClassifier.this.type
def setFeaturesCol(value: String): RandomForestClassifier
def setImpurity(value: String): RandomForestClassifier.this.type
def setLabelCol(value: String): RandomForestClassifier
def setMaxBins(value: Int): RandomForestClassifier.this.type
def setMaxDepth(value: Int): RandomForestClassifier.this.type
def setMinInfoGain(value: Double): RandomForestClassifier.this.type
def setMinInstancesPerNode(value: Int): RandomForestClassifier.this.type
def setNumTrees(value: Int): RandomForestClassifier.this.type
def setPredictionCol(value: String): RandomForestClassifier
def setProbabilityCol(value: String): RandomForestClassifier
def setRawPredictionCol(value: String): RandomForestClassifier
def setSeed(value: Long): RandomForestClassifier.this.type
def setSubsamplingRate(value: Double): RandomForestClassifier.this.type
def setThresholds(value: Array[Double]): RandomForestClassifier
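Because each setter above returns the classifier itself, the setters can be chained when constructing the model. The following sketch is illustrative only; the parameter values are hypothetical examples, not tuning recommendations from this document:

```scala
import org.apache.spark.ml.classification.RandomForestClassifier

// Hypothetical configuration: every value below is an arbitrary example.
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setNumTrees(50)                  // number of trees in the forest
  .setMaxDepth(10)                  // maximum depth of each tree
  .setMaxBins(64)                   // bins used to discretize continuous features
  .setImpurity("gini")              // split criterion: "gini" or "entropy"
  .setSubsamplingRate(0.8)          // fraction of training data sampled per tree
  .setFeatureSubsetStrategy("sqrt") // features considered at each split
  .setSeed(42L)                     // fixed seed for reproducibility
```

The chained form is equivalent to calling each setter separately on `rf`; any parameter left unset keeps its Spark default.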
- Added algorithm parameters.
Parameter name | Spark conf key | Meaning | Type and range
---|---|---|---
numTrainingDataCopies | spark.sophon.ml.rf.numTrainingDataCopies | Number of copies of the training data | Int; must be >= 1; default 1
broadcastVariables | spark.sophon.ml.rf.broadcastVariables | Whether to broadcast variables that occupy large amounts of storage | Boolean; default false
numPartsPerTrainingDataCopy | spark.sophon.ml.rf.numPartsPerTrainingDataCopy | Number of partitions per copy of the training data | Int; must be >= 0; default 0 (0 means no repartitioning)
binnedFeaturesDataType | spark.sophon.ml.rf.binnedFeaturesDataType | Storage format of the binned features in the training samples | String; supported values: array, fasthashmap; default array
Example of the parameters and the fit interfaces:

import org.apache.spark.ml.param.{ParamMap, ParamPair}

// Define the random forest classifier
val rf = new RandomForestClassifier()

// Parameter for def fit(dataset: Dataset[_], paramMap: ParamMap)
val paramMap = ParamMap(rf.maxDepth -> maxDepth).put(rf.numTrees, numTrees)

// Parameter for def fit(dataset: Dataset[_], paramMaps: Array[ParamMap])
val paramMaps = new Array[ParamMap](2)
for (i <- 0 until paramMaps.size) {
  paramMaps(i) = ParamMap(rf.maxDepth -> maxDepth).put(rf.numTrees, numTrees)
}

// Parameters for def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*)
val firstParamPair = ParamPair(rf.maxDepth, maxDepth1)
val otherParamPairs_1st = ParamPair(rf.maxDepth, maxDepth2)
val otherParamPairs_2nd = ParamPair(rf.numTrees, numTrees)

// Call each fit interface
model = rf.fit(trainingData)
model = rf.fit(trainingData, paramMap)
models = rf.fit(trainingData, paramMaps)
model = rf.fit(trainingData, firstParamPair, otherParamPairs_1st, otherParamPairs_2nd)
- Example code for setting the added parameters:

// spark: SparkSession
spark.conf.set("spark.sophon.ml.rf.binnedFeaturesDataType", binnedFeaturesType)
spark.conf.set("spark.sophon.ml.rf.numTrainingDataCopies", numCopiesInput)
spark.conf.set("spark.sophon.ml.rf.numPartsPerTrainingDataCopy", pt)
spark.conf.set("spark.sophon.ml.rf.broadcastVariables", bcVariables)
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
- Output: RandomForestClassificationModel, the RF classification model; the output field produced at prediction time is listed below.
Param name | Type(s) | Default | Description
---|---|---|---
predictionCol | Double | "prediction" | Predicted label
- Usage example
Example of fit(dataset: Dataset[_]): RandomForestClassificationModel:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a RandomForest model.
val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(10)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

// Chain indexers and forest in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))

// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${(1.0 - accuracy)}")

val rfModel = model.stages(2).asInstanceOf[RandomForestClassificationModel]
println(s"Learned classification forest model:\n ${rfModel.toDebugString}")
- Sample output:

+--------------+-----+--------------------+
|predictedLabel|label|            features|
+--------------+-----+--------------------+
|           1.0|  1.0|(47236,[270,439,5...|
|           1.0|  1.0|(47236,[3023,6093...|
|          -1.0| -1.0|(47236,[270,391,4...|
|          -1.0| -1.0|(47236,[3718,3723...|
|           1.0|  1.0|(47236,[729,760,1...|
+--------------+-----+--------------------+
only showing top 5 rows

Test Error = 0.06476632743800015
ML Regression API
- Function description: trains a random forest regression model on the input dataset.
- Input and output
- Package: org.apache.spark.ml.regression
- Class: RandomForestRegressor
- Method: fit
- Input: Dataset[_], the training samples; the required fields are listed below.

Param name | Type(s) | Default | Description
---|---|---|---
labelCol | Double | "label" | Label to predict
featuresCol | Vector | "features" | Feature vector
- Input: paramMap, paramMaps, firstParamPair, and otherParamPairs, the model parameters of the fit interfaces, described below.

Param name | Type(s) | Example | Description
---|---|---|---
paramMap | ParamMap | ParamMap(A.c -> b) | Assigns the value b to parameter c of model A
paramMaps | Array[ParamMap] | Array[ParamMap](n) | A list of n ParamMap parameter maps
firstParamPair | ParamPair | ParamPair(A.c, b) | Assigns the value b to parameter c of model A
otherParamPairs | ParamPair | ParamPair(A.e, f) | Assigns the value f to parameter e of model A
- Algorithm parameters (setter methods):
def setCheckpointInterval(value: Int): RandomForestRegressor.this.type
def setFeatureSubsetStrategy(value: String): RandomForestRegressor.this.type
def setFeaturesCol(value: String): RandomForestRegressor
def setImpurity(value: String): RandomForestRegressor.this.type
def setLabelCol(value: String): RandomForestRegressor
def setMaxBins(value: Int): RandomForestRegressor.this.type
def setMaxDepth(value: Int): RandomForestRegressor.this.type
def setMinInfoGain(value: Double): RandomForestRegressor.this.type
def setMinInstancesPerNode(value: Int): RandomForestRegressor.this.type
def setNumTrees(value: Int): RandomForestRegressor.this.type
def setPredictionCol(value: String): RandomForestRegressor
def setSeed(value: Long): RandomForestRegressor.this.type
def setSubsamplingRate(value: Double): RandomForestRegressor.this.type
- Added algorithm parameters.
Parameter name | Spark conf key | Meaning | Type and range
---|---|---|---
numTrainingDataCopies | spark.sophon.ml.rf.numTrainingDataCopies | Number of copies of the training data | Int; must be >= 1; default 1
broadcastVariables | spark.sophon.ml.rf.broadcastVariables | Whether to broadcast variables that occupy large amounts of storage | Boolean; default false
numPartsPerTrainingDataCopy | spark.sophon.ml.rf.numPartsPerTrainingDataCopy | Number of partitions per copy of the training data | Int; must be >= 0; default 0 (0 means no repartitioning)
binnedFeaturesDataType | spark.sophon.ml.rf.binnedFeaturesDataType | Storage format of the binned features in the training samples | String; supported values: array, fasthashmap; default array
Example of the parameters and the fit interfaces:

import org.apache.spark.ml.param.{ParamMap, ParamPair}

// Define the regression model
val rf = new RandomForestRegressor()

// Parameter for def fit(dataset: Dataset[_], paramMap: ParamMap)
val paramMap = ParamMap(rf.maxDepth -> maxDepth).put(rf.numTrees, numTrees)

// Parameter for def fit(dataset: Dataset[_], paramMaps: Array[ParamMap])
val paramMaps = new Array[ParamMap](2)
for (i <- 0 until paramMaps.size) {
  paramMaps(i) = ParamMap(rf.maxDepth -> maxDepth).put(rf.numTrees, numTrees)
}

// Parameters for def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*)
val firstParamPair = ParamPair(rf.maxDepth, maxDepth1)
val otherParamPairs_1st = ParamPair(rf.maxDepth, maxDepth2)
val otherParamPairs_2nd = ParamPair(rf.numTrees, numTrees)

// Call each fit interface
model = rf.fit(trainingData)
model = rf.fit(trainingData, paramMap)
models = rf.fit(trainingData, paramMaps)
model = rf.fit(trainingData, firstParamPair, otherParamPairs_1st, otherParamPairs_2nd)
- Example code for setting the added parameters:

// spark: SparkSession
spark.conf.set("spark.sophon.ml.rf.binnedFeaturesDataType", binnedFeaturesType)
spark.conf.set("spark.sophon.ml.rf.numTrainingDataCopies", numCopiesInput)
spark.conf.set("spark.sophon.ml.rf.numPartsPerTrainingDataCopy", pt)
spark.conf.set("spark.sophon.ml.rf.broadcastVariables", bcVariables)
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
- Output: RandomForestRegressionModel or Seq[RandomForestRegressionModel], the RF regression model(s); the output field produced at prediction time is listed below.
Param name | Type(s) | Default | Description
---|---|---|---
predictionCol | Double | "prediction" | Predicted label
- Usage example
Example of fit(dataset: Dataset[_]): RandomForestRegressionModel:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a RandomForest model.
val rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")

// Chain indexer and forest in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(featureIndexer, rf))

// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

val rfModel = model.stages(1).asInstanceOf[RandomForestRegressionModel]
println(s"Learned regression forest model:\n ${rfModel.toDebugString}")
- Sample output:

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|      0.51|  0.3|(1000,[0,1,2,3,4,...|
+----------+-----+--------------------+

root mean squared error = 0.21000000000000002
Interface applicability notes:
- This algorithm interface applies to the HDP big-data platform running on Kunpeng servers. The Java development environment requires JDK 1.8 or later, the Spark development environment requires version 2.3.2, and the minimum supported HDP version is 3.1.0.
- Running this algorithm on the HDP platform requires the following components to be deployed: HDFS, Spark2, Yarn, ZooKeeper, Hive, and MapReduce2.
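The added spark.sophon.* parameters can also be passed at submit time instead of being set in code via spark.conf.set. The following spark-submit sketch is a hypothetical deployment fragment: the class name, jar name, and master/deploy-mode settings are placeholders, not values from this document; only the four --conf keys come from the parameter tables above.

```shell
# Hypothetical submit command; replace class, jar, and cluster settings with your own.
spark-submit \
  --master yarn \
  --deploy-mode client \
  --conf spark.sophon.ml.rf.numTrainingDataCopies=1 \
  --conf spark.sophon.ml.rf.broadcastVariables=false \
  --conf spark.sophon.ml.rf.numPartsPerTrainingDataCopy=0 \
  --conf spark.sophon.ml.rf.binnedFeaturesDataType=array \
  --class com.example.RFTrainingJob \
  rf-example.jar
```

Values supplied this way are visible through spark.conf on the driver, so the training code does not need to set them again.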