開拓馬の厩

いろいろやる

Protobuf を SQS のスキーマ管理に使ってみる

Amazon SQS のスキーマ管理を Protobuf でやってみました。

サンプルコードはこちら。

github.com

.proto ファイル

以下のような .proto ファイルを用意します。

// proto/example.proto

syntax = "proto3";

message Item {
  string product_id = 1;
  float price = 2;
}

message Purchase {
  string user_id = 1;
  repeated Item items = 2;
  // Unix time
  uint32 timestamp = 3;
}

今回は Purchase message を binary serialize した後、 base64 encode したものを SQS に積む方針で進めます。

AWS で SQS のリソースを作る

まずはじめに Amazon SQS のリソースを準備します。 今回は Amazon SQS のキューをひとつ作って、手元の環境からアクセスする方針で進めます(EC2 からアクセスしたりとかバックエンドを Lambda にしたりとかは 面倒 本稿のテーマとは無関係なのでやらない)。 キューをひとつ作るだけなのでマネジメントコンソールをポチポチするだけでもいいのですが、なんとなく Terraform を使ってみます。

予め SQSFullAccess ポリシーを持ったユーザーを作っておき、その Access Key と Secret を使って aws-cli でプロファイルを作成します(AWSのIAMユーザーを作る部分は説明略)。

# Access Key を準備する部分は省略
# aws configure で sqs という profile を作る
$ aws configure --profile sqs
  AWS Access Key ID [None]: XXXXXXXXXXXXXXXXX
  AWS Secret Access Key [None]: XXXXXXXXXXXXXXX
  Default region name [None]: ap-northeast-1
  Default output format [None]: None

# AWS_PROFILE を指定
$ export AWS_PROFILE=sqs

.tf ファイルを準備します。

# terraform/terraform.tf

provider "aws" {
  region = "ap-northeast-1"
}

resource "aws_sqs_queue" "queue" {
  name = "example-queue"
}

# Queue の URL を出力する
output "queue_id" {
  value = aws_sqs_queue.queue.id
}

あとは Terraform CLI を叩くだけで、 example-queue というキューが作られて、キューの id が出力されます。 id は URL 形式になっており、キューを積んだり取り出したりする際に必要になります。 後々用いるので環境変数に追加しておきます。

# 初回だけ必要
$ terraform init

# dry run。 実行後に AWS 上にどのような変更が行われるか確認できる
$ terraform plan

# 実際に AWS 上のリソースが作る
$ terraform apply
# =>  queue_id = "https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxxx/example-queue"

# id を環境変数に登録しておく。ついでに AWS のリージョンも
export QUEUE_URL=https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxxx/example-queue
export AWS_REGION=ap-northeast-1

Terraformで作成したリソースは terraform destroy で削除できます。使い終わったら忘れずに消しておきましょう*1

Protobuf.js

.proto ファイルと SQS の準備ができたので、やっと実装が始まります。 今回は TypeScriptProtobuf.js を用いて adapter を実装します。

github.com

Protobuf.js はその名の通り Protobuf を取り扱うライブラリです。あくまでも Protobuf を扱うライブラリなので、 gRPC 関連の機能ほぼサポートされていませんが、これはむしろ今回の用途に適っているので好都合。 gRPC の文脈では @grpc/proto-loader と組み合わせて dynamic codegen ((実行時に .proto ファイルを読み込む方式))を行う際によく使われているライブラリです。 しかし、今回は Protobuf.js が提供する CLI ツール pbjspbts を用いて static codegen ((予め .proto ファイルを読み込んでコードを生成し、それを用いる方式)) を行います。

コード生成のコマンドはそこそこ長いので shell script にまとめておきます。

# bin/protogen.sh

#!/usr/bin/env bash
set -eu

NODE_BIN=$(dirname $0)/../node_modules/.bin
PROTO_DIR=$(dirname $0)/../proto/*.proto
OUTPUT_DIR=$(dirname $0)/../generated

rm -f ${OUTPUT_DIR}/*.js ${OUTPUT_DIR}/*.ts
${NODE_BIN}/pbjs --target static-module --out ${OUTPUT_DIR}/index.js ${PROTO_DIR}
${NODE_BIN}/pbts --out ${OUTPUT_DIR}/index.d.ts ${OUTPUT_DIR}/index.js

ちなみに pbts は jsdoc を元に型定義を吐くライブラリ tsd-jsdoc を使っているようです。なので pbjs--no-comment オプションを付けてコメントを省略してしまうと .d.ts ファイルが正しく生成されません(ちょっとしたハマりポイント)。 成功すると以下のような .d.ts ファイルが生成されます。

// generated/index.d.ts から抜粋

import * as $protobuf from "protobufjs";

export interface IItem {
  productId?: string | null;
  price?: number | null;
}
export interface IPurchase {
  userId?: string | null;
  items?: IItem[] | null;
  timestamp?: number | null;
}

export class Purchase implements IPurchase {
  // Message class を作るやつ
  public static create(properties?: IPurchase): Purchase;

  // Message を binary serialize するやつ
  public static encode(
    message: IPurchase,
    writer?: $protobuf.Writer,
  ): $protobuf.Writer;

  // binary を deserialize して Message Class を得るやつ
  public static decode(
    reader: $protobuf.Reader | Uint8Array,
    length?: number,
  ): Purchase;
}

pbts{ hoge?: (T|null); } のような undefined を許容した型を吐きます。これでは少し不便なので undefined を許容しない型へ変換するヘルパーを用意します。

// clinet/types.ts

export type DropUndefined<T> = T extends undefined
  ? never
  : {
      [P in keyof T]-?: T[P] extends undefined ? never : DropUndefined<T[P]>;
    };

// DropUndefined<{ hoge?: string|null }> = { hoge: string|null }

DropUndefined<T> を用いることで、値無しの項目について明示的に null を付ける必要が出るため、意図しない代入漏れを防ぐことができます*2

メッセージを積むやつ(client)を書く

こんな感じ

// client/index.ts

import { IPurchase, Purchase } from "../generated";
import { SQS } from "aws-sdk";
import { DropUndefined } from "./types";

async function main(): Promise<void> {
  const QueueUrl = process.env.QUEUE_URL;
  if (QueueUrl === undefined) {
    console.error("Please set env `QUEUE_URL`");
    process.exit(1);
  }
  const region = process.env.AWS_REGION ?? "ap-northeast-1";

  // TS の object
  const rawMessage: DropUndefined<IPurchase> = {
    userId: "hoge",
    items: [{ productId: "fuga", price: 123.4 }],
    timestamp: new Date(0).valueOf(),
  };

  // Protobuf.js が生成した Class
  const message: Purchase = Purchase.create(rawMessage);

  // serialize されたバイト列
  const serialized: Uint8Array = Purchase.encode(message).finish();

  // バイト列を base64 でエンコードして文字列にしたもの
  const base64String: string = Buffer.from(serialized).toString("base64");

  // SQS に積む処理
  await new SQS({ region })
    .sendMessage({
      MessageBody: base64String,
      QueueUrl,
    })
    .promise();
}

main()
  .catch(e => {
    console.error(e);
    process.exit(1);
  })
  .then(() => {
    console.log("finished");
    process.exit(0);
  });

実行するとキューにメッセージが積まれます。マネジメントコンソールや aws-cli から積まれているメッセージ数を確認すると値が増えていることがわかります。

$ yarn ts-node client

# finished

$ aws sqs get-queue-attributes --queue-url $QUEUE_URL --attribute-names ApproximateNumberOfMessages

# {
#    "Attributes": {
#        "ApproximateNumberOfMessages": "1"
#    }
#}

キューを処理するやつ(consumer)を書く

キューを処理するコードはこんな感じになります

// consumer/index.ts

import { SQS } from "aws-sdk";
import { Purchase } from "../generated";

async function main() {
  const QueueUrl = process.env.QUEUE_URL;
  if (QueueUrl === undefined) {
    console.error("Please set env `QUEUE_URL`");
    process.exit(1);
  }
  const region = process.env.AWS_REGION ?? "ap-northeast-1";

  const sqsInstance = new SQS({
    region,
  });

  // SQS からメッセージを取り出す
  const sqsResult = await sqsInstance
    .receiveMessage({
      QueueUrl,
    })
    .promise();
  if (sqsResult.Messages === undefined || sqsResult.Messages.length === 0) {
    console.log("no messages found");
    process.exit(0);
  }

  // base64 string から Protobuf.js の Message Class を生成する
  const getMessage = (base64String: string): Purchase => {
    // Protobuf の仕様に従って serialize されたバイト列
    const serialized = Uint8Array.from(Buffer.from(base64String, "base64"));
    // Protobuf.js の Message Class
    return Purchase.decode(serialized);
  };
  const messages = sqsResult.Messages.map(m => m.Body)
    .filter(<T>(x: T | undefined): x is T => x !== undefined)
    .map(str => getMessage(str));

  // 各 message について、何らかの処理を行う
  // 今回は console.log に適当に文を吐くだけ
  messages.forEach(message => {
    // `message.userId` 等でプロパティにアクセスできる
    const userId = message.userId;
    // 型も付く Protobuf で `uint32 timestamp = 3;` と定義したので、`message.timestamp` は number 型
    const date = new Date(message.timestamp);

    // ただし、 Message をネストすると型が正しく付かない
    // message.items[n].price は `number | null | undefined`
    const sumPrice = message.items.reduce((sum, i) => sum + (i.price ?? 0), 0);

    console.log(
      `ユーザー(${userId})が ${date.toLocaleDateString()} に合計 ${Math.floor(
        sumPrice,
      )}円の買い物をしました`,
    );
  });

  // 取得したメッセージを消す
  await sqsInstance
    .deleteMessageBatch({
      QueueUrl,
      Entries: sqsResult.Messages.map(m => ({
        Id: m.MessageId!,
        ReceiptHandle: m.ReceiptHandle!,
      })),
    })
    .promise();
}

main()
  .catch(e => {
    console.error(e);
    process.exit(1);
  })
  .then(() => {
    console.log("finished");
    process.exit(0);
  });
$ yarn ts-node consumer

# ユーザー(hoge)が 1970/1/1 に合計 123円の買い物をしました
# finished

こんな感じで base64 string から値を復元できます。 少し気になったのは Purchase.Items の型が IItems[] になっている点でしょうか。 IItems{ productId?: (string|null); price?: (number|null); } です。 本来なら値が指定されていない項目については空文字列や 0 で初期化されているため、 { productId: string; price: number; } とみなしてよいのですが、 null や undefined を含む型として扱われてしまい不便です。 as で対処する等の工夫が要るかもしれません*3

余談:serialize について

今回は Protobuf のメッセージをバイナリに serialize しました。 ですが、 Protobuf はバイナリ以外のフォーマットにも serialize 可能です。 例えば、 JSON に serialize すれば、base64 エンコードせずとも SQS に乗せることができますし、デバッグ目的で手動ポーリングする際に可読性のあるメッセージが拾えて便利です。 送受信双方のライブラリが対応しているのなら JSON を用いる選択肢も検討の余地がありそうです*4

まとめ

いかがでしたか?(枕詞) 本記事では Protobuf を SQS のスキーマ管理に用いる例を紹介しました。 今回は client/consumer ともに TypeScript で実装しましたが、 Protobuf は language-neutral をウリにしているので、別の言語で書いたアプリケーションを使うこともできます。 node 製の BFF サーバーがメッセージを積んで、 Java や Go や Rust や Haskell で書いた job worker が処理するといった運用は実際に使う場面がありそうです。

*1:Amazon SQS は 100 万リクエスト/月 まで無料でキューの維持費もとくにないので本記事の内容を一通り試した後にキューを放置してもとくに課金されることは無い。何らかの手違いで悪意ある第三者に id が見つかってイタズラされた場合はその限りではないが

*2:oneof が使われている proto との相性はあまりよくないので万能では無いが

*3:このあたりの型定義の緩さは JavaScript を使う上で避けられない面もある。現状 TS の型定義がちゃんとしている Protobuf ライブラリが見当たらず、 client 側はまだしも consumer 側を JS/TS で書くのは面倒そうだという印象を覚えた。 Protobuf は多言語対応なので、 consumer 側については別の言語で書いてしまうほうがいいのかもしれない

*4:Protobuf.js は JSON への serialize/deseriualize 双方に対応しているようなので、少し書き換えれば JSON を積む実装に変更できる