Our company originally ran only an OLTP database, but over the past year the data volume has grown to the point where reporting queries keep getting slower. So we started researching OLAP databases, hoping to split the reporting queries out and let the OLTP database focus on OLTP workloads. Today I want to share the OLAP database I've been studying recently, ClickHouse, whose distributed table engine can parallelize computation, and test how it performs when scanning hundreds of millions of rows.
- Introduction
- Architecture
- Creating the tables for the experiment
- Inserting fake test data through the distributed table
- Experiments
- Fetching data from a single shard through the distributed table
- Fetching data from multiple shards through the distributed table (1)
- Fetching data from multiple shards through the distributed table (2)
- Fetching data from multiple shards through the distributed table (3)
- Summary
Introduction
A table created with ClickHouse's distributed table engine does not store any data itself. Instead it points at the tables in the cluster that actually hold the data, much like a view in an RDBMS. The advantage is that when we query the distributed table, it forwards the query in parallel to each of the underlying tables, collects their partial results, merges them, and returns the final answer to us.
In other words, once we build a cluster and shard the data across multiple nodes, the computation is spread over several nodes, giving us horizontal scaling.
In plain terms, it's like the finance team computing everyone's monthly payroll. If the company has 100 people but only 1 finance person, that person may have to work overtime at month-end to close the books for all 100 employees, EMP001 through EMP100.
If the company then hires a second finance person, finance #1 only has to handle EMP001 through EMP050 while finance #2 takes EMP051 through EMP100. The workload is instantly halved and everyone gets to go home on time (confetti).
That's the gist of how it works. Next we'll use a game bet-record table as the running example and load it with fake bets to measure how well the distributed table's parallel computation actually performs.
(Diagram: how fetching bets through the distributed table works)
Architecture
The dev environment I got to try out uses a 3-shard, 2-replica layout. 3 shards means the data is split three ways by a sharding key; rows with the same sharding key always land on the same shard. 2 replicas means each shard's data is stored twice in total, so if one copy is corrupted the data can still be recovered from the other.
(If both copies are gone, it's probably time to start practicing a new trade, e.g., taking drink orders for sugar level and ice, or asking customers about membership and e-invoice carriers...)
Below is how 3 shards with 2 replicas are laid out in the dev environment. Notice that if you cross out any single node, the data of shard 1, shard 2, and shard 3 can still all be found on the remaining two nodes.
- ZooKeeper
- ClickHouse cluster (3 shards, 2 replicas)
- node 1 (10.1.103.179)
- shard 1, replica 1
- shard 3, replica 2
- node 2 (10.1.103.180)
- shard 2, replica 1
- shard 1, replica 2
- node 3 (10.1.103.181)
- shard 3, replica 1
- shard 2, replica 2
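This topology can be double-checked from any node via the system.clusters system table. A minimal sketch, assuming the cluster is registered under the name andy_test_cluster in the server config (as the DDL later in this post implies):

SELECT cluster, shard_num, replica_num, host_address, is_local
FROM system.clusters
WHERE cluster = 'andy_test_cluster';

-- expected: 6 rows, one per (shard, replica) pair, matching the layout above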
Creating the tables for the experiment
To run our bet-record experiment we first need to create t_bet_origin, the raw bet table, across 3 shards × 2 replicas = 6 distinct replicas; the DDL is below. We also create a t_bet_origin_distributed virtual distributed table on each of the 3 nodes, so that an application connected to any node can query the game bet data of the whole cluster.
Raw bet-record tables
node 1
- shard 1, replica 1
CREATE TABLE dev01.t_bet_origin
(
    `order_id` String COMMENT 'bet order ID',
    `username` String COMMENT 'user login name',
    `product_id` String COMMENT 'product ID',
    `action` String COMMENT 'action',
    `ip` String COMMENT 'IP',
    `game_type` String COMMENT 'game type',
    `play_type` Int32 COMMENT 'play type',
    `table_id` String COMMENT 'table number',
    `gmcode` String COMMENT 'round number',
    `amount` Decimal(18, 2) COMMENT 'bet amount',
    `valid_amount` Decimal(18, 2) COMMENT 'valid bet amount',
    `ratio` Decimal(14, 0) COMMENT 'conversion ratio to RMB',
    `currency` String COMMENT 'currency',
    `agcode` String COMMENT 'agcode',
    `up_agcode` String COMMENT 'parent agcode',
    `agid` String COMMENT 'agent line ID',
    `log_time` DateTime('Asia/Taipei') COMMENT 'log write time (settlement time)',
    `created_at` DateTime('Asia/Taipei') DEFAULT now() COMMENT 'creation time',
    PROJECTION t_bet_origin_gmcode_projection_v2 (
        SELECT * ORDER BY gmcode
    )
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/andy_test_cluster/shard01/tables/t_bet_origin', 'replica01')
PRIMARY KEY (
    agcode,
    order_id,
    play_type
)
ORDER BY (
    agcode,
    order_id,
    play_type
)
COMMENT 'raw bet records';
- shard 3, replica 2
CREATE TABLE dev03.t_bet_origin
(
    `order_id` String COMMENT 'bet order ID',
    `username` String COMMENT 'user login name',
    `product_id` String COMMENT 'product ID',
    `action` String COMMENT 'action',
    `ip` String COMMENT 'IP',
    `game_type` String COMMENT 'game type',
    `play_type` Int32 COMMENT 'play type',
    `table_id` String COMMENT 'table number',
    `gmcode` String COMMENT 'round number',
    `amount` Decimal(18, 2) COMMENT 'bet amount',
    `valid_amount` Decimal(18, 2) COMMENT 'valid bet amount',
    `ratio` Decimal(14, 0) COMMENT 'conversion ratio to RMB',
    `currency` String COMMENT 'currency',
    `agcode` String COMMENT 'agcode',
    `up_agcode` String COMMENT 'parent agcode',
    `agid` String COMMENT 'agent line ID',
    `log_time` DateTime('Asia/Taipei') COMMENT 'log write time (settlement time)',
    `created_at` DateTime('Asia/Taipei') DEFAULT now() COMMENT 'creation time',
    PROJECTION t_bet_origin_gmcode_projection_v2 (
        SELECT * ORDER BY gmcode
    )
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/andy_test_cluster/shard03/tables/t_bet_origin', 'replica02')
PRIMARY KEY (
    agcode,
    order_id,
    play_type
)
ORDER BY (
    agcode,
    order_id,
    play_type
)
COMMENT 'raw bet records';
node 2
- shard 2, replica 1
CREATE TABLE dev02.t_bet_origin
(
    `order_id` String COMMENT 'bet order ID',
    `username` String COMMENT 'user login name',
    `product_id` String COMMENT 'product ID',
    `action` String COMMENT 'action',
    `ip` String COMMENT 'IP',
    `game_type` String COMMENT 'game type',
    `play_type` Int32 COMMENT 'play type',
    `table_id` String COMMENT 'table number',
    `gmcode` String COMMENT 'round number',
    `amount` Decimal(18, 2) COMMENT 'bet amount',
    `valid_amount` Decimal(18, 2) COMMENT 'valid bet amount',
    `ratio` Decimal(14, 0) COMMENT 'conversion ratio to RMB',
    `currency` String COMMENT 'currency',
    `agcode` String COMMENT 'agcode',
    `up_agcode` String COMMENT 'parent agcode',
    `agid` String COMMENT 'agent line ID',
    `log_time` DateTime('Asia/Taipei') COMMENT 'log write time (settlement time)',
    `created_at` DateTime('Asia/Taipei') DEFAULT now() COMMENT 'creation time',
    PROJECTION t_bet_origin_gmcode_projection_v2 (
        SELECT * ORDER BY gmcode
    )
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/andy_test_cluster/shard02/tables/t_bet_origin', 'replica01')
PRIMARY KEY (
    agcode,
    order_id,
    play_type
)
ORDER BY (
    agcode,
    order_id,
    play_type
)
COMMENT 'raw bet records';
- shard 1, replica 2
CREATE TABLE dev01.t_bet_origin
(
    `order_id` String COMMENT 'bet order ID',
    `username` String COMMENT 'user login name',
    `product_id` String COMMENT 'product ID',
    `action` String COMMENT 'action',
    `ip` String COMMENT 'IP',
    `game_type` String COMMENT 'game type',
    `play_type` Int32 COMMENT 'play type',
    `table_id` String COMMENT 'table number',
    `gmcode` String COMMENT 'round number',
    `amount` Decimal(18, 2) COMMENT 'bet amount',
    `valid_amount` Decimal(18, 2) COMMENT 'valid bet amount',
    `ratio` Decimal(14, 0) COMMENT 'conversion ratio to RMB',
    `currency` String COMMENT 'currency',
    `agcode` String COMMENT 'agcode',
    `up_agcode` String COMMENT 'parent agcode',
    `agid` String COMMENT 'agent line ID',
    `log_time` DateTime('Asia/Taipei') COMMENT 'log write time (settlement time)',
    `created_at` DateTime('Asia/Taipei') DEFAULT now() COMMENT 'creation time',
    PROJECTION t_bet_origin_gmcode_projection_v2 (
        SELECT * ORDER BY gmcode
    )
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/andy_test_cluster/shard01/tables/t_bet_origin', 'replica02')
PRIMARY KEY (
    agcode,
    order_id,
    play_type
)
ORDER BY (
    agcode,
    order_id,
    play_type
)
COMMENT 'raw bet records';
node 3
- shard 3, replica 1
CREATE TABLE dev03.t_bet_origin
(
    `order_id` String COMMENT 'bet order ID',
    `username` String COMMENT 'user login name',
    `product_id` String COMMENT 'product ID',
    `action` String COMMENT 'action',
    `ip` String COMMENT 'IP',
    `game_type` String COMMENT 'game type',
    `play_type` Int32 COMMENT 'play type',
    `table_id` String COMMENT 'table number',
    `gmcode` String COMMENT 'round number',
    `amount` Decimal(18, 2) COMMENT 'bet amount',
    `valid_amount` Decimal(18, 2) COMMENT 'valid bet amount',
    `ratio` Decimal(14, 0) COMMENT 'conversion ratio to RMB',
    `currency` String COMMENT 'currency',
    `agcode` String COMMENT 'agcode',
    `up_agcode` String COMMENT 'parent agcode',
    `agid` String COMMENT 'agent line ID',
    `log_time` DateTime('Asia/Taipei') COMMENT 'log write time (settlement time)',
    `created_at` DateTime('Asia/Taipei') DEFAULT now() COMMENT 'creation time',
    PROJECTION t_bet_origin_gmcode_projection_v2 (
        SELECT * ORDER BY gmcode
    )
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/andy_test_cluster/shard03/tables/t_bet_origin', 'replica01')
PRIMARY KEY (
    agcode,
    order_id,
    play_type
)
ORDER BY (
    agcode,
    order_id,
    play_type
)
COMMENT 'raw bet records';
- shard 2, replica 2
CREATE TABLE dev02.t_bet_origin
(
    `order_id` String COMMENT 'bet order ID',
    `username` String COMMENT 'user login name',
    `product_id` String COMMENT 'product ID',
    `action` String COMMENT 'action',
    `ip` String COMMENT 'IP',
    `game_type` String COMMENT 'game type',
    `play_type` Int32 COMMENT 'play type',
    `table_id` String COMMENT 'table number',
    `gmcode` String COMMENT 'round number',
    `amount` Decimal(18, 2) COMMENT 'bet amount',
    `valid_amount` Decimal(18, 2) COMMENT 'valid bet amount',
    `ratio` Decimal(14, 0) COMMENT 'conversion ratio to RMB',
    `currency` String COMMENT 'currency',
    `agcode` String COMMENT 'agcode',
    `up_agcode` String COMMENT 'parent agcode',
    `agid` String COMMENT 'agent line ID',
    `log_time` DateTime('Asia/Taipei') COMMENT 'log write time (settlement time)',
    `created_at` DateTime('Asia/Taipei') DEFAULT now() COMMENT 'creation time',
    PROJECTION t_bet_origin_gmcode_projection_v2 (
        SELECT * ORDER BY gmcode
    )
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/andy_test_cluster/shard02/tables/t_bet_origin', 'replica02')
PRIMARY KEY (
    agcode,
    order_id,
    play_type
)
ORDER BY (
    agcode,
    order_id,
    play_type
)
COMMENT 'raw bet records';
Distributed tables wrapping the raw bet-record tables
Note that the second Distributed engine argument below is an empty string; presumably this makes each shard resolve to the default database (dev01/dev02/dev03) configured per replica in the cluster definition, which is how one table definition can target a different local database on each node.
node 1
CREATE TABLE dev.t_bet_origin_distributed (
    `order_id` String COMMENT 'bet order ID',
    `username` String COMMENT 'user login name',
    `product_id` String COMMENT 'product ID',
    `action` String COMMENT 'action',
    `ip` String COMMENT 'IP',
    `game_type` String COMMENT 'game type',
    `play_type` Int32 COMMENT 'play type',
    `table_id` String COMMENT 'table number',
    `gmcode` String COMMENT 'round number',
    `amount` Decimal(18, 2) COMMENT 'bet amount',
    `valid_amount` Decimal(18, 2) COMMENT 'valid bet amount',
    `ratio` Decimal(14, 0) COMMENT 'conversion ratio to RMB',
    `currency` String COMMENT 'currency',
    `agcode` String COMMENT 'agcode',
    `up_agcode` String COMMENT 'parent agcode',
    `agid` String COMMENT 'agent line ID',
    `log_time` DateTime COMMENT 'log write time (settlement time)',
    `created_at` DateTime DEFAULT now() COMMENT 'creation time'
) ENGINE = Distributed (
    'andy_test_cluster',
    '',
    't_bet_origin',
    halfMD5(agcode)
);
node 2
CREATE TABLE dev.t_bet_origin_distributed (
    `order_id` String COMMENT 'bet order ID',
    `username` String COMMENT 'user login name',
    `product_id` String COMMENT 'product ID',
    `action` String COMMENT 'action',
    `ip` String COMMENT 'IP',
    `game_type` String COMMENT 'game type',
    `play_type` Int32 COMMENT 'play type',
    `table_id` String COMMENT 'table number',
    `gmcode` String COMMENT 'round number',
    `amount` Decimal(18, 2) COMMENT 'bet amount',
    `valid_amount` Decimal(18, 2) COMMENT 'valid bet amount',
    `ratio` Decimal(14, 0) COMMENT 'conversion ratio to RMB',
    `currency` String COMMENT 'currency',
    `agcode` String COMMENT 'agcode',
    `up_agcode` String COMMENT 'parent agcode',
    `agid` String COMMENT 'agent line ID',
    `log_time` DateTime COMMENT 'log write time (settlement time)',
    `created_at` DateTime DEFAULT now() COMMENT 'creation time'
) ENGINE = Distributed (
    'andy_test_cluster',
    '',
    't_bet_origin',
    halfMD5(agcode)
);
node 3
CREATE TABLE dev.t_bet_origin_distributed (
    `order_id` String COMMENT 'bet order ID',
    `username` String COMMENT 'user login name',
    `product_id` String COMMENT 'product ID',
    `action` String COMMENT 'action',
    `ip` String COMMENT 'IP',
    `game_type` String COMMENT 'game type',
    `play_type` Int32 COMMENT 'play type',
    `table_id` String COMMENT 'table number',
    `gmcode` String COMMENT 'round number',
    `amount` Decimal(18, 2) COMMENT 'bet amount',
    `valid_amount` Decimal(18, 2) COMMENT 'valid bet amount',
    `ratio` Decimal(14, 0) COMMENT 'conversion ratio to RMB',
    `currency` String COMMENT 'currency',
    `agcode` String COMMENT 'agcode',
    `up_agcode` String COMMENT 'parent agcode',
    `agid` String COMMENT 'agent line ID',
    `log_time` DateTime COMMENT 'log write time (settlement time)',
    `created_at` DateTime DEFAULT now() COMMENT 'creation time'
) ENGINE = Distributed (
    'andy_test_cluster',
    '',
    't_bet_origin',
    halfMD5(agcode)
);
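As a side note, instead of logging in to each node and running the same statement three times, the same result can usually be achieved with one distributed DDL statement. A sketch, assuming ON CLUSTER DDL (which relies on the ZooKeeper-backed DDL queue) is enabled for andy_test_cluster and that the dev database already exists everywhere:

CREATE TABLE dev.t_bet_origin_distributed ON CLUSTER 'andy_test_cluster' (
    `order_id` String COMMENT 'bet order ID',
    `username` String COMMENT 'user login name',
    `product_id` String COMMENT 'product ID',
    `action` String COMMENT 'action',
    `ip` String COMMENT 'IP',
    `game_type` String COMMENT 'game type',
    `play_type` Int32 COMMENT 'play type',
    `table_id` String COMMENT 'table number',
    `gmcode` String COMMENT 'round number',
    `amount` Decimal(18, 2) COMMENT 'bet amount',
    `valid_amount` Decimal(18, 2) COMMENT 'valid bet amount',
    `ratio` Decimal(14, 0) COMMENT 'conversion ratio to RMB',
    `currency` String COMMENT 'currency',
    `agcode` String COMMENT 'agcode',
    `up_agcode` String COMMENT 'parent agcode',
    `agid` String COMMENT 'agent line ID',
    `log_time` DateTime COMMENT 'log write time (settlement time)',
    `created_at` DateTime DEFAULT now() COMMENT 'creation time'
) ENGINE = Distributed (
    'andy_test_cluster',
    '',             -- empty: use each replica's default database from the cluster config
    't_bet_origin',
    halfMD5(agcode)
);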
Inserting fake test data through the distributed table
With the tables created, the next step is to load 100 million bets into each of the 3 shards: 50 million rows from one user and another 50 million from a second user.
Since the sharding key of t_bet_origin_distributed is already set to halfMD5(agcode), we can mindlessly dump rows into the t_bet_origin_distributed table and it will route each row to the right shard based on its agcode.
For the experiment's convenience, those agcodes were deliberately picked so that each shard ends up with exactly 100 million rows (see the sketch after the list below for a quick way to preview the mapping).
The fake data is distributed as follows:
shard 1:
- username: test_data_shard_01_user_1854629 → 50 million bets
- username: test_data_shard_01_user_1853014 → 50 million bets
shard 2:
- username: test_data_shard_02_user_1850001 → 50 million bets
- username: test_data_shard_02_user_1850377 → 50 million bets
shard 3:
- username: test_data_shard_03_user_1854620 → 50 million bets
- username: test_data_shard_03_user_1854628 → 50 million bets
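With equal shard weights, the Distributed engine routes each row to the shard whose interval contains (sharding expression mod total weight), so an agcode's destination can be previewed with a one-liner. A sketch, assuming all three shards have the default weight of 1 and are numbered in config order:

SELECT
    agcode,
    halfMD5(agcode) % 3 + 1 AS shard_num -- shard picked by key modulo total weight
FROM values('agcode String', '1853014', '1850001', '1854628');

-- per the distribution above, '1853014' should map to shard 1,
-- '1850001' to shard 2, and '1854628' to shard 3

The Go program below generates the rows and pushes them through the distributed table (one user per run; the constants are swapped between runs):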
package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
	"github.com/google/uuid"
	"github.com/shopspring/decimal"
)

var conn1 driver.Conn

func init() {
	rand.Seed(1) // fixed seed so repeated runs produce the same pseudo-random data

	options1 := clickhouse.Options{
		Addr: []string{"10.1.103.179:9000"},
		Auth: clickhouse.Auth{
			Username: "andywu",
			Password: "123456",
		},
		Debug: true,
	}

	var err error
	conn1, err = clickhouse.Open(&options1)
	if err != nil {
		log.Fatal(err)
	}
}

func main() {
	query1 := `
	INSERT INTO dev.t_bet_origin_distributed (
		order_id,
		username,
		product_id,
		action,
		ip,
		game_type,
		play_type,
		table_id,
		gmcode,
		amount,
		valid_amount,
		ratio,
		currency,
		agcode,
		up_agcode,
		agid,
		log_time
	) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
	`

	var i int64
	var err error
	var batch1 driver.Batch

	tm := time.Now().Add(-1 * 72 * time.Hour)

	for i = 1; i <= 50_000_000; i++ {
		if batch1 == nil {
			batch1, err = conn1.PrepareBatch(context.Background(), query1)
			if err != nil {
				log.Fatal(err)
			}
		}

		err = appendShard1DataToBatch(batch1, tm, i)
		// Swap in one of the lines below (and adjust the username/agcode
		// constants inside the helper) to generate the other shards' users:
		// err = appendShard2DataToBatch(batch1, tm, i)
		// err = appendShard3DataToBatch(batch1, tm, i)
		if err != nil {
			log.Fatal(err)
		}

		// flush the batch every 1 million rows
		if i%1_000_000 == 0 {
			if err := batch1.Send(); err != nil {
				log.Fatal(err)
			}
			batch1 = nil
			log.Printf("Batch %v processed.\n", i)
		}
	}

	log.Println("Data insertion completed.")
}

func appendShard1DataToBatch(batch driver.Batch, tm time.Time, i int64) error {
	// 1853014 -- shard 1
	// 1854629 -- shard 1
	orderID := fmt.Sprintf("%09d-%s", i, uuid.New().String())
	username := "test_data_shard_01_user_1853014"
	agcode := "1853014"
	upAgcode := "21563"
	productID := "AFWF9"
	action := "settle"
	ip := "127.0.0.1"
	gameType := "BAC"
	playType := 3
	tableID := "9D11"
	gmcode := "9D11247290ZH"
	amount := decimal.NewFromInt(1000)
	validAmount := decimal.NewFromInt(1000)
	ratio := decimal.NewFromInt(100)
	currency := "HKD"
	agid := "3121"
	logTime := tm.Add(time.Duration(i) * time.Millisecond)

	return batch.Append(
		orderID,
		username,
		productID,
		action,
		ip,
		gameType,
		playType,
		tableID,
		gmcode,
		amount,
		validAmount,
		ratio,
		currency,
		agcode,
		upAgcode,
		agid,
		logTime,
	)
}

func appendShard2DataToBatch(batch driver.Batch, tm time.Time, i int64) error {
	// 1850001 -- shard 2
	// 1850377 -- shard 2
	orderID := fmt.Sprintf("%09d-%s", i, uuid.New().String())
	username := "test_data_shard_02_user_1850001"
	agcode := "1850001"
	upAgcode := "21563"
	productID := "AFWF9"
	action := "settle"
	ip := "127.0.0.1"
	gameType := "BAC"
	playType := 3
	tableID := "9D11"
	gmcode := "9D11247290ZH"
	amount := decimal.NewFromInt(1000)
	validAmount := decimal.NewFromInt(1000)
	ratio := decimal.NewFromInt(100)
	currency := "HKD"
	agid := "3121"
	logTime := tm.Add(time.Duration(i) * time.Millisecond)

	return batch.Append(
		orderID,
		username,
		productID,
		action,
		ip,
		gameType,
		playType,
		tableID,
		gmcode,
		amount,
		validAmount,
		ratio,
		currency,
		agcode,
		upAgcode,
		agid,
		logTime,
	)
}

func appendShard3DataToBatch(batch driver.Batch, tm time.Time, i int64) error {
	// 1854628 -- shard 3
	// 1854620 -- shard 3
	orderID := fmt.Sprintf("%09d-%s", i, uuid.New().String())
	username := "test_data_shard_03_user_1854628"
	agcode := "1854628"
	upAgcode := "21563"
	productID := "AFWF9"
	action := "settle"
	ip := "127.0.0.1"
	gameType := "BAC"
	playType := 3
	tableID := "9D11"
	gmcode := "9D11247290ZH"
	amount := decimal.NewFromInt(1000)
	validAmount := decimal.NewFromInt(1000)
	ratio := decimal.NewFromInt(100)
	currency := "HKD"
	agid := "3121"
	logTime := tm.Add(time.Duration(i) * time.Millisecond)

	return batch.Append(
		orderID,
		username,
		productID,
		action,
		ip,
		gameType,
		playType,
		tableID,
		gmcode,
		amount,
		validAmount,
		ratio,
		currency,
		agcode,
		upAgcode,
		agid,
		logTime,
	)
}
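As an aside, for pure load testing the same volume can also be generated entirely server-side with the numbers() table function, no client program needed. A rough sketch for one shard-1 user — a hypothetical alternative, not what was actually used here, and it approximates the Go code's millisecond timestamps at second resolution:

INSERT INTO dev.t_bet_origin_distributed (
    order_id, username, product_id, action, ip, game_type, play_type,
    table_id, gmcode, amount, valid_amount, ratio, currency,
    agcode, up_agcode, agid, log_time
)
SELECT
    concat(leftPad(toString(number + 1), 9, '0'), '-', toString(generateUUIDv4())),
    'test_data_shard_01_user_1853014',
    'AFWF9', 'settle', '127.0.0.1', 'BAC', 3, '9D11', '9D11247290ZH',
    toDecimal64(1000, 2), toDecimal64(1000, 2), toDecimal64(100, 0), 'HKD',
    '1853014', '21563', '3121',
    now() - toIntervalHour(72) + toIntervalSecond(intDiv(number, 1000))
FROM numbers(50000000);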
After the inserts finish, connect to each node and confirm that every shard replica on it holds the data we expect:
node 1:
root@e516a8a6937a:/# clickhouse-client --host=10.1.103.179 --user=andywu --password=123456
ClickHouse client version 24.3.6.48 (official build).
Connecting to 10.1.103.179:9000 as user andywu.
Connected to ClickHouse server version 24.2.1.
dev-clickhouse-cluster01 :) SELECT
username,
COUNT(1)
FROM dev01.t_bet_origin
GROUP BY username
Query id: 6f1da9ff-233a-4530-af7c-016d353c6dc4
┌─username────────────────────────┬──count()─┐
1. │ test_data_shard_01_user_1853014 │ 50000000 │
2. │ test_data_shard_01_user_1854629 │ 50000000 │
└─────────────────────────────────┴──────────┘
2 rows in set. Elapsed: 1.305 sec. Processed 100.00 million rows, 4.00 GB (76.66 million rows/s., 3.07 GB/s.)
Peak memory usage: 442.27 KiB.
dev-clickhouse-cluster01 :) SELECT
username,
COUNT(1)
FROM dev03.t_bet_origin
GROUP BY username
Query id: 2d2dadb6-3632-4bb0-b561-a693efd0ddf0
┌─username────────────────────────┬──count()─┐
1. │ test_data_shard_03_user_1854620 │ 50000000 │
2. │ test_data_shard_03_user_1854628 │ 50000000 │
└─────────────────────────────────┴──────────┘
2 rows in set. Elapsed: 1.309 sec. Processed 100.00 million rows, 4.00 GB (76.40 million rows/s., 3.06 GB/s.)
Peak memory usage: 439.82 KiB.
dev-clickhouse-cluster01 :)
node 2:
root@e516a8a6937a:/# clickhouse-client --host=10.1.103.180 --user=andywu --password=123456
ClickHouse client version 24.3.6.48 (official build).
Connecting to 10.1.103.180:9000 as user andywu.
Connected to ClickHouse server version 24.2.1.
dev-clickhouse-cluster02 :) SELECT
username,
COUNT(1)
FROM dev01.t_bet_origin
GROUP BY username
Query id: 3b56b5d7-b613-4d53-b342-25b69f81ba54
┌─username────────────────────────┬──count()─┐
1. │ test_data_shard_01_user_1853014 │ 50000000 │
2. │ test_data_shard_01_user_1854629 │ 50000000 │
└─────────────────────────────────┴──────────┘
2 rows in set. Elapsed: 1.403 sec. Processed 100.00 million rows, 4.00 GB (71.27 million rows/s., 2.85 GB/s.)
Peak memory usage: 222.72 KiB.
dev-clickhouse-cluster02 :) SELECT
username,
COUNT(1)
FROM dev02.t_bet_origin
GROUP BY username
Query id: 6c1cf562-f58d-45a0-92d8-79f57b14d00a
┌─username────────────────────────┬──count()─┐
1. │ test_data_shard_02_user_1850001 │ 50000000 │
2. │ test_data_shard_02_user_1850377 │ 50000000 │
└─────────────────────────────────┴──────────┘
2 rows in set. Elapsed: 1.429 sec. Processed 100.00 million rows, 4.00 GB (69.97 million rows/s., 2.80 GB/s.)
Peak memory usage: 223.03 KiB.
dev-clickhouse-cluster02 :)
node 3:
root@e516a8a6937a:/# clickhouse-client --host=10.1.103.181 --user=andywu --password=123456
ClickHouse client version 24.3.6.48 (official build).
Connecting to 10.1.103.181:9000 as user andywu.
Connected to ClickHouse server version 24.2.1.
dev-clickhouse-cluster03 :) SELECT
username,
COUNT(1)
FROM dev02.t_bet_origin
GROUP BY username
Query id: 6f4b1346-d5f6-499c-8450-3752b32aa1b3
┌─username────────────────────────┬──count()─┐
1. │ test_data_shard_02_user_1850001 │ 50000000 │
2. │ test_data_shard_02_user_1850377 │ 50000000 │
└─────────────────────────────────┴──────────┘
2 rows in set. Elapsed: 1.759 sec. Processed 100.00 million rows, 4.00 GB (56.84 million rows/s., 2.27 GB/s.)
Peak memory usage: 419.44 KiB.
dev-clickhouse-cluster03 :) SELECT
username,
COUNT(1)
FROM dev03.t_bet_origin
GROUP BY username
Query id: 2e76c3c5-1a9e-42ab-9f22-558194654c24
┌─username────────────────────────┬──count()─┐
1. │ test_data_shard_03_user_1854620 │ 50000000 │
2. │ test_data_shard_03_user_1854628 │ 50000000 │
└─────────────────────────────────┴──────────┘
2 rows in set. Elapsed: 2.042 sec. Processed 100.00 million rows, 4.00 GB (48.98 million rows/s., 1.96 GB/s.)
Peak memory usage: 473.65 KiB.
dev-clickhouse-cluster03 :)
At this point all the data needed for the experiments is ready.
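One more quick sanity check (a sketch; output omitted): counting through the distributed table from any node hits each shard exactly once, so it should report every row we inserted without double-counting replicas.

SELECT COUNT(1)
FROM dev.t_bet_origin_distributed;

-- expected: 300000000 (3 shards × 100 million rows)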
Experiments
1. Fetching data from a single shard through the distributed table
In experiment 1, let's use the distributed table to sum up the valid bets of the user with agcode = '1854629'.
root@e516a8a6937a:/# clickhouse-client --host=10.1.103.179 --user=andywu --password=123456
ClickHouse client version 24.3.6.48 (official build).
Connecting to 10.1.103.179:9000 as user andywu.
Connected to ClickHouse server version 24.2.1.
dev-clickhouse-cluster01 :) SELECT
agcode,
SUM(valid_amount)
FROM dev.t_bet_origin_distributed
WHERE agcode = '1854629'
GROUP BY agcode
Query id: a0a3cead-1ad3-4aad-ac0d-99550ef2ef74
┌─agcode──┬─sum(valid_amount)─┐
1. │ 1854629 │ 50000000000 │
└─────────┴───────────────────┘
1 row in set. Elapsed: 0.722 sec. Processed 50.00 million rows, 1.20 GB (69.24 million rows/s., 1.66 GB/s.)
Peak memory usage: 259.88 KiB.
dev-clickhouse-cluster01 :)
Done: summing 50 million rows took well under a second — ClickHouse's performance is excellent.
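Part of that speed comes from agcode being the leading primary key column, so each shard only scans the granules belonging to that user rather than its whole table. A way to confirm this against the local replica (a sketch; the exact EXPLAIN output differs across versions):

EXPLAIN indexes = 1
SELECT
    agcode,
    SUM(valid_amount)
FROM dev01.t_bet_origin
WHERE agcode = '1854629'
GROUP BY agcode;

-- the PrimaryKey section of the plan should show the agcode condition
-- limiting how many parts/granules get read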
Next, let's dig one step deeper with the following query and see how the distributed table dispatched the work.
dev-clickhouse-cluster01 :) SELECT
hostname,
type,
is_initial_query,
databases,
query,
event_time_microseconds,
query_start_time_microseconds,
query_duration_ms,
read_rows,
read_bytes,
result_rows,
result_bytes,
memory_usage,
query_cache_usage
FROM remote('10.1.103.179', system.query_log, 'andywu', '123456')
WHERE initial_query_id = 'a0a3cead-1ad3-4aad-ac0d-99550ef2ef74'
ORDER BY event_time_microseconds DESC
UNION ALL
SELECT
hostname,
type,
is_initial_query,
databases,
query,
event_time_microseconds,
query_start_time_microseconds,
query_duration_ms,
read_rows,
read_bytes,
result_rows,
result_bytes,
memory_usage,
query_cache_usage
FROM remote('10.1.103.180', system.query_log, 'andywu', '123456')
WHERE initial_query_id = 'a0a3cead-1ad3-4aad-ac0d-99550ef2ef74'
ORDER BY event_time_microseconds DESC
UNION ALL
SELECT
hostname,
type,
is_initial_query,
databases,
query,
event_time_microseconds,
query_start_time_microseconds,
query_duration_ms,
read_rows,
read_bytes,
result_rows,
result_bytes,
memory_usage,
query_cache_usage
FROM remote('10.1.103.181', system.query_log, 'andywu', '123456')
WHERE initial_query_id = 'a0a3cead-1ad3-4aad-ac0d-99550ef2ef74'
ORDER BY event_time_microseconds DESC
Query id: 9dc8d8d1-2131-4e95-83a6-1495f4570ec7
┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases───┬─query──────────────────────────────────────────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
1. │ dev-clickhouse-cluster01 │ QueryFinish │ 1 │ ['dev'] │ SELECT agcode, SUM(valid_amount) FROM dev.t_bet_origin_distributed WHERE agcode = '1854629' GROUP BY agcode; │ 2024-11-16 15:13:19.213501 │ 2024-11-16 15:13:18.491685 │ 721 │ 50000576 │ 1200013824 │ 1 │ 560 │ 10738721 │ None │
2. │ dev-clickhouse-cluster01 │ QueryFinish │ 0 │ ['dev01'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` = '1854629' GROUP BY `agcode` │ 2024-11-16 15:13:19.212753 │ 2024-11-16 15:13:18.495064 │ 717 │ 50000576 │ 1200013824 │ 1 │ 656 │ 266112 │ None │
3. │ dev-clickhouse-cluster01 │ QueryStart │ 0 │ ['dev01'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` = '1854629' GROUP BY `agcode` │ 2024-11-16 15:13:18.495064 │ 2024-11-16 15:13:18.495064 │ 0 │ 0 │ 0 │ 0 │ 0 │ 0 │ Unknown │
4. │ dev-clickhouse-cluster01 │ QueryStart │ 1 │ ['dev'] │ SELECT agcode, SUM(valid_amount) FROM dev.t_bet_origin_distributed WHERE agcode = '1854629' GROUP BY agcode; │ 2024-11-16 15:13:18.491685 │ 2024-11-16 15:13:18.491685 │ 0 │ 0 │ 0 │ 0 │ 0 │ 0 │ Unknown │
└─────────────────────────────┴─────────────┴──────────────────┴─────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘
┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases───┬─query─────────────────────────────────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
5. │ dev-clickhouse-cluster02 │ QueryFinish │ 0 │ ['dev02'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` = '1854629' GROUP BY `agcode` │ 2024-11-16 15:13:19.053968 │ 2024-11-16 15:13:19.045409 │ 8 │ 0 │ 0 │ 0 │ 0 │ 22576 │ None │
6. │ dev-clickhouse-cluster02 │ QueryStart │ 0 │ ['dev02'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` = '1854629' GROUP BY `agcode` │ 2024-11-16 15:13:19.045409 │ 2024-11-16 15:13:19.045409 │ 0 │ 0 │ 0 │ 0 │ 0 │ 0 │ Unknown │
└─────────────────────────────┴─────────────┴──────────────────┴─────────────┴───────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘
┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases───┬─query─────────────────────────────────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
7. │ dev-clickhouse-cluster03 │ QueryFinish │ 0 │ ['dev03'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` = '1854629' GROUP BY `agcode` │ 2024-11-16 15:13:18.439070 │ 2024-11-16 15:13:18.427930 │ 11 │ 0 │ 0 │ 0 │ 0 │ 22576 │ None │
8. │ dev-clickhouse-cluster03 │ QueryStart │ 0 │ ['dev03'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` = '1854629' GROUP BY `agcode` │ 2024-11-16 15:13:18.427930 │ 2024-11-16 15:13:18.427930 │ 0 │ 0 │ 0 │ 0 │ 0 │ 0 │ Unknown │
└─────────────────────────────┴─────────────┴──────────────────┴─────────────┴───────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘
8 rows in set. Elapsed: 0.094 sec. Processed 555.18 thousand rows, 29.08 MB (5.91 million rows/s., 309.42 MB/s.)
Peak memory usage: 10.24 MiB.
dev-clickhouse-cluster01 :)
To interpret this query log, start with is_initial_query: after we issue the query against node 1's (dev-clickhouse-cluster01) distributed table, it sends the query out to every shard for execution.
Recall the shard/replica layout from when we created the tables: node 1 already holds shard 1's data (dev01), so the distributed table decides to read shard 1 from node 1 itself.
node 2 (dev-clickhouse-cluster02) receives the query against shard 2 (dev02).
node 3 (dev-clickhouse-cluster03) receives the query against shard 3 (dev03).
Once node 2 and node 3 finish computing over their own shards, they return their results to node 1's distributed table, which performs the final merge.
Given how we inserted the fake data, this user's bets all live on shard 1, so shard 2 and shard 3 find nothing (read_rows = 0), exactly as expected.
Shard 1 spent 717 ms filtering and summing; shard 2 and shard 3 had no matching data, so their queries finished in only 8 ms and 11 ms. The three shards returned their results to the distributed table for the final merge, and the whole process took 721 ms in total.
Since the per-shard execution times summed (shard 1's 717 ms + shard 2's 8 ms + shard 3's 11 ms) exceed the total execution time of 721 ms, we can infer that the distributed table dispatches the queries in parallel.
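As an aside, instead of UNION ALL over three remote() calls, the same log can usually be collected in one shot with the clusterAllReplicas table function (a sketch, assuming the cluster definition is visible on the node you query from):

SELECT
    hostname,
    type,
    is_initial_query,
    query_duration_ms,
    read_rows
FROM clusterAllReplicas('andy_test_cluster', system.query_log)
WHERE initial_query_id = 'a0a3cead-1ad3-4aad-ac0d-99550ef2ef74'
ORDER BY event_time_microseconds DESC;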
2. Fetching data from multiple shards through the distributed table (1)
In experiment 2 we sum the total valid bets of one user from each shard, trying to get every node busy.
root@e516a8a6937a:/# clickhouse-client --host=10.1.103.179 --user=andywu --password=123456
ClickHouse client version 24.3.6.48 (official build).
Connecting to 10.1.103.179:9000 as user andywu.
Connected to ClickHouse server version 24.2.1.
dev-clickhouse-cluster01 :) SELECT
agcode,
SUM(valid_amount)
FROM dev.t_bet_origin_distributed
WHERE agcode IN ('1853014', '1850001', '1854620')
GROUP BY agcode
Query id: 73edd8c0-f8d2-4290-8b00-5cf659d71a8c
┌─agcode──┬─sum(valid_amount)─┐
1. │ 1850001 │ 50000000000 │
2. │ 1854620 │ 50000000000 │
3. │ 1853014 │ 50000000000 │
└─────────┴───────────────────┘
3 rows in set. Elapsed: 1.534 sec. Processed 150.00 million rows, 3.60 GB (97.81 million rows/s., 2.35 GB/s.)
Peak memory usage: 231.68 KiB.
dev-clickhouse-cluster01 :)
Done: ClickHouse reports scanning 150 million rows in total for the summation. Compared with the 0.7 s for a single shard, the 3 shards took 1.5 s. The execution time roughly doubled, but the data volume is 3 times the original.
As before, let's analyze how the distributed table dispatched this query.
dev-clickhouse-cluster01 :) SELECT
hostname,
type,
is_initial_query,
databases,
query,
event_time_microseconds,
query_start_time_microseconds,
query_duration_ms,
read_rows,
read_bytes,
result_rows,
result_bytes,
memory_usage,
query_cache_usage
FROM remote('10.1.103.179', system.query_log, 'andywu', '123456')
WHERE initial_query_id = '73edd8c0-f8d2-4290-8b00-5cf659d71a8c'
ORDER BY event_time_microseconds DESC
UNION ALL
SELECT
hostname,
type,
is_initial_query,
databases,
query,
event_time_microseconds,
query_start_time_microseconds,
query_duration_ms,
read_rows,
read_bytes,
result_rows,
result_bytes,
memory_usage,
query_cache_usage
FROM remote('10.1.103.180', system.query_log, 'andywu', '123456')
WHERE initial_query_id = '73edd8c0-f8d2-4290-8b00-5cf659d71a8c'
ORDER BY event_time_microseconds DESC
UNION ALL
SELECT
hostname,
type,
is_initial_query,
databases,
query,
event_time_microseconds,
query_start_time_microseconds,
query_duration_ms,
read_rows,
read_bytes,
result_rows,
result_bytes,
memory_usage,
query_cache_usage
FROM remote('10.1.103.181', system.query_log, 'andywu', '123456')
WHERE initial_query_id = '73edd8c0-f8d2-4290-8b00-5cf659d71a8c'
ORDER BY event_time_microseconds DESC
Query id: 3993b39b-e615-4641-ac9a-eeea194967c5
┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases───┬─query───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
1. │ dev-clickhouse-cluster01 │ QueryFinish │ 1 │ ['dev'] │ SELECT agcode, SUM(valid_amount) FROM dev.t_bet_origin_distributed WHERE agcode IN ('1853014', '1850001', '1854620') GROUP BY agcode; │ 2024-11-16 16:20:07.745864 │ 2024-11-16 16:20:06.212569 │ 1533 │ 150000000 │ 3600000000 │ 3 │ 608 │ 2091553 │ None │
2. │ dev-clickhouse-cluster01 │ QueryFinish │ 0 │ ['dev01'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` IN ('1853014', '1850001', '1854620') GROUP BY `agcode` │ 2024-11-16 16:20:07.744907 │ 2024-11-16 16:20:06.215478 │ 1529 │ 50000000 │ 1200000000 │ 1 │ 656 │ 237240 │ None │
3. │ dev-clickhouse-cluster01 │ QueryFinish │ 0 │ ['dev03'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` IN ('1853014', '1850001', '1854620') GROUP BY `agcode` │ 2024-11-16 16:20:07.736969 │ 2024-11-16 16:20:06.216108 │ 1520 │ 50000000 │ 1200000000 │ 1 │ 656 │ 290384 │ None │
4. │ dev-clickhouse-cluster01 │ QueryStart │ 0 │ ['dev03'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` IN ('1853014', '1850001', '1854620') GROUP BY `agcode` │ 2024-11-16 16:20:06.216108 │ 2024-11-16 16:20:06.216108 │ 0 │ 0 │ 0 │ 0 │ 0 │ 0 │ Unknown │
5. │ dev-clickhouse-cluster01 │ QueryStart │ 0 │ ['dev01'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` IN ('1853014', '1850001', '1854620') GROUP BY `agcode` │ 2024-11-16 16:20:06.215478 │ 2024-11-16 16:20:06.215478 │ 0 │ 0 │ 0 │ 0 │ 0 │ 0 │ Unknown │
6. │ dev-clickhouse-cluster01 │ QueryStart │ 1 │ ['dev'] │ SELECT agcode, SUM(valid_amount) FROM dev.t_bet_origin_distributed WHERE agcode IN ('1853014', '1850001', '1854620') GROUP BY agcode; │ 2024-11-16 16:20:06.212569 │ 2024-11-16 16:20:06.212569 │ 0 │ 0 │ 0 │ 0 │ 0 │ 0 │ Unknown │
└─────────────────────────────┴─────────────┴──────────────────┴─────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘
┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases───┬─query──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
7. │ dev-clickhouse-cluster02 │ QueryFinish │ 0 │ ['dev02'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` IN ('1853014', '1850001', '1854620') GROUP BY `agcode` │ 2024-11-16 16:20:07.570665 │ 2024-11-16 16:20:06.812059 │ 758 │ 50000000 │ 1200000000 │ 1 │ 656 │ 236600 │ None │
8. │ dev-clickhouse-cluster02 │ QueryStart │ 0 │ ['dev02'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` IN ('1853014', '1850001', '1854620') GROUP BY `agcode` │ 2024-11-16 16:20:06.812059 │ 2024-11-16 16:20:06.812059 │ 0 │ 0 │ 0 │ 0 │ 0 │ 0 │ Unknown │
└─────────────────────────────┴─────────────┴──────────────────┴─────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘
8 rows in set. Elapsed: 0.066 sec. Processed 555.17 thousand rows, 27.32 MB (8.46 million rows/s., 416.54 MB/s.)
Peak memory usage: 10.21 MiB.
dev-clickhouse-cluster01 :)
Only in the query log do we discover why: ClickHouse saw that node 1 already holds both shard 1's and shard 3's data, so it never sent the query to node 3 to read shard 3; node 1 ground through shard 1 and shard 3 entirely on its own. That explains why it took 1.5 s, nearly twice as long.
(i.e., it figured that doing the work itself beats asking someone else and paying the round-trip network I/O — rather like our own attitude toward asking a teammate to change code XD)
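This bias toward local data is governed by the prefer_localhost_replica setting, which is on by default. A sketch of forcing the work out to remote replicas instead; whether trading network I/O for extra parallelism pays off depends on the data volume:

SET prefer_localhost_replica = 0; -- session-level; default is 1 (prefer the local replica)

SELECT
    agcode,
    SUM(valid_amount)
FROM dev.t_bet_origin_distributed
WHERE agcode IN ('1853014', '1850001', '1854620')
GROUP BY agcode;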
3. Fetching data from multiple shards through the distributed table (2)
In experiment 3 we drop the previous experiment's WHERE clause and sum every user's valid bets, to see whether we can genuinely get all the nodes busy.
This time we query the distributed table from node 3, which was sitting idle a moment ago.
root@e516a8a6937a:/# clickhouse-client --host=10.1.103.181 --user=andywu --password=123456
ClickHouse client version 24.3.6.48 (official build).
Connecting to 10.1.103.181:9000 as user andywu.
Connected to ClickHouse server version 24.2.1.
dev-clickhouse-cluster03 :) SELECT
agcode,
SUM(valid_amount)
FROM dev.t_bet_origin_distributed
GROUP BY agcode
Query id: 5e01c84f-e81d-4c09-86db-97f65a8f7281
┌─agcode──┬─sum(valid_amount)─┐
1. │ 1850001 │ 50000000000 │
2. │ 1854629 │ 50000000000 │
3. │ 1854628 │ 50000000000 │
4. │ 1854620 │ 50000000000 │
5. │ 1853014 │ 50000000000 │
6. │ 1850377 │ 50000000000 │
└─────────┴───────────────────┘
6 rows in set. Elapsed: 2.023 sec. Processed 300.00 million rows, 7.20 GB (148.28 million rows/s., 3.56 GB/s.)
Peak memory usage: 6.50 MiB.
dev-clickhouse-cluster03 :)
The data volume rises from the previous 150 million rows to 300 million, yet the execution time only goes from 1.5 s to about 2 s — the performance looks good.
As before, let's analyze how the distributed table dispatched this query.
dev-clickhouse-cluster03 :) SELECT
hostname,
type,
is_initial_query,
databases,
query,
event_time_microseconds,
query_start_time_microseconds,
query_duration_ms,
read_rows,
read_bytes,
result_rows,
result_bytes,
memory_usage,
query_cache_usage
FROM remote('10.1.103.179', system.query_log, 'andywu', '123456')
WHERE initial_query_id = '5e01c84f-e81d-4c09-86db-97f65a8f7281'
ORDER BY event_time_microseconds DESC
UNION ALL
SELECT
hostname,
type,
is_initial_query,
databases,
query,
event_time_microseconds,
query_start_time_microseconds,
query_duration_ms,
read_rows,
read_bytes,
result_rows,
result_bytes,
memory_usage,
query_cache_usage
FROM remote('10.1.103.180', system.query_log, 'andywu', '123456')
WHERE initial_query_id = '5e01c84f-e81d-4c09-86db-97f65a8f7281'
ORDER BY event_time_microseconds DESC
UNION ALL
SELECT
hostname,
type,
is_initial_query,
databases,
query,
event_time_microseconds,
query_start_time_microseconds,
query_duration_ms,
read_rows,
read_bytes,
result_rows,
result_bytes,
memory_usage,
query_cache_usage
FROM remote('10.1.103.181', system.query_log, 'andywu', '123456')
WHERE initial_query_id = '5e01c84f-e81d-4c09-86db-97f65a8f7281'
ORDER BY event_time_microseconds DESC
Query id: 57cc32d9-7d8f-4177-9485-63af88f98ce7
┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases─┬─query─────────────────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
1. │ dev-clickhouse-cluster03 │ QueryFinish │ 1 │ ['dev'] │ SELECT agcode, SUM(valid_amount) FROM dev.t_bet_origin_distributed GROUP BY agcode; │ 2024-11-16 16:52:39.338032 │ 2024-11-16 16:52:37.315424 │ 2022 │ 300000000 │ 7200000000 │ 6 │ 680 │ 6820180 │ None │
2. │ dev-clickhouse-cluster03 │ QueryStart │ 1 │ ['dev'] │ SELECT agcode, SUM(valid_amount) FROM dev.t_bet_origin_distributed GROUP BY agcode; │ 2024-11-16 16:52:37.315424 │ 2024-11-16 16:52:37.315424 │ 0 │ 0 │ 0 │ 0 │ 0 │ 0 │ Unknown │
└─────────────────────────────┴─────────────┴──────────────────┴───────────┴───────────────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘
┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases───┬─query──────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
3. │ dev-clickhouse-cluster02 │ QueryFinish │ 0 │ ['dev01'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` GROUP BY `agcode` │ 2024-11-16 16:52:40.042443 │ 2024-11-16 16:52:38.041108 │ 2001 │ 100000000 │ 2400000000 │ 2 │ 664 │ 368096 │ None │
4. │ dev-clickhouse-cluster02 │ QueryFinish │ 0 │ ['dev02'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` GROUP BY `agcode` │ 2024-11-16 16:52:40.040456 │ 2024-11-16 16:52:38.041123 │ 1999 │ 100000000 │ 2400000000 │ 2 │ 664 │ 367600 │ None │
5. │ dev-clickhouse-cluster02 │ QueryStart │ 0 │ ['dev02'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` GROUP BY `agcode` │ 2024-11-16 16:52:38.041123 │ 2024-11-16 16:52:38.041123 │ 0 │ 0 │ 0 │ 0 │ 0 │ 0 │ Unknown │
6. │ dev-clickhouse-cluster02 │ QueryStart │ 0 │ ['dev01'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` GROUP BY `agcode` │ 2024-11-16 16:52:38.041108 │ 2024-11-16 16:52:38.041108 │ 0 │ 0 │ 0 │ 0 │ 0 │ 0 │ Unknown │
└─────────────────────────────┴─────────────┴──────────────────┴─────────────┴────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘
┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases───┬─query──────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
7. │ dev-clickhouse-cluster01 │ QueryFinish │ 0 │ ['dev03'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` GROUP BY `agcode` │ 2024-11-16 16:52:38.697425 │ 2024-11-16 16:52:37.423076 │ 1274 │ 100000000 │ 2400000000 │ 2 │ 664 │ 367640 │ None │
8. │ dev-clickhouse-cluster01 │ QueryStart │ 0 │ ['dev03'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` GROUP BY `agcode` │ 2024-11-16 16:52:37.423076 │ 2024-11-16 16:52:37.423076 │ 0 │ 0 │ 0 │ 0 │ 0 │ 0 │ Unknown │
└─────────────────────────────┴─────────────┴──────────────────┴─────────────┴────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘
8 rows in set. Elapsed: 0.063 sec. Processed 555.29 thousand rows, 27.48 MB (8.80 million rows/s., 435.66 MB/s.)
Peak memory usage: 5.44 MiB.
dev-clickhouse-cluster03 :)
Caught red-handed: node 3 is slacking off. It shipped the query to node 1 and node 2 and let them do the computing, while it merely dispatched the tasks and waited to harvest the results — 87% similar to behavior you'd observe in the human world.
node 1 took about 1.3 s to sum shard 3's data.
node 2 took about 2 s to sum both shard 1's and shard 2's data.
node 3's distributed table waited the 2 s for node 2 to come back, then spent only another 21 ms merging and returning the final result.
We need some way to force node 3 to join the labor force and compute shard 2's data; that should bring node 2's work down to around 1 s as well and cut the time the distributed table spends waiting on node 2. One knob worth trying is sketched below.
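Which replica of each shard ends up doing the work is influenced by the load_balancing setting (default random). A sketch of nudging the replica choice; this doesn't guarantee node 3 picks up shard 2 on any single run, but it spreads repeated queries across replicas more evenly:

SET load_balancing = 'round_robin'; -- other values: random, nearest_hostname, in_order, first_or_random

SELECT
    agcode,
    SUM(valid_amount)
FROM dev.t_bet_origin_distributed
GROUP BY agcode;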
4. Fetching data from multiple shards through the distributed table (3)
In experiment 4, instead of filtering on the sharding key agcode, we filter on game_type, a column that is neither the sharding key, the sorting key, nor part of the primary key (the equivalent of completely bypassing the index in an RDBMS), so the cluster has no way to know the data distribution up front.
(No more slacking, people — node 1, node 2, and especially node 3: start running, no walking.)
root@e516a8a6937a:/# clickhouse-client --host=10.1.103.179 --user=andywu --password=123456
ClickHouse client version 24.3.6.48 (official build).
Connecting to 10.1.103.179:9000 as user andywu.
Connected to ClickHouse server version 24.2.1.
dev-clickhouse-cluster01 :) SELECT
game_type,
SUM(valid_amount)
FROM dev.t_bet_origin_distributed
WHERE game_type = 'BAC'
GROUP BY game_type
Query id: aacd267d-e922-4d7d-9903-8ed7c3e57537
┌─game_type─┬─sum(valid_amount)─┐
1. │ BAC │ 300000000000 │
└───────────┴───────────────────┘
1 row in set. Elapsed: 1.291 sec. Processed 300.00 million rows, 6.00 GB (232.35 million rows/s., 4.65 GB/s.)
Peak memory usage: 2.00 MiB.
dev-clickhouse-cluster01 :)
There we go, this time it worked: the computation over 300 million rows dropped to about 1.3 s.
Let's look at the dispatch details.
dev-clickhouse-cluster01 :) SELECT
hostname,
type,
is_initial_query,
databases,
query,
event_time_microseconds,
query_start_time_microseconds,
query_duration_ms,
read_rows,
read_bytes,
result_rows,
result_bytes,
memory_usage,
query_cache_usage
FROM remote('10.1.103.179', system.query_log, 'andywu', '123456')
WHERE initial_query_id = 'aacd267d-e922-4d7d-9903-8ed7c3e57537'
ORDER BY event_time_microseconds DESC
UNION ALL
SELECT
hostname,
type,
is_initial_query,
databases,
query,
event_time_microseconds,
query_start_time_microseconds,
query_duration_ms,
read_rows,
read_bytes,
result_rows,
result_bytes,
memory_usage,
query_cache_usage
FROM remote('10.1.103.180', system.query_log, 'andywu', '123456')
WHERE initial_query_id = 'aacd267d-e922-4d7d-9903-8ed7c3e57537'
ORDER BY event_time_microseconds DESC
UNION ALL
SELECT
hostname,
type,
is_initial_query,
databases,
query,
event_time_microseconds,
query_start_time_microseconds,
query_duration_ms,
read_rows,
read_bytes,
result_rows,
result_bytes,
memory_usage,
query_cache_usage
FROM remote('10.1.103.181', system.query_log, 'andywu', '123456')
WHERE initial_query_id = 'aacd267d-e922-4d7d-9903-8ed7c3e57537'
ORDER BY event_time_microseconds DESC
Query id: bbf1a3bd-a9b3-47af-a99c-c4205bce3683
┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases───┬─query───────────────────────────────────────────────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
1. │ dev-clickhouse-cluster01 │ QueryFinish │ 1 │ ['dev'] │ SELECT game_type, SUM(valid_amount) FROM dev.t_bet_origin_distributed WHERE game_type = 'BAC' GROUP BY game_type; │ 2024-11-16 17:36:06.269881 │ 2024-11-16 17:36:04.978911 │ 1290 │ 300000000 │ 6000000000 │ 1 │ 560 │ 2094237 │ None │
2. │ dev-clickhouse-cluster01 │ QueryFinish │ 0 │ ['dev03'] │ SELECT `game_type`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `game_type` = 'BAC' GROUP BY `game_type` │ 2024-11-16 17:36:06.163945 │ 2024-11-16 17:36:04.982116 │ 1181 │ 100000000 │ 2000000000 │ 1 │ 656 │ 445848 │ None │
3. │ dev-clickhouse-cluster01 │ QueryStart │ 0 │ ['dev03'] │ SELECT `game_type`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `game_type` = 'BAC' GROUP BY `game_type` │ 2024-11-16 17:36:04.982116 │ 2024-11-16 17:36:04.982116 │ 0 │ 0 │ 0 │ 0 │ 0 │ 0 │ Unknown │
4. │ dev-clickhouse-cluster01 │ QueryStart │ 1 │ ['dev'] │ SELECT game_type, SUM(valid_amount) FROM dev.t_bet_origin_distributed WHERE game_type = 'BAC' GROUP BY game_type; │ 2024-11-16 17:36:04.978911 │ 2024-11-16 17:36:04.978911 │ 0 │ 0 │ 0 │ 0 │ 0 │ 0 │ Unknown │
└─────────────────────────────┴─────────────┴──────────────────┴─────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘
┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases───┬─query──────────────────────────────────────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
5. │ dev-clickhouse-cluster02 │ QueryFinish │ 0 │ ['dev01'] │ SELECT `game_type`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `game_type` = 'BAC' GROUP BY `game_type` │ 2024-11-16 17:36:06.903508 │ 2024-11-16 17:36:05.632965 │ 1270 │ 100000000 │ 2000000000 │ 1 │ 656 │ 425640 │ None │
6. │ dev-clickhouse-cluster02 │ QueryStart │ 0 │ ['dev01'] │ SELECT `game_type`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `game_type` = 'BAC' GROUP BY `game_type` │ 2024-11-16 17:36:05.632965 │ 2024-11-16 17:36:05.632965 │ 0 │ 0 │ 0 │ 0 │ 0 │ 0 │ Unknown │
└─────────────────────────────┴─────────────┴──────────────────┴─────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘
┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases───┬─query──────────────────────────────────────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
7. │ dev-clickhouse-cluster03 │ QueryFinish │ 0 │ ['dev02'] │ SELECT `game_type`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `game_type` = 'BAC' GROUP BY `game_type` │ 2024-11-16 17:36:06.170086 │ 2024-11-16 17:36:04.888504 │ 1281 │ 100000000 │ 2000000000 │ 1 │ 656 │ 425760 │ None │
8. │ dev-clickhouse-cluster03 │ QueryStart │ 0 │ ['dev02'] │ SELECT `game_type`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `game_type` = 'BAC' GROUP BY `game_type` │ 2024-11-16 17:36:04.888504 │ 2024-11-16 17:36:04.888504 │ 0 │ 0 │ 0 │ 0 │ 0 │ 0 │ Unknown │
└─────────────────────────────┴─────────────┴──────────────────┴─────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘
8 rows in set. Elapsed: 0.071 sec. Processed 555.40 thousand rows, 24.99 MB (7.85 million rows/s., 353.26 MB/s.)
Peak memory usage: 5.69 MiB.
dev-clickhouse-cluster01 :)
This time the distributed table had node 1 compute shard 3's data, node 2 compute shard 1's, and node 3 compute shard 2's.
With the I/O spread evenly across the nodes, each finished its share of the heavy lifting in a tightly clustered 1.2–1.3 s, and after collecting the results the distributed table spent only about another 10 ms on the final merge before returning.
Exactly the result we were hoping for (⁎⁍̴̛ᴗ⁍̴̛⁎)!!
Summary
The experiments show that a table created with the distributed table engine really can fan computation out in parallel to the nodes holding the different shards, each of which finishes its own computation before the results are merged and returned — the horizontal scaling that system design strives for ✨. That said, it occasionally exhibits the puzzling behavior of not making full use of the cluster's compute (analogous to an RDBMS having an index but not using it).
Also, this post only showed off the distributed table engine + sharding; we haven't yet played with the HA side of having 2 replicas per shard.
Given the chance, it would be worth experimenting with killing one node and confirming the distributed table still returns correct data, and with inserting new rows through the distributed table while a node is down, then watching the revived node sync the out-of-date data back.
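A starting point for that follow-up would be watching replication lag from any node (a sketch; exact columns vary a bit by version):

SELECT
    hostName() AS host,
    database,
    table,
    is_leader,
    absolute_delay,
    queue_size
FROM clusterAllReplicas('andy_test_cluster', system.replicas)
WHERE table = 't_bet_origin';

-- after the dead node comes back, absolute_delay and queue_size
-- should drain back toward 0 as it catches up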