Kafka Connect in Action: JDBC Sink

  • Published 6 Nov 2024

COMMENTS • 109

  • @ВертиевДмитрий-ъ6ч  3 years ago +7

    Simply the best instructor I have ever seen. Great job!

    • @rmoff  3 years ago

      Thank you for your kind words! :)

  • @carlaguelpa4574  1 year ago

    @rmoff, thanks for your videos, they are the best!
    We have a situation:
    We have implemented Debezium with Kafka, and we are experiencing a performance issue: when both source and sink connectors are working at the same time, consumer performance decreases significantly. Here are some numbers for what we have:
    1 Kafka cluster with 3 nodes (8GB RAM, -Xms: 4096m -Xmx: 4096m each) - all topics with replication factor = 3
    1 ZooKeeper cluster with 3 nodes (4GB RAM, -Xms: 2048m -Xmx: 2048m each)
    1 Connect cluster with 5 nodes (12GB RAM each, -Xms: 6168m -Xmx: 6168m) with 1 partition and 1 task, but we have tried with 1 / 5 / 100 partitions and 1 / 5 / 15 / 100 tasks
    1 source connector per table (4 tables)
    1 sink connector per table (4 tables)
    We are not using Schema Registry.
    The problem: the target DB has a delay that increases when the volume exceeds 750 messages per second.
    Message size = 6KB
    Right now we are processing 2000 messages of 6KB per second (this was the best performance we got, and only with the sink connectors enabled).
    When we enable only the sink connectors the performance is good; the same with only the source connectors and no sinks - performance is great.
    Resources are OK; the source and target environments have plenty of CPU and memory.
    When we watch the messages consumed per second, there is a wait of about 30 seconds, then it consumes some messages, waits, and repeats.
    Question: what can we do to improve the consumer performance?
    Could you help me?

  • @georgelza  4 years ago +1

    Still learning, and getting blown away by the amazingly simple capabilities... once you've figured out how...

  • @jeffyeh4537  2 years ago +1

    omg, Robin, your video saved my internship! Thank you soooooo much!

    • @rmoff  2 years ago

      Happy to help! :D

  • @JonWolski  4 years ago +1

    Off-topic tip: if you pipe your logs into `less -r` instead of `more` you can get a pager that interprets the ANSI color sequences
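
    For example, to page a Connect worker's log with the colours intact (container name here is hypothetical):

      docker logs kafka-connect 2>&1 | less -r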

  • @hamoudy41  1 year ago +1

    My data in Kafka is schema-less (string keys, JSON values). I want to sink it to Postgres. Any tips? Do I need Schema Registry? What converters/transforms (if any) do I need?

  • @mukeshdharajiya1261  1 year ago

    Thank you so much for your excellent explanation 🏆

    • @rmoff  1 year ago

      Glad it was helpful!

  • @OwenRubel  4 years ago +1

    great bit on schemas. Going to link to this on my next video. :)

  • @niklaslehmann6551  2 years ago +1

    When creating the sink connector I get
    "Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSinkConnector ..."
    So after only 3 minutes I can't even follow your guide. Maybe there is something wrong with the Docker containers?

    • @rmoff  2 years ago

      It sounds like the JDBC connector isn't installed correctly in your connect worker. Did you restart it after installing it?
      If you're still stuck head to forum.confluent.io/ and post full details of what guide you're following and steps you've taken. Thanks.
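
      A quick way to check is to list the plugins the worker has actually loaded (assuming the Connect REST API is on localhost:8083 and jq is installed):

        curl -s localhost:8083/connector-plugins | jq '.[].class'

      If io.confluent.connect.jdbc.JdbcSinkConnector isn't in that list, the worker hasn't picked the connector up.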

    • @niklaslehmann6551  2 years ago

      @@rmoff Thanks, that solved it. My proxy settings were blocking the download of the JDBC connector. After disabling them it worked.

    • @rmoff  2 years ago

      @@niklaslehmann6551 Great!

  • @BenedictReydo  2 years ago

    Why do I keep getting an empty set of tables in MySQL even though the connector worked? I have also already made sure that my MySQL connector is in the directory.

  • @timothyvandermeer9845  7 months ago

    FANTASTIC. thank you brother

  • @jennana8003  3 years ago +1

    Thank you for sharing!

    • @rmoff  3 years ago

      My pleasure!

  • @vishnumurali522  4 years ago

    Hi Robin,
    I understood this tutorial clearly... it is very clear and easy to understand. But I have a doubt.
    You are using ksqlDB to create a stream from an existing topic, change it to Avro, and create a new topic.
    I am not using ksqlDB, so how can I do this without creating streams?

    • @rmoff  4 years ago

      If you're not using ksqlDB and you want to apply a schema to the data you'll need to find another route, such as writing your own Kafka Streams application to do this.
      That's why it's always better to get the *producer* of the data to use a serialisation method that supports schemas (Avro, Protobuf, etc).
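
      As a rough sketch of the ksqlDB route (container, stream, topic, and column names here are all hypothetical):

        docker exec -i ksqldb-cli ksql http://ksqldb-server:8088 <<EOF
        -- Declare the existing JSON topic as a stream
        CREATE STREAM SOURCE_JSON (ID INT, NAME VARCHAR)
          WITH (KAFKA_TOPIC='my_topic', VALUE_FORMAT='JSON');
        -- Re-serialise it to Avro, registering the schema as it goes
        CREATE STREAM SOURCE_AVRO WITH (VALUE_FORMAT='AVRO') AS
          SELECT * FROM SOURCE_JSON;
        EOF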

    • @vishnumurali522  4 years ago

      @@rmoff So, as you said, if I take my Spring Boot application and configure it to use the Avro serializer for values, then if I send normal JSON data it will be stored in the DB?
      Or do we need to add something else?

    • @rmoff  4 years ago

      @@vishnumurali522 Yes, if you write Avro from your source application then you can use the AvroConverter in the JDBC sink and it will work.
      If you've got any more questions then head over to the #clients or #connect channel on cnfl.io/slack :-)
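
      On the sink side that amounts to setting the Avro converter in the connector config (Schema Registry address hypothetical):

        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081"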

  • @dswan01  1 year ago

    So I assume you can use ksqlDB to transform the data and then pass the stream on to update the target database. Is this correct?

  • @rishikesanravichandran9914  4 years ago +1

    So when converting plain JSON to JSON Schema using a ksqlDB stream, can we specify only certain fields? In my case, I have nested JSON with more than 100 field values, so do I have to explicitly mention each of them while creating the ksqlDB stream?

    • @rmoff  4 years ago

      Yes, if you want to use them. If you have 100 on the input message but only want to use 5 of them you can just specify those 5, but the remainder will be dropped from any output message that you create

    • @rmoff  4 years ago

      This is why using plain JSON is kind of a pain, because you end up with the situation of having to manually enter schemas. Whatever is producing that nested JSON of 100 values should instead serialise the data onto Kafka using Avro/Protobuf/JSONSchema.
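
      For example, a sketch of projecting just the fields the sink needs into a new Avro stream (stream and column names hypothetical):

        CREATE STREAM ORDERS_SLIM WITH (VALUE_FORMAT='AVRO') AS
          SELECT ORDER_ID, CUSTOMER_ID, TOTAL
          FROM ORDERS_RAW;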

    • @rishikesanravichandran9914  4 years ago +1

      @@rmoff Thanks for the clarification. BTW, your video was very informative.

  • @vishalmalhotra2243  3 years ago +1

    @Robin Moffatt Is it possible to have a MySQL database table as the source, with a topic here, say "sourceTopic", and then use this "sourceTopic" in our sink curl config so the sink happens in another MySQL table? Basically trying to set up a table-plus-audit-table kind of scenario here.

    • @rmoff  3 years ago

      I'm not quite clear - you want to read and write from the same MySQL database but from and to different tables? There's no reason why you couldn't do that, in theory. Perhaps head over to forum.confluent.io/ and ask there, with a bit more detail and context around what you're trying to do.

    • @vishalmalhotra2243  3 years ago

      @@rmoff I have a table "books" in database motor. This is my source, and for the source connection I created a topic "mysql-books". So far all is good; I am able to see messages in the Confluent Platform UI. Now I want to sink these messages into another database called motor-audit, so that in audit I can see all the changes that happened to the table "books". I have given the topic "mysql-books" in my curl for the sink connector, since changes are being published to this topic, but it is not coming up, and I'm getting errors like:
      1. (With respect to the basic streams example you showed) Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id 3
      2. (From my objective) [2021-04-28 18:14:48,231] ERROR WorkerSinkTask{id=sink-jdbc-mysql-02-json-0} Error converting message value in topic 'mysql-books' partition 0 at offset 0 and timestamp 1619602271548: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask:547)
      org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:

    • @rmoff  3 years ago +1

      @@vishalmalhotra2243 Please post this at forum.confluent.io/ and I can help you there :)

  • @JonathanLevinTKY  4 years ago

    Can you update an existing MySQL table that has more columns than the Kafka stream has - like an id/primary key?

  • @amrish331  3 years ago

    Hi, how do you handle an array in the Avro schema with the JDBC sink connector? I am stuck on this. Please help.

    • @rmoff  3 years ago

      Hi, please post this at forum.confluent.io/ :)

  • @krishnachaitanyapulagam2769  4 years ago +1

    It's a nice video; I appreciate your efforts.
    Can we create a key schema as well for SOME_JSON_AS_AVRO? I have a similar requirement where I need to do an upsert using the JDBC sink connector, which reads from a topic created out of streams.

    • @rmoff  4 years ago

      Hi, the best place to ask this is on:
      → Slack group: cnfl.io/slack
      or
      → Mailing list: groups.google.com/forum/#!forum/confluent-platform

  • @Pinaldba  6 months ago

    @Robin, I have the Azure Synapse sink JAR kafka-connect-azure-sql-dw-1.0.7.jar in the path /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc, but I still get the error - Caused by: java.sql.SQLException: No suitable driver found for "jdbc:sqlserver. I have tried all possible options; none of them worked.

  • @vinaygold20  2 years ago

    Hi Robin, regarding 'blacklist: "COL2"': it loads NULL in the case of an insert. But if I need to load the value while inserting and do NOT want it updated, then what should the configuration be?

    • @rmoff  2 years ago

      Please ask this question at forum.confluent.io/. Thanks!

  • @heinhtetzaw9463  1 year ago

    When I create a stream in Avro format, I'm getting an "Unable to create schema from topic, Connection reset" error.

    • @rmoff  1 year ago

      hi, the best place to get help is at www.confluent.io/en-gb/community/ask-the-community/ :)

  • @MrNiceseb  4 years ago +1

    Any changes needed for connecting to Postgres instead of MySQL?

    • @rmoff  4 years ago

      You will need to amend the JDBC URL, as well as making sure that the Postgres JDBC driver is installed.
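
      Typically it's just the connection.url in the sink config that changes, e.g. (host and database names hypothetical):

        MySQL:    "connection.url": "jdbc:mysql://mysql:3306/demo"
        Postgres: "connection.url": "jdbc:postgresql://postgres:5432/demo"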

    • @dmitriisergeev306  4 years ago

      @@rmoff You show at 6:44 that we already have the Postgres driver. Is that not enough?

    • @rmoff  4 years ago

      @@dmitriisergeev306 Correct, the Postgres JDBC Driver ships with it, so you should not need to install it again.

  • @kiran0711  4 years ago

    Hi Robin,
    I am a great fan of your Kafka JDBC source and sink connector videos. We are currently facing a challenge: using the Kafka JDBC connectors, we are unable to connect to an Oracle database which is in a cloud Kerberized environment. Any video or details would be a great help.

    • @rmoff  4 years ago

      Hi Kiran, this isn't a configuration I've tried, sorry. You could try asking at cnfl.io/slack.

  • @shuchikumari8031  2 years ago

    Can you please help me with how to connect MySQL to Kafka? If possible, can you please share any documentation or links on this?
    Thank you.

    • @JordanCricketMoore  2 years ago

      You may want to start with the Debezium documentation.

    • @rmoff  2 years ago

      If you want to get data from MySQL into Kafka then check out rmoff.dev/no-more-silos

  • @panjisadewo4891  2 years ago

    I am using the JDBC sink connector from Confluent Developer. How do I set this up? Please help.

    • @rmoff  2 years ago

      Please ask this question at forum.confluent.io/. Thanks!

  • @naironviana  3 years ago

    Hi! I'm having trouble working with timestamp (date) values. I can't reflect changes in the sink table if the source table has a timestamp (with a current_timestamp default). Is there any specific transformation to set in the sink/source connectors to solve this problem? Thanks!

    • @rmoff  3 years ago

      Hi, the best place to ask this is forum.confluent.io/ :)

  • @shashanksrivastava5654  3 years ago

    Hi Robin, is it possible to save a single topic's data into multiple tables in Postgres?

    • @rmoff  3 years ago

      Yes, you can create multiple connectors reading from the same topic and route it to different tables using a Single Message Transform like RegexRouter (see rmoff.net/2020/12/11/twelve-days-of-smt-day-4-regexrouter/).
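
      As a sketch, each sink connector's config would carry something like this (target table name hypothetical):

        "transforms": "route",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": ".*",
        "transforms.route.replacement": "my_target_table"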

  • @ThorstenInJapan  2 years ago

    Isn't there a way to make a JDBC sink perform an INSERT IGNORE? If I use insert mode I always get duplicate-key errors. Can it really be that this case has not been considered? (MySQL dialect)

    • @rmoff  2 years ago

      I'm not aware of this being an option. Does it work if you use `upsert` instead?
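
      A minimal sketch, assuming the message key carries the primary key (field name hypothetical):

        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "id"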

  • @shikhachawla7259  4 years ago

    Thanks for sharing it; I am following your videos a lot for learning Kafka. I have tried setting up the JDBC sink connector to insert into SQL Server with a batch size of 500, but it inserts into SQL Server one by one rather than in batches, which has a negative impact on SQL Server IO. Is there something you can suggest to get batch insertion working? Will look forward to your response. Thanks

    • @rmoff  4 years ago +1

      Hi, the best place to ask this is on:
      → Mailing list: groups.google.com/forum/#!forum/confluent-platform
      or
      → Slack group: cnfl.io/slack

  • @sarathbaiju6040  3 years ago

    Hi, I have a question regarding automatic table creation in the sink connector: how can we define custom column names in the sink connector configuration?
    For example, by default the column names are after_userId and after_firstName. I want to change them to UserId and FirstName. How can we do this in the connector configuration?

    • @rmoff  3 years ago +1

      Hi Sarath, this is a great question to ask over at forum.confluent.io/ :)

    • @sarathbaiju6040  3 years ago

      @@rmoff Thanks for the reply; I got the answer. I used a transform (SMT) to achieve my requirement.

  • @user-ow6jk5ix5q  4 years ago

    Is there a good way to diagnose why a connector becomes degraded? Is it OK to use a symlink to put the JAR under the JDBC drivers folder?

    • @rmoff  4 years ago

      1. Diagnosing connector issues - look at the Kafka Connect worker log
      2. symlink - I don't know, try it :)
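
      For example, for a Dockerised worker (container name hypothetical):

        docker logs kafka-connect 2>&1 | grep -i -A5 ERROR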

    • @rmoff  4 years ago

      For further help, head to #connect on cnfl.io/slack

  • @thetrilbies1  2 years ago

    I tried to replicate the steps at 1:51. When inserting a row into TEST01, I get a "column name ROWKEY does not exist" error.

    • @arunbhat105  1 year ago

      Same error for me too. Were you able to solve the issue?

  • @aidangmccarthy  4 years ago

    Hi Robin, thanks for sharing. Can you use the Protobuf format from 5.5 instead of Avro? Also, for MySQL, I notice you are using version 8. Will this also work on MySQL 5.6 or 5.7? Many thanks, Aidan

    • @rmoff  4 years ago +1

      Yes, as of Confluent Platform 5.5 you can use Avro, Protobuf, or JSON Schema.
      It'll work fine with earlier versions of MySQL too.

  • @MahmoudShash  3 years ago

    Hi @Robin, just asking if Kafka Connect works with Cloudera Kafka, and if I can use it in production.

    • @rmoff  3 years ago

      Kafka Connect is part of Apache Kafka. It's widely used in production. I'm not familiar with what Cloudera supports; you'd need to check with them.

  • @dmitrysergeev1845  4 years ago

    For me it is not working. I have sink-jdbc-mysql-01 | SINK | io.confluent.connect.jdbc.JdbcSinkConnector | WARNING (0/1 tasks RUNNING). How can I fix it?

    • @rmoff  4 years ago

      You need to look at the Kafka Connect worker log to find out the actual error. YouTube comment threads aren't the easiest place to help debug, so head over to #connect on cnfl.io/slack
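
      As a first step you can also pull the failing task's stack trace from the Connect REST API (assuming it's on localhost:8083):

        curl -s localhost:8083/connectors/sink-jdbc-mysql-01/status | jq '.tasks[].trace'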

  • @abhrasarkar8040  3 years ago

    Hello sir, I couldn't find my 'mysql-connector-java-8.0.20.jar' in this folder 'confluent-hub-components/confluentinc-kafka-connect-jdbc'

    • @rmoff  3 years ago +1

      You need to install it yourself. Have a look at rmoff.dev/fix-jdbc-driver-video.
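
      The gist of it for a Dockerised worker (container name hypothetical; driver version as per your download):

        # Copy the MySQL JDBC driver in next to the JDBC connector's own JARs...
        docker cp mysql-connector-java-8.0.20.jar \
          kafka-connect:/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
        # ...then restart the worker so it picks the driver up
        docker restart kafka-connect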

    • @abhrasarkar8040  3 years ago

      @@rmoff I've downloaded that JDBC driver and copied it into that particular path using this command: 'docker cp /home/foldername/Downloads/mysql-connector-java-8.0.22/. kafka-connect:/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/mysql-connector-java-8.0.22.' I'm not getting any error message, but the MySQL DB is not updating, and my connector is showing 'warning 0/1' status. Is there any chat interface where I can have a brief discussion about this with you? I am new to Kafka. Please help. Thanks in advance.

    • @rmoff  3 years ago +1

      ​@@abhrasarkar8040
      The best place to ask any further questions is:
      → Slack group: cnfl.io/slack
      or
      → Mailing list: groups.google.com/forum/#!forum/confluent-platform

    • @abhrasarkar8040  3 years ago +1

      @@rmoff Thank you so much. I think I found the problem. This link is not working: 'cdn.mysql.com/Downloads/Connector-J/mysql-connector-java-8.0.19.tar.gz'. So I'm getting the file from this link instead: 'dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.22.tar.gz'. Is that fine?

    • @abhrasarkar8040  3 years ago

      @@rmoff
      Hello Robin! I am getting this error:
      "java.sql.SQLSyntaxErrorException: Access denied for user 'connect_user'@'%' to database 'demo'"
      Can you please help me out with this?

  • @nirmaladesai6964  3 years ago

    Do we need any settings in the Oracle database to start with?

    • @rmoff  3 years ago

      You'll need a user with the appropriate permissions to write to the table(s). If you've got any more questions then head over to forum.confluent.io/ :)

  • @vignesh9458  3 years ago

    @Robin Moffatt Is it possible to store all the Kafka data in a single column as JSON or Avro? Can you please help?

    • @rmoff  3 years ago

      Do you mean push the complex value of the Kafka message into a single target field in the database? If so, no I don't think this is supported.
      For more questions, head to forum.confluent.io/

    • @vignesh9458  3 years ago

      @@rmoff Exactly, Robin. My requirement is to read the JSON data from Kafka and store it exactly as-is in a single column.

    • @rmoff  3 years ago +1

      @@vignesh9458 I think you would need to pre-process the topic and embed the message in a field first (since the data has to be serialised with a schema).
      If you would like to post this as a question over at forum.confluent.io/ I can try and answer properly there.

    • @vignesh9458  3 years ago

      @@rmoff sure I will post there.

  • @talahootube  4 years ago

    Great video, many thanks. Could I get the reference documentation for this? Thanks

    • @rmoff  4 years ago +1

      You can find the JDBC Sink connector docs here: rmoff.dev/01r

    • @talahootube  4 years ago

      @@rmoff ok, thanks ya

  • @prateekshajoshi4539  2 years ago

    How do I connect Kafka with a MySQL database?

    • @rmoff  2 years ago

      You're on the right video! The Kafka Connect JDBC Sink enables you to stream data from Kafka to MySQL. If you have more questions do head over to forum.confluent.io/
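
      As a starting point, a minimal sink config might look like this (hostnames, credentials, and topic are all hypothetical, and it assumes the data in the topic has a schema, e.g. Avro):

        curl -s -X PUT -H "Content-Type: application/json" \
          localhost:8083/connectors/sink-jdbc-mysql-01/config -d '{
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "connection.url": "jdbc:mysql://mysql:3306/demo",
            "connection.user": "connect_user",
            "connection.password": "secret",
            "topics": "orders",
            "auto.create": "true"
          }'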

  • @sumitpratap999  3 years ago

    How do I use "pk.mode": "none" with a DB sequence?

    • @rmoff  3 years ago +1

      @Sumit Can you post full details of your question to forum.confluent.io? I can answer it over there. Thanks!

    • @sumitpratap999  3 years ago +1

      @@rmoff Done: "JDBC Sinc connector using DB Sequence for Primary key". Thanks for the reply.

  • @ashrafkhaled9790  4 years ago

    How do you do this with Java?

    • @rmoff  4 years ago +2

      That's the point - you wouldn't. Kafka Connect is the integration API for Apache Kafka, and what you should generally use for getting data in and out of Kafka from systems such as RDBMS. Learn more: rmoff.dev/ljc-kafka-02

  • @antoxin  4 years ago

    Very fast English, but thanks anyway for the informative video.

    • @rmoff  4 years ago

      Glad it was useful - sorry if it's not always easy to follow :)

  • @coltennabers634  3 years ago

    Your keyboard noises are driving me crazy, dude - fix your mic.

    • @rmoff  3 years ago

      Yeah, it's kinda clackity, isn't it? I've got a quieter one since I filmed that video :)

  • @robertoojeda3263  1 year ago

    Hello Robin! Good video. Do you have a video that uses a Dead Letter Queue topic holding several messages with different schemas - in other words, one that uses the properties confluent.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy and TopicRecordNameStrategy?

  • @sanket386  2 years ago

    How can I create a schema with a payload for nested JSON data?

  • @AashishOla  2 years ago

    I need your help regarding a Kafka Connect JDBC sink performance issue. I have sent you a message on Twitter. Looking forward to your response.

    • @rmoff  2 years ago +1

      The best place to ask is www.confluent.io/en-gb/community/ask-the-community/