Submitting a Spark Job via Knox on Yarn

Apache Knox is a REST API gateway for interacting with Apache Hadoop clusters. It offers an extensible reverse proxy that securely exposes the REST APIs and HTTP-based services of a Hadoop platform. Although Knox is not designed to be a channel for high-volume data ingest or export, it is perfectly suited for exposing a single entry point to your cluster and can be seen as a bastion for all your applications.

One of the possible use cases of Knox is to deploy applications on Yarn, such as Spark or Hive, without exposing access to the ResourceManager or other critical services on the network.

In order to ease the use of the Knox REST API, a Java client is available in the Maven Central repository (org.apache.knox:gateway-shell:0.9.1). Let’s see how to use it through a simple example: deploying a Spark job via Knox and the Yarn REST API.

NOTE
This example assumes that:
– the spark-assembly-*.jar is present in HDFS
– your Spark application jar and its property file are packaged and deployed in HDFS

First, we need to import the Knox Java client and some Maven dependencies:

<dependency>
    <groupId>org.apache.knox</groupId>
    <artifactId>gateway-shell</artifactId>
    <version>0.9.1</version>
</dependency>
<dependency>
    <groupId>org.freemarker</groupId>
    <artifactId>freemarker</artifactId>
    <version>2.3.23</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.1.6</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.10</version>
</dependency>

In order to deploy an application via the Yarn API, we need to:

  • Find the status (timestamp and size) of the files needed by the application

Request

GET https://$KNOX_SERVER:8443/gateway/default/webhdfs/v1/{path}?op=GETFILESTATUS
Accept: application/json; charset=utf-8

Response

HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

{
  "FileStatus": {
    "accessTime": ...,
    "blockSize": ...,
    "childrenNum": ...,
    "fileId": ...,
    "group": "hdfs",
    "length": <<size>>,
    "modificationTime": <<timestamp>>,
    "owner": "hdfs",
    "pathSuffix": "",
    "permission": "755",
    "replication": 0,
    "storagePolicy": 0,
    "type": "FILE"
   }
}

  • Create a new application to obtain an application identifier

Request

POST https://$KNOX_SERVER:8443/gateway/default/resourcemanager/v1/cluster/apps/new-application
Accept: application/json; charset=utf-8

Response

HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

{
  "application-id":"application_XXXX",
  "maximum-resource-capability":
    {
      "memory":8192,
      "vCores":2
    }
}
  • Extract the returned application identifier
  • Submit the application, defined by a JSON configuration that contains this identifier along with a few more options

Request

POST https://$KNOX_SERVER:8443/gateway/default/resourcemanager/v1/cluster/apps
Accept: application/json; charset=utf-8

...{ "See below" }...

Response

HTTP/1.1 202
Transfer-Encoding: chunked
Location: https://$KNOX_SERVER:8443/gateway/default/resourcemanager/v1/cluster/apps/application_XXXX
Content-Type: application/json
Server: Jetty(6.1.26)
  • Track the application

Request

GET https://$KNOX_SERVER:8443/gateway/default/resourcemanager/v1/cluster/apps/{appid}/state
Accept: application/json; charset=utf-8

Response

{
  "state":"RUNNING"
}

Now, with the Knox Java client, this workflow looks like this:

package fr.layer4.knox;

import com.jayway.jsonpath.JsonPath;
import freemarker.template.Configuration;
import freemarker.template.Template;
import freemarker.template.TemplateException;
import freemarker.template.Version;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.gateway.shell.BasicResponse;
import org.apache.hadoop.gateway.shell.Hadoop;
import org.apache.hadoop.gateway.shell.hdfs.Hdfs;
import org.apache.hadoop.gateway.shell.yarn.Yarn;

import java.io.IOException;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Slf4j
public class Deploy {

    public static void main(String... args) throws Exception {

        String url = "https://$KNOX_SERVER:8443/gateway/default";
        String username = "XXX";
        String password = "XXX";
        String appName = "My app";
        String hdpVersion = "2.3.4.0-361";
        String jvmHome = "/usr/jdk64/jdk1.8.0_60/";
        String sparkJarPath = "hdfs://.../share/spark.jar"; // The assembly spark jar
        String appJarPath = "hdfs://.../project/app.jar";
        String appPropertiesPath = "hdfs://.../project/app.properties";
        String lzoJarPath = "";
        String className = "fr.layer4.spark.SimpleApp";
        int applicationMasterMemory = 8192;
        int applicationMasterCores = 1;
        // Optional
        String keytab = "";
        String principal = "";
        String userCredentialPath = ""; // Must end with a '/'

        // Prepare session
        Hadoop hadoop = Hadoop.login(url, username, password);

        // Find spark.jar
        Map<String, Object> sparkJarStatus = findStatus(hadoop, sparkJarPath);

        // Find app.jar
        Map<String, Object> appJarStatus = findStatus(hadoop, appJarPath);

        // Find app.properties
        Map<String, Object> appPropertiesStatus = findStatus(hadoop, appPropertiesPath);

        // Create a new app
        String appId = createNewApp(hadoop);

        // Prepare a model for the template
        String kerberosOptions = "";
        if (StringUtils.isNoneBlank(keytab) && StringUtils.isNoneBlank(principal) && StringUtils.isNoneBlank(userCredentialPath)) {
            String credentialsFile = userCredentialPath + "credentials_" + UUID.randomUUID().toString();
            kerberosOptions = " -Dspark.yarn.keytab= " + keytab +
                    " -Dspark.yarn.principal= " + principal +
                    " -Dspark.yarn.credentials.file= " + credentialsFile +
                    " -Dspark.history.kerberos.keytab= " + keytab +
                    " -Dspark.history.kerberos.principal= " + principal +
                    " -Dspark.history.kerberos.enabled=true";
        }

        Map<String, Object> data = new HashMap<>();
        data.put("appId", appId);
        data.put("appName", appName);
        data.put("hdpVersion", hdpVersion);
        data.put("jvmHome", jvmHome);
        data.put("sparkJarStatus", sparkJarStatus);
        data.put("sparkJarPath", sparkJarStatus);
        data.put("appJarStatus", appJarStatus);
        data.put("appJarPath", appJarPath);
        data.put("appPropertiesStatus", appPropertiesStatus);
        data.put("appPropertiesPath", appPropertiesPath);
        data.put("lzoJarPath", lzoJarPath);
        data.put("className", className);
        data.put("applicationMasterMemory", applicationMasterMemory);
        data.put("applicationMasterCores", applicationMasterCores);
        data.put("kerberosOptions", kerberosOptions);

        // Generate JSON
        String json = renderTemplate(data);

        // Submit app
        submitApp(hadoop, json);

        // Track the app, polling its state until it is accepted or running
        String state = null;
        while (!"RUNNING".equals(state) && !"ACCEPTED".equals(state)) {
            state = trackApp(hadoop, appId);
            log.info("Status: {}", state);
            Thread.sleep(1000); // Avoid hammering the gateway between two polls
        }
    }

    private static String renderTemplate(Map<String, Object> data) throws IOException, TemplateException {
        StringWriter writer = new StringWriter();
        Configuration cfg = new Configuration(new Version(2, 3, 23));
        Template template = cfg.getTemplate(Deploy.class.getClassLoader().getResource("submit.json.template").getPath());
        template.process(data, writer);
        return writer.toString();
    }

    private static Map<String, Object> findStatus(Hadoop hadoop, String path) throws IOException {
        // A little trick here: we use LISTSTATUS because GETFILESTATUS is not yet implemented in the Knox Java client
        BasicResponse response = null;
        try {
            response = Hdfs.ls(hadoop).dir(path).now();
            return JsonPath.read(response.getString(), "$.FileStatuses.FileStatus[0]");
        } finally {
            close("findStatus", response);
        }
    }

    private static void submitApp(Hadoop hadoop, String jsonBody) {
        log.debug("Submitting Spark Job ...");
        BasicResponse response = null;
        try {
            response = Yarn.submitApp(hadoop).text(jsonBody).now();
        } finally {
            close("submitApp", response);
        }
    }

    private static String createNewApp(Hadoop hadoop) throws IOException {
        log.debug("Creating new application ...");
        BasicResponse response = null;
        try {
            response = Yarn.newApp(hadoop).now();
            return JsonPath.read(response.getString(), "$.application-id");
        } finally {
            close("createNewApp", response);
        }
    }

    private static String trackApp(Hadoop hadoop, String appId) throws IOException {
        log.debug("Tracking the app ...");
        BasicResponse response = null;
        try {
            response = Yarn.appState(hadoop).appId(appId).now();
            return JsonPath.read(response.getString(), "$.state");
        } finally {
            close("trackApp", response);
        }
    }

    private static void close(String step, BasicResponse response) {
        if (response != null) {
            log.debug("{} - status: {}", step, response.getStatusCode());
            response.close();
        }
    }
}

The JSON file containing the application definition is shown below:

submit.json.template

{
   "am-container-spec":{
      "commands":{
         "command":"{{JAVA_HOME}}/bin/java -server -Xmx1024m -Dhdp.version=${hdpVersion} -Dspark.yarn.app.container.log.dir=/hadoop/yarn/log/rest-api -Dspark.app.name="${appName}" ${kerberosOptions} org.apache.spark.deploy.yarn.ApplicationMaster --class ${className} --jar __app__.jar --arg '--class' --arg '${appName}' 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr"
      },
      "environment":{
         "entry":[
            {
               "key":"JAVA_HOME",
               "value":"${jvmHome}"
            },
            {
               "key":"SPARK_YARN_MODE",
               "value":true
            },
            {
               "key":"HDP_VERSION",
               "value":"${hdpVersion}"
            },
            {
               "key":"CLASSPATH",
               "value":"{{PWD}}<CPS>__spark__.jar<CPS>{{PWD}}/__app__.jar{{PWD}}/__app__.properties<CPS>{{HADOOP_CONF_DIR}}<CPS>/usr/hdp/current/hadoop-client/*<CPS>/usr/hdp/current/hadoop-client/lib/*<CPS>/usr/hdp/current/hadoop-hdfs-client/*<CPS>/usr/hdp/current/hadoop-hdfs-client/lib/*<CPS>/usr/hdp/current/hadoop-yarn-client/*<CPS>/usr/hdp/current/hadoop-yarn-client/lib/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/common/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/common/lib/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/yarn/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/yarn/lib/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/hdfs/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/hdfs/lib/*<CPS>{{PWD}}/mr-framework/hadoop/share/hadoop/tools/lib/*<CPS>${lzoJarPath}<CPS>/etc/hadoop/conf/secure<CPS>"
            },
            {
               "key":"SPARK_YARN_CACHE_FILES",
               "value":"${appJarPath}#__app__.jar,${sparkJarPath}#__spark__.jar"
            },
            {
               "key":"SPARK_YARN_CACHE_FILES_FILE_SIZES",
               "value":"${appJarStatus.length},${sparkJarStatus.length}"
            },
            {
               "key":"SPARK_YARN_CACHE_FILES_TIME_STAMPS",
               "value":"${appJarStatus.modificationTime},${sparkJarStatus.modificationTime}"
            },
            {
               "key":"SPARK_YARN_CACHE_FILES_VISIBILITIES",
               "value":"PUBLIC,PRIVATE"
            }
         ]
      },
      "local-resources":{
         "entry":[
            {
               "key":"__spark__.jar",
               "value":{
                  "resource":"${sparkJarPath}",
                  "size":${sparkJarStatus.length},
                  "timestamp":${sparkJarStatus.modificationTime},
                  "type":"FILE",
                  "visibility":"APPLICATION"
               }
            },
            {
               "key":"__app__.jar",
               "value":{
                  "resource":"${appJarPath}",
                  "size":${appJarStatus.length},
                  "timestamp":${appJarStatus.modificationTime},
                  "type":"FILE",
                  "visibility":"APPLICATION"
               }
            },
            {
               "key":"__app__.properties",
               "value":{
                  "resource":"${appPropertiesPath}",
                  "size":${appPropertiesStatus.length},
                  "timestamp":${appPropertiesStatus.modificationTime},
                  "type":"FILE",
                  "visibility":"APPLICATION"
               }
            }
         ]
      }
   },
   "application-id":"${appId}",
   "application-name":"${appName}",
   "application-type":"YARN",
   "keep-containers-across-application-attempts":false,
   "max-app-attempts":2,
   "resource":{
      "memory":${applicationMasterMemory?c},
      "vCores":${applicationMasterCores?c}
   },
   "unmanaged-AM":false
}

As we can see in the Java class, this file is just a template which is then rendered by Freemarker. All of this configuration is needed in order to start the Spark ApplicationMaster via the command line:

{{JAVA_HOME}}/bin/java -server -Xmx1024m -Dhdp.version=${hdpVersion} -Dspark.yarn.app.container.log.dir=/hadoop/yarn/log/rest-api -Dspark.app.name="${appName}" ${kerberosOptions} org.apache.spark.deploy.yarn.ApplicationMaster --class ${className} --jar __app__.jar --arg '--class' --arg '${appName}' 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr

This command line depends on two files (app.jar and spark.jar), which are declared in the SPARK_YARN_CACHE_FILES variable using the format "path#alias,path#alias,…". The three other variables (SPARK_YARN_CACHE_FILES_FILE_SIZES, SPARK_YARN_CACHE_FILES_TIME_STAMPS and SPARK_YARN_CACHE_FILES_VISIBILITIES) contain the comma-separated file sizes, timestamps and visibility of each file (in the same order). This tells Spark which files to distribute across all executors.
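
Once rendered by Freemarker, these entries look roughly like this (the paths are the truncated ones used earlier in this example, and the sizes and timestamps are the values returned by the file status calls):

SPARK_YARN_CACHE_FILES=hdfs://.../project/app.jar#__app__.jar,hdfs://.../share/spark.jar#__spark__.jar
SPARK_YARN_CACHE_FILES_FILE_SIZES=<<app.jar size>>,<<spark.jar size>>
SPARK_YARN_CACHE_FILES_TIME_STAMPS=<<app.jar timestamp>>,<<spark.jar timestamp>>
SPARK_YARN_CACHE_FILES_VISIBILITIES=PUBLIC,PRIVATE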

The classpath contains all the necessary jars as well as a few configuration directories ({{HADOOP_CONF_DIR}}, /etc/hadoop/conf/secure).

NOTE
<CPS> is the classpath separator placeholder and is resolved to ':' when the container is launched (for example, {{PWD}}<CPS>__spark__.jar expands to {{PWD}}:__spark__.jar).

Next, we create a simple Spark application:

package fr.layer4.spark;

import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;

public class SimpleApp {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Simple Application");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logData = sc.textFile("hdfs://....").cache();

    long numAs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();

    long numBs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("b"); }
    }).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
  }
}

This class must be packaged with its dependencies (for example with the maven-shade-plugin) and deployed in HDFS.

One last thing: we need to deploy the application properties in HDFS (a sketch of an upload through Knox is shown right after the file):

spark-yarn.properties

spark.yarn.submit.file.replication=3
spark.yarn.executor.memoryOverhead=384
spark.yarn.driver.memoryOverhead=384
spark.master=yarn
spark.submit.deployMode=cluster
spark.eventLog.enabled=true
spark.yarn.scheduler.heartbeat.interval-ms=5000
spark.yarn.preserve.staging.files=true
spark.yarn.queue=default
spark.yarn.containerLauncherMaxThreads=25
spark.yarn.max.executor.failures=3
spark.executor.instances=2
spark.eventLog.dir=hdfs\:///spark-history
spark.history.kerberos.enabled=true
spark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider
spark.history.ui.port=18080
spark.history.fs.logDirectory=hdfs\:///spark-history

#
spark.executor.memory=2G
spark.executor.cores=2

spark.history.kerberos.keytab=/etc/security/keytabs/xxx.keytab
spark.history.kerberos.principal=xxx@XXX
spark.yarn.keytab=/etc/security/keytabs/xxx.keytab
spark.yarn.principal=xxx@XXX
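
If you do not have direct access to the cluster, the upload itself can also go through Knox, since WebHDFS is proxied by the gateway. Here is a minimal sketch using the same gateway-shell client as in the Deploy class; the local file names and the target HDFS paths below are hypothetical and must be adapted to your project:

import org.apache.hadoop.gateway.shell.Hadoop;
import org.apache.hadoop.gateway.shell.hdfs.Hdfs;

public class Upload {
    public static void main(String... args) throws Exception {
        // Same Knox session as in the Deploy class
        Hadoop hadoop = Hadoop.login("https://$KNOX_SERVER:8443/gateway/default", "XXX", "XXX");
        // Push the shaded application jar and its property file to HDFS via WebHDFS
        Hdfs.put(hadoop).file("target/app.jar").to("/project/app.jar").now();
        Hdfs.put(hadoop).file("spark-yarn.properties").to("/project/app.properties").now();
        hadoop.shutdown();
    }
}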

Launch the Deploy class, and it’s done!

Sources:
https://knox.apache.org/
https://hadoop.apache.org/docs/r2.7.2/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html
https://community.hortonworks.com/articles/28100/starting-spark-jobs-via-rest-api-on-a-kerberized-c.html
http://freemarker.org
