千万级数据查询:CK、ES、RediSearch怎么选?

作者:微信小助手

发布时间:2022-06-08T00:33:12

文章来源:https://c1n.cn/EmgJv


目录
  • 前言

  • 初版设计方案

  • CK 分页查询

  • 使用ES Scroll Scan 优化深翻页

  • ES+Hbase 组合查询方案

  • RediSearch+RedisJSON 优化方案

  • 总结


前言


在开发中遇到一个业务诉求,需要在千万量级的底池数据中筛选出不超过 10W 的数据,并根据配置的权重规则进行排序、打散(如同一个类目下的商品数据不能连续出现 3 次)。下面对该业务诉求的实现,设计思路和方案优化进行介绍。


对“千万量级数据中查询 10W 量级的数据”设计了如下方案:

  • 多线程+CK 翻页方案

  • ES scroll scan 深翻页方案

  • ES+Hbase 组合方案

  • RediSearch+RedisJSON 组合方案


初版设计方案


整体方案设计为:

  • 先根据配置的「筛选规则」,从底池表中筛选出「目标数据」

  • 在根据配置的「排序规则」,对「目标数据」进行排序,得到「结果数据」


技术方案如下:


每天运行导数任务,把现有的千万量级的底池数据(Hive 表)导入到 Clickhouse 中,后续使用 CK 表进行数据筛选。


将业务配置的筛选规则和排序规则,构建为一个「筛选 + 排序」对象 SelectionQueryCondition。


从 CK 底池表取「目标数据」时,开启多线程,进行分页筛选,将获取到的「目标数据」存放到 result 列表中。

//分页大小  默认 5000
int pageSize = this.getPageSize();
//页码数
int pageCnt = totalNum / this.getPageSize() + 1;

List<Map<StringObject>> result = Lists.newArrayList();
List<Future<List<Map<StringObject>>>> futureList = new ArrayList<>(pageCnt);

//开启多线程调用
for (int i = 1; i <= pageCnt; i++) {
    //将业务配置的筛选规则和排序规则 构建为 SelectionQueryCondition 对象
    SelectionQueryCondition selectionQueryCondition = buildSelectionQueryCondition(selectionQueryRuleData);
    selectionQueryCondition.setPageSize(pageSize);
    selectionQueryCondition.setPage(i);
    futureList.add(selectionQueryEventPool.submit(new QuerySelectionDataThread(selectionQueryCondition)));
}


for (Future<List<Map<StringObject>>> future : futureList) {
    //RPC 调用
    List<Map<StringObject>> queryRes = future.get(20, TimeUnit.SECONDS);
    if (CollectionUtils.isNotEmpty(queryRes)) {
        // 将目标数据存放在 result 中
        result.addAll(queryRes);
    }
}


④对目标数据 result 进行排序,得到最终的「结果数据」。


CK 分页查询


在「初版设计方案」章节的第 3 步提到了「从 CK 底池表取目标数据时,开启多线程,进行分页筛选」。此处对 CK 分页查询进行介绍。


①封装了 queryPoolSkuList 方法,负责从 CK 表中获得目标数据。该方法内部调用了 sqlSession.selectList 方法。

public List<Map<StringObject>> queryPoolSkuList( Map<StringObject> params ) {
    List<Map<StringObject>> resultMaps = new ArrayList<>();

    QueryCondition queryCondition = parseQueryCondition(params);
    List<Map<StringObject>> mapList = lianNuDao.queryPoolSkuList(getCkDt(),queryCondition);
    if (CollectionUtils.isNotEmpty(mapList)) {
        for (Map<String,Object> data : mapList) {
            resultMaps.add(camelKey(data));
        }
    }
    return resultMaps;
}


// lianNuDao.queryPoolSkuList

@Autowired
@Qualifier("ckSqlNewSession")
private SqlSession sqlSession;

public List<Map<StringObject>> queryPoolSkuList( String dt, QueryCondition queryCondition ) {
    queryCondition.setDt(dt);
    queryCondition.checkMultiQueryItems();
    return sqlSession.selectList("LianNu.queryPoolSkuList",queryCondition);
}



   

②sqlSession.selectList 方法中调用了和 CK 交互的 queryPoolSkuList 查询方法,部分代码如下:

<select id="queryPoolSkuList" parameterType="com.jd.bigai.domain.liannu.QueryCondition" resultType="java.util.Map">
    select sku_pool_id,i
    tem_sku_id,
    skuPoolName,
    price,
    ...
    ...
    businessType
    from liannu_sku_pool_indicator_all
    where
    dt=#{dt}
    and
    <foreach collection="queryItems" separator=" and " item="queryItem" open=" " close=" " >
        <choose>
            <when test="queryItem.type == 'equal'">
                ${queryItem.field} = #{queryItem.value}
            </when>
            ...
            ...
        </choose>
    </foreach>
    <if test="orderBy == null">
        group by sku_pool_id,item_sku_id
    </if>
    <if test="orderBy != null">
        group by sku_pool_id,item_sku_id,${orderBy} order by ${orderBy} ${orderAd}
    </if>
    <if test="limitEnd != 0">
        limit #{limitStart},#{limitEnd}
    </if>
</select>


③可以看到,在 CK 分页查询时,是通过 limit #{limitStart},#{limitEnd} 实现的分页。


limit 分页方案,在「深翻页」时会存在性能问题。初版方案上线后,在 1000W 量级的底池数据中筛选 10W 的数据,最坏耗时会达到 10s~18s 左右。


使用 ES Scroll Scan 优化深翻页