Knox in production: avoid pitfalls and common mistakes

I’ve already posted articles about Knox a few weeks ago, on two subjects: how to use the HBase REST API through Knox and how to submit Spark jobs via the Knox API. In my current mission, many projects now use Knox as the main gateway for services like HBase and HDFS, but also for Oozie, Yarn… After some weeks of development and deployment in production, I’ve decided to write a post about some troubles that you may encounter when using the Java client.

NOTE
I’m a great supporter of Knox; this post doesn’t aim to burn Knox at the stake, but only to present common errors made when developing code around the client, and some improvements that could be made.

Client configuration

The Knox Java client uses the HttpClient from Apache HttpComponents. I wrote some posts about it, and not having the possibility to build a custom client can be a pain if you want to add headers, limit the length of the messages sent, define a specific strategy for DNS resolution or hostname verification… And this has a direct impact on the performance of your application. If you look at the source code of the org.apache.hadoop.gateway.shell.Hadoop class, the connection pool is built with the default parameters, which means, according to the javadoc, only 2 connections per route:

PoolingConnectionManager maintains a maximum limit of connection on a per route basis and in total. Per default this implementation will create no more than 2 concurrent connections per given route and no more 20 connections in total. For many real-world applications these limits may prove too constraining, especially if they use HTTP as a transport protocol for their services. Connection limits, however, can be adjusted using HTTP parameters.

Most of the projects using Knox are “BigData” apps, so even if you are not using Knox to download 1PB files from WebHDFS, the data exchanged between the apps and Knox can be medium to large (from 100kB to 100MB). And since some actions can be slow (putting a huge file in HDFS, but also deploying & starting an Oozie workflow…), connections can be held by long-running tasks and are then not fairly shared among all the tasks/users. With a few users on our application, you can quickly face contention on the connection pool in the Knox client and see thread dumps containing traces like this:

"XNIO-2 task-8" #45 prio=5 os_prio=0 tid=0x0000000025878800 nid=0x57f74 waiting on condition [0x000000002f358000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007a480fc08> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at org.apache.http.pool.PoolEntryFuture.await(PoolEntryFuture.java:139)
    at org.apache.http.pool.AbstractConnPool.getPoolEntryBlocking(AbstractConnPool.java:307)
    at org.apache.http.pool.AbstractConnPool.access$000(AbstractConnPool.java:65)
    at org.apache.http.pool.AbstractConnPool$2.getPoolEntry(AbstractConnPool.java:193)
    at org.apache.http.pool.AbstractConnPool$2.getPoolEntry(AbstractConnPool.java:186)
    at org.apache.http.pool.PoolEntryFuture.get(PoolEntryFuture.java:108)
    at org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:208)
    at org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:195)
    at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:423)
    at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:882)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:71)
    at org.apache.hadoop.gateway.shell.Hadoop.executeNow(Hadoop.java:217)
    at org.apache.hadoop.gateway.shell.AbstractRequest.execute(AbstractRequest.java:47)
    at org.apache.hadoop.gateway.shell.hdfs.Rm$Request.access$300(Rm.java:32)
    at org.apache.hadoop.gateway.shell.hdfs.Rm$Request$1.call(Rm.java:63)
    at org.apache.hadoop.gateway.shell.hdfs.Rm$Request$1.call(Rm.java:56)
    at org.apache.hadoop.gateway.shell.AbstractRequest.now(AbstractRequest.java:70)

I ended up customizing the connection pool via some reflection on the Hadoop session:

public static void improveHadoopSession(Hadoop hadoop) throws Exception {
    Field clientField = Hadoop.class.getDeclaredField("client");
    clientField.setAccessible(true);
    DefaultHttpClient client = (DefaultHttpClient) clientField.get(hadoop);

    // 20 connections in total...
    ((PoolingClientConnectionManager) client.getConnectionManager()).setMaxTotal(20);
    // and 20 max for each host
    ((PoolingClientConnectionManager) client.getConnectionManager()).setDefaultMaxPerRoute(20);

    // NOTE: if you have more than one Knox server, increase the max total

    // Disable Nagle's algorithm (TCP_NODELAY = true): decreases network
    // latency and improves performance, at the cost of an increase in
    // bandwidth consumption
    HttpConnectionParams.setTcpNoDelay(client.getParams(), true);

    // Automatically send a keepalive probe to the peer after an
    // interval of inactivity between this host and the peer
    HttpConnectionParams.setSoKeepalive(client.getParams(), true);

    // Socket can be bound even though a previous connection is
    // still in a timeout state
    HttpConnectionParams.setSoReuseaddr(client.getParams(), true);

    // Defines the socket timeout in milliseconds
    HttpConnectionParams.setSoTimeout(client.getParams(), 2000);
}

And used as:

Hadoop hadoop = Hadoop.login("https://$KNOX_SERVER:8443/gateway/default", "leUser", "lePassword");
improveHadoopSession(hadoop);

More properties can be configured; see org.apache.http.params.CoreConnectionPNames.
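For example (the values below are arbitrary and should be tuned to your workload), a few more settings can be added to the improveHadoopSession() method shown above:

// Maximum time to wait when establishing a connection to the Knox server (ms)
HttpConnectionParams.setConnectionTimeout(client.getParams(), 5000);

// Size of the internal socket buffers used while receiving/transmitting
// HTTP messages (bytes)
HttpConnectionParams.setSocketBufferSize(client.getParams(), 8 * 1024);

// Check whether a pooled connection is stale before re-using it: slightly
// slower per request, but avoids errors on half-closed connections
HttpConnectionParams.setStaleCheckingEnabled(client.getParams(), true);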

System resource deallocation

All the responses produced by the client extend the org.apache.hadoop.gateway.shell.BasicResponse class. This class does not implement java.io.Closeable, so you have to do this:

BasicResponse response = null;
String jobId;
try {
    response = Workflow.submit(getHadoop()).text(xmlConfiguration).now();
    jobId = JsonPath.read(response.getString(), "$.id");
} finally {
    if(response != null) {
      response.close();
    }
}

instead of this, which would be possible if the Closeable interface were implemented:

String jobId;
try (BasicResponse response = Workflow.submit(getHadoop()).text(xmlConfiguration).now()){
    jobId = JsonPath.read(response.getString(), "$.id");
}

Not a big deal, only a convenience. But the close() method in BasicResponse only does EntityUtils.consumeQuietly(response.getEntity()), and the underlying HTTP connection is still held. If you want to ensure correct deallocation of OS resources, you need to call CloseableHttpResponse#close(). So I modified the class so that its close() method also calls close() on the CloseableHttpResponse:

public void close() {
    try {
        this.consume();
    } finally {
        if (response instanceof CloseableHttpResponse) {
            try {
                ((CloseableHttpResponse) response).close();
            } catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }
    }
}

The call to consume() is done before the close(): if the response content is not fully consumed, the underlying connection cannot be safely re-used and will be shut down and discarded by the connection manager.

Also, when the Hadoop session is no longer used, the only cleanup method available is shutdown(), but it only shuts down the ExecutorService. When a CloseableHttpClient instance is no longer needed, you have to shut down its connection manager to ensure immediate deallocation of all OS resources. Moreover, the Hadoop instance is not Closeable, so you will not be able to auto-close the CloseableHttpClient. I modified the Hadoop class by adding a close() method looking like this:

public void close() {
    try {
      executor.shutdownNow();
    } catch(Exception e) {
      // log something here
    }
    try {
      client.close();
      // client.close() should call getConnectionManager().shutdown();
    } catch(Exception e) {
      // log something here
    }
}
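With these two patches, a session can then be released deterministically. A minimal usage sketch, reusing the improveHadoopSession() helper from above:

Hadoop hadoop = Hadoop.login("https://$KNOX_SERVER:8443/gateway/default", "leUser", "lePassword");
try {
    improveHadoopSession(hadoop);
    // ... calls to HDFS, HBase, Oozie... through the Knox client ...
} finally {
    // shuts down the executor and closes the underlying HttpClient
    hadoop.close();
}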

Serialization

The HBase REST API accepts multiple serialization formats: XML, JSON and protobuf. Protobuf is the most efficient, thanks to its compact binary serialization, and JSON the most readable, but by default the Knox Java client for HBase uses XML when sending data. If you scan heavily via the REST API, the size of the XML responses can quickly become significant.

Bandwidth

In order to optimize the bandwidth, a short and efficient trick can be applied to the HttpClient: compression. The Knox server supports compression via a Jetty handler, but the HttpClient is not configured to use a compression algorithm such as GZIP or even DEFLATE. With the verbosity of XML and the amount of data exchanged between your client and the Knox server, you can save significant bandwidth by activating compression on the HttpClient:

public static void improveHadoopSession(Hadoop hadoop) throws Exception {
    Field clientField = Hadoop.class.getDeclaredField("client");
    clientField.setAccessible(true);
    DefaultHttpClient client = (DefaultHttpClient) clientField.get(hadoop);

    // ...................

    client.addRequestInterceptor(new HttpRequestInterceptor() {

        public void process(
                final HttpRequest request,
                final HttpContext context) throws HttpException, IOException {
            if (!request.containsHeader("Accept-Encoding")) {
                request.addHeader("Accept-Encoding", "gzip");
            }
        }
    });

    client.addResponseInterceptor(new HttpResponseInterceptor() {

        public void process(
                final HttpResponse response,
                final HttpContext context) throws HttpException, IOException {
            HttpEntity entity = response.getEntity();
            if (entity != null) {
                Header ceheader = entity.getContentEncoding();
                if (ceheader != null) {
                    HeaderElement[] codecs = ceheader.getElements();
                    for (int i = 0; i < codecs.length; i++) {
                        if (codecs[i].getName().equalsIgnoreCase("gzip")) {
                            response.setEntity(
                                    new GzipDecompressingEntity(response.getEntity()));
                            return;
                        }
                    }
                }
            }
        }
    });
}

But if you look closely at the Knox code, only the following mimetypes are eligible for compression: text/html, text/plain, text/xml, text/css, application/javascript and text/javascript. And since most of the underlying services deal with JSON (mimetype application/json), this configuration will be useless most of the time…
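If you want to verify, for your own calls, whether the gateway actually compressed a given response, a small diagnostic sketch (not part of the Knox client) is to register an additional response interceptor that logs the Content-Encoding header:

client.addResponseInterceptor(new HttpResponseInterceptor() {

    public void process(
            final HttpResponse response,
            final HttpContext context) throws HttpException, IOException {
        Header contentEncoding = response.getFirstHeader("Content-Encoding");
        // prints "gzip" when Knox compressed the payload, "none" otherwise
        System.out.println("Content-Encoding: "
                + (contentEncoding != null ? contentEncoding.getValue() : "none"));
    }
});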

Missing important features

Some features that would be really useful are not present in the Knox Java client. Here is a list of ideas:

Multi-put for HBase

If you want to insert some data into HBase, the only available method is store(), which allows you to put only one row at a time into a table. That means if you want to put 10k rows, you have to make 10k HTTP calls… This was the main bottleneck of a new project, so I used the multi-put capability of the HBase REST API and implemented it like this:

package org.apache.hadoop.gateway.shell.hbase.table.row;

import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.gateway.shell.AbstractRequest;
import org.apache.hadoop.gateway.shell.EmptyResponse;
import org.apache.hadoop.gateway.shell.Hadoop;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import java.io.StringWriter;
import java.util.*;
import java.util.concurrent.Callable;

public class StoreRows {
    public StoreRows() {
    }

    public static class Response extends EmptyResponse {
        Response(HttpResponse response) {
            super(response);
        }
    }

    public static class Request extends AbstractRequest<StoreRows.Response> {
        private String tableName;
        private SortedMap<String, List<InsertableColumn>> columnsByKey = new TreeMap<>();

        public Request(Hadoop session, String tableName) {
            super(session);
            this.tableName = tableName;
        }

        public StoreRows.Request put(String key, String family, String qualifier, Object value, Long time) {
            List<InsertableColumn> columns = columnsByKey.get(key);
            if (columns == null) {
                columns = new ArrayList<>();
                columnsByKey.put(key, columns);
            }
            columns.add(new InsertableColumn(family, qualifier, value, time));
            return this;
        }

        public StoreRows.Request put(String key, String family, String qualifier, Object value) {
            return put(key, family, qualifier, value, null);
        }

        protected Callable<StoreRows.Response> callable() {
            return () -> {
                // Build the XML <CellSet> document expected by the HBase REST API
                DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance();
                DocumentBuilder builder = builderFactory.newDocumentBuilder();
                Document document = builder.newDocument();
                document.setXmlStandalone(true);
                Element cellSet = document.createElement("CellSet");
                document.appendChild(cellSet);

                for (Map.Entry<String, List<InsertableColumn>> entry : columnsByKey.entrySet()) {

                    // One <Row> element per key, with the row key Base64-encoded
                    Element row = document.createElement("Row");
                    row.setAttribute("key", Base64.encodeBase64String(entry.getKey().getBytes("UTF-8")));
                    cellSet.appendChild(row);

                    // One <Cell> element per column of the row
                    for (InsertableColumn column : entry.getValue()) {
                        Element cell = document.createElement("Cell");
                        cell.setAttribute("column", column.encodedName());
                        if (column.time() != null) {
                            cell.setAttribute("timestamp", column.time().toString());
                        }
                        cell.setTextContent(column.encodedValue());
                        row.appendChild(cell);
                    }
                }

                // Serialize the DOM document to a String
                Transformer transformer = TransformerFactory.newInstance().newTransformer();
                transformer.setOutputProperty("standalone", "yes");
                StringWriter writer = new StringWriter();
                transformer.transform(new DOMSource(document), new StreamResult(writer));

                // POST the CellSet to the multi-put endpoint of the HBase REST API
                URIBuilder requestUri = uri("/hbase", "/", tableName, "/false-row-key");
                HttpPost request = new HttpPost(requestUri.build());
                StringEntity entity = new StringEntity(writer.toString(), ContentType.create("text/xml", "UTF-8"));
                request.setEntity(entity);
                return new StoreRows.Response(execute(request));
            };
        }
    }
}
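A usage sketch of this request (the table, family and values are made up for the example); several rows are accumulated and sent in a single HTTP call:

// hypothetical table and values, for illustration only
BasicResponse response = null;
try {
    response = new StoreRows.Request(hadoop, "my_table")
            .put("row-1", "cf", "col-a", "value-1")
            .put("row-2", "cf", "col-a", "value-2", System.currentTimeMillis())
            .now();
} finally {
    if (response != null) {
        response.close();
    }
}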

copyFromLocal instead of put in HDFS

By default, Knox offers the “put” endpoint for HDFS, which copies one or multiple sources from the local file system to the destination file system. The copyFromLocal command is very similar to put, except that it gives you the option to overwrite the destination if it already exists. You can still do it yourself:

public void copyFromLocal(String localFile, String remoteFile, boolean erase) {

    if (erase) {
        BasicResponse response = null;
        try {
            response = Hdfs.rm(getHadoop()).file(remoteFile).now();
        } finally {
            closeQuietly(response);
        }
    }

    BasicResponse response = null;
    try {
        response = Hdfs.put(getHadoop()).file(localFile).to(remoteFile).now();
    } finally {
        closeQuietly(response);
    }
}
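The closeQuietly() helper used here is not part of the Knox client; a minimal sketch, relying on the patched BasicResponse.close() shown earlier:

private static void closeQuietly(BasicResponse response) {
    if (response != null) {
        try {
            response.close();
        } catch (Exception e) {
            // log something here, but never fail during cleanup
        }
    }
}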

exists in HDFS

Not a real HDFS command, but you will find such a helper really useful:

public boolean exists(String remoteFile) {
    BasicResponse response = null;
    try {
        response = Hdfs.ls(getHadoop()).dir(remoteFile).now();
        return JsonPath.read(response.getString(), "$.FileStatuses.FileStatus[0]") != null;
    } catch (HadoopException e) {
        log.warn("Ouuuuupsss...", e);
        return false;
    } catch (IOException e) {
        throw Throwables.propagate(e);
    } finally {
        closeQuietly(response);
    }
}
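Combined with the copyFromLocal() helper above, for example (the paths are made up):

// upload the file only if it is not already present in HDFS
if (!exists("/data/inbound/referential.csv")) {
    copyFromLocal("/tmp/referential.csv", "/data/inbound/referential.csv", false);
}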

Sources:
https://github.com/apache/knox/
https://hc.apache.org/httpcomponents-client-ga/quickstart.html
https://hc.apache.org/httpcomponents-client-4.2.x/examples.html



Comments (5)

Interesting article Vincent. I’m excited about your extensive use of Knox’s client shell library. We would love your involvement in the Apache Knox community and input into improvements/features in the Knox project.

Thanks! I’ve created some Jira (KNOX-805, KNOX-806, KNOX-807, KNOX-808, KNOX-809, KNOX-810) in the Knox project and I will try to add soon some patches for these improvements.

Very nice post ! I Like the french Touch “leUser” and “lePassword” 😉

Awesome work Vincent! I noticed some of your changes and suggestions have now been implemented in knox version 0.12.0. From a quick look at source was there any reason your “multi-put” (StoreRows) and other “missing important features” were not and if there is intent for their inclusion in the future? Thanks again, great post.

Actually just saw your KNOX-808 and there is a patch available for the multi-put!
