PubSub package
Introduction
The PubSub package provides a pub-sub API with a best-effort, at-most-once delivery guarantee.
If there are no subscribers reachable when a message is published, this message will not be re-transmitted.
If there are multiple subscribers reachable, the nearest subscriber will be notified of the published message in an any-cast style.
Note that the type Union[Iterable[Union[bytes, bytearray, memoryview, str]], str, bytes, bytearray, memoryview]
in the documentation is equivalent to the ndn.name.NonStrictName type.
Process
Under the hood the PubSub module transmits a series of Interest and Data packets:
1. The subscriber calls subscribe(topic, cb). This makes the subscriber listen on
"/<topic>/notify".
2. The publisher invokes publish(topic, msg). This method sends an Interest with name
"/<topic>/notify", which will be routed to a subscriber. The interest carries the following fields in its application parameters:
- Publisher prefix: used by the subscriber to reach the publisher in the next step
- NotifyNonce: a random byte string, used by the publisher to de-multiplex among different publications
- Forwarding hint (optional): if the publisher prefix is not announced in the routing system, the publisher can provide a forwarding hint
Meanwhile, msg is wrapped into a Data packet named "/<pub_prefix>/msg/<topic>/<notify_nonce>". Here, the data name contains topic to establish a binding between topic and nonce, which prevents man-in-the-middle attacks that change the topic.
3. The subscriber receives the notification interest, constructs a new Interest
"/<pub_prefix>/msg/<topic>/<notify_nonce>" and sends it to the publisher.
4. The publisher receives the interest "/<pub_prefix>/msg/<topic>/<notify_nonce>", and returns the
corresponding data.
5. The subscriber receives the data, and invokes cb(data.content) to hand the message to the
application.
6. The publisher receives the acknowledgement Data packet, and erases the soft state.
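As a rough illustration of the naming scheme used in steps 1 to 4, the following sketch builds the two names as plain strings. The helper names notify_name and msg_name are hypothetical and not part of the package; the 4-byte nonce length and its hexadecimal rendering are assumptions made only for display.

```python
import os


def notify_name(topic: str) -> str:
    """Name the subscriber listens on (step 1) and the publisher notifies (step 2)."""
    return "/" + topic.strip("/") + "/notify"


def msg_name(pub_prefix: str, topic: str, notify_nonce: bytes) -> str:
    """Name of the Data packet that wraps the message (steps 2 to 4)."""
    # Assumption: the nonce is rendered as hex purely for readability.
    return "/{}/msg/{}/{}".format(
        pub_prefix.strip("/"), topic.strip("/"), notify_nonce.hex()
    )


if __name__ == "__main__":
    nonce = os.urandom(4)  # assumed length; the package may use a different one
    print(notify_name("/topic_foo"))
    print(msg_name("/publisher", "/topic_foo", nonce))
```

Because the topic appears in the message name, a subscriber that fetches the Data can check that the name it receives matches the topic it subscribed to.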
Encoding
The notify Interest’s application parameter is encoded as follows:
NotifyAppParam = DATA-TYPE TLV-LENGTH
                 [PublisherPrefix]
                 [NotifyNonce]
                 [PublisherFwdHint]
PublisherPrefix = Name
NotifyNonce = NOTIFY-NONCE-TYPE TLV-LENGTH Bytes
PublisherFwdHint = PUBLISHER-FWD-HINT-TYPE TLV-LENGTH Name
The type number assignments are as follows:
Type                      Assigned number (decimal)   Assigned number (hexadecimal)
NOTIFY-NONCE-TYPE         128                         0x80
PUBLISHER-FWD-HINT-TYPE   211                         0xD3
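To make the encoding concrete, here is a minimal sketch that encodes and decodes a NotifyNonce element by hand, using the variable-length number encoding of the NDN packet format for both the type and the length. The function names are illustrative assumptions; this is not the package's own encoder, which presumably relies on the TLV facilities of python-ndn.

```python
import struct
from typing import Tuple

NOTIFY_NONCE_TYPE = 128        # 0x80, from the table above
PUBLISHER_FWD_HINT_TYPE = 211  # 0xD3, from the table above


def encode_var_number(n: int) -> bytes:
    """Encode a TLV-TYPE or TLV-LENGTH as an NDN variable-length number."""
    if n < 253:
        return bytes([n])
    if n <= 0xFFFF:
        return b"\xfd" + struct.pack(">H", n)
    if n <= 0xFFFFFFFF:
        return b"\xfe" + struct.pack(">I", n)
    return b"\xff" + struct.pack(">Q", n)


def decode_var_number(buf: bytes, pos: int = 0) -> Tuple[int, int]:
    """Return (value, next_position) for the number starting at pos."""
    first = buf[pos]
    if first < 253:
        return first, pos + 1
    size = {253: 2, 254: 4, 255: 8}[first]
    value = int.from_bytes(buf[pos + 1:pos + 1 + size], "big")
    return value, pos + 1 + size


def encode_tlv(tlv_type: int, value: bytes) -> bytes:
    """Concatenate type, length and value into one TLV element."""
    return encode_var_number(tlv_type) + encode_var_number(len(value)) + value


def decode_tlv(buf: bytes) -> Tuple[int, bytes, bytes]:
    """Return (type, value, remaining_bytes) for the first TLV in buf."""
    tlv_type, pos = decode_var_number(buf, 0)
    length, pos = decode_var_number(buf, pos)
    return tlv_type, buf[pos:pos + length], buf[pos + length:]


def encode_notify_nonce(nonce: bytes) -> bytes:
    """NotifyNonce = NOTIFY-NONCE-TYPE TLV-LENGTH Bytes"""
    return encode_tlv(NOTIFY_NONCE_TYPE, nonce)
```

Both assigned type numbers are below 253, so each occupies a single byte on the wire.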
Reference
- class ndn_python_repo.utils.PubSub(app, prefix=None, forwarding_hint=None)
Initialize a PubSub instance with identity prefix that can be reached at forwarding_hint. TODO: support msg larger than MTU
- Parameters:
app (NDNApp) – NDNApp.
prefix (Union[Iterable[Union[bytes,bytearray,memoryview,str]],str,bytes,bytearray,memoryview]) – NonStrictName. The identity of this PubSub instance. The publisher needs a prefix under which it can publish data. Note that you cannot initialize two PubSub instances with the same prefix on the same node, since this causes a double registration error.
forwarding_hint (Union[Iterable[Union[bytes,bytearray,memoryview,str]],str,bytes,bytearray,memoryview]) – NonStrictName. When working as publisher, if prefix is not reachable, the subscriber can use forwarding_hint to reach the publisher.
- async publish(topic, msg)
Publish msg to topic. Make several attempts until the subscriber returns a response.
- Parameters:
topic (Union[Iterable[Union[bytes,bytearray,memoryview,str]],str,bytes,bytearray,memoryview]) – NonStrictName. The topic to publish msg to.
msg (bytes) – bytes. The message to publish. The pub-sub API does not make any assumptions on the format of this message.
- Returns:
True if a response was received from a subscriber.
- set_base_prefix(prefix)
Avoid registering too many prefixes, by registering prefix with NFD. All other prefixes under prefix will be registered with interest filters, and will not have to be registered with NFD. Must be called before _wait_for_ready().
- Parameters:
prefix (Union[Iterable[Union[bytes,bytearray,memoryview,str]],str,bytes,bytearray,memoryview]) – NonStrictName. The base prefix to register.
- set_publisher_prefix(prefix)
Set the identity of the publisher after initialization. Must be called before _wait_for_ready().
- Parameters:
prefix (Union[Iterable[Union[bytes,bytearray,memoryview,str]],str,bytes,bytearray,memoryview]) – NonStrictName. The identity of this PubSub instance.
- subscribe(topic, cb)
Subscribe to topic with cb.
- Parameters:
topic (Union[Iterable[Union[bytes,bytearray,memoryview,str]],str,bytes,bytearray,memoryview]) – NonStrictName. The topic to subscribe to.
cb (callable) – callable. A callback that will be called when a message under topic is received. This function takes one bytes argument.
- unsubscribe(topic)
Unsubscribe from topic.
- Parameters:
topic (Union[Iterable[Union[bytes,bytearray,memoryview,str]],str,bytes,bytearray,memoryview]) – NonStrictName. The topic to unsubscribe from.
- async wait_for_ready()
Must be called to wait for pub-sub to become ready.
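The methods above might be combined roughly as follows. This is a sketch under stated assumptions: start_subscriber and publish_once are hypothetical helper names, pubsub is assumed to be an already constructed PubSub instance (for example PubSub(app, prefix) inside a running NDNApp), and calling wait_for_ready() before subscribe() or publish() is an assumed ordering that the reference does not spell out.

```python
from typing import Callable


async def start_subscriber(pubsub, topic, on_msg: Callable[[bytes], None]) -> None:
    """Wait until pub-sub is ready, then register on_msg for topic."""
    await pubsub.wait_for_ready()   # assumed to come before subscribe()
    pubsub.subscribe(topic, on_msg)  # on_msg receives the message as bytes


async def publish_once(pubsub, topic, msg: bytes) -> bool:
    """Publish msg to topic and report whether a subscriber responded."""
    await pubsub.wait_for_ready()   # assumed to come before publish()
    delivered = await pubsub.publish(topic, msg)
    # Delivery is at-most-once: a false result means no subscriber answered,
    # and the message is not re-transmitted later.
    return bool(delivered)
```

Since delivery is best-effort, an application that needs the message to arrive has to check the return value of publish_once and decide for itself whether to publish again.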