本文以WordCount为例,展示如何将应用程序转化为三层架构模式,从而降低耦合性,易于复用。
object Spark_WordCount {
def main(args: Array[String]): Unit = {
// 1. 环境变量 => App层
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(sparkConf)
// 2. 数据交互 => Dao层
val lines: RDD[String] = sc.textFile("datas/word.txt")
// 3. 业务逻辑 => Service层
val words: RDD[String] = lines.flatMap(_.split(" "))
val wordToOne: RDD[(String, Int)] = words.map(
word => (word, 1)
)
val wordToCount: RDD[(String, Int)] = wordToOne.reduceByKey((x, y) => x + y)
val array: Array[(String, Int)] = wordToCount.collect()
// 4. 结果的调度 => Controller层
array.foreach(println)
// 5. 资源释放 => App层
sc.stop()
}
}
复制代码
三层架构
核心三层架构:
- DAO(持久层):与文件系统、数据库交互,进行数据的传递
- Service(服务层):通过DAO层获取数据,执行核心的业务逻辑
- Controller(调度层):通过Service层获取执行结果,进行任务调度
其他组成部分:
- Application(启动类):有一个Main方法,用于启动程序
- Common(通用模块):对相似的操作进行封装,形成接口/抽象类,
- Bean(对象模块):存放实体类JavaBean
- 内存共享(工具类):使用ThreadLocalMap存放SparkContext环境变量,用于同一线程的不同模块共享使用
DAO层
Dao接口:封装基本的数据IO操作
package spark.core.framework.common
/**
* 数据持久层接口
*/
trait Dao {
// 读取文本文件
def readFile(path:String): RDD[String] = {
EnvVarUtil.take().textFile(path)
}
// 保存文本文件
def saveFile(path:String): Unit = {
......
}
}
复制代码
WordCountDao类:
package spark.core.framework.dao
/**
* WordCount持久层
*/
class WordCountDao extends Dao {
}
复制代码
Service层
Service层接口:
package spark.core.framework.common
/**
* 服务层接口
*/
trait Service {
// 业务逻辑方法
def dataAnalysis():Any
}
复制代码
WordCountService类:
package spark.core.framework.service
/**
* WordCount服务层
*/
class WordCountService extends Service{
// 调用wordCountDao,实现数据交互
private val wordCountDao = new WordCountDao()
// 业务逻辑实现
override def dataAnalysis(): Array[(String, Int)] = {
val lines = wordCountDao.readFile("datas/1.txt")
val words: RDD[String] = lines.flatMap(_.split(" "))
val wordToOne = words.map(word=>(word,1))
val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_+_)
val array: Array[(String, Int)] = wordToSum.collect()
array
}
}
复制代码
Controller层
Controller接口:
package spark.core.framework.common
/**
* 调度层接口
*/
trait Controller {
// 调度方法
def dispatch(): Unit
}
复制代码
WordCountController类:
package spark.core.framework.controller
/**
* WordCount调度层
*/
class WordCountController extends Controller {
// 调用wordCountService,执行业务逻辑
private val wordCountService = new WordCountService()
override def dispatch(): Unit = {
// 执行业务,返回结果
val array = wordCountService.dataAnalysis()
// 对结果调度(打印、存储、传给其他方法...)
array.foreach(println)
}
}
复制代码
其他组成部分
Application
应用层接口:
package spark.core.framework.common
/**
* 应用层接口
*/
trait Application {
/**
* @param master master名称
* @param app App名称
* @param op 调度逻辑
*/
def start(master: String = "local[*]", app: String = "Application")
(op: => Unit): Unit = {
// 1. 获取环境变量,放入共享内存中
val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
val sc = new SparkContext(sparkConf)
EnvVarUtil.put(sc)
// 2. 传入Controller层调度逻辑
try {
op //调度代码
} catch {
case ex => println(ex.getMessage)
}
// 3.资源的关闭释放
sc.stop()
EnvVarUtil.remove()
}
}
复制代码
启动类:
package spark.core.framework.application
/**
* 启动类
*/
object WordCountApplication extends App with Application{
/**
* 启动: 传入环境变量的参数、调度代码
*/
start("local[*]", "wordCount"){
// 调用wordCountController,进行任务调度
val controller = new WordCountController()
controller.dispatch()
}
}
复制代码
工具类
package spark.core.framework.util
/**
* 环境变量工具类: 将共享数据放在ThreadLocal,同一线程可以访问
* 这里用于SparkContext的共享使用
*/
object EnvVarUtil {
private val scThreadLocal = new ThreadLocal[SparkContext]()
def put(sc: SparkContext): Unit = {
scThreadLocal.set(sc)
}
def take(): SparkContext = {
scThreadLocal.get()
}
def remove(): Unit = {
scThreadLocal.remove()
}
}
复制代码
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END