
scala - Breaking up a large JDBC write with Spark - Stack Overflow

We want to copy a large Spark dataframe into Oracle, but I am finding the tuning options a bit limited. Looking at Spark documentation, the only related tuning property I could find for a JDBC write is numPartitions.

However, the dataframe we want to write has 700,000,000 records and the Oracle server only has 32 cores, so I don't want to overload the database with too many threads. My understanding is that if I set numPartitions to 32, it will effectively do a .repartition(32) on the large dataset and then write each partition into Oracle. 32 partitions in Spark is not enough for a dataset this size and will cause memory issues.

Is there a way to break the job into further pieces so it doesn't try to do everything at once, but instead does 50,000,000 (or something) at a time?

I was thinking of something like this, but I was hoping there's something more efficient:

// Imagine "df" is the incoming dataframe we want to write.

val threads = 32
val recordsPerThread = 500000
val chunkSize = threads * recordsPerThread
val total = df.count
val chunks = (total/chunkSize).ceil.toInt

val chunkDf = df.withColumn("CHUNK_NUM", rand.multiply(chunks).cast(IntegerType))

for (chunkNum <- 0 to chunks) {
  chunkDf.filter(s"CHUNK_NUM = ${chunkNum}")
    .drop("CHUNK_NUM")
    .write
    .format("jdbc")
    .options(...) // DB info + numPartitions = 32
    .save
}

Basically, I'm dividing the dataset into "chunks", each of which can be written at once with 32 threads (numPartitions). I feel like there should be a more efficient way of doing this, but I can't seem to find it in the documentation.

I'm also using batchSize set to 10000 to reduce round trips, but I'm still limited by how many threads I want making round trips to Oracle, and by how large partitions in Spark can be.
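For illustration, here is a sketch of what the .options(...) placeholder might contain; the URL, table name, and credentials below are made-up values, not from the original post:

// Hypothetical JDBC options for the write above; all connection values are placeholders.
val jdbcOptions = Map(
  "url"           -> "jdbc:oracle:thin:@//dbhost:1521/SERVICE", // placeholder URL
  "dbtable"       -> "TARGET_TABLE",                            // placeholder table name
  "user"          -> "app_user",                                // placeholder credentials
  "password"      -> "app_password",                            // placeholder credentials
  "driver"        -> "oracle.jdbc.OracleDriver",
  "numPartitions" -> "32",    // max concurrent write tasks against Oracle
  "batchsize"     -> "10000"  // rows per JDBC batch, to reduce round trips
)
// Then pass it as .options(jdbcOptions) in the write above.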


asked Nov 18, 2024 at 21:03 by Depressio

2 Answers


I was overthinking it. We can constrain how much Spark is writing at once by simply constraining the resources we give Spark. If I set numPartitions to 500 but only give Spark a single 32-core worker, it will only write 32 partitions at a time, which limits how hard we're hammering Oracle. This effectively "chunks" the job.
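A minimal sketch of that approach, reusing the hypothetical jdbcOptions map sketched after the question (the connection details there are placeholders, not from this answer):

// Cap total executor cores so only 32 write tasks run concurrently, e.g.:
//   spark-submit --num-executors 1 --executor-cores 32 ...
// With 500 partitions but only 32 cores, at most 32 partitions hit Oracle at a time.

df.repartition(500)                  // many small partitions = small "chunks"
  .write
  .format("jdbc")
  .options(jdbcOptions)              // DB info, batchsize, etc., as in the question
  .option("numPartitions", "500")
  .save()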

In your case I would simply repartition:

val threads = 32

df.repartition(threads).write.format("jdbc").options(...).save()