pyspark返回每个分组某个值最大的行

  统计/机器学习 Python mysql    浏览次数:3763        分享
0

我现在一个hive表里有type,value,id等等好几列,我想返回type每个分组里value最大的行。

不能用groupby max,因为这样只能返回每个分组的最大值,而不是完整的行。我用pyspark应该怎么做呢?

 

ziyu   2020-04-15 23:39



   1个回答 
3

这种用reducebykey就可以了

from pyspark.sql import functions as F
from pyspark.sql.window import Window
df = sql_context.createDataFrame([
("A", "ARON", 1),
("A", "BILL", 2),
("A", "CLAIR", 3),
("B", "DANIEL", 1),
("B", "ERIC", 4),
("B", "FRANK", 2),
], ["id", "name", "weight"])

def max_row(row1,row2):
    if row1['weight'] > row2['weight']:
       return row1
    else:
        return row2


df_rdd = df.rdd.map(lambda row: (row['id'], row)).\
reduceByKey(lambda row1,row2: max_row(row1,row2)).map(lambda row: row[1])
sql_context.createDataFrame(df_rdd).show()


或者用分组排序后取第一个


windows_spec = Window.partitionBy("id").orderBy(F.col("weight").desc())
max_df = df.withColumn("rank", F.rank().over(windows_spec)).filter("rank =1")
max_df.show()

 可能第一个会快一些吧,一般也都是用第一个

SofaSofa数据科学社区DS面试题库 DS面经

wwb_306   2020-04-16 18:36

谢谢 - ziyu   2020-04-17 09:17


  相关讨论

spark里怎么refresh表?

spark sql里怎么用case when?

pyspark里怎么把一列日期转成是全年的第几周?

得到一个pyspark.sql.dataframe中所有列的名称

如何获取pyspark DataFrame的行数和列数?

pyspark中怎么对dataframe里的行按照列去重?

怎么对pyspark.sql.dataframe按照某一列降序排列?

怎么在pyspark里把dataframe写入csv文件?

pyspark dataframe的collect()方法是什么意思?

怎么对pyspark dataframe更改列名

  随便看看

matplotlib一个画板上多个图叠加,如何决定图层上下?

线性可分是什么意思?

什么是混淆矩阵(confusion matrix)

z test和t test什么区别?

pandas报错: 'DataFrame' object has no attribute 'unique'