Невозможно использовать Pivot в Apache Spark [duplicate]

david nadal спросил: 28 апреля 2018 в 08:46 в: python

У этого вопроса уже есть ответ:

  • How to pivot Spark DataFrame? 6 ответов

Я пытаюсь использовать pivot в Apache Spark.

Мои данные:

+--------------------+---------+
|           timestamp|  user|
+--------------------+---------+
|2017-12-19T00:41:...|User_1|
|2017-12-19T00:01:...|User_2|
|2017-12-19T00:01:...|User_1|
|2017-12-19T00:01:...|User_1|
|2017-12-19T00:01:...|User_2|
+--------------------+---------+

Я хочу опираться на столбец пользователя.

Но я продолжаю получать ошибку:

'DataFrame' object has no attribute 'pivot'
Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 1020, in __getattr__
    "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
AttributeError: 'DataFrame' object has no attribute 'pivot'

Нет материи, как я ее использую.

т.е. df.groupBy('A').pivot('B') or df.pivot('B')

Мой фактический запрос:

# The Pivot operation will give timestamp vs Users data
pivot_pf = tf.groupBy(window(tf["timestamp"], "2 minutes"), 'user').count().select('window.start', 'user', 'count').pivot("user").sum("count")

Любая помощь с благодарностью.

Спасибо.


2 ответа

Rumoku ответил: 28 апреля 2018 в 09:23
import pyspark.sql.functions as func
from datetime import datetimedf = spark_session.createDataFrame(
    [[datetime.strptime("2012-01-01 00:00:00", '%Y-%m-%d %H:%M:%S'), 'one'],
     [datetime.strptime("2012-01-01 00:01:00", '%Y-%m-%d %H:%M:%S'), 'two'],
     [datetime.strptime("2012-01-01 00:02:00", '%Y-%m-%d %H:%M:%S'), 'three'],
     [datetime.strptime("2012-01-01 00:03:00", '%Y-%m-%d %H:%M:%S'), 'one'],
     [datetime.strptime("2012-01-01 00:04:00", '%Y-%m-%d %H:%M:%S'), 'two']],
    'dd: timestamp, user: string')df.groupBy(func.window(df["dd"], "2 minutes")).pivot('user').agg({'dd': 'count'}).show()

Ожидаемый результат:

+--------------------+----+-----+----+
|              window| one|three| two|
+--------------------+----+-----+----+
|[2012-01-01 00:00...|   1| null|   1|
|[2012-01-01 00:04...|null| null|   1|
|[2012-01-01 00:02...|   1|    1|null|
+--------------------+----+-----+----+
Mugdha ответил: 28 апреля 2018 в 09:00

Pivot работает хорошо, как указано ниже. Но он возвращает сгруппированные данные. Если мы используем некоторую агрегацию по сгруппированным данным, это приведет к передаче данных.

val d1 = Array(("a", "10"), ("b", "20"), ("c", "30"),("a","56"),("c","29"))
val rdd1= sc.parallelize(d1)
val df1 = rdd1.toDF("key","val")
df1.groupBy("key").pivot("val")
david nadal ответил: 28 апреля 2018 в 09:02
я использую sum ("count"), как указано в запросе в вопросе