파이썬 다중 처리 풀 imap_unordered 호출의 진행 상황을 표시하시겠습니까?
다중 처리 풀 작업 집합을 성공적으로 수행하는 스크립트가 있습니다.imap_unordered()
호출:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion
하지만, 나의num_tasks
약 25만 명이고, 그래서.join()
메인 스레드를 10초 정도 잠그고, 메인 프로세스가 잠기지 않았음을 보여주기 위해 명령행에 점진적으로 에코아웃할 수 있습니다.다음과 같은 것:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
if (remaining == 0): break # Jump out of while loop
print("Waiting for", remaining, "tasks to complete...")
time.sleep(2)
남은 작업 수를 나타내는 결과 개체 또는 풀 자체에 대한 방법이 있습니까?사용해 보았습니다.multiprocessing.Value
카운터로서의 객체(do_work
a를 부르다counter.value += 1
작업을 수행한 후 작업), 그러나 카운터는 증가를 중지하기 전에 총 값의 ~85%까지만 도달합니다.
제가 가장 좋아하는 것은, 모든 것이 동시에 실행되고 커밋되는 동안 멋진 작은 진행 표시줄과 완료 ETA를 제공합니다.
from multiprocessing import Pool
import tqdm
pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
pass
결과 집합의 개인 속성에 액세스할 필요가 없습니다.
from __future__ import division
import sys
for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))
제가 진행 상황을 확인하려고 했을 때 이미 작업이 완료되었다는 것을 알게 되었습니다.이것이 제가 tqdm을 사용하는 데 도움이 되었습니다.
pip install tqdm
from multiprocessing import Pool
from tqdm import tqdm
tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))
def do_work(x):
# do something with x
pbar.update(1)
pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()
이것은 차단 여부에 관계없이 모든 종류의 멀티프로세싱에서 작동해야 합니다.
팀이 제안한 것처럼, 당신은 다음을 사용할 수 있습니다.tqdm
그리고.imap
이 문제를 해결하기 위해.나는 방금 이 문제를 우연히 발견하고 그것을 조작했습니다.imap_unordered
매핑 결과에 액세스할 수 있도록 솔루션을 참조하십시오.작동 방식은 다음과 같습니다.
from multiprocessing import Pool
import tqdm
pool = multiprocessing.Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=len(values)))
작업에서 반환되는 값이 중요하지 않은 경우에는 변수에 목록을 할당할 필요가 없습니다.
좀 더 깊이 파고들어 답을 찾았습니다.다음을 살펴봅니다.__dict__
의imap_unordered
결과 객체, 나는 그것이 있다는 것을 발견했습니다._index
작업이 완료될 때마다 증가하는 속성입니다.그래서 이것은 벌목을 위해 작동하고, 그것에 싸여 있습니다.while
루프:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
completed = rs._index
if (completed == num_tasks): break
print "Waiting for", num_tasks-completed, "tasks to complete..."
time.sleep(2)
하지만 결과 개체는 조금 다르지만 와 스왑하면 훨씬 더 빨리 실행된다는 것을 알게 되었습니다.대신, 결과 개체는map_async
을 가지고 있습니다._number_left
속성, 그리고 aready()
방법:
p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
if (rs.ready()): break
remaining = rs._number_left
print "Waiting for", remaining, "tasks to complete..."
time.sleep(0.5)
저는 이것이 꽤 오래된 질문이라는 것을 알지만, 제가 파이썬에서 작업 풀의 진행을 추적하고 싶을 때 하는 일이 있습니다.
from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep
def my_function(letter):
sleep(2)
return letter+letter
dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)
results = []
pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()
r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]
while len(results) != len(dummy_args):
pbar.update(len(results))
sleep(0.5)
pbar.finish()
print results
기본적으로 apply_async를 callbak과 함께 사용하므로(이 경우 반환된 값을 목록에 추가하는 것입니다) 다른 작업을 위해 기다릴 필요가 없습니다.그런 다음 잠시 동안 작업 진행 상황을 확인합니다.이 경우, 저는 위젯을 추가하여 보기 좋게 만들었습니다.
출력:
4 of 4
['AA', 'BB', 'CC', 'DD']
도움이 되길 바랍니다.
단순한 솔루션으로Pool.apply_async()
:
from multiprocessing import Pool
from tqdm import tqdm
from time import sleep
def work(x):
sleep(0.2)
return x**2
n = 10
with Pool(4) as p, tqdm(total=n) as pbar:
res = [p.apply_async(
work, args=(i,), callback=lambda _: pbar.update(1)) for i in range(n)]
results = [r.get() for r in res]
빠른 시작
사용 및
설치하다
pip install tqdm
예
import time
import threading
from multiprocessing import Pool
from tqdm import tqdm
def do_work(x):
time.sleep(x)
return x
def progress():
time.sleep(3) # Check progress after 3 seconds
print(f'total: {pbar.total} finish:{pbar.n}')
tasks = range(10)
pbar = tqdm(total=len(tasks))
if __name__ == '__main__':
thread = threading.Thread(target=progress)
thread.start()
results = []
with Pool(processes=5) as pool:
for result in pool.imap_unordered(do_work, tasks):
results.append(result)
pbar.update(1)
print(results)
결과
플라스크
설치하다
pip install flask
main.py
import time
from multiprocessing import Pool
from tqdm import tqdm
from flask import Flask, make_response, jsonify
app = Flask(__name__)
def do_work(x):
time.sleep(x)
return x
total = 5 # num of tasks
tasks = range(total)
pbar = tqdm(total=len(tasks))
@app.route('/run/')
def run():
results = []
with Pool(processes=2) as pool:
for _result in pool.imap_unordered(do_work, tasks):
results.append(_result)
if pbar.n >= total:
pbar.n = 0 # reset
pbar.update(1)
response = make_response(jsonify(dict(results=results)))
response.headers.add('Access-Control-Allow-Origin', '*')
response.headers.add('Access-Control-Allow-Headers', '*')
response.headers.add('Access-Control-Allow-Methods', '*')
return response
@app.route('/progress/')
def progress():
response = make_response(jsonify(dict(n=pbar.n, total=pbar.total)))
response.headers.add('Access-Control-Allow-Origin', '*')
response.headers.add('Access-Control-Allow-Headers', '*')
response.headers.add('Access-Control-Allow-Methods', '*')
return response
실행(예: 윈도우즈)
set FLASK_APP=main
flask run
API 목록
- 태스크 실행: http://127.0.0.1:5000/run/
- 진행률 표시:http://127.0.0.1:5000/진행률/
시험을 보다
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Progress Bar</title>
<script src="https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js"></script>
<script src="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/js/bootstrap.min.js"></script>
<link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="stylesheet">
</head>
<body>
<button id="run">Run the task</button>
<br><br>
<div class="progress">
<div class="progress-bar" role="progressbar" aria-valuenow="1" aria-valuemin="0" aria-valuemax="100"
style="width: 10%">0.00%
</div>
</div>
</body>
<script>
function set_progress_rate(n, total) {
//Set the rate of progress bar
var rate = (n / total * 100).toFixed(2);
if (n > 0) {
$(".progress-bar").attr("aria-valuenow", n);
$(".progress-bar").attr("aria-valuemax", total);
$(".progress-bar").text(rate + "%");
$(".progress-bar").css("width", rate + "%");
}
}
$("#run").click(function () {
//Run the task
$.ajax({
url: "http://127.0.0.1:5000/run/",
type: "GET",
success: function (response) {
set_progress_rate(100, 100);
console.log('Results:' + response['results']);
}
});
});
setInterval(function () {
//Show progress every 1 second
$.ajax({
url: "http://127.0.0.1:5000/progress/",
type: "GET",
success: function (response) {
console.log(response);
var n = response["n"];
var total = response["total"];
set_progress_rate(n, total);
}
});
}, 1000);
</script>
</html>
결과
진행률 인쇄물을 만들기 위해 사용자 정의 클래스를 만들었습니다.이것은 도움이 됩니다.
from multiprocessing import Pool, cpu_count
class ParallelSim(object):
def __init__(self, processes=cpu_count()):
self.pool = Pool(processes=processes)
self.total_processes = 0
self.completed_processes = 0
self.results = []
def add(self, func, args):
self.pool.apply_async(func=func, args=args, callback=self.complete)
self.total_processes += 1
def complete(self, result):
self.results.extend(result)
self.completed_processes += 1
print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))
def run(self):
self.pool.close()
self.pool.join()
def get_results(self):
return self.results
몇 가지 조사를 한 후, 저는 평행봉이라는 작은 모듈을 썼습니다.풀의 전체 진행률과 각 코어에 대한 진행률을 개별적으로 표시할 수 있습니다.그것은 사용하기 쉽고 설명이 좋습니다.
예:
from parallelbar import progress_map
from parallelbar.tools import cpu_bench
if __name__=='__main__':
# create list of task
tasks = [1_000_000 + i for i in range(100)]
progress_map(cpu_bench, tasks)
풀링과 함께 사용할 수 있는 간단한 큐 기반 접근 방식을 사용해 보십시오.진행률 표시줄을 시작한 후 인쇄하면 적어도 이 특정 진행률 표시줄에 대해 해당 작업이 이동됩니다. (PyPI의 진행률 1.5)
import time
from progress.bar import Bar
def status_bar( queue_stat, n_groups, n ):
bar = Bar('progress', max = n)
finished = 0
while finished < n_groups:
while queue_stat.empty():
time.sleep(0.01)
gotten = queue_stat.get()
if gotten == 'finished':
finished += 1
else:
bar.next()
bar.finish()
def process_data( queue_data, queue_stat, group):
for i in group:
... do stuff resulting in new_data
queue_stat.put(1)
queue_stat.put('finished')
queue_data.put(new_data)
def multiprocess():
new_data = []
groups = [[1,2,3],[4,5,6],[7,8,9]]
combined = sum(groups,[])
queue_data = multiprocessing.Queue()
queue_stat = multiprocessing.Queue()
for i, group in enumerate(groups):
if i == 0:
p = multiprocessing.Process(target = status_bar,
args=(queue_stat,len(groups),len(combined)))
processes.append(p)
p.start()
p = multiprocessing.Process(target = process_data,
args=(queue_data, queue_stat, group))
processes.append(p)
p.start()
for i in range(len(groups)):
data = queue_data.get()
new_data += data
for p in processes:
p.join()
일부 답변은 진행률 표시줄에서 작동하지만 풀에서 결과를 가져올 수 없습니다.
tqdm을 사용하여 진행률 표시줄을 생성했습니다.pip install tqdm
아래의 간단한 코드는 진행률 표시줄에서 매우 잘 작동하며 결과도 얻을 수 있습니다.
from multiprocessing import Pool
from tqdm import tqdm
from time import sleep
tasks = range(5)
result = []
def do_work(x):
# do something with x and return the result
sleep(2)
return x + 2
if __name__ == '__main__':
pbar = tqdm(total=len(tasks))
with Pool(2) as p:
for i in p.imap_unordered(do_work, tasks):
result.append(i)
pbar.update(i)
pbar.close()
print(result)
언급URL : https://stackoverflow.com/questions/5666576/show-the-progress-of-a-python-multiprocessing-pool-imap-unordered-call
'programing' 카테고리의 다른 글
조건부('if') 문을 기준으로 데이터 프레임의 값 바꾸기 (0) | 2023.07.19 |
---|---|
Python으로 Excel(xls) 파일 읽기/파싱 (0) | 2023.07.19 |
Windows 7 시스템의 MongoDB:연결할 수 없습니다. (0) | 2023.07.09 |
Oracle PL/SQL : 문자열에서 "공백 문자" 제거 (0) | 2023.07.09 |
MongoDB에서 배열에 포함된 개체 필드를 어떻게 인덱싱합니까? (0) | 2023.07.09 |