Great video!! Congrats. You have cleared up so many doubts that I had about connectors. I will change how my ksqlDB setup works right now!! Thanks so much.
Thanks, glad it helped :)
@@rmoff Videos like this are really helpful. ksqlDB is a very powerful tool, but there isn't much documentation about it. Thanks again!!
@@farchanjo Did you see all the docs at ksqldb.io? Is there a specific area you would like more documentation about? I can happily feed this back to the team :)
@@rmoff Sure! I'd like to install ksqlDB without Docker, for example. I know it's possible to have a connector for MongoDB, but I don't know if it can be used as a sink connector. I have a big infrastructure using MongoDB, and sinking to it would be best for me. I also know ksqlDB works as a cluster, but I didn't find much information about that. It might just be me and all this information is there and I simply didn't find it. Thanks for all the replies.
@@farchanjo you can use any connector with ksqlDB, and you'll find MongoDB sink connectors on Confluent Hub (hub.confluent.io).
Regarding your other questions (installation & clustering) these docs are being worked on and for the moment I'd recommend heading over to #ksql channel on cnfl.io/slack for assistance :-)
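For example, here's a rough sketch of creating a MongoDB sink connector from within ksqlDB. The connection URI, database, and topic names are placeholders, and you should check the connector's documentation on Confluent Hub for its exact properties:
```sql
-- Hypothetical example: connection details and topic name are placeholders.
CREATE SINK CONNECTOR SINK_MONGODB WITH (
  'connector.class' = 'com.mongodb.kafka.connect.MongoSinkConnector',
  'connection.uri'  = 'mongodb://mongodb:27017',
  'database'        = 'my_db',
  'topics'          = 'ORDERS'
);
```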
Hi Robin, this is an excellent video for understanding the concepts. A few questions though:
- How do I modify a materialized table/stream? e.g. the source table downstream has changed and now we want that column in the target table as well
- What's the difference between the CREATE TABLE WITH and CREATE TABLE AS patterns?
Hi Kailash,
1. More recent versions of ksqlDB support `CREATE OR REPLACE` syntax to amend existing objects
2. `CREATE TABLE` creates a table object on an _existing_ Kafka topic. `CREATE TABLE AS SELECT` creates a table object backed by a *new* Kafka topic, populated by the results of an aggregate SQL query.
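To illustrate, here's a minimal sketch (topic, stream, and column names are made up):
```sql
-- CREATE TABLE registers a table on an existing topic; no new topic is created.
CREATE TABLE CUSTOMERS (ID VARCHAR PRIMARY KEY, NAME VARCHAR)
  WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='AVRO');

-- CREATE TABLE AS SELECT creates and populates a *new* backing topic
-- with the results of the aggregate query.
CREATE TABLE ORDERS_BY_CUSTOMER AS
  SELECT CUSTOMER_ID, COUNT(*) AS ORDER_COUNT
    FROM ORDERS
   GROUP BY CUSTOMER_ID;
```
And on newer versions you can amend it in place with `CREATE OR REPLACE TABLE ORDERS_BY_CUSTOMER AS SELECT …` and the updated query.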
Hi Robin
It's an excellent demo of ksqlDB and the JDBC sink.
So is it possible to send data from a stream to a new topic through ksqlDB?
If yes, could you share the ksql command for that 😊😊😊
Hi Vishnu, when you `CREATE STREAM new_stream AS SELECT …` it writes data to the stream, which is backed by a new Kafka topic. Hope that makes sense.
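For instance, a minimal sketch (stream and column names are just placeholders):
```sql
-- The new stream is backed by a new Kafka topic, continuously populated
-- by the SELECT. You can name the topic explicitly with KAFKA_TOPIC.
CREATE STREAM ORDERS_UK
  WITH (KAFKA_TOPIC='orders_uk') AS
  SELECT *
    FROM ORDERS
   WHERE COUNTRY = 'UK';
```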
Excellent video Robin! Do you reckon KSQLDB is prod ready at this stage? If so, how would you recommend deploying it if all you had (obviously my situation) is a self-managed stateless K8s cluster or a given AWS offering?
Yes it's used in production at plenty of places. I would recommend Confluent Cloud which provides it as a managed service :) Or you can run it for yourself if you'd rather.
Hello I have one more question...
Is it possible to add additional columns (audit columns) in the JDBC Sink Connector which are not part of the topic?
Yes, you can use the InsertField Single Message Transform
See www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/ ("Add Metadata to the Apache Kafka Message for Data Lineage")
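A rough sketch of what that can look like on a JDBC sink connector created from ksqlDB (connection details, topic, and field names here are placeholders):
```sql
CREATE SINK CONNECTOR SINK_ORDERS WITH (
  'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
  'connection.url'      = 'jdbc:mysql://mysql:3306/demo',
  'connection.user'     = 'connect_user',
  'connection.password' = 'asgard',
  'topics'              = 'ORDERS',
  'auto.create'         = 'true',
  -- InsertField adds audit columns to each record before it is written:
  'transforms'                       = 'addTs,addTopic',
  'transforms.addTs.type'            = 'org.apache.kafka.connect.transforms.InsertField$Value',
  'transforms.addTs.timestamp.field' = 'AUDIT_MESSAGE_TS',
  'transforms.addTopic.type'         = 'org.apache.kafka.connect.transforms.InsertField$Value',
  'transforms.addTopic.topic.field'  = 'AUDIT_SOURCE_TOPIC'
);
```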
Hey, great video! Can you recommend a link that explains how to install ksqlDB locally? Something that doesn't need a complicated ZooKeeper setup?
@@rmoff This product looks amazing, btw. It looks like it can cut out 3-4 microservices with 3-4 streams and help build the data warehouse in one location.
Is it possible for Kafka Connect to detect table changes in a source table that doesn't have a key? I don't control it, otherwise I'd add one. The table is in Oracle.
If the table has a timestamp column you can use that to detect changes (there's a sketch of that approach after the links below). Otherwise, you need to (and possibly should anyway) use log-based CDC.
To learn more see:
- rmoff.dev/ksny19-no-more-silos
- www.confluent.io/blog/no-more-silos-how-to-integrate-your-databases-with-apache-kafka-and-cdc
- rmoff.dev/oracle-and-kafka
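Here's a rough sketch of the timestamp-based approach using the JDBC source connector, created from ksqlDB (connection details, table, and column names are placeholders):
```sql
CREATE SOURCE CONNECTOR SOURCE_ORACLE WITH (
  'connector.class'       = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url'        = 'jdbc:oracle:thin:@//oracle:1521/ORCLPDB1',
  'connection.user'       = 'connect_user',
  'connection.password'   = 'asgard',
  'table.whitelist'       = 'ORDERS',
  -- Poll for changed rows based on a timestamp column rather than a key:
  'mode'                  = 'timestamp',
  'timestamp.column.name' = 'LAST_UPDATED_TS',
  'topic.prefix'          = 'oracle-'
);
```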
Hello, does anyone know if it is possible to write to the WAL of a Postgres database from ksqlDB?
@@RobinNMoffatt to implement the outbox pattern from SQL within a transaction. So it would mean, for example with Postgres, using the pg_logical_emit_message function to store the message at the same time as storing an entity in a table. But all of this from ksqlDB or Flink SQL. Does that make sense?
Hello Sir, how can I create a stream from a Debezium message? From that stream I will connect a MySQL sink connector to do upsert events on my table. Thanks in advance!
Use the `CREATE STREAM` command and specify the Debezium topic as the source. There's an example of this in the demo here: rmoff.dev/build-a-streaming-data-pipeline
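Something along these lines (the topic name is a placeholder, and this assumes Avro with Schema Registry so the columns are picked up automatically):
```sql
-- Register a stream over the existing Debezium topic; with Avro the
-- schema (and therefore the columns) comes from the Schema Registry.
CREATE STREAM CUSTOMERS_CDC
  WITH (KAFKA_TOPIC='mysql.demo.customers', VALUE_FORMAT='AVRO');
```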
In order to use record_key, are there any additional parameters that we have to set, given that the fields are taken from the record key?
@@rmoff, I tried with `pk.fields` and provided the column names of the MySQL table that form the composite key, but it threw an error: `The schema type is primitive and should contain only one parameter and not a list`. The sink connector has an Avro converter for values and String for the key. The topic does not have any ROWKEY value. Should I add any additional parameter while pushing the record from the stream to the topic? I am able to use record_value without any issues with the composite key.
Very good information... I am also trying to do a somewhat similar use case, let me explain...
The requirement is to perform CDC from a source DB2 to a target MySQL [Insert/Update/Delete]. We have an existing topic sourced from a DB2 database table, with an Avro schema that has both a Key and a Value. The primary key is a combination of three columns. One of these columns has EBCDIC values and I am using ksqlDB [custom UDF] to transform the data to a readable format. My new topic, which is created from the transformed stream, has the converted values. When I PRINT the topic I see the column in the Value is changed to a readable format, but not the column in the Key. Below is a snippet where col2 has EBCDIC characters and I am transforming it using a custom UDF in ksqlDB.
Key:{col1:'09-22-2020', col2:'#$%',col3:'123'} Value:{col1:'09-22-2020',col2:'999',col3:'123',col4:'john',col5:'NY'}
In my JDBC sink connector I am using the below properties and I am able to do INSERT and UPDATE:
insert.mode='upsert'
pk.mode='record_value'
pk.fields='col1,col2,col3'
delete.enabled='false'
I also need to perform DELETE, and if I change the config to the below, the col2 value is taken from the Key part, which is the original [not converted] value.
insert.mode='upsert'
pk.mode='record_key'
pk.fields='col1,col2,col3'
delete.enabled='true'
Can you please help me to resolve this issue?
I've replied to your cross-post on cnfl.io/slack :)
Hello,
Can you please help me with how to use the mysql_clear_password plugin in the JDBC Sink connector?
I am using the JDBC Sink Connector for performing CDC to a MySQL DB and we need to use an Active Directory user and password.
Looking at the docs for mysql_clear_password it sounds like this is something that the client needs to send - and thus something that the connector would need to be able to handle. I don't see anything in its docs, so it suggests that it doesn't support it. You could raise an issue at github.com/confluentinc/kafka-connect-jdbc/issues.