Thank You so much for this great video. Appreciate all the good work you are doing in terms of creating and publishing these demos
Glad it was helpful! :)
Excellent, well illustrated and so helpful! Thank you
Glad it was helpful!
Good stuff Robin.
Thanks :)
Great video.
Thank you, Robin, for the excellent explanation. In all your examples you use the auto.create option set to true, whereas I am trying auto.create set to false and creating the table beforehand, but I am getting the error below. I have tried every option I can think of. Will you be able to help? Thanks in advance. Caused by: io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Table "table_name" is missing and auto-creation is disabled
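For reference, a minimal sketch of the setup being described, with hypothetical table, topic, and connection details: the table is created up front and the connector writes into it with auto-creation switched off. One thing worth double-checking with this error is that the table name the connector derives from the topic (including case and schema) exactly matches the table that was created.

```
-- run in the target database: hypothetical pre-created table (Postgres syntax assumed)
CREATE TABLE table_name (
  id   INT PRIMARY KEY,
  col1 VARCHAR(50)
);

-- run in ksqlDB: hypothetical sink connector with auto-creation disabled
CREATE SINK CONNECTOR SINK_EXAMPLE WITH (
  'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
  'connection.url'      = 'jdbc:postgresql://postgres:5432/demo',
  'connection.user'     = 'postgres',
  'connection.password' = 'postgres',
  'topics'              = 'table_name',
  'auto.create'         = 'false',
  'auto.evolve'         = 'false',
  'insert.mode'         = 'insert'
);
```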
Hi Robin, thank you for the video on JDBC Sink. Great content! I have 2 quick questions:
1) When creating a JDBC sink, what if the table already exists and you don't want to delete it and create it from scratch?
2) Can you make the primary key column the first column instead of the last column in the sink table?
Hi @Xingsheg - please post these to forum.confluent.io/ and I will try to answer there :)
As always, great video. Thanks a lot for sharing these with us! Do you plan to cover the Confluent CDC connector for Oracle in future videos? Also, which terminal theme/plugin are you using? Love it!
Thanks for the feedback! I'll certainly add that connector to my TODO list :)
I'm using oh-my-zsh with bira theme and a custom colour theme :)
Hi Robin, thanks for this demonstration. How can we produce tombstone messages? I don't have the Debezium source connector in my system. Is there any workaround?
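For anyone else wondering: a tombstone is just a message with a non-null key and a NULL value, so you don't need Debezium to produce one. A minimal sketch using kafkacat (broker, topic, and key are placeholders); the -Z flag turns an empty value into a NULL:

```
# produce key "42" with an empty value; -Z makes the empty value a NULL (i.e. a tombstone)
echo '42:' | kafkacat -b localhost:9092 -t my_topic -P -K: -Z
```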
Hi Robin, thanks for the video. I love the extra details!
I have a question that is not so much about CDC, but more about how to share the normalised data of a database with the organisation as a nice single entity (document).
As an example, imagine an order which refers to an address and a list of order items, each in their own table.
The order, the address and the order items are all mutable - the customer can change any of it - and as a result I want to send my clients a single Kafka document with everything in it.
How would you go about this, both in combining the 3 change streams into a single new stream regardless of what changed, and in sharing only the nice API with the organisation?
In short, I'd love a video with a special focus on sharing a maintainable API with the rest of the organisation.
Hi, this is a great question! Head over to forum.confluent.io/c/architecture-and-design/29 and post it there :)
Thanks for the video. Though it is about 2 years old, I still find it useful. However, any idea why I get the error "Failed to create connector: {"error_code":409,"message":"Connector SINK_FOO_01_0 already exists"}" the very first time I run CREATE SINK CONNECTOR SINK_FOO_01_0 WITH (...) following your steps? Everything else worked up to that point.
Hi,
I don't work with Kafka directly anymore. A good place to go for help is www.confluent.io/en-gb/community/ask-the-community/.
thanks.
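In case it helps anyone hitting the same 409: it just means a connector with that name is already registered on the Connect cluster, perhaps from an earlier attempt. A rough way to check and clear it from ksqlDB (connector name taken from the error message):

```
-- list the connectors the Connect cluster already knows about
SHOW CONNECTORS;

-- drop the existing one before re-running the CREATE SINK CONNECTOR statement
DROP CONNECTOR SINK_FOO_01_0;
```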
How do you handle tables whose only columns are the primary key, with no other value columns? The SQL generated by the JDBC Sink is invalid -> java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "public"."ModelSegment" ("OID","tenantx_id") VALUES (6,1) ON CONFLICT ("OID","tenantx_id") DO UPDATE SET was aborted: ERROR: syntax error at end of input
Any suggestion?
Is it possible to handle composite primary keys on record keys without having to declare them explicitly in the connector? I am using the JDBC sink for many tables at once, and many of these topics have different key names; some have just one PK column and others have two, and all record keys are defined in JSON format, without an Avro schema. It would be nice to know a flexible approach to handle this.
I have found the answer myself: it works without declaring explicit fields in the pk.fields property.
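For anyone finding this later, a minimal sketch of that approach (connector name, topics, and connection details are placeholders, and it assumes the keys are deserialised to a structured type): with pk.mode set to record_key and pk.fields left unset, the sink uses all the fields of each record key, whether that's one or several.

```
-- sketch only; names and connection details are placeholders.
-- pk.fields is deliberately omitted: every field of the record key becomes part of the PK
CREATE SINK CONNECTOR SINK_MULTI_TABLE WITH (
  'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
  'connection.url'  = 'jdbc:postgresql://postgres:5432/demo',
  'topics'          = 'table_a,table_b',
  'insert.mode'     = 'upsert',
  'pk.mode'         = 'record_key'
);
```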
Hi Robin, first of all thanks for the effort and everything else you do to help. One thing that confuses me a lot is converters. At 4:49 you said that when we read data from a topic we have to know how to deserialise it; when reading data from the topic to sink it into a DB, you knew it was Avro, so you used the Avro converter. However, in your other video, "Kafka Connect in Action: JDBC Sink", you made a similar example, again creating a stream on topic test01 in Avro format, but that time when configuring the sink connector you specified StringConverter as the value converter instead of Avro like in this video. So are these converters interchangeable, i.e. can we use the StringConverter if the data was written to the topic as Avro, or vice versa?
Hi Ali,
Can you link to the specific bit in the other video where I use a StringConverter? If the data is Avro, you have to use the Avro converter to deserialise it.
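For reference, when the topic is Avro the sink connector's converter settings would typically point at the Schema Registry; inside the connector's WITH clause that looks something like this (the Schema Registry URL is a placeholder, and the key converter line is only an example assuming a simple string key):

```
'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'key.converter'                       = 'org.apache.kafka.connect.storage.StringConverter'
```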
Robin, I need your help. I'm new to development with Kafka. I have Kafka Connect with CDC as the producer and the JDBC sink connector as the consumer. I can do an upsert correctly, but I can't get the delete operation to work. Is it possible to use the JDBC sink connector to make all operations work - insert, update and delete? Can you please help me with an example of Kafka Connect from SQL Server to SQL Server without using Debezium?
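For what it's worth, the JDBC sink can apply deletes if delete handling is enabled and the upstream topic carries a tombstone (the row's key with a NULL value) for each deleted row; whether your CDC source actually emits tombstones is a separate question. A rough sketch of the relevant properties, with all names and connection details as placeholders:

```
-- deletes require pk.mode = record_key, so the message key must carry the primary key column(s)
CREATE SINK CONNECTOR SINK_MSSQL_EXAMPLE WITH (
  'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
  'connection.url'  = 'jdbc:sqlserver://mssql:1433;databaseName=demo',
  'topics'          = 'my_table',
  'insert.mode'     = 'upsert',
  'pk.mode'         = 'record_key',
  'pk.fields'       = 'id',
  'delete.enabled'  = 'true'
);
```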
Hello Robin,
I have a topic populated by a Kafka Streams application built with ksqlDB, with criteria in the WHERE clause, and the result is written by a Kafka Connect sink to a destination database. I want to delete a record from the database table if its value changes over time and it is no longer returned by the stream. Is this possible?
Hi Robin, thanks for the video. I have a related question: how can we create a single sink connector for multiple tables when the primary key column names are different in each table? Also, how do we do the same if we enable delete mode? It would be helpful if you could give some ideas.
You can't - you need one connector per unique set of primary key column names
Oh, thanks for the reply. I've got that. So I need to go with one connector per table, am I right?
@sivakaranb7265 Yes - unless multiple tables happened to share the same primary key field(s). For example, if they all had a key of ID then you could (I think) have multiple topics/tables in a single connector.
Ok. Thank you Robin.
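To illustrate the case Robin mentions above: if (hypothetically) every topic/table were keyed on the same single ID column, one connector could cover several of them. Topic names and connection details here are placeholders.

```
-- sketch only, assuming every table shares the same primary key column, ID
CREATE SINK CONNECTOR SINK_MANY_TABLES WITH (
  'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
  'connection.url'  = 'jdbc:postgresql://postgres:5432/demo',
  'topics'          = 'customers,orders,payments',
  'insert.mode'     = 'upsert',
  'pk.mode'         = 'record_key',
  'pk.fields'       = 'ID'
);
```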
Thank you so much, Robin, for all the videos - really, really helpful. I've started from zero and am growing into a hero with your material.
One topic that is still confusing to me is the Schema Registry in testing scenarios. Do you have any material on this? I've managed to use a mock schema registry in unit tests using a URL starting with mock://. However, when trying this in integration tests, I get the famous error `org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 1`. Any input? Many thanks.
Hi Ismail, I'm glad you find my videos useful :) Regarding your question I suggest you head over to forum.confluent.io/c/schema-registry/19 and ask there.
Thank you so much for this great video, it helped me a lot. Could you please make a video on using the Debezium connector, ksqlDB, and the JDBC Sink connector to write changes to Postgres?
Example: two databases with different structures; capture CDC from the source with Debezium, transform it with ksqlDB, then write it to Postgres with the JDBC Sink.
Sure - do you have an example of the kind of different structures that you're dealing with?
Yes, the source DB contains three tables:
1- REQUESTS with columns (ID NUMBER PK, CREATE_DATE TIMESTAMP(6), CREATED_BY VARCHAR2(255 char))
2- REQUEST_DETAILS with columns (REQUEST_ID FK, DISTRICT_ID NUMBER(19) FK , NOTES VARCHAR2(4000 char))
3- DISTRICTS is a lookup table with columns (ID, NAME)
And in the sink database there are just two tables:
1- CASES with columns (ID NUMBER PK, CREATE_DATE TIMESTAMP(6), CREATED_BY VARCHAR2(255 char), DISTRICT_ID NUMBER(19) FK , NOTES VARCHAR2(4000 char))
2- LIST_OF_VALUES with columns (ID, NAME, PARENT_ID FK on the same table)
We want to capture changes on the source database using the Debezium connector, transform them using ksqlDB, and then write them with the JDBC Sink connector.
Note: the IDs in the lookup table are not the same, so I think we will use a ksqlDB table to hold this mapping and update it when it changes, then join the captured changes with this table to get the correct ID. Also, it's not a one-direction sync; it's bidirectional.
Thank you in advance :)
@amawaziny Can you post this over at forum.confluent.io/ and I can try and answer there?
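Not an answer to the full bidirectional problem, but a rough ksqlDB sketch of the ID-remapping step described above. All stream, table, topic, and column names here are hypothetical: a mapping table keyed on the source district ID is joined to the change stream to swap in the target ID before the result is sent to the topic the JDBC sink writes to CASES.

```
-- hypothetical mapping table: source DISTRICTS.ID -> target LIST_OF_VALUES.ID
CREATE TABLE DISTRICT_ID_MAP (SOURCE_ID BIGINT PRIMARY KEY, TARGET_ID BIGINT)
  WITH (KAFKA_TOPIC='district_id_map', VALUE_FORMAT='AVRO', PARTITIONS=1);

-- hypothetical stream of flattened REQUESTS change events from Debezium
CREATE STREAM REQUEST_CHANGES (ID BIGINT KEY, CREATED_BY VARCHAR,
                               DISTRICT_ID BIGINT, NOTES VARCHAR)
  WITH (KAFKA_TOPIC='source.REQUESTS_FLATTENED', VALUE_FORMAT='AVRO');

-- translate the district ID and key the output on the request ID for the JDBC sink
CREATE STREAM CASES_OUT WITH (KAFKA_TOPIC='CASES') AS
  SELECT r.ID AS ID,
         r.CREATED_BY AS CREATED_BY,
         m.TARGET_ID AS DISTRICT_ID,
         r.NOTES AS NOTES
  FROM REQUEST_CHANGES r
  JOIN DISTRICT_ID_MAP m ON r.DISTRICT_ID = m.SOURCE_ID
  PARTITION BY r.ID
  EMIT CHANGES;
```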
Hi Robin, I'm working with Strimzi Kafka on OpenShift, and when I pulled the 0.15.0 images for the ksqlDB server and CLI I got this error when running any command: The server has encountered an incompatible entry in its log and cannot process further DDL statements. This is most likely due to the service being rolled back to an earlier version.
The problem is that when I use newer versions, or the latest version, I get the error below when I try to declare fields as KEY or PRIMARY KEY: KSQL currently only supports KEY columns named ROWKEY, extraneous input 'PRIMARY'.
So I'm struggling to deal with keys, and I have to go live soon, so I would really appreciate your support.
Thanks in advance.
Hi, A good place to go for help is www.confluent.io/en-gb/community/ask-the-community/.
thanks.
Can ksqlDB connect to a TLS-secured Kafka cluster? How?
Yes, see docs.ksqldb.io/en/latest/operate-and-deploy/installation/server-config/security/#configure-ksqldb-for-secured-apache-kafka-clusters
If you have more questions then head over to forum.confluent.io/
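For reference, the TLS part usually comes down to a handful of client settings in the ksqlDB server config (ksql-server.properties), along these lines; paths, passwords, and the broker address are placeholders, and SASL properties would be needed as well if the cluster also requires authentication:

```
# connection to the TLS-secured Kafka cluster (placeholder values)
bootstrap.servers=broker1:9093
security.protocol=SSL
ssl.truststore.location=/etc/ksqldb/secrets/kafka.truststore.jks
ssl.truststore.password=changeit

# only needed if the brokers require mutual TLS (client certificates)
ssl.keystore.location=/etc/ksqldb/secrets/ksqldb.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
```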
Hi Robin, I have an 'insert only' scenario: I just need to allow inserts into a table and should not allow updates. To handle this I set insert.mode to 'insert', but the sink connector is failing with a duplicates error (a primary key violation from RDS MySQL). How can I handle this situation? Please help.
Please ask this question at forum.confluent.io/. Thanks!