PubSub
package
Introduction
The PubSub
package provides a pub-sub API with best-effort, at-most-once delivery guarantee.
If there are no subscribers reachable when a message is published, this message will not be re-transmitted.
If there are multiple subscribers reachable, the nearest subscriber will be notified of the published message in an any-cast style.
Note that the type Union[Iterable[Union[bytes, bytearray, memoryview, str]], str, bytes, bytearray, memoryview]
in the documentation is equivalent to the ndn.name.NonStrictName
type.
Process
Under the hood the PubSub
module transmits a series of Interest and Data packets:
1. The subscriber calls subscribe(topic, cb)
. This makes the subcriber listen on
"/<topic>/notify"
.
2. The publisher invokes publish(topic, msg)
. This method sends an Interest with name
"/<topic>/notify"
, which will be routed to a subscriber. The interest carries the following fields in its application parameters:
Publisher prefix: used by the subscriber to reach the publisher in the next step
NotifyNonce: a random bytes string, used by the publisher to de-multiplex among different publications
Forwarding hint (optional): if publisher prefix is not announced in the routing system, publisher can provide a forwarding hint
Meanwhile,
msg
is wrapped into a Data packet named"/<pub_prefix>/msg/<topic>/<notify_nonce>"
. Here, the data name containstopic
to establish a binding between topic and nonce, to prevent man-in-the-middle attacks that changes the topic.
3. The subscriber receives the notification interest, constructs a new Interest
"/<pub_prefix>/msg/<topic>/<notify_nonce>"
and send it to the publisher.
4. The publisher receives the interest "/<pub_prefix>/msg/<topic>/<notify_nonce>"
, and returns the
corresponding data.
5. The subscriber receives the data, and invokes cb(data.content)
to hand the message to the
application.
The publisher receives the acknowledgement Data packet, and erases the soft state.
Encoding
The notify Interest’s application parameter is encoded as follows:
NotifyAppParam = DATA-TYPE TLV-LENGTH
[PublisherPrefix]
[NotifyNonce]
[PublisherFwdHint]
PublisherPrefix = Name
NotifyNonce = NOTIFY-NONCE-TYPE TLV-LENGTH Bytes
PublisherFwdHint = PUBLISHER-FWD-HINT-TYPE TLV-LENGTH Name
The type number assignments are as follows:
type
Assigned number (decimal)
Assigned number (hexadecimal)
NOTIFY-NONCE-TYPE
128
0x80
PUBLISHER-FWD-HINT-TYPE
211
0xD3
Reference
- class ndn_python_repo.utils.PubSub(app, prefix=None, forwarding_hint=None)
Initialize a
PubSub
instance with identityprefix
and can be reached atforwarding_hint
. TODO: support msg larger than MTU- Parameters:
app (
NDNApp
) – NDNApp.prefix (
Union
[Iterable
[Union
[bytes
,bytearray
,memoryview
,str
]],str
,bytes
,bytearray
,memoryview
]) – NonStrictName. The identity of thisPubSub
instance. The publisher needs a prefix under which can publish data. Note that you cannot initialize twoPubSub
instances with the sameprefix
on the same node, since it will cause double registration error.forwarding_hint (
Union
[Iterable
[Union
[bytes
,bytearray
,memoryview
,str
]],str
,bytes
,bytearray
,memoryview
]) – NonStrictName. When working as publisher, ifprefix
is not reachable, the subscriber can useforwarding_hint
to reach the publisher.
- async publish(topic, msg)
Publish
msg
totopic
. Make several attempts until the subscriber returns a response.- Parameters:
topic (
Union
[Iterable
[Union
[bytes
,bytearray
,memoryview
,str
]],str
,bytes
,bytearray
,memoryview
]) – NonStrictName. The topic to publishmsg
to.msg (
bytes
) – bytes. The message to publish. The pub-sub API does not make any assumptions on the format of this message.
- Returns:
Return true if received response from a subscriber.
- set_base_prefix(prefix)
Avoid registering too many prefixes, by registering
prefix
with NFD. All other prefixes underprefix
will be registered with interest filters, and will not have to be registered with NFD. Need to be called before_wait_for_ready()
.- Parameters:
prefix (
Union
[Iterable
[Union
[bytes
,bytearray
,memoryview
,str
]],str
,bytes
,bytearray
,memoryview
]) – NonStrictName. The base prefix to register.
- set_publisher_prefix(prefix)
Set the identify of the publisher after initialization. Need to be called before
_wait_for_ready()
.- Parameters:
prefix (
Union
[Iterable
[Union
[bytes
,bytearray
,memoryview
,str
]],str
,bytes
,bytearray
,memoryview
]) – NonStrictName. The identity of thisPubSub
instance.
- subscribe(topic, cb)
Subscribe to
topic
withcb
.- Parameters:
topic (
Union
[Iterable
[Union
[bytes
,bytearray
,memoryview
,str
]],str
,bytes
,bytearray
,memoryview
]) – NonStrictName. The topic to subscribe to.cb (
callable
) – callable. A callback that will be called when a message undertopic
is received. This function takes onebytes
argument.
- unsubscribe(topic)
Unsubscribe from
topic
.- Parameters:
topic (
Union
[Iterable
[Union
[bytes
,bytearray
,memoryview
,str
]],str
,bytes
,bytearray
,memoryview
]) – NonStrictName. The topic to unsubscribe from.
- async wait_for_ready()
Need to be called to wait for pub-sub to be ready.