Apache Spark is one of the most popular frameworks for building distributed data processing pipelines and, in this blog post, we’ll describe how to use Spark with Redis as the data repository for your computations. Spark’s main feature is that a pipeline (a Java, Scala, Python or R script) can be run both locally (for development) and on a cluster, without having to change any of the source code.
Spark allows this flexibility by cleverly using delayed computation or, as it is called in some contexts, laziness. Everything starts with the classes RDD, DataFrame and the more recent Dataset, each of which is a distributed, lazy representation of your data. They use distributed file systems, databases or other similar services as the actual storage backend. Their operations — such as map/select, filter/where and reduce/groupBy — do not actually make the computation happen. Rather, every operation adds a step to an execution plan, which is eventually run when an actual result is needed (e.g., when trying to print it to screen).
When launching the script locally, all computations happen on your machine. Alternatively, when launching on a distributed cluster, your data is partitioned across different nodes and the same operations happen (mostly) in parallel within the Spark cluster.
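To make the laziness concrete, here’s a minimal sketch, assuming a SparkSession named spark and a hypothetical people.csv file with an age column (neither appears elsewhere in this post):

df = spark.read.csv("people.csv", header=True, inferSchema=True)  # nothing is read yet
adults = df.where(df.age >= 18).select("name")  # transformations only extend the execution plan
adults.show(5)  # an action: only now does Spark read the file and run the plan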
Over time, Spark has developed three different APIs for working with distributed data sets. While each new API brought more capabilities than its predecessors, no single one is a complete replacement for what came before. In order of creation (oldest to newest), here’s an overview:

- RDD: the original, low-level API, a distributed collection of arbitrary objects manipulated with functions such as map, filter and reduce.
- DataFrame: a distributed collection of rows with named columns, manipulated with SQL-like operations such as select, where and groupBy, and optimized by Spark’s query planner.
- Dataset: a strongly typed take on the DataFrame, available in Scala and Java, combining RDD-style type safety with DataFrame optimizations.
For more details, check out “A Tale of Three Apache Spark APIs” by Jules Damji.
spark-redis is an open source connector that allows you to use Redis to store your data.
Three main reasons to use Redis as a backend are:
In this article we will focus on getting started with Python and how to use the DataFrame API. At the time of writing, Scala, which can be considered the “native” language of Spark, has access to some of the more advanced features of the integration, such as Redis RDDs and Streams. Since Scala is a JVM language, Java can, by extension, also use these features. With Python, we’ll need to stick to DataFrames.
Our first step is to install pyspark using pip. You will also need Java 8 installed on your machine.
$ pip install pyspark
Next, we’ll need Maven to build spark-redis. You can get it from the official website or by using a package manager (e.g. homebrew on macOS).
Download spark-redis from GitHub (either git clone or download as a zip), and build it using Maven.
$ cd spark-redis
$ mvn clean package -DskipTests
In the target/ subdirectory, you’ll find the compiled jar file.
If you don’t have it already, you will need a running Redis server to connect to. You can get one in a number of ways: from the official website, via a package manager (apt-get or brew install redis) or from Docker Hub (psst, this might be a good moment to try out Redis Enterprise).
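For example, if you go the Docker route, something like the following gives you a disposable instance listening on the default port (the container name here is arbitrary):

$ docker run --name redis-for-spark -p 6379:6379 -d redis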
Once you have it up and running, you can launch pyspark. Note that you’ll need to change VERSION to reflect the version you downloaded from GitHub.
$ pyspark --jars target/spark-redis-VERSION-jar-with-dependencies.jar
If your Redis server is in a container or has authentication enabled, add these switches to the previous invocation (and change the values to fit your situation).
--conf "spark.redis.host=localhost" --conf "spark.redis.port=6379" --conf "spark.redis.auth=passwd"
Now that we have a functioning pyspark shell that can store data on Redis, let’s play around with this famous people data set.
After downloading the TSV file, let’s load it as a Spark DataFrame.
>>> full_df = spark.read.csv("pantheon.tsv", sep="\t", quote="", header=True, inferSchema=True)
>>> full_df.dtypes
[('en_curid', 'int'), ('name', 'string'), ('numlangs', 'int'), ('birthcity', 'string'), ('birthstate', 'string'), ('countryName', 'string'), ('countryCode', 'string'), ('countryCode3', 'string'), ('LAT', 'double'), ('LON', 'double'), ('continentName', 'string'), ('birthyear', 'string'), ('gender', 'string'), ('occupation', 'string'), ('industry', 'string'), ('domain', 'string'), ('TotalPageViews', 'int'), ('L_star', 'double'), ('StdDevPageViews', 'double'), ('PageViewsEnglish', 'int'), ('PageViewsNonEnglish', 'int'), ('AverageViews', 'double'), ('HPI', 'double')]
Now, invoking .dtypes shows a list of all the columns (and their types) in the data set. There are many things that could be interesting to investigate in this data set, but for the purpose of this example let’s focus on finding, for each country, the most frequent occupation of its famous people.
Let’s start by keeping only the columns relevant to our goal.
>>> data = full_df.select("en_curid", "countryCode", "occupation")
>>> data.show(2)
+--------+-----------+-----------+
|en_curid|countryCode| occupation|
+--------+-----------+-----------+
| 307| US| POLITICIAN|
| 308| GR|PHILOSOPHER|
+--------+-----------+-----------+
only showing top 2 rows
This will create a copy of the original DataFrame that only contains three columns: the unique ID of each person, their country and their occupation.
We started by downloading a small data set for the purpose of this blog post, but in real life, if you were using Spark, the data set would likely be much bigger and hosted remotely. For this reason, let’s try to make the situation more realistic in the next step by loading the data into Redis:
>>> data.write.format("org.apache.spark.sql.redis").option("table", "people").option("key.column", "en_curid").save()
This command will load our data set into Redis. The two options we specified help define the data layout in Redis, as we’ll now see.
Let’s jump into redis-cli for a moment to see how our DataFrame is stored on Redis:
$ redis-cli
> SCAN 0 MATCH people:* COUNT 3
1) "2048"
2) 1) "people:2113653"
2) "people:44849"
3) "people:399280"
4) "people:101393"
SCAN shows us some of the keys we loaded into Redis. You can immediately see how the options we gave earlier were used to define the key names: each key is the table name (people) followed by the value of the key column (en_curid).
Let’s take a look at the content of a random key:
> HGETALL people:2113653
1) "countryCode"
2) "DE"
3) "occupation"
4) "SOCCER PLAYER"
As you can see, each row of our DataFrame became a Redis Hash containing countryCode and occupation. As stated earlier, en_curid was used as the primary key, so it became part of the key name.
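If you prefer to do this kind of spot check from Python instead of redis-cli, a minimal sketch with the redis-py client (not otherwise used in this post) would look like this:

import redis

# Connect to the same Redis server that spark-redis writes to (adjust host, port and auth to your setup).
r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# Read back the hash we just inspected with redis-cli.
print(r.hgetall("people:2113653"))
# e.g. {'countryCode': 'DE', 'occupation': 'SOCCER PLAYER'}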
Now that we’ve seen how the data is stored on Redis, let’s jump back into pyspark and see how we would actually write a pipeline to get the most common occupation for the famous people of each country.
Even though the data should still be loaded in memory, let’s read it back from Redis in order to write code that’s closer to what you would do in real life.
>>> df = spark.read.format("org.apache.spark.sql.redis").option("table", "people").option("key.column", "en_curid").load()
>>> df.show(2)
+--------+-----------+----------+
|en_curid|countryCode|occupation|
+--------+-----------+----------+
| 915950| ZW| SWIMMER|
| 726159| UY|POLITICIAN|
+--------+-----------+----------+
only showing top 2 rows
This is how your Spark pipeline would start, so let’s finally do the computation!
>>> counts = df.groupby("countryCode", "occupation").agg({"en_curid": "count"})
>>> counts.show(2)
+-----------+-------------+---------------+
|countryCode| occupation|count(en_curid)|
+-----------+-------------+---------------+
| FR|MATHEMATICIAN| 34|
| IT|SOCCER PLAYER| 81|
+-----------+-------------+---------------+
only showing top 2 rows
Now each row represents the count for one (country, occupation) combination present in the data. For the next step, we need to select, for each country, only the occupation with the highest count.
Let’s start by importing a few new modules, and then, using a window function, define the code to select the most frequent occupations:
>>> from pyspark.sql.window import Window
>>> from pyspark.sql.functions import count, col, row_number
>>> w = Window().partitionBy("countryCode").orderBy(col("count(en_curid)").desc())
>>> result = counts.withColumn("rn", row_number().over(w)).where(col("rn") == 1).select("countryCode", "occupation")
>>> result.show(5)
+-----------+-------------+
|countryCode| occupation|
+-----------+-------------+
| DZ| POLITICIAN|
| LT| POLITICIAN|
| MM| POLITICIAN|
| CI|SOCCER PLAYER|
| AZ| POLITICIAN|
+-----------+-------------+
only showing top 5 rows
This code grouped the original rows by countryCode, ordered the content of each group by count(en_curid) in descending order, and took only the first element. As you can see, within this small sample, politician seems to be a very common occupation.
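Before moving on, if you want to peek at what the window ordered within a single country, a quick check like the following (a sketch; IT is just one of the country codes that appeared above) shows that country’s ranking before the rn == 1 filter:

>>> counts.where(col("countryCode") == "IT").orderBy(col("count(en_curid)").desc()).show(3)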
Let’s see for how many countries politician is the most frequent occupation:
>>> result.where(col("occupation") == "POLITICIAN").count()
150
Wow, that’s a lot, considering there are 195 countries in the world as of today. Now, let’s just save the remaining countries in Redis:
>>> no_pol = result.where(col("occupation") != "POLITICIAN")
>>> no_pol.write.format("org.apache.spark.sql.redis").option("table", "occupation").option("key.column", "countryCode").save()
If you now jump into redis-cli, you will be able to see the new data:
$ redis-cli
> HGETALL occupation:IT
1) "occupation"
2) "RELIGIOUS FIGURE"
> HGETALL occupation:US
1) "occupation"
2) "ACTOR"
If you want to practice more, inspect the original data set and see if you find other details that pique your interest.
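For instance, the column list we got from .dtypes suggests a few easy follow-ups; here’s one possible sketch, counting famous people per continent:

>>> full_df.groupby("continentName").count().orderBy("count", ascending=False).show()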
A very important point worth reiterating is that every operation on an RDD or DataFrame/Dataset object is distributed across multiple nodes. If our example were not just about famous people, we would have had tens of millions of rows at the beginning. In that case, Spark would scale out the computation, but if you only had one Redis instance in place, you’d have N nodes hammering on it, most probably bottlenecking your network bandwidth.
To get the most out of Redis, you’ll need to scale it appropriately using the Redis Cluster API. This will ensure that all your computing nodes are not starved when reading, or choked when writing.
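As a rough sketch of what that looks like from the Spark side, you would point the same configuration switches at one node of your Redis Cluster (the host and port below are placeholders; spark-redis should discover the remaining nodes from that one, but check the project’s documentation for your version):

--conf "spark.redis.host=10.0.0.1" --conf "spark.redis.port=7000"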
In this post, we explored how to download, compile and deploy spark-redis in order to use Redis as a backend for your Spark DataFrames. spark-redis offers full support for the DataFrame API, so it should be very easy to port existing scripts and start enjoying the added speed Redis offers. If you want to learn more, take a look at the spark-redis documentation on GitHub.