• <fieldset id="8imwq"><menu id="8imwq"></menu></fieldset>
  • <bdo id="8imwq"><input id="8imwq"></input></bdo>
    最新文章專題視頻專題問答1問答10問答100問答1000問答2000關(guān)鍵字專題1關(guān)鍵字專題50關(guān)鍵字專題500關(guān)鍵字專題1500TAG最新視頻文章推薦1 推薦3 推薦5 推薦7 推薦9 推薦11 推薦13 推薦15 推薦17 推薦19 推薦21 推薦23 推薦25 推薦27 推薦29 推薦31 推薦33 推薦35 推薦37視頻文章20視頻文章30視頻文章40視頻文章50視頻文章60 視頻文章70視頻文章80視頻文章90視頻文章100視頻文章120視頻文章140 視頻2關(guān)鍵字專題關(guān)鍵字專題tag2tag3文章專題文章專題2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章專題3
    問答文章1 問答文章501 問答文章1001 問答文章1501 問答文章2001 問答文章2501 問答文章3001 問答文章3501 問答文章4001 問答文章4501 問答文章5001 問答文章5501 問答文章6001 問答文章6501 問答文章7001 問答文章7501 問答文章8001 問答文章8501 問答文章9001 問答文章9501
    當(dāng)前位置: 首頁 - 科技 - 知識百科 - 正文

    RabbitMQ快速入門python教程

    來源:懂視網(wǎng) 責(zé)編:小采 時間:2020-11-27 14:25:50
    文檔

    RabbitMQ快速入門python教程

    RabbitMQ快速入門python教程:HelloWorld簡介RabbitMQ:接受消息再傳遞消息,可以視為一個郵局。發(fā)送者和接受者通過隊(duì)列來進(jìn)行交互,隊(duì)列的大小可以視為無限的,多個發(fā)送者可以發(fā)生給一個隊(duì)列,多個接收者也可以從一個隊(duì)列中接受消息。coderabbitmq使用的協(xié)議是amqp,用于pyth
    推薦度:
    導(dǎo)讀RabbitMQ快速入門python教程:HelloWorld簡介RabbitMQ:接受消息再傳遞消息,可以視為一個郵局。發(fā)送者和接受者通過隊(duì)列來進(jìn)行交互,隊(duì)列的大小可以視為無限的,多個發(fā)送者可以發(fā)生給一個隊(duì)列,多個接收者也可以從一個隊(duì)列中接受消息。coderabbitmq使用的協(xié)議是amqp,用于pyth

    HelloWorld

    簡介

    RabbitMQ:接受消息再傳遞消息,可以視為一個“郵局”。發(fā)送者和接受者通過隊(duì)列來進(jìn)行交互,隊(duì)列的大小可以視為無限的,多個發(fā)送者可以發(fā)生給一個隊(duì)列,多個接收者也可以從一個隊(duì)列中接受消息。

    code

    rabbitmq使用的協(xié)議是amqp,用于python的推薦客戶端是pika

    pip install pika -i https://pypi.douban.com/simple/

    send.py

    # coding: utf8
    import pika
    
    # 建立一個連接
    connection = pika.BlockingConnection(pika.ConnectionParameters(
     'localhost')) # 連接本地的RabbitMQ服務(wù)器
    channel = connection.channel() # 獲得channel

    這里鏈接的是本機(jī)的,如果想要連接其他機(jī)器上的服務(wù)器,只要填入地址或主機(jī)名即可。

    接下來我們開始發(fā)送消息了,注意要確保接受消息的隊(duì)列是存在的,否則rabbitmq就丟棄掉該消息

    channel.queue_declare(queue='hello') # 在RabbitMQ中創(chuàng)建hello這個隊(duì)列
    channel.basic_publish(exchange='', # 使用默認(rèn)的exchange來發(fā)送消息到隊(duì)列
     routing_key='hello', # 發(fā)送到該隊(duì)列 hello 中
     body='Hello World!') # 消息內(nèi)容
    
    connection.close() # 關(guān)閉 同時flush

    RabbitMQ默認(rèn)需要1GB的空閑磁盤空間,否則發(fā)送會失敗。

    這時已在本地隊(duì)列hello中存放了一個消息,如果使用 rabbitmqctl list_queues 可看到

    hello 1

    說明有一個hello隊(duì)列 里面存放了一個消息

    receive.py

    # coding: utf8
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(
     'localhost'))
    channel = connection.channel()

    還是先鏈接到服務(wù)器,和之前發(fā)送時相同

    channel.queue_declare(queue='hello') # 此處就是聲明了 來確保該隊(duì)列 hello 存在 可以多次聲明 這里主要是為了防止接受程序先運(yùn)行時出錯
    
    def callback(ch, method, properties, body): # 用于接收到消息后的回調(diào)
     print(" [x] Received %r" % body)
    
    channel.basic_consume(callback,
     queue='hello', # 收指定隊(duì)列hello的消息
     no_ack=True) #在處理完消息后不發(fā)送ack給服務(wù)器
    channel.start_consuming() # 啟動消息接受 這會進(jìn)入一個死循環(huán)

    工作隊(duì)列(任務(wù)隊(duì)列)

    工作隊(duì)列是用于分發(fā)耗時任務(wù)給多個工作進(jìn)程的。不立即做那些耗費(fèi)資源的任務(wù)(需要等待這些任務(wù)完成),而是安排這些任務(wù)之后執(zhí)行。例如我們把task作為message發(fā)送到隊(duì)列里,啟動工作進(jìn)程來接受并最終執(zhí)行,且可啟動多個工作進(jìn)程來工作。這適用于web應(yīng)用,即不應(yīng)在一個http請求的處理窗口內(nèi)完成復(fù)雜任務(wù)。

    channel.basic_publish(exchange='',
     routing_key='task_queue',
     body=message,
     properties=pika.BasicProperties(
     delivery_mode = 2, # 使得消息持久化
     ))

    分配消息的方式為 輪詢 即每個工作進(jìn)程獲得相同的消息數(shù)。

    消息ack

    如果消息分配給某個工作進(jìn)程,但是該工作進(jìn)程未處理完成就崩潰了,可能該消息就丟失了,因?yàn)閞abbitmq一旦把一個消息分發(fā)給工作進(jìn)程,它就把該消息刪掉了。

    為了預(yù)防消息丟失,rabbitmq提供了ack,即工作進(jìn)程在收到消息并處理后,發(fā)送ack給rabbitmq,告知rabbitmq這時候可以把該消息從隊(duì)列中刪除了。如果工作進(jìn)程掛掉 了,rabbitmq沒有收到ack,那么會把該消息 重新分發(fā)給其他工作進(jìn)程。不需要設(shè)置timeout,即使該任務(wù)需要很長時間也可以處理。

    ack默認(rèn)是開啟的,之前我們的工作進(jìn)程顯示指定了no_ack=True

    channel.basic_consume(callback, queue='hello') # 會啟用ack

    帶ack的callback:

    def callback(ch, method, properties, body):
     print " [x] Received %r" % (body,)
     time.sleep( body.count('.') )
     print " [x] Done"
     ch.basic_ack(delivery_tag = method.delivery_tag) # 發(fā)送ack

    消息持久化

    但是,有時RabbitMQ重啟了,消息也會丟失??稍趧?chuàng)建隊(duì)列時設(shè)置持久化:
    (隊(duì)列的性質(zhì)一旦確定無法改變)

    channel.queue_declare(queue='task_queue', durable=True)

    同時在發(fā)送消息時也得設(shè)置該消息的持久化屬性:

    channel.basic_publish(exchange='',

     routing_key="task_queue",
     body=message,
     properties=pika.BasicProperties(
     delivery_mode = 2, # make message persistent
     ))

    但是,如果在RabbitMQ剛接收到消息還沒來得及存儲,消息還是會丟失。同時,RabbitMQ也不是在接受到每個消息都進(jìn)行存盤操作。如果還需要更完善的保證,需要使用publisher confirm。

    公平的消息分發(fā)

    輪詢模式的消息分發(fā)可能并不公平,例如奇數(shù)的消息都是繁重任務(wù)的話,某些進(jìn)程則會一直運(yùn)行繁 重任務(wù)。即使某工作進(jìn)程上有積壓的消息未處理,如很多都沒發(fā)ack,但是RabbitMQ還是會按照順序發(fā)消息給它??梢栽诮邮苓M(jìn)程中加設(shè)置:

    channel.basic_qos(prefetch_count=1)

    告知RabbitMQ,這樣在一個工作進(jìn)程沒回發(fā)ack情況下是不會再分配消息給它。

    群發(fā)

    一般情況下,一條消息是發(fā)送給一個工作進(jìn)程,然后完成,有時想把一條消息同時發(fā)送給多個進(jìn)程:

    exchange

    發(fā)送者是不是直接發(fā)送消息到隊(duì)列中的,事實(shí)上發(fā)生者根本不知道消息會發(fā)送到那個隊(duì)列,發(fā)送者只能把消息發(fā)送到exchange里。exchange一方面收生產(chǎn)者的消息,另一方面把他們推送到隊(duì)列中。所以作為exchange,它需要知道當(dāng)收到消息時它需要做什么,是應(yīng)該把它加到一個特殊的隊(duì)列中還是放到很多的隊(duì)列中,或者丟棄。exchange有direct、topic、headers、fanout等種類,而群發(fā)使用的即fanout。之前在發(fā)布消息時,exchange的值為 '' 即使用default exchange。

    channel.exchange_declare(exchange='logs', type='fanout') # 該exchange會把消息發(fā)送給所有它知道的隊(duì)列中

    臨時隊(duì)列

    result = channel.queue_declare() # 創(chuàng)建一個隨機(jī)隊(duì)列
    result = channel.queue_declare(exclusive=True) # 創(chuàng)建一個隨機(jī)隊(duì)列,同時在沒有接收者連接該隊(duì)列后則銷毀它
    queue_name = result.method.queue

    這樣result.method.queue即是隊(duì)列名稱,在發(fā)送或接受時即可使用。

    綁定exchange 和 隊(duì)列

    channel.queue_bind(exchange='logs',
     queue='hello')

    logs在發(fā)送消息時給hello也發(fā)一份。

    在發(fā)送消息是使用剛剛創(chuàng)建的 logs exchange

     channel.basic_publish(exchange='logs',
     routing_key='',
     body=message)

    路由

    之前已經(jīng)使用過bind,即建立exchange和queue的關(guān)系(該隊(duì)列對來自該exchange的消息有興趣),bind時可另外指定routing_key選項(xiàng)。

    使用direct exchange

    將對應(yīng)routing key的消息發(fā)送到綁定相同routing key的隊(duì)列中

    channel.exchange_declare(exchange='direct_logs',
     type='direct')

    發(fā)送函數(shù),發(fā)布不同severity的消息:

    channel.basic_publish(exchange='direct_logs',
     routing_key=severity,
     body=message)

    接受函數(shù)中綁定對應(yīng)severity的:

    channel.queue_bind(exchange='direct_logs',
     queue=queue_name,
     routing_key=severity)

    使用topic exchange

    之前使用的direct exchange 只能綁定一個routing key,可以使用這種可以拿.隔開routing key的topic exchange,例如:

    "stock.usd.nyse" "nyse.vmw"

    和direct exchange一樣,在接受者那邊綁定的key與發(fā)送時指定的routing key相同即可,另外有些特殊的值:

    * 代表1個單詞
    # 代表0個或多個單詞

    如果發(fā)送者發(fā)出的routing key都是3個部分的,如:celerity.colour.species。

    Q1:
    *.orange.* 對應(yīng)的是中間的colour都為orange的
    
    Q2:
    *.*.rabbit 對應(yīng)的是最后部分的species為rabbit的
    lazy.# 對應(yīng)的是第一部分是lazy的

    qucik.orange.rabbit Q1 Q2都可接收到,quick.orange.fox 只有Q1能接受到,對于lazy.pink.rabbit雖然匹配到了Q2兩次,但是只會發(fā)送一次。如果綁定時直接綁定#,則會收到所有的。

     RPC

    在遠(yuǎn)程機(jī)器上運(yùn)行一個函數(shù)然后獲得結(jié)果。

    1、客戶端啟動 同時設(shè)置一個臨時隊(duì)列用于接受回調(diào),綁定該隊(duì)列

     self.connection = pika.BlockingConnection(pika.ConnectionParameters(
     host='localhost'))
     self.channel = self.connection.channel()
     result = self.channel.queue_declare(exclusive=True)
     self.callback_queue = result.method.queue
     self.channel.basic_consume(self.on_response, no_ack=True,
     queue=self.callback_queue)

    2、客戶端發(fā)送rpc請求,同時附帶reply_to對應(yīng)回調(diào)隊(duì)列,correlation_id設(shè)置為每個請求的唯一id(雖然說可以為每一次RPC請求都創(chuàng)建一個回調(diào)隊(duì)列,但是這樣效率不高,如果一個客戶端只使用一個隊(duì)列,則需要使用correlation_id來匹配是哪個請求),之后阻塞在回調(diào)隊(duì)列直到收到回復(fù)

    注意:如果收到了非法的correlation_id直接丟棄即可,因?yàn)橛羞@種情況--服務(wù)器已經(jīng)發(fā)了響應(yīng)但是還沒發(fā)ack就掛了,等一會服務(wù)器重啟了又會重新處理該任務(wù),又發(fā)了一遍相應(yīng),但是這時那個請求已經(jīng)被處理掉了

    channel.basic_publish(exchange='',
     routing_key='rpc_queue',
     properties=pika.BasicProperties(
     reply_to = self.callback_queue,
     correlation_id = self.corr_id,
     ),
     body=str(n)) # 發(fā)出調(diào)用
    
    while self.response is None: # 這邊就相當(dāng)于阻塞了
     self.connection.process_data_events() # 查看回調(diào)隊(duì)列
    return int(self.response)

    3、請求會發(fā)送到rpc_queue隊(duì)列
    4、RPC服務(wù)器從rpc_queue中取出,執(zhí)行,發(fā)送回復(fù)

    channel.basic_consume(on_request, queue='rpc_queue') # 綁定 等待請求
    
    # 處理之后:
    ch.basic_publish(exchange='',
     routing_key=props.reply_to,
     properties=pika.BasicProperties(correlation_id = 
     props.correlation_id),
     body=str(response)) # 發(fā)送回復(fù)到回調(diào)隊(duì)列
    ch.basic_ack(delivery_tag = method.delivery_tag) # 發(fā)送ack

    5、客戶端從回調(diào)隊(duì)列中取出數(shù)據(jù),檢查correlation_id,執(zhí)行相應(yīng)操作

    if self.corr_id == props.correlation_id:
     self.response = body

    聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com

    文檔

    RabbitMQ快速入門python教程

    RabbitMQ快速入門python教程:HelloWorld簡介RabbitMQ:接受消息再傳遞消息,可以視為一個郵局。發(fā)送者和接受者通過隊(duì)列來進(jìn)行交互,隊(duì)列的大小可以視為無限的,多個發(fā)送者可以發(fā)生給一個隊(duì)列,多個接收者也可以從一個隊(duì)列中接受消息。coderabbitmq使用的協(xié)議是amqp,用于pyth
    推薦度:
    標(biāo)簽: 教程 python 快速入門
    • 熱門焦點(diǎn)

    最新推薦

    猜你喜歡

    熱門推薦

    專題
    Top
    主站蜘蛛池模板: 久久夜色精品国产噜噜噜亚洲AV | 99久久精品费精品国产| 亚洲AV乱码久久精品蜜桃| 精品无码久久久久久久久久| 国产亚洲精品无码拍拍拍色欲| 精品亚洲一区二区三区在线播放| 精品国产三级a乌鸦在线观看| 国产精品女人呻吟在线观看| 久久青青草原精品国产不卡| 国产精品综合久久第一页| 久久99国产精品成人欧美| 欲帝精品福利视频导航| 99香蕉国产精品偷在线观看| 2021国产三级精品三级在专区| 国产欧美精品区一区二区三区 | 2021国产精品视频网站| 欧美精品一区二区在线精品| 国产在线精品一区二区在线观看 | 国产vA免费精品高清在线观看| 夜夜精品无码一区二区三区| 99国产精品久久久久久久成人热| 91久久精品电影| 精品无码无人网站免费视频| 精品国产亚洲一区二区三区| 欧美精品黑人粗大| 韩国精品欧美一区二区三区| 四虎影院国产精品| 热99re久久国超精品首页| 亚洲国产精品狼友中文久久久| 第一福利永久视频精品| 无码国内精品久久人妻麻豆按摩 | 国产精品第13页| 亚洲精品国自产拍在线观看| 99视频在线观看精品| 亚洲一区二区精品视频| 91亚洲精品麻豆| 国产成人精品日本亚洲专| 乱码精品一区二区三区 | 日本欧美韩国日本精品| 亚洲性日韩精品一区二区三区| 国产成人精品怡红院在线观看|