Объединить строки в искровой scala Dataframe

Darshan Shah спросил: 28 апреля 2018 в 09:32 в: scala

Объединить строки в искровом окне Dataframe

У меня есть данные вроде следующих

ID  Name    Passport    Country  License    UpdatedtimeStamp
1   Ostrich 12345       -       ABC         11-02-2018
1   -       -           -       BCD         10-02-2018
1   Shah    12345       -       -           12-02-2018
2   PJ      -           ANB     a           10-02-2018

Требуемый вывод

ID  Name    Passport    Country  License    UpdatedtimeStamp
1   Shah    12345       -       ABC         12-02-2018
2   PJ      -           ANB     a           10-02-2018

В принципе, данные в одном и том же ID должны сливаться, а последняя запись, а не null должна быть на выходе, если все значения null, тогда .

Пожалуйста, предложите ... Кроме того, предложите его без использования функций SparkSQL null, поскольку мне нужно, чтобы это было очень быстро


1 ответ

Есть решение
Ramesh Maharjan ответил: 28 апреля 2018 в 12:06

вы можете достичь своего результата, указав функцию udf и передав собранные столбцы структуры в функцию udf для s портирования и заполнения нули с ненулевыми значениями . (в коде для пояснения)

import org.apache.spark.sql.functions._
//udf function definition
def sortAndAggUdf = udf((structs: Seq[Row])=>{
  //sorting the collected list by timestamp in descending order
  val sortedStruct = structs.sortBy(str => str.getAs[Long]("UpdatedtimeStamp"))(Ordering[Long].reverse)
  //selecting the first struct and casting to out case class
  val first = out(sortedStruct(0).getAs[String]("Name"), sortedStruct(0).getAs[String]("Passport"), sortedStruct(0).getAs[String]("Country"), sortedStruct(0).getAs[String]("License"), sortedStruct(0).getAs[Long]("UpdatedtimeStamp"))
  //aggregation for checking nulls and populating first not null value
  sortedStruct
    .foldLeft(first)((x, y) => {
      out(
        if(x.Name == null || x.Name.isEmpty) y.getAs[String]("Name") else x.Name,
        if(x.Passport == null || x.Passport.isEmpty) y.getAs[String]("Passport") else x.Passport,
        if(x.Country == null || x.Country.isEmpty) y.getAs[String]("Country") else x.Country,
        if(x.License == null || x.License.isEmpty) y.getAs[String]("License") else x.License,
        x.UpdatedtimeStamp)
    })
})
//making the rest of the columns as one column and changing the UpdatedtimeStamp column to long for sorting in udf
df.select(col("ID"), struct(col("Name"), col("Passport"), col("Country"), col("License"), unix_timestamp(col("UpdatedtimeStamp"), "MM-dd-yyyy").as("UpdatedtimeStamp")).as("struct"))
    //grouping and collecting the structs and passing to udf function for manipulation
    .groupBy("ID").agg(sortAndAggUdf(collect_list("struct")).as("struct"))
    //separating the aggregated columns to separate columns
    .select(col("ID"), col("struct.*"))
    //getting the date in correct format
    .withColumn("UpdatedtimeStamp", date_format(col("UpdatedtimeStamp").cast("timestamp"), "MM-dd-yyyy"))
 .show(false)

, который должен предоставить вам

+---+----+--------+-------+-------+----------------+
|ID |Name|Passport|Country|License|UpdatedtimeStamp|
+---+----+--------+-------+-------+----------------+
|1  |Shah|12345   |null   |ABC    |12-02-2018      |
|2  |PJ  |null    |ANB    |a      |10-02-2018      |
+---+----+--------+-------+-------+----------------+

и конечно, необходим класс case

case class out(Name: String, Passport: String, Country: String, License: String, UpdatedtimeStamp: Long)
Darshan Shah ответил: 17 мая 2018 в 05:46
Привет, Это решение работает. Однако мне нужна помощь в его расширении ... У меня есть более 30 полей в фрейме данных и в Spark 1.6 (Scala 2.10), не разрешено иметь класс case с более чем 22 полями. Кроме того, существует несколько последних обновленных столбцов: каждый из которых соответствует различным полям ... Можете ли вы предложить лучший способ достичь этого?