Patching pig_cassandra to set a TTL on Cassandra data

Apache Pig provides a platform for analyzing very large data sets. With Pig you can easily analyze data stored in Cassandra. Pig compiles its instructions into sequences of MapReduce programs that run on a Hadoop cluster. The Cassandra source provides a simple Pig script for running Pig against Cassandra data. Cassandra also provides the CassandraStorage class, which loads data from and stores data to Cassandra, but this class has no built-in support for storing data with a TTL (time to live). In many cases you need to update a few columns or rows with a TTL so that they are deleted from the database automatically later. For that, I have patched the CassandraStorage class to add this functionality. Here is the patch:
Index: src/main/java/ru/atc/smev/cassandra/storage/CassandraStorage.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/main/java/ru/atc/smev/cassandra/storage/CassandraStorage.java (revision 3711)
+++ src/main/java/ru/atc/smev/cassandra/storage/CassandraStorage.java (revision )
@@ -90,7 +90,7 @@
     private final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
 
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
-    private static final Log logger = LogFactory.getLog(CassandraStorage.class);
+    private static final Log logger = LogFactory.getLog(MyCassandraStorage.class);
 
     private ByteBuffer slice_start = BOUND;
     private ByteBuffer slice_end = BOUND;
@@ -113,6 +113,7 @@
     private Map<ByteBuffer,IColumn> lastRow;
     private boolean hasNext = true;
 
+    private int ttl;
 
     public CassandraStorage()
     {
@@ -131,8 +132,13 @@
     public int getLimit()
     {
         return limit;
+
     }
 
+    public int getTtl() {
+        return ttl;
+    }
+
     public Tuple getNextWide() throws IOException
     {
         CfDef cfDef = getCfDef(loadSignature);
@@ -337,14 +343,14 @@
     private CfDef getCfDef(String signature)
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
         return cfdefFromString(property.getProperty(signature));
     }
 
     private List<IndexExpression> getIndexExpressions()
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
         if (property.getProperty(PARTITION_FILTER_SIGNATURE) != null)
             return indexExpressionsFromString(property.getProperty(PARTITION_FILTER_SIGNATURE));
         else
@@ -462,6 +468,8 @@
                     slice_reverse = Boolean.parseBoolean(urlQuery.get("reversed"));
                 if (urlQuery.containsKey("limit"))
                     limit = Integer.parseInt(urlQuery.get("limit"));
+                if(urlQuery.containsKey("ttl"))
+                    ttl = Integer.parseInt(urlQuery.get("ttl"));
             }
             String[] parts = urlParts[0].split("/+");
             keyspace = parts[1];
@@ -469,7 +477,7 @@
         }
         catch (Exception e)
         {
-            throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1]]': " + e.getMessage());
+            throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1][&ttl=86400]]': " + e.getMessage());
         }
     }
 
@@ -694,7 +702,7 @@
     public void setPartitionFilter(Expression partitionFilter)
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
         property.setProperty(PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter)));
     }
 
@@ -901,6 +909,11 @@
             column.setName(objToBB(t.get(0)));
             column.setValue(objToBB(t.get(1)));
             column.setTimestamp(FBUtilities.timestampMicros());
+            if(getTtl() != 0){
+                column.setTtl(getTtl());
+                column.setTtlIsSet(true);
+            }
+
             mutation.column_or_supercolumn = new ColumnOrSuperColumn();
             mutation.column_or_supercolumn.column = column;
         }
@@ -924,6 +937,11 @@
                     column.setName(objToBB(subcol.get(0)));
                     column.setValue(objToBB(subcol.get(1)));
                     column.setTimestamp(FBUtilities.timestampMicros());
+                    if(getTtl() != 0){
+                        column.setTtl(getTtl());
+                        column.setTtlIsSet(true);
+                    }
+
                     columns.add(column);
                 }
                 if (columns.isEmpty())
@@ -980,7 +998,7 @@
     private void initSchema(String signature)
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
 
         // Only get the schema if we haven't already gotten it
         if (!property.containsKey(signature))
You can build Cassandra with the above patch and use it in your Pig script as follows:
STORE updated INTO 'cassandra://audit/auditlog?ttl=86400' USING CassandraStorage();
Or you can use the new class, named MyCassandraStorage, as a Pig UDF. First compile the class and package it in a jar. Then define the class in your Pig script and use it as follows:
DEFINE MyCassandraStorage com.abc.pig.utils.MyCassandraStorage();
raw = LOAD 'cassandra://audit/auditlog' USING MyCassandraStorage();
filtered = FILTER raw BY  processed.$1=='N';
updated = FOREACH filtered GENERATE  key,TOTUPLE('processed','Y');
STORE updated INTO 'cassandra://audit/auditlog?ttl=86400' USING MyCassandraStorage();
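
To make the effect of the ?ttl=86400 parameter easier to follow, here is a minimal standalone sketch of how a ttl value can be read from the location string, in the spirit of the urlQuery handling in the patch. The class TtlLocationSketch and its helper methods are hypothetical names used only for illustration; they are not part of Cassandra or Pig, and the real CassandraStorage parses the URL with its own helper code. As in the patch, a missing ttl is treated as 0, which means no TTL is set on the column.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; not part of Cassandra or Pig.
class TtlLocationSketch {

    // Splits the query part of a location string into key/value pairs.
    static Map<String, String> parseQuery(String location) {
        Map<String, String> params = new HashMap<>();
        int q = location.indexOf('?');
        if (q < 0 || q == location.length() - 1) {
            return params; // no query string present
        }
        for (String pair : location.substring(q + 1).split("&")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2) {
                params.put(kv[0], kv[1]);
            }
        }
        return params;
    }

    // Returns the TTL in seconds, or 0 when the location has no ttl parameter.
    static int parseTtl(String location) {
        String raw = parseQuery(location).get("ttl");
        return raw == null ? 0 : Integer.parseInt(raw);
    }

    public static void main(String[] args) {
        // One day in seconds, as in the STORE statement above.
        System.out.println(parseTtl("cassandra://audit/auditlog?ttl=86400"));
        // No ttl parameter: 0, so the patched code would skip column.setTtl().
        System.out.println(parseTtl("cassandra://audit/auditlog"));
    }
}
```

With ttl=86400, columns written by the STORE statement would expire one day after they are written; without the parameter they are stored as before.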