मैं दस्तावेज़ीकरण और कुछ डेटा को "समतल" करने के लिए एक टेबल फ़ंक्शन बनाएं। टेबल फंक्शन ठीक काम करता प्रतीत होता है जब joinLateral को समतल करने के लिए उपयोग किया जाता है। हालांकि leftOuterJoinLateral का उपयोग करते समय, मुझे निम्न त्रुटि मिलती है। मैं स्कैला का उपयोग कर रहा हूं और एक ही परिणाम के साथ टेबल एपीआई और एसक्यूएल दोनों की कोशिश की है:

इसके कारण: java.lang.NullPointerException: शून्य परिणाम केस क्लास में संग्रहीत नहीं किया जा सकता है।

यहाँ मेरा काम है:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.functions.TableFunction

object example_job{
  // Split the List[Int] into multiple rows
  class Split() extends TableFunction[Int] {
    def eval(nums: List[Int]): Unit = {
      nums.foreach(x =>
        if(x != 3) {
          collect(x)
      })
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val tableEnv = StreamTableEnvironment.create(env)
    val splitMe = new Split()

    // Create some dummy data
    val events: DataStream[(String, List[Int])] = env.fromElements(("simon", List(1,2,3)), ("jessica", List(3)))
    
    val table = tableEnv.fromDataStream(events, 'name, 'numbers)
      .leftOuterJoinLateral(splitMe('numbers) as 'number)
      .select('name, 'number)
    table.toAppendStream[(String, Int)].print()
    env.execute("Flink jira ticket example")
  }
}

जब मैं .leftOuterJoinLateral को .joinLateral में बदलता हूं तो मुझे अपेक्षित परिणाम मिलता है:

(simon,1)
(simon,2)

.leftOuterJoinLateral का उपयोग करते समय मैं कुछ इस तरह की अपेक्षा करता हूं:

(simon,1)
(simon,2)
(simon,null) // or (simon, None)
(jessica,null) // or (jessica, None)

ऐसा लगता है कि यह स्कैला एपीआई के साथ एक बग हो सकता है? अगर मैं कुछ बेवकूफी कर रहा हूँ तो मैं टिकट बढ़ाने से पहले यहाँ जाँच करना चाहता हूँ!

1
Simon D 12 अक्टूबर 2019, 00:37

1 उत्तर

सबसे बढ़िया उत्तर

समस्या यह है कि फ़्लिंक प्रति डिफ़ॉल्ट अपेक्षा करता है कि एक पंक्ति के सभी फ़ील्ड गैर-शून्य हैं। यही कारण है कि प्रोग्राम तब विफल हो जाता है जब वह बाहरी जॉइन ऑपरेशन से null परिणाम देखता है। null मान स्वीकार करने के लिए, आपको या तो शून्य जांच को अक्षम करना होगा

val tableConfig = tableEnv.getConfig
tableConfig.setNullCheck(false)

या आपको शून्य मानों को सहन करने के लिए परिणाम प्रकार निर्दिष्ट करना होगा, उदा। एक कस्टम POJO आउटपुट प्रकार निर्दिष्ट करना:

table.toAppendStream[MyOutput].print()

साथ

class MyOutput(var name: String, var number: Integer) {
  def this() {
    this(null, null)
  }

  override def toString: String = s"($name, $number)"
}
1
Till Rohrmann 14 अक्टूबर 2019, 19:20