Friday

Patch pig_cassandra for setting ttl to cassandra data

Apache pig provides a platform for analyzing very large data set. With apache pig you can easily analyze your data from Cassandra. Apache pig compiles instruction to sequences of Map-Reduce programs which will run on Hadoop cluster. Cassandra source provides a simple pig script to run pig with Cassandra data. Cassandra also provides CassandraStorage class which will load and store data from Cassandra DB, this class will no built in support for storing data with TTL (time to live). In many cases you have to update a few columns or rows with ttl to delete later automatically from DB. For that, i have patched the CassandraStorage class and add the similar functionality. Here is the patch
Index: src/main/java/ru/atc/smev/cassandra/storage/CassandraStorage.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/main/java/ru/atc/smev/cassandra/storage/CassandraStorage.java (revision 3711)
+++ src/main/java/ru/atc/smev/cassandra/storage/CassandraStorage.java (revision )
@@ -90,7 +90,7 @@
     private final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
 
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
-    private static final Log logger = LogFactory.getLog(CassandraStorage.class);
+    private static final Log logger = LogFactory.getLog(MyCassandraStorage.class);
 
     private ByteBuffer slice_start = BOUND;
     private ByteBuffer slice_end = BOUND;
@@ -113,6 +113,7 @@
     private Map<bytebuffer olumn="olumn"> lastRow;
     private boolean hasNext = true;
 
+    private int ttl;
 
     public CassandraStorage()
     {
@@ -131,8 +132,13 @@
     public int getLimit()
     {
         return limit;
+
     }
 
+    public int getTtl() {
+        return ttl;
+    }
+
     public Tuple getNextWide() throws IOException
     {
         CfDef cfDef = getCfDef(loadSignature);
@@ -337,14 +343,14 @@
     private CfDef getCfDef(String signature)
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
         return cfdefFromString(property.getProperty(signature));
     }
 
     private List<indexexpression> getIndexExpressions()
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
         if (property.getProperty(PARTITION_FILTER_SIGNATURE) != null)
             return indexExpressionsFromString(property.getProperty(PARTITION_FILTER_SIGNATURE));
         else
@@ -462,6 +468,8 @@
                     slice_reverse = Boolean.parseBoolean(urlQuery.get("reversed"));
                 if (urlQuery.containsKey("limit"))
                     limit = Integer.parseInt(urlQuery.get("limit"));
+                if(urlQuery.containsKey("ttl"))
+                    ttl = Integer.parseInt(urlQuery.get("ttl"));
             }
             String[] parts = urlParts[0].split("/+");
             keyspace = parts[1];
@@ -469,7 +477,7 @@
         }
         catch (Exception e)
         {
-            throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1]]': " + e.getMessage());
+            throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1][&ttl=86400]]': " + e.getMessage());
         }
     }
 
@@ -694,7 +702,7 @@
     public void setPartitionFilter(Expression partitionFilter)
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
         property.setProperty(PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter)));
     }
 
@@ -901,6 +909,11 @@
             column.setName(objToBB(t.get(0)));
             column.setValue(objToBB(t.get(1)));
             column.setTimestamp(FBUtilities.timestampMicros());
+            if(getTtl() != 0){
+                column.setTtl(getTtl());
+                column.setTtlIsSet(true);
+            }
+
             mutation.column_or_supercolumn = new ColumnOrSuperColumn();
             mutation.column_or_supercolumn.column = column;
         }
@@ -924,6 +937,11 @@
                     column.setName(objToBB(subcol.get(0)));
                     column.setValue(objToBB(subcol.get(1)));
                     column.setTimestamp(FBUtilities.timestampMicros());
+                    if(getTtl() != 0){
+                        column.setTtl(getTtl());
+                        column.setTtlIsSet(true);
+                    }
+
                     columns.add(column);
                 }
                 if (columns.isEmpty())
@@ -980,7 +998,7 @@
     private void initSchema(String signature)
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(MyCassandraStorage.class);
 
         // Only get the schema if we haven't already gotten it
         if (!property.containsKey(signature))
You can build the Cassandra with the above patch and using it on your pig script as follows:
STORE updated INTO 'cassandra://audit/auditlog?ttl=86400' USING CassandraStorage();
Or you can use the new class named MyCassandraStorage as a pig UDF function. First you have to compile and archive the class in jar. Later you can define the class in your pig script and use as follows:
DEFINE ParseSmevMessage com.abc.pig.utils.MyCassandraStorage();
raw = LOAD 'cassandra://audit/auditlog' USING MyCassandraStorage();
filtered = FILTER raw BY  processed.$1=='N';
updated = FOREACH filtered GENERATE  key,TOTUPLE('processed','Y');
STORE updated INTO 'cassandra://audit/auditlog?ttl=86400' USING MyCassandraStorage();
Post a Comment