Skip to content

Using a customer converter (2.0)

Drew Pirrone-Brusse edited this page Mar 15, 2017 · 7 revisions

Note: This document is for the 2.x Java Client series.

Why would I need a custom Converter?

When storing data in or retrieving data from Riak, a Converter is used to serialize the Java object passed in. When no Converter is specified and you're using your own Domain Object (POJO) the default JSONConverter is used. Behind the scenes this takes your object, uses the Jackson JSON library to serialize it, and stores the resulting JSON text in Riak. This can be advantageous if, for example, you are planning on accessing this data from other (non-java) applications.

If you wanted to use a different serialization library, a custom Converter would be required. Let's look at how you would write a Converter to use the popular Kryo serialization library to serialize and deserialize your objects.

Example #1: The Basic Converter
Example #2: Converter that supports links, metadata, and Secondary Indexes

To build and run these examples you'll need to include both the kryo and riak-client libraries. The easiest way to accomplish this is via Maven:

<dependencies>
  <dependency>
    <groupId>com.basho.riak</groupId>
    <artifactId>riak-client</artifactId>
    <version>2.0.6</version>
  </dependency>
  <dependency>
    <groupId>com.googlecode</groupId>
    <artifactId>kryo</artifactId>
    <version>1.04</version>
  </dependency>
</dependencies>

Example #1: The basic converter

Note this example is very basic. We're just covering the use of Kryo in a Converter. In this example any links, user metadata, indexes, etc. would be lost in translation. We'll cover that in the next example.

Main.java

import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.api.convert.ConverterFactory;
import com.basho.riak.client.core.query.Location;

import java.net.UnknownHostException;
import java.util.concurrent.ExecutionException;

public class Main 
{
    public static void main(String[] args) throws UnknownHostException, ExecutionException, InterruptedException
    {
        ConverterFactory factory = ConverterFactory.getInstance();
        KryoPersonConverter converter = new KryoPersonConverter();
        factory.registerConverterForClass(Person.class, converter);

        RiakClient client = RiakClient.newClient(10017, "127.0.0.1");

        Person p = new Person("Brian Roach", "1111 Basho Drive", "555-1212");

        final StoreValue store = new StoreValue.Builder(p).build();
        client.execute(store);

        final FetchValue fetch = new FetchValue.Builder(new Location(p.getPersonNamespace(), p.getName())).build();
        final FetchValue.Response fetchResp = client.execute(fetch);
        final Person fetchedPerson = fetchResp.getValue(Person.class);

        System.out.println(fetchedPerson.getName());
        System.out.println(fetchedPerson.getAddress());
        System.out.println(fetchedPerson.getPhone());

        client.shutdown();
    }
}

Person.java

import com.basho.riak.client.api.annotations.RiakBucketName;
import com.basho.riak.client.api.annotations.RiakKey;
import com.basho.riak.client.core.query.Namespace;

public class Person
{
    // The @RiakKey annotation marks the field you want to use as the Key in Riak
    @RiakKey
    private String name;

    @RiakBucketName
    private String riakBucketName = "people";

    private String address;
    private String phone;

    public Person() {}

    public Person(String name, String address, String phone)
    {
        this.name = name;
        this.address = address;
        this.phone = phone;
    }

    public String getName()
    {
        return name;
    }

    public void setName(String name)
    {
        this.name = name;
    }

    public String getAddress()
    {
        return address;
    }

    public void setAddress(String address)
    {
        this.address = address;
    }

    public String getPhone()
    {
        return phone;
    }

    public void setPhone(String phone)
    {
        this.phone = phone;
    }

    Namespace getPersonNamespace()
    {
        return new Namespace(riakBucketName);
    }
}

KryoPersonConverter.java

import com.basho.riak.client.api.convert.ConversionException;
import com.basho.riak.client.api.convert.Converter;
import com.basho.riak.client.core.util.BinaryValue;
import com.basho.riak.client.core.util.Constants;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import java.io.ByteArrayOutputStream;


public class KryoPersonConverter extends Converter<Person>
{
    public KryoPersonConverter()
    {
        super(Person.class);
    }

    public Person toDomain(BinaryValue value, String contentType) throws ConversionException
    {
        if (value == null)
        {
            return null;
        }

        final Kryo kryo = new Kryo();
        kryo.register(Person.class);
        final Input input = new Input(value.getValue());

        final Person person = kryo.readObject(input, Person.class);
        input.close();
        return person;
    }

    public ContentAndType fromDomain(Person domainObject) throws ConversionException
    {
        Kryo kryo = new Kryo();
        kryo.register(Person.class);

        Output out = new Output(new ByteArrayOutputStream());

        kryo.writeObject(out, domainObject);

        return new ContentAndType(BinaryValue.create(out.toBytes()), Constants.CTYPE_OCTET_STREAM);
    }
}

Example #2: Converter with secondary indexes, and user metadata

This Converter preserves all the Riak data when reading/writing. This example also demonstrates how to use Secondary Indexes (2i) with your serialized objects.

Main.java

import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.commands.indexes.BinIndexQuery;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.api.convert.ConverterFactory;

import java.net.UnknownHostException;
import java.util.concurrent.ExecutionException;

public class Main 
{
    public static void main(String[] args) throws UnknownHostException, ExecutionException, InterruptedException
    {
        ConverterFactory factory = ConverterFactory.getInstance();
        KryoPersonConverter converter = new KryoPersonConverter();
        factory.registerConverterForClass(Person.class, converter);

        RiakClient client = RiakClient.newClient(10017, "127.0.0.1");

        Person p = new Person("Brian", "Roach", "1111 Basho Drive", "555-1212", "engineer");
        Person p2 = new Person("Joe", "Smith", "1111 Basho Drive", "555-1211", "engineer");


        client.execute(new StoreValue.Builder(p).build());
        client.execute(new StoreValue.Builder(p2).build());

        // Get the list of keys using the index name we declared in our Person Object
        BinIndexQuery biq = new BinIndexQuery.Builder(p.getPersonNamespace(), "job_title", "engineer").build();
        final BinIndexQuery.Response indexResponse = client.execute(biq);

        for (BinIndexQuery.Response.Entry idxE : indexResponse.getEntries())
        {
            final FetchValue.Response execute =
                    client.execute(new FetchValue.Builder(idxE.getRiakObjectLocation()).build());

            final Person p3 = execute.getValue(Person.class);
            System.out.println(p3.getFullName());
            System.out.println(p3.getAddress());
            System.out.println(p3.getPhone());
            System.out.println(p3.getJobTitle());
            System.out.println();
        }

        client.shutdown();
    }
}

Person.java

import com.basho.riak.client.api.annotations.RiakIndex;
import com.basho.riak.client.api.annotations.RiakBucketName;
import com.basho.riak.client.api.annotations.RiakKey;
import com.basho.riak.client.api.annotations.RiakUsermeta;
import com.basho.riak.client.core.query.Namespace;

import java.util.Map;

public class Person
{
    // The @RiakKey annotation marks the field you want to use as the Key in Riak
    @RiakKey
    private String lastName;

    @RiakBucketName
    private String riakBucketName = "people";

    // Marked transient so kryo doesn't serialize them
    // The KryoPersonConverter will inject these from Riak
    @RiakIndex(name = "full_name")
    transient private String fullName;

    @RiakIndex(name = "job_title")
    transient private String jobTitle;

    @RiakUsermeta
    transient private Map<String, String> usermetaData;

    private String firstName;
    private String address;
    private String phone;

    public Person() {}

    public Person(String firstName, String lastName, String address, String phone, String title)
    {
        this.firstName = firstName;
        this.lastName = lastName;
        this.address = address;
        this.phone = phone;
        this.fullName = firstName + " " + lastName;
        this.jobTitle = title;
    }

    public String getFirstName()
    {
        return firstName;
    }

    public void setFirstName(String firstName)
    {
        this.firstName = firstName;
        this.setFullName(firstName + " " + lastName);
    }

    public String getLastName()
    {
        return lastName;
    }

    public void setLastName(String lastName)
    {
        this.lastName = lastName;
        this.setFullName(firstName + " " + lastName);
    }

    public String getAddress()
    {
        return address;
    }

    public void setAddress(String address)
    {
        this.address = address;
    }

    public String getPhone()
    {
        return phone;
    }

    public void setPhone(String phone)
    {
        this.phone = phone;
    }

    public String getJobTitle()
    {
        return jobTitle;
    }

    public void setJobTitle(String jobTitle)
    {
        this.jobTitle = jobTitle;
    }

    public String getFullName()
    {
        return fullName;
    }

    public void setFullName(String fullName)
    {
        this.fullName = fullName;
    }

    Namespace getPersonNamespace()
    {
        return new Namespace(riakBucketName);
    }
}

KryoPersonConverter.java

import com.basho.riak.client.api.convert.ConversionException;
import com.basho.riak.client.api.convert.Converter;
import com.basho.riak.client.core.util.BinaryValue;
import com.basho.riak.client.core.util.Constants;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import java.io.ByteArrayOutputStream;


public class KryoPersonConverter extends Converter<Person>
{
    public KryoPersonConverter()
    {
        super(Person.class);
    }

    public Person toDomain(BinaryValue value, String contentType) throws ConversionException
    {
        if (value == null)
        {
            return null;
        }

        final Kryo kryo = new Kryo();
        kryo.register(Person.class);
        final Input input = new Input(value.getValue());

        final Person person = kryo.readObject(input, Person.class);
        input.close();
        return person;
    }

    public ContentAndType fromDomain(Person domainObject) throws ConversionException
    {
        Kryo kryo = new Kryo();
        kryo.register(Person.class);

        Output out = new Output(new ByteArrayOutputStream());

        kryo.writeObject(out, domainObject);

        return new ContentAndType(BinaryValue.create(out.toBytes()), Constants.CTYPE_OCTET_STREAM);
    }
}