Parallel Computation with the ClickHouse Distributed Table Engine

Distributed database systems

Our company originally ran only an OLTP database, but over the past year the data volume has grown to the point where report-style queries keep getting slower. So we started looking into OLAP databases, hoping to offload the reporting queries and let the OLTP database focus on OLTP workloads. Today I want to share the OLAP database I've been studying recently, ClickHouse: it has a distributed table engine capable of parallel computation, and we'll test how it performs when scanning hundreds of millions of rows.

Introduction

A table created with ClickHouse's distributed table engine does not store any data itself; instead it points at multiple data-bearing tables across the cluster, much like a view in an RDBMS. The advantage is that when you query the distributed table, it fans your query out in parallel to each of the underlying tables, collects the partial results, aggregates them, and returns the final answer to you.

In other words, once we set up a cluster and shard the data across multiple nodes, the computation is spread over several nodes, giving us horizontal scalability.

A plain-language analogy: every month the finance team has to calculate everyone's salary. If the company has 100 employees but only 1 accountant, that accountant may have to work overtime at month's end to close the books for all 100 salaries, from EMP001 through EMP100;

if the company hires a second accountant, accountant 1 only needs to handle EMP001 through EMP050 while accountant 2 takes EMP051 through EMP100. The workload is instantly halved and everyone gets to go home on time (confetti).

That covers the rough idea. Next we'll use a table of game bet records as our example and fill it with fake bets to measure how well the distributed table's parallel computation actually performs.

Diagram: how a bet query flows through the distributed table

Architecture

The dev environment I got to play with uses a 3-shard, 2-replica layout. 3 shards means the data is split three ways according to a sharding key; rows with the same sharding key land on the same shard. 2 replicas means each shard's data is stored twice in total, so if one copy is corrupted the data can still be recovered from the other.

(If both copies are corrupted, it's probably time to start learning a different trade, e.g., taking half-sugar-no-ice drink orders and punching in invoice carrier numbers...)

Below is the 3-shard, 2-replica layout of the dev environment. Notice that if you cross out any single node, the data of shard 1, shard 2, and shard 3 can still be found on the other two nodes.

- zookeeper
- clickhouse cluster (3 shards, 2 replicas)
    - node 1 (10.1.103.179)
        - shard 1, replica 1
        - shard 3, replica 2
    - node 2 (10.1.103.180)
        - shard 2, replica 1
        - shard 1, replica 2
    - node 3 (10.1.103.181)
        - shard 3, replica 1
        - shard 2, replica 2
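
You can verify this topology from inside ClickHouse itself via the system.clusters table; a quick sketch (columns trimmed to the interesting ones):

SELECT
    cluster,
    shard_num,
    replica_num,
    host_name,
    host_address,
    is_local
FROM system.clusters
WHERE cluster = 'andy_test_cluster';

With the layout above, this should return 6 rows, one per replica (two per shard).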

Creating the tables for the experiment

For our game-bet experiment we first need to create the raw bet table t_bet_origin on 3 shards * 2 replicas = 6 distinct replicas; the DDL statements follow below.

We also need 3 copies of the virtual distributed table t_bet_origin_distributed, one on each node, so that an application connected to any node can query the whole cluster's game bet data.

Raw bet records table

node 1

  • shard 1, replica 1
CREATE TABLE dev01.t_bet_origin
(
    `order_id` String COMMENT 'bet order ID',
    `username` String COMMENT 'user login name',
    `product_id` String COMMENT 'product ID',
    `action` String COMMENT 'action',
    `ip` String COMMENT 'IP',
    `game_type` String COMMENT 'game type',
    `play_type` Int32 COMMENT 'play type',
    `table_id` String COMMENT 'table ID',
    `gmcode` String COMMENT 'game round code',
    `amount` Decimal(18, 2) COMMENT 'bet amount',
    `valid_amount` Decimal(18, 2) COMMENT 'valid bet amount',
    `ratio` Decimal(14, 0) COMMENT 'conversion rate to RMB',
    `currency` String COMMENT 'currency',
    `agcode` String COMMENT 'agcode',
    `up_agcode` String COMMENT 'parent agcode',
    `agid` String COMMENT 'agent line ID',
    `log_time` DateTime('Asia/Taipei') COMMENT 'log write time (settlement time)',
    `created_at` DateTime('Asia/Taipei') DEFAULT now() COMMENT 'creation time',
    PROJECTION t_bet_origin_gmcode_projection_v2 (
        SELECT * ORDER BY gmcode
    )
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/andy_test_cluster/shard01/tables/t_bet_origin', 'replica01')
PRIMARY KEY (
    agcode,
    order_id,
    play_type
)
ORDER BY (
    agcode,
    order_id,
    play_type
)
COMMENT 'raw bet records';
  • shard 3, replica 2
CREATE TABLE dev03.t_bet_origin
(
    `order_id` String COMMENT 'bet order ID',
    `username` String COMMENT 'user login name',
    `product_id` String COMMENT 'product ID',
    `action` String COMMENT 'action',
    `ip` String COMMENT 'IP',
    `game_type` String COMMENT 'game type',
    `play_type` Int32 COMMENT 'play type',
    `table_id` String COMMENT 'table ID',
    `gmcode` String COMMENT 'game round code',
    `amount` Decimal(18, 2) COMMENT 'bet amount',
    `valid_amount` Decimal(18, 2) COMMENT 'valid bet amount',
    `ratio` Decimal(14, 0) COMMENT 'conversion rate to RMB',
    `currency` String COMMENT 'currency',
    `agcode` String COMMENT 'agcode',
    `up_agcode` String COMMENT 'parent agcode',
    `agid` String COMMENT 'agent line ID',
    `log_time` DateTime('Asia/Taipei') COMMENT 'log write time (settlement time)',
    `created_at` DateTime('Asia/Taipei') DEFAULT now() COMMENT 'creation time',
    PROJECTION t_bet_origin_gmcode_projection_v2 (
        SELECT * ORDER BY gmcode
    )
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/andy_test_cluster/shard03/tables/t_bet_origin', 'replica02')
PRIMARY KEY (
    agcode,
    order_id,
    play_type
)
ORDER BY (
    agcode,
    order_id,
    play_type
)
COMMENT 'raw bet records';

node 2

  • shard 2, replica 1
CREATE TABLE dev02.t_bet_origin
(
    `order_id` String COMMENT 'bet order ID',
    `username` String COMMENT 'user login name',
    `product_id` String COMMENT 'product ID',
    `action` String COMMENT 'action',
    `ip` String COMMENT 'IP',
    `game_type` String COMMENT 'game type',
    `play_type` Int32 COMMENT 'play type',
    `table_id` String COMMENT 'table ID',
    `gmcode` String COMMENT 'game round code',
    `amount` Decimal(18, 2) COMMENT 'bet amount',
    `valid_amount` Decimal(18, 2) COMMENT 'valid bet amount',
    `ratio` Decimal(14, 0) COMMENT 'conversion rate to RMB',
    `currency` String COMMENT 'currency',
    `agcode` String COMMENT 'agcode',
    `up_agcode` String COMMENT 'parent agcode',
    `agid` String COMMENT 'agent line ID',
    `log_time` DateTime('Asia/Taipei') COMMENT 'log write time (settlement time)',
    `created_at` DateTime('Asia/Taipei') DEFAULT now() COMMENT 'creation time',
    PROJECTION t_bet_origin_gmcode_projection_v2 (
        SELECT * ORDER BY gmcode
    )
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/andy_test_cluster/shard02/tables/t_bet_origin', 'replica01')
PRIMARY KEY (
    agcode,
    order_id,
    play_type
)
ORDER BY (
    agcode,
    order_id,
    play_type
)
COMMENT 'raw bet records';
  • shard 1, replica 2
CREATE TABLE dev01.t_bet_origin
(
    `order_id` String COMMENT 'bet order ID',
    `username` String COMMENT 'user login name',
    `product_id` String COMMENT 'product ID',
    `action` String COMMENT 'action',
    `ip` String COMMENT 'IP',
    `game_type` String COMMENT 'game type',
    `play_type` Int32 COMMENT 'play type',
    `table_id` String COMMENT 'table ID',
    `gmcode` String COMMENT 'game round code',
    `amount` Decimal(18, 2) COMMENT 'bet amount',
    `valid_amount` Decimal(18, 2) COMMENT 'valid bet amount',
    `ratio` Decimal(14, 0) COMMENT 'conversion rate to RMB',
    `currency` String COMMENT 'currency',
    `agcode` String COMMENT 'agcode',
    `up_agcode` String COMMENT 'parent agcode',
    `agid` String COMMENT 'agent line ID',
    `log_time` DateTime('Asia/Taipei') COMMENT 'log write time (settlement time)',
    `created_at` DateTime('Asia/Taipei') DEFAULT now() COMMENT 'creation time',
    PROJECTION t_bet_origin_gmcode_projection_v2 (
        SELECT * ORDER BY gmcode
    )
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/andy_test_cluster/shard01/tables/t_bet_origin', 'replica02')
PRIMARY KEY (
    agcode,
    order_id,
    play_type
)
ORDER BY (
    agcode,
    order_id,
    play_type
)
COMMENT 'raw bet records';

node 3

  • shard 3, replica 1
CREATE TABLE dev03.t_bet_origin
(
    `order_id` String COMMENT 'bet order ID',
    `username` String COMMENT 'user login name',
    `product_id` String COMMENT 'product ID',
    `action` String COMMENT 'action',
    `ip` String COMMENT 'IP',
    `game_type` String COMMENT 'game type',
    `play_type` Int32 COMMENT 'play type',
    `table_id` String COMMENT 'table ID',
    `gmcode` String COMMENT 'game round code',
    `amount` Decimal(18, 2) COMMENT 'bet amount',
    `valid_amount` Decimal(18, 2) COMMENT 'valid bet amount',
    `ratio` Decimal(14, 0) COMMENT 'conversion rate to RMB',
    `currency` String COMMENT 'currency',
    `agcode` String COMMENT 'agcode',
    `up_agcode` String COMMENT 'parent agcode',
    `agid` String COMMENT 'agent line ID',
    `log_time` DateTime('Asia/Taipei') COMMENT 'log write time (settlement time)',
    `created_at` DateTime('Asia/Taipei') DEFAULT now() COMMENT 'creation time',
    PROJECTION t_bet_origin_gmcode_projection_v2 (
        SELECT * ORDER BY gmcode
    )
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/andy_test_cluster/shard03/tables/t_bet_origin', 'replica01')
PRIMARY KEY (
    agcode,
    order_id,
    play_type
)
ORDER BY (
    agcode,
    order_id,
    play_type
)
COMMENT 'raw bet records';
  • shard 2, replica 2
CREATE TABLE dev02.t_bet_origin
(
    `order_id` String COMMENT 'bet order ID',
    `username` String COMMENT 'user login name',
    `product_id` String COMMENT 'product ID',
    `action` String COMMENT 'action',
    `ip` String COMMENT 'IP',
    `game_type` String COMMENT 'game type',
    `play_type` Int32 COMMENT 'play type',
    `table_id` String COMMENT 'table ID',
    `gmcode` String COMMENT 'game round code',
    `amount` Decimal(18, 2) COMMENT 'bet amount',
    `valid_amount` Decimal(18, 2) COMMENT 'valid bet amount',
    `ratio` Decimal(14, 0) COMMENT 'conversion rate to RMB',
    `currency` String COMMENT 'currency',
    `agcode` String COMMENT 'agcode',
    `up_agcode` String COMMENT 'parent agcode',
    `agid` String COMMENT 'agent line ID',
    `log_time` DateTime('Asia/Taipei') COMMENT 'log write time (settlement time)',
    `created_at` DateTime('Asia/Taipei') DEFAULT now() COMMENT 'creation time',
    PROJECTION t_bet_origin_gmcode_projection_v2 (
        SELECT * ORDER BY gmcode
    )
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/andy_test_cluster/shard02/tables/t_bet_origin', 'replica02')
PRIMARY KEY (
    agcode,
    order_id,
    play_type
)
ORDER BY (
    agcode,
    order_id,
    play_type
)
COMMENT 'raw bet records';

The distributed table wrapping the raw bet records table

Note the empty string passed as the database argument of the Distributed engine below: this makes each shard resolve to the default database configured for its replicas in the cluster definition (presumably dev01/dev02/dev03 here), which is how one logical table name can map to a different local database on each shard.

node 1

CREATE TABLE dev.t_bet_origin_distributed (
    `order_id` String COMMENT 'bet order ID',
    `username` String COMMENT 'user login name',
    `product_id` String COMMENT 'product ID',
    `action` String COMMENT 'action',
    `ip` String COMMENT 'IP',
    `game_type` String COMMENT 'game type',
    `play_type` Int32 COMMENT 'play type',
    `table_id` String COMMENT 'table ID',
    `gmcode` String COMMENT 'game round code',
    `amount` Decimal(18, 2) COMMENT 'bet amount',
    `valid_amount` Decimal(18, 2) COMMENT 'valid bet amount',
    `ratio` Decimal(14, 0) COMMENT 'conversion rate to RMB',
    `currency` String COMMENT 'currency',
    `agcode` String COMMENT 'agcode',
    `up_agcode` String COMMENT 'parent agcode',
    `agid` String COMMENT 'agent line ID',
    `log_time` DateTime COMMENT 'log write time (settlement time)',
    `created_at` DateTime DEFAULT now() COMMENT 'creation time'
) ENGINE = Distributed (
    'andy_test_cluster',
    '',
    't_bet_origin',
    halfMD5(agcode)
);

node 2

CREATE TABLE dev.t_bet_origin_distributed (
    `order_id` String COMMENT 'bet order ID',
    `username` String COMMENT 'user login name',
    `product_id` String COMMENT 'product ID',
    `action` String COMMENT 'action',
    `ip` String COMMENT 'IP',
    `game_type` String COMMENT 'game type',
    `play_type` Int32 COMMENT 'play type',
    `table_id` String COMMENT 'table ID',
    `gmcode` String COMMENT 'game round code',
    `amount` Decimal(18, 2) COMMENT 'bet amount',
    `valid_amount` Decimal(18, 2) COMMENT 'valid bet amount',
    `ratio` Decimal(14, 0) COMMENT 'conversion rate to RMB',
    `currency` String COMMENT 'currency',
    `agcode` String COMMENT 'agcode',
    `up_agcode` String COMMENT 'parent agcode',
    `agid` String COMMENT 'agent line ID',
    `log_time` DateTime COMMENT 'log write time (settlement time)',
    `created_at` DateTime DEFAULT now() COMMENT 'creation time'
) ENGINE = Distributed (
    'andy_test_cluster',
    '',
    't_bet_origin',
    halfMD5(agcode)
);

node 3

CREATE TABLE dev.t_bet_origin_distributed (
    `order_id` String COMMENT 'bet order ID',
    `username` String COMMENT 'user login name',
    `product_id` String COMMENT 'product ID',
    `action` String COMMENT 'action',
    `ip` String COMMENT 'IP',
    `game_type` String COMMENT 'game type',
    `play_type` Int32 COMMENT 'play type',
    `table_id` String COMMENT 'table ID',
    `gmcode` String COMMENT 'game round code',
    `amount` Decimal(18, 2) COMMENT 'bet amount',
    `valid_amount` Decimal(18, 2) COMMENT 'valid bet amount',
    `ratio` Decimal(14, 0) COMMENT 'conversion rate to RMB',
    `currency` String COMMENT 'currency',
    `agcode` String COMMENT 'agcode',
    `up_agcode` String COMMENT 'parent agcode',
    `agid` String COMMENT 'agent line ID',
    `log_time` DateTime COMMENT 'log write time (settlement time)',
    `created_at` DateTime DEFAULT now() COMMENT 'creation time'
) ENGINE = Distributed (
    'andy_test_cluster',
    '',
    't_bet_origin',
    halfMD5(agcode)
);
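
A side note before moving on: rather than running the DDL on every node by hand, ClickHouse can also fan DDL out for you via ON CLUSTER together with the {shard} and {replica} macros, assuming those macros are defined in each node's config. This particular cluster intentionally hosts two different shards per node through separate databases (dev01/dev02/dev03), which the shortcut doesn't cover, so treat the following as a sketch for the more common one-shard-per-node layout rather than a drop-in replacement:

CREATE TABLE dev.t_bet_origin ON CLUSTER andy_test_cluster
(
    `order_id` String,
    `agcode` String,
    `play_type` Int32
    -- ...remaining columns as in the full DDL above
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/andy_test_cluster/{shard}/tables/t_bet_origin', '{replica}')
ORDER BY (agcode, order_id, play_type);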

Inserting fake test data through the distributed table

With the tables created, we now insert 100 million bets into each of the 3 shards; within each shard, 50 million rows belong to one user and the other 50 million to another.

Since t_bet_origin_distributed already has its sharding key set to halfMD5(agcode), we can mindlessly insert into the distributed table and it will automatically route each row to the right shard based on agcode.

For convenience, those agcodes were deliberately hand-picked so that each shard ends up with exactly 100 million rows.
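
If you're curious which shard a given agcode routes to, you can evaluate the sharding expression yourself: the Distributed engine sends each row to the shard whose index equals the sharding expression modulo the total shard weight (3 here, assuming the default weight of 1 per shard). A quick sketch:

SELECT
    halfMD5('1853014') % 3 AS shard_idx_1,  -- 0-based index into the shard list
    halfMD5('1850001') % 3 AS shard_idx_2,
    halfMD5('1854628') % 3 AS shard_idx_3;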

The inserted fake data is distributed as follows:

shard 1:
    - username: test_data_shard_01_user_1854629 → 50 million bets
    - username: test_data_shard_01_user_1853014 → 50 million bets
shard 2:
    - username: test_data_shard_02_user_1850001 → 50 million bets
    - username: test_data_shard_02_user_1850377 → 50 million bets
shard 3:
    - username: test_data_shard_03_user_1854620 → 50 million bets
    - username: test_data_shard_03_user_1854628 → 50 million bets
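
The Go program below is what generated the rows: it batch-inserts through node 1's distributed table in chunks of one million rows, 50 million rows per run. Presumably it was run once per user, toggling which appendShardXDataToBatch call is active (note the commented-out lines in main) and swapping in the other agcode/username listed in each function's comments.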
package main

import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "time"

    "github.com/ClickHouse/clickhouse-go/v2"
    "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
    "github.com/google/uuid"
    "github.com/shopspring/decimal"
)

var conn1 driver.Conn

func init() {
    rand.Seed(1) // fixed seed so the generated data is reproducible

    options1 := clickhouse.Options{
        Addr: []string{"10.1.103.179:9000"},
        Auth: clickhouse.Auth{
            Username: "andywu",
            Password: "123456",
        },
        Debug: true,
    }

    var err error
    conn1, err = clickhouse.Open(&options1)
    if err != nil {
        log.Fatal(err)
    }
}

func main() {
    query1 := `
        INSERT INTO dev.t_bet_origin_distributed (
            order_id,
            username,
            product_id,
            action,
            ip,
            game_type,
            play_type,
            table_id,
            gmcode,
            amount,
            valid_amount,
            ratio,
            currency,
            agcode,
            up_agcode,
            agid,
            log_time
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    `

    var i int64
    var err error
    var batch1 driver.Batch

    tm := time.Now().Add(-1 * 72 * time.Hour)

    for i = 1; i <= 50_000_000; i++ {
        if batch1 == nil {
            batch1, err = conn1.PrepareBatch(context.Background(), query1)
            if err != nil {
                log.Fatal(err)
            }
        }

        err = appendShard1DataToBatch(batch1, tm, i)
        // err = appendShard2DataToBatch(batch1, tm, i)
        // err = appendShard3DataToBatch(batch1, tm, i)

        if err != nil {
            log.Fatal(err)
        }

        if i%1_000_000 == 0 {
            if err := batch1.Send(); err != nil {
                log.Fatal(err)
            }
            batch1 = nil

            log.Printf("Batch %v processed.\n", i)
        }
    }

    log.Println("Data insertion completed.")
}

func appendShard1DataToBatch(batch driver.Batch, tm time.Time, i int64) error {
    // 1853014 -- shard 1
    // 1854629 -- shard 1
    orderID := fmt.Sprintf("%09d-%s", i, uuid.New().String())
    username := "test_data_shard_01_user_1853014"
    agcode := "1853014"
    upAgcode := "21563"
    productID := "AFWF9"
    action := "settle"
    ip := "127.0.0.1"
    gameType := "BAC"
    playType := 3
    tableID := "9D11"
    gmcode := "9D11247290ZH"
    amount := decimal.NewFromInt(1000)
    validAmount := decimal.NewFromInt(1000)
    ratio := decimal.NewFromInt(100)
    currency := "HKD"
    agid := "3121"
    logTime := tm.Add(time.Duration(i) * (time.Millisecond))

    return batch.Append(
        orderID,
        username,
        productID,
        action,
        ip,
        gameType,
        playType,
        tableID,
        gmcode,
        amount,
        validAmount,
        ratio,
        currency,
        agcode,
        upAgcode,
        agid,
        logTime,
    )
}

func appendShard2DataToBatch(batch driver.Batch, tm time.Time, i int64) error {
    // 1850001 -- shard 2
    // 1850377 -- shard 2
    orderID := fmt.Sprintf("%09d-%s", i, uuid.New().String())
    username := "test_data_shard_02_user_1850001"
    agcode := "1850001"
    upAgcode := "21563"
    productID := "AFWF9"
    action := "settle"
    ip := "127.0.0.1"
    gameType := "BAC"
    playType := 3
    tableID := "9D11"
    gmcode := "9D11247290ZH"
    amount := decimal.NewFromInt(1000)
    validAmount := decimal.NewFromInt(1000)
    ratio := decimal.NewFromInt(100)
    currency := "HKD"
    agid := "3121"
    logTime := tm.Add(time.Duration(i) * (time.Millisecond))

    return batch.Append(
        orderID,
        username,
        productID,
        action,
        ip,
        gameType,
        playType,
        tableID,
        gmcode,
        amount,
        validAmount,
        ratio,
        currency,
        agcode,
        upAgcode,
        agid,
        logTime,
    )
}

func appendShard3DataToBatch(batch driver.Batch, tm time.Time, i int64) error {
    // 1854628 -- shard 3
    // 1854620 -- shard 3
    orderID := fmt.Sprintf("%09d-%s", i, uuid.New().String())
    username := "test_data_shard_03_user_1854628"
    agcode := "1854628"
    upAgcode := "21563"
    productID := "AFWF9"
    action := "settle"
    ip := "127.0.0.1"
    gameType := "BAC"
    playType := 3
    tableID := "9D11"
    gmcode := "9D11247290ZH"
    amount := decimal.NewFromInt(1000)
    validAmount := decimal.NewFromInt(1000)
    ratio := decimal.NewFromInt(100)
    currency := "HKD"
    agid := "3121"
    logTime := tm.Add(time.Duration(i) * (time.Millisecond))

    return batch.Append(
        orderID,
        username,
        productID,
        action,
        ip,
        gameType,
        playType,
        tableID,
        gmcode,
        amount,
        validAmount,
        ratio,
        currency,
        agcode,
        upAgcode,
        agid,
        logTime,
    )
}

After the inserts finish, connect to each node and confirm that every shard on that node holds the data we expect:

node 1:

root@e516a8a6937a:/# clickhouse-client --host=10.1.103.179 --user=andywu --password=123456
ClickHouse client version 24.3.6.48 (official build).
Connecting to 10.1.103.179:9000 as user andywu.
Connected to ClickHouse server version 24.2.1.

dev-clickhouse-cluster01 :) SELECT
    username,
    COUNT(1)
FROM dev01.t_bet_origin
GROUP BY username

Query id: 6f1da9ff-233a-4530-af7c-016d353c6dc4

   ┌─username────────────────────────┬──count()─┐
1. │ test_data_shard_01_user_1853014 │ 50000000 │
2. │ test_data_shard_01_user_1854629 │ 50000000 │
   └─────────────────────────────────┴──────────┘

2 rows in set. Elapsed: 1.305 sec. Processed 100.00 million rows, 4.00 GB (76.66 million rows/s., 3.07 GB/s.)
Peak memory usage: 442.27 KiB.

dev-clickhouse-cluster01 :) SELECT
    username,
    COUNT(1)
FROM dev03.t_bet_origin
GROUP BY username

Query id: 2d2dadb6-3632-4bb0-b561-a693efd0ddf0

   ┌─username────────────────────────┬──count()─┐
1. │ test_data_shard_03_user_1854620 │ 50000000 │
2. │ test_data_shard_03_user_1854628 │ 50000000 │
   └─────────────────────────────────┴──────────┘

2 rows in set. Elapsed: 1.309 sec. Processed 100.00 million rows, 4.00 GB (76.40 million rows/s., 3.06 GB/s.)
Peak memory usage: 439.82 KiB.

dev-clickhouse-cluster01 :)

node 2:

root@e516a8a6937a:/# clickhouse-client --host=10.1.103.180 --user=andywu --password=123456
ClickHouse client version 24.3.6.48 (official build).
Connecting to 10.1.103.180:9000 as user andywu.
Connected to ClickHouse server version 24.2.1.

dev-clickhouse-cluster02 :) SELECT
    username,
    COUNT(1)
FROM dev01.t_bet_origin
GROUP BY username

Query id: 3b56b5d7-b613-4d53-b342-25b69f81ba54

   ┌─username────────────────────────┬──count()─┐
1. │ test_data_shard_01_user_1853014 │ 50000000 │
2. │ test_data_shard_01_user_1854629 │ 50000000 │
   └─────────────────────────────────┴──────────┘

2 rows in set. Elapsed: 1.403 sec. Processed 100.00 million rows, 4.00 GB (71.27 million rows/s., 2.85 GB/s.)
Peak memory usage: 222.72 KiB.

dev-clickhouse-cluster02 :) SELECT
    username,
    COUNT(1)
FROM dev02.t_bet_origin
GROUP BY username

Query id: 6c1cf562-f58d-45a0-92d8-79f57b14d00a

   ┌─username────────────────────────┬──count()─┐
1. │ test_data_shard_02_user_1850001 │ 50000000 │
2. │ test_data_shard_02_user_1850377 │ 50000000 │
   └─────────────────────────────────┴──────────┘

2 rows in set. Elapsed: 1.429 sec. Processed 100.00 million rows, 4.00 GB (69.97 million rows/s., 2.80 GB/s.)
Peak memory usage: 223.03 KiB.

dev-clickhouse-cluster02 :)

node 3:

root@e516a8a6937a:/# clickhouse-client --host=10.1.103.181 --user=andywu --password=123456
ClickHouse client version 24.3.6.48 (official build).
Connecting to 10.1.103.181:9000 as user andywu.
Connected to ClickHouse server version 24.2.1.

dev-clickhouse-cluster03 :) SELECT
    username,
    COUNT(1)
FROM dev02.t_bet_origin
GROUP BY username

Query id: 6f4b1346-d5f6-499c-8450-3752b32aa1b3

   ┌─username────────────────────────┬──count()─┐
1. │ test_data_shard_02_user_1850001 │ 50000000 │
2. │ test_data_shard_02_user_1850377 │ 50000000 │
   └─────────────────────────────────┴──────────┘

2 rows in set. Elapsed: 1.759 sec. Processed 100.00 million rows, 4.00 GB (56.84 million rows/s., 2.27 GB/s.)
Peak memory usage: 419.44 KiB.

dev-clickhouse-cluster03 :) SELECT
    username,
    COUNT(1)
FROM dev03.t_bet_origin
GROUP BY username

Query id: 2e76c3c5-1a9e-42ab-9f22-558194654c24

   ┌─username────────────────────────┬──count()─┐
1. │ test_data_shard_03_user_1854620 │ 50000000 │
2. │ test_data_shard_03_user_1854628 │ 50000000 │
   └─────────────────────────────────┴──────────┘

2 rows in set. Elapsed: 2.042 sec. Processed 100.00 million rows, 4.00 GB (48.98 million rows/s., 1.96 GB/s.)
Peak memory usage: 473.65 KiB.

dev-clickhouse-cluster03 :)

At this point, all the data needed for the experiments is ready.
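
As one final cross-check, a count through the distributed table should see all three shards at once; a quick sketch:

SELECT count()
FROM dev.t_bet_origin_distributed;
-- expected result: 300000000 (3 shards x 100 million rows each)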

Experiments

1. Fetching data within a single shard through the distributed table

In experiment 1 we use the distributed table to get the total valid bet amount of the user with agcode = '1854629'.

root@e516a8a6937a:/# clickhouse-client --host=10.1.103.179 --user=andywu --password=123456
ClickHouse client version 24.3.6.48 (official build).
Connecting to 10.1.103.179:9000 as user andywu.
Connected to ClickHouse server version 24.2.1.

dev-clickhouse-cluster01 :) SELECT
    agcode,
    SUM(valid_amount)
FROM dev.t_bet_origin_distributed
WHERE agcode = '1854629'
GROUP BY agcode

Query id: a0a3cead-1ad3-4aad-ac0d-99550ef2ef74

   ┌─agcode──┬─sum(valid_amount)─┐
1. │ 1854629 │       50000000000 │
   └─────────┴───────────────────┘

1 row in set. Elapsed: 0.722 sec. Processed 50.00 million rows, 1.20 GB (69.24 million rows/s., 1.66 GB/s.)
Peak memory usage: 259.88 KiB.

dev-clickhouse-cluster01 :)

Done: summing 50 million rows took well under a second. ClickHouse performance is excellent.

Next, let's dig one level deeper and use the following query to see how the distributed table dispatched our query.

dev-clickhouse-cluster01 :) SELECT
    hostname,
    type,
    is_initial_query,
    databases,
    query,
    event_time_microseconds,
    query_start_time_microseconds,
    query_duration_ms,
    read_rows,
    read_bytes,
    result_rows,
    result_bytes,
    memory_usage,
    query_cache_usage
FROM remote('10.1.103.179', system.query_log, 'andywu', '123456')
WHERE initial_query_id = 'a0a3cead-1ad3-4aad-ac0d-99550ef2ef74'
ORDER BY event_time_microseconds DESC
UNION ALL
SELECT
    hostname,
    type,
    is_initial_query,
    databases,
    query,
    event_time_microseconds,
    query_start_time_microseconds,
    query_duration_ms,
    read_rows,
    read_bytes,
    result_rows,
    result_bytes,
    memory_usage,
    query_cache_usage
FROM remote('10.1.103.180', system.query_log, 'andywu', '123456')
WHERE initial_query_id = 'a0a3cead-1ad3-4aad-ac0d-99550ef2ef74'
ORDER BY event_time_microseconds DESC
UNION ALL
SELECT
    hostname,
    type,
    is_initial_query,
    databases,
    query,
    event_time_microseconds,
    query_start_time_microseconds,
    query_duration_ms,
    read_rows,
    read_bytes,
    result_rows,
    result_bytes,
    memory_usage,
    query_cache_usage
FROM remote('10.1.103.181', system.query_log, 'andywu', '123456')
WHERE initial_query_id = 'a0a3cead-1ad3-4aad-ac0d-99550ef2ef74'
ORDER BY event_time_microseconds DESC

Query id: 9dc8d8d1-2131-4e95-83a6-1495f4570ec7

   ┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases───┬─query──────────────────────────────────────────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
1. │ dev-clickhouse-cluster01 │ QueryFinish │                1 │ ['dev']   │ SELECT agcode, SUM(valid_amount) FROM dev.t_bet_origin_distributed WHERE agcode = '1854629' GROUP BY agcode; │ 2024-11-16 15:13:19.213501 │    2024-11-16 15:13:18.491685 │               721 │  50000576 │ 1200013824 │           1 │          560 │     10738721 │ None              │
2. │ dev-clickhouse-cluster01 │ QueryFinish │                0 │ ['dev01'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` = '1854629' GROUP BY `agcode`          │ 2024-11-16 15:13:19.212753 │    2024-11-16 15:13:18.495064 │               717 │  50000576 │ 1200013824 │           1 │          656 │       266112 │ None              │
3. │ dev-clickhouse-cluster01 │ QueryStart  │                0 │ ['dev01'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` = '1854629' GROUP BY `agcode`          │ 2024-11-16 15:13:18.495064 │    2024-11-16 15:13:18.495064 │                 0 │         0 │          0 │           0 │            0 │            0 │ Unknown           │
4. │ dev-clickhouse-cluster01 │ QueryStart  │                1 │ ['dev']   │ SELECT agcode, SUM(valid_amount) FROM dev.t_bet_origin_distributed WHERE agcode = '1854629' GROUP BY agcode; │ 2024-11-16 15:13:18.491685 │    2024-11-16 15:13:18.491685 │                 0 │         0 │          0 │           0 │            0 │            0 │ Unknown           │
   └─────────────────────────────┴─────────────┴──────────────────┴─────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘
   ┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases───┬─query─────────────────────────────────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
5. │ dev-clickhouse-cluster02 │ QueryFinish │                0 │ ['dev02'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` = '1854629' GROUP BY `agcode` │ 2024-11-16 15:13:19.053968 │    2024-11-16 15:13:19.045409 │                 8 │         0 │          0 │           0 │            0 │        22576 │ None              │
6. │ dev-clickhouse-cluster02 │ QueryStart  │                0 │ ['dev02'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` = '1854629' GROUP BY `agcode` │ 2024-11-16 15:13:19.045409 │    2024-11-16 15:13:19.045409 │                 0 │         0 │          0 │           0 │            0 │            0 │ Unknown           │
   └─────────────────────────────┴─────────────┴──────────────────┴─────────────┴───────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘
   ┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases───┬─query─────────────────────────────────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
7. │ dev-clickhouse-cluster03 │ QueryFinish │                0 │ ['dev03'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` = '1854629' GROUP BY `agcode` │ 2024-11-16 15:13:18.439070 │    2024-11-16 15:13:18.427930 │                11 │         0 │          0 │           0 │            0 │        22576 │ None              │
8. │ dev-clickhouse-cluster03 │ QueryStart  │                0 │ ['dev03'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` = '1854629' GROUP BY `agcode` │ 2024-11-16 15:13:18.427930 │    2024-11-16 15:13:18.427930 │                 0 │         0 │          0 │           0 │            0 │            0 │ Unknown           │
   └─────────────────────────────┴─────────────┴──────────────────┴─────────────┴───────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘

8 rows in set. Elapsed: 0.094 sec. Processed 555.18 thousand rows, 29.08 MB (5.91 million rows/s., 309.42 MB/s.)
Peak memory usage: 10.24 MiB.

dev-clickhouse-cluster01 :)
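
Incidentally, the three-way remote(...) UNION ALL above could likely be collapsed using the clusterAllReplicas table function, which runs a query against a table on every replica of a named cluster (assuming the connecting user can read system.query_log on all of them); a sketch:

SELECT
    hostname,
    type,
    is_initial_query,
    query_duration_ms,
    read_rows
FROM clusterAllReplicas('andy_test_cluster', system.query_log)
WHERE initial_query_id = 'a0a3cead-1ad3-4aad-ac0d-99550ef2ef74'
ORDER BY event_time_microseconds DESC;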

To read this query log, start from is_initial_query: after we issued the query against the distributed table on node 1 (dev-clickhouse-cluster01), it forwarded the query to every shard for execution.

Recall the shard/replica layout from when we created the tables: node 1 already holds shard 1's data (dev01), so the distributed table decided to read shard 1 locally on node 1.

node 2 (dev-clickhouse-cluster02) received the query to scan shard 2 (dev02).

node 3 (dev-clickhouse-cluster03) received the query to scan shard 3 (dev03).

Once node 2 and node 3 finished computing over their own shards, they returned their results to node 1's distributed table, which performed the final aggregation.

Based on how we inserted the fake data, this user's bets all live on shard 1, so shard 2 and shard 3 found nothing (read_rows = 0), exactly as expected.

Shard 1 spent 717 ms scanning and summing; shard 2 and shard 3 had none of the data we wanted, so their queries finished in only 8 ms and 11 ms. The three shards returned their results to the distributed table for the final merge, and the entire process took 721 ms.

Since the sum of the per-shard execution times (717 ms on shard 1 + 8 ms on shard 2 + 11 ms on shard 3 = 736 ms) exceeds the total execution time of 721 ms, we can infer that the distributed table dispatched the queries in parallel.

2. Fetching data across multiple shards through the distributed table (part 1)

In experiment 2 we sum the total valid bets of one user from each shard, trying to get every node busy.

root@e516a8a6937a:/# clickhouse-client --host=10.1.103.179 --user=andywu --password=123456
ClickHouse client version 24.3.6.48 (official build).
Connecting to 10.1.103.179:9000 as user andywu.
Connected to ClickHouse server version 24.2.1.

dev-clickhouse-cluster01 :) SELECT
    agcode,
    SUM(valid_amount)
FROM dev.t_bet_origin_distributed
WHERE agcode IN ('1853014', '1850001', '1854620')
GROUP BY agcode

Query id: 73edd8c0-f8d2-4290-8b00-5cf659d71a8c

   ┌─agcode──┬─sum(valid_amount)─┐
1. │ 1850001 │       50000000000 │
2. │ 1854620 │       50000000000 │
3. │ 1853014 │       50000000000 │
   └─────────┴───────────────────┘

3 rows in set. Elapsed: 1.534 sec. Processed 150.00 million rows, 3.60 GB (97.81 million rows/s., 2.35 GB/s.)
Peak memory usage: 231.68 KiB.

dev-clickhouse-cluster01 :)

Done: ClickHouse reports that it scanned 150 million rows for the summation. Compared with the 0.7 s for a single shard, the 3 shards took 1.5 s in total; execution time roughly doubled, but the data volume tripled.

As before, let's analyze how the distributed table dispatched this query.

dev-clickhouse-cluster01 :) SELECT
    hostname,
    type,
    is_initial_query,
    databases,
    query,
    event_time_microseconds,
    query_start_time_microseconds,
    query_duration_ms,
    read_rows,
    read_bytes,
    result_rows,
    result_bytes,
    memory_usage,
    query_cache_usage
FROM remote('10.1.103.179', system.query_log, 'andywu', '123456')
WHERE initial_query_id = '73edd8c0-f8d2-4290-8b00-5cf659d71a8c'
ORDER BY event_time_microseconds DESC
UNION ALL
SELECT
    hostname,
    type,
    is_initial_query,
    databases,
    query,
    event_time_microseconds,
    query_start_time_microseconds,
    query_duration_ms,
    read_rows,
    read_bytes,
    result_rows,
    result_bytes,
    memory_usage,
    query_cache_usage
FROM remote('10.1.103.180', system.query_log, 'andywu', '123456')
WHERE initial_query_id = '73edd8c0-f8d2-4290-8b00-5cf659d71a8c'
ORDER BY event_time_microseconds DESC
UNION ALL
SELECT
    hostname,
    type,
    is_initial_query,
    databases,
    query,
    event_time_microseconds,
    query_start_time_microseconds,
    query_duration_ms,
    read_rows,
    read_bytes,
    result_rows,
    result_bytes,
    memory_usage,
    query_cache_usage
FROM remote('10.1.103.181', system.query_log, 'andywu', '123456')
WHERE initial_query_id = '73edd8c0-f8d2-4290-8b00-5cf659d71a8c'
ORDER BY event_time_microseconds DESC

Query id: 3993b39b-e615-4641-ac9a-eeea194967c5

   ┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases───┬─query───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
1. │ dev-clickhouse-cluster01 │ QueryFinish │                1 │ ['dev']   │ SELECT agcode, SUM(valid_amount) FROM dev.t_bet_origin_distributed WHERE agcode IN ('1853014', '1850001', '1854620') GROUP BY agcode; │ 2024-11-16 16:20:07.745864 │    2024-11-16 16:20:06.212569 │              1533 │ 150000000 │ 3600000000 │           3 │          608 │      2091553 │ None              │
2. │ dev-clickhouse-cluster01 │ QueryFinish │                0 │ ['dev01'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` IN ('1853014', '1850001', '1854620') GROUP BY `agcode`          │ 2024-11-16 16:20:07.744907 │    2024-11-16 16:20:06.215478 │              1529 │  50000000 │ 1200000000 │           1 │          656 │       237240 │ None              │
3. │ dev-clickhouse-cluster01 │ QueryFinish │                0 │ ['dev03'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` IN ('1853014', '1850001', '1854620') GROUP BY `agcode`          │ 2024-11-16 16:20:07.736969 │    2024-11-16 16:20:06.216108 │              1520 │  50000000 │ 1200000000 │           1 │          656 │       290384 │ None              │
4. │ dev-clickhouse-cluster01 │ QueryStart  │                0 │ ['dev03'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` IN ('1853014', '1850001', '1854620') GROUP BY `agcode`          │ 2024-11-16 16:20:06.216108 │    2024-11-16 16:20:06.216108 │                 0 │         0 │          0 │           0 │            0 │            0 │ Unknown           │
5. │ dev-clickhouse-cluster01 │ QueryStart  │                0 │ ['dev01'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` IN ('1853014', '1850001', '1854620') GROUP BY `agcode`          │ 2024-11-16 16:20:06.215478 │    2024-11-16 16:20:06.215478 │                 0 │         0 │          0 │           0 │            0 │            0 │ Unknown           │
6. │ dev-clickhouse-cluster01 │ QueryStart  │                1 │ ['dev']   │ SELECT agcode, SUM(valid_amount) FROM dev.t_bet_origin_distributed WHERE agcode IN ('1853014', '1850001', '1854620') GROUP BY agcode; │ 2024-11-16 16:20:06.212569 │    2024-11-16 16:20:06.212569 │                 0 │         0 │          0 │           0 │            0 │            0 │ Unknown           │
   └─────────────────────────────┴─────────────┴──────────────────┴─────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘
   ┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases───┬─query──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
7. │ dev-clickhouse-cluster02 │ QueryFinish │                0 │ ['dev02'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` IN ('1853014', '1850001', '1854620') GROUP BY `agcode` │ 2024-11-16 16:20:07.570665 │    2024-11-16 16:20:06.812059 │               758 │  50000000 │ 1200000000 │           1 │          656 │       236600 │ None              │
8. │ dev-clickhouse-cluster02 │ QueryStart  │                0 │ ['dev02'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `agcode` IN ('1853014', '1850001', '1854620') GROUP BY `agcode` │ 2024-11-16 16:20:06.812059 │    2024-11-16 16:20:06.812059 │                 0 │         0 │          0 │           0 │            0 │            0 │ Unknown           │
   └─────────────────────────────┴─────────────┴──────────────────┴─────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘

8 rows in set. Elapsed: 0.066 sec. Processed 555.17 thousand rows, 27.32 MB (8.46 million rows/s., 416.54 MB/s.)
Peak memory usage: 10.21 MiB.

dev-clickhouse-cluster01 :)

Only from the query log did we discover that ClickHouse considered node 1 to already hold both shard 1's and shard 3's data, so instead of sending a query to node 3 to scan shard 3, node 1 crunched shard 1 and shard 3 entirely by itself. That also explains why it took 1.5 s, nearly twice as long.

(i.e., it figured doing the work itself would be faster than delegating and paying the round-trip network I/O latency. Doesn't that resemble the attitude we have when asking someone else to change our code? XD)
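
This read-locally-first behavior is governed by the prefer_localhost_replica setting (enabled by default). In principle you could disable it per query to push shard work back out to remote replicas; a sketch I haven't benchmarked on this cluster:

SELECT
    agcode,
    SUM(valid_amount)
FROM dev.t_bet_origin_distributed
WHERE agcode IN ('1853014', '1850001', '1854620')
GROUP BY agcode
SETTINGS prefer_localhost_replica = 0;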

3. Fetching data across multiple shards through the distributed table (part 2)

In experiment 3 we drop the previous experiment's WHERE clause and sum the valid bets of every user, to see whether we can really get all the nodes busy.

This time we issue the query against the distributed table of node 3, which sat idle last round.

root@e516a8a6937a:/# clickhouse-client --host=10.1.103.181 --user=andywu --password=123456
ClickHouse client version 24.3.6.48 (official build).
Connecting to 10.1.103.181:9000 as user andywu.
Connected to ClickHouse server version 24.2.1.

dev-clickhouse-cluster03 :) SELECT
    agcode,
    SUM(valid_amount)
FROM dev.t_bet_origin_distributed
GROUP BY agcode

Query id: 5e01c84f-e81d-4c09-86db-97f65a8f7281

   ┌─agcode──┬─sum(valid_amount)─┐
1. │ 1850001 │       50000000000 │
2. │ 1854629 │       50000000000 │
3. │ 1854628 │       50000000000 │
4. │ 1854620 │       50000000000 │
5. │ 1853014 │       50000000000 │
6. │ 1850377 │       50000000000 │
   └─────────┴───────────────────┘

6 rows in set. Elapsed: 2.023 sec. Processed 300.00 million rows, 7.20 GB (148.28 million rows/s., 3.56 GB/s.)
Peak memory usage: 6.50 MiB.

dev-clickhouse-cluster03 :)

The data volume rose from last time's 150 million rows to 300 million, yet the runtime only went from 1.5 s to 2 s. Performance looks good.

As before, let's analyze how the distributed table dispatched this query.

dev-clickhouse-cluster03 :) SELECT
    hostname,
    type,
    is_initial_query,
    databases,
    query,
    event_time_microseconds,
    query_start_time_microseconds,
    query_duration_ms,
    read_rows,
    read_bytes,
    result_rows,
    result_bytes,
    memory_usage,
    query_cache_usage
FROM remote('10.1.103.179', system.query_log, 'andywu', '123456')
WHERE initial_query_id = '5e01c84f-e81d-4c09-86db-97f65a8f7281'
ORDER BY event_time_microseconds DESC
UNION ALL
SELECT
    hostname,
    type,
    is_initial_query,
    databases,
    query,
    event_time_microseconds,
    query_start_time_microseconds,
    query_duration_ms,
    read_rows,
    read_bytes,
    result_rows,
    result_bytes,
    memory_usage,
    query_cache_usage
FROM remote('10.1.103.180', system.query_log, 'andywu', '123456')
WHERE initial_query_id = '5e01c84f-e81d-4c09-86db-97f65a8f7281'
ORDER BY event_time_microseconds DESC
UNION ALL
SELECT
    hostname,
    type,
    is_initial_query,
    databases,
    query,
    event_time_microseconds,
    query_start_time_microseconds,
    query_duration_ms,
    read_rows,
    read_bytes,
    result_rows,
    result_bytes,
    memory_usage,
    query_cache_usage
FROM remote('10.1.103.181', system.query_log, 'andywu', '123456')
WHERE initial_query_id = '5e01c84f-e81d-4c09-86db-97f65a8f7281'
ORDER BY event_time_microseconds DESC

Query id: 57cc32d9-7d8f-4177-9485-63af88f98ce7

   ┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases─┬─query─────────────────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
1. │ dev-clickhouse-cluster03 │ QueryFinish │                1 │ ['dev'] │ SELECT agcode, SUM(valid_amount) FROM dev.t_bet_origin_distributed GROUP BY agcode; │ 2024-11-16 16:52:39.338032 │    2024-11-16 16:52:37.315424 │              2022 │ 300000000 │ 7200000000 │           6 │          680 │      6820180 │ None              │
2. │ dev-clickhouse-cluster03 │ QueryStart  │                1 │ ['dev'] │ SELECT agcode, SUM(valid_amount) FROM dev.t_bet_origin_distributed GROUP BY agcode; │ 2024-11-16 16:52:37.315424 │    2024-11-16 16:52:37.315424 │                 0 │         0 │          0 │           0 │            0 │            0 │ Unknown           │
   └─────────────────────────────┴─────────────┴──────────────────┴───────────┴───────────────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘
   ┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases───┬─query──────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
3. │ dev-clickhouse-cluster02 │ QueryFinish │                0 │ ['dev01'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` GROUP BY `agcode` │ 2024-11-16 16:52:40.042443 │    2024-11-16 16:52:38.041108 │              2001 │ 100000000 │ 2400000000 │           2 │          664 │       368096 │ None              │
4. │ dev-clickhouse-cluster02 │ QueryFinish │                0 │ ['dev02'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` GROUP BY `agcode` │ 2024-11-16 16:52:40.040456 │    2024-11-16 16:52:38.041123 │              1999 │ 100000000 │ 2400000000 │           2 │          664 │       367600 │ None              │
5. │ dev-clickhouse-cluster02 │ QueryStart  │                0 │ ['dev02'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` GROUP BY `agcode` │ 2024-11-16 16:52:38.041123 │    2024-11-16 16:52:38.041123 │                 0 │         0 │          0 │           0 │            0 │            0 │ Unknown           │
6. │ dev-clickhouse-cluster02 │ QueryStart  │                0 │ ['dev01'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` GROUP BY `agcode` │ 2024-11-16 16:52:38.041108 │    2024-11-16 16:52:38.041108 │                 0 │         0 │          0 │           0 │            0 │            0 │ Unknown           │
   └─────────────────────────────┴─────────────┴──────────────────┴─────────────┴────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘
   ┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases───┬─query──────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
7. │ dev-clickhouse-cluster01 │ QueryFinish │                0 │ ['dev03'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` GROUP BY `agcode` │ 2024-11-16 16:52:38.697425 │    2024-11-16 16:52:37.423076 │              1274 │ 100000000 │ 2400000000 │           2 │          664 │       367640 │ None              │
8. │ dev-clickhouse-cluster01 │ QueryStart  │                0 │ ['dev03'] │ SELECT `agcode`, sum(`valid_amount`) FROM `t_bet_origin` GROUP BY `agcode` │ 2024-11-16 16:52:37.423076 │    2024-11-16 16:52:37.423076 │                 0 │         0 │          0 │           0 │            0 │            0 │ Unknown           │
   └─────────────────────────────┴─────────────┴──────────────────┴─────────────┴────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘

8 rows in set. Elapsed: 0.063 sec. Processed 555.29 thousand rows, 27.48 MB (8.80 million rows/s., 435.66 MB/s.)
Peak memory usage: 5.44 MiB.

dev-clickhouse-cluster03 :)

Caught red-handed: node 3 is slacking off. It shipped the queries to node 1 and node 2 to do the computing while it merely dispatched the tasks and waited to harvest the results, 87% similar to behavior seen in the human world.

node 1 took about 1.3 s to sum shard 3's data.

node 2 took about 2 s to sum both shard 1's and shard 2's data.

node 3's distributed table waited out the 2 s node 2 needed to finish and return, then spent only another 21 ms merging the results and returning them.

We need some way to force node 3 to join the labor force and compute shard 2's data; that should bring node 2's computation down to around 1 s as well, reducing the time the distributed table spends waiting on node 2. (One pair of relevant knobs is sketched below.)
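
Besides restructuring the query (next experiment), which replica serves each shard's subquery is also influenced by the load_balancing setting (default random) together with prefer_localhost_replica, so something like the following sketch might spread the three subqueries across three different nodes. Treat it as an experiment to run, not a guaranteed fix:

SELECT
    agcode,
    SUM(valid_amount)
FROM dev.t_bet_origin_distributed
GROUP BY agcode
SETTINGS prefer_localhost_replica = 0, load_balancing = 'round_robin';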

4. Fetching data across multiple shards through the distributed table (part 3)

In experiment 4 we stop filtering on the sharding key agcode and instead filter on game_type, a column that is neither the sharding key, the sorting key, nor the primary key (much like a query that can't use any index in an RDBMS), so the cluster has no way to know the data distribution in advance.

(No more slacking, people: node 1, node 2, and especially node 3, get running. No walking.)
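
To confirm that filtering on game_type really cannot use the primary index the way agcode can, EXPLAIN indexes = 1 can be pointed at one of the local tables; a sketch:

EXPLAIN indexes = 1
SELECT sum(valid_amount)
FROM dev01.t_bet_origin
WHERE game_type = 'BAC';
-- the PrimaryKey section should show essentially all granules selected (a full scan),
-- whereas an agcode filter would prune granules via the (agcode, order_id, play_type) key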

root@e516a8a6937a:/# clickhouse-client --host=10.1.103.179 --user=andywu --password=123456
ClickHouse client version 24.3.6.48 (official build).
Connecting to 10.1.103.179:9000 as user andywu.
Connected to ClickHouse server version 24.2.1.

dev-clickhouse-cluster01 :) SELECT
    game_type,
    SUM(valid_amount)
FROM dev.t_bet_origin_distributed
WHERE game_type = 'BAC'
GROUP BY game_type

Query id: aacd267d-e922-4d7d-9903-8ed7c3e57537

   ┌─game_type─┬─sum(valid_amount)─┐
1. │ BAC       │      300000000000 │
   └───────────┴───────────────────┘

1 row in set. Elapsed: 1.291 sec. Processed 300.00 million rows, 6.00 GB (232.35 million rows/s., 4.65 GB/s.)
Peak memory usage: 2.00 MiB.

dev-clickhouse-cluster01 :)

And it looks like it worked: this time the runtime over 300 million rows dropped to roughly 1.3 seconds.

Let's look at the dispatch details again.

dev-clickhouse-cluster01 :) SELECT
    hostname,
    type,
    is_initial_query,
    databases,
    query,
    event_time_microseconds,
    query_start_time_microseconds,
    query_duration_ms,
    read_rows,
    read_bytes,
    result_rows,
    result_bytes,
    memory_usage,
    query_cache_usage
FROM remote('10.1.103.179', system.query_log, 'andywu', '123456')
WHERE initial_query_id = 'aacd267d-e922-4d7d-9903-8ed7c3e57537'
ORDER BY event_time_microseconds DESC
UNION ALL
SELECT
    hostname,
    type,
    is_initial_query,
    databases,
    query,
    event_time_microseconds,
    query_start_time_microseconds,
    query_duration_ms,
    read_rows,
    read_bytes,
    result_rows,
    result_bytes,
    memory_usage,
    query_cache_usage
FROM remote('10.1.103.180', system.query_log, 'andywu', '123456')
WHERE initial_query_id = 'aacd267d-e922-4d7d-9903-8ed7c3e57537'
ORDER BY event_time_microseconds DESC
UNION ALL
SELECT
    hostname,
    type,
    is_initial_query,
    databases,
    query,
    event_time_microseconds,
    query_start_time_microseconds,
    query_duration_ms,
    read_rows,
    read_bytes,
    result_rows,
    result_bytes,
    memory_usage,
    query_cache_usage
FROM remote('10.1.103.181', system.query_log, 'andywu', '123456')
WHERE initial_query_id = 'aacd267d-e922-4d7d-9903-8ed7c3e57537'
ORDER BY event_time_microseconds DESC

Query id: bbf1a3bd-a9b3-47af-a99c-c4205bce3683

   ┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases───┬─query───────────────────────────────────────────────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
1. │ dev-clickhouse-cluster01 │ QueryFinish │                1 │ ['dev']   │ SELECT game_type, SUM(valid_amount) FROM dev.t_bet_origin_distributed WHERE game_type = 'BAC' GROUP BY game_type; │ 2024-11-16 17:36:06.269881 │    2024-11-16 17:36:04.978911 │              1290 │ 300000000 │ 6000000000 │           1 │          560 │      2094237 │ None              │
2. │ dev-clickhouse-cluster01 │ QueryFinish │                0 │ ['dev03'] │ SELECT `game_type`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `game_type` = 'BAC' GROUP BY `game_type`          │ 2024-11-16 17:36:06.163945 │    2024-11-16 17:36:04.982116 │              1181 │ 100000000 │ 2000000000 │           1 │          656 │       445848 │ None              │
3. │ dev-clickhouse-cluster01 │ QueryStart  │                0 │ ['dev03'] │ SELECT `game_type`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `game_type` = 'BAC' GROUP BY `game_type`          │ 2024-11-16 17:36:04.982116 │    2024-11-16 17:36:04.982116 │                 0 │         0 │          0 │           0 │            0 │            0 │ Unknown           │
4. │ dev-clickhouse-cluster01 │ QueryStart  │                1 │ ['dev']   │ SELECT game_type, SUM(valid_amount) FROM dev.t_bet_origin_distributed WHERE game_type = 'BAC' GROUP BY game_type; │ 2024-11-16 17:36:04.978911 │    2024-11-16 17:36:04.978911 │                 0 │         0 │          0 │           0 │            0 │            0 │ Unknown           │
   └─────────────────────────────┴─────────────┴──────────────────┴─────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘
   ┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases───┬─query──────────────────────────────────────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
5. │ dev-clickhouse-cluster02 │ QueryFinish │                0 │ ['dev01'] │ SELECT `game_type`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `game_type` = 'BAC' GROUP BY `game_type` │ 2024-11-16 17:36:06.903508 │    2024-11-16 17:36:05.632965 │              1270 │ 100000000 │ 2000000000 │           1 │          656 │       425640 │ None              │
6. │ dev-clickhouse-cluster02 │ QueryStart  │                0 │ ['dev01'] │ SELECT `game_type`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `game_type` = 'BAC' GROUP BY `game_type` │ 2024-11-16 17:36:05.632965 │    2024-11-16 17:36:05.632965 │                 0 │         0 │          0 │           0 │            0 │            0 │ Unknown           │
   └─────────────────────────────┴─────────────┴──────────────────┴─────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘
   ┌─hostname────────────────────┬─type────────┬─is_initial_query─┬─databases───┬─query──────────────────────────────────────────────────────────────────────────────────────────────────────┬────event_time_microseconds─┬─query_start_time_microseconds─┬─query_duration_ms─┬─read_rows─┬─read_bytes─┬─result_rows─┬─result_bytes─┬─memory_usage─┬─query_cache_usage─┐
7. │ dev-clickhouse-cluster03 │ QueryFinish │                0 │ ['dev02'] │ SELECT `game_type`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `game_type` = 'BAC' GROUP BY `game_type` │ 2024-11-16 17:36:06.170086 │    2024-11-16 17:36:04.888504 │              1281 │ 100000000 │ 2000000000 │           1 │          656 │       425760 │ None              │
8. │ dev-clickhouse-cluster03 │ QueryStart  │                0 │ ['dev02'] │ SELECT `game_type`, sum(`valid_amount`) FROM `t_bet_origin` WHERE `game_type` = 'BAC' GROUP BY `game_type` │ 2024-11-16 17:36:04.888504 │    2024-11-16 17:36:04.888504 │                 0 │         0 │          0 │           0 │            0 │            0 │ Unknown           │
   └─────────────────────────────┴─────────────┴──────────────────┴─────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────┴───────────┴────────────┴─────────────┴──────────────┴──────────────┴───────────────────┘

8 rows in set. Elapsed: 0.071 sec. Processed 555.40 thousand rows, 24.99 MB (7.85 million rows/s., 353.26 MB/s.)
Peak memory usage: 5.69 MiB.

dev-clickhouse-cluster01 :)

This time the distributed table had node 1 compute shard 3's data, node 2 compute shard 1's data, and node 3 compute shard 2's data.

Because the I/O was spread evenly, every node finished its share of the grunt work in roughly the same 1.2-1.3 s, and after collecting the partial results the distributed table spent only about another 10 ms on the final merge before returning.

This is exactly the result we expected (⁎⁍̴̛ᴗ⁍̴̛⁎)!!

Summary

Through these experiments we saw that a table created with the distributed table engine really can spread computation in parallel across the nodes holding the different shards, each node finishing its own work before the results are merged and returned, achieving the horizontal scaling we chase in system design ✨. Occasionally, though, it exhibits the puzzling behavior of not making full use of the cluster's compute (analogous to an RDBMS having an index but not using it).

Also, this post only showed off the distributed table engine + sharding; we haven't yet played with the HA properties of having 2 replicas per shard.

Given the chance, we could also experiment with killing one node and confirming the distributed table still returns correct data, and with inserting new data through the distributed table while a node is down, then watching the revived node sync the out-of-sync data back up.
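
When that HA experiment happens, system.replicas is the natural first place to look after reviving a node; a sketch of a quick health check:

SELECT
    database,
    table,
    is_readonly,
    absolute_delay,
    queue_size
FROM system.replicas
WHERE table = 't_bet_origin';
-- absolute_delay and queue_size should drop back toward 0 once the revived replica catches up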
