libp2p/protocols/pubsub/pubsub

Search:
Group by:
Source   Edit  

Base interface for pubsub protocols

You can subscribe to a topic, publish something on it, and eventually unsubscribe from it.

Types

InitializationError = object of LPError
Source   Edit  
libp2p_pubsub_broadcast_graft = IgnoredCollector
Source   Edit  
libp2p_pubsub_broadcast_ihave = IgnoredCollector
Source   Edit  
libp2p_pubsub_broadcast_iwant = IgnoredCollector
Source   Edit  
libp2p_pubsub_broadcast_messages = IgnoredCollector
Source   Edit  
libp2p_pubsub_broadcast_prune = IgnoredCollector
Source   Edit  
libp2p_pubsub_messages_published = IgnoredCollector
Source   Edit  
libp2p_pubsub_messages_rebroadcasted = IgnoredCollector
Source   Edit  
libp2p_pubsub_received_graft = IgnoredCollector
Source   Edit  
libp2p_pubsub_received_ihave = IgnoredCollector
Source   Edit  
libp2p_pubsub_received_iwant = IgnoredCollector
Source   Edit  
libp2p_pubsub_received_messages = IgnoredCollector
Source   Edit  
libp2p_pubsub_received_prune = IgnoredCollector
Source   Edit  
libp2p_pubsub_received_subscriptions = IgnoredCollector
Source   Edit  
MsgIdProvider {.public.} = proc (m: Message): Result[MessageId, ValidationResult] {.
    noSideEffect, ...raises: [], gcsafe.}
Source   Edit  
PeerMessageDecodeError = object of CatchableError
Source   Edit  
PublishParams = object
  useCustomConn*: bool ## Can be used to avoid having the node reply to IWANT messages when initially 
                       ## broadcasting a message, it's only after relaying its own message that the
                       ## node will reply to IWANTs
  skipMCache*: bool ## Determines whether an IDontWant message will be sent for the current message
                    ## when it is published if it's a large message. 
  skipIDontWant*: bool
  when defined(libp2p_gossipsub_1_4):
    skipPreamble*: bool ## Determines whether a Preamble message will be sent for the current message
                        ## when it is published if it's a large message
Used to indicate whether a message will be broadcasted using a custom connection defined when instantiating Pubsub, or if it will use the normal connection Source   Edit  
PubSub {.public.} = ref object of LPProtocol
  switch*: Switch
  peerInfo*: PeerInfo
  topics*: Table[string, seq[TopicHandler]]
  peers*: Table[PeerId, PubSubPeer]
  triggerSelf*: bool         ## trigger own local handler on publish
  verifySignature*: bool     ## enable signature verification
  sign*: bool                ## enable message signing
  validators*: Table[string, HashSet[ValidatorHandler]]
  msgIdProvider*: MsgIdProvider ## Turn message into message id (not nil)
  msgSeqno*: uint64
  anonymize*: bool           ## if we omit fromPeer and seqno from RPC messages we send
  subscriptionValidator*: SubscriptionValidator
  topicsHigh*: int           ## the maximum number of topics a peer is allowed to subscribe to
  maxMessageSize*: int ## \
                       ##  the maximum raw message size we'll globally allow
                       ##  for finer tuning, check message size on topic validator
                       ## 
                       ##  sending a big message to a peer with a lower size limit can
                       ##  lead to issues, from descoring to connection drops
                       ## 
                       ##  defaults to 1mB
  rng*: ref HmacDrbgContext
  knownTopics*: HashSet[string]
  customConnCallbacks*: Option[CustomConnectionCallbacks]
Source   Edit  
SubscriptionValidator {.public.} = proc (topic: string): bool {....raises: [],
    gcsafe.}
Every time a peer send us a subscription (even to an unknown topic), we have to store it, which may be an attack vector. This callback can be used to reject topic we're not interested in Source   Edit  
TopicHandler {.public.} = proc (topic: string; data: seq[byte]): Future[void] {.
    ...gcsafe, raises: [].}
Source   Edit  
TopicPair = tuple[topic: string, handler: TopicHandler]
Source   Edit  
ValidatorHandler {.public.} = proc (topic: string; message: Message): Future[
    ValidationResult] {....gcsafe, raises: [].}
Source   Edit  

Consts

KnownLibP2PTopics {.strdefine.} = ""
Source   Edit  

Procs

proc addObserver(p: PubSub; observer: PubSubObserver) {.public, ...raises: [],
    tags: [], forbids: [].}
Source   Edit  
proc broadcast(p: PubSub; sendPeers: auto; msg: RPCMsg; isHighPriority: bool;
               useCustomConn: bool = false) {....raises: [], raises: [].}

This procedure attempts to send a msg (of type RPCMsg) to a specified group of peers in the PubSub network.

Parameters:

  • p: The PubSub instance.
  • sendPeers: An iterable of PubSubPeer instances representing the peers to whom the message should be sent.
  • msg: The RPCMsg instance that contains the message to be broadcast.
  • isHighPriority: A boolean indicating whether the message should be treated as high priority.

High priority messages are sent immediately, while low priority messages are queued and sent only after all high priority messages have been sent.

Source   Edit  
proc handleData(p: PubSub; topic: string; data: seq[byte]): InternalRaisesFuture[
    void, void] {....stackTrace: false, raises: [], gcsafe, raises: [],
                  tags: [RootEffect], forbids: [].}
Source   Edit  
proc init[PubParams: object | bool](P: typedesc[PubSub]; switch: Switch;
                                    triggerSelf: bool = false;
                                    anonymize: bool = false;
                                    verifySignature: bool = true;
                                    sign: bool = true; msgIdProvider: MsgIdProvider = defaultMsgIdProvider;
    subscriptionValidator: SubscriptionValidator = nil;
                                    maxMessageSize: int = 1024 * 1024;
                                    rng: ref HmacDrbgContext = newRng();
                                    parameters: PubParams = false;
    customConnCallbacks: Option[CustomConnectionCallbacks] = none(
    CustomConnectionCallbacks)): P:type {....raises: [InitializationError], public,
    ...raises: [].}
Source   Edit  
proc removeObserver(p: PubSub; observer: PubSubObserver) {.public, ...raises: [],
    tags: [], forbids: [].}
Source   Edit  
proc send(p: PubSub; peer: PubSubPeer; msg: RPCMsg; isHighPriority: bool;
          useCustomConn: bool = false) {....raises: [], raises: [],
    tags: [RootEffect, WriteIOEffect], forbids: [].}

This procedure attempts to send a msg (of type RPCMsg) to the specified remote peer in the PubSub network.

Parameters:

  • p: The PubSub instance.
  • peer: An instance of PubSubPeer representing the peer to whom the message should be sent.
  • msg: The RPCMsg instance that contains the message to be sent.
  • isHighPriority: A boolean indicating whether the message should be treated as high priority.

High priority messages are sent immediately, while low priority messages are queued and sent only after all high priority messages have been sent.

Source   Edit  
proc sendSubs(p: PubSub; peer: PubSubPeer; topics: openArray[string];
              subscribe: bool) {....raises: [], tags: [RootEffect, WriteIOEffect],
                                 forbids: [].}
send subscriptions to remote peer Source   Edit  
proc subscribe(p: PubSub; topic: string; handler: TopicHandler) {.public,
    ...raises: [], tags: [RootEffect, WriteIOEffect], forbids: [].}

subscribe to a topic

topic - a string topic to subscribe to

handler - user provided proc that will be triggered on every received message

Source   Edit  
proc unsubscribe(p: PubSub; topic: string; handler: TopicHandler) {.public,
    ...raises: [], tags: [RootEffect], forbids: [].}
unsubscribe from a topic string Source   Edit  
proc unsubscribe(p: PubSub; topics: openArray[TopicPair]) {.public, ...raises: [],
    tags: [RootEffect], forbids: [].}
unsubscribe from a list of topic handlers Source   Edit  
proc unsubscribeAll(p: PubSub; topic: string) {.public, ...gcsafe, raises: [],
    tags: [WriteIOEffect, RootEffect], forbids: [].}
unsubscribe every handler from topic Source   Edit  
proc updateMetrics(p: PubSub; rpcMsg: RPCMsg) {....raises: [], tags: [],
    forbids: [].}
Source   Edit  

Methods

method addValidator(p: PubSub; topic: varargs[string]; hook: ValidatorHandler) {.
    base, public, ...gcsafe, raises: [], tags: [], forbids: [].}
Add a validator to a topic. Each new message received in this will be sent to hook. hook can return either Accept, Ignore or Reject (which can descore the peer) Source   Edit  
method getOrCreatePeer(p: PubSub; peerId: PeerId; protosToDial: seq[string];
                       protoNegotiated: string = ""): PubSubPeer {.base, ...gcsafe,
    raises: [], tags: [WriteIOEffect, RootEffect], forbids: [].}
Source   Edit  
method handleConn(p: PubSub; conn: Connection; proto: string): InternalRaisesFuture[
    void, (CancelledError,)] {.base, ...stackTrace: false, raises: [], gcsafe,
                               raises: [], tags: [RootEffect], forbids: [].}

handle incoming connections

this proc will:

  1. register a new PubSubPeer for the connection
  2. register a handler with the peer; this handler gets called on every rpc message that the peer receives
  3. ask the peer to subscribe us to every topic that we're interested in
Source   Edit  
method initPubSub(p: PubSub) {.base, ...raises: [InitializationError], raises: [],
                               tags: [], forbids: [].}
perform pubsub initialization Source   Edit  
method onPubSubPeerEvent(p: PubSub; peer: PubSubPeer; event: PubSubPeerEvent) {.
    base, ...gcsafe, raises: [], tags: [RootEffect, WriteIOEffect], forbids: [].}
Source   Edit  
method onTopicSubscription(p: PubSub; topic: string; subscribed: bool) {.base,
    ...gcsafe, raises: [], tags: [RootEffect, WriteIOEffect], forbids: [].}
Source   Edit  
method publish(p: PubSub; topic: string; data: seq[byte];
               publishParams: Option[PublishParams] = none(PublishParams)): InternalRaisesFuture[
    int, void] {.base, public, ...stackTrace: false, raises: [], gcsafe,
                 raises: [], tags: [RootEffect], forbids: [].}

publish to a topic

The return value is the number of neighbours that we attempted to send the message to, excluding self. Note that this is an optimistic number of attempts - the number of peers that actually receive the message might be lower.

Source   Edit  
method removeValidator(p: PubSub; topic: varargs[string]; hook: ValidatorHandler) {.
    base, public, ...gcsafe, raises: [], tags: [], forbids: [].}
Source   Edit  
method rpcHandler(p: PubSub; peer: PubSubPeer; data: sink seq[byte]): InternalRaisesFuture[
    void, (CancelledError, PeerMessageDecodeError, PeerRateLimitError)] {.base,
    ...stackTrace: false, raises: [], gcsafe, raises: [], tags: [RootEffect],
    forbids: [].}
Handler that must be overridden by concrete implementation Source   Edit  
method subscribePeer(p: PubSub; peer: PeerId) {.base, ...gcsafe, raises: [],
    tags: [RootEffect], forbids: [].}
subscribe to remote peer to receive/send pubsub messages Source   Edit  
method unsubscribePeer(p: PubSub; peerId: PeerId) {.base, ...gcsafe, raises: [],
    tags: [WriteIOEffect], forbids: [].}
handle peer disconnects Source   Edit  
method validate(p: PubSub; message: Message): InternalRaisesFuture[
    ValidationResult, (CancelledError,)] {.base, ...stackTrace: false, raises: [],
    gcsafe, raises: [], tags: [RootEffect], forbids: [].}
Source   Edit