PacketStream

Processes and broadcasts IPackets through a configurable adapter graph.

PacketStream

#include <icy/packetstream.h>
class PacketStream

Defined in src/base/include/icy/packetstream.h:306

Inherits: Stateful< PacketStreamState >

A PacketStream consists of one or many PacketSources, one or many PacketProcessors, and one or many delegate receivers.

This class lets the developer set up a processor chain that performs arbitrary processing on data packets using interchangeable packet adapters, and pumps the output to any delegate function or even another PacketStream.

Note that PacketStream itself inherits from PacketStreamAdapter, so a PacketStream can be the source of another PacketStream.

All PacketStream methods are thread-safe, but once the stream is running you will not be able to attach or detach stream adapters.

To synchronize output packets with the application event loop, see the SyncPacketQueue class. For lengthy operations, add an AsyncPacketQueue at the start of the stream to defer processing from the PacketSource thread.
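
The adapter graph described above can be pictured with a small self-contained model. The sketch below does not use the icy headers: `MiniStream`, `Packet`, `Processor` and `Receiver` are hypothetical stand-ins, and the model only illustrates the idea of a packet passing through processors in turn before being handed to delegate receivers.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins; none of these names come from icy/packetstream.h.
using Packet = std::string;
using Processor = std::function<void(Packet&)>;
using Receiver = std::function<void(const Packet&)>;

class MiniStream {
public:
    void attach(Processor p) { _processors.push_back(std::move(p)); }
    void onEmit(Receiver r) { _receivers.push_back(std::move(r)); }

    // Run the packet through every processor in turn, then broadcast it.
    void write(Packet packet) {
        for (auto& proc : _processors) proc(packet);
        for (auto& recv : _receivers) recv(packet);
    }

private:
    std::vector<Processor> _processors;
    std::vector<Receiver> _receivers;
};

// Builds a two-stage chain and returns what the receiver saw.
inline std::string runMiniStreamDemo() {
    MiniStream stream;
    stream.attach([](Packet& p) { p += "-encoded"; });
    stream.attach([](Packet& p) { p += "-framed"; });

    std::string seen;
    stream.onEmit([&seen](const Packet& p) { seen = p; });
    stream.write("frame");
    return seen;
}
```

In this model every stage runs on the caller's thread; the real class adds state handling, ordering and optional queueing on top of the same basic flow.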

List of all members

| Name | Kind | Owner |
|---|---|---|
| emitter | variable | Declared here |
| Error | variable | Declared here |
| Close | variable | Declared here |
| PacketStream | function | Declared here |
| ~PacketStream | function | Declared here |
| PacketStream | function | Declared here |
| operator= | function | Declared here |
| PacketStream | function | Declared here |
| operator= | function | Declared here |
| start | function | Declared here |
| stop | function | Declared here |
| pause | function | Declared here |
| resume | function | Declared here |
| close | function | Declared here |
| reset | function | Declared here |
| active | function | Declared here |
| stopped | function | Declared here |
| closed | function | Declared here |
| lock | function | Declared here |
| locked | function | Declared here |
| write | function | Declared here |
| write | function | Declared here |
| write | function | Declared here |
| attachSource | function | Declared here |
| attachSource | function | Declared here |
| attachSource | function | Declared here |
| detachSource | function | Declared here |
| detachSource | function | Declared here |
| attach | function | Declared here |
| attach | function | Declared here |
| detach | function | Declared here |
| synchronizeOutput | function | Declared here |
| autoStart | function | Declared here |
| closeOnError | function | Declared here |
| error | function | Declared here |
| name | function | Declared here |
| adapters | function | Declared here |
| sources | function | Declared here |
| processors | function | Declared here |
| numSources | function | Declared here |
| numProcessors | function | Declared here |
| numAdapters | function | Declared here |
| getSource | function | Declared here |
| getProcessor | function | Declared here |
| getProcessor | function | Declared here |
| _mutex | variable | Declared here |
| _procMutex | variable | Declared here |
| _name | variable | Declared here |
| _sources | variable | Declared here |
| _processors | variable | Declared here |
| _states | variable | Declared here |
| _error | variable | Declared here |
| _autoStart | variable | Declared here |
| _closeOnError | variable | Declared here |
| _wired | variable | Declared here |
| setup | function | Declared here |
| teardown | function | Declared here |
| attachSource | function | Declared here |
| attach | function | Declared here |
| startSources | function | Declared here |
| stopSources | function | Declared here |
| process | function | Declared here |
| emit | function | Declared here |
| synchronizeStates | function | Declared here |
| onStateChange | function | Declared here |
| assertCanModify | function | Declared here |
| handleException | function | Declared here |
| Ptr | typedef | Declared here |
| StateChange | variable | Inherited from Stateful |
| Stateful | function | Inherited from Stateful |
| ~Stateful | function | Inherited from Stateful |
| stateEquals | function | Inherited from Stateful |
| stateBetween | function | Inherited from Stateful |
| state | function | Inherited from Stateful |
| state | function | Inherited from Stateful |
| _state | variable | Inherited from Stateful |
| beforeStateChange | function | Inherited from Stateful |
| onStateChange | function | Inherited from Stateful |
| setState | function | Inherited from Stateful |
| setState | function | Inherited from Stateful |

Inherited from Stateful

| Kind | Name | Description |
|---|---|---|
| variable | StateChange | Signals when the state changes. |
| function | Stateful inline | |
| function | ~Stateful virtual inline | |
| function | stateEquals virtual const inline | Returns true if the current state ID equals the given ID. |
| function | stateBetween virtual const inline | Returns true if the current state ID is in the inclusive range [lid, rid]. |
| function | state virtual inline | Returns a mutable reference to the current state. |
| function | state virtual const inline | Returns a copy of the current state. |
| variable | _state | |
| function | beforeStateChange virtual inline | Override to handle pre state change logic. Return false to prevent state change. |
| function | onStateChange virtual inline | Override to handle post state change logic. |
| function | setState virtual inline | Sets the state and sends the state signal if the state change was successful. |
| function | setState virtual inline | Sets the state and sends the state signal if the state change was successful. |
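
The veto and notification hooks listed for Stateful can be illustrated with a miniature state holder. This is a sketch of the pattern only, not the icy implementation: `SketchState`, `MiniStateful` and `ForwardOnly` are hypothetical names, and states are reduced to plain integer IDs.

```cpp
#include <cassert>
#include <vector>

// Hypothetical miniature of the Stateful pattern; not the icy implementation.
struct SketchState {
    int id = 0;
};

class MiniStateful {
public:
    virtual ~MiniStateful() = default;

    bool stateEquals(int id) const { return _state.id == id; }
    bool stateBetween(int lid, int rid) const {
        return _state.id >= lid && _state.id <= rid; // inclusive range
    }

    // Returns true if the change was accepted and the post-change hook ran.
    bool setState(int id) {
        SketchState next{id};
        if (!beforeStateChange(next)) return false; // veto hook
        SketchState old = _state;
        _state = next;
        onStateChange(_state, old);
        return true;
    }

protected:
    virtual bool beforeStateChange(const SketchState&) { return true; }
    virtual void onStateChange(SketchState&, const SketchState&) {}

    SketchState _state;
};

// Refuses to move backwards and records every accepted transition.
class ForwardOnly : public MiniStateful {
public:
    std::vector<int> history;

protected:
    bool beforeStateChange(const SketchState& next) override { return next.id > _state.id; }
    void onStateChange(SketchState& now, const SketchState&) override { history.push_back(now.id); }
};
```

Returning false from the pre-change hook leaves the stored state untouched, so the post-change hook never fires for a rejected transition.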

Public Attributes

| Return | Name | Description |
|---|---|---|
| PacketSignal | emitter | Signals to delegates on outgoing packets. |
| ThreadSignal< void(PacketStream &, const std::exception_ptr &)> | Error | Signals that the PacketStream is in Error state. If stream output is synchronized then the Error signal will be sent from the synchronization context, otherwise it will be sent from the async processor context. See synchronizeOutput(). |
| ThreadSignal< void(PacketStream &)> | Close | Signals that the PacketStream is in Close state. This signal is sent immediately via the close() method, and as such will be sent from the calling thread context. |

emitter

PacketSignal emitter

Defined in src/base/include/icy/packetstream.h:495

Signals to delegates on outgoing packets.


Error

ThreadSignal< void(PacketStream &, const std::exception_ptr &)> Error

Defined in src/base/include/icy/packetstream.h:501

Signals that the PacketStream is in Error state. If stream output is synchronized then the Error signal will be sent from the synchronization context, otherwise it will be sent from the async processor context. See synchronizeOutput()


Close

ThreadSignal< void(PacketStream &)> Close

Defined in src/base/include/icy/packetstream.h:506

Signals that the PacketStream is in Close state. This signal is sent immediately via the close() method, and as such will be sent from the calling thread context.

Public Methods

| Return | Name | Description |
|---|---|---|
| | PacketStream | Construct a named packet stream. |
| | ~PacketStream virtual | Destroy the stream; calls close() then reset() to release all adapters. |
| | PacketStream | Deleted copy constructor. |
| | PacketStream | Deleted move constructor. |
| void | start virtual | Start the stream and synchronized sources. |
| void | stop virtual | Stop the stream and synchronized sources. |
| void | pause virtual | Pause the stream. |
| void | resume virtual | Resume the stream. |
| void | close virtual | Close the stream and transition the internal state to Closed. |
| void | reset virtual | Cleanup all managed stream adapters and reset the stream state. |
| bool | active virtual const | Returns true when the stream is in the Active state. |
| bool | stopped virtual const | Returns true when the stream is in the Stopping or Stopped state. |
| bool | closed virtual const | Returns true when the stream is in the Closed or Error state. |
| bool | lock virtual | Sets the stream to the locked state. While locked, adapters cannot be added to or removed from the stream until the stream is stopped. |
| bool | locked virtual const | Returns true if the stream is currently locked. |
| void | write virtual | Write a mutable buffer into the stream without copying. The caller must keep the buffer alive until processing crosses a Cloned/Retained boundary or, if the graph is fully synchronous, until write() returns. |
| void | write virtual | Write a read-only buffer into the stream; data is copied immediately into an owning RawPacket before any adapter sees it. |
| void | write virtual | Write a packet directly into the processing chain. |
| void | attachSource virtual | Attach a bare packet signal as a stream source. The signal is wrapped in an unowned PacketStreamAdapter internally. Useful when the source is another PacketStream::emitter. |
| void | attachSource virtual | Attach a PacketStreamAdapter as a source. Source adapters default to Borrowed retention unless overridden; they must not retain inbound packet storage asynchronously without reporting Cloned or Retained. |
| void | attachSource inline | Attach a shared_ptr-managed source adapter to the stream. The stream shares ownership; the adapter is kept alive at least until teardown. Throws std::runtime_error if ptr does not derive from PacketStreamAdapter. |
| bool | detachSource virtual | Detach a source by its packet signal. Disconnects the signal from the stream's process slot and removes the adapter entry. |
| bool | detachSource virtual | Detach a source by its adapter pointer. Disconnects the adapter's emitter from the stream's process slot and removes the entry. |
| void | attach virtual | Attach a packet processor to the stream. Processors are executed in ascending order of their order value. Pass order = -1 to append at the end of the current processor list. Valid range is -1 to 101; values outside this range throw std::invalid_argument. Borrowed processors must finish with the packet before process() returns. Queues and processors that defer work must report Cloned or Retained via retention() so upstream code has an explicit ownership boundary. |
| void | attach inline | Attach a shared_ptr-managed processor to the stream. The stream shares ownership; the processor is kept alive at least until teardown. Throws std::runtime_error if ptr does not derive from PacketProcessor. |
| bool | detach virtual | Detach a packet processor from the stream. The processor's delegate connections are removed; ownership is released if held. |
| void | synchronizeOutput virtual | Synchronize stream output packets with a libuv event loop. Internally attaches a SyncPacketQueue at order 101 so that all packets emitted by the processor chain are dispatched from the loop thread rather than the source thread. Must be called before start(). |
| void | autoStart virtual | Enable or disable auto-start behaviour (default: false). When enabled, the stream automatically transitions to Active state upon receiving the first packet while in the None or Locked state. Must be called before start(). |
| void | closeOnError virtual | Enable or disable close-on-error behaviour (default: true). When enabled, an unhandled processor exception causes the stream to transition from Error to Closed state automatically. |
| const std::exception_ptr & | error | Return the last captured exception, if the stream is in Error state. |
| std::string | name const | Return the name assigned to this stream at construction. |
| PacketAdapterVec | adapters const | Returns a combined list of all stream sources and processors. |
| PacketAdapterVec | sources const | Returns a list of all stream sources. |
| PacketAdapterVec | processors const | Returns a list of all stream processors. |
| int | numSources const | Return the number of source adapters currently registered. |
| int | numProcessors const | Return the number of processor adapters currently registered. |
| int | numAdapters const | Return the total number of adapters (sources + processors). |
| AdapterT * | getSource inline | Return the nth source of type AdapterT, or nullptr if not found. Sources are searched in their registered order; only adapters that dynamic_cast successfully to AdapterT are counted. |
| AdapterT * | getProcessor inline | Return the nth processor of type AdapterT, or nullptr if not found. Processors are searched in their registered order; only adapters that dynamic_cast successfully to AdapterT are counted. |
| PacketProcessor * | getProcessor inline | Return the processor registered at a specific order value. Unlike the template overload, this searches by order rather than by type and index. |

PacketStream

PacketStream(const std::string & name = "")

Defined in src/base/include/icy/packetstream.h:313

Construct a named packet stream.

Parameters

  • name Optional human-readable name used in log output.

~PacketStream

virtual

virtual ~PacketStream()

Defined in src/base/include/icy/packetstream.h:316

Destroy the stream; calls close() then reset() to release all adapters.


PacketStream

PacketStream(const PacketStream &) = delete

Defined in src/base/include/icy/packetstream.h:318

Deleted copy constructor.


PacketStream

PacketStream(PacketStream &&) = delete

Defined in src/base/include/icy/packetstream.h:320

Deleted move constructor.


start

virtual

virtual void start()

Defined in src/base/include/icy/packetstream.h:324

Start the stream and synchronized sources.


stop

virtual

virtual void stop()

Defined in src/base/include/icy/packetstream.h:327

Stop the stream and synchronized sources.


pause

virtual

virtual void pause()

Defined in src/base/include/icy/packetstream.h:330

Pause the stream.


resume

virtual

virtual void resume()

Defined in src/base/include/icy/packetstream.h:333

Resume the stream.


close

virtual

virtual void close()

Defined in src/base/include/icy/packetstream.h:336

Close the stream and transition the internal state to Closed.


reset

virtual

virtual void reset()

Defined in src/base/include/icy/packetstream.h:339

Cleanup all managed stream adapters and reset the stream state.


active

virtual const

virtual bool active() const

Defined in src/base/include/icy/packetstream.h:342

Returns true when the stream is in the Active state.


stopped

virtual const

virtual bool stopped() const

Defined in src/base/include/icy/packetstream.h:345

Returns true when the stream is in the Stopping or Stopped state.


closed

virtual const

virtual bool closed() const

Defined in src/base/include/icy/packetstream.h:348

Returns true when the stream is in the Closed or Error state.


lock

virtual

virtual bool lock()

Defined in src/base/include/icy/packetstream.h:353

Sets the stream to the locked state. While locked, adapters cannot be added to or removed from the stream until the stream is stopped.


locked

virtual const

virtual bool locked() const

Defined in src/base/include/icy/packetstream.h:356

Returns true if the stream is currently locked.


write

virtual

virtual void write(char * data, size_t len)

Defined in src/base/include/icy/packetstream.h:364

Write a mutable buffer into the stream without copying. The caller must keep the buffer alive until processing crosses a Cloned/Retained boundary or, if the graph is fully synchronous, until write() returns.

Parameters

  • data Pointer to the raw data buffer.

  • len Number of bytes to process.


write

virtual

virtual void write(const char * data, size_t len)

Defined in src/base/include/icy/packetstream.h:370

Write a read-only buffer into the stream; data is copied immediately into an owning RawPacket before any adapter sees it.

Parameters

  • data Pointer to the raw data buffer.

  • len Number of bytes to process.
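
The difference between the two buffer overloads is one of lifetime. The sketch below models it with two hypothetical packet types, `BorrowedPacket` and `OwnedPacket`, which are illustrative stand-ins rather than the library's own packet classes: the borrowed form aliases the caller's memory, while the owned form takes a private copy.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical packet types that model the two ownership modes.
struct BorrowedPacket {
    char* data;        // points into the caller's buffer; no copy is made
    std::size_t size;
};

struct OwnedPacket {
    std::vector<char> bytes; // private copy; independent of the caller's buffer
};

// Models the mutable overload: wrap the buffer without copying.
inline BorrowedPacket wrapMutable(char* data, std::size_t len) {
    return BorrowedPacket{data, len};
}

// Models the read-only overload: copy immediately into owned storage.
inline OwnedPacket copyReadOnly(const char* data, std::size_t len) {
    return OwnedPacket{std::vector<char>(data, data + len)};
}

inline std::string toString(const OwnedPacket& p) {
    return std::string(p.bytes.begin(), p.bytes.end());
}
```

If the caller overwrites its buffer after both calls, the borrowed packet observes the change and the owned packet does not, which is why the mutable overload places a lifetime requirement on the caller.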


write

virtual

virtual void write(IPacket && packet)

Defined in src/base/include/icy/packetstream.h:374

Write a packet directly into the processing chain.

Parameters

  • packet Packet to process; moved into the stream.

attachSource

virtual

virtual void attachSource(PacketSignal & source)

Defined in src/base/include/icy/packetstream.h:380

Attach a bare packet signal as a stream source. The signal is wrapped in an unowned PacketStreamAdapter internally. Useful when the source is another PacketStream::emitter.

Parameters

  • source The packet signal to attach; must outlive the stream.

attachSource

virtual

virtual void attachSource(PacketStreamAdapter * source, bool owned = true, bool syncState = false)

Defined in src/base/include/icy/packetstream.h:390

Attach a PacketStreamAdapter as a source. Source adapters default to Borrowed retention unless overridden; they must not retain inbound packet storage asynchronously without reporting Cloned or Retained.

Parameters

  • source The adapter to attach; must not be null.

  • owned If true the stream takes ownership and deletes the pointer on teardown.

  • syncState If true and source implements basic::Startable, its start()/stop() will be called by startSources()/stopSources().
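
One common way to hold owned and unowned raw pointers in the same container is a shared_ptr with a no-op deleter for the unowned case. Whether the stream's internal reference type works this way is not stated here, so treat the following as an assumption-laden sketch; `CountingAdapter` and `makeReference` are hypothetical names.

```cpp
#include <cassert>
#include <memory>

// Hypothetical adapter that records its own destruction in an external counter.
struct CountingAdapter {
    explicit CountingAdapter(int& counter) : _counter(counter) {}
    ~CountingAdapter() { ++_counter; }

private:
    int& _counter;
};

// Wraps a raw pointer so one handle type can represent both ownership modes.
inline std::shared_ptr<CountingAdapter> makeReference(CountingAdapter* ptr, bool owned) {
    if (owned)
        return std::shared_ptr<CountingAdapter>(ptr);                      // deletes on release
    return std::shared_ptr<CountingAdapter>(ptr, [](CountingAdapter*) {}); // no-op deleter
}
```

With owned set to true, releasing the handle destroys the adapter; with owned set to false, the adapter must outlive the handle and is destroyed by whoever created it.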


attachSource

inline

template<class C> inline void attachSource(std::shared_ptr< C > ptr, bool syncState = false)

Defined in src/base/include/icy/packetstream.h:400

Attach a shared_ptr-managed source adapter to the stream. The stream shares ownership; the adapter is kept alive at least until teardown. Throws std::runtime_error if ptr does not derive from PacketStreamAdapter.

Parameters


detachSource

virtual

virtual bool detachSource(PacketSignal & source)

Defined in src/base/include/icy/packetstream.h:415

Detach a source by its packet signal. Disconnects the signal from the stream's process slot and removes the adapter entry.

Parameters

Returns

true if the source was found and removed, false otherwise.


detachSource

virtual

virtual bool detachSource(PacketStreamAdapter * source)

Defined in src/base/include/icy/packetstream.h:421

Detach a source by its adapter pointer. Disconnects the adapter's emitter from the stream's process slot and removes the entry.

Parameters

  • source Pointer to the adapter previously attached.

Returns

true if the source was found and removed, false otherwise.


attach

virtual

virtual void attach(PacketProcessor * proc, int order = 0, bool owned = true)

Defined in src/base/include/icy/packetstream.h:433

Attach a packet processor to the stream. Processors are executed in ascending order of their order value. Pass order = -1 to append at the end of the current processor list. Valid range is -1 to 101; values outside this range throw std::invalid_argument. Borrowed processors must finish with the packet before process() returns. Queues and processors that defer work must report Cloned or Retained via retention() so upstream code has an explicit ownership boundary.

Parameters

  • proc The processor to attach; must not be null.

  • order Position in the processing chain (lower runs first).

  • owned If true the stream takes ownership and deletes the pointer on teardown.
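
The ordering rules can be sketched with a small model. `OrderedChain` and `Entry` are hypothetical and stand apart from the real class; in particular, how the library stores an appended processor's order is not documented here, so the handling of -1 below is an assumption.

```cpp
#include <algorithm>
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical model of ordered attachment; not the icy implementation.
struct Entry {
    std::string name;
    int order;
};

class OrderedChain {
public:
    void attach(const std::string& name, int order = 0) {
        if (order < -1 || order > 101)
            throw std::invalid_argument("order must be in [-1, 101]");
        if (order == -1) {
            // Assumption: "append" reuses the highest order currently in use;
            // the stable sort below then keeps the new entry last.
            order = _entries.empty() ? 0 : _entries.back().order;
        }
        _entries.push_back(Entry{name, order});
        // stable_sort preserves insertion order for entries sharing an order value.
        std::stable_sort(_entries.begin(), _entries.end(),
                         [](const Entry& a, const Entry& b) { return a.order < b.order; });
    }

    // Names in execution order (ascending order value).
    std::vector<std::string> names() const {
        std::vector<std::string> out;
        for (const auto& e : _entries) out.push_back(e.name);
        return out;
    }

private:
    std::vector<Entry> _entries; // kept sorted by ascending order
};
```

Attaching "encoder" at 5, "decoder" at 1 and "tail" at -1 yields the execution order decoder, encoder, tail, and an order of 102 is rejected with std::invalid_argument.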


attach

inline

template<class C> inline void attach(std::shared_ptr< C > ptr, int order = 0, bool syncState = false)

Defined in src/base/include/icy/packetstream.h:443

Attach a shared_ptr-managed processor to the stream. The stream shares ownership; the processor is kept alive at least until teardown. Throws std::runtime_error if ptr does not derive from PacketProcessor.

Parameters

  • ptr Shared pointer to the processor instance.

  • order Position in the processing chain (lower runs first).

  • syncState Reserved for future use; currently unused.


detach

virtual

virtual bool detach(PacketProcessor * proc)

Defined in src/base/include/icy/packetstream.h:459

Detach a packet processor from the stream. The processor's delegate connections are removed; ownership is released if held.

Parameters

  • proc Pointer to the processor to remove.

Returns

true if the processor was found and removed, false otherwise.


synchronizeOutput

virtual

virtual void synchronizeOutput(uv::Loop * loop)

Defined in src/base/include/icy/packetstream.h:466

Synchronize stream output packets with a libuv event loop. Internally attaches a SyncPacketQueue at order 101 so that all packets emitted by the processor chain are dispatched from the loop thread rather than the source thread. Must be called before start().

Parameters

  • loop The event loop to synchronize output onto; must not be null.
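
The idea behind output synchronization is that the producing side only enqueues, and delivery happens later on the loop side. The sketch below models this without libuv: `MiniSyncQueue` is a hypothetical class, and the explicit drain() call stands in for whatever wake-up mechanism the real SyncPacketQueue uses.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an output queue drained by an event loop.
class MiniSyncQueue {
public:
    explicit MiniSyncQueue(std::function<void(const std::string&)> sink)
        : _sink(std::move(sink)) {}

    // Called from the producing (source) side: only enqueues, never dispatches.
    void push(std::string packet) {
        std::lock_guard<std::mutex> guard(_mutex);
        _pending.push_back(std::move(packet));
    }

    // Called from the loop side: delivers everything queued so far, in order.
    std::size_t drain() {
        std::deque<std::string> batch;
        {
            std::lock_guard<std::mutex> guard(_mutex);
            batch.swap(_pending);
        }
        for (const auto& p : batch) _sink(p); // dispatch outside the lock
        return batch.size();
    }

private:
    std::mutex _mutex;
    std::deque<std::string> _pending;
    std::function<void(const std::string&)> _sink;
};
```

Swapping the pending queue out under the lock and dispatching afterwards keeps the sink callback from running while the mutex is held, so a receiver that pushes new packets does not deadlock.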

autoStart

virtual

virtual void autoStart(bool flag)

Defined in src/base/include/icy/packetstream.h:473

Enable or disable auto-start behaviour (default: false). When enabled, the stream automatically transitions to Active state upon receiving the first packet while in the None or Locked state. Must be called before start().

Parameters

  • flag true to enable auto-start, false to disable.

closeOnError

virtual

virtual void closeOnError(bool flag)

Defined in src/base/include/icy/packetstream.h:479

Enable or disable close-on-error behaviour (default: true). When enabled, an unhandled processor exception causes the stream to transition from Error to Closed state automatically.

Parameters

  • flag true to close the stream on error, false to remain in Error state.
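
The close-on-error transition can be modelled as a tiny state machine. `ErrorDemoStream` and `ErrState` are hypothetical and reduced to three states; the sketch only shows the Error to Closed step controlled by the flag, not the signals or teardown the real stream performs.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>

// Hypothetical, reduced set of states for illustration.
enum class ErrState { Active, Error, Closed };

struct ErrorDemoStream {
    bool closeOnError = true;          // mirrors the documented default
    ErrState state = ErrState::Active;
    std::exception_ptr error;          // empty until a processor throws

    // Runs one processing step and applies the error policy if it throws.
    void process(const std::function<void()>& processor) {
        try {
            processor();
        } catch (...) {
            error = std::current_exception();
            state = ErrState::Error;
            if (closeOnError) state = ErrState::Closed; // Error -> Closed
        }
    }
};
```

With the flag disabled the model stays in Error, which leaves room for the application to inspect the failure before deciding what to do next.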

error

const std::exception_ptr & error()

Defined in src/base/include/icy/packetstream.h:488

Return the last captured exception, if the stream is in Error state. The pointer is null when no error has occurred.

Returns

A reference to the stored exception_ptr; empty if no error.
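
A std::exception_ptr is opaque until it is rethrown, so inspecting a stored error usually means rethrowing it inside a try block. The helpers below use only the standard library; `captureError` and `describe` are hypothetical names introduced for this example.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <string>

// Runs a task and captures any exception it throws.
template <class Fn>
std::exception_ptr captureError(Fn&& task) {
    try {
        task();
    } catch (...) {
        return std::current_exception();
    }
    return std::exception_ptr{}; // empty: no error occurred
}

// Turns a stored exception_ptr into a readable message.
inline std::string describe(const std::exception_ptr& err) {
    if (!err) return "no error";
    try {
        std::rethrow_exception(err);
    } catch (const std::exception& exc) {
        return exc.what();
    } catch (...) {
        return "unknown exception";
    }
}
```

The same describe() helper could be applied to the reference returned by error(), or to the exception_ptr argument delivered with the Error signal.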


name

const

std::string name() const

Defined in src/base/include/icy/packetstream.h:492

Return the name assigned to this stream at construction.

Returns

The stream name; empty string if none was provided.


adapters

const

PacketAdapterVec adapters() const

Defined in src/base/include/icy/packetstream.h:509

Returns a combined list of all stream sources and processors.


sources

const

PacketAdapterVec sources() const

Defined in src/base/include/icy/packetstream.h:512

Returns a list of all stream sources.


processors

const

PacketAdapterVec processors() const

Defined in src/base/include/icy/packetstream.h:515

Returns a list of all stream processors.


numSources

const

int numSources() const

Defined in src/base/include/icy/packetstream.h:519

Return the number of source adapters currently registered.

Returns

Source count; thread-safe.


numProcessors

const

int numProcessors() const

Defined in src/base/include/icy/packetstream.h:523

Return the number of processor adapters currently registered.

Returns

Processor count; thread-safe.


numAdapters

const

int numAdapters() const

Defined in src/base/include/icy/packetstream.h:527

Return the total number of adapters (sources + processors).

Returns

Combined adapter count; thread-safe.


getSource

inline

template<class AdapterT> inline AdapterT * getSource(int index = 0)

Defined in src/base/include/icy/packetstream.h:536

Return the nth source of type AdapterT, or nullptr if not found. Sources are searched in their registered order; only adapters that dynamic_cast successfully to AdapterT are counted.

Parameters

  • index Zero-based index among matching sources (default 0).

Returns

Pointer to the matching adapter, or nullptr.
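
The "nth adapter of a given type" lookup can be sketched as a counted dynamic_cast scan. The adapter hierarchy and the `findNth` helper below are hypothetical and operate on a plain vector rather than on the stream's internal containers.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical adapter hierarchy used only for this illustration.
struct Adapter {
    virtual ~Adapter() = default;
};
struct Encoder : Adapter {
    int id = 0;
    explicit Encoder(int i) : id(i) {}
};
struct Logger : Adapter {};

// Returns the index-th adapter that is an AdapterT, or nullptr if there are too few matches.
template <class AdapterT>
AdapterT* findNth(const std::vector<std::unique_ptr<Adapter>>& adapters, int index = 0) {
    int seen = 0;
    for (const auto& a : adapters) {
        if (auto* typed = dynamic_cast<AdapterT*>(a.get())) {
            if (seen == index) return typed;
            ++seen; // only successful casts advance the counter
        }
    }
    return nullptr;
}

// Builds a mixed list: Encoder(1), Logger, Encoder(2).
inline std::vector<std::unique_ptr<Adapter>> makeAdapters() {
    std::vector<std::unique_ptr<Adapter>> v;
    v.push_back(std::make_unique<Encoder>(1));
    v.push_back(std::make_unique<Logger>());
    v.push_back(std::make_unique<Encoder>(2));
    return v;
}
```

Because non-matching adapters are skipped without being counted, index 1 for Encoder refers to the third element of the list here, not the second.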


getProcessor

inline

template<class AdapterT> inline AdapterT * getProcessor(int index = 0)

Defined in src/base/include/icy/packetstream.h:559

Return the nth processor of type AdapterT, or nullptr if not found. Processors are searched in their registered order; only adapters that dynamic_cast successfully to AdapterT are counted.

Parameters

  • index Zero-based index among matching processors (default 0).

Returns

Pointer to the matching processor, or nullptr.


getProcessor

inline

inline PacketProcessor * getProcessor(int order = 0)

Defined in src/base/include/icy/packetstream.h:579

Return the processor registered at a specific order value. Unlike the template overload, this searches by order rather than by type and index.

Parameters

  • order The order value to match (default 0).

Returns

Pointer to the matching processor, or nullptr if none registered at that order.

Protected Attributes

| Return | Name | Description |
|---|---|---|
| std::mutex | _mutex | |
| std::mutex | _procMutex | |
| std::string | _name | |
| PacketAdapterVec | _sources | |
| PacketAdapterVec | _processors | |
| std::deque< PacketStreamState > | _states | |
| std::exception_ptr | _error | |
| bool | _autoStart | |
| bool | _closeOnError | |
| bool | _wired | |

_mutex

std::mutex _mutex

Defined in src/base/include/icy/packetstream.h:633


_procMutex

std::mutex _procMutex

Defined in src/base/include/icy/packetstream.h:634


_name

std::string _name

Defined in src/base/include/icy/packetstream.h:635


_sources

PacketAdapterVec _sources

Defined in src/base/include/icy/packetstream.h:636


_processors

PacketAdapterVec _processors

Defined in src/base/include/icy/packetstream.h:637


_states

std::deque< PacketStreamState > _states

Defined in src/base/include/icy/packetstream.h:638


_error

std::exception_ptr _error

Defined in src/base/include/icy/packetstream.h:639


_autoStart

bool _autoStart

Defined in src/base/include/icy/packetstream.h:640


_closeOnError

bool _closeOnError

Defined in src/base/include/icy/packetstream.h:641


_wired

bool _wired

Defined in src/base/include/icy/packetstream.h:642

Protected Methods

| Return | Name | Description |
|---|---|---|
| void | setup | Attach the source and processor delegate chain. |
| void | teardown | Detach the source and processor delegate chain. |
| void | attachSource | |
| void | attach | |
| void | startSources | Start synchronized sources. |
| void | stopSources | Stop synchronized sources. |
| void | process virtual | Process incoming packets. |
| void | emit | Emit the final packet to listeners. |
| void | synchronizeStates | Synchronize queued states with adapters. |
| void | onStateChange virtual override | Override the Stateful::onStateChange method. |
| void | assertCanModify | Asserts that the stream can be modified, i.e. is not in the Locked, Stopping or Active states. |
| void | handleException | Handle an internal exception. |

setup

void setup()

Defined in src/base/include/icy/packetstream.h:594

Attach the source and processor delegate chain.


teardown

void teardown()

Defined in src/base/include/icy/packetstream.h:597

Detach the source and processor delegate chain.


attachSource

void attachSource(PacketAdapterReference::Ptr ref)

Defined in src/base/include/icy/packetstream.h:599


attach

void attach(PacketAdapterReference::Ptr ref)

Defined in src/base/include/icy/packetstream.h:600


startSources

void startSources()

Defined in src/base/include/icy/packetstream.h:603

Start synchronized sources.


stopSources

void stopSources()

Defined in src/base/include/icy/packetstream.h:606

Stop synchronized sources.


process

virtual

virtual void process(IPacket & packet)

Defined in src/base/include/icy/packetstream.h:609

Process incoming packets.


emit

void emit(IPacket & packet)

Defined in src/base/include/icy/packetstream.h:615

Emit the final packet to listeners.

Synchronized signals such as Close and Error are sent from this method. See synchronizeOutput()


synchronizeStates

void synchronizeStates()

Defined in src/base/include/icy/packetstream.h:618

Synchronize queued states with adapters.


onStateChange

virtual override

virtual void onStateChange(PacketStreamState & state, const PacketStreamState & oldState) override

Defined in src/base/include/icy/packetstream.h:621

Override the Stateful::onStateChange method.

Reimplements Stateful::onStateChange.

assertCanModify

void assertCanModify()

Defined in src/base/include/icy/packetstream.h:628

Asserts that the stream can be modified, i.e. is not in the Locked, Stopping or Active states.
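
A guard of this kind can be sketched as a function that throws for the states in which modification is not allowed. The `GuardState` enumerators and the choice of std::runtime_error are assumptions made for the example; this page does not show the actual PacketStreamState values or how the real assertion reports failure.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical state IDs; the real PacketStreamState values are not listed on this page.
enum class GuardState { None, Locked, Active, Paused, Stopping, Stopped, Closed, Error };

// Throws if adapters may not be attached or detached in the given state.
// Assumption: failure is reported with std::runtime_error.
inline void assertCanModify(GuardState s) {
    if (s == GuardState::Locked || s == GuardState::Stopping || s == GuardState::Active)
        throw std::runtime_error("cannot modify an active or locked stream");
}

// Non-throwing convenience wrapper around the guard.
inline bool canModify(GuardState s) {
    try {
        assertCanModify(s);
        return true;
    } catch (const std::runtime_error&) {
        return false;
    }
}
```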


handleException

void handleException(std::exception & exc)

Defined in src/base/include/icy/packetstream.h:631

Handle an internal exception.

Public Types

| Name | Description |
|---|---|
| Ptr | |

Ptr

using Ptr = std::shared_ptr< PacketStream >

Defined in src/base/include/icy/packetstream.h:309