Spring boot + AMQP + RabbitMQ を試す

こんにちわ、猫好きリーマンのほげPGです。
今回は Spring boot + AMQP + RabbitMQ を試してみます。
1、RabbitMQのインストール&起動
RabbitMQを動かす前にerlnagをインストールしておく必要があります。
erlnagのインストール
ここ(https://www.erlang.org/downloads)からWindows 64-bit Binary Fileをクリックしotp_win64_22.1.exeをダウンロード
exeを実行し、指示に従ってインストール
環境変数に以下を設定する
変数名:ERLANG_HOME
変数値:C:\Program Files\erl10.5 …erlnagをインストールしたフォルダ
RabbitMQのインストール
ここ(https://www.rabbitmq.com/install-windows-manual.html)からrabbitmq-server-windows-3.8.1.zipをダウンロード
ダウンロードしたrabbitmq-server-windows-3.8.1.zipをC:\work に展開する
コマンドプロンプトより、以下を実行
C:\work\rabbitmq_server-3.8.1\sbin\rabbitmq-server start
プラグインの有効か
コマンドプロンプトより、以下を実行
C:\work\rabbitmq_server-3.8.1\sbin\rabbitmq-plugins enable rabbitmq_management
起動確認。
管理画面(http://localhost:15672)にアクセス。
ユーザとパスはguest/guest
試しにほげって見る
Queuesタブを押下し、NameにhogeQを入力し、Add queueボタン押下
Publish messagesのPayloadにHOGEを入力し、PublishMessageボタン押下
キューからメッセージを取得
HOGEメッセージがキューに積まれているのを確認
2、Exchange
せっかくなので作って置きます。
まず、もうひとつキュー(hogeP)を作成します。
Exchangesタブを押下し、Exchangeを作成します。
作成されたhogeExを押下し、以下3つのbindを作成します。
これで、Exchange=hogeEz、routingkey=hoge にメッセージを送信すれば、hogePとhogeQに積まれます。
Exchange=hogeEz、routingkey=moge にメッセージを送信すれば、hogeQに積まれます。
3、送信側プログラム
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>sample</groupId> <artifactId>hoge-rabbitmq-producer</artifactId> <packaging>war</packaging> <version>1.0.0-SNAPSHOT</version> <name>HogeRabbitMQProducer</name> <url>http://maven.apache.org</url> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.1.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
HogeApp.java
@EnableAutoConfiguration @ComponentScan public class HogeApp extends SpringBootServletInitializer { @Override protected SpringApplicationBuilder configure(SpringApplicationBuilder application) { return application.sources(HogeApp.class); } public static void main(String[] args) throws Exception { SpringApplication.run(HogeApp.class, args); } }
HogeController.java 送信処理です。
@Controller @Slf4j public class HogeController { @Autowired RabbitTemplate rabbitTemplate; @RequestMapping(value = "/") @ResponseBody public String hello() { log.debug("called."); rabbitTemplate.convertAndSend("hogeEx", "hoge", "HOGE-"+ new Date()); return "hoge!"; } @RequestMapping("/hoge") @ResponseBody public String hello(HttpServletRequest request, HttpServletResponse response) throws IOException { String body = request.getReader().lines().collect(Collectors.joining("\r\n")); log.debug("body.length = {}", body.length()); rabbitTemplate.convertAndSend("hogeEx", "moge", body); return "hoge!"; } }
application.yml
spring.main: # 起動バナーなし banner-mode: "off" server: servlet.context-path: /api # ログ logging: pattern: console: "%d{HH:mm:ss.SSS} %thread %-5level \\(%file:%line\\) %M - %msg%n" level: ROOT: INFO jp.co.ois.sample: DEBUG # MQ spring.rabbitmq: host: localhost port: 5672
起動し、ほげります。
Producerプロジェクトフォルダにて、以下実行
mvn spring-boot:run
ブラウザより、http://localhost:8080/api にアクセス
もうちょいほげります。
postmanやjmeterなどで http://localhost:8080/api/hoge にPOSTで呼ぶ。BodyがhogeQに積まれる。
4、受信側プログラム
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>sample</groupId> <artifactId>hoge-activemq-consumer</artifactId> <packaging>jar</packaging> <version>1.0.0-SNAPSHOT</version> <name>HogeActiveMQConsumer</name> <url>http://maven.apache.org</url> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.1.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
HogeApp.java
@EnableAutoConfiguration @ComponentScan public class HogeApp { public static void main(String[] args) throws Exception { SpringApplication.run(HogeApp.class, args); } }
HogeConfig.java
@Configuration
public class HogeConfig {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(
ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
ExecutorService executor = Executors.newCachedThreadPool(new HogeThreadFactory("amqp-%d"));
factory.setTaskExecutor(executor);
return factory;
}
public static class HogeThreadFactory implements ThreadFactory {
private final AtomicInteger counter = new AtomicInteger(0);
private final String format;
private HogeThreadFactory(String format) {
this.format = format;
}
@Override
public Thread newThread(Runnable r) {
String name = String.format(format, counter.incrementAndGet());
return new Thread(null, r, name);
}
}
}
補足)
スレッド名が気に入らなかったので自分で指定しています。
HogeConsumer.java 受信処理です。文字列で取得します。ダミーでウェイトさせています。
@Slf4j @Component public class HogeConsumer { @RabbitListener(queues = "hogeQ", containerFactory="myFactory") void receivedQ(String message) throws InterruptedException { log.info("Message received: {} ", message); Thread.sleep(200); } @RabbitListener(queues = "hogeP", containerFactory="myFactory") void receivedP(String message) throws InterruptedException { log.info("Message received: {} ", message); Thread.sleep(200); } }
application.yml
spring.main: # 起動バナーなし banner-mode: "off" # 組み込みWebサーバの自動起動無効 web-application-type: none # ログ logging: pattern: console: "%d{HH:mm:ss.SSS} %thread %-5level \\(%file:%line\\) %M - %msg%n" level: ROOT: INFO jp.co.ois.sample: DEBUG # MQ spring.rabbitmq: host: localhost port: 5672 listener.simple.concurrency: 5 listener.simple.max-concurrency: 10 listener.simple.prefetch: 10
起動して、受信してみます。
Consumerプロジェクトフォルダにて、以下実行
mvn spring-boot:run
10:54:38.441 main INFO (StartupInfoLogger.java:55) logStarting - Starting HogeApp on NDYWM7A3420152 with PID 16088 (C:\work\masuda\myrepo\HogeRabbitMQConsumer\target\classes started by horqu in C:\work\masuda\myrepo\HogeRabbitMQConsumer) 10:54:38.451 main DEBUG (StartupInfoLogger.java:56) logStarting - Running with Spring Boot v2.2.1.RELEASE, Spring v5.2.1.RELEASE 10:54:38.452 main INFO (SpringApplication.java:651) logStartupProfileInfo - No active profile set, falling back to default profiles: default 10:54:40.314 main INFO (AbstractConnectionFactory.java:524) connect - Attempting to connect to: [localhost:5672] 10:54:40.420 main INFO (AbstractConnectionFactory.java:497) createBareConnection - Created new connection: rabbitConnectionFactory#495b0487:0/SimpleConnection@13e3c1c7 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 62505] 10:54:40.591 main INFO (StartupInfoLogger.java:61) logStarted - Started HogeApp in 3.459 seconds (JVM running for 6.338) 10:54:40.597 amqp-3 INFO (HogeConsumer.java:15) receivedQ - Message received: HOGE-Fri Dec 13 10:46:34 JST 2019 10:54:40.596 amqp-6 INFO (HogeConsumer.java:21) receivedP - Message received: HOGE-Fri Dec 13 10:46:34 JST 2019 10:54:40.801 amqp-3 INFO (HogeConsumer.java:15) receivedQ - Message received: HOGE-fpvnwxjxjayfvavmjxfkeokuwqooebxbnqexfnumszqvcmdhtmrvsmxiagnyvranonohvzpeghlotnhqjzynddajlhbvdohkfvnr
キューからほげ受信できました。
【プロジェクト一式】
今回はここまで。
◆WEB会議/セミナーシステム『Szia』
https://www.ois-yokohama.co.jp/szia/
◆サーバサイドで動作するミドルウェア『ReDois』
https://www.ois-yokohama.co.jp/redois/wp_redois/