Protobuf を SQS のスキーマ管理に使ってみる
Amazon SQS のスキーマ管理を Protobuf でやってみました。
サンプルコードはこちら。
.proto ファイル
以下のような .proto
ファイルを用意します。
// proto/example.proto syntax = "proto3"; message Item { string product_id = 1; float price = 2; } message Purchase { string user_id = 1; repeated Item items = 2; // Unix time uint32 timestamp = 3; }
今回は Purchase
message を binary serialize した後、 base64 encode したものを SQS に積む方針で進めます。
AWS で SQS のリソースを作る
まずはじめに Amazon SQS のリソースを準備します。
今回は Amazon SQS のキューをひとつ作って、手元の環境からアクセスする方針で進めます(EC2 からアクセスしたりとかバックエンドを Lambda にしたりとかは 面倒 本稿のテーマとは無関係なのでやらない)。
キューをひとつ作るだけなのでマネジメントコンソールをポチポチするだけでもいいのですが、なんとなく Terraform を使ってみます。
予め SQSFullAccess
ポリシーを持ったユーザーを作っておき、その Access Key と Secret を使って aws-cli でプロファイルを作成します(AWSのIAMユーザーを作る部分は説明略)。
# Access Key を準備する部分は省略 # aws configure で sqs という profile を作る $ aws configure --profile sqs AWS Access Key ID [None]: XXXXXXXXXXXXXXXXX AWS Secret Access Key [None]: XXXXXXXXXXXXXXX Default region name [None]: ap-northeast-1 Default output format [None]: None # AWS_PROFILE を指定 $ export AWS_PROFILE=sqs
.tf
ファイルを準備します。
# terraform/terraform.tf provider "aws" { region = "ap-northeast-1" } resource "aws_sqs_queue" "queue" { name = "example-queue" } # Queue の URL を出力する output "queue_id" { value = aws_sqs_queue.queue.id }
あとは Terraform CLI を叩くだけで、 example-queue
というキューが作られて、キューの id
が出力されます。
id
は URL 形式になっており、キューを積んだり取り出したりする際に必要になります。
後々用いるので環境変数に追加しておきます。
# 初回だけ必要 $ terraform init # dry run。 実行後に AWS 上にどのような変更が行われるか確認できる $ terraform plan # 実際に AWS 上のリソースが作る $ terraform apply # => queue_id = "https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxxx/example-queue" # id を環境変数に登録しておく。ついでに AWS のリージョンも export QUEUE_URL=https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxxx/example-queue export AWS_REGION=ap-northeast-1
Terraformで作成したリソースは terraform destroy
で削除できます。使い終わったら忘れずに消しておきましょう*1。
Protobuf.js
.proto
ファイルと SQS の準備ができたので、やっと実装が始まります。
今回は TypeScript
と Protobuf.js
を用いて adapter を実装します。
Protobuf.js
はその名の通り Protobuf を取り扱うライブラリです。あくまでも Protobuf を扱うライブラリなので、 gRPC 関連の機能ほぼサポートされていませんが、これはむしろ今回の用途に適っているので好都合。
gRPC の文脈では @grpc/proto-loader
と組み合わせて dynamic codegen ((実行時に .proto
ファイルを読み込む方式))を行う際によく使われているライブラリです。
しかし、今回は Protobuf.js
が提供する CLI ツール pbjs
と pbts
を用いて static codegen ((予め .proto
ファイルを読み込んでコードを生成し、それを用いる方式)) を行います。
コード生成のコマンドはそこそこ長いので shell script にまとめておきます。
# bin/protogen.sh #!/usr/bin/env bash set -eu NODE_BIN=$(dirname $0)/../node_modules/.bin PROTO_DIR=$(dirname $0)/../proto/*.proto OUTPUT_DIR=$(dirname $0)/../generated rm -f ${OUTPUT_DIR}/*.js ${OUTPUT_DIR}/*.ts ${NODE_BIN}/pbjs --target static-module --out ${OUTPUT_DIR}/index.js ${PROTO_DIR} ${NODE_BIN}/pbts --out ${OUTPUT_DIR}/index.d.ts ${OUTPUT_DIR}/index.js
ちなみに pbts
は jsdoc を元に型定義を吐くライブラリ tsd-jsdoc
を使っているようです。なので pbjs
で --no-comment
オプションを付けてコメントを省略してしまうと .d.ts
ファイルが正しく生成されません(ちょっとしたハマりポイント)。
成功すると以下のような .d.ts
ファイルが生成されます。
// generated/index.d.ts から抜粋 import * as $protobuf from "protobufjs"; export interface IItem { productId?: string | null; price?: number | null; } export interface IPurchase { userId?: string | null; items?: IItem[] | null; timestamp?: number | null; } export class Purchase implements IPurchase { // Message class を作るやつ public static create(properties?: IPurchase): Purchase; // Message を binary serialize するやつ public static encode( message: IPurchase, writer?: $protobuf.Writer, ): $protobuf.Writer; // binary を deserialize して Message Class を得るやつ public static decode( reader: $protobuf.Reader | Uint8Array, length?: number, ): Purchase; }
pbts
は { hoge?: (T|null); }
のような undefined
を許容した型を吐きます。これでは少し不便なので undefined
を許容しない型へ変換するヘルパーを用意します。
// clinet/types.ts export type DropUndefined<T> = T extends undefined ? never : { [P in keyof T]-?: T[P] extends undefined ? never : DropUndefined<T[P]>; }; // DropUndefined<{ hoge?: string|null }> = { hoge: string|null }
DropUndefined<T>
を用いることで、値無しの項目について明示的に null
を付ける必要が出るため、意図しない代入漏れを防ぐことができます*2。
メッセージを積むやつ(client)を書く
こんな感じ
// client/index.ts import { IPurchase, Purchase } from "../generated"; import { SQS } from "aws-sdk"; import { DropUndefined } from "./types"; async function main(): Promise<void> { const QueueUrl = process.env.QUEUE_URL; if (QueueUrl === undefined) { console.error("Please set env `QUEUE_URL`"); process.exit(1); } const region = process.env.AWS_REGION ?? "ap-northeast-1"; // TS の object const rawMessage: DropUndefined<IPurchase> = { userId: "hoge", items: [{ productId: "fuga", price: 123.4 }], timestamp: new Date(0).valueOf(), }; // Protobuf.js が生成した Class const message: Purchase = Purchase.create(rawMessage); // serialize されたバイト列 const serialized: Uint8Array = Purchase.encode(message).finish(); // バイト列を base64 でエンコードして文字列にしたもの const base64String: string = Buffer.from(serialized).toString("base64"); // SQS に積む処理 await new SQS({ region }) .sendMessage({ MessageBody: base64String, QueueUrl, }) .promise(); } main() .catch(e => { console.error(e); process.exit(1); }) .then(() => { console.log("finished"); process.exit(0); });
実行するとキューにメッセージが積まれます。マネジメントコンソールや aws-cli から積まれているメッセージ数を確認すると値が増えていることがわかります。
$ yarn ts-node client # finished $ aws sqs get-queue-attributes --queue-url $QUEUE_URL --attribute-names ApproximateNumberOfMessages # { # "Attributes": { # "ApproximateNumberOfMessages": "1" # } #}
キューを処理するやつ(consumer)を書く
キューを処理するコードはこんな感じになります
// consumer/index.ts import { SQS } from "aws-sdk"; import { Purchase } from "../generated"; async function main() { const QueueUrl = process.env.QUEUE_URL; if (QueueUrl === undefined) { console.error("Please set env `QUEUE_URL`"); process.exit(1); } const region = process.env.AWS_REGION ?? "ap-northeast-1"; const sqsInstance = new SQS({ region, }); // SQS からメッセージを取り出す const sqsResult = await sqsInstance .receiveMessage({ QueueUrl, }) .promise(); if (sqsResult.Messages === undefined || sqsResult.Messages.length === 0) { console.log("no messages found"); process.exit(0); } // base64 string から Protobuf.js の Message Class を生成する const getMessage = (base64String: string): Purchase => { // Protobuf の仕様に従って serialize されたバイト列 const serialized = Uint8Array.from(Buffer.from(base64String, "base64")); // Protobuf.js の Message Class return Purchase.decode(serialized); }; const messages = sqsResult.Messages.map(m => m.Body) .filter(<T>(x: T | undefined): x is T => x !== undefined) .map(str => getMessage(str)); // 各 message について、何らかの処理を行う // 今回は console.log に適当に文を吐くだけ messages.forEach(message => { // `message.userId` 等でプロパティにアクセスできる const userId = message.userId; // 型も付く Protobuf で `uint32 timestamp = 3;` と定義したので、`message.timestamp` は number 型 const date = new Date(message.timestamp); // ただし、 Message をネストすると型が正しく付かない // message.items[n].price は `number | null | undefined` const sumPrice = message.items.reduce((sum, i) => sum + (i.price ?? 0), 0); console.log( `ユーザー(${userId})が ${date.toLocaleDateString()} に合計 ${Math.floor( sumPrice, )}円の買い物をしました`, ); }); // 取得したメッセージを消す await sqsInstance .deleteMessageBatch({ QueueUrl, Entries: sqsResult.Messages.map(m => ({ Id: m.MessageId!, ReceiptHandle: m.ReceiptHandle!, })), }) .promise(); } main() .catch(e => { console.error(e); process.exit(1); }) .then(() => { console.log("finished"); process.exit(0); });
$ yarn ts-node consumer # ユーザー(hoge)が 1970/1/1 に合計 123円の買い物をしました # finished
こんな感じで base64 string から値を復元できます。
少し気になったのは Purchase.Items
の型が IItems[]
になっている点でしょうか。
IItems
は { productId?: (string|null); price?: (number|null); }
です。
本来なら値が指定されていない項目については空文字列や 0 で初期化されているため、 { productId: string; price: number; }
とみなしてよいのですが、 null や undefined を含む型として扱われてしまい不便です。 as
で対処する等の工夫が要るかもしれません*3。
余談:serialize について
今回は Protobuf のメッセージをバイナリに serialize しました。 ですが、 Protobuf はバイナリ以外のフォーマットにも serialize 可能です。 例えば、 JSON に serialize すれば、base64 エンコードせずとも SQS に乗せることができますし、デバッグ目的で手動ポーリングする際に可読性のあるメッセージが拾えて便利です。 送受信双方のライブラリが対応しているのなら JSON を用いる選択肢も検討の余地がありそうです*4。
まとめ
いかがでしたか?(枕詞) 本記事では Protobuf を SQS のスキーマ管理に用いる例を紹介しました。 今回は client/consumer ともに TypeScript で実装しましたが、 Protobuf は language-neutral をウリにしているので、別の言語で書いたアプリケーションを使うこともできます。 node 製の BFF サーバーがメッセージを積んで、 Java や Go や Rust や Haskell で書いた job worker が処理するといった運用は実際に使う場面がありそうです。
*1:Amazon SQS は 100 万リクエスト/月 まで無料でキューの維持費もとくにないので本記事の内容を一通り試した後にキューを放置してもとくに課金されることは無い。何らかの手違いで悪意ある第三者に id が見つかってイタズラされた場合はその限りではないが
*2:oneof が使われている proto との相性はあまりよくないので万能では無いが
*3:このあたりの型定義の緩さは JavaScript を使う上で避けられない面もある。現状 TS の型定義がちゃんとしている Protobuf ライブラリが見当たらず、 client 側はまだしも consumer 側を JS/TS で書くのは面倒そうだという印象を覚えた。 Protobuf は多言語対応なので、 consumer 側については別の言語で書いてしまうほうがいいのかもしれない
*4:Protobuf.js は JSON への serialize/deseriualize 双方に対応しているようなので、少し書き換えれば JSON を積む実装に変更できる