मेरी आवश्यकता हैडर के साथ सीएसवी फ़ाइल से डेटा पढ़ना और डेटाफ्लो के साथ पायथन का उपयोग करके Google डेटा स्टोर में समान संरचना बनाना है। मैंने नीचे के रूप में एक नमूना कोड बनाने की कोशिश की थी।

मेरा नमूना सीएसवी नीचे है,

First Name,Last Name,Date of Birth
Tom,Cruise,"July 3, 1962"
Bruce,Willis,"March 19, 1955"
Morgan,Freeman,"June 1, 1937"
John,Wayne,"May 26, 1907"

मेरा अजगर 2.7 कोड स्निपेट नीचे जैसा है

import csv
import datetime
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper
from apache_beam.io.filesystems import FileSystems
from apache_beam import pvalue


class CSVtoDict(beam.DoFn):
    """Converts line into dictionary"""
    def process(self, element, header):
        rec = ""
        element = element.encode('utf-8')
        try:
            for line in csv.reader([element]):
                rec = line

            if len(rec) == len(header):
                data = {header.strip(): val.strip() for header, val in zip(header, rec)}
                return [data]
            else:
                logging.info("row contains bad data")
        except Exception:
            pass

class CreateEntities(beam.DoFn):
    """Creates Datastore entity"""
    def process(self, element):
        entity = entity_pb2.Entity()
        sku = int(element.pop('sku'))
        element[1] = float(element[1])
        element['salePrice'] = float(element['salePrice'])
        element['name'] = unicode(element['name'].decode('utf-8'))
        element['type'] = unicode(element['type'].decode('utf-8'))
        element['url'] = unicode(element['url'].decode('utf-8'))
        element['image'] = unicode(element['image'].decode('utf-8'))
        element['inStoreAvailability'] = unicode(element['inStoreAvailability'])

        datastore_helper.add_key_path(entity.key, 'Productx', sku)
        datastore_helper.add_properties(entity, element)
        return [entity]


class ProcessOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
                '--input',
                dest='input',
                type=str,
                required=False,
                help='Input file to read. This can be a local file or a file in a Google Storage Bucket.')

def read_header_from_filename(filename):
  # note that depending on your newline character/file encoding, this may need to be modified
  file_handle = FileSystems.open(filename)  
  header = file_handle.readline()
  return header.split(',')

process_options = PipelineOptions().view_as(ProcessOptions)
p = beam.Pipeline(options=process_options)
# Create PCollection containing header line
header = (p
          | beam.Create(process_options.input)
          | beam.Map(read_header_from_filename))

def dataflow(argv=None):
    process_options = PipelineOptions().view_as(ProcessOptions)
    p = beam.Pipeline(options=process_options)

    (p
    | 'Reading input file' >> beam.io.ReadFromText(process_options.input)
    | 'Converting from csv to dict' >> beam.ParDo(CSVtoDict(), pvalue.AsSingleton(header))
    | 'Create entities' >> beam.ParDo(CreateEntities())
    | 'Write entities into Datastore' >> WriteToDatastore('isc-am-poc')
    )    
    p.run().wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    dataflow()

मैं डेटा प्रवाह का उपयोग करके इकाइयों को लोड करने में सक्षम हो सकता हूं, हालांकि मैं सीएसवी फ़ाइल को हैडर और फिर पंक्तियों से पार्स करना चाहता हूं, क्लास CreateEntities में मानों को हार्ड कोडिंग करने के बजाय और डेटा स्टोर इकाइयों में इसे लिखना।

मूल रूप से उसी CSV फ़ाइल को अपलोड करें जिसे पंक्तियों के साथ डेटा प्रवाह कार्य में इनपुट के रूप में दिया गया है। क्या कोई कृपया मदद कर सकता है?

Required Output in Data Store for Key Actor:

First Name Last Name Date of Birth
Tom,Cruise "July 3, 1962"
Bruce,Willis "March 19, 1955"
Morgan,Freeman "June 1, 1937"
John,Wayne "May 26, 1907"

संपादित करें: मैंने आपके द्वारा दिए गए कोड को शामिल कर लिया था और अब नीचे त्रुटि प्राप्त कर रहा हूं। मैं पायथन 2.7 का उपयोग कर रहा हूं और संबंधित पुस्तकालयों को आयात कर रहा हूं। क्षमा करें, मैं पायथन के लिए बहुत नया हूं।

Error:
  File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/home/gurusankar_p/upload-data-datastore-dataflow/upload2.py", line 70, in <module>
    | beam.Map(read_header_from_filename))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/transforms/core.py", line 2423, in __init__
    self.values = tuple(values)
TypeError: 'RuntimeValueProvider' object is not iterable

धन्यवाद, जी एस

0
Amdone 8 जिंदा 2020, 17:36

1 उत्तर

सबसे बढ़िया उत्तर

अपाचे बीम कई श्रमिकों में फ़ाइल के पढ़ने को विभाजित करके आपके डेटा के प्रसंस्करण को समानांतर करता है, जिसका अर्थ है कि अधिकांश कर्मचारी हेडर लाइन को कभी भी नहीं पढ़ेंगे।

आप क्या करना चाहते हैं शामिल हों हैडर लाइन के साथ पढ़ी जाने वाली पंक्तियां। चूंकि हेडर लाइन डेटा की एक छोटी मात्रा है, आप इसे एक अलग पीसीओलेक्शन के रूप में पढ़ सकते हैं और इसे CSVtoDict को साइड इनपुट के रूप में पास कर सकते हैं।

आपकी हेडर लाइन पढ़ने के लिए कुछ उदाहरण कोड:

def read_header_from_filename(filename):
  # note that depending on your newline character/file encoding, this may need to be modified
  file_handle = FileSystems.open(filename)  
  header = file_handle.readline()
  return header.split(',')

# Create PCollection containing header line
header = (p
          | beam.Create(process_options.input) 
          | beam.Map(read_header_from_filename))

आपका पाइपलाइन निर्माण कोड बन जाता है:

    (p 
    | 'Reading input file' >> beam.io.ReadFromText(process_options.input)
    | 'Converting from csv to dict' >> beam.ParDo(CSVtoDict(), pvalue.AsSingleton(header))
    | 'Create entities' >> beam.ParDo(CreateEntities())
    | 'Write entities into Datastore' >> WriteToDatastore('isc-am-poc')
     )    
    p.run().wait_until_finish()
1
Lukasz Cwik 8 जिंदा 2020, 19:44