DataFusion Comet (Part 2): Shuffle

#rust #spark #datafusion #comet #shuffle #uniffle

This part looks at the Comet shuffle component when Comet exec is disabled. (When Comet exec is enabled, things get more complex, so that case is left for a later part.)

Even with Comet exec disabled, Spark shuffles can still use Comet columnar shuffle (the name used in the Comet codebase).
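To make the term concrete, here is a toy sketch in plain Scala (no Spark or Comet dependencies) of the idea behind a columnar shuffle write. Each incoming row is assigned an output partition by hashing its key, and rows are accumulated per partition as columns rather than as rows. This is not Comet's implementation. `ColumnarShuffleSketch`, `ColumnBatch`, `partitionId` and `shuffleWrite` are hypothetical names. `String.hashCode` with `Math.floorMod` stands in for Spark's Murmur3-based hash partitioning. Plain Scala buffers stand in for real columnar (Arrow) vectors.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy sketch of a columnar shuffle write (NOT Comet's actual code).
// Assumption: String.hashCode + floorMod replaces Spark's Murmur3 hash partitioning,
// and Scala buffers replace Arrow column vectors.
object ColumnarShuffleSketch {
  // One "column batch" per output partition: a key column and a value column.
  final case class ColumnBatch(keys: Vector[String], values: Vector[Int])

  // Hypothetical partition function: non-negative hash modulo the partition count.
  def partitionId(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Convert row-oriented input into one columnar batch per output partition.
  def shuffleWrite(rows: Seq[(String, Int)], numPartitions: Int): Vector[ColumnBatch] = {
    // Vector.fill evaluates its argument once per slot, so each partition gets its own buffers.
    val keyCols = Vector.fill(numPartitions)(ArrayBuffer.empty[String])
    val valueCols = Vector.fill(numPartitions)(ArrayBuffer.empty[Int])
    for ((k, v) <- rows) {
      val pid = partitionId(k, numPartitions)
      // Append the row's fields to the column buffers of its partition.
      keyCols(pid) += k
      valueCols(pid) += v
    }
    keyCols.zip(valueCols).map { case (ks, vs) => ColumnBatch(ks.toVector, vs.toVector) }
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(("key1", 1), ("key2", 2), ("key1", 3), ("key3", 4), ("key2", 5))
    shuffleWrite(rows, numPartitions = 2).zipWithIndex.foreach { case (b, i) =>
      println(s"partition $i: keys=${b.keys.mkString(",")} values=${b.values.mkString(",")}")
    }
  }
}
```

The exact partition assignment depends on the stand-in hash. Every row with a given key still lands in the same partition, and that is the property the downstream aggregation relies on.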

Test on a local machine

Run the Comet Spark shell:

```shell
# build the Comet release package first
make release

bin/comet-spark-shell -d . -o spark/target/ \
  --conf spark.comet.shuffle.enforceMode.enabled=true \
  --driver-class-path spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar \
  --conf spark.comet.explainFallback.enabled=true
```

Create the test data, register it as a view, and print the plan of an aggregation query:

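```scala
// write a small key/value dataset as Parquet
val data = Seq(("key1", 1), ("key2", 2), ("key1", 3), ("key3", 4), ("key2", 5)).toDF("key", "value")
data.write.mode("overwrite").parquet("/tmp/test")

// register it as a view and print the physical plan of a GROUP BY (which needs a shuffle)
spark.read.parquet("/tmp/test").createOrReplaceTempView("t1")
spark.sql("select key, sum(value) as total_value from t1 group by key").explain
```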
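For reference, vanilla Spark typically plans this GROUP BY in three steps. It runs a partial `HashAggregate` on each map task, then an `Exchange hashpartitioning(key, ...)` shuffle, then a final `HashAggregate` on the reduce side. The exchange in the middle is the operator the Comet shuffle is meant to take over.

The toy model below mimics that shape on the same five rows, in plain Scala with hypothetical names (`TwoPhaseAggSketch`, `partialAgg`, `run`). Several details are assumptions for illustration only:

- The split into two map tasks is arbitrary.
- It uses 3 reduce partitions, where Spark's default `spark.sql.shuffle.partitions` is 200.
- `String.hashCode` with `Math.floorMod` stands in for Spark's hash partitioner.

It shows the result the query should return.

```scala
// Toy model of partial aggregate -> hash shuffle -> final aggregate (NOT Spark's or Comet's code).
object TwoPhaseAggSketch {
  type Row = (String, Int)

  // Map side: pre-aggregate one task's rows per key (the "partial" HashAggregate).
  def partialAgg(rows: Seq[Row]): Map[String, Long] =
    rows.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2.toLong).sum }

  // Stand-in for Spark's Murmur3-based hash partitioning.
  def partitionId(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  def run(mapTasks: Seq[Seq[Row]], numPartitions: Int): Map[String, Long] = {
    val partials: Seq[(String, Long)] = mapTasks.flatMap(partialAgg)
    // Shuffle: route every partial result to a reduce partition by its key hash.
    val byPartition: Map[Int, Seq[(String, Long)]] =
      partials.groupBy { case (k, _) => partitionId(k, numPartitions) }
    // Reduce side: each partition merges the partials for the keys it owns (the "final" HashAggregate).
    byPartition.values.flatMap { ps =>
      ps.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // The five rows written to /tmp/test, split across two hypothetical map tasks.
    val task1 = Seq(("key1", 1), ("key2", 2), ("key1", 3))
    val task2 = Seq(("key3", 4), ("key2", 5))
    // prints: (key1,4), (key2,7), (key3,4)
    println(run(Seq(task1, task2), numPartitions = 3).toSeq.sorted.mkString(", "))
  }
}
```

Because the shuffle sends every partial result for a key to the same reduce partition, the final step can merge locally without any further data movement.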

It works; the final physical plan is as follows:

[Image: final physical plan (Pasted image 20240523145538.png)]

Whiteboard

[Image: whiteboard sketch (Pasted image 20240516155530.png)]