大数据人群画像技术方案

1:项目背景

当一个app达到一定的体量,千人千面、个性化营销是每一个app提升留存、付费必备法宝。最终始终离不开营销利器,用户画像。项目从0到1构建画像体系,由T+1升级为实时。过程中不断的优化画像方案,赋能产品、业务。在个性化营销的路上越来越顺滑。

2:整体的技术方案

1:实时画像通过flink stream job

画像流程图.jpg
        1: app上报log日志到springboot,springboot 处理相关逻辑到kafka 。

        2: db binlog 通过maxwell 发送到kafka。

        3:所有的日志都到kafka, 画像、推荐相关的服务不需要消费所有的event。所以需要做一次二次分流降低处理所有消息的压力。我们根据具体的业务场景,把对应需要消费的数据对应的topic配置在db中。flink通过双流join (broadcast) 处理配置信息。

        4:项目中把埋点、binlog 分为两个实时流,两个流不操作任何存储。把需要同步的结果发送到kafka. 启动专门的画像同步流到hbase 或es。

       5:补充说明一下,我们的画像体系要满足多个app的使用。所以一定需要一个id_mapping 服务,人与设备的关系,设备与人的关系(建议保存最近的两条)。推荐、画像相关的服务需要用到mapping关系。另外appstore也禁用idfa,用户重装、以后设备信息就会发生变化。

FlinkKafkaConsumer<Event> eventConsumer = new FlinkKafkaConsumer<>(Topics.TEST, new EventSchema(), kafkaConsumerProperties);
DataStreamSource<Event> eventStream = env.addSource(eventConsumer);

//创建规则
MapStateDescriptor<String, HashMap<String, HashSet<String>>> topicRulesBroadcastState = new MapStateDescriptor<>(
        "topicRulesBroadcastState",
        BasicTypeInfo.STRING_TYPE_INFO,
        TypeInformation.of(new TypeHint<HashMap<String, HashSet<String>>>() {
        }));

//EventRedistributeSource extends RichSourceFunction run method ,sleep 1-5分钟同步一次规则
DataStreamSource<List<TopicRedistribute>> topicInfo = env.addSource(new EventRedistributeSource());

BroadcastStream<List<TopicRedistribute>> broadcast = topicInfo.broadcast(topicRulesBroadcastState);
KeyedStream<EventResult, String> eventResultStringKeyedStream = eventStream.connect(broadcast).process(new EventRedistributeProcess(topicRulesBroadcastState)).
                keyBy(new KeySelector<EventResult, String>() {
                    @Override
                    public String getKey(EventResult eventResult) throws Exception {
                        return eventResult.getTopic();
                    }
                });
复制代码
2:离线画像通过spark sql

Snip20210704_11.png
        1:所有的业务数据收集通过spark streaming消费kafka存储到 hdfs.

        2: 凌晨1点通过dolphinscheduler 调度spark sql 跑所有分层数据。如果app整体的埋点体系构建比较完善,历史遗留无效埋点比较少,可以设置所有埋点都可以上报。如果埋点建设有一定的混乱,建议部分埋点开启。很多无效埋点会占数据块,如果小文件管理不善,可能会导致整个集群宕机的情况。

        3:dwd层的数据建议把不同APP,同一个行为的数据按照project,staticdate 时间分区。这样用户同一个行为数据集中在一起,方便后续的etl .

        4: dm层的数据建议设置优先级,因为同一层的数据可能存在相互依赖的情况。一定要把数据来源, sink 目标表,ddl,exec_sqls(建议 JsonArray存储),app项目,分区,exec_level(建议越小约优先执行),状态,创建人,时间,修改人,时间(方便以后问题追踪),还可以扩展到kylin,clickhouse 等olap场景是否使用。

        5:ads 层的数据基本都是聚合数据,目标数据更新到hbase ,elasticsearch 建议不要通过外表的方式更新。hbase的更新建议通过bulkload更新到hbase, elasticsearch的更新建议到通过spark rdd 通过发送kafka消息更新到画像体系。

        6: etl的过程一定要记录一下执行时间,以及失败以后的重试。因为每一层的数据都是层层依赖,不然执行失败以后业务的同学发现数据有问题了。数据的同学再去补充数据,是很尴尬的做法。

for (i <- execSQLList.indices) {
        var calculateStatus = 0
        val sql: String = HiveUtils.hiveStaticdateReplace(execSQLList(i), jobParams.staticdate)
        val start: Long = System.currentTimeMillis()
        try {
          spark.sql(sql)
          calculateStatus = 1
        } catch {
          case e: Exception =>
            calculateStatus = 2
            throw e
        } finally {
          val end: Long = System.currentTimeMillis()
          updateDwLoadInfo(job.getId, i + 1, start, end, end - start, calculateStatus,job.getCreator)
        }
 }
复制代码
3:画像结果存储问题 hbase& elasticsearch

        1:当时做人群画像的时候,没有人群画像的经验。在网上找一些通用化的解决方案,一方面降低技术风险,另外一方面没有独立的服务给业务同学(业务系统有几千的qps),当时不敢大刀阔斧的干。

       2:存储在hbase中主要方便实时画像,通过用户id或设备id判断用户属性是否满足规则。

      3:  存储在elasticsearch中,一方面通过 elasticsearch-sql 把所有的规则拼装成sql,直接查询画像结果的预估数据,另外一方面我们的画像第一版本是离线的,创建画像结果以后我们通过spark任务,把符合目标的数据存储在redis中。做到与业务系统的解耦,起码不能因为服务问题导致业务系统出现问题。

      4:业务人员根据标签选择对应的规则以后,我们通过规则动态拼装成sql 。sql再解析成dsl,最终的结果通过spark job 存储在redis 中。注意elasticsearch 支持elaticsearch-sql 需要安装对应的plugin. 相关细节可以见:github.com/NLPchina/el…

 <dependency>
       <groupId>org.nlpcn</groupId>
       <artifactId>elasticsearch-sql</artifactId>
       <version>7.4.2.0</version>
 </dependency>

/**
     * SQL转换DSL
     *
     * @param sql
     * @return
     * @throws Exception
     */
    private String sqlToEsQuery(String sql) throws Exception {
        try {
            Settings settings = Settings.builder().build();
            ThreadPool threadPool = new ThreadPool(settings);
            Client client = new NodeClient(settings, threadPool);
            SearchDao searchDao = new SearchDao(client);
            return searchDao.explain(sql).explain().explain();
        } catch (Exception ex) {
            logger.error("error code  xxx" String.format("error in xxx sqlToEsQuery method: %s ", sql) + ex);
            throw new Exception(xxx, ex);
        }
    }
复制代码
4:业务系统如何提交spark 任务

1:业务系统封装相关提交参数,http请求通过SparkLauncher 提交spark 任务到yarn 上执行。具体可以在github 搜索一下 SparkLauncher。

 SparkLauncher launcher = new SparkLauncher()
                .setSparkHome("/opt/cloudera/parcels/CDH-6.1.2/lib/spark")
                .setAppResource(sparkAppPara.getJarPath())
                .setMainClass(sparkAppPara.getMainClass())
                .setMaster(sparkAppPara.getMaster())

                .setDeployMode(sparkAppPara.getDeployMode())
                .setConf("spark.driver.memory", sparkAppPara.getDriverMemory() + "g")
                .setConf("spark.executor.memory", sparkAppPara.getExecutorMemory() + "g")
                .setConf("spark.executor.instances", sparkAppPara.getExecutorInstances())
                .setConf("spark.executor.cores", sparkAppPara.getExecutorCores())
                .setConf("spark.yarn.queue", "root.default");
复制代码

2:后续通过团队的同学研究,可以通过livy rest 服务提交spark 任务。我们的kylin cube 构建都是通过livy 提交,整体的性能有较大的提升。b站上有很丰富的livy 介绍,大家可以自行学习。

5:实时的画像如何毫秒级别响应

1:我们的大数据部署基于cdh,datanode与hbase混布。如果yarn任务执行起来cpu或内存使用较多,hbase的响应比较慢。基于成本考虑,我们的另外一套hbase没有重新部署一套。而是将数据存储在aliyun hbase 。整体性能比较稳定rt 20ms-50ms左右。

2:hbase 存储问题搞定,hbase查询也是一个问题,到底是基于rowkey 多 column 批量查询,还是基于rowkey 直接把同一个列簇对应的所有数据查出。我们在压测的过程中各有利弊,在这里就不做发散。

基于同一个rowkey,批量查询row对应的不同 column

/**
     * 多列查询数据返回column名以及对应数据
     *
     * @param tableName
     * @return
     * @throws IOException
     */
    public Map<String, String> batchQueryTableByColumnForMap(String tableName, String rowKey, String family, List<String> column) throws IOException {
        Table table = null;
        Map<String, String> value = Maps.newHashMap();
        if (StringUtils.isBlank(tableName) || CollectionUtils.isEmpty(column)) {
            return value;
        }

        try {
            table = connection.getTable(TableName.valueOf(tableName));
            List<Get> getList = new ArrayList();
            for (String c : column) {
                Get get = new Get(Bytes.toBytes(rowKey));
                get.addColumn(Bytes.toBytes(family), Bytes.toBytes(c));
                getList.add(get);
            }
            Result[] results = table.get(getList);//重点在这,直接查getList<Get>
            for (Result result : results) {//对返回的结果集进行操作
                for (Cell cell : result.rawCells()) {
                    value.put(Bytes.toString(Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())), Bytes.toString(CellUtil.cloneValue(cell)));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                assert table != null;
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return value;
    }
复制代码

    根据rowkey查询出一个列簇对应的所有字段

/**
     * 获取值
     * expression : get 'tableName','rowkey','family:column'
     *
     * @param rowKey 行id
     * @param family 列族名
     * @return string
     */
    public  Map<String, String> getRowKeyValueMap(String tableName, String rowKey, String family) {

        Table table = null;
        Map<String, String> value = Maps.newHashMap();


        if (StringUtils.isBlank(tableName) || StringUtils.isBlank(family) || StringUtils.isBlank(rowKey)) {
            return value;
        }

        try {
            table = connection.getTable(TableName.valueOf(tableName));
            Get g = new Get(rowKey.getBytes());
            Result result = table.get(g);
            List<Cell> ceList = result.listCells();
            if (ceList != null && ceList.size() > 0) {
                for (Cell cell : ceList) {
                    value.put(Bytes.toString(Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                assert table != null;
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return value;
    }
复制代码
6:如何根据数据动态判断画像结果

        1:这里要非常感谢一下阿里的qlexpress,ql判断支持丰富,大部分语法跟sql类似。我们只需要业务的同学创建规则的时候,把相关规则转为ql语法,动态的传入相关画像参数。废话不多说,上代码。(目前来看运行比较稳定)

 public static void main(String[] args) throws Exception {
        ExpressRunner runner = new ExpressRunner();
        DefaultContext<String, Object> context = new DefaultContext<String, Object>();
        context.put("vip_endtime",1623513700000L);
        context.put("vip_accumulat_count","11");
        context.put("last_buy_member_type",2);
        context.put("keyword","你视频打啊啊");
        context.put("from_app",12);
        String express = " (   ( vip_accumulat_count != null  and  vip_accumulat_count> 0 ) and  (  ( (  (vip_endtime !=null && vip_endtime >= 1623513600000 and vip_endtime < 1624204800000 )  ) )" +
                " and (last_buy_member_type!=null && last_buy_member_type != 1))) and  from_app ==12  and keyword  like '%你好%' ";
        Object r = runner.execute(express, context, null, true, true);
        System.out.println(r);

    }
复制代码

3:画像的一些理解

人群画像不是万金油,如果想要做好用户留存,付费。还是需要回归到用户的核心诉求中。根据相关用户行为分析把核心功能做好,吃透。相信用户会为你提供的服务买单。欢迎大家一起交流学习。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享