
RF

RF (Random Forest) provides two categories of model interfaces: the ML Classification API and the ML Regression API.

The function interfaces in each model interface category are listed below.

ML Classification API

def fit(dataset: Dataset[_]): RandomForestClassificationModel

def fit(dataset: Dataset[_], paramMap: ParamMap): RandomForestClassificationModel

def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[RandomForestClassificationModel]

def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): RandomForestClassificationModel

ML Regression API

def fit(dataset: Dataset[_]): RandomForestRegressionModel

def fit(dataset: Dataset[_], paramMap: ParamMap): RandomForestRegressionModel

def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[RandomForestRegressionModel]

def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): RandomForestRegressionModel

ML Classification API

  • Description:

    Takes training samples in Dataset format, calls the training interface, and outputs a random forest classification model.

  • Input and output
    1. Package: org.apache.spark.ml.classification
    2. Class: RandomForestClassifier
    3. Method: fit
    4. Input: Dataset[_], the training samples; the required fields are listed below.

      Param name  | Type(s) | Default    | Description
      ------------|---------|------------|-----------------
      labelCol    | Double  | "label"    | Label to predict
      featuresCol | Vector  | "features" | Feature vector

    5. Input: paramMap, paramMaps, firstParamPair, otherParamPairs, the model parameters of the fit interface, described below.

      Param name      | Type(s)         | Example            | Description
      ----------------|-----------------|--------------------|-------------------------------------------
      paramMap        | ParamMap        | ParamMap(A.c -> b) | Assigns value b to parameter c of model A
      paramMaps       | Array[ParamMap] | Array[ParamMap](n) | Builds a list of n ParamMap parameter sets
      firstParamPair  | ParamPair       | ParamPair(A.c, b)  | Assigns value b to parameter c of model A
      otherParamPairs | ParamPair       | ParamPair(A.e, f)  | Assigns value f to parameter e of model A

    6. Algorithm parameters

      def setCheckpointInterval(value: Int): RandomForestClassifier.this.type

      def setFeatureSubsetStrategy(value: String): RandomForestClassifier.this.type

      def setFeaturesCol(value: String): RandomForestClassifier

      def setImpurity(value: String): RandomForestClassifier.this.type

      def setLabelCol(value: String): RandomForestClassifier

      def setMaxBins(value: Int): RandomForestClassifier.this.type

      def setMaxDepth(value: Int): RandomForestClassifier.this.type

      def setMinInfoGain(value: Double): RandomForestClassifier.this.type

      def setMinInstancesPerNode(value: Int): RandomForestClassifier.this.type

      def setNumTrees(value: Int): RandomForestClassifier.this.type

      def setPredictionCol(value: String): RandomForestClassifier

      def setProbabilityCol(value: String): RandomForestClassifier

      def setRawPredictionCol(value: String): RandomForestClassifier

      def setSeed(value: Long): RandomForestClassifier.this.type

      def setSubsamplingRate(value: Double): RandomForestClassifier.this.type

      def setThresholds(value: Array[Double]): RandomForestClassifier
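      As a sketch of how these setters combine: each returns the classifier itself, so calls chain. The values below are illustrative placeholders only, not tuning recommendations.

      ```scala
      import org.apache.spark.ml.classification.RandomForestClassifier

      // Illustrative configuration; every value here is a placeholder.
      val rf = new RandomForestClassifier()
        .setNumTrees(50)                  // number of trees in the forest
        .setMaxDepth(8)                   // maximum depth of each tree
        .setMaxBins(64)                   // bins used to discretize continuous features
        .setImpurity("gini")              // split criterion: "gini" or "entropy"
        .setSubsamplingRate(0.8)          // fraction of data sampled for each tree
        .setFeatureSubsetStrategy("sqrt") // features considered at each split
        .setSeed(42L)                     // fixed seed for reproducible forests
      ```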

    7. Added algorithm parameters.

      Parameter name              | spark conf key                                 | Meaning                                                       | Value type
      ----------------------------|------------------------------------------------|---------------------------------------------------------------|-----------------------------------------------------------
      numTrainingDataCopies       | spark.sophon.ml.rf.numTrainingDataCopies       | Number of copies of the training data                         | Int, default 1, must be >= 1
      broadcastVariables          | spark.sophon.ml.rf.broadcastVariables          | Whether to broadcast variables with a large storage footprint | Boolean, default false
      numPartsPerTrainingDataCopy | spark.sophon.ml.rf.numPartsPerTrainingDataCopy | Number of partitions per training data copy                   | Int, must be >= 0, default 0 (0 means no repartitioning)
      binnedFeaturesDataType      | spark.sophon.ml.rf.binnedFeaturesDataType      | Storage format of the binned features in the training samples | String, supported formats: array, fasthashmap; default array

      Example of the parameters and fit interfaces:

      import org.apache.spark.ml.param.{ParamMap, ParamPair}
      
      val rf = new RandomForestClassifier() // create the random forest classifier
      
      // Parameters for def fit(dataset: Dataset[_], paramMap: ParamMap)
      val paramMap = ParamMap(rf.maxDepth -> maxDepth).put(rf.numTrees, numTrees)
      
      // Parameters for def fit(dataset: Dataset[_], paramMaps: Array[ParamMap])
      val paramMaps = new Array[ParamMap](2)
      for (i <- paramMaps.indices) { // note: "0 to paramMaps.size" would run past the last index
        paramMaps(i) = ParamMap(rf.maxDepth -> maxDepth)
          .put(rf.numTrees, numTrees)
      }
      
      // Parameters for def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*)
      val firstParamPair = ParamPair(rf.maxDepth, maxDepth1)
      val otherParamPairs_1st = ParamPair(rf.maxDepth, maxDepth2)
      val otherParamPairs_2nd = ParamPair(rf.numTrees, numTrees)
      
      // Call each fit interface
      val model1 = rf.fit(trainingData)
      val model2 = rf.fit(trainingData, paramMap)
      val models = rf.fit(trainingData, paramMaps)
      val model3 = rf.fit(trainingData, firstParamPair, otherParamPairs_1st, otherParamPairs_2nd)
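      The paramMaps variant trains one model per ParamMap and returns them in order. A minimal sketch of scoring each returned model and keeping the best one, assuming trainingData and testData DataFrames with the default label/features columns already exist:

      ```scala
      import org.apache.spark.ml.classification.RandomForestClassifier
      import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
      import org.apache.spark.ml.param.ParamMap

      val rf = new RandomForestClassifier()
      val candidates = Array(
        ParamMap(rf.maxDepth -> 5).put(rf.numTrees, 20),
        ParamMap(rf.maxDepth -> 10).put(rf.numTrees, 50))

      // fit returns one trained model per ParamMap, in the same order.
      val candidateModels = rf.fit(trainingData, candidates)

      val evaluator = new MulticlassClassificationEvaluator()
        .setLabelCol("label")
        .setPredictionCol("prediction")
        .setMetricName("accuracy")

      // Evaluate each model on held-out data and keep the most accurate one.
      val bestModel = candidateModels.maxBy(m => evaluator.evaluate(m.transform(testData)))
      ```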
    8. Example of setting the added parameters:
      // spark: SparkSession
      spark.conf.set("spark.sophon.ml.rf.binnedFeaturesDataType", binnedFeaturesType)
      spark.conf.set("spark.sophon.ml.rf.numTrainingDataCopies", numCopiesInput)
      spark.conf.set("spark.sophon.ml.rf.numPartsPerTrainingDataCopy", pt)
      spark.conf.set("spark.sophon.ml.rf.broadcastVariables", bcVariables)
      val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
    9. Output: RandomForestClassificationModel, the RF classification model. The output field produced at prediction time is listed below.

      Param name    | Type(s) | Default      | Description
      --------------|---------|--------------|----------------
      predictionCol | Double  | "prediction" | Predicted label

  • Usage example
    Example for fit(dataset: Dataset[_]): RandomForestClassificationModel:
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
    import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
    
    // Load and parse the data file, converting it to a DataFrame.
    val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
    
    // Index labels, adding metadata to the label column.
    // Fit on whole dataset to include all labels in index.
    val labelIndexer = new StringIndexer()
    .setInputCol("label")
    .setOutputCol("indexedLabel")
    .fit(data)
    
    // Automatically identify categorical features, and index them.
    // Set maxCategories so features with > 4 distinct values are treated as continuous.
    val featureIndexer = new VectorIndexer()
    .setInputCol("features")
    .setOutputCol("indexedFeatures")
    .setMaxCategories(4)
    .fit(data)
    
    // Split the data into training and test sets (30% held out for testing).
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
    
    // Train a RandomForest model.
    val rf = new RandomForestClassifier()
    .setLabelCol("indexedLabel")
    .setFeaturesCol("indexedFeatures")
    .setNumTrees(10)
    
    // Convert indexed labels back to original labels.
    val labelConverter = new IndexToString()
    .setInputCol("prediction")
    .setOutputCol("predictedLabel")
    .setLabels(labelIndexer.labels)
    
    // Chain indexers and forest in a Pipeline.
    val pipeline = new Pipeline()
    .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))
    
    // Train model. This also runs the indexers.
    val model = pipeline.fit(trainingData)
    
    // Make predictions.
    val predictions = model.transform(testData)
    
    // Select example rows to display.
    predictions.select("predictedLabel", "label", "features").show(5)
    
    // Select (prediction, true label) and compute test error.
    val evaluator = new MulticlassClassificationEvaluator()
    .setLabelCol("indexedLabel")
    .setPredictionCol("prediction")
    .setMetricName("accuracy")
    val accuracy = evaluator.evaluate(predictions)
    println(s"Test Error = ${(1.0 - accuracy)}")
    
    val rfModel = model.stages(2).asInstanceOf[RandomForestClassificationModel]
    println(s"Learned classification forest model:\n ${rfModel.toDebugString}")
  • Sample output:
    +--------------+-----+--------------------+
    |predictedLabel|label|            features|
    +--------------+-----+--------------------+
    |           1.0|  1.0|(47236,[270,439,5...|
    |           1.0|  1.0|(47236,[3023,6093...|
    |          -1.0| -1.0|(47236,[270,391,4...|
    |          -1.0| -1.0|(47236,[3718,3723...|
    |           1.0|  1.0|(47236,[729,760,1...|
    +--------------+-----+--------------------+
    only showing top 5 rows
    Test Error = 0.06476632743800015

ML Regression API

  • Description

    Takes training samples in Dataset format, calls the training interface, and outputs a random forest regression model.

  • Input and output
    1. Package: org.apache.spark.ml.regression
    2. Class: RandomForestRegressor
    3. Method: fit
    4. Input: Dataset[_], the training samples; the required fields are listed below.

      Param name  | Type(s) | Default    | Description
      ------------|---------|------------|-----------------
      labelCol    | Double  | "label"    | Label to predict
      featuresCol | Vector  | "features" | Feature vector

    5. Input: paramMap, paramMaps, firstParamPair, otherParamPairs, the model parameters of the fit interface, described below.

      Param name      | Type(s)         | Example            | Description
      ----------------|-----------------|--------------------|-------------------------------------------
      paramMap        | ParamMap        | ParamMap(A.c -> b) | Assigns value b to parameter c of model A
      paramMaps       | Array[ParamMap] | Array[ParamMap](n) | Builds a list of n ParamMap parameter sets
      firstParamPair  | ParamPair       | ParamPair(A.c, b)  | Assigns value b to parameter c of model A
      otherParamPairs | ParamPair       | ParamPair(A.e, f)  | Assigns value f to parameter e of model A

    6. Algorithm parameters

      def setCheckpointInterval(value: Int): RandomForestRegressor.this.type

      def setFeatureSubsetStrategy(value: String): RandomForestRegressor.this.type

      def setFeaturesCol(value: String): RandomForestRegressor

      def setImpurity(value: String): RandomForestRegressor.this.type

      def setLabelCol(value: String): RandomForestRegressor

      def setMaxBins(value: Int): RandomForestRegressor.this.type

      def setMaxDepth(value: Int): RandomForestRegressor.this.type

      def setMinInfoGain(value: Double): RandomForestRegressor.this.type

      def setMinInstancesPerNode(value: Int): RandomForestRegressor.this.type

      def setNumTrees(value: Int): RandomForestRegressor.this.type

      def setPredictionCol(value: String): RandomForestRegressor

      def setSeed(value: Long): RandomForestRegressor.this.type

      def setSubsamplingRate(value: Double): RandomForestRegressor.this.type

    7. Added algorithm parameters.

      Parameter name              | spark conf key                                 | Meaning                                                       | Value type
      ----------------------------|------------------------------------------------|---------------------------------------------------------------|-----------------------------------------------------------
      numTrainingDataCopies       | spark.sophon.ml.rf.numTrainingDataCopies       | Number of copies of the training data                         | Int, default 1, must be >= 1
      broadcastVariables          | spark.sophon.ml.rf.broadcastVariables          | Whether to broadcast variables with a large storage footprint | Boolean, default false
      numPartsPerTrainingDataCopy | spark.sophon.ml.rf.numPartsPerTrainingDataCopy | Number of partitions per training data copy                   | Int, must be >= 0, default 0 (0 means no repartitioning)
      binnedFeaturesDataType      | spark.sophon.ml.rf.binnedFeaturesDataType      | Storage format of the binned features in the training samples | String, supported formats: array, fasthashmap; default array

      Example of the parameters and fit interfaces:

      import org.apache.spark.ml.param.{ParamMap, ParamPair}
      
      val rf = new RandomForestRegressor() // create the random forest regressor
      
      // Parameters for def fit(dataset: Dataset[_], paramMap: ParamMap)
      val paramMap = ParamMap(rf.maxDepth -> maxDepth)
        .put(rf.numTrees, numTrees)
      
      // Parameters for def fit(dataset: Dataset[_], paramMaps: Array[ParamMap])
      val paramMaps = new Array[ParamMap](2)
      for (i <- paramMaps.indices) { // note: "0 to paramMaps.size" would run past the last index
        paramMaps(i) = ParamMap(rf.maxDepth -> maxDepth)
          .put(rf.numTrees, numTrees)
      }
      
      // Parameters for def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*)
      val firstParamPair = ParamPair(rf.maxDepth, maxDepth1)
      val otherParamPairs_1st = ParamPair(rf.maxDepth, maxDepth2)
      val otherParamPairs_2nd = ParamPair(rf.numTrees, numTrees)
      
      // Call each fit interface
      val model1 = rf.fit(trainingData)
      val model2 = rf.fit(trainingData, paramMap)
      val models = rf.fit(trainingData, paramMaps)
      val model3 = rf.fit(trainingData, firstParamPair, otherParamPairs_1st, otherParamPairs_2nd)
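      As in the classification case, the paramMaps variant returns one regression model per ParamMap. A minimal sketch of keeping the model with the lowest RMSE on held-out data, assuming trainingData and testData DataFrames with the default label/features columns already exist:

      ```scala
      import org.apache.spark.ml.evaluation.RegressionEvaluator
      import org.apache.spark.ml.param.ParamMap
      import org.apache.spark.ml.regression.RandomForestRegressor

      val rf = new RandomForestRegressor()
      val candidates = Array(
        ParamMap(rf.maxDepth -> 5).put(rf.numTrees, 20),
        ParamMap(rf.maxDepth -> 10).put(rf.numTrees, 50))

      // fit returns one trained model per ParamMap, in the same order.
      val candidateModels = rf.fit(trainingData, candidates)

      val evaluator = new RegressionEvaluator()
        .setLabelCol("label")
        .setPredictionCol("prediction")
        .setMetricName("rmse")

      // Lower RMSE is better, so take minBy rather than maxBy.
      val bestModel = candidateModels.minBy(m => evaluator.evaluate(m.transform(testData)))
      ```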
    8. Example of setting the added parameters.
      // spark: SparkSession
      spark.conf.set("spark.sophon.ml.rf.binnedFeaturesDataType", binnedFeaturesType)
      spark.conf.set("spark.sophon.ml.rf.numTrainingDataCopies", numCopiesInput)
      spark.conf.set("spark.sophon.ml.rf.numPartsPerTrainingDataCopy", pt)
      spark.conf.set("spark.sophon.ml.rf.broadcastVariables", bcVariables)
      val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
    9. Output: RandomForestRegressionModel or Seq[RandomForestRegressionModel], the RF regression model(s). The output field produced at prediction time is listed below.

      Param name    | Type(s) | Default      | Description
      --------------|---------|--------------|----------------
      predictionCol | Double  | "prediction" | Predicted label

  • Usage example
    Example for fit(dataset: Dataset[_]): RandomForestRegressionModel:
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.evaluation.RegressionEvaluator
    import org.apache.spark.ml.feature.VectorIndexer
    import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}
    
    // Load and parse the data file, converting it to a DataFrame.
    val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
    
    // Automatically identify categorical features, and index them.
    // Set maxCategories so features with > 4 distinct values are treated as continuous.
    val featureIndexer = new VectorIndexer()
    .setInputCol("features")
    .setOutputCol("indexedFeatures")
    .setMaxCategories(4)
    .fit(data)
    
    // Split the data into training and test sets (30% held out for testing).
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
    
    // Train a RandomForest model.
    val rf = new RandomForestRegressor()
    .setLabelCol("label")
    .setFeaturesCol("indexedFeatures")
    
    // Chain indexer and forest in a Pipeline.
    val pipeline = new Pipeline()
    .setStages(Array(featureIndexer, rf))
    
    // Train model. This also runs the indexer.
    val model = pipeline.fit(trainingData)
    
    // Make predictions.
    val predictions = model.transform(testData)
    
    // Select example rows to display.
    predictions.select("prediction", "label", "features").show(5)
    
    // Select (prediction, true label) and compute test error.
    val evaluator = new RegressionEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName("rmse")
    val rmse = evaluator.evaluate(predictions)
    println(s"Root Mean Squared Error (RMSE) on test data = $rmse")
    
    val rfModel = model.stages(1).asInstanceOf[RandomForestRegressionModel]
    println(s"Learned regression forest model:\n ${rfModel.toDebugString}")
  • Sample output
    +----------+-----+--------------------+
    |prediction|label|            features|
    +----------+-----+--------------------+
    |      0.51|  0.3|(1000,[0,1,2,3,4,...|
    +----------+-----+--------------------+
    
    root mean squared error = 0.21000000000000002

Interface applicability notes:

  • These algorithm interfaces target the HDP big data platform running on Kunpeng servers. The Java development environment requires JDK 1.8 or later, the Spark development environment requires version 2.3.2, and the minimum supported HDP version is 3.1.0.
  • Running this algorithm on the HDP platform requires the following deployed components: HDFS, Spark2, Yarn, ZooKeeper, Hive, MapReduce2.
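Since the added parameters are read from the Spark configuration, they can also be supplied at submit time instead of via spark.conf.set. A sketch of the equivalent spark-submit invocation; the application class and jar names are hypothetical placeholders:

```shell
# Submit-time equivalent of the spark.conf.set(...) calls shown above.
# --class and the jar name are placeholders for your own application.
spark-submit \
  --class com.example.RFTraining \
  --conf spark.sophon.ml.rf.numTrainingDataCopies=2 \
  --conf spark.sophon.ml.rf.numPartsPerTrainingDataCopy=0 \
  --conf spark.sophon.ml.rf.broadcastVariables=true \
  --conf spark.sophon.ml.rf.binnedFeaturesDataType=array \
  rf-training-example.jar
```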