clickhouse OLAP 实战之自定义报表&自定义漏斗

1:项目背景

在日常的工作中,业务同学随时需要对比多种纬度的数据,快速的做出决策。(分析页面的漏斗、实时流量、曝光点击/转化) 所以我们需要沉淀一套实时的组件,降低数据使用门槛,提升决策效率,在一定方向上驱动业务增长。

2:Real Time Olap

联机分析处理Olap是一种软件技术,它使分析人员能够迅速、一致、交互地从各个方面观察信息,以达到深入理解数据的目的。它具有FASMI(Fast Analysis of Shared Multidimensional Information),即共享多维信息的快速分析的特征。其中F是快速性(Fast),指系统能在数秒内对用户的多数分析要求做出反应;A是可分析性(Analysis),指用户无需编程就可以定义新的专门计算,将其作为分析的一部 分,并以用户所希望的方式给出报告;M是多维性(Multi—dimensional),指提供对数据分析的多维视图和分析;I是信息性(Information),指能及时获得信息,并且管理大容量信息。
OLAP 分类,详细如下:

OLAP分类 相关介绍 相关技术组件
MOLAP 将分析用的数据物理上存储为多维数组的形式,形成CUBE结构。维度的属性值映射成多维数组的下标或者下标范围,事实以多维数组的值存储在数组单元中,优势是查询快速,缺点是数据量不容易控制,可能会出现维度爆炸的问题。 Druid、Kylin、Doris
ROLAP 以关系模型的方式存储用作多为分析用的数据,优点在于存储体积小,查询方式灵活,然而缺点也显而易见,每次查询都需要对数据进行聚合计算,为了改善短板,ROLAP使用了列存、并行查询、查询优化、位图索引等技术。 Clickhouse、Presto、Impala、Spark SQL、Flink SQL、GreenPlum、Elasticsearch
HOLAP 混合OLAP,是MOLAP和ROLAP的一种融合。当查询聚合性数据的时候,使用MOLAP技术;当查询明细数据时,使用ROLAP技术。在给定使用场景的前提下,以达到查询性能的最优化。 相关的介绍比较少

3:why clickhouse

组件 组件介绍 组件优点 组件缺点
Kylin 完全的预计算引擎,通过枚举所有维度的组合。建立各种Cube,提前聚合。以HBase为基础的OLAP引擎 ,  Hive 或 Kafka提供数据 , 一般做天级,小时级OLAP。  支持数据规模超大(依赖HBase)  易用性强,支持标准SQL  性能很高,查询速度很快。最新版本已剔除hbase,可以关注社区版本。 灵活性较弱,不支持adhoc查询  ,没有二级索引。过滤时性能一般  ,不支持join以及对数据的更新,处理方式复杂,需要定义Cube预计算。  当维度超过20个时,存储可能爆炸式增长;无法查询明细数据 , 维护复杂, 实时性  差,很多时候只能查询前一天或几个小时前的数据。
Impala MPP 引擎 , 集成Kudu一般做分钟级OLAP 。集成Hive一般做天级和小时级OLAP。  基于内存运算,不需要把中间结果写入磁盘,省掉了大量的I/O开销。可以访问hive的metastore,对hive数据直接做数据分析。 采用全内存实现,需要大量的机器做支撑。这块是它的优势,也是它的劣势。
Presto MPP 引擎, 集成Hudi 一般做分钟级OLAP。 集成Hive一般做天级和小时级OLAP。 Presto是一个SQL计算引擎,分离计算层和存储层,其不存储数据,通过Connector SPI实现对各种数据源(Storage)的访问。支持数据源丰富. 同上
SparkSQL/FlinkSQL 计算引擎, 实时性取决于数据源 支持的数据规模大(非存储引擎) 灵活性高 易用性强,支持标准SQL以及多表join和窗口函数 处理方式简单,无需预处理,没有冗余数据 性能较差,当查询复杂度高且数据量大时,可能分钟级别的响应。非存储引擎,没有本地存储,当join时shuffle开销大,性能差。  SparkSql只是计算引擎,需要从外部加载数据,数据的实时性无法保证 ,多表join的时候性能很难秒级响应。
ClickHouse 列存数据库,保存原始明细数据, MergeTree 数据存储本地化 性能高,列存压缩比高,通过索引实现秒级响应 实时性强,支持kafka导入 处理方式简单,无需预处理,保存明细数据。 数据规模一般 灵活性差,不支持任意的adhoc查询,join支持不好 易用性较弱,SQL语法不标准,不支持窗口函数 ,维护成本高。

clickhouse 可以不依赖任何组件,支持分布存储,响应快。对内存的要求没那么高,并且有丰富的查询函数支持(漏斗函数,留存函数)。详细见:clickhouse.tech/docs/zh/sql… 最终选择clickhouse 支持实时报表分析。

4:实时分析应该怎么做

1:全局属性建设

clickhouse 对于多表的join支持不好,建议所有的埋点事件都维护在一个大宽表。如果app的埋点体系设计的比较合理,并且不存在同名属性存在不同类型。那么只需要维护一个全局的属性表,把clickhouse的数据类型与hive埋点的数据类型mapping起来即可。详细如下:

hive 数据类型 ck数据类型
Double Nullable(Float64)
String Nullable(String)
Boolean Nullable(Int8)
Bigint Nullable(Int64)
Bigint Nullable(UInt64)

我们的bi系统对于所有的埋点元数据强一致性管理,历史存在同名属性,不同类型的数据做了历史数据处理。注意修改完数据类型需要执行一次insert overwrite 全表,不然load历史数据到clickhouse会存在数据类型不一致的情况。相关建表语句如下:

CREATE TABLE dbname.distributed_table_name on cluster clustername
(
    distinct_id String,
    event String,
    staticdate Date,
    project String,
    event_date_time Nullable(DateTime)
) ENGINE = Distributed('clustername', 'default', localtablename, rand());
复制代码
2:那些埋点数据写入clickhouse

我们的app历史埋点建设比较混乱,经历了N轮产品。大家对于埋点的理解都不统一,最简单的做法就是新增埋点。我们只能在新的埋点规范大家的体系建设,逐步替换老的埋点体系 。所以并不是所有的埋点都写入clickhouse,而是业务选择对应事件的时候,判断clickhouse是否已同步相关数据。如果没有则通过spark launcher同步历史数据写入clickhouse ,实时数据隔天写入。(系统初始化的时候,已经将常用埋点分析初始化。)如果大家在实际的工作中,需要发起spark任务,建议通过livy。(rest服务即可完成交互,详细介绍点我
image.png

注意clickhouse写入数据的时候需要唯一的key,我们是通过用户id+event+event_time 组成唯一key,而且我们的app不允许多端登录。

 val frame: DataFrame = spark.sql(readSQL).na.fill("主键字段如果为空给默认值")
    frame.write.format("jdbc")
      .mode(SaveMode.Append)
      .option("driver", clickHouseProperties.getProperty("jdbc.driver"))
      .option("url", clickHouseProperties.getProperty("jdbc.url"))
      .option("dbtable", projectMappingMap(eventInfo.getProject).getCkSchema)
      .option("user", clickHouseProperties.getProperty("jdbc.username"))
      .option("password", clickHouseProperties.getProperty("jdbc.password"))
      .option("batchsize", "100000")
      .option("isolationLevel", "NONE")
      .option("numPartitions", partition)
      .save()
复制代码
3:flink实时数据写入

clickhouse 写入数据不支持事务。如果实时任务挂了以后,基本无法保障hive的数据与clickhouse的数据是一致的。我们的任务调度是dolphinscheduler,任务挂了以后加了保活机制。在调度重启之前把当天的历史数据删除,重置kafka消费时间节点,所有数据重新写入。

clickhouse stream 启动的时候先删除数据
clickhouse-client --port 9002 -u xxx --password xxx -h xxx.com --query "alter table tablename on cluster clustername delete where staticdate >= '${1}'"


${2}是动态传入的时间点
kafka-consumer-groups.sh --bootstrap-server xxx:9092 --group ClickHouseSyncStream --reset-offsets --topic event --execute --to-datetime ${2}T15:59:00.000
复制代码

flink 写入clickhouse 通过window 添加 trigger 触发相关sink 操作(注意我们在实际的生产过程中遇到clickhouse 写入失败,建议把 socket_timeout=600000设置大一些)

 eventStream.process(new EventFilterProcess())//过滤
                .connect(broadcast)
                .process(new EventParseProcess(startSyncTime, eventInfoStateDescriptor))
                .keyBy(new EventKeyBy(100))
               .window(TumblingProcessingTimeWindows.of(Time.seconds(10))).trigger(new CustomCountTriggerWithEventTime<>(3000))//触发器
复制代码

3:业务系统如何查询

        业务系统在设计的时候一定要让业务同学可以操作,并且支持过滤相关数据。日常的实时分析可以通过bi建立好相关场景,把相关漏斗、自定义报表分享给业务即可。clickhouse数据查询响应存在一定的时间差,所以建议前端请求通过轮训。业务侧在处理请求的时候把相关配置改成异步。springboot的 Async 配置,即可完成相关异步操作。系统只需要引入clickhouse的windowFunnel即可完成漏斗计算,自定义报表可以通过clickhouse的groupArray完成分组数据的展示。

@EnableAsync
public class ClickhouseServiceApplication {

}
复制代码

clickhouse 的jdbc连接直接通JdbcTemplate 引入 clickhouseJdbcTemplate,详细如下:

@Configuration
public class ClickhouseDataSourceConfig implements BeanFactoryPostProcessor, EnvironmentAware {
    private ConfigurableEnvironment environment;
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
        ClickHouseProperties clickHouseProperties=resolverSetting();
        createDataSourceBean(configurableListableBeanFactory, clickHouseProperties);
    }

    public void createDataSourceBean(ConfigurableListableBeanFactory beanFactory,
                                     ClickHouseProperties sqlProperties) {

        DataSource baseDataSource = clickHouseDruidDatasource(sqlProperties);
        register(beanFactory, new JdbcTemplate(baseDataSource), "clickhouseJdbcTemplate",
                "clickhouse");
    }

    private void register(ConfigurableListableBeanFactory beanFactory, Object bean, String name,
                          String alias) {
        beanFactory.registerSingleton(name, bean);
        if (!beanFactory.containsSingleton(alias)) {
            beanFactory.registerAlias(name, alias);
        }
    }

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = (ConfigurableEnvironment) environment;
    }

    private ClickHouseProperties resolverSetting() {

        Iterable<ConfigurationPropertySource> sources = ConfigurationPropertySources.get(environment);
        Binder binder = new Binder(sources);
        BindResult<ClickHouseProperties> bindResult = binder.bind("clickhouse", ClickHouseProperties.class);
        ClickHouseProperties clickHouseProperties= bindResult.get();
        return  clickHouseProperties;
    }

    public DruidDataSource clickHouseDruidDatasource(ClickHouseProperties clickHouseProperties) {
        DruidDataSource datasource = new DruidDataSource();
        datasource.setUrl(clickHouseProperties.getUrl());
        datasource.setUsername(clickHouseProperties.getUsername());
        datasource.setPassword(clickHouseProperties.getPassword());
        datasource.setDriverClassName(clickHouseProperties.getDriverClassName());
        // pool configuration
        datasource.setInitialSize(clickHouseProperties.getInitialSize());
        datasource.setMinIdle(clickHouseProperties.getMinIdle());
        datasource.setMaxActive(clickHouseProperties.getMaxActive());

        datasource.setMaxWait(clickHouseProperties.getMaxWait());
        datasource.setTimeBetweenEvictionRunsMillis(clickHouseProperties.getTimeBetweenEvictionRunsMillis());
        datasource.setMinEvictableIdleTimeMillis(clickHouseProperties.getMinEvictableIdleTimeMillis());
        datasource.setValidationQuery(clickHouseProperties.getValidationQuery());
        return datasource;
    }
}
复制代码

4:总结

clickhouse 写入性能极高,并且占用的内存以及cpu相对比较合理。如果公司业务在百万级别,并且机器资源并不是很宽松,建议使用clickhouse,即可完成相关业务的支撑。当然像impala+kudu很多大型互联公司使用的比较多,在我看来适合自己的就是最好的。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享