RabbitMQ : Exchanges & Publish/Subscribe
RabbitMQ & Celery Tutorials
Installing RabbitMQ & Celery
Hello World RabbitMQ
Work Queues (Task Queues) : RabbitMQ
Exchanges - Publish/Subscribe : RabbitMQ
Multiple bindings - Routing : RabbitMQ
Queueing Messages using Celery with RabbitMQ Message Broker Server
In this chapter, we're going to build a simple logging system that broadcasts log messages to many receivers.
Picture from slides.com.
The producer never sends messages directly to a queue. Instead, the producer can only send messages to an exchange. The exchange receives messages from producers on one side and pushes them to queues on the other. It must know exactly what to do with each message it receives: whether to append it to one particular queue or to many queues. The answer depends on the exchange type.
There are a few exchange types available: direct, topic, headers and fanout. In this chapter, we'll use the fanout type. Let's create an exchange of that type and call it logs. A fanout exchange simply broadcasts every message it receives to all the queues it knows:
channel.exchange_declare(exchange='logs', type='fanout')
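To make the difference between exchange types concrete, here is a minimal pure-Python sketch of the routing decision. It is not RabbitMQ's actual implementation: the route function and the bindings list of (queue, binding key) pairs are hypothetical names invented for this illustration, and only the direct and fanout types are modeled.

```python
def route(exchange_type, bindings, routing_key):
    """Return the names of the queues that should receive a message.

    bindings is a list of (queue_name, binding_key) pairs (hypothetical model).
    """
    if exchange_type == 'fanout':
        # Fanout ignores the routing key: every bound queue gets a copy.
        return [queue for queue, _ in bindings]
    if exchange_type == 'direct':
        # Direct delivers only to queues whose binding key matches exactly.
        return [queue for queue, key in bindings if key == routing_key]
    raise ValueError('exchange type not modeled in this sketch: %r' % exchange_type)


bindings = [('screen', 'info'), ('disk', 'error')]
print(route('fanout', bindings, 'info'))   # ['screen', 'disk']
print(route('direct', bindings, 'info'))   # ['screen']
```

With the same bindings, the fanout call reaches both queues while the direct call reaches only the one whose key matches, which is why fanout suits a broadcast logger.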
To list the exchanges on the server, we run rabbitmqctl:
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
                        direct
amq.direct              direct
amq.fanout              fanout
amq.headers             headers
amq.match               headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic               topic
...done.
As the list shows, the server creates the amq.* exchanges and the default (unnamed) exchange automatically.
Recall that in the previous chapter we sent messages to queues through the default exchange, which we identify by the empty string (""):
channel.basic_publish(exchange='', routing_key='hello', body=message)
However, we can now publish to our named exchange instead:
channel.basic_publish(exchange='logs', routing_key='', body=message)
So far, we've been using named queues such as "hello" or "task_queue", but now we want "randomly named" queues. We get one by not supplying the queue parameter to queue_declare, which lets the server choose the name:
result = channel.queue_declare()
At this point, result.method.queue contains a random queue name.
Also, once we disconnect the consumer, the queue should be deleted. The exclusive flag takes care of that:
result = channel.queue_declare(exclusive=True)
We need to tell the exchange to send messages to our queue. This process is called binding.
channel.queue_bind(exchange='logs', queue=result.method.queue)
Now the exchange (logs) will append messages to our queue.
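The steps above (a server-named temporary queue, a binding, and a broadcast) can be sketched with a toy in-memory model. This is only an illustration under stated assumptions: the FanoutExchange class and its methods are invented here, the generated name merely imitates the amq.gen- prefix, and unbind stands in for an exclusive queue disappearing when its consumer disconnects. It also shows that a message published while no queue is bound is simply dropped.

```python
import base64
import os
from collections import deque


class FanoutExchange(object):
    """Toy fanout exchange: copies each message to every bound queue."""

    def __init__(self, name):
        self.name = name
        self.queues = {}  # queue name -> deque of pending messages

    def declare_queue(self):
        # Mimic a server-named queue; the exact name format is an assumption.
        token = base64.urlsafe_b64encode(os.urandom(16)).decode('ascii')
        return 'amq.gen-' + token.rstrip('=')

    def bind(self, queue_name):
        self.queues.setdefault(queue_name, deque())

    def unbind(self, queue_name):
        # Models an exclusive queue vanishing when its consumer disconnects.
        self.queues.pop(queue_name, None)

    def publish(self, body):
        for pending in self.queues.values():
            pending.append(body)
        return len(self.queues)  # number of queues that got a copy


logs = FanoutExchange('logs')
print(logs.publish('lost'))                # 0: nothing bound yet, message dropped

q1 = logs.declare_queue()
q2 = logs.declare_queue()
logs.bind(q1)
logs.bind(q2)
print(logs.publish('info: Hello World!'))  # 2: both queues receive a copy

logs.unbind(q1)                            # first consumer disconnects
print(logs.publish('second'))              # 1
```

Each receiver owns its own queue, so every receiver sees every message published after its binding was created, and none of the messages published before.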
The producer program emits log messages. The main change from the previous chapter is that we now publish to our logs exchange instead of the nameless one. We still have to supply a routing_key when sending, but its value is ignored for fanout exchanges. Because publishing to a non-existent exchange is forbidden, we declare the exchange right after establishing the connection. Here is the producer, emit_log.py:
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

# Older pika API; pika 1.0+ renames this argument to exchange_type.
channel.exchange_declare(exchange='logs', type='fanout')

# Use the command-line arguments as the message, or a default.
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % (message,))

connection.close()
The consumer, receive_logs.py:
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

# Older pika API; pika 1.0+ renames this argument to exchange_type.
channel.exchange_declare(exchange='logs', type='fanout')

# Server-named queue that is deleted when this consumer disconnects.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % (body,))

channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
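The listings above use the argument names of older pika releases. As a hedged sketch, assuming pika 1.0 or later on Python 3 and a broker running on localhost, the consumer could look like the following. The format_log helper and the main wrapper are names introduced here for illustration, and pika is imported inside main so the file can be loaded without a broker.

```python
def format_log(body):
    """Render a message body the way the consumer prints it."""
    # pika 1.x delivers bodies as bytes; decode for display (UTF-8 assumed).
    text = body.decode('utf-8') if isinstance(body, bytes) else body
    return " [x] %r" % (text,)


def main():
    import pika  # imported lazily; requires the pika package

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # pika 1.x: the keyword is exchange_type, not type.
    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    # pika 1.x: queue is required; an empty name asks the server to pick one.
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange='logs', queue=queue_name)

    def callback(ch, method, properties, body):
        print(format_log(body))

    print(' [*] Waiting for logs. To exit press CTRL+C')
    # pika 1.x: on_message_callback and auto_ack replace the old
    # positional callback and no_ack arguments.
    channel.basic_consume(queue=queue_name,
                          on_message_callback=callback,
                          auto_ack=True)
    channel.start_consuming()
```

Calling main() starts the consumer. On the producer side, the only rename needed under the same assumption is exchange_type in exchange_declare.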
To save logs to a file, just open a console and type:
$ python receive_logs.py > logs_from_rabbit.log
To see the logs on your screen, spawn a new terminal and run:
$ python receive_logs.py
To emit log messages, open another terminal and run:
$ python emit_log.py
Then, on each console that's running receive_logs.py, we can see the following:
$ python receive_logs.py
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'info: Hello World!'
 [x] 'info: Hello World!'
Using rabbitmqctl list_bindings, we can verify that the code actually creates the bindings and queues we want:
$ sudo rabbitmqctl list_bindings
Listing bindings ...
        exchange  amq.gen-gq_vRxMnVA2rn2_rMlqi-A  queue  amq.gen-gq_vRxMnVA2rn2_rMlqi-A  []
        exchange  amq.gen-pMYkFEP6cLAsLc1W1OA8tg  queue  amq.gen-pMYkFEP6cLAsLc1W1OA8tg  []
        exchange  task_queue                      queue  task_queue                      []
logs    exchange  amq.gen-gq_vRxMnVA2rn2_rMlqi-A  queue  amq.gen-gq_vRxMnVA2rn2_rMlqi-A  []
logs    exchange  amq.gen-pMYkFEP6cLAsLc1W1OA8tg  queue  amq.gen-pMYkFEP6cLAsLc1W1OA8tg  []
...done.
The output shows the logs exchange bound to our two randomly named queues, along with the default exchange's bindings, including the one for task_queue from a previous chapter.
Ph.D. / Golden Gate Ave, San Francisco / Seoul National Univ / Carnegie Mellon / UC Berkeley / DevOps / Deep Learning / Visualization