Dataset: lr_data.txt
Tolerance of +/- 0.2 between the predicted label and the actual label has been computed. The model can be tuned by playing with the split or by changing the stepSize and numIterations.

build.sbt

// NOTE(review): the original said "WordCountExample" — a copy-paste slip from
// the word-count post; renamed to match this project.
name := "LinearRegressionExample"

version := "1.0"

scalaVersion := "2.11.7"

// Keep spark-core and spark-mllib on the SAME version — the original mixed
// 1.4.1 and 1.4.0, which risks binary incompatibilities at runtime.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "1.4.1",
  "org.apache.spark" %% "spark-mllib" % "1.4.1"
)

Main.scala

object Main {
  // Entry point: runs the linear-regression example against a local master.
  def main(args: Array[String]): Unit = {
    LinearRegression.run(
      master = Some("local[*]"),
      args = args.toList
    )
    System.exit(0)
  }
}

LinearRegression.scala

import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

object LinearRegression {

  private val AppName = "LinearRegressionSpark"

  // SGD hyper-parameters; tune together with the train/test split.
  val numIterations = 1000
  val stepSize = 1.5

  /** Pairs each test point's actual label with the model's prediction. */
  private def predict(model: RegressionModel, test: RDD[LabeledPoint]): RDD[(Double, Double)] =
    test.map { point =>
      (point.label, model.predict(point.features))
    }

  /**
   * Mean squared error over (actual, predicted) pairs.
   *
   * BUG FIX: in the original the `/ predictions.count()` sat OUTSIDE the
   * method's closing brace, so the method body only summed the squared
   * errors and the stray division was a syntax error at object level.
   * The division now happens inside the method.
   */
  private def meanSquaredError(predictions: RDD[(Double, Double)]): Double = {
    val sumOfSquares = predictions
      .map { case (actual, predicted) => math.pow(actual - predicted, 2) }
      .reduce(_ + _)
    sumOfSquares / predictions.count()
  }

  /**
   * Loads the dataset from args(0), trains plain/ridge/lasso SGD regressors
   * on an 80/20 split, and prints the test-set MSE of each model.
   *
   * @param master optional Spark master URL (e.g. "local[*]")
   * @param args   args(0) is the input file: label,feature1,...,feature10 per line
   */
  def run(master: Option[String], args: List[String]): Unit = {
    val sc = {
      val conf = new SparkConf().setAppName(AppName)
      master.foreach(conf.setMaster)  // foreach replaces the for-as-side-effect loop
      new SparkContext(conf)
    }

    // Column 0 is the label; columns 1..10 are the features.
    val raw = sc.textFile(args(0))
    val rawData = raw.map { line =>
      val entries = line.split(",")
      LabeledPoint(entries(0).toDouble, Vectors.dense(entries.slice(1, 11).map(_.toDouble)))
    }.cache()

    // 80/20 train/test split; fixed seed keeps runs reproducible.
    val splits = rawData.randomSplit(Array(0.8, 0.2), seed = 90L)
    val train = splits(0)
    val test = splits(1)

    // Plain, ridge (L2-regularised) and lasso (L1-regularised) regression via SGD.
    val model: LinearRegressionModel = LinearRegressionWithSGD.train(train, numIterations, stepSize)
    val ridgeModel: RidgeRegressionModel = RidgeRegressionWithSGD.train(train, numIterations)
    val lassoModel: LassoModel = LassoWithSGD.train(train, numIterations)

    val mse = meanSquaredError(predict(model, test))
    val mseRidge = meanSquaredError(predict(ridgeModel, test))
    val mseLasso = meanSquaredError(predict(lassoModel, test))

    // Hoisted: test.count() is an action — run it once, not three times.
    val total = test.count()
    println(s"Linear Regression Results: Total: $total MSE: $mse")
    println(s"Ridge Regression Results: Total: $total MSE: $mseRidge")
    println(s"Lasso Regression Results: Total: $total MSE: $mseLasso")
  }
}
0 Comments
build.sbt

name := "WordCountExample"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"

Main.scala

import org.apache.spark.SparkContext

object Main {
  // Entry point: runs the word-count example against a local master.
  def main(args: Array[String]): Unit = {
    Wordcount.run(
      master = Some("local[*]"),
      args = args.toList
    )
    System.exit(0)
  }
}

WordCount.scala

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

object Wordcount {

  private val AppName = "WordCountSpark"

  // Stopwords, loaded once on the driver.
  // FIX: the original never closed the Source (file-handle leak) and kept a
  // List, making every stopword lookup O(n) via indexOf. Close the source in
  // a finally and use a Set for O(1) membership tests.
  private val Stopwords: Set[String] = {
    val source = scala.io.Source.fromFile("data/stopwords.txt")
    try source.getLines().toSet
    finally source.close()
  }

  /**
   * Counts words in the file at args(0), prints the ten most frequent,
   * and saves the full counts to the directory at args(1).
   *
   * @param master optional Spark master URL (e.g. "local[*]")
   * @param args   args(0) = input text file, args(1) = output directory
   */
  def run(master: Option[String], args: List[String]): Unit = {
    val sc = {
      val conf = new SparkConf().setAppName(AppName)
      master.foreach(conf.setMaster)
      new SparkContext(conf)
    }

    val file: RDD[String] = sc.textFile(args(0))
    val words = file.flatMap(sanitize)
    val wordCounts: RDD[(String, Int)] = words.map(word => (word, 1)).reduceByKey(_ + _)

    // Ten most frequent words, highest count first (negated count sorts descending).
    val top10Words = wordCounts.sortBy(-_._2).take(10)
    top10Words.foreach(println)

    wordCounts.saveAsTextFile(args(1))
  }

  /** Lower-cases a line, strips non-alphanumerics, splits on whitespace, drops stopwords. */
  private def sanitize(line: String): Seq[String] = {
    val allWords = line.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+")
    allWords.filter(word => !Stopwords.contains(word))
  }
}
Archives
April 2016
Categories