val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Load JSON File Example
scala> val jsDF = sqlContext.read.json("sample.json")
jsDF: org.apache.spark.sql.DataFrame = [address: struct<city:string,state:string>, name: string]

scala> jsDF.show()
+-----------------+-------+
|          address|   name|
+-----------------+-------+
|  [Columbus,Ohio]|    Yin|
|[null,California]|Michael|
+-----------------+-------+

scala> jsDF.printSchema
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- name: string (nullable = true)

scala> jsDF.columns
res17: Array[String] = Array(address, name)

scala> jsDF.explain
== Physical Plan ==
Scan JSONRelation[file:/Users/XXX/Documents/SparkExamples/Examples/data/sample.json][address#4,name#5]

scala> jsDF.count
res19: Long = 2

scala> jsDF.orderBy("name")
res20: org.apache.spark.sql.DataFrame = [address: struct<city:string,state:string>, name: string]

scala> jsDF.orderBy("name").show()
+-----------------+-------+
|          address|   name|
+-----------------+-------+
|[null,California]|Michael|
|  [Columbus,Ohio]|    Yin|
+-----------------+-------+

scala> jsDF.dtypes
res22: Array[(String, String)] = Array((address,StructType(StructField(city,StringType,true), StructField(state,StringType,true))), (name,StringType))

scala> jsDF.select("name","address.city")
res23: org.apache.spark.sql.DataFrame = [name: string, city: string]

scala> jsDF.select("name","address.city").show()
+-------+--------+
|   name|    city|
+-------+--------+
|    Yin|Columbus|
|Michael|    null|
+-------+--------+

scala> val jsTable = jsDF.registerTempTable("sample")
jsTable: Unit = ()

scala> val content = sqlContext.sql("select name, address.city, address.state from sample")
content: org.apache.spark.sql.DataFrame = [name: string, city: string, state: string]

scala> content.collect().foreach(println)
[Yin,Columbus,Ohio]
[Michael,null,California]

// Saving as a Parquet file
scala> jsDF.saveAsParquetFile("sample.parquet")

// Load Parquet File Example
scala> val parquetDF = sqlContext.read.parquet("sample.parquet")
parquetDF: org.apache.spark.sql.DataFrame = [address: struct<city:string,state:string>, name: string]

scala> val parquetTable = parquetDF.registerTempTable("sample")
parquetTable: Unit = ()

scala> val content = sqlContext.sql("select name, address.city, address.state from sample")
content: org.apache.spark.sql.DataFrame = [name: string, city: string, state: string]

scala> content.collect().foreach(println)
[Yin,Columbus,Ohio]
[Michael,null,California]
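For reference, the schema and rows above can be reproduced from a two-line sample.json like the following, one JSON object per line, which is the layout sqlContext.read.json expects. The exact file contents are an assumption reconstructed from the output:

{"name": "Yin", "address": {"city": "Columbus", "state": "Ohio"}}
{"name": "Michael", "address": {"state": "California"}}

Note also that saveAsParquetFile is deprecated as of Spark 1.4; the same write can be expressed through the DataFrameWriter API as jsDF.write.parquet("sample.parquet").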
// Loading a Text/CSV File Example
scala> val sampleRDD = sc.textFile("sample.csv")
sampleRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

// Programmatically Specifying the Schema
scala> val schemaString = "name city zip"
schemaString: String = name city zip

scala> import org.apache.spark.sql.types.{StructType, StringType, StructField}
import org.apache.spark.sql.types.{StructType, StringType, StructField}

scala> val schemaRDD = StructType(schemaString.split(" ").map(e => StructField(e, StringType, true)))
schemaRDD: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(city,StringType,true), StructField(zip,StringType,true))

// Create a DataFrame using schemaRDD.
// For that we first need to turn each CSV line into a Row (rowsRDD).
scala> val rowsRDD = sampleRDD.map(_.split(",")).map(e => org.apache.spark.sql.Row(e(0), e(1), e(2).trim))

scala> val df = sqlContext.createDataFrame(rowsRDD, schemaRDD)
df: org.apache.spark.sql.DataFrame = [name: string, city: string, zip: string]
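The CSV-backed DataFrame can be registered and queried with SQL exactly like the JSON one. A minimal sketch, assuming sample.csv contains rows such as John,Columbus,43210; the file contents and the table name are assumptions:

// Hypothetical continuation of the CSV example above
df.registerTempTable("people_csv")   // table name is an assumption
sqlContext.sql("select name, zip from people_csv where city = 'Columbus'").show()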
Dataset: lr_data.txt
A tolerance of +/- 0.2 between the predicted and the actual label is used to judge the predictions. The model can be tuned by playing with the train/test split or by changing the stepSize and numIterations.

build.sbt

name := "LinearRegressionExample"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1",
  "org.apache.spark" %% "spark-mllib" % "1.4.0"
)

Main.scala

object Main {
  def main(args: Array[String]) {
    // Run the LinearRegression example
    LinearRegression.run(
      master = Some("local[*]"),
      args = args.toList
    )
    System.exit(0)
  }
}

LinearRegression.scala

import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

object LinearRegression {

  private val AppName = "LinearRegressionSpark"
  val numIterations = 1000
  val stepSize = 1.5

  // Apply the trained model to the test set, returning (actual, predicted) pairs.
  private def predict(model: RegressionModel, test: RDD[LabeledPoint]) = {
    test.map { point =>
      val prediction = model.predict(point.features)
      (point.label, prediction)
    }
  }

  // Mean squared error over the (actual, predicted) pairs.
  private def MSE(predictions: RDD[(Double, Double)]) = {
    (predictions map { case (actual, prediction) =>
      Math.pow(actual - prediction, 2)
    } reduce { _ + _ }) / predictions.count()
  }

  def run(master: Option[String], args: List[String]) {
    val sc = {
      val conf = new SparkConf().setAppName(AppName)
      for { m <- master } { conf.setMaster(m) }
      new SparkContext(conf)
    }

    // Each input line is the label followed by 10 feature values, comma separated.
    val raw = sc.textFile(args(0))
    val rawData = raw.map { line =>
      val entries = line.split(",")
      LabeledPoint(entries(0).toDouble, Vectors.dense(entries.slice(1, 11).map(_.toDouble)))
    }.cache()

    val splits = rawData.randomSplit(Array(0.8, 0.2), seed = 90L)
    val train = splits(0)
    val test = splits(1)

    // Linear Regression
    val model: LinearRegressionModel = LinearRegressionWithSGD.train(train, numIterations, stepSize)
    // Ridge Regression
    val ridgeModel: RidgeRegressionModel = RidgeRegressionWithSGD.train(train, numIterations)
    // Lasso Regression
    val lassoModel: LassoModel = LassoWithSGD.train(train, numIterations)

    val predictions = predict(model, test)
    val predictionsRidge = predict(ridgeModel, test)
    val predictionsLasso = predict(lassoModel, test)

    val mse = MSE(predictions)
    val mseRidge = MSE(predictionsRidge)
    val mseLasso = MSE(predictionsLasso)

    println(s"Linear Regression Results: Total: ${test.count()} MSE: $mse")
    println(s"Ridge Regression Results: Total: ${test.count()} MSE: $mseRidge")
    println(s"Lasso Regression Results: Total: ${test.count()} MSE: $mseLasso")
  }
}
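The +/- 0.2 tolerance check mentioned above is not part of the listing; here is a minimal sketch of how it could be computed from the (actual, predicted) pairs returned by predict, with the helper name withinTolerance being an assumption:

// Sketch (assumption): fraction of test predictions within +/- 0.2 of the actual label.
private def withinTolerance(predictions: RDD[(Double, Double)], tolerance: Double = 0.2): Double = {
  val hits = predictions.filter { case (actual, prediction) =>
    math.abs(actual - prediction) <= tolerance
  }.count()
  hits.toDouble / predictions.count()
}

// Usage inside run(), after the predictions are computed:
// println(s"Predictions within +/- 0.2: ${withinTolerance(predictions)}")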
Word Count Example

build.sbt

name := "WordCountExample"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"

Main.scala

import org.apache.spark.SparkContext

object Main {
  def main(args: Array[String]) {
    Wordcount.run(
      master = Some("local[*]"),
      args = args.toList
    )
    System.exit(0)
  }
}

WordCount.scala

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

object Wordcount {

  private val AppName = "WordCountSpark"
  // Stop words are read once on the driver and captured by the sanitize closure.
  private val Stopwords = scala.io.Source.fromFile("data/stopwords.txt").getLines().toList

  def run(master: Option[String], args: List[String]) {
    val sc = {
      val conf = new SparkConf().setAppName(AppName)
      for (m <- master) { conf.setMaster(m) }
      new SparkContext(conf)
    }

    val file: RDD[String] = sc.textFile(args(0))
    val words = file flatMap sanitize
    val wordCounts: RDD[(String, Int)] = words map { word => (word, 1) } reduceByKey (_ + _)
    val top10Words = wordCounts sortBy (- _._2) take 10
    top10Words.foreach(println)
    wordCounts.saveAsTextFile(args(1))
  }

  // Lower-case the line, strip punctuation, split on whitespace, and drop stop words.
  private def sanitize(line: String): Seq[String] = {
    val allWords = line.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+")
    allWords filter { x => Stopwords.indexOf(x) < 0 }
  }
}
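One small note on the top-10 step: sorting the entire wordCounts RDD only to take 10 elements can be avoided. A hedged alternative sketch using RDD.top with an explicit Ordering, which only keeps the largest elements from each partition:

// Alternative to sortBy(...) take 10 (a sketch, equivalent for this use case):
val top10Words = wordCounts.top(10)(Ordering.by(_._2))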
Dataset: Record Linkage Comparison Patterns Data Set
Source: UCI Machine Learning Repository

Description: The dataset was curated from a record linkage study performed at a German hospital in 2010. It contains roughly 5.7 million pairs of patient records that were matched according to several criteria, such as first name, last name, date of birth, and address. Each comparison was assigned a numerical score from 0 to 1 indicating how similar the records are, and the data was then hand-labeled to identify which record pairs truly matched.

// Importing data from a folder and creating an RDD
scala> val rawData = sc.textFile("/linkage/dataset")
rawData: org.apache.spark.rdd.RDD[String] = /linkage/dataset MapPartitionsRDD[5] at textFile at <console>:21

// Some basic operations on the RDD

// Getting the first row of the data
scala> rawData.first
res2: String = "id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"

// Counting the number of tuples in the data
scala> rawData.count()
res9: Long = 5749144

// To fetch all the raw data
scala> rawData.collect()

// To save the RDD to persistent storage such as HDFS
scala> rawData.saveAsTextFile("hdfs://linkage/dataset")

// Getting the top 5 rows of the data
scala> val head = rawData.take(5)
head: Array[String] = Array("id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match", 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE, 39086,47614,1,?,1,?,1,1,1,1,1,TRUE, 70031,70237,1,?,1,?,1,1,1,1,1,TRUE, 84795,97439,1,?,1,?,1,1,1,1,1,TRUE)

// Basic operations on the top 5 rows
scala> head.length
res3: Int = 5

scala> head foreach println
"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
39086,47614,1,?,1,?,1,1,1,1,1,TRUE
70031,70237,1,?,1,?,1,1,1,1,1,TRUE
84795,97439,1,?,1,?,1,1,1,1,1,TRUE

// Filter the header out of the head data.
// For that, let's create a function that checks whether a tuple is the header;
// in our example the header contains the "id_1" attribute.
scala> def isHeader(tuple: String) = tuple.contains("id_1")
isHeader: (tuple: String)Boolean

scala> head filter isHeader
res3: Array[String] = Array("id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match")

// Get the data without the header
scala> val dataWithoutHeader = head filterNot isHeader
dataWithoutHeader: Array[String] = Array(37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE, 39086,47614,1,?,1,?,1,1,1,1,1,TRUE, 70031,70237,1,?,1,?,1,1,1,1,1,TRUE, 84795,97439,1,?,1,?,1,1,1,1,1,TRUE)

// The first 2 attributes of each tuple are patient IDs (Int),
// the last attribute is a Boolean telling whether the two patient
// records matched, and the middle 9 attributes are the raw scores
// on which that conclusion is based.

// Let's create a case class to hold each tuple
scala> case class Entry(id1: Int, id2: Int, scores: Seq[Double], matched: Boolean)
defined class Entry
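In the transcript above, isHeader is only applied to the 5-row head sample. The same predicate can drop the header from the full RDD; note that RDD has filter but no filterNot, so the negation is explicit. A minimal sketch, with the variable name noHeader being an assumption:

// Drop the header line from the complete dataset, not just the sampled head.
val noHeader = rawData.filter(x => !isHeader(x))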
// Let's define a parsing function that accepts a tuple and converts it
// to an Entry object. Some of the score attributes are "?", which
// will be replaced with NaN.
scala> def parse(tuple: String) = {
     |   val attributes = tuple.split(",")
     |   val id1 = attributes(0).toInt
     |   val id2 = attributes(1).toInt
     |   val matched = attributes(11).toBoolean
     |   val scores = attributes slice(2, 11) map { x => if ("?".equals(x)) Double.NaN else x.toDouble } toSeq
     |   Entry(id1, id2, scores, matched)
     | }
parse: (tuple: String)Entry

scala> val entries: Seq[Entry] = dataWithoutHeader map parse toSeq
entries: Seq[Entry] = WrappedArray(Entry(37291,53113,WrappedArray(0.833333333333333, NaN, 1.0, NaN, 1.0, 1.0, 1.0, 1.0, 0.0),true), Entry(39086,47614,WrappedArray(1.0, NaN, 1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0),true), Entry(70031,70237,WrappedArray(1.0, NaN, 1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0),true), Entry(84795,97439,WrappedArray(1.0, NaN, 1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0),true))
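From here, a full analysis would typically apply parse to every non-header line of the dataset and cache the parsed RDD, since it will be scanned repeatedly. A minimal sketch under that assumption (variable names are assumptions):

// Parse the whole dataset into Entry objects and cache the result.
val parsed = rawData.filter(x => !isHeader(x)).map(parse).cache()

// How many record pairs were labeled as matches vs. non-matches?
parsed.map(_.matched).countByValue().foreach(println)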