libp2p/protocols/pubsub/pubsub

    Dark Mode
Search:
Group by:
  Source   Edit

Base interface for pubsub protocols

You can subscribe to a topic, publish something on it, and eventually unsubscribe from it.

Types

InitializationError = object of LPError
  Source   Edit
libp2p_pubsub_broadcast_graft = IgnoredCollector
  Source   Edit
libp2p_pubsub_broadcast_ihave = IgnoredCollector
  Source   Edit
libp2p_pubsub_broadcast_iwant = IgnoredCollector
  Source   Edit
libp2p_pubsub_broadcast_messages = IgnoredCollector
  Source   Edit
libp2p_pubsub_broadcast_prune = IgnoredCollector
  Source   Edit
libp2p_pubsub_messages_published = IgnoredCollector
  Source   Edit
libp2p_pubsub_messages_rebroadcasted = IgnoredCollector
  Source   Edit
libp2p_pubsub_received_graft = IgnoredCollector
  Source   Edit
libp2p_pubsub_received_ihave = IgnoredCollector
  Source   Edit
libp2p_pubsub_received_iwant = IgnoredCollector
  Source   Edit
libp2p_pubsub_received_messages = IgnoredCollector
  Source   Edit
libp2p_pubsub_received_prune = IgnoredCollector
  Source   Edit
libp2p_pubsub_received_subscriptions = IgnoredCollector
  Source   Edit
MsgIdProvider {.public.} = proc (m: Message): Result[MessageId, ValidationResult] {.
    noSideEffect, ...raises: [], gcsafe.}
  Source   Edit
PeerMessageDecodeError = object of CatchableError
  Source   Edit
PubSub {.public.} = ref object of LPProtocol
  switch*: Switch
  peerInfo*: PeerInfo
  topics*: Table[string, seq[TopicHandler]]
  peers*: Table[PeerId, PubSubPeer]
  triggerSelf*: bool         ## trigger own local handler on publish
  verifySignature*: bool     ## enable signature verification
  sign*: bool                ## enable message signing
  validators*: Table[string, HashSet[ValidatorHandler]]
  observers: ref seq[PubSubObserver]
  msgIdProvider*: MsgIdProvider ## Turn message into message id (not nil)
  msgSeqno*: uint64
  anonymize*: bool           ## if we omit fromPeer and seqno from RPC messages we send
  subscriptionValidator*: SubscriptionValidator
  topicsHigh*: int           ## the maximum number of topics a peer is allowed to subscribe to
  maxMessageSize*: int ## \
                       ##  the maximum raw message size we'll globally allow
                       ##  for finer tuning, check message size on topic validator
                       ## 
                       ##  sending a big message to a peer with a lower size limit can
                       ##  lead to issues, from descoring to connection drops
                       ## 
                       ##  defaults to 1mB
  rng*: ref HmacDrbgContext
  knownTopics*: HashSet[string]
  Source   Edit
SubscriptionValidator {.public.} = proc (topic: string): bool {....raises: [],
    gcsafe.}
Every time a peer send us a subscription (even to an unknown topic), we have to store it, which may be an attack vector. This callback can be used to reject topic we're not interested in   Source   Edit
TopicHandler {.public.} = proc (topic: string; data: seq[byte]): Future[void] {.
    ...gcsafe, raises: [].}
  Source   Edit
TopicPair = tuple[topic: string, handler: TopicHandler]
  Source   Edit
ValidatorHandler {.public.} = proc (topic: string; message: Message): Future[
    ValidationResult] {....gcsafe, raises: [].}
  Source   Edit

Procs

proc addObserver(p: PubSub; observer: PubSubObserver) {.public, ...raises: [],
    tags: [].}
  Source   Edit
proc broadcast(p: PubSub; sendPeers: auto; msg: RPCMsg; isHighPriority: bool) {.
    ...raises: [], raises: [].}

This procedure attempts to send a msg (of type RPCMsg) to a specified group of peers in the PubSub network.

Parameters:

  • p: The PubSub instance.
  • sendPeers: An iterable of PubSubPeer instances representing the peers to whom the message should be sent.
  • msg: The RPCMsg instance that contains the message to be broadcast.
  • isHighPriority: A boolean indicating whether the message should be treated as high priority.

High priority messages are sent immediately, while low priority messages are queued and sent only after all high priority messages have been sent.

  Source   Edit
proc handleData(p: PubSub; topic: string; data: seq[byte]): InternalRaisesFuture[
    void, void] {.stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [],
                  tags: [RootEffect].}
  Source   Edit
proc init[PubParams: object | bool](P: typedesc[PubSub]; switch: Switch;
                                    triggerSelf: bool = false;
                                    anonymize: bool = false;
                                    verifySignature: bool = true;
                                    sign: bool = true; msgIdProvider: MsgIdProvider = defaultMsgIdProvider;
    subscriptionValidator: SubscriptionValidator = nil;
                                    maxMessageSize: int = 1024 * 1024;
                                    rng: ref HmacDrbgContext = newRng();
                                    parameters: PubParams = false): P:type {.
    ...raises: [InitializationError], public, ...raises: [].}
  Source   Edit
proc removeObserver(p: PubSub; observer: PubSubObserver) {.public, ...raises: [],
    tags: [].}
  Source   Edit
proc send(p: PubSub; peer: PubSubPeer; msg: RPCMsg; isHighPriority: bool) {.
    ...raises: [], raises: [], tags: [RootEffect, WriteIOEffect, TimeEffect].}

This procedure attempts to send a msg (of type RPCMsg) to the specified remote peer in the PubSub network.

Parameters:

  • p: The PubSub instance.
  • peer: An instance of PubSubPeer representing the peer to whom the message should be sent.
  • msg: The RPCMsg instance that contains the message to be sent.
  • isHighPriority: A boolean indicating whether the message should be treated as high priority.

High priority messages are sent immediately, while low priority messages are queued and sent only after all high priority messages have been sent.

  Source   Edit
proc sendSubs(p: PubSub; peer: PubSubPeer; topics: openArray[string];
              subscribe: bool) {....raises: [],
                                 tags: [RootEffect, WriteIOEffect, TimeEffect].}
send subscriptions to remote peer   Source   Edit
proc subscribe(p: PubSub; topic: string; handler: TopicHandler) {.public,
    ...raises: [], tags: [RootEffect, WriteIOEffect, TimeEffect].}

subscribe to a topic

topic - a string topic to subscribe to

handler - user provided proc that
will be triggered on every received message
  Source   Edit
proc unsubscribe(p: PubSub; topic: string; handler: TopicHandler) {.public,
    ...raises: [], tags: [RootEffect, WriteIOEffect, TimeEffect].}
unsubscribe from a topic string   Source   Edit
proc unsubscribe(p: PubSub; topics: openArray[TopicPair]) {.public, ...raises: [],
    tags: [RootEffect, WriteIOEffect, TimeEffect].}
unsubscribe from a list of topic handlers   Source   Edit
proc unsubscribeAll(p: PubSub; topic: string) {.public, ...gcsafe, raises: [],
    tags: [WriteIOEffect, TimeEffect, RootEffect].}
unsubscribe every handler from topic   Source   Edit
proc updateMetrics(p: PubSub; rpcMsg: RPCMsg) {....raises: [], tags: [].}
  Source   Edit

Methods

method addValidator(p: PubSub; topic: varargs[string]; hook: ValidatorHandler) {.
    base, public, ...gcsafe, raises: [], tags: [RootEffect].}
Add a validator to a topic. Each new message received in this will be sent to hook. hook can return either Accept, Ignore or Reject (which can descore the peer)   Source   Edit
method getOrCreatePeer(p: PubSub; peerId: PeerId; protosToDial: seq[string];
                       protoNegotiated: string = ""): PubSubPeer {.base, ...gcsafe,
    raises: [], tags: [WriteIOEffect, TimeEffect, RootEffect].}
  Source   Edit
method handleConn(p: PubSub; conn: Connection; proto: string): InternalRaisesFuture[
    void, (CancelledError,)] {.base, stackTrace: false, ...raises: [], gcsafe,
                               raises: [], raises: [], tags: [RootEffect].}

handle incoming connections

this proc will:

  1. register a new PubSubPeer for the connection
  2. register a handler with the peer; this handler gets called on every rpc message that the peer receives
  3. ask the peer to subscribe us to every topic that we're interested in
  Source   Edit
method initPubSub(p: PubSub) {.base, ...raises: [InitializationError], raises: [],
                               tags: [].}
perform pubsub initialization   Source   Edit
method onPubSubPeerEvent(p: PubSub; peer: PubSubPeer; event: PubSubPeerEvent) {.
    base, ...gcsafe, raises: [], tags: [RootEffect, WriteIOEffect, TimeEffect].}
  Source   Edit
method onTopicSubscription(p: PubSub; topic: string; subscribed: bool) {.base,
    ...gcsafe, raises: [], tags: [RootEffect, WriteIOEffect, TimeEffect].}
  Source   Edit
method publish(p: PubSub; topic: string; data: seq[byte]): InternalRaisesFuture[
    int, (LPError,)] {.base, public(), stackTrace: false, ...raises: [], gcsafe,
                       raises: [], raises: [], tags: [RootEffect].}

publish to a topic

The return value is the number of neighbours that we attempted to send the message to, excluding self. Note that this is an optimistic number of attempts - the number of peers that actually receive the message might be lower.

  Source   Edit
method removeValidator(p: PubSub; topic: varargs[string]; hook: ValidatorHandler) {.
    base, public, ...raises: [], tags: [RootEffect].}
  Source   Edit
method rpcHandler(p: PubSub; peer: PubSubPeer; data: seq[byte]): InternalRaisesFuture[
    void, (CancelledError, PeerMessageDecodeError, PeerRateLimitError)] {.base,
    stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [],
    tags: [RootEffect].}
Handler that must be overridden by concrete implementation   Source   Edit
method subscribePeer(p: PubSub; peer: PeerId) {.base, ...gcsafe, raises: [],
    tags: [WriteIOEffect, TimeEffect, RootEffect].}
subscribe to remote peer to receive/send pubsub messages   Source   Edit
method unsubscribePeer(p: PubSub; peerId: PeerId) {.base, ...gcsafe, raises: [],
    tags: [WriteIOEffect, TimeEffect].}
handle peer disconnects   Source   Edit
method validate(p: PubSub; message: Message): InternalRaisesFuture[
    ValidationResult, (CancelledError,)] {.base, stackTrace: false, ...raises: [],
    gcsafe, raises: [], raises: [], tags: [RootEffect].}
  Source   Edit