Run an HDInsight Spark job with HttpURLConnection

guguli

Hi everyone,

I want to run a Spark job that lives on an Azure HDInsight cluster from outside the cluster.

My code so far:

Code:
private void runJob(String jarFolder) throws Exception {
    SparkJobRunnerInput input = new SparkJobRunnerInput(
            /* livyUrl */ "https://$LivyHost/livy/batches",
            /* jarFileWasbPath */ "wasb://.../" + jarFolder,
            /* className */ "com...",
            /* sparkJars */ "com....",
            /* requestArgs */ "[\"blob1\",\"blob2\"]",
            /* user */ "user",
            /* pwd */ "pwd");

    SparkJobRunner runner = new SparkJobRunner(input);
    String requestResult = runner.run();
    System.out.println(requestResult);
}
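The post does not show SparkJobRunnerInput. Purely for orientation, here is a hypothetical minimal version, assuming it is a plain value holder whose field names match what SparkJobRunner reads and whose constructor order matches the call in runJob. The livyUrl example in the comment follows the https://CLUSTERNAME.azurehdinsight.net/livy/batches pattern from Microsoft's HDInsight documentation; CLUSTERNAME is a placeholder.

```java
// Hypothetical reconstruction: the original SparkJobRunnerInput is not shown.
// Field names follow what SparkJobRunner reads; constructor order follows runJob().
class SparkJobRunnerInput {
    final String livyUrl;          // e.g. https://CLUSTERNAME.azurehdinsight.net/livy/batches
    final String jarFileWasbPath;  // wasb:// path of the application jar
    final String className;        // main class inside the jar
    final String sparkJars;        // value used for spark.jars.packages
    final String requestArgs;      // JSON array literal, e.g. ["blob1","blob2"]
    final String user;             // cluster login user
    final String pwd;              // cluster login password

    public SparkJobRunnerInput(String livyUrl, String jarFileWasbPath, String className,
                               String sparkJars, String requestArgs, String user, String pwd) {
        this.livyUrl = livyUrl;
        this.jarFileWasbPath = jarFileWasbPath;
        this.className = className;
        this.sparkJars = sparkJars;
        this.requestArgs = requestArgs;
        this.user = user;
        this.pwd = pwd;
    }
}
```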

Code:
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class SparkJobRunner {

    private final SparkJobRunnerInput input;

    public SparkJobRunner(SparkJobRunnerInput input) {
        this.input = input;
    }

    public String run() throws Exception {
        String requestBody = buildRequestBody();
        // Java is case-sensitive: the method is runSparkJob, not RunSparkJob
        return runSparkJob(requestBody);
    }

    private String buildRequestBody() {
        // requestArgs already is a JSON array such as ["blob1","blob2"],
        // so it is inserted without surrounding quotes
        return String.format(
                "{ \"file\":\"%s\", \"className\":\"%s\", \"args\":%s, \"conf\": {\"spark.jars.packages\": \"%s\" } }",
                input.jarFileWasbPath, input.className, input.requestArgs, input.sparkJars);
    }

    private String runSparkJob(String requestBody) throws Exception {
        HttpURLConnection connection = null;
        try {
            URL url = new URL(this.input.livyUrl);
            connection = (HttpURLConnection) url.openConnection();
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Content-Type", "application/json");
            connection.setUseCaches(false);
            connection.setDoOutput(true);

            String urlParameters = ""; // unclear what belongs here (question 1 below)

            // Send request as UTF-8 (DataOutputStream.writeBytes drops the high byte of each char)
            try (OutputStream wr = connection.getOutputStream()) {
                wr.write(urlParameters.getBytes(StandardCharsets.UTF_8));
            }

            // Get response
            StringBuilder response = new StringBuilder();
            try (BufferedReader rd = new BufferedReader(
                    new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = rd.readLine()) != null) {
                    response.append(line).append('\n');
                }
            }
            return response.toString();
        } finally {
            // Exceptions now reach the caller instead of being printed and turned into null
            if (connection != null) {
                connection.disconnect();
            }
        }
    }
}
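For comparison, a hedged sketch of the submission step as the Livy REST API describes it: POST /batches takes the JSON document itself as the request body, so there are no separate URL parameters to send, and args must be a JSON array of strings. LivyBatchSubmitter, buildBody and submit are hypothetical names. The sketch assumes the cluster accepts HTTP Basic authentication with the cluster login (as in Microsoft's curl examples for HDInsight's Livy endpoint) and that it checks an X-Requested-By header, which Livy requires when its CSRF protection is enabled. The conf block with spark.jars.packages is left out for brevity.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, a sketch only: submits one batch via POST to a Livy /batches URL.
// Assumptions: HTTP Basic auth with the cluster login, and an endpoint that
// requires an X-Requested-By header (Livy's CSRF protection).
class LivyBatchSubmitter {

    // Escapes characters that would break a JSON string literal.
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // Livy expects "args" as a JSON array of strings, not as one quoted string.
    static String buildBody(String file, String className, List<String> args) {
        String argArray = args.stream()
                .map(LivyBatchSubmitter::jsonString)
                .collect(Collectors.joining(",", "[", "]"));
        return "{\"file\":" + jsonString(file)
                + ",\"className\":" + jsonString(className)
                + ",\"args\":" + argArray + "}";
    }

    // Writes the JSON document as the POST body; returns "<status> <response body>".
    static String submit(String livyUrl, String user, String pwd, String body) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) URI.create(livyUrl).toURL().openConnection();
        try {
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setRequestProperty("X-Requested-By", user); // assumption: any non-empty value passes
            String token = Base64.getEncoder()
                    .encodeToString((user + ":" + pwd).getBytes(StandardCharsets.UTF_8));
            conn.setRequestProperty("Authorization", "Basic " + token);

            // The JSON body is what takes the place of the empty urlParameters string.
            try (OutputStream out = conn.getOutputStream()) {
                out.write(body.getBytes(StandardCharsets.UTF_8));
            }

            int status = conn.getResponseCode();
            // For 4xx/5xx, getInputStream() throws; the error text is on getErrorStream().
            InputStream stream = status < 400 ? conn.getInputStream() : conn.getErrorStream();
            String text = "";
            if (stream != null) {
                try (InputStream in = stream) {
                    text = new String(in.readAllBytes(), StandardCharsets.UTF_8);
                }
            }
            return status + " " + text;
        } finally {
            conn.disconnect();
        }
    }
}
```

With the values from runJob this would be called roughly as submit(livyUrl, user, pwd, buildBody(jarPath, className, List.of("blob1", "blob2"))). Livy typically answers a successful submission with 201 and a JSON body containing the batch id and its current state; on errors, reading getErrorStream() surfaces Livy's message instead of a bare IOException.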

The runSparkJob method is supposed to open the connection and then run the job.
1. I don't know what urlParameters should contain.
2. How do I then actually run the Spark job?
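On question 2, as far as the Livy REST API goes, POST /batches already starts the job: the response contains the batch id and an initial state such as starting, and the Spark application then runs on the cluster asynchronously. To wait for the outcome, one can poll GET /batches/{batchId}/state. A hedged sketch follows; LivyBatchPoller, parseId, parseState and waitForBatch are hypothetical names, the parsing uses regular expressions instead of a JSON library, and treating success, dead and killed as final states is an assumption based on Livy's documented batch states.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, a sketch only: waits for a Livy batch by polling
// GET <livyUrl>/<id>/state. Regexes replace a JSON library to stay dependency-free.
class LivyBatchPoller {
    private static final Pattern ID = Pattern.compile("\"id\"\\s*:\\s*(\\d+)");
    private static final Pattern STATE = Pattern.compile("\"state\"\\s*:\\s*\"([^\"]+)\"");
    // Assumption: a batch is finished once Livy reports one of these states.
    private static final Set<String> FINAL_STATES = Set.of("success", "dead", "killed");

    // Extracts the batch id from the JSON returned by POST /batches.
    static int parseId(String json) {
        Matcher m = ID.matcher(json);
        if (!m.find()) throw new IllegalArgumentException("no id in: " + json);
        return Integer.parseInt(m.group(1));
    }

    // Extracts the "state" value from a Livy batch JSON response.
    static String parseState(String json) {
        Matcher m = STATE.matcher(json);
        if (!m.find()) throw new IllegalArgumentException("no state in: " + json);
        return m.group(1);
    }

    // Polls every pollMillis until the batch reaches a final state, then returns it.
    static String waitForBatch(String livyUrl, int id, String user, String pwd, long pollMillis)
            throws IOException, InterruptedException {
        String auth = "Basic " + Base64.getEncoder()
                .encodeToString((user + ":" + pwd).getBytes(StandardCharsets.UTF_8));
        while (true) {
            HttpURLConnection conn = (HttpURLConnection)
                    URI.create(livyUrl + "/" + id + "/state").toURL().openConnection();
            String state;
            try {
                conn.setRequestProperty("Authorization", auth); // GET is the default method
                int status = conn.getResponseCode();
                if (status != 200) {
                    throw new IOException("HTTP " + status + " while polling batch " + id);
                }
                try (InputStream in = conn.getInputStream()) {
                    state = parseState(new String(in.readAllBytes(), StandardCharsets.UTF_8));
                }
            } finally {
                conn.disconnect();
            }
            if (FINAL_STATES.contains(state)) {
                return state;
            }
            Thread.sleep(pollMillis);
        }
    }
}
```

Applied to the string returned by runSparkJob, this would look like waitForBatch(livyUrl, parseId(response), user, pwd, 5000). A final state of dead usually means the Spark application failed, so the batch log (GET /batches/{batchId}/log in the Livy API) is the next place to look.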

Thanks for your help.
 
