Skip to main content

Patch pig_cassandra for setting ttl to cassandra data

Apache pig provides a platform for analyzing very large data set. With apache pig you can easily analyze your data from Cassandra. Apache pig compiles instruction to sequences of Map-Reduce programs which will run on Hadoop cluster. Cassandra source provides a simple pig script to run pig with Cassandra data. Cassandra also provides CassandraStorage class which will load and store data from Cassandra DB, this class will no built in support for storing data with TTL (time to live). In many cases you have to update a few columns or rows with ttl to delete later automatically from DB. For that, i have patched the CassandraStorage class and add the similar functionality. Here is the patch
Index: src/main/java/ru/atc/smev/cassandra/storage/CassandraStorage.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/main/java/ru/atc/smev/cassandra/storage/CassandraStorage.java (revision 3711)
+++ src/main/java/ru/atc/smev/cassandra/storage/CassandraStorage.java (revision )
@@ -90,7 +90,7 @@
     private final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
 
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
-    private static final Log logger = LogFactory.getLog(CassandraStorage.class);
+    private static final Log logger = LogFactory.getLog(MyCassandraStorage.class);
 
     private ByteBuffer slice_start = BOUND;
     private ByteBuffer slice_end = BOUND;
@@ -113,6 +113,7 @@
     private Map<bytebuffer olumn="olumn"> lastRow;
     private boolean hasNext = true;
 
+    private int ttl;
 
     public CassandraStorage()
     {
@@ -131,8 +132,13 @@
     public int getLimit()
     {
         return limit;
+
     }
 
+    public int getTtl() {
+        return ttl;
+    }
+
     public Tuple getNextWide() throws IOException
     {
         CfDef cfDef = getCfDef(loadSignature);
@@ -337,14 +343,14 @@
     private CfDef getCfDef(String signature)
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
         return cfdefFromString(property.getProperty(signature));
     }
 
     private List<indexexpression> getIndexExpressions()
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
         if (property.getProperty(PARTITION_FILTER_SIGNATURE) != null)
             return indexExpressionsFromString(property.getProperty(PARTITION_FILTER_SIGNATURE));
         else
@@ -462,6 +468,8 @@
                     slice_reverse = Boolean.parseBoolean(urlQuery.get("reversed"));
                 if (urlQuery.containsKey("limit"))
                     limit = Integer.parseInt(urlQuery.get("limit"));
+                if(urlQuery.containsKey("ttl"))
+                    ttl = Integer.parseInt(urlQuery.get("ttl"));
             }
             String[] parts = urlParts[0].split("/+");
             keyspace = parts[1];
@@ -469,7 +477,7 @@
         }
         catch (Exception e)
         {
-            throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1]]': " + e.getMessage());
+            throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1][&ttl=86400]]': " + e.getMessage());
         }
     }
 
@@ -694,7 +702,7 @@
     public void setPartitionFilter(Expression partitionFilter)
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
         property.setProperty(PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter)));
     }
 
@@ -901,6 +909,11 @@
             column.setName(objToBB(t.get(0)));
             column.setValue(objToBB(t.get(1)));
             column.setTimestamp(FBUtilities.timestampMicros());
+            if(getTtl() != 0){
+                column.setTtl(getTtl());
+                column.setTtlIsSet(true);
+            }
+
             mutation.column_or_supercolumn = new ColumnOrSuperColumn();
             mutation.column_or_supercolumn.column = column;
         }
@@ -924,6 +937,11 @@
                     column.setName(objToBB(subcol.get(0)));
                     column.setValue(objToBB(subcol.get(1)));
                     column.setTimestamp(FBUtilities.timestampMicros());
+                    if(getTtl() != 0){
+                        column.setTtl(getTtl());
+                        column.setTtlIsSet(true);
+                    }
+
                     columns.add(column);
                 }
                 if (columns.isEmpty())
@@ -980,7 +998,7 @@
     private void initSchema(String signature)
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
 
         // Only get the schema if we haven't already gotten it
         if (!property.containsKey(signature))
You can build the Cassandra with the above patch and using it on your pig script as follows:
STORE updated INTO 'cassandra://audit/auditlog?ttl=86400' USING CassandraStorage();
Or you can use the new class named MyCassandraStorage as a pig UDF function. First you have to compile and archive the class in jar. Later you can define the class in your pig script and use as follows:
DEFINE ParseSmevMessage com.abc.pig.utils.MyCassandraStorage();
raw = LOAD 'cassandra://audit/auditlog' USING MyCassandraStorage();
filtered = FILTER raw BY  processed.$1=='N';
updated = FOREACH filtered GENERATE  key,TOTUPLE('processed','Y');
STORE updated INTO 'cassandra://audit/auditlog?ttl=86400' USING MyCassandraStorage();

Comments

Popular posts from this blog

8 things every developer should know about the Apache Ignite caching

Any technology, no matter how advanced it is, will not be able to solve your problems if you implement it improperly. Caching, precisely when it comes to the use of a distributed caching, can only accelerate your application with the proper use and configurations of it. From this point of view, Apache Ignite is no different, and there are a few steps to consider before using it in the production environment. In this article, we describe various technics that can help you to plan and adequately use of Apache Ignite as cutting-edge caching technology. Do proper capacity planning before using Ignite cluster. Do paperwork for understanding the size of the cache, number of CPUs or how many JVMs will be required. Let’s assume that you are using Hibernate as an ORM in 10 application servers and wish to use Ignite as an L2 cache. Calculate the total memory usages and the number of Ignite nodes you have to need for maintaining your SLA. An incorrect number of the Ignite nodes can become a b...

Tip: SQL client for Apache Ignite cache

A new SQL client configuration described in  The Apache Ignite book . If it got you interested, check out the rest of the book for more helpful information. Apache Ignite provides SQL queries execution on the caches, SQL syntax is an ANSI-99 compliant. Therefore, you can execute SQL queries against any caches from any SQL client which supports JDBC thin client. This section is for those, who feels comfortable with SQL rather than execute a bunch of code to retrieve data from the cache. Apache Ignite out of the box shipped with JDBC driver that allows you to connect to Ignite caches and retrieve distributed data from the cache using standard SQL queries. Rest of the section of this chapter will describe how to connect SQL IDE (Integrated Development Environment) to Ignite cache and executes some SQL queries to play with the data. SQL IDE or SQL editor can simplify the development process and allow you to get productive much quicker. Most database vendors have their own fron...

Load balancing and fail over with scheduler

Every programmer at least develop one Scheduler or Job in their life time of programming. Nowadays writing or developing scheduler to get you job done is very simple, but when you are thinking about high availability or load balancing your scheduler or job it getting some tricky. Even more when you have a few instance of your scheduler but only one can be run at a time also need some tricks to done. A long time ago i used some data base table lock to achieved such a functionality as leader election. Around 2010 when Zookeeper comes into play, i always preferred to use Zookeeper to bring high availability and scalability. For using Zookeeper you have to need Zookeeper cluster with minimum 3 nodes and maintain the cluster. Our new customer denied to use such a open source product in their environment and i was definitely need to find something alternative. Definitely Quartz was the next choose. Quartz makes developing scheduler easy and simple. Quartz clustering feature brings the HA and...