danan
Published on 2024-01-31

Testing FlinkCDC Capture from SQL Server

1 Enabling CDC on the Source Database

  1. Create a user

-- 1 Create a login
create login danan with password='Tdy123456...', default_database=test;
-- 2 Create a database user
create user danan for login danan with default_schema=dbo;
-- 3 Grant permissions
exec sp_addrolemember 'db_owner', 'danan';
  2. Enable SQL Server Agent (a quick status check follows)
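
Table-level capture jobs are executed by SQL Server Agent, so the Agent service must be running before CDC is enabled. A minimal status check sketch (requires VIEW SERVER STATE permission):

-- status_desc should show 'Running' for the Agent service
SELECT servicename, status_desc
FROM sys.dm_server_services
WHERE servicename LIKE N'SQL Server Agent%';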

  3. Enable CDC at the database level

EXEC sys.sp_cdc_enable_db
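
Whether the database-level switch took effect can be confirmed from sys.databases; a minimal check, assuming the cdc_test database used later by the Flink job:

-- is_cdc_enabled = 1 means CDC is enabled for the database
SELECT name, is_cdc_enabled
FROM sys.databases
WHERE name = 'cdc_test';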
  4. Enable CDC at the table level

EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name   = N'student',
@role_name     = NULL,
-- The filegroup must exist; otherwise omit this parameter
@filegroup_name = N'TEST_FN',
@supports_net_changes = 0

Notes:
  1. The generated CDC capture job name defaults to "<schema name>_<table name>".
  2. To limit capture to specific columns, set @captured_column_list = N'id,name'.
  3. To name the capture instance explicitly, set @capture_instance = 'dbo_teachers_1'.
  4. Each table can have at most two CDC capture instances.
-- Disable CDC on a table
EXEC sys.sp_cdc_disable_table
@source_schema = N'dbo',
@source_name   = N'student',
@capture_instance = N'dbo_student'
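
Because a table allows up to two capture instances (note 4 above), a second instance that includes a newly added column can be created alongside the original one before the old instance is dropped; this is what "create an additional column-level CDC capture instance" refers to in section 4.2. A minimal sketch, with the instance name dbo_student_v2 chosen only for illustration:

-- Add a second capture instance that picks up the new column
EXEC sys.sp_cdc_enable_table
@source_schema    = N'dbo',
@source_name      = N'student',
@role_name        = NULL,
@capture_instance = N'dbo_student_v2',
@supports_net_changes = 0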
  5. Confirm the CDC configuration

USE MyDB;
GO
EXEC sys.sp_cdc_help_change_data_capture
GO
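
Beyond the database-wide report above, the CDC status of a single table can also be checked. A sketch, assuming the dbo.student table from step 4:

-- is_tracked_by_cdc = 1 means the table is tracked by CDC
SELECT name, is_tracked_by_cdc
FROM sys.tables
WHERE name = 'student';

-- List the capture instance(s) configured for one table
EXEC sys.sp_cdc_help_change_data_capture
@source_schema = N'dbo',
@source_name   = N'student';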

2 Capturing with FlinkCDC

2.1 Dependency

<dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
      <!-- Note: 1.17.2 looks like a Flink version rather than a connector version;
           the incremental-snapshot API used below was introduced in Flink CDC 2.4.x -->
      <version>1.17.2</version>
</dependency>

2.2 Code

// Imports added for completeness, assuming the com.ververica 2.4.x incremental-snapshot API
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class SqlServerTest {
    public static void main(String[] args) throws Exception {
        // Incremental source: snapshot dbo.teachers first, then stream CDC changes
        JdbcIncrementalSource<String> incrSource =
                SqlServerSourceBuilder.SqlServerIncrementalSource.<String>builder()
                        .hostname("192.168.31.69")
                        .port(1433)
                        .databaseList("cdc_test")
                        .tableList("dbo.teachers")
                        .username("sa")
                        .password("123456")
                        .startupOptions(StartupOptions.initial())
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is required for the incremental source to commit its progress
        env.enableCheckpointing(3000);

        env
                .fromSource(incrSource, WatermarkStrategy.noWatermarks(), "test")
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute();
    }
}

3 Verifying Capture from a Secondary (Backup) Database

| No. | Backup mode | Sub-mode | Result |
|-----|-------------|----------|--------|
| 1 | Database mirroring | \ | Not tested |
| 2 | Publish/subscribe (replication) | Snapshot publication | Failed: after CDC runs on the secondary for a while, the CDC capture job is killed and incremental data can no longer be collected from the secondary |
| 3 | Publish/subscribe (replication) | Transactional publication | Passed |
| 4 | Publish/subscribe (replication) | Peer-to-peer / merge publication | Impacts the primary database; not considered for now |
| 5 | AlwaysOn | \ | Not tested |

4 Verifying FlinkCDC Monitoring of Table Schema Changes

4.1 FlinkCDC Configuration

JdbcIncrementalSource<String> incrSource =
        SqlServerSourceBuilder.SqlServerIncrementalSource.<String>builder()
                .hostname("192.168.31.69")
                .port(1433)
                .databaseList("cdc_test")
                .tableList("dbo.teachers")
                .username("sa")
                .password("123456")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                // capture table schema changes as well
                .includeSchemaChanges(true)
                .build();

4.2 Verification Scenarios

| No. | Scenario | Observation | Result |
|-----|----------|-------------|--------|
| 1 | 1. Add a column to the source table; 2. leave the FlinkCDC job untouched | Data for the new column is not captured | (×) Not monitored |
| 2 | 1. Add a column to the source table; 2. restart the FlinkCDC job | 1. Historical data for the new column is captured; 2. incremental data for the new column is not | (×) Not monitored |
| 3 | 1. Add a column to the source table; 2. restart the source table's CDC capture job; 3. leave the FlinkCDC job untouched | 1. Incremental data for the new column is captured; 2. the job keeps failing and restarting (error log A below) | (?) Can be monitored once the error is resolved |
| 4 | 1. Add a column to the source table; 2. restart the source table's CDC capture job; 3. restart the FlinkCDC job | 1. Historical data for the new column is captured; 2. incremental data for the new column is captured | (√) Monitored |
| 5 | 1. Add a column to the source table; 2. create an additional column-level CDC capture instance on the source table; 3. leave the FlinkCDC job untouched | 1. Incremental data for the new column is captured; 2. the job keeps failing (error log A below; the same error recurs at 11:48) | (?) Can be monitored once the error is resolved; the new column arrives as a separate record |
| 6 | 1. Add a column to the source table; 2. create an additional column-level CDC capture instance on the source table; 3. restart the FlinkCDC job | 1. Historical data for the new column is captured; 2. incremental data for the new column is not; 3. the job keeps failing (error log B below) | (×) Not monitored |

Error log A (scenarios 3 and 5, key excerpt):

Jan 31, 2024 11:32:13 AM com.microsoft.sqlserver.jdbc.TDSChannel enableSSL
WARNING: TLSv1 was negotiated. Please update server and client to use TLSv1.2 at minimum.
11:32:13,412 ERROR io.debezium.pipeline.ErrorHandler [debezium-reader-0] - Producer failure
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: file is not a valid field name
	at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
	at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.getCheckType(Struct.java:261)
	at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.getString(Struct.java:158)
	at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher$SchemaChangeEventReceiver.schemaChangeRecordValue(JdbcSourceEventDispatcher.java:193)
	at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher$SchemaChangeEventReceiver.schemaChangeEvent(JdbcSourceEventDispatcher.java:223)
	at io.debezium.connector.sqlserver.SqlServerSchemaChangeEventEmitter.emitSchemaChangeEvent(SqlServerSchemaChangeEventEmitter.java:47)
	at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.dispatchSchemaChangeEvent(JdbcSourceEventDispatcher.java:147)
	at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.dispatchSchemaChangeEvent(JdbcSourceEventDispatcher.java:62)
	at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.migrateTable(SqlServerStreamingChangeEventSource.java:491)
	at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$executeIteration$1(SqlServerStreamingChangeEventSource.java:381)
	at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:606)
	at io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:329)
	at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.executeIteration(SqlServerStreamingChangeEventSource.java:250)
	at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:138)
	at com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask$StreamSplitReadTask.execute(SqlServerStreamFetchTask.java:161)
	...
11:32:13,637 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [Source Data Fetcher for Source: test (1/12)#93] - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
	...
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
	at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)
	... 5 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: file is not a valid field name
	... (same stack as above) 9 more
11:32:13,639 ERROR io.debezium.pipeline.ErrorHandler [debezium-reader-0] - Producer failure
com.microsoft.sqlserver.jdbc.SQLServerException: Socket closed
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:3422)
	at com.microsoft.sqlserver.jdbc.TDSChannel.read(IOBuffer.java:2066)
	...
Caused by: java.net.SocketException: Socket closed
	at java.net.SocketInputStream.socketRead0(Native Method)
	... 23 more

Error log B (scenario 6, key excerpt):

Jan 31, 2024 11:50:13 AM com.microsoft.sqlserver.jdbc.TDSChannel enableSSL
WARNING: TLSv1 was negotiated. Please update server and client to use TLSv1.2 at minimum.
11:50:14,027 ERROR io.debezium.relational.TableSchemaBuilder [debezium-reader-0] - Error requesting a row value, row: 2, requested index: 2 at position 2
11:50:14,028 ERROR io.debezium.pipeline.ErrorHandler [debezium-reader-0] - Producer failure
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, event_serial_no=2, commit_lsn=00000285:00000100:0003, change_lsn=00000285:00000100:0002}
	at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:246)
	at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$executeIteration$1(SqlServerStreamingChangeEventSource.java:434)
	at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:606)
	at io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:329)
	at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.executeIteration(SqlServerStreamingChangeEventSource.java:250)
	...
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
	at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:254)
	at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:283)
	at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
	at io.debezium.relational.RelationalChangeRecordEmitter.emitUpdateRecord(RelationalChangeRecordEmitter.java:139)
	at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:60)
	at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:209)
	... 13 more
11:50:14,365 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [Source Data Fetcher for Source: test (1/12)#6] - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	... (wraps the same "Data row is smaller than a column index" ConnectException)

4.3 Conclusions

  1. To keep the FlinkCDC job running continuously, the error seen in scenario 3 must be resolved first; in addition, after the source table's schema changes, the source table's CDC capture job must be restarted before any new data is inserted.

For this error, see: https://github.com/ververica/flink-cdc-connectors/pull/2315

  2. In practice, the only workable approach is a full re-capture: restart the source table's CDC capture job and then restart the FlinkCDC job as well (a sketch of the source-side restart follows).
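
"Restarting the source table's CDC capture job" means dropping the table's capture instance and recreating it so that the changed schema is picked up. A minimal sketch, reusing the dbo.student table and the default instance name from section 1:

-- Drop the existing capture instance
EXEC sys.sp_cdc_disable_table
@source_schema = N'dbo',
@source_name   = N'student',
@capture_instance = N'dbo_student'

-- Recreate it; the new column is now part of the captured schema
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name   = N'student',
@role_name     = NULL,
@supports_net_changes = 0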

