मेरे पास नीचे दी गई तालिका है जहां गतिविधि_डेट लगातार होने पर मेरे पास बढ़ती लकीर है। यदि नहीं, तो स्ट्रीक 1 पर सेट हो जाएगी।

अब मुझे स्ट्रीक्स के प्रत्येक समूह का न्यूनतम और अधिकतम प्राप्त करने की आवश्यकता है। स्पार्क और स्कैला या स्पार्क एसक्यूएल का उपयोग करना।

Input

floor   activity_date   streak
--------------------------------
floor1     2018-11-08   1
floor1     2019-01-24   1
floor1     2019-04-05   1
floor1     2019-04-08   1
floor1     2019-04-09   2
floor1     2019-04-14   1
floor1     2019-04-17   1
floor1     2019-04-20   1
floor2     2019-05-04   1
floor2     2019-05-05   2
floor2     2019-06-04   1
floor2     2019-07-28   1
floor2     2019-08-14   1
floor2     2019-08-22   1



Output

floor   activity_date   end_activity_date
----------------------------------------
floor1     2018-11-08      2018-11-08
floor1     2019-01-24      2019-01-24
floor1     2019-04-05      2019-04-05
floor1     2019-04-08      2019-04-09
floor1     2019-04-14      2019-04-14
floor1     2019-04-17      2019-04-17
floor1     2019-04-20      2019-04-20
floor2     2019-05-04      2019-05-05
floor2     2019-06-04      2019-06-04
floor2     2019-07-28      2019-07-28
floor2     2019-08-14      2019-08-14
floor2     2019-08-22      2019-08-22
0
Ajith 12 अक्टूबर 2021, 22:14

1 उत्तर

सबसे बढ़िया उत्तर

आप निम्न दृष्टिकोण का उपयोग कर सकते हैं

स्पार्क एसक्यूएल का उपयोग करना

SELECT
    floor,
    activity_date,
    MAX(activity_date) OVER (PARTITION BY gn,floor) as end_activity_date
FROM (
    SELECT
         *,
         SUM(is_same_streak) OVER (
             PARTITION BY floor ORDER BY activity_date
         ) as gn 
    FROM (
        SELECT
            *,
             CASE
                 WHEN streak > LAG(streak,1,streak-1) OVER (
                     PARTITION BY floor 
                     ORDER BY activity_date 
                 ) THEN 0
                 ELSE 1
             END as is_same_streak
        FROM
            df
    ) t1
) t2
ORDER BY 
    "floor",
    activity_date

कार्यशील डेमो देखें db fiddle

स्कैला एपीआई का उपयोग करना


import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val floorWindow = Window.partitionBy("floor").orderBy("activity_date")

val output = df.withColumn(
                "is_same_streak",
                when(
                    col("streak") > lag(col("streak"),1,col("streak")-1).over(floorWindow) , 0
                ).otherwise(1)
                
             )
             .withColumn(
                 "gn",
                 sum(col("is_same_streak")).over(floorWindow)
             )
             .select(
                 "floor",
                 "activity_date",
                 max(col("activity_date")).over(
                     Window.partitionBy("gn","floor")
                 ).alias("end_activity_date")
             )

Pyspark api . का उपयोग करना


from pyspark.sql import functions as F
from pyspark.sql import Window

floorWindow = Window.partitionBy("floor").orderBy("activity_date")

output = (
            df.withColumn(
                "is_same_streak",
                F.when(
                    F.col("streak") > F.lag(F.col("streak"),1,F.col("streak")-1).over(floorWindow) , 0
                ).otherwise(1)
                
             )
             .withColumn(
                 "gn",
                 F.sum(F.col("is_same_streak")).over(floorWindow)
             )
             .select(
                 "floor",
                 "activity_date",
                 F.max(F.col("activity_date")).over(
                     Window.partitionBy("gn","floor")
                 ).alias("end_activity_date")
             )
)

अगर यह आपके काम का है, तो मुझे बताएं।

0
ggordon 12 अक्टूबर 2021, 23:03
सभी 3 तरीकों से उत्तर के लिए @ggordon धन्यवाद। इसने वास्तव में मदद की।
 – 
Ajith
13 अक्टूबर 2021, 18:37