[筆記] python3 多執行緒與多核心平行計算

Multi-Thread 與 Multi-Process

*本篇資料來源為莫煩 python:
https://morvanzhou.github.io/
  • python threading 使用

    • 添加thread, 檢視thread, 執行thread
    #coding=utf-8 import threading def thread_job():
    # 把目前的 thread 顯示出來看看 print("This is an added Thread, number is {}\n".format(threading.current_thread())) def main(): # 添加一個 thread added_thread = threading.Thread(target=thread_job) # 執行 thread added_thread.start() # This is an added Thread, number is <Thread(Thread-1, started 123145466363904)> # 看目前有幾個 thread print(threading.active_count()) # 2 # 把所有的 thread 顯示出來看看 print(threading.enumerate()) # [<_MainThread(MainThread, started 140736627270592)>, <Thread(Thread-1, started 123145466363904)>] # 把目前的 thread 顯示出來看看 print(threading.current_thread()) #<_MainThread(MainThread, started 140736627270592)> if __name__ == '__main__': main()
    • join 的功用與用法
      例1
    # without join import threading import time def thread_job(): print('T1 start\n') for i in range(10): time.sleep(0.1) print('T1 finish') def main(): thread1 = threading.Thread(target=thread_job, name='T1') thread1.start() print('all done') if __name__=='__main__': main()
    output:
    因為 thread_job 裡面做的事情比較慢,所以T1 finish 在 all done 後面,也就是主線程(main thread)先執行完。
    
    T1 start
    all done
    
    T1 finish
    
    如果主線程要等 T1 做完的話,就要在 print(‘all done’) 前面使用 join()
    例1 使用 join():
    # use join import threading import time def thread_job(): print('T1 start\n') for i in range(10): time.sleep(0.1) print('T1 finish') def main(): thread1 = threading.Thread(target=thread_job, name='T1') thread1.start() print('all done') if __name__=='__main__': main()
    output:
    T1 start
    
    T1 finish
    all done
    
    例2:
    我們新增另一個 thread T2 他要做 T2_job
    # without join import threading import time def thread_job(): print('T1 start\n') for i in range(10): time.sleep(0.1) print('T1 finish') def T2_job(): print('T2 start') time.sleep(0.1) print('T2 finish') def main(): thread1 = threading.Thread(target=thread_job, name='T1') thread1.start() thread2 = threading.Thread(target=T2_job, name='T2') thread2.start() print('all done') if __name__=='__main__': main()
    output:
    T1 start
    T2 start
    
    all done
    T2 finish
    T1 finish
    
    例2 使用 join:
    # without join import threading import time def thread_job(): print('T1 start\n') for i in range(10): time.sleep(0.1) print('T1 finish') def T2_job(): print('T2 start') time.sleep(0.1) print('T2 finish') def main(): thread1 = threading.Thread(target=thread_job, name='T1') thread1.start() thread2 = threading.Thread(target=T2_job, name='T2') thread2.start() thread2.join() thread1.join() print('all done') if __name__=='__main__': main()
    output:
    T1 start
    
    T2 start
    T2 finish
    T1 finish
    all done
    
    • 使用 queue 來接收 thread 執行後回傳的資料
      目前我們的 thread 所做的事情是沒有回傳值的
      如果要讓 thread 運算執行後回傳資料的話,則需要使用 queue 來接。
      threading03_queue.py
    #coding=utf-8 import threading import time from queue import Queue def thread_job(arr, q): for i in range(len(arr)): arr[i] = arr[i]**2 q.put(arr) # 將結果放進 queue # return arr # 上面這一步驟取代了 return def multithreading(): q = Queue() # 宣告 Queue 物件 threads = [] # 用來放 thread 的 array data = [[1,2,3],[4,5,6],[7,7,7],[5,5,5]] for i in range(4): t = threading.Thread(target=thread_job, args=(data[i], q)) # 將 data 與 queue 傳入 thread 裡面 t.start() threads.append(t) for thread in threads: thread.join() # 每個 thread 都要做 join results = [] # 用來接收與顯示結果的 array for _ in range(4): results.append(q.get()) # 取出 queue 裡面的資料 print(results) # 顯示執行後的結果 if __name__=='__main__': multithreading() ''' 執行:python3 threading03_queue.py output: [[1, 4, 9], [16, 25, 36], [49, 49, 49], [25, 25, 25]] '''
    • lock的用法
      當我們執行多線程時,如果想要等一個線程做完再做其他線程
      就必須要使用 lock 先鎖住 thread 然後做完事情後再 release
    import threading def job1(): global A, lock lock.acquire() for i in range(10): A += 1 print('job1',A) lock.release() def job2(): global A, lock lock.acquire() for i in range(10): A += 10 print('job2',A) lock.release() if __name__=='__main__': lock = threading.Lock() A = 0 t1 = threading.Thread(target=job1) t2 = threading.Thread(target=job2) t1.start() t2.start() t1.join() t2.join()
    output:
    job1 1
    job1 2
    job1 3
    job1 4
    job1 5
    job1 6
    job1 7
    job1 8
    job1 9
    job1 10
    job2 20
    job2 30
    job2 40
    job2 50
    job2 60
    job2 70
    job2 80
    job2 90
    job2 100
    job2 110
    
  • python multiprocess

    • 基本用法跟Threading很像
    #coding=utf-8 import multiprocessing as mp import threading as td # 使用 multiprocessing 跟使用 threading 的方法非常相似 def job(a,b): print('doing job') # 宣告 t1 = td.Thread(target=job, args=(1,2)) p1 = mp.Process(target=job, args=(1,2)) # 開始 t1.start() p1.start() # join t1.join() p1.join()
    須在 main 裡面呼叫 multiprocess
    #coding=utf-8 import multiprocessing as mp def job(a,b): print('doing job') # 使用 multiprocessing 要在 main 裡面呼叫 if __name__=='__main__': p1 = mp.Process(target=job, args=(1,2)) p1.start() p1.join()
    • 使用Queue接function的回傳值
    #coding=utf-8 import multiprocessing as mp def job(q): result = 0 for i in range(10): result += i q.put(result) # 使用 multiprocessing 須在 main 裡面用 if __name__=='__main__': q = mp.Queue() # 使用 queue 接收 function 的回傳值 p1 = mp.Process(target=job, args=(q,)) # 特別注意 這邊的傳入參數只有一個的話,後面要有逗號 p2 = mp.Process(target=job, args=(q,)) p1.start() p2.start() p1.join() p2.join() res1 = q.get() res2 = q.get() print(res1) # 45 print(res2) # 45 print(res1+res2) # 90
    process 跟 process 之間彼此是獨立工作的,並不會共用同一份資料,所以 process 之間可以確保可平行化處理,跟 Thread 不同,Thread 有 shared data (可以直接共用 global 變數),且會受 GIL(Global Interpreter Lock) 的保護影響,執行起來不一定能平行化
    GIL簡介:
    http://blog.ephrain.net/python-python-gil-的問題/
    • 一般情形,multi-Thread 與 multi-Process 效能比較
    #coding=utf-8 import multiprocessing as mp import threading as td import time def job(q): result = 0 for i in range(100000): result += i+i**2+i**3 q.put(result) # 使用 multi-process def multicore(): q = mp.Queue() # 使用 queue 接收 function 的回傳值 p1 = mp.Process(target=job, args=(q,)) # 特別注意 這邊的傳入參數只有一個的話,後面要有逗號 p2 = mp.Process(target=job, args=(q,)) p1.start() p2.start() p1.join() p2.join() res1 = q.get() res2 = q.get() print('multicore:',res1+res2) # 使用 multi-thread def multithread(): q = mp.Queue() # multiprocess 的 queue 可以用在 thread t1 = td.Thread(target=job, args=(q,)) # 特別注意 這邊的傳入參數只有一個的話,後面要有逗號 t2 = td.Thread(target=job, args=(q,)) t1.start() t2.start() t1.join() t2.join() res1 = q.get() res2 = q.get() print('multithread:',res1+res2) def normal(): result = 0 # 因為上面都是 2 個 thread 或 2 個 process (core) # 所以要對比一般狀況的效能,要做兩次才公平 for _ in range(2): for i in range(100000): result += i+i**2+i**3 print('normal:',result) if __name__=='__main__': time0 = time.time() normal() time1 = time.time() print('normal time:', time1 - time0) multithread() time2 = time.time() print('multithread time:', time2 - time1) multicore() time3 = time.time() print('multicore time:', time3 - time2) ''' 結果 ('normal:', 49999666671666600000L) ('normal time:', 0.03933405876159668) ('multithread:', 49999666671666600000L) ('multithread time:', 0.06222200393676758) ('multicore:', 49999666671666600000L) ('multicore time:', 0.025996923446655273) '''
    根據以上實驗結果,我們發現 multi-process (multicore) 最有效率,所需時間最短。
    multi-thread 因為會受 GIL 影響,在某些 case 之下效率未必能比普通狀況還要好,所以要使用 multi-thread 來提升程式運算效能,要看任務的性質找適當的時機使用。
    • 使用Process Pool自動分配多核心,接收function回傳值
    #coding=utf-8 import multiprocessing as mp def job(x): return x*x def multicore(): # 使用 Pool 自動分配給 CPU 的每個一核心 (core) pool = mp.Pool() result = pool.map(job, range(10)) # 使用 pool 還可以接到 function 的回傳值 print(result) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # 使用 Pool 指定分配給 CPU 的 3 核心 (core) pool = mp.Pool(processes=3) result = pool.map(job, range(10)) # 使用 pool 還可以接到 function 的回傳值 print(result) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # 除了 map 功能以外,還有另一個功能: apply_async # apply_async 一次只能在一個核心中算一個東西 res = pool.apply_async(job,(2,)) # 這個只能把一個值放在一個核心運算一次 print(res.get()) # 4 # 如果要輸入一串的話,可以用迭代的方法搭配 apply_async,達到多核心計算的效能 multi_res = [pool.apply_async(job, (i,)) for i in range(10)] print([res.get() for res in multi_res]) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] if __name__=='__main__': multicore()
    • Shared Memory共享內存
    import multiprocessing as mp # 宣告共用變數 value1 = mp.Value('i', 0) value2 = mp.Value('d', 3.14) # 宣告共用陣列 array = mp.Array('i', [1, 2, 3, 4])
    multiprocess 的共享 Array 只能是一維
    • Process Lock
      以下是一個兩個 process 使用同一個 shared memory 的例子,先觀察一下如果沒有 lock 機制的情形:
    #coding=utf-8 import multiprocessing as mp import time def job(v, num): for _ in range(10): time.sleep(0.1) v.value += num # 使用共享資料取值要用 value print(v.value) def multicode(): v = mp.Value("i",0) # 宣告一個 process 之間共享的變數 p1 = mp.Process(target=job, args=(v,1)) # 把 v 傳值進去 p2 = mp.Process(target=job, args=(v,3)) p1.start() p2.start() p1.join() p2.join() if __name__ == '__main__': multicode()
    目前的 output
    1
    4
    5
    8
    9
    12
    13
    16
    19
    19
    20
    23
    24
    24
    25
    25
    26
    26
    27
    27
    
    可以看出兩個 process 是平行處理的,所以會隨機搶奪 shared 的資料去執行 function
    加上 Lock
    用法跟之前的 Thread Lock 很像
    #coding=utf-8 import multiprocessing as mp import time def job(v, num, loc): loc.acquire() for _ in range(10): time.sleep(0.1) v.value += num # 使用共享資料取值要用 value print(v.value) loc.release() def multicode(): loc = mp.Lock() # 宣告一個 Lock v = mp.Value("i",0) # 宣告一個 process 之間共享的變數 p1 = mp.Process(target=job, args=(v,1, loc)) # 把 v 傳值進去 p2 = mp.Process(target=job, args=(v,3, loc)) p1.start() p2.start() p1.join() p2.join() if __name__ == '__main__': multicode()
    output:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    13
    16
    19
    22
    25
    28
    31
    34
    37
    40
    
    有了 Lock 的保護機制,這次的 output 就很整齊,依序先做完 p1 做的事 (逐步加一) 再做 p2 做的事 (逐步加三)

留言

  1. 使用 queue 來接收 thread 執行後回傳的資料時,為什麼要對threads array 每個 thread做 join?
    在執行for 迴圈的 thread.start的時候不是都跑完了嗎?

    回覆刪除
    回覆
    1. 不好意思,最近太少來 blogger 來晚了 QQ
      的確,上面的例子當中做 join() 沒有什麼必要,加了 join() 也不會因此確保 array output 的順序,感謝大大指正 :D

      刪除

張貼留言

這個網誌中的熱門文章

[筆記] CRLF跟LF之區別 --- 隱形的 bug

[ML筆記] Batch Normalization

[筆記] 統計實習(1) SAS 基礎用法 (匯入資料並另存SAS新檔,SUBSTR,計算總和與平均,BMI)

[ML筆記] Ensemble - Bagging, Boosting & Stacking