Using Apache Cassandra as a highly available big data platform is, in my eyes, a good choice, as a Cassandra cluster is easy to handle. We are using Apache Cassandra as an archive platform for our human-centric workflow engine Imixs-Workflow. But how does Cassandra perform with large files?
If you start with Cassandra, you have to change the way you use databases, especially if you come from the SQL world. Although Cassandra can handle very large amounts of data easily, you have to consider the concept of partition size. In short, this means that the data within a partition (defined by the partition key) should not exceed 100 MB. If you plan to store large files (e.g. media files), you need to split up your data into smaller chunks. In the following I will explain briefly how this can be done.
The Problem – Heap Size
Media data (e.g. video or music files) can consist of several hundred megabytes per file. As Cassandra runs in the JVM, reading and writing those objects ends up in the heap as byte arrays. Reading and writing such business data in a lot of concurrent requests can lead to situations where latency becomes an issue.
On the read path, Cassandra builds up an index of CQL rows within a CQL partition. This index scales with the width of the partition ON READ. In wide CQL partitions this creates JVM GC pressure. To solve this issue and guarantee the best performance ON READ and ON WRITE, you need to optimize the table design.
The Table Model
The data of a large media file can be split into 2MB chunks and stored in two separate data tables:
CREATE TABLE documents (
    document_id text,
    chunk_order int,
    chunk_id text,
    PRIMARY KEY (document_id, chunk_order)
);

CREATE TABLE documents_data (
    chunk_id text,
    chunk blob,
    PRIMARY KEY (chunk_id)
);
When a new media file needs to be stored, the data is split into 2 MB chunks.
Each chunk is written into the documents_data table, and the chunk_id, which can be the hash of the chunk, is written into the documents table in an ordered sequence.
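A simplified write path could look like the following sketch. Note that this is only an illustration assuming the DataStax Java driver; the class ChunkWriter and the SHA-256 hash used as chunk_id are hypothetical and not part of the original code.

import java.nio.ByteBuffer;
import java.security.MessageDigest;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;

public class ChunkWriter {

    private final Session session;
    private final PreparedStatement insertChunk;
    private final PreparedStatement insertRef;

    public ChunkWriter(Session session) {
        this.session = session;
        // prepare both INSERT statements once and reuse them for every chunk
        this.insertChunk = session.prepare(
                "INSERT INTO documents_data (chunk_id, chunk) VALUES (?, ?)");
        this.insertRef = session.prepare(
                "INSERT INTO documents (document_id, chunk_order, chunk_id) VALUES (?, ?, ?)");
    }

    // writes one chunk and registers it in the ordered chunk list of the document
    public void writeChunk(String documentId, int chunkOrder, byte[] chunk) throws Exception {
        // the chunk_id is the hash of the chunk data
        String chunkId = sha256(chunk);

        // store the raw chunk data...
        session.execute(insertChunk.bind(chunkId, ByteBuffer.wrap(chunk)));
        // ...and its position within the document
        session.execute(insertRef.bind(documentId, chunkOrder, chunkId));
    }

    private static String sha256(byte[] data) throws Exception {
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        StringBuilder hex = new StringBuilder();
        for (byte b : md.digest(data)) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }
}

Preparing the two INSERT statements once and reusing them keeps the write path cheap even when a file produces many chunks.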
When the media data is read, you can reassemble it chunk by chunk by querying the chunk table. Each piece is small enough not to overwhelm the JVM garbage collector.
As a result of this table model, the only partition that can grow large is the one keyed by document_id, and it is very unlikely that it exceeds 100 MB, as it holds only the small chunk references. So there is no need to worry about the index pain on the Cassandra read path.
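The read path can be sketched in the same way. Again, this is only an illustration assuming the DataStax Java driver; the class ChunkReader is hypothetical.

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class ChunkReader {

    // reassembles a document chunk by chunk in its original order
    public byte[] readDocument(Session session, String documentId) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();

        // iterate the ordered chunk references of the document...
        for (Row ref : session.execute(
                "SELECT chunk_id FROM documents WHERE document_id = ?", documentId)) {
            String chunkId = ref.getString("chunk_id");

            // ...and fetch each 2MB chunk separately to keep the heap usage low
            Row data = session.execute(
                    "SELECT chunk FROM documents_data WHERE chunk_id = ?", chunkId).one();
            ByteBuffer chunk = data.getBytes("chunk");
            byte[] bytes = new byte[chunk.remaining()];
            chunk.get(bytes);
            bos.write(bytes, 0, bytes.length);
        }
        return bos.toByteArray();
    }
}

Because each SELECT on documents_data fetches only a single 2 MB chunk, the amount of data held in the heap at any time stays small.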
Splitting the Data
The following code example shows how you can implement the Iterator interface to easily split your data into chunks:
public class DocumentSplitter implements Iterable<byte[]> {

    public int CHUNK_SIZE = 2097152; // 2mb
    .....

    @Override
    public Iterator<byte[]> iterator() {
        try {
            return new ChunkIterator();
        } catch (JAXBException e) {
            e.printStackTrace();
            return null;
        }
    }

    // Inner class to iterate the bytes in 2mb chunks
    private class ChunkIterator implements Iterator<byte[]> {
        private int cursor;
        private byte[] data;

        public ChunkIterator() {
            this.cursor = 0;
            // fetch the whole data in one array
            data = DocumentSplitter.this.getBytes();
        }

        public boolean hasNext() {
            return this.cursor < data.length;
        }

        public byte[] next() {
            if (this.hasNext()) {
                byte[] chunk;
                // check byte count from cursor...
                if (data.length > cursor + CHUNK_SIZE) {
                    chunk = Arrays.copyOfRange(data, cursor, cursor + CHUNK_SIZE);
                    cursor = cursor + CHUNK_SIZE;
                } else {
                    // read last chunk
                    chunk = Arrays.copyOfRange(data, cursor, data.length);
                    cursor = data.length;
                }
                return chunk;
            }
            throw new NoSuchElementException();
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
    ....
}
With this Iterator interface you can easily chunk the data from a large file:
byte[] fileData;
....
DocumentSplitter splitter = new DocumentSplitter(fileData);
Iterator<byte[]> it = splitter.iterator();
while (it.hasNext()) {
    byte[] chunk = it.next();
    // write 2MB chunk into cassandra....
}
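Because the DocumentSplitter implements Iterable, the same loop can also be written as an enhanced for statement:

DocumentSplitter splitter = new DocumentSplitter(fileData);
for (byte[] chunk : splitter) {
    // write 2MB chunk into cassandra....
}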
Deduplication
Optionally, you can also deduplicate chunks: since the chunk_id can be the hash of the chunk data, identical chunks need to be stored only once. To further reduce the size of a chunk, you can compress it before writing:
public byte[] compress(byte[] input) throws JAXBException {
    // Compressor with highest level of compression
    Deflater compressor = new Deflater();
    compressor.setLevel(Deflater.BEST_COMPRESSION);

    // Give the compressor the data to compress
    compressor.setInput(input);
    compressor.finish();

    // Create an expandable byte array to hold the compressed data.
    // It is not guaranteed that the compressed data will be smaller than
    // the uncompressed data.
    ByteArrayOutputStream bos = new ByteArrayOutputStream(input.length);

    // Compress the data
    byte[] buf = new byte[1024];
    while (!compressor.finished()) {
        int count = compressor.deflate(buf);
        bos.write(buf, 0, count);
    }
    try {
        bos.close();
    } catch (IOException e) {
        //....
    }
    // Get the compressed data
    return bos.toByteArray();
}
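On the read path the chunk has to be inflated again before the document is reassembled. A matching decompress method could look like the following sketch; it is not part of the original listing and only mirrors the Deflater usage above with java.util.zip.Inflater.

// requires java.util.zip.Inflater, java.util.zip.DataFormatException,
// java.io.ByteArrayOutputStream
public byte[] decompress(byte[] input) {
    // Inflater reverses the Deflater compression used above
    Inflater inflater = new Inflater();
    inflater.setInput(input);

    ByteArrayOutputStream bos = new ByteArrayOutputStream(input.length);
    byte[] buf = new byte[1024];
    try {
        while (!inflater.finished()) {
            int count = inflater.inflate(buf);
            bos.write(buf, 0, count);
        }
    } catch (DataFormatException e) {
        //....
    } finally {
        inflater.end();
    }
    // Get the decompressed data
    return bos.toByteArray();
}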
Hi,
Can you please share the code? If you can help, that would be great.
You may take a look here:
https://github.com/imixs/imixs-archive/blob/c25154047ec49aa72fb14a44dcbcb6372b20faf8/imixs-archive-service/src/main/java/org/imixs/archive/service/cassandra/DataService.java#L215
This is where we use a so-called 'DocumentSplitter' to separate large files into smaller chunks, which are then written into a Cassandra table.