The following examples were run in the Spark shell (Spark 1.x), where sc is the preconfigured SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Load JSON File Example
scala> val jsDF = sqlContext.read.json("sample.json")
jsDF: org.apache.spark.sql.DataFrame = [address: struct<city:string,state:string>, name: string]

scala> jsDF.show()
+-----------------+-------+
|          address|   name|
+-----------------+-------+
|  [Columbus,Ohio]|    Yin|
|[null,California]|Michael|
+-----------------+-------+

scala> jsDF.printSchema
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- name: string (nullable = true)

scala> jsDF.columns
res17: Array[String] = Array(address, name)

scala> jsDF.explain
== Physical Plan ==
Scan JSONRelation[file:/Users/XXX/Documents/SparkExamples/Examples/data/sample.json][address#4,name#5]

scala> jsDF.count
res19: Long = 2

scala> jsDF.orderBy("name")
res20: org.apache.spark.sql.DataFrame = [address: struct<city:string,state:string>, name: string]

scala> jsDF.orderBy("name").show()
+-----------------+-------+
|          address|   name|
+-----------------+-------+
|[null,California]|Michael|
|  [Columbus,Ohio]|    Yin|
+-----------------+-------+

scala> jsDF.dtypes
res22: Array[(String, String)] = Array((address,StructType(StructField(city,StringType,true), StructField(state,StringType,true))), (name,StringType))

// Nested struct fields can be selected with dot notation.
scala> jsDF.select("name", "address.city")
res23: org.apache.spark.sql.DataFrame = [name: string, city: string]

scala> jsDF.select("name", "address.city").show()
+-------+--------+
|   name|    city|
+-------+--------+
|    Yin|Columbus|
|Michael|    null|
+-------+--------+

// registerTempTable returns Unit, so there is no need to capture its result in a val.
scala> jsDF.registerTempTable("sample")

scala> val content = sqlContext.sql("select name, address.city, address.state from sample")
content: org.apache.spark.sql.DataFrame = [name: string, city: string, state: string]

scala> content.collect().foreach(println)
[Yin,Columbus,Ohio]
[Michael,null,California]

// Saving as a Parquet file. saveAsParquetFile is deprecated since Spark 1.4;
// the DataFrameWriter API below is the current equivalent.
scala> jsDF.write.parquet("sample.parquet")

// Load Parquet File Example
scala> val parquetDF = sqlContext.read.parquet("sample.parquet")
parquetDF: org.apache.spark.sql.DataFrame = [address: struct<city:string,state:string>, name: string]

// Re-registering the name "sample" simply replaces the earlier temp table.
scala> parquetDF.registerTempTable("sample")

scala> val content = sqlContext.sql("select name, address.city, address.state from sample")
content: org.apache.spark.sql.DataFrame = [name: string, city: string, state: string]

scala> content.collect().foreach(println)
[Yin,Columbus,Ohio]
[Michael,null,California]
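The input file itself is not shown above, but the show() and printSchema output imply that sample.json holds two records in JSON Lines format (one object per line, which is what sqlContext.read.json expects), roughly:

{"name":"Yin", "address":{"city":"Columbus", "state":"Ohio"}}
{"name":"Michael", "address":{"city":null, "state":"California"}}

Michael's city may also simply be absent rather than explicitly null; either way show() prints the null seen above.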
// Loading a text/CSV file.
scala> val sampleRDD = sc.textFile("sample.csv")
sampleRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

// Programmatically Specifying the Schema
scala> val schemaString = "name city zip"
schemaString: String = name city zip

scala> import org.apache.spark.sql.types.{StructType, StringType, StructField}
import org.apache.spark.sql.types.{StructType, StringType, StructField}

// Build the schema from the string. The result is a StructType describing
// three nullable string columns, not an RDD.
scala> val schema = StructType(schemaString.split(" ").map(e => StructField(e, StringType, true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(city,StringType,true), StructField(zip,StringType,true))

// Create a DataFrame using the schema.
// For that we first need to turn each CSV line into a Row.
scala> val rowsRDD = sampleRDD.map(_.split(",")).map(e => org.apache.spark.sql.Row(e(0), e(1), e(2).trim))

scala> val df = sqlContext.createDataFrame(rowsRDD, schema)
df: org.apache.spark.sql.DataFrame = [name: string, city: string, zip: string]
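The new DataFrame can now be queried exactly like the JSON-backed one above. A minimal sketch, continuing the same shell session; the sample.csv rows and the temp table name "people" are hypothetical, chosen only to match the name/city/zip schema:

// Hypothetical sample.csv contents (illustration only):
//   John,Columbus,43210
//   Jane,Dublin,43017

// Register the DataFrame as a temp table and query it with SQL:
df.registerTempTable("people")
sqlContext.sql("select name, zip from people where city = 'Columbus'").show()

// Or filter with the DataFrame API directly:
df.filter(df("city") === "Columbus").select("name", "zip").show()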