Part 1 Continued - Saving to mongodb
I decided to build an online course aggregator and introduced the project in an earlier post. Last week I started part 1, explaining the backend and creating the first provider class for Coursera. In this post I will dive into the storage engines. Initially I created two: MongoDB, and Postgres using SQLAlchemy.
The storage engines follow the same model as the providers: an abstract base class declares the store_courses() abstract method, and each engine implements it. Here is the main program, which iterates over a list of providers to collect courses, then iterates over the storage engines to save them.
import importlib

provider_list = ["coursera", "edx"]
storage_list = ["mongodb", "postgres"]

# loop over the providers and call their get_courses method
for provider in provider_list:
    print("Collecting Courses for provider: {}".format(provider))
    # use importlib to call the providers "polymorphically"
    mod = importlib.import_module("providers." + provider.lower())  # import module
    func = getattr(mod, provider)()  # create instance
    # from any provider we expect the same dictionary structure to be returned
    data = getattr(func, "get_courses")()
    # loop through the storage engine list
    for store in storage_list:
        print("Saving Courses into Engine: {}".format(store))
        mod = importlib.import_module("storage." + store.lower())
        func = getattr(mod, store)()
        getattr(func, "store_courses")(data)  # call store_courses to save the data
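The dynamic lookup used in the loop above can be tried in isolation. The following is a minimal sketch, assuming the same convention as the main program (the module name is the lowercase form of the class name). load_instance is a hypothetical helper, and the standard-library classes Decimal and Queue stand in for the provider and storage classes so the snippet runs without the project. It is written for Python 3.

```python
import importlib


def load_instance(class_name, package=""):
    """Import <package>.<class_name lowercased> and return an instance of class_name."""
    module_name = class_name.lower()
    if package:
        module_name = package + "." + module_name
    module = importlib.import_module(module_name)
    cls = getattr(module, class_name)  # the attribute lookup is case-sensitive
    return cls()


# Standard-library stand-ins: decimal.Decimal and queue.Queue
for name in ["Decimal", "Queue"]:
    obj = load_instance(name)
    print(name, "->", type(obj))

# A lowercase entry finds the module but not the class
try:
    load_instance("queue")
except AttributeError as err:
    print("Lookup failed:", err)
```

Because getattr is case-sensitive, each entry in provider_list and storage_list has to match the class name in its module exactly.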
The main program collects the data from each provider, then saves that provider's data with every enabled storage engine. Since my provider class returns a specific JSON structure, creating the MongoDB engine was very simple: we just need to store that JSON document. First things first, here is the basic storage engine abstract base class.
from abc import ABCMeta, abstractmethod

import yaml


class StorageBase(object):
    __metaclass__ = ABCMeta

    @classmethod
    def get_config(cls, engine_name):
        # read this engine's settings from the YAML configuration file
        with open('config.yaml') as f:
            config = yaml.safe_load(f)
        return config['storage_configs'].get(engine_name, None)

    @abstractmethod
    def store_courses(self, courses):
        # abstract: implemented in each storage engine
        pass
Once again we are using Python's abc module to create the abstract class. I also created a get_config class method to read an engine's settings from a configuration file.
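To see what the abstract method buys us, here is a minimal sketch, not the project's code. It is written for Python 3, where inheriting from abc.ABC takes the place of the __metaclass__ attribute used above, and it leaves out the YAML configuration. BrokenEngine and MemoryEngine are hypothetical engines made up for the illustration.

```python
from abc import ABC, abstractmethod


class StorageBase(ABC):
    """Stand-in for the base class above, without the YAML config."""

    @abstractmethod
    def store_courses(self, courses):
        """Persist a list of course dictionaries."""


class BrokenEngine(StorageBase):
    # Hypothetical engine that forgot to implement store_courses
    pass


class MemoryEngine(StorageBase):
    # Hypothetical engine that keeps the courses in a list
    def __init__(self):
        self.saved = []

    def store_courses(self, courses):
        self.saved.extend(courses)


try:
    BrokenEngine()
except TypeError as err:
    print("Refused to instantiate:", err)

engine = MemoryEngine()
engine.store_courses([{"id": "abc", "course_name": "Compilers"}])
print(len(engine.saved))
```

An engine that does not implement store_courses fails as soon as it is instantiated, rather than later when the main loop tries to call the method.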
MongoDB Storage Engine
The MongoDB engine simply needs to iterate over the courses and save each one in a collection. However, I plan on running this program on a semi-regular basis, and I want each run to update the existing documents rather than duplicate them. Unfortunately I could not find an easy way to perform a bulk upsert, so I am iterating and upserting individually. This seems to work well enough. Here is what the MongoDB engine looks like.
from storage import StorageBase
import pymongo
from pymongo import ASCENDING


class MongoDB(StorageBase):
    # defaults only: do not override here, place overrides in config.yaml
    config = {
        "server": "localhost",
        "db": "wisdom",
        "collection": "courses"}

    def __init__(self):
        # copy the defaults so the class-level dict is not modified
        self.config = dict(self.config)
        # get_config is inherited from StorageBase
        config = self.get_config(self.__class__.__name__)
        if config is not None:
            for item in config:
                self.config[item] = config[item]
        self.collection = self.config["collection"]
        connection = pymongo.MongoClient(self.config["server"])
        self.db = connection[self.config["db"]]

    def store_courses(self, courses):
        # we can't bulk upsert so let's loop and upsert
        for course in courses:
            self.db[self.collection].update({"id": course["id"]}, course, upsert=True)
        self.__set_indicies()

    # this method is to override the collection for testing
    def set_collection(self, collection):
        self.collection = collection

    # set up any indexes we might want
    def __set_indicies(self):
        self.db[self.collection].ensure_index("id", unique=True)
        self.db[self.collection].ensure_index([("course_name", ASCENDING), ("provider", ASCENDING)], unique=True)
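One subtlety when layering file settings over class-level defaults is that writing into a class-level dictionary changes it for every instance. The following is a minimal sketch of merging into a copy instead. layered_config is a hypothetical helper, and the from_file values are assumed to be what config.yaml would return for this engine.

```python
DEFAULTS = {"server": "localhost", "db": "wisdom", "collection": "courses"}


def layered_config(defaults, overrides):
    """Return defaults updated with overrides, leaving both inputs untouched."""
    merged = dict(defaults)         # shallow copy of the defaults
    merged.update(overrides or {})  # overrides may be None when the file has no entry
    return merged


# Values as they might come back from config.yaml for the MongoDB engine
from_file = {"db": "wisdom", "unit_test_collection": "unit_tests"}

config = layered_config(DEFAULTS, from_file)
print(config["server"])                # not in the file, so the default is used
print(config["unit_test_collection"])  # only in the file
print(DEFAULTS)                        # the defaults themselves are unchanged
```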
We have a default configuration in case the collection, server and db names aren't set in the config file. The store_courses implementation with mongo is very easy.
def store_courses(self, courses):
    # we can't bulk upsert so let's loop and upsert
    for course in courses:
        self.db[self.collection].update({"id": course["id"]}, course, upsert=True)
    self.__set_indicies()
We simply call pymongo's update method and set upsert to true so it adds docs that don't exist. I also added a private method to create indexes on the id and course_name fields so we can have fast searches on those fields.
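The effect of upserting on the id field can be illustrated without a database. This is only a sketch of the semantics, not pymongo: a plain dictionary keyed on id plays the role of the collection, upsert_all is a hypothetical helper, and the two sample courses are made up.

```python
def upsert_all(collection, courses):
    """Replace the stored course with the same id, or add it if it is new."""
    for course in courses:
        collection[course["id"]] = dict(course)


# Hypothetical miniature of the provider output
courses = [
    {"id": "a1", "course_name": "Compilers", "provider": "coursera"},
    {"id": "b2", "course_name": "Algorithms", "provider": "coursera"},
]

store = {}                   # plays the role of the MongoDB collection
upsert_all(store, courses)
upsert_all(store, courses)   # a second run adds no duplicates
print(len(store))

courses[0]["course_name"] = "Compilers (2014)"
upsert_all(store, courses)   # a changed course overwrites the stored one
print(store["a1"]["course_name"])
```

Running the import any number of times leaves one record per id, which is the property the unit tests below check against the real collection.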
Unit Testing the mongo class
Once again I use a test-driven approach, not only because it is a best practice, but also because I don't want to hammer the Coursera API over and over while I code the storage engines. In the test folder there is a stubs folder containing a collected_courses.json file, which holds a small subset of the data we get back from the provider class. Using this stub, I created a unit test for MongoDB that saves this data into a test collection and makes sure the data is upserted properly.
import unittest
from storage.mongodb import MongoDB
import pymongo
import json
import yaml


class MongoTest(unittest.TestCase):

    def setUp(self):
        self.config = {
            "unit_test_collection": "testing"
        }
        # get the configuration
        with open('config.yaml') as f:
            configs = yaml.safe_load(f)['storage_configs']["MongoDB"]
        for item in configs:
            self.config[item] = configs[item]
        with open('test/stubs/collected_courses.json') as f:
            data = f.read()
        self.courses = json.loads(data)
        mongodb = pymongo.MongoClient()
        self.mongo = mongodb[self.config['db']]
        # drop the unit test collection
        self.mongo[self.config['unit_test_collection']].drop()

    def test_input(self):
        mDB = MongoDB()
        # override to the testing collection
        mDB.set_collection(self.config["unit_test_collection"])
        mDB.store_courses(self.courses)
        # now check the results
        count = self.mongo[self.config["unit_test_collection"]].count()
        self.assertEqual(count, 10)

    # reinsert and make sure it still equals 10
    def test_updates(self):
        self.test_input()
        self.test_input()

    # make a change to a few records and make sure the data is changed
    def test_changes(self):
        # store the original data first so the second pass is a real update
        self.test_input()
        self.courses[1]["short_description"] = "A new short description"
        self.courses[0]["providers_id"] = "unittests"
        # re-enter the data
        self.test_input()
        # check the updated short_description
        course = self.mongo[self.config["unit_test_collection"]].find({"id": "daf1d740782b94d750502fcb18a284fa"})
        self.assertEqual(course[0]["short_description"], "A new short description")
        # check the updated providers_id
        course = self.mongo[self.config["unit_test_collection"]].find({"id": "d114184968727a76272e32f6e4417e78"})
        self.assertEqual(course[0]["providers_id"], "unittests")
        # make sure we still have 10 records
        self.assertEqual(self.mongo[self.config["unit_test_collection"]].count(), 10)
After running the unit test I can go into MongoDB and look at the data in the unit_tests collection.
[Brett@bretts-mac-mini:~/src/published_blog (master)]$ mongo
MongoDB shell version: 2.2.2
connecting to: test
> show dbs
wisdom 0.203125GB
> use wisdom
switched to db wisdom
> show collections
courses
system.indexes
unit_tests
> db.unit_tests.count()
10
> db.unit_tests.findOne()
{
    "_id" : ObjectId("533cc8ab61d30a89d1d279ae"),
    "workload" : "8-10 hours/week, 10-20 hours/week with programming assignments",
    "course_name" : "Compilers",
    "language" : "english",
    "sessions" : [
        {
            "duration" : "10 weeks",
            "provider_session_id" : 76,
            "start_date" : "20120423"
        },
        {
            "duration" : "11 weeks",
            "provider_session_id" : 150,
            "start_date" : "20121001"
        },
        {
            "duration" : "11 weeks",
            "provider_session_id" : 970514,
            "start_date" : "20130211"
        },
        {
            "duration" : "11 weeks",
            "provider_session_id" : 972209,
            "start_date" : "20140317"
        }
    ],
    "media" : {
        "icon_url" : "https://s3.amazonaws.com/coursera/topics/compilers/large-icon.png",
        "video_id" : "sm0QQO-WZlM",
        "video_url" : "https://d1a2y8pfnfh44t.cloudfront.net/sm0QQO-WZlM/",
        "video_type" : "mp4",
        "photo_url" : "https://s3.amazonaws.com/coursera/topics/compilers/large-icon.png"
    },
    "providers_id" : "compilers",
    "tags" : [
        "Computer Science: Systems & Security",
        "Computer Science: Software Engineering"
    ],
    "institution" : {
        "website" : "http://online.stanford.edu/",
        "city" : "Palo Alto",
        "name" : "Stanford University",
        "country" : "US",
        "state" : "CA",
        "logo_url" : "https://coursera-university-assets.s3.amazonaws.com/d8/4c69670e0826e42c6cd80b4a02b9a2/stanford.png",
        "id" : "269fc60adb004b0b719031a97aedf5e9",
        "description" : "The Leland Stanford Junior University, commonly referred to as Stanford University or Stanford, is an American private research university located in Stanford, California on an 8,180-acre (3,310 ha) campus near Palo Alto, California, United States."
    },
    "full_description" : "<p>This course will discuss the major ideas used today in the implementation of programming language compilers, including lexical analysis, parsing, syntax-directed translation, abstract syntax trees, types and type checking, intermediate languages, dataflow analysis, program optimization, code generation, and runtime systems. As a result, you will learn how a program written in a high-level language designed for humans is systematically translated into a program written in low-level assembly more suited to machines. Along the way we will also touch on how programming languages are designed, programming language semantics, and why there are so many different kinds of programming languages.</p>\n<p>The course lectures will be presented in short videos. To help you master the material, there will be in-lecture questions to answer, quizzes, and two exams: a midterm and a final. There will also be homework in the form of exercises that ask you to show a sequence of logical steps needed to derive a specific result, such as the sequence of steps a type checker would perform to type check a piece of code, or the sequence of steps a parser would perform to parse an input string. This checking technology is the result of ongoing research at Stanford into developing innovative tools for education, and we're excited to be the first course ever to make it available to students.</p>\nAn optional course project is to write a complete compiler for COOL, the Classroom Object Oriented Language. COOL has the essential features of a realistic programming language, but is small and simple enough that it can be implemented in a few thousand lines of code. Students who choose to do the project can implement it in either C++ or Java.<br>I hope you enjoy the course!\n<p><strong>Why Study Compilers?</strong></p>\nEverything that computers do is the result of some program, and all of the millions of programs in the world are written in one of the many thousands of programming languages that have been developed over the last 60 years. Designing and implementing a programming language turns out to be difficult; some of the best minds in computer science have thought about the problems involved and contributed beautiful and deep results. Learning something about compilers will show you the interplay of theory and practice in computer science, especially how powerful general ideas combined with engineering insight can lead to practical solutions to very hard problems. Knowing how a compiler works will also make you a better programmer and increase your ability to learn new programming languages quickly.\n<div></div>",
    "provider" : "coursera",
    "short_description" : "This course will discuss the major ideas used today in the implementation of programming language compilers. You will learn how a program written in a high-level language designed for humans is systematically translated into a program written in low-level assembly more suited to machines!",
    "course_url" : "http://class.coursera.org/compilers/",
    "instructor" : "Alex Aiken, Professor",
    "id" : "d114184968727a76272e32f6e4417e78",
    "categories" : [
        {
            "id" : 11,
            "name" : "Computer Science: Systems & Security",
            "mailing_list_id" : null,
            "short_name" : "cs-systems",
            "description" : "Our wide range of courses allows students to explore topics from many different fields of study. Sign up for a class today and join our global community of students and scholars!"
        },
        {
            "id" : 12,
            "name" : "Computer Science: Software Engineering",
            "mailing_list_id" : null,
            "short_name" : "cs-programming",
            "description" : "Our wide range of courses allows students to explore topics from many different fields of study. Sign up for a class today and join our global community of students and scholars!"
        }
    ]
}
>
Saving to a relational database
In addition to MongoDB, I also wanted to save the data to a relational database, as the data is naturally relational. I chose the SQLAlchemy ORM because the data should be very simple to model and it gives me the flexibility to use any major SQL implementation. I decided to use Postgres, but switching to MySQL should be a simple single-line change to the connection string.
While Mongo was a very easy implementation, the SQL version is more complex as we need to set up the schema. Fortunately SQLAlchemy makes this very simple. In the storage folder I created a sql_setup folder that holds the one-off scripts that set up the schemas.
Here is the db_setup.py script that creates the schema.
from sqlalchemy import Column, ForeignKey, Date, String, Text, Integer, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine
import yaml


def get_settings():
    with open('config.yaml') as f:
        config = yaml.safe_load(f)
    return config['storage_configs'].get("SQL", None)


Base = declarative_base()

# many-to-many link tables between courses and their tags and categories
course_tag_association_table = Table(
    'course_tag_association',
    Base.metadata,
    Column('course_id', String(50), ForeignKey('course.id')),
    Column('tag_id', Integer, ForeignKey('tag.id'))
)

course_category_association_table = Table(
    'course_category_association',
    Base.metadata,
    Column('course_id', String(50), ForeignKey('course.id')),
    Column('category_id', Integer, ForeignKey('category.id'))
)


class Institution(Base):
    __tablename__ = 'institution'
    # Here we define columns for the institution table.
    # Notice that each column is also a normal Python instance attribute.
    id = Column(String(50), primary_key=True)
    name = Column(String(250), nullable=False)
    description = Column(Text)
    website = Column(String(250))
    logo_url = Column(String(250))
    city = Column(String(250))
    state = Column(String(250))
    country = Column(String(250))


class Course(Base):
    __tablename__ = "course"
    id = Column(String(50), primary_key=True)
    name = Column(String(250), nullable=False)
    language = Column(String(250))
    instructor = Column(String(250))
    providers_id = Column(String(50), nullable=False)
    short_description = Column(Text)
    full_description = Column(Text)
    course_url = Column(String(250))
    workload = Column(String(250))
    institution_id = Column(String(50), ForeignKey('institution.id'))
    institution = relationship(Institution)
    tags = relationship("Tag", secondary=course_tag_association_table)
    categories = relationship("Category", secondary=course_category_association_table)


class Session(Base):
    __tablename__ = "session"
    id = Column(Integer, primary_key=True)
    course_id = Column(String(50), ForeignKey('course.id'))
    course = relationship(Course)
    duration = Column(String(250))
    start_date = Column(Date)


class Media(Base):
    __tablename__ = "media"
    id = Column(Integer, primary_key=True)
    course_id = Column(String(50), ForeignKey('course.id'))
    course = relationship(Course)
    photo_url = Column(String(250))
    icon_url = Column(String(250))
    video_url = Column(String(250))
    video_type = Column(String(20))
    video_id = Column(String(50))


class Tag(Base):
    __tablename__ = "tag"
    id = Column(Integer, primary_key=True)
    tag = Column(String(50), unique=True, nullable=False)


class Category(Base):
    __tablename__ = "category"
    id = Column(Integer, primary_key=True)
    name = Column(String(250), unique=True, nullable=False)
    description = Column(Text)


# Create an engine from the connection string in config.yaml.
engine = create_engine(get_settings()['connection_string'])
# Create all tables in the engine. This is equivalent to "Create Table"
# statements in raw SQL.
Base.metadata.create_all(engine)
It should be pretty simple to follow; however, if you don't have any experience with ORMs and SQLAlchemy, it is worth reading an introductory tutorial first. To create the schema, assuming you have your SQL implementation installed and running, simply edit the connection_string setting in the config.yaml file.
#configure the wisdom system
providers: [Coursera]
storage_engines: [SQL, MongoDB]
storage_configs:
    MongoDB:
        db: wisdom
        unit_test_collection: unit_tests
    SQL:
        connection_string: postgresql://wisdom_backend:wisdom_backend@localhost/wisdom
Then run the db_setup.py script from the project root (since the yaml file is in the root).
Now that the schema is set up, we can create the SQL engine to save the courses. In our storage engine we need to import the table classes from the db_setup file like this:
from sql_setup.db_setup import Institution, Base, Course, Session, Media, Tag, Category
We also want to inherit from StorageBase and implement the store_courses() method. The implementation is simply a matter of mapping the JSON from our providers to the fields in each table. We also use the session.merge function to upsert the data so we don't create duplicates if we run the import weekly.
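Roughly speaking, session.merge looks for an existing row with the same primary key and copies the new values onto it, or adds a new row when none exists. The following sketch spells out that decision by hand with the standard-library sqlite3 module. It is a stand-in for illustration, not SQLAlchemy and not the SQL it emits. merge_row is a hypothetical helper, and the two-column institution table is a cut-down assumption of the real schema.

```python
import sqlite3


def merge_row(conn, institution):
    """Update the row with this primary key if it exists, otherwise insert it."""
    found = conn.execute(
        "SELECT 1 FROM institution WHERE id = ?", (institution["id"],)
    ).fetchone()
    if found:
        conn.execute(
            "UPDATE institution SET name = ? WHERE id = ?",
            (institution["name"], institution["id"]),
        )
    else:
        conn.execute(
            "INSERT INTO institution (id, name) VALUES (?, ?)",
            (institution["id"], institution["name"]),
        )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE institution (id TEXT PRIMARY KEY, name TEXT NOT NULL)")

merge_row(conn, {"id": "uf", "name": "University of Florida"})
merge_row(conn, {"id": "uf", "name": "The University of Florida"})  # same key: update

rows = conn.execute("SELECT id, name FROM institution").fetchall()
print(rows)
```

The second call changes the existing row instead of failing on the primary key or adding a duplicate, which is what makes a weekly re-import safe.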
Here is the final SQL storage implementation.
from sqlalchemy.engine import create_engine
from sqlalchemy.orm import sessionmaker
from sql_setup.db_setup import Institution, Base, Course, Session, Media, Tag, Category
from datetime import datetime
from storage import StorageBase


class SQL(StorageBase):

    # the connection string lives in the YAML configuration, config.yaml
    def __init__(self):
        # get_config is inherited from StorageBase
        config = self.get_config(self.__class__.__name__)
        engine = create_engine(config['connection_string'])
        # Bind the engine to the metadata of the Base class so that the
        # declaratives can be accessed through a DBSession instance
        Base.metadata.bind = engine
        DBSession = sessionmaker(bind=engine)
        self.session = DBSession()

    def store_courses(self, courses):
        session = self.session
        # we can't bulk upsert so let's loop and upsert
        for course in courses:
            i = course['institution']
            new_institution = Institution(
                id=i.get("id"),
                name=i.get("name", "None"),
                description=i.get("description"),
                website=i.get("website"),
                logo_url=i.get("logo_url"),
                city=i.get("city"),
                state=i.get("state"),
                country=i.get("country")
            )
            # merge updates the row with this primary key or inserts a new one
            new_institution = session.merge(new_institution)
            session.add(new_institution)
            session.commit()

            new_course = Course(
                id=course.get("id"),
                name=course.get("course_name"),
                language=course.get("language"),
                instructor=course.get("instructor"),
                providers_id=course.get("providers_id"),
                short_description=course.get("short_description"),
                full_description=course.get("full_description"),
                course_url=course.get("course_url"),
                workload=course.get("workload"),
                institution=new_institution
            )
            new_course = session.merge(new_course)
            session.add(new_course)
            session.commit()

            for s in course['sessions']:
                new_session = Session(
                    id=s.get("provider_session_id"),
                    duration=s.get("duration", None),
                    start_date=datetime.strptime(s.get("start_date"), "%Y%m%d"),
                    course=new_course
                )
                new_session = session.merge(new_session)
                session.add(new_session)
                session.commit()

            media = course['media']
            # media rows have no natural key, so only create one if the course has none
            new_media = session.query(Media).filter_by(course=new_course).first()
            if not new_media:
                new_media = Media(
                    course=new_course,
                    photo_url=media.get("photo_url", None),
                    video_url=media.get("video_url", None),
                    video_type=media.get("video_type", None),
                    video_id=media.get("video_id", None),
                    icon_url=media.get("icon_url", None)
                )
                session.add(new_media)
            session.commit()

            for t in course['tags']:
                new_tag = session.query(Tag).filter_by(tag=t).first()
                if not new_tag:
                    new_tag = Tag(tag=t)
                # skip tags already linked so a re-import adds no duplicate link rows
                if new_tag not in new_course.tags:
                    new_course.tags.append(new_tag)
                session.commit()

            for c in course['categories']:
                new_cat = Category(
                    id=c.get("id"),
                    name=c.get("name"),
                    description=c.get("description", None)
                )
                new_cat = session.merge(new_cat)
                # skip categories already linked to this course
                if new_cat not in new_course.categories:
                    new_course.categories.append(new_cat)
                session.commit()
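One fragile spot in the mapping above is the session start date: datetime.strptime raises a TypeError when a provider leaves start_date out, and a ValueError when the string is not in YYYYMMDD form. Here is a minimal sketch of a more forgiving conversion. parse_start_date is a hypothetical helper, and storing None for a missing or malformed date is an assumption about what is acceptable in the Date column.

```python
from datetime import datetime


def parse_start_date(raw):
    """Turn a 'YYYYMMDD' string into a date, or None when it is missing or malformed."""
    if not raw:
        return None
    try:
        return datetime.strptime(raw, "%Y%m%d").date()
    except ValueError:
        return None


print(parse_start_date("20120423"))     # a well-formed date from the sample data
print(parse_start_date(None))           # the field is missing
print(parse_start_date("spring 2014"))  # the field is not in YYYYMMDD form
```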
After running the import, we can look in our database and see our imported data. Here is a listing of courses at The University of Florida.