मेरे प्रश्न की व्याख्या करने के लिए अलग है: यह प्रश्न चिह्नित प्रश्न से अलग है। सबसे पहले, इनपुट पैरामीटर पहले से ही एक निर्देशिका है (जो सही है लेकिन चिह्नित प्रश्न गलत है)। दूसरा, मैंने नई txt फ़ाइल आने का अनुकरण करने के लिए स्ट्रीमिंग के दौरान txt फ़ाइल को निर्देशिका में कॉपी किया (इसलिए इस निर्देशिका में मौजूद समान फ़ाइलों के बजाय नई फ़ाइलें उत्पन्न होती हैं)

मेरा प्रश्न नीचे है


मेरे पास एक निर्देशिका और txt फ़ाइल है /tmp/a.txt, फ़ाइल में सामग्री है

aaa
bbb

मैं pyspark का उपयोग करता हूं और मैन्युअल रूप से इस फ़ाइल को उसी निर्देशिका में कॉपी करता हूं, लगातार (स्ट्रीमिंग के दौरान फाइलें एक ही समय में बनाई जाती हैं)

def count(x):
    if x.isEmpty:
        print("empty")
        return
    print(x.count())

sc = SparkContext()
ssc = StreamingContext(sc, 3)
ssc.textFileStream("/tmp/").foreachRDD(count)

आउटपुट दिखाता है कि RDD खाली है

हालाँकि मैं उपयोग करता हूँ

c = sc.textFile("/tmp/").count()
print(c)

यह दिखाता है कि c 2 है (txt फ़ाइल सामग्री के अनुरूप)

स्ट्रीमिंग क्यों काम नहीं करती है?

1
Litchy 18 जून 2019, 09:49

2 जवाब

सबसे बढ़िया उत्तर

मुझे स्कैला में समाधान मिला है (अभी भी अजगर में नई फाइलें नहीं उठा सकता)

सबसे पहले, sc.textFile और sc.textFileStream समान पैरामीटर लेते हैं, जो एक निर्देशिका नाम है। तो उपरोक्त कोड सही है।

हालांकि, अंतर यह है कि यदि निर्देशिका मौजूद है तो sc.textFile फ़ाइलों को लेने के लिए ठीक है (और यह मौजूद होना चाहिए अन्यथा InvalidInputException उठाया जाएगा), लेकिन स्ट्रीमिंग मोड में sc.textFileStream (स्थानीय फ़ाइल सिस्टम), यह मांग करता है कि निर्देशिका मौजूद नहीं है और स्ट्रीमिंग प्रोग्राम द्वारा बनाई गई है, अन्यथा नई फाइलें नहीं उठाई जा सकती हैं (एक बग प्रतीत होता है, केवल स्थानीय फाइल सिस्टम में मौजूद है, एचडीएफएस में दूसरों के अनुसार अच्छी तरह से काम करता प्रतीत होता है अनुभव)।

इसके अलावा, कुछ अन्य अनुभव से वे कहते हैं कि यदि आप निर्देशिका को हटाते हैं और प्रोग्राम को फिर से चलाते हैं, तो रीसायकल बिन भी खाली होना चाहिए।


हालांकि, पायथन में यह समस्या अभी भी मौजूद है, और निर्देशिका में कोई फाइल मौजूद नहीं होने के दौरान, स्कैला प्रोग्राम सिर्फ 0 प्रिंट करेगा लेकिन पायथन प्रोग्राम चेतावनी देगा

WARN FileInputDStream:87 - Error finding new files 
java.lang.NullPointerException

यहाँ अजगर और स्कैला में मेरा कोड है, नई फाइलें लिखने का तरीका समान है इसलिए मैं इसे यहां पोस्ट नहीं करता हूं

पायथन कोड:

if __name__ == "__main__":
    sc = SparkContext()    
    ssc = StreamingContext(sc, 3)
    ssc.textFileStream(path).foreachRDD(lambda x: print(x.count()))
    ssc.start()
    ssc.awaitTermination()

स्कैला कोड:

def main(args: Array[String]): Unit = {  
  val sc = new SparkContext()
  val ssc = new StreamingContext(sc, Seconds(3))
  ssc.textFileStream(params.inputPath).foreachRDD { x =>
    print(x.count())
  }
  ssc.start()
  ssc.awaitTermination()
}
0
Litchy 20 जून 2019, 06:16

क्या आप /tmp/a.txt फ़ाइल में जोड़ी जा रही नई लाइनों को लेने का प्रयास कर रहे हैं या आप tmp निर्देशिका में जोड़ी जा रही नई फ़ाइलों को लेने का प्रयास कर रहे हैं?

यदि यह बाद वाला है तो अपनी अंतिम पंक्ति को इसके साथ बदलने का प्रयास करें

ssc.textFileStream("/tmp/*").foreachRDD(count)

0
kesubagu 19 जून 2019, 05:57