मेरे पास एक ग्राफ है जो एकाधिक gzipped फ़ाइलों से लाइनों को पढ़ता है और उन पंक्तियों को gzipped फ़ाइलों के दूसरे सेट पर लिखता है, प्रत्येक पंक्ति में कुछ मान के अनुसार मैप किया जाता है।

यह छोटे डेटा सेट के विरुद्ध सही ढंग से काम करता है, लेकिन बड़े डेटा पर समाप्त करने में विफल रहता है। (यह उस डेटा का आकार नहीं हो सकता है जिसके लिए दोष देना है, क्योंकि मैंने इसे सुनिश्चित करने के लिए पर्याप्त समय नहीं चलाया है - इसमें कुछ समय लगता है)।

def files: Source[File, NotUsed] =
  Source.fromIterator(
    () =>
      Files
        .fileTraverser()
        .breadthFirst(inDir)
        .asScala
        .filter(_.getName.endsWith(".gz"))
        .toIterator)

def extract =
  Flow[File]
    .mapConcat[String](unzip)
    .mapConcat(s =>
      (JsonMethods.parse(s) \ "tk").extract[Array[String]].map(_ -> s).to[collection.immutable.Iterable])
    .groupBy(1 << 16, _._1)
    .groupedWithin(1000, 1.second)
    .map { lines =>
      val w = writer(lines.head._1)
      w.println(lines.map(_._2).mkString("\n"))
      w.close()
      Done
    }
    .mergeSubstreams

def unzip(f: File) = {
  scala.io.Source
    .fromInputStream(new GZIPInputStream(new FileInputStream(f)))
    .getLines
    .toIterable
    .to[collection.immutable.Iterable]
}

def writer(tk: String): PrintWriter =
  new PrintWriter(
    new OutputStreamWriter(
      new GZIPOutputStream(
        new FileOutputStream(new File(outDir, s"$tk.json.gz"), true)
      ))
  )

val process = files.via(extract).toMat(Sink.ignore)(Keep.right).run()

Await.result(process, Duration.Inf)

थ्रेड डंप दिखाता है कि प्रक्रिया WAITING Await.result(process, Duration.Inf) पर है और कुछ भी नहीं हो रहा है।

अक्का v2.5.15 . के साथ OpenJDK v11

1
Synesso 17 अक्टूबर 2018, 10:28

1 उत्तर

सबसे बढ़िया उत्तर

सबसे अधिक संभावना है कि यह groupBy में फंस गया है क्योंकि यह सभी स्रोतों के लिए 2^16 समूहों में आइटम इकट्ठा करने के लिए प्रेषक में उपलब्ध धागे से बाहर हो गया है।

तो अगर मैं तुम होते तो शायद मैं extract में अर्ध-मैन्युअल रूप से statefulMapConcat का उपयोग करने योग्य Map[KeyType, List[String]] के साथ समूहीकरण लागू करता। या पहले groupedWithin के साथ बफर लाइनें और उन्हें समूहों में विभाजित करें जिन्हें आप Sink.foreach में विभिन्न फाइलों में लिखेंगे।

1
expert 26 अक्टूबर 2018, 13:00