Cassandra Java Client

前回 Cassandra Thrift APIのConsistencyLevel と内容的に前後してしまいますが、Cassandra APIのテストコードを一応載せておきます。データ操作(取得/登録/削除)に関するAPIはひととおり使用してあるので、コードを見ればだいたいの使い方は理解できると思います。また、各APIのシグネチャはCassandraのホームディレクトリにjavadocディレクトリがあるのでそこで確認できます。
(org.apache.cassandra.thrift.CassandraServer を参照)

以下のコードは、例としてブログ記事をデータとして扱っています。
・ColumnFamily は Entry
・entry* という key は title、category、tag の3つをcolumnを持つ
[java]
import java.util.*;
import java.io.UnsupportedEncodingException;
import org.apache.cassandra.thrift.*;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;

public class TestClient {
private static final String KEYSPACE = “Keyspace1”;
private static final String COLUMN_FAMILY = “Entry”;
private TTransport transport = null;
private Cassandra.Client client = null;
TestClient() {
client = getConnection();
}
private Cassandra.Client getConnection() {
try {
transport = new TSocket(“localhost”, 9160);
TProtocol protocol = new TBinaryProtocol(transport);
Cassandra.Client client = new Cassandra.Client(protocol);
transport.open();
return client;
}catch(TTransportException e) {
e.printStackTrace();
}
return null;
}
private void closeConnection() {
try {
transport.flush();
transport.close();
}catch(TTransportException e) {
e.printStackTrace();
}
}
private void selectEntry(String key) {
try {
SlicePredicate slicePredicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart(new byte[] {});
sliceRange.setFinish(new byte[] {});
slicePredicate.setSlice_range(sliceRange);
ColumnParent columnParent = new ColumnParent(COLUMN_FAMILY);
List results = client.get_slice(KEYSPACE, key, columnParent, slicePredicate, ConsistencyLevel.ONE);
printData(key, results);
}catch(Exception e) {
e.printStackTrace();
}
}
private void selectAllEntry() {
try {
KeyRange keyRange = new KeyRange();
keyRange.setStart_key(“”);
keyRange.setEnd_key(“”);
SliceRange sliceRange = new SliceRange();
sliceRange.setStart(new byte[] {});
sliceRange.setFinish(new byte[] {});
SlicePredicate slicePredicate = new SlicePredicate();
slicePredicate.setSlice_range(sliceRange);
ColumnParent columnParent = new ColumnParent(COLUMN_FAMILY);
List keySlices = client.get_range_slices(KEYSPACE, columnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
for(KeySlice keySlice : keySlices) {
printData(keySlice.getKey(), keySlice.getColumns());
}
}catch(Exception e) {
e.printStackTrace();
}
}
private void insertEntry(String key, Map data) {
try {
Map> cfmap = new HashMap>();
List columns = new ArrayList();
Column column;
ColumnOrSuperColumn columnOrSuperColumn;
long timestamp = System.currentTimeMillis();
for(Map.Entry e : data.entrySet()) {
column = new Column(e.getKey().getBytes(“utf-8”), e.getValue().getBytes(“utf-8”), timestamp);
columnOrSuperColumn = new ColumnOrSuperColumn();
columnOrSuperColumn.setColumn(column);
columns.add(columnOrSuperColumn);
}
cfmap.put(COLUMN_FAMILY, columns);
client.batch_insert(KEYSPACE, key, cfmap, ConsistencyLevel.ALL);
}catch(Exception e) {
e.printStackTrace();
}
}
private void deleteEntry(String key) {
try {
ColumnPath columnPath = new ColumnPath(COLUMN_FAMILY);
client.remove(KEYSPACE, key, columnPath, System.currentTimeMillis(), ConsistencyLevel.ALL);
}catch(Exception e) {
e.printStackTrace();
}
}
private void updateEntry(String key, Map data) {
try {
Map>> mutationMap = new HashMap>>();
List mutations = new ArrayList();
Column column;
ColumnOrSuperColumn columnOrSuperColumn;
Mutation mutation;
long timestamp = System.currentTimeMillis();
for(Map.Entry e : data.entrySet()) {
column = new Column(e.getKey().getBytes(“utf-8”), e.getValue().getBytes(“utf-8”), timestamp);
columnOrSuperColumn = new ColumnOrSuperColumn();
columnOrSuperColumn.setColumn(column);
mutation = new Mutation();
mutation.setColumn_or_supercolumn(columnOrSuperColumn);
mutations.add(mutation);
}
Map> mutationsForColumnFamily = new HashMap>();
mutationsForColumnFamily.put(COLUMN_FAMILY, mutations);
mutationMap.put(key, mutationsForColumnFamily);
client.batch_mutate(KEYSPACE, mutationMap, ConsistencyLevel.ALL);
}catch(Exception e) {
e.printStackTrace();
}
}
private void printData(String key, List results) {
try {
System.out.println(“Key: ‘” + key + “‘”);
for(ColumnOrSuperColumn c : results) {
if(c.getColumn() != null) {
String name = new String(c.getColumn().getName(), “utf-8”);
String value = new String(c.getColumn().getValue(), “utf-8″);
long timestamp = c.getColumn().getTimestamp();
System.out.println(” name: ‘” + name + “‘, value: ‘” + value + “‘, timestamp: ” + timestamp);
}
}
}catch(UnsupportedEncodingException e) {
e.printStackTrace();
}
}
/**
* test cassandra api.
*/
public static void main(String args[]) {
TestClient c = new TestClient();
// insert entry1
Map data = new HashMap();
data.put(“title”, “Cassandra Java Client”);
data.put(“category”, “tech/study”);
data.put(“tag”, “database”);
c.insertEntry(“entry1”, data);
// insert entry2
data = new HashMap();
data.put(“title”, “OpenCV 2.1 is out!”);
data.put(“category”, “tech/study”);
data.put(“tag”, “cv/im”);
c.insertEntry(“entry2”, data);
// select entries
System.out.println(“select entries, using selectEntry()”);
c.selectEntry(“entry1”);
c.selectEntry(“entry2”);
// delete entry2
System.out.println(“delete entry2”);
c.deleteEntry(“entry2”);
System.out.println(“select entries, using selectEntry()”);
c.selectEntry(“entry1”);
c.selectEntry(“entry2”);
// update entry1
System.out.println(“update entry1: title, category”);
Map updateData = new HashMap();
updateData.put(“title”, “change title”);
updateData.put(“category”, “change category”);
c.updateEntry(“entry1”, updateData);
System.out.println(“select entry1, using selectEntry()”);
c.selectEntry(“entry1”);
// insert entry3
c.insertEntry(“entry3”, data);
// select all entries
System.out.println(“select all entries from Entry, using selectAllEntry()”);
c.selectAllEntry();
c.closeConnection();
}
}
[/java]

データ登録では Cassandra.Client.batch_insert を使って複数のcolumnを一度にwriteしてますが、Cassandra.Client.insert という単一のcolumnをwriteするメソッドもあります。writeするcolumnが複数ある場合、その数だけ insert を呼ぶより batch_insert で一度にwriteした方が高速です。 ただし、”Deprecated in 0.6 – use batch_mutate instead” とAPIリファレンスに書いてあるように、データ登録時も Cassandra.Client.batch_mutate の利用が推奨されているようです。

CassandraのクライアントはThriftを利用しているので多言語対応していますが、やはり個人的にはJavaから操作するのがいいかなと思います。プロトタイピングでは Net::Cassandra(Perlの場合)などでさっと作って検証してから、Javaでじっくり書き直す感じになるでしょうか。

* 関連記事
Cassandra Thrift APIのConsistencyLevel
Cassandraについて

あわせて読む:

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です