जैसा कि नीचे दिए गए कोड में दिखाया गया है, मैं एक JSON फ़ाइल को डेटाफ़्रेम में पढ़ रहा हूँ और फिर उस डेटाफ़्रेम से कुछ फ़ील्ड को दूसरे फ़ील्ड में चुन रहा हूँ।

df_record = spark.read.json("path/to/file.JSON",multiLine=True)

df_basicInfo = df_record.select(col("key1").alias("ID"), \
                                col("key2").alias("Status"), \
                                col("key3.ResponseType").alias("ResponseType"), \
                                col("key3.someIndicator").alias("SomeIndicator") \
                                )

समस्या यह है कि कभी-कभी, JSON फ़ाइल में कुछ कुंजियाँ नहीं होती हैं जिन्हें मैं लाने का प्रयास करता हूँ - जैसे ResponseType। तो यह त्रुटियों को फेंक देता है जैसे:

org.apache.spark.sql.AnalysisException: No such struct field ResponseType

पढ़ने के समय स्कीमा को मजबूर किए बिना मैं इस मुद्दे को कैसे हल कर सकता हूं? क्या यह उपलब्ध नहीं होने पर उस कॉलम के तहत एक NULL वापस करना संभव है?

मैं कैसे पता लगा सकता हूं कि स्पार्क डेटाफ़्रेम में कॉलम है या नहीं यह पता लगाने का तरीका बताता है कि डेटाफ़्रेम में कॉलम उपलब्ध है या नहीं। हालाँकि, यह प्रश्न उस फ़ंक्शन का उपयोग करने के तरीके के बारे में है।

6
pallupz 4 अक्टूबर 2018, 13:55

3 जवाब

सबसे बढ़िया उत्तर

has_column फ़ंक्शन का उपयोग करके यहां को zero323 और खाली कॉलम जोड़ने के बारे में सामान्य दिशानिर्देश या तो

from pyspark.sql.functions import lit, col, when
from pyspark.sql.types import *

if has_column(df_record, "key3.ResponseType"):
    df_basicInfo = df_record.withColumn("ResponseType", col("key3.ResponseType"))
else:
    # Adjust types according to your needs
    df_basicInfo = df_record.withColumn("ResponseType", lit(None).cast("string")) 

और अपनी जरूरत के प्रत्येक कॉलम के लिए दोहराएं, या

df_record.withColumn(
   "ResponseType", 
   when(
       lit(has_column(df_record, "key3.ResponseType")),
       col("key3.ResponseType")
   ).otherwise(lit(None).cast("string"))

अपनी आवश्यकताओं के अनुसार प्रकारों को समायोजित करें, और शेष स्तंभों के लिए प्रक्रिया दोहराएं।

वैकल्पिक रूप से एक स्कीमा परिभाषित करें जिसमें सभी वांछित प्रकार शामिल हों:

schema = StructType([
    StructField("key1", StringType()),
    StructField("key2", StringType()),
    StructField("key2", StructType([
        StructField("ResponseType", StringType()),
        StructField("someIndicator", StringType()),
    ]))
])

df_record = spark.read.schema(schema).json("path/to/file.JSON",multiLine=True)

(एक बार फिर से प्रकार समायोजित करें), और अपने वर्तमान कोड का उपयोग करें।

8
2 revs, 2 users 93%user10456460 4 अक्टूबर 2018, 15:08

मेरे पास एक ही मुद्दा था, मैंने थॉमस के समान दृष्टिकोण का उपयोग किया। मेरा उपयोगकर्ता परिभाषित फ़ंक्शन कोड:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.Row

spark.udf.register("tryGet", (root:GenericRowWithSchema, fieldName: String) => {
    var buffer:Row = root

    if (buffer != null) {
      if (buffer.schema.fieldNames.contains(fieldName)) {
         buffer.getString(buffer.fieldIndex(fieldName))
      } else {
        null
      }
    }
    else {
      null
    }
})

और फिर मेरी क्वेरी:

%sql

SELECT
  Id,
  Created,
  Payload.Type,
  tryGet(Payload, "Error") as Error,
FROM dataWithJson
WHERE Payload.Type = 'Action'
2
Roland Ebner 30 जुलाई 2019, 14:29

स्पार्क में एक साधारण फ़ंक्शन गुम है: struct_has(STRUCT, PATH) या struct_get(STRUCT, PATH, DEFAULT) जहां PATHडॉट नोटेशन का उपयोग करें।

तो मैंने एक बहुत ही सरल यूडीएफ लिखा:

https://gist.github.com/ebuildy/3c9b2663d47f7b65fbc12cfb469ae19c से:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.Row

spark.udf.register("struct_def", (root:GenericRowWithSchema, path: String, defaultValue: String) => {

    var fields = path.split("\\.")
    var buffer:Row = root
    val lastItem = fields.last

    fields = fields.dropRight(1)

    fields.foreach( (field:String) => {
        if (buffer != null) {
            if (buffer.schema.fieldNames.contains(field)) {
                buffer = buffer.getStruct(buffer.fieldIndex(field))
            } else {
                buffer = null
            }
        }
    })

    if (buffer == null) {
        defaultValue
    } else {
        buffer.getString(buffer.fieldIndex(lastItem))
    }
})

यह आपको इस तरह से क्वेरी करने देता है:

SELECT struct_get(MY_COL, "foo.bar", "no") FROM DATA
2
Thomas Decaux 28 जून 2019, 11:01