Running PySpark with Cassandra using spark-cassandra-connector in Jupyter Notebook

We were facing several out-of-memory issues when doing operations on the big data that lives in our Cassandra cluster, so we decided it was better to use Spark to solve this problem.

It became a tough but interesting journey for us because of one decision we took as a team: we would use at most two languages on the backend, Node.js and Python. In our view, these two are the right tools for this project. Data collection and transformation are handled easily by Node.js, while everything else, such as big data operations and artificial intelligence/machine learning problems, is solved using Python. You may wonder why we put this restriction in place in the era of polyglot microservices architecture.

It’s fun to work in multiple languages. I used to be a Java developer, then moved to Node.js, and then to Python as project requirements changed. Polyglot development makes sense while you are building, but it is very tough to maintain those systems afterwards, to find people who are genuinely polyglot, or to justify recruiting a developer with expertise in some language just to maintain one module.

A lot of teams and companies face issues due to polyglot programming, and a couple of them have expressed their concerns publicly.

Once I started working with PySpark, everything went smoothly until I tried to use Cassandra. Very little documentation and very few examples are available, so I pieced together a couple of examples related to PySpark and a couple related to Scala, and started my journey from there.

If you haven’t installed PySpark & Jupyter yet, you can refer to my previous article. Without wasting much time, let’s get our hands dirty.


We need some good data to work with, so I chose the MovieLens data for this. You can get the latest data here. I chose ml-latest.zip instead of ml-latest-small.zip so that we can play with reasonably large data.

Let’s load this data into our Cassandra DB first.
Open your cqlsh shell and create a keyspace. I created the movie_lens keyspace and switched to it using the commands below.

CREATE KEYSPACE movie_lens WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

use movie_lens;

Then I created the movies and ratings tables using the commands below. Note that ratings uses a composite primary key: user_id is the partition key and movie_id the clustering column, so all of a user’s ratings live together in one partition.

CREATE TABLE movies(movie_id int PRIMARY KEY, title text, genres text);

CREATE TABLE ratings(user_id int, movie_id int, rating double, timestamp bigint, primary key((user_id), movie_id));

I loaded the data using the commands below.

COPY movies(movie_id, title, genres) FROM '../ml-latest/movies.csv' WITH HEADER = true;

COPY ratings(user_id, movie_id, rating, timestamp) FROM '../ml-latest/ratings.csv' WITH HEADER = true;

You will get output like this:

cqlsh:movie_lens> COPY movies(movie_id, title, genres) FROM '../ml-latest/movies.csv' WITH HEADER = true;
Reading options from the command line: {'header': 'true'}
Using 3 child processes

Starting copy of movie_lens.movies with columns [movie_id, title, genres].
Processed: 45843 rows; Rate:   12664 rows/s; Avg. rate:   11930 rows/s
45843 rows imported from 1 files in 3.843 seconds (0 skipped).
cqlsh:movie_lens> COPY ratings(user_id, movie_id, rating, timestamp) FROM '../ml-latest/ratings.csv' WITH HEADER = true;
Reading options from the command line: {'header': 'true'}
Using 3 child processes

Starting copy of movie_lens.ratings with columns [user_id, movie_id, rating, timestamp].
Processed: 26024289 rows; Rate:    9378 rows/s; Avg. rate:   22557 rows/s
26024289 rows imported from 1 files in 19 minutes and 13.714 seconds (0 skipped).

Everything is set. It’s time to start coding.


Start your Jupyter Notebook using the command below.

jupyter notebook

Create a new notebook.

First, we need to set some arguments and configuration to make sure PySpark connects to our Cassandra cluster.

# Configurations related to Cassandra connector & Cluster
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0 --conf spark.cassandra.connection.host=127.0.0.1 pyspark-shell'

Here we are telling PySpark to use the spark-cassandra-connector package to connect to our Cassandra cluster, whose hostname is 127.0.0.1. That’s it. The _2.11 suffix in the package name is the Scala version the connector is built against, and the 2.3.x connector line matches Spark 2.3.x, so pick the artifact that matches your Spark installation. If you have multiple nodes in your Cassandra cluster, then in the host configuration we need to give all of their IPs. For example,

# Configurations related to Cassandra connector & Cluster
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0 --conf spark.cassandra.connection.host=192.168.0.123,192.168.0.124 pyspark-shell'

The first time you run this, it will take a while because Spark needs to download the connector jar (you will see dependency-resolution messages in the output) and then connect to our Cassandra cluster.

Then we need to create the Spark Context.

# Creating PySpark Context
from pyspark import SparkContext
sc = SparkContext("local", "movie lens app")

After this, we need to create an SQLContext to do SQL operations on our data.

# Creating PySpark SQL Context
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
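
As an aside, if you are on Spark 2.x you can achieve the same thing with the unified SparkSession entry point instead of creating a SparkContext and SQLContext separately. A minimal sketch (the rest of this article sticks with the SQLContext approach):

# Alternative entry point (Spark 2.x): SparkSession wraps both
# the SparkContext and the SQLContext
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("movie lens app") \
    .getOrCreate()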

We are going to work with multiple tables, so we need a data frame for each. To save some lines of code, I created a function that loads the data frame for a given keyspace and table.

# Loads and returns a data frame for the given keyspace and table
def load_and_get_table_df(keys_space_name, table_name):
    table_df = sqlContext.read\
        .format("org.apache.spark.sql.cassandra")\
        .options(table=table_name, keyspace=keys_space_name)\
        .load()
    return table_df

Let’s load the movies and ratings data frames.

# Loading movies & ratings table data frames
movies = load_and_get_table_df("movie_lens", "movies")
ratings = load_and_get_table_df("movie_lens", "ratings")

Now, let’s understand the data. First, let’s see what the movies data looks like.

# First 20 rows of movies table
movies.show()

Then you will get data like this

+--------+--------------------+--------------------+
|movie_id|              genres|               title|
+--------+--------------------+--------------------+
|  166048|       Drama|Romance|Happy Birthday (2...|
|   99572|               Drama|     Cornelis (2010)|
|   96638|         Documentary|    Klitschko (2011)|
|  146455|       Drama|Fantasy|      Hallway (2015)|
|   87258|Crime|Drama|Film-...|    The Thief (1952)|
|   69227|     Children|Comedy|Ernest Rides Agai...|
|     678|      Drama|Thriller|Some Folks Call I...|
|  154214|    Children|Fantasy|Barbara the Fair ...|
|   71675|              Comedy|    Hiroshima (2009)|
|  139918|     Documentary|War|The Ghost Army (2...|
|  164753|             Romance|Anything for Love...|
|  168310|  (no genres listed)|Μαριχουάνα Στοπ !...|
|   93952|     Horror|Thriller|Silent House, The...|
|  118204|Adventure|Documen...|     McConkey (2013)|
|  168678|              Comedy|     Devolved (2010)|
|  111852|         Documentary|Generation Iron (...|
|   43333|       Drama|Romance|        Water (2005)|
|  106026|      Comedy|Musical|Folies Bergere de...|
|  105926|              Horror| Barrio Tales (2012)|
|   36477|Comedy|Drama|Romance|  Baxter, The (2005)|
+--------+--------------------+--------------------+
only showing top 20 rows
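
Since genres are stored as a single pipe-delimited string, you can already do simple filtering on that column. For example, a quick sketch (not part of the original flow) to list a few comedies:

# Filtering movies whose genres string contains "Comedy"
movies.where(movies["genres"].contains("Comedy")).show(5)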

Then let’s see what the ratings data looks like.

# First 20 rows of ratings table
ratings.show()

Then you will get data like this

+-------+--------+------+----------+
|user_id|movie_id|rating| timestamp|
+-------+--------+------+----------+
| 117752|     340|   5.0|1004221564|
| 117752|    1275|   5.0|1004221564|
| 117752|    1760|   5.0|1004221564|
| 117752|    2028|   5.0|1004221564|
| 117752|    2268|   5.0|1004221564|
| 117752|    2643|   5.0|1004221564|
| 117752|    3204|   5.0|1004221564|
| 117752|    3677|   5.0|1004221564|
| 117752|    4143|   4.0|1004221564|
| 117752|    4565|   5.0|1004221564|
| 122430|       1|   4.0| 832302134|
| 122430|      25|   5.0| 850195697|
| 122430|      34|   4.0| 832302184|
| 122430|      50|   5.0| 832302207|
| 122430|      52|   3.0| 850195768|
| 122430|      69|   4.0| 832303477|
| 122430|     110|   3.0| 832302172|
| 122430|     111|   3.0| 832302229|
| 122430|     122|   4.0| 832302922|
| 122430|     145|   3.0| 832302767|
+-------+--------+------+----------+
only showing top 20 rows

Now, let’s see the top 20 reviewers in terms of the number of ratings given.

# Top 20 reviewers in terms of number of ratings given
ratings.groupBy("user_id").count().orderBy('count', ascending=False).show()

You will get data like this:

+-------+-----+
|user_id|count|
+-------+-----+
|  45811|18276|
|   8659| 9279|
| 270123| 7638|
| 179792| 7515|
| 228291| 7410|
| 243443| 6320|
|  98415| 6094|
| 229879| 6024|
|  98787| 5814|
| 172224| 5701|
| 230417| 5619|
|  70648| 5356|
| 194690| 5206|
| 107720| 5169|
|  24025| 4946|
| 165352| 4921|
| 101276| 4834|
| 243331| 4834|
|  74275| 4815|
|  41190| 4785|
+-------+-----+
only showing top 20 rows
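
The same grouping idea works for movies as well. As a sketch (not in the original notebook), you can join ratings with movies to see the titles of the most-rated films:

# Sketch: top 20 most-rated movies, joined with their titles
most_rated = ratings.groupBy("movie_id").count()
most_rated.join(movies, "movie_id") \
    .orderBy("count", ascending=False) \
    .select("title", "count") \
    .show()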

While these queries run and crunch data, you can watch the log output in the terminal from which you started Jupyter Notebook. It shows Spark streaming data out of the DB (the Cassandra cluster) and performing operations on top of it.
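
If you want to confirm Spark really is reading from Cassandra, you can inspect the physical plan of a query; the scan node should reference the org.apache.spark.sql.cassandra source (the exact formatting varies by Spark version):

# Printing the physical plan; the scan should reference the Cassandra source
ratings.groupBy("user_id").count().explain()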

Next, I want to pick 2 users and see how many movies they both rated and how many unique movies either one of them reviewed.

To do all of this, let’s first see what the schema of the ratings data frame looks like.

# Schema of the ratings data frame
ratings.printSchema()
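
Given the table definition above (int, int, double, bigint), you should see something like:

root
 |-- user_id: integer (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)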

Now let’s select the movies each user rated.

# Selecting the movies rated by 2 users
firstUserMovies = ratings.where(ratings["user_id"] == 45811).select("movie_id")
secondUserMovies = ratings.where(ratings["user_id"] == 98415).select("movie_id")

At this point Spark doesn’t query any data. Spark works lazily: until you actually do something with the data, it won’t fetch it, and by default it fetches the data again for every operation. To avoid this, we can cache a data frame: the first time the data is fetched it is kept in the cache, and subsequent operations use the cached copy instead of going back to the DB.

Let’s cache the data frames.

# Caching the data frames of users
firstUserMovies = firstUserMovies.cache()
secondUserMovies = secondUserMovies.cache()
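
Note that cache() is itself lazy: it only marks the data frame, and the data is actually materialized the first time an action runs on it. If you want to check that a frame is marked for caching, there is a handy flag:

# cache() only marks the frame; this flag is True once cache() was called
print(firstUserMovies.is_cached)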

Now that everything is set, let’s find the movies both users reviewed. Note that intersect, like SQL’s INTERSECT, returns only distinct rows.

# 20 common movies both users reviewed
firstUserMovies.intersect(secondUserMovies).show()

It will give data like this

+--------+
|movie_id|
+--------+
|     471|
|    1088|
|    1238|
|    1342|
|    1580|
|    1645|
|    1959|
|    2122|
|    2366|
|    2866|
|    3175|
|    3918|
|    5300|
|    6357|
|    6466|
|    6620|
|    6658|
|    7253|
|    7982|
|    8638|
+--------+
only showing top 20 rows

To verify, or just to make sure we did it correctly, let’s choose a movie and check it. In this case, I chose the movie with id 3918.

To check whether the first user reviewed it:

# Verifying whether the first user reviewed this movie
firstUserMovies.where(firstUserMovies["movie_id"] == 3918).show()

Output:

+--------+
|movie_id|
+--------+
|    3918|
+--------+

To check whether the second user reviewed it:

# Verifying whether the second user reviewed this movie
secondUserMovies.where(secondUserMovies["movie_id"] == 3918).show()

Output:

+--------+
|movie_id|
+--------+
|    3918|
+--------+

To find the number of movies both users reviewed:

# Number of common movies both users reviewed
firstUserMovies.intersect(secondUserMovies).count()

Output:

4488

To find how many unique movies they reviewed in total:

# Total number of unique movies they reviewed
firstUserMovies.union(secondUserMovies).distinct().count()

Output:

19882
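
These numbers are consistent with the per-user counts we saw earlier: 18276 + 6094 - 4488 = 19882, which is exactly the union count. As a further sketch, you could also count the movies only the first user reviewed, which should come to 18276 - 4488 = 13788:

# Movies the first user reviewed but the second did not
firstUserMovies.subtract(secondUserMovies).count()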

Once you have finished playing around and don’t want that data retained in the cache:

# Removing data frames from the cache
firstUserMovies.unpersist()
secondUserMovies.unpersist()

If you want to shut down the PySpark context:

# Shutting down the PySpark Context
sc.stop()

You will find this Jupyter Notebook at my GitHub Repository.

Peace. Happy Coding.

