論理レプリケーションを使用して RDS for PostgreSQL のテーブルを複製する方法を教えてください。

所要時間4分
0

Amazon Relational Database Service (Amazon RDS) for PostgreSQL のデータベース間で、拡張機能を使用せずにテーブルをレプリケートしたいと思います。

解決方法

論理レプリケーションの一般的なユースケースは、2 つの Amazon RDS for PostgreSQL の DB インスタンス間で、一連のテーブルを複製 (レプリケート) することです。RDS for PostgreSQL は、PostgreSQL 10.4 以降に対する論理レプリケーションをサポートしています。Amazon Aurora PostgreSQL 互換エディションのバージョン 2.2.0 以降では、PostgreSQL 10.6 以降での論理レプリケーションがサポートされています。

ここに示す解決策では、RDS for PostgreSQL の論理レプリケーションを使用して、2 つのソーステーブルが 2 つのターゲットテーブルに複製されます。論理レプリケーションは、まず、ソーステーブルに既に存在するデータの初期ロードを行い、その後、進行中の変更内容をターゲットに反映させます。

論理レプリケーションを有効にする

RDS for PostgreSQL で論理レプリケーションを有効にするには、カスタムパラメータグループを変更して rds.logical_replication1 を設定し、さらに rds.logical_replication を DB インスタンスにアタッチします。DB インスタンスにアタッチ済みのカスタムパラメータグループの場合は、rds.logical_replication1 を設定して、そのパラメータグループを更新します。rds.logical_replication パラメータは静的パラメータなので、有効にするには DB インスタンスの再起動が必要です。DB インスタンスを再起動すると、 wal_level パラメータが logical に設定されます。

wal_levelrds.logical_replication の値を確認します。

postgres=> SELECT name,setting FROM pg_settings WHERE name IN ('wal_level','rds.logical_replication');
          name           | setting
-------------------------+---------
 rds.logical_replication | on
 wal_level               | logical
(2 rows)

ソース DB インスタンス内のソースデータベースに接続する

ソース側の RDS for PostgreSQL DB インスタンスにある、ソースデータベースに接続します。ソーステーブルを作成します。

source=> CREATE TABLE reptab1 (slno int primary key);
CREATE TABLE
source=> CREATE TABLE reptab2 (name varchar(20));
CREATE TABLE

ソーステーブルにデータを挿入します。

source=> INSERT INTO reptab1 VALUES (generate_series(1,1000));
INSERT 0 1000
source=> INSERT INTO reptab2 SELECT SUBSTR ('ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789',((random()*(36-1)+1)::integer),1) FROM generate_series(1,50);
INSERT 0 50

ソーステーブルのパブリケーション (公開) を作成します。

ソーステーブルのパブリケーション (公開) を作成します。作成したパブリケーションの詳細を、SELECT クエリを使用して確認します。

source=> CREATE PUBLICATION testpub FOR TABLE reptab1,reptab2;
CREATE PUBLICATION
source=> SELECT * FROM pg_publication;
  oid   | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
--------+---------+----------+--------------+-----------+-----------+-----------+-------------+------------
 115069 | testpub |    16395 | f            | t         | t         | t         | t           | f
(1 row)

ソーステーブルがパブリケーションに追加されていることを確認します。

source=> SELECT * FROM pg_publication_tables;
 pubname | schemaname | tablename
---------+------------+-----------
 testpub | public     | reptab1
 testpub | public     | reptab2
(2 rows)

注: データベース内の、すべてのテーブルをレプリケートするには、次のコマンドを実行します。

CREATE PUBLICATION testpub FOR ALL TABLES;

ターゲットデータベースに接続してターゲットテーブルを作成する

ターゲット DB インスタンス内のターゲットデータベースに接続します。ソーステーブルと同じ名前でターゲットテーブルを作成します。ターゲットテーブルに対し SELECT クエリを実行して、そのテーブル内にデータがないことを確認します。

target=> CREATE TABLE reptab1 (slno int primary key);
CREATE TABLE
target=> CREATE TABLE reptab2 (name varchar(20));
CREATE TABLE
target=> SELECT count(*) FROM reptab1;
 count
-------
     0
(1 row)
target=> SELECT count(*) FROM reptab2;
 count
-------
     0
(1 row)

ターゲットデータベースでサブスクリプションを作成し検証する

ターゲットデータベース内にサブスクリプションを作成します。SELECT クエリを使用して、サブスクリプションが有効化されているかどうかを確認します。

target=> CREATE SUBSCRIPTION testsub CONNECTION 'host=<source RDS/host endpoint> port=5432 dbname=<source_db_name> user=<user> password=<password>' PUBLICATION testpub;
NOTICE:  Created replication slot "testsub" on publisher
CREATE SUBSCRIPTION
target=> SELECT oid,subname,subenabled,subslotname,subpublications FROM pg_subscription;
  oid  | subname | subenabled | subslotname | subpublications
-------+---------+------------+-------------+-----------------
 16434 | testsub | t          | testsub     | {testpub}
(1 row)

重要: ユーザー名とパスワードが、プレーンテキストの形式でデータベースのログに保存されることを防ぐには、サブスクリプションを作成する前に、次のコマンドを実行します。

target=> SET log_min_messages to 'PANIC';
SET
target=> SET log_statement to NONE;
SET

作成されたサブスクリプションは、ソーステーブルに存在するすべてのデータをターゲットテーブルにロードします。ターゲットテーブルで SELECT クエリを実行して、初期データのロードを確認します。

target=> SELECT count(*) FROM reptab1;
 count
-------
  1000
(1 row)
target=> SELECT count(*) FROM reptab2;
 count
-------
    50
(1 row)

ソースデータベースのレプリケーションスロットを検証する

ターゲットデータベースでサブスクリプションを作成すると、ソースデータベースにレプリケーションスロットが作成されます。ソースデータベース上で次の SELECT クエリを実行し、レプリケーションスロットの詳細を確認します。

source=> SELECT * FROM pg_replication_slots;
 slot_name |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
 ----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
 testsub   | pgoutput | logical   | 115048 | source   | f         | t      |        846 |      |         6945 | 58/B4000568 | 58/B40005A0         | reserved   |
(1 row)

ソーステーブルからのレプリケーションのテスト

ソーステーブルに行を挿入して、ソーステーブルのデータに対する変更が、ターゲットテーブルに複製されるかをテストします。

source=> INSERT INTO reptab1 VALUES(generate_series(1001,2000));
INSERT 0 1000
source=> INSERT INTO reptab2 SELECT SUBSTR('ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789',((random()*(36-1)+1)::integer),1) FROM generate_series(1,50);
INSERT 0 50
source=> SELECT count(*) FROM reptab1;
 count
-------
  2000
(1 row)
source=> SELECT count(*) FROM reptab2; count
-------
   100
(1 row)

ターゲットテーブルの行数を確認してレプリケーションを検証する

ターゲットテーブルの行数を確認して、新しく挿入された行が、ターゲットテーブルに複製されていることを確認します。

target=> SELECT count(*) FROM reptab1;
 count
-------
  2000
(1 row)
target=> SELECT count(*) FROM reptab2;
 count
-------
   100
(1 row)

論理レプリケーションをクリーンアップし無効化する

レプリケーションが完了し不要になった論理レプリケーションは、クリーンアップした上で無効化します。非アクティブなレプリケーションスロットは、ソース DB インスタンスに WAL ファイルを蓄積します。WAL ファイルがストレージを使い果たすことで、システム停止を引き起こす可能性があります。

ターゲットデータベース上のサブスクリプションを削除します。

target=> DROP SUBSCRIPTION testsub;
NOTICE:  Dropped replication slot "testsub" on publisher
DROP SUBSCRIPTION
target=> SELECT * FROM pg_subscription;
oid | subdbid | subname | subowner | subenabled | subconninfo | subslotname | subsynccommit | subpublications
----+---------+---------+----------+------------+-------------+-------------+---------------+-----------------
(0 rows)

注: サブスクリプションを削除すると、そのサブスクリプションにより作成された、レプリケーションスロットも削除されます。

ソースデータベースに対し次の SELECT クエリ文を実行して、レプリケーションスロットがソースから削除されていることを確認します。

source=> SELECT * FROM pg_replication_slots;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+--------------
(0 rows)

パブリケーションを削除します。パブリケーションが正常に削除されたことを確認します。

source=> DROP PUBLICATION testpub;
DROP PUBLICATION
source=> SELECT * FROM pg_publication;
 oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
-----+---------+----------+--------------+-----------+-----------+-----------+-------------+------------
(0 rows)
source=> SELECT * FROM pg_publication_tables;
 pubname | schemaname | tablename
---------+------------+-----------
(0 rows)

DB インスタンスにアタッチされているカスタムパラメータグループで、rds.logical_replication0 に変更します。論理レプリケーションを使用していない DB インスタンスの場合は、必要に応じてそのインスタンスを再起動します。

使用状況に基づいて、max_replication_slotsmax_wal_sendersmax_logical_replication_workersmax_worker_processes、および max_sync_workers_per_subscription を確認します。

注: 次のコマンドは、非アクティブなレプリケーションスロットがあるかどうかを確認し、それぞれのスロットのサイズを特定した上で、必要に応じてそのスロットを削除します。

SELECT slot_name, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(),restart_lsn)) AS replicationSlotLag, active FROM pg_replication_slots ;
SELECT pg_drop_replication_slot('Your_slotname_name');

関連情報

レプリケーション に関する PostgreSQL のドキュメント

AWS公式
AWS公式更新しました 2年前
コメントはありません