日韩久久久精品,亚洲精品久久久久久久久久久,亚洲欧美一区二区三区国产精品 ,一区二区福利

Spark學習實例(Python):共享變量Shared Variables

系統 1764 0

通常在使用Spark算子函數,比如使用map()或者reduce函數我們向函數傳入條件時,函數內部可以使用驅動程序中定義的變量,但是這樣會使集群中所有任務都會得到變量新的副本,這些副本的更新不會傳播回驅動程序,導致讀寫共享變量效率低下或者內存溢出,為了解決這個問題Spark提供了兩種共享變量類型:廣播變量和累加器

  • 廣播變量:用來高效分發較大對象,只能在Driver定義,不能在Executor端定義,同時RDD不存儲數據所以不能廣播出去
  • 累加器:用來對信息進行聚合,常用場景reduce()

不使用廣播變量,直接定義一個變量list,然后在filter()來判斷元素是否存在list中,實現代碼如下:

            
              from pyspark import SparkContext

if __name__ == '__main__':
    sc = SparkContext(appName="broadcast", master="local[*]")
    list = [2, 4, 6, 8]
    data = [1, 2, 3, 4, 5]
    rdd = sc.parallelize(data)
    print(rdd.filter(lambda x: list.__contains__(x)).collect())
    # [2, 4]
    sc.stop()
            
          

定義的list變量在驅動端Driver創建的,但是要到Executor端運行,Driver端會把list以task形式拷貝到Executor端,如果有很多task那么就會有很多list復制過去,這個list非常大的時候就會造成內存溢出,關系圖如下所示:

Spark學習實例(Python):共享變量Shared Variables_第1張圖片

使用廣播變量,變量只會被發送到各節點一次,同時存放在Executor的BlockManager中,實現代碼如下:

            
              from pyspark import SparkContext

if __name__ == '__main__':
    sc = SparkContext(appName="broadcast", master="local[*]")
    list = [2, 4, 6, 8]
    bclist = sc.broadcast(list)
    data = [1, 2, 3]
    rdd = sc.parallelize(data)
    print(rdd.map(lambda x: bclist.value[x]).collect())
    # [4, 6, 8]
    sc.stop()
            
          

關系圖如下:

Spark學習實例(Python):共享變量Shared Variables_第2張圖片

累加器,對作業執行過程事件進行計數,實現代碼如下:

            
              from pyspark import SparkContext

if __name__ == '__main__':
    sc = SparkContext(appName="broadcast", master="local[*]")
    data = [1, 2, 3]
    rdd = sc.parallelize(data)
    accumulator = sc.accumulator(0)
    rdd.map(lambda x: accumulator.add(1)).collect()
    print(accumulator.value)
    # 3
    sc.stop()
            
          

關系圖如下:

Spark學習實例(Python):共享變量Shared Variables_第3張圖片

累加器在Driver端定義賦初始值,在Executor端更新,最后在Driver端讀取最后的值。

?

Spark學習目錄:

  • Spark學習實例1(Python):單詞統計 Word Count
  • Spark學習實例2(Python):加載數據源Load Data Source
  • Spark學習實例3(Python):保存數據Save Data
  • Spark學習實例4(Python):RDD轉換 Transformations
  • Spark學習實例5(Python):RDD執行 Actions
  • Spark學習實例6(Python):共享變量Shared Variables
  • Spark學習實例7(Python):RDD、DataFrame、DataSet相互轉換
  • Spark學習實例8(Python):輸入源實時處理 Input Sources Streaming
  • Spark學習實例9(Python):窗口操作 Window Operations

?

?

?


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 江孜县| 汉寿县| 尉氏县| 湄潭县| 湛江市| 石城县| 芜湖县| 盖州市| 北宁市| 邹平县| 宜黄县| 台安县| 周宁县| 太白县| 略阳县| 治多县| 松滋市| 固原市| 阳谷县| 吉安市| 衡水市| 水城县| 门源| 临江市| 堆龙德庆县| 伊川县| 凌云县| 田东县| 房产| 铜川市| 尉犁县| 望城县| 甘德县| 平和县| 托克逊县| 本溪| 绥芬河市| 宁河县| 攀枝花市| 雷波县| 平昌县|