PacketStream
PacketStream
#include <icy/packetstream.h>class PacketStreamDefined in src/base/include/icy/packetstream.h:306
Inherits:
Stateful< PacketStreamState >
Processes and broadcasts IPackets through a configurable adapter graph.
A PacketStream consists of one or many PacketSources, one or many PacketProcessors, and one or many delegate receivers.
This class enables the developer to setup a processor chain in order to perform arbitrary processing on data packets using interchangeable packet adapters, and pump the output to any delegate function or even another PacketStream.
Note that PacketStream itself inherits from PacketStreamAdapter, so a PacketStream can be the source of another PacketStream.
All PacketStream methods are thread-safe, but once the stream is running you will not be able to attach or detach stream adapters.
In order to synchronize output packets with the application event loop take a look at the SyncPacketQueue class. For lengthy operations you can add an AsyncPacketQueue to the start of the stream to defer processing from the PacketSource thread.
List of all members
| Name | Kind | Owner |
|---|---|---|
emitter | variable | Declared here |
Error | variable | Declared here |
Close | variable | Declared here |
PacketStream | function | Declared here |
~PacketStream | function | Declared here |
PacketStream | function | Declared here |
operator= | function | Declared here |
PacketStream | function | Declared here |
operator= | function | Declared here |
start | function | Declared here |
stop | function | Declared here |
pause | function | Declared here |
resume | function | Declared here |
close | function | Declared here |
reset | function | Declared here |
active | function | Declared here |
stopped | function | Declared here |
closed | function | Declared here |
lock | function | Declared here |
locked | function | Declared here |
write | function | Declared here |
write | function | Declared here |
write | function | Declared here |
attachSource | function | Declared here |
attachSource | function | Declared here |
attachSource | function | Declared here |
detachSource | function | Declared here |
detachSource | function | Declared here |
attach | function | Declared here |
attach | function | Declared here |
detach | function | Declared here |
synchronizeOutput | function | Declared here |
autoStart | function | Declared here |
closeOnError | function | Declared here |
error | function | Declared here |
name | function | Declared here |
adapters | function | Declared here |
sources | function | Declared here |
processors | function | Declared here |
numSources | function | Declared here |
numProcessors | function | Declared here |
numAdapters | function | Declared here |
getSource | function | Declared here |
getProcessor | function | Declared here |
getProcessor | function | Declared here |
_mutex | variable | Declared here |
_procMutex | variable | Declared here |
_name | variable | Declared here |
_sources | variable | Declared here |
_processors | variable | Declared here |
_states | variable | Declared here |
_error | variable | Declared here |
_autoStart | variable | Declared here |
_closeOnError | variable | Declared here |
_wired | variable | Declared here |
setup | function | Declared here |
teardown | function | Declared here |
attachSource | function | Declared here |
attach | function | Declared here |
startSources | function | Declared here |
stopSources | function | Declared here |
process | function | Declared here |
emit | function | Declared here |
synchronizeStates | function | Declared here |
onStateChange | function | Declared here |
assertCanModify | function | Declared here |
handleException | function | Declared here |
Ptr | typedef | Declared here |
StateChange | variable | Inherited from Stateful |
Stateful | function | Inherited from Stateful |
~Stateful | function | Inherited from Stateful |
stateEquals | function | Inherited from Stateful |
stateBetween | function | Inherited from Stateful |
state | function | Inherited from Stateful |
state | function | Inherited from Stateful |
_state | variable | Inherited from Stateful |
beforeStateChange | function | Inherited from Stateful |
onStateChange | function | Inherited from Stateful |
setState | function | Inherited from Stateful |
setState | function | Inherited from Stateful |
Inherited from Stateful
| Kind | Name | Description |
|---|---|---|
variable | StateChange | Signals when the state changes. |
function | Stateful inline | |
function | ~Stateful virtual inline | |
function | stateEquals virtual const inline | Returns true if the current state ID equals the given ID. |
function | stateBetween virtual const inline | Returns true if the current state ID is in the inclusive range [lid, rid]. |
function | state virtual inline | Returns a mutable reference to the current state. |
function | state virtual const inline | Returns a copy of the current state. |
variable | _state | |
function | beforeStateChange virtual inline | Override to handle pre state change logic. Return false to prevent state change. |
function | onStateChange virtual inline | Override to handle post state change logic. |
function | setState virtual inline | Sets the state and sends the state signal if the state change was successful. |
function | setState virtual inline | Sets the state and sends the state signal if the state change was successful. |
Public Attributes
| Return | Name | Description |
|---|---|---|
PacketSignal | emitter | Signals to delegates on outgoing packets. |
ThreadSignal< void(PacketStream &, const std::exception_ptr &)> | Error | Signals that the PacketStream is in Error state. If stream output is synchronized then the Error signal will be sent from the synchronization context, otherwise it will be sent from the async processor context. See synchronizeOutput() |
ThreadSignal< void(PacketStream &)> | Close | Signals that the PacketStream is in Close state. This signal is sent immediately via the close() method, and as such will be sent from the calling thread context. |
emitter
PacketSignal emitterDefined in src/base/include/icy/packetstream.h:495
Signals to delegates on outgoing packets.
Error
ThreadSignal< void(PacketStream &, const std::exception_ptr &)> ErrorDefined in src/base/include/icy/packetstream.h:501
Signals that the PacketStream is in Error state. If stream output is synchronized then the Error signal will be sent from the synchronization context, otherwise it will be sent from the async processor context. See synchronizeOutput()
Close
ThreadSignal< void(PacketStream &)> CloseDefined in src/base/include/icy/packetstream.h:506
Signals that the PacketStream is in Close state. This signal is sent immediately via the close() method, and as such will be sent from the calling thread context.
Public Methods
| Return | Name | Description |
|---|---|---|
PacketStream | Construct a named packet stream. | |
~PacketStream virtual | Destroy the stream; calls close() then reset() to release all adapters. | |
PacketStream | Deleted constructor. | |
PacketStream | Deleted constructor. | |
void | start virtual | Start the stream and synchronized sources. |
void | stop virtual | Stop the stream and synchronized sources. |
void | pause virtual | Pause the stream. |
void | resume virtual | Resume the stream. |
void | close virtual | Close the stream and transition the internal state to Closed. |
void | reset virtual | Cleanup all managed stream adapters and reset the stream state. |
bool | active virtual const | Returns true when the stream is in the Active state. |
bool | stopped virtual const | Returns true when the stream is in the Stopping or Stopped state. |
bool | closed virtual const | Returns true when the stream is in the Closed or Error state. |
bool | lock virtual | Sets the stream to locked state. In a locked state no new adapters can be added or removed from the stream until the stream is stopped. |
bool | locked virtual const | Returns true is the stream is currently locked. |
void | write virtual | Write a mutable buffer into the stream without copying. The caller must keep the buffer alive until processing crosses a Cloned/Retained boundary or, if the graph is fully synchronous, until write() returns. |
void | write virtual | Write a read-only buffer into the stream; data is copied immediately into an owning RawPacket before any adapter sees it. |
void | write virtual | Write a packet directly into the processing chain. |
void | attachSource virtual | Attach a bare packet signal as a stream source. The signal is wrapped in an unowned PacketStreamAdapter internally. Useful when the source is another PacketStream::emitter. |
void | attachSource virtual | Attach a PacketStreamAdapter as a source. Source adapters default to Borrowed retention unless overridden; they must not retain inbound packet storage asynchronously without reporting Cloned or Retained. |
void | attachSource inline | Attach a shared_ptr-managed source adapter to the stream. The stream shares ownership; the adapter is kept alive at least until teardown. Throws std::runtime_error if ptr does not derive from PacketStreamAdapter. |
bool | detachSource virtual | Detach a source by its packet signal. Disconnects the signal from the stream's process slot and removes the adapter entry. |
bool | detachSource virtual | Detach a source by its adapter pointer. Disconnects the adapter's emitter from the stream's process slot and removes the entry. |
void | attach virtual | Attach a packet processor to the stream. Processors are executed in ascending order of their order value. Pass order = -1 to append at the end of the current processor list. Valid range is -1 to 101; values outside this range throw std::invalid_argument. Borrowed processors must finish with the packet before process() returns. Queue/processors that defer work must report Cloned or Retained via retention() so upstream code has an explicit ownership boundary. |
void | attach inline | Attach a shared_ptr-managed processor to the stream. The stream shares ownership; the processor is kept alive at least until teardown. Throws std::runtime_error if ptr does not derive from PacketProcessor. |
bool | detach virtual | Detach a packet processor from the stream. The processor's delegate connections are removed; ownership is released if held. |
void | synchronizeOutput virtual | Synchronize stream output packets with a libuv event loop. Internally attaches a SyncPacketQueue at order 101 so that all packets emitted by the processor chain are dispatched from the loop thread rather than the source thread. Must be called before start(). |
void | autoStart virtual | Enable or disable auto-start behaviour (default: false). When enabled, the stream automatically transitions to Active state upon receiving the first packet while in the None or Locked state. Must be called before start(). |
void | closeOnError virtual | Enable or disable close-on-error behaviour (default: true). When enabled, an unhandled processor exception causes the stream to transition from Error to Closed state automatically. |
const std::exception_ptr & | error | Accessors for the unmanaged client data pointer. |
std::string | name const | Return the name assigned to this stream at construction. |
PacketAdapterVec | adapters const | Returns a combined list of all stream sources and processors. |
PacketAdapterVec | sources const | Returns a list of all stream sources. |
PacketAdapterVec | processors const | Returns a list of all stream processors. |
int | numSources const | Return the number of source adapters currently registered. |
int | numProcessors const | Return the number of processor adapters currently registered. |
int | numAdapters const | Return the total number of adapters (sources + processors). |
AdapterT * | getSource inline | Return the nth source of type AdapterT, or nullptr if not found. Sources are searched in their registered order; only adapters that dynamic_cast successfully to AdapterT are counted. |
AdapterT * | getProcessor inline | Return the nth processor of type AdapterT, or nullptr if not found. Processors are searched in their registered order; only adapters that dynamic_cast successfully to AdapterT are counted. |
PacketProcessor * | getProcessor inline | Return the processor registered at a specific order value. Unlike the template overload, this searches by order rather than by type and index. |
PacketStream
PacketStream(const std::string & name = "")Defined in src/base/include/icy/packetstream.h:313
Construct a named packet stream.
Parameters
nameOptional human-readable name used in log output.
~PacketStream
virtual
virtual ~PacketStream()Defined in src/base/include/icy/packetstream.h:316
Destroy the stream; calls close() then reset() to release all adapters.
PacketStream
PacketStream(const PacketStream &) = deleteDefined in src/base/include/icy/packetstream.h:318
Deleted constructor.
PacketStream
PacketStream(PacketStream &&) = deleteDefined in src/base/include/icy/packetstream.h:320
Deleted constructor.
start
virtual
virtual void start()Defined in src/base/include/icy/packetstream.h:324
Start the stream and synchronized sources.
stop
virtual
virtual void stop()Defined in src/base/include/icy/packetstream.h:327
Stop the stream and synchronized sources.
pause
virtual
virtual void pause()Defined in src/base/include/icy/packetstream.h:330
Pause the stream.
resume
virtual
virtual void resume()Defined in src/base/include/icy/packetstream.h:333
Resume the stream.
close
virtual
virtual void close()Defined in src/base/include/icy/packetstream.h:336
Close the stream and transition the internal state to Closed.
reset
virtual
virtual void reset()Defined in src/base/include/icy/packetstream.h:339
Cleanup all managed stream adapters and reset the stream state.
active
virtual const
virtual bool active() constDefined in src/base/include/icy/packetstream.h:342
Returns true when the stream is in the Active state.
stopped
virtual const
virtual bool stopped() constDefined in src/base/include/icy/packetstream.h:345
Returns true when the stream is in the Stopping or Stopped state.
closed
virtual const
virtual bool closed() constDefined in src/base/include/icy/packetstream.h:348
Returns true when the stream is in the Closed or Error state.
lock
virtual
virtual bool lock()Defined in src/base/include/icy/packetstream.h:353
Sets the stream to locked state. In a locked state no new adapters can be added or removed from the stream until the stream is stopped.
locked
virtual const
virtual bool locked() constDefined in src/base/include/icy/packetstream.h:356
Returns true is the stream is currently locked.
write
virtual
virtual void write(char * data, size_t len)Defined in src/base/include/icy/packetstream.h:364
Write a mutable buffer into the stream without copying. The caller must keep the buffer alive until processing crosses a Cloned/Retained boundary or, if the graph is fully synchronous, until write() returns.
Parameters
dataPointer to the raw data buffer.lenNumber of bytes to process.
write
virtual
virtual void write(const char * data, size_t len)Defined in src/base/include/icy/packetstream.h:370
Write a read-only buffer into the stream; data is copied immediately into an owning RawPacket before any adapter sees it.
Parameters
dataPointer to the raw data buffer.lenNumber of bytes to process.
write
virtual
virtual void write(IPacket && packet)Defined in src/base/include/icy/packetstream.h:374
Write a packet directly into the processing chain.
Parameters
packetPacket to process; moved into the stream.
attachSource
virtual
virtual void attachSource(PacketSignal & source)Defined in src/base/include/icy/packetstream.h:380
Attach a bare packet signal as a stream source. The signal is wrapped in an unowned PacketStreamAdapter internally. Useful when the source is another PacketStream::emitter.
Parameters
sourceThe packet signal to attach; must outlive the stream.
attachSource
virtual
virtual void attachSource(PacketStreamAdapter * source, bool owned = true, bool syncState = false)Defined in src/base/include/icy/packetstream.h:390
Attach a PacketStreamAdapter as a source. Source adapters default to Borrowed retention unless overridden; they must not retain inbound packet storage asynchronously without reporting Cloned or Retained.
Parameters
sourceThe adapter to attach; must not be null.ownedIf true the stream takes ownership and deletes the pointer on teardown.syncStateIf true andsourceimplements basic::Startable, its start()/stop() will be called by startSources()/stopSources().
attachSource
inline
template<class C> inline void attachSource(std::shared_ptr< C > ptr, bool syncState = false)Defined in src/base/include/icy/packetstream.h:400
Attach a shared_ptr-managed source adapter to the stream. The stream shares ownership; the adapter is kept alive at least until teardown. Throws std::runtime_error if ptr does not derive from PacketStreamAdapter.
Parameters
CAdapter type; must derive from PacketStreamAdapter.
Parameters
ptrShared pointer to the adapter instance.syncStateIf true andptrimplements basic::Startable, its start()/stop() will be called by startSources()/stopSources().
detachSource
virtual
virtual bool detachSource(PacketSignal & source)Defined in src/base/include/icy/packetstream.h:415
Detach a source by its packet signal. Disconnects the signal from the stream's process slot and removes the adapter entry.
Parameters
sourceThe packet signal previously attached via attachSource(PacketSignal&).
Returns
true if the source was found and removed, false otherwise.
detachSource
virtual
virtual bool detachSource(PacketStreamAdapter * source)Defined in src/base/include/icy/packetstream.h:421
Detach a source by its adapter pointer. Disconnects the adapter's emitter from the stream's process slot and removes the entry.
Parameters
sourcePointer to the adapter previously attached.
Returns
true if the source was found and removed, false otherwise.
attach
virtual
virtual void attach(PacketProcessor * proc, int order = 0, bool owned = true)Defined in src/base/include/icy/packetstream.h:433
Attach a packet processor to the stream. Processors are executed in ascending order of their order value. Pass order = -1 to append at the end of the current processor list. Valid range is -1 to 101; values outside this range throw std::invalid_argument. Borrowed processors must finish with the packet before process() returns. Queue/processors that defer work must report Cloned or Retained via retention() so upstream code has an explicit ownership boundary.
Parameters
procThe processor to attach; must not be null.orderPosition in the processing chain (lower runs first).ownedIf true the stream takes ownership and deletes the pointer on teardown.
attach
inline
template<class C> inline void attach(std::shared_ptr< C > ptr, int order = 0, bool syncState = false)Defined in src/base/include/icy/packetstream.h:443
Attach a shared_ptr-managed processor to the stream. The stream shares ownership; the processor is kept alive at least until teardown. Throws std::runtime_error if ptr does not derive from PacketProcessor.
Parameters
CProcessor type; must derive from PacketProcessor.
Parameters
ptrShared pointer to the processor instance.orderPosition in the processing chain (lower runs first).syncStateReserved for future use; currently unused.
detach
virtual
virtual bool detach(PacketProcessor * proc)Defined in src/base/include/icy/packetstream.h:459
Detach a packet processor from the stream. The processor's delegate connections are removed; ownership is released if held.
Parameters
procPointer to the processor to remove.
Returns
true if the processor was found and removed, false otherwise.
synchronizeOutput
virtual
virtual void synchronizeOutput(uv::Loop * loop)Defined in src/base/include/icy/packetstream.h:466
Synchronize stream output packets with a libuv event loop. Internally attaches a SyncPacketQueue at order 101 so that all packets emitted by the processor chain are dispatched from the loop thread rather than the source thread. Must be called before start().
Parameters
loopThe event loop to synchronize output onto; must not be null.
autoStart
virtual
virtual void autoStart(bool flag)Defined in src/base/include/icy/packetstream.h:473
Enable or disable auto-start behaviour (default: false). When enabled, the stream automatically transitions to Active state upon receiving the first packet while in the None or Locked state. Must be called before start().
Parameters
flagtrue to enable auto-start, false to disable.
closeOnError
virtual
virtual void closeOnError(bool flag)Defined in src/base/include/icy/packetstream.h:479
Enable or disable close-on-error behaviour (default: true). When enabled, an unhandled processor exception causes the stream to transition from Error to Closed state automatically.
Parameters
flagtrue to close the stream on error, false to remain in Error state.
error
const std::exception_ptr & error()Defined in src/base/include/icy/packetstream.h:488
Accessors for the unmanaged client data pointer.
Return the last captured exception, if the stream is in Error state. The pointer is null when no error has occurred.
Returns
A reference to the stored exception_ptr; empty if no error.
name
const
std::string name() constDefined in src/base/include/icy/packetstream.h:492
Return the name assigned to this stream at construction.
Returns
The stream name; empty string if none was provided.
adapters
const
PacketAdapterVec adapters() constDefined in src/base/include/icy/packetstream.h:509
Returns a combined list of all stream sources and processors.
sources
const
PacketAdapterVec sources() constDefined in src/base/include/icy/packetstream.h:512
Returns a list of all stream sources.
processors
const
PacketAdapterVec processors() constDefined in src/base/include/icy/packetstream.h:515
Returns a list of all stream processors.
numSources
const
int numSources() constDefined in src/base/include/icy/packetstream.h:519
Return the number of source adapters currently registered.
Returns
Source count; thread-safe.
numProcessors
const
int numProcessors() constDefined in src/base/include/icy/packetstream.h:523
Return the number of processor adapters currently registered.
Returns
Processor count; thread-safe.
numAdapters
const
int numAdapters() constDefined in src/base/include/icy/packetstream.h:527
Return the total number of adapters (sources + processors).
Returns
Combined adapter count; thread-safe.
getSource
inline
template<class AdapterT> inline AdapterT * getSource(int index = 0)Defined in src/base/include/icy/packetstream.h:536
Return the nth source of type AdapterT, or nullptr if not found. Sources are searched in their registered order; only adapters that dynamic_cast successfully to AdapterT are counted.
Parameters
AdapterTTarget type; must derive from PacketStreamAdapter.
Parameters
indexZero-based index among matching sources (default 0).
Returns
Pointer to the matching adapter, or nullptr.
getProcessor
inline
template<class AdapterT> inline AdapterT * getProcessor(int index = 0)Defined in src/base/include/icy/packetstream.h:559
Return the nth processor of type AdapterT, or nullptr if not found. Processors are searched in their registered order; only adapters that dynamic_cast successfully to AdapterT are counted.
Parameters
AdapterTTarget type; must derive from PacketProcessor.
Parameters
indexZero-based index among matching processors (default 0).
Returns
Pointer to the matching processor, or nullptr.
getProcessor
inline
inline PacketProcessor * getProcessor(int order = 0)Defined in src/base/include/icy/packetstream.h:579
Return the processor registered at a specific order value. Unlike the template overload, this searches by order rather than by type and index.
Parameters
orderThe order value to match (default 0).
Returns
Pointer to the matching processor, or nullptr if none registered at that order.
Protected Attributes
| Return | Name | Description |
|---|---|---|
std::mutex | _mutex | |
std::mutex | _procMutex | |
std::string | _name | |
PacketAdapterVec | _sources | |
PacketAdapterVec | _processors | |
std::deque< PacketStreamState > | _states | |
std::exception_ptr | _error | |
bool | _autoStart | |
bool | _closeOnError | |
bool | _wired |
_mutex
std::mutex _mutexDefined in src/base/include/icy/packetstream.h:633
_procMutex
std::mutex _procMutexDefined in src/base/include/icy/packetstream.h:634
_name
std::string _nameDefined in src/base/include/icy/packetstream.h:635
_sources
PacketAdapterVec _sourcesDefined in src/base/include/icy/packetstream.h:636
_processors
PacketAdapterVec _processorsDefined in src/base/include/icy/packetstream.h:637
_states
std::deque< PacketStreamState > _statesDefined in src/base/include/icy/packetstream.h:638
_error
std::exception_ptr _errorDefined in src/base/include/icy/packetstream.h:639
_autoStart
bool _autoStartDefined in src/base/include/icy/packetstream.h:640
_closeOnError
bool _closeOnErrorDefined in src/base/include/icy/packetstream.h:641
_wired
bool _wiredDefined in src/base/include/icy/packetstream.h:642
Protected Methods
| Return | Name | Description |
|---|---|---|
void | setup | Attach the source and processor delegate chain. |
void | teardown | Detach the source and processor delegate chain. |
void | attachSource | |
void | attach | |
void | startSources | Start synchronized sources. |
void | stopSources | Stop synchronized sources. |
void | process virtual | Process incoming packets. |
void | emit | Emit the final packet to listeners. |
void | synchronizeStates | Synchronize queued states with adapters. |
void | onStateChange virtual override | Override the Stateful::onStateChange method. |
void | assertCanModify | Returns true if the given state ID is queued. |
void | handleException | Handle an internal exception. |
setup
void setup()Defined in src/base/include/icy/packetstream.h:594
Attach the source and processor delegate chain.
teardown
void teardown()Defined in src/base/include/icy/packetstream.h:597
Detach the source and processor delegate chain.
attachSource
void attachSource(PacketAdapterReference::Ptr ref)Defined in src/base/include/icy/packetstream.h:599
attach
void attach(PacketAdapterReference::Ptr ref)Defined in src/base/include/icy/packetstream.h:600
startSources
void startSources()Defined in src/base/include/icy/packetstream.h:603
Start synchronized sources.
stopSources
void stopSources()Defined in src/base/include/icy/packetstream.h:606
Stop synchronized sources.
process
virtual
virtual void process(IPacket & packet)Defined in src/base/include/icy/packetstream.h:609
Process incoming packets.
emit
void emit(IPacket & packet)Defined in src/base/include/icy/packetstream.h:615
Emit the final packet to listeners.
Synchronized signals such as Close and Error are sent from this method. See synchronizeOutput()
synchronizeStates
void synchronizeStates()Defined in src/base/include/icy/packetstream.h:618
Synchronize queued states with adapters.
onStateChange
virtual override
virtual void onStateChange(PacketStreamState & state, const PacketStreamState & oldState) overrideDefined in src/base/include/icy/packetstream.h:621
Override the Stateful::onStateChange method.
Reimplements
assertCanModify
void assertCanModify()Defined in src/base/include/icy/packetstream.h:628
Returns true if the given state ID is queued.
Asserts that the stream can be modified, ie is not in the Locked, Stopping or Active states.
handleException
void handleException(std::exception & exc)Defined in src/base/include/icy/packetstream.h:631
Handle an internal exception.
Public Types
| Name | Description |
|---|---|
Ptr |
Ptr
using Ptr = std::shared_ptr< PacketStream >