
Real time data processing with Cassandra, Part 2

For the last few months I have been busy with our new telecommunication project, developing MNP (mobile number portability) for the Russian Federation. Anybody in Russia can now change their telephone operator without changing their number, but that is a story for another post. Today I found a few hours to keep my promise. In this post I will describe how to configure and manage a Spark pseudo-cluster with Shark for real-time data processing. In the previous post I showed how to use Hive with Hadoop to process data from Cassandra. For those who are not familiar with them: Spark is an execution engine that supports cyclic data flow and in-memory computing, while Shark is an open source distributed SQL query engine for Hadoop data that brings state-of-the-art performance and advanced analytics to Hive users. I am going to use the following open source projects to configure and run the Spark + Shark cluster:
1) Scala-2.10.3
2) Spark-0.9.0-incubating-bin-hadoop1
3) Shark-0.9.0
4) Hive-0.11.0-bin-shark
6) Jdk 1.7
7) Hadoop-1.2.1
8) Cassandra-1.2.7
9) Cash (Hive Cassandra handler)

I reused my old Hadoop cluster from the previous post; to configure a Hadoop cluster, please check that post (linked above). First, we have to download and install Scala 2.10.3 locally.

Setup Spark:

1) Unzip the downloaded Scala bundle and add SCALA_HOME to your environment variables. We will use Scala to build Spark and Shark.
2) Download Spark and unzip it into any folder you prefer.
3) From the Spark home directory, run sbt/sbt assembly. You can go for a coffee break: it takes more than 10 minutes to compile and build. If something goes wrong with build please
4) Copy $SPARK_HOME/conf/spark-env.sh.template to $SPARK_HOME/conf/spark-env.sh with the command cp spark-env.sh.template spark-env.sh
5) Add the JAVA_HOME variable to spark-env.sh: export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/1.7/Home
6) chmod +x spark-env.sh
7) Start the Spark master with $SPARK_HOME/sbin/start-master.sh
In the console output you should find the following line:
starting org.apache.spark.deploy.master.Master, logging to /Users/samim/Development/NoSQL/spark/spark-0.9.0-incubating-bin-hadoop1/sbin/../logs/spark-samim-org.apache.spark.deploy.master.Master-1-Shamim-2.local.out
Running cat on that log file shows the following information:
14/04/05 16:01:08 INFO Remoting: Starting remoting
14/04/05 16:01:09 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkMaster@Shamim-2.local:7077]
14/04/05 16:01:09 INFO Master: Starting Spark master at spark://Shamim-2.local:7077
14/04/05 16:01:09 INFO MasterWebUI: Started Master web UI at http://192.168.1.46:8080
14/04/05 16:01:09 INFO Master: I have been elected leader! New state: ALIVE
14/04/05 16:01:17 INFO Master: Registering worker Shamim-2.local:1617 with 2 cores, 3.0 GB RAM
In my case http://192.168.1.46:8080 is the web UI and spark://Shamim-2.local:7077 is the Spark master URI.
8) Now start the worker (slave):
$SPARK_HOME/sbin/start-slaves.sh
In the console you should find the log file location as follows:
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /Users/samim/Development/NoSQL/spark/spark-0.9.0-incubating-bin-hadoop1/sbin/../logs/spark-samim-org.apache.spark.deploy.worker.Worker-1-Shamim-2.local.out
Point your browser to http://192.168.1.46:8080 and you should see the following page.
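Steps 4 to 8 above can be collected into a small script. This is only a minimal sketch, not official Spark tooling: the function names configure_spark_env and start_spark are hypothetical, and it assumes SPARK_HOME and JAVA_HOME point at your own installations.

```shell
#!/bin/sh
# Sketch of Spark setup steps 4-8. Function names are illustrative only.

# Steps 4-6: create conf/spark-env.sh from the template, append JAVA_HOME,
# and make the file executable.
configure_spark_env() {
    spark_home="$1"
    java_home="$2"
    conf="$spark_home/conf"
    cp "$conf/spark-env.sh.template" "$conf/spark-env.sh" || return 1
    printf 'export JAVA_HOME=%s\n' "$java_home" >> "$conf/spark-env.sh"
    chmod +x "$conf/spark-env.sh"
}

# Steps 7-8: start the standalone master, then the workers.
start_spark() {
    spark_home="$1"
    "$spark_home/sbin/start-master.sh" && "$spark_home/sbin/start-slaves.sh"
}

# Usage, once Spark has been built (step 3):
# configure_spark_env "$SPARK_HOME" "$JAVA_HOME" && start_spark "$SPARK_HOME"
```

Keeping the configuration and the start-up in separate functions lets you rerun the start-up without appending JAVA_HOME to spark-env.sh a second time.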


Now Spark is ready for use.
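Shark needs the Spark master URI later on, so it can be handy to pull it out of the master log instead of copying it by hand. The helper below is a hedged sketch: master_url_from_log is a hypothetical name, and it simply assumes the log contains a spark:// address like the one in the output shown earlier.

```shell
#!/bin/sh
# Print the first spark:// URL found in a master log file.
# This is plain text matching on the log; it is not part of Spark itself.
master_url_from_log() {
    grep -o 'spark://[^ ]*' "$1" | head -n 1
}

# Usage (replace the path with the log file that start-master.sh printed):
# MASTER=$(master_url_from_log /path/to/master.out)
# export MASTER
```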
Setup Shark:
1) Download Hive from here.
2) Unzip Hive
3) Download Cash (Hive Cassandra handler)
4) Unzip it and build with Maven: mvn clean package
5) Copy target/*jar and target/dependency/cassandra-*.jar to $HIVE_HOME/lib
6) Download Shark from GitHub
7) Unzip the archive and run $SHARK_HOME/sbt/sbt package
8) Copy $SHARK_HOME/conf/shark-env.sh.template to $SHARK_HOME/conf/shark-env.sh
9) chmod +x $SHARK_HOME/conf/shark-env.sh
10) Add the following settings to shark-env.sh
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.7.0_09.jdk/Contents/Home
export SPARK_MEM=1g
# (Required) Set the master program's memory
export SHARK_MASTER_MEM=1g
# (Required) Point to your Scala installation.
export SCALA_HOME="/Users/samim/Development/scala/scala-2.10.3"
# (Required) Point to the patched Hive binary distribution
export HIVE_HOME="/Users/samim/Development/NoSQL/hive/hive-0.11.0-bin-shark-0.9.0"
# (Optional) Specify the location of Hive's configuration directory. By default,
# it points to $HIVE_HOME/conf
#export HIVE_CONF_DIR="$HIVE_HOME/conf"
# For running Shark in distributed mode, set the following:
#export HADOOP_HOME=""
export SPARK_HOME="/Users/samim/Development/NoSQL/spark/spark-0.9.0"
export MASTER="spark://Shamim-2.local:7077"
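The file-handling parts of the Shark setup (steps 5, 8 and 9) can be sketched as two small functions. Treat this as an illustration under assumptions: CASH_HOME is a hypothetical variable for the directory where the Cash sources were unzipped, and the function names are made up for this example.

```shell
#!/bin/sh
# Sketch of Shark setup steps 4-9. CASH_HOME and the function names are
# hypothetical; adjust all paths to your machine.

# Step 5: copy the handler jar and the Cassandra dependencies into Hive's lib.
install_cash_jars() {
    cash_home="$1"
    hive_home="$2"
    cp "$cash_home"/target/*.jar "$hive_home/lib/" &&
    cp "$cash_home"/target/dependency/cassandra-*.jar "$hive_home/lib/"
}

# Steps 8-9: create an executable shark-env.sh from the template.
create_shark_env() {
    conf="$1/conf"
    cp "$conf/shark-env.sh.template" "$conf/shark-env.sh" &&
    chmod +x "$conf/shark-env.sh"
}

# Usage, once Maven and the sources are in place:
# (cd "$CASH_HOME" && mvn clean package)       # step 4
# install_cash_jars "$CASH_HOME" "$HIVE_HOME"  # step 5
# (cd "$SHARK_HOME" && sbt/sbt package)        # step 7
# create_shark_env "$SHARK_HOME"               # steps 8-9
```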

You don't need to set HADOOP_HOME, but Hive still needs a running Hadoop data node.
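Before starting Shark, it may be worth checking that the directories named in shark-env.sh really exist, since a mistyped path is an easy mistake to make. The check below is a sketch; check_dirs is a hypothetical helper, not something shipped with Shark.

```shell
#!/bin/sh
# Report every listed directory that does not exist.
# Returns 0 when all exist and 1 when at least one is missing.
check_dirs() {
    missing=0
    for dir in "$@"; do
        if [ ! -d "$dir" ]; then
            echo "missing directory: $dir" >&2
            missing=1
        fi
    done
    return "$missing"
}

# Usage, after loading the settings from shark-env.sh:
# . "$SHARK_HOME/conf/shark-env.sh"
# check_dirs "$JAVA_HOME" "$SCALA_HOME" "$HIVE_HOME" "$SPARK_HOME"
```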
Now run Shark with the following command:
$SHARK_HOME/bin/shark
If everything goes well, you will get the Shark prompt:
Logging initialized using configuration in jar:file:/Users/samim/Development/NoSQL/shark/shark-0.9.0/lib_managed/jars/edu.berkeley.cs.shark/hive-common/hive-common-0.11.0-shark.jar!/hive-log4j.properties
Hive history file=/tmp/samim/hive_job_log_samim_1966@Shamim-2.local_201404051958_1374334454.txt
2014-04-05 19:58:36.917 java[1966:1903] Unable to load realm info from SCDynamicStore
8.912: [GC 279616K->14731K(1013632K), 0.0679980 secs]
9.869: [Full GC 77899K->8724K(1013632K), 0.1434890 secs]
Reloading cached RDDs from previous Shark sessions... (use -skipRddReload flag to skip reloading)
10.806: [Full GC 86744K->13497K(1013632K), 0.1476060 secs]
12.345: [Full GC 85586K->19626K(1013632K), 0.2058780 secs]
13.053: [Full GC 47205K->9867K(1013632K), 0.2303530 secs]
13.287: [Full GC 14620K->9906K(1013632K), 0.1734460 secs]
18.762: [Full GC 85934K->16264K(1013632K), 0.2141430 secs]
25.049: [Full GC 237505K->22715K(1013632K), 0.3878240 secs]

30.287: [Full GC 83618K->19037K(1013632K), 0.3781930 secs]

shark>

Now let's play with Shark
1) We assume that the Cassandra node is up and running (see part 1)
2) Create the database test in Shark
shark> create database test;
3) Create an external table in Shark
shark> CREATE EXTERNAL TABLE test.pokes(foo int, bar string)
    STORED BY 'org.apache.hadoop.hive.cassandra.cql.CqlStorageHandler'
    WITH SERDEPROPERTIES ("cql.primarykey" = "foo", "comment"="check", "read_repair_chance" = "0.2",
    "dclocal_read_repair_chance" = "0.14", "gc_grace_seconds" = "989898", "bloom_filter_fp_chance" = "0.2",
    "compaction" = "{'class' : 'LeveledCompactionStrategy'}", "replicate_on_write" = "false", "caching" = "all");

4) Run a query
shark> select count(*) from test.pokes;
185.022: [Full GC 106345K->24340K(1013632K), 0.2817830 secs]
189.722: [Full GC 228461K->30033K(1013632K), 0.3228080 secs]
OK
427
Time taken: 16.106 seconds
5) Now create a cached table for pokes:
shark> CREATE TABLE pokes_cache TBLPROPERTIES ("shark.cache" = "true") AS SELECT * FROM test.pokes;
6) Run the same query against the cached table
shark> select count(*) from pokes_cache;
OK
427
Time taken: 2.158 seconds
Only about 2 seconds, roughly 7 times faster than the uncached query.
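To repeat the comparison without retyping, the queries from steps 4 to 6 can be written to a file. This is a sketch under one assumption: that your Shark build accepts the -f option of the Hive command line it is based on. The function name write_queries and the file path are made up for this example.

```shell
#!/bin/sh
# Write the queries from steps 4-6 to the file given as the first argument.
write_queries() {
    cat > "$1" <<'EOF'
SELECT count(*) FROM test.pokes;
CREATE TABLE pokes_cache TBLPROPERTIES ("shark.cache" = "true") AS SELECT * FROM test.pokes;
SELECT count(*) FROM pokes_cache;
EOF
}

# Usage (assumes the Hive-style -f option is available in your Shark build):
# write_queries /tmp/pokes_cache.hql
# "$SHARK_HOME/bin/shark" -f /tmp/pokes_cache.hql
```

If pokes_cache already exists from an earlier session, the CREATE TABLE statement will fail, so drop the table first or remove that line from the file.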
From the Spark master web UI you can analyze the stages run for every query.
All the credit goes to the Spark and Shark teams, and also to Brian O'Neil for his encouraging blog post.

References:
1) Shark on Cassandra (w/ Cash) : Interrogating cached data from C* using HiveQL
2) Spark
3) Building Shark from Source Code
