.. _citusintroduce_label: Citus分布式扩展使用说明 ===================================== 简介 ^^^^^^^^^^^^ Citus 是一个 PostgreSQL 扩展,可将 PostgreSQL 转换为分布式数据库,因此您可以在任何规模上实现高性能。 Citus使用分片和复制在多台机器上横向扩展PostgreSQL。它的查询引擎在这些服务器上执行SQL进行并行化查询,以便在大型数据集上实现实时的响应。 Citus集群由一个中心的协调节点(CN)和若干个工作节点(Worker)构成。 - coordinate:协调节点,存储元数据,不存实际数据。该节点直接对用户开放,等于一个客户端。 - worker:工作节点,不存储元数据,存储实际数据,执行协调节点发来的查询请求。 安装部署 ^^^^^^^^^ - Yukon安装包里带有Citus所需的相关文件,安装完Yukon后,您只需执行下面的节点配置步骤即可。 - 或者从 `Citus源码库 `__ 里下载源码,解压文件,进入citus目录,导出 ``pgconfig`` 文件目录, 如:``export PG_CONFIG=/opt/pg13/bin/pg_config``,运行 ``configure`` 配置文件, ``./configure`` ,如果出错提示缺少部分库文件,可以直接安装对应的库,如:``yum install -y libzstd-devel lz4-devel``,或者使用 ``without`` 选项,屏蔽该需求 编译安装 .. code:: bash make -j && make install 单节点: ~~~~~~~~~~~ 请将以下内容添加到:``postgresql.conf`` .. code:: text shared_preload_libraries = 'citus' 重启PostgreSQL后,登录数据库创建扩展 .. code:: create extension citus; 多节点: ~~~~~~~~~~~ 如果要设置多节点集群,使用 Citus 扩展配置其他 PostgreSQL 节点,并添加它们以形成 Citus 集群: 编辑 ``postgresql.conf`` 配置文件,设置 ``listen_addresses = '*'`` ,监听地址改为全部 请将以下内容添加到 ``pg_hba.conf`` 配置文件中的ipv4配置下 .. code:: host all all 0.0.0.0/0 trust 重启PostgreSQL使配置生效 登录数据库,在协调节点上添加其他工作节点的ip地址, 数据库端口号 .. code:: select * from master_add_node('192.168.xxx.xxx', 5432); 查询添加成功的节点,如果添加成功,则会显示已添加的节点信息 .. code:: select * from master_get_active_worker_nodes(); node_name node_port ------------------------+ 192.168.xxx.xxx 5432 .. note:: 集群需要在每个节点上同样部署Postgresql数据库 + Citus扩展,所用数据库名、默认用户名保持一致 ,数据库都需要创建Citus扩展以及其他所需要的扩展,确保各节点之间可通信,选择一台机器作为协调节点,其他机器作为工作节点 (已经存在的表,想创建分布式表,首先在另外的机器上创建同名数据库,创建扩展) 使用说明 ^^^^^^^^^^^^^^ **create_distributed_table()** 定义分布式表并创建其分片,如果未指定分发方法,则该函数默认为“哈希”分布 ========================= ============================================= 参数名称 描述 ========================= ============================================= table_name 需要分发的表的名称 distribution_column 要将表分布到的列 distribution_type(可选) 表的分布方法,允许的值为追加或哈希,默认为“哈希” ========================= ============================================= **alter_distributed_table()** 更改分布式表的分布列、分片计数或共置属性 ============================ ================================================================================================================ 参数名称 描述 ============================ ================================================================================================================ table_name 将要更改的分布式表的名称 distribution_column(可选) 新分发列的名称 shard_count(可选) 新分片计数 colocate_with(可选) 当前分布式表将与之共置的表。可能的值为 ,用于启动新的共置组,或用于共置的另一个表的名称 cascade_to_colocated(可选) 当此参数设置为“true”时,更改也将应用于以前与该表共置的所有表,并且将保留共置。如果它是“false”,则此表的当前共置将被破坏 ============================ ================================================================================================================ **master_add_node()** 添加工作节点 =============== ======== 参数名称 描述 =============== ======== ip IP地址 port 端口号 =============== ======== **undistribute_table()** 撤消create_distributed_table或create_reference_table的操作 ================================ ======================================================================================================================== 参数名称 描述 ================================ ======================================================================================================================== table_name 要取消分发的分布式表或引用表的名称 cascade_via_foreign_keys(可选) 当此参数设置为“true”时,undistribute_table还会通过外键取消分布与table_name相关的所有表。请谨慎使用此参数,因为它可能会影响许多表 ================================ ======================================================================================================================== 扩容 ^^^^^^ 对于新添加的节点,Citus不会自动移动数据到新节点上,因此我们可以手动平移数据到新的节点上,达到扩容减压的目的,目前有以下两个方法可以实现 - 可以手动复制某工作节点的表到新节点,然后修改 pg_dist_placement 表的表名和grids值,删除原工作节点上该表,此时总表数不变,完成扩容减压。 创建一张分布式表,以哈希值进行分发 .. code:: create table t1(id serial primary key, geom geometry); set citus.shard_count = 6; select create_distributed_table('t1', 'id', 'hash'); with a as (select id ,(random()*170)::float x, (random()*80)::float y from generate_series(1, 600) t(id)) insert into t1(geom) select st_makeenvelope(x, y, x+0.001, y+0.001, 4490) from a; 首先,查看每个节点对应的groupid值 .. code:: select * from pg_dist_node; nodeid|groupid|nodename |nodeport|noderack|hasmetadata|isactive|noderole|nodecluster|metadatasynced|shouldhaveshards| ------+-------+--------------+--------+--------+-----------+--------+--------+-----------+--------------+----------------+ 2| 2|192.168.12.205| 13000|default |false |true |primary |default |false |true | 1| 1|192.168.12.122| 13000|default |false |true |primary |default |false |true | 查看每一张表的shardid值 .. code:: select * from citus_shards; table_name|shardid|shard_name|citus_table_type|colocation_id|nodename |nodeport|shard_size| ----------+-------+----------+----------------+-------------+--------------+--------+----------+ t1 | 102161|t1_102161 |distributed | 8|192.168.12.122| 13000| 24576| t1 | 102162|t1_102162 |distributed | 8|192.168.12.205| 13000| 16384| t1 | 102163|t1_102163 |distributed | 8|192.168.12.122| 13000| 24576| t1 | 102164|t1_102164 |distributed | 8|192.168.12.205| 13000| 16384| t1 | 102165|t1_102165 |distributed | 8|192.168.12.122| 13000| 16384| t1 | 102166|t1_102166 |distributed | 8|192.168.12.205| 13000| 16384| 查看每张分片表分布在哪一个节点上 .. code:: select * from pg_dist_placement where shardid in (select shardid from pg_dist_shard where logicalrelid='tb1'::regclass); placementid|shardid|shardstate|shardlength|groupid| -----------+-------+----------+-----------+-------+ 154| 102161| 1| 0| 1| 155| 102162| 1| 0| 2| 156| 102163| 1| 0| 1| 157| 102164| 1| 0| 2| 158| 102165| 1| 0| 1| 159| 102166| 1| 0| 2| 在协调节点上加入新的worker .. code:: select * from master_add_node('192.168.12.201', 5432); 检查pg_dist_node元数据表,新的worker节点的groupid为5 .. code:: nodeid|groupid|nodename |nodeport|noderack|hasmetadata|isactive|noderole|nodecluster|metadatasynced|shouldhaveshards| ------+-------+--------------+--------+--------+-----------+--------+--------+-----------+--------------+----------------+ 2| 2|192.168.12.205| 13000|default |false |true |primary |default |false |true | 1| 1|192.168.12.122| 13000|default |false |true |primary |default |false |true | 5| 5|192.168.12.201| 5432|default |false |true |primary |default |false |true | 复制分片,现在1、2号节点上各3片,为了保持均衡,我们可以移动部分分片到5号节点上 下面移动1号节点上的分片t1_102161到5号节点上: 在1号节点上创建PUBLICATION(此步操作是在1号工作节点上执行,不是在协调节点上) .. code:: create PUBLICATION pub_shard for table tb1_102046; 在3号节点上创建分片表和SUBSCRIPTION(此步操作是在3号工作节点上执行,不是在协调节点上,表名、表结构和要复制的表保持一致) .. code:: create table t1_102161(id int primary key, geom geometry); CREATE SUBSCRIPTION sub_shard CONNECTION 'host=192.168.12.122, port=13000, dbname=testcitus' PUBLICATION pub_shard; 在协调节点上锁表,防止数据被修改 .. code:: begin; lock table tb1 IN EXCLUSIVE MODE; 等待3号节点上的表数据和1号节点上的表数据完全同步,解锁表,修改元数据表 .. code:: end; update pg_dist_placement set groupid=5 where shardid=102161 and groupid=1; 在1号节点上删除分片表和PUBLICATION .. code:: DROP PUBLICATION pub_shard; drop table tb1_102046; 在5号节点上删除SUBSCRIPTION .. code:: DROP SUBSCRIPTION sub_shard; 扩容完成 - alter_distributed_table函数,可以自动将已有的分片表再次重新平均分配到每个节点,如2个节点共2张表,添加新节点后,修改分片表数为3,则分布情况为3个节点共3张表,也可以达到扩容减压的目的。 使用案例 ^^^^^^^^ 首先根据集群环境设置分表数,分表时会平均的分配到每个工作节点上,如count=32,工作节点有两个,则会在每台工作节点上创建16张表,在我们的测试中,使用GIN索引查询网格编码时,每个工作节点的表数不超过cpu逻辑核数的一半时效果最佳。 案例环境为 3台 8核x86 机器,组合为 1cn + 2worker 的集群。 .. code:: set citus.shard_count = 8; **创建表** .. code:: create table polygon_grid21(id serial primary key, geom geometry, grids geosotgrid[]); **按id列上的哈希值对源表进行分发** .. code:: select create_distributed_table('polygon_grid21', 'id'); **往表 polygon_grid21 里插入数据** .. code:: with a as (select (random()*170)::float x, (random()*80)::float y from generate_series(1, 1000000)) insert into polygon_grid21(geom, grids) select st_makeenvelope(x, y, x+0.001, y+0.001, 4490), ST_GeoSOTGrid(st_makeenvelope(x, y, x+0.001, y+0.001, 4490), 21) from a; **查看分表情况、哈希分布范围** .. code:: select * from citus_shards; table_name |shardid|shard_name |citus_table_type|colocation_id|nodename |nodeport|shard_size| --------------+-------+---------------------+----------------+-------------+--------------+--------+----------+ polygon_grid21| 102126|polygon_grid21_102126|distributed | 1|192.168.12.122| 13000| 66879488| polygon_grid21| 102127|polygon_grid21_102127|distributed | 1|192.168.12.205| 13000| 66551808| polygon_grid21| 102128|polygon_grid21_102128|distributed | 1|192.168.12.122| 13000| 66568192| polygon_grid21| 102129|polygon_grid21_102129|distributed | 1|192.168.12.205| 13000| 66650112| polygon_grid21| 102130|polygon_grid21_102130|distributed | 1|192.168.12.122| 13000| 66789376| polygon_grid21| 102131|polygon_grid21_102131|distributed | 1|192.168.12.205| 13000| 66732032| polygon_grid21| 102132|polygon_grid21_102132|distributed | 1|192.168.12.122| 13000| 66486272| polygon_grid21| 102133|polygon_grid21_102133|distributed | 1|192.168.12.205| 13000| 66625536| select * from pg_dist_shard; table_name |shardid|shard_name |citus_table_type|colocation_id|nodename |nodeport|shard_size| --------------+-------+---------------------+----------------+-------------+--------------+--------+----------+ polygon_grid21| 102126|polygon_grid21_102126|distributed | 1|192.168.12.122| 13000| 66879488| polygon_grid21| 102127|polygon_grid21_102127|distributed | 1|192.168.12.205| 13000| 66551808| polygon_grid21| 102128|polygon_grid21_102128|distributed | 1|192.168.12.122| 13000| 66568192| polygon_grid21| 102129|polygon_grid21_102129|distributed | 1|192.168.12.205| 13000| 66650112| polygon_grid21| 102130|polygon_grid21_102130|distributed | 1|192.168.12.122| 13000| 66789376| polygon_grid21| 102131|polygon_grid21_102131|distributed | 1|192.168.12.205| 13000| 66732032| polygon_grid21| 102132|polygon_grid21_102132|distributed | 1|192.168.12.122| 13000| 66486272| polygon_grid21| 102133|polygon_grid21_102133|distributed | 1|192.168.12.205| 13000| 66625536| **创建索引** .. code:: create index gin_polygon_grid21 on polygon_grid21 using GIN(grids); **相交查询** .. code:: select * from polygon_grid21 where grids && ST_GeoSOTGrid(st_makeenvelope(0, 0, 0.01, 0.01, 4490), 21); **删除一半数据,查询结果** .. code:: delete from polygon_grid21 where id % 2 = 0; select count(*) from polygon_grid21; count | ------+ 500000| **再次插入10w数据,查看结果** .. code:: with a as (select (random()*170)::float x, (random()*80)::float y from generate_series(1, 100000)) insert into polygon_grid21(geom, grids) select st_makeenvelope(x, y, x+0.001, y+0.001, 4490), ST_GeoSOTGrid(st_makeenvelope(x, y, x+0.001, y+0.001, 4490), 21) from a; select count(*) from polygon_grid21; count | ------+ 600000| 想要获取更加详细的资料,请访问 `Citus官方文档 `__ 。