मुझे एक निश्चित पार्सर विरासत में मिला है जिसे प्रत्येक ~ 4m लाइनों के साथ 10 फाइलों को पार्स करना है।

कोड पायथन 2 में लिखा गया था, जिसे मैंने अपडेट किया था।

एक मल्टीप्रोसेसिंग तर्क है जो मुझे काम करने में सक्षम नहीं लगता है।

from multiprocessing.pool import ThreadPool
import glob

DATADIR = 'home/my_dir/where/all/my/files/are'

def process_file(filepath):
    # read line by line, parse and insert to postgres database. 

def process_directory(dirpath):
    pattern = f'{dirpath}/*dat'  # files have .dat extension. 
    tp = ThreadPool(10)
    for filepath in glob.glob(pattern):
        print(filepath)
        tp.apply_async(process_file, filepath)
    tp.close()
    tp.join()

if __name__ == '__main__':
    process_directory(DATADIR)

मैं बहुत सारे दस्तावेज़ीकरण और कुछ इसी तरह के प्रश्नों से गुज़रा हूं लेकिन यह काम नहीं कर रहा है।

पार्सर कोड के साथ क्या होता है कि मैं कंसोल पर फ़ाइल के सभी पथों को मुद्रित करता हूं जिन्हें मुझे पार्स करने की आवश्यकता होती है, लेकिन फिर यह प्रोग्राम कुछ और नहीं करता है।

0
Tytire Recubans 12 नवम्बर 2019, 18:41

1 उत्तर

सबसे बढ़िया उत्तर

समस्या यह है कि आप apply_async को कॉल कर रहे हैं। मैंने आपकी समस्या का एक सरल पुनरुत्पादक बनाया है, लेकिन प्रत्येक कॉल से परिणाम प्राप्त करने के लिए थोड़ा सा बदलाव के साथ:

from multiprocessing.pool import ThreadPool

def func(f):
    print("hey " + f)
    return f + "1"

l = ["name", "name2", "name3"]
pool = ThreadPool(3)
out = []
for a in l:
    print(a)
    out.append(pool.apply_async(func, a))

# Check the response from each `apply_async` call
for a in out:
    a.get()

pool.close()
pool.join()

यह एक त्रुटि देता है:

Traceback (most recent call last):
  File "a.py", line 16, in <module>
    a.get()
  File "/usr/lib64/python3.4/multiprocessing/pool.py", line 599, in get
    raise self._value
  File "/usr/lib64/python3.4/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
TypeError: func() takes 1 positional argument but 4 were given

ऐसा लगता है कि आप एक के बजाय चार स्थितीय तर्क पारित कर रहे हैं। ऐसा इसलिए है क्योंकि apply_async सभी तर्कों को एक टपल में पारित करना चाहता है, जैसे:

pool.apply_async(func, (a,))

यदि आप apply_async को कॉल करते समय filepath को टपल में रखते हैं, तो मुझे लगता है कि आपको वह व्यवहार मिलेगा जिसकी आप अपेक्षा करते हैं।

यह भी ध्यान देने योग्य है कि आपका उपयोगकेस apply_async के बजाय pool.map का उपयोग करने के लिए उपयुक्त है, जो थोड़ा अधिक संक्षिप्त है:

pool.map(process_file, glob.glob(pattern))
2
dano 12 नवम्बर 2019, 18:50