Spring boot + JMS + ActiveMQ を試す
こんにちわ、
猫好きリーマンのほげPGです。
今回は Spring boot + JMS + ActiveMQ を試してみます。
1、ActiveMQのインストール&起動
ここ(https://activemq.apache.org/)からActiveMQ 5 “Classic”をダウンロードする
ダウンロードしたapache-activemq-5.15.10-bin.zipをC:\java に展開する
コマンドプロンプトより、以下を実行(java.exeのpathは通っている前提)
C:\java\apache-activemq-5.15.10\bin\activemq start
起動確認。
管理画面(http://localhost:8161/admin/)にアクセス。ユーザとパスはadmin/admin
試しにほげって見る
Queue NameにhogeQを入力し、Createボタン押下
hogeQの send リンクを押下
MessageBody に HOGE を入力し、Sendボタン押下
メッセージがhogeQに積まれました。
2、送信側プログラム
pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>sample</groupId> <artifactId>hoge-activemq-producer</artifactId> <packaging>war</packaging> <version>1.0.0-SNAPSHOT</version> <name>HogeActiveMQProducer</name> <url>http://maven.apache.org</url> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.1.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project> |
HogeApp.java
1 2 3 4 5 6 7 8 9 10 11 12 13 |
@EnableAutoConfiguration @ComponentScan public class HogeApp extends SpringBootServletInitializer { @Override protected SpringApplicationBuilder configure(SpringApplicationBuilder application) { return application.sources(HogeApp.class); } public static void main(String[] args) throws Exception { SpringApplication.run(HogeApp.class, args); } } |
HogeController.java 送信処理です。文字列を単に送信します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
@Controller @Slf4j public class HogeController { @Autowired JmsTemplate jmsTemplate; @RequestMapping(value = "/") @ResponseBody public String hello() { log.debug("called."); jmsTemplate.convertAndSend("hogeQ", "hoge "+ new Date()); return "hoge!"; } @RequestMapping("/hoge") @ResponseBody public String hello(HttpServletRequest request, HttpServletResponse response) throws IOException { String body = request.getReader().lines().collect(Collectors.joining("\r\n")); log.debug("body.length = {}", body.length()); jmsTemplate.convertAndSend("hogeQ", body); return "hoge!"; } } |
application.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
spring.main: # 起動バナーなし banner-mode: "off" server: servlet.context-path: /api # ログ logging: pattern: console: "%d{HH:mm:ss.SSS} %thread %-5level \\(%file:%line\\) %M - %msg%n" level: ROOT: INFO jp.co.ois.sample: DEBUG # MQ spring.activemq: broker-url: tcp://localhost:61616 |
起動し、ほげる
Producerプロジェクトフォルダにて、以下実行
mvn spring-boot:run
ブラウザより、http://localhost:8080/api にアクセス
もうちょいほげる
postmanやjmeterなどで http://localhost:8080/api/hoge にPOSTで呼ぶ。BodyがhogeQに積まれる。
以下は100こほどjmeterで呼出た結果
3、受信側プログラム
pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>sample</groupId> <artifactId>hoge-activemq-consumer</artifactId> <packaging>jar</packaging> <version>1.0.0-SNAPSHOT</version> <name>HogeActiveMQConsumer</name> <url>http://maven.apache.org</url> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.1.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project> |
HogeApp.java
1 2 3 4 5 6 7 8 |
@EnableAutoConfiguration @ComponentScan public class HogeApp { public static void main(String[] args) throws Exception { SpringApplication.run(HogeApp.class, args); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
@Configuration public class HogeConfig { @Bean @ConfigurationProperties(prefix = "spring.activemq") public ActiveMQConnectionFactory connectionFactory() { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); return factory; } @Bean public DefaultJmsListenerContainerFactory myFactory( ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); configurer.configure(factory, connectionFactory); ExecutorService executor = Executors.newCachedThreadPool(new HogeThreadFactory("Consumer-%d")); factory.setTaskExecutor(executor); return factory; } public static class HogeThreadFactory implements ThreadFactory { private final AtomicInteger counter = new AtomicInteger(0); private final String format; private HogeThreadFactory(String format) { this.format = format; } @Override public Thread newThread(Runnable r) { String name = String.format(format, counter.incrementAndGet()); return new Thread(null, r, name); } } } |
補足)
スレッド名が気に入らなかったので自分で指定しています。
プリフェッチを設定しています。初期値が1000なのでキューが溜まっている状態でコンシューマーを起動するとメッセージが1000以上ないと分散処理してくれないので小さくしています。
HogeConsumer.java 受信処理です。文字列で取得します。ダミーでウェイトさせています。
1 2 3 4 5 6 7 8 9 10 |
@Slf4j @Component public class HogeConsumer { @JmsListener(destination = "hogeQ", containerFactory = "myFactory") void received(String message) throws InterruptedException { log.info("Message received: {} ", message); Thread.sleep(200); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
spring.main: # 起動バナーなし banner-mode: "off" # 組み込みWebサーバの自動起動無効 web-application-type: none # ログ logging: pattern: console: "%d{HH:mm:ss.SSS} %thread %-5level \\(%file:%line\\) %M - %msg%n" level: ROOT: INFO jp.co.ois.sample: DEBUG # MQ spring.activemq: broker-url: tcp://localhost:61616 prefetch-policy.queue-prefetch: 10 # broker-url: tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1 spring.jms: listener: concurrency: 5 max-concurrency: 10 |
補足)
broker-url: tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1 を指定してもプリフェッチを指定できます。
起動して、受信してみます。
Consumerプロジェクトフォルダにて、以下実行
mvn spring-boot:run
キューからほげ受信できました。
【プロジェクト一式】
今回はここまで、次回はRabbitMQを試します。
◆WEB会議/セミナーシステム『Szia』
https://www.ois-yokohama.co.jp/szia/
◆サーバサイドで動作するミドルウェア『ReDois』
https://www.ois-yokohama.co.jp/redois/wp_redois/