// Continuation of the random-data example: df1 is the range DataFrame
// created with sqlContext.range(0, 10) (see the summary-statistics snippet).
import org.apache.spark.sql.functions.{mean, rand, randn, udf}

val randomDF = df1.select("id")
  .withColumn("uniform", rand(10L))
  .withColumn("normal", randn(10L))
randomDF.show()

// Handling missing data: map part of each column to NaN with UDFs
val halfToNaN = udf[Double, Double](x => if (x > 0.5) Double.NaN else x)
val negHalfToNaN = udf[Double, Double](x => if (x < -0.5) Double.NaN else x)
val nanRandomDF = randomDF
  .withColumn("uniformNan", halfToNaN(randomDF("uniform")))
  .withColumn("normalNan", negHalfToNaN(randomDF("normal")))
  .drop("uniform")
  .withColumnRenamed("uniformNan", "uniform")
  .drop("normal")
  .withColumnRenamed("normalNan", "normal")
nanRandomDF.show()

// Drop rows that have fewer than 3 non-null values
nanRandomDF.na.drop(minNonNulls = 3).show()

// Drop rows where all of the specified columns are NaN
nanRandomDF.na.drop("all", Array("uniform", "normal")).show()

// Drop rows where any of the specified columns is NaN
nanRandomDF.na.drop("any", Array("uniform", "normal")).show()

// Fill all NaN values with 0.0
nanRandomDF.na.fill(0.0).show()

// Fill "uniform" with its column mean; fill takes a value map Map("column" -> value)
val uniformMean = nanRandomDF.filter("uniform <> 'NaN'").groupBy().agg(mean("uniform")).first()(0)
nanRandomDF.na.fill(Map("uniform" -> uniformMean)).show()

// Column names except id
val dfCols = nanRandomDF.columns.drop(1)
val dfMeans = nanRandomDF.na.drop().groupBy().agg(mean("uniform"), mean("normal")).first().toSeq
val meansMap = dfCols.zip(dfMeans).toMap
nanRandomDF.na.fill(meansMap).show()

// Replace all NaN in the uniform column with 0.0
nanRandomDF.na.replace("uniform", Map(Double.NaN -> 0.0)).show()
Summary Statistics for DataFrames
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.functions.{rand, randn}

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// Case class backing the example DataFrame; val1/val2 match the columns
// used below (the first field name is assumed).
case class Record(name: String, val1: Int, val2: Int)

// Statistics on a DataFrame
val recordsDF = sc.parallelize(Array(
  Record("alpha", 1, 2),
  Record("beta", 3, 4),
  Record("gamma", 5, 6)
)).toDF()

// df.describe() output
val recordStats = recordsDF.describe()
recordStats.show()

// Fetching the results out of the describe() DataFrame
val stddev = recordStats.filter("summary = 'stddev'")
  .first().toSeq.toArray.drop(1)
  .map(_.toString.toDouble)
val val1Stats: Array[Double] = recordStats.select("val1")
  .map(x => x(0).toString.toDouble)
  .collect()
println(val1Stats.mkString(", "))

// Statistics using groupBy: min of column val1 and max of column val2
val grpBy1 = recordsDF.groupBy().agg(Map("val1" -> "min", "val2" -> "max"))
val grpBy1Summary = grpBy1.first().toSeq.toArray.map(_.toString.toDouble)
println(s"column names: ${grpBy1.columns.mkString(", ")}\ndata: ${grpBy1Summary.mkString(", ")}")

// More statistics: corr(), cov(), freqItems()
val recordStatFun = recordsDF.stat
recordStatFun.corr("val1", "val2")
recordStatFun.cov("val1", "val2")
recordStatFun.freqItems(Seq("val1"), 0.3)

// Sampling on DataFrames
val df = sqlContext.createDataFrame(Seq((1, 10), (2, 11), (3, 12), (4, 13))).toDF("key", "value")
df.sample(withReplacement = false, fraction = 0.4, seed = 90L)

val Array(train, test) = df.randomSplit(weights = Array(0.7, 0.3), seed = 90L)
train.show()
test.show()

val stratifiedSampling = df.stat.sampleBy("key", fractions = Map(1 -> 0.7, 2 -> 0.7), seed = 90L)
stratifiedSampling.show()

// Random data generation
val df1 = sqlContext.range(0, 10)
val newDF = df1.select("id").withColumn("uniform", rand(10L)).withColumn("normal", randn(10L))
newDF.show()
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.test.ChiSqTestResult
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.stat.{KernelDensity, MultivariateStatisticalSummary, Statistics}
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.random.RandomRDDs._

// Chi-squared goodness-of-fit test on a vector of observed frequencies
val testVector = Vectors.dense(0.3, 0.1, 0.2, 0.1, 0.9, 0.05)
val goodnessOfFitResult: ChiSqTestResult = Statistics.chiSqTest(testVector)
println(s"goodnessOfFitResult p-value: ${goodnessOfFitResult.pValue} nullHypothesis: ${goodnessOfFitResult.nullHypothesis}")

// Chi-squared test of independence on a contingency matrix
val testMatrix = Matrices.dense(numRows = 3, numCols = 2, Array(2.0, 4.0, 1.0, 5.0, 7.0, 3.0))
val independenceResult: ChiSqTestResult = Statistics.chiSqTest(testMatrix)
println(s"independenceResult p-value: ${independenceResult.pValue} nullHypothesis: ${independenceResult.nullHypothesis}")

// Chi-squared test of independence between each feature and the label
val testLabeledPoint = sc.parallelize(Array(
  LabeledPoint(0, Vectors.dense(0.5, 0.4)),
  LabeledPoint(1, Vectors.dense(0.3, 0.1)),
  LabeledPoint(0, Vectors.dense(0.2, 0.8))
))
val independenceResult1: Array[ChiSqTestResult] = Statistics.chiSqTest(testLabeledPoint)
independenceResult1 foreach { r =>
  println(s"independenceResult1 p-value: ${r.pValue} nullHypothesis: ${r.nullHypothesis}")
}

// Kolmogorov-Smirnov test for equality of distribution
val testNormal: RDD[Double] = normalRDD(sc, size = 1000L, numPartitions = 1, seed = 90L)
val testNormalResult = Statistics.kolmogorovSmirnovTest(testNormal, "norm", 0, 1)
println(s"testNormalResult p-value: ${testNormalResult.pValue} nullHypothesis: ${testNormalResult.nullHypothesis}")

// Kernel density estimate over a grid of points
val kd = new KernelDensity().setSample(testNormal).setBandwidth(0.1)
val densities = kd.estimate((-2.0 to 2.0 by 0.5).toArray)
println("Kernel densities:")
densities foreach println
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.IndexedRow
import org.apache.spark.rdd.RDD

// randomSplit
val data = sc.parallelize(1 to 10000000)
val Array(train, test, validation) = data.randomSplit(Array(0.5, 0.25, 0.25), seed = 90L)
println(s"splits train ${train.count()} test ${test.count()} validation ${validation.count()}")

// Stratified sampling
val rows: RDD[IndexedRow] = sc.parallelize(Array(
  IndexedRow(0L, Vectors.dense(1.0, 6.0, 2.0)),
  IndexedRow(1L, Vectors.dense(3.0, 1.0, 3.0)),
  IndexedRow(1L, Vectors.dense(4.0, 2.0, 1.0))
))

// Probability of picking a sample with label/index 0L is 1.0; with 1L it is 0.5
val fractions = Map(0L -> 1.0, 1L -> 0.5)
val sample = rows
  .map { case IndexedRow(i, v) => (i, v) }
  .sampleByKey(withReplacement = false, fractions = fractions, seed = 90L)
sample.collect().foreach(println)
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.random.RandomRDDs._

val observations: RDD[Vector] = sc.parallelize(Array(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(4.0, 3.0, 7.0),
  Vectors.dense(5.0, 6.0, 4.0)
))
val x: RDD[Double] = sc.parallelize(Array(1.0, 2.0, 3.0))
val y: RDD[Double] = sc.parallelize(Array(3.0, 2.0, 1.0))

// Column-wise summary statistics
val stats: MultivariateStatisticalSummary = Statistics.colStats(observations)
println(stats.mean)
println(stats.count)

// Pearson correlation between two RDD[Double]
val corr: Double = Statistics.corr(x, y, "pearson")
println(corr)

// Pearson correlation matrix over the columns of observations
val corrMatrix: Matrix = Statistics.corr(observations, "pearson")
println(corrMatrix)

// Random data generation
val rand1: RDD[Double] = poissonRDD(sc, mean = 1.0, size = 1000000L, numPartitions = 10)
println(s"Rand1 mean ${rand1.mean()} variance ${rand1.variance()}")

val rand2: RDD[Vector] = normalVectorRDD(sc, numRows = 1000L, numCols = 10, numPartitions = 10)
val rand2Stats = Statistics.colStats(rand2)
println(s"Rand2 mean ${rand2Stats.mean} variance ${rand2Stats.variance}")
Chaining filter and map builds an intermediate collection after each step, which means more garbage and more time spent constructing the final collection. The two steps can be fused into a single pass with the collect method.
Below is an example:

val x = List((1, 2), (2, 3), (3, 4), (4, 5), (5, 6))
x.filter(_._1 % 2 == 0).map(_._2)

// The chaining above can be replaced by a single collect
x.collect { case x if x._1 % 2 == 0 => x._2 }
View: A view runs transformations as functional composition instead of as a series of intermediate collections.
(1 to 1000000).view map (_ + 5) map (_ * 2)
Using lazy evaluation on collections doesn't always guarantee better performance. Lazy evaluation requires creating an extra closure for each transformation, and if building those closures takes longer than building the intermediate collections, the lazy version runs slower. Typically, for small collections the strict version is faster than the lazy one.
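To see the trade-off in practice, both versions can be timed. The sketch below is only illustrative: the timeIt helper and the collection size are made up for this example, and a single System.nanoTime measurement is not a rigorous benchmark.

// Crude timing helper -- illustrative only, not a proper micro-benchmark.
def timeIt[A](label: String)(body: => A): A = {
  val start = System.nanoTime()
  val result = body
  println(f"$label took ${(System.nanoTime() - start) / 1e6}%.2f ms")
  result
}

val n = 1000000

// Strict: each map builds a full intermediate collection.
timeIt("strict") { ((1 to n) map (_ + 5) map (_ * 2)).sum }

// Lazy view: no intermediate collections, but an extra closure per element.
timeIt("view") { ((1 to n).view map (_ + 5) map (_ * 2)).sum }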
package com.examples.algorithms

object MergeSortAlgorithm {

  // Merge two already-sorted lists into one sorted list.
  def merge(left: List[Int], right: List[Int]): List[Int] = (left, right) match {
    case (l, Nil) => l
    case (Nil, r) => r
    case (l :: l1, r :: r1) =>
      if (l < r) l :: merge(l1, right)
      else r :: merge(left, r1)
  }

  // Split the input in half, sort each half recursively, then merge.
  def run(input: List[Int]): List[Int] = {
    val n = input.length / 2
    if (n == 0) input
    else {
      val (left, right) = input.splitAt(n)
      merge(run(left), run(right))
    }
  }
}
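For reference, a quick way to exercise the object (the input list here is arbitrary):

import com.examples.algorithms.MergeSortAlgorithm

object MergeSortExample extends App {
  val unsorted = List(5, 1, 4, 2, 8, 3)
  println(MergeSortAlgorithm.run(unsorted)) // List(1, 2, 3, 4, 5, 8)
}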