-- 1. Create table schema and data refresh pipeline
CREATE MATERIALIZED TABLE dwd_orders
(
  PRIMARY KEY (ds, id) NOT ENFORCED
)
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '3' MINUTE
AS SELECT
  o.ds,
  o.id,
  o.order_number,
  o.user_id
FROM
  orders AS o
  LEFT JOIN products FOR SYSTEM_TIME AS OF proctime() AS prod
    ON o.product_id = prod.id
  LEFT JOIN order_pay AS pay
    ON o.id = pay.order_id AND o.ds = pay.ds;

-- 2. Pause the data refresh pipeline
ALTER MATERIALIZED TABLE dwd_orders SUSPEND;
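-- (Assumption, illustrative only) In continuous refresh mode, SUSPEND stops the pipeline with a
-- savepoint, so a savepoint directory may need to be configured beforehand, for example:
-- SET 'execution.checkpointing.savepoint-dir' = 'hdfs:///path/to/savepoints';
-- The exact option name depends on the Flink version in use.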

-- 3. Resume the data refresh pipeline
ALTER MATERIALIZED TABLE dwd_orders RESUME
-- Set table option via WITH clause
WITH ('sink.parallelism' = '10');

-- 4. Refresh a historical partition manually
ALTER MATERIALIZED TABLE dwd_orders REFRESH PARTITION (ds = '20231023');
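
-- 5. Drop the materialized table and its refresh pipeline once it is no longer needed
-- (sketch; assumes DROP MATERIALIZED TABLE is available in the Flink version in use)
DROP MATERIALIZED TABLE IF EXISTS dwd_orders;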

-- declares a hash function on a fixed number of 4 buckets (i.e. HASH(uid) % 4 = target bucket).
CREATE TABLE MyTable (uid BIGINT, NAME STRING, proctime AS PROCTIME()) WITH (
  'connector' = 'filesystem',
  'path' = '/path/to/example.csv',
  'format' = 'csv'
) DISTRIBUTED BY HASH(uid) INTO 4 BUCKETS;

-- leaves the selection of an algorithm up to the connector.
CREATE TABLE MyTable (uid BIGINT, NAME STRING, proctime AS PROCTIME()) WITH (
  'connector' = 'filesystem',
  'path' = '/path/to/example.csv',
  'format' = 'csv'
) DISTRIBUTED BY (uid) INTO 4 BUCKETS;

-- leaves the number of buckets up to the connector.
CREATE TABLE MyTable (uid BIGINT, NAME STRING, proctime AS PROCTIME()) WITH (
  'connector' = 'filesystem',
  'path' = '/path/to/example.csv',
  'format' = 'csv'
) DISTRIBUTED BY (uid);

-- only defines the number of buckets.
CREATE TABLE MyTable (uid BIGINT, NAME STRING, proctime AS PROCTIME()) WITH (
  'connector' = 'filesystem',
  'path' = '/path/to/example.csv',
  'format' = 'csv'
) DISTRIBUTED INTO 4 BUCKETS;
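
-- declares a range-based distribution on a fixed number of 4 buckets
-- (sketch; assumes the RANGE algorithm keyword of DISTRIBUTED BY is supported by the connector).
CREATE TABLE MyTable (uid BIGINT, NAME STRING, proctime AS PROCTIME()) WITH (
  'connector' = 'filesystem',
  'path' = '/path/to/example.csv',
  'format' = 'csv'
) DISTRIBUTED BY RANGE(uid) INTO 4 BUCKETS;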