Protobuf usage
In the previous tutorial, we created a simple "ping" protocol. Most real protocol want their messages to be structured and extensible, which is why most real protocols use protobuf to define their message structures.
Here, we'll create a slightly more complex protocol, which parses & generate protobuf messages. Let's start by importing our dependencies, as usual:
import chronos
import results # for Opt[T]
import protobuf_serialization, protobuf_serialization/pkg/results
import libp2p
Protobuf encoding & decoding
This will be the structure of our messages:
syntax = "proto2";
message MetricList {
message Metric {
string name = 1;
float value = 2;
}
repeated Metric metrics = 1;
}
type
Metric {.proto2.} = object
name {.fieldNumber: 1.}: Opt[string]
value {.fieldNumber: 2.}: Opt[float]
MetricList {.proto2.} = object
metrics {.fieldNumber: 1.}: seq[Metric]
{.push raises: [].}
func validate(m: Metric): Result[void, string] =
# although `name` and `value` are optional, they should always be set.
if m.name.isNone:
return err("invalid Metric: Metric.name must be set")
if m.value.isNone:
return err("invalid Metric: Metric.value must be set")
ok()
proc encode*(m: Metric): seq[byte] =
Protobuf.encode(m)
proc decode*(_: type Metric, buf: seq[byte]): Result[Metric, string] =
try:
let m = Protobuf.decode(buf, Metric)
# any metrics that is received (decoded) should be valid
?m.validate()
ok(m)
except SerializationError as e:
err("failed to decode Metric from protobuf bytes. " & e.msg)
proc encode*(c: MetricList): seq[byte] =
Protobuf.encode(c)
proc decode*(_: type MetricList, buf: seq[byte]): Result[MetricList, string] =
try:
ok(Protobuf.decode(buf, MetricList))
except SerializationError as e:
err("failed to decode MetricList from protobuf bytes. " & e.msg)
Results instead of exceptions
As you can see, this part of the program also uses Results instead of exceptions for error handling.
We start by {.push raises: [].}, which will prevent every non-async function from raising
exceptions.
Then, we use nim-result to convey
errors to function callers. A Result[T, E] will either hold a valid result of type
T, or an error of type E.
You can check if the call succeeded by using res.isOk, and then get the
value using res.value or the error by using res.error.
Another useful tool is ?, which will unpack a Result if it succeeded,
or if it failed, exit the current procedure returning the error.
nim-result is packed with other functionalities that you'll find in the nim-result repository.
Results and exception are generally interchangeable, but have different semantics that you may or may not prefer.
Creating the protocol
We'll next create a protocol, like in the last tutorial, to request these metrics from our host
type
MetricCallback =
proc(): Future[MetricList] {.async: (raises: [CancelledError]), gcsafe.}
MetricProto = ref object of LPProtocol
metricGetter: MetricCallback
proc new(_: typedesc[MetricProto], cb: MetricCallback): MetricProto =
var res: MetricProto
proc handle(stream: Stream, proto: string) {.async: (raises: [CancelledError]).} =
try:
let
metrics = await res.metricGetter()
asProtobuf = metrics.encode()
await stream.writeLp(asProtobuf)
except LPStreamError as exc:
echo "exception in handler", exc.msg
finally:
await stream.close()
res = MetricProto.new(@["/metric-getter/1.0.0"], handle)
res.metricGetter = cb
return res
proc fetch(p: MetricProto, stream: Stream): Future[MetricList] {.async.} =
let protobuf = await stream.readLp(2048)
# tryGet will raise an exception if the Result contains an error.
# It's useful to bridge between exception-world and result-world
return MetricList.decode(protobuf).tryGet()
proc createSwitch(rng: Rng): Switch {.raises: [LPError].} =
return SwitchBuilder
.new()
.withRng(rng)
.withAddress(MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet())
.withTcpTransport()
.withMplex()
.withNoise()
.build()
proc main() {.async.} =
let rng = newRng()
proc randomMetricGenerator(): Future[MetricList] {.async: (raises: [CancelledError]).} =
let metricCount = rng.generate(uint32) mod 16
var metricList: MetricList
for i in 0 ..< metricCount + 1:
metricList.metrics.add(
Metric(
name: Opt.some("metric_" & $i),
value: Opt.some(float(rng.generate(uint16)) / 1000.0),
)
)
metricList
let
metricProto1 = MetricProto.new(randomMetricGenerator)
metricProto2 = MetricProto.new(randomMetricGenerator)
switch1 = createSwitch(rng)
switch2 = createSwitch(rng)
switch1.mount(metricProto1)
await switch1.start()
await switch2.start()
let
stream = await switch2.dial(
switch1.peerInfo.peerId, switch1.peerInfo.addrs, metricProto2.codecs
)
metrics = await metricProto2.fetch(stream)
await stream.close()
for metric in metrics.metrics:
echo metric.name.get(), " = ", metric.value.get()
await allFutures(switch1.stop(), switch2.stop())
# close connections and shutdown all transports
waitFor(main())