मैं डेटाफ्रेम में अलग-अलग कॉलम पर अलग-अलग एजीजी फ़ंक्शंस के एक समूह की गणना करना चाहता हूं।

मुझे पता है कि मैं ऐसा कुछ कर सकता हूं, लेकिन आउटपुट सभी एक पंक्ति है।

df.agg(max("cola"), min("cola"), max("colb"), min("colb"))

मान लीजिए कि मैं 10 अलग-अलग स्तंभों पर 100 अलग-अलग एकत्रीकरण कर रहा हूं।

मैं चाहता हूं कि आउटपुट डेटाफ्रेम इस तरह हो -

      |Min|Max|AnotherAggFunction1|AnotherAggFunction2|...etc..
cola  | 1 | 10| ... 
colb  | 2 | NULL| ... 
colc  | 5 | 20| ... 
cold  | NULL | 42| ... 
...

जहां मेरी पंक्तियां प्रत्येक कॉलम हैं, मैं एकत्रीकरण कर रहा हूं और मेरे कॉलम एकत्रीकरण कार्य हैं। उदाहरण के लिए यदि मैं colb अधिकतम की गणना नहीं करता तो कुछ क्षेत्र शून्य हो जाएंगे।

मैं यह कैसे हासिल कर सकता हूं?

2
asojidaiod 21 फरवरी 2020, 19:20

2 जवाब

आप एक नक्शा स्तंभ बना सकते हैं, जैसे Metrics, जहां कुंजियां स्तंभ नाम हैं और एकत्रीकरण की संरचना (अधिकतम, न्यूनतम, औसत, ...) को मानती हैं। मैं map_from_entries फ़ंक्शन का उपयोग कर रहा हूं एक नक्शा कॉलम बनाएं (स्पार्क 2.4+ से उपलब्ध)। और फिर, अपनी इच्छित संरचना प्राप्त करने के लिए बस मानचित्र में विस्फोट करें।

यहां एक उदाहरण दिया गया है जिसे आप अपनी आवश्यकता के अनुसार अनुकूलित कर सकते हैं:

df = spark.createDataFrame([("A", 1, 2), ("B", 2, 4), ("C", 5, 6), ("D", 6, 8)], ['cola', 'colb', 'colc'])

agg = map_from_entries(array(
    *[
        struct(lit(c),
               struct(max(c).alias("Max"), min(c).alias("Min"))
               )
        for c in df.columns
    ])).alias("Metrics")

df.agg(agg).select(explode("Metrics").alias("col", "Metrics")) \
    .select("col", "Metrics.*") \
    .show()

#+----+---+---+
#|col |Max|Min|
#+----+---+---+
#|cola|D  |A  |
#|colb|6  |1  |
#|colc|8  |2  |
#+----+---+---+
2
blackbishop 22 फरवरी 2020, 13:36

यहां एक समाधान है जो आपको पूर्वनिर्धारित सूची से गतिशील रूप से एकत्रीकरण सेट करने की अनुमति देता है। समाधान map_from_arrays का उपयोग करता है, इसलिए इसके साथ संगत है स्पार्क> = 2.4.0:

from pyspark.sql.functions import lit, expr, array, map_from_arrays

df = spark.createDataFrame([
  [1, 2.3, 5000],
  [2, 5.3, 4000],
  [3, 2.1, 3000],
  [4, 1.5, 4500]
], ["cola", "colb", "colc"])

aggs = ["min", "max", "avg", "sum"]
aggs_select_expr = [f"value[{idx}] as {agg}" for idx, agg in enumerate(aggs)]

agg_keys = []
agg_values = []

# generate map here where key is col name and value an array of aggregations
for c in df.columns:
  agg_keys.append(lit(c)) # the key i.e cola
  agg_values.append(array(*[expr(f"{a}({c})") for a in aggs])) # the value i.e [expr("min(a)"), expr("max(a)"), expr("avg(a)"), expr("sum(a)")]

df.agg(
  map_from_arrays(
    array(agg_keys), 
    array(agg_values)
  ).alias("aggs")
) \
.select(explode("aggs")) \
.selectExpr("key as col", *aggs_select_expr) \
.show(10, False)

# +----+------+------+------+-------+
# |col |min   |max   |avg   |sum    |
# +----+------+------+------+-------+
# |cola|1.0   |4.0   |2.5   |10.0   |
# |colb|1.5   |5.3   |2.8   |11.2   |
# |colc|3000.0|5000.0|4125.0|16500.0|
# +----+------+------+------+-------+

विवरण: अभिव्यक्ति array(*[expr(f"{a}({c})") for a in aggs]) के साथ हम एक सरणी बनाते हैं जिसमें वर्तमान कॉलम के सभी एकत्रीकरण शामिल हैं। जेनरेट किए गए सरणी के प्रत्येक आइटम का मूल्यांकन expr(f"{a}({c})" कथन के साथ किया जा रहा है जो कि expr("min(a)") उत्पन्न करेगा।

सरणी में agg_values के मान शामिल होंगे जो agg_keys के साथ मिलकर map_from_arrays(array(agg_keys), array(agg_values)) अभिव्यक्ति के माध्यम से हमारे अंतिम मानचित्र की रचना करेंगे। मानचित्र की संरचना इस तरह दिखती है:

map(
    cola -> [min(cola), max(cola), avg(cola), sum(cola)]
    colb -> [min(colb), max(colb), avg(colb), sum(colb)]
    colc -> [min(cola), max(colc), avg(cola), sum(colc)]
)

हमें जिस जानकारी की आवश्यकता है उसे निकालने के लिए हमें पिछले मानचित्र को explode("aggs") के साथ विस्फोट करना होगा, इससे दो कॉलम key और value बनेंगे जिनका उपयोग हम अपने चयन कथन में करते हैं।

aggs_select_expr में ["value[0] as min", "value[1] as max", "value[2] as avg", "value[3] as sum"] के रूप में मान होंगे जो selectExpr स्टेटमेंट का इनपुट होगा।

अद्यतन करें:

मैंने महसूस किया कि एग्रीगेशन उर्फ ​​इम्प्लीमेंट groupBy से agg को छोड़ कर एक बेहतर प्रदर्शन करने वाला तरीका है। हम इसे create_map बिल्ट-इन फ़ंक्शन के माध्यम से प्राप्त कर सकते हैं:

from pyspark.sql.functions import create_map, expr, array
from itertools import chain

df = spark.createDataFrame([
  [1, 2.3, 5000],
  [2, 5.3, 4000],
  [3, 2.1, 3000],
  [4, 1.5, 4500]
], ["cola", "colb", "colc"])

aggs = ["min", "max", "avg", "sum"]
aggs_select_expr = [f"value[{idx}] as {agg}" for idx, agg in enumerate(aggs)]

df.select(explode(
                  create_map(*list(
                        chain(*[(lit(c), array(*[expr(f"{a}({c})") for a in aggs])) 
                          for c in df.columns
                     ])))
                   )
         ) \
        .selectExpr("key as col", *aggs_select_expr)

नोट: कम कोड के अतिरिक्त, दूसरे दृष्टिकोण का मुख्य लाभ यह तथ्य है कि इसमें केवल संकीर्ण परिवर्तन होता है, न कि विस्तृत यानी groupBy। कि हम प्रदर्शन में सुधार करेंगे क्योंकि यह फेरबदल से बचता है।

1
abiratsis 7 अप्रैल 2020, 15:27