Many of us eagerly wait for that notification, “Your favorite YouTuber has uploaded a new video!” Have you ever wondered how this notification system works behind the scenes? How does clicking that subscribe button register you for alerts? And what design pattern powers this kind of behavior?
Let’s delve into the PubSub design pattern, a powerful architecture that ensures you get relevant updates seamlessly!
The publish-subscribe (Pub-Sub) pattern
The publish-subscribe pattern is a messaging architecture where publishers do not directly send messages to subscribers. Instead, messages are published to channels without the need for the publisher to know which subscribers (if any) will receive them. Subscribers indicate interest in specific channels and receive only the messages that are relevant to them, promoting a clear separation of concerns.
This pattern decouples message producers (publishers) from message consumers (subscribers). The decoupling makes the system more flexible and easier to scale: you can add new subscribers or change how messages are handled without touching the publishers.
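To make the decoupling concrete before the full implementation, here is a deliberately tiny sketch that uses a plain dictionary as the broker. The channel name `new-uploads` and the helper functions are hypothetical. The point is that `publish` only names a channel and never learns who, if anyone, is listening.

```python
from collections import defaultdict
from collections.abc import Callable

# Hypothetical minimal broker: channel name -> list of subscriber callbacks
subscriptions: defaultdict[str, list[Callable[[str], None]]] = defaultdict(list)


def subscribe(channel: str, callback: Callable[[str], None]) -> None:
    subscriptions[channel].append(callback)


def publish(channel: str, message: str) -> None:
    # The publisher names a channel; it never references a subscriber directly
    for callback in subscriptions[channel]:
        callback(message)


notifications: list[str] = []
subscribe("new-uploads", notifications.append)
subscribe("new-uploads", lambda msg: print(f"Notification: {msg}"))

publish("new-uploads", "Your favorite YouTuber has uploaded a new video!")
publish("live-streams", "Nobody is subscribed here")  # delivered to no one
```

Adding a third subscriber to `new-uploads` requires no change to the code that calls `publish`.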
Components of the PubSub pattern
- Subscriber protocol:

```python
from typing import Protocol


class Subscriber[Message](Protocol):
    def __call__(self, message: Message) -> None:
        ...
```

This protocol defines the interface for a subscriber. Any subscriber must implement the `__call__` method, which takes a message as a parameter. Defining the protocol this way supports both class-based and function-based subscribers: any object that conforms to the `Subscriber` protocol can be used interchangeably.
Note: Subscribers can accept any type of message you define. JSON is a common choice in web applications, because it lets us send structured messages in a cross-platform way. This is why `Message` is a generic type parameter (written with the type parameter syntax introduced in Python 3.12): it keeps the implementation type-agnostic. For more on generics, check out my post here.
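As a sketch of the JSON case, assume the publisher serializes a dict with `json.dumps` and the subscriber decodes it with `json.loads`. The payload keys `channel` and `title` are hypothetical.

```python
import json
from typing import Any

received: list[dict[str, Any]] = []


def json_subscriber(message: str) -> None:
    """Hypothetical subscriber that decodes a JSON-encoded message."""
    payload = json.loads(message)
    received.append(payload)
    print(f"New video on {payload['channel']}: {payload['title']}")


# Publisher side: turn structured data into a JSON string before publishing
message = json.dumps({"channel": "ArjanCodes", "title": "The PubSub pattern"})
json_subscriber(message)
# prints: New video on ArjanCodes: The PubSub pattern
```

Here the message type is plain `str`, so `json_subscriber` fits a `Subscriber[str]`. Decoding once upstream and publishing dicts to `Subscriber[dict]` instances is an equally valid design.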
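- Channel class:

```python
from dataclasses import dataclass, field


@dataclass(slots=True, repr=False, kw_only=True)
class Channel[Message]:
    subscribers: set[Subscriber[Message]] = field(default_factory=set)

    def subscribe(self, subscriber: Subscriber[Message]) -> None:
        self.subscribers.add(subscriber)

    def unsubscribe(self, subscriber: Subscriber[Message]) -> None:
        # discard() makes unsubscribing a non-member a no-op instead of a
        # KeyError, which Publisher.unsubscribe_all relies on
        self.subscribers.discard(subscriber)

    def publish(self, message: Message) -> None:
        # Iterate over a snapshot so subscribers may unsubscribe during delivery
        for subscriber in tuple(self.subscribers):
            subscriber(message)
```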
The `Channel` class represents a communication channel. It maintains a set of subscribers and provides methods to subscribe, unsubscribe, and publish messages to these subscribers. Using a set ensures that each subscriber is registered only once, so duplicates are avoided.

- `subscribe`: Adds a subscriber to the channel.
- `unsubscribe`: Removes a subscriber from the channel.
- `publish`: Sends a message to all subscribers in the channel.
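A quick sketch of what relying on a set means in practice: the same subscriber object is stored only once, but every subscriber must be hashable. A regular `@dataclass` (default `eq=True`, not frozen) sets `__hash__` to `None`, so its instances cannot go into the set. The subscriber names and URL below are hypothetical.

```python
from collections.abc import Callable
from dataclasses import dataclass

subscribers: set[Callable[[str], None]] = set()


def log_subscriber(message: str) -> None:
    print(message)


subscribers.add(log_subscriber)
subscribers.add(log_subscriber)  # same object, so the set keeps one entry
print(len(subscribers))  # 1


@dataclass  # eq=True and not frozen, so __hash__ is set to None
class WebhookSubscriber:
    url: str

    def __call__(self, message: str) -> None:
        print(f"POST {self.url}: {message}")


try:
    subscribers.add(WebhookSubscriber("https://example.com/hook"))
except TypeError as exc:
    print(f"Could not subscribe: {exc}")


@dataclass(eq=False)  # keeps identity-based hashing, like a plain class
class HashableWebhookSubscriber:
    url: str

    def __call__(self, message: str) -> None:
        print(f"POST {self.url}: {message}")


subscribers.add(HashableWebhookSubscriber("https://example.com/hook"))
```

`@dataclass(frozen=True)` would also make instances hashable, at the cost of immutability.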
- Publisher class:

```python
from collections import defaultdict


@dataclass(slots=True)
class Publisher[Message]:
    # defaultdict creates an empty Channel the first time a name is looked up
    channels: dict[str, Channel[Message]] = field(
        default_factory=lambda: defaultdict(Channel)
    )

    def publish(self, channel_name: str, message: Message) -> None:
        self.channels[channel_name].publish(message)

    def publish_all(self, message: Message) -> None:
        for channel in self.channels.values():
            channel.publish(message)

    def subscribe(self, channel_name: str, subscriber: Subscriber[Message]) -> None:
        self.channels[channel_name].subscribe(subscriber)

    def subscribe_all(self, subscriber: Subscriber[Message]) -> None:
        # Only affects channels that already exist
        for channel in self.channels.values():
            channel.subscribe(subscriber)

    def unsubscribe(self, channel_name: str, subscriber: Subscriber[Message]) -> None:
        self.channels[channel_name].unsubscribe(subscriber)

    def unsubscribe_all(self, subscriber: Subscriber[Message]) -> None:
        for channel in self.channels.values():
            channel.unsubscribe(subscriber)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.channels})"
```
The `Publisher` class manages multiple channels, keyed by name. There is no explicit method for creating a channel: because `channels` is a `defaultdict`, looking up an unknown name creates an empty channel on the fly. On top of that, the class provides methods to subscribe and unsubscribe subscribers, publish messages to a specific channel, or broadcast messages to all channels.

- `publish`: Publishes a message to a specific channel.
- `publish_all`: Publishes a message to all channels.
- `subscribe`: Subscribes a subscriber to a specific channel.
- `subscribe_all`: Subscribes a subscriber to all existing channels.
- `unsubscribe`: Unsubscribes a subscriber from a specific channel.
- `unsubscribe_all`: Unsubscribes a subscriber from all channels.
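Because `channels` is a `defaultdict`, a channel comes into existence the moment its name is looked up, and `subscribe_all` only reaches channels that exist at the time it is called. The stripped-down, non-generic stand-in below (not the post's exact classes; `collect` is a hypothetical subscriber) sketches that behavior:

```python
from collections import defaultdict
from collections.abc import Callable

Subscriber = Callable[[str], None]


class Publisher:
    """Simplified stand-in: channels are plain sets of subscribers."""

    def __init__(self) -> None:
        # Looking up an unknown channel name creates an empty set on the fly
        self.channels: defaultdict[str, set[Subscriber]] = defaultdict(set)

    def subscribe_all(self, subscriber: Subscriber) -> None:
        # Only channels that already exist at this moment are affected
        for subscribers in self.channels.values():
            subscribers.add(subscriber)

    def publish(self, channel_name: str, message: str) -> None:
        for subscriber in self.channels[channel_name]:
            subscriber(message)


received: list[str] = []


def collect(message: str) -> None:
    received.append(message)


publisher = Publisher()
publisher.channels["spam"]  # the lookup alone creates the "spam" channel
publisher.subscribe_all(collect)
publisher.publish("spam", "reaches the subscriber")
publisher.publish("eggs", "eggs is created now, so nobody is subscribed")
print(received)  # ['reaches the subscriber']
print(sorted(publisher.channels))  # ['eggs', 'spam']
```

If late-created channels should also reach "all" subscribers, one option is to keep a separate set of global subscribers and add them whenever a channel is created.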
Putting it together
```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Protocol


class Subscriber[Message](Protocol):
    def __call__(self, message: Message) -> None:
        ...


@dataclass(slots=True, repr=False, kw_only=True)
class Channel[Message]:
    subscribers: set[Subscriber[Message]] = field(default_factory=set)

    def subscribe(self, subscriber: Subscriber[Message]) -> None:
        self.subscribers.add(subscriber)

    def unsubscribe(self, subscriber: Subscriber[Message]) -> None:
        # discard() makes unsubscribing a non-member a no-op instead of a KeyError
        self.subscribers.discard(subscriber)

    def publish(self, message: Message) -> None:
        # Iterate over a snapshot so subscribers may unsubscribe during delivery
        for subscriber in tuple(self.subscribers):
            subscriber(message)


@dataclass(slots=True)
class Publisher[Message]:
    # defaultdict creates an empty Channel the first time a name is looked up
    channels: dict[str, Channel[Message]] = field(
        default_factory=lambda: defaultdict(Channel)
    )

    def publish(self, channel_name: str, message: Message) -> None:
        self.channels[channel_name].publish(message)

    def publish_all(self, message: Message) -> None:
        for channel in self.channels.values():
            channel.publish(message)

    def subscribe(self, channel_name: str, subscriber: Subscriber[Message]) -> None:
        self.channels[channel_name].subscribe(subscriber)

    def subscribe_all(self, subscriber: Subscriber[Message]) -> None:
        for channel in self.channels.values():
            channel.subscribe(subscriber)

    def unsubscribe(self, channel_name: str, subscriber: Subscriber[Message]) -> None:
        self.channels[channel_name].unsubscribe(subscriber)

    def unsubscribe_all(self, subscriber: Subscriber[Message]) -> None:
        for channel in self.channels.values():
            channel.unsubscribe(subscriber)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.channels})"
```
Usage example
Here’s an example of how to use the publisher and channel classes to manage subscriptions and publish messages:
```python
class EmailSubscriber:
    def __init__(self, email: str) -> None:
        self.email = email

    def __call__(self, message: str) -> None:
        print(f"Sending email to {self.email}: {message}")


def main() -> None:
    publisher = Publisher()
    email_subscriber = EmailSubscriber('arjan@arjancodes.com')

    # Looking up a name on the defaultdict creates the channel
    spam = publisher.channels["spam"]
    eggs = publisher.channels["eggs"]

    # Subscribing to channels
    spam.subscribe(email_subscriber)
    eggs.subscribe(email_subscriber)

    # Publishing messages
    spam.publish('Hello, spam subscribers!')
    eggs.publish('Hello, eggs subscribers!')

    # Unsubscribe
    spam.unsubscribe(email_subscriber)

    # Publishing after unsubscription: only the eggs message is delivered
    spam.publish('Hello again, spam subscribers!')
    eggs.publish('Hello again, eggs subscribers!')


if __name__ == '__main__':
    main()
```

Output:

```
Sending email to arjan@arjancodes.com: Hello, spam subscribers!
Sending email to arjan@arjancodes.com: Hello, eggs subscribers!
Sending email to arjan@arjancodes.com: Hello again, eggs subscribers!
```
In this example:

- The `email_subscriber` is subscribed to both channels and receives messages published to them.
- After unsubscribing from the `spam` channel, messages published to `spam` are no longer received by `email_subscriber`.
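A related pitfall when unsubscribing: if a subscriber removes itself while `publish` is looping directly over the subscriber set, Python raises `RuntimeError: Set changed size during iteration`. Iterating over a snapshot (`tuple(self.subscribers)`) avoids this. The sketch below uses a minimal, non-generic channel and a hypothetical one-shot subscriber to illustrate the idea:

```python
from collections.abc import Callable

Subscriber = Callable[[str], None]


class Channel:
    """Minimal non-generic channel, for illustration only."""

    def __init__(self) -> None:
        self.subscribers: set[Subscriber] = set()

    def subscribe(self, subscriber: Subscriber) -> None:
        self.subscribers.add(subscriber)

    def unsubscribe(self, subscriber: Subscriber) -> None:
        self.subscribers.discard(subscriber)

    def publish(self, message: str) -> None:
        # Iterate over a snapshot: a subscriber may unsubscribe during the loop
        for subscriber in tuple(self.subscribers):
            subscriber(message)


channel = Channel()
received: list[str] = []


class OneShotSubscriber:
    """Hypothetical subscriber that handles one message, then unsubscribes."""

    def __call__(self, message: str) -> None:
        received.append(message)
        channel.unsubscribe(self)


channel.subscribe(OneShotSubscriber())
channel.publish("first")  # delivered, then the subscriber removes itself
channel.publish("second")  # no subscribers left
print(received)  # ['first']
```

Copying the set costs one allocation per `publish`, which is usually negligible next to the work subscribers do.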
Final thoughts
This pattern allows for flexible and scalable message distribution systems. Subscribers can represent various entities, such as individual users, devices, or even pools of objects. By separating concerns, we reduce coupling, allowing for more flexibility and reusability.