Fetching Data from Riak (v2.0)

Alex Moore edited this page Jun 25, 2015 · 5 revisions

Note: This document is for the 2.x Java Client series.

First, a word or two about Riak, CAP Theorem and eventual consistency.

Unless you're already familiar with CAP Theorem and eventual consistency, taking the time to read through at least Why Riak would be well worth your while.

It's ok, we'll wait.

Ok! Now that you've read through that and understand that Riak is a system that favors AP with eventual C, this might make some sense to you.

Fetching data in Riak with the Java client.

In Riak, data is stored in buckets, and buckets can be grouped further by an optional bucket type. Buckets and bucket types have a number of options and tunable parameters, one of which is whether or not to allow sibling records. A bucket in the default bucket type won't allow sibling creation by default, whereas a bucket in a non-default bucket type will. The Riak Java client is built to support both scenarios. You can simply say "fetch the data associated with this key": calling .getValue() returns the data associated with a single object, and calling .getValues() returns the values of all the siblings.

Of course, this is not how you should use the client if your application performs a typical read/modify/write cycle and multiple threads or application instances update the same data concurrently. We'll discuss that in the advanced section below.

With that in mind, the following basic examples show how you can retrieve data from Riak.

Basic Example #1: Fetch data as a String
Basic Example #2: Fetch JSON data, map to POJO
Basic Example #3: Changing query parameters

## Basic Fetch as a String
```java
import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;

import java.net.UnknownHostException;
import java.util.concurrent.ExecutionException;

public class FetchObject {
    public static void main(String[] args) throws UnknownHostException, ExecutionException, InterruptedException {

        RiakClient client = RiakClient.newClient(10017, "127.0.0.1");
        Location location = new Location(new Namespace("TestBucket"), "TestKey");

        FetchValue fv = new FetchValue.Builder(location).build();
        FetchValue.Response response = client.execute(fv);

        // Fetch object as String
        String value = response.getValue(String.class);
        System.out.println(value);

        client.shutdown();
    }
}
```
## Fetch JSON data, map to POJO

The Riak Java client provides a default Converter (see the advanced section below for more on this) that automatically maps JSON stored in Riak to a POJO class you provide.

```java
import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;

import java.net.UnknownHostException;
import java.util.concurrent.ExecutionException;

public class FetchObject {

    // { "foo":"Some String", "bar":"some other string","foobar":5 }
    static class Pojo {
        public String foo;
        public String bar;
        public int foobar;
    }

    public static void main(String[] args) throws UnknownHostException, ExecutionException, InterruptedException {

        RiakClient client = RiakClient.newClient(10017, "127.0.0.1");
        Location location = new Location(new Namespace("TestBucket"), "TestKey");

        FetchValue fv = new FetchValue.Builder(location).build();
        FetchValue.Response response = client.execute(fv);

        // Fetch object as Pojo class (map json to object)
        Pojo myObject = response.getValue(Pojo.class);
        System.out.println(myObject.foo);

        client.shutdown();
    }
}
```
## Fetch data, changing query parameters for just this request
```java
import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.cap.Quorum;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;

import java.net.UnknownHostException;
import java.util.concurrent.ExecutionException;

public class FetchObject {

    // { "foo":"Some String", "bar":"some other string","foobar":5 }
    static class Pojo {
        public String foo;
        public String bar;
        public int foobar;
    }

    public static void main(String[] args) throws UnknownHostException, ExecutionException, InterruptedException {

        RiakClient client = RiakClient.newClient(10017, "127.0.0.1");
        Location location = new Location(new Namespace("TestBucket"), "TestKey");

        // Override the R and PR quorum values for this request only
        FetchValue fv = new FetchValue.Builder(location)
                        .withOption(FetchValue.Option.R, Quorum.oneQuorum())
                        .withOption(FetchValue.Option.PR, Quorum.oneQuorum()).build();
        FetchValue.Response response = client.execute(fv);

        // Fetch object as Pojo class (map json to object)
        Pojo myObject = response.getValue(Pojo.class);
        System.out.println(myObject.foo);

        client.shutdown();
    }
}
```


<hr>
<a name="advanced"/>
# The Hard Way
## Eventual Consistency, Resolvers, and Converters

In many environments, you're going to configure your buckets to allow siblings and write the code that deals with them. 

It's worth mentioning here that there are two ways to handle a read/modify/write cycle with the Java client: either do a fetch, modify the object, then store it back to Riak in a separate store operation, or encapsulate the entire read/modify/write cycle in the store operation. Note that if you plan to do the former with your own POJO, you must include a `byte[]` or `VClock` field annotated with the `@RiakVClock` annotation. This preserves the vector clock in your POJO and is used during the subsequent store operation. For more information on this please see the section on [Storing data in Riak](https://github.com/basho/riak-java-client/wiki/Storing-data-in-riak).
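
Why the vector clock has to travel with your object can be illustrated with a toy model. The sketch below is not the Riak client API: `ToyVersionedKey` is a hypothetical in-memory class, and a single counter stands in for the vector clock, which is a deliberate simplification. It only shows the general idea that a write carrying a stale version is kept alongside the existing value as a sibling instead of replacing it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Toy in-memory model of a single key; NOT the Riak client API.
 * A plain counter stands in for the vector clock (a simplification).
 */
class ToyVersionedKey {
    private long version = 0;                                  // stand-in for the vector clock
    private final List<String> values = new ArrayList<String>();

    /** The version a reader would carry along with the data it fetched. */
    long currentVersion() {
        return version;
    }

    List<String> values() {
        return Collections.unmodifiableList(values);
    }

    /**
     * Writes a value. If the writer saw the latest version, the write
     * supersedes what is stored; otherwise it is kept as a sibling.
     */
    void put(String value, long versionSeenByWriter) {
        if (versionSeenByWriter == version) {
            values.clear();
        }
        values.add(value);
        version++;
    }

    public static void main(String[] args) {
        ToyVersionedKey key = new ToyVersionedKey();
        key.put("v1", key.currentVersion());   // first write

        long seenByA = key.currentVersion();   // two clients read the same state
        long seenByB = key.currentVersion();

        key.put("v2-from-A", seenByA);         // A's version is current: replaces v1
        key.put("v2-from-B", seenByB);         // B's version is now stale: sibling

        System.out.println(key.values());      // [v2-from-A, v2-from-B]
    }
}
```

In this model, a client that forgets the version it read behaves like client B on every write, which is roughly what happens when a POJO does not carry its vector clock into the store operation.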

There are two Interfaces you're going to be using:

* [ConflictResolver&lt;T&gt;](http://basho.github.io/riak-java-client/2.0.1/com/basho/riak/client/api/cap/ConflictResolver.html)<BR>
    This Interface is used to resolve sibling records returned by Riak. (This interface is identical to the one used in the 1.x Java Client series)
* [Converter&lt;T&gt;](http://basho.github.io/riak-java-client/2.0.1/com/basho/riak/client/api/convert/Converter.html)<br>
    This interface is used to serialize/deserialize data to/from Riak.
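
As a rough illustration of what "serialize/deserialize" means here, the following self-contained sketch round-trips a small object through bytes. Everything in it is hypothetical: `ValueCodec`, `Player`, `PlayerCodec`, and the `name:score` text format are made up for this example, and the client's own `Converter<T>` has a different, richer API (see the Javadoc linked above).

```java
import java.nio.charset.StandardCharsets;

class ConverterSketch {

    /** Hypothetical two-way codec; not the client's Converter<T>. */
    interface ValueCodec<T> {
        byte[] toBytes(T domainObject);
        T fromBytes(byte[] stored);
    }

    static final class Player {
        final String name;
        final int score;

        Player(String name, int score) {
            this.name = name;
            this.score = score;
        }
    }

    /** Encodes a Player as "name:score" in UTF-8; the format is invented for this sketch. */
    static final class PlayerCodec implements ValueCodec<Player> {
        @Override
        public byte[] toBytes(Player p) {
            return (p.name + ":" + p.score).getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public Player fromBytes(byte[] stored) {
            String text = new String(stored, StandardCharsets.UTF_8);
            int sep = text.lastIndexOf(':');   // split on the last colon, so names may contain ':'
            return new Player(text.substring(0, sep), Integer.parseInt(text.substring(sep + 1)));
        }
    }

    public static void main(String[] args) {
        ValueCodec<Player> codec = new PlayerCodec();
        byte[] stored = codec.toBytes(new Player("Steve", 42));
        Player back = codec.fromBytes(stored);
        System.out.println(back.name + " " + back.score);   // Steve 42
    }
}
```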


Here's the anatomy of making a fetch request with a conflict resolver and a converter.

<a name="figure1"/>
### Figure 1
![Riak fetch](http://dl.dropbox.com/u/74693818/RJC-fetch-v2.png)

There are a few different ways to fetch and use data from Riak, but all of them use the [FetchOperation](http://basho.github.io/riak-java-client/2.0.1/com/basho/riak/client/core/operations/FetchOperation.html) class. The difference lies in which `getValue()` method you call, and how you've configured your Conflict Resolvers and Converters.

If you do not provide a ConflictResolver, an instance of [DefaultResolver&lt;T&gt;](http://basho.github.io/riak-java-client/2.0.1/com/basho/riak/client/api/cap/DefaultResolver.html) is provided. This is not really a resolver at all; it throws an exception if siblings are present. If you do not provide a Converter, the [JSONConverter&lt;T&gt;](http://basho.github.io/riak-java-client/2.0.1/com/basho/riak/client/api/convert/JSONConverter.html) is provided. This Converter uses the Jackson JSON library to deserialize your POJO from JSON stored in Riak. For an example of implementing a custom converter that uses a different serialization library, check out [[using a custom converter|Using-a-custom-Converter]].
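
To make the "throws if siblings are present" behaviour concrete, here is a minimal sketch of such a policy in plain Java. It is not the client's `DefaultResolver` source: `SiblingResolver` is a hypothetical stand-in for `ConflictResolver<T>`, and `IllegalStateException` is a placeholder for the client's own exception type.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class StrictResolverSketch {

    /** Hypothetical stand-in for ConflictResolver<T>; not the client's interface. */
    interface SiblingResolver<T> {
        T resolve(List<T> siblings);
    }

    /** Passes through zero or one value and refuses to choose between siblings. */
    static final class StrictResolver<T> implements SiblingResolver<T> {
        @Override
        public T resolve(List<T> siblings) {
            if (siblings.size() > 1) {
                // Placeholder exception; the real client defines its own type
                throw new IllegalStateException(siblings.size() + " siblings found");
            }
            return siblings.isEmpty() ? null : siblings.get(0);
        }
    }

    public static void main(String[] args) {
        SiblingResolver<String> resolver = new StrictResolver<String>();

        System.out.println(resolver.resolve(Collections.<String>emptyList())); // null
        System.out.println(resolver.resolve(Arrays.asList("only")));           // only

        try {
            resolver.resolve(Arrays.asList("a", "b"));
        } catch (IllegalStateException e) {
            System.out.println("unresolved: " + e.getMessage()); // unresolved: 2 siblings found
        }
    }
}
```

A policy like this is safe for buckets that never produce siblings; once siblings are allowed, every fetch that encounters them fails until a resolver with real merge logic is registered.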

The following example demonstrates the use of these interfaces and your own POJO. These are the same implementations we use for our [[Advanced Examples for storing data in Riak|Storing-data-in-riak (v2.0)#wiki-advanced]] and model a game "leaderboard" system. 

### App.java
```java
package Leaderboard;

import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.cap.ConflictResolverFactory;
import com.basho.riak.client.api.commands.buckets.StoreBucketProperties;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;

import java.net.UnknownHostException;
import java.util.Random;
import java.util.concurrent.ExecutionException;

public class App {
    public static void main(String [] args) throws UnknownHostException, ExecutionException, InterruptedException {

        // We need some data, of course
        String playerNames[] = {"Steve","Brian","Bob" };
        Random generator = new Random();
        GameLeaderboard gl = new GameLeaderboard("SuperCoolGame");

        for (int i = 0; i < 5; i++)
        {
            NameScorePair nsp = new NameScorePair(playerNames[(i+3)%3], generator.nextInt(100));
            gl.addScore(nsp);
        }

        // Store our initial leaderboard in Riak
        RiakClient client = RiakClient.newClient(10017, "127.0.0.1");
        Namespace ns = new Namespace("demo_bucket");
        setAllowMultForBucket(client, ns);

        // Register our custom conflict resolver.
        // This tells the Java client to resolve siblings of GameLeaderboard with GameLeaderboardResolver
        ConflictResolverFactory.INSTANCE.registerConflictResolver(GameLeaderboard.class, new GameLeaderboardResolver());


        // If you don't specify a location to the builder, it will look on the object for fields annotated with
        // @RiakBucketType, @RiakBucketName, and @RiakKey to build the location automatically
        StoreValue storeBoard = new StoreValue.Builder(gl).build();
        client.execute(storeBoard);

        FetchValue leaderboardFetch =
                new FetchValue.Builder(
                    new Location(new Namespace("demo_bucket"), "SuperCoolGame")).build();

        FetchValue.Response response = client.execute(leaderboardFetch);

        // If there are any siblings they will be resolved on calling `getValue()`
        GameLeaderboard fetchedGL = response.getValue(GameLeaderboard.class);

        // Output the results!
        for ( NameScorePair n : fetchedGL.getScoreList())
        {
            System.out.println(n.getName() + " " + n.getScore());
        }

        // Release the client's resources, as in the basic examples
        client.shutdown();
    }

    public static void setAllowMultForBucket(RiakClient c, Namespace ns)
            throws ExecutionException, InterruptedException {

        StoreBucketProperties storeBucketProperties =
                new StoreBucketProperties.Builder(ns)
                .withAllowMulti(true).build();

        c.execute(storeBucketProperties);
    }
}
```

### GameLeaderboardResolver.java
```java
package Leaderboard;

import com.basho.riak.client.api.cap.ConflictResolver;
import com.basho.riak.client.api.cap.UnresolvedConflictException;

import java.util.Iterator;
import java.util.List;

public class GameLeaderboardResolver implements ConflictResolver<GameLeaderboard>
{

    /*
     * Riak hands us a list of GameLeaderboard objects. Our job is to reconcile
     * those objects and return a single, resolved GameLeaderboard
     *
     * In this example, the logic is pretty straightforward. In our GameLeaderboard
     * class we created an addScores(Collection<NameScorePair>) method that will do the
     * heavy lifting for us. By adding all the lists into one GameLeaderboard
     * via that method, we end up with the top 5 scores from all the siblings.
     *
     * Worth noting is that your ConflictResolver is *always* called, even if
     * there are no siblings, or even if there is no object in Riak
     */

    public GameLeaderboard resolve(List<GameLeaderboard> siblings) throws UnresolvedConflictException
    {
        if (siblings.size() > 1)
        {
            // We have siblings, need to resolve them
            Iterator<GameLeaderboard> i = siblings.iterator();

            GameLeaderboard resolvedLeaderboard = new GameLeaderboard(i.next());

            while (i.hasNext())
            {
                resolvedLeaderboard.addScores(i.next().getScoreList());
            }

            return resolvedLeaderboard;
        }
        else if (siblings.size() == 1)
        {
            // Only one object - just return it
            return siblings.iterator().next();
        }
        else
        {
            // No object returned - return null
            return null;
        }
    }

}
```

### GameLeaderboard.java
```java
package Leaderboard;

import com.basho.riak.client.api.annotations.RiakBucketName;
import com.basho.riak.client.api.annotations.RiakKey;
import com.basho.riak.client.api.annotations.RiakVClock;

import java.util.ArrayList;
import java.util.Collection;
import java.util.TreeSet;

public final class GameLeaderboard
{
    @RiakBucketName private String gameBucket = "demo_bucket";
    @RiakKey private String gameName;
    @RiakVClock private byte[] vClock = new byte[0]; // Used for Riak version tracking

    private TreeSet<NameScorePair> scoreList = new TreeSet<NameScorePair>();

    // required by Jackson for JSON serialization
    public GameLeaderboard() {}

    public GameLeaderboard(String gameName)
    {
        this.gameName = gameName;
    }

    public GameLeaderboard(GameLeaderboard other)
    {
        this.gameName = other.getGameName();
        this.addScores(other.getScoreList());
    }

    public void addScore(NameScorePair s)
    {
        scoreList.add(s);
        if (scoreList.size() > 5)
            scoreList.pollFirst();
    }

    public void addScores(Collection<NameScorePair> scores)
    {
        scoreList.addAll(scores);
        while (scoreList.size() > 5)
            scoreList.pollFirst();

    }

    public String getGameName()
    {
        return gameName;
    }

    public ArrayList<NameScorePair> getScoreList()
    {
        return new ArrayList<NameScorePair>(scoreList.descendingSet());
    }
}
```

### NameScorePair.java
```java
package Leaderboard;

public class NameScorePair implements Comparable<NameScorePair>
{
    private String name;
    private int score;

    // Required by Jackson for JSON serialization
    public NameScorePair() {}

    public NameScorePair(String name, int score)
    {
        this.name = name;
        this.score = score;
    }

    public int compareTo(NameScorePair t)
    {
        // Order by score first, then by name (ignoring case, to match equals())
        if (this.getScore() < t.getScore())
            return -1;
        else if (this.getScore() > t.getScore())
            return 1;
        else
            return this.getName().compareToIgnoreCase(t.getName());
    }

    @Override
    public int hashCode()
    {
        int hash = 3;
        hash = 47 * hash + (this.name != null ? this.name.hashCode() : 0);
        hash = 47 * hash + this.score;
        return hash;
    }

    @Override
    public boolean equals(Object o)
    {
        if (o == null)
        {
            return false;
        }
        else if (o instanceof NameScorePair)
        {
            return ((name.equalsIgnoreCase(((NameScorePair)o).getName())) &&
                    (score == ((NameScorePair)o).getScore()));
        }
        else
            return false;
    }

    public int getScore()
    {
        return score;
    }

    public String getName()
    {
        return name;
    }
}
```