#Start and End is a range of dates. 
start = date(2019, 1, 20)
end = date(2019, 1, 22)

for single_date in daterange(start, end):
  query = "(SELECT ID, firstname,lastname,date FROM dbo.emp WHERE date = '%s' ) emp_alias" %((single_date).strftime("%Y-%m-%d %H:%M:%S")) 
  df = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)
  df.write.format("parquet").mode("ignore").partitionBy("Date").save("/mnt/data/empData.parquet")

मेरे पास तालिका में दिनों की संख्या के लिए डेटा है और मुझे तिथि के अनुसार विभाजित लकड़ी की छत फ़ाइलों की आवश्यकता है। मुझे लूप में दिन-ब-दिन बचत करनी है क्योंकि डेटा बहुत बड़ा है और मैं सभी दिनों जैसे वर्षों के डेटा को एक डेटाफ़्रेम में नहीं रख सकता। मैंने सभी सेव मोड पर कोशिश की। 'अनदेखा' मोड में यह पहले दिन के लिए बचाता है। 'ओवरराइट' मोड में, यह अंतिम दिन बचाता है। 'एपेंड' मोड में, यह डेटा जोड़ता है। मुझे जो चाहिए वह यह है कि यदि उस दिन के लिए डेटा उपलब्ध है तो उसे उस दिन के लिए अनदेखा करना चाहिए और जो डेटा पहले से मौजूद है उसे छोड़ देना चाहिए लेकिन यदि डेटा उपलब्ध नहीं है तो तिथि के अनुसार विभाजित लकड़ी की छत फ़ाइल में बनाएं। कृपया मदद करे।

-1
user2841795 13 पद 2019, 23:34

1 उत्तर

वर्तमान में कोई PySpark SaveMode नहीं है जो आपको मौजूदा विभाजनों को संरक्षित करने की अनुमति देता है, जबकि नए को सम्मिलित करते हुए, यदि आप हाइव विभाजन का उपयोग करना चाहते हैं (जो आप पूछ रहे हैं, जब आप विधि को कॉल करते हैं partitionBy ) ध्यान दें कि इसके विपरीत करने का विकल्प है, जो कुछ विभाजनों में डेटा को अधिलेखित करना है, जबकि उन विभाजनों को संरक्षित करना जिनके लिए डेटाफ़्रेम में कोई डेटा नहीं है (कॉन्फ़िगरेशन सेटिंग "spark.sql.sources.partitionOverwriteMode" को "dynamic" पर सेट करें और डेटासेट लिखते समय SaveMode.Overwrite का उपयोग करें)।

हालांकि, पहले से मौजूद सभी विभाजनों का एक सेट बनाकर आप अभी भी वह हासिल कर सकते हैं जो आप चाहते हैं। आप ऐसा PySpark के साथ कर सकते हैं, या किसी भी लाइब्रेरी का उपयोग कर सकते हैं जो आपको फाइल सिस्टम (जैसे Azure Data Lake Storage Gen2) या की-वैल्यू स्टोर्स (जैसे AWS S3) में लिस्टिंग ऑपरेशन करने की अनुमति देगा। एक बार आपके पास वह सूची हो जाने के बाद, आप इसका उपयोग उस डेटा के लिए नए डेटासेट को फ़िल्टर करने के लिए करते हैं जिसे आप अभी भी लिखना चाहते हैं। यहाँ केवल PySpark के साथ एक उदाहरण दिया गया है:

In [1]: from pyspark.sql.functions import lit
   ...: df = spark.range(3).withColumn("foo", lit("bar"))
   ...: dir = "/tmp/foo"
   ...: df.write.mode("overwrite").partitionBy("id").parquet(dir)  # initial seeding
   ...: ! tree /tmp/foo
   ...: 
   ...: 
/tmp/foo                                                                        
├── id=0
│   └── part-00001-5d14d286-81e1-4eb1-969e-c0d8089712ce.c000.snappy.parquet
├── id=1
│   └── part-00002-5d14d286-81e1-4eb1-969e-c0d8089712ce.c000.snappy.parquet
├── id=2
│   └── part-00003-5d14d286-81e1-4eb1-969e-c0d8089712ce.c000.snappy.parquet
└── _SUCCESS

3 directories, 4 files

In [2]: df2 = spark.range(5).withColumn("foo", lit("baz"))
   ...: existing_partitions = spark.read.parquet(dir).select("id").distinct()
   ...: df3 = df2.join(existing_partitions, "id", how="left_anti")
   ...: df3.write.mode("append").partitionBy("id").parquet(dir)
   ...: spark.read.parquet(dir).orderBy("id").show()
   ...: 
   ...: 
+---+---+                                                                       
|foo| id|
+---+---+
|bar|  0|
|bar|  1|
|bar|  2|
|baz|  3|
|baz|  4|
+---+---+

जैसा कि आप देख सकते हैं, केवल 2 विभाजन जोड़े गए थे। जो पहले से मौजूद थे, उन्हें संरक्षित कर लिया गया है।

अब, existing_partitions DataFrame प्राप्त करने के लिए डेटा को पढ़ने की आवश्यकता है। स्पार्क वास्तव में सभी डेटा को नहीं पढ़ेगा, हालांकि केवल विभाजन कॉलम और मेटाडेटा। जैसा कि पहले उल्लेख किया गया है, आप अपने डेटा को संग्रहीत करने के लिए प्रासंगिक किसी भी एपीआई का उपयोग करके यह डेटा प्राप्त कर सकते हैं। मेरे विशेष मामले में और आपके मामले में, यह देखते हुए कि आप डेटाब्रिक्स पर /mnt फ़ोल्डर में कैसे लिख रहे हैं, मैं बस अंतर्निहित पायथन फ़ंक्शन का उपयोग कर सकता था os.walk: dirnames = next(os.walk(dir))[1], और उससे एक डेटाफ़्रेम बनाया।

वैसे, आपके द्वारा देखे गए व्यवहारों को प्राप्त करने का कारण यह है:

  1. मोड को अनदेखा करें

    'अनदेखा' मोड में यह पहले दिन के लिए बचत करता है।

    चूंकि आप फॉर-लूप का उपयोग कर रहे हैं और आउटपुट निर्देशिका शुरू में शायद गैर-मौजूद थी, इसलिए पहली तारीख विभाजन लिखा जाएगा। फॉर-लूप के सभी बाद के पुनरावृत्तियों में, DataFrameWriter ऑब्जेक्ट अब और नहीं लिखेगा, क्योंकि इसमें पहले से ही कुछ डेटा (पहली तारीख के लिए एक विभाजन) है।

  2. ओवरराइट मोड

    'ओवरराइट' मोड में, यह अंतिम दिन बचाता है।

    वास्तव में, यह फॉर-लूप के प्रत्येक पुनरावृत्ति में एक विभाजन को सहेजता है, लेकिन क्योंकि आप DataFrameWriter को अधिलेखित करने का निर्देश दे रहे हैं, यह निर्देशिका में पहले से मौजूद सभी विभाजनों को हटा देगा। तो ऐसा लगता है कि केवल आखिरी लिखा गया था।

  3. मोड संलग्न करें

    'एपेंड' मोड में, यह डेटा जोड़ता है इसके लिए और स्पष्टीकरण की आवश्यकता नहीं है।

एक सुझाव: डेटाबेस से कई बार पढ़ने की आवश्यकता नहीं है (कई अलग-अलग प्रश्नों और जेडीबीसी-कनेक्शन बनाने के लिए फॉर-लूप का उपयोग करके)। आप शायद WHERE date BETWEEN %(start) AND %(end) के लिए क्वेरी को अपडेट कर सकते हैं, फॉर-लूप को पूरी तरह से हटा सकते हैं और एक कुशल लेखन का आनंद ले सकते हैं।

0
Oliver W. 15 पद 2019, 02:53