Sunday

The Apache Ignite Native persistence, a brief overview

In-memory approaches can achieve blazing speed by putting the working set of the data into system memory. When all data is kept in memory, the issues that arise from using traditional spinning disks disappear. This means, for instance, that there is no need to maintain additional cache copies of the data and manage synchronization between them. But there is also a downside to this approach: because the data is in memory only, it will not survive if the whole cluster is terminated. Therefore, these types of data stores are not considered persistent at all.
In this blog post, I will explore the new Apache Ignite native persistence feature and try to provide a clear, understandable picture of how it works.
In most cases, you can’t (and should not) store the whole data set in memory for your application; most often you should store a relatively small hot or active subset of the data to increase the performance of the application. The rest of the data should be stored on low-cost disks or on tape for archiving. An in-memory database has two main storage requirements:
  • Permanent media, to store committed transactions, thereby maintaining durability, and for recovery purposes if the in-memory database needs to be reloaded into memory.
  • Permanent storage, to hold a backup copy of the entire in-memory database.
Permanent storage or media can be any distributed or local file system, SAN, NoSQL database, or even an RDBMS such as Postgres or Oracle. Apache Ignite (since 1.5) provides an elegant way to connect persistent data stores such as an RDBMS or a NoSQL database like MongoDB or Cassandra. Most often, however, persistence in an RDBMS becomes a bottleneck, and you never get horizontal scaling in your system. For more information, I recommend having a look at the sample chapter of the book "High performance in-memory computing with Apache Ignite".


So, from version 2.1.0, Apache Ignite provides an ACID and SQL-compliant disk store that transparently integrates with Ignite's durable memory as an optional disk layer, storing data and indexes on SSD, Flash, 3D XPoint, and other types of non-volatile storage.
The Apache Ignite native persistence uses the new durable memory architecture, which allows storing and processing data and indexes both in memory and on disk. Whenever the feature is enabled, Apache Ignite stores a superset of the data on disk and a subset of the data in RAM, based on RAM capacity. If a subset of the data or an index is missing in RAM, the durable memory will take it from disk, as shown in the pictures below.
Data can also be stored in a central disk storage to which all the Ignite nodes are connected, as shown below.

Before we start, let's cover the prerequisites of the project in our sandbox:
  1. Apache Ignite version 2.1.0
  2. JVM 1.8
  3. Apache Maven version >3.0.3
  4. *nix based operating system
Installation.
There are basically two ways to use Apache Ignite:
  • Download the binary distribution, unzip the archive somewhere in your OS, and run the ./ignite.sh bash script with the Spring config files.
  • Create a Maven project with the required Apache Ignite dependencies, configure the node through Java code, and run it.
Here, I am going to use the first option.
Step 1.
  • Download the Apache Ignite binary distribution and unzip the distribution somewhere in your sandbox. 
  • Modify the IGNITE_HOME/examples/config/persistentstore/example-persistent-store.xml file and comment out the following part of the cache configuration.
<property name="cacheConfiguration">
    <list>
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
            <property name="name" value="testCache"/>
            <property name="backups" value="1"/>
            <property name="atomicityMode" value="TRANSACTIONAL"/>
            <property name="writeSynchronizationMode" value="FULL_SYNC"/>
            <property name="indexedTypes">
                <list>
                    <value>java.lang.Long</value>
                    <value>org.apache.ignite.examples.model.Organization</value>
                </list>
            </property>
        </bean>
    </list>
</property>
Note that to enable the Ignite native persistence, you only need to pass the following configuration (an instance of PersistentStoreConfiguration), which is already pre-configured in the example-persistent-store.xml file.


<property name="persistentStoreConfiguration">
<bean class="org.apache.ignite.configuration.PersistentStoreConfiguration"/>
</property>
  • Run the following command from the IGNITE_HOME directory.
./ignite.sh $IGNITE_HOME/examples/config/persistentstore/example-persistent-store.xml
Step 2.
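  • Create a Maven project with the following command (the archetype:create goal has been removed from recent versions of the Maven Archetype plugin, so archetype:generate is used here).
mvn archetype:generate -DgroupId=com.blu.imdg -DartifactId=ignite-persistence -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false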
  • Add the following dependencies in the pom.xml
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-core</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-spring</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-indexing</artifactId>
    <version>2.1.0</version>
</dependency>
  • Create a Java class with the following contents.
import java.util.Arrays;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;

public class HelloWorld {
    public static void main(String[] args) {
        System.out.println("Hello Ignite");
        // create a new instance of the TCP discovery SPI
        TcpDiscoverySpi spi = new TcpDiscoverySpi();
        // create a new instance of the TCP discovery multicast IP finder
        TcpDiscoveryMulticastIpFinder tcMp = new TcpDiscoveryMulticastIpFinder();
        tcMp.setAddresses(Arrays.asList("localhost")); // change your IP address here
        // set the multicast IP finder for the SPI
        spi.setIpFinder(tcMp);
        // create a new Ignite configuration
        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setClientMode(true);
        // set the discovery SPI on the Ignite configuration
        cfg.setDiscoverySpi(spi);
        // start Ignite
        Ignite ignite = Ignition.start(cfg);
        // activate the cluster (a cluster with native persistence starts inactive)
        ignite.active(true);
        // get or create the cache
        IgniteCache<Integer, String> cache = ignite.getOrCreateCache("testCache");
        // put some cache elements
        for (int i = 1; i <= 100; i++) {
            cache.put(i, Integer.toString(i));
        }
        // get them from the cache and write them to the console
        for (int i = 1; i <= 100; i++) {
            System.out.println("Cache get:" + cache.get(i));
        }
        ignite.close();
    }
}
Note that we are using Ignite client mode for manipulating data. After running the HelloWorld application, 100 elements should be inserted into the cache (cache name testCache).
Step 3.
  • Let's use the ignitevisor command tool to examine the data. Use the cache -scan command in the ignitevisor command tool. You should get a similar illustration in your console: all 100 elements in the cache.
  • Now, let's see what happened under the hood. Run the following command from the IGNITE_HOME/work directory.
du -h .
You should get something like the output shown below in your console.
If Apache Ignite native persistence is enabled, Ignite will persist all the data and the indexes in memory and on disk across all the cluster nodes.
If you go through the directory db/0_0_0_0_0_0_0_1_10_211_55_2_10_37_129_2_127_0_0_1_192_168_1_37_47500 (in my case), you will find an individual folder for every cache. The folder named cache-testCache will contain all the cache entries (100 elements) that we have just inserted.
The file index.bin is the index of the cache entries, and every cache element gets its own page file. Why did this happen? The Ignite architecture is now a page-based architecture. Let's take a closer look: memory is now split into regions -> regions are split into segments -> segments are split into pages. Pages can be swapped to disk. Pages can store:
  • data 
  • metadata
  • index
Pages are fixed-length blocks, and the page-based architecture also supports automatic defragmentation. If you take a closer look at the page sizes, all of them are 14 KB. Whenever Ignite needs to load data from the disk, it just loads the page file, so it's very fast.
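To make the region -> segment -> page split a bit more concrete, here is a minimal sketch in plain Java. It is not Ignite's code: the class name PageLayoutSketch and the constants PAGE_SIZE and PAGES_PER_SEGMENT are made-up values for illustration only. It simply shows how, with fixed-length pages, a byte offset inside a region can be mapped to a segment, a page within that segment, and an offset within the page.

```java
/** Toy address arithmetic for fixed-length pages; not Ignite's implementation. */
final class PageLayoutSketch {
    // Both constants are assumptions chosen for the example.
    static final int PAGE_SIZE = 4 * 1024;
    static final int PAGES_PER_SEGMENT = 256;

    /**
     * Maps a byte offset inside a region to
     * {segment index, page index within the segment, offset within the page}.
     */
    static long[] locate(long regionOffset) {
        long pageNo = regionOffset / PAGE_SIZE;   // global page number in the region
        return new long[] {
            pageNo / PAGES_PER_SEGMENT,           // which segment holds the page
            pageNo % PAGES_PER_SEGMENT,           // page position inside that segment
            regionOffset % PAGE_SIZE              // byte position inside the page
        };
    }
}
```

Because every page has the same length, locating a page is pure arithmetic, which is one reason a fixed page size is convenient when pages have to be moved between memory and disk.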
There is also another concept: the write-ahead log (WAL). When you do an update, Ignite first updates the data in memory and marks the page dirty, and then it persists the data into the write-ahead log. Ignite just appends the update to the WAL file. The WAL file is very similar to the Cassandra commitlog file, with one difference: Cassandra writes in parallel to memory and to the commitlog file on disk, whereas Ignite updates the data in memory first and then appends the data to the WAL. For more information, I recommend having a look at the documentation, which is quite exhaustive.
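The order of operations described above can be sketched in a few lines of plain Java. This is a toy model under my own assumptions, not Ignite's WAL: the class WalSketch, its record format (key=value strings), and the replay method are all hypothetical. It only illustrates the sequence "update memory, mark dirty, append to the log" and why an append-only log is enough to rebuild the state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of the update path; not Ignite's actual implementation. */
final class WalSketch {
    private final Map<Integer, String> memory = new HashMap<>(); // in-memory data
    private final Set<Integer> dirtyKeys = new HashSet<>();      // changed but not yet written to page files
    private final List<String> wal = new ArrayList<>();          // stands in for the append-only WAL file

    void update(int key, String value) {
        memory.put(key, value);      // 1. update the data in memory
        dirtyKeys.add(key);          // 2. mark it dirty
        wal.add(key + "=" + value);  // 3. append the change to the log
    }

    List<String> log() { return wal; }

    Set<Integer> dirty() { return dirtyKeys; }

    /** Rebuilds the state by replaying the log records in order; later records win. */
    static Map<Integer, String> replay(List<String> log) {
        Map<Integer, String> restored = new HashMap<>();
        for (String rec : log) {
            int sep = rec.indexOf('=');
            restored.put(Integer.parseInt(rec.substring(0, sep)), rec.substring(sep + 1));
        }
        return restored;
    }
}
```

Since updates are only ever appended, writing the log is sequential I/O, and replaying it from the beginning reproduces the latest value of every key.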
Step 4.
  • Restart the Ignite node and check the cache testCache with ignitevisor. You will be surprised to find no data in the cache.
  • Let's slightly modify our HelloWorld class and run the application again. Comment out or delete the following fragment of the code, as shown below.
// put some cache elements
for (int i = 1; i <= 100; i++) {
  cache.put(i, Integer.toString(i));
}
Run the application and check the cache testCache through ignitevisor and your application console.
Whenever a read request occurs, Ignite first checks for the data in memory. If the data doesn't exist in memory, Ignite immediately loads the cache entries from disk into memory. Also note that all entries in memory are stored off-heap.
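The read path can be illustrated with a small plain-Java sketch. Again, this is only a model under my own assumptions and not how Ignite is implemented: DurableMemorySketch, the use of a HashMap as the "disk", and the least-recently-used eviction are all invented for the example. It shows a superset of the data on "disk", a bounded subset in "RAM", and a read that falls back to disk only on a miss.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model: superset of data on "disk", bounded subset in "RAM". */
final class DurableMemorySketch {
    private final Map<Integer, String> disk = new HashMap<>(); // holds every entry
    private final Map<Integer, String> ram;                    // holds at most ramCapacity entries
    private int diskReads;

    DurableMemorySketch(final int ramCapacity) {
        // Access-ordered LinkedHashMap that drops the least recently used entry when full.
        this.ram = new LinkedHashMap<Integer, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, String> eldest) {
                return size() > ramCapacity;
            }
        };
    }

    void put(int key, String value) {
        disk.put(key, value);
        ram.put(key, value);
    }

    String get(int key) {
        String value = ram.get(key);              // 1. check memory first
        if (value == null && disk.containsKey(key)) {
            value = disk.get(key);                // 2. on a miss, load from disk
            diskReads++;
            ram.put(key, value);                  // 3. keep it in memory for the next read
        }
        return value;
    }

    int diskReads() { return diskReads; }

    int ramSize() { return ram.size(); }
}
```

With a capacity of 2, putting three entries leaves only two of them in RAM; reading the evicted one costs a single disk read, and reading it again is served from memory.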
Benefits.
With Ignite native persistence, you can now easily make backups for data recovery; Denis Magda has written a comprehensive article on data recovery using Ignite native persistence. One thing I have to mention here is data replication between clusters. By using Ignite native persistence, you can now replicate data from one cluster to another online. You can use any standard disk-based data replication tool to copy the changed data set from the primary data center to the stand-in data center or Ignite cluster.

Apache Ignite with Spring Data

Spring Data provides a unified and easy way to access different kinds of persistence stores, both relational database systems and NoSQL data stores. It sits on top of JPA, adding another layer of abstraction and defining a standard-based design to support the persistence layer in a Spring context.
The Apache Ignite IgniteRepository implements the Spring Data CrudRepository interface and extends the basic capabilities of the CrudRepository, which in turn supports:
  1. Basic CRUD operations on a repository for a specific type.
  2. Access to the Apache Ignite SQL grid via Spring Data API.
With Spring Data's repositories, you only need to write an interface with finder methods to query the objects. All the CRUD methods for manipulating the objects will be delivered automatically. As an example:


@RepositoryConfig(cacheName = "DogCache")
public interface DogRepository extends IgniteRepository<Dog, Long> {
    List<Dog> getDogByName(String name);
    Dog getDogById (Long id);
}
In this article, we are going to cover the following topics:
  • Create a Maven project from scratch for using Spring Data with the Apache Ignite grid.
  • Persist a few entities into Ignite caches through the Spring Data framework.
Before we start, let's cover the prerequisites of the project in your sandbox:
  1. Java JDK 1.8
  2. Ignite version 2.0
  3. Apache Maven version >3.0.3
Step 1
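Let’s set up the sandbox first. Create a Maven project or clone the project from the GitHub repository. (The archetype:create goal has been removed from recent versions of the Maven Archetype plugin, so archetype:generate is used here.)
mvn archetype:generate -DgroupId=com.blu.imdg -DartifactId=spring-data -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false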

Step 2

Modify the pom.xml, add the following maven dependencies:
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-core</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-spring</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-spring-data</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-indexing</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>1.4.195</version>
</dependency>
Note that the Maven h2 dependency is optional. If you are getting an error like "org.h2.result.RowFactory", add the dependency explicitly.

The Domain Model

Our example domain model consists of two different entities: Breed and Dog.

The association between Breed and Dog is ManyToOne. One Dog can have only one breed.

Step 3

Now, let’s map the domain model by creating the Java classes and annotating them with the required meta-information. Let’s start with the Breed class.
package com.blu.imdg.model;

import org.apache.ignite.cache.query.annotations.QuerySqlField;

import java.io.Serializable;

public class Breed implements Serializable {

    @QuerySqlField(index = true)
    private Long id;

    @QuerySqlField(index = true)
    private String name;

    public Long getId() {

        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Breed{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                '}';
    }
}
Note that the @QuerySqlField annotation enables the fields for SQL queries.
Create another class named Dog and add the following contents to it.

package com.blu.imdg.model;

import org.apache.ignite.cache.query.annotations.QuerySqlField;

import java.io.Serializable;
import java.sql.Date;

public class Dog implements Serializable {

    @QuerySqlField(index = true)
    private Long id;
    @QuerySqlField(index = true)
    private String name;
    @QuerySqlField(index = true)
    private Long breedid;
    @QuerySqlField(index = true)
    private Date birthdate;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Long getBreedid() {
        return breedid;
    }

    public void setBreedid(Long breedid) {
        this.breedid = breedid;
    }

    public Date getBirthdate() {
        return birthdate;
    }

    public void setBirthdate(Date birthdate) {
        this.birthdate = birthdate;
    }

    @Override
    public String toString() {
        return "Dog{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", breedid=" + breedid +
                ", birthdate=" + birthdate +
                '}';
    }
}

Step 4

Now, let's create the Spring repositories for the POJOs created before.
package com.blu.imdg.repositories;

import com.blu.imdg.model.Dog;
import org.apache.ignite.springdata.repository.IgniteRepository;
import org.apache.ignite.springdata.repository.config.RepositoryConfig;

import java.util.List;

@RepositoryConfig(cacheName = "DogCache")
public interface DogRepository extends IgniteRepository<Dog, Long> {
    List<Dog> getDogByName(String name);
    Dog getDogById (Long id);
}
The @RepositoryConfig annotation should be specified to map a repository to a distributed cache. Also, we have two finder methods, getDogByName and getDogById, for querying the cache.
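To give an intuition for how a finder method name can be turned into a query, here is a deliberately simplified sketch in plain Java. It is not the parser used by Spring Data or Ignite, and the SQL it produces is only my assumption of what such a query could look like; the class FinderNameSketch and its toSql method are hypothetical. It handles only the simplest case: properties after the "By" keyword, joined with "And".

```java
/** Simplified illustration of deriving a query from a finder method name. */
final class FinderNameSketch {
    /**
     * Turns a method name such as "getDogByName" into a parameterized SQL string.
     * Only "...By<Property>" and "...By<Property>And<Property>" are supported here.
     */
    static String toSql(String methodName, String table) {
        int by = methodName.indexOf("By");
        if (by < 0) {
            throw new IllegalArgumentException("No 'By' keyword in " + methodName);
        }
        // Everything after "By" lists the properties to filter on.
        String[] properties = methodName.substring(by + 2).split("And");
        StringBuilder sql = new StringBuilder("SELECT * FROM " + table + " WHERE ");
        for (int i = 0; i < properties.length; i++) {
            if (i > 0) {
                sql.append(" AND ");
            }
            String p = properties[i];
            // "Name" -> "name": property names start with a lower-case letter.
            sql.append(Character.toLowerCase(p.charAt(0))).append(p.substring(1)).append(" = ?");
        }
        return sql.toString();
    }
}
```

For the DogRepository above, FinderNameSketch.toSql("getDogByName", "Dog") yields SELECT * FROM Dog WHERE name = ?. The real implementation understands many more keywords (Or, Like, OrderBy, and so on), which this sketch ignores.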
Let's add a similar repository for the Breed domain as follows:
package com.blu.imdg.repositories;

import com.blu.imdg.model.Breed;
import org.apache.ignite.springdata.repository.IgniteRepository;
import org.apache.ignite.springdata.repository.config.Query;
import org.apache.ignite.springdata.repository.config.RepositoryConfig;
import org.springframework.data.domain.Pageable;

import java.util.List;

@RepositoryConfig(cacheName = "BreedCache")
public interface BreedRepository extends IgniteRepository<Breed, Long> {

    List<Breed> getAllBreedsByName (String name);

    @Query("SELECT id FROM Breed WHERE id = ?")
    List<Long> getById (long id, Pageable pageable);
}

In the above BreedRepository interface, we also use the @Query(queryString) annotation, which can be used if a concrete SQL query needs to be executed as a result of a method call.
Step 5
Let’s create the cache configuration class. Create an Ignite cache configuration class and mark the application configuration with @EnableIgniteRepositories annotation, as shown below:
package com.blu.imdg.repositories;

import com.blu.imdg.model.Breed;
import com.blu.imdg.model.Dog;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.springdata.repository.config.EnableIgniteRepositories;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableIgniteRepositories
public class SpringAppConfig {
    @Bean
    public Ignite igniteInstance() {
        IgniteConfiguration cfg = new IgniteConfiguration();
        // Setting some custom name for the node.
        cfg.setIgniteInstanceName("springDataNode");
        // Enabling peer-class loading feature.
        cfg.setPeerClassLoadingEnabled(true);
        // Defining and creating a new cache to be used by Ignite Spring Data
        // repository.
        CacheConfiguration ccfgDog = new CacheConfiguration("DogCache");
        CacheConfiguration ccfgBreed = new CacheConfiguration("BreedCache");
        // Setting SQL schema for the cache.
        ccfgBreed.setIndexedTypes(Long.class, Breed.class);
        ccfgDog.setIndexedTypes(Long.class, Dog.class);

        cfg.setCacheConfiguration(new CacheConfiguration[]{ccfgDog, ccfgBreed});

        return Ignition.start(cfg);
    }
}
Note that we have used two separate CacheConfiguration instances for the Breed and Dog caches. We have also set the SQL schema for each cache.

Step 6

Once all the configurations and the repositories are ready to be used, we only need to register the configuration in a Spring application context.
package com.blu.imdg;

import com.blu.imdg.model.Breed;
import com.blu.imdg.model.Dog;
import com.blu.imdg.repositories.BreedRepository;
import com.blu.imdg.repositories.DogRepository;
import com.blu.imdg.repositories.SpringAppConfig;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.sql.Date;
import java.util.List;

/**
 * Hello world!
 *
 */
public class App 
{
    private static AnnotationConfigApplicationContext ctx;
    private static BreedRepository breedRepository;
    private static DogRepository dogRepository;

    public static void main( String[] args )
    {
        System.out.println( "Spring Data Example!" );
        ctx = new AnnotationConfigApplicationContext();
        ctx.register(SpringAppConfig.class);
        ctx.refresh();

        breedRepository = ctx.getBean(BreedRepository.class);
        dogRepository = ctx.getBean(DogRepository.class);

        //fill the repository with data and Save
        Breed collie = new Breed();
        collie.setId(1L);
        collie.setName("collie");
        //save Breed with name collie
        breedRepository.save(1L, collie);

        System.out.println("Add one breed in the repository!");
        // Query the breed
        List<Breed> getAllBreeds = breedRepository.getAllBreedsByName("collie");

        for(Breed breed : getAllBreeds){
            System.out.println("Breed:" + breed);
        }
        //Add some dogs
        Dog dina = new Dog();
        dina.setName("dina");
        dina.setId(1L);
        dina.setBreedid(1L);
        dina.setBirthdate(new Date(System.currentTimeMillis()));
        //Save Dina
        dogRepository.save(2L,dina);
        System.out.println("Dog dina save into the cache!");
        //Query the Dog Dina
        List<Dog> dogs = dogRepository.getDogByName("dina");
        for(Dog dog : dogs){
            System.out.println("Dog:"+ dog);
        }

    }
}
The above code snippet is very straightforward. First, we create a Spring annotated context and register our repositories. Next, we get references to our BreedRepository and DogRepository to insert some data. To query the data, we use basic CRUD operations or methods that will be automatically turned into Apache Ignite SQL queries:


List<Dog> dogs = dogRepository.getDogByName("dina");
for(Dog dog : dogs){
  System.out.println("Dog:"+ dog);
}

Step 7

Let’s build and run the application. Execute the following command.
mvn clean install
mvn exec:java -Dexec.mainClass=com.blu.imdg.App

You should find a lot of log messages in the console.


The log messages confirm that two entries (dina and breed-collie) have been flushed into the Ignite cache and that the dog Dina has been retrieved from the cache. Let’s explore the cache through Ignite Visor.

Two different caches have been created for the entities: Breed and Dog. If we scan the cache entries of the Dog cache, we should find the following entity in it.

Entity Dina has been persisted into the cache with the key of the Breed collie.
If you want to learn more about Apache Ignite (using JPA, Hibernate or MyBatis), please refer to the book High Performance in-memory computing with Apache Ignite.

Saturday

In-Memory MapReduce and Your Hadoop Ecosystem (Part 2)

Portions of this article were taken from the book High-Performance In-Memory Computing With Apache Ignite. If it got you interested, check out the rest of the book for more helpful information.
Before reading, be sure to check out Part 1!
Apache Ignite provides a vanilla distributed in-memory file system called Ignite File System (IGFS) with similar functionality to Hadoop HDFS. This is one of the unique features of Apache Ignite that helps accelerate Big Data computing. IGFS implements the Hadoop file system API and is designed to support Hadoop v1 and Yarn Hadoop v2. Ignite IGFS can transparently plug into a Hadoop or Spark deployment.
One of the greatest benefits of IGFS is that it does away with the Hadoop NameNode in the Hadoop deployment; it seamlessly utilizes Ignite’s in-memory database under the hood to provide completely automatic scaling and failover without any additional shared storage. IGFS uses memory instead of disk to produce a distributed, fault-tolerant, and high-throughput file system. Removing the NameNode from the architecture leads to dramatically better performance of I/O operations. Furthermore, IGFS provides a native file system API for working with directories and files in the in-memory file system.
IgniteFileSystem, the IGFS interface, provides methods for regular file system operations such as create, update, delete, mkdirs, etc., as well as MapReduce task execution. Another interesting feature of IGFS is its smart use of file-level caching and eviction. IGFS utilizes file-level caching to ensure corruption-free storage.
Note that IGFS is not an alternative like RAM disk — it’s a fully compliant in-memory file system like HDFS. A high-level architecture of the IGFS is shown below in Figure 1.

In this article, we are going to cover the basic operations of IGFS and deploy IGFS in standalone mode to store files in it and perform a few MapReduce tasks on top of it.
Note: We are not going to replace HDFS completely; otherwise, we would not be able to start the Hadoop DataNode anymore. We are going to use both IGFS and HDFS simultaneously.
From a bird’s-eye view, running MapReduce in IGFS on top of HDFS looks as follows:
  1. Configure the IGFS for the Ignite nodes.
  2. Put files into IGFS.
  3. Configure the Hadoop.
  4. Run MapReduce.
There are several ways to configure IGFS on the Ignite cluster. Unfortunately, Apache Ignite provides neither a comprehensive GUI-based management tool nor a command line interface for maintaining the Hadoop accelerator. However, GridGain Visor (the commercial version of Ignite), as a management tool, provides IGFS monitoring and file management between HDFS, local, and IGFS file systems. To demonstrate how to use IGFS, we will perform the following steps:
  1. Configure the IGFS file system in the Ignite cluster (default-config.xml).
  2. Run a standalone Java application to ingest a file into IGFS. In our case, the file will be the t8.shakespeare.txt.
  3. Configure Hadoop.
  4. Run a MapReduce wordcount job to compute the count of the words from the IGFS file.
  5. Run a standalone Java application to check the result of the MapReduce job.
Now that we have dipped our toes into IGFS, let’s configure the standalone IGFS and run some MapReduce jobs on it.

Step 1

Add the following Spring configuration beans to the default-config.xml file of the Ignite node:
<bean id="igfsCfgBase" class="org.apache.ignite.configuration.FileSystemConfiguration" abstract="true">
    <property name="blockSize" value="#{128 * 1024}"/>
    <property name="perNodeBatchSize" value="512"/>
    <property name="perNodeParallelBatchCount" value="16"/>
    <property name="prefetchBlocks" value="32"/>
</bean>
<bean id="dataCacheCfgBase" class="org.apache.ignite.configuration.CacheConfiguration" abstract="true">
    <property name="cacheMode" value="PARTITIONED"/>
    <property name="atomicityMode" value="TRANSACTIONAL"/>
    <property name="writeSynchronizationMode" value="FULL_SYNC"/>
    <property name="backups" value="0"/>
    <property name="affinityMapper">
        <bean class="org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper">
            <constructor-arg value="512"/>
        </bean>
    </property>
</bean>
<bean id="metaCacheCfgBase" class="org.apache.ignite.configuration.CacheConfiguration" abstract="true">
    <property name="cacheMode" value="REPLICATED"/>
    <property name="atomicityMode" value="TRANSACTIONAL"/>
    <property name="writeSynchronizationMode" value="FULL_SYNC"/>
</bean>
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
    <property name="discoverySpi">
        <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
            <property name="ipFinder">
                <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                    <property name="addresses">
                        <list>
                            <value>127.0.0.1:47500..47509</value>
                        </list>
                    </property>
                </bean>
            </property>
        </bean>
    </property>
    <property name="fileSystemConfiguration">
        <list>
            <bean class="org.apache.ignite.configuration.FileSystemConfiguration" parent="igfsCfgBase">
                <property name="name" value="igfs"/>
                <property name="metaCacheName" value="igfs-meta"/>
                <property name="dataCacheName" value="igfs-data"/>
                <property name="blockSize" value="1024"/>
                <property name="streamBufferSize" value="1024"/>
                <property name="ipcEndpointConfiguration">
                    <bean class="org.apache.ignite.igfs.IgfsIpcEndpointConfiguration">
                        <property name="type" value="SHMEM"/>
                        <property name="host" value="127.0.0.1"/>
                        <property name="port" value="10500"/>
                    </bean>
                </property>
            </bean>
        </list>
    </property>
    <!-- Assumption: the original listing is cut off before this point. The two cache
         definitions below are inferred from the cache names and parent beans used above. -->
    <property name="cacheConfiguration">
        <list>
            <bean class="org.apache.ignite.configuration.CacheConfiguration" parent="metaCacheCfgBase">
                <property name="name" value="igfs-meta"/>
            </bean>
            <bean class="org.apache.ignite.configuration.CacheConfiguration" parent="dataCacheCfgBase">
                <property name="name" value="igfs-data"/>
            </bean>
        </list>
    </property>
</bean>

Next, we have configured the base cache configuration called dataCacheCfgBase, which will be the parent of the IGFS data cache. Most of the properties of this configuration we have already discussed. Note that for demonstration purposes, we have set the backups value to 0.
Our subsequent configuration is the base configuration for the metadata cache, called metaCacheCfgBase. It is probably the most unfamiliar part of this configuration. IGFS keeps metadata for all files ingested into the in-memory file system. The configuration of this cache is very similar to the previous base cache configuration.
Next, we are going to configure the IGFS file system; it is the main part of the Ignite configuration. We set the name of the IGFS file system to igfs. The block size and the stream buffer size of the IGFS file system will be 1024. To let IGFS accept requests from Hadoop, an endpoint should be configured. Ignite offers two endpoint types:
  1. shmem: Working over shared memory (not available on Windows).
  2. tcp: Working over standard socket API.
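To get a feeling for what the block size of 1024 and the constructor argument 512 of IgfsGroupDataBlocksKeyMapper mean, here is a small arithmetic sketch in plain Java. It rests on my assumption that the mapper groups consecutive data blocks so that each group shares one affinity key; the class IgfsBlockSketch and its methods are hypothetical and do not call the Ignite API.

```java
/** Toy arithmetic for file blocks and block groups; not Ignite's implementation. */
final class IgfsBlockSketch {
    static final int BLOCK_SIZE = 1024; // blockSize from the IGFS configuration above
    static final int GROUP_SIZE = 512;  // constructor argument of the key mapper above

    /** Number of blocks needed to store a file of the given length in bytes. */
    static long blockCount(long fileLength) {
        return (fileLength + BLOCK_SIZE - 1) / BLOCK_SIZE;
    }

    /** Index of the block that contains the given byte offset. */
    static long blockIndex(long offset) {
        return offset / BLOCK_SIZE;
    }

    /** Index of the group a block belongs to (assumed: consecutive blocks share a group). */
    static long groupIndex(long blockIndex) {
        return blockIndex / GROUP_SIZE;
    }
}
```

Under these assumptions, blocks 0 to 511 (the first 512 KB of a file) fall into group 0 and would be kept together, while block 512 starts group 1.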

Step 2

When each Ignite node is configured (default-config.xml), start every node with the following command:
$ ignite.sh

Step 3

In this step, we are going to ingest our t8.shakespeare.txt file into the IGFS file system. As we described before, we will use a Java application to ingest the file into IGFS. The application is very simple; it ingests the t8.shakespeare.txt file once every time the application is launched. The application takes the name of the directory and the file name as input parameters to put the file into IGFS. Open the pom.xml file and add the following code to the dependencies section.
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spring</artifactId>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-hadoop</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
Now, add a new Java class with the name IngestFileInIGFS. The full listing of the Java class is shown below:
import com.google.common.io.ByteStreams;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.Ignition;
import org.apache.ignite.igfs.IgfsPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;

public class IngestFileInIGFS {
    private final static Logger LOGGER = LoggerFactory.getLogger(IngestFileInIGFS.class);
    private final static String IGFS_FS_NAME = "igfs";

    public static void main(String... args) {
        if (args.length < 2) {
            LOGGER.error("Usages [java -jar chapter-bigdata-1.0-SNAPSHOT.jar DIRECTORY_NAME FILE NAME, for example java -jar chapter-bigdata-1.0-SNAPSHOT.jar myDir myFile]");
            System.exit(0);
        }
        Ignite ignite = Ignition.start("default-config.xml");
        Ignition.setClientMode(true);
        // List the file systems configured on the node.
        Collection<IgniteFileSystem> fs = ignite.fileSystems();
        for (IgniteFileSystem igniteFileSystem : fs) {
            LOGGER.info("IGFS File System name:" + igniteFileSystem.name());
        }
        IgniteFileSystem igfs = ignite.fileSystem(IGFS_FS_NAME);
        // Create directory.
        IgfsPath dir = new IgfsPath("/" + args[0]);
        igfs.mkdirs(dir);
        // Create file and write some data to it.
        IgfsPath file = new IgfsPath(dir, args[1]);
        // Read the Shakespeare file from the classpath.
        InputStream inputStream = IngestFileInIGFS.class.getClassLoader().getResourceAsStream("t8.shakespeare.txt");
        byte[] filesToByte;
        try {
            filesToByte = ByteStreams.toByteArray(inputStream);
            // Create the file in IGFS (overwriting it if it exists) and write the bytes.
            OutputStream out = igfs.create(file, true);
            out.write(filesToByte);
            out.close();
        } catch (IOException e) {
            LOGGER.error(e.getMessage());
        } finally {
            try {
                inputStream.close();
            } catch (IOException e) {
                LOGGER.error(e.getMessage());
            }
        }
        LOGGER.info("Created file path:" + file.toString());
    }
}
To compile and run the application, execute the following command:


mvn clean install
java -jar ./IngestFileInIGFS.jar myDir myFile
After successfully compiling the Maven project, there will be executable Java JAR files in the target folder. The IngestFileInIGFS.jar file is for ingesting a file into IGFS.

Step 4

It’s time to configure Hadoop (the IGFS file system must be configured in Hadoop).
Let’s create a new directory under $HADOOP_HOME/etc and copy all the files from the hadoop directory into it. Execute the following commands:
$ cd $HADOOP_HOME/etc
$ mkdir hadoop-ignite
$ cp ./hadoop/*.* ./hadoop-ignite
Remove all the properties from $HADOOP_HOME/etc/hadoop-ignite/core-site.xml and add the following properties:
<configuration>
<property>
<name>fs.defaultFS</name>
<value>igfs:///igfs@127.0.0.1:10500/</value>
</property>
<property>
<name>fs.igfs.impl</name>
<value>org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem</value>
</property>
</configuration>
The fully qualified file system class name org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem is all that is needed to configure IGFS for Hadoop.
Note: v1 and v2 do not stand for Hadoop 1.x and Hadoop 2.x. They refer to the old FileSystem API and the new AbstractFileSystem API, respectively.
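To see what the fs.defaultFS value encodes, here is a minimal sketch that splits an IGFS URI into its parts with the standard java.net.URI class. It assumes the value has the form igfs://name@host:port/, where the part before the @ is the IGFS name; the class name IgfsUriParts is only for illustration and is not part of Ignite or Hadoop.

```java
import java.net.URI;

public class IgfsUriParts {
    /** Returns a one-line description of the parts of an IGFS URI. */
    public static String describe(String value) {
        URI uri = URI.create(value);
        // java.net.URI exposes the part before '@' as "user info";
        // here we read it as the IGFS name (an assumption of this sketch).
        return "scheme=" + uri.getScheme()
            + ", igfsName=" + uri.getUserInfo()
            + ", host=" + uri.getHost()
            + ", port=" + uri.getPort()
            + ", path=" + uri.getPath();
    }

    public static void main(String[] args) {
        // prints: scheme=igfs, igfsName=igfs, host=127.0.0.1, port=10500, path=/
        System.out.println(describe("igfs://igfs@127.0.0.1:10500/"));
    }
}
```

Note that with a third slash after the scheme (igfs:///...), java.net.URI sees an empty authority, reports no host or port, and treats the rest of the string as the path.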
At this point the Hadoop configuration is complete, and we are ready to execute MapReduce jobs.

Step 5

There are several ways to execute MapReduce jobs with a given Hadoop configuration. One of the easiest is to pass the Hadoop config directory to the hadoop command as follows:
hadoop --config [path_to_config] [arguments]

Let’s run our wordcount MapReduce job with the file from the IGFS with the following command:

time hadoop --config /home/user/hadoop/hadoop-2.7.2/etc/hadoop-ignite jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount /myDir/myFile /myDir/out

After running the above command, you should see the job progress and the final counters in your terminal.

Note that you have to change the name of the output directory every time you run the MapReduce job, because the job fails if the output directory already exists.
In this simple way, you can replace Hadoop HDFS with IGFS.

Tuesday

An impatient start with Apache Ignite machine learning grid

Recently, Apache Ignite 2.0 introduced a beta version of the in-memory machine learning grid, a distributed machine learning library built on top of the Apache Ignite in-memory data grid (IMDG). This beta release of the ML library can perform local and distributed vector and matrix algebra operations as well as decompositions. The data structures can be stored in the Java heap, off-heap, or in distributed Ignite caches. At the moment, the Apache Ignite ML grid doesn't support any prediction or recommendation analysis. In this short post, we are going to download the new Apache Ignite 2.0 release, build the examples, and run them.
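To give a feeling for the kind of operations meant here, the following is a minimal sketch of a local dot product and a matrix-vector product on plain Java arrays. It deliberately does not use the Ignite ML API; the class and method names are made up for illustration, and the ML grid offers the same kind of operations on its own vector and matrix types.

```java
import java.util.Arrays;

public class LocalAlgebraSketch {
    /** Dot product of two vectors of equal length. */
    public static double dot(double[] a, double[] b) {
        if (a.length != b.length)
            throw new IllegalArgumentException("Vectors must have the same length");
        double sum = 0.0;
        for (int i = 0; i < a.length; i++)
            sum += a[i] * b[i];
        return sum;
    }

    /** Product of a matrix (array of rows) and a column vector. */
    public static double[] times(double[][] m, double[] v) {
        double[] result = new double[m.length];
        for (int row = 0; row < m.length; row++)
            result[row] = dot(m[row], v);
        return result;
    }

    public static void main(String[] args) {
        // prints: dot = 32.0
        System.out.println("dot = " + dot(new double[] {1, 2, 3}, new double[] {4, 5, 6}));
        // prints: M*v = [3.0, 7.0]
        System.out.println("M*v = " + Arrays.toString(
            times(new double[][] {{1, 2}, {3, 4}}, new double[] {1, 1})));
    }
}
```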

1. Download and unpack the Apache Ignite 2.0 distribution.

Download the Apache Ignite 2.0 binary release from the following link. Unpack the distribution somewhere on your workstation (e.g., /home/ignite/2.0) and set IGNITE_HOME to that directory.

2. Start the Apache Ignite remote node

Run the following command in the terminal window.
ignite.sh examples/config/example-ignite.xml 

Note that remote nodes for the examples should always be started with the special configuration file that enables P2P class loading: `examples/config/example-ignite.xml`.

Also, note that Apache Ignite version 2.0 needs Java version 1.8 or higher.

3. Build the machine learning examples

Go to the /examples folder of the Apache Ignite distribution. If you have already installed and configured Maven, run the following command from the examples folder.

mvn clean install -Pml

The above command activates the machine learning (ml) profile and builds the project.

4. Run it

Let's run the simple local on-heap version of the Vector example. Execute the following command in your terminal window:

mvn exec:java -Dexec.mainClass=org.apache.ignite.examples.ml.math.vector.VectorExample

You should see the log output of the example in your console.


All the examples are self-contained and don't need any special configuration. Examples with 'Cache' in their name, such as CacheMatrixExample or CacheVectorExample, need a remote Ignite node with P2P class loading enabled. Let's run the CacheMatrixExample with the following command.
mvn exec:java -Dexec.mainClass=org.apache.ignite.examples.ml.math.matrix.CacheMatrixExample

You should see the output of the example in your console.


Additionally, the Apache Ignite ML grid provides a simple utility class that pretty-prints matrices and vectors. You can run the TracerExample as follows:
mvn exec:java -Dexec.mainClass=org.apache.ignite.examples.ml.math.tracer.TracerExample

The above command launches a web browser and displays the matrices and vectors as HTML.


That's enough for now. If you liked this post, you may also like the book.

Saturday

Unboxing of the first copy of the book High performance in-memory computing with Apache Ignite

Yesterday I received the first paperback copy of the book High performance in-memory computing with Apache Ignite.







The book is available at Lulu.com and the Amazon bookstore.


Product details

  • Paperback: 360 pages
  • Language: English
  • ISBN-10: 1365732355
  • ISBN-13: 978-1365732355
  • Product Dimensions: 8.3 x 0.8 x 11 inches
  • Shipping Weight: 1.8 pounds

Happy reading!!



In-Memory MapReduce and Your Hadoop Ecosystem: Part I

Portions of this article were taken from the book High-Performance In-Memory Computing With Apache Ignite. If it interests you, check out the rest of the book for more helpful information.
Hadoop has quickly become the standard for business intelligence on huge data sets. However, its batch scheduling overhead and disk-based data storage make it unsuitable for analysing live, real-time data in a production environment. One of the main factors that limits the performance scaling of Hadoop and MapReduce is that Hadoop relies on a file system that generates a lot of disk input/output (I/O). I/O adds latency that delays the MapReduce computation. An alternative is to store the needed distributed data in memory. Placing MapReduce in memory together with the data it needs eliminates file I/O latency.
Figure 1.
The generic phases of a Hadoop job are shown in the above figure. The sort, merge, and shuffle phases are highly I/O intensive. This overhead is prohibitive for real-time analytics that must return results in milliseconds or seconds.
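The phases can be illustrated with a minimal single-JVM sketch of a word count: a map step that emits (word, 1) pairs, a shuffle/sort step that groups the pairs by key, and a reduce step that sums each group. This is only a plain Java illustration of the data flow, not Hadoop or Ignite code; the class and method names are made up for this example. In a real cluster, the shuffle step is where intermediate data is written to disk and moved across the network.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class WordCountPhases {
    /** Map phase: emit a (word, 1) pair for every token of the line. */
    public static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String token : line.trim().split("\\s+")) {
            if (!token.isEmpty())
                pairs.add(new AbstractMap.SimpleEntry<>(token, 1));
        }
        return pairs;
    }

    /** Shuffle/sort phase: group all values by key, keys in sorted order. */
    public static SortedMap<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        SortedMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            List<Integer> values = grouped.get(pair.getKey());
            if (values == null) {
                values = new ArrayList<>();
                grouped.put(pair.getKey(), values);
            }
            values.add(pair.getValue());
        }
        return grouped;
    }

    /** Reduce phase: sum all the values of one key. */
    public static int reduce(List<Integer> values) {
        int sum = 0;
        for (int value : values)
            sum += value;
        return sum;
    }

    /** Runs the three phases one after another over the input lines. */
    public static SortedMap<String, Integer> wordCount(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines)
            pairs.addAll(map(line));
        SortedMap<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : shuffle(pairs).entrySet())
            counts.put(group.getKey(), reduce(group.getValue()));
        return counts;
    }

    public static void main(String[] args) {
        // prints: {be=2, not=1, or=1, to=2}
        System.out.println(wordCount(Arrays.asList("to be or", "not to be")));
    }
}
```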
The Ignite in-memory MapReduce engine executes MapReduce programs in seconds (or less) by combining several techniques. By avoiding Hadoop’s batch scheduling, it can start jobs in milliseconds instead of tens of seconds. In-memory data storage dramatically reduces access times by eliminating data movement from the disk or across the network. This is how Ignite accelerates Hadoop application performance: the operations are transparent to the application, and all of this is accomplished without changing a line of MapReduce code.
The Ignite Hadoop in-memory plug-and-play accelerator can be grouped into three categories.

1. In-Memory MapReduce

It’s an alternative implementation of the Hadoop job tracker and task tracker that accelerates job execution. It eliminates the overhead associated with the job tracker and task trackers in a standard Hadoop architecture while providing low-latency, HPC-style distributed processing.

2. Ignite In-Memory File System (IGFS)

It’s an alternative implementation of the Hadoop file system, named IgniteHadoopFileSystem, which can store data sets in memory. This in-memory file system minimizes disk I/O and improves performance.

3. Hadoop File System Cache

This implementation works as a caching layer above HDFS. Every read and write operation goes through this layer, which can improve MapReduce performance.
The conceptual architecture of the Ignite Hadoop accelerator is shown in Figure 2:
Figure 2.
The Apache Ignite Hadoop accelerator is especially useful when you already have an existing Hadoop cluster up and running and want to get high performance with minimum effort. Note that the idea that Hadoop runs on commodity hardware is a myth. Most Hadoop processing is I/O-intensive and requires homogeneous, mid-range servers to perform well.
In this blog post, we are going to explore the details of the Apache Ignite in-memory MapReduce.
The Ignite in-memory MapReduce engine is 100% compatible with Hadoop HDFS and Yarn. It reduces the startup and the execution time of the Hadoop job tracker and the task tracker. Ignite in-memory MapReduce provides dramatic performance boosts for CPU-intensive tasks while requiring only minimal changes to existing applications. This module also provides an implementation of a weight-based MapReduce planner, which assigns mappers and reducers based on their weights. A weight describes how many resources are required to execute a particular map or reduce task. The planning algorithm assigns mappers so that the total resulting weight on all nodes is as small as possible.
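The idea of weight-based planning can be sketched with a simple greedy loop. This is a hypothetical illustration, not Ignite's actual planner: it assumes each task has a known cost on each node (for example, lower where the data is local, higher where it is remote) and assigns every task to the node whose accumulated weight would be smallest after taking it. The class name, the method name, and the cost values are invented for this example.

```java
import java.util.Arrays;

public class WeightedPlannerSketch {
    /**
     * Greedy assignment: cost[t][n] is the assumed weight of running task t on node n.
     * Returns, for every task, the index of the node it was assigned to.
     */
    public static int[] assign(int[][] cost, int nodeCount) {
        long[] load = new long[nodeCount];      // accumulated weight per node
        int[] assignment = new int[cost.length];
        for (int task = 0; task < cost.length; task++) {
            int best = 0;
            for (int node = 1; node < nodeCount; node++) {
                // Pick the node with the smallest resulting weight; ties keep the lower index.
                if (load[node] + cost[task][node] < load[best] + cost[task][best])
                    best = node;
            }
            assignment[task] = best;
            load[best] += cost[task][best];
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Hypothetical weights: 100 when the input split is local to the node, 250 when remote.
        int[][] cost = {
            {100, 250}, // task 0: data on node 0
            {100, 250}, // task 1: data on node 0
            {250, 100}, // task 2: data on node 1
            {100, 250}, // task 3: data on node 0
            {100, 250}  // task 4: data on node 0
        };
        // prints: [0, 0, 1, 0, 1]
        System.out.println(Arrays.toString(assign(cost, 2)));
    }
}
```

In this run the last task is placed on node 1 even though its data is on node 0, because node 0 has already accumulated more weight than node 1 would have after taking the remote task.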
High-level architecture of the Ignite in-memory MapReduce is shown below:
Figure 3.
The Ignite in-memory grid has a pre-staged, Java-based execution environment on all grid nodes and reuses it for multiple data processing runs. This execution environment consists of a set of Java virtual machines, one on each server within the cluster. These JVMs form the Ignite MapReduce engine, as shown in the above figure. Also, the Ignite in-memory data grid can automatically deploy all the executable programs and libraries needed to run the MapReduce job across the grid, which reduces the startup time down to milliseconds.
Now that we have got the basics, let’s try to configure the sandbox and execute a few MapReduce jobs in the Ignite MapReduce engine.
For simplicity, we are going to install a Hadoop pseudo-distributed cluster in a single virtual machine and run the famous Hadoop word count example as a MapReduce job. A Hadoop pseudo-distributed cluster means that the Hadoop data nodes, name nodes, task trackers, and job trackers all run on one virtual (host) machine.
Our sandbox configuration is as follows:
  1. OS: Red Hat Enterprise Linux.
  2. CPU: 2.
  3. RAM: 2 GB.
  4. JVM: 1.7_60.
  5. Ignite version: 1.6 or above, single node cluster.
First, we are going to install and configure Hadoop and then proceed to Apache Ignite. We assume that Java is installed and that JAVA_HOME is set in the environment variables.

1. Unpack the Hadoop Distribution Archive

Unpack the Hadoop distribution archive and set the JAVA_HOME path in etc/.
 export JAVA_HOME=JAVA_HOME_PATH 

2. Add Configurations

Add the following configuration in the etc/hadoop/core-site.xml file:
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
Also, add the following data replication setting to the etc/hadoop/hdfs-site.xml file:
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>

3. Set Up Password-Less SSH

Set up password-less or passphrase-less SSH for your operating system:
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
$ chmod 0600 ~/.ssh/authorized_keys
Try the following command on your console:
$ ssh localhost
It shouldn’t ask you for a password.

4. Set Up the Hadoop HDFS File System

Format the Hadoop HDFS file system:
$ bin/hdfs namenode -format
Next, start the namenode and datanode daemons with the following command:
$ sbin/start-dfs.sh
Also, I suggest that you add the HADOOP_HOME environment variable to the operating system.

5. Set Up Directories

Make a few directories in the HDFS file system to run MapReduce jobs.
bin/hdfs dfs -mkdir /user
bin/hdfs dfs -mkdir /input
The above commands create two folders, user and input, in the HDFS file system. Insert some text files into the input directory:
bin/hdfs dfs -put $HADOOP_HOME/etc/hadoop /input

6. Configure the Hadoop Pseudo Cluster

Run the Hadoop native MapReduce application to count the words in the files.
$ bin/hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount /input/hadoop output
You can view the result of the word count with the following command:
bin/hdfs dfs -cat output/*
In my case, the output is huge. Here is a fragment of it:
want 1
warnings. 1
when 9
where 4
which 7
while 1
who 6
will 23
window 1
window, 1
with 62
within 4
without 1
work 12
writing, 27
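When the output is this large, it can be handy to post-process it. The following is a minimal sketch that parses lines in the "word count" format shown above and returns the most frequent words. It assumes each line holds exactly one word and one integer separated by whitespace; the class and method names are invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WordCountOutput {
    /** Parses "word count" lines; lines that do not have exactly two fields are skipped. */
    public static Map<String, Integer> parse(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            String[] parts = line.trim().split("\\s+");
            if (parts.length != 2)
                continue;
            counts.put(parts[0], Integer.parseInt(parts[1]));
        }
        return counts;
    }

    /** Returns up to n words, ordered by descending count. */
    public static List<String> topWords(Map<String, Integer> counts, int n) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
            @Override
            public int compare(Map.Entry<String, Integer> a, Map.Entry<String, Integer> b) {
                return Integer.compare(b.getValue(), a.getValue());
            }
        });
        List<String> top = new ArrayList<>();
        for (int i = 0; i < Math.min(n, entries.size()); i++)
            top.add(entries.get(i).getKey());
        return top;
    }

    public static void main(String[] args) {
        List<String> fragment = Arrays.asList("will 23", "window 1", "with 62", "work 12");
        // prints: [with, will]
        System.out.println(topWords(parse(fragment), 2));
    }
}
```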
At this stage, our Hadoop pseudo cluster is configured and ready to use. Now, let’s start configuring Apache Ignite.

7. Unpack Apache Ignite Distribution

Unpack the Apache Ignite distribution somewhere in your sandbox and set the IGNITE_HOME path to the root directory of the installation. To get statistics about tasks and executions, add the following properties to your /config/default-config.xml file:
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
  ....
<property name="includeEventTypes">
<list>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_JOB_MAPPED"/>
</list>
</property>
</bean>
The above configuration enables the task events that are used for statistics.
Note that, by default, all events are disabled. Once the above events are enabled, you can use the “tasks” command in Ignite Visor to get statistics about task executions.

8. Add Libraries

Add the following libraries in the $IGNITE_HOME/libs directory.
asm-all-4.2.jar
ignite-hadoop-1.6.0.jar
hadoop-mapreduce-client-core-2.7.2.jar
hadoop-common-2.7.2.jar
hadoop-auth-2.7.2.jar
Note that the asm-all-4.2.jar library version is dependent on your Hadoop version.

9. Start the Ignite Node

We are going to use the default Apache Ignite configuration file, config/default-config.xml, to start the Ignite node. Start the Ignite node with the following command:
bin/ignite.sh

10. Finish Setting Up Ignite Job Tracker

A few more settings are needed to use the Ignite job tracker instead of the Hadoop one. Add HADOOP_CLASSPATH to the environment variables as follows.
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$IGNITE_HOME/libs/ignite-core-1.6.0.jar:$IGNITE_HOME/libs/ignite-hadoop-1.6.0.jar:$IGNITE_HOME/libs/ignite-shmem-1.0.0.jar

11. Override

At this stage, we are going to override the Hadoop mapred-site.xml file. For a quick start, add the following XML fragment to mapred-site.xml:
<property>
<name>mapreduce.framework.name</name>
<value>ignite</value>
</property>
<property>
<name>mapreduce.jobtracker.address</name>
<value>127.0.0.1:11211</value>
</property>
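Before submitting a job, it can help to verify that something is actually listening on the configured job tracker address. The following is a minimal sketch using only the standard java.net classes; it assumes the Ignite node accepts TCP connections on 127.0.0.1:11211 as configured above. The class name and the two-second timeout are arbitrary choices for this example.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class EndpointCheck {
    /** Returns true if a TCP connection to host:port can be opened within the timeout. */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Address taken from mapreduce.jobtracker.address in mapred-site.xml.
        System.out.println("Ignite endpoint reachable: " + isReachable("127.0.0.1", 11211, 2000));
    }
}
```

A result of false usually means that the Ignite node is not started yet or listens on a different port.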

12. Run It

Run the word count MapReduce example again:
$ bin/hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount /input/hadoop output2
The output should be very similar to the previous run, as shown in Figure 4.
Figure 4.
This time, the execution is faster than the previous run, which used the Hadoop task tracker. Let’s examine the Ignite task execution statistics through Ignite Visor.
Figure 5.
In the above figure, note the total number of executions and the durations of the in-memory task tracker. In our case, the total number of executions of the task HadoopProtocolJobStatusTask(@t1) is 24 and the execution rate is 12 seconds.
In the next part, we will run a few benchmarks to demonstrate the performance advantage of the Ignite MapReduce engine.