这是我参与8月更文挑战的第4天,活动详情查看:8月更文挑战
分层介绍
需求分析及实现思路
在之前介绍实时数仓概念时讨论过,建设实时数仓的目的,主要是增加数据计算的复用性。每次新增加统计需求时,不至于从原始数据进行计算,而是从半成品继续加工而成。
采集到 kafka 直接作为 ODS
层。
从 kafka 的 ODS
层读取用户行为日志以及业务数据,并进行简单处理,写回到 kafka 作为 DWD
层。
从 DWD
的 DWS
可能有重复计算,所以抽取出来 DWM
层。
DWS
轻度聚合,应对很多实时查询。
ADS
简单集合,提供对外接口。
每层具体职责
分层 | 数据描述 | 生成计算工具 | 存储媒介 |
---|---|---|---|
ODS | 原始数据,日志和业务数据。 | 日志服务器,maxwell | kafka |
DWD | 根据数据对象为单位进行分流,比如订单、页面访问等等。 | FLINK | kafka |
DIM | 维度数据 | FLINK | HBase |
DWM | 对于部分数据对象进行进一步加工,比如独立访问、跳出行为。依旧是明细数据。 | FLINK | kafka |
DWS | 根据某个维度主题将多个事实数据轻度聚合,形成主题宽表。 | FLINK | Clickhouse |
ADS | 把Clickhouse中的数据根据可视化需要进行筛选聚合。 | Clickhouse SQL | 可视化展示 |
计算项目准备
-
新建项目
tmall-realtime
-
新建包
包名 用途 app flink 所有计算程序 app.dwd flink dwd层 计算程序 app.dwm flink dwm层 计算程序 app.dws flink dws层 计算程序 app.func 计算函数 bean 实体类 common 公共常量 enums 枚举类 utils 工具类 -
pom 添加依赖
<!-- 配置版本 --> <properties> <java.version>1.8</java.version> <maven.compiler.source>${java.version}</maven.compiler.source> <maven.compiler.target>${java.version}</maven.compiler.target> <flink.version>1.12.0</flink.version> <scala.version>2.12</scala.version> <hadoop.version>3.1.3</hadoop.version> </properties> <!-- 依赖 --> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.68</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-to-slf4j</artifactId> <version>2.14.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.20</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> </dependency> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>commons-beanutils</groupId> <artifactId>commons-beanutils</artifactId> <version>1.9.3</version> </dependency> <dependency> <groupId>org.apache.phoenix</groupId> <artifactId>phoenix-spark</artifactId> <version>5.0.0-HBase-2.0</version> <exclusions> <exclusion> <groupId>org.glassfish</groupId> <artifactId>javax.el</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.3.0</version> </dependency> <dependency> <groupId>ru.yandex.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.3.0</version> <exclusions> <exclusion> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </exclusion> <exclusion> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies> <!-- 打包插件 --> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.0.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> 复制代码
-
配置文件
log4j.rootLogger=info,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
复制代码
下期预告:ODS层 和 DWD层 的具体实现
关注专栏持续更新 ????????????????
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END