RabbitMQ : Exchanges & Publish/Subscribe

RabbitMQ & Celery Tutorials




Installing RabbitMQ & Celery

Hello World RabbitMQ

Work Queues (Task Queues) : RabbitMQ

Exchanges - Publish/Subscribe : RabbitMQ

Multiple bindings - Routing : RabbitMQ

Queueing Messages using Celery with RabbitMQ Message Broker Server

Exchange

In this chapter, we're going to build a simple logging system that broadcasts log messages to many receivers.


RabbitMQRouting.png

Picture from slides.com.



Exchanges.png

The producer never sends messages directly to a queue. Instead, it can only send messages to an exchange. An exchange receives messages from producers on one side and pushes them to queues on the other. The exchange must know exactly what to do with each message it receives: append it to a particular queue, append it to many queues, or discard it. The answer depends on the exchange type.

There are four exchange types: direct, topic, headers and fanout. In this chapter, we'll use the fanout type, which simply broadcasts every message it receives to all the queues it knows about. Let's create an exchange of that type and call it logs:

channel.exchange_declare(exchange='logs', type='fanout')
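
To make the fanout behaviour concrete, here is a minimal in-memory sketch. It is a toy model and not RabbitMQ or pika code: the FanoutExchange class and its bind and publish methods are hypothetical names invented for this illustration.

```python
from collections import deque


class FanoutExchange:
    """Toy model of a fanout exchange (hypothetical; not part of pika)."""

    def __init__(self, name):
        self.name = name
        self.queues = []  # queues currently bound to this exchange

    def bind(self, queue):
        # After binding, the queue receives every message published later.
        self.queues.append(queue)

    def publish(self, body, routing_key=''):
        # A fanout exchange ignores the routing key and copies the
        # message to every bound queue.
        for queue in self.queues:
            queue.append(body)


logs = FanoutExchange('logs')
file_queue, screen_queue = deque(), deque()
logs.bind(file_queue)
logs.bind(screen_queue)
logs.publish('info: Hello World!')
```

Both queues end up holding their own copy of the message, which is what lets one receiver write logs to a file while another prints them to the screen.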



Listing exchanges

To list the exchanges on the server, we run rabbitmqctl:

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
	direct
amq.direct	direct
amq.fanout	fanout
amq.headers	headers
amq.match	headers
amq.rabbitmq.log	topic
amq.rabbitmq.trace	topic
amq.topic	topic
...done.

The list shows the amq.* exchanges and the default (unnamed) exchange, all of which the broker creates automatically.

Recall that in the previous chapter we sent messages to queues through the default exchange, which we identify by the empty string (""):

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)

However, we can now publish to our named exchange instead:

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
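
The difference between the two publish calls comes down to how the default exchange uses the routing key. As a rough sketch, the toy model below mimics that rule: a message goes to the queue whose name equals the routing key, and is dropped if no such queue exists. DefaultExchange, declare_queue and publish are hypothetical names for this illustration, not pika calls.

```python
class DefaultExchange:
    """Toy model of the default (nameless) exchange; not pika code."""

    def __init__(self):
        self.queues = {}  # queue name -> list of delivered messages

    def declare_queue(self, name):
        # Every declared queue is implicitly reachable under its own name.
        return self.queues.setdefault(name, [])

    def publish(self, routing_key, body):
        # Deliver to the queue named by the routing key, if there is one.
        queue = self.queues.get(routing_key)
        if queue is None:
            return False  # no queue with that name: the message is dropped
        queue.append(body)
        return True


default = DefaultExchange()
hello = default.declare_queue('hello')
delivered = default.publish('hello', 'Hello World!')
dropped = default.publish('no-such-queue', 'lost')
```

With a fanout exchange such as logs there is no name matching at all, which is why the routing key can be left empty.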



Temporary queues

So far, we've been using named queues such as "hello" or "task_queue", but now we want the server to pick a random queue name for us. How? By not supplying the queue parameter to queue_declare:

result = channel.queue_declare()

At this point, result.method.queue contains a random queue name.

Also, once we disconnect the consumer, the queue should be deleted. The exclusive flag takes care of that:

result = channel.queue_declare(exclusive=True)
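
The calls above rely on the older pika API, in which queue_declare can be called without a queue name. As a hedged sketch, assuming pika 1.0 or later (where the queue argument is required and an empty string asks the broker to generate the name), the same step could be wrapped like this. declare_temporary_queue is a hypothetical helper name, not part of pika.

```python
def declare_temporary_queue(channel):
    """Declare an exclusive, server-named queue and return its name.

    `channel` is assumed to be a pika 1.x channel, for example one
    obtained from a BlockingConnection.
    """
    # An empty queue name asks the broker to generate a random one.
    # exclusive=True ties the queue to the declaring connection, so the
    # broker deletes it when that connection closes.
    result = channel.queue_declare(queue='', exclusive=True)
    return result.method.queue
```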




Bindings

Binding.png

We need to tell the exchange to send messages to our queue. That relationship between an exchange and a queue is called a binding.

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

Now the exchange (logs) will append messages to our queue.





emit_log.py

The producer program emits log messages. The main change from the previous chapter is that we now publish to our logs exchange instead of the nameless one. We still have to supply a routing_key when sending, but its value is ignored for fanout exchanges. Because publishing to a non-existent exchange is forbidden, we declare the exchange right after establishing the connection.

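#!/usr/bin/env python
import pika
import sys

# Connect to the RabbitMQ broker running on this machine.
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

# Declare the fanout exchange so that it exists before we publish to it.
channel.exchange_declare(exchange='logs',
                         type='fanout')

# Use the command-line arguments as the message, or fall back to a default.
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
# The routing key is ignored by fanout exchanges, so we leave it empty.
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % (message,))
connection.close()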
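
The script above targets the older pika API. As a hedged sketch, assuming pika 1.0 or later (where the exchange type is passed as exchange_type), the publishing step could be factored into a small function that takes the channel as an argument. publish_log is a hypothetical helper name, not part of pika.

```python
def publish_log(channel, message):
    """Publish one log line to the 'logs' fanout exchange.

    `channel` is assumed to be a pika 1.x channel.
    """
    # Declaring the exchange first guarantees that it exists; publishing
    # to a missing exchange is an error.
    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    # Fanout exchanges ignore the routing key, so it is left empty.
    channel.basic_publish(exchange='logs', routing_key='', body=message)
    print(" [x] Sent %r" % (message,))
```

With a running broker, the channel would come from pika.BlockingConnection(pika.ConnectionParameters(host='localhost')).channel(), and the connection should be closed once the message has been sent. Taking the channel as a parameter also makes the function easy to exercise with a stub object.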




receive_logs.py
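#!/usr/bin/env python
import pika

# Connect to the RabbitMQ broker running on this machine.
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

# Declare the exchange here as well, in case the receiver starts first.
channel.exchange_declare(exchange='logs',
                         type='fanout')

# Ask for a randomly named queue that is deleted when we disconnect.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# Bind the queue so that the logs exchange delivers messages to it.
channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    # Called once for every message delivered to our queue.
    print(" [x] %r" % (body,))

# no_ack=True: messages are not acknowledged back to the broker.
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()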
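
For readers on a newer client library, here is a hedged sketch of the same subscription steps, assuming pika 1.0 or later: the exchange type is passed as exchange_type, queue_declare takes an explicit (empty) queue name, and basic_consume uses the on_message_callback and auto_ack keywords. subscribe_to_logs and print_log are hypothetical helper names, not part of pika.

```python
def subscribe_to_logs(channel, on_message):
    """Bind a temporary queue to the 'logs' exchange and register a consumer.

    `channel` is assumed to be a pika 1.x channel. Returns the name of
    the server-generated queue.
    """
    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    # Empty name: the broker generates one. exclusive=True: the queue is
    # deleted when the declaring connection closes.
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange='logs', queue=queue_name)
    # auto_ack=True plays the role of no_ack=True in the older API.
    channel.basic_consume(queue=queue_name,
                          on_message_callback=on_message,
                          auto_ack=True)
    return queue_name


def print_log(ch, method, properties, body):
    # Under Python 3 the body arrives as bytes.
    print(" [x] %r" % (body,))
```

After calling subscribe_to_logs(channel, print_log) on a real channel, channel.start_consuming() blocks and dispatches incoming messages to the callback.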




Run

To save logs to a file, just open a console and type:

$ python receive_logs.py > logs_from_rabbit.log

To see the logs on your screen, spawn a new terminal and run:

$ python receive_logs.py

To emit a log message, spawn another terminal and run:

$ python emit_log.py

Then, on each console that's running receive_logs.py, we can see the following:

$ python receive_logs.py
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'info: Hello World!'
 [x] 'info: Hello World!'

Using rabbitmqctl list_bindings, we can verify that the code actually creates the bindings and queues we want:

$ sudo rabbitmqctl list_bindings
Listing bindings ...
	exchange	amq.gen-gq_vRxMnVA2rn2_rMlqi-A	queue	amq.gen-gq_vRxMnVA2rn2_rMlqi-A	[]
	exchange	amq.gen-pMYkFEP6cLAsLc1W1OA8tg	queue	amq.gen-pMYkFEP6cLAsLc1W1OA8tg	[]
	exchange	task_queue	queue	task_queue	[]
logs	exchange	amq.gen-gq_vRxMnVA2rn2_rMlqi-A	queue	amq.gen-gq_vRxMnVA2rn2_rMlqi-A	[]
logs	exchange	amq.gen-pMYkFEP6cLAsLc1W1OA8tg	queue	amq.gen-pMYkFEP6cLAsLc1W1OA8tg	[]
...done.

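The output shows the logs exchange bound to two server-named queues, one per running receive_logs.py, alongside the default-exchange bindings for those queues and for the task_queue from the previous chapter.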
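
If we want to check the bindings from a script rather than by eye, the output can be parsed. The sketch below assumes the columns are tab-separated and appear in the order source name, source kind, destination name, destination kind, routing key, arguments, as in the listing above. parse_bindings and queues_bound_to are hypothetical helper names, and the sample text is a shortened copy of that listing.

```python
COLUMNS = ('source_name', 'source_kind', 'destination_name',
           'destination_kind', 'routing_key', 'arguments')


def parse_bindings(output):
    """Turn tab-separated list_bindings rows into a list of dicts."""
    bindings = []
    for line in output.splitlines():
        fields = line.split('\t')
        if len(fields) != len(COLUMNS):
            continue  # skip "Listing bindings ..." and "...done."
        bindings.append(dict(zip(COLUMNS, fields)))
    return bindings


def queues_bound_to(bindings, exchange):
    """Return the names of the queues bound to the given exchange."""
    return [b['destination_name'] for b in bindings
            if b['source_name'] == exchange
            and b['destination_kind'] == 'queue']


sample = (
    "Listing bindings ...\n"
    "\texchange\ttask_queue\tqueue\ttask_queue\t[]\n"
    "logs\texchange\tamq.gen-gq_vRxMnVA2rn2_rMlqi-A\tqueue\t"
    "amq.gen-gq_vRxMnVA2rn2_rMlqi-A\t[]\n"
    "...done.\n"
)
bindings = parse_bindings(sample)
log_queues = queues_bound_to(bindings, 'logs')
```

An empty source name stands for the default exchange, so queues_bound_to(bindings, '') lists the queues reachable through it.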





References
  1. RabbitMQ - Publish/Subscribe



Ph.D. / Golden Gate Ave, San Francisco / Seoul National Univ / Carnegie Mellon / UC Berkeley / DevOps / Deep Learning / Visualization


- K Hong



Contact

BogoToBogo
contactus@bogotobogo.com

Pacific Ave, San Francisco, CA 94115


Copyright © 2024, bogotobogo
Design: Web Master