मेरे पास एक pyspark डेटा फ्रेम है जैसा कि नीचे दिखाया गया है।

+---+-------+--------+
|age|balance|duration|
+---+-------+--------+
|  2|   2143|     261|
| 44|     29|     151|
| 33|      2|      76|
| 50|   1506|      92|
| 33|      1|     198|
| 35|    231|     139|
| 28|    447|     217|
|  2|      2|     380|
| 58|    121|      50|
| 43|    693|      55|
| 41|    270|     222|
| 50|    390|     137|
| 53|      6|     517|
| 58|     71|      71|
| 57|    162|     174|
| 40|    229|     353|
| 45|     13|      98|
| 57|     52|      38|
|  3|      0|     219|
|  4|      0|      54|
+---+-------+--------+

और मेरा अपेक्षित आउटपुट दिखना चाहिए,

+---+-------+--------+-------+-----------+------------+
|age|balance|duration|age_out|balance_out|duration_out|
+---+-------+--------+-------+-----------+------------+
|  2|   2143|     261|      1|          1|           0|
| 44|     29|     151|      0|          0|           0|
| 33|      2|      76|      0|          0|           0|
| 50|   1506|      92|      0|          1|           0|
| 33|      1|     198|      0|          0|           0|
| 35|    231|     139|      0|          0|           0|
| 28|    447|     217|      0|          0|           0|
|  2|      2|     380|      1|          0|           0|
| 58|    121|      50|      0|          0|           0|
| 43|    693|      55|      0|          0|           0|
| 41|    270|     222|      0|          0|           0|
| 50|    390|     137|      0|          0|           0|
| 53|      6|     517|      0|          0|           1|
| 58|     71|      71|      0|          0|           0|
| 57|    162|     174|      0|          0|           0|
| 40|    229|     353|      0|          0|           0|
| 45|     13|      98|      0|          0|           0|
| 57|     52|      38|      0|          0|           0|
|  3|      0|     219|      1|          0|           0|
|  4|      0|      54|      0|          0|           0|
+---+-------+--------+-------+-----------+------------+

यहां मेरा उद्देश्य इंटर क्वार्टाइल विधि का उपयोग करके डेटा सेट में बाहरी रिकॉर्ड की पहचान करना है जैसा कि मैंने नीचे दिए गए पायथन कोड में वर्णित किया है। यदि हमें कोई बाहरी रिकॉर्ड मिलता है, तो हमें उन्हें 1 के रूप में चिह्नित करना होगा अन्यथा 0.

मैं नीचे दिए गए कोड का उपयोग करके अजगर का उपयोग करके वही काम कर सकता हूं।

import numpy as np
def outliers_iqr(ys):
    quartile_1, quartile_3 = np.percentile(ys, [25, 75])
    iqr = quartile_3 - quartile_1
    lower_bound = quartile_1 - (iqr * 1.5)
    upper_bound = quartile_3 + (iqr * 1.5)
    ser = np.zeros(len(ys))
    pos =np.where((ys > upper_bound) | (ys < lower_bound))[0]
    ser[pos]=1
    return(ser)

लेकिन मैं वही काम पाइस्पार्क में करना चाहता था। क्या कोई मेरी उसी पर मदद कर सकता है?

मेरा पाइस्पार्क कोड:

def outliers_iqr(ys):
    quartile_1, quartile_3 = np.percentile(ys, [25, 75])
    iqr = quartile_3 - quartile_1
    lower_bound = quartile_1 - (iqr * 1.5)
    upper_bound = quartile_3 + (iqr * 1.5)
    ser = np.zeros(len(ys))
    pos =np.where((ys > upper_bound) | (ys < lower_bound))[0]
    ser[pos]=1
    return(float(ser))

outliers_iqr_udf = udf(outliers_iqr, FloatType())
DF.withColumn('age_out', outliers_iqr_udf(DF.select('age').collect())).show()
6
RSK 3 अक्टूबर 2018, 21:54

2 जवाब

सबसे बढ़िया उत्तर

आप लूप के अंदर pyspark.sql.DataFrame.approxQuantile का उपयोग कर सकते हैं अपने प्रत्येक कॉलम के लिए वांछित 25वां और 75वां शतमक मान प्राप्त करने के लिए।

bounds = {
    c: dict(
        zip(["q1", "q3"], df.approxQuantile(c, [0.25, 0.75], 0))
    )
    for c in df.columns
}

पारित किया गया अंतिम तर्क सापेक्ष त्रुटि है, जिसके बारे में आप लिंक की गई पोस्ट के साथ-साथ दस्तावेज़। संक्षिप्त संस्करण यह है कि संख्या जितनी कम होगी, आपका परिणाम उतना ही सटीक होगा लेकिन सटीकता और कम्प्यूटेशनल खर्च के बीच एक ट्रेड-ऑफ है। (यहाँ मैंने सटीक मान प्राप्त करने के लिए 0 का उपयोग किया है, लेकिन आप अपने डेटा के आकार के आधार पर एक भिन्न मान चुनना चाह सकते हैं।)

एक बार जब आपके पास पहला और तीसरा चतुर्थक मान हो, तो आप iqr और ऊपरी/निचली सीमाओं की गणना आसानी से कर सकते हैं:

for c in bounds:
    iqr = bounds[c]['q3'] - bounds[c]['q1']
    bounds[c]['lower'] = bounds[c]['q1'] - (iqr * 1.5)
    bounds[c]['upper'] = bounds[c]['q3'] + (iqr * 1.5)
print(bounds)
#{'age': {'lower': 3.0, 'q1': 33.0, 'q3': 53.0, 'upper': 83.0},
# 'balance': {'lower': -570.0, 'q1': 6.0, 'q3': 390.0, 'upper': 966.0},
# 'duration': {'lower': -143.0, 'q1': 76.0, 'q3': 222.0, 'upper': 441.0}}

अब bounds के आधार पर बाहरी कॉलम बनाने के लिए सूची समझ में pyspark.sql.functions.when का उपयोग करें:

import pyspark.sql.functions as f
df.select(
    "*",
    *[
        f.when(
            f.col(c).between(bounds[c]['lower'], bounds[c]['upper']),
            0
        ).otherwise(1).alias(c+"_out") 
        for c in df.columns
    ]
).show()
#+---+-------+--------+-------+-----------+------------+
#|age|balance|duration|age_out|balance_out|duration_out|
#+---+-------+--------+-------+-----------+------------+
#|  2|   2143|     261|      1|          1|           0|
#| 44|     29|     151|      0|          0|           0|
#| 33|      2|      76|      0|          0|           0|
#| 50|   1506|      92|      0|          1|           0|
#| 33|      1|     198|      0|          0|           0|
#| 35|    231|     139|      0|          0|           0|
#| 28|    447|     217|      0|          0|           0|
#|  2|      2|     380|      1|          0|           0|
#| 58|    121|      50|      0|          0|           0|
#| 43|    693|      55|      0|          0|           0|
#| 41|    270|     222|      0|          0|           0|
#| 50|    390|     137|      0|          0|           0|
#| 53|      6|     517|      0|          0|           1|
#| 58|     71|      71|      0|          0|           0|
#| 57|    162|     174|      0|          0|           0|
#| 40|    229|     353|      0|          0|           0|
#| 45|     13|      98|      0|          0|           0|
#| 57|     52|      38|      0|          0|           0|
#|  3|      0|     219|      0|          0|           0|
#|  4|      0|      54|      0|          0|           0|
#+---+-------+--------+-------+-----------+------------+

यहां मैंने यह जांचने के लिए between का उपयोग किया है कि क्या कोई मान बाहरी नहीं है, और यह फ़ंक्शन समावेशी है (यानी x between a and b तार्किक रूप से x >= a and x <= b के बराबर है)।

8
pault 23 मई 2019, 14:28

कृपया मेरे समाधान के नीचे खोजें:

from pyspark.sql import functions as f


class Outlier():

    def __init__(self, df):
        self.df = df


    def _calculate_bounds(self):
        bounds = {
            c: dict(
                zip(["q1", "q3"], self.df.approxQuantile(c, [0.25, 0.75], 0))
            )
            for c, d in zip(self.df.columns, self.df.dtypes) if d[1] in ["bigint", "double"]
        }

        for c in bounds:
            iqr = bounds[c]['q3'] - bounds[c]['q1']
            bounds[c]['min'] = bounds[c]['q1'] - (iqr * 1.5)
            bounds[c]['max'] = bounds[c]['q3'] + (iqr * 1.5)

        return bounds


    def _flag_outliers_df(self):
        bounds = self._calculate_bounds()

        outliers_col = [
            f.when(
                ~f.col(c).between(bounds[c]['min'], bounds[c]['max']),
                f.col(c)
            ).alias(c + '_outlier')
            for c in bounds]

        return self.df.select(*outliers_col)


    def show_outliers(self):

        outlier_df = self._flag_outliers_df()

        for outlier in outlier_df.columns:
            outlier_df.select(outlier).filter(f.col(outlier).isNotNull()).show()

और फिर अपना डेटाफ्रेम नीचे के रूप में पास करें:

Outlier(df).show_outliers()
0
Florian 20 मई 2020, 09:03