跳到主要内容

SQL Server

概述

要启用 Debezium SQL Server 连接器捕获数据库操作的变更事件记录,必须首先在 SQL Server 数据库上启用变更数据捕获。必须在数据库和要捕获的每个表上启用 CDC。在源数据库上设置 CDC 后,连接器就可以捕获数据库中发生的行级 INSERT、UPDATE 和 DELETE 操作。连接器会将每个源表的事件记录写入专门针对该表的 Kafka 主题。每个捕获的表都有一个主题。客户端应用程序读取它们所跟踪的数据库表的 Kafka 主题,并对从这些主题中获取的行级事件做出响应。

连接器首次连接到 SQL Server 数据库或群集时,会对配置为捕获更改的所有表的模式进行一致的快照,并将此状态流式传输到 Kafka。快照完成后,连接器会持续捕获随后发生的行级变更。通过首先建立所有数据的一致视图,连接器可以继续读取数据,而不会丢失快照期间发生的任何更改。

为 MSSQL 设置 CDC

在数据库和表上启用 CDC

MS SQL Server 提供了一些内置的存储过程来启用 CDC。

若要启用 CDC,首先需要具有必要权限(db_owner 或 sysadmin)的 SQL Server 管理员运行查询以在数据库级别启用 CDC。

USE {database name}
GO
EXEC sys.sp_cdc_enable_db
GO

然后,管理员必须为要捕获的每个表启用 CDC。下面是一个示例:

USE {database name}
GO

EXEC sys.sp_cdc_enable_table
@source_schema = N'{schema name}',
@source_name = N'{table name}',
@role_name = N'{role name}', [1]
@filegroup_name = N'{fiilegroup name}', [2]
@supports_net_changes = 0 [3]
GO
  • [1] 指定一个角色,该角色将获得对源表的捕获列的SELECT权限。建议在此处放置一个值,以便可以在下一步中使用此角色,但也可以将 @role name 的值设置为 NULL,仅允许 _sysadmin 和 db_owner 具有访问权限。确保用于连接到 Airbyte 中的源的凭据与此角色一致,以便 Airbyte 可以访问 cdc 表。
  • [2] 指定 SQL Server 放置更改表的文件组。建议为 CDC 创建单独的文件组,但可以省略此参数以使用默认文件组。
  • [3] 如果为 0,则仅生成用于查询所有更改的支持函数。如果为 1,则还会生成查询净更改所需的函数。如果 supports_net_changes 设置为 1,则必须指定 index_name,或者源表必须具有定义的主键。

有关参数的更多详细信息,请参阅此存储过程的 Microsoft 文档页。
如果有许多表要启用 CDC,并且希望避免必须为每个表逐个运行此查询,则此脚本可能会有所帮助!

有关详细信息,请参阅有关启用和禁用 CDC 的 Microsoft 文档

启用快照隔离

首次使用 CDC 运行同步时,Airbyte 会执行数据库的初始一致快照。为了避免获取表锁,Airbyte 使用快照隔离,允许其他数据库客户端同时写入。必须在数据库上启用此功能,如下所示:

ALTER DATABASE {database name}
SET ALLOW_SNAPSHOT_ISOLATION ON;

创建用户并授予适当的权限

我们建议不要使用 sysadmin 或 db_owner 凭据,而是创建一个具有相关 CDC 访问权限的新用户,以便与 Airbyte 一起使用。首先,让我们创建登录名和用户,并添加到db_datareader角色:

USE {database name};
CREATE LOGIN {user name}
WITH PASSWORD = '{password}';
CREATE USER {user name} FOR LOGIN {user name};
EXEC sp_addrolemember 'db_datareader', '{user name}';

将用户添加到之前在表上启用 cdc 时指定的角色:

EXEC sp_addrolemember '{role name}', '{user name}';

这应该足够了,但如果遇到问题,请尝试直接授予用户对 cdc 架构的SELECT访问权限(可选):

USE {database name};
GRANT SELECT ON SCHEMA :: [cdc] TO {user name};

如果可行,授予此用户“VIEW SERVER STATE”权限将允许检查 SQL Server 代理是否正在运行。这是首选,因为它可确保在源数据库中的代理未更新 CDC 表时同步将失败。

USE master;
GRANT VIEW SERVER STATE TO {user name};

延长CDC数据的保留期(可选)

在 SQL Server 中,默认情况下,更改表中仅保留三天的数据。除非您运行非常频繁的同步,否则我们建议增加此保留期,以便在同步失败或同步暂停时,仍有一些带宽可以从增量同步的最后一个点开始。

可以使用存储过程sys.sp_cdc_change_job更改这些设置,如下所示:

-- we recommend 14400 minutes (10 days) as retention period
EXEC sp_cdc_change_job @job_type='cleanup', @retention = {minutes}

进行此更改后,需要重新启动清理作业:

  EXEC sys.sp_cdc_stop_job @job_type = 'cleanup';

EXEC sys.sp_cdc_start_job @job_type = 'cleanup';

确保 SQL Server 代理正在运行

MSSQL使用SQL Server代理运行CDC所需的作业。因此,为了让CDC有效工作,代理必须正常运行。您可以检查SQL Server 代理的状态。
如下所示:

EXEC xp_servicecontrol 'QueryState', N'SQLServerAGENT';

如果您看到“正在运行”以外的内容,请关注 Microsoft 文档以启动服务。

注册 MSSQL 连接器

操作: 进入script目录编辑mssql-config-hhy.sh文件

mssql-config-hhy.sh
# SqlServer config
SOURCE_CONNECTOR_NAME="mssql-source-connector" # 连接器名称
MSSQL_HOST="192.168.0.108" # sqlserver数据库ip
MSSQL_PORT=1433 # 端口
MSSQL_USER="hhy" # 上面创建的CDC用户
MSSQL_PASSWORD="Aa123456.." # 密码
DATABASE_NAMES="hhy" # 复制的数据库
TOPIC_PREFIX="sqlserver" # 写入kafka中的topic前缀
# 排除的表,这些表将不会复制(可选)
EXCLUDE_TABLES="dbo.OHMM,dbo.f_splitStr,dbo.F_split"
# 排除的列,这些列将不会复制(可选)
EXCLUDE_COLUMNS="dbo.OFRM.FileContnt,dbo.IRD1.Template,dbo.OIRD.Definition,dbo.ASC6.SignData,dbo.CINF.DashConf,dbo.OPRO.BValue,dbo.OPCT.XMLFile,dbo.EBL1.DocContent,dbo.OULA.EULADoc,dbo.AWMG.XMLFile,dbo.OFBT.CONTENT,dbo.OEJB.XMLFile,dbo.OCPC.OLDContent,dbo.OCPC.DiExpoCont,dbo.SCL6.SignData,dbo.SRA2.ResDag,dbo.SRA2.ResPdf,dbo.SRA2.ResHtml,dbo.SRA2.ResXml,dbo.SRA2.ResLog,dbo.SRA2.ErrScreen,dbo.OADP.LogoImage,dbo.RDC1.Template,dbo.EJB2.FileStorag,dbo.OFUS.PROFPHOTO,dbo.OQRC.FileContnt,dbo.AUSR.UserPrefs,dbo.ACPN.Template,dbo.OCPN.Template,dbo.AADP.LogoImage,dbo.OTRX.Data,dbo.AINF.DashConf,dbo.RDOC.Template,dbo.OSRC.SysRptTemp,dbo.OSRC.CusRptTemp,dbo.OWMG.XMLFile,dbo.OHMM.InfoFile,dbo.OWPK.Content,dbo.OUSR.UserPrefs"

使用mssql-source-register.sh脚本发送请求到kafka注册连接器。执行命令:

./mssql-source-register.sh mssql-config-hhy.sh

查看mssql-source-register.sh脚本的内容我们可以看到脚本读取了我们配置文件中的变量,并且还有一些其他的默认参数配置,这些配置参数的作用可以查看debezium文档

mssql-source-register.sh
#!/bin/bash

set -x
if [ ! $1 ];then
echo "Please specify a configuration file."
exit
fi

source $1

echo "register SqlServer connector"
cat <<EOF | curl --request POST --url "http://127.0.0.1:8083/connectors" --header 'Content-Type: application/json' --data @-
{
"name": "${SOURCE_CONNECTOR_NAME}",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",

"database.hostname": "${MSSQL_HOST}",
"database.port": "${MSSQL_PORT}",
"database.user": "${MSSQL_USER}",
"database.password": "${MSSQL_PASSWORD}",

"topic.prefix": "${TOPIC_PREFIX}",
"database.names" : "${DATABASE_NAMES}",
"table.exclude.list" : "${EXCLUDE_TABLES}",
"column.exclude.list" : "${EXCLUDE_COLUMNS}",
"database.applicationIntent": "ReadOnly",
"snapshot.isolation.mode": "snapshot",
"schema.history.internal.kafka.bootstrap.servers" : "kafka:9092",
"schema.history.internal.kafka.topic": "${TOPIC_PREFIX}.schema-changes",
"database.encrypt": true,
"database.trustServerCertificate": true
}
}
EOF

注册 Clickhouse 连接器

该clickhouse连接器为sink连接器,负责读取kafka中的topic数据,并写入到clickhouse中。

操作: 进入script目录编辑clickhouse-sink-config.sh

clickhouse-sink-config.sh
# Clickhouse config
SINK_CONNECTOR_NAME="clickhouse-sink-connector" # 连接器名称
CLICKHOUSE_HOST="host" # clickhouse服务器ip
CLICKHOUSE_PORT=8123 # 端口号
CLICKHOUSE_USER="user" # 用户
CLICKHOUSE_PASSWORD="password" # 密码
CLICKHOUSE_DATABASE="database" # 目标数据库
TOPICS_REGEX="sqlserver.*" # 匹配该前缀的topic,将数据写入clickhouse

注册连接器

./clickhouse-sink-register.sh clickhouse-sink-config.sh

查看clickhouse-sink-register.sh脚本的内容。我们可以看到脚本读取了我们配置文件中的变量,并且还有一些其他的默认参数配置,这些配置参数的作用可以查看clickhouse-sink-connect文档

clickhouse-sink-register.sh
#!/bin/bash

set -x

if [ ! $1 ];then
echo "Please specify a configuration file."
exit
fi


source $1

echo "register clickhouse sink connector"
cat <<EOF | curl --request POST --url "http://127.0.0.1:18083/connectors" --header 'Content-Type: application/json' --data @-
{
"name": "${SINK_CONNECTOR_NAME}",
"config": {
"connector.class": "com.altinity.clickhouse.sink.connector.ClickHouseSinkConnector",
"tasks.max": "1",
"topics.regex": "${TOPICS_REGEX}",
"clickhouse.server.url": "${CLICKHOUSE_HOST}",
"clickhouse.server.user": "${CLICKHOUSE_USER}",
"clickhouse.server.password": "${CLICKHOUSE_PASSWORD}",
"clickhouse.server.database": "${CLICKHOUSE_DATABASE}",
"clickhouse.server.port": ${CLICKHOUSE_PORT},


"store.kafka.metadata": true,
"topic.creation.default.partitions": 6,

"store.raw.data": false,
"store.raw.data.column": "raw_data",

"metrics.enable": true,
"metrics.port": 8084,
"buffer.flush.time.ms": 500,
"thread.pool.size": 2,
"fetch.max.wait.ms": 1000,
"fetch.min.bytes": 52428800,
"sink.connector.max.queue.size": "10000"

"enable.kafka.offset": false,

"replacingmergetree.delete.column": "_sign",

"auto.create.tables": true,
"schema.evolution": false,

"deduplication.policy": "off"
}
}
EOF
注意

当在启动该连接器复制数据的过程中,内存跑满。或者报错Java虚拟机内存溢出,可以将sink.connector.max.queue.size配置调小些。 该配置参数配置了将数据从 kafka 流式传输到 ClickHouse 时应加载到内存中的最大记录数。