Citus分布式扩展使用说明
简介
Citus 是一个 PostgreSQL 扩展,可将 PostgreSQL 转换为分布式数据库,因此您可以在任何规模上实现高性能。
Citus使用分片和复制在多台机器上横向扩展PostgreSQL。它的查询引擎在这些服务器上执行SQL进行并行化查询,以便在大型数据集上实现实时的响应。
Citus集群由一个中心的协调节点(CN)和若干个工作节点(Worker)构成。
coordinate:协调节点,存储元数据,不存实际数据。该节点直接对用户开放,等于一个客户端。
worker:工作节点,不存储元数据,存储实际数据,执行协调节点发来的查询请求。
安装部署
Yukon安装包里带有Citus所需的相关文件,安装完Yukon后,您只需执行下面的节点配置步骤即可。
或者从 Citus源码库 里下载源码,解压文件,进入citus目录,导出
pgconfig
文件目录, 如:export PG_CONFIG=/opt/pg13/bin/pg_config
,运行configure
配置文件,./configure
,如果出错提示缺少部分库文件,可以直接安装对应的库,如:yum install -y libzstd-devel lz4-devel
,或者使用without
选项,屏蔽该需求
编译安装
make -j && make install
单节点:
请将以下内容添加到:postgresql.conf
shared_preload_libraries = 'citus'
重启PostgreSQL后,登录数据库创建扩展
create extension citus;
多节点:
如果要设置多节点集群,使用 Citus 扩展配置其他 PostgreSQL 节点,并添加它们以形成 Citus 集群:
编辑 postgresql.conf
配置文件,设置 listen_addresses = '*'
,监听地址改为全部
请将以下内容添加到 pg_hba.conf
配置文件中的ipv4配置下
host all all 0.0.0.0/0 trust
重启PostgreSQL使配置生效
登录数据库,在协调节点上添加其他工作节点的ip地址, 数据库端口号
select * from master_add_node('192.168.xxx.xxx', 5432);
查询添加成功的节点,如果添加成功,则会显示已添加的节点信息
select * from master_get_active_worker_nodes();
node_name node_port
------------------------+
192.168.xxx.xxx 5432
Note
集群需要在每个节点上同样部署Postgresql数据库 + Citus扩展,所用数据库名、默认用户名保持一致 ,数据库都需要创建Citus扩展以及其他所需要的扩展,确保各节点之间可通信,选择一台机器作为协调节点,其他机器作为工作节点 (已经存在的表,想创建分布式表,首先在另外的机器上创建同名数据库,创建扩展)
使用说明
create_distributed_table()
定义分布式表并创建其分片,如果未指定分发方法,则该函数默认为“哈希”分布
参数名称 描述 |
|
---|---|
table_name |
需要分发的表的名称 |
distribution_column |
要将表分布到的列 |
distribution_type(可选) |
表的分布方法,允许的值为追加或哈希,默认为“哈希” |
alter_distributed_table()
更改分布式表的分布列、分片计数或共置属性
参数名称 |
描述 |
---|---|
table_name |
将要更改的分布式表的名称 |
distribution_column(可选) |
新分发列的名称 |
shard_count(可选) |
新分片计数 |
colocate_with(可选) |
当前分布式表将与之共置的表。可能的值为 ,用于启动新的共置组,或用于共置的另一个表的名称 |
cascade_to_colocated(可选) |
当此参数设置为“true”时,更改也将应用于以前与该表共置的所有表,并且将保留共置。如果它是“false”,则此表的当前共置将被破坏 |
master_add_node()
添加工作节点
参数名称 |
描述 |
---|---|
ip |
IP地址 |
port |
端口号 |
undistribute_table()
撤消create_distributed_table或create_reference_table的操作
参数名称 |
描述 |
---|---|
table_name |
要取消分发的分布式表或引用表的名称 |
cascade_via_foreign_keys(可选) |
当此参数设置为“true”时,undistribute_table还会通过外键取消分布与table_name相关的所有表。请谨慎使用此参数,因为它可能会影响许多表 |
扩容
对于新添加的节点,Citus不会自动移动数据到新节点上,因此我们可以手动平移数据到新的节点上,达到扩容减压的目的,目前有以下两个方法可以实现
可以手动复制某工作节点的表到新节点,然后修改 pg_dist_placement 表的表名和grids值,删除原工作节点上该表,此时总表数不变,完成扩容减压。
创建一张分布式表,以哈希值进行分发
create table t1(id serial primary key, geom geometry);
set citus.shard_count = 6;
select create_distributed_table('t1', 'id', 'hash');
with a as (select id ,(random()*170)::float x, (random()*80)::float y from generate_series(1, 600) t(id))
insert into t1(geom) select st_makeenvelope(x, y, x+0.001, y+0.001, 4490) from a;
首先,查看每个节点对应的groupid值
select * from pg_dist_node;
nodeid|groupid|nodename |nodeport|noderack|hasmetadata|isactive|noderole|nodecluster|metadatasynced|shouldhaveshards|
------+-------+--------------+--------+--------+-----------+--------+--------+-----------+--------------+----------------+
2| 2|192.168.12.205| 13000|default |false |true |primary |default |false |true |
1| 1|192.168.12.122| 13000|default |false |true |primary |default |false |true |
查看每一张表的shardid值
select * from citus_shards;
table_name|shardid|shard_name|citus_table_type|colocation_id|nodename |nodeport|shard_size|
----------+-------+----------+----------------+-------------+--------------+--------+----------+
t1 | 102161|t1_102161 |distributed | 8|192.168.12.122| 13000| 24576|
t1 | 102162|t1_102162 |distributed | 8|192.168.12.205| 13000| 16384|
t1 | 102163|t1_102163 |distributed | 8|192.168.12.122| 13000| 24576|
t1 | 102164|t1_102164 |distributed | 8|192.168.12.205| 13000| 16384|
t1 | 102165|t1_102165 |distributed | 8|192.168.12.122| 13000| 16384|
t1 | 102166|t1_102166 |distributed | 8|192.168.12.205| 13000| 16384|
查看每张分片表分布在哪一个节点上
select * from pg_dist_placement where shardid in (select shardid from pg_dist_shard where logicalrelid='tb1'::regclass);
placementid|shardid|shardstate|shardlength|groupid|
-----------+-------+----------+-----------+-------+
154| 102161| 1| 0| 1|
155| 102162| 1| 0| 2|
156| 102163| 1| 0| 1|
157| 102164| 1| 0| 2|
158| 102165| 1| 0| 1|
159| 102166| 1| 0| 2|
在协调节点上加入新的worker
select * from master_add_node('192.168.12.201', 5432);
检查pg_dist_node元数据表,新的worker节点的groupid为5
nodeid|groupid|nodename |nodeport|noderack|hasmetadata|isactive|noderole|nodecluster|metadatasynced|shouldhaveshards|
------+-------+--------------+--------+--------+-----------+--------+--------+-----------+--------------+----------------+
2| 2|192.168.12.205| 13000|default |false |true |primary |default |false |true |
1| 1|192.168.12.122| 13000|default |false |true |primary |default |false |true |
5| 5|192.168.12.201| 5432|default |false |true |primary |default |false |true |
复制分片,现在1、2号节点上各3片,为了保持均衡,我们可以移动部分分片到5号节点上
下面移动1号节点上的分片t1_102161到5号节点上:
在1号节点上创建PUBLICATION(此步操作是在1号工作节点上执行,不是在协调节点上)
create PUBLICATION pub_shard for table tb1_102046;
在3号节点上创建分片表和SUBSCRIPTION(此步操作是在3号工作节点上执行,不是在协调节点上,表名、表结构和要复制的表保持一致)
create table t1_102161(id int primary key, geom geometry);
CREATE SUBSCRIPTION sub_shard
CONNECTION 'host=192.168.12.122, port=13000, dbname=testcitus'
PUBLICATION pub_shard;
在协调节点上锁表,防止数据被修改
begin;
lock table tb1 IN EXCLUSIVE MODE;
等待3号节点上的表数据和1号节点上的表数据完全同步,解锁表,修改元数据表
end;
update pg_dist_placement set groupid=5 where shardid=102161 and groupid=1;
在1号节点上删除分片表和PUBLICATION
DROP PUBLICATION pub_shard;
drop table tb1_102046;
在5号节点上删除SUBSCRIPTION
DROP SUBSCRIPTION sub_shard;
扩容完成
alter_distributed_table函数,可以自动将已有的分片表再次重新平均分配到每个节点,如2个节点共2张表,添加新节点后,修改分片表数为3,则分布情况为3个节点共3张表,也可以达到扩容减压的目的。
使用案例
首先根据集群环境设置分表数,分表时会平均的分配到每个工作节点上,如count=32,工作节点有两个,则会在每台工作节点上创建16张表,在我们的测试中,使用GIN索引查询网格编码时,每个工作节点的表数不超过cpu逻辑核数的一半时效果最佳。
案例环境为 3台 8核x86 机器,组合为 1cn + 2worker 的集群。
set citus.shard_count = 8;
创建表
create table polygon_grid21(id serial primary key, geom geometry, grids geosotgrid[]);
按id列上的哈希值对源表进行分发
select create_distributed_table('polygon_grid21', 'id');
往表 polygon_grid21 里插入数据
with a as (select (random()*170)::float x, (random()*80)::float y from generate_series(1, 1000000))
insert into polygon_grid21(geom, grids) select st_makeenvelope(x, y, x+0.001, y+0.001, 4490), ST_GeoSOTGrid(st_makeenvelope(x, y, x+0.001, y+0.001, 4490), 21) from a;
查看分表情况、哈希分布范围
select * from citus_shards;
table_name |shardid|shard_name |citus_table_type|colocation_id|nodename |nodeport|shard_size|
--------------+-------+---------------------+----------------+-------------+--------------+--------+----------+
polygon_grid21| 102126|polygon_grid21_102126|distributed | 1|192.168.12.122| 13000| 66879488|
polygon_grid21| 102127|polygon_grid21_102127|distributed | 1|192.168.12.205| 13000| 66551808|
polygon_grid21| 102128|polygon_grid21_102128|distributed | 1|192.168.12.122| 13000| 66568192|
polygon_grid21| 102129|polygon_grid21_102129|distributed | 1|192.168.12.205| 13000| 66650112|
polygon_grid21| 102130|polygon_grid21_102130|distributed | 1|192.168.12.122| 13000| 66789376|
polygon_grid21| 102131|polygon_grid21_102131|distributed | 1|192.168.12.205| 13000| 66732032|
polygon_grid21| 102132|polygon_grid21_102132|distributed | 1|192.168.12.122| 13000| 66486272|
polygon_grid21| 102133|polygon_grid21_102133|distributed | 1|192.168.12.205| 13000| 66625536|
select * from pg_dist_shard;
table_name |shardid|shard_name |citus_table_type|colocation_id|nodename |nodeport|shard_size|
--------------+-------+---------------------+----------------+-------------+--------------+--------+----------+
polygon_grid21| 102126|polygon_grid21_102126|distributed | 1|192.168.12.122| 13000| 66879488|
polygon_grid21| 102127|polygon_grid21_102127|distributed | 1|192.168.12.205| 13000| 66551808|
polygon_grid21| 102128|polygon_grid21_102128|distributed | 1|192.168.12.122| 13000| 66568192|
polygon_grid21| 102129|polygon_grid21_102129|distributed | 1|192.168.12.205| 13000| 66650112|
polygon_grid21| 102130|polygon_grid21_102130|distributed | 1|192.168.12.122| 13000| 66789376|
polygon_grid21| 102131|polygon_grid21_102131|distributed | 1|192.168.12.205| 13000| 66732032|
polygon_grid21| 102132|polygon_grid21_102132|distributed | 1|192.168.12.122| 13000| 66486272|
polygon_grid21| 102133|polygon_grid21_102133|distributed | 1|192.168.12.205| 13000| 66625536|
创建索引
create index gin_polygon_grid21 on polygon_grid21 using GIN(grids);
相交查询
select * from polygon_grid21 where grids && ST_GeoSOTGrid(st_makeenvelope(0, 0, 0.01, 0.01, 4490), 21);
删除一半数据,查询结果
delete from polygon_grid21 where id % 2 = 0;
select count(*) from polygon_grid21;
count |
------+
500000|
再次插入10w数据,查看结果
with a as (select (random()*170)::float x, (random()*80)::float y from generate_series(1, 100000))
insert into polygon_grid21(geom, grids) select st_makeenvelope(x, y, x+0.001, y+0.001, 4490), ST_GeoSOTGrid(st_makeenvelope(x, y, x+0.001, y+0.001, 4490), 21) from a;
select count(*) from polygon_grid21;
count |
------+
600000|
想要获取更加详细的资料,请访问 Citus官方文档 。