通常在使用Spark算子函數,比如使用map()或者reduce函數我們向函數傳入條件時,函數內部可以使用驅動程序中定義的變量,但是這樣會使集群中所有任務都會得到變量新的副本,這些副本的更新不會傳播回驅動程序,導致讀寫共享變量效率低下或者內存溢出,為了解決這個問題Spark提供了兩種共享變量類型:廣播變量和累加器
- 廣播變量:用來高效分發較大對象,只能在Driver定義,不能在Executor端定義,同時RDD不存儲數據所以不能廣播出去
- 累加器:用來對信息進行聚合,常用場景reduce()
不使用廣播變量,直接定義一個變量list,然后在filter()來判斷元素是否存在list中,實現代碼如下:
from pyspark import SparkContext
if __name__ == '__main__':
sc = SparkContext(appName="broadcast", master="local[*]")
list = [2, 4, 6, 8]
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
print(rdd.filter(lambda x: list.__contains__(x)).collect())
# [2, 4]
sc.stop()
定義的list變量在驅動端Driver創建的,但是要到Executor端運行,Driver端會把list以task形式拷貝到Executor端,如果有很多task那么就會有很多list復制過去,這個list非常大的時候就會造成內存溢出,關系圖如下所示:
使用廣播變量,變量只會被發送到各節點一次,同時存放在Executor的BlockManager中,實現代碼如下:
from pyspark import SparkContext
if __name__ == '__main__':
sc = SparkContext(appName="broadcast", master="local[*]")
list = [2, 4, 6, 8]
bclist = sc.broadcast(list)
data = [1, 2, 3]
rdd = sc.parallelize(data)
print(rdd.map(lambda x: bclist.value[x]).collect())
# [4, 6, 8]
sc.stop()
關系圖如下:
累加器,對作業執行過程事件進行計數,實現代碼如下:
from pyspark import SparkContext
if __name__ == '__main__':
sc = SparkContext(appName="broadcast", master="local[*]")
data = [1, 2, 3]
rdd = sc.parallelize(data)
accumulator = sc.accumulator(0)
rdd.map(lambda x: accumulator.add(1)).collect()
print(accumulator.value)
# 3
sc.stop()
關系圖如下:
累加器在Driver端定義賦初始值,在Executor端更新,最后在Driver端讀取最后的值。
?
Spark學習目錄:
- Spark學習實例1(Python):單詞統計 Word Count
- Spark學習實例2(Python):加載數據源Load Data Source
- Spark學習實例3(Python):保存數據Save Data
- Spark學習實例4(Python):RDD轉換 Transformations
- Spark學習實例5(Python):RDD執行 Actions
- Spark學習實例6(Python):共享變量Shared Variables
- Spark學習實例7(Python):RDD、DataFrame、DataSet相互轉換
- Spark學習實例8(Python):輸入源實時處理 Input Sources Streaming
- Spark學習實例9(Python):窗口操作 Window Operations
?
?
?
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
