1
/
5

Alpakka で AWS SQS 接続 (Scala)

はじめに
Alpakka

・Alpakka とは
・セットアップ
AWS SQS
・AWS SQS とは
・ElasticMQ のセットアップ
Alpakka で AWS SQS 接続
・Subscribe
・Publish
さいごに

はじめに

こんにちは、オープンエイトの山崎です。
今回は、Akka Streams で Alpakka を使って AWS SQS と接続しデータを送受信する方法について説明します。また本記事内では AWS SQS をローカル環境でシミュレートするために ElasticMQ を使用します。ElasticMQ のセットアップについてもあわせて簡単に説明します。記事執筆時の環境は以下の通りです。

  • Scala 2.13.5
  • sbt 1.4.7
  • Akka 2.6.13
  • Alpakka SQS 2.0.2
  • ElasticMQ 1.1.0

オープンエイトでは、SNS 配信効果測定サービス Insight BRAIN において内部のジョブ管理に SQS を利用していますが、その接続処理において実際に Akka Streams と Alpakka を使用しています。

Alpakka/Alpakka とは

Akka Streams で様々なリソースと接続するためのコンポーネント群を提供するプロジェクトが Alpakka です。Akka の公式プロジェクトとして開発・提供されています。たとえばローカルのテキストファイルをストリーム処理する Source や、処理結果を S3 に保存する Flow など、様々な入力ソースや出力先を Akka Streams のコンポーネントとして汎用的に使えるかたちにしてくれています。

Alpakka Documentation
https://doc.akka.io/docs/alpakka/current/

外部サービスとの連携処理が簡単に構築できてしまうのでうまく Alpakka を活用できれば開発効率の向上が見込めるでしょう。

今回はその中から AWS SQS 用のモジュールを使用します。

AWS SQS • Alpakka Documentation
https://doc.akka.io/docs/alpakka/current/sqs.html

ちなみに「Alpakka」という名前は動物のアルパカから取っているものと思われます。アルパカは英語の綴りだと「Alpaca」ですがフィンランドノルウェーでは「Alpakka」と表記するようです。「Akka」という単語が含まれているためこの名前が採用されたのかなと思うのですが、ドキュメントでもそのあたりは特に説明されてなさそうなので正確なところはわかりませんね…。

Alpakka/Alpakka とは

Akka Streams で様々なリソースと接続するためのコンポーネント群を提供するプロジェクトが Alpakka です。Akka の公式プロジェクトとして開発・提供されています。たとえばローカルのテキストファイルをストリーム処理する Source や、処理結果を S3 に保存する Flow など、様々な入力ソースや出力先を Akka Streams のコンポーネントとして汎用的に使えるかたちにしてくれています。

Alpakka Documentation
https://doc.akka.io/docs/alpakka/current/

外部サービスとの連携処理が簡単に構築できてしまうのでうまく Alpakka を活用できれば開発効率の向上が見込めるでしょう。

今回はその中から AWS SQS 用のモジュールを使用します。

AWS SQS • Alpakka Documentation
https://doc.akka.io/docs/alpakka/current/sqs.html

ちなみに「Alpakka」という名前は動物のアルパカから取っているものと思われます。アルパカは英語の綴りだと「Alpaca」ですがフィンランドノルウェーでは「Alpakka」と表記するようです。「Akka」という単語が含まれているためこの名前が採用されたのかなと思うのですが、ドキュメントでもそのあたりは特に説明されてなさそうなので正確なところはわかりませんね…。

Alpakka/セットアップ

Alpakka は接続対象ごとにモジュールが分かれているので必要なものだけを選んで導入します。たとえば今回は SQS モジュールが必要なので

libraryDependencies

を以下のように記述します。

AWS SQS/AWS SQS とは

Amazon SQS(サーバーレスアプリのためのメッセージキューサービス)| AWS
https://aws.amazon.com/jp/sqs/

SQS (Simple Queue Service) は AWS が提供するマネージド型のメッセージキューサービスです。AWS では MQ、Kinesis、MSK など複数のメッセージキュー系サービスが提供されていますが、名前の通り最もシンプルな機能を提供しているのが SQS です。プロプライエタリなサービスであり AWS にロックインされる点はネックですが、低コストで利用できる上に単純なユースケースなら大体 SQS でカバーできるのではと思います。

AWS SQS/ElasticMQ のセットアップ

SQS は AWS のサービスなので、開発用にローカル環境で動作させるといったことができないのですが、SQS 互換 API を提供する ElasticMQ というアプリケーションを利用することで擬似的にローカル環境で SQS を動作させることができます。

softwaremill/elasticmq: In-memory message queue with an Amazon SQS-compatible interface. Runs stand-alone or embedded.
https://github.com/softwaremill/elasticmq

キューのデータはインメモリで扱われるため永続化されず ElasticMQ サービスを終了すると消えてしまいます。なのであくまでも開発用のツールという位置づけで捉えておくのがよさそうです。

ElasticMQ は Alpakka SQS のテストでも利用されているようです。我々も開発環境で利用していますが、これまでのところ実際の SQS と挙動が異なって困ったというようなことは起きていません。

  • ちなみに ElasticMQ 自体は Scala 製です。ただし Docker イメージでも提供されているため特に Scala を意識せずに利用できます。Jar 単体で実行したりライブラリとしてアプリケーションに組み込んだり様々な利用方法が提供されていますが、今回は Docker イメージを利用することにします。
    docker-compose.ymlを作成し以下のように記述します。

設定ファイル

elasticmq.conf

を以下のように作成します。今回は

test-queue.fifo

という名前で FIFO キューを作成します。

コンテナが起動すると

test-queue.fifo

というキューが自動的に作成され

localhost:9324

で接続可能な状態になります。そのエンドポイントを指定すれば AWSCLI などを使って SQS として扱うことが可能です。

ちなみに、おそらくバグだと思うのですが、現行バージョンだと接続 URL の AWS アカウントの部分は

queue

という文字列でないとうまく動作しないようです。たとえば

elasticmq.conf

aws.accountId = aws01

のように記述すれば

http://localhost:9324/aws01/test-queue.fifo

で接続できそうに思えるのですが手元の環境では正しく動作しませんでした。

Alpakka で AWS SQS 接続

準備が整ったので実際に Alpakka で SQS (ElasticMQ) に接続してみます。を生成して SQS 接続情報を設定します。まず最初に

SqsAsyncClient

を生成して SQS 接続情報を設定します。

(1) 今回は本物の SQS ではなくローカルの ElasticMQ への接続なのでアクセスキーとシークレットキーは適当な値を記述しておきます。
(2)

endpointOverride

でエンドポイントを指定することで接続先を AWS ではなくローカルに向けることができます。

Subscribe

SQS からメッセージをストリーム取得するには SqsSource を使用します。このとき先程生成した SqsAsyncClient が implicit で参照されます。この SqsSource はその名の通り Akka Streams の Source なので Flow や Sink とつないでデータフローを構築できます。

設定等に問題がなければこれで SQS と接続された状態になります。試しにこのアプリケーションを実行した状態で AWSCLI を使ってメッセージを送信してみます。グループ ID が必要になるのですがとりあえず適当な値で大丈夫です。

成功すると Scala アプリケーション側のコンソールに received: hello と表示されるはずです。

ちなみにこのままだと SQS 上のメッセージは消えずに残り続けます。なので少し時間が経つとまた同じメッセージを受信してしまいます。一度受信した処理済みのメッセージを削除するには ACK メッセージを送ってメッセージを削除する必要があります。その場合、以下のように SqsAckFlow や SqsAckSink を使用します。

これで一度受信したメッセージは SQS 上から削除されます。

Publish

SQS にメッセージを送信する場合は SqsPublishSink や SqsPublishFlow を使います。

これを実行すると SQS にメッセージが 1 件送信されます。AWS CLI で確認することができます。

# AWS CLI でキューからメッセージを取得してみる
aws sqs receive-message \
--queue-url "http://localhost:9324/queue/test-queue.fifo" \
--endpoint-url "http://localhost:9324"
{
"Messages": [
{
"MessageId": "5be5527a-ec4e-4f3f-8ad2-67a713938673",
"ReceiptHandle": "5be5527a-ec4e-4f3f-8ad2-67a713938673#2406b392-60b7-4019-b3b9-4a62620bdbcc",
"MD5OfBody": "5d41402abc4b2a76b9719d911017c592",
"Body": "hello"
}
]
}

さいごに

Akka Streams はロジックを Source、Flow、Sink といったかたちでモジュール化して扱うことができるためモジュールの組み合わせでデータフローを構築できる点が非常に強力だと感じています。Alpakka は SQS 接続以外にも様々なモジュールを提供しているので、要件がマッチすれば外部サービスとのデータ連携部分は Alpakka で手軽に構築してしまうことができます。その分ビジネスロジックの開発に集中できるので、うまく活用することで開発効率の向上が図れるのではないでしょうか。

ということで、今回は以上となります。非常に駆け足の内容ではありますが本記事が Akka Streams 活用の一助となれば幸いです。最後までお読みいただきありがとうございました。


Akka実践バイブル アクターモデルによる並行・分散システムの実現

Scalaスケーラブルプログラミング第3版

株式会社オープンエイト's job postings

Weekly ranking

Show other rankings
If this story triggered your interest, go ahead and visit them to learn more