Big Data made easy with Cassandra
A friend of mine at DataStax introduced me to their new Brisk 1.0 toolkit for rapid (think 90 seconds rapid) deployment of a working Cassandra + Hadoop cluster for Big Data applications.
Brisk 1.0 lets a neophyte like me run SQL-like queries against Cassandra data just a few minutes later. Cassandra's own SQL-like grammar for reading and writing data is CQL (Cassandra Query Language), and Brisk also bundles Hive, whose HiveQL is used for analytics.
I am going to assume the reader has little familiarity with the concept of Hadoop data node clusters and MapReduce programming architectures but is interested in how a resilient, low-cost Data Warehouse or Operational Data Store (ODS) can be implemented using Cassandra. I have embedded some links I found helpful. I hope you enjoy your first Hadoop+Cassandra application experience!
Business Intelligence using Hadoop
The process of analyzing Big Data goes by many names: Business Intelligence (BI), Data Warehousing, Online Analytical Processing (OLAP), Decision Support Systems (DSS) or, for the data preparation step, Extract, Transform and Load (ETL). Traditionally, they all use SQL queries at their core.
Hadoop, Hive, HBase and Cassandra architectures distribute the data alongside the processing power across a grid/cloud of commodity nodes. They largely take a non-relational approach to database schema definition. For queries and updates, Cassandra implements a SQL-like grammar called CQL. Hive uses HiveQL, which can also query HBase tables through Hive's HBase storage handler.
NOSQL is non-relational
If you have been thinking about using a NOSQL (non-relational) database for your next multi-tier web application, this brisk tutorial may be a good way for you to evaluate the distributed, fault-tolerant features of Cassandra.
There are many aspects of these non-ACID database architectures that I won't pretend to understand (yet). Consistency, Availability, Partition tolerance and Replication are among them. Cassandra features a neat Tunable Consistency approach: for each read and write, the client chooses how many replicas must respond, trading consistency against latency and availability. This gives a clean way to manage the data consistency issue associated with multi-master distributed databases.
http://www.julianbrowne.com/article/viewer/brewers-cap-theorem
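The arithmetic behind Tunable Consistency is simple enough to sketch. With a replication factor N, a write acknowledged by W replicas and a read that consults R replicas must overlap in at least one replica when R + W > N, so the read sees the latest acknowledged write. The Python below is a minimal illustration of that rule only; the function names are hypothetical, and the level names mirror Cassandra's ONE, QUORUM and ALL consistency levels.

```python
# Minimal sketch of Cassandra-style tunable consistency arithmetic.
# Function names are hypothetical; the level names mirror Cassandra's
# ONE, QUORUM and ALL consistency levels.

def replicas_required(level: str, replication_factor: int) -> int:
    """Number of replicas that must respond for a given consistency level."""
    if level == "ONE":
        return 1
    if level == "QUORUM":
        # A quorum is a strict majority of the replicas.
        return replication_factor // 2 + 1
    if level == "ALL":
        return replication_factor
    raise ValueError(f"unknown consistency level: {level}")


def is_strongly_consistent(read_level: str, write_level: str, replication_factor: int) -> bool:
    """True when every read replica set must overlap every write replica set (R + W > N)."""
    r = replicas_required(read_level, replication_factor)
    w = replicas_required(write_level, replication_factor)
    return r + w > replication_factor


if __name__ == "__main__":
    n = 3
    for read_level, write_level in [("ONE", "ONE"), ("QUORUM", "QUORUM"), ("ONE", "ALL")]:
        print(read_level, write_level, is_strongly_consistent(read_level, write_level, n))
```

With N = 3, QUORUM reads plus QUORUM writes (2 + 2 > 3) always overlap, while ONE/ONE (1 + 1) gives up that guarantee in exchange for lower latency and higher availability.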
One thing I have learned is that data models should be designed up front and kept stable before deploying new applications to production, because it is difficult to change them once the data has been laid out across your cluster nodes' storage. Since Cassandra schemas are shaped around the queries they serve, data access patterns should also be fixed during the design phase; accommodating different access patterns down the road may be difficult.
Cassandra Concepts
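Cassandra relies on a non-relational model for storing information. For each uniquely identifiable record, the Cassandra schema provides a multidimensional Map of Keys and Values (think Java Map; because columns within a row are kept sorted by name, a SortedMap is a closer analogy than a HashMap). It has roots in Google BigTable and Amazon Dynamo, borrowing its column family data model from BigTable and its masterless, fully distributed replication design from Dynamo.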
Cassandra doesn't have tables like an RDBMS, but one can map the traditional rows-and-columns table schema onto Cassandra. Think of a row as a record identified by its primary key (the row key); rows are ordered by that key only when an order-preserving partitioner is used. Each record has a Map where the Map Keys are like column names and the Map Values may be Objects of any pre-determined type. Map Values are stored as byte arrays and can thus hold any serializable Object.
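To make the Map analogy concrete, here is a minimal Python sketch of one column family: an outer dict from row key to row, where each row holds columns returned sorted by name and each value is stored as raw bytes. The class and method names are hypothetical and this is an in-memory illustration only, not Cassandra's storage engine; row order is deliberately left unspecified.

```python
# In-memory illustration of one Cassandra-style column family (hypothetical names).
# Outer level: row key -> row. Inner level: column name -> value stored as bytes.
# Columns within a row are returned sorted by name; row order is not guaranteed.
from typing import Dict, List, Tuple


class ColumnFamily:
    def __init__(self) -> None:
        self.rows: Dict[str, Dict[str, bytes]] = {}

    def insert(self, row_key: str, column_name: str, value: bytes) -> None:
        """Insert or overwrite one column in one row (writes are upserts)."""
        self.rows.setdefault(row_key, {})[column_name] = value

    def column_slice(self, row_key: str, start: str = "", finish: str = "") -> List[Tuple[str, bytes]]:
        """Return (name, value) pairs of one row with start <= name <= finish, sorted by name.
        An empty start or finish leaves that end of the range open."""
        row = self.rows.get(row_key, {})
        return [
            (name, row[name])
            for name in sorted(row)
            if (not start or name >= start) and (not finish or name <= finish)
        ]


if __name__ == "__main__":
    stock_hist = ColumnFamily()
    # Hypothetical data: row key = ticker, column name = date, value = closing price.
    stock_hist.insert("ACME", "2011-07-05", b"10.50")
    stock_hist.insert("ACME", "2011-07-01", b"10.00")
    stock_hist.insert("ACME", "2011-07-15", b"9.75")
    print(stock_hist.column_slice("ACME", start="2011-07-01", finish="2011-07-10"))
```

Keeping columns sorted by name is what makes date-named columns, like the stock history dates in the demo's HiveQL, cheap to read as a range within one row.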
This models a one-table schema, with column access using the Cassandra Map's Keys, but what about the typical multiple-table join use case? In the RDBMS world this is characterized by a schema with two or more tables related by foreign keys. Cassandra has no joins; instead, Cassandra schemas are based on multi-dimensional or nested Maps.
Take the one-table schema we defined above, but let any Value be a nested Map with its own Keys and Values, like another table related by a foreign key. Under the hood, each Cassandra column is a tuple of name, value and timestamp, and a SuperColumn is a named group of such columns stored under the parent row key; that is how the parent/child relationship is tracked.
A Map Value may in turn be another Map. Counting keyspace, column family, row key, SuperColumn and column, the nesting is effectively 4 or 5 levels deep (4 without SuperColumns), which should be more than sufficient for modeling most real-world schemas in a fully denormalized fashion. Cassandra defines these relationships using the SuperColumn and Column grammar elements.
Physically, Cassandra will store that related/nested table data in close proximity to the parent/driving table. I would posit this is analogous to a physical relational schema realized in a denormalized manner. I'm learning this as I go.
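The nesting can be pictured as dictionaries within dictionaries. The sketch below is a hypothetical, in-memory illustration of a super column family holding what a relational schema would split into a customers table and an orders table; the column family, row key, super column and column names are all made up for the example.

```python
# Hypothetical illustration of Cassandra's nesting levels as nested dicts:
# keyspace -> column family -> row key -> super column -> column -> value.
from typing import Dict

Columns = Dict[str, str]
SuperRow = Dict[str, Columns]            # super column name -> its columns
SuperColumnFamily = Dict[str, SuperRow]  # row key -> super columns

keyspace: Dict[str, SuperColumnFamily] = {
    "CustomerOrders": {                      # super column family
        "customer:42": {                     # row key (the "parent" record)
            "order:1001": {"date": "2011-07-01", "total": "19.99"},  # super column
            "order:1002": {"date": "2011-07-09", "total": "5.00"},
        },
    },
}


def lookup(ks, column_family: str, row_key: str, super_column: str, column: str) -> str:
    """Follow the path through each level below the keyspace."""
    return ks[column_family][row_key][super_column][column]


def orders_for(ks, row_key: str) -> Dict[str, Columns]:
    """All child 'rows' for one parent key come back from a single row read; no join needed."""
    return ks["CustomerOrders"].get(row_key, {})


if __name__ == "__main__":
    print(lookup(keyspace, "CustomerOrders", "customer:42", "order:1002", "total"))
    print(sorted(orders_for(keyspace, "customer:42")))
```

Dropping the super column level gives the 4-level shape of a standard column family; the denormalized orders live inside the customer's row, which is why they can be stored and read together.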
http://schabby.de/cassandra-getting-started/
http://arin.me/blog/wtf-is-a-supercolumn-cassandra-data-model
http://www.datastax.com/faq
http://wiki.apache.org/cassandra/HadoopSupport
Hadoop Concepts
MapReduce is a programming model for processing large data sets across many workers. In the Map phase, each worker applies a function to its slice of the input records and emits intermediate key/value pairs (for example, one (word, 1) pair per word). The framework then shuffles and sorts those pairs so that all values for the same key arrive at the same Reduce worker, which aggregates or summarizes them into the final results. Hadoop is a framework and concrete implementation of this model that provides the worker nodes with storage in the Hadoop Distributed File System (HDFS) and schedules work close to where the data lives.
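The classic illustration is a word count. The sketch below runs the map, shuffle and reduce phases in a single Python process so the data flow is visible; a real Hadoop job runs many map and reduce tasks in parallel across the cluster, and the function names here are hypothetical, not Hadoop's Java API.

```python
# Single-process sketch of the MapReduce data flow (word count).
# Hadoop would run many mappers and reducers in parallel; this only shows the phases.
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple


def map_phase(line: str) -> Iterator[Tuple[str, int]]:
    """Map: emit a (word, 1) pair for every word in one input record."""
    for word in line.lower().split():
        yield word, 1


def shuffle(pairs: Iterable[Tuple[str, int]]) -> Dict[str, List[int]]:
    """Shuffle/sort: group all intermediate values by key."""
    grouped: Dict[str, List[int]] = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reduce_phase(key: str, values: List[int]) -> Tuple[str, int]:
    """Reduce: aggregate the values collected for one key."""
    return key, sum(values)


def word_count(lines: Iterable[str]) -> Dict[str, int]:
    intermediate = (pair for line in lines for pair in map_phase(line))
    grouped = shuffle(intermediate)
    return dict(reduce_phase(key, values) for key, values in sorted(grouped.items()))


if __name__ == "__main__":
    print(word_count(["big data made easy", "Big data with Cassandra"]))
```

Because the reducer only ever sees one key's values, the reduce step can be spread across as many workers as there are distinct keys.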
http://en.wikipedia.org/wiki/Hadoop
http://hadoop.apache.org/mapreduce/docs/current/api/org/apache/hadoop/mapreduce/InputSplit.html
http://hadoop.apache.org/common/docs/current/hdfs_design.html
http://www.michael-noll.com/tutorials/
CQL and HiveQL
HiveQL is the SQL-like grammar of Hive. I think you could characterize it as a way to query table-oriented data stored in the Hadoop Distributed File System (HDFS); Hive compiles each HiveQL query into MapReduce jobs. Hive can also read and write HBase tables through its HBase storage handler.
For data access and update, Cassandra introduces CQL as its SQL-like dialect. I saw CQL referred to as:
Thrift + Avro = CQL
Cassandra and CQL improve on Hive over HDFS since they define and declare the Tunable Consistency properties for Cassandra-controlled data. DataStax uses a replacement for HDFS called CassandraFS, which addresses a key data consistency challenge of the HDFS approach while retaining compatibility with the HDFS API, so existing MapReduce jobs can keep working.
Cassandra also features a true peer-to-peer architecture for file system namespace resolution, with no single point of failure. By contrast, HDFS relies on a single NameNode to provide the file path (/directory/folder/filename) lookup, analogous to the UNIX filesystem directory entry and inode pattern, and in Hadoop 0.20 that NameNode is a single point of failure.
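The peer-to-peer claim rests on Cassandra's token ring: any node can work out, from the row key alone, which nodes hold a row, so no central lookup service is needed. The sketch below approximates the RandomPartitioner (an MD5 hash of the key mapped onto a 0 to 2^127 token range) and SimpleStrategy replica placement (the first node whose token is at or after the key's token, then the next nodes clockwise), both of which are named in the demo's table properties. It is a simplification under those assumptions, not Cassandra's code; the node names and tokens are hypothetical.

```python
# Simplified sketch of how any Cassandra node can locate a row without a master.
# Assumptions: MD5 hash of the key mapped onto [0, 2**127) (approximating
# RandomPartitioner) and SimpleStrategy-style clockwise replica placement.
import bisect
import hashlib
from typing import List, Tuple

RING_SIZE = 2 ** 127


def token_for(key: str) -> int:
    """Hash a row key onto the token ring."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big") % RING_SIZE


def replicas_for(key: str, ring: List[Tuple[int, str]], replication_factor: int) -> List[str]:
    """Return the nodes holding `key`: the first node whose token is >= the key's
    token (wrapping around the ring), followed by the next nodes clockwise.
    `ring` must be sorted by token."""
    tokens = [token for token, _ in ring]
    start = bisect.bisect_left(tokens, token_for(key)) % len(ring)
    count = min(replication_factor, len(ring))
    return [ring[(start + i) % len(ring)][1] for i in range(count)]


if __name__ == "__main__":
    # Hypothetical 4-node cluster with evenly spaced tokens, sorted by token.
    ring = [(i * RING_SIZE // 4, f"node{i}") for i in range(4)]
    print(replicas_for("ACME", ring, replication_factor=3))
```

Every node holds the same small ring table, so every node computes the same answer for the same key.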
http://en.wikipedia.org/wiki/Apache_Hive
http://en.wikipedia.org/wiki/HBase
http://www.roadtofailure.com/2009/10/29/hbase-vs-cassandra-nosql-battle/
Brisk 1.0 beta2
The following comes directly from the Brisk release notes.
Brisk is an open-source Hadoop and Hive distribution developed by DataStax that
utilizes Apache Cassandra for its core services and storage. Brisk provides
Hadoop MapReduce capabilities using CassandraFS, an HDFS-compatible storage
layer inside Cassandra. By replacing HDFS with CassandraFS, users are able to
leverage their current MapReduce jobs on Cassandra’s peer-to-peer,
fault-tolerant, and scalable architecture. Brisk is also able to support dual
workloads, allowing you to use the same cluster of machines for both real-time
applications and data analytics without having to move the data around between
systems.
Brisk is comprised of the following components. For component-specific information, refer to their respective release notes and documentation.
• Apache Hadoop 0.20.203.0 + (HADOOP-7172, HADOOP-5759, HADOOP-7255)
• Cassandra 0.8.1
• Apache Hive 0.7
• Apache Pig 0.8.3
http://www.datastax.com/docs/0.8/brisk/about_brisk
Cassandra in 90 seconds
First, download and extract the Brisk 1.0 binary package into your Linux environment. It can be downloaded from the following URL:
http://www.datastax.com/docs/0.8/brisk/install_brisk_packages
Second, verify that JAVA_HOME points to your Java 1.6 JRE and that java is in your PATH.
Third, follow the Brisk Portfolio Manager demo documentation and run the commands it lists from the Linux bash command line to:
- start the cluster
- visit the Hadoop job console
- load historical stock quote data
- start Jetty with the sample portfolio viewer web app
- run the SQL-like HiveQL to conduct the "demo ETL". This populates new tables with summarized results.
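The Java check from the second step is easy to script before starting the cluster. The Python below is a hypothetical pre-flight helper, not part of Brisk: it checks that JAVA_HOME contains bin/java, that java is on the PATH, and that `java -version` reports a 1.6 release.

```python
# Hypothetical pre-flight check for the second install step: is JAVA_HOME set to a
# JRE, is `java` on the PATH, and does it report a 1.6 version?
import os
import re
import shutil
import subprocess
from typing import Optional


def parse_java_version(version_output: str) -> Optional[str]:
    """Extract the quoted version string, e.g. '1.6.0_26', from `java -version` output."""
    match = re.search(r'version "([^"]+)"', version_output)
    return match.group(1) if match else None


def check_java() -> None:
    java_home = os.environ.get("JAVA_HOME")
    if not java_home or not os.path.isfile(os.path.join(java_home, "bin", "java")):
        print("JAVA_HOME is not set to a directory containing bin/java:", java_home)
    java = shutil.which("java")
    if java is None:
        print("java is not on the PATH")
        return
    # `java -version` writes its banner to stderr rather than stdout.
    result = subprocess.run([java, "-version"], capture_output=True, text=True)
    version = parse_java_version(result.stderr)
    print("java on PATH:", java, "reports version", version)
    if version is None or not version.startswith("1.6"):
        print("warning: this walkthrough assumes a 1.6 JRE")


if __name__ == "__main__":
    check_java()
```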
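Here is the HiveQL that implements a sample Business Intelligence/ETL use case. It is found in 10_day_loss.q in the Brisk Portfolio Manager demo. Note that this script is HiveQL rather than CQL; to enter HiveQL interactively, run the Hive shell using "brisk hive" from the command line.
-- Map the Portfolios column family in the PortfolioDemo keyspace onto a Hive table.
-- The ":key,:column,:value" mapping turns each Cassandra column into one Hive row.
DROP TABLE IF EXISTS Portfolios;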
create external table Portfolios(row_key string, column_name string, value string)
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,:column,:value",
"cassandra.ks.name" = "PortfolioDemo",
"cassandra.ks.repfactor" = "1",
"cassandra.ks.strategy" = "org.apache.cassandra.locator.SimpleStrategy",
"cassandra.cf.name" = "Portfolios" ,
"cassandra.host" = "127.0.0.1" ,
"cassandra.port" = "9160",
"cassandra.partitioner" = "org.apache.cassandra.dht.RandomPartitioner")
TBLPROPERTIES (
"cassandra.input.split.size" = "64000",
"cassandra.range.size" = "1000",
"cassandra.slice.predicate.size" = "1000");
DROP TABLE IF EXISTS StockHist;
create external table StockHist(row_key string, column_name string, value string)
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES ("cassandra.ks.name" = "PortfolioDemo");
-- First, calculate the 10-day price change per ticker (self-join on date + 10 days)
DROP TABLE IF EXISTS 10dayreturns;
CREATE TABLE 10dayreturns(ticker string, rdate string, return double)
STORED AS SEQUENCEFILE;
INSERT OVERWRITE TABLE 10dayreturns
select a.row_key ticker, b.column_name rdate, (cast(b.value as DOUBLE) - cast(a.value as DOUBLE)) ret
from StockHist a JOIN StockHist b on
(a.row_key = b.row_key AND date_add(a.column_name,10) = b.column_name);
-- Calculate portfolio returns: sum the ticker returns per portfolio and date
DROP TABLE IF EXISTS portfolio_returns;
CREATE TABLE portfolio_returns(portfolio string, rdate string, preturn double)
STORED AS SEQUENCEFILE;
INSERT OVERWRITE TABLE portfolio_returns
select row_key portfolio, rdate, SUM(b.return)
from Portfolios a JOIN 10dayreturns b ON
(a.column_name = b.ticker)
group by row_key, rdate;
-- Next, find each portfolio's worst return and save it back to Cassandra
DROP TABLE IF EXISTS HistLoss;
create external table HistLoss(row_key string, worst_date string, loss string)
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES ("cassandra.ks.name" = "PortfolioDemo");
INSERT OVERWRITE TABLE HistLoss
select a.portfolio, rdate, cast(minp as string)
FROM (
select portfolio, MIN(preturn) as minp
FROM portfolio_returns
group by portfolio
) a JOIN portfolio_returns b ON (a.portfolio = b.portfolio and a.minp = b.preturn);
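To see what the script computes, here is a small in-memory Python re-implementation of the same three steps on made-up data: 10-day price changes per ticker, their sum per portfolio per date, and each portfolio's worst date. It is an illustration only. It assumes, as the join conditions in the HiveQL suggest, that StockHist rows map a ticker to (date, price) columns and Portfolios rows map a portfolio to ticker columns; like the HiveQL, it ignores the Portfolios value column and sums raw price changes.

```python
# In-memory re-implementation of the 10_day_loss.q logic on hypothetical data.
# Each "table" is a list of (row_key, column_name, value) triples, mirroring the
# Hive tables mapped onto Cassandra with ":key,:column,:value".
from collections import defaultdict
from datetime import date, timedelta
from typing import Dict, List, Tuple

Triple = Tuple[str, str, str]
Return = Tuple[str, str, float]

# Hypothetical sample data: StockHist rows are ticker -> (date, price) columns,
# Portfolios rows are portfolio -> (ticker, value) columns.
STOCK_HIST: List[Triple] = [
    ("ACME", "2011-07-01", "10.00"), ("ACME", "2011-07-11", "9.50"),
    ("ACME", "2011-07-02", "10.25"), ("ACME", "2011-07-12", "10.75"),
    ("GLOBEX", "2011-07-01", "20.00"), ("GLOBEX", "2011-07-11", "19.00"),
    ("GLOBEX", "2011-07-02", "20.00"), ("GLOBEX", "2011-07-12", "20.50"),
]
PORTFOLIOS: List[Triple] = [
    ("growth", "ACME", "100"), ("growth", "GLOBEX", "50"), ("value", "ACME", "200"),
]


def ten_day_returns(stock_hist: List[Triple]) -> List[Return]:
    """Like the StockHist self-join: price on day + 10 minus price on day, per ticker."""
    prices = {(ticker, day): float(price) for ticker, day, price in stock_hist}
    returns = []
    for (ticker, day), start_price in prices.items():
        later = (date.fromisoformat(day) + timedelta(days=10)).isoformat()
        if (ticker, later) in prices:
            returns.append((ticker, later, prices[(ticker, later)] - start_price))
    return returns


def portfolio_returns(portfolios: List[Triple], returns: List[Return]) -> Dict[Tuple[str, str], float]:
    """Like Portfolios JOIN 10dayreturns ... GROUP BY: sum ticker returns per (portfolio, date)."""
    totals: Dict[Tuple[str, str], float] = defaultdict(float)
    for portfolio, ticker, _value in portfolios:
        for r_ticker, rdate, ret in returns:  # nested-loop join, for clarity only
            if r_ticker == ticker:
                totals[(portfolio, rdate)] += ret
    return dict(totals)


def worst_loss(preturns: Dict[Tuple[str, str], float]) -> List[Triple]:
    """Like the final query: each portfolio's minimum return and the date(s) it occurred."""
    minima: Dict[str, float] = {}
    for (portfolio, _rdate), ret in preturns.items():
        minima[portfolio] = min(ret, minima.get(portfolio, ret))
    return sorted(
        (portfolio, rdate, str(ret))
        for (portfolio, rdate), ret in preturns.items()
        if ret == minima[portfolio]
    )


if __name__ == "__main__":
    for row in worst_loss(portfolio_returns(PORTFOLIOS, ten_day_returns(STOCK_HIST))):
        print(row)
```

In Hive the same joins and aggregations run as MapReduce jobs over the Cassandra-backed tables, which is what lets the demo scale past what fits in one process.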