Simply the best instructor I have ever seen. Great job!
Thank you for your kind words! :)
@rmoff, thanks for your videos, they are the best!
We have a situation:
We have implemented Debezium with Kafka, and we are experiencing a performance issue: when both the source and sink connectors are working at the same time, consumer performance decreases significantly. Here are some numbers for what we have:
1 Kafka cluster with 3 nodes (8GB RAM, -Xms: 4096m -Xmx: 4096m each) - all topics with replication factor = 3
1 Zookeeper cluster with 3 nodes (4GB RAM, -Xms: 2048m -Xmx: 2048m each)
1 Connect cluster with 5 nodes (12GB RAM each, -Xms: 6168m -Xmx: 6168m) with 1 partition and 1 task, but we have tried with 1 / 5 / 100 partitions and 1 / 5 / 15 / 100 tasks
1 source connector per table (4 tables)
1 sink connector per table (4 tables)
We are not using Schema Registry.
The problem: the target DB has a delay that increases when the rate goes above 750 messages per second.
Message size = 6KB
Right now we are processing 2,000 messages of 6KB per second (this was the best performance we got, and only with the sink connectors enabled).
When we enable only the sink connectors the performance is good; the same with the source connectors and no sinks - performance is great.
Resources are OK; the source and target environments have plenty of CPU and memory.
When we watch the messages consumed per second, there is a wait of about 30 seconds, then it consumes some messages, waits, and then starts again.
Question: what can we do to improve the consumer performance?
Could you help us?
Still learning, and getting blown away by the amazingly simple capabilities... once you've figured out how...
omg, Robin, your video saved my internship! Thank you soooooo much!
Happy to help! :D
Off-topic tip: if you pipe your logs into `less -r` instead of `more` you can get a pager that interprets the ANSI color sequences
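For example, assuming the Connect worker is running in a Docker container named kafka-connect (the name is just an assumption):

```bash
# Page through the Connect worker log with ANSI colour codes rendered
# (container name is an assumption - substitute your own worker or log source)
docker logs kafka-connect 2>&1 | less -r
```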
My data in Kafka is schema-less (string keys, JSON values). I want to sink it to Postgres. Any tips? Do I need Schema Registry? What converters/transforms (if any) do I need?
Thank you so much for your excellent explanation 🏆
Glad it was helpful!
great bit on schemas. Going to link to this on my next video. :)
When creating the sink connector I get
"Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSinkConnector ..."
So after only 3 minutes I already cannot follow your guide. Maybe there is something wrong with the Docker containers?
It sounds like the JDBC connector isn't installed correctly in your connect worker. Did you restart it after installing it?
If you're still stuck head to forum.confluent.io/ and post full details of what guide you're following and steps you've taken. Thanks.
@@rmoff Thanks, that solved it. My proxy settings were blocking the download of the JDBC connector. After disabling it, everything worked.
@@niklaslehmann6551 Great!
Why do I keep getting an empty set of tables in MySQL even though the connector worked? I also already made sure that my MySQL connector is in the directory.
FANTASTIC. thank you brother
Thank you for sharing!
My pleasure!
Hi Robin,
I understand this tutorial clearly - it is very clear and easy to understand. But I have a doubt.
You are using ksqlDB to create a stream from an existing topic, change it to Avro, and create a new topic.
I am not using ksqlDB, so how can I do this without creating streams?
If you're not using ksqlDB and you want to apply a schema to the data you'll need to find another route, such as writing your own Kafka Streams application to do this.
That's why it's always better to get the *producer* of the data to use a serialisation method that supports schemas (Avro, Protobuf, etc).
@@rmoff So, as you said, I have my Spring Boot application and I configure it to use the Avro serializer for the value. Then if I send normal JSON data it will be stored in the DB?
Or do we need to add something else?
@@vishnumurali522 Yes, if you write Avro from your source application then you can use the AvroConverter in the JDBC sink and it will work.
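As a rough sketch of the sink side of that (connector name, topic, credentials, and Schema Registry URL below are placeholders, not from the video):

```bash
# Minimal JDBC sink reading Avro data - illustrative values only
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-avro/config \
     -H "Content-Type: application/json" -d '{
  "connector.class"                    : "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url"                     : "jdbc:mysql://mysql:3306/demo",
  "connection.user"                    : "connect_user",
  "connection.password"                : "supersecret",
  "topics"                             : "orders",
  "auto.create"                        : "true",
  "value.converter"                    : "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}'
```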
If you've got any more questions then head over to the #clients or #connect channel on cnfl.io/slack :-)
So I assume you can use ksqlDB to transform the data and then pass the stream on to update the target database. Is this correct?
So when converting plain JSON to JSON Schema using a ksqlDB stream, can we specify only certain fields? In my case, I have a nested JSON with more than 100 field values, so do I have to explicitly mention each of them while creating the ksqlDB stream?
Yes, if you want to use them. If you have 100 on the input message but only want to use 5 of them you can just specify those 5, but the remainder will be dropped from any output message that you create
This is why using plain JSON is kind of a pain, because you end up with the situation of having to manually enter schemas. Whatever is producing that nested JSON of 100 values should instead serialise the data onto Kafka using Avro/Protobuf/JSONSchema.
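As a sketch of what that can look like (container name, stream names, topics, and columns below are made up for illustration):

```bash
# Declare only the fields you need from the JSON topic, then re-serialise them as Avro
docker exec -i ksqldb ksql http://ksqldb:8088 <<'EOF'
CREATE STREAM SOURCE_JSON (ORDER_ID VARCHAR, CUSTOMER VARCHAR, TOTAL DOUBLE)
  WITH (KAFKA_TOPIC='source-topic', VALUE_FORMAT='JSON');

CREATE STREAM TARGET_AVRO WITH (VALUE_FORMAT='AVRO') AS
  SELECT ORDER_ID, CUSTOMER, TOTAL FROM SOURCE_JSON;
EOF
```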
@@rmoff Thanks for the clarification. BTW, your video was very informative.
@Robin Moffatt Is it possible to have a MySQL database table as the source, with a topic here, say "sourceTopic", and then use this "sourceTopic" in our sink connector config so that the sink happens in another MySQL table? Basically trying to set up a table and audit-table kind of scenario here.
I'm not quite clear - you want to read and write from the same MySQL database but from and to different tables? There's no reason why you couldn't do that, in theory. Perhaps head over to forum.confluent.io/ and ask there, with a bit more detail and context around what you're trying to do.
@@rmoff I have a table "books" in database motor. This is my source, and for the source connection I created a topic "mysql-books". So far all is good - I am able to see messages in the Confluent Platform UI. Now I want to sink these messages into another database called motor-audit, so that in audit I am able to see all the changes that happened to the table "books". I have given the topic "mysql-books" in my sink connector config, since changes are being published to this topic, but it is not coming up and I am getting errors like:
1. With respect to the basic streams example you showed - Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id 3
2. Error from my objective -
[2021-04-28 18:14:48,231] ERROR WorkerSinkTask{id=sink-jdbc-mysql-02-json-0} Error converting message value in topic 'mysql-books' partition 0 at offset 0 and timestamp 1619602271548: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask:547)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
@@vishalmalhotra2243 Please post this at forum.confluent.io/ and I can help you there :)
Can you update an existing MySQL table that has more columns than the Kafka stream has - like an id/primary key?
Hi, how do you handle it if you have an array in the Avro schema with the JDBC sink connector? I am stuck with this. Please help.
Hi, please post this at forum.confluent.io/ :)
It's a nice video, appreciate your efforts.
Can we create a key schema as well for SOME_JASON_AS_AVRO? I have a similar requirement where I need to do an upsert using the JDBC sink connector, which reads from a topic created out of a stream.
Hi, the best place to ask this is on:
→ Slack group: cnfl.io/slack
or
→ Mailing list: groups.google.com/forum/#!forum/confluent-platform
@Robin, I have the Azure Synapse JDBC driver kafka-connect-azure-sql-dw-1.0.7.jar in the path /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc, but I still get an error: Caused by: java.sql.SQLException: No suitable driver found for "jdbc:sqlserver. I have tried all possible options; none of them worked.
Hi Robin, regarding 'blacklist: "COL2"': it loads NULL in the case of an insert. But if I need to load the value when inserting and do NOT want it updated, then what should the configuration be?
Please ask this question at forum.confluent.io/. Thanks!
When I create a stream with Avro format, I'm getting an "Unable to create schema from topic, Connection reset" error.
hi, the best place to get help is at www.confluent.io/en-gb/community/ask-the-community/ :)
Are any changes needed to connect to Postgres instead of MySQL?
You will need to amend the JDBC URL, as well as making sure that the Postgres JDBC driver is installed.
@@rmoff You show at 6:44 that we already have the Postgres driver. Is that not enough?
@@dmitriisergeev306 Correct, the Postgres JDBC Driver ships with it, so you should not need to install it again.
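For reference, a sketch of what changes (host, database, and credentials are placeholders):

```bash
# Same JDBC sink, but pointed at Postgres - illustrative values only
curl -X PUT http://localhost:8083/connectors/sink-jdbc-postgres-01/config \
     -H "Content-Type: application/json" -d '{
  "connector.class"    : "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url"     : "jdbc:postgresql://postgres:5432/demo",
  "connection.user"    : "connect_user",
  "connection.password": "supersecret",
  "topics"             : "orders",
  "auto.create"        : "true"
}'
```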
Hi Robin,
I am a great fan of your Kafka JDBC source and sink connector videos. We are currently facing a challenge where, using the Kafka JDBC connectors, we are unable to connect to an Oracle database which is in a Kerberized cloud environment. Any video or details would be a great help.
Hi Kiran, this isn't a configuration I've tried, sorry. You could try asking at cnfl.io/slack.
Can you please help me with how to connect MySQL to Kafka? If possible, can you please share any documentation or a link about this?
Thank you.
You may want to start at the Debezium documentation
If you want to get data from MySQL into Kafka then check out rmoff.dev/no-more-silos
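If it helps as a starting point, a Debezium MySQL source connector looks roughly like this (hostnames, credentials, and table names are placeholders, and exact property names vary between Debezium versions):

```bash
# Sketch of a Debezium MySQL source connector (Debezium 1.x-era property names)
curl -X PUT http://localhost:8083/connectors/source-debezium-mysql-01/config \
     -H "Content-Type: application/json" -d '{
  "connector.class"                         : "io.debezium.connector.mysql.MySqlConnector",
  "database.hostname"                       : "mysql",
  "database.port"                           : "3306",
  "database.user"                           : "debezium",
  "database.password"                       : "dbz",
  "database.server.id"                      : "42",
  "database.server.name"                    : "asgard",
  "table.include.list"                      : "demo.books",
  "database.history.kafka.bootstrap.servers": "broker:29092",
  "database.history.kafka.topic"            : "dbhistory.demo"
}'
```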
I am using the JDBC sink connector from Confluent Developer - how do I set this up? Please help.
Please ask this question at forum.confluent.io/. Thanks!
Hi! I'm having trouble working with timestamp (date) values. I can't reflect changes in the sink table if the source table has a timestamp column (with a current_timestamp default). Is there any specific transformation to set in the sink/source connectors to solve this problem? Thanks!
Hi, the best place to ask this is forum.confluent.io/ :)
Hi Robin, is it possible to save data from a single topic into multiple Postgres tables?
Yes, you can create multiple connectors reading from the same topic and route each one to a different table using a Single Message Transform like RegexRouter (see rmoff.net/2020/12/11/twelve-days-of-smt-day-4-regexrouter/)
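A sketch of the routing part (topic, table, and connection details are illustrative):

```bash
# One of several sinks on the same topic, each rewriting the target table name
curl -X PUT http://localhost:8083/connectors/sink-pg-orders-copy-b/config \
     -H "Content-Type: application/json" -d '{
  "connector.class"                    : "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url"                     : "jdbc:postgresql://postgres:5432/demo",
  "connection.user"                    : "connect_user",
  "connection.password"                : "supersecret",
  "topics"                             : "orders",
  "auto.create"                        : "true",
  "transforms"                         : "renameTarget",
  "transforms.renameTarget.type"       : "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.renameTarget.regex"      : ".*",
  "transforms.renameTarget.replacement": "orders_copy_b"
}'
```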
Isn't there a way to make the JDBC sink perform an INSERT IGNORE? If I use insert mode I always get duplicate errors. Can it really be that this case has not been considered? (MySQL dialect)
I'm not aware of this being an option. Does it work if you use `upsert` instead?
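In case it's useful, an upsert-mode sketch (key column and connection details are placeholders):

```bash
# Upsert instead of insert so that re-delivered messages update rather than fail
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-upsert/config \
     -H "Content-Type: application/json" -d '{
  "connector.class"    : "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url"     : "jdbc:mysql://mysql:3306/demo",
  "connection.user"    : "connect_user",
  "connection.password": "supersecret",
  "topics"             : "orders",
  "insert.mode"        : "upsert",
  "pk.mode"            : "record_key",
  "pk.fields"          : "order_id"
}'
```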
Thanks for sharing, and I am following your videos a lot to learn Kafka. I have tried setting up the JDBC sink connector to insert into SQL Server with a batch size of 500, but it inserts into SQL Server one row at a time rather than in batches, which has a negative impact on SQL Server IO. Is there something you can suggest to get batch insertion working? Will look forward to your response. Thanks
Hi, the best place to ask this is on:
→ Mailing list: groups.google.com/forum/#!forum/confluent-platform
or
→ Slack group: cnfl.io/slack
Hi, I have a question regarding automatic table creation in the sink connector: how can we define custom column names in the sink connector configuration?
For example, by default the column names are after_userId and after_firstName. I want to change them to UserId and FirstName. How can we do this in the connector configuration?
Hi Sarath, this is a great question to ask over at forum.confluent.io/ :)
@@rmoff Thanks for the reply, I got the answer. I used a transform (SMT) to achieve my requirement.
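For anyone else with the same question, a ReplaceField transform along these lines can do the renaming (only the field names are taken from the question above; everything else is illustrative):

```bash
# Rename fields in the record value so the auto-created columns get the desired names
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-renamed/config \
     -H "Content-Type: application/json" -d '{
  "connector.class"          : "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url"           : "jdbc:mysql://mysql:3306/demo",
  "connection.user"          : "connect_user",
  "connection.password"      : "supersecret",
  "topics"                   : "users",
  "auto.create"              : "true",
  "transforms"               : "rename",
  "transforms.rename.type"   : "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.rename.renames": "after_userId:UserId,after_firstName:FirstName"
}'
```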
Is there a good way to diagnose why a connector becomes degraded? Is it OK to use a symlink to put the JAR under the JDBC drivers folder?
1. Diagnosing connector issues - look at the Kafka Connect worker log (a quick way to pull out the error is sketched below)
2. symlink - I don't know, try it :)
For further help, head to #connect on cnfl.io/slack
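Assuming the worker's REST API is on localhost:8083 and it runs in a container called kafka-connect (both assumptions, along with the connector name), something like this surfaces the underlying error:

```bash
# Ask the Connect REST API for the connector state - failed tasks include a "trace" field
curl -s http://localhost:8083/connectors/sink-jdbc-mysql-01/status
# Then dig through the worker log itself for the full stack trace
docker logs kafka-connect 2>&1 | grep -i error
```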
I tried to replicate the steps at 1:51. When inserting a row into TEST01, I get a "column name ROWKEY does not exist" error.
Same error for me too. Were you able to solve the issue?
Hi Robin, thanks for sharing. Can you use the Protobuf format from 5.5 instead of Avro? Also, for MySQL I notice you are using version 8. Will this also work on MySQL 5.6 or 5.7? Many thanks, Aidan
Yes, as of Confluent Platform 5.5 you can use Avro, Protobuf, or JSON Schema.
It'll work fine with earlier versions of MySQL too.
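For the Protobuf case, only the converter settings change - roughly this (Schema Registry URL and connection details are placeholders):

```bash
# Swap the value converter to Protobuf (Confluent Platform 5.5+) - illustrative values
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-proto/config \
     -H "Content-Type: application/json" -d '{
  "connector.class"                    : "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url"                     : "jdbc:mysql://mysql:3306/demo",
  "connection.user"                    : "connect_user",
  "connection.password"                : "supersecret",
  "topics"                             : "orders",
  "value.converter"                    : "io.confluent.connect.protobuf.ProtobufConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}'
```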
Hi @Robin, just asking if Kafka Connect works with Cloudera's Kafka, and if I can use it in production.
Kafka Connect is part of Apache Kafka. It's widely used in Production. I'm not familiar with what Cloudera support; you'd need to check with them.
For me it is not working. I have sink-jdbc-mysql-01 | SINK | io.confluent.connect.jdbc.JdbcSinkConnector | WARNING (0/1 tasks RUNNING). How can I fix it?
You need to look at the Kafka Connect worker log to find out the actual error. YouTube comment threads aren't the easiest place to help debug, so head over to #connect on cnfl.io/slack
Hello sir, I couldn't find my 'mysql-connector-java-8.0.20.jar' in this folder 'confluent-hub-components/confluentinc-kafka-connect-jdbc'
You need to install it yourself. Have a look at rmoff.dev/fix-jdbc-driver-video.
@@rmoff I've downloaded that JDBC driver and copied it into that particular path using this command: 'docker cp /home/foldername/Downloads/mysql-connector-java-8.0.22/. kafka-connect:/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/mysql-connector-java-8.0.22.' I'm not getting any error message, but the MySQL DB is not updating, and my connector is showing 'warning 0/1' status. Is there any chat interface where I can have a brief discussion about this with you? I am new to Kafka, please help. Thanks in advance.
@@abhrasarkar8040
The best place to ask any further questions is:
→ Slack group: cnfl.io/slack
or
→ Mailing list: groups.google.com/forum/#!forum/confluent-platform
@@rmoff Thank you so much. I think I found the problem. This link is not working: 'cdn.mysql.com/Downloads/Connector-J/mysql-connector-java-8.0.19.tar.gz'. So I'm getting the file from this link instead: 'dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.22.tar.gz'. Is that fine?
@@rmoff
Hello Robin! I am getting this error
" java.sql.SQLSyntaxErrorException: Access denied for user 'connect_user'@'%' to database 'demo'".
Can you please help me out with this?
Do we need any settings in the Oracle database to start with?
You'll need a user with the appropriate permissions to write to the table(s). If you've got any more questions then head over to forum.confluent.io/ :)
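For the MySQL error above, that would mean something along these lines, run as an admin user (the database and user names come from the error message; the scope of the grant is up to you):

```bash
# Give the Connect user rights to create and write to tables in the target database
mysql -u root -p -e "GRANT ALL PRIVILEGES ON demo.* TO 'connect_user'@'%';"
```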
@Robin Moffatt Is it possible to store all the Kafka data in a single column as JSON or Avro? Can you please help?
Do you mean push the complex value of the Kafka message into a single target field in the database? If so, no I don't think this is supported.
For more questions, head to forum.confluent.io/
@@rmoff Exactly, Robin. My requirement is to read the JSON data from Kafka and store it exactly as it is in a single column.
@@vignesh9458 I think you would need to pre-process the topic and embed the message in a field first (since the data has to be serialised with a schema).
If you would like to post this as a question over at forum.confluent.io/ I can try and answer properly there.
@@rmoff sure I will post there.
Great video, many thanks. May I get your documentation for this? Thanks.
You can find the JDBC Sink connector docs here: rmoff.dev/01r
@@rmoff ok, thanks ya
How do I connect Kafka with a MySQL database?
You're on the right video! The Kafka Connect JDBC Sink enables you to stream data from Kafka to MySQL. If you have more questions do head over to forum.confluent.io/
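A minimal sketch of that (all names, credentials, and URLs below are placeholders - the video walks through a real example):

```bash
# Stream a topic into MySQL with the JDBC sink - illustrative values only
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-demo/config \
     -H "Content-Type: application/json" -d '{
  "connector.class"    : "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url"     : "jdbc:mysql://mysql:3306/demo",
  "connection.user"    : "connect_user",
  "connection.password": "supersecret",
  "topics"             : "orders",
  "auto.create"        : "true"
}'
```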
How to use , "pk.mode":"none", using DB sequence?
@Sumit Can you post full details of your question to forum.confluent.io? I can answer it over there. Thanks!
@@rmoff Done: "JDBC Sink connector using DB Sequence for Primary key". Thanks for the reply.
How would you do this with Java?
That's the point - you wouldn't. Kafka Connect is the integration API for Apache Kafka, and what you should generally use for getting data in and out of Kafka from systems such as RDBMS. Learn more: rmoff.dev/ljc-kafka-02
Very fast English, but anyway thanks for the informative video.
Glad it was useful - sorry if it's not always easy to follow :)
Your keyboard noises are driving me crazy, dude - fix your mic.
Yeah, it's kinda clackity isn't it. I've got a quieter one since I filmed that video :)
Hello Robin! Good video. Do you have a video that uses a Dead Letter Queue topic to store multiple messages with different schemas - in other words, one that uses the properties confluent.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy and TopicRecordNameStrategy?
How can I create a schema with a payload for nested JSON data?
I need your help regarding a Kafka Connect JDBC sink performance issue. I have sent you a message on Twitter. Looking forward to your response.
The best place to ask is www.confluent.io/en-gb/community/ask-the-community/