一、rabbitMQ有几种方式实现队列
单模式(Simple Mode):是最基本的消息传递方式,只包含一个生产者和一个消费者。生产者向消息队列发送消息,消费者从队列中接收消息,这种方式不需要额外的配置。
工作队列模式(Work Queues):也称为任务队列模式,包含多个工作者(Workers)和一个生产者。生产者向消息队列发送消息,工作者们从队列中获取并处理消息。该模式支持负载均衡和消息确认等功能。
发布/订阅模式(Publish/Subscribe):包含一个生产者和多个消费者,每个消费者都有自己的队列。生产者向交换机(Exchange)发送消息,交换机将消息广播给所有注册在该交换机上的队列,并由各个消费者接收和处理消息。
路由模式(Routing):也称为指定路由键模式,包含一个生产者、多个消费者和一个交换机。交换机根据消息的路由键(Routing Key)将消息发送到对应的队列,消费者从队列中接收并处理消息。
主题模式(Topics):与路由模式相似,但使用更灵活的主题(Topic)路由键来匹配消息和队列。主题路由键包含多个单词,用点(.)分隔,并支持通配符(*和#)来匹配不确定的单词。
Java语言以RabbitMQ实现各种队列的示例代码
生产者代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
private static final String HOST = "localhost";
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
private static final String QUEUE_NAME = "hello";
private static final String EXCHANGE_NAME = "logs";
private static final String ROUTING_KEY = "info.user";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
// 创建连接和通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 发送简单队列消息
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
// 发送工作队列消息
for (int i = 0; i < 10; i++) {
String task = "Task " + i;
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, task.getBytes());
System.out.println(" [x] Sent '" + task + "'");
}
// 发送发布/订阅消息
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "info message";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
// 发送路由消息
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String message = "user message";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
消费者代码
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Consumer {
private static final String HOST = "localhost";
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
private static final String QUEUE_NAME = "hello";
private static final String EXCHANGE_NAME = "logs";
private static final String ROUTING_KEY = "info.user";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
// 创建连接和通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 接收简单队列消息
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
// 接收工作队列消息
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
int prefetchCount = 1;
channel.basicQos(prefetchCount); // 公平分发
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false; // 禁止自动确认消息
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
});
// 接收发布/订阅消息
channel.exchangeDeclare(EXCHANGE_NAME,