Citus分布式扩展使用说明

简介

Citus 是一个 PostgreSQL 扩展,可将 PostgreSQL 转换为分布式数据库,因此您可以在任何规模上实现高性能。

Citus使用分片和复制在多台机器上横向扩展PostgreSQL。它的查询引擎在这些服务器上执行SQL进行并行化查询,以便在大型数据集上实现实时的响应。

Citus集群由一个中心的协调节点(CN)和若干个工作节点(Worker)构成。

  • coordinate:协调节点,存储元数据,不存实际数据。该节点直接对用户开放,等于一个客户端。

  • worker:工作节点,不存储元数据,存储实际数据,执行协调节点发来的查询请求。

安装部署

  • Yukon安装包里带有Citus所需的相关文件,安装完Yukon后,您只需执行下面的节点配置步骤即可。

  • 或者从 Citus源码库 里下载源码,解压文件,进入citus目录,导出 pgconfig 文件目录, 如:export PG_CONFIG=/opt/pg13/bin/pg_config,运行 configure 配置文件, ./configure ,如果出错提示缺少部分库文件,可以直接安装对应的库,如:yum install -y libzstd-devel lz4-devel,或者使用 without 选项,屏蔽该需求

编译安装

make -j && make install

单节点:

请将以下内容添加到:postgresql.conf

shared_preload_libraries = 'citus'

重启PostgreSQL后,登录数据库创建扩展

create extension citus;

多节点:

如果要设置多节点集群,使用 Citus 扩展配置其他 PostgreSQL 节点,并添加它们以形成 Citus 集群:

编辑 postgresql.conf 配置文件,设置 listen_addresses = '*' ,监听地址改为全部

请将以下内容添加到 pg_hba.conf 配置文件中的ipv4配置下

host all all 0.0.0.0/0 trust

重启PostgreSQL使配置生效

登录数据库,在协调节点上添加其他工作节点的ip地址, 数据库端口号

select * from master_add_node('192.168.xxx.xxx', 5432);

查询添加成功的节点,如果添加成功,则会显示已添加的节点信息

select * from master_get_active_worker_nodes();

node_name       node_port
------------------------+
192.168.xxx.xxx 5432

Note

集群需要在每个节点上同样部署Postgresql数据库 + Citus扩展,所用数据库名、默认用户名保持一致 ,数据库都需要创建Citus扩展以及其他所需要的扩展,确保各节点之间可通信,选择一台机器作为协调节点,其他机器作为工作节点 (已经存在的表,想创建分布式表,首先在另外的机器上创建同名数据库,创建扩展)

使用说明

create_distributed_table()

定义分布式表并创建其分片,如果未指定分发方法,则该函数默认为“哈希”分布

参数名称 描述

table_name

需要分发的表的名称

distribution_column

要将表分布到的列

distribution_type(可选)

表的分布方法,允许的值为追加或哈希,默认为“哈希”

alter_distributed_table()

更改分布式表的分布列、分片计数或共置属性

参数名称

描述

table_name

将要更改的分布式表的名称

distribution_column(可选)

新分发列的名称

shard_count(可选)

新分片计数

colocate_with(可选)

当前分布式表将与之共置的表。可能的值为 ,用于启动新的共置组,或用于共置的另一个表的名称

cascade_to_colocated(可选)

当此参数设置为“true”时,更改也将应用于以前与该表共置的所有表,并且将保留共置。如果它是“false”,则此表的当前共置将被破坏

master_add_node()

添加工作节点

参数名称

描述

ip

IP地址

port

端口号

undistribute_table()

撤消create_distributed_table或create_reference_table的操作

参数名称

描述

table_name

要取消分发的分布式表或引用表的名称

cascade_via_foreign_keys(可选)

当此参数设置为“true”时,undistribute_table还会通过外键取消分布与table_name相关的所有表。请谨慎使用此参数,因为它可能会影响许多表

扩容

对于新添加的节点,Citus不会自动移动数据到新节点上,因此我们可以手动平移数据到新的节点上,达到扩容减压的目的,目前有以下两个方法可以实现

  • 可以手动复制某工作节点的表到新节点,然后修改 pg_dist_placement 表的表名和grids值,删除原工作节点上该表,此时总表数不变,完成扩容减压。

创建一张分布式表,以哈希值进行分发

create table t1(id serial primary key, geom geometry);

set citus.shard_count = 6;

select create_distributed_table('t1', 'id', 'hash');

with a as (select id ,(random()*170)::float x, (random()*80)::float y from generate_series(1, 600) t(id))
insert into t1(geom) select st_makeenvelope(x, y, x+0.001, y+0.001, 4490) from a;

首先,查看每个节点对应的groupid值

select * from pg_dist_node;

nodeid|groupid|nodename      |nodeport|noderack|hasmetadata|isactive|noderole|nodecluster|metadatasynced|shouldhaveshards|
------+-------+--------------+--------+--------+-----------+--------+--------+-----------+--------------+----------------+
     2|      2|192.168.12.205|   13000|default |false      |true    |primary |default    |false         |true            |
     1|      1|192.168.12.122|   13000|default |false      |true    |primary |default    |false         |true            |

查看每一张表的shardid值

select * from citus_shards;

table_name|shardid|shard_name|citus_table_type|colocation_id|nodename      |nodeport|shard_size|
----------+-------+----------+----------------+-------------+--------------+--------+----------+
t1        | 102161|t1_102161 |distributed     |            8|192.168.12.122|   13000|     24576|
t1        | 102162|t1_102162 |distributed     |            8|192.168.12.205|   13000|     16384|
t1        | 102163|t1_102163 |distributed     |            8|192.168.12.122|   13000|     24576|
t1        | 102164|t1_102164 |distributed     |            8|192.168.12.205|   13000|     16384|
t1        | 102165|t1_102165 |distributed     |            8|192.168.12.122|   13000|     16384|
t1        | 102166|t1_102166 |distributed     |            8|192.168.12.205|   13000|     16384|

查看每张分片表分布在哪一个节点上

select * from pg_dist_placement where shardid in (select shardid from pg_dist_shard where logicalrelid='tb1'::regclass);

placementid|shardid|shardstate|shardlength|groupid|
-----------+-------+----------+-----------+-------+
        154| 102161|         1|          0|      1|
        155| 102162|         1|          0|      2|
        156| 102163|         1|          0|      1|
        157| 102164|         1|          0|      2|
        158| 102165|         1|          0|      1|
        159| 102166|         1|          0|      2|

在协调节点上加入新的worker

select * from master_add_node('192.168.12.201', 5432);

检查pg_dist_node元数据表,新的worker节点的groupid为5

nodeid|groupid|nodename      |nodeport|noderack|hasmetadata|isactive|noderole|nodecluster|metadatasynced|shouldhaveshards|
------+-------+--------------+--------+--------+-----------+--------+--------+-----------+--------------+----------------+
     2|      2|192.168.12.205|   13000|default |false      |true    |primary |default    |false         |true            |
     1|      1|192.168.12.122|   13000|default |false      |true    |primary |default    |false         |true            |
     5|      5|192.168.12.201|    5432|default |false      |true    |primary |default    |false         |true            |

复制分片,现在1、2号节点上各3片,为了保持均衡,我们可以移动部分分片到5号节点上

下面移动1号节点上的分片t1_102161到5号节点上:

在1号节点上创建PUBLICATION(此步操作是在1号工作节点上执行,不是在协调节点上)

create PUBLICATION pub_shard for table tb1_102046;

在3号节点上创建分片表和SUBSCRIPTION(此步操作是在3号工作节点上执行,不是在协调节点上,表名、表结构和要复制的表保持一致)

create table t1_102161(id int primary key, geom geometry);

CREATE SUBSCRIPTION sub_shard
    CONNECTION 'host=192.168.12.122, port=13000, dbname=testcitus'
    PUBLICATION pub_shard;

在协调节点上锁表,防止数据被修改

begin;
lock table tb1 IN EXCLUSIVE MODE;

等待3号节点上的表数据和1号节点上的表数据完全同步,解锁表,修改元数据表

end;
update pg_dist_placement set groupid=5 where shardid=102161 and groupid=1;

在1号节点上删除分片表和PUBLICATION

DROP PUBLICATION pub_shard;

drop table tb1_102046;

在5号节点上删除SUBSCRIPTION

DROP SUBSCRIPTION sub_shard;

扩容完成

  • alter_distributed_table函数,可以自动将已有的分片表再次重新平均分配到每个节点,如2个节点共2张表,添加新节点后,修改分片表数为3,则分布情况为3个节点共3张表,也可以达到扩容减压的目的。

使用案例

首先根据集群环境设置分表数,分表时会平均的分配到每个工作节点上,如count=32,工作节点有两个,则会在每台工作节点上创建16张表,在我们的测试中,使用GIN索引查询网格编码时,每个工作节点的表数不超过cpu逻辑核数的一半时效果最佳。

案例环境为 3台 8核x86 机器,组合为 1cn + 2worker 的集群。

set citus.shard_count = 8;

创建表

create table polygon_grid21(id serial primary key, geom geometry, grids geosotgrid[]);

按id列上的哈希值对源表进行分发

select create_distributed_table('polygon_grid21', 'id');

往表 polygon_grid21 里插入数据

with a as (select (random()*170)::float x, (random()*80)::float y from generate_series(1, 1000000))
insert into polygon_grid21(geom, grids) select st_makeenvelope(x, y, x+0.001, y+0.001, 4490), ST_GeoSOTGrid(st_makeenvelope(x, y, x+0.001, y+0.001, 4490), 21) from a;

查看分表情况、哈希分布范围

select * from citus_shards;

table_name    |shardid|shard_name           |citus_table_type|colocation_id|nodename      |nodeport|shard_size|
--------------+-------+---------------------+----------------+-------------+--------------+--------+----------+
polygon_grid21| 102126|polygon_grid21_102126|distributed     |            1|192.168.12.122|   13000|  66879488|
polygon_grid21| 102127|polygon_grid21_102127|distributed     |            1|192.168.12.205|   13000|  66551808|
polygon_grid21| 102128|polygon_grid21_102128|distributed     |            1|192.168.12.122|   13000|  66568192|
polygon_grid21| 102129|polygon_grid21_102129|distributed     |            1|192.168.12.205|   13000|  66650112|
polygon_grid21| 102130|polygon_grid21_102130|distributed     |            1|192.168.12.122|   13000|  66789376|
polygon_grid21| 102131|polygon_grid21_102131|distributed     |            1|192.168.12.205|   13000|  66732032|
polygon_grid21| 102132|polygon_grid21_102132|distributed     |            1|192.168.12.122|   13000|  66486272|
polygon_grid21| 102133|polygon_grid21_102133|distributed     |            1|192.168.12.205|   13000|  66625536|

select * from pg_dist_shard;

table_name    |shardid|shard_name           |citus_table_type|colocation_id|nodename      |nodeport|shard_size|
--------------+-------+---------------------+----------------+-------------+--------------+--------+----------+
polygon_grid21| 102126|polygon_grid21_102126|distributed     |            1|192.168.12.122|   13000|  66879488|
polygon_grid21| 102127|polygon_grid21_102127|distributed     |            1|192.168.12.205|   13000|  66551808|
polygon_grid21| 102128|polygon_grid21_102128|distributed     |            1|192.168.12.122|   13000|  66568192|
polygon_grid21| 102129|polygon_grid21_102129|distributed     |            1|192.168.12.205|   13000|  66650112|
polygon_grid21| 102130|polygon_grid21_102130|distributed     |            1|192.168.12.122|   13000|  66789376|
polygon_grid21| 102131|polygon_grid21_102131|distributed     |            1|192.168.12.205|   13000|  66732032|
polygon_grid21| 102132|polygon_grid21_102132|distributed     |            1|192.168.12.122|   13000|  66486272|
polygon_grid21| 102133|polygon_grid21_102133|distributed     |            1|192.168.12.205|   13000|  66625536|

创建索引

create index gin_polygon_grid21 on polygon_grid21 using GIN(grids);

相交查询

select * from polygon_grid21 where grids && ST_GeoSOTGrid(st_makeenvelope(0, 0, 0.01, 0.01, 4490), 21);

删除一半数据,查询结果

delete from polygon_grid21 where id % 2 = 0;

select count(*) from polygon_grid21;

count |
------+
500000|

再次插入10w数据,查看结果

with a as (select (random()*170)::float x, (random()*80)::float y from generate_series(1, 100000))
insert into polygon_grid21(geom, grids) select st_makeenvelope(x, y, x+0.001, y+0.001, 4490), ST_GeoSOTGrid(st_makeenvelope(x, y, x+0.001, y+0.001, 4490), 21) from a;

select count(*) from polygon_grid21;

count |
------+
600000|

想要获取更加详细的资料,请访问 Citus官方文档