Apache Spark vs Flink performance comparison – Word Count


This article compares Apache Flink and Apache Spark running locally on a single node. We will run a Word Count task to see which one finishes faster.

Please note that this comparison may contain inaccuracies. First of all, it was measured in the simplest possible way: as the elapsed time from the start of the task to its end, including startup time. The main point is to show the performance of both Spark and Flink on the same challenge, a Word Count task on an 80 MB text file from the Musixmatch service. You can download this file from this link.

For this experiment I used an Ubuntu virtual machine. I ran the comparison on two VM configurations: 4 GB RAM with 1 core, and 8 GB RAM with 2 cores. The code used for this Word Count is listed below.

Apache Flink vs Apache Spark performance comparison

                       4 GB RAM, 1 core    8 GB RAM, 2 cores
 Apache Spark (time)   58 sec              45 sec
 Apache Flink (time)   36 sec              22 sec

Each result is the average time in seconds over 3 runs.

Next time I will compare the streaming capabilities of Flink and Spark.

If you notice something wrong in this experiment, please leave me a note in the comments.

Flink and Spark Word Count code

Maven dependencies:

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>0.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>0.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.5.1</version>
        </dependency>
</dependencies>

Spark Word Count implementation:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;

public class SparkWordCount {
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        String file = "mxm_779k_matches.txt";
        SparkConf configuration = new SparkConf()
                .setAppName("Spark WC")
                .setMaster("local");

        JavaSparkContext sc = new JavaSparkContext(configuration);
        JavaRDD<String> fileRDD = sc.textFile(file).cache();

        JavaPairRDD<String, Integer> counts = fileRDD
                // split every line into lowercase tokens
                .flatMap(
                        new FlatMapFunction<String, String>() {
                            public Iterable<String> call(String s) {
                                return Arrays.asList(s.toLowerCase().split("\\W+"));
                            }
                        }
                )
                // map every word to a (word, 1) pair
                .mapToPair(
                        new PairFunction<String, String, Integer>() {
                            public Tuple2<String, Integer> call(String s) {
                                return new Tuple2<String, Integer>(s, 1);
                            }
                        }
                )
                // sum the counts per word
                .reduceByKey(
                        new Function2<Integer, Integer, Integer>() {
                            public Integer call(Integer i1, Integer i2) {
                                return i1 + i2;
                            }
                        }
                );

        int size = counts.collect().size();
        long end = System.currentTimeMillis();

        System.out.println("\n Time: " + (end - start) + " - " + size);

        sc.stop();
    }
}
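One detail worth noting: .setMaster("local") runs Spark with a single worker thread. If you want the job to use both cores of the 2-core VM, the master URL can be set to "local[2]" (or "local[*]" to use all available cores). A minimal sketch of that variant, keeping the rest of the job unchanged:

// Sketch: same Spark configuration, but with two worker threads for the 2-core VM.
// "local[2]" starts two local threads; "local[*]" uses all available cores.
SparkConf configuration = new SparkConf()
        .setAppName("Spark WC")
        .setMaster("local[2]");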

Flink Word Count implementation:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.List;

public class FlinkWordCount {
    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> sampleData = env.readTextFile("mxm_779k_matches.txt");

        // split lines into (word, 1) pairs, group by word and sum the counts
        DataSet<Tuple2<String, Integer>> wordCountResults =
                sampleData.flatMap(new LineSplitter())
                        .groupBy(0)
                        .aggregate(Aggregations.SUM, 1);

        // collect() triggers execution of the plan
        List<Tuple2<String, Integer>> collect = wordCountResults.collect();

        long end = System.currentTimeMillis();

        System.out.println("\n Time: " + (end - start) + " - " + collect.size());
    }
}

class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // emit a (word, 1) pair for every non-empty lowercase token
        String[] tokens = value.toLowerCase().split("\\W+");
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}
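A note on parallelism: when executed locally, Flink's ExecutionEnvironment picks a default parallelism based on the number of available cores, so the Flink job may already be using both cores on the 2-core VM. If you want to pin it explicitly (for example, to mirror the 1-core configuration), a minimal sketch looks like this:

// Sketch: set Flink's parallelism explicitly instead of relying on the default
// (a local environment defaults to the number of available cores).
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // e.g. to mirror the 1-core VM configuration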

If you have another take on the Apache Spark vs Flink comparison, please leave a comment below.

 
