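Spark Streaming can receive streaming data from any arbitrary data source beyond the ones it supports out of the box (that is, beyond Kafka, Kinesis, files, sockets, etc.). This requires the developer to implement a receiver that is customized for receiving data from the data source in question. This guide walks through implementing a custom receiver and using it in a Spark Streaming application. Note that custom receivers can be implemented in Scala or Java.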
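1. Implementing a Custom Receiver
The first step is to implement a Receiver (Scala doc, Java doc). A custom receiver must extend the abstract class Receiver by implementing two methods:
- onStart(): what to do to start receiving data
- onStop(): what to do to stop receiving data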
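Neither onStart() nor onStop() may block indefinitely. Typically, onStart() starts the threads responsible for receiving data, and onStop() ensures that those threads are stopped. The receiving threads can also call isStopped(), a method of Receiver, to check whether they should stop receiving data.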
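The lifecycle described above can be sketched as follows. This is a minimal illustration, not the guide's own example: the class name ClosableSocketReceiver is hypothetical, and the choice to close the socket inside onStop() is an assumption about how one might make a thread blocked in readLine() exit promptly.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver, for illustration only
class ClosableSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // Shared between the receiving thread and onStop(), hence @volatile
  @volatile private var current: Socket = null

  def onStart(): Unit = {
    // Return immediately; the blocking work runs on its own thread
    new Thread("Closable Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // Closing the socket makes a blocked readLine() fail,
    // so the receiving thread can notice the stop and exit
    val s = current
    if (s != null) s.close()
  }

  private def receive(): Unit = {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      current = socket
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line)
        line = reader.readLine()
      }
      // The server closed the connection: try again later
      if (!isStopped()) restart("Connection closed, trying to connect again")
    } catch {
      case t: Throwable =>
        // Do not restart if the failure was caused by onStop() closing the socket
        if (!isStopped()) restart("Error receiving data", t)
    } finally {
      if (socket != null) socket.close()
    }
  }
}
```

Whether an active close in onStop() is needed depends on the source: a thread that polls with a timeout can rely on isStopped() alone.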
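Once data is received, it can be stored inside Spark by calling store(data), a method provided by the Receiver class. store() comes in several flavors, which allow storing received data one record at a time or as a whole collection of objects or serialized bytes. Note that the flavor of store() used to implement a receiver affects its reliability and fault-tolerance semantics. This is discussed in more detail later.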
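To make the difference between the store() flavors concrete, here is a rough sketch that collects records into an ArrayBuffer and stores them as one block, instead of storing one record at a time. The class name BatchingReceiver, the batchSize parameter, and the synthetic "record-N" data are all assumptions made for illustration; a real receiver would read from its actual source.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver that produces synthetic records, for illustration only
class BatchingReceiver(batchSize: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    new Thread("Batching Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to do: the receiving thread checks isStopped() on every iteration
  }

  private def receive(): Unit = {
    try {
      var n = 0L
      while (!isStopped()) {
        // Collect up to batchSize records (synthetic here)
        val buffer = new ArrayBuffer[String](batchSize)
        while (buffer.size < batchSize) {
          buffer += s"record-$n"
          n += 1
        }
        // store(ArrayBuffer[T]) stores the whole buffer as a single block;
        // store(item) would instead hand over one record at a time
        store(buffer)
        Thread.sleep(100) // throttle the synthetic source
      }
    } catch {
      case t: Throwable =>
        if (!isStopped()) restart("Error producing data", t)
    }
  }
}
```

With the single-record form, Spark aggregates the items into blocks itself; with the collection forms, the receiver decides what goes into each block.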
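Any exception in the receiving threads should be caught and handled properly to avoid silent failures of the receiver. restart(<exception>) restarts the receiver by asynchronously calling onStop() and then calling onStart() after a delay. stop(<exception>) calls onStop() and terminates the receiver. In addition, reportError(<error>) reports an error message to the driver (visible in the logs and the UI) without stopping or restarting the receiver.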
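One possible way to combine the three calls is sketched below. The class name ErrorHandlingReceiver, the fetchRecord() placeholder, and the mapping of exception types to actions are assumptions chosen for illustration; which errors count as skippable, retryable, or fatal depends on the data source.

```scala
import java.io.IOException

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver, for illustration only
class ErrorHandlingReceiver(endpoint: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    new Thread("Error Handling Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to do: the receiving thread checks isStopped() on every iteration
  }

  // Placeholder standing in for a call to a real data source
  private def fetchRecord(): String = {
    Thread.sleep(100)
    s"record from $endpoint"
  }

  private def receive(): Unit = {
    try {
      while (!isStopped()) {
        try {
          store(fetchRecord())
        } catch {
          case e: NumberFormatException =>
            // Bad record: tell the driver, but keep receiving
            reportError("Skipping malformed record", e)
        }
      }
    } catch {
      case e: IOException =>
        // Possibly transient: onStop() now, onStart() again after a delay
        restart("I/O error while reading from " + endpoint, e)
      case e: IllegalArgumentException =>
        // Treated as unrecoverable here: terminate the receiver
        stop("Invalid endpoint " + endpoint, e)
      case t: Throwable =>
        if (!isStopped()) restart("Error receiving data", t)
    }
  }
}
```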
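The following is a custom receiver that receives a stream of text over a socket. It treats each '\n'-delimited line in the text stream as a record and stores it with Spark. If the receiving thread hits an error while connecting or receiving, the receiver is restarted to try connecting again.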
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart(): Unit = {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run(): Unit = { receive() }
    }.start()
  }

  def onStop(): Unit = {
    // There is nothing much to do, as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  /** Create a socket connection and receive data until the receiver is stopped */
  private def receive(): Unit = {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)

      // Keep reading until stopped or until the connection is broken
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
      userInput = reader.readLine()
      while (!isStopped() && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()

      // Restart in an attempt to connect again when the server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // Restart if we could not connect to the server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // Restart if there is any other error
        restart("Error receiving data", t)
    }
  }
}
2. Using the Custom Receiver in a Spark Streaming Application
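A custom receiver can be used in a Spark Streaming application by calling streamingContext.receiverStream(<instance of custom receiver>). This creates an input DStream from the data received by the custom receiver instance, as shown below: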
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

object CustomReceiver {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: CustomReceiver <hostname> <port>")
      System.exit(1)
    }

    // StreamingExamples is a helper from the Spark examples package
    // (org.apache.spark.examples.streaming)
    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("CustomReceiver")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create an input stream with the custom receiver on target ip:port and count the
    // words in the input stream of \n delimited text (e.g. generated by 'nc')
    val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run(): Unit = { receive() }
    }.start()
  }

  def onStop(): Unit = {
    // There is nothing much to do, as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  /** Create a socket connection and receive data until the receiver is stopped */
  private def receive(): Unit = {
    var socket: Socket = null
    var userInput: String = null
    try {
      println(s"Connecting to $host : $port")
      socket = new Socket(host, port)
      println(s"Connected to $host : $port")
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
      userInput = reader.readLine()
      while (!isStopped() && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()
      println("Stopped receiving")
      // Restart in an attempt to connect again when the server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // Restart if we could not connect to the server
        restart(s"Error connecting to $host : $port", e)
      case t: Throwable =>
        // Restart if there is any other error
        restart("Error receiving data", t)
    }
  }
}