
Real time data processing with Cassandra, Part 1

This is the first part of getting started with real-time data processing with Cassandra. In this part I am going to describe how to configure Hadoop, Hive and Cassandra, and run a few ad hoc queries through the new CqlStorageHandler. In the second part I will show how to use Shark and Spark for real-time, fast data processing with Cassandra. I was encouraged by a blog post from DataStax, which you can find here. All the credit goes to the author of the cassandra-handler library and to Alex Liu for developing the CQLCassandraStorage. Of course you can use the DataStax Enterprise edition for the first part, since it has built-in support for Hive and Hadoop, but in this blog post I will use only the native Apache products. If you are interested in real-time data processing, please check this blog.
In this first part I will use the following products:
1) Hadoop 1.2.1 (Single node cluster)
2) Hive 0.9.0
3) Cassandra 1.2.6 (Single node cluster)
4) cassandra-handler 1.2.6 (depends on Hive version 0.9.1 and does not work with other versions of Hive)
Let's first download and configure Hadoop. Please check my old post on how to configure Hadoop; the configuration steps are the same. If you get the following error:
upgrade to version -41 is required.
run the command hadoop-daemon.sh start namenode -upgrade and restart your Hadoop server.
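A minimal sketch of the whole upgrade sequence might look like this (assuming a single-node cluster and $HADOOP_HOME/bin on your PATH):
stop-all.sh
hadoop-daemon.sh start namenode -upgrade
# wait until the namenode finishes upgrading its storage directories
stop-all.sh
start-all.sh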
Now let's install and configure Hive.
1) Download Hive 0.9.0 and unzip it somewhere on your local machine.
2) Set HIVE_HOME in your .bash_profile and add $HIVE_HOME/bin to your PATH environment variable.
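For example, something like the following in ~/.bash_profile (the install path is only an assumption; point it to wherever you unzipped Hive):
export HIVE_HOME=/opt/hive-0.9.0
export PATH=$PATH:$HIVE_HOME/bin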
3) Create the data warehouse directory in HDFS:
$HADOOP_HOME/bin/hadoop fs -mkdir       /user/hive/warehouse
4) Grant group write permission on it:
$HADOOP_HOME/bin/hadoop fs -chmod g+w   /user/hive/warehouse
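The Hive getting started guide also recommends creating the /tmp directory in HDFS with the same permission:
$HADOOP_HOME/bin/hadoop fs -mkdir       /tmp
$HADOOP_HOME/bin/hadoop fs -chmod g+w   /tmp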
5) Run Hive with the command $HIVE_HOME/bin/hive, or just hive (if you put $HIVE_HOME/bin on your PATH).
6) Create a Hive database:
hive> CREATE DATABASE test;
7) Use the database:
hive> use test;
8) Create a local Hive table in the database test:
hive> CREATE TABLE hpokes (foo INT, bar STRING);
9) Load some data into the table (note that the Hive CLI does not expand $HIVE_HOME inside the quoted path, so substitute the actual path to your Hive installation):
hive> LOAD DATA LOCAL INPATH '$HIVE_HOME/examples/files/kv1.txt' OVERWRITE INTO TABLE hpokes;
10) Run the following command
hive> select * from hpokes;
The command should end with a long list of query results:
29 val_30
242 val_243
285 val_286
35 val_36
227 val_228
395 val_396
244 val_245
Time taken: 0.334 seconds
If something goes wrong, check your installation; I used the following quick start guide.
11) Run an analytical query:
hive> select count(*) from hpokes;
The above command should start a Hadoop MapReduce job. The progress of the job is shown in the console and should end with messages like the following:
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2013-09-22 20:02:43,178 Stage-1 map = 0%,  reduce = 0%
2013-09-22 20:02:49,214 Stage-1 map = 100%,  reduce = 0%
2013-09-22 20:03:00,403 Stage-1 map = 100%,  reduce = 33%
2013-09-22 20:03:01,418 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_201309221750_0009
MapReduce Jobs Launched: 
Job 0: Map: 1  Reduce: 1   HDFS Read: 11870 HDFS Write: 5 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
1004
Time taken: 40.444 seconds
Now it's time to install and run Cassandra.
12) Download Cassandra 1.2.6 and install it following the quick start guide, or check my previous post for a quick start.
13) Create the following keyspace from cqlsh:
CREATE KEYSPACE test WITH replication = {
  'class': 'SimpleStrategy',
  'replication_factor': '1'
};
Of course you can also create the keyspace and CF from Hive, as we will see later.
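For illustration, the equivalent CF created directly from cqlsh would look roughly like this (only a sketch; in this walkthrough the CF is created from Hive in step 19):
cqlsh> use test;
cqlsh> CREATE TABLE pokes (foo int PRIMARY KEY, bar text);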
14) Now we have to clone the cassandra-handler project from GitHub:
git clone https://github.com/milliondreams/hive.git cassandra-hive
Or you can download the zip from GitHub and unzip it into any folder.
15) In my case I changed the Hadoop core version in pom.xml, because I am using Hadoop 1.2.1:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <!--<version>0.20.205.0</version>-->
    <version>1.2.1</version>
    <type>jar</type>
    <scope>provided</scope>
</dependency>
16) Compile and build the project:
mvn clean install
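If the build fails on unit tests in your environment, you can skip them with the standard Maven flag:
mvn clean install -DskipTests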
17) Copy the following libraries from target/ and target/dependencies to the $HIVE_HOME/lib and $HADOOP_HOME/lib directories:
cassandra-all-1.2.6.jar
apache-cassandra-1.2.6.jar
apache-cassandra-thrift-1.2.6.jar
hive-cassandra-1.2.6.jar
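For example, a small shell sketch of this step (assuming the build ran inside the cassandra-hive checkout and the folder names above match your build output):
cd cassandra-hive
cp target/hive-cassandra-1.2.6.jar \
   target/dependencies/cassandra-all-1.2.6.jar \
   target/dependencies/apache-cassandra-1.2.6.jar \
   target/dependencies/apache-cassandra-thrift-1.2.6.jar \
   $HIVE_HOME/lib/
cp target/hive-cassandra-1.2.6.jar \
   target/dependencies/cassandra-all-1.2.6.jar \
   target/dependencies/apache-cassandra-1.2.6.jar \
   target/dependencies/apache-cassandra-thrift-1.2.6.jar \
   $HADOOP_HOME/lib/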
18) Restart Hive and Hadoop.

19) Now we have to create the Cassandra CF from Hive:
hive> use test;
hive> CREATE EXTERNAL TABLE test.pokes(foo int, bar string)
    STORED BY 'org.apache.hadoop.hive.cassandra.cql.CqlStorageHandler'
    WITH SERDEPROPERTIES ("cql.primarykey" = "foo", "comment"="check", "read_repair_chance" = "0.2",
    "dclocal_read_repair_chance" = "0.14", "gc_grace_seconds" = "989898", "bloom_filter_fp_chance" = "0.2",
    "compaction" = "{'class' : 'LeveledCompactionStrategy'}", "replicate_on_write" = "false", "caching" = "all");
20) Let's insert some data from the Hive table hpokes into the Cassandra table pokes:
hive> insert into table pokes select * from hpokes;
It should start a Hadoop MapReduce job and insert the data from the hpokes table into the Cassandra table pokes:
Hadoop job information for Stage-0: number of mappers: 1; number of reducers: 0
2013-09-22 18:01:19,671 Stage-0 map = 0%,  reduce = 0%
2013-09-22 18:01:29,811 Stage-0 map = 100%,  reduce = 0%
2013-09-22 18:01:34,866 Stage-0 map = 100%,  reduce = 100%
Ended Job = job_201309221750_0005
1004 Rows loaded to pokes
MapReduce Jobs Launched: 
Job 0: Map: 1   HDFS Read: 11870 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
Time taken: 40.162 seconds

In my case it inserted 1004 rows into the table.
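You can cross-check the count from cqlsh as well (keep in mind that in CQL3 count(*) is capped by the default LIMIT of 10000):
cqlsh> select count(*) from test.pokes;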
21) Now you can run any analytical query on the table pokes, for example:
hive> select count(*) from pokes;
The above command also runs a Hadoop MapReduce job and should end with the following messages:
2013-09-22 18:13:40,390 Stage-1 map = 0%,  reduce = 0%
2013-09-22 18:13:58,229 Stage-1 map = 100%,  reduce = 0%
2013-09-22 18:14:12,642 Stage-1 map = 100%,  reduce = 33%
2013-09-22 18:14:13,649 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_201309221750_0007
MapReduce Jobs Launched: 
Job 0: Map: 2  Reduce: 1   HDFS Read: 830 HDFS Write: 4 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
1004
Time taken: 70.168 seconds
22) Now insert a row into the Cassandra CF through cqlsh:
cqlsh> insert into pokes(foo, bar) values(1000, 'test');
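If cqlsh complains that no keyspace has been specified, select it first (or qualify the table name as test.pokes):
cqlsh> use test;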
23) Run the following command from Hive to find the row:
hive> select * from pokes where foo=1000;
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 0
2013-09-22 18:14:52,545 Stage-1 map = 0%,  reduce = 0%
2013-09-22 18:15:02,674 Stage-1 map = 100%,  reduce = 0%
2013-09-22 18:15:07,756 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_201309221750_0008
MapReduce Jobs Launched: 
Job 0: Map: 2   HDFS Read: 830 HDFS Write: 10 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
1000 test
Time taken: 30.891 seconds
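The same row can also be read back directly from cqlsh, since foo is the partition key:
cqlsh> select * from test.pokes where foo = 1000;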
We have reached the end of the post. Thank you everybody for reading. Happy blogging. In the next part we will install Shark and Spark for real-time data processing.
