はじめに
この記事では、MySQL を使って簡単なメッセージキューを手軽に実装する方法を解説します。
メッセージキューとは、メッセージを一時的に溜めておき、順次処理するための仕組みです。迅速なレスポンスが必要な Web アプリケーションにおいて、時間のかかる処理を非同期に行うために、バックグラウンドで順次処理していくような場合に利用できます。
簡単なメッセージキューと言っても、大規模な運用にも耐えられる程度の速度と堅牢性を持ちます。
また、ここで解説している方法で作られたメッセージキューは、弊社ウェブサービスであるニコニコ動画に最近追加されたtwitter連携機能でも利用しています。
メッセージキューを作るにあたって
今回実装するメッセージキューは
- メッセージの追加(push)を高速に行う事ができる
- メッセージの取得(pop)はある程度高速に行う事ができる
- 多くのクライアントから同時に push/pop を行っても矛盾が発生しない
- MySQL に特別な拡張を必要としない
事を目指します。
Q4M について
MySQL 上で動作するキューエンジンとしては、サイボウズ・ラボ様によって開発・保守されている Q4M が有名です。MySQL5.1 系で動作するプラガブルストレージエンジンとして実装されていて、SQL を利用してキューの操作を行う事ができます。
残念ながらニコニコ動画では MySQL 5.0 系を利用しており、Q4M の利用は諦めざるを得ませんでしたが、MySQL 自体は既に何年も運用していてノウハウがあり、それを活かしたいので、特に拡張機能を使わずに独自のメッセージキューを実装する事にしました。
InnoDB を利用
ニコニコ動画では、基本的に MySQL 組み込みのストレージエンジンである InnoDB を利用しています。
InnoDB は高度なメモリキャッシュ機構と ACID を満たすトランザクション機構を備え、パフォーマンスに優れたストレージエンジンです。
今回実装するメッセージキューは、MySQL の基本機能を利用して作るため、ストレージエンジンがある程度自由に選ぶ事ができます。今回は InnoDB を利用します。
実装方法
メッセージキューテーブルを作成
メッセージキューが保存されるテーブルとして、以下の test_queue テーブルを作成します。
CREATE TABLE test_queue (
`id` BIGINT UNSIGNED NOT NULL
AUTO_INCREMENT,
`locked_until` TIMESTAMP NOT NULL
DEFAULT "0000-00-00 00:00:00",
`data` BLOB NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB;
テーブル中のカラムはそれぞれ、
- id: プライマリキーとなる数値ID
- locked_until: ロックが無効となる日時
- data: メッセージ内容
を表します。
基本的にメッセージの内容は data カラムに収まる事を想定していますが、例えば優先度付きキューを作るためにカラムを追加する事も可能です。
メッセージの追加
メッセージキューの末尾にメッセージを追加するには、単に INSERT をするだけです。
INSERT INTO test_queue SET data = "message body";
メッセージ内容である data カラムのみ指定します。単なる INSERT 操作であるため、高速です。
また、矛盾が発生しない事はストレージエンジンによって保証されます。
メッセージの取得
メッセージキューの先頭にあるメッセージを取得するには、以下の手順をたどります。
- 先頭のメッセージをロックする
- ロックしたメッセージのIDを取得する
- ロックしたメッセージの内容を取得する
- ロックしたメッセージを削除する
一つ一つ解説します。
1. 先頭のメッセージをロックする
メッセージを処理するクライアントが複数存在する場合、ロックを行わないと1つのメッセージを2つのクライアントが同時に取得してしまう事が起こりえます。
そこで、今から処理を始めるメッセージをロックする事で、他のクライアントからは利用できないようにします。
以下の SQL を実行します。
UPDATE test_queue
SET id = LAST_INSERT_ID(id),
locked_until = NOW() + INTERVAL 10 SECOND
WHERE locked_until < NOW() ORDER BY id LIMIT 1;
なぜこの SQL が先頭のメッセージをロックする事になるのか考えてみましょう。
まず
id = LAST_INSERT_ID(id)
という部分についてですが、これはロックしたメッセージのIDを取得するためのものです。後で解説します。
locked_until = NOW() + INTERVAL 10 SECOND
は、locked_until カラムに現在時刻から10秒後の時刻を設定しています。
WHERE locked_until < NOW()
は、locked_until カラムが現在時刻より前のものを取得する事を表しています。
ORDER BY id LIMIT 1
で、id 昇順 (つまり追加された順) に並べ、先頭1件を取得しています。
locked_until カラムに10秒後の時刻を設定すると、他のクライアントは10秒間、WHERE 条件によりこのメッセージを SELECT しなくなります。このようにして、ロック機構を実現しています。
仮に、クライアントが突然終了しても、10秒後にはロックが解除され、他のクライアントによって処理される事になります。
2. ロックしたメッセージのIDを取得する
以下の SQL を実行します。
SELECT LAST_INSERT_ID();
LAST_INSERT_ID 関数 は、引数付きで呼び出された場合、内部に引数を保存した上で、引数をそのまま返します。引数なしで呼び出された場合、最後に保存された内部の引数を返します。
メッセージをロックした際に id = LAST_INSERT_ID(id) という指定を行っていましたが、そのお陰でメッセージのIDを取得する事ができます。
また、MySQL のクライアントライブラリによっては、この取得を関数で行う事ができます。
例として PHP では mysql_insert_id 関数 や PDO::lastInsertId メソッド を利用する事で取得できます。
3. ロックしたメッセージの内容を取得する
得られたメッセージのIDを使って、普通に SELECT します。
SELECT data FROM test_queue WHERE id = ?;
4. ロックしたメッセージを削除する
得られたメッセージのIDを使って、普通に DELETE します。
DELETE FROM test_queue WHERE id = ?;
通常は、この時点でレコードが削除されるため、他のクライアントからは参照できないままですが、仮にレコードが削除される前にプロセスが終了してしまっても、10秒後には新たなクライアントが処理を開始します。
以上で、メッセージキューを実装できました。
PHP での実装例
上記を元に PHP でメッセージキューを実装した例を以下で公開しています。
http://github.com/kotas/myqueue
メッセージキューの実装である MyQueue クラスのURLは以下です。
http://github.com/kotas/myqueue/blob/master/MyQueue.php
ベンチマーク
ニコニコ動画で使用している典型的な MySQL サーバー上で 1000 個のメッセージを追加(push)/取得(pop)した場合、以下のようになりました。
push
Total: 0.162686 sec.
QPS: 6146.815289 query/sec.
SPQ: 0.000163 sec/query.
pop
Total: 1.719178 sec.
QPS: 581.673348 query/sec.
SPQ: 0.001719 sec/query.
追加(push) は単なる INSERT なので高速ですが、取得は UPDATE + SELECT + DELETE という3つのクエリが含まれるため、やや低速になります(581メッセージ/秒)。
なお、ベンチマークのコードは以下にあります。
http://github.com/kotas/myqueue/blob/master/benchmark.php
おわりに
MySQL を使ったお手軽なメッセージキューの実装方法を紹介しました。
なお、この記事中に含まれるコードは全て The MIT License です。
Enjoy!
