本文介绍Python版本的RabbitMQ客户端连接指导,包括RabbitMQ客户端安装,以及生产、消费消息。
使用前请参考收集连接信息收集RabbitMQ所需的连接信息。
准备环境
-
安装Python3
-
安装kombu
pip3 install kombu
生产消息
SSL认证方式
import ssl
import sys
from kombu import Connection, Exchange, Producer
def Main(argv):
ca_certfile = "certs\\ca_certificate.pem"
certfile = "certs\\client_rabbitmq_certificate.pem"
private_key = "certs\\client_rabbitmq_key.pem"
conn = Connection('amqp://127.0.0.1:5671/', login_method='EXTERNAL', ssl={
'ca_certs': ca_certfile,
'keyfile': private_key,
'certfile': certfile,
'cert_reqs': ssl.CERT_REQUIRED,
'ssl_version': ssl.PROTOCOL_TLSv1_2,
})
channel = conn.channel()
exchange = Exchange("example-exchange", type="direct")
producer = Producer(exchange=exchange, channel=channel, routing_key="test-key")
message = "Hello Rabbimtq"
producer.publish(message, retry=True)
print('send message: %s' % message)
if __name__ == '__main__':
Main(sys.argv)
非SSL认证方式
import sys
from kombu import Connection, Exchange, Producer
def Main(argv):
rabbitmq_url = 'amqp://username:password@127.0.0.1:5672/'
conn = Connection(rabbitmq_url, login_method='PLAIN')
channel = conn.channel()
exchange = Exchange("example-exchange", type="direct")
producer = Producer(exchange=exchange, channel=channel, routing_key="test-key")
message = "Hello Rabbimtq"
producer.publish(message, retry=True)
print('send message: %s' % message)
if __name__ == '__main__':
Main(sys.argv)
消费消息
SSL认证方式
import sys
import ssl
from kombu import Connection, Exchange, Queue, Consumer
def Main(argv):
ca_certfile = "certs\\ca_certificate.pem"
certfile ="certs\\client_rabbitmq_certificate.pem"
private_key = "certs\\client_rabbitmq_key.pem"
conn = Connection('amqp://127.0.0.1:5671/', login_method='EXTERNAL', ssl={
'ca_certs': ca_certfile,
'keyfile': private_key,
'certfile': certfile,
'cert_reqs': ssl.CERT_REQUIRED,
'ssl_version': ssl.PROTOCOL_TLSv1_2,
})
exchange = Exchange("example-exchange", type="direct")
queue = Queue(name="example-queue", exchange=exchange, routing_key="test-key")
def process_message(body, message):
print("receive message: %s" % format(body))
message.ack()
with Consumer(conn, queues=queue, callbacks=[process_message], accept=["text/plain"]):
conn.drain_events(timeout=2)
if __name__ == '__main__':
Main(sys.argv)
非SSL认证方式
import sys
from kombu import Connection, Exchange, Queue, Consumer
def Main(argv):
rabbitmq_url = 'amqp://username:password@127.0.0.1:5672/'
conn = Connection(rabbitmq_url, login_method='PLAIN')
exchange = Exchange("example-exchange", type="direct")
queue = Queue(name="example-queue", exchange=exchange, routing_key="test-key")
def process_message(body, message):
print("receive message: %s" % format(body))
message.ack()
with Consumer(conn, queues=queue, callbacks=[process_message], accept=["text/plain"]):
conn.drain_events(timeout=2)
if __name__ == '__main__':
Main(sys.argv)