programing

파이썬 다중 처리 풀 imap_unordered 호출의 진행 상황을 표시하시겠습니까?

testmans 2023. 7. 19. 21:12
반응형

파이썬 다중 처리 풀 imap_unordered 호출의 진행 상황을 표시하시겠습니까?

다중 처리 풀 작업 집합을 성공적으로 수행하는 스크립트가 있습니다.imap_unordered()호출:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

하지만, 나의num_tasks약 25만 명이고, 그래서.join()메인 스레드를 10초 정도 잠그고, 메인 프로세스가 잠기지 않았음을 보여주기 위해 명령행에 점진적으로 에코아웃할 수 있습니다.다음과 같은 것:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print("Waiting for", remaining, "tasks to complete...")
  time.sleep(2)

남은 작업 수를 나타내는 결과 개체 또는 풀 자체에 대한 방법이 있습니까?사용해 보았습니다.multiprocessing.Value카운터로서의 객체(do_worka를 부르다counter.value += 1작업을 수행한 후 작업), 그러나 카운터는 증가를 중지하기 전에 총 값의 ~85%까지만 도달합니다.

제가 가장 좋아하는 것은, 모든 것이 동시에 실행되고 커밋되는 동안 멋진 작은 진행 표시줄과 완료 ETA를 제공합니다.

from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass

결과 집합의 개인 속성에 액세스할 필요가 없습니다.

from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))

제가 진행 상황을 확인하려고 했을 때 이미 작업이 완료되었다는 것을 알게 되었습니다.이것이 가 tqdm을 사용하는 데 도움이 되었습니다.

pip install tqdm

from multiprocessing import Pool
from tqdm import tqdm

tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))

def do_work(x):
    # do something with x
    pbar.update(1)

pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()

이것은 차단 여부에 관계없이 모든 종류의 멀티프로세싱에서 작동해야 합니다.

팀이 제안한 것처럼, 당신은 다음을 사용할 수 있습니다.tqdm그리고.imap이 문제를 해결하기 위해.나는 방금 이 문제를 우연히 발견하고 그것을 조작했습니다.imap_unordered매핑 결과에 액세스할 수 있도록 솔루션을 참조하십시오.작동 방식은 다음과 같습니다.

from multiprocessing import Pool
import tqdm

pool = multiprocessing.Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=len(values)))

작업에서 반환되는 값이 중요하지 않은 경우에는 변수에 목록을 할당할 필요가 없습니다.

좀 더 깊이 파고들어 답을 찾았습니다.다음을 살펴봅니다.__dict__imap_unordered결과 객체, 나는 그것이 있다는 것을 발견했습니다._index작업이 완료될 때마다 증가하는 속성입니다.그래서 이것은 벌목을 위해 작동하고, 그것에 싸여 있습니다.while루프:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  completed = rs._index
  if (completed == num_tasks): break
  print "Waiting for", num_tasks-completed, "tasks to complete..."
  time.sleep(2)

하지만 결과 개체는 조금 다르지만 와 스왑하면 훨씬 더 빨리 실행된다는 것을 알게 되었습니다.대신, 결과 개체는map_async을 가지고 있습니다._number_left속성, 그리고 aready()방법:

p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  if (rs.ready()): break
  remaining = rs._number_left
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(0.5)

저는 이것이 꽤 오래된 질문이라는 것을 알지만, 제가 파이썬에서 작업 풀의 진행을 추적하고 싶을 때 하는 일이 있습니다.

from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep

def my_function(letter):
    sleep(2)
    return letter+letter

dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)

results = []

pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()

r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]

while len(results) != len(dummy_args):
    pbar.update(len(results))
    sleep(0.5)
pbar.finish()

print results

기본적으로 apply_async를 callbak과 함께 사용하므로(이 경우 반환된 값을 목록에 추가하는 것입니다) 다른 작업을 위해 기다릴 필요가 없습니다.그런 다음 잠시 동안 작업 진행 상황을 확인합니다.이 경우, 저는 위젯을 추가하여 보기 좋게 만들었습니다.

출력:

4 of 4                                                                         
['AA', 'BB', 'CC', 'DD']

도움이 되길 바랍니다.

단순한 솔루션으로Pool.apply_async():

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep


def work(x):
    sleep(0.2)
    return x**2


n = 10

with Pool(4) as p, tqdm(total=n) as pbar:
    res = [p.apply_async(
        work, args=(i,), callback=lambda _: pbar.update(1)) for i in range(n)]
    results = [r.get() for r in res]

빠른 시작

사용 및

설치하다

pip install tqdm

import time
import threading
from multiprocessing import Pool

from tqdm import tqdm


def do_work(x):
    time.sleep(x)
    return x


def progress():
    time.sleep(3)  # Check progress after 3 seconds
    print(f'total: {pbar.total} finish:{pbar.n}')


tasks = range(10)
pbar = tqdm(total=len(tasks))

if __name__ == '__main__':
    thread = threading.Thread(target=progress)
    thread.start()
    results = []
    with Pool(processes=5) as pool:
        for result in pool.imap_unordered(do_work, tasks):
            results.append(result)
            pbar.update(1)
    print(results)

결과




플라스크

설치하다

pip install flask

main.py

import time
from multiprocessing import Pool

from tqdm import tqdm
from flask import Flask, make_response, jsonify

app = Flask(__name__)


def do_work(x):
    time.sleep(x)
    return x


total = 5  # num of tasks
tasks = range(total)
pbar = tqdm(total=len(tasks))


@app.route('/run/')
def run():
    results = []
    with Pool(processes=2) as pool:
        for _result in pool.imap_unordered(do_work, tasks):
            results.append(_result)
            if pbar.n >= total:
                pbar.n = 0  # reset
            pbar.update(1)
    response = make_response(jsonify(dict(results=results)))
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', '*')
    response.headers.add('Access-Control-Allow-Methods', '*')
    return response


@app.route('/progress/')
def progress():
    response = make_response(jsonify(dict(n=pbar.n, total=pbar.total)))
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', '*')
    response.headers.add('Access-Control-Allow-Methods', '*')
    return response

실행(예: 윈도우즈)

set FLASK_APP=main
flask run

API 목록

시험을 보다

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Progress Bar</title>
    <script src="https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js"></script>
    <script src="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/js/bootstrap.min.js"></script>
    <link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="stylesheet">
</head>
<body>
<button id="run">Run the task</button>
<br><br>
<div class="progress">
    <div class="progress-bar" role="progressbar" aria-valuenow="1" aria-valuemin="0" aria-valuemax="100"
         style="width: 10%">0.00%
    </div>
</div>
</body>
<script>
    function set_progress_rate(n, total) {
        //Set the rate of progress bar
        var rate = (n / total * 100).toFixed(2);
        if (n > 0) {
            $(".progress-bar").attr("aria-valuenow", n);
            $(".progress-bar").attr("aria-valuemax", total);
            $(".progress-bar").text(rate + "%");
            $(".progress-bar").css("width", rate + "%");
        }
    }

    $("#run").click(function () {
        //Run the task
        $.ajax({
            url: "http://127.0.0.1:5000/run/",
            type: "GET",
            success: function (response) {
                set_progress_rate(100, 100);
                console.log('Results:' + response['results']);
            }
        });
    });
    setInterval(function () {
        //Show progress every 1 second
        $.ajax({
            url: "http://127.0.0.1:5000/progress/",
            type: "GET",
            success: function (response) {
                console.log(response);
                var n = response["n"];
                var total = response["total"];
                set_progress_rate(n, total);
            }
        });
    }, 1000);
</script>
</html>

결과

진행률 인쇄물을 만들기 위해 사용자 정의 클래스를 만들었습니다.이것은 도움이 됩니다.

from multiprocessing import Pool, cpu_count


class ParallelSim(object):
    def __init__(self, processes=cpu_count()):
        self.pool = Pool(processes=processes)
        self.total_processes = 0
        self.completed_processes = 0
        self.results = []

    def add(self, func, args):
        self.pool.apply_async(func=func, args=args, callback=self.complete)
        self.total_processes += 1

    def complete(self, result):
        self.results.extend(result)
        self.completed_processes += 1
        print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))

    def run(self):
        self.pool.close()
        self.pool.join()

    def get_results(self):
        return self.results

몇 가지 조사를 한 후, 저는 평행봉이라는 작은 모듈을 썼습니다.풀의 전체 진행률과 각 코어에 대한 진행률을 개별적으로 표시할 수 있습니다.그것은 사용하기 쉽고 설명이 좋습니다.

예:

from parallelbar import progress_map
from parallelbar.tools import cpu_bench


if __name__=='__main__':
    # create list of task
    tasks = [1_000_000 + i for i in range(100)]
    progress_map(cpu_bench, tasks)

풀링과 함께 사용할 수 있는 간단한 큐 기반 접근 방식을 사용해 보십시오.진행률 표시줄을 시작한 후 인쇄하면 적어도 이 특정 진행률 표시줄에 대해 해당 작업이 이동됩니다. (PyPI의 진행률 1.5)

import time
from progress.bar import Bar

def status_bar( queue_stat, n_groups, n ):

    bar = Bar('progress', max = n)  

    finished = 0
    while finished < n_groups:

        while queue_stat.empty():
            time.sleep(0.01)

        gotten = queue_stat.get()
        if gotten == 'finished':
            finished += 1
        else:
            bar.next()
    bar.finish()


def process_data( queue_data, queue_stat, group):

    for i in group:

        ... do stuff resulting in new_data

        queue_stat.put(1)

    queue_stat.put('finished')  
    queue_data.put(new_data)

def multiprocess():

    new_data = []

    groups = [[1,2,3],[4,5,6],[7,8,9]]
    combined = sum(groups,[])

    queue_data = multiprocessing.Queue()
    queue_stat = multiprocessing.Queue()

    for i, group in enumerate(groups): 

        if i == 0:

            p = multiprocessing.Process(target = status_bar,
                args=(queue_stat,len(groups),len(combined)))
                processes.append(p)
                p.start()

        p = multiprocessing.Process(target = process_data,
        args=(queue_data, queue_stat, group))
        processes.append(p)
        p.start()

    for i in range(len(groups)):
        data = queue_data.get() 
        new_data += data

    for p in processes:
        p.join()

일부 답변은 진행률 표시줄에서 작동하지만 풀에서 결과를 가져올 수 없습니다.

tqdm을 사용하여 진행률 표시줄을 생성했습니다.pip install tqdm

아래의 간단한 코드는 진행률 표시줄에서 매우 잘 작동하며 결과도 얻을 수 있습니다.

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep

tasks = range(5)
result = []

def do_work(x):
    # do something with x and return the result
    sleep(2)
    return x + 2

if __name__ == '__main__':
    pbar = tqdm(total=len(tasks))

    with Pool(2) as p:
        for i in p.imap_unordered(do_work, tasks):

            result.append(i)
            pbar.update(i)
    
    pbar.close()
    print(result)

언급URL : https://stackoverflow.com/questions/5666576/show-the-progress-of-a-python-multiprocessing-pool-imap-unordered-call

반응형