
A single-node Hadoop + Cassandra + Pig setup

UPDATE: Our book High Performance in-Memory Computing with Apache Ignite has been released. The book briefly describes how to improve performance in an existing legacy Hadoop cluster with Apache Ignite.

In our current project, we decided to store all operational logs in a NoSQL database, with a total volume of about 97 TB per year. Cassandra was our main candidate for the NoSQL store. But we also have to analyze and monitor our data, and that is where Hadoop and Pig come in. Within two days our team was able to develop a simple pilot project demonstrating the combined power of Hadoop, Cassandra, and Pig.
For the pilot project we used the DataStax Enterprise edition. This out-of-the-box product helped us quickly install the Hadoop and Cassandra stack and develop our pilot project. Afterwards we decided to set up Hadoop, Cassandra, and Pig ourselves. It was my first attempt to install Cassandra over Hadoop and Pig. Although all of these products have been around for a few years, I haven't found any step-by-step tutorial for setting up a single-node cluster with Hadoop + Cassandra + Pig.
First we are going to install Hadoop and Cassandra; then we will run a pig_cassandra map-only job over a Cassandra column family and save the result on the Hadoop HDFS file system.
Setup Hadoop:
1) Download Hadoop from the following link - http://www.sai.msu.su/apache/hadoop/core/stable/ - and then unarchive the file:
tar -xvf hadoop-0.20.2.tar.gz
rm hadoop-0.20.2.tar.gz
cd hadoop-0.20.2
2) Edit /conf/core-site.xml. I have used localhost in the value of fs.default.name:

  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
3) Edit /conf/mapred-site.xml.

  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:9001</value>
  </property>
4) Edit /conf/hdfs-site.xml. Since this test cluster has a single node, the replication factor should be set to 1.

  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
5) Set your JAVA_HOME variable in /conf/hadoop-env.sh. If you already export JAVA_HOME in your .bash_profile, this step is redundant.
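For illustration, the line in hadoop-env.sh would look something like this (the path below is only an example; point it at your own JDK installation):
export JAVA_HOME=/Library/Java/Home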
6) Format the name node (one per install).
$ bin/hadoop namenode -format
It should print out a message like the following:
12/07/15 15:54:20 INFO namenode.NameNode: STARTUP_MSG: 
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = Shamim-2.local/192.168.0.103
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 0.20.2
STARTUP_MSG:   build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20 -r 911707; compiled by 'chrisdo' on Fri Feb 19 08:07:34 UTC 2010
************************************************************/
12/07/15 15:54:21 INFO namenode.FSNamesystem: fsOwner=samim,staff,com.apple.sharepoint.group.1,everyone,_appstore,localaccounts,_appserverusr,admin,_appserveradm,_lpadmin,_lpoperator,_developer,com.apple.access_screensharing
12/07/15 15:54:21 INFO namenode.FSNamesystem: supergroup=supergroup
12/07/15 15:54:21 INFO namenode.FSNamesystem: isPermissionEnabled=true
12/07/15 15:54:21 INFO common.Storage: Image file of size 95 saved in 0 seconds.
12/07/15 15:54:21 INFO common.Storage: Storage directory /tmp/hadoop-samim/dfs/name has been successfully formatted.
12/07/15 15:54:21 INFO namenode.NameNode: SHUTDOWN_MSG: 
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at Shamim-2.local/192.168.0.103
************************************************************/
6.1) Set up passphraseless SSH.
Check that you can log in to localhost without a passphrase:
ssh localhost
If you cannot, first enable your SSH server: System Preferences -> Sharing -> check the box for Remote Login (you can also allow access for all users).
Then execute the following commands:
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
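As an aside, on macOS the Remote Login setting mentioned above can also be toggled from the terminal instead of System Preferences (assuming the systemsetup utility is present on your OS version):
$ sudo systemsetup -setremotelogin on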
7) Start all hadoop components
$ bin/hadoop-daemon.sh start namenode
$ bin/hadoop-daemon.sh start jobtracker
$ bin/hadoop-daemon.sh start datanode
$ bin/hadoop-daemon.sh start tasktracker
$ bin/hadoop-daemon.sh start secondarynamenode
starting namenode, logging to /Users/samim/Development/NoSQL/hadoop/core/hadoop-0.20.2/bin/../logs/hadoop-samim-namenode-Shamim-2.local.out
starting jobtracker, logging to /Users/samim/Development/NoSQL/hadoop/core/hadoop-0.20.2/bin/../logs/hadoop-samim-jobtracker-Shamim-2.local.out
starting datanode, logging to /Users/samim/Development/NoSQL/hadoop/core/hadoop-0.20.2/bin/../logs/hadoop-samim-datanode-Shamim-2.local.out
You can check all the log files to make sure that everything went well.
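Another quick sanity check, assuming the JDK's jps tool is on your PATH, is to list the running Java processes; the NameNode, JobTracker, DataNode, TaskTracker, and SecondaryNameNode daemons should all show up:
$ jps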
8) Verify NameNode and DataNode communication through the web interface at http://localhost:50070/dfshealth.jsp. Check the page and confirm that you have one live node.
9) Verify that the JobTracker and TaskTracker are communicating by looking at the JobTracker web interface and confirming that one node is listed in the Nodes column: http://localhost:50030/jobtracker.jsp
10) Use the hadoop command-line tool to test the file system:
$ hadoop dfs -ls /
$ hadoop dfs -mkdir /test_dir
$ echo "A few words to test" > /tmp/myfile
$ hadoop dfs -copyFromLocal /tmp/myfile /test_dir
$ hadoop dfs -cat /test_dir/myfile
A few words to test
Setup Cassandra:
1) Download the source code for Cassandra version 1.1.2 from the following link: http://www.apache.org/dyn/closer.cgi?path=/cassandra/1.1.2/apache-cassandra-1.1.2-src.tar.gz I assume you know how to build Cassandra from the source code; otherwise you will find plenty of information about building it through Google.
2) Edit the CASSANDRA_HOME/conf/cassandra.yaml file to set listen_address and rpc_address to localhost.
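For a single-node test the relevant lines in cassandra.yaml simply look like this (the rest of the file can stay at its defaults):
listen_address: localhost
rpc_address: localhost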
3) Start Cassandra:
$ cassandra/bin/cassandra
4) Check the cluster through the nodetool utility:
$ cassandra/bin/nodetool -h localhost ring
Note: Ownership information does not include topology, please specify a keyspace. 
Address         DC          Rack        Status State   Load            Owns                Token                                       
127.0.0.1       datacenter1 rack1       Up     Normal  55.17 KB        100.00%         96217188464178957452903952331500076192  
The Cassandra cluster is up; now we are going to configure Pig.
Setup Pig:
1) Download Pig from the Apache site: http://www.sai.msu.su/apache/pig/
tar -xvf pig-0.8.0.tar.gz
rm pig-0.8.0.tar.gz
At this point we will try to run the pig_cassandra example which ships with the Cassandra source distribution. First of all, it's better to read the README file at apache-cassandra-1.1.2-src/examples/pig/README.txt. Set all the environment variables described in the README as follows:
export PIG_HOME=%YOUR_PIG_INSTALLION_FOLDER%
export PIG_INITIAL_ADDRESS=localhost
export PIG_RPC_PORT=9160
export PIG_PARTITIONER=org.apache.cassandra.dht.RandomPartitioner
Also, if you would like to run using the Hadoop backend, you should set PIG_CONF_DIR to the location of your Hadoop config. In my case:
export PIG_CONF_DIR=hadoop/core/hadoop-0.20.2/conf
At this stage you can launch the grunt shell to run MapReduce tasks:
examples/pig$ bin/pig_cassandra -x local
It should bring up the grunt shell, but I got the following ClassNotFoundException:
java.lang.ClassNotFoundException: org.apache.hadoop.mapred.RunningJob
As a quick fix, I decided to edit the pig_cassandra file as follows:
export HADOOP_CLASSPATH="/Users/xyz/hadoop/core/hadoop-0.20.2/hadoop-0.20.2-core.jar"
CLASSPATH=$CLASSPATH:$PIG_JAR:$HADOOP_CLASSPATH
Once I got the grunt shell, I created a keyspace and one column family in the Cassandra cluster and inserted some values through cassandra-cli:
[default@unknown] create keyspace Keyspace1;
[default@unknown] use Keyspace1;
[default@Keyspace1] create column family Users with comparator=UTF8Type and default_validation_class=UTF8Type and key_validation_class=UTF8Type;
[default@Keyspace1] set Users[jsmith][first] = 'John';
[default@Keyspace1] set Users[jsmith][last] = 'Smith';
[default@Keyspace1] set Users[jsmith][age] = long(42);
Then I ran the following Pig query in the grunt shell:
grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING CassandraStorage() AS (key, columns: bag {T: tuple(name, value)});
grunt> cols = FOREACH rows GENERATE flatten(columns);
grunt> colnames = FOREACH cols GENERATE $0;
grunt> namegroups = GROUP colnames BY (chararray) $0;
grunt> namecounts = FOREACH namegroups GENERATE COUNT($1), group;
grunt> orderednames = ORDER namecounts BY $0;
grunt> topnames = LIMIT orderednames 50;
grunt> dump topnames;
Pig ran the script, and here are the statistics:
2012-07-15 17:29:35,878 [main] INFO  org.apache.pig.tools.pigstats.PigStats - Detected Local mode. Stats reported below may be incomplete
2012-07-15 17:29:35,881 [main] INFO  org.apache.pig.tools.pigstats.PigStats - Script Statistics: 

HadoopVersion PigVersion UserId StartedAt FinishedAt Features
0.20.2 0.8.3 samim 2012-07-15 17:29:14 2012-07-15 17:29:35 GROUP_BY,ORDER_BY,LIMIT

Success!

Job Stats (time in seconds):
JobId Alias Feature Outputs
job_local_0001 colnames,cols,namecounts,namegroups,rows GROUP_BY,COMBINER 
job_local_0002 orderednames SAMPLER 
job_local_0003 orderednames ORDER_BY,COMBINER file:/tmp/temp-833597378/tmp-220576755,

Input(s):
Successfully read records from: "cassandra://Keyspace1/Users"

Output(s):
Successfully stored records in: "file:/tmp/temp-833597378/tmp-220576755"

Job DAG:
job_local_0001 -> job_local_0002,
job_local_0002 -> job_local_0003,
job_local_0003


2012-07-15 17:29:35,881 [main] INFO  org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
2012-07-15 17:29:35,886 [main] INFO  org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
2012-07-15 17:29:35,887 [main] INFO  org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
2012-07-15 17:29:35,888 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2012-07-15 17:29:35,904 [main] INFO  org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
2012-07-15 17:29:35,907 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2012-07-15 17:29:35,907 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(1,age)
(1,last)
(1,first)
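As an aside, instead of dump you could also write the relation to HDFS with a STORE statement; a minimal sketch, assuming the Hadoop daemons set up earlier are running and Pig was not started with -x local:
grunt> STORE topnames INTO '/test_dir/topnames';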
The dump's output file ends up under a temporary directory; since the job ran in Pig's local mode, the file: URI below points at the local /tmp rather than HDFS. In my case it is:
file:/tmp/temp-833597378/tmp-220576755
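A quick way to inspect that output from the shell (the exact part-file name can vary between Pig and Hadoop versions, hence the wildcard):
$ cat /tmp/temp-833597378/tmp-220576755/part*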
If you would like to run example-script.pig, you would have to create a keyspace named MyKeyspace and a column family according to the Pig script. I simply edited example-script.pig to point at the newly created keyspace Keyspace1 and column family Users. Then you can run it like this:
examples/pig$ bin/pig_cassandra example-script.pig
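For reference, after that edit the LOAD statement in example-script.pig points at the same keyspace and column family used earlier, roughly like this (the exact schema clause in the shipped script may differ):
rows = LOAD 'cassandra://Keyspace1/Users' USING CassandraStorage() AS (key, columns: bag {T: tuple(name, value)});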
If you want to run Pig in local mode, add the flag -x local, for example: pig_cassandra -x local example-script.pig. Without -x local, Pig will run in Hadoop mode. See here for more information. Thanks to Nabanita for pointing this out.

You will see the statistics in the console. My next step is to set up a Cassandra cluster with 4 nodes over Hadoop and run MapReduce across all the cluster nodes.
Resources:
1) Cassandra High Performance Cookbook
2) Cassandra: The Definitive Guide
3) http://stackoverflow.com/questions/8846788/pig-integrated-with-cassandra-simple-distributed-query-takes-a-few-minutes-to-c
