Spark Streaming Custom Receivers

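Spark Streaming can receive streaming data from any data source, not only the ones it supports out of the box (that is, beyond Kafka, Kinesis, files, sockets, and so on). To do this, the developer implements a receiver customized for the data source in question. This guide walks through implementing a custom receiver and using it in a Spark Streaming application. Note that custom receivers can be implemented in Scala or Java.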

1. Implementing a Custom Receiver

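Start by implementing a Receiver (see its Scala and Java API docs). A custom receiver must extend this abstract class by implementing two methods:

  • onStart(): what to do to start receiving data
  • onStop(): what to do to stop receiving data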

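Neither onStart() nor onStop() may block indefinitely. Typically, onStart() starts the threads responsible for receiving data, and onStop() makes sure those threads are stopped. The receiving threads can also call isStopped(), a Receiver method, to check whether they should stop receiving data.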
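To make the division of labour between the two methods concrete, here is a minimal sketch of a socket receiver whose onStop() actively unblocks the receiving thread by closing the socket. The class name ClosableSocketReceiver and its fields are hypothetical and chosen only for illustration; the sketch assumes that closing a socket causes a pending read on it to fail with an exception, which is standard java.net.Socket behaviour.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical variant for illustration: onStop() closes the socket so that a
// thread blocked in readLine() wakes up instead of waiting for the next line.
class ClosableSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // Shared between the receiving thread and onStop(), hence @volatile.
  // @transient because the receiver object itself is serialized.
  @transient @volatile private var socket: Socket = _

  def onStart(): Unit = {
    // Return immediately; the blocking work happens on a separate thread.
    new Thread("Closable Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // Closing the socket makes a pending readLine() fail with an exception.
    val current = socket
    if (current != null) current.close()
  }

  private def receive(): Unit = {
    var local: Socket = null
    try {
      local = new Socket(host, port)
      socket = local
      val reader = new BufferedReader(
        new InputStreamReader(local.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line)
        line = reader.readLine()
      }
      // The server closed the connection while we were still running.
      if (!isStopped()) restart("Connection closed, trying to connect again")
    } catch {
      case t: Throwable =>
        // An exception after onStop() closed the socket is expected; only
        // ask for a restart while the receiver is still meant to be running.
        if (!isStopped()) restart("Error receiving data", t)
    } finally {
      if (local != null) local.close()
    }
  }
}
```

Whether onStop() needs to do anything depends on the source: if reads return quickly, polling isStopped() in the loop may be enough.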

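Once data has been received, it can be stored inside Spark by calling store(data), a method provided by the Receiver class. store() comes in several flavors, which store the received data either one record at a time or as a whole collection of objects or serialized bytes. Note that the flavor of store() used to implement a receiver affects its reliability and fault-tolerance semantics. This is discussed in more detail later.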
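The difference between the single-record and the collection flavors can be sketched as follows. This is an illustrative example only: BatchingLineReceiver, lines, and batchSize are hypothetical names, and the receiver simply replays a fixed list instead of reading from a real source. It uses the store(record) and store(ArrayBuffer) overloads of Receiver.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver for illustration: replays a fixed list of lines.
// `lines` and `batchSize` are made-up parameters, not part of Spark's API.
class BatchingLineReceiver(lines: Seq[String], batchSize: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    new Thread("Batching Line Receiver") {
      override def run(): Unit = emit()
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to do: emit() checks isStopped() and returns on its own.
  }

  private def emit(): Unit = {
    val source = lines.iterator
    if (batchSize <= 1) {
      // Flavor 1: store a single record at a time.
      while (!isStopped() && source.hasNext) store(source.next())
    } else {
      // Flavor 2: collect records and store each group as one ArrayBuffer.
      while (!isStopped() && source.hasNext) {
        val batch = new ArrayBuffer[String]()
        while (batch.size < batchSize && source.hasNext) batch += source.next()
        store(batch)
      }
    }
  }
}
```

A fresh ArrayBuffer is allocated for every batch so that a buffer already handed to Spark is never mutated afterwards.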

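Any exception in the receiving threads should be caught and handled properly to avoid silent failures of the receiver. restart(<exception>) restarts the receiver by calling onStop() asynchronously and then calling onStart() after a delay. stop(<exception>) calls onStop() and terminates the receiver. In addition, reportError(<error>) reports an error message to the driver (visible in the logs and the UI) without stopping or restarting the receiver.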
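One way to choose between the three calls is sketched below. The mapping from exception types to actions is an assumption made for the sake of the example, not a rule from Spark, and PollingReceiver and its fetch parameter are hypothetical names.

```scala
import java.io.IOException

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver for illustration. `fetch` stands in for whatever
// client call reads one record; it is not part of Spark's API and must be
// serializable, because the receiver is shipped to an executor.
class PollingReceiver(fetch: () => String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    new Thread("Polling Receiver") {
      override def run(): Unit = poll()
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to do: poll() checks isStopped() and returns on its own.
  }

  private def poll(): Unit = {
    try {
      while (!isStopped()) {
        try {
          store(fetch())
        } catch {
          case e: IllegalArgumentException =>
            // Assumed to mean "bad record": tell the driver and keep going.
            reportError("Skipping malformed record", e)
        }
      }
    } catch {
      case e: IOException =>
        // Assumed transient: onStop() now, onStart() again after a delay.
        restart("Connection lost, reconnecting", e)
      case e: SecurityException =>
        // Assumed permanent: retrying would not help, so shut down for good.
        stop("Not authorized to read from the source", e)
      case t: Throwable =>
        restart("Unexpected error while receiving", t)
    }
  }
}
```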

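The following is a custom receiver that receives a stream of text over a socket. It treats each '\n'-delimited line in the text stream as one record and stores it with Spark. If the receiving thread hits an error while connecting or receiving, the receiver is restarted to try connecting again.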

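import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {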

  def onStart(): Unit = {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run(): Unit = { receive() }
    }.start()
  }

  def onStop(): Unit = {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns true
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive(): Unit = {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)

      // Until stopped or connection broken continue reading
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
      userInput = reader.readLine()
      while(!isStopped && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // restart if could not connect to server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // restart if there is any other error
        restart("Error receiving data", t)
    }
  }
}

2. Using the Custom Receiver in a Spark Streaming Application

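A custom receiver can be used in a Spark Streaming application by calling streamingContext.receiverStream(<instance of custom receiver>). This creates an input DStream from the data received by the custom receiver instance, as shown below: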

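import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

object CustomReceiver {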
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: CustomReceiver <hostname> <port>")
      System.exit(1)
    }

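    // StreamingExamples is a helper from the Spark examples module;
    // drop this line if that module is not on your classpath
    StreamingExamples.setStreamingLogLevels()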

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("CustomReceiver")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create an input stream with the custom receiver on target ip:port and count the
    // words in input stream of \n delimited text (e.g. generated by 'nc')
    val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}


class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run(): Unit = { receive() }
    }.start()
  }

  def onStop(): Unit = {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns true
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive(): Unit = {
    var socket: Socket = null
    var userInput: String = null
    try {
      println(s"Connecting to $host : $port")
      socket = new Socket(host, port)
      println(s"Connected to $host : $port")
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
      userInput = reader.readLine()
      while (!isStopped && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()
      println("Stopped receiving")
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        restart(s"Error connecting to $host : $port", e)
      case t: Throwable =>
        restart("Error receiving data", t)
    }
  }
}