// --- file: build.sbt ---
// name := "WordCountExample"
// version := "1.0"
// scalaVersion := "2.11.7"
// libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"

// --- file: Main.scala ---
import org.apache.spark.SparkContext

object Main {
  /** Entry point: runs the word count locally against the paths given on the command line. */
  def main(args: Array[String]): Unit = {
    Wordcount.run(
      master = Some("local[*]"),
      args = args.toList
    )
    System.exit(0)
  }
}

// --- file: WordCount.scala ---
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

object Wordcount {
  private val AppName = "WordCountSpark"

  // Stopwords are loaded once at object init. A Set gives O(1) membership tests;
  // the original List.indexOf scan was O(n) per word. The file handle is closed
  // explicitly (the original leaked it).
  private val Stopwords: Set[String] = {
    val src = scala.io.Source.fromFile("data/stopwords.txt")
    try src.getLines().toSet
    finally src.close()
  }

  /**
   * Counts words in the text file at `args(0)`, prints the ten most frequent
   * words, and writes the full (word, count) pairs to the directory `args(1)`.
   *
   * @param master optional Spark master URL; when None, the value from the
   *               environment / spark-submit is used
   * @param args   args(0) = input path, args(1) = output path
   */
  def run(master: Option[String], args: List[String]): Unit = {
    // Fail fast with a clear message instead of an IndexOutOfBoundsException.
    require(args.length >= 2, "usage: run <inputPath> <outputPath>")

    val sc = {
      val conf = new SparkConf().setAppName(AppName)
      master.foreach(conf.setMaster)
      new SparkContext(conf)
    }

    try {
      val file: RDD[String] = sc.textFile(args(0))
      val words = file.flatMap(sanitize)
      val wordCounts: RDD[(String, Int)] =
        words.map(word => (word, 1)).reduceByKey(_ + _)

      // Top 10 by descending count.
      val top10Words = wordCounts.sortBy(-_._2).take(10)
      top10Words.foreach(println)

      wordCounts.saveAsTextFile(args(1))
    } finally {
      // Release the SparkContext even if the job fails (original never stopped it).
      sc.stop()
    }
  }

  /**
   * Lowercases the line, strips everything except alphanumerics and whitespace,
   * splits on whitespace, and drops empty tokens and stopwords.
   * (Filtering empties fixes the original's counting of "" for lines with
   * leading whitespace.)
   */
  private def sanitize(line: String): Seq[String] = {
    line.toLowerCase
      .replaceAll("[^a-z0-9\\s]", "")
      .split("\\s+")
      .filter(w => w.nonEmpty && !Stopwords.contains(w))
  }
}
0 Comments
Leave a Reply.
Archives
April 2016
Categories