I'd like to use Spark SQL (1.6) to perform "filtered equi-joins" of the form
A inner join B where A.group_id = B.group_id and pair_filter_udf(A[cols], B[cols])
The group id is coarse in this case: a single group id value may be associated with, say, 10,000 records in both A and B.
This coarseness of the group id creates a computational problem if the equi-join is performed without the pair_filter_udf: a group id with 10,000 records in each of A and B would produce 100 million rows from the join alone, and with thousands of such large groups we would materialize an enormous table and quickly run out of memory.
So, rather than waiting until all pairs have been generated and filtering them afterwards, we need pair_filter_udf to be applied inside the join, so that pairs are filtered as they are produced. My question is whether Spark SQL actually does this.
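To make the scale concrete, here is a back-of-the-envelope check in plain Python using the numbers above (the count of 1,000 large groups is my own placeholder for "thousands of such groups"):

per_group_pairs = 10000 * 10000          # candidate pairs for one group id: 100 million
large_groups = 1000                      # placeholder for "thousands of such huge groups"
total_pairs = per_group_pairs * large_groups
print(per_group_pairs, total_pairs)      # 100000000 100000000000, i.e. ~10^11 rows before filtering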
I set up a simple filtered equi-join and asked Spark for its query plan:
# run in PySpark shell
import pyspark.sql.functions as F

sq = sqlContext
n = 100   # rows per table
g = 10    # group size: each grp value covers g consecutive ids

# Table A: ids 0..n-1 with a coarse group column
a = sq.range(n)
a = a.withColumn('grp', F.floor(a['id'] / g) * g)
a = a.withColumnRenamed('id', 'id_a')

# Table B: same construction
b = sq.range(n)
b = b.withColumn('grp', F.floor(b['id'] / g) * g)
b = b.withColumnRenamed('id', 'id_b')

# Equi-join on grp plus a pair filter (here a simple range predicate)
c = a.join(b, (a.grp == b.grp) & (F.abs(a['id_a'] - b['id_b']) < 2)).drop(b['grp'])
c = c.sort('id_a')
c = c[['grp', 'id_a', 'id_b']]
c.explain()
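It can also help to print the extended plan, which shows the parsed, analyzed, and optimized logical plans alongside the physical plan, making it easier to see where the range predicate ends up relative to the join operator (this just passes the extended flag to explain and is not part of the original snippet):

c.explain(True)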
Replies