日韩久久久精品,亚洲精品久久久久久久久久久,亚洲欧美一区二区三区国产精品 ,一区二区福利

Python多進(jìn)程與多線程編程及GIL詳解

系統(tǒng) 1726 0

介紹如何使用python的multiprocess和threading模塊進(jìn)行多線程和多進(jìn)程編程。

Python的多進(jìn)程編程與multiprocess模塊

python的多進(jìn)程編程主要依靠multiprocess模塊。我們先對(duì)比兩段代碼,看看多進(jìn)程編程的優(yōu)勢(shì)。我們模擬了一個(gè)非常耗時(shí)的任務(wù),計(jì)算8的20次方,為了使這個(gè)任務(wù)顯得更耗時(shí),我們還讓它sleep 2秒。第一段代碼是單進(jìn)程計(jì)算(代碼如下所示),我們按順序執(zhí)行代碼,重復(fù)計(jì)算2次,并打印出總共耗時(shí)。

            
              import
            
            
               time

            
            
              import
            
            
               os

            
            
              def
            
            
               long_time_task():
    
            
            
              print
            
            (
            
              '
            
            
              當(dāng)前進(jìn)程: {}
            
            
              '
            
            
              .format(os.getpid()))
    time.sleep(
            
            2
            
              )
    
            
            
              print
            
            (
            
              "
            
            
              結(jié)果: {}
            
            
              "
            
            .format(8 ** 20
            
              ))


            
            
              if
            
            
              __name__
            
             == 
            
              "
            
            
              __main__
            
            
              "
            
            
              :
    
            
            
              print
            
            (
            
              '
            
            
              當(dāng)前母進(jìn)程: {}
            
            
              '
            
            
              .format(os.getpid()))
    start 
            
            =
            
               time.time()
    
            
            
              for
            
             i 
            
              in
            
             range(2
            
              ):
        long_time_task()

    end 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              "
            
            
              用時(shí){}秒
            
            
              "
            
            .format((end-start)))
          

輸出結(jié)果如下,總共耗時(shí)4秒,至始至終只有一個(gè)進(jìn)程14236。看來電腦計(jì)算8的20次方基本不費(fèi)時(shí)。

          當(dāng)前母進(jìn)程: 14236
          
當(dāng)前進(jìn)程: 14236
結(jié)果: 1152921504606846976
當(dāng)前進(jìn)程: 14236
結(jié)果: 1152921504606846976
用時(shí)4.01080060005188秒

第2段代碼是多進(jìn)程計(jì)算代碼。我們利用multiprocess模塊的Process方法創(chuàng)建了兩個(gè)新的進(jìn)程p1和p2來進(jìn)行并行計(jì)算。Process方法接收兩個(gè)參數(shù), 第一個(gè)是target,一般指向函數(shù)名,第二個(gè)時(shí)args,需要向函數(shù)傳遞的參數(shù)。對(duì)于創(chuàng)建的新進(jìn)程,調(diào)用start()方法即可讓其開始。我們可以使用os.getpid()打印出當(dāng)前進(jìn)程的名字。

            
              from
            
             multiprocessing 
            
              import
            
            
               Process

            
            
              import
            
            
               os

            
            
              import
            
            
               time

            
            
              def
            
            
               long_time_task(i):
    
            
            
              print
            
            (
            
              '
            
            
              子進(jìn)程: {} - 任務(wù){(diào)}
            
            
              '
            
            
              .format(os.getpid(), i))
    time.sleep(
            
            2
            
              )
    
            
            
              print
            
            (
            
              "
            
            
              結(jié)果: {}
            
            
              "
            
            .format(8 ** 20
            
              ))

            
            
              if
            
            
              __name__
            
            ==
            
              '
            
            
              __main__
            
            
              '
            
            
              :
    
            
            
              print
            
            (
            
              '
            
            
              當(dāng)前母進(jìn)程: {}
            
            
              '
            
            
              .format(os.getpid()))
    start 
            
            =
            
               time.time()
    p1 
            
            = Process(target=long_time_task, args=(1
            
              ,))
    p2 
            
            = Process(target=long_time_task, args=(2
            
              ,))
    
            
            
              print
            
            (
            
              '
            
            
              等待所有子進(jìn)程完成。
            
            
              '
            
            
              )
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    end 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              "
            
            
              總共用時(shí){}秒
            
            
              "
            
            .format((end - start)))
          

輸出結(jié)果如下所示,耗時(shí)變?yōu)?秒,時(shí)間減了一半,可見并發(fā)執(zhí)行的時(shí)間明顯比順序執(zhí)行要快很多。你還可以看到盡管我們只創(chuàng)建了兩個(gè)進(jìn)程,可實(shí)際運(yùn)行中卻包含里1個(gè)母進(jìn)程和2個(gè)子進(jìn)程。之所以我們使用join()方法就是為了讓母進(jìn)程阻塞,等待子進(jìn)程都完成后才打印出總共耗時(shí),否則輸出時(shí)間只是母進(jìn)程執(zhí)行的時(shí)間。

          當(dāng)前母進(jìn)程: 6920
          
等待所有子進(jìn)程完成。
子進(jìn)程: 17020 - 任務(wù)1
子進(jìn)程: 5904 - 任務(wù)2
結(jié)果: 1152921504606846976
結(jié)果: 1152921504606846976
總共用時(shí)2.131091356277466秒

知識(shí)點(diǎn):

  • 新創(chuàng)建的進(jìn)程與進(jìn)程的切換都是要耗資源的,所以平時(shí)工作中進(jìn)程數(shù)不能開太大。

  • 同時(shí)可以運(yùn)行的進(jìn)程數(shù)一般受制于CPU的核數(shù)。

  • 除了使用Process方法,我們還可以使用Pool類創(chuàng)建多進(jìn)程。

?

利用multiprocess模塊的Pool類創(chuàng)建多進(jìn)程

很多時(shí)候系統(tǒng)都需要?jiǎng)?chuàng)建多個(gè)進(jìn)程以提高CPU的利用率,當(dāng)數(shù)量較少時(shí),可以手動(dòng)生成一個(gè)個(gè)Process實(shí)例。當(dāng)進(jìn)程數(shù)量很多時(shí),或許可以利用循環(huán),但是這需要程序員手動(dòng)管理系統(tǒng)中并發(fā)進(jìn)程的數(shù)量,有時(shí)會(huì)很麻煩。這時(shí)進(jìn)程池Pool就可以發(fā)揮其功效了。可以通過傳遞參數(shù)限制并發(fā)進(jìn)程的數(shù)量,默認(rèn)值為CPU的核數(shù)。?

Pool類可以提供指定數(shù)量的進(jìn)程供用戶調(diào)用,當(dāng)有新的請(qǐng)求提交到Pool中時(shí),如果進(jìn)程池還沒有滿,就會(huì)創(chuàng)建一個(gè)新的進(jìn)程來執(zhí)行請(qǐng)求。如果池滿,請(qǐng)求就會(huì)告知先等待,直到池中有進(jìn)程結(jié)束,才會(huì)創(chuàng)建新的進(jìn)程來執(zhí)行這些請(qǐng)求。?

下面介紹一下multiprocessing 模塊下的Pool類的幾個(gè)方法:

1.apply_async

函數(shù)原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])

其作用是向進(jìn)程池提交需要執(zhí)行的函數(shù)及參數(shù),?各個(gè)進(jìn)程采用非阻塞(異步)的調(diào)用方式,即每個(gè)子進(jìn)程只管運(yùn)行自己的,不管其它進(jìn)程是否已經(jīng)完成。

2.map()

函數(shù)原型:map(func, iterable[, chunksize=None])

Pool類中的map方法,與內(nèi)置的map函數(shù)用法行為基本一致,它會(huì)使進(jìn)程阻塞直到結(jié)果返回。?注意:雖然第二個(gè)參數(shù)是一個(gè)迭代器,但在實(shí)際使用中,必須在整個(gè)隊(duì)列都就緒后,程序才會(huì)運(yùn)行子進(jìn)程。

3.map_async()

函數(shù)原型:map_async(func, iterable[, chunksize[, callback]])
與map用法一致,但是它是非阻塞的。其有關(guān)事項(xiàng)見apply_async。

4.close()

關(guān)閉進(jìn)程池(pool),使其不在接受新的任務(wù)。

5. terminate()

結(jié)束工作進(jìn)程,不在處理未處理的任務(wù)。

6.join()

主進(jìn)程阻塞等待子進(jìn)程的退出, join方法要在close或terminate之后使用。

?

下例是一個(gè)簡(jiǎn)單的multiprocessing.Pool類的實(shí)例。因?yàn)樾【幬业腃PU是4核的,一次最多可以同時(shí)運(yùn)行4個(gè)進(jìn)程,所以我開啟了一個(gè)容量為4的進(jìn)程池。4個(gè)進(jìn)程需要計(jì)算5次,你可以想象4個(gè)進(jìn)程并行4次計(jì)算任務(wù)后,還剩一次計(jì)算任務(wù)(任務(wù)4)沒有完成,系統(tǒng)會(huì)等待4個(gè)進(jìn)程完成后重新安排一個(gè)進(jìn)程來計(jì)算。

            
              from
            
             multiprocessing 
            
              import
            
            
               Pool, cpu_count

            
            
              import
            
            
               os

            
            
              import
            
            
               time

            
            
              def
            
            
               long_time_task(i):
    
            
            
              print
            
            (
            
              '
            
            
              子進(jìn)程: {} - 任務(wù){(diào)}
            
            
              '
            
            
              .format(os.getpid(), i))
    time.sleep(
            
            2
            
              )
    
            
            
              print
            
            (
            
              "
            
            
              結(jié)果: {}
            
            
              "
            
            .format(8 ** 20
            
              ))

            
            
              if
            
            
              __name__
            
            ==
            
              '
            
            
              __main__
            
            
              '
            
            
              :
    
            
            
              print
            
            (
            
              "
            
            
              CPU內(nèi)核數(shù):{}
            
            
              "
            
            
              .format(cpu_count()))
    
            
            
              print
            
            (
            
              '
            
            
              當(dāng)前母進(jìn)程: {}
            
            
              '
            
            
              .format(os.getpid()))
    start 
            
            =
            
               time.time()
    p 
            
            = Pool(4
            
              )
    
            
            
              for
            
             i 
            
              in
            
             range(5
            
              ):
        p.apply_async(long_time_task, args
            
            =
            
              (i,))
    
            
            
              print
            
            (
            
              '
            
            
              等待所有子進(jìn)程完成。
            
            
              '
            
            
              )
    p.close()
    p.join()
    end 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              "
            
            
              總共用時(shí){}秒
            
            
              "
            
            .format((end - start)))
          

知識(shí)點(diǎn): ?

  • 對(duì)Pool對(duì)象調(diào)用join()方法會(huì)等待所有子進(jìn)程執(zhí)行完畢,調(diào)用join()之前必須先調(diào)用close()或terminate()方法,讓其不再接受新的Process了。

?輸出結(jié)果如下所示,5個(gè)任務(wù)(每個(gè)任務(wù)大約耗時(shí)2秒)使用多進(jìn)程并行計(jì)算只需4.37秒,, 耗時(shí)減少了60%。

          CPU內(nèi)核數(shù):4
          
當(dāng)前母進(jìn)程: 2556
等待所有子進(jìn)程完成。
子進(jìn)程: 16480 - 任務(wù)0
子進(jìn)程: 15216 - 任務(wù)1
子進(jìn)程: 15764 - 任務(wù)2
子進(jìn)程: 10176 - 任務(wù)3
結(jié)果: 1152921504606846976
結(jié)果: 1152921504606846976
子進(jìn)程: 15216 - 任務(wù)4
結(jié)果: 1152921504606846976
結(jié)果: 1152921504606846976
結(jié)果: 1152921504606846976
總共用時(shí)4.377134561538696秒

?相信大家都知道python解釋器中存在GIL(全局解釋器鎖), 它的作用就是保證同一時(shí)刻只有一個(gè)線程可以執(zhí)行代碼。由于GIL的存在,很多人認(rèn)為python中的多線程其實(shí)并不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進(jìn)程。然而這并意味著python多線程編程沒有意義哦,請(qǐng)繼續(xù)閱讀下文。

? ?多進(jìn)程間的數(shù)據(jù)共享與通信

?通常,進(jìn)程之間是相互獨(dú)立的,每個(gè)進(jìn)程都有獨(dú)立的內(nèi)存。通過共享內(nèi)存(nmap模塊),進(jìn)程之間可以共享對(duì)象,使多個(gè)進(jìn)程可以訪問同一個(gè)變量(地址相同,變量名可能不同)。多進(jìn)程共享資源必然會(huì)導(dǎo)致進(jìn)程間相互競(jìng)爭(zhēng),所以應(yīng)該盡最大可能防止使用共享狀態(tài)。還有一種方式就是使用隊(duì)列queue來實(shí)現(xiàn)不同進(jìn)程間的通信或數(shù)據(jù)共享,這一點(diǎn)和多線程編程類似。

            
              from
            
             multiprocessing 
            
              import
            
            
               Process, Queue

            
            
              import
            
            
               os, time, random

            
            
              #
            
            
               寫數(shù)據(jù)進(jìn)程執(zhí)行的代碼:
            
            
              def
            
            
               write(q):
    
            
            
              print
            
            (
            
              '
            
            
              Process to write: {}
            
            
              '
            
            
              .format(os.getpid()))
    
            
            
              for
            
             value 
            
              in
            
             [
            
              '
            
            
              A
            
            
              '
            
            , 
            
              '
            
            
              B
            
            
              '
            
            , 
            
              '
            
            
              C
            
            
              '
            
            
              ]:
        
            
            
              print
            
            (
            
              '
            
            
              Put %s to queue...
            
            
              '
            
             %
            
               value)
        q.put(value)
        time.sleep(random.random())

            
            
              #
            
            
               讀數(shù)據(jù)進(jìn)程執(zhí)行的代碼:
            
            
              def
            
            
               read(q):
    
            
            
              print
            
            (
            
              '
            
            
              Process to read:{}
            
            
              '
            
            
              .format(os.getpid()))
    
            
            
              while
            
            
               True:
        value 
            
            =
            
               q.get(True)
        
            
            
              print
            
            (
            
              '
            
            
              Get %s from queue.
            
            
              '
            
             %
            
               value)

            
            
              if
            
            
              __name__
            
            ==
            
              '
            
            
              __main__
            
            
              '
            
            
              :
    
            
            
              #
            
            
               父進(jìn)程創(chuàng)建Queue,并傳給各個(gè)子進(jìn)程:
            
            
   q =
            
               Queue()
    pw 
            
            = Process(target=write, args=
            
              (q,))
    pr 
            
            = Process(target=read, args=
            
              (q,))
    
            
            
              #
            
            
               啟動(dòng)子進(jìn)程pw,寫入:
            
            
                  pw.start()
    
            
            
              #
            
            
               啟動(dòng)子進(jìn)程pr,讀取:
            
            
                  pr.start()
    
            
            
              #
            
            
               等待pw結(jié)束:
            
            
                  pw.join()
    
            
            
              #
            
            
               pr進(jìn)程里是死循環(huán),無法等待其結(jié)束,只能強(qiáng)行終止:
            
            
    pr.terminate()
          

下例這段代碼中中創(chuàng)建了2個(gè)獨(dú)立進(jìn)程,一個(gè)負(fù)責(zé)寫(pw), 一個(gè)負(fù)責(zé)讀(pr), 實(shí)現(xiàn)了共享一個(gè)隊(duì)列queue。

輸出結(jié)果如下所示:

          Process to write: 3036
          
Put A to queue...
Process to read:9408
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

? Python的多線程編程與threading模塊

?python 3中的多進(jìn)程編程主要依靠threading模塊。創(chuàng)建新線程與創(chuàng)建新進(jìn)程的方法非常類似。threading.Thread方法可以接收兩個(gè)參數(shù), 第一個(gè)是target,一般指向函數(shù)名,第二個(gè)時(shí)args,需要向函數(shù)傳遞的參數(shù)。對(duì)于創(chuàng)建的新線程,調(diào)用start()方法即可讓其開始。我們還可以使用current_thread().name打印出當(dāng)前線程的名字。 下例中我們使用多線程技術(shù)重構(gòu)之前的計(jì)算代碼。

            
              import
            
            
               threading

            
            
              import
            
            
               time

            
            
              def
            
            
               long_time_task(i):
    
            
            
              print
            
            (
            
              '
            
            
              當(dāng)前子線程: {} - 任務(wù){(diào)}
            
            
              '
            
            
              .format(threading.current_thread().name, i))
    time.sleep(
            
            2
            
              )
    
            
            
              print
            
            (
            
              "
            
            
              結(jié)果: {}
            
            
              "
            
            .format(8 ** 20
            
              ))

            
            
              if
            
            
              __name__
            
            ==
            
              '
            
            
              __main__
            
            
              '
            
            
              :
    start 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              '
            
            
              這是主線程:{}
            
            
              '
            
            
              .format(threading.current_thread().name))
    t1 
            
            = threading.Thread(target=long_time_task, args=(1
            
              ,))
    t2 
            
            = threading.Thread(target=long_time_task, args=(2
            
              ,))
    t1.start()
    t2.start()
    end 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              "
            
            
              總共用時(shí){}秒
            
            
              "
            
            .format((end -
            
               start)))
              

下面是輸出結(jié)果。為什么總耗時(shí)居然是0秒??我們可以明顯看到主線程和子線程其實(shí)是獨(dú)立運(yùn)行的,主線程根本沒有等子線程完成,而是自己結(jié)束后就打印了消耗時(shí)間。主線程結(jié)束后,子線程仍在獨(dú)立運(yùn)行,這顯然不是我們想要的。

          這是主線程:MainThread
          
當(dāng)前子線程: Thread-1 - 任務(wù)1
當(dāng)前子線程: Thread-2 - 任務(wù)2
總共用時(shí)0.0017192363739013672秒
結(jié)果: 1152921504606846976
結(jié)果: 1152921504606846976

如果要實(shí)現(xiàn)主線程和子線程的同步,我們必需使用join方法(代碼如下所示)。

            
              import
            
            
               threading

            
            
              import
            
            
               time

            
            
              def
            
            
               long_time_task(i):
    
            
            
              print
            
            (
            
              '
            
            
              當(dāng)前子線程: {} 任務(wù){(diào)}
            
            
              '
            
            
              .format(threading.current_thread().name, i))
    time.sleep(
            
            2
            
              )
    
            
            
              print
            
            (
            
              "
            
            
              結(jié)果: {}
            
            
              "
            
            .format(8 ** 20
            
              ))

            
            
              if
            
            
              __name__
            
            ==
            
              '
            
            
              __main__
            
            
              '
            
            
              :
    start 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              '
            
            
              這是主線程:{}
            
            
              '
            
            
              .format(threading.current_thread().name))
    thread_list 
            
            =
            
               []
    
            
            
              for
            
             i 
            
              in
            
             range(1, 3
            
              ):
        t 
            
            = threading.Thread(target=long_time_task, args=
            
              (i, ))
        thread_list.append(t)
    
            
            
              for
            
             t 
            
              in
            
            
               thread_list:
        t.start()
    
            
            
              for
            
             t 
            
              in
            
            
               thread_list:
        t.join()
    end 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              "
            
            
              總共用時(shí){}秒
            
            
              "
            
            .format((end - start)))
          

修改代碼后的輸出如下所示。這時(shí)你可以看到主線程在等子線程完成后才答應(yīng)出總消耗時(shí)間(2秒),比正常順序執(zhí)行代碼(4秒)還是節(jié)省了不少時(shí)間。

          這是主線程:MainThread
          
當(dāng)前子線程: Thread - 1 任務(wù)1
當(dāng)前子線程: Thread - 2 任務(wù)2
結(jié)果: 1152921504606846976
結(jié)果: 1152921504606846976
總共用時(shí)2.0166890621185303秒

當(dāng)我們?cè)O(shè)置多線程時(shí),主線程會(huì)創(chuàng)建多個(gè)子線程,在python中,默認(rèn)情況下主線程和子線程獨(dú)立運(yùn)行互不干涉。如果希望讓主線程等待子線程實(shí)現(xiàn)線程的同步,我們需要使用join()方法。如果我們希望一個(gè)主線程結(jié)束時(shí)不再執(zhí)行子線程,我們應(yīng)該怎么辦呢? 我們可以使用t.setDaemon(True),代碼如下所示。

            
              import
            
            
               threading

            
            
              import
            
            
               time

            
            
              def
            
            
               long_time_task():
    
            
            
              print
            
            (
            
              '
            
            
              當(dāng)子線程: {}
            
            
              '
            
            
              .format(threading.current_thread().name))
    time.sleep(
            
            2
            
              )
    
            
            
              print
            
            (
            
              "
            
            
              結(jié)果: {}
            
            
              "
            
            .format(8 ** 20
            
              ))

            
            
              if
            
            
              __name__
            
            ==
            
              '
            
            
              __main__
            
            
              '
            
            
              :
    start 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              '
            
            
              這是主線程:{}
            
            
              '
            
            
              .format(threading.current_thread().name))
    
            
            
              for
            
             i 
            
              in
            
             range(5
            
              ):
        t 
            
            = threading.Thread(target=long_time_task, args=
            
              ())
        t.setDaemon(True)
        t.start()
    end 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              "
            
            
              總共用時(shí){}秒
            
            
              "
            
            .format((end - start)))
          

通過繼承Thread類重寫run方法創(chuàng)建新線程

?除了使用Thread()方法創(chuàng)建新的線程外,我們還可以通過繼承Thread類重寫run方法創(chuàng)建新的線程,這種方法更靈活。下例中我們自定義的類為MyThread, 隨后我們通過該類的實(shí)例化創(chuàng)建了2個(gè)子線程。

            
              #
            
            
              -*- encoding:utf-8 -*-
            
            
              import
            
            
               threading

            
            
              import
            
            
               time

            
            
              def
            
            
               long_time_task(i):
    time.sleep(
            
            2
            
              )
    
            
            
              return
            
             8**20

            
              class
            
            
               MyThread(threading.Thread):
    
            
            
              def
            
            
              __init__
            
            (self, func, args , name=
            
              ''
            
            
              , ):
        threading.Thread.
            
            
              __init__
            
            
              (self)
        self.func 
            
            =
            
               func
        self.args 
            
            =
            
               args
        self.name 
            
            =
            
               name
        self.result 
            
            =
            
               None
    
            
            
              def
            
            
               run(self):
        
            
            
              print
            
            (
            
              '
            
            
              開始子進(jìn)程{}
            
            
              '
            
            
              .format(self.name))
        self.result 
            
            =
            
               self.func(self.args[0],)
        
            
            
              print
            
            (
            
              "
            
            
              結(jié)果: {}
            
            
              "
            
            
              .format(self.result))
        
            
            
              print
            
            (
            
              '
            
            
              結(jié)束子進(jìn)程{}
            
            
              '
            
            
              .format(self.name))

            
            
              if
            
            
              __name__
            
            ==
            
              '
            
            
              __main__
            
            
              '
            
            
              :
    start 
            
            =
            
               time.time()
    threads 
            
            =
            
               []
    
            
            
              for
            
             i 
            
              in
            
             range(1, 3
            
              ):
        t 
            
            =
            
               MyThread(long_time_task, (i,), str(i))
        threads.append(t)
    
            
            
              for
            
             t 
            
              in
            
            
               threads:
        t.start()
    
            
            
              for
            
             t 
            
              in
            
            
               threads:
        t.join()
    end 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              "
            
            
              總共用時(shí){}秒
            
            
              "
            
            .format((end - start)))
          

輸出結(jié)果如下所示:

          開始子進(jìn)程1
          
開始子進(jìn)程2
結(jié)果: 1152921504606846976
結(jié)果: 1152921504606846976
結(jié)束子進(jìn)程1
結(jié)束子進(jìn)程2
總共用時(shí)2.005445718765259秒

? 不同線程間的數(shù)據(jù)共享

一個(gè)進(jìn)程所含的不同線程間共享內(nèi)存,這就意味著任何一個(gè)變量都可以被任何一個(gè)線程修改,因此線程之間共享數(shù)據(jù)最大的危險(xiǎn)在于多個(gè)線程同時(shí)改一個(gè)變量,把內(nèi)容給改亂了。如果不同線程間有共享的變量,其中一個(gè)方法就是在修改前給其上一把鎖lock,確保一次只有一個(gè)線程能修改它。threading.lock()方法可以輕易實(shí)現(xiàn)對(duì)一個(gè)共享變量的鎖定,修改完后release供其它線程使用。比如下例中賬戶余額balance是一個(gè)共享變量,使用lock可以使其不被改亂。

            
              #
            
            
               -*- coding: utf-8 -*
            
            
              import
            
            
               threading

            
            
              class
            
            
               Account:
    
            
            
              def
            
            
              __init__
            
            
              (self):
        self.balance 
            
            =
            
               0
    
            
            
              def
            
            
               add(self, lock):
        
            
            
              #
            
            
               獲得鎖
            
            
                      lock.acquire()
        
            
            
              for
            
             i 
            
              in
            
             range(0, 100000
            
              ):
            self.balance 
            
            += 1
        
            
              #
            
            
               釋放鎖
            
            
                      lock.release()
    
            
            
              def
            
            
               delete(self, lock):
        
            
            
              #
            
            
               獲得鎖
            
            
                      lock.acquire()
        
            
            
              for
            
             i 
            
              in
            
             range(0, 100000
            
              ):
            self.balance 
            
            -= 1
            
            
              #
            
            
               釋放鎖
            
            
                      lock.release()

            
            
              if
            
            
              __name__
            
             == 
            
              "
            
            
              __main__
            
            
              "
            
            
              :
    account 
            
            =
            
               Account()
    lock 
            
            =
            
               threading.Lock()
    
            
            
              #
            
            
               創(chuàng)建線程
            
            
   thread_add = threading.Thread(target=account.add, args=(lock,), name=
            
              '
            
            
              Add
            
            
              '
            
            
              )
    thread_delete 
            
            = threading.Thread(target=account.delete, args=(lock,), name=
            
              '
            
            
              Delete
            
            
              '
            
            
              )
    
            
            
              #
            
            
               啟動(dòng)線程
            
            
                 thread_add.start()
    thread_delete.start()
    
            
            
              #
            
            
               等待線程結(jié)束
            
            
                 thread_add.join()
    thread_delete.join()
    
            
            
              print
            
            (
            
              '
            
            
              The final balance is: {}
            
            
              '
            
            .format(account.balance))
          

?

另一種實(shí)現(xiàn)不同線程間數(shù)據(jù)共享的方法就是使用消息隊(duì)列queue。不像列表,queue是線程安全的,可以放心使用,見下文。

? 使用queue隊(duì)列通信-經(jīng)典的生產(chǎn)者和消費(fèi)者模型

下例中創(chuàng)建了兩個(gè)線程,一個(gè)負(fù)責(zé)生成,一個(gè)負(fù)責(zé)消費(fèi),所生成的產(chǎn)品存放在queue里,實(shí)現(xiàn)了不同線程間溝通。

            
              from
            
             queue 
            
              import
            
            
               Queue

            
            
              import
            
            
               random, threading, time

            
            
              #
            
            
               生產(chǎn)者類
            
            
              class
            
            
               Producer(threading.Thread):
    
            
            
              def
            
            
              __init__
            
            
              (self, name, queue):
        threading.Thread.
            
            
              __init__
            
            (self, name=
            
              name)
        self.queue 
            
            =
            
               queue
    
            
            
              def
            
            
               run(self):
        
            
            
              for
            
             i 
            
              in
            
             range(1, 5
            
              ):
            
            
            
              print
            
            (
            
              "
            
            
              {} is producing {} to the queue!
            
            
              "
            
            
              .format(self.getName(), i))
            self.queue.put(i)
            time.sleep(random.randrange(
            
            10) / 5
            
              )
        
            
            
              print
            
            (
            
              "
            
            
              %s finished!
            
            
              "
            
             %
            
               self.getName())

            
            
              #
            
            
               消費(fèi)者類
            
            
              class
            
            
               Consumer(threading.Thread):
    
            
            
              def
            
            
              __init__
            
            
              (self, name, queue):
        threading.Thread.
            
            
              __init__
            
            (self, name=
            
              name)
        self.queue 
            
            =
            
               queue

    
            
            
              def
            
            
               run(self):
        
            
            
              for
            
             i 
            
              in
            
             range(1, 5
            
              ):
            val 
            
            =
            
               self.queue.get()
            
            
            
              print
            
            (
            
              "
            
            
              {} is consuming {} in the queue.
            
            
              "
            
            
              .format(self.getName(), val))
            time.sleep(random.randrange(
            
            10
            
              ))
        
            
            
              print
            
            (
            
              "
            
            
              %s finished!
            
            
              "
            
             %
            
               self.getName())

            
            
              def
            
            
               main():
    queue 
            
            =
            
               Queue()
    producer 
            
            = Producer(
            
              '
            
            
              Producer
            
            
              '
            
            
              , queue)
    consumer 
            
            = Consumer(
            
              '
            
            
              Consumer
            
            
              '
            
            
              , queue)
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
    
            
            
              print
            
            (
            
              '
            
            
              All threads finished!
            
            
              '
            
            
              )

            
            
              if
            
            
              __name__
            
             == 
            
              '
            
            
              __main__
            
            
              '
            
            
              :
    main()
            
          

隊(duì)列queue的put方法可以將一個(gè)對(duì)象obj放入隊(duì)列中。如果隊(duì)列已滿,此方法將阻塞至隊(duì)列有空間可用為止。queue的get方法一次返回隊(duì)列中的一個(gè)成員。如果隊(duì)列為空,此方法將阻塞至隊(duì)列中有成員可用為止。queue同時(shí)還自帶emtpy(), full()等方法來判斷一個(gè)隊(duì)列是否為空或已滿,但是這些方法并不可靠,因?yàn)槎嗑€程和多進(jìn)程,在返回結(jié)果和使用結(jié)果之間,隊(duì)列中可能添加/刪除了成員。

? Python多進(jìn)程和多線程哪個(gè)快?

?由于GIL的存在,很多人認(rèn)為Python多進(jìn)程編程更快,針對(duì)多核CPU,理論上來說也是采用多進(jìn)程更能有效利用資源。網(wǎng)上很多人已做過比較,我直接告訴你結(jié)論吧。

  • 對(duì)CPU密集型代碼(比如循環(huán)計(jì)算) - 多進(jìn)程效率更高

  • 對(duì)IO密集型代碼(比如文件操作,網(wǎng)絡(luò)爬蟲) - 多線程效率更高。?

為什么是這樣呢?其實(shí)也不難理解。對(duì)于IO密集型操作,大部分消耗時(shí)間其實(shí)是等待時(shí)間,在等待時(shí)間中CPU是不需要工作的,那你在此期間提供雙CPU資源也是利用不上的,相反對(duì)于CPU密集型代碼,2個(gè)CPU干活肯定比一個(gè)CPU快很多。那么為什么多線程會(huì)對(duì)IO密集型代碼有用呢?這因是為python碰到等待會(huì)釋放GIL供新的線程使用,實(shí)現(xiàn)了線程間的切換。

?

GIL是什么

首先需要明確的一點(diǎn)是GIL并不是Python的特性,它是在實(shí)現(xiàn)Python解析器(CPython)時(shí)所引入的一個(gè)概念。就好比C++是一套語言(語法)標(biāo)準(zhǔn),但是可以用不同的編譯器來編譯成可執(zhí)行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也一樣,同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執(zhí)行環(huán)境來執(zhí)行。像其中的JPython就沒有GIL。然而因?yàn)镃Python是大部分環(huán)境下默認(rèn)的Python執(zhí)行環(huán)境。所以在很多人的概念里CPython就是Python,也就想當(dāng)然的把GIL歸結(jié)為Python語言的缺陷。所以這里要先明確一點(diǎn):GIL并不是Python的特性,Python完全可以不依賴于GIL。

GIL:? 一個(gè)防止多線程并發(fā)執(zhí)行機(jī)器碼的一個(gè)Mutex,乍一看就是個(gè)BUG般存在的全局鎖嘛!別急,我們下面慢慢的分析。

為什么會(huì)有GIL

由于物理上得限制,各CPU廠商在核心頻率上的比賽已經(jīng)被多核所取代。為了更有效的利用多核處理器的性能,就出現(xiàn)了多線程的編程方式,而隨之帶來的就是線程間數(shù)據(jù)一致性和狀態(tài)同步的困難。即使在CPU內(nèi)部的Cache也不例外,為了有效解決多份緩存之間的數(shù)據(jù)同步時(shí)各廠商花費(fèi)了不少心思,也不可避免的帶來了一定的性能損失。

Python當(dāng)然也逃不開,為了利用多核,Python開始支持多線程。而解決多線程之間數(shù)據(jù)完整性和狀態(tài)同步的最簡(jiǎn)單方法自然就是加鎖。 于是有了GIL這把超級(jí)大鎖,而當(dāng)越來越多的代碼庫(kù)開發(fā)者接受了這種設(shè)定后,他們開始大量依賴這種特性(即默認(rèn)python內(nèi)部對(duì)象是thread-safe的,無需在實(shí)現(xiàn)時(shí)考慮額外的內(nèi)存鎖和同步操作)。

慢慢的這種實(shí)現(xiàn)方式被發(fā)現(xiàn)是蛋疼且低效的。但當(dāng)大家試圖去拆分和去除GIL的時(shí)候,發(fā)現(xiàn)大量庫(kù)代碼開發(fā)者已經(jīng)重度依賴GIL而非常難以去除了。有多難?做個(gè)類比,像MySQL這樣的“小項(xiàng)目”為了把Buffer Pool Mutex這把大鎖拆分成各個(gè)小鎖也花了從5.5到5.6再到5.7多個(gè)大版為期近5年的時(shí)間,本且仍在繼續(xù)。MySQL這個(gè)背后有公司支持且有固定開發(fā)團(tuán)隊(duì)的產(chǎn)品走的如此艱難,那又更何況Python這樣核心開發(fā)和代碼貢獻(xiàn)者高度社區(qū)化的團(tuán)隊(duì)呢?

所以簡(jiǎn)單的說GIL的存在更多的是歷史原因。如果推到重來,多線程的問題依然還是要面對(duì),但是至少會(huì)比目前GIL這種方式會(huì)更優(yōu)雅。

? GIL的影響

從上文的介紹和官方的定義來看,GIL無疑就是一把全局排他鎖。毫無疑問全局鎖的存在會(huì)對(duì)多線程的效率有不小影響。甚至就幾乎等于Python是個(gè)單線程的程序。
那么讀者就會(huì)說了,全局鎖只要釋放的勤快效率也不會(huì)差啊。只要在進(jìn)行耗時(shí)的IO操作的時(shí)候,能釋放GIL,這樣也還是可以提升運(yùn)行效率的嘛。或者說再差也不會(huì)比單線程的效率差吧。理論上是這樣,而實(shí)際上呢?Python比你想的更糟。

下面我們就對(duì)比下Python在多線程和單線程下得效率對(duì)比。測(cè)試方法很簡(jiǎn)單,一個(gè)循環(huán)1億次的計(jì)數(shù)器函數(shù)。一個(gè)通過單線程執(zhí)行兩次,一個(gè)多線程執(zhí)行。最后比較執(zhí)行總時(shí)間。測(cè)試環(huán)境為雙核的Mac pro。注:為了減少線程庫(kù)本身性能損耗對(duì)測(cè)試結(jié)果帶來的影響,這里單線程的代碼同樣使用了線程。只是順序的執(zhí)行兩次,模擬單線程。

順序執(zhí)行的單線程(single_thread.py)

                    
                      #
                    
                    
                      ! /usr/bin/python
                    
                    
                      from
                    
                     threading 
                    
                      import
                    
                    
                       Thread

                    
                    
                      import
                    
                    
                       time
 

                    
                    
                      def
                    
                    
                       my_counter():
    i 
                    
                    =
                    
                       0
    
                    
                    
                      for
                    
                     _ 
                    
                      in
                    
                     range(100000000
                    
                      ):
        i 
                    
                    = i + 1
    
                    
                      return
                    
                    
                       True
 

                    
                    
                      def
                    
                    
                       main():
    thread_array 
                    
                    =
                    
                       {}
    start_time 
                    
                    =
                    
                       time.time()
    
                    
                    
                      for
                    
                     tid 
                    
                      in
                    
                     range(2
                    
                      ):
        t 
                    
                    = Thread(target=
                    
                      my_counter)
        t.start()
        t.join()
    end_time 
                    
                    =
                    
                       time.time()
    
                    
                    
                      print
                    
                    (
                    
                      "
                    
                    
                      Total time: {}
                    
                    
                      "
                    
                    .format(end_time -
                    
                       start_time))
 

                    
                    
                      if
                    
                    
                      __name__
                    
                     == 
                    
                      '
                    
                    
                      __main__
                    
                    
                      '
                    
                    
                      :
    main()
                    
                  

同時(shí)執(zhí)行的兩個(gè)并發(fā)線程(multi_thread.py)

                    
                      #
                    
                    
                      ! /usr/bin/python
                    
                    
                      from
                    
                     threading 
                    
                      import
                    
                    
                       Thread

                    
                    
                      import
                    
                    
                       time
 

                    
                    
                      def
                    
                    
                       my_counter():
    i 
                    
                    =
                    
                       0
    
                    
                    
                      for
                    
                     _ 
                    
                      in
                    
                     range(100000000
                    
                      ):
        i 
                    
                    = i + 1
    
                    
                      return
                    
                    
                       True
 

                    
                    
                      def
                    
                    
                       main():
    thread_array 
                    
                    =
                    
                       {}
    start_time 
                    
                    =
                    
                       time.time()
    
                    
                    
                      for
                    
                     tid 
                    
                      in
                    
                     range(2
                    
                      ):
        t 
                    
                    = Thread(target=
                    
                      my_counter)
        t.start()
        thread_array[tid] 
                    
                    =
                    
                       t
    
                    
                    
                      for
                    
                     i 
                    
                      in
                    
                     range(2
                    
                      ):
        thread_array[i].join()
    end_time 
                    
                    =
                    
                       time.time()
    
                    
                    
                      print
                    
                    (
                    
                      "
                    
                    
                      Total time: {}
                    
                    
                      "
                    
                    .format(end_time -
                    
                       start_time))
 

                    
                    
                      if
                    
                    
                      __name__
                    
                     == 
                    
                      '
                    
                    
                      __main__
                    
                    
                      '
                    
                    
                      :
    main()
                    
                  

python在多線程的情況下居然比單線程整整慢了45%。按照之前的分析,即使是有GIL全局鎖的存在,串行化的多線程也應(yīng)該和單線程有一樣的效率才對(duì)。那么怎么會(huì)有這么糟糕的結(jié)果呢?

讓我們通過GIL的實(shí)現(xiàn)原理來分析這其中的原因。

當(dāng)前GIL設(shè)計(jì)的缺陷

基于pcode數(shù)量的調(diào)度方式

按照Python社區(qū)的想法,操作系統(tǒng)本身的線程調(diào)度已經(jīng)非常成熟穩(wěn)定了,沒有必要自己搞一套。所以Python的線程就是C語言的一個(gè)pthread,并通過操作系統(tǒng)調(diào)度算法進(jìn)行調(diào)度(例如linux是CFS)。為了讓各個(gè)線程能夠平均利用CPU時(shí)間,python會(huì)計(jì)算當(dāng)前已執(zhí)行的微代碼數(shù)量,達(dá)到一定閾值后就強(qiáng)制釋放GIL。而這時(shí)也會(huì)觸發(fā)一次操作系統(tǒng)的線程調(diào)度(當(dāng)然是否真正進(jìn)行上下文切換由操作系統(tǒng)自主決定)。

偽代碼

                    
                      while
                    
                    
                       True:
    acquire GIL
    
                    
                    
                      for
                    
                     i 
                    
                      in
                    
                     1000
                    
                      :
        do something
    release GIL
    
                    
                    /* Give Operating System a chance to do thread scheduling */
                  

這種模式在只有一個(gè)CPU核心的情況下毫無問題。任何一個(gè)線程被喚起時(shí)都能成功獲得到GIL(因?yàn)橹挥嗅尫帕薌IL才會(huì)引發(fā)線程調(diào)度)。但當(dāng)CPU有多個(gè)核心的時(shí)候,問題就來了。從偽代碼可以看到,從release GIL到acquire GIL之間幾乎是沒有間隙的。所以當(dāng)其他在其他核心上的線程被喚醒時(shí),大部分情況下主線程已經(jīng)又再一次獲取到GIL了。這個(gè)時(shí)候被喚醒執(zhí)行的線程只能白白的浪費(fèi)CPU時(shí)間,看著另一個(gè)線程拿著GIL歡快的執(zhí)行著。然后達(dá)到切換時(shí)間后進(jìn)入待調(diào)度狀態(tài),再被喚醒,再等待,以此往復(fù)惡性循環(huán)。

PS:當(dāng)然這種實(shí)現(xiàn)方式是原始而丑陋的,Python的每個(gè)版本中也在逐漸改進(jìn)GIL和線程調(diào)度之間的互動(dòng)關(guān)系。例如先嘗試持有GIL在做線程上下文切換,在IO等待時(shí)釋放GIL等嘗試。但是無法改變的是GIL的存在使得操作系統(tǒng)線程調(diào)度的這個(gè)本來就昂貴的操作變得更奢侈了。
關(guān)于GIL影響的擴(kuò)展閱讀

為了直觀的理解GIL對(duì)于多線程帶來的性能影響,這里直接借用的一張測(cè)試結(jié)果圖(見下圖)。圖中表示的是兩個(gè)線程在雙核CPU上得執(zhí)行情況。兩個(gè)線程均為CPU密集型運(yùn)算線程。綠色部分表示該線程在運(yùn)行,且在執(zhí)行有用的計(jì)算,紅色部分為線程被調(diào)度喚醒,但是無法獲取GIL導(dǎo)致無法進(jìn)行有效運(yùn)算等待的時(shí)間。

由圖可見,GIL的存在導(dǎo)致多線程無法很好的立即多核CPU的并發(fā)處理能力。

那么Python的IO密集型線程能否從多線程中受益呢?我們來看下面這張測(cè)試結(jié)果。顏色代表的含義和上圖一致。白色部分表示IO線程處于等待。可見,當(dāng)IO線程收到數(shù)據(jù)包引起終端切換后,仍然由于一個(gè)CPU密集型線程的存在,導(dǎo)致無法獲取GIL鎖,從而進(jìn)行無盡的循環(huán)等待。
Python多進(jìn)程與多線程編程及GIL詳解_第1張圖片

簡(jiǎn)單的總結(jié)下就是:Python的多線程在多核CPU上,只對(duì)于IO密集型計(jì)算產(chǎn)生正面效果;而當(dāng)有至少有一個(gè)CPU密集型線程存在,那么多線程效率會(huì)由于GIL而大幅下降。

如何避免受到GIL的影響

說了那么多,如果不說解決方案就僅僅是個(gè)科普帖,然并卵。GIL這么爛,有沒有辦法繞過呢?我們來看看有哪些現(xiàn)成的方案。

用multiprocess替代Thread

multiprocess庫(kù)的出現(xiàn)很大程度上是為了彌補(bǔ)thread庫(kù)因?yàn)镚IL而低效的缺陷。它完整的復(fù)制了一套thread所提供的接口方便遷移。唯一的不同就是它使用了多進(jìn)程而不是多線程。每個(gè)進(jìn)程有自己的獨(dú)立的GIL,因此也不會(huì)出現(xiàn)進(jìn)程之間的GIL爭(zhēng)搶。

當(dāng)然multiprocess也不是萬能良藥。它的引入會(huì)增加程序?qū)崿F(xiàn)時(shí)線程間數(shù)據(jù)通訊和同步的困難。就拿計(jì)數(shù)器來舉例子,如果我們要多個(gè)線程累加同一個(gè)變量,對(duì)于thread來說,申明一個(gè)global變量,用thread.Lock的context包裹住三行就搞定了。而multiprocess由于進(jìn)程之間無法看到對(duì)方的數(shù)據(jù),只能通過在主線程申明一個(gè)Queue,put再get或者用share memory的方法。這個(gè)額外的實(shí)現(xiàn)成本使得本來就非常痛苦的多線程程序編碼,變得更加痛苦了。

用其他解析器

之前也提到了既然GIL只是CPython的產(chǎn)物,那么其他解析器是不是更好呢?沒錯(cuò),像JPython和IronPython這樣的解析器由于實(shí)現(xiàn)語言的特性,他們不需要GIL的幫助。然而由于用了Java/C#用于解析器實(shí)現(xiàn),他們也失去了利用社區(qū)眾多C語言模塊有用特性的機(jī)會(huì)。所以這些解析器也因此一直都比較小眾。畢竟功能和性能大家在初期都會(huì)選擇前者,Done is better than perfect。

所以沒救了么?

當(dāng)然Python社區(qū)也在非常努力的不斷改進(jìn)GIL,甚至是嘗試去除GIL。并在各個(gè)小版本中有了不少的進(jìn)步。

另一個(gè)改進(jìn)Reworking the GIL
– 將切換顆粒度從基于opcode計(jì)數(shù)改成基于時(shí)間片計(jì)數(shù)
– 新增線程優(yōu)先級(jí)功能(高優(yōu)先級(jí)線程可以迫使其他線程釋放所持有的GIL鎖)
– 避免最近一次釋放GIL鎖的線程再次被立即調(diào)度

總結(jié)

Python GIL其實(shí)是功能和性能之間權(quán)衡后的產(chǎn)物,它尤其存在的合理性,也有較難改變的客觀因素。從本問的分析中,我們可以做以下一些簡(jiǎn)單的總結(jié):

      • 因?yàn)镚IL的存在,只有IO Bound場(chǎng)景下得多線程會(huì)得到較好的性能
      • 如果對(duì)并行計(jì)算性能較高的程序可以考慮把核心部分也成C模塊,或者索性用其他語言實(shí)現(xiàn)
      • GIL在較長(zhǎng)一段時(shí)間內(nèi)將會(huì)繼續(xù)存在,但是會(huì)不斷對(duì)其進(jìn)行改進(jìn)
參考資料:

一文看懂Python多進(jìn)程與多線程編程

python中的GIL詳解


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長(zhǎng)會(huì)非常 感謝您的哦!!!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論
主站蜘蛛池模板: 额敏县| 永仁县| 新密市| 肃南| 沾化县| 武安市| 余江县| 吴堡县| 崇文区| 磐安县| 台山市| 辉南县| 墨脱县| 汨罗市| 乌拉特后旗| 南丰县| 历史| 南陵县| 遂溪县| 瑞安市| 南江县| 滦南县| 崇仁县| 长顺县| 林周县| 三河市| 新沂市| 砀山县| 喜德县| 安多县| 德保县| 汉阴县| 花莲县| 庄河市| 彰化市| 凤翔县| 湄潭县| 安阳县| 阿拉善左旗| 贵州省| 四平市|