How to Create a Basic CRUD API Using Python Flask

CRUD is a common acronym in software development. It stands for the four basic operations a REST API provides on a resource: Create, Read, Update and Delete. These operations are usually exposed through the POST, GET, PUT and DELETE methods of a RESTful API. In this article, we are going to create a CRUD API for a book library server using Python Flask and an SQLite database.
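As a quick reference, the mapping between the CRUD operations, the HTTP methods and the SQL statements they usually translate to can be written down as a small lookup table. This is only an illustrative sketch; the `CRUD_MAP` name and the `describe` helper are made up for this example and are not part of the app we build below.

```python
# Illustrative lookup table: CRUD operation -> (HTTP method, SQL statement).
# CRUD_MAP and describe() are hypothetical names used only in this sketch.
CRUD_MAP = {
    "create": ("POST", "INSERT"),
    "read": ("GET", "SELECT"),
    "update": ("PUT", "UPDATE"),
    "delete": ("DELETE", "DELETE"),
}


def describe(operation):
    """Return a one-line summary of how an operation is usually exposed."""
    method, sql = CRUD_MAP[operation]
    return f"{operation}: HTTP {method} -> SQL {sql}"


for op in CRUD_MAP:
    print(describe(op))
```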



Building the Flask Server

Flask is a micro web framework for building application servers using Python. Before we begin creating our web app, we need to create a Python Virtual Environment. You can learn more about Python virtual environments here.


To begin, please enter the following into your CLI.

    $ mkdir flask-book-api

    $ cd flask-book-api

    $ python3 -m venv venv
    
    $ source venv/bin/activate
    
    $ pip install flask
    

The first two commands make a new app folder called ‘flask-book-api’ and enter the folder. The next two commands serve to create a new Python virtual environment called ‘venv’ and to activate it. We then use the pip command to install the Flask Python package.
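To confirm that the environment really is active before installing anything else, you can ask the interpreter itself. The following is a minimal sketch using only the standard library; the `in_virtualenv` helper name is an assumption made for this example.

```python
import sys


def in_virtualenv():
    """Return True when the interpreter runs inside a venv-style environment."""
    # Inside a venv, sys.prefix points at the environment folder while
    # sys.base_prefix still points at the base Python installation.
    return sys.prefix != sys.base_prefix


print("virtual environment active:", in_virtualenv())
print("interpreter prefix:", sys.prefix)
```

When run with the ‘venv’ environment activated, the prefix printed should be the ‘venv’ folder inside ‘flask-book-api’.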

To make a ‘Hello World’ app using Flask, create a file named app.py in your app folder and enter the following into the file.

    from flask import Flask

    app = Flask(__name__)

    @app.route('/')
    def hello_world():
        return 'Hello World!'
    
    if __name__ == '__main__':
        app.run()
    

Now enter the following command on your CLI while in your app folder to run the server.

    $ flask run

CTRL + click the ‘http://127.0.0.1:5000’ link on your CLI to go to your browser and see your working server.


Building the SQLite DB

SQLite is a simple, fast, open-source SQL engine that can be used with Python to store and manipulate application data. The sqlite3 module ships with the standard Python distribution, so we can use it to create our database without having to install anything else. However, we first have to create a Book model class to represent a book object in our code, whose fields correspond to the columns of the books table in our database. Create a ‘models.py’ file in your app folder and enter the following code into it.

    class Book:
        """A single book record, mirroring one row of the books table."""

        def __init__(self, id, available, title, timestamp):
            self.id = id
            self.title = title
            self.available = available
            self.timestamp = timestamp

        def __repr__(self):
            return '<id {}>'.format(self.id)

        def serialize(self):
            # Return a plain dict so the book can be passed to jsonify
            return {
                'id': self.id,
                'title': self.title,
                'available': self.available,
                'timestamp': self.timestamp
            }
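
The `serialize` method exists because a `Book` instance cannot be turned into JSON directly, while a plain dictionary can. The sketch below repeats a trimmed copy of the class so that it runs on its own, and uses the standard `json` module rather than Flask's `jsonify`; the `default=str` argument is an assumption made here to turn the `datetime` timestamp into text.

```python
import datetime
import json


class Book:
    # Trimmed copy of the model above, repeated so the sketch is self-contained
    def __init__(self, id, available, title, timestamp):
        self.id = id
        self.title = title
        self.available = available
        self.timestamp = timestamp

    def serialize(self):
        return {
            'id': self.id,
            'title': self.title,
            'available': self.available,
            'timestamp': self.timestamp,
        }


book = Book(1, True, 'Don Quixote', datetime.datetime(2024, 1, 1, 12, 0, 0))

# json.dumps cannot encode datetime objects on its own, so default=str
# is used to fall back to the string form of the timestamp.
payload = json.dumps(book.serialize(), default=str)
print(payload)
```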

Next, we need to create a ‘db.py’ file that contains the code for the CRUD operations on the database. For this project, the connect() function will also add some initial data to the database from a list of dictionaries. Create the ‘db.py’ file and enter the following into it.
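Before looking at the full file, the four operations it wraps can be tried out on a throwaway database. The sketch below is not the ‘db.py’ from this article: it uses an in-memory database (`':memory:'`) instead of ‘books.db’ and fixed ids instead of random ones, purely so that it can be run in isolation.

```python
import sqlite3

# ':memory:' creates a temporary database that disappears on close
conn = sqlite3.connect(':memory:')
cur = conn.cursor()
cur.execute(
    "CREATE TABLE IF NOT EXISTS books "
    "(id INTEGER PRIMARY KEY, available BOOLEAN, title TEXT, timestamp TEXT)"
)

# Create: the ? placeholders let sqlite3 bind the values safely
cur.execute("INSERT INTO books VALUES (?,?,?,?)",
            (1, True, 'Don Quixote', '2024-01-01 12:00:00'))
cur.execute("INSERT INTO books VALUES (?,?,?,?)",
            (2, True, 'The Hobbit', '2024-01-01 12:00:00'))

# Read
cur.execute("SELECT id, available, title FROM books ORDER BY id")
rows_after_insert = cur.fetchall()

# Update: mark the first book as borrowed
cur.execute("UPDATE books SET available=? WHERE id=?", (False, 1))

# Delete: remove the second book
cur.execute("DELETE FROM books WHERE id=?", (2,))
conn.commit()

cur.execute("SELECT id, available, title FROM books ORDER BY id")
rows_at_end = cur.fetchall()
conn.close()

print(rows_after_insert)
print(rows_at_end)
```

SQLite has no separate boolean type, so `available` comes back as `1` or `0`, which is why the `view()` function in ‘db.py’ converts the value back to `True` or `False`.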

    import sqlite3, random, datetime
    from models import Book


    def getNewId():
        return random.getrandbits(28)
    
    # Seed data inserted by connect() when the database is first created
    books = [
        {
            'available': True,
            'title': 'Don Quixote',
            'timestamp': datetime.datetime.now()
        },
        {
            'available': True,
            'title': 'A Tale of Two Cities',
            'timestamp': datetime.datetime.now()
        },
        {
            'available': True,
            'title': 'The Lord of the Rings',
            'timestamp': datetime.datetime.now()
        },
        {
            'available': True,
            'title': 'The Little Prince',
            'timestamp': datetime.datetime.now()
        },
        {
            'available': True,
            'title': "Harry Potter and the Sorcerer's Stone",
            'timestamp': datetime.datetime.now()
        },
        {
            'available': True,
            'title': 'And Then There Were None',
            'timestamp': datetime.datetime.now()
        },
        {
            'available': True,
            'title': 'Dream of the Red Chamber',
            'timestamp': datetime.datetime.now()
        },
        {
            'available': True,
            'title': 'The Hobbit',
            'timestamp': datetime.datetime.now()
        },
        {
            'available': True,
            'title': 'The Lion, the Witch and the Wardrobe',
            'timestamp': datetime.datetime.now()
        },
        {
            'available': True,
            'title': 'The Da Vinci Code',
            'timestamp': datetime.datetime.now()
        }
    ]
    
    
    def connect():
        conn = sqlite3.connect('books.db')
        cur = conn.cursor()
        cur.execute("CREATE TABLE IF NOT EXISTS books (id INTEGER PRIMARY KEY, available BOOLEAN, title TEXT, timestamp TEXT)")
        conn.commit()
        conn.close()
    
        for i in books:
            bk = Book(getNewId(), i['available'], i['title'], i['timestamp'])
            insert(bk)
    
    
    
    def insert(book):
        conn = sqlite3.connect('books.db')
        cur = conn.cursor()
        cur.execute("INSERT INTO books VALUES (?,?,?,?)", (
            book.id,
            book.available,
            book.title,
            book.timestamp
        ))
        conn.commit()
        conn.close()
    
    
    
    def view():
        conn = sqlite3.connect('books.db')
        cur = conn.cursor()
        cur.execute("SELECT * FROM books")
        rows = cur.fetchall()
        books = []
    
        for i in rows:
            book = Book(i[0], True if i[1] == 1 else False, i[2], i[3])
            books.append(book)
        conn.close()
        return books
    
    
    
    def update(book):
        conn = sqlite3.connect('books.db')
        cur = conn.cursor()
        cur.execute("UPDATE books SET available=?, title=? WHERE id=?", (book.available, book.title, book.id))
        conn.commit()
        conn.close()
    
    
    
    def delete(theId):
        conn = sqlite3.connect('books.db')
        cur = conn.cursor()
        cur.execute("DELETE FROM books WHERE id=?", (theId,))
        conn.commit()
        conn.close()
    
    
    
    def deleteAll():
        conn = sqlite3.connect('books.db')
        cur = conn.cursor()
        cur.execute("DELETE FROM books")
        conn.commit()
        conn.close()
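
One design choice worth noting is `getNewId()`: it draws a random 28-bit number, so two books could in principle receive the same id, in which case the `INSERT` fails with an integrity error because `id` is the primary key. A possible alternative, shown here only as a sketch and not as part of the article's code, is to let SQLite assign the id and read it back from `cursor.lastrowid`. The `insert_auto_id` helper name and the in-memory database are assumptions made for this example.

```python
import sqlite3


def insert_auto_id(conn, available, title, timestamp):
    """Insert a book without an explicit id and return the id SQLite chose."""
    cur = conn.cursor()
    # Omitting the INTEGER PRIMARY KEY column makes SQLite pick the next id
    cur.execute(
        "INSERT INTO books (available, title, timestamp) VALUES (?,?,?)",
        (available, title, timestamp),
    )
    conn.commit()
    return cur.lastrowid


conn = sqlite3.connect(':memory:')
conn.execute(
    "CREATE TABLE books "
    "(id INTEGER PRIMARY KEY, available BOOLEAN, title TEXT, timestamp TEXT)"
)

first_id = insert_auto_id(conn, True, 'Don Quixote', '2024-01-01 12:00:00')
second_id = insert_auto_id(conn, True, 'The Hobbit', '2024-01-01 12:00:00')
print(first_id, second_id)
```

The trade-off is that the id is only known after the insert, so the `Book` object would have to be created (or updated) with the returned value.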
    

We can now add code to our ‘app.py’ file to access the corresponding database method for the POST, GET, PUT & DELETE endpoints. Open the ‘app.py’ file and enter the following code.

    from flask import Flask, render_template, request, jsonify
    import os, re, datetime
    import db
    from models import Book
    
    app = Flask(__name__)
    
    
    # Create the database and table, and insert 10 test books.
    # Do this only once to avoid inserting the test books into
    # the db multiple times.
    if not os.path.isfile('books.db'):
        db.connect()


    # Route for the landing page.
    # See the templates folder for the index.html file and
    # the static folder for the css and js files.
    @app.route("/")
    def index():
        return render_template("index.html")
    
    
    
    def isValid(email):
        # Basic e-mail format check; not a full RFC 5322 validator
        regex = re.compile(r'([A-Za-z0-9]+[._-])*[A-Za-z0-9]+@[A-Za-z0-9-]+(\.[A-Za-z]{2,})+')
        return re.fullmatch(regex, email) is not None
    
    
    
    
    
    @app.route("/request", methods=['POST'])
    def postRequest():
        req_data = request.get_json()
        email = req_data['email']
        if not isValid(email):
            return jsonify({
                'status': '422',
                'res': 'failure',
                'error': 'Invalid email format. Please enter a valid email address'
            })
    
        title = req_data['title']
        bks = [b.serialize() for b in db.view()]
    
        for b in bks:
            if b['title'] == title:
                return jsonify({
                    # 'error': '',
                    'res': f'Error ⛔❌! Book with title {title} is already in library!',
                    'status': '404'
                })
    
    
    
        bk = Book(db.getNewId(), True, title, datetime.datetime.now())
        print('new book: ', bk.serialize())
        db.insert(bk)
        new_bks = [b.serialize() for b in db.view()]
        print('books in lib: ', new_bks)
    
        return jsonify({
            # 'error': '',
            'res': bk.serialize(),
            'status': '200',
            'msg': 'Success creating a new book!👍😀'
        })
    
    
    
    
    
    @app.route('/request', methods=['GET'])
    def getRequest():
        content_type = request.headers.get('Content-Type')
        bks = [b.serialize() for b in db.view()]
        if (content_type == 'application/json'):
            json = request.json
    
            for b in bks:
                if b['id'] == int(json['id']):
                    return jsonify({
                        # 'error': '',
                        'res': b,
                        'status': '200',
                        'msg': 'Success getting book by ID!👍😀'
                    })
    
            return jsonify({
                'error': f"Error ⛔❌! Book with id '{json['id']}' not found!",
                'res': '',
                'status': '404'
            })
    
        else:
            return jsonify({
                # 'error': '',
                'res': bks,
                'status': '200',
                'msg': 'Success getting all books in library!👍😀',
                'no_of_books': len(bks)
            })
    
    
    
    
    
    @app.route('/request/<id>', methods=['GET'])
    def getRequestId(id):
        req_args = request.view_args
        # print('req_args: ', req_args)
        bks = [b.serialize() for b in db.view()]
    
        if req_args:
            for b in bks:
                if b['id'] == int(req_args['id']):
                    return jsonify({
                        # 'error': '',
                        'res': b,
                        'status': '200',
                        'msg': 'Success getting book by ID!👍😀'
                    })
    
            return jsonify({
    
                'error': f"Error ⛔❌! Book with id '{req_args['id']}' was not found!",
                'res': '',
                'status': '404'
            })
    
        else:
            return jsonify({
    
                # 'error': '',
                'res': bks,
                'status': '200',
                'msg': 'Success getting book by ID!👍😀',
                'no_of_books': len(bks)
            })
    
    
    
    @app.route("/request", methods=['PUT'])
    def putRequest():
        req_data = request.get_json()
        availability = req_data['available']
        title = req_data['title']
        the_id = req_data['id']
        bks = [b.serialize() for b in db.view()]
        for b in bks:
            if b['id'] == the_id:
                bk = Book(
                    the_id,
                    availability,
                    title,
                    datetime.datetime.now()
                )
                print('new book: ', bk.serialize())
                db.update(bk)
                new_bks = [b.serialize() for b in db.view()]
                print('books in lib: ', new_bks)
                return jsonify({
                    # 'error': '',
                    'res': bk.serialize(),
                    'status': '200',
                    'msg': f'Success updating the book titled {title}!👍😀'
    
                })
    
        return jsonify({
            # 'error': '',
            'res': f'Error ⛔❌! Failed to update Book with title: {title}!',
            'status': '404'
    
        })
    
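    @app.route('/request/<id>', methods=['DELETE'])
    def deleteRequest(id):
        req_args = request.view_args
        print('req_args: ', req_args)
        bks = [b.serialize() for b in db.view()]

        if req_args:
            for b in bks:
                if b['id'] == int(req_args['id']):
                    # Delete the matching row; the response shape below is
                    # assumed to mirror the other endpoints
                    db.delete(b['id'])
                    return jsonify({
                        'res': b,
                        'status': '200',
                        'msg': 'Success deleting book by ID!👍😀'
                    })

        return jsonify({
            'error': f"Error ⛔❌! Book with id '{id}' was not found!",
            'res': '',
            'status': '404'
        })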
    

With that, our CRUD server is complete. You should be able to test all of the server’s endpoints using Postman and find them working as expected.
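If you would rather script these checks than click through Postman, the standard library is enough to build the same requests. The sketch below assumes the server is running on ‘http://127.0.0.1:5000’ and uses the ‘/request’ endpoint and JSON fields from the code above; `build_request` and `send` are hypothetical helper names, and the e-mail address, title and id are placeholders.

```python
import json
import urllib.request

BASE_URL = 'http://127.0.0.1:5000'  # assumed address of the local Flask server


def build_request(method, path, payload=None):
    """Build (but do not send) an HTTP request with an optional JSON body."""
    data = None
    headers = {}
    if payload is not None:
        data = json.dumps(payload).encode('utf-8')
        headers['Content-Type'] = 'application/json'
    return urllib.request.Request(
        BASE_URL + path, data=data, headers=headers, method=method
    )


def send(req):
    """Send a request to the running server and decode the JSON reply."""
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode('utf-8'))


# One request per CRUD operation; the values here are placeholders
create_req = build_request('POST', '/request',
                           {'email': 'reader@example.com', 'title': 'Emma'})
read_req = build_request('GET', '/request')
update_req = build_request('PUT', '/request',
                           {'id': 1, 'available': False, 'title': 'Emma'})
delete_req = build_request('DELETE', '/request/1')

for req in (create_req, read_req, update_req, delete_req):
    print(req.get_method(), req.full_url)
```

Nothing is sent until `send()` is called. With the server running, `send(read_req)` should return the decoded JSON reply, with the list of books under the `'res'` key.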


Building the Front-End Templates

Flask makes use of the Jinja2 templating engine to build dynamic web pages. Special placeholders in the template allow writing code similar to Python syntax. Create a folder named ‘templates’ in the app folder and a file named ‘index.html’ in the templates folder. The index.html file will be our front-end’s starting point, so copy the code from here and paste it into the file.
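To give a feel for those placeholders, here is a minimal sketch using the `jinja2` package directly (it is installed together with Flask). The template string and the `books` variable are invented for this example and are not taken from the article's index.html.

```python
from jinja2 import Template

# {{ ... }} prints a value, {% ... %} runs a statement such as a loop
template = Template(
    "<ul>"
    "{% for book in books %}"
    "<li>{{ book.title }}{% if not book.available %} (borrowed){% endif %}</li>"
    "{% endfor %}"
    "</ul>"
)

books = [
    {'title': 'Don Quixote', 'available': True},
    {'title': 'The Hobbit', 'available': False},
]

html = template.render(books=books)
print(html)
```

In the Flask app, `render_template("index.html")` does the same kind of substitution with a file from the ‘templates’ folder.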


We also need to add some JavaScript code to have interactivity in our web page along with some CSS to make the page prettier. So create the following folder structure in our app folder.


    static
    |- css
    |-- style.css
    |- js
    |-- main.min.js
    templates
    |- index.html
    app.py
    models.py
    db.py
    

Now add the CSS and JS code from here and here respectively.


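If the server is running in debug mode (for example, with the FLASK_DEBUG environment variable set to 1), it reloads itself when files change; otherwise, stop and restart it. You should then see the book library page in your browser when you go to the ‘http://localhost:5000’ URL.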

Conclusion

So there you have it. We have built a Python Flask CRUD web app from scratch. Starting with a bare-bones Flask app, we went through the process of building an SQLite database, a Book model class and a few endpoints for the CRUD operations on the web app. Finally, remember to like this post if you enjoyed it.
