Patching pig_cassandra to set a TTL on Cassandra data

Apache Pig provides a platform for analyzing very large data sets. With Apache Pig you can easily analyze your data from Cassandra. Pig compiles its instructions into sequences of MapReduce programs, which run on a Hadoop cluster. The Cassandra source distribution provides a simple Pig script for running Pig against Cassandra data. Cassandra also provides the CassandraStorage class, which loads and stores data from a Cassandra DB, but this class has no built-in support for storing data with a TTL (time to live). In many cases you need to update a few columns or rows with a TTL so that they are deleted from the DB automatically later. For that, I have patched the CassandraStorage class to add this functionality. Here is the patch:
Index: src/main/java/ru/atc/smev/cassandra/storage/CassandraStorage.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/main/java/ru/atc/smev/cassandra/storage/CassandraStorage.java (revision 3711)
+++ src/main/java/ru/atc/smev/cassandra/storage/CassandraStorage.java (revision )
@@ -90,7 +90,7 @@
     private final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
 
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
-    private static final Log logger = LogFactory.getLog(CassandraStorage.class);
+    private static final Log logger = LogFactory.getLog(MyCassandraStorage.class);
 
     private ByteBuffer slice_start = BOUND;
     private ByteBuffer slice_end = BOUND;
@@ -113,6 +113,7 @@
     private Map<ByteBuffer,IColumn> lastRow;
     private boolean hasNext = true;
 
+    private int ttl;
 
     public CassandraStorage()
     {
@@ -131,8 +132,13 @@
     public int getLimit()
     {
         return limit;
+
     }
 
+    public int getTtl() {
+        return ttl;
+    }
+
     public Tuple getNextWide() throws IOException
     {
         CfDef cfDef = getCfDef(loadSignature);
@@ -337,14 +343,14 @@
     private CfDef getCfDef(String signature)
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
         return cfdefFromString(property.getProperty(signature));
     }
 
     private List<IndexExpression> getIndexExpressions()
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
         if (property.getProperty(PARTITION_FILTER_SIGNATURE) != null)
             return indexExpressionsFromString(property.getProperty(PARTITION_FILTER_SIGNATURE));
         else
@@ -462,6 +468,8 @@
                     slice_reverse = Boolean.parseBoolean(urlQuery.get("reversed"));
                 if (urlQuery.containsKey("limit"))
                     limit = Integer.parseInt(urlQuery.get("limit"));
+                if(urlQuery.containsKey("ttl"))
+                    ttl = Integer.parseInt(urlQuery.get("ttl"));
             }
             String[] parts = urlParts[0].split("/+");
             keyspace = parts[1];
@@ -469,7 +477,7 @@
         }
         catch (Exception e)
         {
-            throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1]]': " + e.getMessage());
+            throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1][&ttl=86400]]': " + e.getMessage());
         }
     }
 
@@ -694,7 +702,7 @@
     public void setPartitionFilter(Expression partitionFilter)
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
         property.setProperty(PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter)));
     }
 
@@ -901,6 +909,11 @@
             column.setName(objToBB(t.get(0)));
             column.setValue(objToBB(t.get(1)));
             column.setTimestamp(FBUtilities.timestampMicros());
+            if(getTtl() != 0){
+                column.setTtl(getTtl());
+                column.setTtlIsSet(true);
+            }
+
             mutation.column_or_supercolumn = new ColumnOrSuperColumn();
             mutation.column_or_supercolumn.column = column;
         }
@@ -924,6 +937,11 @@
                     column.setName(objToBB(subcol.get(0)));
                     column.setValue(objToBB(subcol.get(1)));
                     column.setTimestamp(FBUtilities.timestampMicros());
+                    if(getTtl() != 0){
+                        column.setTtl(getTtl());
+                        column.setTtlIsSet(true);
+                    }
+
                     columns.add(column);
                 }
                 if (columns.isEmpty())
@@ -980,7 +998,7 @@
     private void initSchema(String signature)
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
 
         // Only get the schema if we haven't already gotten it
         if (!property.containsKey(signature))
You can build Cassandra with the above patch and use it in your Pig script as follows:
STORE updated INTO 'cassandra://audit/auditlog?ttl=86400' USING CassandraStorage();
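The ttl value is given in seconds, so 86400 in the statement above means one day. Since the patch reads it with Integer.parseInt and treats 0 as "no TTL", a small helper can build the store location from a readable duration. This is only a sketch: the class name StoreLocation and the method withTtl are hypothetical and not part of Cassandra or Pig.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of Cassandra or Pig): builds the store
// location string that the patched storage class expects.
final class StoreLocation {

    // Converts the duration to seconds and appends it as the ttl parameter.
    // The patch parses ttl as an int and skips the TTL when it is 0,
    // so only values in the range 1..Integer.MAX_VALUE are accepted here.
    static String withTtl(String keyspace, String columnFamily, long duration, TimeUnit unit) {
        long seconds = unit.toSeconds(duration);
        if (seconds <= 0 || seconds > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("ttl out of range: " + seconds);
        }
        return "cassandra://" + keyspace + "/" + columnFamily + "?ttl=" + seconds;
    }

    public static void main(String[] args) {
        // prints cassandra://audit/auditlog?ttl=86400
        System.out.println(withTtl("audit", "auditlog", 1, TimeUnit.DAYS));
    }
}
```

The resulting string can be pasted into the STORE statement or passed to the script as a Pig parameter.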
Alternatively, you can use the new class, named MyCassandraStorage, as a Pig UDF. First compile the class and package it in a jar. Then define the class in your Pig script and use it as follows:
DEFINE MyCassandraStorage com.abc.pig.utils.MyCassandraStorage();
raw = LOAD 'cassandra://audit/auditlog' USING MyCassandraStorage();
filtered = FILTER raw BY processed.$1 == 'N';
updated = FOREACH filtered GENERATE key, TOTUPLE('processed', 'Y');
STORE updated INTO 'cassandra://audit/auditlog?ttl=86400' USING MyCassandraStorage();
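To see what happens to the ttl parameter in the location string, the parsing step from the patch can be tried in isolation. The following is a simplified, self-contained sketch that mirrors the patch's behaviour (a missing ttl yields 0, which the patched write path treats as "do not set a TTL"). The names TtlLocation, queryMap and parseTtl are hypothetical, and the real class also parses the other parameters (slice_start, slice_end, reversed, limit).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch of the ttl parsing added by the patch.
final class TtlLocation {

    // Splits "k1=v1&k2=v2" into a map; parameters without '=' are ignored.
    static Map<String, String> queryMap(String query) {
        Map<String, String> map = new HashMap<>();
        for (String param : query.split("&")) {
            String[] pair = param.split("=", 2);
            if (pair.length == 2) {
                map.put(pair[0], pair[1]);
            }
        }
        return map;
    }

    // Returns the ttl in seconds, or 0 when the location has no ttl parameter.
    static int parseTtl(String location) {
        String[] urlParts = location.split("\\?", 2);
        if (urlParts.length < 2) {
            return 0;
        }
        String ttl = queryMap(urlParts[1]).get("ttl");
        return ttl == null ? 0 : Integer.parseInt(ttl);
    }

    public static void main(String[] args) {
        System.out.println(parseTtl("cassandra://audit/auditlog?ttl=86400")); // prints 86400
        System.out.println(parseTtl("cassandra://audit/auditlog"));           // prints 0
    }
}
```

As in the patch, a non-numeric ttl makes Integer.parseInt throw; the patched class catches that and reports the expected URL format in an IOException.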
