作者:微信小助手
发布时间:2024-12-08T20:37:17
01
场景一:游戏视频指标上卷
-- Roll-up step 1: collapse source_table to one row per (id, f1, f2, f4, f5).
-- Keeps the per-group MAX of f3, f6 and f7, and LAST_VALUE(f8, f5) for f8.
-- NOTE(review): f5 is also a grouping key, so the second argument passed to
-- LAST_VALUE (presumably its ordering key) is identical for every row in a
-- group and cannot distinguish rows within it — confirm this is intended.
CREATE VIEW view_01 AS
SELECT
    id,
    f1,
    f2,
    MAX(f3)            AS f3,
    f4,
    f5,
    MAX(f6)            AS f6,
    MAX(f7)            AS f7,
    LAST_VALUE(f8, f5) AS f8
FROM source_table
GROUP BY
    id,
    f1,
    f2,
    f4,
    f5;
-- Roll-up step 2: aggregate view_01 up to (id, f1, f2).
-- f3 is summed across the finer (f4, f5) groups, the other metrics take MAX,
-- and f8 is LAST_VALUE with f5 passed as its second argument.
-- NOTE(review): this is a positional INSERT (no column list) and the SELECT
-- emits f3 before f2 — confirm that matches sink_table's column order.
INSERT INTO sink_table
SELECT
    id,
    f1,
    SUM(f3)            AS f3,
    CAST(f2 AS BIGINT) AS f2,
    MAX(f4)            AS f4,
    MAX(f5)            AS f5,
    MAX(f6)            AS f6,
    MAX(f7)            AS f7,
    LAST_VALUE(f8, f5) AS f8
FROM view_01
GROUP BY
    id,
    f1,
    f2;
--维表模型DDL
-- Dimension table 1: Paimon primary-key table keyed on f1, with f2 configured
-- as the sequence field and a lookup changelog producer.
-- NOTE(review): the lookup join in this article joins this table on `id`,
-- not on the primary key `f1` — confirm which column is the intended key.
CREATE TABLE dim_table01 (
    `id` BIGINT,
    `f1` STRING,
    `f2` BIGINT,
    PRIMARY KEY (f1) NOT ENFORCED
) WITH (
    'changelog-producer' = 'lookup',
    'changelog-producer.row-deduplicate' = 'true',
    'sequence.field' = 'f2'
    -- ... further table options elided
);
-- Dimension table 2: same layout and options as dimension table 1 —
-- primary key f1, f2 as the sequence field, lookup changelog producer.
-- NOTE(review): the lookup join in this article joins this table on `id`,
-- not on the primary key `f1` — confirm which column is the intended key.
CREATE TABLE dim_table02 (
    `id` BIGINT,
    `f1` STRING,
    `f2` BIGINT,
    PRIMARY KEY (f1) NOT ENFORCED
) WITH (
    'changelog-producer' = 'lookup',
    'changelog-producer.row-deduplicate' = 'true',
    'sequence.field' = 'f2'
    -- ... further table options elided
);
--分钟指标流关联维度
-- Enrich each source_table row with f1 from both dimension tables, using
-- processing-time lookup joins with async lookup enabled (8 threads) via hints.
-- LEFT JOIN keeps source rows that have no dimension match (dim columns NULL).
-- NOTE(review): both joins use `id` as the key; confirm it matches the
-- dimension tables' declared primary key.
SELECT
    src.id,
    dim1.f1 AS bb_f1,
    dim2.f1 AS cc_f1
FROM source_table AS src
LEFT JOIN paimon.db_name.dim_table01
    /*+ OPTIONS('lookup.async'='true', 'lookup.async-thread-number'='8') */
    FOR SYSTEM_TIME AS OF src.proctime AS dim1
    ON src.id = dim1.id
LEFT JOIN paimon.db_name.dim_table02
    /*+ OPTIONS('lookup.async'='true', 'lookup.async-thread-number'='8') */
    FOR SYSTEM_TIME AS OF src.proctime AS dim2
    ON src.id = dim2.id;
--分钟指标模型DDL
-- Minute-level metric table: Paimon primary-key table partitioned by date/hour,
-- with 30-day partition expiration and a lookup changelog producer.
CREATE TABLE `db_name`.`table_name` (
    `id`   BIGINT,
    `f1`   STRING,
    `f2`   BIGINT,
    `f3`   BIGINT,
    `f4`   BIGINT,
    `f5`   STRING,
    `f6`   BIGINT,
    `f7`   STRING,
    `f8`   MAP<STRING, STRING>,
    -- Flink SQL declares partition columns in the column list and only names
    -- them in PARTITIONED BY. DATE and HOUR are reserved words, so they must
    -- stay backtick-quoted everywhere, including the primary key.
    `date` STRING COMMENT '日期',
    `hour` STRING COMMENT '小时',
    -- Partition columns are included in the primary key (Paimon requires this
    -- for partitioned primary-key tables).
    PRIMARY KEY (`id`, `f1`, `f2`, `f4`, `f5`, `date`, `hour`) NOT ENFORCED
) PARTITIONED BY (`date`, `hour`) WITH (
    'changelog-producer' = 'lookup',
    'partition.expiration-time' = '30 d',
    'partition.timestamp-pattern' = '$date',
    'partition.expiration-check-interval' = '3h',
    -- NOTE(review): f8 is a MAP column; confirm Paimon accepts it as part of
    -- the sequence field (sequence fields are normally numeric/timestamp).
    'sequence.field' = 'f8,f3'
    -- ... further table options elided
);
--分钟指标上卷
-- Roll the minute-level Paimon table up to (id, f1, f2): sum f3, take MAX of
-- the other metrics, and keep LAST_VALUE of f8 with f7 as its second argument.
-- NOTE(review): f7 is declared STRING in the minute table DDL — confirm it is
-- the intended second (ordering) argument of LAST_VALUE.
INSERT INTO sink_table
SELECT
    id,
    f1,
    SUM(f3)            AS f3,
    f2,
    MAX(f4)            AS f4,
    MAX(f5)            AS f5,
    MAX(f6)            AS f6,
    MAX(f7)            AS f7,
    LAST_VALUE(f8, f7) AS f8
FROM paimon.db_name.table_name
GROUP BY
    id,
    f1,
    f2;
02
场景二:财经多流拼接
-- Multi-stream stitching: UNION ALL several sources into one stream in which
-- each branch fills only its own columns (the rest are typed NULLs), then
-- GROUP BY id and keep LAST_VALUE of every column.
-- NOTE(review): this assumes LAST_VALUE skips NULL inputs, so one branch's NULL
-- padding does not overwrite another branch's values — verify for the Flink
-- version in use.
-- `...` / `......` are placeholders for further columns and UNION ALL branches.
INSERT INTO sink
SELECT
    id,
    LAST_VALUE(f1) AS f1,
    LAST_VALUE(f2) AS f2,
    LAST_VALUE(f3) AS f3,
    LAST_VALUE(f4) AS f4,
    ...
FROM (
    SELECT
        id,
        f1,
        f2,
        CAST(NULL AS STRING) AS f3,
        CAST(NULL AS STRING) AS f4,
        ...
    FROM table1
    UNION ALL
    SELECT
        id,
        CAST(NULL AS STRING) AS f1,
        CAST(NULL AS STRING) AS f2,
        f3,
        f4,
        ...
    FROM table2
    UNION ALL
    ......
) AS stitched
GROUP BY id;
-- Partial-update target table for multi-stream stitching, keyed on trace_id.
-- Each sequence group orders its own columns independently:
--   g_1 orders updates to f1, f2
--   g_2 orders updates to f3, f4
CREATE TABLE t (
    trace_id BIGINT,
    f1       STRING,
    f2       STRING,
    g_1      BIGINT,
    f3       STRING,
    f4       STRING,
    g_2      BIGINT,
    PRIMARY KEY (trace_id) NOT ENFORCED
) WITH (
    'merge-engine' = 'partial-update',
    'fields.g_1.sequence-group' = 'f1,f2',
    'fields.g_2.sequence-group' = 'f3,f4'
);
-- Write every source into the partial-update table t. Each UNION ALL branch
-- fills only its own columns and its sequence field (g_1 or g_2), and pads the
-- other columns with typed NULLs.
-- NOTE(review): this relies on the partial-update merge engine not overwriting
-- existing values with the NULL padding — confirm for the Paimon version used.
-- NOTE(review): the trailing `...` in the outer SELECT implies more columns
-- than the seven t is declared with — keep the two in sync.
-- `...`, `xxx` and `......` are placeholders for further columns and branches.
INSERT INTO t
SELECT
    trace_id,
    f1,
    f2,
    g_1,
    f3,
    f4,
    g_2,
    ...
FROM (
    SELECT
        trace_id,
        f1,
        f2,
        g_1,
        CAST(NULL AS STRING) AS f3,
        CAST(NULL AS STRING) AS f4,
        CAST(NULL AS BIGINT) AS g_2,
        xxx
    FROM table1
    UNION ALL
    SELECT
        trace_id,
        CAST(NULL AS STRING) AS f1,
        CAST(NULL AS STRING) AS f2,
        CAST(NULL AS BIGINT) AS g_1,
        f3,
        f4,
        g_2,
        xxx
    FROM table2
    UNION ALL
    ......
);
03
未来展望