मैं डेटाफ्रेम में एक आरडीडी [सेक [स्ट्रिंग]] को पार्स करने की कोशिश कर रहा हूं। यद्यपि यह स्ट्रिंग्स का एक सेक है, लेकिन उनके पास इंट, बूलियन, डबल, स्ट्रिंग इत्यादि जैसे अधिक विशिष्ट प्रकार हो सकते हैं। उदाहरण के लिए, एक पंक्ति हो सकती है:

"hello", "1", "bye", "1.1"
"hello1", "11", "bye1", "2.1"
...

एक अन्य निष्पादन में स्तंभों की संख्या भिन्न हो सकती है।

पहला कॉलम हमेशा एक स्ट्रिंग होने वाला है, दूसरा एक इंट वगैरह और यह हमेशा इस तरह से रहने वाला है। दूसरी ओर, एक निष्पादन में पाँच तत्वों का seq हो सकता है और अन्य निष्पादन में 2000 हो सकता है, इसलिए यह निष्पादन पर निर्भर करता है। प्रत्येक निष्पादन में कॉलम के प्रकार का नाम परिभाषित किया गया है।

ऐसा करने के लिए, मेरे पास ऐसा कुछ हो सकता है:

//I could have a parameter to generate the StructType dinamically.
def getSchema(): StructType = {
  var schemaArray = scala.collection.mutable.ArrayBuffer[StructField]()
  schemaArray += StructField("col1" , IntegerType, true)
  schemaArray += StructField("col2" , StringType, true)
  schemaArray += StructField("col3" , DoubleType, true)
  StructType(schemaArray)
}

//Array of Any?? it doesn't seem the best option!!
val l1: Seq[Any] = Seq(1,"2", 1.1 )
val rdd1 = sc.parallelize(l1).map(Row.fromSeq(_))

val schema = getSchema()
val df = sqlContext.createDataFrame(rdd1, schema)
df.show()
df.schema

मुझे किसी का Seq रखना बिल्कुल भी पसंद नहीं है, लेकिन वास्तव में मेरे पास यही है। एक और मौका??

दूसरी तरफ मैं सोच रहा था कि मेरे पास एक सीएसवी के समान कुछ है, मैं एक बना सकता हूं। स्पार्क के साथ एक सीएसवी पढ़ने के लिए एक पुस्तकालय है और एक डेटाफ्रेम लौटाता है जहां प्रकारों का अनुमान लगाया जाता है। अगर मेरे पास पहले से ही एक आरडीडी [स्ट्रिंग] है तो क्या इसे कॉल करना संभव है?

1
Guille 17 जुलाई 2019, 14:58

1 उत्तर

सबसे बढ़िया उत्तर

चूंकि प्रत्येक निष्पादन के लिए स्तंभों की संख्या में परिवर्तन होता है, इसलिए मैं सीएसवी विकल्प के साथ जाने का सुझाव दूंगा जिसमें सीमांकक अंतरिक्ष या कुछ और पर सेट हो। इस तरह स्पार्क आपके लिए कॉलम प्रकारों का पता लगाएगा।

अपडेट करें:

चूंकि आपने उल्लेख किया है कि आप HBase से डेटा पढ़ते हैं, एक तरीका यह है कि HBase पंक्ति को JSON या CSV में परिवर्तित किया जाए और फिर RDD को डेटाफ़्रेम में परिवर्तित किया जाए:

val jsons = hbaseContext.hbaseRDD(tableName, scan).map{case (_, r) =>
  val currentJson = new JSONObject
  val cScanner = r.cellScanner
  while (cScanner.advance) {
    currentJson.put(Bytes.toString(cScanner.current.getQualifierArray, cScanner.current.getQualifierOffset, cScanner.current.getQualifierLength),
      Bytes.toString(cScanner.current.getValueArray, cScanner.current.getValueOffset, cScanner.current.getValueLength))
  }
  currentJson.toString
}
val df = spark.read.json(spark.createDataset(jsons))

सीएसवी के लिए भी ऐसा ही किया जा सकता है।

2
Community 20 जून 2020, 12:12