
Multi-threaded Zhihu user scraper in Python


Required packages:

beautifulsoup4
html5lib
image
requests
redis
PyMySQL

Install all the dependencies with pip:

          
pip install \
Image \
requests \
beautifulsoup4 \
html5lib \
redis \
PyMySQL
        

The runtime environment must support Chinese (UTF-8) output.

Tested on Python 3.5; other environments are not guaranteed to work perfectly.

MySQL and Redis must be installed.

Edit the config.ini file: configure MySQL and Redis there, and fill in your Zhihu account (a sample layout is sketched below).
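The section and key names below are exactly the ones read by the constructor shown later in this article; all values are placeholders you should replace with your own:

[zhihu_account]
username = your_account@example.com
password = your_password

[redis]
host = 127.0.0.1
port = 6379

[db]
host = 127.0.0.1
port = 3306
user = root
password = root
db = zhihu
charset = utf8

[sys]
max_queue_len = 200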

Import init.sql into the database.

Run

Start scraping: python get_user.py
Check the scrape count: python check_redis.py

Results

(Screenshots 1 and 2: sample output of the scraper.)

Overall approach

1. Simulate a Zhihu login and save the login cookies.
2. Fetch the HTML of Zhihu pages, to be parsed for information in the next step.
3. Parse each page for users' profile URLs and put them into Redis. (A note on how Redis is used here: every extracted profile URL is stored in a hash table named already_get_user, marking that user as scraped; before scraping a user we check already_get_user to avoid duplicate work. Each URL is also pushed onto the user_queue list, and whenever we need a new user we pop one off that queue. See the sketch after this list.)
4. Fetch the user's followee list and follower list, and insert those users into Redis as well.
5. Pop the next user from Redis's user_queue and repeat from step 3.
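The Redis bookkeeping in step 3 boils down to a handful of commands. Here is a minimal, runnable sketch of that seen-set-plus-queue pattern, using the same key names as the scraper and assuming a local Redis:

import redis

r = redis.Redis(host='127.0.0.1', port=6379, db=0)

def add_wait_user(name_url):
  # The hash acts as a "seen" set: only enqueue users we have not met before
  if not r.hexists('already_get_user', name_url):
    r.hset('already_get_user', name_url, 1)
    r.lpush('user_queue', name_url)

def next_user():
  # lpush + rpop together make the list a FIFO queue;
  # Redis returns bytes, so decode before use
  raw = r.rpop('user_queue')
  return raw.decode('utf-8') if raw else None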

Simulating the Zhihu login

Login comes first. The login functionality is wrapped in a `login` package, which makes it easy to integrate and call.

For the headers, Connection is best set to close, otherwise you may run into a "Max retries exceeded" error: by default connections are keep-alive, yet they never get closed.

            
# HTTP request headers
headers = {
  "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36",
  "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
  "Host": "www.zhihu.com",
  "Referer": "https://www.zhihu.com/",
  "Origin": "https://www.zhihu.com/",
  "Upgrade-Insecure-Requests": "1",
  "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
  "Pragma": "no-cache",
  "Accept-Encoding": "gzip, deflate, br",
  "Connection": "close"
}

# Check whether we are logged in
def check_login(self):
  check_url = 'https://www.zhihu.com/settings/profile'
  try:
    login_check = self.__session.get(check_url, headers=self.headers, timeout=35)
  except Exception as err:
    traceback.print_exc()
    print(err)
    print("Login check failed, please check your network")
    sys.exit()
  print("Login check HTTP status code: " + str(login_check.status_code))
  if int(login_check.status_code) == 200:
    return True
  else:
    return False


          

We request the settings page and check the HTTP status code to verify the login: 200 means we are logged in, while an unauthenticated session is redirected (HTTP 301/302) to the login page instead. Keep in mind that requests follows redirects by default, so to observe the redirect status itself you have to turn that off, as in the sketch below.
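A minimal sketch of such a check with redirect-following disabled (allow_redirects is a standard requests option; the URL is the same one used above):

import requests

session = requests.Session()
resp = session.get('https://www.zhihu.com/settings/profile',
                   allow_redirects=False, timeout=35)
# 200: logged in; 301/302: bounced to the login page, i.e. not logged in
print(resp.status_code, resp.headers.get('Location'))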

            
# Fetch the captcha
def get_captcha(self):
  t = str(time.time() * 1000)
  captcha_url = 'http://www.zhihu.com/captcha.gif?r=' + t + "&type=login"
  r = self.__session.get(captcha_url, headers=self.headers, timeout=35)
  with open('captcha.jpg', 'wb') as f:
    f.write(r.content)
  # Show the captcha with Pillow's Image if it is installed; otherwise go to
  # the source directory, open captcha.jpg and type the code in manually
  try:
    im = Image.open('captcha.jpg')
    im.show()
    im.close()
  except Exception:
    print('Could not display the image; open %s and read the captcha there' % os.path.abspath('captcha.jpg'))
  captcha = input("Enter the captcha\n>")
  return captcha

          

This fetches the captcha. When there have been too many login attempts, Zhihu may demand a captcha, and this method handles that case.

            
# Fetch the xsrf token
def get_xsrf(self):
  index_url = 'http://www.zhihu.com'
  # The _xsrf value is needed for logging in
  try:
    index_page = self.__session.get(index_url, headers=self.headers, timeout=35)
  except:
    print('Failed to fetch the Zhihu page, please check your network connection')
    sys.exit()
  html = index_page.text
  BS = BeautifulSoup(html, 'html.parser')
  xsrf_input = BS.find(attrs={'name': '_xsrf'})
  pattern = r'value=\"(.*?)\"'
  print(xsrf_input)
  # re.findall returns a list; the token is its first element
  self.__xsrf = re.findall(pattern, str(xsrf_input))
  return self.__xsrf[0]

          

獲取xsrf,為什么要獲取xsrf呢,因為xsrf是一種防止跨站攻擊的手段,具體介紹可以看這里csrf
在獲取到xsrf之后把xsrf存入cookie當中,并且在調用api的時候帶上xsrf作為頭部,不然的話知乎會返回403

            
# Simulate the login
def do_login(self):
  try:
    # Skip if we are already logged in
    if self.check_login():
      print('Already logged in')
      return
    else:
      if self.config.get("zhihu_account", "username") and self.config.get("zhihu_account", "password"):
        self.username = self.config.get("zhihu_account", "username")
        self.password = self.config.get("zhihu_account", "password")
      else:
        self.username = input('Enter your username\n> ')
        self.password = input("Enter your password\n> ")
  except Exception as err:
    traceback.print_exc()
    print(err)
    sys.exit()
  if re.match(r"^1\d{10}$", self.username):
    print("Logging in with a phone number\n")
    post_url = 'http://www.zhihu.com/login/phone_num'
    postdata = {
      '_xsrf': self.get_xsrf(),
      'password': self.password,
      'remember_me': 'true',
      'phone_num': self.username,
    }
  else:
    print("Logging in with an email address\n")
    post_url = 'http://www.zhihu.com/login/email'
    postdata = {
      '_xsrf': self.get_xsrf(),
      'password': self.password,
      'remember_me': 'true',
      'email': self.username,
    }
  try:
    login_page = self.__session.post(post_url, postdata, headers=self.headers, timeout=35)
    login_text = json.loads(login_page.text.encode('latin-1').decode('unicode-escape'))
    print(postdata)
    print(login_text)
    # r = 0 means success; r = 1 usually means a captcha is required.
    # sys.exit() raises SystemExit, which the bare except below catches,
    # so a failed first attempt falls through to the captcha retry.
    if login_text['r'] == 1:
      sys.exit()
  except:
    postdata['captcha'] = self.get_captcha()
    login_page = self.__session.post(post_url, postdata, headers=self.headers, timeout=35)
    print(json.loads(login_page.text.encode('latin-1').decode('unicode-escape')))
  # Save the login cookies
  self.__session.cookies.save()

          

This is the core login routine. The crucial piece is the requests library, whose Session object conveniently persists cookies. We use a singleton-style approach throughout: every part of the program goes through the same requests.Session object, which keeps the login state consistent.

最后主要調用登陸的代碼為

            
# Create the login object
lo = login.login.Login(self.session)
# Simulate the login: do_login() checks whether we are already logged in,
# reads the account from config.ini (or prompts for it) and saves the cookies
if lo.check_login():
  print('Already logged in')
else:
  lo.do_login()

          

That completes the simulated Zhihu login.

Scraping Zhihu users

            
def __init__(self, threadID=1, name=''):
  # Thread setup
  print("Initialising thread " + str(threadID))
  threading.Thread.__init__(self)
  self.threadID = threadID
  self.name = name
  try:
    print("Thread " + str(threadID) + " initialised")
  except Exception as err:
    print(err)
    print("Thread " + str(threadID) + " failed to start")
  self.threadLock = threading.Lock()
  # Counter of users queued by this thread (used by add_wait_user below)
  self.counter = 0
  # Load the configuration
  self.config = configparser.ConfigParser()
  self.config.read("config.ini")
  # Initialise the session
  requests.adapters.DEFAULT_RETRIES = 5
  self.session = requests.Session()
  self.session.cookies = cookielib.LWPCookieJar(filename='cookie')
  self.session.keep_alive = False
  try:
    self.session.cookies.load(ignore_discard=True)
  except:
    print('Could not load cookies')
  # Create the login object and log in
  lo = Login(self.session)
  lo.do_login()
  # Initialise the Redis connection
  try:
    redis_host = self.config.get("redis", "host")
    redis_port = self.config.get("redis", "port")
    self.redis_con = redis.Redis(host=redis_host, port=redis_port, db=0)
    # Uncomment to flush the Redis database on start-up
    # self.redis_con.flushdb()
  except:
    print("Please install Redis or check the Redis connection settings")
    sys.exit()
  # Initialise the database connection
  try:
    db_host = self.config.get("db", "host")
    db_port = int(self.config.get("db", "port"))
    db_user = self.config.get("db", "user")
    db_pass = self.config.get("db", "password")
    db_db = self.config.get("db", "db")
    db_charset = self.config.get("db", "charset")
    self.db = pymysql.connect(host=db_host, port=db_port, user=db_user, passwd=db_pass, db=db_db,
                              charset=db_charset)
    self.db_cursor = self.db.cursor()
  except:
    print("Please check the database settings")
    sys.exit()
  # Load the system settings
  self.max_queue_len = int(self.config.get("sys", "max_queue_len"))

          

This is the constructor of get_user.py. It initialises the MySQL and Redis connections, verifies the login, creates the session object, loads the system configuration, and sets up the thread. Since every GetUser instance builds its own session and connections, the threads avoid sharing a single pymysql connection, which would not be thread-safe.

            
# Fetch the home page HTML
def get_index_page(self):
  index_url = 'https://www.zhihu.com/'
  try:
    index_html = self.session.get(index_url, headers=self.headers, timeout=35)
  except Exception as err:
    print("Failed to fetch the home page......")
    print(err)
    traceback.print_exc()
    return None
  return index_html.text

# Fetch a single user's profile (about) page
def get_user_page(self, name_url):
  user_page_url = 'https://www.zhihu.com' + str(name_url) + '/about'
  try:
    index_html = self.session.get(user_page_url, headers=self.headers, timeout=35)
  except Exception as err:
    print("Failed to fetch page for name_url: " + str(name_url) + ", giving up on this user")
    print(err)
    traceback.print_exc()
    return None
  return index_html.text

# Fetch a user's followers page
def get_follower_page(self, name_url):
  user_page_url = 'https://www.zhihu.com' + str(name_url) + '/followers'
  try:
    index_html = self.session.get(user_page_url, headers=self.headers, timeout=35)
  except Exception as err:
    print("Failed to fetch page for name_url: " + str(name_url) + ", giving up on this user")
    print(err)
    traceback.print_exc()
    return None
  return index_html.text

# Fetch a user's followees page (the followee list lives under '/followees';
# '/followers' here would fetch the follower list a second time)
def get_following_page(self, name_url):
  user_page_url = 'https://www.zhihu.com' + str(name_url) + '/followees'
  try:
    index_html = self.session.get(user_page_url, headers=self.headers, timeout=35)
  except Exception as err:
    print("Failed to fetch page for name_url: " + str(name_url) + ", giving up on this user")
    print(err)
    traceback.print_exc()
    return None
  return index_html.text

# Collect the users shown on the home page and store them in Redis
def get_index_page_user(self):
  index_html = self.get_index_page()
  if not index_html:
    return
  BS = BeautifulSoup(index_html, "html.parser")
  self.get_xsrf(index_html)
  user_a = BS.find_all("a", class_="author-link")  # the users' anchor tags
  for a in user_a:
    if a:
      self.add_wait_user(a.get('href'))
    else:
      continue

          

這一部分的代碼就是用于抓取各個頁面的html代碼

            
# Add a user to the waiting queue, checking Redis first to avoid re-scraping
def add_wait_user(self, name_url):
  self.threadLock.acquire()
  if not self.redis_con.hexists('already_get_user', name_url):
    self.counter += 1
    self.redis_con.hset('already_get_user', name_url, 1)
    self.redis_con.lpush('user_queue', name_url)
    print("Added user " + name_url + " to the queue")
  self.threadLock.release()

# Remove a user from the 'already scraped' hash when their page could not be
# fetched (the hexists check must be positive here, or the hdel would never run)
def del_already_user(self, name_url):
  self.threadLock.acquire()
  if self.redis_con.hexists('already_get_user', name_url):
    self.counter -= 1
    self.redis_con.hdel('already_get_user', name_url)
  self.threadLock.release()

          

These are the Redis bookkeeping operations; when a database insert fails, we call del_already_user to take the failed user back out of the "already scraped" set.

            
# Parse the followers page and fetch all of a user's followers.
# get_follower_page() supplies the page; from it we read the user's hash_id,
# which the followers API needs.
def get_all_follower(self, name_url):
  follower_page = self.get_follower_page(name_url)
  # Make sure we actually got the page
  if not follower_page:
    return
  BS = BeautifulSoup(follower_page, 'html.parser')
  # Number of followers (the literal must match Zhihu's simplified-Chinese page text)
  follower_num = int(BS.find('span', text='关注者').find_parent().find('strong').get_text())
  # The user's hash_id
  hash_id = \
    json.loads(BS.select("#zh-profile-follows-list")[0].select(".zh-general-list")[0].get('data-init'))[
      'params'][
      'hash_id']
  # Refresh the xsrf token before calling the API
  self.get_xsrf(follower_page)
  post_url = 'https://www.zhihu.com/node/ProfileFollowersListV2'
  # Page through all followers, 20 per request
  for i in range(0, math.ceil(follower_num / 20) * 20, 20):
    post_data = {
      'method': 'next',
      'params': json.dumps({"offset": i, "order_by": "created", "hash_id": hash_id})
    }
    try:
      j = self.session.post(post_url, params=post_data, headers=self.headers, timeout=35).text.encode(
        'latin-1').decode(
        'unicode-escape')
      pattern = re.compile(r"class=\"zm-item-link-avatar\"[^\"]*\"([^\"]*)", re.DOTALL)
      j = pattern.findall(j)
      for user in j:
        user = user.replace('\\', '')
        self.add_wait_user(user)  # save to Redis
    except Exception as err:
      print("Failed to fetch the follower list")
      print(err)
      traceback.print_exc()

# Fetch everyone the user is following
def get_all_following(self, name_url):
  following_page = self.get_following_page(name_url)
  # Make sure we actually got the page
  if not following_page:
    return
  BS = BeautifulSoup(following_page, 'html.parser')
  # Number of followees
  following_num = int(BS.find('span', text='关注了').find_parent().find('strong').get_text())
  # The user's hash_id
  hash_id = \
    json.loads(BS.select("#zh-profile-follows-list")[0].select(".zh-general-list")[0].get('data-init'))[
      'params'][
      'hash_id']
  # Refresh the xsrf token before calling the API
  self.get_xsrf(following_page)
  post_url = 'https://www.zhihu.com/node/ProfileFolloweesListV2'
  # Page through all followees, 20 per request
  for i in range(0, math.ceil(following_num / 20) * 20, 20):
    post_data = {
      'method': 'next',
      'params': json.dumps({"offset": i, "order_by": "created", "hash_id": hash_id})
    }
    try:
      j = self.session.post(post_url, params=post_data, headers=self.headers, timeout=35).text.encode(
        'latin-1').decode(
        'unicode-escape')
      pattern = re.compile(r"class=\"zm-item-link-avatar\"[^\"]*\"([^\"]*)", re.DOTALL)
      j = pattern.findall(j)
      for user in j:
        user = user.replace('\\', '')
        self.add_wait_user(user)  # save to Redis
    except Exception as err:
      print("Failed to fetch the followee list")
      print(err)
      traceback.print_exc()

          

This calls Zhihu's API to page through the complete followee and follower lists, recursively feeding the users back into the queue. Note that the request must carry the xsrf token, otherwise the server responds with 403.

            
# Parse the about page for the user's detailed profile
def get_user_info(self, name_url):
  about_page = self.get_user_page(name_url)
  # Make sure we actually got the page
  if not about_page:
    print("Failed to fetch the profile page, skipping, name_url: " + name_url)
    return
  self.get_xsrf(about_page)
  BS = BeautifulSoup(about_page, 'html.parser')
  # Pull the individual fields out of the page
  try:
    nickname = BS.find("a", class_="name").get_text() if BS.find("a", class_="name") else ''
    user_type = name_url[1:name_url.index('/', 1)]
    self_domain = name_url[name_url.index('/', 1) + 1:]
    gender = 2 if BS.find("i", class_="icon icon-profile-female") else (1 if BS.find("i", class_="icon icon-profile-male") else 3)
    # The Chinese literals must match Zhihu's simplified-Chinese page text
    follower_num = int(BS.find('span', text='关注者').find_parent().find('strong').get_text())
    following_num = int(BS.find('span', text='关注了').find_parent().find('strong').get_text())
    # The four patterns below match the <strong> counters next to each label;
    # the exact surrounding markup is approximated here
    agree_num = int(re.findall(r'<strong>(\d+)</strong>[^<]*赞同', about_page)[0])
    appreciate_num = int(re.findall(r'<strong>(\d+)</strong>[^<]*感谢', about_page)[0])
    star_num = int(re.findall(r'<strong>(\d+)</strong>[^<]*收藏', about_page)[0])
    share_num = int(re.findall(r'<strong>(\d+)</strong>[^<]*分享', about_page)[0])
    browse_num = int(BS.find_all("span", class_="zg-gray-normal")[2].find("strong").get_text())
    trade = BS.find("span", class_="business item").get('title') if BS.find("span", class_="business item") else ''
    company = BS.find("span", class_="employment item").get('title') if BS.find("span", class_="employment item") else ''
    school = BS.find("span", class_="education item").get('title') if BS.find("span", class_="education item") else ''
    major = BS.find("span", class_="education-extra item").get('title') if BS.find("span", class_="education-extra item") else ''
    job = BS.find("span", class_="position item").get_text() if BS.find("span", class_="position item") else ''
    location = BS.find("span", class_="location item").get('title') if BS.find("span", class_="location item") else ''
    description = BS.find("div", class_="bio ellipsis").get('title') if BS.find("div", class_="bio ellipsis") else ''
    ask_num = int(BS.find_all("a", class_='item')[1].find("span").get_text()) if BS.find_all("a", class_='item')[1] else 0
    answer_num = int(BS.find_all("a", class_='item')[2].find("span").get_text()) if BS.find_all("a", class_='item')[2] else 0
    article_num = int(BS.find_all("a", class_='item')[3].find("span").get_text()) if BS.find_all("a", class_='item')[3] else 0
    collect_num = int(BS.find_all("a", class_='item')[4].find("span").get_text()) if BS.find_all("a", class_='item')[4] else 0
    public_edit_num = int(BS.find_all("a", class_='item')[5].find("span").get_text()) if BS.find_all("a", class_='item')[5] else 0
    replace_data = \
      (pymysql.escape_string(name_url), nickname, self_domain, user_type,
       gender, follower_num, following_num, agree_num, appreciate_num, star_num, share_num, browse_num,
       trade, company, school, major, job, location, pymysql.escape_string(description),
       ask_num, answer_num, article_num, collect_num, public_edit_num)
    # REPLACE INTO updates the row in place if this user was scraped before
    replace_sql = '''REPLACE INTO
           user(url,nickname,self_domain,user_type,
           gender,follower,following,agree_num,appreciate_num,star_num,share_num,browse_num,
           trade,company,school,major,job,location,description,
           ask_num,answer_num,article_num,collect_num,public_edit_num)
           VALUES(%s,%s,%s,%s,
           %s,%s,%s,%s,%s,%s,%s,%s,
           %s,%s,%s,%s,%s,%s,%s,
           %s,%s,%s,%s,%s)'''
    try:
      print("Data collected:")
      print(replace_data)
      self.db_cursor.execute(replace_sql, replace_data)
      self.db.commit()
    except Exception as err:
      print("Database insert failed")
      print("Data collected:")
      print(replace_data)
      print("Statement: " + self.db_cursor._last_executed)
      self.db.rollback()
      print(err)
      traceback.print_exc()
  except Exception as err:
    print("Failed to parse the profile, skipping this user")
    # del_already_user does the hdel itself, under the thread lock
    self.del_already_user(name_url)
    print(err)
    traceback.print_exc()

          

最后,到用戶的about頁面,分析頁面元素,利用正則或者beatifulsoup分析抓取頁面的數據
這里我們SQL語句用REPLACE INTO而不用INSERT INTO,這樣可以很好的防止數據重復問題

            
# Start scraping users; the main loop of the program
def entrance(self):
  while 1:
    if int(self.redis_con.llen("user_queue")) < 1:
      # Queue empty: reseed it from the home page
      self.get_index_page_user()
    else:
      # Pop a name_url off the queue; Redis returns bytes, so decode to utf-8
      name_url = str(self.redis_con.rpop("user_queue").decode('utf-8'))
      print("Processing name_url: " + name_url)
      self.get_user_info(name_url)
      # Only expand the frontier while the queue is below its cap
      if int(self.redis_con.llen("user_queue")) <= int(self.max_queue_len):
        self.get_all_follower(name_url)
        self.get_all_following(name_url)
    self.session.cookies.save()

def run(self):
  print(self.name + " is running")
  self.entrance()

          

最后,入口

            
if __name__ == '__main__':
  # Construct one instance first so the login happens before the workers start
  login = GetUser(999, "login thread")
  threads = []
  for i in range(0, 4):
    m = GetUser(i, "thread" + str(i))
    threads.append(m)
  for i in range(0, 4):
    threads[i].start()
  for i in range(0, 4):
    threads[i].join()

          

這里就是多線程的開啟,需要開啟多少個線程就把4換成多少就可以了

Docker

If that sounds like too much setup, you can copy how I build a basic environment with Docker:

MySQL and Redis both use the official images:

          
docker run --name mysql -itd mysql:latest
docker run --name redis -itd redis:latest
        

Then run the Python image with docker-compose; my docker-compose.yml for the Python container:

          
python:
  container_name: python
  build: .
  ports:
    - "84:80"
  external_links:
    - memcache:memcache
    - mysql:mysql
    - redis:redis
  volumes:
    - /docker_containers/python/www:/var/www/html
  tty: true
  stdin_open: true
  extra_hosts:
    - "python:192.168.102.140"
  environment:
    PYTHONIOENCODING: utf-8
        

最后附上源代碼: GITHUB https://github.com/kong36088/ZhihuSpider


