Getting the number of partitions of a DataFrame is easy, but none of the relevant members are part of the DF class itself; you have to go through .rdd. Any of the following three lines will work (the second one is the PySpark API; the other two are Scala):

df.rdd.partitions.size
df.rdd.getNumPartitions()
df.rdd.partitions.length
In: rbe_s1.rdd.getNumPartitions()
Out: 13
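If you want to reproduce this without my data, here is a minimal self-contained check (assuming an active SparkSession named spark) that behaves the same way:

df = spark.range(100).repartition(13)
print(df.rdd.getNumPartitions())  # -> 13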
To get the size of each partition, the key is .mapPartitions, which is defined as follows:

def mapPartitions(self, f, preservesPartitioning=False):
    """
    Return a new RDD by applying a function to each partition of this RDD.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
    >>> def f(iterator): yield sum(iterator)
    >>> rdd.mapPartitions(f).collect()
    [3, 7]
    """
    def func(s, iterator):
        return f(iterator)
    return self.mapPartitionsWithIndex(func, preservesPartitioning)
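Note that mapPartitions just delegates to mapPartitionsWithIndex, wrapping f so that the partition index is ignored. You can call the latter directly when you want the index alongside each result; a minimal sketch, assuming an existing SparkContext sc:

def indexed_len(index, iterator):
    # yield (partition index, number of rows in that partition)
    yield index, sum(1 for _ in iterator)

rdd = sc.parallelize(range(10), 4)
print(rdd.mapPartitionsWithIndex(indexed_len).collect())
# [(0, 2), (1, 3), (2, 2), (3, 3)]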
Passing a generator function that yields the length of each partition (get_partition_len below) gives us the per-partition sizes:

lengths = rdd.mapPartitions(get_partition_len, True).collect()

Combining all of the above into a helper that prints partition statistics for a DataFrame:

from pyspark import RDD
from pyspark.sql import DataFrame
def print_partition_info(df: DataFrame):
    import statistics

    def get_partition_len(iterator):
        yield sum(1 for _ in iterator)

    rdd: RDD = df.rdd
    count = rdd.getNumPartitions()
    # lengths = rdd.glom().map(len).collect()  # much more memory hungry than the next line
    lengths = rdd.mapPartitions(get_partition_len, True).collect()

    print("")
    print(f"{count} partition(s) total.")
    print("size stats")
    print(f"  min: {min(lengths)}")
    print(f"  max: {max(lengths)}")
    print(f"  avg: {sum(lengths) / len(lengths)}")
    print(f"  stddev: {statistics.stdev(lengths)}")
    print("")
    print("detailed info")
    for i, pl in enumerate(lengths):
        print(f"  {i}. {pl}")
Sample output for one of my DataFrames:

5 partition(s) total.
size stats
  min: 13
  max: 4403
  avg: 1277.4
  stddev: 1929.5741239973136

detailed info
  0. 4403
  1. 1914
  2. 38
  3. 19
  4. 13
To change the number of partitions, you can use coalesce and repartition on the DF itself. The documentation for them is very similar, and it's really confusing what to use when. Say we have a DataFrame with the following partition sizes:

0. 4403
1. 1914
2. 38
3. 19
4. 13
After .coalesce(2) you will get:

0. 4454
1. 1933
coalesce does not shuffle data, so nothing actually moves physically: existing partitions are simply merged. It's really useful in cases where you don't want the data to be moved but still want to process it with some (reduced) parallelism. Calling repartition(2) on the DataFrame above, by contrast, results in the following:

0. 3198
1. 3195
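Side by side, the two calls look like this (a sketch, assuming some existing df and the print_partition_info helper from above):

df2 = df.coalesce(2)     # narrow dependency: neighbouring partitions are merged, no shuffle
df3 = df.repartition(2)  # wide dependency: full shuffle, sizes come out roughly even
print_partition_info(df2)
print_partition_info(df3)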
It's also worth looking at repartition's signature in the PySpark source:

@since(1.3)
def repartition(self, numPartitions, *cols):
cols is actually of Column type, meaning you can pass either a column name or any expression that results in a column. This is particularly important: by supplying an expression you are essentially creating a partition hashing function, so you're not limited to just a dumb column name.