मेरे पास एक पायथन लिपि है जो पायरो का उपयोग करके एक लकड़ी की छत फ़ाइल में पढ़ती है। मैं इसमें मूल्यों को अद्यतन करने के लिए तालिका के माध्यम से लूप करने की कोशिश कर रहा हूं। अगर मैं यह कोशिश करता हूं:

for col_name in table2.column_names:
    if col_name in my_columns:
        print('updating values in column '  + col_name)
        
        col_data = pa.Table.column(table2, col_name)
        
        row_ct = 1
        for i in col_data:
            pa.Table.column(table2, col_name)[row_ct] = change_str(pa.StringScalar.as_py(i))
            row_ct += 1

मुझे यह त्रुटि मिलती है:

 TypeError: 'pyarrow.lib.ChunkedArray' object does not support item assignment

मैं इन मूल्यों को कैसे अपडेट कर सकता हूं?

मैंने पांडा का उपयोग करने की कोशिश की, लेकिन यह मूल तालिका में शून्य मानों को संभाल नहीं सका, और इसने मूल तालिका में कॉलम के डेटाटाइप का गलत अनुवाद भी किया। क्या पायरो के पास डेटा संपादित करने का मूल तरीका है?

0
raphael75 22 जिंदा 2021, 16:01

3 जवाब

सबसे बढ़िया उत्तर

एरो टेबल (और सरणियाँ) अपरिवर्तनीय हैं। इसलिए आप अपनी तालिका को अपनी जगह पर अपडेट नहीं कर पाएंगे।

इसे प्राप्त करने का तरीका डेटा को संशोधित करते समय उसकी प्रतिलिपि बनाना है। एरो स्ट्रिंग्स को संशोधित करने के लिए कुछ बुनियादी संचालन का समर्थन करता है, लेकिन वे हैं बहुत सीमित।

एक अन्य विकल्प पांडा का उपयोग करना है, लेकिन जैसा कि आपने देखा है कि तीर से पांडा तक जाना और वापस जाना सहज नहीं है।

आइए लेते हैं और उदाहरण:

>>> table = pa.Table.from_arrays(
    [ 
        pa.array(['abc', 'def'], pa.string()),
        pa.array([1, None], pa.int32()),
    ],
    schema=pa.schema(
    [
        pa.field('str_col', pa.string()), 
        pa.field('int_col', pa.int32()), 
    ]
    )
)
>>> from_pandas = pa.Table.from_pandas(table.to_pandas())
>>> from_pandas.schema
str_col: string
int_col: double
-- schema metadata --
pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 487

आप देख सकते हैं कि पांडा और बैक में कनवर्ट करने से इंट कॉलम का प्रकार डबल हो गया है। ऐसा इसलिए है क्योंकि पांडा अशक्त इंट मानों का बहुत अच्छी तरह से समर्थन नहीं करता है, इसलिए इसने इंट कॉलम को डबल में बदल दिया।

इस समस्या से बचने के लिए मैं कॉलम के आधार पर कॉलम पर काम करने का सुझाव दूंगा, केवल स्ट्रिंग कॉलम को पांडा में परिवर्तित करना:

def my_func(value):
    return 'hello ' + value + '!'


columns = []
my_columns = ['str_col']
for column_name in table.column_names:
    column_data = table[column_name]
    if column_name in my_columns:
        column_data = pa.array(table['str_col'].to_pandas().apply(my_func))
    columns.append(column_data)

updated_table = pa.Table.from_arrays(
    columns, 
    schema=table.schema
)
>>> table['str_col']
<pyarrow.lib.ChunkedArray object at 0x7f05f42b3f40>
[
  [
    "hello abc!",
    "hello def!"
  ]
]
4
0x26res 24 जिंदा 2021, 19:32

पाइरो में सरणी डेटा को अपडेट करने का मूल तरीका पाइरो कंप्यूट फंक्शन है। आपके द्वारा वर्णित पांडा में कनवर्ट करना भी इसे प्राप्त करने का एक वैध तरीका है ताकि आप इसे समझना चाहें। हालांकि, एपीआई आपके पास मौजूद दृष्टिकोण से मेल नहीं खाएगा।

आप वर्तमान में एक पायथन फ़ंक्शन change_str में तय करते हैं कि प्रत्येक आइटम का नया मान क्या होना चाहिए। उम्मीद है कि पाइरो कंप्यूट फ़ंक्शंस के एक संयोजन के रूप में आपको जो हेरफेर करने की आवश्यकता है उसे व्यक्त करना संभव है। यह संपूर्ण देशी सरणी को अजगर वस्तुओं में मार्शल करने की (महंगी) लागत से बच जाएगा। यदि आप change_str (शायद एक नए प्रश्न में) में जो हासिल करने की कोशिश कर रहे हैं उसका वर्णन कर सकते हैं तो मैं इसे समझने में मदद कर सकता हूं।

अगर, किसी कारण से, आपको change_str को पायथन में रखना होगा तो आपको ChunkedArray.to_pylist()

3
Pace 22 जिंदा 2021, 23:43

मैं इन संदर्भों का उपयोग करके इसे काम करने में सक्षम था:

http://arrow.apache.org/docs/python/generated/pyarrow.Table.html

http://arrow.apache.org/docs/python/generated/pyarrow.Field.html

https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_table.py

मूल रूप से यह मूल तालिका के माध्यम से लूप करता है और समायोजित पाठ के साथ नए कॉलम (pa.array) बनाता है जो इसे एक नई तालिका में जोड़ता है। यह शायद ऐसा करने का सबसे अच्छा तरीका नहीं है, लेकिन इसने काम किया। सबसे महत्वपूर्ण बात, यह मुझे नल को संरक्षित करने और प्रत्येक कॉलम के डेटा प्रकार को निर्दिष्ट करने देता है।

import sys, getopt
import random
import re
import math

import pyarrow.parquet as pq
import pyarrow.csv as pcsv
import numpy as np
import pandas as pd
import pyarrow as pa
import os.path

<a lot of other code here>

parquet_file = pq.ParquetFile(in_file)
table2 = pq.read_table(in_file)

<a lot of other code here>

changed_ct = 0
all_cols_ct = 0
table3 = pa.Table.from_arrays([pa.array(range(0,table2.num_rows))], names=('0')) # CREATE TEMP COLUMN!!
#print(table3)
#exit()
changed_column_list = []
for col_name in table2.column_names:
    print('processing column: ' + col_name)
    new_list = []
    col_data = pa.Table.column(table2, col_name)
    col_data_type = table2.schema.field(col_name).type
    printed_changed_flag = False
    for i in col_data:
        # GET STRING REPRESENTATION OF THE COLUMN DATA
        if(col_data_type == 'string'):
            col_str = pa.StringScalar.as_py(i)
        elif(col_data_type == 'int32'):
            col_str = pa.Int32Scalar.as_py(i)
        elif(col_data_type == 'int64'):
            col_str = pa.Int64Scalar.as_py(i)
            
            
        if col_name in change_columns:
            if printed_changed_flag == False:
                print('changing values in column '  + col_name)
                changed_column_list.append(col_name)
                changed_ct += 1
                printed_changed_flag = True

            new_list.append(change_str(col_str))
        
        else:
            new_list.append(col_str)
        
    #set data type for the column
    if(col_data_type == 'string'):
        col_data_type = pa.string()
    elif(col_data_type == 'int32'):
        col_data_type = pa.int32()
    elif(col_data_type == 'int64'):
        col_data_type = pa.int64()
        
    arr = pa.array(new_list, type=col_data_type)
        
    new_field = pa.field(col_name, col_data_type)
    
    table3 = pa.Table.append_column(table3, new_field, arr)
        
    all_cols_ct += 1
    
#for i in table3:
#   print(i)

table3 = pa.Table.remove_column(table3, 0) # REMOVE TEMP COLUMN!!
#print(table2)
#print('-------------------')
#print(table3)
#exit()

print('changed ' + str(changed_ct) + ' columns:')
print(*changed_column_list, sep='\n')

# WRITE NEW PARQUET FILE
pa.parquet.write_table(table3, out_file)
1
raphael75 15 फरवरी 2021, 16:05