Using HBase REST API with the Knox Java client

I introduced Knox in a previous post, which showed how to deploy a Spark job through Knox with the Java client. This post also uses the Knox Java client, this time to talk to HBase. HBase provides a rich, well-documented REST API whose endpoints expose data in several formats (JSON, XML and Protobuf).

First, we need to import the dependencies for the Knox Java client:

<dependency>
    <groupId>org.apache.knox</groupId>
    <artifactId>gateway-shell</artifactId>
    <version>0.10.0</version>
</dependency>

Then, let’s write some code:

// Replace $KNOX_SERVER with the host name of your Knox gateway.
Hadoop session = Hadoop.login("https://$KNOX_SERVER:8443/gateway/default", "user", "password");
String tableName = "ns1:my_table";

// List all tables.
TableList.Response tables = HBase.session(session).table().list().now();
System.err.println(tables.getString());

// Fetch the schema of one table.
TableSchema.Response schema = HBase.session(session).table(tableName).schema().now();
System.err.println(schema.getString());

// Create a scanner with a JSON filter. b64Encode is assumed to be a small local helper
// that Base64-encodes a string: family, qualifier and value must be sent encoded.
String scannerId = HBase.session(session).table(tableName).scanner().create()
        .column("d")
        .startRow("aaa")
        .endRow("aab")
        .batch(100)
        .filter(
                "{" +
                        " \"type\": \"SingleColumnValueFilter\"," +
                        " \"op\": \"EQUAL\"," +
                        " \"family\": \"" + b64Encode("cf") + "\"," +
                        " \"qualifier\": \"" + b64Encode("col1") + "\"," +
                        " \"latestVersion\": true, " +
                        " \"ifMissing\":true," +
                        " \"comparator\": {" +
                        " \"type\": \"BinaryComparator\"," +
                        " \"value\": \"" + b64Encode("my_value") + "\"" +
                        " }" +
                        "}"
        )
        .maxVersions(1)
        .now().getScannerId();

// Read the next batch of rows from the scanner.
ScannerGetNext.Response response = HBase.session(session).table(tableName).scanner(scannerId)
        .getNext()
        .now();
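
The snippet above calls b64Encode, which is never defined in this post. Here is a minimal sketch of such a helper, assuming it simply Base64-encodes the UTF-8 bytes of its argument with the JDK's java.util.Base64. The class name FilterEncoding and the method singleColumnValueFilter are hypothetical names chosen for this illustration; they are not part of the Knox client.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper class for illustration; not part of the Knox client.
final class FilterEncoding {

    private FilterEncoding() {
    }

    // Base64-encodes the UTF-8 bytes of a string.
    static String b64Encode(String value) {
        return Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8));
    }

    // Builds the same SingleColumnValueFilter JSON as the scanner example,
    // with family, qualifier and expected value Base64-encoded.
    static String singleColumnValueFilter(String family, String qualifier, String expected) {
        return "{"
                + "\"type\":\"SingleColumnValueFilter\","
                + "\"op\":\"EQUAL\","
                + "\"family\":\"" + b64Encode(family) + "\","
                + "\"qualifier\":\"" + b64Encode(qualifier) + "\","
                + "\"latestVersion\":true,"
                + "\"ifMissing\":true,"
                + "\"comparator\":{"
                + "\"type\":\"BinaryComparator\","
                + "\"value\":\"" + b64Encode(expected) + "\""
                + "}"
                + "}";
    }

    public static void main(String[] args) {
        // Prints the JSON that could be passed to .filter(...) in the scanner call.
        System.out.println(singleColumnValueFilter("cf", "col1", "my_value"));
    }
}
```

Keeping the JSON construction in one method avoids repeating the long string concatenation each time a scanner is created.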

First problem: the documentation doesn’t explain how to use filters through the HBase REST API. After looking at the HBase source code, I found that the method org.apache.hadoop.hbase.rest.model.ScannerModel.stringifyFilter gives you the JSON representation of a filter:

KeyOnlyFilter filter = new KeyOnlyFilter();
String stringified = ScannerModel.stringifyFilter(filter);
System.err.println(stringified);

For example, here are some of the filters you can use:

{"type":"QualifierFilter","op":"EQUAL","comparator":{"type":"BinaryComparator","value":"MTIz"}}
{"type":"ColumnCountGetFilter","limit":3}
{"type":"RowFilter","op":"EQUAL","comparator":{"type":"BinaryComparator","value":"dHl1aQ=="}}
{"type":"FamilyFilter","op":"EQUAL","comparator":{"type":"BinaryComparator","value":"dHl1aQ=="}}
{"type":"PageFilter","value":"1000"}
{"type":"FirstKeyOnlyFilter"}
{"type":"KeyOnlyFilter"}
{"type":"MultipleColumnPrefixFilter","prefixes":["Zmdo","dHl1aQ=="]}
{"type":"ValueFilter","op":"EQUAL","comparator":{"type":"BinaryComparator","value":"MTIz"}}
{"type":"ColumnRangeFilter","minColumn":"Yg==","minColumnInclusive":true,"maxColumn":"ZA==","maxColumnInclusive":true}
{"type":"SingleColumnValueFilter","op":"EQUAL","family":"ZA==","qualifier":"ZGZn","latestVersion":true,"comparator":{"type":"BinaryComparator","value":"Y3Zi"}}
{"type":"RandomRowFilter","chance":0.1}
{"type":"PrefixFilter","value":"Z2V0"}
{"type":"WhileMatchFilter","filters":[{"type":"PrefixFilter","value":"Z2V0"}]}
{"type":"InclusiveStopFilter","value":"ZW5kS2V5"}
{"type":"ColumnPrefixFilter","value":"cHJlZml4"}
{"type":"SkipFilter","filters":[{"type":"ColumnPrefixFilter","value":"cHJlZml4"}]}
{"type":"TimestampsFilter","timestamps":["1","2","3"]}
{"type":"ColumnPaginationFilter","limit":10,"offset":0}
{"type":"FilterList","op":"MUST_PASS_ALL","filters":[{"type":"SingleColumnValueFilter","op":"EQUAL","family":"ZA==","qualifier":"ZGZn","latestVersion":true,"comparator":{"type":"BinaryComparator","value":"Y3Zi"}},{"type":"RowFilter","op":"EQUAL","comparator":{"type":"BinaryComparator","value":"dHl1aQ=="}}]}
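
The values in these filters are Base64-encoded, which makes the list hard to read at a glance. A small sketch, using only the JDK, for decoding a value back into text (Base64Peek is a hypothetical name, and the sketch assumes the encoded bytes are UTF-8 text):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for inspecting the encoded values found in filter JSON.
final class Base64Peek {

    private Base64Peek() {
    }

    // Decodes a Base64 string and interprets the bytes as UTF-8 text.
    static String b64Decode(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Values copied from the filter list above.
        String[] samples = {"MTIz", "dHl1aQ==", "Z2V0", "cHJlZml4"};
        for (String encoded : samples) {
            System.out.println(encoded + " -> " + b64Decode(encoded));
        }
    }
}
```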

Another problem is that the Knox client offers little help for handling the response. All you can get from the response is a String, an array of bytes or an InputStream. We can use Jackson to parse the raw response. Let’s create some POJOs that map the JSON response from HBase:

@Data
public class Response {
    @JsonProperty("Row")
    private List<Row> rows;
}

@Data
public class Row {
    @JsonProperty("key")
    private String key;
    @JsonProperty("Cell")
    private List<Cell> cells;
}

@Data
public class Cell {
    @JsonProperty("column")
    private String column;
    @JsonProperty("timestamp")
    private long timestamp;
    @JsonProperty("$")
    private Object value;
}

We need Lombok to use the @Data annotation:

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.6</version>
</dependency>

Now, we can parse the response:

ObjectMapper mapper = new ObjectMapper();
ScannerGetNext.Response rawResponse = HBase.session(session).table(tableName).scanner(scannerId)
                .getNext()
                .now();
Response response = mapper.readValue(rawResponse.getString(), Response.class);

But wait: remember that the HBase REST server returns its content Base64-encoded. The row key, the column (family and qualifier) and the cell value are all encoded. We just need to create a custom JSON deserializer:

public class Base64Deserializer extends StdDeserializer<Object> {

    protected Base64Deserializer() {
        super(Object.class);
    }

    @Override
    public Object deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
        String value = jsonParser.getText();
        byte[] decode = Base64.getDecoder().decode(value);
        return new String(decode, StandardCharsets.UTF_8);
    }
}

Of course, this deserializer is very basic and assumes that all values are Strings. For more advanced uses, we would have to handle the actual type of the data. We apply this deserializer to the key, the column and the value:

@Data
public class Response {
    @JsonProperty("Row")
    private List<Row> rows;
}

@Data
public class Row {
    @JsonProperty("key")
    @JsonDeserialize(using = Base64Deserializer.class)
    private String key;
    @JsonProperty("Cell")
    private List<Cell> cells;
}

@Data
public class Cell {
    @JsonProperty("column")
    @JsonDeserialize(using = Base64Deserializer.class)
    private String column;
    @JsonProperty("timestamp")
    private long timestamp;
    @JsonProperty("$")
    @JsonDeserialize(using = Base64Deserializer.class)
    private Object value;
}
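
The deserializer above treats every value as text. As a sketch of what handling another type might look like: a cell written with HBase's Bytes.toBytes(long) holds 8 big-endian bytes rather than a string, so it has to be decoded accordingly. The class and method names below are hypothetical, and only the JDK is used.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

// Hypothetical decoder for cells that store a long instead of text.
final class TypedCellDecoder {

    private TypedCellDecoder() {
    }

    // Decodes a Base64 value holding exactly 8 big-endian bytes into a long.
    static long decodeLong(String base64Value) {
        byte[] raw = Base64.getDecoder().decode(base64Value);
        if (raw.length != Long.BYTES) {
            throw new IllegalArgumentException("Expected 8 bytes but got " + raw.length);
        }
        // ByteBuffer reads in big-endian order by default.
        return ByteBuffer.wrap(raw).getLong();
    }

    public static void main(String[] args) {
        // Simulate an encoded cell value: 8 big-endian bytes, then Base64.
        byte[] stored = ByteBuffer.allocate(Long.BYTES).putLong(42L).array();
        String encoded = Base64.getEncoder().encodeToString(stored);
        System.out.println(decodeLong(encoded));
    }
}
```

One option would be to keep the raw byte[] in the Cell POJO and pick the decoder per column, since the REST response itself carries no type information.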

Finally, our code looks like this:

String scannerId = HBase.session(session).table(tableName).scanner().create()
        .column("d")
        .startRow("t_")
        .endRow("z")
        .batch(100)
        .filter(
                "{" +
                        " \"type\": \"SingleColumnValueFilter\"," +
                        " \"op\": \"EQUAL\"," +
                        " \"family\": \"" + b64Encode("cf") + "\"," +
                        " \"qualifier\": \"" + b64Encode("col1") + "\"," +
                        " \"latestVersion\": true, " +
                        " \"ifMissing\":true," +
                        " \"comparator\": {" +
                        " \"type\": \"BinaryComparator\"," +
                        " \"value\": \"" + b64Encode("my_value") + "\"" +
                        " }" +
                        "}"
        )
        .maxVersions(1)
        .now().getScannerId();

ObjectMapper mapper = new ObjectMapper();

try {
    while (true) {
        ScannerGetNext.Response rawResponse = HBase.session(session).table(tableName).scanner(scannerId)
                .getNext()
                .now();
        // Stop on anything other than 200 OK: the REST server answers
        // 204 No Content once the scanner is exhausted.
        if (200 != rawResponse.getStatusCode()) {
            break;
        }
        Response response = mapper.readValue(rawResponse.getString(), Response.class);
        System.err.println(response);
    }
} finally {
    // Always release the scanner on the server side.
    HBase.session(session).table(tableName).scanner(scannerId).delete().now();
}
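
Once decoded, the column field of each Cell holds the family and the qualifier joined by a colon, for example cf:col1. A small sketch for separating the two, splitting on the first colon only because a qualifier may itself contain colons (ColumnNames and splitColumn are hypothetical names):

```java
// Hypothetical helper for splitting a decoded "family:qualifier" column name.
final class ColumnNames {

    private ColumnNames() {
    }

    // Returns {family, qualifier}; the qualifier is empty when no colon is present.
    static String[] splitColumn(String column) {
        int separator = column.indexOf(':');
        if (separator < 0) {
            return new String[] {column, ""};
        }
        return new String[] {column.substring(0, separator), column.substring(separator + 1)};
    }

    public static void main(String[] args) {
        String[] parts = splitColumn("cf:col1");
        System.out.println("family=" + parts[0] + ", qualifier=" + parts[1]);
    }
}
```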

Sources:
http://hbase.apache.org/book.html#_rest
https://cwiki.apache.org/confluence/display/KNOX/Examples+HBase

Credits:
“Processing 06” by Carsten is licensed under CC BY-SA 2.0 / Resized
