1 Enabling CDC on the source database
Create a user
-- 1. Create a login
create login danan with password='Tdy123456...', default_database=test;
-- 2. Create a database user
create user danan for login danan with default_schema=dbo;
-- 3. Grant permissions
exec sp_addrolemember 'db_owner', 'danan';
Enable the SQL Server Agent service
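CDC capture and cleanup jobs are scheduled through SQL Server Agent, so the Agent service must be running before table-level CDC produces any data. The service itself is usually started from SQL Server Configuration Manager; as a minimal sketch, the standard `Agent XPs` option can also be enabled from T-SQL (verify the option name on your edition):
EXEC sp_configure 'show advanced options', 1;
RECONFIGURE;
EXEC sp_configure 'Agent XPs', 1;  -- allow the Agent extended stored procedures
RECONFIGURE;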
Enable CDC at the database level
EXEC sys.sp_cdc_enable_db
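As an optional check (a standard catalog view, not specific to this setup), confirm the database is now CDC-enabled:
SELECT name, is_cdc_enabled FROM sys.databases;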
Enable CDC at the table level
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'student',
    @role_name = NULL,
    -- The filegroup must already exist; otherwise omit this parameter
    @filegroup_name = N'TEST_FN',
    @supports_net_changes = 0
Notes:
1. The generated CDC capture job is named "schema_table" by default.
2. To capture only specific columns, set @captured_column_list = N'id,name'.
3. To name the capture instance explicitly, set @capture_instance = N'dbo_teachers_1'.
4. Each table can have at most two CDC capture instances; a combined example follows.
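A minimal sketch combining the options from notes 2-4: creating a second capture instance for dbo.teachers that tracks only the listed columns (the table, instance, and column names follow the examples above and are illustrative):
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'teachers',
    @role_name = NULL,
    @capture_instance = N'dbo_teachers_1',
    @captured_column_list = N'id,name',
    @supports_net_changes = 0;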
-- Disable CDC on a table
EXEC sys.sp_cdc_disable_table
    @source_schema = N'dbo',
    @source_name = N'student',
    @capture_instance = N'dbo_student'
Verify the CDC configuration
USE MyDB;
GO
EXEC sys.sp_cdc_help_change_data_capture
GO
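Two further checks that can help confirm table-level CDC is active (standard catalog objects; the change table name follows the cdc.<capture_instance>_CT convention):
-- Is the table tracked by CDC?
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'student';
-- Inspect the captured changes directly
SELECT TOP 10 * FROM cdc.dbo_student_CT;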
2 Flink CDC capture
2.1 Dependency
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
    <version>1.17.2</version>
</dependency>
2.2 Code
// Package paths below assume the com.ververica Flink CDC 2.4.x incremental-source API; adjust to your connector version.
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SqlServerTest {
    public static void main(String[] args) throws Exception {
        // Incremental source: snapshot the table first, then stream CDC changes
        JdbcIncrementalSource<String> incrSource =
                SqlServerSourceBuilder.SqlServerIncrementalSource.<String>builder()
                        .hostname("192.168.31.69")
                        .port(1433)
                        .databaseList("cdc_test")
                        .tableList("dbo.teachers")
                        .username("sa")
                        .password("123456")
                        .startupOptions(StartupOptions.initial())
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is required for the incremental source to track progress
        env.enableCheckpointing(3000);
        env.fromSource(incrSource, WatermarkStrategy.noWatermarks(), "test")
                .print()
                .setParallelism(1); // use parallelism 1 for sink to keep message ordering
        env.execute();
    }
}
3 Verifying capture from a secondary (backup) database
| # | Backup mode | Sub-mode | Result |
|---|---|---|---|
| 1 | Database mirroring | \ | Not tested |
| 2 | Publication/subscription (replication) | Snapshot publication | Failed: after CDC had been running on the secondary for a while, the CDC jobs were killed and incremental data could no longer be captured from the secondary |
| 3 | | Transactional publication | Passed |
| 4 | | Peer-to-peer / merge publication | Affects the primary database; not considered for now |
| 5 | AlwaysOn availability groups | \ | Not tested |
4 Verifying how Flink CDC handles source table schema changes
4.1 Flink CDC source configuration
JdbcIncrementalSource<String> incrSource =
        SqlServerSourceBuilder.SqlServerIncrementalSource.<String>builder()
                .hostname("192.168.31.69")
                .port(1433)
                .databaseList("cdc_test")
                .tableList("dbo.teachers")
                .username("sa")
                .password("123456")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                // capture schema (DDL) changes as well
                .includeSchemaChanges(true)
                .build();
4.2 Test scenarios
| # | Scenario | Observation | Result |
|---|---|---|---|
| 1 | 1. Add a column to the source table 2. Leave the Flink CDC job running | Data for the new column is not captured | (×) Not monitored |
| 2 | 1. Add a column to the source table 2. Restart the Flink CDC job | 1. Historical data for the new column is captured 2. Incremental data for the new column is not captured | (×) Not monitored |
| 3 | 1. Add a column to the source table 2. Restart the table's CDC capture job 3. Leave the Flink CDC job running | 1. Incremental data for the new column is captured 2. The job keeps erroring and restarting (see scenario 3 log below) | (?) Monitored, but the error must be resolved |
| 4 | 1. Add a column to the source table 2. Restart the table's CDC capture job 3. Restart the Flink CDC job | 1. Historical data for the new column is captured 2. Incremental data for the new column is captured | (√) Monitored |
| 5 | 1. Add a column to the source table 2. Create an additional column-level CDC capture instance 3. Leave the Flink CDC job running | 1. Incremental data for the new column is captured 2. The job keeps erroring (see scenario 5 log below) | (?) Monitored, but the error must be resolved, and the new column's value arrives as a separate record |
| 6 | 1. Add a column to the source table 2. Create an additional column-level CDC capture instance 3. Restart the Flink CDC job | 1. Historical data for the new column is captured 2. Incremental data for the new column is not captured 3. The job keeps erroring (see scenario 6 log below) | (×) Not monitored |

Scenario 3 error log (excerpt):
一月 31, 2024 11:32:13 上午 com.microsoft.sqlserver.jdbc.TDSChannel enableSSL
警告: TLSv1 was negotiated. Please update server and client to use TLSv1.2 at minimum.
11:32:13,412 ERROR io.debezium.pipeline.ErrorHandler [debezium-reader-0] - Producer failure
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: file is not a valid field name
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
    at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.getString(Struct.java:158)
    at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher$SchemaChangeEventReceiver.schemaChangeRecordValue(JdbcSourceEventDispatcher.java:193)
    at io.debezium.connector.sqlserver.SqlServerSchemaChangeEventEmitter.emitSchemaChangeEvent(SqlServerSchemaChangeEventEmitter.java:47)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.migrateTable(SqlServerStreamingChangeEventSource.java:491)
    ...
11:32:13,637 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [Source Data Fetcher for Source: test (1/12)#93] - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    ...
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    ... 5 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: file is not a valid field name
    ... 9 more
11:32:13,639 ERROR io.debezium.pipeline.ErrorHandler [debezium-reader-0] - Producer failure
com.microsoft.sqlserver.jdbc.SQLServerException: Socket closed
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:3422)
    ...
Caused by: java.net.SocketException: Socket closed
    ... 23 more

Scenario 5 error log (excerpt): the same failure as scenario 3 (DataException: file is not a valid field name, raised from JdbcSourceEventDispatcher$SchemaChangeEventReceiver.schemaChangeRecordValue), logged at 11:48:19 and again when the SplitFetcher thread stops the connector.

Scenario 6 error log (excerpt):
一月 31, 2024 11:50:13 上午 com.microsoft.sqlserver.jdbc.TDSChannel enableSSL
警告: TLSv1 was negotiated. Please update server and client to use TLSv1.2 at minimum.
11:50:14,027 ERROR io.debezium.relational.TableSchemaBuilder [debezium-reader-0] - Error requesting a row value, row: 2, requested index: 2 at position 2
11:50:14,028 ERROR io.debezium.pipeline.ErrorHandler [debezium-reader-0] - Producer failure
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, event_serial_no=2, commit_lsn=00000285:00000100:0003, change_lsn=00000285:00000100:0002}
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:246)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$executeIteration$1(SqlServerStreamingChangeEventSource.java:434)
    ...
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
    at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:254)
    at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitUpdateRecord(RelationalChangeRecordEmitter.java:139)
    ... 13 more
The SplitFetcher thread then fails with the same root cause and the connector is stopped ("An exception occurred in the change event producer. This connector will be stopped.").
4.3 Conclusion
To keep the Flink CDC job running continuously, the error in scenario 3 must be resolved first; in addition, after the source table's schema changes, the table's CDC capture job must be restarted before any new data is inserted.
For the error, see https://github.com/ververica/flink-cdc-connectors/pull/2315
In practice the only viable approach is a full re-extraction: restart the source table's CDC capture job and also restart the Flink CDC job.
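For reference, a minimal sketch of what "restarting the source table's CDC capture job" can look like: drop the existing capture instance and re-enable it so the new instance reflects the updated schema (table and instance names follow the earlier examples and are illustrative). Dropping the instance discards its change history, so the downstream Flink CDC job then needs a fresh snapshot, consistent with the re-extraction conclusion above.
-- Drop the existing capture instance
EXEC sys.sp_cdc_disable_table
    @source_schema = N'dbo',
    @source_name = N'teachers',
    @capture_instance = N'dbo_teachers';

-- Re-enable CDC so a new capture instance picks up the current schema
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'teachers',
    @role_name = NULL,
    @supports_net_changes = 0;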