मेरे पास नीचे दी गई तालिका है जहां गतिविधि_डेट लगातार होने पर मेरे पास बढ़ती लकीर है। यदि नहीं, तो स्ट्रीक 1 पर सेट हो जाएगी।
अब मुझे स्ट्रीक्स के प्रत्येक समूह का न्यूनतम और अधिकतम प्राप्त करने की आवश्यकता है। स्पार्क और स्कैला या स्पार्क एसक्यूएल का उपयोग करना।
Input
floor activity_date streak
--------------------------------
floor1 2018-11-08 1
floor1 2019-01-24 1
floor1 2019-04-05 1
floor1 2019-04-08 1
floor1 2019-04-09 2
floor1 2019-04-14 1
floor1 2019-04-17 1
floor1 2019-04-20 1
floor2 2019-05-04 1
floor2 2019-05-05 2
floor2 2019-06-04 1
floor2 2019-07-28 1
floor2 2019-08-14 1
floor2 2019-08-22 1
Output
floor activity_date end_activity_date
----------------------------------------
floor1 2018-11-08 2018-11-08
floor1 2019-01-24 2019-01-24
floor1 2019-04-05 2019-04-05
floor1 2019-04-08 2019-04-09
floor1 2019-04-14 2019-04-14
floor1 2019-04-17 2019-04-17
floor1 2019-04-20 2019-04-20
floor2 2019-05-04 2019-05-05
floor2 2019-06-04 2019-06-04
floor2 2019-07-28 2019-07-28
floor2 2019-08-14 2019-08-14
floor2 2019-08-22 2019-08-22
0
Ajith
12 अक्टूबर 2021, 22:14
1 उत्तर
सबसे बढ़िया उत्तर
आप निम्न दृष्टिकोण का उपयोग कर सकते हैं
स्पार्क एसक्यूएल का उपयोग करना
SELECT
floor,
activity_date,
MAX(activity_date) OVER (PARTITION BY gn,floor) as end_activity_date
FROM (
SELECT
*,
SUM(is_same_streak) OVER (
PARTITION BY floor ORDER BY activity_date
) as gn
FROM (
SELECT
*,
CASE
WHEN streak > LAG(streak,1,streak-1) OVER (
PARTITION BY floor
ORDER BY activity_date
) THEN 0
ELSE 1
END as is_same_streak
FROM
df
) t1
) t2
ORDER BY
"floor",
activity_date
स्कैला एपीआई का उपयोग करना
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val floorWindow = Window.partitionBy("floor").orderBy("activity_date")
val output = df.withColumn(
"is_same_streak",
when(
col("streak") > lag(col("streak"),1,col("streak")-1).over(floorWindow) , 0
).otherwise(1)
)
.withColumn(
"gn",
sum(col("is_same_streak")).over(floorWindow)
)
.select(
"floor",
"activity_date",
max(col("activity_date")).over(
Window.partitionBy("gn","floor")
).alias("end_activity_date")
)
Pyspark api . का उपयोग करना
from pyspark.sql import functions as F
from pyspark.sql import Window
floorWindow = Window.partitionBy("floor").orderBy("activity_date")
output = (
df.withColumn(
"is_same_streak",
F.when(
F.col("streak") > F.lag(F.col("streak"),1,F.col("streak")-1).over(floorWindow) , 0
).otherwise(1)
)
.withColumn(
"gn",
F.sum(F.col("is_same_streak")).over(floorWindow)
)
.select(
"floor",
"activity_date",
F.max(F.col("activity_date")).over(
Window.partitionBy("gn","floor")
).alias("end_activity_date")
)
)
अगर यह आपके काम का है, तो मुझे बताएं।
0
ggordon
12 अक्टूबर 2021, 23:03
सभी 3 तरीकों से उत्तर के लिए @ggordon धन्यवाद। इसने वास्तव में मदद की।
– Ajith
13 अक्टूबर 2021, 18:37