Suppose we have three validation functions, f1(), f2() and f3(), each returning Try[A].
// Now we would like to call a specific function g() if all three validations succeed.

// Each validator returns Success with the input when it passes,
// or Failure carrying a descriptive exception when it does not.
def f1(i: Int): Try[Int] =
  if (i % 2 == 0) Success(i) else Failure(new Exception("Not divisible by 2"))

def f2(i: Int): Try[Int] =
  if (i % 3 == 0) Success(i) else Failure(new Exception("Not divisible by 3"))

def f3(i: Int): Try[Int] =
  if (i % 5 == 0) Success(i) else Failure(new Exception("Not divisible by 5"))

// The function to run once all validations pass.
def g(i: Int): Int = i * i
The traditional way of doing this is:
val x = 30
// Check each validation in turn, nesting one match inside the other.
f1(x) match {
  case Success(_) =>
    f2(x) match {
      case Success(_) =>
        f3(x) match {
          case Success(_)  => g(x)
          case Failure(ex) => println(ex.getMessage)
        }
      case Failure(ex) => println(ex.getMessage)
    }
  case Failure(ex) => println(ex.getMessage)
}
Though the above code is readable, it is not scalable: the nesting deepens as the number of validations increases. Repeating such a recurring pattern is not good design either.
// Below is the code snippet that addresses this issue. We take advantage of Scala's
// implicit class feature (a value class, so no wrapper is allocated).
implicit class ExtendedTry[A](val x: Try[A]) extends AnyVal {
  // Evaluate the next validation (by-name, so it is lazy) only if this one
  // succeeded; otherwise propagate the first failure unchanged.
  def andElse[B >: A](y: => Try[B]): Try[B] = if (x.isSuccess) y else x
}

// The validations now chain flat instead of nesting.
f1(x) andElse f2(x) andElse f3(x) match {
  case Success(_)  => g(x)
  case Failure(ex) => println(ex.getMessage)
}
0 Comments
Evaluators compute metrics from predictions. The available evaluators are:
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, VectorIndexer, StringIndexer}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.BinaryLogisticRegressionSummary
import org.apache.spark.ml.evaluation.{RegressionEvaluator, MulticlassClassificationEvaluator}
import org.apache.spark.ml.regression.RandomForestRegressor

// Load the sample LibSVM data and split it into train/test sets.
val data = MLUtils.loadLibSVMFile(sc, "./data/sample_libsvm_data.txt").toDF()
val Array(train, test) = data.randomSplit(Array(0.7, 0.3))

/** BinaryClassificationEvaluator **/
val logr = new LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
val modelLog = logr.fit(train)
println(s"Weights: ${modelLog.coefficients} \n Intercept: ${modelLog.intercept}")
// Get predictions on test data
val predictionsLog = modelLog.transform(test)
// Define evaluator
val evaluatorBinary = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setRawPredictionCol("rawPrediction")
  .setMetricName("areaUnderROC")
// Run evaluation. The area under the ROC curve ranges from 0.5 to 1.0,
// with larger values indicative of a better fit.
val roc = evaluatorBinary.evaluate(predictionsLog)

/** MulticlassClassificationEvaluator **/
val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(data)
val vectorIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(data)
val indexToString = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
val classifierRF = new RandomForestClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setNumTrees(3)
val pipelineRF = new Pipeline().setStages(Array(labelIndexer, vectorIndexer, classifierRF, indexToString))
val modelRF = pipelineRF.fit(train)
// Get predictions on test data
val predictionsRF = modelRF.transform(test)
// Define MulticlassClassificationEvaluator
val evaluatorMultiClass = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("precision")
val accuracy = evaluatorMultiClass.evaluate(predictionsRF)
println(s"Test Error: ${1 - accuracy}")

/** RegressionEvaluator **/
// FIX: the original called setPredictionCol("indexedFeatures"); the features
// column must be configured with setFeaturesCol, not setPredictionCol.
val regressorRF = new RandomForestRegressor().setLabelCol("label").setFeaturesCol("indexedFeatures")
val pipelineRFR = new Pipeline().setStages(Array(vectorIndexer, regressorRF))
val modelRFR = pipelineRFR.fit(train)
// Get predictions on test data
val predictionRFR = modelRFR.transform(test)
// Define RegressionEvaluator
val evaluatorReg = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")
val rmse = evaluatorReg.evaluate(predictionRFR)
println(s"RMSE: $rmse")

/** BinaryLogisticRegressionSummary **/
val summaryLog = modelLog.summary.asInstanceOf[BinaryLogisticRegressionSummary]
println(s"areaUnderCurve: ${summaryLog.areaUnderROC}")
// Find the classification threshold that maximises the F-measure.
val fMeasure = summaryLog.fMeasureByThreshold
val maxFMeasure = fMeasure.agg("F-Measure" -> "Max").head().getDouble(0)
val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure).select("threshold").head().getDouble(0)
println(s"MaxFMeasure: $maxFMeasure & bestThreshold: $bestThreshold")
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.ml.classification.LogisticRegression

// Load the sample LibSVM data and split it into train/test sets.
val data = MLUtils.loadLibSVMFile(sc, "./data/sample_libsvm_data.txt").toDF()
val Array(train, test) = data.randomSplit(Array(0.7, 0.3))

// Logistic Regression
val logr = new LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
val modelLog = logr.fit(train)
println(s"Weights: ${modelLog.coefficients} \n Intercept: ${modelLog.intercept}")
// to get ObjectiveHistory - error of the model in every iteration
val summaryLog = modelLog.summary
println(s"Error of the model in every iteration: ${summaryLog.objectiveHistory}")

// Linear Regression
val linearReg = new LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
val modelLR = linearReg.fit(train)
println(s"Weights: ${modelLR.coefficients} \n Intercept: ${modelLR.intercept}")
val summaryLR = modelLR.summary
println(s"Error of the model in every iteration: ${summaryLR.objectiveHistory}")
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.ml.feature.{StringIndexer, IndexToString, VectorIndexer}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.classification.RandomForestClassificationModel
import org.apache.spark.ml.regression.RandomForestRegressor
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.classification.GBTClassificationModel
// FIX: GBTRegressor is used below but was never imported in the original.
import org.apache.spark.ml.regression.GBTRegressor

// Load the sample data and prepare the shared feature/label indexers.
val data = MLUtils.loadLibSVMFile(sc, "./data/sample_libsvm_data.txt").toDF()
val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(data)
val vectorIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(data)
val indexToString = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)

// RandomForestClassifier Example
val classifier = new RandomForestClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setNumTrees(3)
val Array(train, test) = data.randomSplit(Array(0.7, 0.3))
val stages = Array(labelIndexer, vectorIndexer, classifier, indexToString)
val pipeline = new Pipeline().setStages(stages)
val model = pipeline.fit(train)
val modelRFC = model.stages(2).asInstanceOf[RandomForestClassificationModel]
// to see feature importances
println(modelRFC.featureImportances)
// to inspect the rules of each tree in the forest
println(modelRFC.toDebugString)
val predictions = model.transform(test)
predictions.show()

// RandomForestRegressor Example
val regressor = new RandomForestRegressor().setLabelCol("label").setFeaturesCol("indexedFeatures")
val stagesReg = Array(vectorIndexer, regressor)
val pipelineReg = new Pipeline().setStages(stagesReg)
val modelReg = pipelineReg.fit(train)
val predictionsReg = modelReg.transform(test)
predictionsReg.show()

// GBT Classification Example
val classifierGBT = new GBTClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(10)
val stagesGBT = Array(labelIndexer, vectorIndexer, classifierGBT, indexToString)
val pipelineGBTC = new Pipeline().setStages(stagesGBT)
val modelGBTC = pipelineGBTC.fit(train)
val modelGBTTree = modelGBTC.stages(2).asInstanceOf[GBTClassificationModel]
println(modelGBTTree.toDebugString)
val predictionsGBTC = modelGBTC.transform(test)
// FIX: show() returns Unit; the original println(predictionsGBTC.show()) printed "()" after the table.
predictionsGBTC.show()

// GBT Regression Example
val regressorGBT = new GBTRegressor().setLabelCol("label").setFeaturesCol("indexedFeatures").setMaxIter(10)
val pipelineGBTR = new Pipeline().setStages(Array(vectorIndexer, regressorGBT))
val modelGBTR = pipelineGBTR.fit(train)
val predictionsGBTR = modelGBTR.transform(test)
predictionsGBTR.show()
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.ml.feature.{StringIndexer, IndexToString, VectorIndexer}

// Decision Tree Classifier
val classifier = new DecisionTreeClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
val data = MLUtils.loadLibSVMFile(sc, "./data/sample_libsvm_data.txt").toDF()
val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(data)
val indexToString = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
val vectorIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(data)
val stages = Array(labelIndexer, vectorIndexer, classifier, indexToString)
val pipeline = new Pipeline().setStages(stages)
val Array(train, test) = data.randomSplit(Array(0.7, 0.3))
val model = pipeline.fit(train)
val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
// Print the tree model
// FIX: the original evaluated toDebugString and discarded the result; wrap in
// println so the tree is actually printed, matching the comment's intent.
println(treeModel.toDebugString)
val prediction = model.transform(test)
prediction.show()

// Decision Tree Regressor
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.regression.DecisionTreeRegressionModel
val regressor = new DecisionTreeRegressor().setLabelCol("label").setFeaturesCol("indexedFeatures")
val pipelineReg = new Pipeline().setStages(Array(vectorIndexer, regressor))
val modelReg = pipelineReg.fit(train)
val treeModelReg = modelReg.stages(1).asInstanceOf[DecisionTreeRegressionModel]
// Print the tree model
println(treeModelReg.toDebugString)
val predictionReg = modelReg.transform(test)
predictionReg.show()
import org.apache.spark.SparkContext
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// PCA example: load the crimes CSV with inferred schema.
val crimes = sqlContext.read.format("com.databricks.spark.csv")
  .option("delimiter", ",")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("./data/UScrime2-colsLotsOfNAremoved.csv")
// MLlib PCA requires a RowMatrix, not a DataFrame, so let's convert
val assembler = new VectorAssembler().setInputCols(crimes.columns).setOutputCol("features")
val featuresDF = assembler.transform(crimes).select("features")
val rddOfRows = featuresDF.rdd
val rddOfVectors = rddOfRows map { _.get(0).asInstanceOf[Vector] }
val mat = new RowMatrix(rddOfVectors)
// compute top 10 principal components
val pcs = mat.computePrincipalComponents(10)

// Categorical-variable encoding example (separate snippet).
import org.apache.spark.SparkContext
import org.apache.spark.ml.feature.{OneHotEncoder, IndexToString, StringIndexer}
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

val sqlContext = new SQLContext(sc)
val df = sqlContext.createDataFrame(Seq(
  (0, "US"), (1, "IND"), (2, "AUS"), (3, "IND"), (4, "US")
)).toDF("id", "nationality")

// Using StringIndexer to convert a categorical variable.
// The most common label gets index 0, the next 1, and so on.
val indexer = new StringIndexer().setInputCol("nationality").setOutputCol("index")
val indexedDF = indexer.fit(df).transform(df)

// converting back from index to nationality
val converter = new IndexToString().setInputCol("predictedIndex").setOutputCol("predictedNationality")
val predictions = indexedDF.selectExpr("index as predictedIndex")
converter.transform(predictions)

// using OneHotEncoder to convert a categorical variable.
// It generates a sparse vector for each categorical value.
// OneHotEncoder generally excludes the last category; if we want to include
// the last category, we should call setDropLast(false).
// FIX: OneHotEncoder requires a numeric (indexed) input column, so encode the
// "index" column produced by StringIndexer and transform indexedDF — the
// original pointed it at the raw string column "nationality" on df.
val oneHotEncoder = new OneHotEncoder().setInputCol("index").setOutputCol("nVector")
val indexedDF1 = oneHotEncoder.transform(indexedDF)
// to see the created nVector, either add a new column to the dataFrame
val toDenseUDF = udf[DenseVector, SparseVector](_.toDense)
indexedDF1.withColumn("denseVector", toDenseUDF(indexedDF1("nVector")))
// Or print each row
indexedDF1 foreach { e =>
  val denseVector = e.getAs[SparseVector]("nVector").toDense
  println(s"${e(0)} ${e(1)} $denseVector")
}
Mahalanobis Distance
import breeze.linalg._
import org.apache.spark.SparkContext
import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler}
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

val sqlContext = new SQLContext(sc)
// Build a random 3-column dataset and assemble it into a feature vector.
val df = sqlContext.range(0, 10).select("id")
  .withColumn("uniform", rand(10L))
  .withColumn("normal1", randn(10L))
  .withColumn("normal2", randn(11L))
val assembler = new VectorAssembler()
  .setInputCols(Array("uniform", "normal1", "normal2"))
  .setOutputCol("features")
val df1 = assembler.transform(df)

// Add an outlier row
val df2 = df1.select("id", "features")
  .unionAll(sqlContext.createDataFrame(Seq((10, Vectors.dense(5, 5, 5)))))

// Standardize df2: this is an important step in calculating the Mahalanobis
// distance. When normalized to zero mean and unit standard deviation, the
// correlation matrix equals the covariance matrix.
val standardScaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeat").setWithMean(true).setWithStd(true)
val scalerModel = standardScaler.fit(df2.select("id", "features"))
val df2Scaled = scalerModel.transform(df2).select("id", "scaledFeat")

// Compute the inverse covariance matrix
val rddScaledFeat = df2Scaled.select("scaledFeat").rdd.map(_(0).asInstanceOf[Vector])
val corr = Statistics.corr(rddScaledFeat)
val invCovariance = inv(new breeze.linalg.DenseMatrix(3, 3, corr.toArray))

// Note: DenseVector here is from the breeze.linalg library.
// FIX: the Mahalanobis distance is sqrt(v' * S^-1 * v); the original returned
// the squared distance (outlier ranking is unchanged, but the value was wrong).
val mahalanobis = udf[Double, Vector] { v =>
  val vB = DenseVector(v.toArray)
  math.sqrt(vB.t * invCovariance * vB)
}
val df2Mahalanobis = df2Scaled.withColumn("mahalanobis", mahalanobis(df2Scaled("scaledFeat")))

// Remove outliers — here, the top 2 by distance.
val ids = df2Mahalanobis.select("id", "mahalanobis")
  .sort(df2Mahalanobis("mahalanobis").desc)
  .drop("mahalanobis")
  .collect()
val idOutliers = ids.map(_(0).asInstanceOf[Long]).slice(0, 2)
df2.filter(s"id not in (${idOutliers.mkString(",")})").show()
Normalizer
import org.apache.spark.ml.feature._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{rand, randn}
import org.apache.spark.{SparkContext, SparkConf}

val sqlContext = new SQLContext(sc)

// Build a random DataFrame (two uniform columns, one normal column) and
// assemble the three columns into a single "features" vector.
val randomDF = sqlContext.range(0, 10).select("id")
  .withColumn("uniform1", rand(10L))
  .withColumn("uniform2", rand(11L))
  .withColumn("normal", randn(10L))
val assembler = new VectorAssembler()
  .setInputCols(Array("uniform1", "uniform2", "normal"))
  .setOutputCol("features")
val newRandomDF = assembler.transform(randomDF)
newRandomDF.show()
newRandomDF.select("id", "features").show()

// Example of Normalizer (L1 norm)
val normalizer = new Normalizer().setInputCol("features").setOutputCol("scaledFeat").setP(1.0)
normalizer.transform(newRandomDF.select("id", "features")).show()

// Example of StandardScaler
// StandardScaler is an estimator, so we call fit and pass the DataFrame as argument.
val standardScaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeat").setWithStd(true).setWithMean(true)
val standardScalerModel = standardScaler.fit(newRandomDF.select("id", "features"))
standardScalerModel.transform(newRandomDF.select("id", "features")).show()

// Example of MinMaxScaler
val minMaxScaler = new MinMaxScaler().setInputCol("features").setOutputCol("scaledFeat").setMin(1).setMax(3)
val minMaxScalerModel = minMaxScaler.fit(newRandomDF.select("id", "features"))
minMaxScalerModel.transform(newRandomDF.select("id", "features")).show()
Transformer
import org.apache.spark.ml.feature.{VectorAssembler, Tokenizer}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{rand, randn}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.Vectors

// Transformer Examples
val sqlContext = new SQLContext(sc)
val df = sqlContext.createDataFrame(Seq(
  (0, "Are new to Scala and want to work with other Scala developers"),
  (1, "Are experienced with Scala and want to mentor others"),
  (2, "Would like to offer a workshop or lightning talk.")
)).toDF("label", "sentence")

// Example of tokenizer
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val newDF = tokenizer.transform(df)
newDF.show()

// Example of vectorAssembler
val randomDF = sqlContext.range(0, 10).select("id")
  .withColumn("uniform1", rand(10L))
  .withColumn("uniform2", rand(11L))
  .withColumn("normal", randn(10L))
val assembler = new VectorAssembler()
  .setInputCols(Array("uniform1", "uniform2", "normal"))
  .setOutputCol("features")
val newRandomDF = assembler.transform(randomDF)
newRandomDF.show()
newRandomDF.select("id", "features").show()

// Estimator Example
// FIX: renamed from `df` — redefining a val in the same scope only works in the
// REPL; in a compiled script it is an error and silently shadows the tokenizer
// input above when pasted into a notebook cell.
val dfEstimator = sqlContext.createDataFrame(Seq(
  (1.0, Vectors.dense(2.0, -1.0, 5.0)),
  (0.0, Vectors.dense(7.0, 9.0, 0.0)),
  (0.0, Vectors.dense(3.0, 4.0, 1.0)),
  (1.0, Vectors.dense(5.0, -3.0, 2.0))
)).toDF("label", "features")
// define an estimator
val lr = new LogisticRegression()
// setRegParam: regularization parameter that penalizes the predictors to avoid over-fitting
lr.setMaxIter(10).setRegParam(0.01)
val model = lr.fit(dfEstimator)
// output will have 3 new columns: rawPrediction (prediction interval),
// probability (associated probabilities corresponding to prediction) and prediction
model.transform(dfEstimator).show()
// Using ParamMap to override parameters at fit time
val paramMap = ParamMap(lr.maxIter -> 20, lr.regParam -> 0.02)
val model2 = lr.fit(dfEstimator, paramMap)
model2.transform(dfEstimator).show()
Archives
October 2016
Categories
All
|