Spring boot + JMS + ActiveMQ を試す

こんにちわ、
猫好きリーマンのほげPGです。
今回は Spring boot + JMS + ActiveMQ を試してみます。
1、ActiveMQのインストール&起動
ここ(https://activemq.apache.org/)からActiveMQ 5 “Classic”をダウンロードする
ダウンロードしたapache-activemq-5.15.10-bin.zipをC:\java に展開する
コマンドプロンプトより、以下を実行(java.exeのpathは通っている前提)
C:\java\apache-activemq-5.15.10\bin\activemq start
起動確認。
管理画面(http://localhost:8161/admin/)にアクセス。ユーザとパスはadmin/admin
試しにほげって見る
Queue NameにhogeQを入力し、Createボタン押下


hogeQの send リンクを押下

MessageBody に HOGE を入力し、Sendボタン押下

メッセージがhogeQに積まれました。
2、送信側プログラム
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>sample</groupId>
<artifactId>hoge-activemq-producer</artifactId>
<packaging>war</packaging>
<version>1.0.0-SNAPSHOT</version>
<name>HogeActiveMQProducer</name>
<url>http://maven.apache.org</url>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
HogeApp.java
@EnableAutoConfiguration
@ComponentScan
public class HogeApp extends SpringBootServletInitializer {
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
return application.sources(HogeApp.class);
}
public static void main(String[] args) throws Exception {
SpringApplication.run(HogeApp.class, args);
}
}
HogeController.java 送信処理です。文字列を単に送信します。
@Controller
@Slf4j
public class HogeController {
@Autowired
JmsTemplate jmsTemplate;
@RequestMapping(value = "/")
@ResponseBody
public String hello() {
log.debug("called.");
jmsTemplate.convertAndSend("hogeQ", "hoge "+ new Date());
return "hoge!";
}
@RequestMapping("/hoge")
@ResponseBody
public String hello(HttpServletRequest request, HttpServletResponse response) throws IOException {
String body = request.getReader().lines().collect(Collectors.joining("\r\n"));
log.debug("body.length = {}", body.length());
jmsTemplate.convertAndSend("hogeQ", body);
return "hoge!";
}
}
application.yml
spring.main:
# 起動バナーなし
banner-mode: "off"
server:
servlet.context-path: /api
# ログ
logging:
pattern:
console: "%d{HH:mm:ss.SSS} %thread %-5level \\(%file:%line\\) %M - %msg%n"
level:
ROOT: INFO
jp.co.ois.sample: DEBUG
# MQ
spring.activemq:
broker-url: tcp://localhost:61616
起動し、ほげる
Producerプロジェクトフォルダにて、以下実行
mvn spring-boot:run
ブラウザより、http://localhost:8080/api にアクセス


もうちょいほげる
postmanやjmeterなどで http://localhost:8080/api/hoge にPOSTで呼ぶ。BodyがhogeQに積まれる。
以下は100こほどjmeterで呼出た結果

3、受信側プログラム
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>sample</groupId>
<artifactId>hoge-activemq-consumer</artifactId>
<packaging>jar</packaging>
<version>1.0.0-SNAPSHOT</version>
<name>HogeActiveMQConsumer</name>
<url>http://maven.apache.org</url>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
HogeApp.java
@EnableAutoConfiguration
@ComponentScan
public class HogeApp {
public static void main(String[] args) throws Exception {
SpringApplication.run(HogeApp.class, args);
}
}
@Configuration
public class HogeConfig {
@Bean
@ConfigurationProperties(prefix = "spring.activemq")
public ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
return factory;
}
@Bean
public DefaultJmsListenerContainerFactory myFactory(
ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
ExecutorService executor = Executors.newCachedThreadPool(new HogeThreadFactory("Consumer-%d"));
factory.setTaskExecutor(executor);
return factory;
}
public static class HogeThreadFactory implements ThreadFactory {
private final AtomicInteger counter = new AtomicInteger(0);
private final String format;
private HogeThreadFactory(String format) {
this.format = format;
}
@Override
public Thread newThread(Runnable r) {
String name = String.format(format, counter.incrementAndGet());
return new Thread(null, r, name);
}
}
}
補足)
スレッド名が気に入らなかったので自分で指定しています。
プリフェッチを設定しています。初期値が1000なのでキューが溜まっている状態でコンシューマーを起動するとメッセージが1000以上ないと分散処理してくれないので小さくしています。
HogeConsumer.java 受信処理です。文字列で取得します。ダミーでウェイトさせています。
@Slf4j
@Component
public class HogeConsumer {
@JmsListener(destination = "hogeQ", containerFactory = "myFactory")
void received(String message) throws InterruptedException {
log.info("Message received: {} ", message);
Thread.sleep(200);
}
}
spring.main:
# 起動バナーなし
banner-mode: "off"
# 組み込みWebサーバの自動起動無効
web-application-type: none
# ログ
logging:
pattern:
console: "%d{HH:mm:ss.SSS} %thread %-5level \\(%file:%line\\) %M - %msg%n"
level:
ROOT: INFO
jp.co.ois.sample: DEBUG
# MQ
spring.activemq:
broker-url: tcp://localhost:61616
prefetch-policy.queue-prefetch: 10
# broker-url: tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
spring.jms:
listener:
concurrency: 5
max-concurrency: 10
補足)
broker-url: tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1 を指定してもプリフェッチを指定できます。
起動して、受信してみます。
Consumerプロジェクトフォルダにて、以下実行
mvn spring-boot:run

キューからほげ受信できました。
【プロジェクト一式】
今回はここまで、次回はRabbitMQを試します。

◆WEB会議/セミナーシステム『Szia』
https://www.ois-yokohama.co.jp/szia/
◆サーバサイドで動作するミドルウェア『ReDois』
https://www.ois-yokohama.co.jp/redois/wp_redois/
