前回 Cassandra Thrift APIのConsistencyLevel と内容的に前後してしまいますが、Cassandra APIのテストコードを一応載せておきます。データ操作(取得/登録/削除)に関するAPIはひととおり使用してあるので、コードを見ればだいたいの使い方は理解できると思います。また、各APIのシグネチャはCassandraのホームディレクトリにjavadocディレクトリがあるのでそこで確認できます。
(org.apache.cassandra.thrift.CassandraServer を参照)
以下のコードは、例としてブログ記事をデータとして扱っています。
・ColumnFamily は Entry
・entry* という key は title、category、tag の3つをcolumnを持つ
[java]
import java.util.*;
import java.io.UnsupportedEncodingException;
import org.apache.cassandra.thrift.*;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
public class TestClient {
private static final String KEYSPACE = “Keyspace1”;
private static final String COLUMN_FAMILY = “Entry”;
private TTransport transport = null;
private Cassandra.Client client = null;
TestClient() {
client = getConnection();
}
private Cassandra.Client getConnection() {
try {
transport = new TSocket(“localhost”, 9160);
TProtocol protocol = new TBinaryProtocol(transport);
Cassandra.Client client = new Cassandra.Client(protocol);
transport.open();
return client;
}catch(TTransportException e) {
e.printStackTrace();
}
return null;
}
private void closeConnection() {
try {
transport.flush();
transport.close();
}catch(TTransportException e) {
e.printStackTrace();
}
}
private void selectEntry(String key) {
try {
SlicePredicate slicePredicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart(new byte[] {});
sliceRange.setFinish(new byte[] {});
slicePredicate.setSlice_range(sliceRange);
ColumnParent columnParent = new ColumnParent(COLUMN_FAMILY);
List
printData(key, results);
}catch(Exception e) {
e.printStackTrace();
}
}
private void selectAllEntry() {
try {
KeyRange keyRange = new KeyRange();
keyRange.setStart_key(“”);
keyRange.setEnd_key(“”);
SliceRange sliceRange = new SliceRange();
sliceRange.setStart(new byte[] {});
sliceRange.setFinish(new byte[] {});
SlicePredicate slicePredicate = new SlicePredicate();
slicePredicate.setSlice_range(sliceRange);
ColumnParent columnParent = new ColumnParent(COLUMN_FAMILY);
List
for(KeySlice keySlice : keySlices) {
printData(keySlice.getKey(), keySlice.getColumns());
}
}catch(Exception e) {
e.printStackTrace();
}
}
private void insertEntry(String key, Map
try {
Map
List
Column column;
ColumnOrSuperColumn columnOrSuperColumn;
long timestamp = System.currentTimeMillis();
for(Map.Entry
column = new Column(e.getKey().getBytes(“utf-8”), e.getValue().getBytes(“utf-8”), timestamp);
columnOrSuperColumn = new ColumnOrSuperColumn();
columnOrSuperColumn.setColumn(column);
columns.add(columnOrSuperColumn);
}
cfmap.put(COLUMN_FAMILY, columns);
client.batch_insert(KEYSPACE, key, cfmap, ConsistencyLevel.ALL);
}catch(Exception e) {
e.printStackTrace();
}
}
private void deleteEntry(String key) {
try {
ColumnPath columnPath = new ColumnPath(COLUMN_FAMILY);
client.remove(KEYSPACE, key, columnPath, System.currentTimeMillis(), ConsistencyLevel.ALL);
}catch(Exception e) {
e.printStackTrace();
}
}
private void updateEntry(String key, Map
try {
Map
List
Column column;
ColumnOrSuperColumn columnOrSuperColumn;
Mutation mutation;
long timestamp = System.currentTimeMillis();
for(Map.Entry
column = new Column(e.getKey().getBytes(“utf-8”), e.getValue().getBytes(“utf-8”), timestamp);
columnOrSuperColumn = new ColumnOrSuperColumn();
columnOrSuperColumn.setColumn(column);
mutation = new Mutation();
mutation.setColumn_or_supercolumn(columnOrSuperColumn);
mutations.add(mutation);
}
Map
mutationsForColumnFamily.put(COLUMN_FAMILY, mutations);
mutationMap.put(key, mutationsForColumnFamily);
client.batch_mutate(KEYSPACE, mutationMap, ConsistencyLevel.ALL);
}catch(Exception e) {
e.printStackTrace();
}
}
private void printData(String key, List
try {
System.out.println(“Key: ‘” + key + “‘”);
for(ColumnOrSuperColumn c : results) {
if(c.getColumn() != null) {
String name = new String(c.getColumn().getName(), “utf-8”);
String value = new String(c.getColumn().getValue(), “utf-8″);
long timestamp = c.getColumn().getTimestamp();
System.out.println(” name: ‘” + name + “‘, value: ‘” + value + “‘, timestamp: ” + timestamp);
}
}
}catch(UnsupportedEncodingException e) {
e.printStackTrace();
}
}
/**
* test cassandra api.
*/
public static void main(String args[]) {
TestClient c = new TestClient();
// insert entry1
Map
data.put(“title”, “Cassandra Java Client”);
data.put(“category”, “tech/study”);
data.put(“tag”, “database”);
c.insertEntry(“entry1”, data);
// insert entry2
data = new HashMap
data.put(“title”, “OpenCV 2.1 is out!”);
data.put(“category”, “tech/study”);
data.put(“tag”, “cv/im”);
c.insertEntry(“entry2”, data);
// select entries
System.out.println(“select entries, using selectEntry()”);
c.selectEntry(“entry1”);
c.selectEntry(“entry2”);
// delete entry2
System.out.println(“delete entry2”);
c.deleteEntry(“entry2”);
System.out.println(“select entries, using selectEntry()”);
c.selectEntry(“entry1”);
c.selectEntry(“entry2”);
// update entry1
System.out.println(“update entry1: title, category”);
Map
updateData.put(“title”, “change title”);
updateData.put(“category”, “change category”);
c.updateEntry(“entry1”, updateData);
System.out.println(“select entry1, using selectEntry()”);
c.selectEntry(“entry1”);
// insert entry3
c.insertEntry(“entry3”, data);
// select all entries
System.out.println(“select all entries from Entry, using selectAllEntry()”);
c.selectAllEntry();
c.closeConnection();
}
}
[/java]
データ登録では Cassandra.Client.batch_insert を使って複数のcolumnを一度にwriteしてますが、Cassandra.Client.insert という単一のcolumnをwriteするメソッドもあります。writeするcolumnが複数ある場合、その数だけ insert を呼ぶより batch_insert で一度にwriteした方が高速です。 ただし、”Deprecated in 0.6 – use batch_mutate instead” とAPIリファレンスに書いてあるように、データ登録時も Cassandra.Client.batch_mutate の利用が推奨されているようです。
CassandraのクライアントはThriftを利用しているので多言語対応していますが、やはり個人的にはJavaから操作するのがいいかなと思います。プロトタイピングでは Net::Cassandra(Perlの場合)などでさっと作って検証してから、Javaでじっくり書き直す感じになるでしょうか。
* 関連記事
Cassandra Thrift APIのConsistencyLevel
Cassandraについて