मुझे पता है कि समान समस्याओं के आसपास बहुत सारे विषय हैं (जैसे मैं प्रक्रियाओं को मुख्य प्रोग्राम की एक सरणी में लिखने में सक्षम कैसे बनाऊं?, मल्टीप्रोसेसिंग - शेयर्ड ऐरे या एक फ़ंक्शन के लूप को मल्टीप्रोसेस करना जो कि Python में किसी सरणी को लिखता है), लेकिन मुझे अभी समझ नहीं आया... फिर से पूछने के लिए खेद है।

मुझे एक विशाल सरणी के साथ कुछ सामान करने की ज़रूरत है और चीजों को ब्लॉक में विभाजित करके और उन ब्लॉक पर अपना फ़ंक्शन चलाकर चीजों को गति देना चाहता हूं, प्रत्येक ब्लॉक अपनी प्रक्रिया में चलाया जा रहा है। समस्या यह है: ब्लॉक एक सरणी से "कट" होते हैं और परिणाम तब एक नए, सामान्य सरणी में लिखा जाएगा। मैंने अब तक यही किया है (न्यूनतम कामकाजी उदाहरण; सरणी-आकार देने पर ध्यान न दें, यह मेरे असली दुनिया के मामले के लिए जरूरी है):

import numpy as np
import multiprocessing as mp

def calcArray(array, blocksize, n_cores=1):
    in_shape = (array.shape[0] * array.shape[1], array.shape[2])
    input_array = array[:, :, :array.shape[2]].reshape(in_shape)
    result_array = np.zeros(array.shape)
    # blockwise loop
    pix_count = array.size
    for position in range(0, pix_count, blocksize):
        if position + blocksize < array.shape[0] * array.shape[1]:
            num = blocksize
        else:
            num = pix_count - position
        result_part = input_array[position:position + num, :] * 2
        result_array[position:position + num] = result_part
    # finalize result
    final_result = result_array.reshape(array.shape)
    return final_result

if __name__ == '__main__':
    start = time.time()
    img = np.ones((4000, 4000, 4))
    result = calcArray(img, blocksize=100, n_cores=4)
    print 'Input:\n', img
    print '\nOutput:\n', result

अब मैं मल्टीप्रोसेसिंग को इस तरह से कैसे कार्यान्वित कर सकता हूं कि मैं कई कोर सेट करता हूं और फिर calcArray प्रत्येक ब्लॉक को n_cores तक पहुंचने तक प्रक्रियाओं को असाइन करता है?


@Blownhither Ma की बहुत सराहना की गई मदद से, कोड अब इस तरह दिखता है:

import time, datetime
import numpy as np
from multiprocessing import Pool

def calculate(array):
    return array * 2

if __name__ == '__main__':
    start = time.time()
    CORES = 4
    BLOCKSIZE = 100
    ARRAY = np.ones((4000, 4000, 4))
    pool = Pool(processes=CORES)
    in_shape = (ARRAY.shape[0] * ARRAY.shape[1], ARRAY.shape[2])
    input_array = ARRAY[:, :, :ARRAY.shape[2]].reshape(in_shape)
    result_array = np.zeros(input_array.shape)
    # do it
    pix_count = ARRAY.size
    handles = []
    for position in range(0, pix_count, BLOCKSIZE):
        if position + BLOCKSIZE < ARRAY.shape[0] * ARRAY.shape[1]:
            num = BLOCKSIZE
        else:
            num = pix_count - position
        ### OLD APPROACH WITH NO PARALLELIZATION ###
        # part = calculate(input_array[position:position + num, :])
        # result_array[position:position + num] = part
        ### NEW APPROACH WITH PARALLELIZATION ###
        handle = pool.apply_async(func=calculate, args=(input_array[position:position + num, :],))
        handles.append(handle)
    # finalize result
    ### OLD APPROACH WITH NO PARALLELIZATION ###
    # final_result = result_array.reshape(ARRAY.shape)
    ### NEW APPROACH WITH PARALLELIZATION ###
    final_result = [h.get() for h in handles]
    final_result = np.concatenate(final_result, axis=0)
    print 'Done!\nDuration (hh:mm:ss): {duration}'.format(duration=datetime.timedelta(seconds=time.time() - start))

कोड चलता है और वास्तव में मेरे द्वारा निर्दिष्ट संख्या प्रक्रियाओं को शुरू करता है, लेकिन पुराने दृष्टिकोण की तुलना में बहुत अधिक समय लेता है, केवल "जैसा है" लूप का उपयोग करके (1 मिनट की तुलना में 3 सेकंड)। यहाँ कुछ कमी होनी चाहिए।

0
s6hebern 4 मई 2018, 11:04

1 उत्तर

सबसे बढ़िया उत्तर

मुख्य कार्य pool.apply_async और handler.get है।

मैं हाल ही में उन्हीं कार्यों पर काम कर रहा हूं और मानक उपयोगिता फ़ंक्शन बनाने के लिए इसे उपयोगी पाता हूं। balanced_parallel मैट्रिक्स a पर फ़ंक्शन fn को समानांतर तरीके से चुपचाप लागू करता है। assigned_parallel प्रत्येक तत्व पर स्पष्ट रूप से फ़ंक्शन लागू करें।
मैं। जिस तरह से मैं सरणी विभाजित करता हूं वह np.array_split है। आप इसके बजाय ब्लॉक योजना का उपयोग कर सकते हैं।
द्वितीय मैं परिणाम एकत्रित करते समय एक खाली मैट्रिक्स को असाइन करने के बजाय concat का उपयोग करता हूं। कोई साझा स्मृति नहीं है।

from multiprocessing import cpu_count, Pool

def balanced_parallel(fn, a, processes=None, timeout=None):
    """ apply fn on slice of a, return concatenated result """
    if processes is None:
        processes = cpu_count()
    print('Parallel:\tstarting {} processes on input with shape {}'.format(processes, a.shape))
    results = assigned_parallel(fn, np.array_split(a, processes), timeout=timeout, verbose=False)
    return np.concatenate(results, 0)


def assigned_parallel(fn, l, processes=None, timeout=None, verbose=True):
    """ apply fn on each element of l, return list of results """
    if processes is None:
        processes = min(cpu_count(), len(l))
    pool = Pool(processes=processes)
    if verbose:
        print('Parallel:\tstarting {} processes on {} elements'.format(processes, len(l)))

    # add jobs to the pool
    handler = [pool.apply_async(fn, args=x if isinstance(x, tuple) else (x, )) for x in l]

    # pool running, join all results
    results = [handler[i].get(timeout=timeout) for i in range(len(handler))]

    pool.close()
    return results

आपके मामले में, fn होगा

def _fn(matrix_part): return matrix_part * 2
result = balanced_parallel(_fn, img)

अनुवर्ती: समानांतर होने के लिए आपका लूप इस तरह दिखना चाहिए।

handles = []
for position in range(0, pix_count, BLOCKSIZE):
    if position + BLOCKSIZE < ARRAY.shape[0] * ARRAY.shape[1]:
        num = BLOCKSIZE
    else:
        num = pix_count - position
    handle = pool.apply_async(func=calculate, args=(input_array[position:position + num, :], ))
    handles.append(handle)

# multiple handlers exist at this moment!! Don't `.get()` yet
results = [h.get() for h in handles]
results = np.concatenate(results, axis=0)
1
Blownhither Ma 21 सितंबर 2019, 13:32