IT박스

RabbitMQ : 토픽 교환이있는 지속성 메시지

itboxs 2020. 11. 25. 07:47
반응형

RabbitMQ : 토픽 교환이있는 지속성 메시지


저는 RabbitMQ를 처음 접했습니다.

나는 '주제'교환을 설정했습니다. 소비자는 게시자 이후에 시작될 수 있습니다. 소비자가 작동하기 전에 전송되었지만 아직 사용되지 않은 메시지를받을 수 있기를 바랍니다.

교환은 다음 매개 변수로 설정됩니다.

exchange_type => 'topic'
durable => 1
auto_delete => 0
passive => 0

메시지는 다음 매개 변수로 발행됩니다.

delivery_mode => 2

소비자는 get ()을 사용하여 교환에서 메시지를 검색합니다.

안타깝게도 클라이언트가 작동하기 전에 게시 된 모든 메시지는 손실됩니다. 나는 다른 조합을 사용했습니다.

내 문제는 거래소가 메시지를 보유하지 않는다는 것입니다. 게시자와 대기열 사이에 대기열이 필요할 수 있습니다. 그러나 이것은 메시지가 키로 라우팅되는 '주제'교환에서는 작동하지 않는 것 같습니다.

내가 어떻게 진행해야할지 알 수 있습니다. 나는 Perl 바인딩 Net :: RabbitMQ (중요하지 않음)와 RabbitMQ 2.2.0을 사용합니다.


메시지가 게시 될 때 메시지를 처리 ​​할 수있는 연결된 소비자가없는 경우 메시지를 저장하려면 지속 가능한 대기열이 필요합니다.

교환은 메시지를 저장하지 않지만 대기열은 저장할 수 있습니다. 혼란스러운 부분은 교환이 "지속성"으로 표시 될 수 있다는 입니다. 그러나 실제로는 브로커를 다시 시작하면 교환 자체 가 계속 유지되지만 해당 교환으로 보낸 메시지가 자동으로 유지된다는 의미 아닙니다 .

이를 감안할 때 두 가지 옵션이 있습니다.

  1. 게시자를 시작하기 전에 관리 단계수행 하여 대기열을 직접 만듭니다. 웹 UI 또는 명령 줄 도구를 사용하여이를 수행 할 수 있습니다. 활성 소비자가없는 경우에도 라우팅되는 모든 메시지를 저장할 수 있도록 영구 대기열로 생성해야합니다.
  2. 소비자가 시작시 항상 교환 및 대기열을 선언 (따라서 자동 생성)하도록 코딩되어 있다고 가정하고 (그리고이를 지속성으로 선언) 게시자를 시작하기 전에 모든 소비자를 한 번 이상 실행하면 됩니다. 그러면 모든 대기열이 올바르게 생성됩니다. 그런 다음 큐가 라우팅 된 향후 메시지를 지속적으로 저장하기 때문에 실제로 필요할 때까지 소비자를 종료 할 수 있습니다.

나는 # 1에 갈 것입니다. 수행 할 단계가 많지 않을 수 있으며 반복 될 수 있도록 필요한 단계를 항상 스크립팅 할 수 있습니다. 또한 모든 소비자가 동일한 단일 대기열에서 가져 오려는 경우 (각각 전용 대기열이있는 것이 아니라) 실제로 최소한의 관리 오버 헤드입니다.

대기열은 적절하게 관리하고 제어해야합니다. 그렇지 않으면 불량 소비자가 내구성 대기열을 선언하고 몇 분 동안 사용하지만 다시는 사용하지 않을 수 있습니다. 곧 크기를 줄이지 않고 영구적으로 증가하는 대기열과 임박한 브로커 종말이 생길 것입니다.


Brian이 언급했듯이 교환기는 메시지를 저장하지 않으며 주로 다른 교환기 또는 대기열로 메시지를 라우팅하는 역할을합니다. 교환이 대기열에 바인딩되지 않은 경우 해당 교환으로 전송 된 모든 메시지는 '손실'됩니다.

확장 가능하지 않을 수 있으므로 게시자 스크립트에서 고정 클라이언트 대기열을 선언 할 필요가 없습니다. 대기열은 게시자가 동적으로 생성하고 교환 대 교환 바인딩을 사용하여 내부적으로 라우팅 할 수 있습니다.

RabbitMQ는 토폴로지 유연성, 디커플링 및 기타 이점을 허용하는 exchange-to-exchange 바인딩을 지원합니다. 여기에서 RabbitMQ Exchange to Exchange Bindings [AMPQ] 에서 자세한 내용을 읽을 수 있습니다 .

RabbitMQ Exchange에서 Exchange 바인딩으로

토폴로지 예

대기열을 사용하는 소비자가없는 경우 지속성을 사용하여 교환-교환 바인딩을 생성하는 Python 코드 예제.

#!/usr/bin/env python
import pika
import sys


connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()


#Declares the entry exchange to be used by all producers to send messages. Could be external producers as well
channel.exchange_declare(exchange='data_gateway',
exchange_type='fanout',
durable=True,
auto_delete=False)

#Declares the processing exchange to be used.Routes messages to various queues. For internal use only
channel.exchange_declare(exchange='data_distributor',
exchange_type='topic',
durable=True,
auto_delete=False)

#Binds the external/producer facing exchange to the internal exchange
channel.exchange_bind(destination='data_distributor',source='data_gateway')

##Create Durable Queues binded to the data_distributor exchange
channel.queue_declare(queue='trade_db',durable=True)
channel.queue_declare(queue='trade_stream_service',durable=True)
channel.queue_declare(queue='ticker_db',durable=True)
channel.queue_declare(queue='ticker_stream_service',durable=True)
channel.queue_declare(queue='orderbook_db',durable=True)
channel.queue_declare(queue='orderbook_stream_service',durable=True)

#Bind queues to exchanges and correct routing key. Allows for messages to be saved when no consumer is present
channel.queue_bind(queue='orderbook_db',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='orderbook_stream_service',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='ticker_db',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='ticker_stream_service',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='trade_db',exchange='data_distributor',routing_key='*.*.trade')
channel.queue_bind(queue='trade_stream_service',exchange='data_distributor',routing_key='*.*.trade')

참고 URL : https://stackoverflow.com/questions/6148381/rabbitmq-persistent-message-with-topic-exchange

반응형