Evaluators

An Evaluator computes a metric from a DataFrame of predictions. The available evaluators are BinaryClassificationEvaluator, MulticlassClassificationEvaluator, and RegressionEvaluator; in addition, BinaryLogisticRegressionSummary exposes binary metrics directly from a fitted logistic regression model:
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, VectorIndexer, StringIndexer}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.BinaryLogisticRegressionSummary
import org.apache.spark.ml.evaluation.{RegressionEvaluator, MulticlassClassificationEvaluator}
import org.apache.spark.ml.regression.RandomForestRegressor

val data = MLUtils.loadLibSVMFile(sc, "./data/sample_libsvm_data.txt").toDF()
val Array(train, test) = data.randomSplit(Array(0.7, 0.3))

/** BinaryClassificationEvaluator **/
val logr = new LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
val modelLog = logr.fit(train)
println(s"Weights: ${modelLog.coefficients} \n Intercept: ${modelLog.intercept}")
// Get predictions on test data
val predictionsLog = modelLog.transform(test)
// Define the evaluator
val evaluatorBinary = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setRawPredictionCol("rawPrediction")
  .setMetricName("areaUnderROC")
// Run the evaluation. The area under the ROC curve ranges from 0.5 (random guessing)
// to 1.0 (perfect separation), with larger values indicating a better fit
val roc = evaluatorBinary.evaluate(predictionsLog)

/** MulticlassClassificationEvaluator **/
val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(data)
val vectorIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(data)
val indexToString = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
val classifierRF = new RandomForestClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setNumTrees(3)
val pipelineRF = new Pipeline().setStages(Array(labelIndexer, vectorIndexer, classifierRF, indexToString))
val modelRF = pipelineRF.fit(train)
// Get predictions on test data
val predictionsRF = modelRF.transform(test)
// Define the MulticlassClassificationEvaluator.
// In Spark 1.x, the overall "precision" metric is the same as accuracy
val evaluatorMultiClass = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("precision")
val accuracy = evaluatorMultiClass.evaluate(predictionsRF)
println(s"Test Error: ${1 - accuracy}")

/** RegressionEvaluator **/
// Note: it is the features column (not the prediction column) that points at "indexedFeatures"
val regressorRF = new RandomForestRegressor().setLabelCol("label").setFeaturesCol("indexedFeatures")
val pipelineRFR = new Pipeline().setStages(Array(vectorIndexer, regressorRF))
val modelRFR = pipelineRFR.fit(train)
// Get predictions on test data
val predictionRFR = modelRFR.transform(test)
// Define the RegressionEvaluator
val evaluatorReg = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluatorReg.evaluate(predictionRFR)
println(s"RMSE: $rmse")

/** BinaryLogisticRegressionSummary **/
val summaryLog = modelLog.summary.asInstanceOf[BinaryLogisticRegressionSummary]
println(s"areaUnderROC: ${summaryLog.areaUnderROC}")
val fMeasure = summaryLog.fMeasureByThreshold
val maxFMeasure = fMeasure.agg("F-Measure" -> "max").head().getDouble(0)
val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure).select("threshold").head().getDouble(0)
println(s"MaxFMeasure: $maxFMeasure & bestThreshold: $bestThreshold")
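BinaryClassificationEvaluator supports one other metric, the area under the precision-recall curve. A minimal sketch reusing the evaluator and predictions above:

// Switch the metric on the same evaluator instance and re-run the evaluation
val pr = evaluatorBinary.setMetricName("areaUnderPR").evaluate(predictionsLog)
println(s"Area under PR curve: $pr")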
Ensembles: Random Forests and Gradient-Boosted Trees

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.ml.feature.{StringIndexer, IndexToString, VectorIndexer}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.classification.RandomForestClassificationModel
import org.apache.spark.ml.regression.RandomForestRegressor
import org.apache.spark.ml.regression.GBTRegressor
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.classification.GBTClassificationModel

val data = MLUtils.loadLibSVMFile(sc, "./data/sample_libsvm_data.txt").toDF()
val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(data)
val vectorIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(data)
val indexToString = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)

// RandomForestClassifier example
val classifier = new RandomForestClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setNumTrees(3)
val Array(train, test) = data.randomSplit(Array(0.7, 0.3))
val stages = Array(labelIndexer, vectorIndexer, classifier, indexToString)
val pipeline = new Pipeline().setStages(stages)
val model = pipeline.fit(train)
val modelRFC = model.stages(2).asInstanceOf[RandomForestClassificationModel]
// To see feature importances
println(modelRFC.featureImportances)
// To inspect the rules of each tree in the forest
println(modelRFC.toDebugString)
val predictions = model.transform(test)
predictions.show()

// RandomForestRegressor example
val regressor = new RandomForestRegressor().setLabelCol("label").setFeaturesCol("indexedFeatures")
val stagesReg = Array(vectorIndexer, regressor)
val pipelineReg = new Pipeline().setStages(stagesReg)
val modelReg = pipelineReg.fit(train)
val predictionsReg = modelReg.transform(test)
predictionsReg.show()

// GBT classification example
val classifierGBT = new GBTClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(10)
val stagesGBT = Array(labelIndexer, vectorIndexer, classifierGBT, indexToString)
val pipelineGBTC = new Pipeline().setStages(stagesGBT)
val modelGBTC = pipelineGBTC.fit(train)
val modelGBTTree = modelGBTC.stages(2).asInstanceOf[GBTClassificationModel]
println(modelGBTTree.toDebugString)
val predictionsGBTC = modelGBTC.transform(test)
predictionsGBTC.show()

// GBT regression example
val regressorGBT = new GBTRegressor().setLabelCol("label").setFeaturesCol("indexedFeatures").setMaxIter(10)
val pipelineGBTR = new Pipeline().setStages(Array(vectorIndexer, regressorGBT))
val modelGBTR = pipelineGBTR.fit(train)
val predictionsGBTR = modelGBTR.transform(test)
predictionsGBTR.show()
Decision Trees

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.ml.feature.{StringIndexer, IndexToString, VectorIndexer}

// Decision tree classifier
val classifier = new DecisionTreeClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
val data = MLUtils.loadLibSVMFile(sc, "./data/sample_libsvm_data.txt").toDF()
val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(data)
val indexToString = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
val vectorIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(data)
val stages = Array(labelIndexer, vectorIndexer, classifier, indexToString)
val pipeline = new Pipeline().setStages(stages)
val Array(train, test) = data.randomSplit(Array(0.7, 0.3))
val model = pipeline.fit(train)
val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
// Print the tree model
println(treeModel.toDebugString)
val prediction = model.transform(test)
prediction.show()

// Decision tree regressor
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.regression.DecisionTreeRegressionModel
val regressor = new DecisionTreeRegressor().setLabelCol("label").setFeaturesCol("indexedFeatures")
val pipelineReg = new Pipeline().setStages(Array(vectorIndexer, regressor))
val modelReg = pipelineReg.fit(train)
val treeModelReg = modelReg.stages(1).asInstanceOf[DecisionTreeRegressionModel]
// Print the tree model
println(treeModelReg.toDebugString)
val predictionReg = modelReg.transform(test)
predictionReg.show()

StringIndexer, IndexToString, and OneHotEncoder

import org.apache.spark.SparkContext
import org.apache.spark.ml.feature.{OneHotEncoder, IndexToString, StringIndexer}
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

val sqlContext = new SQLContext(sc)
val df = sqlContext.createDataFrame(Seq(
  (0, "US"), (1, "IND"), (2, "AUS"), (3, "IND"), (4, "US")
)).toDF("id", "nationality")

// Use StringIndexer to index a categorical variable.
// The most frequent label gets index 0.0, the next 1.0, and so on.
val indexer = new StringIndexer().setInputCol("nationality").setOutputCol("index")
val indexedDF = indexer.fit(df).transform(df)

// Converting back from index to nationality
val converter = new IndexToString().setInputCol("predictedIndex").setOutputCol("predictedNationality")
val predictions = indexedDF.selectExpr("index as predictedIndex")
converter.transform(predictions)

// Use OneHotEncoder to encode an indexed categorical variable.
// It produces a sparse vector for each category index.
// OneHotEncoder drops the last category by default;
// to keep it, call setDropLast(false).
// Note: the input must be the numeric index column, not the raw string column.
val oneHotEncoder = new OneHotEncoder().setInputCol("index").setOutputCol("nVector")
val indexedDF1 = oneHotEncoder.transform(indexedDF)

// To see the created nVector, either add a dense column to the DataFrame...
val toDenseUDF = udf[DenseVector, SparseVector](_.toDense)
indexedDF1.withColumn("denseVector", toDenseUDF(indexedDF1("nVector")))
// ...or print each row
indexedDF1 foreach { e =>
  val denseVector = e.getAs[SparseVector]("nVector").toDense
  println(s"${e(0)} ${e(1)} $denseVector")
}
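As noted in the comments, the encoder drops the last category by default. A minimal sketch keeping all categories (the output column name nVectorFull is made up for illustration):

// Keep every category in the encoded vector
val fullEncoder = new OneHotEncoder().setInputCol("index").setOutputCol("nVectorFull").setDropLast(false)
fullEncoder.transform(indexedDF).show()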
Mahalanobis Distance
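For reference, the Mahalanobis distance of a point $x$ from a distribution with mean $\mu$ and covariance $\Sigma$ is

$$D_M(x) = \sqrt{(x - \mu)^\top \Sigma^{-1} (x - \mu)}$$

Once each feature is standardized to zero mean and unit variance, $\mu = 0$ and $\Sigma$ equals the correlation matrix, which is why the code below inverts the correlation matrix of the scaled features. The UDF in the code computes the squared distance, which ranks outliers identically.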
import breeze.linalg._
import org.apache.spark.SparkContext
import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler}
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

val sqlContext = new SQLContext(sc)
val df = sqlContext.range(0, 10).select("id")
  .withColumn("uniform", rand(10L))
  .withColumn("normal1", randn(10L))
  .withColumn("normal2", randn(11L))
val assembler = new VectorAssembler().setInputCols(Array("uniform", "normal1", "normal2")).setOutputCol("features")
val df1 = assembler.transform(df)

// Add an outlier row (use a Long id and matching column names so the union is well-typed)
val df2 = df1.select("id", "features")
  .unionAll(sqlContext.createDataFrame(Seq((10L, Vectors.dense(5, 5, 5)))).toDF("id", "features"))

// Standardize df2: this is an important step in calculating the Mahalanobis distance.
// When the data is normalized to zero mean and unit standard deviation,
// the correlation matrix equals the covariance matrix.
val standardScaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeat").setWithMean(true).setWithStd(true)
val scalerModel = standardScaler.fit(df2.select("id", "features"))
val df2Scaled = scalerModel.transform(df2).select("id", "scaledFeat")

// Compute the inverse covariance matrix
val rddScaledFeat = df2Scaled.select("scaledFeat").rdd.map(_(0).asInstanceOf[Vector])
val corr = Statistics.corr(rddScaledFeat)
val invCovariance = inv(new breeze.linalg.DenseMatrix(3, 3, corr.toArray))

// Note: DenseVector here is from the breeze.linalg library.
// The UDF returns the squared Mahalanobis distance, which preserves the outlier ranking.
val mahalanobis = udf[Double, Vector] { v =>
  val vB = DenseVector(v.toArray)
  vB.t * invCovariance * vB
}
val df2Mahalanobis = df2Scaled.withColumn("mahalanobis", mahalanobis(df2Scaled("scaledFeat")))

// Remove outliers, say the top 2
val ids = df2Mahalanobis.select("id", "mahalanobis").sort(df2Mahalanobis("mahalanobis").desc).drop("mahalanobis").collect()
val idOutliers = ids.map(_(0).asInstanceOf[Long]).slice(0, 2)
df2.filter(s"id not in (${idOutliers.mkString(",")})").show()
Normalizer
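Normalizer rescales each row vector to unit p-norm,

$$x \mapsto \frac{x}{\lVert x \rVert_p}, \qquad \lVert x \rVert_p = \Big(\sum_i |x_i|^p\Big)^{1/p}$$

so with setP(1.0), as below, the absolute values of each vector's entries sum to 1. By contrast, StandardScaler and MinMaxScaler operate per column rather than per row.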
import org.apache.spark.ml.feature._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{rand, randn}
import org.apache.spark.{SparkContext, SparkConf}

val sqlContext = new SQLContext(sc)
val randomDF = sqlContext.range(0, 10).select("id")
  .withColumn("uniform1", rand(10L))
  .withColumn("uniform2", rand(11L))
  .withColumn("normal", randn(10L))
val assembler = new VectorAssembler().setInputCols(Array("uniform1", "uniform2", "normal")).setOutputCol("features")
val newRandomDF = assembler.transform(randomDF)
newRandomDF.show()
newRandomDF.select("id", "features").show()

// Normalizer example
val normalizer = new Normalizer().setInputCol("features").setOutputCol("scaledFeat").setP(1.0)
normalizer.transform(newRandomDF.select("id", "features")).show()

// StandardScaler example
val standardScaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeat").setWithStd(true).setWithMean(true)
// StandardScaler is an estimator, so we first fit it on the DataFrame and then transform
val standardScalerModel = standardScaler.fit(newRandomDF.select("id", "features"))
standardScalerModel.transform(newRandomDF.select("id", "features")).show()

// MinMaxScaler example
val minMaxScaler = new MinMaxScaler().setInputCol("features").setOutputCol("scaledFeat").setMin(1).setMax(3)
val minMaxScalerModel = minMaxScaler.fit(newRandomDF.select("id", "features"))
minMaxScalerModel.transform(newRandomDF.select("id", "features")).show()
Transformers and Estimators
import org.apache.spark.ml.feature.{VectorAssembler, Tokenizer}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{rand, randn}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.Vectors

// Transformer examples
val sqlContext = new SQLContext(sc)
val df = sqlContext.createDataFrame(Seq(
  (0, "Are new to Scala and want to work with other Scala developers"),
  (1, "Are experienced with Scala and want to mentor others"),
  (2, "Would like to offer a workshop or lightning talk.")
)).toDF("label", "sentence")

// Tokenizer example
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val newDF = tokenizer.transform(df)
newDF.show()

// VectorAssembler example
val randomDF = sqlContext.range(0, 10).select("id")
  .withColumn("uniform1", rand(10L))
  .withColumn("uniform2", rand(11L))
  .withColumn("normal", randn(10L))
val assembler = new VectorAssembler()
  .setInputCols(Array("uniform1", "uniform2", "normal"))
  .setOutputCol("features")
val newRandomDF = assembler.transform(randomDF)
newRandomDF.show()
newRandomDF.select("id", "features").show()

// Estimator example
val trainDF = sqlContext.createDataFrame(Seq(
  (1.0, Vectors.dense(2.0, -1.0, 5.0)),
  (0.0, Vectors.dense(7.0, 9.0, 0.0)),
  (0.0, Vectors.dense(3.0, 4.0, 1.0)),
  (1.0, Vectors.dense(5.0, -3.0, 2.0))
)).toDF("label", "features")

// Define an estimator
val lr = new LogisticRegression()
// setRegParam sets the regularization parameter that penalizes the predictors to avoid over-fitting
lr.setMaxIter(10).setRegParam(0.01)
val model = lr.fit(trainDF)
// The output has 3 new columns: rawPrediction (raw confidence scores),
// probability (class probabilities derived from the raw scores), and prediction
model.transform(trainDF).show()

// Using a ParamMap to override parameters at fit time
val paramMap = ParamMap(lr.maxIter -> 20, lr.regParam -> 0.02)
val model2 = lr.fit(trainDF, paramMap)
model2.transform(trainDF).show()
Handling Missing Data

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{rand, randn, udf, mean}

val sqlContext = new SQLContext(sc)
val randomDF = sqlContext.range(0, 10).select("id")
  .withColumn("uniform", rand(10L))
  .withColumn("normal", randn(10L))
randomDF.show()

// Introduce missing data
val halfToNaN = udf[Double, Double](x => if (x > 0.5) Double.NaN else x)
val negHalfToNaN = udf[Double, Double](x => if (x < -0.5) Double.NaN else x)
val nanRandomDF = randomDF.withColumn("uniformNan", halfToNaN(randomDF("uniform")))
  .withColumn("normalNan", negHalfToNaN(randomDF("normal")))
  .drop("uniform")
  .withColumnRenamed("uniformNan", "uniform")
  .drop("normal")
  .withColumnRenamed("normalNan", "normal")
nanRandomDF.show()

// Drop rows with fewer than 3 non-null values
nanRandomDF.na.drop(minNonNulls = 3).show()
// Drop rows where all of the specified columns are NaN
nanRandomDF.na.drop("all", Array("uniform", "normal")).show()
// Drop rows where any of the specified columns is NaN
nanRandomDF.na.drop("any", Array("uniform", "normal")).show()
// Fill all NaN values with 0.0
nanRandomDF.na.fill(0.0).show()

// Fill "uniform" with the column mean; fill takes a value map, Map("column" -> value)
val uniformMean = nanRandomDF.filter("uniform <> 'NaN'").groupBy().agg(mean("uniform")).first()(0)
nanRandomDF.na.fill(Map("uniform" -> uniformMean)).show()

// Column names except id
val dfCols = nanRandomDF.columns.drop(1)
val dfMeans = nanRandomDF.na.drop().groupBy().agg(mean("uniform"), mean("normal")).first().toSeq
val meansMap = dfCols.zip(dfMeans).toMap
nanRandomDF.na.fill(meansMap).show()

// Replace all NaN in the "uniform" column with 0
nanRandomDF.na.replace("uniform", Map(Double.NaN -> 0.0)).show()
Hypothesis Testing and Kernel Density Estimation

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.test.ChiSqTestResult
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.stat.{KernelDensity, Statistics, MultivariateStatisticalSummary}
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.random.RandomRDDs._

// Chi-squared test for goodness of fit
val testVector = Vectors.dense(0.3, 0.1, 0.2, 0.1, 0.9, 0.05)
val goodnessOfFitResult: ChiSqTestResult = Statistics.chiSqTest(testVector)
println(s"goodnessOfFitResult p-value: ${goodnessOfFitResult.pValue} nullHypothesis: ${goodnessOfFitResult.nullHypothesis}")

// Chi-squared test for independence on a matrix
val testMatrix = Matrices.dense(numRows = 3, numCols = 2, Array(2.0, 4.0, 1.0, 5.0, 7.0, 3.0))
val independenceResult: ChiSqTestResult = Statistics.chiSqTest(testMatrix)
println(s"independenceResult p-value: ${independenceResult.pValue} nullHypothesis: ${independenceResult.nullHypothesis}")

// Chi-squared test for independence on labeled points (one test per feature)
val testLabeledPoint = sc.parallelize(Array(
  LabeledPoint(0, Vectors.dense(0.5, 0.4)),
  LabeledPoint(1, Vectors.dense(0.3, 0.1)),
  LabeledPoint(0, Vectors.dense(0.2, 0.8))
))
val independenceResult1: Array[ChiSqTestResult] = Statistics.chiSqTest(testLabeledPoint)
independenceResult1 foreach { r =>
  println(s"independenceResult1 p-value: ${r.pValue} nullHypothesis: ${r.nullHypothesis}")
}

// Kolmogorov-Smirnov test for equality of distribution
val testNormal: RDD[Double] = normalRDD(sc, size = 1000L, numPartitions = 1, seed = 90L)
val testNormalResult = Statistics.kolmogorovSmirnovTest(testNormal, "norm", 0, 1)
println(s"testNormalResult p-value: ${testNormalResult.pValue} nullHypothesis: ${testNormalResult.nullHypothesis}")

// Kernel density estimate
val kd = new KernelDensity().setSample(testNormal).setBandwidth(0.1)
val densities = kd.estimate((-2.0 to 2 by 0.5).toArray)
println("Kernel densities:")
densities foreach println
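For reference, the chi-squared goodness-of-fit statistic above compares observed counts $O_i$ against expected counts $E_i$ (a uniform distribution by default):

$$\chi^2 = \sum_i \frac{(O_i - E_i)^2}{E_i}$$

The reported p-value is the probability of observing a statistic at least this large under the null hypothesis.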
Random Splits and Stratified Sampling

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.IndexedRow
import org.apache.spark.rdd.RDD

// randomSplit
val data = sc.parallelize(1 to 10000000)
val Array(train, test, validation) = data.randomSplit(Array(0.5, 0.25, 0.25), seed = 90L)
println(s"splits train ${train.count()} test ${test.count()} validate ${validation.count()}")

// Stratified sampling
val rows: RDD[IndexedRow] = sc.parallelize(Array(
  IndexedRow(0L, Vectors.dense(1.0, 6.0, 2.0)),
  IndexedRow(1L, Vectors.dense(3.0, 1.0, 3.0)),
  IndexedRow(1L, Vectors.dense(4.0, 2.0, 1.0))
))
// The probability of picking a sample with key 0L is 1.0,
// and the probability of picking a sample with key 1L is 0.5
val fractions = Map(0L -> 1.0, 1L -> 0.5)
val sample = rows.map { case IndexedRow(i, v) => (i, v) }
  .sampleByKey(withReplacement = false, fractions = fractions, seed = 90L)
sample.collect() foreach println
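sampleByKey uses per-element Bernoulli sampling, so the per-key counts only approximate the requested fractions. A minimal sketch of the exact variant, which guarantees the sampled fraction per key at the cost of an extra pass over the data:

// Exact per-key sample sizes (more expensive than sampleByKey)
val exactSample = rows.map { case IndexedRow(i, v) => (i, v) }
  .sampleByKeyExact(withReplacement = false, fractions = fractions, seed = 90L)
exactSample.collect() foreach println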