Spark工程化代码_三层架构模式

本文以WordCount为例,展示如何将应用程序转化为三层架构模式,从而降低耦合性,易于复用。

object Spark_WordCount {
  def main(args: Array[String]): Unit = {
    // 1. 环境变量 => App层
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)
    
    // 2. 数据交互 => Dao层
    val lines: RDD[String] = sc.textFile("datas/word.txt")
      
    // 3. 业务逻辑 => Service层
    val words: RDD[String] = lines.flatMap(_.split(" "))
    val wordToOne: RDD[(String, Int)] = words.map(
      word => (word, 1)
    )
    val wordToCount: RDD[(String, Int)] = wordToOne.reduceByKey((x, y) => x + y)
    val array: Array[(String, Int)] = wordToCount.collect()
      
    // 4. 结果的调度 => Controller层
    array.foreach(println)

    // 5. 资源释放 => App层
    sc.stop()
  }
}
复制代码

三层架构

image-20210420164250316

核心三层架构:

  • DAO(持久层):与文件系统、数据库交互,进行数据的传递
  • Service(服务层):通过DAO层获取数据,执行核心的业务逻辑
  • Controller(调度层):通过Service层获取执行结果,进行任务调度

其他组成部分:

  • Application(启动类):有一个Main方法,用于启动程序
  • Common(通用模块):对相似的操作进行封装,形成接口/抽象类,
  • Bean(对象模块):存放实体类JavaBean
  • 内存共享(工具类):使用ThreadLocalMap存放SparkContext环境变量,用于同一线程的不同模块共享使用

DAO层

Dao接口:封装基本的数据IO操作

package spark.core.framework.common

/**
 * 数据持久层接口
 */
trait Dao {
  
    // 读取文本文件
  def readFile(path:String): RDD[String] = {
    EnvVarUtil.take().textFile(path)
  }
   
  // 保存文本文件
  def saveFile(path:String): Unit = {
   ......
  }
}
复制代码

WordCountDao类:

package spark.core.framework.dao

/**
 *  WordCount持久层
 */
class WordCountDao extends Dao {

}
复制代码

Service层

Service层接口:

package spark.core.framework.common

/**
 * 服务层接口
 */
trait Service {
  // 业务逻辑方法
  def dataAnalysis():Any
}
复制代码

WordCountService类:

package spark.core.framework.service

/**
 *  WordCount服务层
 */
class WordCountService extends Service{
	
  // 调用wordCountDao,实现数据交互
  private val wordCountDao = new WordCountDao()

  // 业务逻辑实现
  override def dataAnalysis(): Array[(String, Int)] = {
    val lines = wordCountDao.readFile("datas/1.txt")
    val words: RDD[String] = lines.flatMap(_.split(" "))
    val wordToOne = words.map(word=>(word,1))
    val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_+_)
    val array: Array[(String, Int)] = wordToSum.collect()
    array
  }
}
复制代码

Controller层

Controller接口:

package spark.core.framework.common

/**
 * 调度层接口
 */
trait Controller {

  // 调度方法
  def dispatch(): Unit

}
复制代码

WordCountController类:

package spark.core.framework.controller

/**
 * WordCount调度层
 */
class WordCountController extends Controller {
  
  // 调用wordCountService,执行业务逻辑
  private val wordCountService = new WordCountService()

  override def dispatch(): Unit = {
    // 执行业务,返回结果
    val array = wordCountService.dataAnalysis()
    // 对结果调度(打印、存储、传给其他方法...)
    array.foreach(println)
  }
}
复制代码

其他组成部分

Application

应用层接口:

package spark.core.framework.common

/**
 * 应用层接口
 */
trait Application {

  /**
   * @param master  master名称
   * @param app     App名称
   * @param op      调度逻辑
   */
  def start(master: String = "local[*]", app: String = "Application")
           (op: => Unit): Unit = {

    // 1. 获取环境变量,放入共享内存中
    val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
    val sc = new SparkContext(sparkConf)
    EnvVarUtil.put(sc)

    // 2. 传入Controller层调度逻辑
    try {
      op		//调度代码
    } catch {
      case ex => println(ex.getMessage)
    }

    // 3.资源的关闭释放
    sc.stop()
    EnvVarUtil.remove()
  }

}
复制代码

启动类:

package spark.core.framework.application

/**
 * 启动类
 */
object WordCountApplication extends App with Application{

  /**
   * 启动: 传入环境变量的参数、调度代码
   */
  start("local[*]", "wordCount"){
    // 调用wordCountController,进行任务调度
    val controller = new WordCountController()
    controller.dispatch()
  }

}
复制代码

工具类

package spark.core.framework.util

/**
 * 环境变量工具类: 将共享数据放在ThreadLocal,同一线程可以访问
 * 这里用于SparkContext的共享使用
 */
object EnvVarUtil {

  private val scThreadLocal = new ThreadLocal[SparkContext]()

  def put(sc: SparkContext): Unit = {
    scThreadLocal.set(sc)
  }

  def take(): SparkContext = {
    scThreadLocal.get()
  }

  def remove(): Unit = {
    scThreadLocal.remove()
  }

}
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享