Advancing Spark - Identity Columns in Delta

  • Published 14 Aug 2024
  • A classic challenge in Data Warehousing is getting your surrogate key patterns right - but without the same tooling, how do we achieve it in a Lakehouse Environment? We've had several patterns in the past, each with their drawbacks, but now we've got a brand new IDENTITY column type... so how does it stack up?
    In this video Simon does a quick recap of the existing surrogate key methods within Spark-based ETL processes, before looking through the new Delta Identity functionality! A minimal syntax sketch follows the chapter list below.
    As always, if you're beginning your lakehouse journey, or need an expert eye to guide you on your way, you can always get in touch with Advancing Analytics.
    00:00 - Hello
    01:37 - Existing Key Methods
    10:36 - New Identity Functionality
    15:18 - Testing a larger insert
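    A minimal sketch of the identity syntax covered in the video (Databricks Runtime 10.4+, Delta tables only; the table and column names here are made up):
    # Create a Delta table whose surrogate key is generated automatically
    spark.sql("""
        CREATE TABLE IF NOT EXISTS dim_customer (
            CustomerID   BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
            CustomerName STRING
        ) USING DELTA
    """)
    # Omit the identity column on insert and Delta assigns the next value
    spark.sql("INSERT INTO dim_customer (CustomerName) VALUES ('Advancing Analytics')")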

COMMENTS • 39

  • @JulesTuffrey
    @JulesTuffrey 2 years ago +1

    Thats a great addition to the runtime. Thank you for the awesome vid!

  • @WastedFury
    @WastedFury 2 years ago +2

    'There's no Will' 😂 😂 😂
    Nicely done!

  • @NoahPitts713
    @NoahPitts713 13 hours ago

    Now that IDENTITY columns have been out for 2-ish years, how did they perform for you after you got a chance to put them through their paces?

  • @MrMikereeve89
    @MrMikereeve89 2 years ago

    I finally found a good use case for the identity function in Databricks! Typically we use hash keys so we can parallelise our jobs, but I needed to create a unique identifier for an XML output, which was limited to a maximum 30-character string. Our natural keys were all GUIDs and our hash keys were also too long - delta identity to the rescue! Now we have a nice little mapping table from our natural keys to a bigint identity which we use in our XML output :D
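    A rough sketch of that mapping-table pattern, assuming hypothetical table and column names (new_natural_keys stands in for a staged view of incoming GUIDs):
    # The identity column mints a compact BIGINT surrogate for each new natural key
    spark.sql("""
        CREATE TABLE IF NOT EXISTS key_map (
            SurrogateId BIGINT GENERATED ALWAYS AS IDENTITY,
            NaturalKey  STRING
        ) USING DELTA
    """)
    # Insert only natural keys we haven't mapped yet; Delta assigns SurrogateId
    spark.sql("""
        MERGE INTO key_map AS tgt
        USING new_natural_keys AS src
        ON tgt.NaturalKey = src.NaturalKey
        WHEN NOT MATCHED THEN INSERT (NaturalKey) VALUES (src.NaturalKey)
    """)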

  • @user-is3gq7sp1h
    @user-is3gq7sp1h 2 years ago +2

    Delta tables with identity columns lose the ability to be inserted into by several processes simultaneously. Only one process will insert data; the others will get a MetadataChangedException. Are there any workarounds?

  • @KristofDeMiddelaer
    @KristofDeMiddelaer 2 years ago +4

    Any idea if this is coming to Azure Synapse soon?

  • @89seanp
    @89seanp 2 years ago +2

    Some interesting ways to handle ids in the video. I used monotonically_increasing_id for a while but have moved to using zipWithUniqueId():
    from pyspark.sql.types import StructType, StructField, LongType
    # colName is the new id column's name; offset shifts the generated ids
    new_schema = StructType([StructField(colName, LongType(), True)] + df.schema.fields)
    zipped_rdd = df.rdd.zipWithUniqueId()
    # row[0] is the original Row, row[1] is the unique id assigned by zipWithUniqueId
    new_rdd = zipped_rdd.map(lambda row: [row[1] + offset] + list(row[0]))
    df_with_id = spark.createDataFrame(new_rdd, new_schema)

    • @zycbrasil2618
      @zycbrasil2618 2 years ago

      Falling back to RDDs and then back to a DataFrame can be quite expensive

  • @YogeshShivakumar
    @YogeshShivakumar 2 months ago

    Can this be used without Databricks as well? Or can I use Spark SQL with Delta Lake to make use of the identity column feature without Databricks?
    Please tell.

  • @alexischicoine2072
    @alexischicoine2072 2 years ago

    Nice video Simon. You can create the Delta table with the Python library called delta; it works pretty well. You can also change the table location here if you don't want it in the database default location, which will make it an external table.
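    A hedged sketch of that builder approach with made-up names; .location() is what makes it an external table (whether your Delta version also supports identity columns through this builder is worth checking):
    from delta.tables import DeltaTable
    from pyspark.sql.types import StringType

    # Create (if missing) an external Delta table at an explicit path
    (DeltaTable.createIfNotExists(spark)
        .tableName("dim_customer")
        .addColumn("CustomerName", StringType())
        .location("abfss://lake@myaccount.dfs.core.windows.net/dims/dim_customer")
        .execute())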

  • @rajkumarv5791
    @rajkumarv5791 2 years ago

    Good one!! Will save a lot of coding + execution time :)

  • @joyo2122
    @joyo2122 2 years ago

    I'm gonna try it

  • @zycbrasil2618
    @zycbrasil2618 2 years ago +2

    Hi Simon. I see that the new feature works only with Spark SQL. Is there a similar approach using the DataFrame API? I don't want to create a table and use it to load a dataframe.

    • @AdvancingAnalytics
      @AdvancingAnalytics  2 years ago +1

      Not that I know of - we usually have a small function to create the table if it doesn't exist already, before we merge into the table!
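      A minimal sketch of that create-if-missing-then-merge pattern, with made-up table, view, and column names:
      def ensure_dim_table(name: str) -> None:
          # Identity columns can only be defined at create time, so guard with IF NOT EXISTS
          spark.sql(f"""
              CREATE TABLE IF NOT EXISTS {name} (
                  CustomerID   BIGINT GENERATED ALWAYS AS IDENTITY,
                  CustomerName STRING
              ) USING DELTA
          """)

      ensure_dim_table("dim_customer")
      # Merge new rows in; the identity column is omitted so Delta assigns it
      spark.sql("""
          MERGE INTO dim_customer AS tgt
          USING customer_updates AS src
          ON tgt.CustomerName = src.CustomerName
          WHEN NOT MATCHED THEN INSERT (CustomerName) VALUES (src.CustomerName)
      """)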

    • @zycbrasil2618
      @zycbrasil2618 2 years ago

      @@AdvancingAnalytics Same here. Tks

  • @surenderraja1304
    @surenderraja1304 1 year ago

    Very nice. Does IDENTITY auto-generate the number if I do df.write() instead of an INSERT statement?

  • @jefflingen8819
    @jefflingen8819 2 years ago

    Any known issues with merges? With .whenNotMatchedInsertAll() I'm getting the error: AnalysisException: cannot resolve Key in UPDATE clause given columns src.Id....

  • @rakeshreddybadam6173
    @rakeshreddybadam6173 2 years ago

    It's a great video. I have a small question: can we add an identity column by altering the table? I have tried a few different ways but it's not working: 'Alter table db.student add column student_id bigint GENERATED ALWAYS AS IDENTITY (start with 100 increment by 1)'

  • @flammenman5593
    @flammenman5593 1 year ago

    I'm looking for a way now to read these identity details from the metadata. If I start at 100 and increment by 10, for example, that must have been defined at the creation of the table and must be stored somewhere in the metadata. But how can I get to that information? I already tried information_schema.columns, but for some reason (not runtime) it does not work in my database; it doesn't recognize the information_schema function. Is there any other way to get this info from the metadata? Maybe in Python or Scala?
    Please let me know.
    Otherwise great video. I quite enjoy your style of explaining.
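    One hedged way to surface that definition (table name hypothetical): on Databricks, SHOW CREATE TABLE on a Delta table generally echoes the identity clause back as DDL, which you can grab from Python:
    # The returned DDL string includes GENERATED ALWAYS AS IDENTITY (START WITH ... INCREMENT BY ...)
    ddl = spark.sql("SHOW CREATE TABLE my_db.my_table").first()[0]
    print(ddl)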

  • @guilleromero3762
    @guilleromero3762 2 years ago

    Is this new feature only available for Delta tables, or can we use the identity option on Parquet tables? Reading the documentation I think you can use it for several kinds of tables. Thanks for your super useful videos! I'm a big fan!

    • @AdvancingAnalytics
      @AdvancingAnalytics  2 years ago +1

      Only Delta I'm afraid! There's a note in the identity section of the CREATE TABLE docs saying "this functionality is only available in Delta Lake", and the 10.4 release notes are pretty specific about it being a Delta release, sorry!

  • @sval4020
    @sval4020 2 years ago

    Does anyone know what kind of function/logic sits behind GENERATED ALWAYS AS IDENTITY? Is it still doing windowing/sorting, or is it hashing? I'm not sure this is mentioned anywhere in the docs so far, so I'm wondering what it might be doing under the hood.

    • @AdvancingAnalytics
      @AdvancingAnalytics  2 years ago

      Haven't dug into the execution plans yet, definitely going to have a dig and see how it changes. I did see a new generateidentifiers() function (or something like that!) in the explain steps, just haven't seen what impact that has on shuffles etc.
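      If you want to poke at the same thing, a quick sketch (Databricks SQL assumed, table name made up):
      # Inspect the plan Spark builds for an insert into an identity table
      plan = spark.sql("EXPLAIN FORMATTED INSERT INTO dim_customer (CustomerName) VALUES ('Test')")
      plan.show(truncate=False)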

  • @Kinnoshachi
    @Kinnoshachi 2 years ago

    Noiice!

  • @kyrilluk
    @kyrilluk 2 years ago

    Hi Simon, regarding this issue (ua-cam.com/video/Gnn54rp5RWM/v-deo.html), I encounter it every time I'm using a temp view. So that might explain why your second query didn't have anything skipped.

  • @andrewfogarty1447
    @andrewfogarty1447 2 years ago

    Is there no way to reset the identity column? Say, 6 months from now, our Identity column has value 10 billion?

    • @AdvancingAnalytics
      @AdvancingAnalytics  2 years ago

      I've not seen any syntax for resetting the seed/increment, no. That's not to say there aren't ways you can do it, but I've not seen any direct table update commands for it!

  • @alexischicoine2072
    @alexischicoine2072 2 years ago

    What if you did a second pass and ranked the monotonic ids?

    • @zycbrasil2618
      @zycbrasil2618 2 years ago

      monotonically_increasing_id is distributed. row_number() using a Window function without partitionBy is not distributed: when we don't define partitionBy, all the data is sent to one executor to generate the row numbers, which can cause performance and memory issues.
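      For reference, a sketch of that two-pass approach; the single-task Window is exactly the bottleneck described above:
      from pyspark.sql import functions as F, Window

      # Pass 1: sparse but unique ids, generated in parallel
      df_ids = df.withColumn("mono_id", F.monotonically_increasing_id())
      # Pass 2: rank into a dense sequence - no partitionBy, so one task handles every row
      w = Window.orderBy("mono_id")
      df_dense = df_ids.withColumn("row_num", F.row_number().over(w))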

  • @gamachu2000
    @gamachu2000 2 years ago

    Does this work with DLT?

    • @AdvancingAnalytics
      @AdvancingAnalytics  2 years ago

      Gooooood question, I'd assume not as DLT assumes it will create and manage the table for you, so you wouldn't have the column definition supplied? Might be able to insert into a static (IE: non-dlt) table as a final step, but that misses some of the dlt managed pipeline point!

  • @22seb22
    @22seb22 2 years ago

    3:47 "there's no Will" lol and they say Data Engineers are boring

  • @m1nkeh
    @m1nkeh 2 years ago

    Chapters? 😆

    • @AdvancingAnalytics
      @AdvancingAnalytics  2 years ago +2

      Urgh, fiiiiine. Added some chapters - but that meant I had to actually watch my own video! I'll take a looksie if there's a way we can encourage the auto-chapters to work better, or let people add their own in the comments!

    • @m1nkeh
      @m1nkeh 2 years ago

      @@AdvancingAnalytics haha, only messing about.. probs only of most use on longer vids anyway. keep up the good work dude! 👍

  • @zycbrasil2618
    @zycbrasil2618 2 years ago +1

    My approach to avoid those big unique ids:
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // Split the monotonic id into its partition id (upper bits) and the row offset within that partition
    val df_base_row_id = df_base.withColumn("row_id", monotonically_increasing_id())
    val df_base_row_id_partition_offset = df_base_row_id
      .withColumn("partition_id", shiftright('row_id, 33))
      .withColumn("row_offset", 'row_id.bitwiseAND(2147483647))
    // Count rows per partition, then compute each partition's running starting offset
    val partitions_size = df_base_row_id_partition_offset
      .groupBy("partition_id")
      .count()
      .withColumnRenamed("count", "partition_size")
    val windowSpec = Window.orderBy("partition_id").rowsBetween(Window.unboundedPreceding, -1)
    val partitions_offset = partitions_size.withColumn("partition_offset",
      when(expr("partition_id = 0"), lit(0))
        .otherwise(sum("partition_size").over(windowSpec)))
    // Broadcast-join the small offset table back to get a dense, gap-free row number
    val df_inc_rownum = df_base_row_id_partition_offset
      .join(broadcast(partitions_offset), "partition_id")
      .withColumn("row_num", 'partition_offset + 'row_offset + 1)