以flink实时流的方式实现OneId

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

以flink实时流的方式实现OneId

前言

oneid相关概念,及其当前离线实现方式介绍请见以下链接及其系列其他文章:

用户标签(一):图计算实现ID_Mapping、Oneid打通数据孤岛

读完上述链接的相关文章,将理解oneid的需求及其实现方式.

背景

之前公司是做电商数据分析,可以接入多个数据源的数据(美团,饿了么,京东等),在我们系统中,我们将为每个用户统一打上在我们系统内部的user_id,即oneid.当时已经有了基于spark图计算实现的id-mapping来达成oneid.

到这里,我们已有的系统跟前言中链接文章提到的内容相似.

后来,我们有了新需求:实时化改造.实时为每个用户统计相关数据.

但是,实时计算的数据最后要归纳到用户上,那么我们的id-mapping也需要实时出现结果.

思考

本人不具备机器学习领域的知识,在看了几遍前言中的文章后,勉强搞懂了该图计算的原理,本质上是求最大连通图,研究后发现需要拿到全量数据进行迭代,不能改造为实时计算.

后来,基于"最大连通图"的算法,变种出了一种方法:

  1. 输入两个数据关联关系,例如 美团id1-饿了么id1,即输入两个点一个线.
  2. 拿到该关系,分别拿两个数据去与已有的id-mapping结果表对比.如,我们结果id-mapping表中,有关系
    美团id1 - oneid1
    饿了么id1 - oneid2
    
  3. 通过第一步中传入的关系,可以得出 美团id1 与 饿了么id1 在我们系统中应该识别为同一个人,对应同一个oneid,可以得到
    美团id1 - oneid1
    饿了么id1 - oneid1
    
    或者,都对应为另一个oneid
    美团id1 - oneid2
    饿了么id1 - oneid2
    
    即,我们通过传入关联关系,将 美团id1 与 饿了么id1 在我们系统中重新更新为关联到同一个oneid.
  4. 后续,某个用户id是 美团id1,那么它关联这个id-mapping结果表,可以得到它在我们系统中的id是oneid1(或者oneid2,此处根据第三步如何取值)
  5. 根据不同的对比结果,进行相应的替换或者新增,我们变相实现了"最大连通图"的算法,并且这个算法可以用flink实时计算实现

详细步骤

0. id-mapping结果表设计

结果表可以有多个描述字段,但是核心应该是以下两个字段:

原id , 计算出的oneid

1. 输入数据采集

我们在采集数据的时候,需要将数据解析成两两的关系对.如原始数据:

手机号1,美团id1,设备id1

需要将这条消息拆分为:

手机号1 - 美团id1
手机号1 - 设备id1
美团id1 - 设备id1

再将这三组关联关系传给后续对比计算.

2. 对比计算

假设我们得到关系对:

x - y

我们拿到此关系对到结果表中进行对比将有以下几种情况:

  1. x,y都没有对应oneid: 直接对结果表插入计算得出的新oneid(可以使用uuid)
    x - 新oneid
    y - 新oneid
    
  2. x已有对应oneid为 XXoneid,y没有:将y的oneid赋值为 XXoneid,并插入,得到
    x - XXoneid
    y - XXoneid
    
  3. x没有,y有oneid为 YYoneid :同第二种情况,得到
    x - YYoneid
    y - YYoneid
    
  4. x,y都有oneid,且一致,都为 ZZoneid: 不更新
  5. x,y都有oneid,且不一致,分别为 XXoneid,YYoneid :将 x,y更新为同一个oneid(XXoneid或者YYoneid),或者重新生成一个.此处看个人选择.
    并且!!!!!!
    将结果表中所有oneid为 XXoneid,YYoneid的相关数据,oneid都重设为新选择的oneid
    这是为了将相关联的其他数据一起指向新的oneid
    

至此,通过以上几种情况.我们复现了id-mapping中求最大连通图的算法.

实现程序设计

1. 数据源

kafka

2. 实时计算程序

flink

3. 对比中如何取数

redis:将结果表以k-v的形式放在内存中,这样flink可以快速取值并对比计算

4. 结果表存放

hbase:此处可以换为mysql,doris等支持更新的存储即可.并且还有以下原因:

对比计算中,第五种情况,需要从这里取所有oneid为 XXoneid,YYoneid的相关数据
而redis中没法根据value来取得key,所以第五种情况,需要查询此处存储得到相关数据

5. 结果更新

结果不但要更新hbase,还要更新redis中存放的k-v对!!!建议先更新redis,因为比较快.

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

“以flink实时流的方式实现OneId” 的相关文章