12 Spark Streaming Writing data to Multiple Sinks | foreachBatch | Writing data to JDBC(Postgres)

  • Published Jan 29, 2025

COMMENTS • 46

  • @jbh7079
    @jbh7079 2 months ago

    Hello, thanks for sharing this content. Like most people in the comments, I am also facing the driver issue with Postgres. I wish there was a separate video on this, as many of us are stuck at this point.

    • @easewithdata
      @easewithdata  2 months ago

      This is because Jupyter and Postgres are running in different containers on Docker.
      Follow these steps to connect them over a shared network (the container names are placeholders; use your own):
      docker network create my-network
      docker network connect my-network <jupyter-container>
      docker network connect my-network <postgres-container>
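      Once both containers share the network, the JDBC URL can use the Postgres container name as the host. A minimal sketch, assuming placeholder container, database, and credential names and a batch DataFrame df (e.g. the micro-batch inside foreachBatch):

      (
          df.write
              .format("jdbc")
              .option("url", "jdbc:postgresql://<postgres-container>:5432/sales_db")  # container name as host
              .option("driver", "org.postgresql.Driver")
              .option("dbtable", "public.orders")
              .option("user", "postgres")
              .option("password", "postgres")
              .mode("append")
              .save()
      )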

  • @hamedtamadon6520
    @hamedtamadon6520 7 months ago

    Hello, and thanks for sharing these useful videos.
    How should writing to Delta tables be handled?
    The best practice is that each Parquet file should be between 128 MB and 1 GB,
    but each micro-batch is far smaller than that. How do you handle this situation,
    or how do you accumulate a number of batches until they reach that size before finally writing to the Delta lake?

    • @easewithdata
      @easewithdata  7 months ago +1

      Usually micro-batch execution in Spark writes many small files. This requires a later stage that reads all those files and writes a compacted, larger file (say, one per day) to avoid the small-file issue. Your downstream systems then read from the compacted files. A rough sketch of such a compaction job follows below.
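      A minimal compaction sketch, assuming made-up paths and a daily partition; it reads the small files produced by the streaming job and rewrites them as a handful of larger files:

      from pyspark.sql import SparkSession

      spark = SparkSession.builder.appName("daily_compaction").getOrCreate()

      # Read all the small files written by the streaming job for one day
      small_files_df = spark.read.parquet("/data/streaming/orders/date=2025-01-29/")

      # Repartition to a few output files so each one lands closer to the 128 MB-1 GB range
      (
          small_files_df
              .repartition(4)
              .write
              .mode("overwrite")
              .parquet("/data/compacted/orders/date=2025-01-29/")
      )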

  • @ye__in
    @ye__in 8 months ago

    Thank you so much for the streaming lecture! While studying, I got a question about using readStream from Kafka. Do I always have to pass the schema for the streaming data? Even if the number of tables coming from the source grows to more than 100, do I need to define a schema for each table and include them all in the code? Is there any way to automatically infer the schema of incoming tables?

    • @easewithdata
      @easewithdata  8 months ago

      Thanks, please share with your network 🛜
      Answer to the second part of your question: you can set the following conf to infer the schema at run time
      spark.sql.streaming.schemaInference to true
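      A small sketch of that setting, assuming a file-based streaming source (for Kafka the value column is binary, so the payload schema still has to be parsed explicitly):

      # Enable schema inference for file-based streaming sources
      spark.conf.set("spark.sql.streaming.schemaInference", "true")

      # Schema is now inferred from the files already present in the directory
      stream_df = (
          spark.readStream
              .format("json")
              .load("/data/landing/orders/")
      )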

  • @priyachaturvedi1164
    @priyachaturvedi1164 7 months ago +1

    How does PySpark handle Kafka consumer groups and consumers? Generally, by changing the consumer group we can start consuming data from the beginning of a topic. How do we start consuming from the beginning here? Does setting startingOffsets to earliest always read from the start?

    • @easewithdata
      @easewithdata  7 months ago

      Spark Streaming uses the checkpoint directory to track offsets, so it doesn't re-consume offsets that have already been processed. You can find more details on setting the starting offset with Kafka at this link - spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
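      A short sketch, assuming placeholder broker and topic names; with a checkpoint directory in place, "earliest" only applies the first time the query starts, after which Spark resumes from the checkpointed offsets:

      kafka_df = (
          spark.readStream
              .format("kafka")
              .option("kafka.bootstrap.servers", "kafka:9092")
              .option("subscribe", "orders")
              .option("startingOffsets", "earliest")   # used only when no checkpoint exists yet
              .load()
      )

      query = (
          kafka_df.writeStream
              .format("console")
              .option("checkpointLocation", "/tmp/checkpoints/orders_console")
              .start()
      )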

  • @shubhamgupta9375
    @shubhamgupta9375 8 months ago

    What will happen if Postgres is down? Data will still be written to Parquet, since that is on HDFS, and the checkpoint will still have acknowledged those records. Will we have data loss in Postgres in that case? If yes, how can we recover? A fault-tolerance video on this would be highly appreciated.
    Edited: I just saw the next video, glad to see that. Still, I have one more question: is there a way to make the error recovery automatic?

    • @easewithdata
      @easewithdata  8 months ago

      Hello Shubham,
      Hope the next video explained the first part of your doubt.
      Coming to the next part, yes, you can have a weekly or daily batch job look for error records and reprocess them (a rough sketch follows below).
      If you like the content, please make sure to share it with your network on LinkedIn.
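      One way to make that recovery semi-automatic inside foreachBatch is to divert any micro-batch whose JDBC write fails to an error location that the batch job later retries. A rough sketch, with placeholder connection details and paths:

      def write_to_postgres(batch_df, batch_id):
          try:
              (
                  batch_df.write
                      .format("jdbc")
                      .option("url", "jdbc:postgresql://<postgres-container>:5432/sales_db")
                      .option("driver", "org.postgresql.Driver")
                      .option("dbtable", "public.orders")
                      .option("user", "postgres")
                      .option("password", "postgres")
                      .mode("append")
                      .save()
              )
          except Exception:
              # Park the failed micro-batch; the daily/weekly job re-reads this path and retries
              batch_df.write.mode("append").parquet("/data/errors/orders/")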

  • @babuganesh2000
    @babuganesh2000 11 months ago

    Would you be able to start a playlist on Databricks, Unity Catalog, and Delta Live Tables? I am sure you are using Docker for learning purposes, but in real life it would be really helpful if you could create a playlist just on Databricks, with either Azure or AWS integration. Is that something on your wish list or to-do?

    • @easewithdata
      @easewithdata  10 months ago

      Hello,
      You can lift and shift the code that I am teaching on Jupyter to get started with Databricks. Currently it's on Docker so that anyone can set it up locally and get started.
      And yes, I am planning to start a course on complete integration with Azure. It's on my wishlist.

  • @deepanshuaggarwal7042
    @deepanshuaggarwal7042 7 months ago

    Hi, I have a doubt: how can we check whether a stream has multiple sinks from the Spark UI?

    • @easewithdata
      @easewithdata  7 months ago +1

      Allow me some time to find the exact screenshot for you.

  • @atonxment2868
    @atonxment2868 6 months ago

    Got the "scram authentication is not supported by this driver" error while trying to connect to Postgres. This is driving me nuts.

    • @easewithdata
      @easewithdata  6 months ago

      Please make sure to use the correct driver version for the Postgres you are running (see the sketch below for pulling a recent driver into the session).
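      SCRAM-SHA-256 authentication needs a reasonably recent PostgreSQL JDBC driver (42.2.0 or later). A minimal sketch, with an example driver version that you should match to your setup:

      from pyspark.sql import SparkSession

      spark = (
          SparkSession.builder
              .appName("streaming_to_postgres")
              .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3")  # example version only
              .getOrCreate()
      )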

    • @atonxment2868
      @atonxment2868 6 months ago +1

      @easewithdata I solved this by setting up Postgres and Jupyter with the same compose file. Before, I was using a Docker network to connect the two, and it didn't work no matter what. Everything broke after I removed the network group, so I tried setting it up again.

  • @shivakant4698
    @shivakant4698 8 months ago

    Could you do a separate video on how to connect to Postgres from a PySpark cluster?

    • @easewithdata
      @easewithdata  8 months ago

      Hello,
      Set up Postgres from Docker Hub using this command (docker compose up) after you download the following compose file
      github.com/subhamkharwal/docker-images/tree/master/postgres
      Once Postgres is up, you can use a Docker network bridge to communicate between the cluster and the Postgres container. Check out this example
      www.tutorialworks.com/container-networking/

  • @RahulGhosh-yl7hl
    @RahulGhosh-yl7hl 8 months ago

    Hi, thanks very much for the video. While doing the implementation I am stuck at this error: java.lang.ClassNotFoundException: org.postgresql.Driver
    I have tried adding the Postgres driver manually but it is still not working. I have also placed the exact jar file at the specified location /home/jovyan/.ivy2/jars/org.postgresql_postgresql-42.2.20.jar, yet I still get the ClassNotFoundException while executing the writeStream part.
    Please help.

    • @easewithdata
      @easewithdata  7 months ago

      Please download the driver and keep it at the mentioned location, then make sure the session actually loads it (one way to do that is sketched below).
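      A sketch that points the session at the downloaded jar explicitly, using the path from the comment above; spark.jars ships the jar to the executors and extraClassPath puts it on the driver's classpath:

      from pyspark.sql import SparkSession

      jar_path = "/home/jovyan/.ivy2/jars/org.postgresql_postgresql-42.2.20.jar"

      spark = (
          SparkSession.builder
              .appName("streaming_to_postgres")
              .config("spark.jars", jar_path)
              .config("spark.driver.extraClassPath", jar_path)
              .getOrCreate()
      )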

    • @MuzicForSoul
      @MuzicForSoul 5 months ago +1

      Were you able to resolve this issue? I am stuck at the same place. Can you please share the solution?

  • @ryan7ait
    @ryan7ait 11 months ago

    Thank you!

  • @luonghuy7154
    @luonghuy7154 5 months ago

    Hi! I have a problem when I do a readStream. I use load() and then this appears: AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide". Please help me, thank you ^^

    • @easewithdata
      @easewithdata  5 months ago

      Make sure the Kafka library is loaded in the Spark session before you use it in code (see the sketch below).
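      The Kafka source ships as a separate package, so it has to be on the session's classpath before readStream.format("kafka") is called. A sketch, with example Scala/Spark versions that should be matched to your cluster:

      from pyspark.sql import SparkSession

      spark = (
          SparkSession.builder
              .appName("kafka_stream")
              .config("spark.jars.packages",
                      "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")  # match your Spark/Scala version
              .getOrCreate()
      )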

  • @gagansingh3481
    @gagansingh3481 27 days ago

    But how is data actually put into Kafka in industry?

    • @easewithdata
      @easewithdata  26 days ago

      Kafka lets you work in real time; you can use an API, Python code, the Java SDK, etc. to publish data to Kafka (a small Python example follows below).
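      A minimal sketch of publishing events from application code with the kafka-python library; the broker address and topic name are placeholders:

      import json
      from kafka import KafkaProducer

      # Serialize each event as JSON before sending it to the topic
      producer = KafkaProducer(
          bootstrap_servers="kafka:9092",
          value_serializer=lambda v: json.dumps(v).encode("utf-8"),
      )

      producer.send("orders", {"order_id": 101, "amount": 250.0})
      producer.flush()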

  • @shivakant4698
    @shivakant4698 8 months ago

    Please give a demo of creating the table there; I am not able to get it done.

    • @easewithdata
      @easewithdata  8 months ago

      It's just a CREATE TABLE command with the columns shown in the table (a sketch follows below).
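      A sketch of creating the target table up front with psycopg2; the connection details and column names/types here are placeholders, so use the columns shown in the video:

      import psycopg2

      conn = psycopg2.connect(
          host="<postgres-container>", port=5432, dbname="sales_db",
          user="postgres", password="postgres",
      )
      with conn, conn.cursor() as cur:
          # Placeholder columns; replace with the ones from the video's table
          cur.execute("""
              CREATE TABLE IF NOT EXISTS public.orders (
                  order_id  BIGINT,
                  amount    NUMERIC(10, 2),
                  order_ts  TIMESTAMP
              )
          """)
      conn.close()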

  • @zheyan4704
    @zheyan4704 9 months ago

    I keep getting this error: ERROR MicroBatchExecution: Query [id = 570602f6-fc8e-41b5-b4b1-cc7a7c894a98, runId = e084fabe-f1b7-44e1-a274-b3c4be0959b9] terminated with error
    py4j.Py4JException: Error while obtaining a new communication channel. Any chance you know why?

    • @easewithdata
      @easewithdata  9 months ago

      Hello,
      Please kill all running queries and restart the application with a new checkpoint.

    • @zheyan4704
      @zheyan4704 9 months ago +1

      @easewithdata Hi, thanks for the reply! I got it resolved after switching from PySpark to Scala.

  • @ryan7ait
    @ryan7ait 11 months ago

    Can you give me the code to write into Cassandra?

  • @shivakant4698
    @shivakant4698 8 months ago

    Please give a step-by-step demo of establishing a connection with PostgreSQL; it is not working for me. Please.

    • @easewithdata
      @easewithdata  8 months ago

      You can install Postgres on your local machine and use it for the same examples.

  • @shivakant4698
    @shivakant4698 8 months ago

    Can anyone from the community give me an answer?

    • @easewithdata
      @easewithdata  8 months ago

      Hello,
      Set up Postgres from Docker Hub using this command (docker compose up) after you download the following compose file
      github.com/subhamkharwal/docker-images/tree/master/postgres
      Once Postgres is up, you can use a Docker network bridge to communicate between the cluster and the Postgres container. Check out this example
      www.tutorialworks.com/container-networking/

  • @satyasaivarunhanumanthu6370
    @satyasaivarunhanumanthu6370 4 months ago

    Hi, great content. I installed the Postgres Docker image from the git repo you provided and it installed successfully, but now it is asking for an email and password (I tried to sign up and it still is not working). Please provide the email ID and password to log in to Postgres.

    • @easewithdata
      @easewithdata  4 months ago

      username: sqlpad
      password: sqlpad
      If you like my content, please make sure to share it with your network on LinkedIn 👍

  • @shivakant4698
    @shivakant4698 8 months ago

    py4j.protocol.Py4JJavaError: An error occurred while calling o185.save.
    : java.lang.ClassNotFoundException: org.postgresql.Driver

    • @easewithdata
      @easewithdata  8 months ago

      Hello,
      Set up Postgres from Docker Hub using this command (docker compose up) after you download the following compose file
      github.com/subhamkharwal/docker-images/tree/master/postgres
      Once Postgres is up, you can use a Docker network bridge to communicate between the cluster and the Postgres container. Check out this example
      www.tutorialworks.com/container-networking/

    • @manojk1494
      @manojk1494 1 month ago

      @easewithdata localhost:3000/signup.
      Unable to log in to sqlpad at the above link. User/password incorrect.