Spring boot + JMS + ActiveMQ を試す

こんにちわ、

猫好きリーマンのほげPGです。

今回は Spring boot + JMS + ActiveMQ を試してみます。

 

1、ActiveMQのインストール&起動

ここ(https://activemq.apache.org/)からActiveMQ 5 “Classic”をダウンロードする

ダウンロードしたapache-activemq-5.15.10-bin.zipをC:\java に展開する

コマンドプロンプトより、以下を実行(java.exeのpathは通っている前提)
C:\java\apache-activemq-5.15.10\bin\activemq start

起動確認。

管理画面(http://localhost:8161/admin/)にアクセス。ユーザとパスはadmin/admin

試しにほげって見る

Queue NameにhogeQを入力し、Createボタン押下

 

hogeQの send リンクを押下

 

MessageBody に HOGE を入力し、Sendボタン押下

メッセージがhogeQに積まれました。

 

2、送信側プログラム

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>sample</groupId>
  <artifactId>hoge-activemq-producer</artifactId>
  <packaging>war</packaging>
  <version>1.0.0-SNAPSHOT</version>
  <name>HogeActiveMQProducer</name>
  <url>http://maven.apache.org</url>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.1.RELEASE</version>
  </parent>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-tomcat</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <scope>provided</scope>
   </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>

 

HogeApp.java

@EnableAutoConfiguration
@ComponentScan
public class HogeApp extends SpringBootServletInitializer {

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(HogeApp.class);
    }

    public static void main(String[] args) throws Exception {
        SpringApplication.run(HogeApp.class, args);
    }
}

HogeController.java 送信処理です。文字列を単に送信します。

@Controller
@Slf4j
public class HogeController {

    @Autowired
    JmsTemplate jmsTemplate;

    @RequestMapping(value = "/")
    @ResponseBody
    public String hello() {
        log.debug("called.");
        jmsTemplate.convertAndSend("hogeQ", "hoge "+ new Date());
        return "hoge!";
    }

    @RequestMapping("/hoge")
    @ResponseBody
    public String hello(HttpServletRequest request, HttpServletResponse response) throws IOException {
        String body = request.getReader().lines().collect(Collectors.joining("\r\n"));
        log.debug("body.length = {}", body.length());
        jmsTemplate.convertAndSend("hogeQ", body);
        return "hoge!";
    }
}

application.yml

spring.main:
  # 起動バナーなし
  banner-mode: "off"

server:
  servlet.context-path: /api

# ログ
logging: 
  pattern:
    console: "%d{HH:mm:ss.SSS} %thread %-5level \\(%file:%line\\) %M - %msg%n"
  level: 
    ROOT: INFO
    jp.co.ois.sample: DEBUG

# MQ
spring.activemq:
  broker-url: tcp://localhost:61616

 

起動し、ほげる
Producerプロジェクトフォルダにて、以下実行
mvn spring-boot:run

ブラウザより、http://localhost:8080/api にアクセス

もうちょいほげる
postmanやjmeterなどで http://localhost:8080/api/hoge にPOSTで呼ぶ。BodyがhogeQに積まれる。

以下は100こほどjmeterで呼出た結果

3、受信側プログラム

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>sample</groupId>
  <artifactId>hoge-activemq-consumer</artifactId>
  <packaging>jar</packaging>
  <version>1.0.0-SNAPSHOT</version>
  <name>HogeActiveMQConsumer</name>
  <url>http://maven.apache.org</url>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.1.RELEASE</version>
  </parent>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-configuration-processor</artifactId>
      <optional>true</optional>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>

HogeApp.java

@EnableAutoConfiguration
@ComponentScan
public class HogeApp {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(HogeApp.class, args);
    }
}
@Configuration
public class HogeConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.activemq")
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        return factory;
    }

    @Bean
    public DefaultJmsListenerContainerFactory myFactory(
            ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        ExecutorService executor = Executors.newCachedThreadPool(new HogeThreadFactory("Consumer-%d"));
        factory.setTaskExecutor(executor);
        return factory;
    }

    public static class HogeThreadFactory implements ThreadFactory {
        private final AtomicInteger counter = new AtomicInteger(0);
        private final String format;

        private HogeThreadFactory(String format) {
            this.format = format;
        }

        @Override
        public Thread newThread(Runnable r) {
            String name = String.format(format, counter.incrementAndGet());
            return new Thread(null, r, name);
        }
    }
}

補足)
スレッド名が気に入らなかったので自分で指定しています。
プリフェッチを設定しています。初期値が1000なのでキューが溜まっている状態でコンシューマーを起動するとメッセージが1000以上ないと分散処理してくれないので小さくしています。

HogeConsumer.java 受信処理です。文字列で取得します。ダミーでウェイトさせています。

@Slf4j
@Component
public class HogeConsumer {

    @JmsListener(destination = "hogeQ", containerFactory = "myFactory")
    void received(String message) throws InterruptedException {
        log.info("Message received: {} ", message);
        Thread.sleep(200);
    }
}
spring.main:
  # 起動バナーなし
  banner-mode: "off"
  # 組み込みWebサーバの自動起動無効
  web-application-type: none

# ログ
logging: 
  pattern:
    console: "%d{HH:mm:ss.SSS} %thread %-5level \\(%file:%line\\) %M - %msg%n"
  level: 
    ROOT: INFO
    jp.co.ois.sample: DEBUG

# MQ
spring.activemq:
  broker-url: tcp://localhost:61616
  prefetch-policy.queue-prefetch: 10
#  broker-url: tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1

spring.jms:
  listener:
    concurrency: 5
    max-concurrency: 10

補足)
broker-url: tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1 を指定してもプリフェッチを指定できます。

 

起動して、受信してみます。

Consumerプロジェクトフォルダにて、以下実行

mvn spring-boot:run

キューからほげ受信できました。

【プロジェクト一式】

HogeActiveMQConsumer.zip

HogeActiveMQProducer.zip

 

今回はここまで、次回はRabbitMQを試します。


◆WEB会議/セミナーシステム『Szia』
https://www.ois-yokohama.co.jp/szia/

◆サーバサイドで動作するミドルウェア『ReDois』
https://www.ois-yokohama.co.jp/redois/wp_redois/

オリエンタルインフォーメイションサービス(OIS)