日韩久久久精品,亚洲精品久久久久久久久久久,亚洲欧美一区二区三区国产精品 ,一区二区福利

python操作kafka實踐的示例代碼

系統 1828 0

1、先看最簡單的場景,生產者生產消息,消費者接收消息,下面是生產者的簡單代碼。

            
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='xxxx:x')

msg_dict = {
  "sleep_time": 10,
  "db_config": {
    "database": "test_1",
    "host": "xxxx",
    "user": "root",
    "password": "root"
  },
  "table": "msg",
  "msg": "Hello World"
}
msg = json.dumps(msg_dict)
producer.send('test_rhj', msg, partition=0)
producer.close()

          

下面是消費者的簡單代碼:

            
from kafka import KafkaConsumer

consumer = KafkaConsumer('test_rhj', bootstrap_servers=['xxxx:x'])
for msg in consumer:
  recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
  print recv

          

下面是結果:

2、如果想要完成負載均衡,就需要知道kafka的分區機制,同一個主題,可以為其分區,在生產者不指定分區的情況,kafka會將多個消息分發到不同的分區,消費者訂閱時候如果不指定服務組,會收到所有分區的消息,如果指定了服務組,則同一服務組的消費者會消費不同的分區,如果2個分區兩個消費者的消費者組消費,則,每個消費者消費一個分區,如果有三個消費者的服務組,則會出現一個消費者消費不到數據;如果想要消費同一分區,則需要用不同的服務組。以此為原理,我們對消費者做如下修改:

            
from kafka import KafkaConsumer

consumer = KafkaConsumer('test_rhj', bootstrap_servers=['xxxx:x'])
for msg in consumer:
  recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
  print recv

          

然后我們開兩個消費者進行消費,生產者分別往0分區和1分區發消息結果如下,可以看到,一個消費者只能消費0分區,另一個只能消費1分區:


3、kafka提供了偏移量的概念,允許消費者根據偏移量消費之前遺漏的內容,這基于kafka名義上的全量存儲,可以保留大量的歷史數據,歷史保存時間是可配置的,一般是7天,如果偏移量定位到了已刪除的位置那也會有問題,但是這種情況可能很小;每個保存的數據文件都是以偏移量命名的,當前要查的偏移量減去文件名就是數據在該文件的相對位置。要指定偏移量消費數據,需要指定該消費者要消費的分區,否則代碼會找不到分區而無法消費,代碼如下:

            
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531'])
consumer.assign([TopicPartition(topic='test_rhj', partition=0), TopicPartition(topic='test_rhj', partition=1)])
print consumer.partitions_for_topic("test_rhj") # 獲取test主題的分區信息
print consumer.assignment()
print consumer.beginning_offsets(consumer.assignment())
consumer.seek(TopicPartition(topic='test_rhj', partition=0), 0)
for msg in consumer:
  recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
  print recv

          

因為指定的便宜量為0,所以從一開始插入的數據都可以查到,而且因為指定了分區,指定的分區結果都可以消費,結果如下:

python操作kafka實踐的示例代碼_第1張圖片

4、有時候,我們并不需要實時獲取數據,因為這樣可能會造成性能瓶頸,我們只需要定時去獲取隊列里的數據然后批量處理就可以,這種情況,我們可以選擇主動拉取數據

            
from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531'])
consumer.subscribe(topics=('test_rhj',))
index = 0
while True:
  msg = consumer.poll(timeout_ms=5) # 從kafka獲取消息
  print msg
  time.sleep(2)
  index += 1
  print '--------poll index is %s----------' % index
          

結果如下,可以看到,每次拉取到的都是前面生產的數據,可能是多條的列表,也可能沒有數據,如果沒有數據,則拉取到的為空:

python操作kafka實踐的示例代碼_第2張圖片

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 平舆县| 崇阳县| 工布江达县| 黔西县| 吴桥县| 郎溪县| 邢台市| 禹州市| 大邑县| 鞍山市| 万安县| 盐亭县| 邓州市| 乐陵市| 阳山县| 鲁山县| 宝兴县| 阿合奇县| 新泰市| 崇信县| 德惠市| 安丘市| 博乐市| 五家渠市| 泰和县| 庐江县| 榕江县| 金塔县| 江口县| 车致| 安康市| 永寿县| 财经| 秦皇岛市| 平顺县| 武功县| 宁津县| 台南市| 舟曲县| 长海县| 额敏县|