1:项目背景
在日常的工作中,业务同学随时需要对比多种纬度的数据,快速的做出决策。(分析页面的漏斗、实时流量、曝光点击/转化) 所以我们需要沉淀一套实时的组件,降低数据使用门槛,提升决策效率,在一定方向上驱动业务增长。
2:Real Time Olap
联机分析处理Olap是一种软件技术,它使分析人员能够迅速、一致、交互地从各个方面观察信息,以达到深入理解数据的目的。它具有FASMI(Fast Analysis of Shared Multidimensional Information),即共享多维信息的快速分析的特征。其中F是快速性(Fast),指系统能在数秒内对用户的多数分析要求做出反应;A是可分析性(Analysis),指用户无需编程就可以定义新的专门计算,将其作为分析的一部 分,并以用户所希望的方式给出报告;M是多维性(Multi—dimensional),指提供对数据分析的多维视图和分析;I是信息性(Information),指能及时获得信息,并且管理大容量信息。
OLAP 分类,详细如下:
OLAP分类 | 相关介绍 | 相关技术组件 |
---|---|---|
MOLAP | 将分析用的数据物理上存储为多维数组的形式,形成CUBE结构。维度的属性值映射成多维数组的下标或者下标范围,事实以多维数组的值存储在数组单元中,优势是查询快速,缺点是数据量不容易控制,可能会出现维度爆炸的问题。 | Druid、Kylin、Doris |
ROLAP | 以关系模型的方式存储用作多为分析用的数据,优点在于存储体积小,查询方式灵活,然而缺点也显而易见,每次查询都需要对数据进行聚合计算,为了改善短板,ROLAP使用了列存、并行查询、查询优化、位图索引等技术。 | Clickhouse、Presto、Impala、Spark SQL、Flink SQL、GreenPlum、Elasticsearch |
HOLAP | 混合OLAP,是MOLAP和ROLAP的一种融合。当查询聚合性数据的时候,使用MOLAP技术;当查询明细数据时,使用ROLAP技术。在给定使用场景的前提下,以达到查询性能的最优化。 | 相关的介绍比较少 |
3:why clickhouse
组件 | 组件介绍 | 组件优点 | 组件缺点 |
---|---|---|---|
Kylin | 完全的预计算引擎,通过枚举所有维度的组合。建立各种Cube,提前聚合。以HBase为基础的OLAP引擎 , Hive 或 Kafka提供数据 , 一般做天级,小时级OLAP。 | 支持数据规模超大(依赖HBase) 易用性强,支持标准SQL 性能很高,查询速度很快。最新版本已剔除hbase,可以关注社区版本。 | 灵活性较弱,不支持adhoc查询 ,没有二级索引。过滤时性能一般 ,不支持join以及对数据的更新,处理方式复杂,需要定义Cube预计算。 当维度超过20个时,存储可能爆炸式增长;无法查询明细数据 , 维护复杂, 实时性 差,很多时候只能查询前一天或几个小时前的数据。 |
Impala | MPP 引擎 , 集成Kudu一般做分钟级OLAP 。集成Hive一般做天级和小时级OLAP。 | 基于内存运算,不需要把中间结果写入磁盘,省掉了大量的I/O开销。可以访问hive的metastore,对hive数据直接做数据分析。 | 采用全内存实现,需要大量的机器做支撑。这块是它的优势,也是它的劣势。 |
Presto | MPP 引擎, 集成Hudi 一般做分钟级OLAP。 集成Hive一般做天级和小时级OLAP。 | Presto是一个SQL计算引擎,分离计算层和存储层,其不存储数据,通过Connector SPI实现对各种数据源(Storage)的访问。支持数据源丰富. | 同上 |
SparkSQL/FlinkSQL | 计算引擎, 实时性取决于数据源 | 支持的数据规模大(非存储引擎) 灵活性高 易用性强,支持标准SQL以及多表join和窗口函数 处理方式简单,无需预处理,没有冗余数据 | 性能较差,当查询复杂度高且数据量大时,可能分钟级别的响应。非存储引擎,没有本地存储,当join时shuffle开销大,性能差。 SparkSql只是计算引擎,需要从外部加载数据,数据的实时性无法保证 ,多表join的时候性能很难秒级响应。 |
ClickHouse | 列存数据库,保存原始明细数据, MergeTree 数据存储本地化 | 性能高,列存压缩比高,通过索引实现秒级响应 实时性强,支持kafka导入 处理方式简单,无需预处理,保存明细数据。 | 数据规模一般 灵活性差,不支持任意的adhoc查询,join支持不好 易用性较弱,SQL语法不标准,不支持窗口函数 ,维护成本高。 |
clickhouse 可以不依赖任何组件,支持分布存储,响应快。对内存的要求没那么高,并且有丰富的查询函数支持(漏斗函数,留存函数)。详细见:clickhouse.tech/docs/zh/sql… 最终选择clickhouse 支持实时报表分析。
4:实时分析应该怎么做
1:全局属性建设
clickhouse 对于多表的join支持不好,建议所有的埋点事件都维护在一个大宽表。如果app的埋点体系设计的比较合理,并且不存在同名属性存在不同类型。那么只需要维护一个全局的属性表,把clickhouse的数据类型与hive埋点的数据类型mapping起来即可。详细如下:
hive 数据类型 | ck数据类型 |
---|---|
Double | Nullable(Float64) |
String | Nullable(String) |
Boolean | Nullable(Int8) |
Bigint | Nullable(Int64) |
Bigint | Nullable(UInt64) |
我们的bi系统对于所有的埋点元数据强一致性管理,历史存在同名属性,不同类型的数据做了历史数据处理。注意修改完数据类型需要执行一次insert overwrite 全表,不然load历史数据到clickhouse会存在数据类型不一致的情况。相关建表语句如下:
CREATE TABLE dbname.distributed_table_name on cluster clustername
(
distinct_id String,
event String,
staticdate Date,
project String,
event_date_time Nullable(DateTime)
) ENGINE = Distributed('clustername', 'default', localtablename, rand());
复制代码
2:那些埋点数据写入clickhouse
我们的app历史埋点建设比较混乱,经历了N轮产品。大家对于埋点的理解都不统一,最简单的做法就是新增埋点。我们只能在新的埋点规范大家的体系建设,逐步替换老的埋点体系 。所以并不是所有的埋点都写入clickhouse,而是业务选择对应事件的时候,判断clickhouse是否已同步相关数据。如果没有则通过spark launcher同步历史数据写入clickhouse ,实时数据隔天写入。(系统初始化的时候,已经将常用埋点分析初始化。)如果大家在实际的工作中,需要发起spark任务,建议通过livy。(rest服务即可完成交互,详细介绍点我)
注意clickhouse写入数据的时候需要唯一的key,我们是通过用户id+event+event_time 组成唯一key,而且我们的app不允许多端登录。
val frame: DataFrame = spark.sql(readSQL).na.fill("主键字段如果为空给默认值")
frame.write.format("jdbc")
.mode(SaveMode.Append)
.option("driver", clickHouseProperties.getProperty("jdbc.driver"))
.option("url", clickHouseProperties.getProperty("jdbc.url"))
.option("dbtable", projectMappingMap(eventInfo.getProject).getCkSchema)
.option("user", clickHouseProperties.getProperty("jdbc.username"))
.option("password", clickHouseProperties.getProperty("jdbc.password"))
.option("batchsize", "100000")
.option("isolationLevel", "NONE")
.option("numPartitions", partition)
.save()
复制代码
3:flink实时数据写入
clickhouse 写入数据不支持事务。如果实时任务挂了以后,基本无法保障hive的数据与clickhouse的数据是一致的。我们的任务调度是dolphinscheduler,任务挂了以后加了保活机制。在调度重启之前把当天的历史数据删除,重置kafka消费时间节点,所有数据重新写入。
clickhouse stream 启动的时候先删除数据
clickhouse-client --port 9002 -u xxx --password xxx -h xxx.com --query "alter table tablename on cluster clustername delete where staticdate >= '${1}'"
${2}是动态传入的时间点
kafka-consumer-groups.sh --bootstrap-server xxx:9092 --group ClickHouseSyncStream --reset-offsets --topic event --execute --to-datetime ${2}T15:59:00.000
复制代码
flink 写入clickhouse 通过window 添加 trigger 触发相关sink 操作(注意我们在实际的生产过程中遇到clickhouse 写入失败,建议把 socket_timeout=600000设置大一些)
eventStream.process(new EventFilterProcess())//过滤
.connect(broadcast)
.process(new EventParseProcess(startSyncTime, eventInfoStateDescriptor))
.keyBy(new EventKeyBy(100))
.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).trigger(new CustomCountTriggerWithEventTime<>(3000))//触发器
复制代码
3:业务系统如何查询
业务系统在设计的时候一定要让业务同学可以操作,并且支持过滤相关数据。日常的实时分析可以通过bi建立好相关场景,把相关漏斗、自定义报表分享给业务即可。clickhouse数据查询响应存在一定的时间差,所以建议前端请求通过轮训。业务侧在处理请求的时候把相关配置改成异步。springboot的 Async 配置,即可完成相关异步操作。系统只需要引入clickhouse的windowFunnel即可完成漏斗计算,自定义报表可以通过clickhouse的groupArray完成分组数据的展示。
@EnableAsync
public class ClickhouseServiceApplication {
}
复制代码
clickhouse 的jdbc连接直接通JdbcTemplate 引入 clickhouseJdbcTemplate,详细如下:
@Configuration
public class ClickhouseDataSourceConfig implements BeanFactoryPostProcessor, EnvironmentAware {
private ConfigurableEnvironment environment;
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
ClickHouseProperties clickHouseProperties=resolverSetting();
createDataSourceBean(configurableListableBeanFactory, clickHouseProperties);
}
public void createDataSourceBean(ConfigurableListableBeanFactory beanFactory,
ClickHouseProperties sqlProperties) {
DataSource baseDataSource = clickHouseDruidDatasource(sqlProperties);
register(beanFactory, new JdbcTemplate(baseDataSource), "clickhouseJdbcTemplate",
"clickhouse");
}
private void register(ConfigurableListableBeanFactory beanFactory, Object bean, String name,
String alias) {
beanFactory.registerSingleton(name, bean);
if (!beanFactory.containsSingleton(alias)) {
beanFactory.registerAlias(name, alias);
}
}
@Override
public void setEnvironment(Environment environment) {
this.environment = (ConfigurableEnvironment) environment;
}
private ClickHouseProperties resolverSetting() {
Iterable<ConfigurationPropertySource> sources = ConfigurationPropertySources.get(environment);
Binder binder = new Binder(sources);
BindResult<ClickHouseProperties> bindResult = binder.bind("clickhouse", ClickHouseProperties.class);
ClickHouseProperties clickHouseProperties= bindResult.get();
return clickHouseProperties;
}
public DruidDataSource clickHouseDruidDatasource(ClickHouseProperties clickHouseProperties) {
DruidDataSource datasource = new DruidDataSource();
datasource.setUrl(clickHouseProperties.getUrl());
datasource.setUsername(clickHouseProperties.getUsername());
datasource.setPassword(clickHouseProperties.getPassword());
datasource.setDriverClassName(clickHouseProperties.getDriverClassName());
// pool configuration
datasource.setInitialSize(clickHouseProperties.getInitialSize());
datasource.setMinIdle(clickHouseProperties.getMinIdle());
datasource.setMaxActive(clickHouseProperties.getMaxActive());
datasource.setMaxWait(clickHouseProperties.getMaxWait());
datasource.setTimeBetweenEvictionRunsMillis(clickHouseProperties.getTimeBetweenEvictionRunsMillis());
datasource.setMinEvictableIdleTimeMillis(clickHouseProperties.getMinEvictableIdleTimeMillis());
datasource.setValidationQuery(clickHouseProperties.getValidationQuery());
return datasource;
}
}
复制代码
4:总结
clickhouse 写入性能极高,并且占用的内存以及cpu相对比较合理。如果公司业务在百万级别,并且机器资源并不是很宽松,建议使用clickhouse,即可完成相关业务的支撑。当然像impala+kudu很多大型互联公司使用的比较多,在我看来适合自己的就是最好的。