
Random Forest

Random Forest provides two categories of model interfaces: the ML Classification API and the ML Regression API.

Model interface categories and function interfaces

ML Classification API

def fit(dataset: Dataset[_]): RandomForestClassificationModel

def fit(dataset: Dataset[_], paramMap: ParamMap): RandomForestClassificationModel

def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[RandomForestClassificationModel]

def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): RandomForestClassificationModel

ML Regression API

def fit(dataset: Dataset[_]): RandomForestRegressionModel

def fit(dataset: Dataset[_], paramMap: ParamMap): RandomForestRegressionModel

def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[RandomForestRegressionModel]

def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): RandomForestRegressionModel

ML Classification API

  • Function description

    Pass in sample data in Dataset format and call the training interface to produce a random forest classification model.

  • Input and output
    1. Package: org.apache.spark.ml.classification
    2. Class: RandomForestClassifier
    3. Method: fit
    4. Input: Dataset[_], the training sample data; required fields are as follows

      Parameter | Type | Default | Description
      --------- | ---- | ------- | -----------
      labelCol | Double | label | Prediction label
      featuresCol | Vector | features | Feature vector

    5. Input: paramMap, paramMaps, firstParamPair, otherParamPairs — model parameters for the fit interfaces, described as follows

      Parameter | Type | Example | Description
      --------- | ---- | ------- | -----------
      paramMap | ParamMap | ParamMap(A.c -> b) | Assigns value b to parameter c of model A
      paramMaps | Array[ParamMap] | Array[ParamMap](n) | A list of n ParamMap parameter sets
      firstParamPair | ParamPair | ParamPair(A.c, b) | Assigns value b to parameter c of model A
      otherParamPairs | ParamPair | ParamPair(A.e, f) | Assigns value f to parameter e of model A

    6. Optimized parameters inherited from the native algorithm
      def setCheckpointInterval(value: Int): RandomForestClassifier.this.type
      def setFeatureSubsetStrategy(value: String): RandomForestClassifier.this.type
      def setFeaturesCol(value: String): RandomForestClassifier
      def setImpurity(value: String): RandomForestClassifier.this.type
      def setLabelCol(value: String): RandomForestClassifier
      def setMaxBins(value: Int): RandomForestClassifier.this.type
      def setMaxDepth(value: Int): RandomForestClassifier.this.type
      def setMinInfoGain(value: Double): RandomForestClassifier.this.type
      def setMinInstancesPerNode(value: Int): RandomForestClassifier.this.type
      def setNumTrees(value: Int): RandomForestClassifier.this.type
      def setPredictionCol(value: String): RandomForestClassifier
      def setProbabilityCol(value: String): RandomForestClassifier
      def setRawPredictionCol(value: String): RandomForestClassifier
      def setSeed(value: Long): RandomForestClassifier.this.type
      def setSubsamplingRate(value: Double): RandomForestClassifier.this.type
      def setThresholds(value: Array[Double]): RandomForestClassifier
    7. New algorithm parameters

      Parameter | Spark conf name | Meaning | Type and range
      --------- | --------------- | ------- | --------------
      numTrainingDataCopies | spark.boostkit.ml.rf.numTrainingDataCopies | Number of copies of the training data | Int; must be >= 1; default 1
      broadcastVariables | spark.boostkit.ml.rf.broadcastVariables | Whether to broadcast variables with a large memory footprint | Boolean; default false
      numPartsPerTrainingDataCopy | spark.boostkit.ml.rf.numPartsPerTrainingDataCopy | Number of partitions per training data copy | Int; must be >= 0; default 0 (0 means no repartitioning)
      binnedFeaturesDataType | spark.boostkit.ml.rf.binnedFeaturesDataType | Storage format of the binned features in the training data | String; supported values: array, fasthashmap; default array

      Example of the parameters and fit interfaces:
      import org.apache.spark.ml.param.{ParamMap, ParamPair}
      
      val rf = new RandomForestClassifier() // define the random forest classifier
      
      // Parameter for def fit(dataset: Dataset[_], paramMap: ParamMap)
      val paramMap = ParamMap(rf.maxDepth -> maxDepth).put(rf.numTrees, numTrees)
      
      // Parameter for def fit(dataset: Dataset[_], paramMaps: Array[ParamMap])
      val paramMaps = new Array[ParamMap](2)
      for (i <- paramMaps.indices) {
        paramMaps(i) = ParamMap(rf.maxDepth -> maxDepth)
          .put(rf.numTrees, numTrees)
      } // populate paramMaps
      
      // Parameters for def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*)
      val firstParamPair = ParamPair(rf.maxDepth, maxDepth1)
      val otherParamPairs_1st = ParamPair(rf.maxDepth, maxDepth2)
      val otherParamPairs_2nd = ParamPair(rf.numTrees, numTrees)
      
      // Call each fit interface
      val model = rf.fit(trainingData)
      val model2 = rf.fit(trainingData, paramMap)
      val models = rf.fit(trainingData, paramMaps)
      val model3 = rf.fit(trainingData, firstParamPair, otherParamPairs_1st, otherParamPairs_2nd)
    8. Example code for setting the new parameters
      // spark: SparkSession
      spark.conf.set("spark.boostkit.ml.rf.binnedFeaturesDataType", binnedFeaturesType)
      spark.conf.set("spark.boostkit.ml.rf.numTrainingDataCopies", numCopiesInput)
      spark.conf.set("spark.boostkit.ml.rf.numPartsPerTrainingDataCopy", pt)
      spark.conf.set("spark.boostkit.ml.rf.broadcastVariables", bcVariables)
      val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
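      The same BoostKit parameters can also be supplied at application submit time instead of through SparkSession.conf. A minimal sketch: the class name and jar path are hypothetical placeholders, and the values shown are simply the documented defaults from the table above.

      ```shell
      # Hypothetical spark-submit invocation; replace the class and jar with your own.
      # Each --conf mirrors one spark.conf.set call from the example above.
      spark-submit \
        --class com.example.RFTrainingJob \
        --conf spark.boostkit.ml.rf.binnedFeaturesDataType=array \
        --conf spark.boostkit.ml.rf.numTrainingDataCopies=1 \
        --conf spark.boostkit.ml.rf.numPartsPerTrainingDataCopy=0 \
        --conf spark.boostkit.ml.rf.broadcastVariables=false \
        rf-training-job.jar
      ```

      Values set via --conf are visible to the job before the SparkSession is created, which is convenient when the training code itself should stay free of tuning constants.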
    9. Output: RandomForestClassificationModel, the random forest classification model; its output fields at prediction time are as follows

      Parameter | Type | Default | Description
      --------- | ---- | ------- | -----------
      predictionCol | Double | prediction | Predicted label

  • Usage example
    Example of fit(dataset: Dataset[_]): RandomForestClassificationModel:
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
    import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
     
    // Load and parse the data file, converting it to a DataFrame.
    val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
     
    // Index labels, adding metadata to the label column.
    // Fit on whole dataset to include all labels in index.
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
      .fit(data)
    // Automatically identify categorical features, and index them.
    // Set maxCategories so features with > 4 distinct values are treated as continuous.
    val featureIndexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .setMaxCategories(4)
      .fit(data)
     
    // Split the data into training and test sets (30% held out for testing).
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
     
    // Train a RandomForest model.
    val rf = new RandomForestClassifier()
      .setLabelCol("indexedLabel")
      .setFeaturesCol("indexedFeatures")
      .setNumTrees(10)
     
    // Convert indexed labels back to original labels.
    val labelConverter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predictedLabel")
      .setLabels(labelIndexer.labels)
     
    // Chain indexers and forest in a Pipeline.
    val pipeline = new Pipeline()
      .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))
     
    // Train model. This also runs the indexers.
    val model = pipeline.fit(trainingData)
     
    // Make predictions.
    val predictions = model.transform(testData)
     
    // Select example rows to display.
    predictions.select("predictedLabel", "label", "features").show(5)
     
    // Select (prediction, true label) and compute test error.
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("indexedLabel")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    val accuracy = evaluator.evaluate(predictions)
    println(s"Test Error = ${(1.0 - accuracy)}")
     
    val rfModel = model.stages(2).asInstanceOf[RandomForestClassificationModel]
    println(s"Learned classification forest model:\n ${rfModel.toDebugString}")
    
  • Sample result:
    +--------------+-----+--------------------+
    |predictedLabel|label|            features|
    +--------------+-----+--------------------+
    |           1.0|  1.0|(47236,[270,439,5...|
    |           1.0|  1.0|(47236,[3023,6093...|
    |          -1.0| -1.0|(47236,[270,391,4...|
    |          -1.0| -1.0|(47236,[3718,3723...|
    |           1.0|  1.0|(47236,[729,760,1...|
    +--------------+-----+--------------------+
    only showing top 5 rows
    
    Test Error = 0.06476632743800015
    

ML Regression API

  • Function description

    Pass in sample data in Dataset format and call the training interface to produce a random forest regression model.

  • Input and output
    1. Package: org.apache.spark.ml.regression
    2. Class: RandomForestRegressor
    3. Method: fit
    4. Input: Dataset[_], the training sample data; required fields are as follows

      Parameter | Type | Default | Description
      --------- | ---- | ------- | -----------
      labelCol | Double | label | Prediction label
      featuresCol | Vector | features | Feature vector

    5. Input: paramMap, paramMaps, firstParamPair, otherParamPairs — model parameters for the fit interfaces, described as follows

      Parameter | Type | Example | Description
      --------- | ---- | ------- | -----------
      paramMap | ParamMap | ParamMap(A.c -> b) | Assigns value b to parameter c of model A
      paramMaps | Array[ParamMap] | Array[ParamMap](n) | A list of n ParamMap parameter sets
      firstParamPair | ParamPair | ParamPair(A.c, b) | Assigns value b to parameter c of model A
      otherParamPairs | ParamPair | ParamPair(A.e, f) | Assigns value f to parameter e of model A

    6. Optimized parameters inherited from the native algorithm
      def setCheckpointInterval(value: Int): RandomForestRegressor.this.type
      def setFeatureSubsetStrategy(value: String): RandomForestRegressor.this.type
      def setFeaturesCol(value: String): RandomForestRegressor
      def setImpurity(value: String): RandomForestRegressor.this.type
      def setLabelCol(value: String): RandomForestRegressor
      def setMaxBins(value: Int): RandomForestRegressor.this.type
      def setMaxDepth(value: Int): RandomForestRegressor.this.type
      def setMinInfoGain(value: Double): RandomForestRegressor.this.type
      def setMinInstancesPerNode(value: Int): RandomForestRegressor.this.type
      def setNumTrees(value: Int): RandomForestRegressor.this.type
      def setPredictionCol(value: String): RandomForestRegressor
      def setSeed(value: Long): RandomForestRegressor.this.type
      def setSubsamplingRate(value: Double): RandomForestRegressor.this.type
    7. New algorithm parameters

      Parameter | Spark conf name | Meaning | Type and range
      --------- | --------------- | ------- | --------------
      numTrainingDataCopies | spark.boostkit.ml.rf.numTrainingDataCopies | Number of copies of the training data | Int; must be >= 1; default 1
      broadcastVariables | spark.boostkit.ml.rf.broadcastVariables | Whether to broadcast variables with a large memory footprint | Boolean; default false
      numPartsPerTrainingDataCopy | spark.boostkit.ml.rf.numPartsPerTrainingDataCopy | Number of partitions per training data copy | Int; must be >= 0; default 0 (0 means no repartitioning)
      binnedFeaturesDataType | spark.boostkit.ml.rf.binnedFeaturesDataType | Storage format of the binned features in the training data | String; supported values: array, fasthashmap; default array

      Example of the parameters and fit interfaces:
      import org.apache.spark.ml.param.{ParamMap, ParamPair}
      
      val rf = new RandomForestRegressor() // define the regression model
      
      // Parameter for def fit(dataset: Dataset[_], paramMap: ParamMap)
      val paramMap = ParamMap(rf.maxDepth -> maxDepth)
        .put(rf.numTrees, numTrees)
      
      // Parameter for def fit(dataset: Dataset[_], paramMaps: Array[ParamMap])
      val paramMaps = new Array[ParamMap](2)
      for (i <- paramMaps.indices) {
        paramMaps(i) = ParamMap(rf.maxDepth -> maxDepth)
          .put(rf.numTrees, numTrees)
      } // populate paramMaps
      
      // Parameters for def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*)
      val firstParamPair = ParamPair(rf.maxDepth, maxDepth1)
      val otherParamPairs_1st = ParamPair(rf.maxDepth, maxDepth2)
      val otherParamPairs_2nd = ParamPair(rf.numTrees, numTrees)
      
      // Call each fit interface
      val model = rf.fit(trainingData)
      val model2 = rf.fit(trainingData, paramMap)
      val models = rf.fit(trainingData, paramMaps)
      val model3 = rf.fit(trainingData, firstParamPair, otherParamPairs_1st, otherParamPairs_2nd)
      
    8. Example code for setting the new parameters
      // spark: SparkSession
      spark.conf.set("spark.boostkit.ml.rf.binnedFeaturesDataType", binnedFeaturesType)
      spark.conf.set("spark.boostkit.ml.rf.numTrainingDataCopies", numCopiesInput)
      spark.conf.set("spark.boostkit.ml.rf.numPartsPerTrainingDataCopy", pt)
      spark.conf.set("spark.boostkit.ml.rf.broadcastVariables", bcVariables)
      val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
    9. Output: RandomForestRegressionModel or Seq[RandomForestRegressionModel], the random forest regression model; its output fields at prediction time are as follows

      Parameter | Type | Default | Description
      --------- | ---- | ------- | -----------
      predictionCol | Double | prediction | Predicted label

  • Usage example
    Example of fit(dataset: Dataset[_]): RandomForestRegressionModel:
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.evaluation.RegressionEvaluator
    import org.apache.spark.ml.feature.VectorIndexer
    import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}
     
    // Load and parse the data file, converting it to a DataFrame.
    val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
     
    // Automatically identify categorical features, and index them.
    // Set maxCategories so features with > 4 distinct values are treated as continuous.
    val featureIndexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .setMaxCategories(4)
      .fit(data)
     
    // Split the data into training and test sets (30% held out for testing).
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
     
    // Train a RandomForest model.
    val rf = new RandomForestRegressor()
      .setLabelCol("label")
      .setFeaturesCol("indexedFeatures")
     
    // Chain indexer and forest in a Pipeline.
    val pipeline = new Pipeline()
      .setStages(Array(featureIndexer, rf))
     
    // Train model. This also runs the indexer.
    val model = pipeline.fit(trainingData)
     
    // Make predictions.
    val predictions = model.transform(testData)
     
    // Select example rows to display.
    predictions.select("prediction", "label", "features").show(5)
     
    // Select (prediction, true label) and compute test error.
    val evaluator = new RegressionEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("rmse")
    val rmse = evaluator.evaluate(predictions)
    println(s"Root Mean Squared Error (RMSE) on test data = $rmse")
     
    val rfModel = model.stages(1).asInstanceOf[RandomForestRegressionModel]
    println(s"Learned regression forest model:\n ${rfModel.toDebugString}")
    
  • Sample result
    +----------+-----+--------------------+
    |prediction|label|            features|
    +----------+-----+--------------------+
    |      0.51|  0.3|(1000,[0,1,2,3,4,...|
    +----------+-----+--------------------+
    
    root mean squared error = 0.21000000000000002