python threading 使用
- 添加thread, 檢視thread, 執行thread
import threading


def thread_job():
    """Worker body: report which Thread object is executing it."""
    print(
        "This is an added Thread, number is {}\n".format(threading.current_thread()))


def main():
    """Spawn one extra thread, then inspect the interpreter's thread registry."""
    worker = threading.Thread(target=thread_job)
    worker.start()
    print(threading.active_count())    # number of Thread objects currently alive
    print(threading.enumerate())       # list of all live Thread objects
    print(threading.current_thread())  # the thread running main() itself


if __name__ == '__main__':
    main()
import threading
import time


def thread_job():
    """T1's work: roughly one second of simulated slow processing."""
    print('T1 start\n')
    for _ in range(10):
        time.sleep(0.1)
    print('T1 finish')


def main():
    """Start T1 and return immediately.

    No join() is called, so the main thread prints 'all done' before the
    (slower) worker prints 'T1 finish'.
    """
    worker = threading.Thread(target=thread_job, name='T1')
    worker.start()
    print('all done')


if __name__ == '__main__':
    main()
output:
因為 thread_job 裡面做的事情比較慢,所以T1 finish 在 all done 後面,也就是主線程(main thread)先執行完。
T1 start
all done
T1 finish
如果主線程要等 T1 做完的話,就要在 print('all done') 前面呼叫 thread1.join()
例1 使用 join():
import threading
import time


def thread_job():
    """T1's work: roughly one second of simulated slow processing."""
    print('T1 start\n')
    for i in range(10):
        time.sleep(0.1)
    print('T1 finish')


def main():
    """Start T1 and wait for it before announcing completion.

    This is the "例1 使用 join()" variant: thread1.join() blocks the main
    thread until T1 finishes, so the output order is
    T1 start / T1 finish / all done — matching the output shown below.
    (The original snippet omitted the join() call, which is the whole
    point of this example.)
    """
    thread1 = threading.Thread(target=thread_job, name='T1')
    thread1.start()
    thread1.join()  # wait for T1 before printing 'all done'
    print('all done')


if __name__ == '__main__':
    main()
output:
T1 start
T1 finish
all done
例2:
我們新增另一個 thread T2 他要做 T2_job
import threading
import time


def thread_job():
    """T1's work: roughly one second of simulated slow processing."""
    print('T1 start\n')
    for _ in range(10):
        time.sleep(0.1)
    print('T1 finish')


def T2_job():
    """T2's work: a single short pause."""
    print('T2 start')
    time.sleep(0.1)
    print('T2 finish')


def main():
    """Launch both workers without joining them.

    'all done' is printed as soon as the main thread reaches it, before
    either worker has finished.
    """
    first = threading.Thread(target=thread_job, name='T1')
    first.start()
    second = threading.Thread(target=T2_job, name='T2')
    second.start()
    print('all done')


if __name__ == '__main__':
    main()
output:
T1 start
T2 start
all done
T2 finish
T1 finish
例2 使用 join:
import threading
import time


def thread_job():
    """T1's work: roughly one second of simulated slow processing."""
    print('T1 start\n')
    for _ in range(10):
        time.sleep(0.1)
    print('T1 finish')


def T2_job():
    """T2's work: a single short pause."""
    print('T2 start')
    time.sleep(0.1)
    print('T2 finish')


def main():
    """Launch both workers and join them both.

    Because the main thread waits on both joins, 'all done' is always
    printed last.
    """
    first = threading.Thread(target=thread_job, name='T1')
    first.start()
    second = threading.Thread(target=T2_job, name='T2')
    second.start()
    # Wait for both workers before announcing completion.
    second.join()
    first.join()
    print('all done')


if __name__ == '__main__':
    main()
output:
T1 start
T2 start
T2 finish
T1 finish
all done
- 使用 queue 來接收 thread 執行後回傳的資料
目前我們的 thread 所做的事情是沒有回傳值的
如果要讓 thread 運算執行後回傳資料的話,則需要使用 queue 來接。
threading03_queue.py
import threading
import time
from queue import Queue


def thread_job(arr, q):
    """Square every element of `arr` in place, then put `arr` on the queue.

    Threads cannot return values directly, so the queue carries the
    result back to the caller.
    """
    for i in range(len(arr)):
        arr[i] = arr[i] ** 2
    q.put(arr)


def multithreading():
    """Square four sub-lists, one worker thread each, and print the results."""
    q = Queue()
    data = [[1, 2, 3], [4, 5, 6], [7, 7, 7], [5, 5, 5]]
    threads = []
    for chunk in data:
        worker = threading.Thread(target=thread_job, args=(chunk, q))
        worker.start()
        threads.append(worker)
    for worker in threads:
        worker.join()
    # q.get() yields results in completion order, which need not match
    # the order of `data`.
    print([q.get() for _ in range(4)])


if __name__ == '__main__':
    multithreading()
'''
執行:python3 threading03_queue.py
output:
[[1, 4, 9], [16, 25, 36], [49, 49, 49], [25, 25, 25]]
'''
- lock 的用法
當多個線程會存取同一份共享資料時,若想確保一個線程完整做完自己的更新、其他線程才能接著進行,
就必須使用 lock:先以 acquire() 取得鎖,做完事情後再以 release() 釋放,未取得鎖的線程會被擋住等待。
import threading


def job1():
    """Add 1 to the shared counter A ten times while holding the lock."""
    global A, lock
    lock.acquire()
    for _ in range(10):
        A += 1
        print('job1', A)
    lock.release()


def job2():
    """Add 10 to the shared counter A ten times while holding the lock."""
    global A, lock
    lock.acquire()
    for _ in range(10):
        A += 10
        print('job2', A)
    lock.release()


if __name__ == '__main__':
    # The lock serializes the two workers: whichever acquires it first
    # finishes all ten updates before the other may start.
    lock = threading.Lock()
    A = 0
    worker1 = threading.Thread(target=job1)
    worker2 = threading.Thread(target=job2)
    worker1.start()
    worker2.start()
    worker1.join()
    worker2.join()
output:
job1 1
job1 2
job1 3
job1 4
job1 5
job1 6
job1 7
job1 8
job1 9
job1 10
job2 20
job2 30
job2 40
job2 50
job2 60
job2 70
job2 80
job2 90
job2 100
job2 110
python multiprocess
import multiprocessing as mp
import threading as td


def job(a, b):
    """Toy task driven both by a Thread and by a Process."""
    print('doing job')


# The same target run two ways: in a thread and in a separate process.
# NOTE(review): starting a Process at module top level like this fails on
# spawn-based platforms (e.g. Windows); the guarded version below shows
# the correct pattern.
t1 = td.Thread(target=job, args=(1, 2))
p1 = mp.Process(target=job, args=(1, 2))
t1.start()
p1.start()
t1.join()
p1.join()
須在 main 裡面呼叫 multiprocess
import multiprocessing as mp


def job(a, b):
    """Toy task executed in a child process."""
    print('doing job')


if __name__ == '__main__':
    # The guard keeps child processes from re-running this code when the
    # module is re-imported by multiprocessing.
    worker = mp.Process(target=job, args=(1, 2))
    worker.start()
    worker.join()
import multiprocessing as mp


def job(q):
    """Sum the integers 0..9 and send the total back through the queue."""
    total = 0
    for i in range(10):
        total += i
    q.put(total)


if __name__ == '__main__':
    # Two processes push one result each; q.get() retrieves them in
    # completion order.
    queue = mp.Queue()
    proc_a = mp.Process(target=job, args=(queue,))
    proc_b = mp.Process(target=job, args=(queue,))
    proc_a.start()
    proc_b.start()
    proc_a.join()
    proc_b.join()
    first = queue.get()
    second = queue.get()
    print(first)
    print(second)
    print(first + second)
- 一般情形,multi-Thread 與 multi-Process 效能比較
import multiprocessing as mp
import threading as td
import time


def job(q):
    """CPU-bound task: accumulate i + i**2 + i**3 for i in [0, 100000)."""
    result = 0
    for i in range(100000):
        result += i + i**2 + i**3
    q.put(result)


def multicore():
    """Run job() in two separate processes (true CPU parallelism)."""
    q = mp.Queue()
    workers = [mp.Process(target=job, args=(q,)) for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print('multicore:', q.get() + q.get())


def multithread():
    """Run job() in two threads (GIL-bound, so no CPU parallelism)."""
    q = mp.Queue()
    workers = [td.Thread(target=job, args=(q,)) for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print('multithread:', q.get() + q.get())


def normal():
    """Run the same workload twice, serially, in the main thread."""
    result = 0
    for _ in range(2):
        for i in range(100000):
            result += i + i**2 + i**3
    print('normal:', result)


if __name__ == '__main__':
    # Time the three strategies back to back for comparison.
    start = time.time()
    normal()
    after_normal = time.time()
    print('normal time:', after_normal - start)
    multithread()
    after_threads = time.time()
    print('multithread time:', after_threads - after_normal)
    multicore()
    after_procs = time.time()
    print('multicore time:', after_procs - after_threads)
''' 結果(註:此輸出是以 Python 2 執行的結果,print 的多個參數被當成 tuple 印出;
在 Python 3 下會印成 normal: 49999666671666600000 這樣的形式)
('normal:', 49999666671666600000L)
('normal time:', 0.03933405876159668)
('multithread:', 49999666671666600000L)
('multithread time:', 0.06222200393676758)
('multicore:', 49999666671666600000L)
('multicore time:', 0.025996923446655273)
'''
根據以上實驗結果,我們發現 multi-process (multicore) 最有效率,所需時間最短。
multi-thread 因為會受 GIL 影響,在某些 case 之下效率未必能比普通狀況還要好,所以要使用 multi-thread 來提升程式運算效能,要看任務的性質找適當的時機使用。
- 使用Process Pool自動分配多核心,接收function回傳值
import multiprocessing as mp


def job(x):
    """Return the square of x (executed in a pool worker process)."""
    return x * x


def multicore():
    """Demonstrate Pool.map and Pool.apply_async."""
    # Default pool: one worker per CPU core; map distributes the inputs
    # and collects the return values in order.
    pool = mp.Pool()
    print(pool.map(job, range(10)))
    # Pool capped at three worker processes.
    pool = mp.Pool(processes=3)
    print(pool.map(job, range(10)))
    # apply_async submits a single call; .get() blocks for its result.
    single = pool.apply_async(job, (2,))
    print(single.get())
    # Many async submissions, collected afterwards.
    pending = [pool.apply_async(job, (i,)) for i in range(10)]
    print([p.get() for p in pending])


if __name__ == '__main__':
    multicore()
import multiprocessing as mp

# Shared-memory objects that can be passed to child processes.
value1 = mp.Value('i', 0)            # 'i' = C signed int, initial value 0
value2 = mp.Value('d', 3.14)         # 'd' = C double, initial value 3.14
array = mp.Array('i', [1, 2, 3, 4])  # shared int array; mp.Array is 1-D only
multiprocess 的共享 Array 只能是一維
- Process Lock
以下是一個兩個 process 使用同一個 shared memory 的例子,先觀察一下如果沒有 lock 機制的情形:
import multiprocessing as mp
import time


def job(v, num):
    """Add `num` to the shared value ten times, printing after each step."""
    for _ in range(10):
        time.sleep(0.1)
        # Unprotected read-modify-write on shared memory: the two
        # processes interleave, producing the scrambled output shown below.
        v.value += num
        print(v.value)


def multicode():
    """Race two processes over one shared Value with no lock."""
    # NOTE(review): 'multicode' is presumably a typo for 'multicore';
    # kept as-is to preserve the published interface.
    shared = mp.Value("i", 0)
    adder1 = mp.Process(target=job, args=(shared, 1))
    adder3 = mp.Process(target=job, args=(shared, 3))
    adder1.start()
    adder3.start()
    adder1.join()
    adder3.join()


if __name__ == '__main__':
    multicode()
目前的 output
1
4
5
8
9
12
13
16
19
19
20
23
24
24
25
25
26
26
27
27
可以看出兩個 process 是平行處理的,所以會隨機搶奪 shared 的資料去執行 function
加上 Lock
用法跟之前的 Thread Lock 很像
import multiprocessing as mp
import time


def job(v, num, loc):
    """Add `num` to the shared value ten times while holding the lock."""
    loc.acquire()
    for _ in range(10):
        time.sleep(0.1)
        v.value += num
        print(v.value)
    loc.release()


def multicode():
    """Two processes update one shared Value; the lock serializes them.

    Whichever process acquires the lock first performs all ten of its
    updates before the other may start, so the output is ordered.
    """
    loc = mp.Lock()
    shared = mp.Value("i", 0)
    adder1 = mp.Process(target=job, args=(shared, 1, loc))
    adder3 = mp.Process(target=job, args=(shared, 3, loc))
    adder1.start()
    adder3.start()
    adder1.join()
    adder3.join()


if __name__ == '__main__':
    multicode()
output:
1
2
3
4
5
6
7
8
9
10
13
16
19
22
25
28
31
34
37
40
有了 Lock 的保護機制,這次的 output 就很整齊,依序先做完 p1 做的事 (逐步加一) 再做 p2 做的事 (逐步加三)
使用 queue 來接收 thread 執行後回傳的資料時,為什麼要對threads array 每個 thread做 join?
回覆刪除在執行for 迴圈的 thread.start的時候不是都跑完了嗎?
不好意思,最近太少來 blogger 來晚了 QQ
刪除的確,上面的例子當中做 join() 沒有什麼必要,加了 join() 也不會因此確保 array output 的順序,感謝大大指正 :D
感謝分享 非常詳盡
回覆刪除