33
loading...
This website uses cookies to deliver a better user experience.
# Run the rabbitmq:3-management image interactively as a throwaway container
# (--rm removes it on exit), with hostname "my-rabbit" and host ports 15672
# and 5672 published to the same container ports.
docker run --rm -it --hostname my-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
<!-- Maven dependency on the RabbitMQ Java client (com.rabbitmq:amqp-client), version 5.12.0. -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>
package com.queue;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
 * Small helper that opens one RabbitMQ connection and channel to
 * {@code localhost} and declares queues/exchanges on that channel.
 *
 * <p>Broker failures are printed to {@code System.err} instead of being
 * thrown. If the connection could not be established the instance can still
 * be called, but every operation only reports an error.
 */
public class Queue implements AutoCloseable {
    private static final String HOST = "localhost";

    /** Connection that owns {@link #channel}; {@code null} when not connected. */
    private Connection connection;

    /** Channel used for every broker call; {@code null} when not connected. */
    private Channel channel;

    /**
     * Connects to the broker on {@code localhost} and opens a channel.
     * On failure the error is printed and any half-opened connection is released.
     */
    public Queue() {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost(HOST);
        try {
            connection = cf.newConnection();
            channel = connection.createChannel();
        } catch (Exception e) {
            System.err.println(e);
            // Channel creation may fail after the connection opened; do not leak it.
            releaseConnection();
        }
    }

    /**
     * Declares a queue and an exchange and binds them together.
     *
     * @param queueName    queue to declare (non-durable, non-exclusive, not auto-deleted)
     * @param exchangeName exchange to declare
     * @param exchangeType exchange type name passed to the broker, e.g. {@code "direct"}
     * @param key          routing key used for the binding
     */
    public void createExchangeQueue(String queueName, String exchangeName, String exchangeType, String key) {
        if (channel == null) {
            // Without this guard the failure surfaces as a bare NullPointerException message.
            System.err.println("Cannot declare queue '" + queueName + "': no open RabbitMQ channel");
            return;
        }
        try {
            channel.queueDeclare(queueName, false, false, false, null);
            channel.exchangeDeclare(exchangeName, exchangeType);
            channel.queueBind(queueName, exchangeName, key);
        } catch (Exception e) {
            System.err.println(e);
        }
    }

    /**
     * Closes the underlying connection (and with it the channel).
     * Safe to call more than once; errors are printed, not thrown.
     */
    @Override
    public void close() {
        releaseConnection();
    }

    /** Closes the connection if one is open and clears both fields. */
    private void releaseConnection() {
        channel = null;
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception e) {
            System.err.println(e);
        } finally {
            connection = null;
        }
    }
}
package com.rabbit;
import com.queue.Queue;
/**
 * Entry point that declares the {@code square} queue, the {@code myExchange}
 * direct exchange and the binding between them.
 */
public final class App {
    // Constants are never reassigned, so they are declared final.
    private static final String QUEUE_NAME = "square";
    private static final String EXCHANGE_NAME = "myExchange";
    private static final String KEY_NAME = "key";

    /**
     * Creates a {@link Queue} and declares the queue/exchange/binding on it.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Queue queue = new Queue();
        queue.createExchangeQueue(QUEUE_NAME, EXCHANGE_NAME, "direct", KEY_NAME);
    }
}
package com.queue;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.nio.charset.StandardCharsets;
import com.rabbitmq.client.DeliverCallback;
/**
 * Small helper that opens one RabbitMQ connection and channel to
 * {@code localhost} and offers declare, publish and consume operations on it.
 *
 * <p>Broker failures are printed to {@code System.err} instead of being
 * thrown. If the connection could not be established the instance can still
 * be called, but every operation only reports an error.
 */
public class Queue implements AutoCloseable {
    private static final String HOST = "localhost";

    /** Connection that owns {@link #channel}; {@code null} when not connected. */
    private Connection connection;

    /** Channel used for every broker call; {@code null} when not connected. */
    private Channel channel;

    /**
     * Connects to the broker on {@code localhost} and opens a channel.
     * On failure the error is printed and any half-opened connection is released.
     */
    public Queue() {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost(HOST);
        try {
            connection = cf.newConnection();
            channel = connection.createChannel();
        } catch (Exception e) {
            System.err.println(e);
            // Channel creation may fail after the connection opened; do not leak it.
            releaseConnection();
        }
    }

    /**
     * Publishes a UTF-8 encoded text message.
     *
     * @param exchange exchange to publish to
     * @param key      routing key
     * @param message  message text
     */
    public void sendMessage(String exchange, String key, String message) {
        if (channelUnavailable("publish to exchange '" + exchange + "'")) {
            return;
        }
        try {
            channel.basicPublish(exchange, key, null, message.getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            System.err.println(e);
        }
    }

    /**
     * Starts an auto-acknowledging consumer on the given queue.
     *
     * @param queueName queue to consume from
     * @param dlr       callback invoked for each delivered message
     */
    public void listenToQueue(String queueName, DeliverCallback dlr) {
        if (channelUnavailable("consume from queue '" + queueName + "'")) {
            return;
        }
        try {
            // Cancellation is deliberately ignored (empty cancel callback).
            channel.basicConsume(queueName, true, dlr, consumerTag -> { });
        } catch (Exception e) {
            System.err.println(e);
        }
    }

    /**
     * Declares a queue and an exchange and binds them together.
     *
     * @param queueName    queue to declare (non-durable, non-exclusive, not auto-deleted)
     * @param exchangeName exchange to declare
     * @param exchangeType exchange type name passed to the broker, e.g. {@code "direct"}
     * @param key          routing key used for the binding
     */
    public void createExchangeQueue(String queueName, String exchangeName, String exchangeType, String key) {
        if (channelUnavailable("declare queue '" + queueName + "'")) {
            return;
        }
        try {
            channel.queueDeclare(queueName, false, false, false, null);
            channel.exchangeDeclare(exchangeName, exchangeType);
            channel.queueBind(queueName, exchangeName, key);
        } catch (Exception e) {
            System.err.println(e);
        }
    }

    /**
     * Closes the underlying connection (and with it the channel).
     * Safe to call more than once; errors are printed, not thrown.
     */
    @Override
    public void close() {
        releaseConnection();
    }

    /**
     * Reports a missing channel with a readable message instead of letting the
     * call fail with a bare NullPointerException.
     *
     * @param action description of the attempted operation, used in the message
     * @return {@code true} when there is no channel and the caller must bail out
     */
    private boolean channelUnavailable(String action) {
        if (channel != null) {
            return false;
        }
        System.err.println("Cannot " + action + ": no open RabbitMQ channel");
        return true;
    }

    /** Closes the connection if one is open and clears both fields. */
    private void releaseConnection() {
        channel = null;
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception e) {
            System.err.println(e);
        } finally {
            connection = null;
        }
    }
}
package com.square;
import com.rabbitmq.client.DeliverCallback;
import com.queue.Queue;
/**
 * Consumer that reads integers (as UTF-8 text) from the {@code square} queue
 * and prints their squares.
 */
public class Square {
    // Constants are never reassigned, so they are declared final.
    private static final String QUEUE_NAME = "square";
    private static final String EXCHANGE_NAME = "myExchange";
    private static final String KEY_NAME = "key";

    /**
     * Declares the queue/exchange/binding on a new {@link Queue} and starts
     * consuming from the queue with {@link #findSquare}.
     */
    public void listenToMessage() {
        Queue queue = new Queue();
        queue.createExchangeQueue(QUEUE_NAME, EXCHANGE_NAME, "direct", KEY_NAME);
        queue.listenToQueue(QUEUE_NAME, findSquare);
    }

    /**
     * Delivery callback: parses the message body as an {@code int} and prints
     * its square. Bodies that are not a valid integer are reported on
     * {@code System.err} and skipped.
     */
    DeliverCallback findSquare = (consumerTag, delivery) -> {
        // Charset constant instead of a name lookup: no UnsupportedEncodingException path.
        String message = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        final int number;
        try {
            number = Integer.parseInt(message);
        } catch (NumberFormatException e) {
            // An exception escaping the callback goes to the client's exception
            // handler, which by default closes the channel; one bad message
            // must not stop the consumer.
            System.err.println("Ignoring non-numeric message: " + message);
            return;
        }
        // Widen before multiplying: int * int overflows for |number| > 46340.
        long squareNumber = (long) number * number;
        System.out.println("Square of " + message + " is: " + squareNumber );
    };
}
package com.rabbit;
import com.queue.Queue;
import com.square.Square;
/**
 * Entry point that declares the queue and exchange, publishes a single
 * number and then starts the {@link Square} consumer.
 */
public final class App {
    // Constants are never reassigned, so they are declared final.
    private static final String QUEUE_NAME = "square";
    private static final String EXCHANGE_NAME = "myExchange";
    private static final String KEY_NAME = "key";

    /**
     * Publishes {@code "5"} to the exchange and starts listening for messages.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Queue queue = new Queue();
        // The queue is declared and bound before publishing, so the message is
        // routed to it even though the consumer starts afterwards.
        queue.createExchangeQueue(QUEUE_NAME, EXCHANGE_NAME, "direct", KEY_NAME);
        queue.sendMessage(EXCHANGE_NAME, KEY_NAME, "5");

        Square sq = new Square();
        sq.listenToMessage();
    }
}
package com.rabbit;
import java.util.ArrayList;
import java.util.List;
import com.queue.Queue;
import com.square.Square;
/**
 * Entry point that declares the queue and exchange, publishes the numbers
 * 1 through 5 and then starts the {@link Square} consumer.
 */
public final class App {
    // Constants are never reassigned, so they are declared final.
    private static final String QUEUE_NAME = "square";
    private static final String EXCHANGE_NAME = "myExchange";
    private static final String KEY_NAME = "key";

    /**
     * Publishes {@code "1"} .. {@code "5"} (in that order) to the exchange and
     * starts listening for messages.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Queue queue = new Queue();
        // The queue is declared and bound before publishing, so the messages
        // are routed to it even though the consumer starts afterwards.
        queue.createExchangeQueue(QUEUE_NAME, EXCHANGE_NAME, "direct", KEY_NAME);

        List<String> numbers = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            numbers.add(Integer.toString(i));
        }
        numbers.forEach(n -> queue.sendMessage(EXCHANGE_NAME, KEY_NAME, n));

        Square sq = new Square();
        sq.listenToMessage();
    }
}