Udf для цикла в pySpark

Charles Van Damme спросил: 31 июля 2018 в 09:52 в: apache-spark

Кусок кода ниже пытается сделать следующее:

Для каждого customer_code в sdf1, проверьте, отображается ли этот код клиента в sdf2. Если это так, замените df1.actual_related_customer на df2.actual_related_customer.

Этот код не работает, потому что я неправильно обращаюсь к моим строкам в df2. Как я могу достичь вышеуказанной цели? (если у вас есть другое предложение, чем индексы, стреляйте!)

sdf1 = sqlCtx.createDataFrame(
    [
        ('customer1', 'customer_code1', 'other'),
        ('customer2', 'customer_code2', 'other'),
        ('customer3', 'customer_code3', 'other'),
        ('customer4', 'customer_code4', 'other')
    ],
    ('actual_related_customer', 'customer_code', 'other')
)sdf2 = sqlCtx.createDataFrame(
    [
        ('Peter', 'customer_code1'),
        ('Deran', 'customer_code5'),
        ('Christopher', 'customer_code3'),
        ('Nick', 'customer_code4')
    ],
    ('actual_related_customer', 'customer_code')
)def right_customer(x,y):
    for row in sdf2.collect() :
        if x == row['customer_code'] :
            return row['actual_related_customer']
    return yfun1 = udf(right_customer, StringType())
test = sdf1.withColumn(
    "actual_related_customer",
    fun1(sdf1.customer_code, sdf1.actual_related_customer)
)

И мой желаемый результат будет выглядеть так:

desired_output = sqlCtx.createDataFrame(
    [
        ('Peter', 'customer_code1', 'other'),
        ('customer2', 'customer_code2', 'other'),
        ('Christopher', 'customer_code3', 'other'),
        ('Nick', 'customer_code4', 'other')
    ],
    ('actual_related_customer', 'customer_code', 'other')
)

1 ответ

Есть решение
pault Alla Tarighati ответил: 31 июля 2018 в 01:06

Давайте сделаем это шаг за шагом:

Сначала переименуйте actual_related_customer в sdf1 на actual_1 и переименуйте actual_related_customer в sdf2 на :

sdf1=sdf1.withColumnRenamed('actual_related_customer', 'actual_1')
sdf2=sdf2.withColumnRenamed('actual_related_customer', 'actual_2')

Затем присоединитесь к ним:

sdf1= sdf1.join(sdf2, on='customer_code', how='left')
sdf1.show()

Вывод:

+--------------+---------+-----+-----------+
| customer_code| actual_1|other|   actual_2|
+--------------+---------+-----+-----------+
|customer_code4|customer4|other|       Nick|
|customer_code2|customer2|other|       null|
|customer_code3|customer3|other|Christopher|
|customer_code1|customer1|other|      Peter|
+--------------+---------+-----+-----------+

Теперь добавьте логику в actual_2:

sdf1= sdf1.withColumn('actual_related_customer', F.when(sdf1.actual_2.isNotNull(), sdf1.actual_2).otherwise(sdf1.actual_1))

И, наконец, покажите, что вы хотите:

sdf1.select('customer_code', 'other', 'actual_related_customer').show()
Вывод:
+--------------+-----+-----------------------+
| customer_code|other|actual_related_customer|
+--------------+-----+-----------------------+
|customer_code4|other|                   Nick|
|customer_code2|other|              customer2|
|customer_code3|other|            Christopher|
|customer_code1|other|                  Peter|
+--------------+-----+-----------------------+
pault ответил: 31 июля 2018 в 01:05
Вы также можете использовать pyspark.sql.functions.coalesce() вместо when.
Alla Tarighati ответил: 31 июля 2018 в 01:19
Это был хороший @pault: +1: