本节介绍如何编写 Spark Streaming 应用程序,由简到难讲解使用几个核心概念来解决实际应用问题。
在实例演示中模拟实际情况,需要源源不断地接入流数据,为了在演示过程中更接近真实环境,首先需要定义流数据模拟器。该模拟器的主要功能是通过 Socket 方式监听指定的端口号,当外部程序通过该端口进行连接并请求数据时,模拟器将定时将指定的文件数据进行随机获取,并发送给外部程序。
流数据模拟器的代码如下。
import java.io.{PrintWriter} import java.net.ServerSocket import scala.io.Source object StreamingSimulation { //定义随机获取整数的方法 def index(length:Int) = { import java.util.Random val rdm = new Random rdm.nextInt(length) } def main(args: Array[String]) { //调用该模拟器需要 3 个参数,分别为文件路径、端口号和间隔时间(单位为毫秒) if (args.length != 3) { System.err.printIn("Usage:<filename> <port><millisecond>") System.exit(1) } //获取指定文件总的行数 val filename = args(0) val lines = Source.fromFile(filename).getLines.toList val filerow = lines.length //指定监听某端口,当外部程序请求时建立连接 val listener = new ServerSocket(args(1).toInt) while (true) { val socket = listener.accept() new Thread() { override def run = { printIn("Got client connected from: " + socket.getInetAddress) val out = new PrintWriter(socket.getOutputStream(), true) while (true) { Thread.sleep(args(2).toLong) //当该端口接受请求时,随机获取某行数据发送给对方 val content = lines(index(filerow)) printIn(content) out.write(content + '\n') out.flush() } socket.close() } }.start() } } }
在 IDEA 开发环境打包配置界面中:
cd /home/hadoop/IdeaProjects/out/artifacts/LearnSpark_jar
cp LearnSpark.jar /app/hadoop/spark-1.1.0/
在该实例中,Spark Streaming 将监控某目录中的文件,获取在该间隔时间段内变化的数据,然后通过 Spark Streaming 计算出该时间段内的单词统计数。
程序代码如下。
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds,StreamingContext} import org.apache.spark.streaming.StreamingContext._ object FileWordCount { def main(args:Array[String]) { val sparkConf = new SparkConf().setAppName("FileWordCount").setMaster("local[2]") //创建 Streaming 的上下文,包括 Spark 的配置和时间间隔,这里时间间隔为 20 秒 val ssc = new StreamingContext(sparkConf,Seconds(20)) //指定监控的目录,这里为 /home/hadoop/temp/ val lines = ssc.textFileStream("/home/hadoop/temp/") //对指定文件夹中变化的数据进行单词统计并且打印 val words = lines.flatMap (_.split("")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_+_) wordCounts.print() // 启动 Streaming ssc.start() ssc.awaitTermination() } }
运行代码的步骤共有三步。
创建 /home/hadoop/temp 为 Spark Streaming 监控的目录,在该目录中定时添加文件,然后由 Spark Streaming 统计出新添加的文件中的单词个数。
$cd /app/hadoop/spark-1.1.0
$sbin/start-all.sh
在 IDEA 中运行该实例,由于该实例没有输入参数故不需要配置参数,在运行日志中将定时打印时间戳。如果在监控目录中加入文件,则输出时间戳的同时将输出该时间段内新添加的文件的单词统计个数。
在该实例中将由流数据模拟器以 1 秒的频度发送模拟数据,Spark Streaming 通过 Socket 接收流数据并每 20 秒运行一次来处理接收到的数据,处理完毕后打印该时间段内数据出现的频度,即在各处理段时间内的状态之间并无关系。
程序代码如下。
import org.apache.spark.{SparkContext,SparkConf} import org.apache.spark.streaming.{Milliseconds,Seconds,StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.storage.StorageLevel object NetworkWordCount { def main(args: Array[String]) { val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc,Seconds(20)) //通过 Socket 获取数据,需要提供 Socket 的主机名和端口号,数据保存在内存和硬盘中 val lines = ssc.socketTextStream(args(0),args(1).toInt,StorageLevel.MEMORY_AND_DISK_SER) //对读入的数据进行分割、计数 val words = lines.flatMap(_.split(",")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() } }
运行代码的步骤共有四步。
启动流数据模拟器,模拟器 Socket 的端口号为 9999,频度为 1 秒。在该实例中将定时发送 /home/hadoop/upload/class7 目录下的 people.txt 数据文件,其中,people.txt 数据的内容如下。
1 Michael
2 Andy
3 Justin
4
启动流数据模拟器的命令如下。
$cd /app/hadoop/spark-1.1.0
$java -cp LearnSpark.jar class7.StreamingSimulation \
/home/hadoop/upload/class7/people.txt 9999 1000
在没有程序连接时,该程序处于阻塞状态。
在 IDEA 中运行该实例,需要配置连接 Socket 的主机名和端口号,在这里配置主机名为 hadoop1,端口号为 9999。
IDEA 中的 Spark Streaming 程序与模拟器建立连接,当模拟器检测到外部连接时开始发送测拭数据,数据是随机在指定的文件中获取的一行数据,时间间隔为 1 秒。图 1 是一个模拟器发送情况的截图。