Before trying to understand this document or avplumber's source code, read all the README.
Source codes of all nodes reside in src/nodes/ directory. Each .cpp file corresponds to one or more nodes.
There are also subdirectories, included conditionally depending on flags set in Makefile. For example, if your node is useful only for debugging purposes (e.g. it is intentionally corrupting packets to test recovery code), you should put it in debug/ directory.
Directories are scanned using a shell script (generate_node_list) and a find command inside Makefile, when make is run. There is no index file in sources (it is generated automatically when compiling), so you don't need to update anything after adding .cpp file to src/nodes/ directory or subdirectory.
It begins with:
#include "node_common.hpp"
which includes commonly used headers and DECLNODE macros (see below).
Then classes or class templates of nodes follow.
After class or class template is defined, DECLNODE macros need to be used to enable node creation and include it in the autogenerated index.
They serve 2 functions:
generate_node_listscript searches for them in all nodes sources and generates the nodes index- they are parsed by C++ preprocessor (as every macro in C/C++ source code), generating code needed for node creation
Available macros:
DECLNODE(nodetype, classname)- declares a node which will be created whentypeparameter in node's JSON object equalsnodetype. The node must be defined inclassnameclass.DECLNODE_ATD(nodetype, tplname)- declares a node which will be created whentypeparameter in node's JSON object equalsnodetype. The node template must be defined intplnameclass template. It will be specialized for single template argument being the type of data processed in this node. If the node has both input(s) and output(s), input and output types must be the same.ATDmeans automatic type detection.DECLNODE_ATD_RAW(nodetype, tplname)- like above, but auto-detect only raw frame/samples types (av::VideoFrame,av::AudioSamples).DECLNODE_ATD_TYPES(nodetype, tplname, T1, T2, ...)- auto-detect only from the explicit list of types (e.g.av::VideoFrame, EglImageFrame). Untyped creation (type = "nodetype") will select the first matching type from the provided list based on connected edges; typed creation ("nodetype<Ti>") is also registered for eachTi.DECLNODE_ALIAS(nodetype, classname)- declares a different type name for a node already declared usingDECLNODEDECLNODE_ATD_ALIAS(nodetype, tplname)- declares a different type name for a node already declared usingDECLNODE_ATD
In short:
DECLNODEdeclares a node that always uses the same queue typesDECLNODE_ATD(nodetype, tpl)registers a templated node over all queue types.DECLNODE_ATD_RAW(nodetype, tpl)limits auto-detect to raw frame/samples.DECLNODE_ATD_TYPES(nodetype, tpl, T1, T2, ...)limits to an explicit list (e.g.av::VideoFrame, EglImageFrame).
Arguments for blocking nodes:
- good for blocking operations on the file system & network
- good for computationally expensive operations: video & audio encoding, decoding and processing
Arguments for non-blocking nodes:
- less overhead (no context switch when passing data between nodes)
- more predictible timing
avplumber supported only blocking nodes for a very long time, thus even nodes that would benefit from being non-blocking are still implemented as blocking.
Every node class needs to be derived from Node. If the node is blocking, the only virtual function that absolutely needs to be overriden is process. Example: firewall node which doesn't allow packets or frames without PTS to pass:
virtual void process() {
T data = this->source_->get();
if (!data.pts().isValid()) return;
this->sink_->put(data);
}This method will be executed in node thread's main loop - NodeWrapper::threadFunction (src/graph_mgmt.cpp)
If the node is non-blocking, it should be derived from NonBlockingNode and the function to override is processNonBlocking. See src/nodes/realtime.cpp for an example.
If tick_source is not specified, processNonBlocking is called once with argument ticks = false when starting node and it is node's responsibility to add events to the event loop. NonBlockingNode defines some convenience functions (wrappers for EventLoop's methods) that will call processNonBlocking again when certain event happens:
processWhenSignalled(Event &event)- when event is signalled, for example queue has data available or space availablesleepAndProcess(int ms)- aftermsmillisecondsscheduleProcess(av::Timestamp when)- atwhentimestamp from wallclock. Usewallclock.ts()to get current timestamp.yieldAndProcess()- putsprocessNonBlockingat the end of the event loop.
If tick_source is specified, processNonBlocking is called with argument ticks = true on each tick. You can still use the functions mentioned above if you want to run the processNonBlocking sooner than the next tick, but usually it shouldn't be needed.
Example of using signals:
virtual void processNonBlocking(EventLoop& evl, bool ticks) {
T* dataptr = this->source_->peek(0);
if (dataptr==nullptr) {
// no data available in queue
if (!ticks) {
// retry when we have packet in source queue
this->processWhenSignalled(this->edgeSource()->edge()->producedEvent());
}
// if ticks==true, processNonBlocking will be called automatically with next tick
// no need to schedule it
return;
}
T &data = *dataptr;
// ...
// process the data
// ...
// put it in the sink queue:
if (this->sink_->put(data, true)) {
// put returned true, success, remove this packet from the source queue
this->source_->pop();
if (!ticks) {
// process next packet
this->yieldAndProcess();
}
} else {
// put returned false, no space in queue
if (!ticks) {
// retry when we have space in sink
// note that the whole processNonBlocking will be run again
// so it is not a good idea to do it if the processing is stateful (e.g. encoding or decoding)
this->processWhenSignalled(this->edgeSink()->edge()->consumedEvent());
}
}
}If your processing is stateful, you can:
- write your node as a regular blocking node, or
- use the underlying function
evl.asyncWaitAndExecutedirectly. Pass the packet to put in the queue in lambda's captured variables, as a value (not a reference). Do not capturethisorthis->shared_from_this()because it can lead to use-after-free memory corruption or a memory leak. Capture a weak pointer (std::weak_ptr) instead.
Note that the 2 options mentioned above create a hidden queue of size 1 on thread's stack or in the closure. The remaining options are:
- check for space in the output queue before doing processing
- undo data modifications when
putcall fails (that's what we do innodes/speed.cpp)
Note that in the above example, if tick_source is used (ticks==true), only one packet or frame will be processed per tick. If the input FPS is higher than FPS of the tick source, data may accumulate in the queue. Wrap then whole processing in a loop to fix this shortcoming (TODO: wouldn't calling yieldAndProcess without !ticks condition suffice?):
virtual void processNonBlocking(EventLoop& evl, bool ticks) {
do {
T* dataptr = this->source_->peek(0);
if (dataptr==nullptr) {
if (!ticks) {
this->processWhenSignalled(this->edgeSource()->edge()->producedEvent());
}
return; // this will exit the loop if no more input is available right now
}
T &data = *dataptr;
// ...
// process the data
// ...
if (this->sink_->put(data, true)) {
this->source_->pop();
if (!ticks) {
this->yieldAndProcess();
}
} else {
if (!ticks) {
this->processWhenSignalled(this->edgeSink()->edge()->consumedEvent());
}
return; // we don't have space in sink queue so, if ticks==true, retry in next tick
}
} while (ticks); // loop only if have tick_source, otherwise yieldAndProcess will handle processing the next input data
}Static method create needs to be defined in the class. It parses the data from node definition (JSON object) and creates the node object. NodeSISO has a helper static method createCommon. Example of its usage:
static std::shared_ptr<Firewall> create(NodeCreationInfo &nci) {
EdgeManager &edges = nci.edges;
const Parameters ¶ms = nci.params;
return NodeSISO<T, T>::template createCommon<Firewall>(edges, params);
}If your constructor requires additional parameters, they can be passed after edges and params in createCommon.
By the way, if you look at the source code of createCommon or other methods from the file src/graph_base.hpp, and abstract source/sink classes in src/graph_core.hpp, you'll notice that we're creating source and sink objects wrapping the edges. The idea was to have multiple possible implementation of sources and sinks, not only edges (queues). In practice it was never used and some nodes use edges directly, so it should be refactored - simplified, to reduce unnecessary boilerplate code. Pull requests welcome! (if you don't have time for coding but have an idea of possible architecture, that's welcome, too)
Commonly used node source & sink patterns are available as base classes or templates derived from Node:
NodeSingleInput<InputType>- use
this->source_->get()(blocking function) to get packet/frame - or
this->source_->peek(0)(0 to make it non-blocking) andthis->source_->pop()
- use
NodeSingleOutput<OutputType>- use
this->sink_->put(packet_or_frame)to output the packet/frame to the next node in the graph
- use
NodeSISO<InputType, OutputType>(Single Input, Single Output) - combinesNodeSingleInputandNodeSingleOutputNodeMultiInput<InputType>- use
int i = findSourceWithData()to get index of source edge that has packet/frame waiting to be read. If returned value isn't -1, read if usingthis->source_edges_[i]->peek()and callthis->source_edges_[i]->pop()when you've done using it (i.e. you've passed it along to the next node, or it is to be discarded) - if your multi-input node has fixed roles (for example
primaryandauxiliary), treat order insrcas part of node contract and validate expected input count increate
- use
NodeMultiOutputs<OutputType>- use
this->sink_edges_[output_index]->enqueue(packet_or_frame)(blocking function) to output packet/frame - or
this->sink_edges_[output_index]->try_enqueue(packet_or_frame)for non-blocking operation
- use
These bases define their own constructors, you should call them in your constructor or write using BaseTemplate<TemplateArgument>::BaseTemplate; to explicitly use base constructor in derived class.
You should generally use one or more of these bases, since they override virtual methods and implement interfaces important for internal avplumber operation. If you want to implement them on your own (because you need, for example, mixed data types on input or output), look at src/nodes/filters.cpp and src/graph_base.hpp for inspiration.
Thanks to multiple inheritance, you can use most (all?) of these bases in non-blocking nodes, too. See src/nodes/realtime.cpp for example.
Interfaces are defined in src/graph_interfaces.hpp. They're used:
- for node management (
src/graph_mgmt.cpp): main loop (NodeWrapper::threadFunction) flow depends on what interfaces the node implements;IInterruptiblecan be used to stop the node bypassing any locks (node.interruptcommand) - for graph traversal, usually when a node needs to know something about stream metadata.
EdgeBase(insrc/graph_core.hpp) andNodeSingleInput(insrc/graph_base.hpp) containfindNodeUpmethod, which finds the nearest node, up in the graph, implementing a specific interface. Graph topology required by existing nodes is summarized in the README. - for statistics extraction. Signal presence information is taken from
ISentinel. Video & audio parameters are taken fromIDecoder.
As summarized in the README, instance-shared objects can be used for storing state shared between nodes or even between instances.
If your shared object is specific to a node and doesn't need separate commands for controlling it, you can specify it directly within node's .cpp source file (example: src/nodes/sentinel.cpp).
If, on the other hand, it is to be used by multiple nodes and/or you want some commands to be able to create or control it, specify it in a separate .hpp file (example: src/RealTimeTeam.hpp)
Instance-shared struct or class is, by convention, derived from InstanceShared<typename> (CRTP) from src/instance_shared.hpp. However, currently the base class isn't used for anything so it is not necessary (but may make documentation a little more organized once someone brave enough decides to run Doxygen on avplumber sources ;) )
To find the instance-shared object with a given name, use the method from src/instance_shared.hpp:
InstanceSharedObjects<ObjectType>::get(nci.instance, name)It will return the shared pointer (std::shared_ptr<ObjectType>) to the object. If its fields are changed, all nodes will see the change (because it is a regular shared_ptr). So make sure to use atomics and/or mutexes when simultaneous access is likely.
ObjectTypeis the typename of your shared object's struct or classnci.instanceis theInstanceDatareference belonging to current avplumber instance. You can obtain it fromnciwhich is node creation information given as an argument to thecreatestatic function of the node. Or fromNodeManager::instanceData()fromsrc/graph_mgmt.hpp. In case of defining a command insidesrc/avplumber.cpp, usemanager_->instanceData().nameis the unique name of this specific object, used as a key in the hash map.
This assumes that either:
- the instance-shared object has no-arguments constructor. It will be created if it doesn't exist yet.
- or the instance-shared object has already been created by a different static method:
putoremplace
In simpler cases where instance-shared objects can be created implicitly, you don't need to think about putting the objects into the hash map. Just write an argumentless constructor or define desired default values for all fields, and the get static method will do the magic.
However, if you need to create the node with some argument, you have the following options:
using ISOs = InstanceSharedObjects<ObjectType>;
std::shared_ptr<ObjectType> shared_ptr_to_object = std::make_shared<ObjectType>(...arguments for constructor...)
ISOs::put(nci.instance, name, shared_ptr_to_object, policy_if_exists)or
using ISOs = InstanceSharedObjects<ObjectType>;
ISOs::emplace(nci.instance, name, policy_if_exists, ...arguments for constructor...)policy_if_exists can be:
ISOs::PolicyIfExists::Overwrite- overwrite existing objectISOs::PolicyIfExists::Ignore- do not insert new objectISOs::PolicyIfExists::Throw- throw an exception
Probably Ignore is the most useful - it allows to lazily initialize the object when its name appears for the first time and then re-use it. Be aware that if you use put with Ignore policy, the object given to it will not end up in the hash table so you need to re-read the object using get. It is better to use emplace - it doesn't waste CPU cycles creating an object that will be soon discarded.
In some cases you want the instance-shared object to be able to 'call back' the nodes. However there is a risk of 2 types of circular references: compile time, when a node wants to access the shared object, which in turn wants to access the node; and runtime, when shared_ptr loop is created and causes a memory leak. Possible ways of solving it:
- Store the callback (closure) inside the instanced-shared object. If it needs to access the node's data, the closure must contain
std::weak_ptr<NodeType>, notshared_ptror a raw pointer or reference. Otherwise the node object will never be destroyed or use-after-free memory corruption will occur. From the shared object, simply call the callback. - Declare an interface in
src/graph_interfaces.hpp. Implement it in your node. Storestd::weak_ptr<Interface>in the shared object. From the shared object, call methods of the node through the interface. This way, you don't need to split the node's source into separate.cppand.hppfiles.