跳到主要内容

Postgresql

概述

第一次连接到 PostgreSQL 服务器时,连接器会拍摄所有架构的一致快照。该快照完成后,连接器会持续捕获插入、更新和删除数据库内容以及提交到 PostgreSQL 数据库的行级更改。连接器生成数据更改事件记录并将其流式传输到 Kafka 主题。对于每个表,默认行为是连接器将所有生成的事件流式传输到该表的单独 Kafka 主题。应用程序和服务使用来自该主题的数据更改事件记录。

配置postgresql

在注册连接器之前查看你的数据库中是否启动了Debizium所需的CDC相关配置,这里主要介绍postgres的配置。其他的可以查看官方文档,如Mysql设置

配置postgresql.conf文件

postgresql.conf
# MODULES
shared_preload_libraries = 'pgoutput' #使用官方的逻辑解码插件

# REPLICATION
wal_level = logical #Postgres 预写日志中使用的编码类型
max_wal_senders = 10 # 用于处理 WAL 更改的最大进程数,根据需要调整
max_replication_slots = 10 # 允许流式传输 WAL 更改的最大复制槽数,根据需要调整
wal_keep_size = 1024 # 根据需要调整,单位为MB

创建专用只读 Postgres 用户

提示

如果你使用的是postgresql超级用户进行复制,可以跳过下面的配置。从注册pg连接器开始。

创建一个专用的只读用户来复制数据。也可以使用数据库中的现有 Postgres 用户。

CREATE USER <user_name> PASSWORD 'your_password_here';

配置用户权限

设置 PostgreSQL 服务器以运行 Debezium 连接器需要一个可执行复制的数据库用户。复制只能由具有适当权限的数据库用户执行,且只能针对配置的主机执行。
虽然默认情况下,超级用户拥有必要的 REPLICATION 和 LOGIN 角色,但正如 “安全性 ”中所述,最好不要为 Debezium 复制用户提供更高的权限。而是,创建一个拥有最低权限的 Debezium 用户。

定义一个至少具有 REPLICATION 和 LOGIN 权限的 PostgreSQL 角色.

CREATE ROLE <name> REPLICATION LOGIN;

Debezium 要创建 PostgreSQL 出版物,必须以具有以下权限的用户身份运行:

  • 数据库中的复制权限,以便将表添加到出版物中。
  • 数据库中的 CREATE 权限,用于添加出版物。
  • 在表上复制初始表数据的 SELECT 权限。表所有者自动拥有表的 SELECT 权限。

要在出版物中添加表,用户必须是表的所有者。但由于源表已经存在,因此需要一种机制来与原始所有者共享所有权。要启用共享所有权,需要创建一个 PostgreSQL 复制组,然后将现有表所有者和复制用户添加到组中。

步骤

  1. 创建复制组。
    CREATE ROLE <replication_group>;
  2. 将表的原始所有者添加到组中。
    GRANT REPLICATION_GROUP TO <original_owner>; --REPLICATION_GROUP为你创建的复制组
  3. 将 Debezium 复制用户添加到组中。
    GRANT REPLICATION_GROUP TO <replication_user>;
  4. 将表的所有权转移到 <replication_group>
    ALTER TABLE <table_name> OWNER TO REPLICATION_GROUP;
    如果想要将所有表的所有权转移到 <replication_group>。(可选)
    DO $$
    DECLARE
    r RECORD;
    BEGIN
    FOR r IN (SELECT tablename FROM pg_tables WHERE schemaname = 'public')
    LOOP
    EXECUTE 'ALTER TABLE ' || quote_ident(r.tablename) || ' OWNER TO REPLICATION_GROUP;';
    END LOOP;
    END $$;

配置 PostgreSQL 以允许 Debezium 连接主机进行复制

要启用 Debezium 复制 PostgreSQL 数据,必须配置数据库以允许与运行 PostgreSQL 连接器的主机进行复制。要指定允许与数据库复制的客户端,请在基于 PostgreSQL 主机的身份验证文件 pg_hba.conf 中添加条目。

pg_hba.conf
host    replication     your_user  0.0.0.0/0                 trust   

上面配置表示允许从任何 IP 地址使用your_user用户进行复制连接。

注册pg连接器

这个pg连接器是一个source连接器,负责将postgresql中的变更数据读取到kafka中。

操作: 进入script目录编辑postgres-source-config.sh

postgres-source-config.sh
# Postgres config
SOURCE_CONNECTOR_NAME="postgres-source-connector"
POSTGRES_HOST="host" #postgres服务器ip
POSTGRES_PORT=5432 #端口号
POSTGRES_USER="user" #用户
POSTGRES_PASSWORD="password" #密码
DATABASE_DBNAME="database" #源数据库
TOPIC_PREFIX="postgres"

注册连接器

./postgres-source-register.sh postgres-source-config.sh

查看mssql-source-register.sh脚本的内容我们可以看到脚本读取了我们配置文件中的变量,并且还有一些其他的默认参数配置,这些配置参数的作用可以查看debezium文档

postgres-source-register.sh
#!/bin/bash

set -x
if [ ! $1 ];then
echo "Please specify a configuration file."
exit
fi

source $1

echo "register Postgres connector"
cat <<EOF | curl --request POST --url "http://127.0.0.1:8083/connectors" --header 'Content-Type: application/json' --data @-
{
"name": "${SOURCE_CONNECTOR_NAME}",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": 1,
"database.hostname": "${POSTGRES_HOST}",
"database.port": ${POSTGRES_PORT},
"database.user": "${POSTGRES_USER}",
"database.password": "${POSTGRES_PASSWORD}",
"topic.prefix": "${TOPIC_PREFIX}",
"database.dbname": "${DATABASE_DBNAME}",
"plugin.name": "pgoutput",
"publication.autocreate.mode": "all_tables",

"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "${TOPIC_PREFIX}.schema-changes"
}
}
EOF

注册clickhouse连接器

该clickhouse连接器为sink连接器,负责读取kafka中的topic数据,并写入到clickhouse中。


操作: 进入script目录编辑clickhouse-sink-config.sh

clickhouse-sink-config.sh
# Clickhouse config
SINK_CONNECTOR_NAME="clickhouse-sink-connector"
CLICKHOUSE_HOST="host" #postgres服务器ip
CLICKHOUSE_PORT=8123 #端口号
CLICKHOUSE_USER="user" #用户
CLICKHOUSE_PASSWORD="password" #密码
CLICKHOUSE_DATABASE="database" #目标数据库
TOPICS_REGEX="postgres.*" # 匹配该前缀的topic,将数据写入clickhouse

注册连接器

./clickhouse-sink-register.sh clickhouse-sink-config.sh

查看clickhouse-sink-register.sh脚本的内容。我们可以看到脚本读取了我们配置文件中的变量,并且还有一些其他的默认参数配置,这些配置参数的作用可以查看clickhouse-sink-connect文档

clickhouse-sink-register.sh
#!/bin/bash

set -x

if [ ! $1 ];then
echo "Please specify a configuration file."
exit
fi


source $1

echo "register clickhouse sink connector"
cat <<EOF | curl --request POST --url "http://127.0.0.1:18083/connectors" --header 'Content-Type: application/json' --data @-
{
"name": "${SINK_CONNECTOR_NAME}",
"config": {
"connector.class": "com.altinity.clickhouse.sink.connector.ClickHouseSinkConnector",
"tasks.max": "1",
"topics.regex": "${TOPICS_REGEX}",
"clickhouse.server.url": "${CLICKHOUSE_HOST}",
"clickhouse.server.user": "${CLICKHOUSE_USER}",
"clickhouse.server.password": "${CLICKHOUSE_PASSWORD}",
"clickhouse.server.database": "${CLICKHOUSE_DATABASE}",
"clickhouse.server.port": ${CLICKHOUSE_PORT},


"store.kafka.metadata": true,
"topic.creation.default.partitions": 6,

"store.raw.data": false,
"store.raw.data.column": "raw_data",

"metrics.enable": true,
"metrics.port": 8084,
"buffer.flush.time.ms": 500,
"thread.pool.size": 2,
"fetch.max.wait.ms": 1000,
"fetch.min.bytes": 52428800,
"sink.connector.max.queue.size": "10000"

"enable.kafka.offset": false,

"replacingmergetree.delete.column": "_sign",

"auto.create.tables": true,
"schema.evolution": false,

"deduplication.policy": "off"
}
}
EOF
注意

当在启动该连接器复制数据的过程中,内存跑满。或者报错Java虚拟机内存溢出,可以将sink.connector.max.queue.size配置调小些。 该配置参数配置了将数据从 kafka 流式传输到 ClickHouse 时应加载到内存中的最大记录数。