Cassandra0.7 counter実践編

※(2011/01/10 22:30 一部訂正があります)

ようやくここまでたどり着きました。
Cassandra0.7.0(RC4)を使った、アクセスカウンターを作ってみたいと思います。

事前準備

cassandra-cliで、カウンター用のColumnFamilyを以下の要領で作っておきます。

[default@Keyspace1] create column family Counters with comparator=UTF8Type and default_validation_class=
CounterColumnType;

sample code for increment counter by Cassandra 0.7.0(RC4)

参考資料:https://issues.apache.org/jira/secure/attachment/12459754/Partitionedcountersdesigndoc.pdf
カウンター用のサンプルコードです:

// (ボイラープレート割愛)
public class AddCounter7 {
    public static final String KEYSPACE = "Keyspace1";
    public static final String COLUMN_FAMILY = "Counters";
    /** Cassandra0.7サーバの在り処 */
    private final String CAS7_SERVER = "192.168.133.128";

    public long add() {
        long count = 0;
        TTransport transport = new TFramedTransport(new TSocket(CAS7_SERVER, 9160));
        TProtocol protocol = new TBinaryProtocol(transport);
        try {
            transport.open();
        } catch (TTransportException e) {
            throw new RuntimeException(e);
        }
        Cassandra.Client client = new Cassandra.Client(protocol);
        try {
            client.set_keyspace(KEYSPACE);
            final ByteBuffer key = ByteBuffer.wrap("key1".getBytes("UTF-8"));
            final String columnName = "counter1";
            // カウンターカラム ← New!!
            // 第2引数が増分のようです
            CounterColumn column = new CounterColumn(ByteBuffer.wrap(columnName
                    .getBytes("UTF-8")), 1);
            final ColumnParent columnParent = new ColumnParent(COLUMN_FAMILY);
            // insertではなく、addというメソッドを使います
            client.add(key, columnParent, column, ConsistencyLevel.ONE);
            // 値の取得
            final ColumnPath columnPath = new ColumnPath(COLUMN_FAMILY);
            columnPath.setColumn(columnName.getBytes("UTF8"));
            // getではなく、get_counterになります
            column = client.get_counter(key, columnPath, ConsistencyLevel.ONE).getColumn();
            count = column.getValue();
            System.out.println("count of v7:" + count);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            try {
                transport.flush();
            } catch (TTransportException e) {
                throw new RuntimeException(e);
            } finally {
                transport.close();
            }
        }
        return count;
    }
}

sample code for increment counter by Cassandra 0.6.8

結果の比較のために、0.6で無理やり作ったカウンターも載せておきます。ColumnFamilyはデフォルトのものを使っています。
酷いコードだとは思いますが、ロックもない、CAS(Check And Swap)もないとなると、こういう書き方しかできないですよね。

// (おまじない割愛)
public class AddCounter6 {
    public static final String KEYSPACE = "Keyspace1";
    public static final String COLUMN_FAMILY = "Standard1";
    /** Cassandra0.6サーバの在り処 */
    private final String CAS6_SERVER = "192.168.133.129";

    public int add() {
        int count = 0;
        TSocket transport = new TSocket(CAS6_SERVER, 9160);
        TProtocol protocol = new TBinaryProtocol(transport);
        try {
            transport.open();
        } catch (TTransportException e) {
            throw new RuntimeException(e);
        }
        Cassandra.Client client = new Cassandra.Client(protocol);
        try {
            final String key = "sample1";
            final String columnName = "count";

            final ColumnPath columnPath = new ColumnPath(COLUMN_FAMILY);
            columnPath.setColumn(columnName.getBytes("UTF8"));
            // 値の取得
            try {
                Column column = client.get(KEYSPACE, key, columnPath,
                        ConsistencyLevel.ALL).getColumn();
                // ↓この辺もうちょっと何とかならないかととも思ったのですが、
                //  整数型のまま扱おうとすると、エンディアンとかが絡んで面倒なので
                //  へっぽこコードで押し切ります
                count = Integer.parseInt(new String(column.value, "UTF8"));
            } catch (NotFoundException e) {
                ;
            }
            count++;
            final String value = Integer.toString(count);
            // 加算した値をインサート
            client.insert(KEYSPACE, key, columnPath, value.getBytes(),
                    System.currentTimeMillis(), ConsistencyLevel.ALL);
            System.out.println("count of v6:" + count);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            try {
                transport.flush();
            } catch (TTransportException e) {
                throw new RuntimeException(e);
            } finally {
                transport.close();
            }
        }
        return count;
    }
}

試してみよう


上記それぞれのコードを、servlet経由で呼び出せるようにした上で、JMeterを使って複数スレッドから呼び出します。

50スレッドで10回ループ、つまり、50人が10回ずつF5を押す状況想定でJMeterのスレッドグループを設定しますので、

  • 0.6のコードでは整合性の保証がないため結果として500より少ない数字となる
  • 0.7のコードでは「最終的(eventually)」に500という「整合性(consistency)」が取れる

ことを期待値としましょう。

servlet作っておきながら集計用のログを標準出力に出しているあたり、自分のダサさに眩暈。

ほんとうに「結果」だけが整合しますた


上記が、カウンター値の推移となります。
赤い線がアクセス回数。シングルスレッドであれば、この線に完全に沿うはずです。マルチスレッドであっても、InnoDBのようにatomicなlockをかけられる仕組みがあれば、追い越しを考慮して結果ログをソートしなおすなどすることで、この線に完全に沿ったデータを作り出すことができるはずです。

0.6は予想通り、ずるずると遅れをとっていきました。最も悲観的な想定をすると、カウントは10しか追加されませんものね。最終的に、カウントは偶然ジャスト256になりました(w

0.7のほうのグラフですが、なにやら階段状になっています。こちらも予想通りで、1つ1つ丁寧にカウントアップした結果を返すわけではなく、カウントされたという情報はちゃんと残っているけど、その情報はすぐに表から見えるようになるのではなく、後ろで積まれていって、時間が経ってから正しい値が見えるようになる感覚ですね。(2011/01/10 22:30訂正) ロックをかけずに更新→読み出しを行っているため、別スレッドの更新情報も込みで反映された状態で読み出されているということだと思われます。いずれにしても、最終的な(結果)整合性は取れていると言えます。

性能


今回、ConsistencyLevelをONE(最も低い整合性)でテストしているのですが、それでも単純なColumnの読み書きよりは時間がかかるようです。(しかも0.6の整合性はALL)

今回の検証でできなかったこと

より高い整合性レベル(ALL)などを採用すれば上記のグラフの階段がなだらかになるという仮定のもと、get側のみConsistencyLevelをALLにして(addのほうはONE以外にすると怒られる)検証してみたのですが、違いは見られませんでした。複数台構成にしたり、replicationを設定したりしないと差は出ないのかもしれません。(つまり、これ以上良くなることはなさそう。ONEの整合性が悪化するだけで…)

いずれにしても、即時に最新の情報が反映されるわけではなさそうなので、サービスの要求に沿うケースを選んで採用すべきだと思われます。

# とはいえ、500回のアップデートにかかった時間が僅か2秒弱(しかも、このテストでは1スレッドが毎回0.6と0.7に1回ずつ投げている)であることを勘案すると、アクセスカウンターとしての実用上の不具合は殆どない、というのが個人的な意見。

(2011/01/10 22:30訂正) 読み出し→追加→読み出し という順序で処理したときに、1回目と2回目の結果に差がちゃんと出ていれば、カウンターとしての要件は満たすのではないかと考えて試そうとしたのですが、add()の前にget_counter()をやるとなぜかエラーになり、

InvalidRequestException(why:column parameter is not optional for standard CF Counters)
	org.apache.cassandra.thrift.Cassandra$get_counter_result.read(Cassandra.java:21340)
	org.apache.cassandra.thrift.Cassandra$Client.recv_get_counter(Cassandra.java:1166)
	org.apache.cassandra.thrift.Cassandra$Client.get_counter(Cassandra.java:1139)
	example.amanar.net.cassandra7.impl.AddCounter7.add(AddCounter7.java:51)

まだ検証出来ていません。