前回 Cassandra Thrift APIのConsistencyLevel と内容的に前後してしまいますが、Cassandra APIのテストコードを一応載せておきます。データ操作(取得/登録/削除)に関するAPIはひととおり使用してあるので、コードを見ればだいたいの使い方は理解できると思います。また、各APIのシグネチャはCassandraのホームディレクトリにjavadocディレクトリがあるのでそこで確認できます。
(org.apache.cassandra.thrift.CassandraServer を参照)
以下のコードは、例としてブログ記事をデータとして扱っています。
・ColumnFamily は Entry
・entry* という key は title、category、tag の3つをcolumnを持つ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
import java.util.*; import java.io.UnsupportedEncodingException; import org.apache.cassandra.thrift.*; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; public class TestClient { private static final String KEYSPACE = "Keyspace1"; private static final String COLUMN_FAMILY = "Entry"; private TTransport transport = null; private Cassandra.Client client = null; TestClient() { client = getConnection(); } private Cassandra.Client getConnection() { try { transport = new TSocket("localhost", 9160); TProtocol protocol = new TBinaryProtocol(transport); Cassandra.Client client = new Cassandra.Client(protocol); transport.open(); return client; }catch(TTransportException e) { e.printStackTrace(); } return null; } private void closeConnection() { try { transport.flush(); transport.close(); }catch(TTransportException e) { e.printStackTrace(); } } private void selectEntry(String key) { try { SlicePredicate slicePredicate = new SlicePredicate(); SliceRange sliceRange = new SliceRange(); sliceRange.setStart(new byte[] {}); sliceRange.setFinish(new byte[] {}); slicePredicate.setSlice_range(sliceRange); ColumnParent columnParent = new ColumnParent(COLUMN_FAMILY); List<ColumnOrSuperColumn> results = client.get_slice(KEYSPACE, key, columnParent, slicePredicate, ConsistencyLevel.ONE); printData(key, results); }catch(Exception e) { e.printStackTrace(); } } private void selectAllEntry() { try { KeyRange keyRange = new KeyRange(); keyRange.setStart_key(""); keyRange.setEnd_key(""); SliceRange sliceRange = new SliceRange(); sliceRange.setStart(new byte[] {}); sliceRange.setFinish(new byte[] {}); SlicePredicate slicePredicate = new SlicePredicate(); slicePredicate.setSlice_range(sliceRange); ColumnParent columnParent = new ColumnParent(COLUMN_FAMILY); List<KeySlice> keySlices = client.get_range_slices(KEYSPACE, columnParent, slicePredicate, keyRange, ConsistencyLevel.ONE); for(KeySlice keySlice : keySlices) { printData(keySlice.getKey(), keySlice.getColumns()); } }catch(Exception e) { e.printStackTrace(); } } private void insertEntry(String key, Map<String, String> data) { try { Map<String, List<ColumnOrSuperColumn>> cfmap = new HashMap<String, List<ColumnOrSuperColumn>>(); List<ColumnOrSuperColumn> columns = new ArrayList<ColumnOrSuperColumn>(); Column column; ColumnOrSuperColumn columnOrSuperColumn; long timestamp = System.currentTimeMillis(); for(Map.Entry<String, String> e : data.entrySet()) { column = new Column(e.getKey().getBytes("utf-8"), e.getValue().getBytes("utf-8"), timestamp); columnOrSuperColumn = new ColumnOrSuperColumn(); columnOrSuperColumn.setColumn(column); columns.add(columnOrSuperColumn); } cfmap.put(COLUMN_FAMILY, columns); client.batch_insert(KEYSPACE, key, cfmap, ConsistencyLevel.ALL); }catch(Exception e) { e.printStackTrace(); } } private void deleteEntry(String key) { try { ColumnPath columnPath = new ColumnPath(COLUMN_FAMILY); client.remove(KEYSPACE, key, columnPath, System.currentTimeMillis(), ConsistencyLevel.ALL); }catch(Exception e) { e.printStackTrace(); } } private void updateEntry(String key, Map<String, String> data) { try { Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>(); List<Mutation> mutations = new ArrayList<Mutation>(); Column column; ColumnOrSuperColumn columnOrSuperColumn; Mutation mutation; long timestamp = System.currentTimeMillis(); for(Map.Entry<String, String> e : data.entrySet()) { column = new Column(e.getKey().getBytes("utf-8"), e.getValue().getBytes("utf-8"), timestamp); columnOrSuperColumn = new ColumnOrSuperColumn(); columnOrSuperColumn.setColumn(column); mutation = new Mutation(); mutation.setColumn_or_supercolumn(columnOrSuperColumn); mutations.add(mutation); } Map<String, List<Mutation>> mutationsForColumnFamily = new HashMap<String, List<Mutation>>(); mutationsForColumnFamily.put(COLUMN_FAMILY, mutations); mutationMap.put(key, mutationsForColumnFamily); client.batch_mutate(KEYSPACE, mutationMap, ConsistencyLevel.ALL); }catch(Exception e) { e.printStackTrace(); } } private void printData(String key, List<ColumnOrSuperColumn> results) { try { System.out.println("Key: '" + key + "'"); for(ColumnOrSuperColumn c : results) { if(c.getColumn() != null) { String name = new String(c.getColumn().getName(), "utf-8"); String value = new String(c.getColumn().getValue(), "utf-8"); long timestamp = c.getColumn().getTimestamp(); System.out.println(" name: '" + name + "', value: '" + value + "', timestamp: " + timestamp); } } }catch(UnsupportedEncodingException e) { e.printStackTrace(); } } /** * test cassandra api. */ public static void main(String args[]) { TestClient c = new TestClient(); // insert entry1 Map<String, String> data = new HashMap<String, String>(); data.put("title", "Cassandra Java Client"); data.put("category", "tech/study"); data.put("tag", "database"); c.insertEntry("entry1", data); // insert entry2 data = new HashMap<String, String>(); data.put("title", "OpenCV 2.1 is out!"); data.put("category", "tech/study"); data.put("tag", "cv/im"); c.insertEntry("entry2", data); // select entries System.out.println("select entries, using selectEntry()"); c.selectEntry("entry1"); c.selectEntry("entry2"); // delete entry2 System.out.println("delete entry2"); c.deleteEntry("entry2"); System.out.println("select entries, using selectEntry()"); c.selectEntry("entry1"); c.selectEntry("entry2"); // update entry1 System.out.println("update entry1: title, category"); Map<String, String> updateData = new HashMap<String, String>(); updateData.put("title", "change title"); updateData.put("category", "change category"); c.updateEntry("entry1", updateData); System.out.println("select entry1, using selectEntry()"); c.selectEntry("entry1"); // insert entry3 c.insertEntry("entry3", data); // select all entries System.out.println("select all entries from Entry, using selectAllEntry()"); c.selectAllEntry(); c.closeConnection(); } } |
データ登録では Cassandra.Client.batch_insert を使って複数のcolumnを一度にwriteしてますが、Cassandra.Client.insert という単一のcolumnをwriteするメソッドもあります。writeするcolumnが複数ある場合、その数だけ insert を呼ぶより batch_insert で一度にwriteした方が高速です。 ただし、”Deprecated in 0.6 – use batch_mutate instead” とAPIリファレンスに書いてあるように、データ登録時も Cassandra.Client.batch_mutate の利用が推奨されているようです。
CassandraのクライアントはThriftを利用しているので多言語対応していますが、やはり個人的にはJavaから操作するのがいいかなと思います。プロトタイピングでは Net::Cassandra(Perlの場合)などでさっと作って検証してから、Javaでじっくり書き直す感じになるでしょうか。
* 関連記事
Cassandra Thrift APIのConsistencyLevel
Cassandraについて