1:项目背景
当一个app达到一定的体量,千人千面、个性化营销是每一个app提升留存、付费必备法宝。最终始终离不开营销利器,用户画像。项目从0到1构建画像体系,由T+1升级为实时。过程中不断的优化画像方案,赋能产品、业务。在个性化营销的路上越来越顺滑。
2:整体的技术方案
1:实时画像通过flink stream job
1: app上报log日志到springboot,springboot 处理相关逻辑到kafka 。
2: db binlog 通过maxwell 发送到kafka。
3:所有的日志都到kafka, 画像、推荐相关的服务不需要消费所有的event。所以需要做一次二次分流降低处理所有消息的压力。我们根据具体的业务场景,把对应需要消费的数据对应的topic配置在db中。flink通过双流join (broadcast) 处理配置信息。
4:项目中把埋点、binlog 分为两个实时流,两个流不操作任何存储。把需要同步的结果发送到kafka. 启动专门的画像同步流到hbase 或es。
5:补充说明一下,我们的画像体系要满足多个app的使用。所以一定需要一个id_mapping 服务,人与设备的关系,设备与人的关系(建议保存最近的两条)。推荐、画像相关的服务需要用到mapping关系。另外appstore也禁用idfa,用户重装、以后设备信息就会发生变化。
FlinkKafkaConsumer<Event> eventConsumer = new FlinkKafkaConsumer<>(Topics.TEST, new EventSchema(), kafkaConsumerProperties);
DataStreamSource<Event> eventStream = env.addSource(eventConsumer);
//创建规则
MapStateDescriptor<String, HashMap<String, HashSet<String>>> topicRulesBroadcastState = new MapStateDescriptor<>(
"topicRulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<HashMap<String, HashSet<String>>>() {
}));
//EventRedistributeSource extends RichSourceFunction run method ,sleep 1-5分钟同步一次规则
DataStreamSource<List<TopicRedistribute>> topicInfo = env.addSource(new EventRedistributeSource());
BroadcastStream<List<TopicRedistribute>> broadcast = topicInfo.broadcast(topicRulesBroadcastState);
KeyedStream<EventResult, String> eventResultStringKeyedStream = eventStream.connect(broadcast).process(new EventRedistributeProcess(topicRulesBroadcastState)).
keyBy(new KeySelector<EventResult, String>() {
@Override
public String getKey(EventResult eventResult) throws Exception {
return eventResult.getTopic();
}
});
复制代码
2:离线画像通过spark sql
1:所有的业务数据收集通过spark streaming消费kafka存储到 hdfs.
2: 凌晨1点通过dolphinscheduler 调度spark sql 跑所有分层数据。如果app整体的埋点体系构建比较完善,历史遗留无效埋点比较少,可以设置所有埋点都可以上报。如果埋点建设有一定的混乱,建议部分埋点开启。很多无效埋点会占数据块,如果小文件管理不善,可能会导致整个集群宕机的情况。
3:dwd层的数据建议把不同APP,同一个行为的数据按照project,staticdate 时间分区。这样用户同一个行为数据集中在一起,方便后续的etl .
4: dm层的数据建议设置优先级,因为同一层的数据可能存在相互依赖的情况。一定要把数据来源, sink 目标表,ddl,exec_sqls(建议 JsonArray存储),app项目,分区,exec_level(建议越小约优先执行),状态,创建人,时间,修改人,时间(方便以后问题追踪),还可以扩展到kylin,clickhouse 等olap场景是否使用。
5:ads 层的数据基本都是聚合数据,目标数据更新到hbase ,elasticsearch 建议不要通过外表的方式更新。hbase的更新建议通过bulkload更新到hbase, elasticsearch的更新建议到通过spark rdd 通过发送kafka消息更新到画像体系。
6: etl的过程一定要记录一下执行时间,以及失败以后的重试。因为每一层的数据都是层层依赖,不然执行失败以后业务的同学发现数据有问题了。数据的同学再去补充数据,是很尴尬的做法。
for (i <- execSQLList.indices) {
var calculateStatus = 0
val sql: String = HiveUtils.hiveStaticdateReplace(execSQLList(i), jobParams.staticdate)
val start: Long = System.currentTimeMillis()
try {
spark.sql(sql)
calculateStatus = 1
} catch {
case e: Exception =>
calculateStatus = 2
throw e
} finally {
val end: Long = System.currentTimeMillis()
updateDwLoadInfo(job.getId, i + 1, start, end, end - start, calculateStatus,job.getCreator)
}
}
复制代码
3:画像结果存储问题 hbase& elasticsearch
1:当时做人群画像的时候,没有人群画像的经验。在网上找一些通用化的解决方案,一方面降低技术风险,另外一方面没有独立的服务给业务同学(业务系统有几千的qps),当时不敢大刀阔斧的干。
2:存储在hbase中主要方便实时画像,通过用户id或设备id判断用户属性是否满足规则。
3: 存储在elasticsearch中,一方面通过 elasticsearch-sql 把所有的规则拼装成sql,直接查询画像结果的预估数据,另外一方面我们的画像第一版本是离线的,创建画像结果以后我们通过spark任务,把符合目标的数据存储在redis中。做到与业务系统的解耦,起码不能因为服务问题导致业务系统出现问题。
4:业务人员根据标签选择对应的规则以后,我们通过规则动态拼装成sql 。sql再解析成dsl,最终的结果通过spark job 存储在redis 中。注意elasticsearch 支持elaticsearch-sql 需要安装对应的plugin. 相关细节可以见:github.com/NLPchina/el…
<dependency>
<groupId>org.nlpcn</groupId>
<artifactId>elasticsearch-sql</artifactId>
<version>7.4.2.0</version>
</dependency>
/**
* SQL转换DSL
*
* @param sql
* @return
* @throws Exception
*/
private String sqlToEsQuery(String sql) throws Exception {
try {
Settings settings = Settings.builder().build();
ThreadPool threadPool = new ThreadPool(settings);
Client client = new NodeClient(settings, threadPool);
SearchDao searchDao = new SearchDao(client);
return searchDao.explain(sql).explain().explain();
} catch (Exception ex) {
logger.error("error code xxx" String.format("error in xxx sqlToEsQuery method: %s ", sql) + ex);
throw new Exception(xxx, ex);
}
}
复制代码
4:业务系统如何提交spark 任务
1:业务系统封装相关提交参数,http请求通过SparkLauncher 提交spark 任务到yarn 上执行。具体可以在github 搜索一下 SparkLauncher。
SparkLauncher launcher = new SparkLauncher()
.setSparkHome("/opt/cloudera/parcels/CDH-6.1.2/lib/spark")
.setAppResource(sparkAppPara.getJarPath())
.setMainClass(sparkAppPara.getMainClass())
.setMaster(sparkAppPara.getMaster())
.setDeployMode(sparkAppPara.getDeployMode())
.setConf("spark.driver.memory", sparkAppPara.getDriverMemory() + "g")
.setConf("spark.executor.memory", sparkAppPara.getExecutorMemory() + "g")
.setConf("spark.executor.instances", sparkAppPara.getExecutorInstances())
.setConf("spark.executor.cores", sparkAppPara.getExecutorCores())
.setConf("spark.yarn.queue", "root.default");
复制代码
2:后续通过团队的同学研究,可以通过livy rest 服务提交spark 任务。我们的kylin cube 构建都是通过livy 提交,整体的性能有较大的提升。b站上有很丰富的livy 介绍,大家可以自行学习。
5:实时的画像如何毫秒级别响应
1:我们的大数据部署基于cdh,datanode与hbase混布。如果yarn任务执行起来cpu或内存使用较多,hbase的响应比较慢。基于成本考虑,我们的另外一套hbase没有重新部署一套。而是将数据存储在aliyun hbase 。整体性能比较稳定rt 20ms-50ms左右。
2:hbase 存储问题搞定,hbase查询也是一个问题,到底是基于rowkey 多 column 批量查询,还是基于rowkey 直接把同一个列簇对应的所有数据查出。我们在压测的过程中各有利弊,在这里就不做发散。
基于同一个rowkey,批量查询row对应的不同 column
/**
* 多列查询数据返回column名以及对应数据
*
* @param tableName
* @return
* @throws IOException
*/
public Map<String, String> batchQueryTableByColumnForMap(String tableName, String rowKey, String family, List<String> column) throws IOException {
Table table = null;
Map<String, String> value = Maps.newHashMap();
if (StringUtils.isBlank(tableName) || CollectionUtils.isEmpty(column)) {
return value;
}
try {
table = connection.getTable(TableName.valueOf(tableName));
List<Get> getList = new ArrayList();
for (String c : column) {
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(family), Bytes.toBytes(c));
getList.add(get);
}
Result[] results = table.get(getList);//重点在这,直接查getList<Get>
for (Result result : results) {//对返回的结果集进行操作
for (Cell cell : result.rawCells()) {
value.put(Bytes.toString(Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())), Bytes.toString(CellUtil.cloneValue(cell)));
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
assert table != null;
table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return value;
}
复制代码
根据rowkey查询出一个列簇对应的所有字段
/**
* 获取值
* expression : get 'tableName','rowkey','family:column'
*
* @param rowKey 行id
* @param family 列族名
* @return string
*/
public Map<String, String> getRowKeyValueMap(String tableName, String rowKey, String family) {
Table table = null;
Map<String, String> value = Maps.newHashMap();
if (StringUtils.isBlank(tableName) || StringUtils.isBlank(family) || StringUtils.isBlank(rowKey)) {
return value;
}
try {
table = connection.getTable(TableName.valueOf(tableName));
Get g = new Get(rowKey.getBytes());
Result result = table.get(g);
List<Cell> ceList = result.listCells();
if (ceList != null && ceList.size() > 0) {
for (Cell cell : ceList) {
value.put(Bytes.toString(Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
assert table != null;
table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return value;
}
复制代码
6:如何根据数据动态判断画像结果
1:这里要非常感谢一下阿里的qlexpress,ql判断支持丰富,大部分语法跟sql类似。我们只需要业务的同学创建规则的时候,把相关规则转为ql语法,动态的传入相关画像参数。废话不多说,上代码。(目前来看运行比较稳定)
public static void main(String[] args) throws Exception {
ExpressRunner runner = new ExpressRunner();
DefaultContext<String, Object> context = new DefaultContext<String, Object>();
context.put("vip_endtime",1623513700000L);
context.put("vip_accumulat_count","11");
context.put("last_buy_member_type",2);
context.put("keyword","你视频打啊啊");
context.put("from_app",12);
String express = " ( ( vip_accumulat_count != null and vip_accumulat_count> 0 ) and ( ( ( (vip_endtime !=null && vip_endtime >= 1623513600000 and vip_endtime < 1624204800000 ) ) )" +
" and (last_buy_member_type!=null && last_buy_member_type != 1))) and from_app ==12 and keyword like '%你好%' ";
Object r = runner.execute(express, context, null, true, true);
System.out.println(r);
}
复制代码
3:画像的一些理解
人群画像不是万金油,如果想要做好用户留存,付费。还是需要回归到用户的核心诉求中。根据相关用户行为分析把核心功能做好,吃透。相信用户会为你提供的服务买单。欢迎大家一起交流学习。