BogoToBogo
  • Home
  • About
  • Big Data
  • Machine Learning
  • AngularJS
  • Python
  • C++
  • go
  • DevOps
  • Kubernetes
  • Algorithms
  • More...
    • Qt 5
    • Linux
    • FFmpeg
    • Matlab
    • Django 1.8
    • Ruby On Rails
    • HTML5 & CSS

Flask app 3 - word counts with Redis task queue

Python-Flask.png




Bookmark and Share





bogotobogo.com site search:

Note

In this tutorial, we'll implement a Redis task queue to handle the text processing that because if lots of users all hitting our site at once to get word counts, the counting takes longer to process.

So, instead of counting the words after each user makes a request, we need to use a queue to process this in the backend.

New tools used in this tutorial:

  1. Redis (3.2.3) - to install see http://redis.io/download
  2. Python Redis (2.10.5)
  3. $ sudo pip install redis
    
  4. RQ (Redis Queue) (0.6.0) - a simple Python library for queueing jobs and processing them in the background with workers.
$ pip install redis==2.10.5 rq==0.6.0
$ pip freeze > requirements.txt

Github source : akadrone-flask







Creating a worker process

Let's create a worker process to listen for queued tasks.

Here is worker.py:

import os

import redis
from rq import Worker, Queue, Connection

listen = ['default']

redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')

conn = redis.from_url(redis_url)

if __name__ == '__main__':
    with Connection(conn):
        worker = Worker(list(map(Queue, listen)))
        worker.work()

Here, we listened for a queue called default and established a connection to the Redis server on localhost:6379.

Start the Redis server:

$ redis-server
...
The server is now ready to accept connections on port 6379

In another terminal:

$ python worker.py
23:28:05 RQ worker u'rq:worker:K-Notebook.7562' started, version 0.6.0
23:28:05 Cleaning registries for queue: default
23:28:05 
23:28:05 *** Listening on default...




Updating app

Now we need to update our app (aka.py) to send jobs to the queue:

import os
import requests
import operator
import re
import nltk
from flask import Flask, render_template, request
from flask.ext.sqlalchemy import SQLAlchemy
from stop_words import stops
from collections import Counter
from bs4 import BeautifulSoup
from rq import Queue
from rq.job import Job
from worker import conn

app = Flask(__name__)

app.config.from_object('config')
app_settings = app.config['APP_SETTINGS']
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://localhost/wordcount_dev'

db = SQLAlchemy(app)

q = Queue(connection=conn)

from models import *

def count_and_save_words(url):

    errors = []

    try:
        r = requests.get(url)
    except:
        errors.append(
            "Unable to get URL. Please make sure it's valid and try again."
        )
        return {"error": errors}

    # text processing
    raw = BeautifulSoup(r.text).get_text()
    nltk.data.path.append('./nltk_data/')  # set the path
    tokens = nltk.word_tokenize(raw)
    text = nltk.Text(tokens)

    # remove punctuation, count raw words
    nonPunct = re.compile('.*[A-Za-z].*')
    raw_words = [w for w in text if nonPunct.match(w)]
    raw_word_count = Counter(raw_words)

    # stop words
    no_stop_words = [w for w in raw_words if w.lower() not in stops]
    no_stop_words_count = Counter(no_stop_words)

    # save the results
    try:
        result = Result(
            url=url,
            result_all=raw_word_count,
            result_no_stop_words=no_stop_words_count
        )
        db.session.add(result)
        db.session.commit()
        return result.id
    except:
        errors.append("Unable to add item to database.")
        return {"error": errors}

@app.route('/', methods=['GET', 'POST'])
def index():
    results = {}
    if request.method == "POST":
        # get url that the person has entered
        url = request.form['url']
        if 'http://' not in url[:7]:
            url = 'http://' + url
        job = q.enqueue_call(
            func=count_and_save_words, args=(url,), result_ttl=5000
        )
        print(job.get_id())

    return render_template('index.html', results=results)

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        return str(job.result), 200
    else:
        return "Nay!", 202

if __name__ == '__main__':
    app.run()

The line q = Queue(connection=conn) set up a Redis connection and initialized a queue based on that connection.

Move the text processing functionality out of our index route and into a new function called count_and_save_words(). This function accepts one argument, a URL, which we will pass to it when we call it from our index route.

We used the queue that we initialized earlier. It is calling enqueue_call() function. This is adding a new job to the queue and that job is running the count_and_save_words() function with the URL as the argument.

The result_ttl=5000 line argument tells RQ how long (5,000 seconds) to hold the result of the job. Then, we print out the job id to the terminal. This id is needed to see if the processing job is done.

Note that the id number is generated when we add the results to the database.

    # save the results
    try:
        result = Result(
            url=url,
            result_all=raw_word_count,
            result_no_stop_words=no_stop_words_count
        )
        db.session.add(result)
        db.session.commit()
        return result.id




Getting result

Now we may want to test our implementation.

Let's run our server first:

$ python manage.py runserver
...
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
...
b9a81d99-94bf-43b5-9270-01bcac47b72b
...

Then, in another terminal, run our worker module:

$ python worker.py
...
16:11:52 default: Job OK (148c7c07-7102-41a4-a5f5-4517daa840b8)
16:11:52 Result is kept for 5000 seconds
16:11:52 
16:11:52 *** Listening on default...

Then use that id in the /results/ endpoint - i.e., http://localhost:5000/results/b9a81d99-94bf-43b5-9270-01bcac47b72b.

Nay-from-Worker.png



Results in JSON

We want the actual JSON formatted results from the database. So, let's refactor the route in aka.py:

import os
import requests
import operator
import re
import nltk
from flask import Flask, render_template, request
from flask.ext.sqlalchemy import SQLAlchemy
from stop_words import stops
from collections import Counter
from bs4 import BeautifulSoup
from rq import Queue
from rq.job import Job
from worker import conn
from flask import jsonify

app = Flask(__name__)

app.config.from_object('config')
app_settings = app.config['APP_SETTINGS']
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://localhost/wordcount_dev'

db = SQLAlchemy(app)

q = Queue(connection=conn)

from models import *

def count_and_save_words(url):

    errors = []

    try:
        r = requests.get(url)
    except:
        errors.append(
            "Unable to get URL. Please make sure it's valid and try again."
        )
        return {"error": errors}

    # text processing
    raw = BeautifulSoup(r.text).get_text()
    nltk.data.path.append('./nltk_data/')  # set the path
    tokens = nltk.word_tokenize(raw)
    text = nltk.Text(tokens)

    # remove punctuation, count raw words
    nonPunct = re.compile('.*[A-Za-z].*')
    raw_words = [w for w in text if nonPunct.match(w)]
    raw_word_count = Counter(raw_words)

    # stop words
    no_stop_words = [w for w in raw_words if w.lower() not in stops]
    no_stop_words_count = Counter(no_stop_words)

    # save the results
    try:
        result = Result(
            url=url,
            result_all=raw_word_count,
            result_no_stop_words=no_stop_words_count
        )
        db.session.add(result)
        db.session.commit()
        return result.id
    except:
        errors.append("Unable to add item to database.")
        return {"error": errors}

@app.route('/', methods=['GET', 'POST'])
def index():
    results = {}
    if request.method == "POST":
        # get url that the person has entered
        url = request.form['url']
        if 'http://' not in url[:7]:
            url = 'http://' + url
        job = q.enqueue_call(
            func=count_and_save_words, args=(url,), result_ttl=5000
        )
        print(job.get_id())

    return render_template('index.html', results=results)

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        result = Result.query.filter_by(id=job.result).first()
        results = sorted(
            result.result_no_stop_words.items(),
            key=operator.itemgetter(1),
            reverse=True
        )[:10]
        return jsonify(results)

    else:
        return "Nay!", 202

if __name__ == '__main__':
    app.run()

Note that we added the import:

from flask import jsonify

Let's test it again. In one terminal:

$ python worker.py
14:14:24 RQ worker u'rq:worker:K-Notebook.8359' started, version 0.6.0
14:14:24 Cleaning registries for queue: default
14:14:24 
14:14:24 *** Listening on default...

On another terminal:

$ ./gunicorn_aka.sh
... Starting gunicorn 19.6.0
... Listening at: http://127.0.0.1:5000 (9486)

After we submit a url (i.e. google.com), we get id on the 2nd terminal were we ran 'gunicorn':

148c7c07-7102-41a4-a5f5-4517daa840b8

Then, we add '/results/id' to the url:

JSON-Response.png



References

Flask by Example Part4









Ph.D. / Golden Gate Ave, San Francisco / Seoul National Univ / Carnegie Mellon / UC Berkeley / DevOps / Deep Learning / Visualization

YouTubeMy YouTube channel

Sponsor Open Source development activities and free contents for everyone.

Thank you.

- K Hong








Flask



Deploying Flask Hello World App with Apache WSGI on Ubuntu 14

Flask Micro blog "Admin App" with Postgresql

Flask "Blog App" with MongoDB - Part 1 (Local via Flask server)

Flask "Blog App" with MongoDB on Ubuntu 14 - Part 2 (Local Apache WSGI)

Flask "Blog App" with MongoDB on CentOS 7 - Part 3 (Production Apache WSGI )

Flask word count app 1 with PostgreSQL and Flask-SQLAlchemy

Flask word count app 2 via BeautifulSoup, and Natural Language Toolkit (NLTK) with Gunicorn/PM2/Apache

Flask word count app 3 with Redis task queue

Flask word count app 4 with AngularJS polling the back-end

Flask word count app 5 with AngularJS front-end updates and submit error handling

Flask word count app 0 - Errors and Fixes

Flask with Embedded Machine Learning I : Serializing with pickle and DB setup

Flask with Embedded Machine Learning II : Basic Flask App

Flask with Embedded Machine Learning III : Embedding Classifier

Flask with Embedded Machine Learning IV : Deploy

Flask with Embedded Machine Learning V : Updating the classifier

Flask AJAX with jQuery

Flask blog app with Dashboard 1 - SignUp page

Flask blog app with Dashboard 2 - Sign-In / Sign-Out

Flask blog app with Dashboard 3 - Adding blog post item

Flask blog app with Dashboard 4 - Update / Delete

Flask blog app with Dashboard 5 - Uploading an image

Flask blog app with Dashboard 6 - Dash board

Flask blog app with Dashboard 7 - Like button

Flask blog app with Dashboard 8 - Deploy

Flask blog app with Dashboard - Appendix (tables and mysql stored procedures/functions

Sponsor Open Source development activities and free contents for everyone.

Thank you.

- K Hong






Python tutorial



Python Home

Introduction

Running Python Programs (os, sys, import)

Modules and IDLE (Import, Reload, exec)

Object Types - Numbers, Strings, and None

Strings - Escape Sequence, Raw String, and Slicing

Strings - Methods

Formatting Strings - expressions and method calls

Files and os.path

Traversing directories recursively

Subprocess Module

Regular Expressions with Python

Regular Expressions Cheat Sheet

Object Types - Lists

Object Types - Dictionaries and Tuples

Functions def, *args, **kargs

Functions lambda

Built-in Functions

map, filter, and reduce

Decorators

List Comprehension

Sets (union/intersection) and itertools - Jaccard coefficient and shingling to check plagiarism

Hashing (Hash tables and hashlib)

Dictionary Comprehension with zip

The yield keyword

Generator Functions and Expressions

generator.send() method

Iterators

Classes and Instances (__init__, __call__, etc.)

if__name__ == '__main__'

argparse

Exceptions

@static method vs class method

Private attributes and private methods

bits, bytes, bitstring, and constBitStream

json.dump(s) and json.load(s)

Python Object Serialization - pickle and json

Python Object Serialization - yaml and json

Priority queue and heap queue data structure

Graph data structure

Dijkstra's shortest path algorithm

Prim's spanning tree algorithm

Closure

Functional programming in Python

Remote running a local file using ssh

SQLite 3 - A. Connecting to DB, create/drop table, and insert data into a table

SQLite 3 - B. Selecting, updating and deleting data

MongoDB with PyMongo I - Installing MongoDB ...

Python HTTP Web Services - urllib, httplib2

Web scraping with Selenium for checking domain availability

REST API : Http Requests for Humans with Flask

Blog app with Tornado

Multithreading ...

Python Network Programming I - Basic Server / Client : A Basics

Python Network Programming I - Basic Server / Client : B File Transfer

Python Network Programming II - Chat Server / Client

Python Network Programming III - Echo Server using socketserver network framework

Python Network Programming IV - Asynchronous Request Handling : ThreadingMixIn and ForkingMixIn

Python Coding Questions I

Python Coding Questions II

Python Coding Questions III

Python Coding Questions IV

Python Coding Questions V

Python Coding Questions VI

Python Coding Questions VII

Python Coding Questions VIII

Python Coding Questions IX

Python Coding Questions X

Image processing with Python image library Pillow

Python and C++ with SIP

PyDev with Eclipse

Matplotlib

Redis with Python

NumPy array basics A

NumPy Matrix and Linear Algebra

Pandas with NumPy and Matplotlib

Celluar Automata

Batch gradient descent algorithm

Longest Common Substring Algorithm

Python Unit Test - TDD using unittest.TestCase class

Simple tool - Google page ranking by keywords

Google App Hello World

Google App webapp2 and WSGI

Uploading Google App Hello World

Python 2 vs Python 3

virtualenv and virtualenvwrapper

Uploading a big file to AWS S3 using boto module

Scheduled stopping and starting an AWS instance

Cloudera CDH5 - Scheduled stopping and starting services

Removing Cloud Files - Rackspace API with curl and subprocess

Checking if a process is running/hanging and stop/run a scheduled task on Windows

Apache Spark 1.3 with PySpark (Spark Python API) Shell

Apache Spark 1.2 Streaming

bottle 0.12.7 - Fast and simple WSGI-micro framework for small web-applications ...

Flask app with Apache WSGI on Ubuntu14/CentOS7 ...

Selenium WebDriver

Fabric - streamlining the use of SSH for application deployment

Ansible Quick Preview - Setting up web servers with Nginx, configure enviroments, and deploy an App

Neural Networks with backpropagation for XOR using one hidden layer

NLP - NLTK (Natural Language Toolkit) ...

RabbitMQ(Message broker server) and Celery(Task queue) ...

OpenCV3 and Matplotlib ...

Simple tool - Concatenating slides using FFmpeg ...

iPython - Signal Processing with NumPy

iPython and Jupyter - Install Jupyter, iPython Notebook, drawing with Matplotlib, and publishing it to Github

iPython and Jupyter Notebook with Embedded D3.js

Downloading YouTube videos using youtube-dl embedded with Python

Machine Learning : scikit-learn ...

Django 1.6/1.8 Web Framework ...









Contact

BogoToBogo
contactus@bogotobogo.com

Follow Bogotobogo

About Us

contactus@bogotobogo.com

YouTubeMy YouTube channel
Pacific Ave, San Francisco, CA 94115

Pacific Ave, San Francisco, CA 94115

Copyright © 2024, bogotobogo
Design: Web Master