멀티프로세싱에서의 공유 메모리 객체
메모리 numpy 배열이 크다고 가정하면, 함수가 있습니다.func
이 거대한 배열을 입력(다른 매개 변수와 함께)으로 사용). func
다른 파라미터를 사용하여 병렬로 실행할 수 있습니다.예:
def func(arr, param):
# do stuff to arr, param
# build array arr
pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]
멀티프로세싱 라이브러리를 사용하면 그 거대한 배열이 여러 번 다른 프로세스로 복사됩니다.
서로 다른 프로세스가 동일한 어레이를 공유하도록 하는 방법이 있습니까?이 어레이 개체는 읽기 전용이며 수정되지 않습니다.
더 복잡한 것은, arr이 배열이 아니라 임의의 파이썬 객체라면, 공유할 수 있는 방법이 있을까요?
[편집]
저는 답을 읽었지만 여전히 약간 혼란스럽습니다.fork()는 Copy-on-Write이기 때문에 python 다중 처리 라이브러리에서 새로운 프로세스를 생성할 때 추가 비용을 발생시키지 않아야 합니다.그러나 다음 코드는 엄청난 오버헤드가 있음을 시사합니다.
from multiprocessing import Pool, Manager
import numpy as np;
import time
def f(arr):
return len(arr)
t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;
pool = Pool(processes = 6)
t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;
출력(참고로 어레이의 크기가 증가함에 따라 비용이 증가하므로 메모리 복사와 관련된 오버헤드가 여전히 남아 있을 것으로 생각됨):
construct array = 0.0178790092468
multiprocessing overhead = 0.252444982529
어레이를 복사하지 않았다면 오버헤드가 왜 이렇게 클까요?그리고 공유 메모리는 어떤 부분에서 저를 구해줄 수 있을까요?
에는 다음과 같이 .fork()
의미론(일반적인 유닉스와 마찬가지)을 사용하면 데이터 구조를 변경하지 않는 한 추가 메모리를 사용하지 않고 모든 하위 프로세스에서 사용할 수 있습니다.당신은 특별한 일을 할 필요가 없을 것입니다. 단, 객체를 변경하지 않도록 확실히 해야 합니다.
문제를 해결하기 위해 할 수 있는 가장 효율적인 방법은 어레이를 효율적인 어레이 구조로 패키징하는 것입니다.numpy
또는 ), 공유 메모리에 저장한 후 다음과 같이 포장합니다.multiprocessing.Array
그리고 그것을 당신의 기능에 전달합니다.이 답은 그것을 하는 방법을 보여줍니다.
쓰기 가능한 공유 개체를 사용하려면 동기화 또는 잠금과 함께 개체를 래핑해야 합니다. multiprocessing
이를 위한 두 가지 방법을 제공합니다. 하나는 공유 메모리(단순 값, 배열 또는 ctype에 적합)를 사용하는 방법이거나Manager
프록시는 한 프로세스가 메모리를 보유하고 관리자가 다른 프로세스(네트워크를 통해서도)에서 메모리에 대한 액세스를 중재합니다.
그Manager
접근 방식은 임의의 Python 객체와 함께 사용할 수 있지만 객체를 직렬화/직렬화하고 프로세스 간에 전송해야 하기 때문에 공유 메모리를 사용하는 접근 방식보다 느립니다.
Python에서는 다양한 병렬 처리 라이브러리와 접근 방식을 사용할 수 있습니다. multiprocessing
훌륭하고 둥근 라이브러리이지만, 특별한 요구가 있다면 다른 접근법 중 하나가 더 나을 수도 있습니다.
이것은 병렬 및 분산 Python용 라이브러리인 Ray를 위한 사용 사례입니다.후드 아래에서는 Apache Arrow 데이터 레이아웃(제로 복사 형식)을 사용하여 개체를 직렬화하고 공유 메모리 개체 저장소에 저장하므로 복사본을 만들지 않고도 여러 프로세스에서 액세스할 수 있습니다.
코드는 다음과 같습니다.
import numpy as np
import ray
ray.init()
@ray.remote
def func(array, param):
# Do stuff.
return 1
array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)
result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)
.ray.put
메모리에 되지만, 한 번씩 됩니다.func
당신이 원하는 것이 아닙니다.
이것은 배열뿐만 아니라 배열을 포함하는 개체(예: 아래와 같이 int를 배열에 매핑하는 사전)에도 적용됩니다.
IPython에서 다음을 실행하면 Ray와 Pickle의 직렬화 성능을 비교할 수 있습니다.
import numpy as np
import pickle
import ray
ray.init()
x = {i: np.ones(10**7) for i in range(20)}
# Time Ray.
%time x_id = ray.put(x) # 2.4s
%time new_x = ray.get(x_id) # 0.00073s
# Time pickle.
%time serialized = pickle.dumps(x) # 2.6s
%time deserialized = pickle.loads(serialized) # 1.9s
Ray를 사용한 직렬화는 피클보다 약간 빠르지만 공유 메모리를 사용하기 때문에 역직렬화는 1000배 더 빠릅니다(물론 이 숫자는 개체에 따라 다릅니다).
Ray 설명서를 참조하십시오.Ray(레이) 및 Arrow(화살표)를 사용하여 빠른 직렬화에 대해 자세히 알아볼 수 있습니다.참고로 저는 레이 개발자 중 한 명입니다.
저도 같은 문제에 부딪혔고, 이 문제를 해결하기 위해 공유 메모리 유틸리티 클래스를 작성했습니다.
는 중용사를 합니다.multiprocessing.RawArray
(잠금 해제) 및 어레이에 대한 액세스가 전혀 동기화되지 않습니다(잠금 해제). 자신의 발을 쏘지 않도록 주의하십시오.
이 솔루션을 사용하면 쿼드코어 i7에서 속도가 약 3배 향상됩니다.
코드는 다음과 같습니다.자유롭게 사용하고 개선하시고, 버그가 있으면 다시 보고해주세요.
'''
Created on 14.05.2013
@author: martin
'''
import multiprocessing
import ctypes
import numpy as np
class SharedNumpyMemManagerError(Exception):
pass
'''
Singleton Pattern
'''
class SharedNumpyMemManager:
_initSize = 1024
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(SharedNumpyMemManager, cls).__new__(
cls, *args, **kwargs)
return cls._instance
def __init__(self):
self.lock = multiprocessing.Lock()
self.cur = 0
self.cnt = 0
self.shared_arrays = [None] * SharedNumpyMemManager._initSize
def __createArray(self, dimensions, ctype=ctypes.c_double):
self.lock.acquire()
# double size if necessary
if (self.cnt >= len(self.shared_arrays)):
self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)
# next handle
self.__getNextFreeHdl()
# create array in shared memory segment
shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))
# convert to numpy array vie ctypeslib
self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)
# do a reshape for correct dimensions
# Returns a masked array containing the same data, but with a new shape.
# The result is a view on the original array
self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)
# update cnt
self.cnt += 1
self.lock.release()
# return handle to the shared memory numpy array
return self.cur
def __getNextFreeHdl(self):
orgCur = self.cur
while self.shared_arrays[self.cur] is not None:
self.cur = (self.cur + 1) % len(self.shared_arrays)
if orgCur == self.cur:
raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')
def __freeArray(self, hdl):
self.lock.acquire()
# set reference to None
if self.shared_arrays[hdl] is not None: # consider multiple calls to free
self.shared_arrays[hdl] = None
self.cnt -= 1
self.lock.release()
def __getArray(self, i):
return self.shared_arrays[i]
@staticmethod
def getInstance():
if not SharedNumpyMemManager._instance:
SharedNumpyMemManager._instance = SharedNumpyMemManager()
return SharedNumpyMemManager._instance
@staticmethod
def createArray(*args, **kwargs):
return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)
@staticmethod
def getArray(*args, **kwargs):
return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)
@staticmethod
def freeArray(*args, **kwargs):
return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)
# Init Singleton on module load
SharedNumpyMemManager.getInstance()
if __name__ == '__main__':
import timeit
N_PROC = 8
INNER_LOOP = 10000
N = 1000
def propagate(t):
i, shm_hdl, evidence = t
a = SharedNumpyMemManager.getArray(shm_hdl)
for j in range(INNER_LOOP):
a[i] = i
class Parallel_Dummy_PF:
def __init__(self, N):
self.N = N
self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)
self.pool = multiprocessing.Pool(processes=N_PROC)
def update_par(self, evidence):
self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))
def update_seq(self, evidence):
for i in range(self.N):
propagate((i, self.arrayHdl, evidence))
def getArray(self):
return SharedNumpyMemManager.getArray(self.arrayHdl)
def parallelExec():
pf = Parallel_Dummy_PF(N)
print(pf.getArray())
pf.update_par(5)
print(pf.getArray())
def sequentialExec():
pf = Parallel_Dummy_PF(N)
print(pf.getArray())
pf.update_seq(5)
print(pf.getArray())
t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")
print("Sequential: ", t1.timeit(number=1))
print("Parallel: ", t2.timeit(number=1))
로버트 니시하라가 언급했듯이 Apache Arrow는 특히 Ray가 기반으로 하는 Plasma in-memory 객체 저장소를 통해 이를 쉽게 만듭니다.
저는 특히 이러한 이유로 뇌 플라즈마를 만들었습니다. 플라스크 앱에서 큰 물체를 빠르게 로딩하고 다시 로딩하는 것입니다.Apache Arrow 직렬화 가능 개체를 위한 공유 메모리 개체 네임스페이스입니다.pickle
'는 'd 생성한테링의에해스트에서 생성된 에 의해 생성됩니다pickle.dumps(...)
.
Apache Ray와 Plasma의 주요 차이점은 객체 ID를 추적한다는 것입니다.은 임의의에서 이름을 불러 변수 값을 할 수 .Brain
물건.
$ pip install brain-plasma
$ plasma_store -m 10000000 -s /tmp/plasma
from brain_plasma import Brain
brain = Brain(path='/tmp/plasma/')
brain['a'] = [1]*10000
brain['a']
# >>> [1,1,1,1,...]
언급URL : https://stackoverflow.com/questions/10721915/shared-memory-objects-in-multiprocessing
'programing' 카테고리의 다른 글
팬더 그림에서 x축 눈금 레이블을 회전하는 방법 (0) | 2023.07.19 |
---|---|
cx_Oracle을 사용하여 사전 목록 만들기 (0) | 2023.07.19 |
조건부('if') 문을 기준으로 데이터 프레임의 값 바꾸기 (0) | 2023.07.19 |
Python으로 Excel(xls) 파일 읽기/파싱 (0) | 2023.07.19 |
파이썬 다중 처리 풀 imap_unordered 호출의 진행 상황을 표시하시겠습니까? (0) | 2023.07.19 |