sql - Are there any techniques to solve skew data in databricks? - Stack Overflow


I created skewed data to test a salting approach and tried three different solutions, but none of them gave a significant runtime improvement. What is the best approach to solve this problem effectively?

import pyspark.sql.functions as F

# df1: about 60% of rows get value 1; the rest get a random integer in 0..99
df1 = spark.range(300_000_000).withColumn('value', F.when(F.rand() < 0.6, 1).otherwise((F.rand() * 100).cast('int'))).drop('id')
# df2: about 20% of rows get value 4; the rest get a random integer in 0..99
df2 = spark.range(200_000_000).withColumn('value', F.when(F.rand() < 0.2, 4).otherwise((F.rand() * 100).cast('int'))).drop('id')
final_df = df1.join(df2, on='value', how='inner')
final_df.write.format('parquet').save(path)
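Before tuning for skew, it may help to estimate how many rows this join is expected to produce. The following is a rough sketch only, assuming the two `F.rand()` calls in each expression are independent and that `(F.rand() * 100).cast('int')` is uniform over 0..99; `expected_join_rows` is a hypothetical helper, not a PySpark function.

```python
def expected_join_rows(n_left, hot_left, p_left,
                       n_right, hot_right, p_right, n_values=100):
    """Expected inner-join output size when each side draws `value` as a
    hot key with probability p, otherwise uniformly from 0..n_values-1."""
    def key_counts(n, hot, p):
        # Every value receives the uniform share; the hot key gets p*n on top.
        base = n * (1 - p) / n_values
        return {v: base + (n * p if v == hot else 0.0) for v in range(n_values)}

    left = key_counts(n_left, hot_left, p_left)
    right = key_counts(n_right, hot_right, p_right)
    # An equi-join emits count_left(v) * count_right(v) rows for each key v.
    per_key = {v: left[v] * right[v] for v in range(n_values)}
    return sum(per_key.values()), per_key


total, per_key = expected_join_rows(300_000_000, 1, 0.6,
                                    200_000_000, 4, 0.2)
largest_key = max(per_key, key=per_key.get)
print(f"expected output rows: {total:.3e}, largest key: {largest_key}")
```

Under these assumptions the estimate is on the order of 5 × 10^14 output rows, with value 1 as the largest single contributor. That suggests the sheer output volume, and not only the skew, may dominate the runtime.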

Process 1

Enabled AQE along with related settings: spark.sql.adaptive.skewJoin.enabled=true, spark.sql.adaptive.coalescePartitions.enabled=true, and spark.sql.shuffle.partitions=auto.

It was running for more than 20 minutes, and I cancelled the job manually.
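For reference, the Process 1 settings correspond to the Spark SQL configuration keys below. This is a minimal sketch: `apply_settings` is a hypothetical helper, and the value `auto` for `spark.sql.shuffle.partitions` is assumed to refer to the Databricks auto-optimized shuffle option (open-source Spark expects an integer there).

```python
# Spark SQL configuration keys for the AQE-based attempt.
AQE_SETTINGS = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    # Assumption: "auto" is only accepted on Databricks runtimes.
    "spark.sql.shuffle.partitions": "auto",
}


def apply_settings(spark, settings):
    """Set each key on an existing SparkSession (hypothetical helper)."""
    for key, value in settings.items():
        spark.conf.set(key, value)

# In a notebook where `spark` already exists:
#   apply_settings(spark, AQE_SETTINGS)
```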

Process 2

Salting technique. Disabled AQE and set the number of shuffle partitions to 1000.

df1 = (df1.withColumn('salt_numbers', F.expr('sequence(0,1)'))
   .withColumn('salt', F.explode('salt_numbers'))
   .drop('salt_numbers'))
df2 = (df2.withColumn('salt_numbers', F.expr('sequence(0,3)'))
   .withColumn('salt', F.explode('salt_numbers'))
   .drop('salt_numbers'))

Both datasets now have the same size (600_000_000 rows), and I added the salt column to the join keys: on=['value','salt']. The job ran for more than 30 minutes, and I cancelled it manually.
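Salting is easy to get subtly wrong, so here is a small pure-Python sketch of how two salting variants affect the join result. It assumes the usual salting pattern (a random salt on the skewed side, every salt value replicated on the other side), which is a swapped-in technique and not what the snippet above does. `join_count`, `explode`, and `random_salt` are hypothetical helpers operating on toy lists, not Spark calls.

```python
import random
from collections import Counter


def join_count(left, right):
    """Rows an inner join on the full tuple key would produce."""
    right_counts = Counter(right)
    return sum(right_counts[key] for key in left)


def explode(values, n_salts):
    """Replicate each row once per salt, like explode(sequence(0, n_salts-1))."""
    return [(v, s) for v in values for s in range(n_salts)]


def random_salt(values, n_salts, seed=0):
    """Attach one random salt per row, leaving the row count unchanged."""
    rng = random.Random(seed)
    return [(v, rng.randrange(n_salts)) for v in values]


left = [1] * 6 + [2, 3]   # toy data, skewed on value 1
right = [1, 1, 2, 4]

baseline = join_count([(v,) for v in left], [(v,) for v in right])
both_exploded = join_count(explode(left, 2), explode(right, 4))
one_sided = join_count(random_salt(left, 4), explode(right, 4))
print(baseline, both_exploded, one_sided)
```

In this toy model, exploding both sides (as with `sequence(0,1)` and `sequence(0,3)`) doubles the matched rows instead of spreading them, while the one-sided random salt keeps the row count at the baseline. Note also that `sequence(0,1)` has two elements and `sequence(0,3)` has four, so the exploded inputs are 2x and 4x their original sizes.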

Process 3

Salting technique with AQE enabled.

df1 = (df1.withColumn('salt_numbers', F.expr('sequence(0,1)'))
   .withColumn('salt', F.explode('salt_numbers'))
   .drop('salt_numbers'))
df2 = (df2.withColumn('salt_numbers', F.expr('sequence(0,3)'))
   .withColumn('salt', F.explode('salt_numbers'))
   .drop('salt_numbers'))

Both datasets now have the same size (600_000_000 rows), and I added the salt column to the join keys: on=['value','salt']. This also ran for about 30 minutes before I cancelled the job manually.

Note: The Databricks cluster has 8 workers, with 32 GB of memory and 4 cores. Please share your ideas on how to run this job efficiently.
