人人IT網

人人IT網

當前位置: 主頁 > 其它綜合 > MQ >

消息隊列_RabbitMQ-0002.深入MQ生產者/信道/交換機/隊列/消費者?

時間:2016-11-30 19:05來源:Internet 作者:Internet 點擊:
形象說明:比喻: RabbitMQ提供的消息投遞服務類似於現實生活中的快遞公司,雙11我們可能會買很多東西,自然會陆續收到很多寄自淘寶店主由快遞公司發來的快件,但是可能很多時候買回來的東西並不合心意,

形象說明:


比喻: RabbitMQ提供的消息投遞服務類似於現實生活中的快遞公司,雙11我們可能會買很多東西,自然會陆續收到很多寄自淘寶店主由快遞公司發來的快件,但是可能很多時候買回來的東西並不合心意,自然會陆續通過快遞公司退回快件,所以回歸到架構,這裏的快件就相當於消息,我們相當於應用程序,淘寶店主相當於服務器,而快遞公司相當於路由器,應用程序可以發送和接收消息,服務器也可以發送和接收消息,所以當應用程序連接到RabbitMQ時,就必須做一個决定:我是發送還是接收哪?

現實: 生產者(Producer)創建消息,然後發布(發送)到消息代理服務器(RabbitMQ),消息包含兩部分內容:有效載荷(想要傳輸的數據,支持任何內容)和標簽(描述有效載荷,最終由RabbitMQ來决定誰將獲得消息的拷貝),消費者(Consumer)启動時連接消息代理服務器上,並訂閱指定隊列,每當消息達到此隊列時,RabbitMQ會將其發送给訂閱的消費者,當消費者接收到消息時,它只是得到了有效載荷,因为消息在路由的過程中,消息的標簽並沒有隨着有效載荷一起傳遞,RabbitMQ甚至不會告訴你生產者是誰?當然如果覺得有必要,也可以將身份信息加入有效載荷一起傳遞~

wKioL1g9FvqyZ9s3AAE0kFq2RfM767.png

信道連接:


說明: 使用消息代理服務器RabbitMQ的前提是建立AMQP信道,應用程序可以基於一條TCP連接快速創建銷毀無數信道來減少傳統TCP連接消耗,每個信道有唯一ID(由AMQP庫維護),AMQP命令都是通過信道發送


消息路由:

wKioL1g9F1LCA_7jAALTlrIRsus695.png

# 消費消息

1. 消費者通過AMQP的basic.consume命令訂閱,這样做會將信道置为接收模式,訂閱消息後,消息一到達隊列時就自動接收,直到取消隊列的訂閱为止

2. 消費者通過AMQP的basic.get命令訂閱,這样做會將信道置为接收模式,訂閱消息後,獲得單條消息後,然後自動取消訂閱,千萬不要妄想放在循環裏代替basic.consume,否則無法發揮其高吞吐量特性

3. 如果消息到達了無人訂閱的隊列,消息會在隊列中等待,一旦有消費者訂閱該隊列,隊列的消息會發送给消費者

4. 如果隊列擁有多個消費者時,隊列的消息以輪詢的方式發送给消費者,每條消息只會發送给一個訂閱的消費者,且每個消費者接收到的每一條消息都必須進行確認,消費者必須通過AMQP的basic.ack命令顯式地向RabbitMQ發送一個確認,或者在訂閱到隊列的時候將auto_ack参數設置为true,此時一旦消費者接收消息,RabbitMQ會自動認为其確認了消息,一旦消息被確認,RabbitMQ才會安全的把消息從隊列中刪除,主要是防止確認之前RabbitMQ斷開連接或取消訂閱或程序崩溃,RabbitMQ會認为這條消息沒有分發,然後重新分發给下一個訂閱的消費者,RabbitMQ會認为沒有確認的消費者並沒有准備好接收下一條消息,所以可以好好利用這一點,如果處理消息內容非常耗時,則你的應用程序可以延遲確認消息,直到消息處理完成再確認,這样可防止RabbitMQ持續不斷的消息導致過載

5. 如果收到消息後想要明確拒絕而不是確認收到消息的話,可使用AMQP的basic.reject,當把其basic.reject参數設置为true時,RabbitMQ會將消息重新發送给下一個訂閱的消費者,如果設置为false,則RabbitMQ會把消息從隊列中移除,而不會把它發送给新的消費者,當然也可以通過對消息確認的方式來簡單地忽略該消息,如當你檢測到一條格式錯誤的消息而任何一個消費者都無法處理的時候,此時就非常有用了.

# 隊列創建

1. 消費者和生產者都能使用AMQP的queue.declare命令來創建隊列,但是如果消費者在同一條信道上訂閱了另一個隊列的話,就無法再聲明隊列,必須首先取消訂閱,將信道設置为"傳輸"模式,

2. 創建隊列時,最好指定一個隊列名稱,消費者訂閱隊列時需要隊列名稱,並在創建绑定時也需要隊列名稱,如果不指定,RabbitMQ會隨機分配一個名稱作为queue.declare的返回值(常用於構建在AMQP上的RPC應用,此時零時匿名隊列很有用),創建隊列時exclusive为true時,隊列會變为私有,此時只有你的應用程序才能消費隊列消息,當你想要限制一個隊列只有一個消費者時很有有,auto-delete为true時,當最後一個消費者取消訂閱的時候,隊列就會自動移除,當你需要零時隊列只为一個消費者服務的話,可結合auto-delete和exclusive,當消費者斷開連接時,隊列就被移除了.

3. 如果嘗試聲明一個已經存在的隊列時,RabbitMQ就什麼都不做,並成功返回,如果你只是为了檢測隊列是否存在,可設置queue.declare的passive为true,如果存在會成功返回,否則會直接返回一個錯誤

4. 由於生產者和消費者都可以通過queue.declare創建隊列,但是由於如果消息路由到了不存在的隊列RabbitMQ會直接忽略它們,所以最好是生產者和消費者都建隊列

#交換绑定

1. 如果你想要將消息投遞到隊列時,首先得把消息發送给交換機,然後根據確定的規則,RabbitMQ會將决定消息該投遞到哪個隊列,這些規則被稱为路由鍵(Routing Key),隊列通過路由鍵绑定到交換機,當你把消息發送到消息代理服務器時,消息將擁有一個路由鍵,即便为空,RabbitMQ也會將其和绑定使用的路由鍵進行匹配,如果匹配成功,消息會被投遞到該隊列,如果不匹配將進入"黑洞"

wKiom1g9F3HCPie-AACJHeYI8wM145.png

2. Direct直接交換機(channel->basic_publish(message, exchange, routingkey)),非常簡單,如果路由鍵匹配的話,消息就被投遞到對應的隊列,當聲明隊列時,會自動绑定到默認交換機,並以隊列名稱作为路由鍵,所以發送消息時exchange为空則會發送到默認交換機,routingkey直接填寫對應的隊列名即可,如果默認交換機無法滿足應用程序需求時,可通過exchange.declare創建其它交換機

wKioL1g9GDzjYG3IAAB6vj0PX48379.png

3. Fanout扇形交換機,非常簡單,當你發送一條消息到fanout交換機時,它會把消息投遞给所有附加在此交換機上的隊列,這允許你對單條消息做不同方式的反應,如一個WEB應用程序可能需要在用戶上傳新的圖片時,用戶相冊必須清除緩存,同時用戶應該得到些積分獎勵,你可以將兩個隊列绑定到圖片上傳交換機上,一個用於清除緩存,另一個用於增加用戶積分,後期如果有其它需求只需要为新的消費者寫段代碼,然後聲明新的隊列並將其绑定到fanout交換機上,這样就可以實現生產者和消費者完全解耦,允許你輕而易舉的添加應用程序的功能.

wKioL1g9GF6CR2mTAAB5dxEM-KA426.png

4. Topic主題交換機,非常簡單,當你發送一條消息到topic交換機時,它會把消息投遞给以點號分割的路由鍵,匹配模式中*匹配特定位置的任意文本,"http://go.rritw.com/xmdevops.blog.51cto.com/#"匹配所有的規則,是沒有類似"*"以點號特定塊兒匹配的概念的,它匹配包括點號在內的所有規則.


總結: 從上面幾種模式可以看出其實RabbitMQ在開發中的角色可以非常靈活,既可以作为隊列服務器使用,也可以作为RPC服務器使用,完全取决於你如何組織這些功能.


虛機隔離:

說明: RabbitMQ還支持Vhost"虛擬主機",每個Vhost本質上是一個迷你版擁有自己的隊列/交換機/绑定以及權限機制的RabbitMQ服務器,這样就可以通過一個RabbitMQ服務眾多應用程序,Vhost之間相互隔離,有效的避免了隊列/交換機的命名沖突,否則你不得不運行多個RabbitMQ,默認Vhost为vhost: "http://go.rritw.com/xmdevops.blog.51cto.com/"可通過guest/guest訪問,但是为了安全起見,應該及時更改


添加虛機: /xm-workspace/xm-apps/rabbitmq/sbin/rabbitmqctl add_vhost xmzoomeye

查看虛機: /xm-workspace/xm-apps/rabbitmq/sbin/rabbitmqctl list_vhosts

刪除虛機: /xm-workspace/xm-apps/rabbitmq/sbin/rabbitmqctl delete_vhost xmzoomeye


說明: 一旦Vhost創建成功之後,就可以連接上去開始添加隊列和交換機,如果想連接遠程RabbitMQ節點可通過rabbitmqctl -n rabbit@hostname list_vhosts,需要注意的是rabbit@hostname中rabbit@是固定的,而hostname必須正確的是遠程主機名


持久存儲:


1. 默認重启RabbitMQ後,之前定義的交換機/隊列都會消失,但是如果設置隊列和交換機的durable屬性为true,則在崩溃重启之後會重建隊列和交換機,但是消息並不會重建,如果要實現持久化消息,則需要首先將"投遞模式"設置为2將消息標記成持久化,然後發布到持久化的交換機並到達持久化的隊列,這样才可以保證消息的持久化.

2. RabbitMQ確保持久化消息能從服務器重启中恢复其實是將它們寫入磁盤上的一個持久化日志文件,當發布一條持久化消息到持久化交換機時,RabbitMQ會在消息提交到日志文件後才發送響應,如果消息後來被路由到非持久化隊列,它會自動從持久化日志中刪除,並且無法從服務器重启中恢复,如果消息後來被路由到持久化隊列且被消費者消費並確認,則RabbitMQ會在持久化日志中把這條消息標記为等待垃圾收集,但是並不是所有的消息都需要启用持久化,不然會嚴重影響RabbitMQ每秒處理的消息總數

3. 從業務分析性能需求,如果要單台RabbitMQ服務器每秒處理10萬條消息則[可以考慮更快的存儲系統]或[通過在生產者單獨信道上監聽應答隊列,發送消息時有效載荷帶上此隊列名,消費者就可以回答應答確認接收返回给生產者]或[分開建立持久化熱備非集群負載均衡和非持久化集群],這样持久化消息通信負載不會減慢非持久化消息的處理.

4. AMQP中,一旦把信道設置成事務模式後,通過信道發送需要確認的消息,如果第一個消息失敗則後續命令會忽略,雖然可以借助它確認消息是否持久化到磁盤,但是事務不但會降低消息吞吐量,而且會使生產者應用程序產生同步,而你使用消息通信就是想要避免同步,其實還有另一種發送確認模式和事務相仿,只需要將信道設置为confirm模式,所有信道上發布的消息都會被指派一個唯一的ID,一旦消息被投遞给匹配的隊列後,信道會發送一個發送方確認模式给生產者應用程序(包含唯一ID),使得生產者知道消息已經安全到達目的隊列,如果消息和隊列是可持久化的,那麼確認消息只會在隊列將消息寫入磁盤後才會發出,相比於事務來說,最大的好處在於都是異步的,一旦發布了一條消息,生產者應用程序就可以在等待確認的同時繼續發送下一條,當確認消息最終收到的時候,生產者應用的回調方法就會觸發來處理該確認消息,如果RabbitMQ發生內部錯誤而導致消息丟失,會發送一條nack未確認消息,只是這次說明消息確實丟失了,此方式更加輕量級對於RabbitMQ消息代理服務器的性能影響幾乎不記.


貫穿實例:

wKioL1g9GIqS9VB6AAFC8FC6gQ8839.png

說明: 如上講述了RabbitMQ的所有組件以及架構,但要結合起來理解一條真實消息的生命周期的最好方法是實踐出真知,下面會使用PY的pika模塊來演示Hello Word消息傳遞過程.

發布: 連接RabbitMQ->獲取信道->聲明交換機->創建消息->發布消息->關閉信道->關閉連接

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"http://go.rritw.com/xmdevops.blog.51cto.com/""
#
# Authors: limanman
# OsChina: http://go.rritw.com/xmdevops.blog.51cto.com/
# Purpose:
#
"http://go.rritw.com/xmdevops.blog.51cto.com/""
# 說明: 導入公共模塊
import sys
import pika
# 說明: 導入其它模塊


if __name__ == '__main__':
    # 創建憑證對象
    credentials = pika.PlainCredentials('guest', 'guest')
    # 創建参數對象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服務地址
        host='127.0.0.1',
        # RabbitMQ服務端口
        port=5672,
        # RabbitMQ登錄憑證
        credentials=credentials,
        # RabbitMQ虛擬主機
        virtual_host='/'
    )
    # 創建連接對象
    conn_broker = pika.BlockingConnection(conn_params)
    # 獲取信道對象
    channel = conn_broker.channel()
    # 創建交換機
    channel.exchange_declare(
        # 交換機名稱
        exchange="salt-exchange",
        # 交換機類型
        type="direct",
        # 如果同名交換機已存在依然返回成功,否則創建
        passive=False,
        # 聲明为非持久化交換機
        durable=False,
        # 交換機閑置也不會自動刪除
        auto_delete=False
    )
    msg = sys.argv[1]
    # 創建配置對象
    msg_props = pika.BasicProperties()
    # 設置內容類型
    msg_props.content_type = 'text/plain'
    # 嘗試發布消息
    channel.basic_publish(
        # 發布消息內容
        body=msg,
        # 發布到交換機
        exchange='salt-exchange',
        # 發布信息屬性
        properties=msg_props,
        # 發布信息時攜帶的路由鍵
        routing_key='salt'
    )

說明: 首先用使用默認帳號密碼guest,默認端口5672,默認虛擬主機/連接RabbitMQ Vhost,然後建立信道,利用信道和rabbitMQ進行通信,然後聲明交換機,需要指定交換機名稱,交換機類型,是否passive模式,如果非passive模式則表示想要聲明交換機而非獲取交換機信息,還可以指定是否持久化以及是否刪除,最後通過命令行創建一條攜帶salt路由鍵類型为text/plain的消息通過basic_publish發送到salt-exchange交換機,但是此時由於並沒有任何隊列绑定在此交換機,所以消息必然會進入"黑洞"丟失掉.~

接收: 連接RabbitMQ->獲得信道->聲明交換機->聲明隊列->绑定隊列到交換機->消費信息->關閉信道->關閉連接

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"http://go.rritw.com/xmdevops.blog.51cto.com/""
#
# Authors: limanman
# OsChina: http://go.rritw.com/xmdevops.blog.51cto.com/
# Purpose:
#
"http://go.rritw.com/xmdevops.blog.51cto.com/""
# 說明: 導入公共模塊
import pika
# 說明: 導入其它模塊


if __name__ == '__main__':
    # 創建憑證對象
    credentials = pika.PlainCredentials('guest', 'guest')
    # 創建参數對象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服務地址
        host='127.0.0.1',
        # RabbitMQ服務端口
        port=5672,
        # RabbitMQ服務憑證
        credentials=credentials,
        # RabbitMQ虛擬主機
        virtual_host='/'
    )
    # 創建連接對象
    conn_broker = pika.BlockingConnection(conn_params)
    # 獲取信道對象
    channel = conn_broker.channel()
    # 創建交換機
    channel.exchange_declare(
        # 交換機名稱
        exchange="salt-exchange",
        # 交換機類型
        type="direct",
        # 如果同名交換機已存在依然返回成功
        passive=False,
        # 聲明为持久化交換機
        durable=False,
        # 交換機閑置也不會自動刪除
        auto_delete=False
    )
    # 創建隊列
    channel.queue_declare(queue="salt")
    # 绑定隊列
    channel.queue_bind(
        # 隊列名稱
        queue="salt",
        # 交換機名稱
        exchange="salt-exchange",
        # 路由鍵名稱
        routing_key="salt"
    )

    # 消息回調處理函數
    def msg_consumer(channel, method, header, body):
        # 發送消息確認
        channel.basic_ack(delivery_tag=method.delivery_tag)
        # 退出監聽循環
        if body == 'exit':
            channel.basic_cancel(consumer_tag="salt-consumer")
            channel.stop_consuming()
        else:
            print 'found notice: recive queue message {0}'.format(body)
        return

    # 作为指定隊列消費者
    channel.basic_consume(msg_consumer, queue="salt", consumer_tag="salt-consumer")
    # 循環調用回調函數接收處理消息
    channel.start_consuming()

說明: 首先用使用默認帳號密碼guest,默認端口5672,默認虛擬主機/連接RabbitMQ Vhost,然後建立信道,利用信道和rabbitMQ進行通信,然後再次聲明交換機,防止由於生產者沒有聲明交換機導致後面绑定隊列失敗,然後就是創建隊列,創建隊列時需要指定隊列名稱,然後就是绑定交換機,绑定的時候需要指定隊列名稱,交換機名稱,绑定路由鍵,最後就是訂閱指定隊列,訂閱時需要傳遞一個回調函數來處理消息,一個隊列名稱來指明要訂閱的隊列,一個標識進程的消費者標記,一旦開始讀取消息則會開始一個阻塞的循環等待從信道進來的數據,如果要停止,則需要先使用basic_cancel結束消費(關閉信道和連接),注意需要提供進程標識,然後再stop_consuming停止消費者

確認: 連接RabbitMQ->獲取信道->設置確認模式->聲明交換機->創建消息->發布消息->關閉信道->關閉連接

wKioL1g9GSbjoUozAACEXLxs2hg797.png

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"http://go.rritw.com/xmdevops.blog.51cto.com/""
#
# Authors: limanman
# OsChina: http://go.rritw.com/xmdevops.blog.51cto.com/
# Purpose:
#
"http://go.rritw.com/xmdevops.blog.51cto.com/""
# 說明: 導入公共模塊
import sys
import pika
# 說明: 導入其它模塊


if __name__ == '__main__':
    # 創建憑證對象
    credentials = pika.PlainCredentials('guest', 'guest')
    # 創建参數對象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服務地址
        host='127.0.0.1',
        # RabbitMQ服務端口
        port=5672,
        # RabbitMQ登錄憑證
        credentials=credentials,
        # RabbitMQ虛擬主機
        virtual_host='/'
    )
    # 創建連接對象
    conn_broker = pika.BlockingConnection(conn_params)
    # 獲取信道對象
    channel = conn_broker.channel()
    msg_ids = []
    msg_ids.append(len(msg_ids)+1)
    # 確認模式回調函數
    def confirm_handler(frame):
        # 第一次信道被設置为確認模式時會觸發一次確認回調
        if type(frame.method) == pika.spec.Confirm.SelectOk:
            print 'found notice: channel in confirm mode'
        # 如果發送的消息達到隊列後沒有回應則說明消息丟失,需要重發
        elif type(frame.method) == pika.spec.Basic.Nack:
            # 如果丟的消息確實是msg_ids裏面的,則說明剛剛發的消息確實是丟失了~
            if frame.method.delivery_tag in msg_ids:
                print 'found errors: message may be lost'
        # 如果發送的消息到達隊列後發回響應
        elif type(frame.method) == pika.spec.Basic.Ack:
            # 如果確認消息id確實是msg_ids裏面的,則從msg_ids裏面刪除
            if frame.method.delivery_tag in msg_ids:
                print 'found notice: message confirm received'
                # 刪除已經確認的消息
                msg_ids.remove(frame.method.delivery_tag)
    # 設置信道为確認模式
    channel.confirm_delivery(callback=confirm_handler)
    # 創建交換機
    channel.exchange_declare(
        # 交換機名稱
        exchange="salt-exchange",
        # 交換機類型
        type="direct",
        # 如果同名交換機已存在依然返回成功,否則創建
        passive=False,
        # 聲明为非持久化交換機
        durable=False,
        # 交換機閑置也不會自動刪除
        auto_delete=False
    )
    msg = sys.argv[1]
    # 創建配置對象
    msg_props = pika.BasicProperties()
    # 設置內容類型
    msg_props.content_type = 'text/plain'
    # 嘗試發布消息
    channel.basic_publish(
        # 發布消息內容
        body=msg,
        # 發布到交換機
        exchange='salt-exchange',
        # 發布信息屬性
        properties=msg_props,
        # 發布信息時攜帶的路由鍵
        routing_key='salt'
    )
    channel.close()

說明: RabbitMQ任何一個信道上發布的第一條消息都將獲得ID1,並且信道上接下來的每一條消息的ID都會步進1,對於信道來說,消息ID是唯一的,所以一旦信道關閉,你將無法追蹤發布在該信道上任何未完成的發送方確認消息狀態,所以RabbitMQ並不會在發布消息時返回消息對應的ID,而需要我們自己为每個信道單獨維護一個消息計數器,在幾乎不影響RabbitMQ性能的前提下在生產者端用回調來處理消息確認.



本文出自 “滿滿李 - 運維開發之路” 博客,請務必保留此出處http://go.rritw.com/xmdevops.blog.51cto.com/11144840/1877695


From:51CTO
頂一下
(0)
0%
踩一下
(0)
0%
------分隔線----------------------------
發表評論
請自覺遵守互聯網相關的政策法規,嚴禁發布色情、暴力、反動的言論。
評價:
表情:
驗證碼:點擊我更換圖片
欄目列表
推薦內容