使用Maven方式引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.0</version>
</dependency>
生产消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Sample producer that publishes 100 text messages to the {@code helloMQ} queue over plain AMQP.
 *
 * <p>Messages are sent through the default exchange (empty exchange name) with the queue name
 * as the routing key, pausing 100 ms between messages.
 */
public class RabbitmqProducer {
    // private final static String EXCHANGE_NAME = "exchangeTest";
    private final static String QUEUE_NAME = "helloMQ";
    // private final static String ROUTING_KEY = "test";

    /**
     * Connects to the broker, declares the queue and publishes the sample messages.
     *
     * @param args unused
     * @throws IOException if connecting, declaring the queue, publishing or closing fails
     * @throws TimeoutException if connecting or closing the channel times out
     * @throws InterruptedException if the thread is interrupted while pausing between messages
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // Create the connection factory.
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host IP.
        factory.setHost("127.0.0.1");
        // AMQP TCP port.
        factory.setPort(5672);
        // User name and password.
        factory.setUsername("username");
        factory.setPassword("password");
        // Virtual host; it must be created in the console beforehand.
        factory.setVirtualHost("/");
        // Choose timeouts that suit the network environment.
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);

        // try-with-resources closes the channel first and then the connection, even when
        // declaring the queue, publishing or sleeping throws.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the target queue.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 100; i++) {
                // Message payload.
                String message = "Hello rabbitMQ!_" + i;
                // Publish one message to the queue via the default exchange.
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" Sent message: '" + message + "'");
                TimeUnit.MILLISECONDS.sleep(100);
            }
        }
    }
}
消费消息
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * Sample consumer that prints every message delivered from the {@code helloMQ} queue over
 * plain AMQP.
 *
 * <p>{@code main} returns right after the consumer is registered; messages are then handled
 * in the {@code handleDelivery} callback. The connection is intentionally left open so that
 * deliveries keep arriving until the process is stopped.
 */
public class RabbitmqConsumer {
    // Queue name.
    private final static String QUEUE_NAME = "helloMQ";

    /**
     * Connects to the broker, declares the queue and registers a printing consumer.
     *
     * @param args unused
     * @throws IOException if connecting, declaring the queue or registering the consumer fails
     * @throws TimeoutException if connecting times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory.
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host IP.
        factory.setHost("127.0.0.1");
        // AMQP TCP port.
        factory.setPort(5672);
        // User name and password.
        factory.setUsername("username");
        factory.setPassword("password");
        // Virtual host.
        factory.setVirtualHost("/");
        // Choose timeouts that suit the network environment.
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);

        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            // Declare the queue so it exists even when the consumer starts before the producer.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println("Received message: '" + message + "'");
                }
            };
            // Second argument is autoAck = true: no explicit acknowledgement is sent by this code.
            channel.basicConsume(QUEUE_NAME, true, consumer);
        } catch (IOException | RuntimeException e) {
            // Setup failed after the connection was opened: release it instead of leaking it.
            closeAfterFailure(connection, e);
            throw e;
        }
    }

    /** Closes the connection after a setup failure, recording any close error as suppressed. */
    private static void closeAfterFailure(Connection connection, Exception failure) {
        try {
            connection.close();
        } catch (IOException | RuntimeException closeFailure) {
            failure.addSuppressed(closeFailure);
        }
    }
}
SSL生产消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultSaslConfig;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.*;
import java.security.cert.CertificateException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Sample producer that publishes 100 text messages to the {@code helloMQ} queue over a TLS
 * connection, authenticating with a client certificate (SASL EXTERNAL).
 *
 * <p>Messages are sent through the default exchange (empty exchange name) with the queue name
 * as the routing key, pausing 100 ms between messages.
 */
public class RabbitmqProducerSsl {
    // private final static String EXCHANGE_NAME = "exchangeTest";
    private final static String QUEUE_NAME = "helloMQ";
    // private final static String ROUTING_KEY = "test";

    /**
     * Builds the TLS context, connects to the broker, declares the queue and publishes the
     * sample messages.
     *
     * @param args unused
     * @throws IOException if a store file cannot be read, or connecting, declaring the queue,
     *     publishing or closing fails
     * @throws TimeoutException if connecting or closing the channel times out
     * @throws InterruptedException if the thread is interrupted while pausing between messages
     * @throws RuntimeException if the key store, trust store or TLS context cannot be initialised
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // Create the connection factory.
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host IP.
        factory.setHost("127.0.0.1");
        // AMQP SSL port.
        factory.setPort(5671);

        String ksFile = "/sslpath/ssl/client_rabbitmq_key.p12";
        String tksFile = "/sslpath/ssl/truststore";
        // NOTE: passphrases are hard-coded for this sample only; load them from secure
        // configuration in real deployments.
        char[] keyPassphrase = "W3zT_98Zz9Io".toCharArray();
        char[] trustPassphrase = "W3zT_98Zz9Io".toCharArray();
        SSLContext c = createSslContext(ksFile, keyPassphrase, tksFile, trustPassphrase);

        factory.setSaslConfig(DefaultSaslConfig.EXTERNAL);
        factory.useSslProtocol(c);
        // Virtual host; it must be created in the console beforehand.
        factory.setVirtualHost("/");
        // Choose timeouts that suit the network environment.
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);

        // try-with-resources closes the channel first and then the connection, even when
        // declaring the queue, publishing or sleeping throws.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the target queue.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 100; i++) {
                // Message payload.
                String message = "Hello rabbitMQ!_" + i;
                // Publish one message to the queue via the default exchange.
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" Sent message: '" + message + "'");
                TimeUnit.MILLISECONDS.sleep(100);
            }
        }
    }

    /**
     * Creates a TLS 1.2 context from a PKCS12 client key store and a JKS trust store.
     *
     * <p>Both store files are closed as soon as they have been loaded.
     */
    private static SSLContext createSslContext(String ksFile, char[] keyPassphrase,
                                               String tksFile, char[] trustPassphrase) throws IOException {
        try {
            KeyStore ks = KeyStore.getInstance("PKCS12");
            try (FileInputStream keyStoreStream = new FileInputStream(ksFile)) {
                ks.load(keyStoreStream, keyPassphrase);
            }
            KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
            kmf.init(ks, keyPassphrase);

            KeyStore tks = KeyStore.getInstance("JKS");
            try (FileInputStream trustStoreStream = new FileInputStream(tksFile)) {
                tks.load(trustStoreStream, trustPassphrase);
            }
            TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
            tmf.init(tks);

            SSLContext context = SSLContext.getInstance("tlsv1.2");
            context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
            return context;
        } catch (KeyStoreException | NoSuchAlgorithmException | CertificateException
                | UnrecoverableKeyException | KeyManagementException e) {
            throw new RuntimeException(e);
        }
    }
}
SSL消费消息
import com.rabbitmq.client.*;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.*;
import java.security.cert.CertificateException;
import java.util.concurrent.TimeoutException;
/**
 * Sample consumer that prints every message delivered from the {@code helloMQ} queue over a
 * TLS connection, authenticating with a client certificate (SASL EXTERNAL).
 *
 * <p>{@code main} returns right after the consumer is registered; messages are then handled
 * in the {@code handleDelivery} callback. The connection is intentionally left open so that
 * deliveries keep arriving until the process is stopped.
 */
public class RabbitmqConsumerSsl {
    // Queue name.
    private final static String QUEUE_NAME = "helloMQ";

    /**
     * Builds the TLS context, connects to the broker, declares the queue and registers a
     * printing consumer.
     *
     * @param args unused
     * @throws IOException if a store file cannot be read, or connecting, declaring the queue
     *     or registering the consumer fails
     * @throws TimeoutException if connecting times out
     * @throws RuntimeException if the key store, trust store or TLS context cannot be initialised
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory.
        ConnectionFactory factory = new ConnectionFactory();
        // Broker host IP.
        factory.setHost("127.0.0.1");
        // AMQP SSL port.
        factory.setPort(5671);

        String ksFile = "/sslpath/ssl/client_rabbitmq_key.p12";
        String tksFile = "/sslpath/ssl/truststore";
        // NOTE: passphrases are hard-coded for this sample only; load them from secure
        // configuration in real deployments.
        char[] keyPassphrase = "W3zT_98Zz9Io".toCharArray();
        char[] trustPassphrase = "W3zT_98Zz9Io".toCharArray();
        SSLContext c = createSslContext(ksFile, keyPassphrase, tksFile, trustPassphrase);

        factory.setSaslConfig(DefaultSaslConfig.EXTERNAL);
        factory.useSslProtocol(c);
        // Virtual host; it must be created in the console beforehand. It has to be the same
        // virtual host the producer publishes to ("/"), otherwise this consumer declares and
        // reads a different helloMQ queue and never receives the producer's messages.
        factory.setVirtualHost("/");
        // Choose timeouts that suit the network environment.
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);

        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            // Declare the queue so it exists even when the consumer starts before the producer.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println("Received message: '" + message + "'");
                }
            };
            // Second argument is autoAck = true: no explicit acknowledgement is sent by this code.
            channel.basicConsume(QUEUE_NAME, true, consumer);
        } catch (IOException | RuntimeException e) {
            // Setup failed after the connection was opened: release it instead of leaking it.
            closeAfterFailure(connection, e);
            throw e;
        }
    }

    /**
     * Creates a TLS 1.2 context from a PKCS12 client key store and a JKS trust store.
     *
     * <p>Both store files are closed as soon as they have been loaded.
     */
    private static SSLContext createSslContext(String ksFile, char[] keyPassphrase,
                                               String tksFile, char[] trustPassphrase) throws IOException {
        try {
            KeyStore ks = KeyStore.getInstance("PKCS12");
            try (FileInputStream keyStoreStream = new FileInputStream(ksFile)) {
                ks.load(keyStoreStream, keyPassphrase);
            }
            KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
            kmf.init(ks, keyPassphrase);

            KeyStore tks = KeyStore.getInstance("JKS");
            try (FileInputStream trustStoreStream = new FileInputStream(tksFile)) {
                tks.load(trustStoreStream, trustPassphrase);
            }
            TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
            tmf.init(tks);

            SSLContext context = SSLContext.getInstance("tlsv1.2");
            context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
            return context;
        } catch (KeyStoreException | NoSuchAlgorithmException | CertificateException
                | UnrecoverableKeyException | KeyManagementException e) {
            throw new RuntimeException(e);
        }
    }

    /** Closes the connection after a setup failure, recording any close error as suppressed. */
    private static void closeAfterFailure(Connection connection, Exception failure) {
        try {
            connection.close();
        } catch (IOException | RuntimeException closeFailure) {
            failure.addSuppressed(closeFailure);
        }
    }
}