Apache Flink Hello World Java example

Apache Flink is a distributed stream processing framework for large-scale data. In this article we will walk through a simple Hello World example written in Java.

Flink offers a concise API for Java and Scala. We will use Maven as the build tool for dependency management. You don’t need Hadoop or anything else from its ecosystem.

Open your favorite IDE and create a Maven project. In your pom.xml declare two dependencies: flink-java and flink-clients (both version 0.9.1). See the example below:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <name>Apache Flink Java Hello World example</name>

    <groupId>flink01</groupId>
    <artifactId>flink01</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>0.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>0.9.1</version>
        </dependency>
    </dependencies>

</project>
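Alternatively, Flink ships a Maven quickstart archetype that generates a preconfigured project skeleton for you. The archetype version below matches the 0.9.1 release used in this article; the groupId/artifactId of the generated project (org.myorg.quickstart) are placeholders chosen here, so pick your own:

```shell
# Generate a Flink Java project skeleton from the quickstart archetype.
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=0.9.1 \
  -DgroupId=org.myorg.quickstart \
  -DartifactId=quickstart \
  -Dversion=1.0 \
  -DinteractiveMode=false
```

The generated pom already contains the flink-java and flink-clients dependencies, so you can skip writing it by hand.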

Hello World Java application

Now it’s time to create the Hello World application. Of course, there is no better “Hello World” example in Big Data than a Word Count task. This is what we will create in just a few lines of code.

We need just two classes for it – the runner class with the main method and a line-splitter class that implements the FlatMapFunction interface.

package app;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkHelloWorld {
    public static void main(String[] args) throws Exception {
        // Obtain the environment that controls the job's execution
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Build an in-memory DataSet from a few sample lines
        DataSet<String> sampleData = env.fromElements(
                "Hello! This is a text sample",
                "to represent Apache Flink streaming",
                "Hello Hello Hello"
        );

        // Split each line into (word, 1) pairs, group by word, sum the counts
        DataSet<Tuple2<String, Integer>> wordCountResults =
                sampleData.flatMap(new LineSplitter())
                        .groupBy(0)
                        .aggregate(Aggregations.SUM, 1);

        // Print the results (this also triggers job execution)
        wordCountResults.print();
    }
}

// Emits a (word, 1) pair for every word found in the input line
class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        String[] tokens = value.toLowerCase().split("\\W+");
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}

Explanation

Let’s try to understand what we have done in our FlinkHelloWorld class.

  1. Create the ExecutionEnvironment instance. ExecutionEnvironment is the class that controls the job’s execution.

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  2. Then we create the example data. The ExecutionEnvironment object has a method called fromElements(). It accepts a vararg of Strings and returns a DataSet object.
  3. In the next step we execute the whole logic. Its result is stored in the wordCountResults dataset. We use a sequence of three methods: flatMap(), groupBy(), aggregate().
    • flatMap() applies the FlatMapFunction to every entry of a dataset and returns the result in flattened form. We created a LineSplitter class that implements the FlatMapFunction interface. Inside the overridden method flatMap(String value, Collector<Tuple2<String, Integer>> out), it turns the line to lowercase and splits it into words with a simple regular expression. For each word it passes the pair (word, 1) to the collector; the 1 represents a single occurrence of the word and will be used later for counting.
    • groupBy() accepts a vararg of the field indexes you want to group on. In our case there are only two fields (the word and the 1 that came from the previous step), so we pass 0, which means the (word, 1) pairs will be grouped by the word.
    • aggregate() accepts an Aggregations value and the index of the field used for aggregation. So when we pass Aggregations.SUM and 1, it calculates the sum of occurrences of each word.
  4. At the end we simply call the print() method on the wordCountResults.
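The pipeline in step 3 can be reproduced in plain Java, with no Flink on the classpath, to check what LineSplitter emits and how the grouped sums come out. The TokenizeCheck class and its helper methods below are illustrative names invented for this sketch, not part of Flink:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TokenizeCheck {
    // Mirrors the tokenization in LineSplitter: lowercase, split on non-word chars
    static List<String> tokenize(String line) {
        List<String> tokens = new ArrayList<>();
        for (String t : line.toLowerCase().split("\\W+")) {
            if (t.length() > 0) {
                tokens.add(t);
            }
        }
        return tokens;
    }

    // Counts words the same way the Flink job does: group by word, sum the 1s
    static Map<String, Integer> wordCount(String... lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String token : tokenize(line)) {
                counts.merge(token, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = wordCount(
                "Hello! This is a text sample",
                "to represent Apache Flink streaming",
                "Hello Hello Hello");
        System.out.println(counts.get("hello")); // 4, matching the Flink output
    }
}
```

Running it confirms, for example, that “Hello!” is lowercased and stripped of punctuation before counting, which is why the final output contains (hello,4).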

After running the application, we will see the results of the Word Count in the console:

10/10/2015 23:07:42	CHAIN Reduce (SUM(1), at main(FlinkHelloWorld.java:20) -> FlatMap (collect())(1/2) switched to FINISHED 
10/10/2015 23:07:42	DataSink (collect() sink)(2/2) switched to RUNNING 
10/10/2015 23:07:42	DataSink (collect() sink)(1/2) switched to RUNNING 
10/10/2015 23:07:42	DataSink (collect() sink)(2/2) switched to FINISHED 
10/10/2015 23:07:42	DataSink (collect() sink)(1/2) switched to FINISHED 
10/10/2015 23:07:42	Job execution switched to status FINISHED.
(apache,1)
(flink,1)
(hello,4)
(to,1)
(a,1)
(is,1)
(represent,1)
(sample,1)
(streaming,1)
(text,1)
(this,1)

If you have any questions about this Flink Hello World Java example, please leave them in the comments below. Thanks for your attention.
