多線程開(kāi)發(fā)可能遇到的問(wèn)題
假設(shè)兩個(gè)線程t1和t2都要對(duì)num=0進(jìn)行增1運(yùn)算,t1和t2都各對(duì)num修改1000000次,num的最終的結(jié)果應(yīng)該為2000000。但是由于是多線程訪問(wèn),有可能出現(xiàn)下面情況:
from threading import Thread import time num = 0 def test1(): global num for i in range(1000000): num += 1 print("--test1--num=%d" % num) def test2(): global num for i in range(1000000): num += 1 print("--test2--num=%d" % num) if __name__ == '__main__': Thread(target=test1).start() Thread(target=test2).start() print("num = %d" % num) """ num = 134116 --test1--num=1032814 --test2--num=1166243 """
運(yùn)行結(jié)果可能不一樣,但是結(jié)果往往不是2000000。問(wèn)題產(chǎn)生的原因就是沒(méi)有控制多個(gè)線程對(duì)同一資源的訪問(wèn),對(duì)數(shù)據(jù)造成破壞,使得線程運(yùn)行的結(jié)果不可預(yù)期。這種現(xiàn)象稱為“線程不安全”。
線程同步――使用互斥鎖
如果多個(gè)線程共同對(duì)某個(gè)數(shù)據(jù)修改,則可能出現(xiàn)不可預(yù)料的結(jié)果,為了保證數(shù)據(jù)的正確性,需要對(duì)多個(gè)線程進(jìn)行同步。
使用 Thread 對(duì)象的 Lock 和 Rlock 可以實(shí)現(xiàn)簡(jiǎn)單的線程同步,這兩個(gè)對(duì)象都有 acquire 方法和 release 方法,對(duì)于那些需要每次只允許一個(gè)線程操作的數(shù)據(jù),可以將其操作放到 acquire 和 release 方法之間。
使用互斥鎖實(shí)現(xiàn)上面的例子:
from threading import Thread, Lock import time num = 0 def test1(): global num # 上鎖 mutex.acquire() for i in range(1000000): num += 1 # 解鎖 mutex.release() print("--test1--num=%d" % num) def test2(): global num mutex.acquire() for i in range(1000000): num += 1 mutex.release() print("--test2--num=%d" % num) start_time = time.time() # 開(kāi)始時(shí)間 # 創(chuàng)建一把互斥鎖,默認(rèn)沒(méi)有上鎖 mutex = Lock() p1 = Thread(target=test1) p1.start() # time.sleep(3) # 取消屏蔽之后 再次運(yùn)行程序,結(jié)果會(huì)不一樣,,,為啥呢? p2 = Thread(target=test2) p2.start() p1.join() p2.join() end_time = time.time() # 結(jié)束時(shí)間 print("num = %d" % num) print("運(yùn)行時(shí)間:%fs" % (end_time - start_time)) # 結(jié)束時(shí)間-開(kāi)始時(shí)間 """ 輸出結(jié)果: --test1--num=1000000 --test2--num=2000000 num = 2000000 運(yùn)行時(shí)間:0.287206s """
同步的應(yīng)用――多個(gè)線程有序執(zhí)行
from threading import Lock, Thread from time import sleep class Task1(Thread): def run(self): while True: # 判斷是否上鎖成功,返回值為bool類型 if lock1.acquire(): print("--task1--") sleep(0.5) lock2.release() class Task2(Thread): def run(self): while True: if lock2.acquire(): print("--task2--") sleep(0.5) lock3.release() class Task3(Thread): def run(self): while True: if lock3.acquire(): print("--task3--") sleep(0.5) lock1.release() if __name__ == '__main__': # 創(chuàng)建一把鎖 lock1 = Lock() # 創(chuàng)建一把鎖,并且鎖上 lock2 = Lock() lock2.acquire() # 創(chuàng)建一把鎖,并且鎖上 lock3 = Lock() lock3.acquire() t1 = Task1() t2 = Task2() t3 = Task3() t1.start() t2.start() t3.start() """ --task1-- --task2-- --task3-- --task1-- --task2-- --task3-- --task1-- --task2-- ... """
生產(chǎn)者與消費(fèi)者模式
為什么要使用生產(chǎn)者和消費(fèi)者模式
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程。在多線程開(kāi)發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這個(gè)問(wèn)題于是引入了生產(chǎn)者和消費(fèi)者模式。
什么是生產(chǎn)者消費(fèi)者模式
生產(chǎn)者消費(fèi)者模式是通過(guò)一個(gè)容器來(lái)解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問(wèn)題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過(guò)阻塞隊(duì)列來(lái)進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。
Python的Queue模塊中提供了同步的、線程安全的隊(duì)列類,包括FIFO(先入先出)隊(duì)列Queue,LIFO(后入先出)隊(duì)列LifoQueue,和優(yōu)先級(jí)隊(duì)列PriorityQueue。這些隊(duì)列都實(shí)現(xiàn)了鎖原語(yǔ)(可以理解為原子操作,即要么不做,要么就做完),能夠在多線程中直接使用。可以使用隊(duì)列來(lái)實(shí)現(xiàn)線程間的同步。
用FIFO隊(duì)列實(shí)現(xiàn)上述生產(chǎn)者與消費(fèi)者問(wèn)題的代碼如下:
import threading import time from queue import Queue class Producer(threading.Thread): def run(self): global queue count = 0 while True: if queue.qsize() < 1000: for i in range(100): count += 1 msg = "生成產(chǎn)品" + str(count) queue.put(msg) print(msg) time.sleep(0.5) class Consumer(threading.Thread): def run(self): global queue while True: if queue.qsize() > 100: for i in range(3): msg = self.name + "消費(fèi)了" + queue.get() print(msg) time.sleep(0.5)
ThreadLocal
在多線程環(huán)境下,每個(gè)線程都有自己的數(shù)據(jù)。一個(gè)線程使用自己的局部變量比使用全局變量好,因?yàn)榫植孔兞恐挥芯€程自己能看見(jiàn),不會(huì)影響其他線程,而全局變量的修改必須加鎖。
ThreadLocal解決了參數(shù)在一個(gè)線程中各個(gè)函數(shù)之間互相傳遞的問(wèn)題
import threading """ ?個(gè)ThreadLocal變量雖然是全局變量,但每個(gè)線程都只能讀寫??線程的獨(dú) ?副本,互不?擾。 """ # 創(chuàng)建全局ThreadLocal對(duì)象: local_school = threading.local() def process_student(): # 獲取當(dāng)前線程關(guān)聯(lián)的student: std = local_school.student print('Hello, %s (in %s)' % (std, threading.current_thread().name)) def process_thread(name): # 綁定ThreadLocal的student: local_school.student = name process_student() t1 = threading.Thread(target=process_thread, args=('dongGe',), name="Thread-A") t2 = threading.Thread(target=process_thread, args=('?王',), name="Thread-B") t1.start() t2.start()
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元
