作者:微信小助手
发布时间:2022-06-08T00:33:12
文章来源:https://c1n.cn/EmgJv 前言 初版设计方案 CK 分页查询 使用ES Scroll Scan 优化深翻页 ES+Hbase 组合查询方案 RediSearch+RedisJSON 优化方案 总结 前言 在开发中遇到一个业务诉求,需要在千万量级的底池数据中筛选出不超过 10W 的数据,并根据配置的权重规则进行排序、打散(如同一个类目下的商品数据不能连续出现 3 次)。下面对该业务诉求的实现,设计思路和方案优化进行介绍。 对“千万量级数据中查询 10W 量级的数据”设计了如下方案: 多线程+CK 翻页方案 ES scroll scan 深翻页方案 ES+Hbase 组合方案 RediSearch+RedisJSON 组合方案 初版设计方案 整体方案设计为: 先根据配置的「筛选规则」,从底池表中筛选出「目标数据」 在根据配置的「排序规则」,对「目标数据」进行排序,得到「结果数据」 技术方案如下: ①每天运行导数任务,把现有的千万量级的底池数据(Hive 表)导入到 Clickhouse 中,后续使用 CK 表进行数据筛选。 ②将业务配置的筛选规则和排序规则,构建为一个「筛选 + 排序」对象 SelectionQueryCondition。 ③从 CK 底池表取「目标数据」时,开启多线程,进行分页筛选,将获取到的「目标数据」存放到 result 列表中。 ④对目标数据 result 进行排序,得到最终的「结果数据」。 CK 分页查询 在「初版设计方案」章节的第 3 步提到了「从 CK 底池表取目标数据时,开启多线程,进行分页筛选」。此处对 CK 分页查询进行介绍。 ①封装了 queryPoolSkuList 方法,负责从 CK 表中获得目标数据。该方法内部调用了 sqlSession.selectList 方法。 ②sqlSession.selectList 方法中调用了和 CK 交互的 queryPoolSkuList 查询方法,部分代码如下: ③可以看到,在 CK 分页查询时,是通过 limit #{limitStart},#{limitEnd} 实现的分页。 limit 分页方案,在「深翻页」时会存在性能问题。初版方案上线后,在 1000W 量级的底池数据中筛选 10W 的数据,最坏耗时会达到 10s~18s 左右。 使用 ES Scroll Scan 优化深翻页
//分页大小 默认 5000
int pageSize = this.getPageSize();
//页码数
int pageCnt = totalNum / this.getPageSize() + 1;
List<Map<String, Object>> result = Lists.newArrayList();
List<Future<List<Map<String, Object>>>> futureList = new ArrayList<>(pageCnt);
//开启多线程调用
for (int i = 1; i <= pageCnt; i++) {
//将业务配置的筛选规则和排序规则 构建为 SelectionQueryCondition 对象
SelectionQueryCondition selectionQueryCondition = buildSelectionQueryCondition(selectionQueryRuleData);
selectionQueryCondition.setPageSize(pageSize);
selectionQueryCondition.setPage(i);
futureList.add(selectionQueryEventPool.submit(new QuerySelectionDataThread(selectionQueryCondition)));
}
for (Future<List<Map<String, Object>>> future : futureList) {
//RPC 调用
List<Map<String, Object>> queryRes = future.get(20, TimeUnit.SECONDS);
if (CollectionUtils.isNotEmpty(queryRes)) {
// 将目标数据存放在 result 中
result.addAll(queryRes);
}
}public List<Map<String, Object>> queryPoolSkuList( Map<String, Object> params ) {
List<Map<String, Object>> resultMaps = new ArrayList<>();
QueryCondition queryCondition = parseQueryCondition(params);
List<Map<String, Object>> mapList = lianNuDao.queryPoolSkuList(getCkDt(),queryCondition);
if (CollectionUtils.isNotEmpty(mapList)) {
for (Map<String,Object> data : mapList) {
resultMaps.add(camelKey(data));
}
}
return resultMaps;
}// lianNuDao.queryPoolSkuList
@Autowired
@Qualifier("ckSqlNewSession")
private SqlSession sqlSession;
public List<Map<String, Object>> queryPoolSkuList( String dt, QueryCondition queryCondition ) {
queryCondition.setDt(dt);
queryCondition.checkMultiQueryItems();
return sqlSession.selectList("LianNu.queryPoolSkuList",queryCondition);
}<select id="queryPoolSkuList" parameterType="com.jd.bigai.domain.liannu.QueryCondition" resultType="java.util.Map">
select sku_pool_id,i
tem_sku_id,
skuPoolName,
price,
...
...
businessType
from liannu_sku_pool_indicator_all
where
dt=#{dt}
and
<foreach collection="queryItems" separator=" and " item="queryItem" open=" " close=" " >
<choose>
<when test="queryItem.type == 'equal'">
${queryItem.field} = #{queryItem.value}
</when>
...
...
</choose>
</foreach>
<if test="orderBy == null">
group by sku_pool_id,item_sku_id
</if>
<if test="orderBy != null">
group by sku_pool_id,item_sku_id,${orderBy} order by ${orderBy} ${orderAd}
</if>
<if test="limitEnd != 0">
limit #{limitStart},#{limitEnd}
</if>
</select>