मैं डेटाफ्रेम में एक आरडीडी [सेक [स्ट्रिंग]] को पार्स करने की कोशिश कर रहा हूं। यद्यपि यह स्ट्रिंग्स का एक सेक है, लेकिन उनके पास इंट, बूलियन, डबल, स्ट्रिंग इत्यादि जैसे अधिक विशिष्ट प्रकार हो सकते हैं। उदाहरण के लिए, एक पंक्ति हो सकती है:
"hello", "1", "bye", "1.1"
"hello1", "11", "bye1", "2.1"
...
एक अन्य निष्पादन में स्तंभों की संख्या भिन्न हो सकती है।
पहला कॉलम हमेशा एक स्ट्रिंग होने वाला है, दूसरा एक इंट वगैरह और यह हमेशा इस तरह से रहने वाला है। दूसरी ओर, एक निष्पादन में पाँच तत्वों का seq हो सकता है और अन्य निष्पादन में 2000 हो सकता है, इसलिए यह निष्पादन पर निर्भर करता है। प्रत्येक निष्पादन में कॉलम के प्रकार का नाम परिभाषित किया गया है।
ऐसा करने के लिए, मेरे पास ऐसा कुछ हो सकता है:
//I could have a parameter to generate the StructType dinamically.
def getSchema(): StructType = {
var schemaArray = scala.collection.mutable.ArrayBuffer[StructField]()
schemaArray += StructField("col1" , IntegerType, true)
schemaArray += StructField("col2" , StringType, true)
schemaArray += StructField("col3" , DoubleType, true)
StructType(schemaArray)
}
//Array of Any?? it doesn't seem the best option!!
val l1: Seq[Any] = Seq(1,"2", 1.1 )
val rdd1 = sc.parallelize(l1).map(Row.fromSeq(_))
val schema = getSchema()
val df = sqlContext.createDataFrame(rdd1, schema)
df.show()
df.schema
मुझे किसी का Seq रखना बिल्कुल भी पसंद नहीं है, लेकिन वास्तव में मेरे पास यही है। एक और मौका??
दूसरी तरफ मैं सोच रहा था कि मेरे पास एक सीएसवी के समान कुछ है, मैं एक बना सकता हूं। स्पार्क के साथ एक सीएसवी पढ़ने के लिए एक पुस्तकालय है और एक डेटाफ्रेम लौटाता है जहां प्रकारों का अनुमान लगाया जाता है। अगर मेरे पास पहले से ही एक आरडीडी [स्ट्रिंग] है तो क्या इसे कॉल करना संभव है?
1 उत्तर
चूंकि प्रत्येक निष्पादन के लिए स्तंभों की संख्या में परिवर्तन होता है, इसलिए मैं सीएसवी विकल्प के साथ जाने का सुझाव दूंगा जिसमें सीमांकक अंतरिक्ष या कुछ और पर सेट हो। इस तरह स्पार्क आपके लिए कॉलम प्रकारों का पता लगाएगा।
अपडेट करें:
चूंकि आपने उल्लेख किया है कि आप HBase से डेटा पढ़ते हैं, एक तरीका यह है कि HBase पंक्ति को JSON या CSV में परिवर्तित किया जाए और फिर RDD को डेटाफ़्रेम में परिवर्तित किया जाए:
val jsons = hbaseContext.hbaseRDD(tableName, scan).map{case (_, r) =>
val currentJson = new JSONObject
val cScanner = r.cellScanner
while (cScanner.advance) {
currentJson.put(Bytes.toString(cScanner.current.getQualifierArray, cScanner.current.getQualifierOffset, cScanner.current.getQualifierLength),
Bytes.toString(cScanner.current.getValueArray, cScanner.current.getValueOffset, cScanner.current.getValueLength))
}
currentJson.toString
}
val df = spark.read.json(spark.createDataset(jsons))
सीएसवी के लिए भी ऐसा ही किया जा सकता है।