Subscriptions
Subscriptions stream value changes. In direct mode your PHP process runs the publish loop; in managed mode the daemon runs it. Both surface notifications through Symfony's EventDispatcher: same API, different operational profile.
A subscription tells the OPC UA server: send notifications when this value changes or this event fires. The bundle supports two ways of consuming the stream.
Two modes at a glance
| Aspect | Direct mode | Managed mode (with auto-publish) |
|---|---|---|
| Who runs the publish loop? | Your PHP process | The daemon |
| Where do notifications surface? | PSR-14 → Symfony EventDispatcher | PSR-14 → Symfony EventDispatcher |
| Best for | Console / Messenger long-runners | Real-time UIs, broadcasting, declarative subs |
| Survives FPM request boundary? | No | Yes — daemon holds the subscription |
| Setup complexity | Low | Medium (daemon + Supervisor + dispatcher wire) |
The real subscription API
opcua-client exposes three primary methods on OpcUaClientInterface:
createSubscription(
    float $publishingInterval = 500.0,
    int $lifetimeCount = 1200,
    int $maxKeepAliveCount = 10,
    int $maxNotificationsPerPublish = 0,
    bool $publishingEnabled = true,
    int $priority = 0,
): SubscriptionResult;

createMonitoredItems(
    int $subscriptionId,
    ?array $items = null,
): array|MonitoredItemsBuilder;

createEventMonitoredItem(
    int $subscriptionId,
    NodeId|string $nodeId,
    array $selectFields,
    int $clientHandle,
): MonitoredItemCreateResult;
To actually drain the publish queue, you call publish() in a
loop (direct mode), or you let the daemon's auto-publish loop do
it for you (managed mode).
There is no fluent subscribe(publishingInterval, onData, onEvent), and no $sub->monitor(), $sub->monitorEvents(), or $sub->run(). These names appeared in earlier drafts and were never part of the real surface.
Direct mode subscription
A Symfony console command that runs a publish loop:
namespace App\Command;

use PhpOpcua\SymfonyOpcua\OpcuaManager;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(name: 'app:plc:watch-speed')]
final class WatchSpeedCommand extends Command
{
    public function __construct(private readonly OpcuaManager $opcua)
    {
        parent::__construct();
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $client = $this->opcua->connect();

        $sub = $client->createSubscription(publishingInterval: 500.0);
        $client->createMonitoredItems(
            $sub->subscriptionId,
            [
                ['nodeId' => 'ns=2;s=Speed', 'clientHandle' => 1],
            ],
        );

        // Drain the publish queue. Each notification is dispatched as
        // DataChangeReceived on the bundle's EventDispatcher.
        while (true) {
            $client->publish(); // blocks for one publish cycle
            // Listeners receive PhpOpcua\Client\Event\DataChangeReceived.
            // No signal handler is installed here, so Ctrl+C / SIGTERM
            // end the process through PHP's default signal handling.
        }
    }
}
Run under Supervisor in production — see
Session manager · Production supervisor.
A pcntl_signal setup is recommended so SIGTERM exits cleanly.
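One way to wire up that signal handling is sketched below. It assumes the pcntl extension is loaded, and it swaps $client->publish() for a stand-in callable so the snippet runs on its own. PublishLoop is a hypothetical helper for illustration, not part of the bundle.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: runs one publish cycle after another until a stop
 * is requested, either programmatically or via SIGTERM / SIGINT.
 */
final class PublishLoop
{
    private bool $running = true;

    public function stop(): void
    {
        $this->running = false;
    }

    public function installSignalHandlers(): void
    {
        // Without ext-pcntl there is nothing to install; the process then
        // keeps PHP's default signal behaviour.
        if (!function_exists('pcntl_signal')) {
            return;
        }

        pcntl_async_signals(true);
        foreach ([SIGTERM, SIGINT] as $signal) {
            pcntl_signal($signal, fn () => $this->stop());
        }
    }

    /** Returns the number of completed cycles. */
    public function run(callable $publishOnce): int
    {
        $cycles = 0;
        while ($this->running) {
            $publishOnce(); // in the real command: $client->publish()
            $cycles++;
        }

        return $cycles;
    }
}
```

In the command above, the loop body would become `$loop->run(fn () => $client->publish());` followed by `return Command::SUCCESS;`. The stop flag is only checked between cycles, so shutdown can take up to one publish cycle.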
Managed-mode subscription
The daemon holds the subscription; Symfony reacts to events. The same imperative API works — the daemon proxies the calls and runs the publish loop on your behalf.
Subscribe from any service
use PhpOpcua\SymfonyOpcua\OpcuaManager;

final class StartMonitoringService
{
    public function __construct(private OpcuaManager $opcua) {}

    public function start(): void
    {
        $client = $this->opcua->connect();

        $sub = $client->createSubscription(publishingInterval: 500.0);
        $client->createMonitoredItems(
            $sub->subscriptionId,
            [
                ['nodeId' => 'ns=2;s=Speed', 'clientHandle' => 1],
                ['nodeId' => 'ns=2;s=Temperature', 'clientHandle' => 2],
            ],
        );
        // Done: the daemon keeps the subscription alive.
    }
}
Or declaratively in YAML
php_opcua_symfony_opcua:
    session_manager:
        auto_publish: true
    connections:
        default:
            endpoint: '%env(OPCUA_ENDPOINT)%'
            auto_connect: true
            subscriptions:
                -
                    publishing_interval: 500.0
                    monitored_items:
                        - { node_id: 'ns=2;s=Speed', client_handle: 1 }
                        - { node_id: 'ns=2;s=Temperature', client_handle: 2 }
On daemon start, the subscription is created automatically.
Listen with #[AsEventListener]
namespace App\EventListener;

use App\Entity\PlcReading;
use Doctrine\ORM\EntityManagerInterface;
use PhpOpcua\Client\Event\DataChangeReceived;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;

final class StoreSpeed
{
    private const SPEED_HANDLE = 1;

    public function __construct(private EntityManagerInterface $em) {}

    #[AsEventListener]
    public function __invoke(DataChangeReceived $event): void
    {
        // Ignore notifications for other monitored items.
        if ($event->clientHandle !== self::SPEED_HANDLE) {
            return;
        }

        $reading = (new PlcReading())
            ->setNodeId('ns=2;s=Speed')
            ->setValue($event->dataValue->getValue())
            ->setStatusCode($event->dataValue->statusCode)
            ->setSourceAt($event->dataValue->sourceTimestamp);

        $this->em->persist($reading);
        $this->em->flush();
    }
}
DataChangeReceived carries $clientHandle — not $nodeId.
Maintain your handle→node mapping in application code (a const,
a service, or a YAML-loaded clientHandle from subscriptions:).
See Events · Data events.
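A minimal sketch of such a handle→node mapping follows. HandleRegistry and its method names are hypothetical (they are not part of the bundle); the only thing taken from this page is the ['nodeId' => ..., 'clientHandle' => ...] item shape passed to createMonitoredItems().

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical registry: assigns a client handle to each node ID and
 * resolves handles back to node IDs inside listeners.
 */
final class HandleRegistry
{
    /** @var array<int, string> clientHandle => nodeId */
    private array $nodes = [];

    /** Registers a node and returns the handle assigned to it. */
    public function register(string $nodeId): int
    {
        $handle = count($this->nodes) + 1;
        $this->nodes[$handle] = $nodeId;

        return $handle;
    }

    /** Returns null for handles this registry never issued. */
    public function nodeIdFor(int $clientHandle): ?string
    {
        return $this->nodes[$clientHandle] ?? null;
    }

    /** @return list<array{nodeId: string, clientHandle: int}> */
    public function toMonitoredItems(): array
    {
        $items = [];
        foreach ($this->nodes as $handle => $nodeId) {
            $items[] = ['nodeId' => $nodeId, 'clientHandle' => $handle];
        }

        return $items;
    }
}
```

Registered as a shared service, the same instance could feed toMonitoredItems() into createMonitoredItems() and let a listener call nodeIdFor($event->clientHandle) instead of hard-coding node IDs.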
Subscription parameters
$sub = $client->createSubscription(
    publishingInterval: 500.0,      // ms between server publish cycles (float)
    lifetimeCount: 1200,            // intervals without a client Publish request before the server deletes the subscription
    maxKeepAliveCount: 10,          // intervals without data before the server sends a keep-alive
    maxNotificationsPerPublish: 0,  // 0 = no cap
    publishingEnabled: true,        // start enabled
    priority: 0,                    // relative priority among this session's subscriptions
);
For most cases, defaults are fine.
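Both counts are multiples of the publishing interval, so the defaults translate to wall-clock times: 500 ms × 10 means a keep-alive after 5 s without data, and 500 ms × 1200 means the subscription expires after 600 s without a Publish request. The helper below is a hypothetical illustration of that arithmetic, not part of opcua-client, and the server may revise the requested values. It also checks the OPC UA rule that the lifetime count must be at least three times the keep-alive count.

```php
<?php
declare(strict_types=1);

/**
 * Converts subscription counts into seconds.
 *
 * @return array{keepAliveSeconds: float, lifetimeSeconds: float}
 */
function subscriptionTimings(
    float $publishingIntervalMs,
    int $lifetimeCount,
    int $maxKeepAliveCount,
): array {
    if ($lifetimeCount < 3 * $maxKeepAliveCount) {
        throw new InvalidArgumentException(
            'lifetimeCount must be at least 3x maxKeepAliveCount.'
        );
    }

    return [
        // Silence on the wire before the server sends a keep-alive.
        'keepAliveSeconds' => $publishingIntervalMs * $maxKeepAliveCount / 1000,
        // Time without client Publish requests before the server drops the subscription.
        'lifetimeSeconds' => $publishingIntervalMs * $lifetimeCount / 1000,
    ];
}
```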
Monitored-item parameters
When calling createMonitoredItems() with an array, each entry accepts the full MonitoredItemCreateRequest shape (e.g. samplingInterval, queueSize, discardOldest, plus a monitoringFilter for deadband). For ergonomic construction, omit the $items argument to obtain a builder:
$client->createMonitoredItems($sub->subscriptionId)
    ->node('ns=2;s=Speed')->clientHandle(1)->samplingInterval(250.0)->queueSize(10)
    ->execute();
For noisy high-frequency tags, configure a deadband filter (absolute or percent of EU range) so the server suppresses sub-tolerance fluctuations on the wire.
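The two deadband rules can be sketched as plain comparisons. This illustrates the check the server performs; it is not bundle or opcua-client code, and the function names are hypothetical. An absolute deadband compares the change against a fixed amount, while a percent deadband compares it against a percentage of the tag's EU range (high minus low).

```php
<?php
declare(strict_types=1);

/** True when the change is larger than a fixed tolerance. */
function exceedsAbsoluteDeadband(float $lastReported, float $current, float $deadband): bool
{
    return abs($lastReported - $current) > $deadband;
}

/** True when the change is larger than $percent of the EU range. */
function exceedsPercentDeadband(
    float $lastReported,
    float $current,
    float $percent,
    float $euLow,
    float $euHigh,
): bool {
    $tolerance = ($percent / 100.0) * ($euHigh - $euLow);

    return abs($lastReported - $current) > $tolerance;
}
```

For a speed tag with an EU range of 0 to 3000, a 1 % deadband suppresses any change of 30 or less relative to the last reported value.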
Event subscriptions
OPC UA event subscriptions track alarms / events rather than value changes:
$sub = $client->createSubscription(publishingInterval: 1000.0);

$client->createEventMonitoredItem(
    $sub->subscriptionId,
    'ns=0;i=2253', // Server node
    ['EventId', 'Time', 'Severity', 'Message'],
    clientHandle: 100,
);
In managed mode these arrive as
PhpOpcua\Client\Event\EventNotificationReceived (and the
alarm-derived events) — see
Events · Alarm events.
Lifecycle — managed mode
The subscription survives:
- HTTP request boundary.
- Symfony worker restarts.
- Application redeploys (daemon is separate).
It does not survive a daemon restart. Recovery pattern: an
ExecStartPost hook on the daemon's systemd unit that re-runs
a "resubscribe" Symfony command or, preferably, the
declarative auto_connect: true + subscriptions: config so the
daemon re-creates the subscriptions on startup. See
Session manager · Auto-publish.
Lifecycle — direct mode
A direct-mode subscription dies with the PHP process. The
while (true) { $client->publish(); } loop is the only thing
keeping the OPC UA session alive.
Pattern: a Supervisor-managed Symfony command per subscription target.
Backpressure
Subscriptions can produce data faster than your app processes it.
| Failure mode | Mitigation |
|---|---|
| Server-side queue overflow | Tune discardOldest, queueSize |
| App-side listener blocking | Make listeners async via Messenger |
For non-trivial listener work, route through Messenger — see Events · Async listeners with Messenger.
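To make the server-side half of that table concrete, the sketch below simulates how a monitored item's queue behaves when it overflows. It models the OPC UA semantics of queueSize and discardOldest in plain PHP; MonitoredItemQueue is a hypothetical class for illustration and does not exist in the bundle.

```php
<?php
declare(strict_types=1);

/** Simulates the per-item notification queue a server keeps between publishes. */
final class MonitoredItemQueue
{
    /** @var list<mixed> */
    private array $values = [];
    private int $queueSize;

    public function __construct(int $queueSize, private bool $discardOldest)
    {
        // A queue always holds at least one value.
        $this->queueSize = max(1, $queueSize);
    }

    public function push(mixed $value): void
    {
        if (count($this->values) < $this->queueSize) {
            $this->values[] = $value;
            return;
        }

        if ($this->discardOldest) {
            // Overflow: drop the oldest entry, keep the newest values.
            array_shift($this->values);
            $this->values[] = $value;
        } else {
            // Overflow: overwrite the most recently queued entry.
            $this->values[array_key_last($this->values)] = $value;
        }
    }

    /** Returns everything queued and empties the queue, like one publish. */
    public function drain(): array
    {
        $out = $this->values;
        $this->values = [];

        return $out;
    }
}
```

With queueSize 3 and five samples between publishes, discardOldest = true delivers the last three samples, while discardOldest = false delivers the first two plus the latest one.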
Memory usage
A typical 100-tag subscription uses ~5-10 MB on top of the
worker. Memory grows linearly with monitored-item count and
queueSize. Set --memory-limit on long-running workers.
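The --memory-limit flag belongs to messenger:consume workers; a hand-written direct-mode loop has no equivalent. A minimal sketch of a comparable guard follows, assuming the process runs under Supervisor so that a clean exit triggers a restart. memoryLimitReached is a hypothetical helper, and the 128 MB figure in the usage note is arbitrary.

```php
<?php
declare(strict_types=1);

/**
 * True once the memory PHP has reserved from the system reaches the limit.
 * Intended to be checked between publish cycles.
 */
function memoryLimitReached(int $limitBytes): bool
{
    return memory_get_usage(true) >= $limitBytes;
}
```

A direct-mode loop could then read `while (!memoryLimitReached(128 * 1024 * 1024)) { $client->publish(); }` and return normally once the limit is hit.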
When NOT to use subscriptions
- Reading once: use $client->read().
- Reading every 5 minutes from a scheduled task: a scheduled command is simpler.
- Sub-50 ms latency UI: OPC UA's minimum practical publishing interval is ~50 ms. Tighter than that, use a different protocol.
Where to read next
- Events · Data events — the listener-side reference.
- Recipes · Mercure real-time dashboard — end-to-end real-time UI with Mercure.
- Async listeners with Messenger — scaling the listener side.