spring - RabbitMQ queue is empty even if message was published - Stack Overflow

I am trying to create an app using RabbitMQ and I am stuck. I followed a tutorial and everything should work, but it doesn't. Whether I send a Postman request or publish from the RabbitMQ management UI at localhost:15672, the "message published" pop-up appears and the log output shows up in the console, but when I check the queue, it says it is empty.

The log clearly shows that the message has arrived at the consumer, but when I check the queue in the management UI, the message is not there.

I will show the entire code now.

CONFIG

package ro.tuc.ds2020.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Value("${rabbitmq.queue.name}")
    private String queue;

    @Value("${rabbitmq.queue.json.name}")
    private String jsonQueue;

    @Value("${rabbitmq.queue.exchange}")
    private String exchange;

    @Value("${rabbitmq.queue.routing_key_one}")
    private String routingKeyOne;

    @Value("${rabbitmq.queue.routing_key_json}")
    private String routingKeyJson;

    @Bean
    public Queue queue() {
        return new Queue(queue);
    }

    @Bean
    public Queue jsonQueue() {
        return new Queue(jsonQueue, true);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(exchange, true, false);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(routingKeyOne);
    }

    @Bean
    public Binding jsonBinding() {
        return BindingBuilder.bind(jsonQueue()).to(exchange()).with(routingKeyJson);
    }

    @Bean
    public MessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());
        return rabbitTemplate;
    }
}

CONSUMER

package ro.tuc.ds2020.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;

@Service
public class RabbitMQJsonConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonConsumer.class);

    @RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
    public void consumeJsonMessage(MeasurementDTO measurementDTO) {
        LOGGER.info(String.format("Received JSON message here -> %s", measurementDTO.toString()));
    }

}

CONTROLLER (used when I send requests with Postman)

package ro.tuc.ds2020.controllers;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import ro.tuc.ds2020.dtos.MeasurementDTO;
import ro.tuc.ds2020.publisher.RabbitMQJsonProducer;

@RequestMapping("/api/v1")
@RestController
@CrossOrigin(origins = "http://localhost:4200", allowCredentials = "true")
public class MessageJsonController {

    private RabbitMQJsonProducer jsonProducer;

    public MessageJsonController(RabbitMQJsonProducer rabbitMQJsonProducer) {
        this.jsonProducer = rabbitMQJsonProducer;
    }

    @PostMapping("/publish")
    public ResponseEntity<String> sendJsonMessage(@RequestBody MeasurementDTO measurementDTO) {
        jsonProducer.sendJsonMessage(measurementDTO);
        return  ResponseEntity.ok("Json message sent to RabbitMQ ...");
    }
}

PUBLISHER

package ro.tuc.ds2020.publisher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;

@Service
public class RabbitMQJsonProducer {

    @Value("${rabbitmq.queue.exchange}")
    private String exchange;

    @Value("${rabbitmq.queue.routing_key_json}")
    private String routingKeyJson;

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonProducer.class);

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public RabbitMQJsonProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendJsonMessage(MeasurementDTO measurementDTO) {
        LOGGER.info(String.format("Json message sent -> %s", measurementDTO.toString()));
        rabbitTemplate.convertAndSend(exchange, routingKeyJson, measurementDTO);
    }

}

And here is the application.properties

spring.rabbitmq.host = localhost
spring.rabbitmq.port = 5672
spring.rabbitmq.username = guest
spring.rabbitmq.password = guest

rabbitmq.queue.name = queue_1
rabbitmq.queue.json.name = queue_json
rabbitmq.queue.exchange = exchange
rabbitmq.queue.routing_key_one = routing_key_1
rabbitmq.queue.routing_key_json = routing_key_json

asked Nov 16, 2024 at 19:31 by Cornel

1 Answer

Java often presents more challenges in setting up a proper working environment compared to other languages like Node.js or Python, which are generally easier to configure.

Requirement

Maven 3.9.9 and JDK 17

> mvn --version
Apache Maven 3.9.9 (8e8579a9e76f7d015ee5ec7bfcdc97d260186937)
Maven home: C:\Users\benchvue\maven\apache-maven-3.9.9
Java version: 17.0.12, vendor: Amazon Inc., runtime: C:\Program Files\Amazon Corretto\jdk17.0.12_7
Default locale: en_US, platform encoding: Cp1252
OS name: "windows 11", version: "10.0", arch: "amd64", family: "windows"

File Tree

C:.
│   docker-compose.yml
│   pom.xml
│
├───.idea
│       .gitignore
│       compiler.xml
│       encodings.xml
│       jarRepositories.xml
│       misc.xml
│
└───src
    └───main
        ├───java
        │   └───ro
        │       └───tuc
        │           └───ds2020
        │               │   Ds2020Application.java
        │               │
        │               ├───config
        │               │       RabbitMQConfig.java
        │               │
        │               ├───consumer
        │               │       RabbitMQJsonConsumer.java
        │               │
        │               ├───controllers
        │               │       MessageJsonController.java
        │               │
        │               ├───dtos
        │               │       MeasurementDTO.java
        │               │
        │               └───publisher
        │                       RabbitMQJsonProducer.java
        │
        └───resources
            │   application.properties
            │
            └───static

RabbitMQConfig.java

package ro.tuc.ds2020.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Value("${rabbitmq.queue.name}")
    private String queue;

    @Value("${rabbitmq.queue.json.name}")
    private String jsonQueue;

    @Value("${rabbitmq.queue.exchange}")
    private String exchange;

    @Value("${rabbitmq.queue.routing_key_one}")
    private String routingKeyOne;

    @Value("${rabbitmq.queue.routing_key_json}")
    private String routingKeyJson;

    @Bean
    public Queue queue() {
        return new Queue(queue);
    }

    @Bean
    public Queue jsonQueue() {
        return new Queue(jsonQueue, true);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(exchange, true, false);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(routingKeyOne);
    }

    @Bean
    public Binding jsonBinding() {
        return BindingBuilder.bind(jsonQueue()).to(exchange()).with(routingKeyJson);
    }

    @Bean
    public MessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());
        return rabbitTemplate;
    }
}

RabbitMQJsonConsumer.java

package ro.tuc.ds2020.consumer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;

@Service
public class RabbitMQJsonConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonConsumer.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    @RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
    public void consumeJsonMessage(MeasurementDTO measurementDTO) {
        try {
            String jsonMessage = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(measurementDTO);
            LOGGER.info("Received JSON message here -> \n{}", jsonMessage);
        } catch (JsonProcessingException e) {
            LOGGER.error("Failed to convert message to JSON", e);
        }
    }
}

MessageJsonController.java

package ro.tuc.ds2020.controllers;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import ro.tuc.ds2020.dtos.MeasurementDTO;
import ro.tuc.ds2020.publisher.RabbitMQJsonProducer;

import java.util.HashMap;
import java.util.Map;

@RequestMapping("/api/v1")
@RestController
public class MessageJsonController {

    private final RabbitMQJsonProducer jsonProducer;

    public MessageJsonController(RabbitMQJsonProducer rabbitMQJsonProducer) {
        this.jsonProducer = rabbitMQJsonProducer;
    }

    @PostMapping("/publish")
    public ResponseEntity<Map<String, String>> sendJsonMessage(@RequestBody MeasurementDTO measurementDTO) {
        jsonProducer.sendJsonMessage(measurementDTO);

        // Create a JSON response body
        Map<String, String> response = new HashMap<>();
        response.put("message", "Json message sent to RabbitMQ");
        response.put("status", "success");

        return ResponseEntity.ok(response);
    }
}

MeasurementDTO.java

package ro.tuc.ds2020.dtos;

import com.fasterxml.jackson.annotation.JsonProperty;

public class MeasurementDTO {

    @JsonProperty("sensorId")
    private String sensorId;

    @JsonProperty("value")
    private double value;

    @JsonProperty("unit")
    private String unit;

    @JsonProperty("timestamp")
    private String timestamp;

    // Getters and Setters
    public String getSensorId() {
        return sensorId;
    }

    public void setSensorId(String sensorId) {
        this.sensorId = sensorId;
    }

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public String getUnit() {
        return unit;
    }

    public void setUnit(String unit) {
        this.unit = unit;
    }

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "MeasurementDTO{" +
                "sensorId='" + sensorId + '\'' +
                ", value=" + value +
                ", unit='" + unit + '\'' +
                ", timestamp='" + timestamp + '\'' +
                '}';
    }
}

RabbitMQJsonProducer.java

package ro.tuc.ds2020.publisher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;

@Service
public class RabbitMQJsonProducer {

    @Value("${rabbitmq.queue.exchange}")
    private String exchange;

    @Value("${rabbitmq.queue.routing_key_json}")
    private String routingKeyJson;

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonProducer.class);

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public RabbitMQJsonProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendJsonMessage(MeasurementDTO measurementDTO) {
        LOGGER.info(String.format("Json message sent -> %s", measurementDTO.toString()));
        rabbitTemplate.convertAndSend(exchange, routingKeyJson, measurementDTO);
    }
}

Ds2020Application.java

package ro.tuc.ds2020;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Ds2020Application {
    public static void main(String[] args) {
        SpringApplication.run(Ds2020Application.class, args);
    }
}

application.properties

spring.rabbitmq.host = localhost
spring.rabbitmq.port = 5672
spring.rabbitmq.username = guest
spring.rabbitmq.password = guest

rabbitmq.queue.name = queue_1
rabbitmq.queue.json.name = queue_json
rabbitmq.queue.exchange = exchange
rabbitmq.queue.routing_key_one = routing_key_1
rabbitmq.queue.routing_key_json = routing_key_json

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>ro.tuc</groupId>
    <artifactId>ds2020</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <!-- Spring Boot Starter Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Spring Boot Starter AMQP -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- Jackson for JSON serialization -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <!-- Spring Boot Starter Test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot Maven Plugin -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*</include>
                </includes>
            </resource>
        </resources>        
    </build>
</project>

docker-compose.yml

version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    ports:
      - "5672:5672" # RabbitMQ messaging port
      - "15672:15672" # RabbitMQ management UI
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest

Launching RabbitMQ

docker compose up

Access RabbitMQ UI

username: guest
password: guest
http://localhost:15672/#/

Compile jar

mvn clean install

dir target

Launching the Java project

java -jar target/ds2020-1.0.0.jar

Call REST API by Postman

POST http://localhost:8080/api/v1/publish

Input Body

{
    "sensorId": "12345",
    "value": 67.5,
    "unit": "Celsius",
    "timestamp": "2024-11-16T18:30:00Z"
}
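
If Postman is not at hand, the same request can be sent from plain Java. This is only a sketch: it assumes the application runs on localhost:8080 as above and uses the JDK's built-in `java.net.http` client (JDK 11 or later). The class name `PublishClient` and the helper `buildPublishRequest` are made up for illustration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PublishClient {

    // Same sample payload as the Postman body above.
    public static final String SAMPLE_BODY =
            "{\"sensorId\":\"12345\",\"value\":67.5,"
            + "\"unit\":\"Celsius\",\"timestamp\":\"2024-11-16T18:30:00Z\"}";

    // Builds the POST request without sending it.
    public static HttpRequest buildPublishRequest(String baseUrl, String json) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/v1/publish"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumes the Spring Boot application is already running locally.
        HttpRequest request = buildPublishRequest("http://localhost:8080", SAMPLE_BODY);
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Running `main` while the application is up should return the controller's JSON response, and the consumer log entry should appear on the Spring side.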

Java Side

The consumer logs the received message in the Spring log:

2024-11-16 19:11:43.964  INFO 22464 --- [ntContainer#0-1] r.t.d.consumer.RabbitMQJsonConsumer      : Received JSON message here ->
{
  "sensorId" : "12345",
  "value" : 67.5,
  "unit" : "Celsius",
  "timestamp" : "2024-11-16T18:30:00Z"
}

You can see the spike in the RabbitMQ management UI.

If you want to see the queued message itself in the RabbitMQ UI, you need to comment out the @RabbitListener annotation in RabbitMQJsonConsumer.java.

From

@RabbitListener(queues = {"${rabbitmq.queue.json.name}"})

To

//@RabbitListener(queues = {"${rabbitmq.queue.json.name}"})

Then rebuild the jar and run it again.

  • The @RabbitListener annotation makes the consumer automatically consume messages from the queue as soon as they arrive.
  • Once the consumer has processed and acknowledged a message, the broker removes it from the queue, so the queue looks empty almost immediately.
  • By commenting out @RabbitListener, the consumer is disabled, and messages remain in the queue for inspection.
  • The messages are therefore not lost: they are delivered and processed right away, and they only accumulate in the queue while no consumer is running.
  • For debugging, disabling the consumer allows you to verify message flow and queue content in RabbitMQ.
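
The effect described above can be reproduced without a broker. The following is only an in-memory analogy, not RabbitMQ code: a `BlockingQueue` stands in for queue_json, a background thread stands in for the @RabbitListener method, and the class name `QueueDrainDemo` is made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory analogy only: no RabbitMQ or Spring classes are involved.
public class QueueDrainDemo {

    // Publishes `messages` items and returns the queue depth observed afterwards.
    public static int depthAfterPublish(int messages, boolean consumerEnabled) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        CountDownLatch consumed = new CountDownLatch(messages);

        // Plays the role of the listener: takes every message as soon as it arrives.
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    queue.take();          // removes the message from the queue
                    consumed.countDown();  // "processing" finished
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop when asked to
            }
        });
        consumer.setDaemon(true);
        if (consumerEnabled) {
            consumer.start();
        }

        // Plays the role of the publisher.
        for (int i = 0; i < messages; i++) {
            queue.add("measurement-" + i);
        }

        if (consumerEnabled) {
            try {
                // Give the consumer time to catch up before inspecting the queue.
                consumed.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            consumer.interrupt();
        }
        return queue.size(); // what an inspection of the queue would report
    }

    public static void main(String[] args) {
        System.out.println("consumer enabled:  " + depthAfterPublish(3, true));
        System.out.println("consumer disabled: " + depthAfterPublish(3, false));
    }
}
```

With the consumer enabled the reported depth is 0, and with it disabled it is 3, which mirrors what the management UI shows with and without the listener.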

Good Luck!
