Cassandra – How to Handle Large Media Files

Using Apache Cassandra as a highly available big data platform is in my eyes a good choice, as a Cassandra cluster is easy to handle. We are using Apache Cassandra as an archive platform for our human centric workflow engine Imixs-Workflow. But how does Cassandra perform with large files?

If you start with Cassandra you have to change the way you use databases, especially when you come from the SQL world. Although Cassandra can handle very large amounts of data easily, you have to consider the concept of the partition size. In short, the data within a partition (defined by the partition key) should not exceed about 100 MB. If you plan to store large files (e.g. media files) you need to split your data into smaller chunks. In the following I will explain briefly how this can be done.

The Problem – Heap Size

Media data (e.g. video or music files) can easily reach several hundred MB per file. As Cassandra runs in the JVM, reading and writing those objects ends up in the heap as byte arrays. Reading and writing this business data in many concurrent requests can lead to situations where latency becomes an issue.

On the read path, Cassandra builds up an index of CQL rows within a CQL partition. This index scales with the width of the partition ON READ. In wide CQL partitions this creates JVM GC pressure. To solve this issue and guarantee the best performance ON READ and ON WRITE you need to optimize the table design.

The Table Model

The data of a large media file can be split into 2MB chunks and stored in two separate data tables:

CREATE TABLE documents (
  document_id text,
  chunk_order int,
  chunk_id text,
  PRIMARY KEY (document_id, chunk_order)
);

CREATE TABLE documents_data (
  chunk_id text,
  chunk blob,
  PRIMARY KEY (chunk_id)
);

When a new media file needs to be stored, the data is split into 2 MB chunks.
Each chunk is written into the documents_data table, and the chunk_id, which can be the hash of the chunk, is written into the documents table in an ordered sequence.
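A minimal sketch of this write path could look like the following. Note that the DataStax Java driver 4.x, the keyspace name 'archive' and SHA-256 as the chunk hash are my own assumptions for this example:

import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.HexFormat;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;

public class ChunkWriter {

  public void write(String documentId, Iterable<byte[]> chunks) throws Exception {
    // keyspace 'archive' is an assumption for this sketch
    try (CqlSession session = CqlSession.builder().withKeyspace("archive").build()) {
      PreparedStatement insertData = session.prepare(
          "INSERT INTO documents_data (chunk_id, chunk) VALUES (?, ?)");
      PreparedStatement insertRef = session.prepare(
          "INSERT INTO documents (document_id, chunk_order, chunk_id) VALUES (?, ?, ?)");

      int order = 0;
      for (byte[] chunk : chunks) {
        // use the SHA-256 hash of the chunk content as chunk_id (assumption)
        byte[] digest = MessageDigest.getInstance("SHA-256").digest(chunk);
        String chunkId = HexFormat.of().formatHex(digest);

        // store the chunk data and the ordered reference to it
        session.execute(insertData.bind(chunkId, ByteBuffer.wrap(chunk)));
        session.execute(insertRef.bind(documentId, order++, chunkId));
      }
    }
  }
}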

When the media data should be read back, you can reassemble it chunk by chunk by querying the chunk table. Each piece is small enough not to overwhelm the garbage collector of the JVM.
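The corresponding read path could be sketched like this, again assuming the DataStax Java driver; the CqlSession is passed in from outside:

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;

public class ChunkReader {

  public byte[] read(CqlSession session, String documentId) throws Exception {
    PreparedStatement selectRefs = session.prepare(
        "SELECT chunk_id FROM documents WHERE document_id=? ORDER BY chunk_order");
    PreparedStatement selectData = session.prepare(
        "SELECT chunk FROM documents_data WHERE chunk_id=?");

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // iterate the ordered chunk references of the document...
    for (Row ref : session.execute(selectRefs.bind(documentId))) {
      // ...and fetch each 2 MB chunk separately
      Row dataRow = session.execute(selectData.bind(ref.getString("chunk_id"))).one();
      ByteBuffer blob = dataRow.getByteBuffer("chunk");
      byte[] chunk = new byte[blob.remaining()];
      blob.get(chunk);
      out.write(chunk);
    }
    return out.toByteArray();
  }
}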

As a result of this table model the only partition that can grow is the one keyed by document_id, and since it only stores small chunk references it is very unlikely to get over 100 MB. So there is no need to worry about the index pain on the Cassandra read path.

Splitting the Data

The following code example shows how you can implement the Iterator interface to easily split your data into chunks:

public class DocumentSplitter implements Iterable<byte[]> {

  public static final int CHUNK_SIZE = 2097152; // 2 MB

  .....

  @Override
  public Iterator<byte[]> iterator() {
    try {
      return new ChunkIterator();
    } catch (JAXBException e) {
      e.printStackTrace();
      return null;
    }
  }

  // Inner class to iterate the bytes in 2 MB chunks
  private class ChunkIterator implements Iterator<byte[]> {
    private int cursor;
    private byte[] data;

    public ChunkIterator() throws JAXBException {
      this.cursor = 0;
      // fetch the whole data in one array
      data = DocumentSplitter.this.getBytes();
    }

    public boolean hasNext() {
      return this.cursor < data.length;
    }

    public byte[] next() {
      if (this.hasNext()) {
        byte[] chunk;
        // check byte count from cursor...
        if (data.length > cursor + CHUNK_SIZE) {
          chunk = Arrays.copyOfRange(data, cursor, cursor + CHUNK_SIZE);
          cursor = cursor + CHUNK_SIZE;
        } else {
          // read the last chunk
          chunk = Arrays.copyOfRange(data, cursor, data.length);
          cursor = data.length;
        }
        return chunk;
      }
      throw new NoSuchElementException();
    }

    public void remove() {
      throw new UnsupportedOperationException();
    }
  }
  ....
}

With this Iterator interface you can easily chunk the data from a large file:

byte[] fileData;
....
DocumentSplitter splitter = new DocumentSplitter(fileData);
Iterator<byte[]> it = splitter.iterator();

while (it.hasNext()) {
  byte[] chunk = it.next();
  // write 2 MB chunk into Cassandra....
}

Deduplication and Compression

Since the chunk_id can be the hash of the chunk content, identical chunks are stored only once, which deduplicates the data automatically. Optionally you can also compress each chunk to further reduce its size:

public byte[] compress(byte[] input) {
  // Compressor with highest level of compression
  Deflater compressor = new Deflater();
  compressor.setLevel(Deflater.BEST_COMPRESSION);

  // Give the compressor the data to compress
  compressor.setInput(input);
  compressor.finish();

  // Create an expandable byte array to hold the compressed data.
  // It is not guaranteed that the compressed data will be smaller than
  // the uncompressed data.
  ByteArrayOutputStream bos = new ByteArrayOutputStream(input.length);

  // Compress the data
  byte[] buf = new byte[1024];
  while (!compressor.finished()) {
    int count = compressor.deflate(buf);
    bos.write(buf, 0, count);
  }
  try {
    bos.close();
  } catch (IOException e) {
    //....
  }
  // release the native resources of the Deflater
  compressor.end();

  // Get the compressed data
  return bos.toByteArray();
}
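To read such a chunk back, the data has to be decompressed again. A minimal counterpart using java.util.zip.Inflater could look like the following sketch (this method is not part of the original example):

// requires java.util.zip.Inflater, java.util.zip.DataFormatException
// and java.io.ByteArrayOutputStream
public byte[] decompress(byte[] input) throws DataFormatException {
  Inflater decompressor = new Inflater();
  decompressor.setInput(input);

  ByteArrayOutputStream bos = new ByteArrayOutputStream(input.length);

  // Decompress the data block by block
  byte[] buf = new byte[1024];
  while (!decompressor.finished()) {
    int count = decompressor.inflate(buf);
    bos.write(buf, 0, count);
  }
  // release the native resources of the Inflater
  decompressor.end();

  return bos.toByteArray();
}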
