ksqlDB and the Kafka Connect JDBC Sink

  • Published 6 Nov 2024

COMMENTS • 31

  • @farchanjo
    @farchanjo 4 years ago +1

    Great video!! Congrats, you have solved so many doubts that I had about connectors. I will change how my ksqlDB setup works right now!! Thanks so much.

    • @rmoff
      @rmoff  4 years ago +1

      Thanks, glad it helped :)

    • @farchanjo
      @farchanjo 4 years ago

      @@rmoff Videos like this are very nice. ksqlDB is a very powerful tool, but there are few docs about it. Thanks again!!

    • @rmoff
      @rmoff  4 years ago +1

      @@farchanjo Did you see all the docs at ksqldb.io? Is there a specific area you would like more documentation about? I can happily feed this back to the team :)

    • @farchanjo
      @farchanjo 4 years ago

      @@rmoff Sure! I'd like to install ksqlDB without Docker, for example. I know it's possible to have a connector for MongoDB, but I don't know if it can be used as a sink connector. I have a big infrastructure using MongoDB, and sinking to it would be best for me. I also know ksqlDB works as a cluster, but I did not find much information about that. Maybe all this information exists and I just did not find it. Thanks for all the replies.

    • @rmoff
      @rmoff  4 years ago

      @@farchanjo you can use any connector with ksqlDB, and you'll find MongoDB sink connectors on Confluent Hub (hub.confluent.io).
      Regarding your other questions (installation & clustering), those docs are being worked on; for the moment I'd recommend heading over to the #ksql channel on cnfl.io/slack for assistance :-)
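
      For example, sink connectors can be created straight from ksqlDB. A rough sketch with the MongoDB sink from Confluent Hub (connection details and names here are placeholders, not from the video):

      ```sql
      -- Hypothetical MongoDB sink created from ksqlDB; adjust the URI,
      -- database, collection, and topic to your own environment.
      CREATE SINK CONNECTOR SINK_MONGO WITH (
        'connector.class' = 'com.mongodb.kafka.connect.MongoSinkConnector',
        'connection.uri'  = 'mongodb://mongo:27017',
        'database'        = 'demo',
        'collection'      = 'orders',
        'topics'          = 'ORDERS'
      );
      ```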

  • @kailashyogeshwar8492
    @kailashyogeshwar8492 3 years ago

    Hi Robin, this is an excellent video for understanding the concepts. A few questions though:
    - How do you modify a materialized table/stream? e.g. the upstream source table has changed and now we want that column in the target table as well.
    - What is the difference between the CREATE TABLE WITH and CREATE TABLE AS patterns?

    • @rmoff
      @rmoff  3 years ago +1

      Hi Kailash,
      1. More recent versions of ksqlDB support `CREATE OR REPLACE` syntax to amend existing objects
      2. `CREATE TABLE` creates a table object on an _existing_ Kafka topic. `CREATE TABLE AS SELECT` creates a table object backed by a *new* Kafka topic, populated by the results of an aggregate SQL query.
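
      For illustration, a minimal sketch of both points (object and column names are made up; assumes a recent ksqlDB version):

      ```sql
      -- 1. A table declared over an *existing* topic: no new topic is created,
      --    ksqlDB just applies a schema and key to the data already there.
      CREATE TABLE CUSTOMERS (ID INT PRIMARY KEY, NAME VARCHAR)
        WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='AVRO');

      -- 2. A table created AS SELECT: ksqlDB writes the aggregate results to a
      --    *new* backing topic and keeps it continuously updated.
      CREATE TABLE ORDERS_BY_CUSTOMER AS
        SELECT CUSTOMER_ID, COUNT(*) AS ORDER_COUNT
        FROM ORDERS
        GROUP BY CUSTOMER_ID;

      -- 3. CREATE OR REPLACE amends the existing object, e.g. to pick up a
      --    column that is now needed downstream.
      CREATE OR REPLACE TABLE ORDERS_BY_CUSTOMER AS
        SELECT CUSTOMER_ID,
               COUNT(*)    AS ORDER_COUNT,
               SUM(AMOUNT) AS TOTAL_AMOUNT
        FROM ORDERS
        GROUP BY CUSTOMER_ID;
      ```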

  • @vishnumurali522
    @vishnumurali522 4 years ago

    Hi Robin,
    It's an excellent demo of ksqlDB and the JDBC sink.
    Is it possible to send the data from a stream to a new topic through ksqlDB?
    If yes, can you share the ksqlDB command for that? 😊😊😊

    • @rmoff
      @rmoff  4 years ago

      Hi Vishnu, when you `CREATE STREAM new_stream AS SELECT …` it writes data to the stream, which is backed by a new Kafka topic. Hope that makes sense.
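
      For example, something along these lines (stream, topic, and column names are made up):

      ```sql
      -- Hypothetical: filter an existing stream into a new stream, which
      -- ksqlDB backs with a new Kafka topic (named after the stream unless
      -- KAFKA_TOPIC is set explicitly, as here).
      CREATE STREAM ORDERS_UK
        WITH (KAFKA_TOPIC='orders_uk') AS
        SELECT *
        FROM ORDERS
        WHERE COUNTRY = 'UK';
      ```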

  • @cristianperez2011
    @cristianperez2011 4 years ago

    Excellent video Robin! Do you reckon ksqlDB is prod-ready at this stage? If so, how would you recommend deploying it if all you had (obviously my situation) is a self-managed stateless K8s cluster or a given AWS offering?

    • @rmoff
      @rmoff  4 years ago

      Yes it's used in production at plenty of places. I would recommend Confluent Cloud which provides it as a managed service :) Or you can run it for yourself if you'd rather.

  • @krishnachaitanyapulagam2769
    @krishnachaitanyapulagam2769 4 years ago

    Hello, I have one more question...
    Is it possible to add additional columns (audit columns) in the JDBC Sink Connector which are not part of the topic?

    • @rmoff
      @rmoff  4 years ago

      Yes, you can use the InsertField Single Message Transform
      See www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/ ("Add Metadata to the Apache Kafka Message for Data Lineage")
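
      For illustration, a rough sketch of a JDBC sink with that SMT, created from ksqlDB (connection details, field names, and topic are placeholders):

      ```sql
      -- Hypothetical JDBC sink with an InsertField SMT adding audit columns
      -- that are not in the topic itself (source topic, partition, offset,
      -- and record timestamp).
      CREATE SINK CONNECTOR SINK_ORDERS WITH (
        'connector.class'                    = 'io.confluent.connect.jdbc.JdbcSinkConnector',
        'connection.url'                     = 'jdbc:mysql://mysql:3306/demo',
        'connection.user'                    = 'connect_user',
        'connection.password'                = 'secret',
        'topics'                             = 'ORDERS',
        'auto.create'                        = 'true',
        'transforms'                         = 'addMeta',
        'transforms.addMeta.type'            = 'org.apache.kafka.connect.transforms.InsertField$Value',
        'transforms.addMeta.topic.field'     = 'source_topic',
        'transforms.addMeta.partition.field' = 'source_partition',
        'transforms.addMeta.offset.field'    = 'source_offset',
        'transforms.addMeta.timestamp.field' = 'source_ts'
      );
      ```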

  • @JonathanLevinTKY
    @JonathanLevinTKY 4 years ago

    Hey, great video! Can you recommend a link explaining how to install ksqlDB locally? Something that doesn't need a complicated ZooKeeper setup?

    • @JonathanLevinTKY
      @JonathanLevinTKY 4 years ago

      @@rmoff This product looks amazing, btw. It looks like it can cut out 3-4 microservices with 3-4 streams and help build the data warehouse in one location.

  • @benpracht2655
    @benpracht2655 4 years ago

    Is it possible for Kafka Connect to detect changes in a source table that doesn't have a key? I don't control it, otherwise I'd add one. The table is in Oracle.

    • @rmoff
      @rmoff  4 years ago

      If you have one, you can use a timestamp column to detect changes. Otherwise, you need to (and possibly should anyway) use log-based CDC.
      To learn more see:
      - rmoff.dev/ksny19-no-more-silos
      - www.confluent.io/blog/no-more-silos-how-to-integrate-your-databases-with-apache-kafka-and-cdc
      - rmoff.dev/oracle-and-kafka
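
      For illustration, a rough sketch of the timestamp-based option (all names and connection details are placeholders; it misses deletes, which is one reason log-based CDC is usually preferable):

      ```sql
      -- Hypothetical JDBC source polling an Oracle table by a timestamp
      -- column. Only rows whose timestamp moves forward are picked up.
      CREATE SOURCE CONNECTOR SOURCE_ORACLE WITH (
        'connector.class'       = 'io.confluent.connect.jdbc.JdbcSourceConnector',
        'connection.url'        = 'jdbc:oracle:thin:@oracle:1521/ORCLPDB1',
        'connection.user'       = 'connect_user',
        'connection.password'   = 'secret',
        'mode'                  = 'timestamp',
        'timestamp.column.name' = 'LAST_UPDATED_TS',
        'table.whitelist'       = 'MY_TABLE',
        'topic.prefix'          = 'oracle-'
      );
      ```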

  • @emmanuelharel
    @emmanuelharel 1 year ago

    Hello, does anyone know if it is possible to write to the WAL of a Postgres database from ksqlDB?

    • @emmanuelharel
      @emmanuelharel 1 year ago

      @@RobinNMoffatt To implement the outbox pattern from SQL within a transaction. With Postgres, for example, it would mean using the pg_logical_emit_message function to store the message at the same time as storing an entity into a table, but all driven from ksqlDB or Flink SQL. Does that make sense?
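
      For reference, what that describes would look roughly like this in plain Postgres SQL, outside ksqlDB (table and payload are made up):

      ```sql
      -- Hypothetical outbox-style write: persist the entity and emit a
      -- logical decoding message in the same transaction, so a CDC consumer
      -- reading the WAL (e.g. via Debezium) sees both atomically.
      BEGIN;

      INSERT INTO orders (id, customer_id, amount)
      VALUES (42, 7, 99.95);

      SELECT pg_logical_emit_message(
        true,                                     -- transactional: only emitted on commit
        'outbox',                                 -- prefix a consumer can filter on
        '{"event":"OrderCreated","order_id":42}'  -- payload
      );

      COMMIT;
      ```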

  • @abhrasarkar8040
    @abhrasarkar8040 3 years ago

    Hello Sir, how can I create a stream from a Debezium message? From that stream I will connect a MySQL sink connector to apply upsert events to my table. Thanks in advance!

    • @rmoff
      @rmoff  3 years ago

      Use the `CREATE STREAM` command and specify the Debezium topic as the source. There's an example of this in the demo here: rmoff.dev/build-a-streaming-data-pipeline
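
      For illustration, roughly like this (topic and column names are placeholders; assumes Avro with Schema Registry so the schema is inferred, and the default Debezium envelope):

      ```sql
      -- Hypothetical: register a stream over a Debezium CDC topic.
      CREATE STREAM CUSTOMERS_CDC
        WITH (KAFKA_TOPIC='dbserver1.inventory.customers', VALUE_FORMAT='AVRO');

      -- Pull out the post-change state so the JDBC sink gets flat columns
      -- to upsert.
      CREATE STREAM CUSTOMERS_FLAT AS
        SELECT AFTER->ID   AS ID,
               AFTER->NAME AS NAME
        FROM CUSTOMERS_CDC;
      ```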

  • @SanjayNayak-sc6ys
    @SanjayNayak-sc6ys 4 years ago

    In order to use record_key, are there any additional parameters that we have to set, since the fields are taken from the record key?

    • @SanjayNayak-sc6ys
      @SanjayNayak-sc6ys 4 years ago

      @@rmoff, I tried with `pk.fields` and provided the column names of the MySQL table that form the composite key, but it threw an error: `The schema type is primitive and should contain only one parameter and not a list`. The sink connector has an Avro converter for values and String for the key. The topic does not have any ROWKEY value. Should I add any additional parameter while pushing the record from the stream to the topic? I am able to use record_value without any issues with the composite key.
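
      That error usually means the record key is a primitive (a single string) rather than a struct, so `pk.fields` can only name one column; a composite key taken from the record key needs a structured (e.g. Avro) key. A rough sketch of the primitive-key case (names are placeholders):

      ```sql
      -- Hypothetical sink where the Kafka record key is a plain string:
      -- pk.fields may contain exactly one name, used as the PK column name.
      CREATE SINK CONNECTOR SINK_MYSQL WITH (
        'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
        'connection.url'  = 'jdbc:mysql://mysql:3306/demo',
        'topics'          = 'MY_TOPIC',
        'key.converter'   = 'org.apache.kafka.connect.storage.StringConverter',
        'insert.mode'     = 'upsert',
        'pk.mode'         = 'record_key',
        'pk.fields'       = 'id'
      );
      ```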

  • @krishnachaitanyapulagam2769
    @krishnachaitanyapulagam2769 4 years ago

    Very good information... I am also trying to do a somewhat similar use case, let me explain...
    The requirement is to perform CDC from a source DB2 to a target MySQL [Insert/Update/Delete]. We have an existing topic which is sourced from a DB2 database table, with an Avro schema that has both Key and Value. The primary key is a combination of three columns. One of these columns has EBCDIC values and I am using ksqlDB [custom UDF] to transform the data to a readable format. My new topic, created from the transformed stream, has the converted values. When I PRINT the topic I see the column in the Value is changed to the readable format, but not the column in the Key. Below is a snippet where col2 has EBCDIC characters that I am transforming using a custom UDF in ksqlDB.
    Key:{col1:'09-22-2020', col2:'#$%',col3:'123'} Value:{col1:'09-22-2020',col2:'999',col3:'123',col4:'john',col5:'NY'}
    In my JDBC sink connector I am using the properties below, and I am able to do INSERT and UPDATE:
    insert.mode='upsert'
    pk.mode='record_value'
    pk.fields='col1,col2,col3'
    delete.enabled='false'
    I also need to perform DELETE, but if I change the config to the below, the col2 value is taken from the Key part, which is the original [not converted] value:
    insert.mode='upsert'
    pk.mode='record_key'
    pk.fields='col1,col2,col3'
    delete.enabled='true'
    Can you please help me to resolve this issue?

    • @rmoff
      @rmoff  4 years ago

      I've replied to your cross-post on cnfl.io/slack :)

  • @krishnachaitanyapulagam2769
    @krishnachaitanyapulagam2769 4 years ago

    Hello,
    Can you please help me with how to use the mysql_clear_password plugin in the JDBC Sink connector?
    I am using the JDBC Sink Connector for performing CDC on a MySQL DB and we need to use an Active Directory user and password.

    • @rmoff
      @rmoff  4 years ago +1

      Looking at the docs for mysql_clear_password it sounds like this is something that the client needs to send - and thus something that the connector would need to be able to handle. I don't see anything in its docs, so it suggests that it doesn't support it. You could raise an issue at github.com/confluentinc/kafka-connect-jdbc/issues.