
In its simplest form, an RDD is merely a placeholder for a chain of computations whose partitions can be scheduled to run on any executor:

import org.apache.spark.SparkEnv

val src = sc.parallelize(0 to 1000)

// each task reports the ID of the executor it happens to run on
val rdd = src.mapPartitions { _ =>
  Iterator(SparkEnv.get.executorId)
}

for (i <- 1 to 3) {
  val vs = rdd.collect()
  println(vs.mkString)
}

/* yielding:
1230123012301230
0321032103210321
2130213021302130
*/

This behaviour can be overridden by persisting any upstream RDD, so that the Spark scheduler minimises redundant computation by preferring the executors that already hold the cached partitions:

val src = sc.parallelize(0 to 1000)

// persisting the upstream RDD pins its partitions to the executors
// that computed them the first time
src.persist()

val rdd = src.mapPartitions { _ =>
  Iterator(SparkEnv.get.executorId)
}

for (i <- 1 to 3) {
  val vs = rdd.collect()
  println(vs.mkString)
}

/* yielding:
2013201320132013
2013201320132013
2013201320132013
each partition now runs on a fixed executor ID
*/

Now my problem is:

I don't like the vanilla caching mechanism (see this post: In Apache Spark, can I incrementally cache an RDD partition?) and have written my own (by implementing a new RDD). Since the new caching mechanism can only read existing values from local disk/memory, if there are multiple executors my cache for a partition is frequently missed whenever that partition is executed in a task on another machine.
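To make this concrete, here is a simplified sketch of the kind of RDD I mean (LocalCache, LocallyCachedRDD and the TrieMap-backed store are placeholder names, not my real code). The cache lives in the executor JVM, so a hit is only possible when the same partition is computed again on the same executor:

import scala.collection.concurrent.TrieMap
import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// JVM-local store: it only exists inside the executor process that wrote to it
object LocalCache {
  val store = TrieMap.empty[(Int, Int), Vector[Any]]   // (rddId, partitionIndex) -> values
}

class LocallyCachedRDD[T: ClassTag](prev: RDD[T]) extends RDD[T](prev) {

  override protected def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val key = (id, split.index)
    LocalCache.store.get(key) match {
      case Some(values) =>
        values.iterator.map(_.asInstanceOf[T])                   // local hit
      case None =>                                               // miss: recompute and cache locally
        val values = firstParent[T].iterator(split, context).toVector
        LocalCache.store.put(key, values)
        values.iterator
    }
  }
}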

So my question is:

How do I mimic Spark's RDD persistence implementation to ask the DAG scheduler to enforce/suggest locality-aware task scheduling, without actually calling .persist(), since persisting is unnecessary in my case?
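The direction I am considering is sketched below, but it is only a guess: I assume getPreferredLocations is the right hook, and cachedHostOf is a hypothetical lookup (partition index -> host that holds my cache) that I would have to maintain myself, e.g. by recording SparkEnv.get.blockManager.blockManagerId.host when a partition is first cached.

import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Sketch: suggest (not enforce) locality by overriding getPreferredLocations
class LocalityAwareRDD[T: ClassTag](
    prev: RDD[T],
    cachedHostOf: Int => Option[String]   // hypothetical: partition index -> cached host
) extends RDD[T](prev) {

  override protected def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    firstParent[T].iterator(split, context)

  // The DAGScheduler treats this as a preference, not a guarantee
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    cachedHostOf(split.index).toSeq
}

Is overriding getPreferredLocations like this the intended mechanism, or does the persisted-RDD code path obtain its locations some other way that bypasses this hint?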
