RabbitMQ实战详解

 2023-03-28 09:08:41  阅读 0

一、rabbitMQ有几种方式实现队列

  1. 单模式(Simple Mode):是最基本的消息传递方式,只包含一个生产者和一个消费者。生产者向消息队列发送消息,消费者从队列中接收消息,这种方式不需要额外的配置。

  2. 工作队列模式(Work Queues):也称为任务队列模式,包含多个工作者(Workers)和一个生产者。生产者向消息队列发送消息,工作者们从队列中获取并处理消息。该模式支持负载均衡和消息确认等功能。

  3. 发布/订阅模式(Publish/Subscribe):包含一个生产者和多个消费者,每个消费者都有自己的队列。生产者向交换机(Exchange)发送消息,交换机将消息广播给所有注册在该交换机上的队列,并由各个消费者接收和处理消息。

  4. 路由模式(Routing):也称为指定路由键模式,包含一个生产者、多个消费者和一个交换机。交换机根据消息的路由键(Routing Key)将消息发送到对应的队列,消费者从队列中接收并处理消息。

  5. 主题模式(Topics):与路由模式相似,但使用更灵活的主题(Topic)路由键来匹配消息和队列。主题路由键包含多个单词,用点(.)分隔,并支持通配符(*和#)来匹配不确定的单词。

Java语言以RabbitMQ实现各种队列的示例代码

生产者代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    private static final String HOST = "localhost";
    private static final String USERNAME = "guest";
    private static final String PASSWORD = "guest";
    private static final String QUEUE_NAME = "hello";
    private static final String EXCHANGE_NAME = "logs";
    private static final String ROUTING_KEY = "info.user";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        // 创建连接和通道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 发送简单队列消息
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            // 发送工作队列消息
            for (int i = 0; i < 10; i++) {
                String task = "Task " + i;
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                channel.basicPublish("", QUEUE_NAME, null, task.getBytes());
                System.out.println(" [x] Sent '" + task + "'");
            }

            // 发送发布/订阅消息
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String message = "info message";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            // 发送路由消息
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            String message = "user message";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消费者代码

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer {
    private static final String HOST = "localhost";
    private static final String USERNAME = "guest";
    private static final String PASSWORD = "guest";
    private static final String QUEUE_NAME = "hello";
    private static final String EXCHANGE_NAME = "logs";
    private static final String ROUTING_KEY = "info.user";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        // 创建连接和通道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 接收简单队列消息
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });

            // 接收工作队列消息
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            int prefetchCount = 1;
            channel.basicQos(prefetchCount); // 公平分发
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println(" [x] Received '" + message + "'");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            boolean autoAck = false; // 禁止自动确认消息
            channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
            });

            // 接收发布/订阅消息
            channel.exchangeDeclare(EXCHANGE_NAME,

 

标签:

如本站内容信息有侵犯到您的权益请联系我们删除,谢谢!!


Copyright © 2020 All Rights Reserved 京ICP5741267-1号 统计代码