Postgresql
概述
第一次连接到 PostgreSQL 服务器时,连接器会拍摄所有架构的一致快照。该快照完成后,连接器会持续捕获插入、更新和删除数据库内容以及提交到 PostgreSQL 数据库的行级更改。连接器生成数据更改事件记录并将其流式传输到 Kafka 主题 。对于每个表,默认行为是连接器将所有生成的事件流式传输到该表的单独 Kafka 主题。应用程序和服务使用来自该主题的数据更改事件记录。
配置postgresql
在注册连接器之前查看你的数据库中是否启动了Debizium所需的CDC相关配置,这里主要介绍postgres的配置。其他的可以查看官方文档,如Mysql设置。
配置postgresql.conf文件
# MODULES
shared_preload_libraries = 'pgoutput' #使用官方的逻辑解码插件
# REPLICATION
wal_level = logical #Postgres 预写日志中使用的编码类型
max_wal_senders = 10 # 用于处理 WAL 更改的最大进程数,根据需要调整
max_replication_slots = 10 # 允许流式传输 WAL 更改的最大复制槽数,根据需要调整
wal_keep_size = 1024 # 根据需要调整,单位为MB
创建专用只读 Postgres 用户
如果你使用的是postgresql超级用户进行复制,可以跳过下面的配置。从注册pg连接器开始。
创建一个专用的只读用户来复制数据。也可以使用数据库中的现有 Postgres 用户。
CREATE USER <user_name> PASSWORD 'your_password_here';
配置用户权限
设置 PostgreSQL 服务器以运行 Debezium 连接器需要一个可执行复制的数据库用户。复制只能由具有适当权限的数据库用户执行,且只能针对配置的主机执行。
虽然默认情况下,超级用户拥有必要的 REPLICATION 和 LOGIN 角色,但正如 “安全性 ”中所述,最好不要为 Debezium 复制用户提供更高的权限。而是,创建一个拥有最低权限的 Debezium 用户。
定义一个至少具有 REPLICATION 和 LOGIN 权限的 PostgreSQL 角色.
CREATE ROLE <name> REPLICATION LOGIN;
Debezium 要创建 PostgreSQL 出版物,必须以具有以下权限的用户身份运行:
- 数据库中的复制权限,以便将表添加到出版物中。
- 数据库中的 CREATE 权限,用于添加出版物。
- 在表上复制初始表数据的 SELECT 权限。表所有者自动拥有表的 SELECT 权限。
要在出版物中添加表,用户必须是表的所有者。但由于源表已经存在,因此需要一种机制来与原始所有者共享所有权。要启用共享所有权,需要创建一个 PostgreSQL 复制组,然后将现有表所有者和复制用户添加到组中。
步骤
- 创建复制组。
CREATE ROLE <replication_group>;
- 将表的原始所有者添加到组中。
GRANT REPLICATION_GROUP TO <original_owner>; --REPLICATION_GROUP为你创建的复制组
- 将 Debezium 复制用户添加到组中。
GRANT REPLICATION_GROUP TO <replication_user>;
- 将表的所有权转移到
<replication_group>
。如果想要将所有表的所有权转移到ALTER TABLE <table_name> OWNER TO REPLICATION_GROUP;
<replication_group>
。(可选)DO $$
DECLARE
r RECORD;
BEGIN
FOR r IN (SELECT tablename FROM pg_tables WHERE schemaname = 'public')
LOOP
EXECUTE 'ALTER TABLE ' || quote_ident(r.tablename) || ' OWNER TO REPLICATION_GROUP;';
END LOOP;
END $$;
配置 PostgreSQL 以允许 Debezium 连接主机进行复制
要启用 Debezium 复制 PostgreSQL 数据,必须配置数据库以允许与运行 PostgreSQL 连接器的主机进行复制。要指定允许与数据库复制的客户端,请在基于 PostgreSQL 主机的身份验证文件 pg_hba.conf
中添加条目。
host replication your_user 0.0.0.0/0 trust
上面配置表示允许从任何 IP 地址使用your_user
用户进行复制连接。
注册pg连接器
这个pg连接器是一个source连接器,负责将postgresql中的变更数据读取到kafka中。
操作: 进入script目录编辑postgres-source-config.sh
# Postgres config
SOURCE_CONNECTOR_NAME="postgres-source-connector"
POSTGRES_HOST="host" #postgres服务器ip
POSTGRES_PORT=5432 #端口号
POSTGRES_USER="user" #用户
POSTGRES_PASSWORD="password" #密码
DATABASE_DBNAME="database" #源数据库
TOPIC_PREFIX="postgres"
注册连接器
./postgres-source-register.sh postgres-source-config.sh
查看mssql-source-register.sh脚本的内容我们可以看到脚本读取了我们配置文件中的变量,并且还有一些其他的默认参数配置,这些配置参数的作用可以查看debezium文档。
#!/bin/bash
set -x
if [ ! $1 ];then
echo "Please specify a configuration file."
exit
fi
source $1
echo "register Postgres connector"
cat <<EOF | curl --request POST --url "http://127.0.0.1:8083/connectors" --header 'Content-Type: application/json' --data @-
{
"name": "${SOURCE_CONNECTOR_NAME}",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": 1,
"database.hostname": "${POSTGRES_HOST}",
"database.port": ${POSTGRES_PORT},
"database.user": "${POSTGRES_USER}",
"database.password": "${POSTGRES_PASSWORD}",
"topic.prefix": "${TOPIC_PREFIX}",
"database.dbname": "${DATABASE_DBNAME}",
"plugin.name": "pgoutput",
"publication.autocreate.mode": "all_tables",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "${TOPIC_PREFIX}.schema-changes"
}
}
EOF
注册clickhouse连接器
该clickhouse连接器为sink连接器,负责读取kafka中的topic数据,并写入到clickhouse中。
操作: 进入script目录编辑clickhouse-sink-config.sh
# Clickhouse config
SINK_CONNECTOR_NAME="clickhouse-sink-connector"
CLICKHOUSE_HOST="host" #postgres服务器ip
CLICKHOUSE_PORT=8123 #端口号
CLICKHOUSE_USER="user" #用户
CLICKHOUSE_PASSWORD="password" #密码
CLICKHOUSE_DATABASE="database" #目标数据库
TOPICS_REGEX="postgres.*" # 匹配该前缀的topic,将数据写入clickhouse
注册连接器
./clickhouse-sink-register.sh clickhouse-sink-config.sh
查看clickhouse-sink-register.sh脚本的内容。我们可以看到脚本读取了我们配置文件中的变量,并且还有一些其他的默认参数配置,这些配置参数的作用可以查看clickhouse-sink-connect文档
#!/bin/bash
set -x
if [ ! $1 ];then
echo "Please specify a configuration file."
exit
fi
source $1
echo "register clickhouse sink connector"
cat <<EOF | curl --request POST --url "http://127.0.0.1:18083/connectors" --header 'Content-Type: application/json' --data @-
{
"name": "${SINK_CONNECTOR_NAME}",
"config": {
"connector.class": "com.altinity.clickhouse.sink.connector.ClickHouseSinkConnector",
"tasks.max": "1",
"topics.regex": "${TOPICS_REGEX}",
"clickhouse.server.url": "${CLICKHOUSE_HOST}",
"clickhouse.server.user": "${CLICKHOUSE_USER}",
"clickhouse.server.password": "${CLICKHOUSE_PASSWORD}",
"clickhouse.server.database": "${CLICKHOUSE_DATABASE}",
"clickhouse.server.port": ${CLICKHOUSE_PORT},
"store.kafka.metadata": true,
"topic.creation.default.partitions": 6,
"store.raw.data": false,
"store.raw.data.column": "raw_data",
"metrics.enable": true,
"metrics.port": 8084,
"buffer.flush.time.ms": 500,
"thread.pool.size": 2,
"fetch.max.wait.ms": 1000,
"fetch.min.bytes": 52428800,
"sink.connector.max.queue.size": "10000"
"enable.kafka.offset": false,
"replacingmergetree.delete.column": "_sign",
"auto.create.tables": true,
"schema.evolution": false,
"deduplication.policy": "off"
}
}
EOF
当在启动该连接器复制数据的过程中,内存跑满。或者报错Java虚拟机内存溢出,可以将sink.connector.max.queue.size
配置调小些。
该配置参数配置了将数据从 kafka 流式传输到 ClickHouse 时应加载到内存中的最大记录数。