Auto-publish

Auto-publish is the bridge that turns daemon-side OPC UA subscription notifications into Symfony EventDispatcher events. It is the feature that makes real-time UIs from Symfony feasible.

When the daemon receives subscription notifications, it dispatches them on a PSR-14 event dispatcher, which in this setup is Symfony's EventDispatcher. Notifications therefore arrive in your listeners exactly like events you dispatched yourself.

What it gives you

text data flow
OPC UA Server              Daemon                          Symfony app
   │                        │                                  │
   │ PublishResponse        │                                  │
   ├───────────────────────►│                                  │
   │                        │ PSR-14: DataChangeReceived       │
   │                        ├─────────────────────────────────►│
   │                        │                                  │ EventDispatcher
   │                        │                                  ├─► #[AsEventListener]

In application code, you write a listener:

php src/EventListener/StoreSpeedReading.php
namespace App\EventListener;

use App\Entity\PlcReading;
use App\Service\HandleToNodeMap;
use Doctrine\ORM\EntityManagerInterface;
use PhpOpcua\Client\Event\DataChangeReceived;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;

final class StoreSpeedReading
{
    public function __construct(
        private EntityManagerInterface $em,
        private HandleToNodeMap $map,
    ) {}

    #[AsEventListener]
    public function __invoke(DataChangeReceived $event): void
    {
        $reading = (new PlcReading())
            ->setNodeId($this->map->resolve($event->clientHandle))
            ->setValue($event->dataValue->getValue())
            ->setStatusCode($event->dataValue->statusCode)
            ->setSourceAt($event->dataValue->sourceTimestamp);

        $this->em->persist($reading);
        $this->em->flush();
    }
}

The daemon does the OPC UA work; your listener just reacts. The event carries $clientHandle (the integer you assigned at monitored-item creation); resolve it back to a logical nodeId through a map your application maintains.
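The listener above injects App\Service\HandleToNodeMap, which this page does not define. A minimal sketch of such a map follows, assuming an in-memory array is enough; the class body, the register() method, and the exception type are illustrative choices, not part of the bundle. The namespace declaration is omitted so the sketch runs standalone.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical application service (would live in App\Service).
 * Maps the integer clientHandle chosen at monitored-item creation
 * back to the nodeId it was assigned to.
 */
final class HandleToNodeMap
{
    /** @param array<int, string> $nodes clientHandle => nodeId */
    public function __construct(private array $nodes = []) {}

    /** Record a handle at the moment the monitored item is created. */
    public function register(int $clientHandle, string $nodeId): void
    {
        $this->nodes[$clientHandle] = $nodeId;
    }

    /** Fail loudly on an unknown handle instead of storing a reading without a node. */
    public function resolve(int $clientHandle): string
    {
        return $this->nodes[$clientHandle]
            ?? throw new \OutOfBoundsException(sprintf('Unknown clientHandle %d', $clientHandle));
    }
}
```

Because listeners run in the daemon process while subscriptions may be created from a web request, a real map probably needs a shared backing store (a Doctrine table or static config) instead of per-process memory.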

Enabling

Two requirements:

  1. session_manager.auto_publish: true in the bundle config.
  2. The daemon is launched via php bin/console opcua:session (so the Symfony EventDispatcher is wired in).

text bundle config
php_opcua_symfony_opcua:
    session_manager:
        auto_publish: '%env(bool:OPCUA_AUTO_PUBLISH)%'

.env:

bash .env
OPCUA_AUTO_PUBLISH=true

That's it. Any subscription created via $opcua->connect()->createSubscription(...) plus createMonitoredItems(...) opts in automatically: once the daemon's auto-publish loop is running, every notification it receives is dispatched on the Symfony EventDispatcher.

Event types

Event class                                      OPC UA notification
-----------------------------------------------  -----------------------------------------
PhpOpcua\Client\Event\DataChangeReceived         Monitored item data change
PhpOpcua\Client\Event\EventNotificationReceived  Alarm / event from event-notifier
PhpOpcua\Client\Event\PublishResponseReceived    Any publish response (incl. keep-alives)
PhpOpcua\Client\Event\SubscriptionKeepAlive      Server keep-alive on an idle subscription
PhpOpcua\Client\Event\AlarmActivated             Alarm transitioned to active
PhpOpcua\Client\Event\AlarmAcknowledged          Alarm acknowledged

See Events overview for the field reference on each.

Listener registration

#[AsEventListener] is the modern way:

php auto-discovered
use PhpOpcua\Client\Event\DataChangeReceived;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;

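final class StoreSpeedReading
{
    // The event class is inferred from the type-hint of the first parameter.
    #[AsEventListener]
    public function __invoke(DataChangeReceived $event): void { /* ... */ }
}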

Symfony's autoconfiguration finds the attribute and registers the listener.

Old-style services.yaml works too:

text services.yaml
services:
    App\EventListener\StoreSpeedReading:
        tags:
            - { name: kernel.event_listener, event: 'PhpOpcua\Client\Event\DataChangeReceived' }

Prefer the attribute — it lives next to the listener code.

Subscription lifecycle in managed mode

php end-to-end
// 1. Subscribe (from anywhere — controller, command, listener)
$client = $this->opcua->connect();
$sub = $client->createSubscription(publishingInterval: 500.0);

$client->createMonitoredItems(
    $sub->subscriptionId,
    [
        ['nodeId' => 'ns=2;s=Speed', 'clientHandle' => 1],
    ],
);

// 2. Done — daemon holds the subscription, your listener gets the events.

The subscription survives across requests, worker restarts, and deploys — as long as the daemon stays up.

Declarative subscriptions

For subscriptions you want always running (and recoverable on daemon restart), declare them in YAML:

text declarative
php_opcua_symfony_opcua:
    session_manager:
        auto_publish: true
    connections:
        default:
            endpoint:     '%env(OPCUA_ENDPOINT)%'
            auto_connect: true
            subscriptions:
                -
                    publishing_interval: 500.0
                    max_keep_alive_count: 10
                    monitored_items:
                        - { node_id: 'ns=2;s=Speed',        client_handle: 1 }
                        - { node_id: 'ns=2;s=Temperature',  client_handle: 2 }
                        - { node_id: 'ns=2;s=Pressure',     client_handle: 3 }
                    event_monitored_items:
                        - node_id: 'ns=0;i=2253'
                          client_handle: 100
                          select_fields: [EventId, EventType, SourceName, Time, Message, Severity]

When the daemon starts, it:

  1. Connects to the endpoint.
  2. Creates the subscription with the specified parameters.
  3. Adds all monitored items.
  4. Starts publishing.

Per-connection auto_connect: true + at least one subscriptions: entry is the recipe.
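With declarative subscriptions, the clientHandle to nodeId mapping already exists in the YAML above, so the listener-side map can be derived from it instead of being maintained by hand. The sketch below assumes the subscriptions: list is available to application code as a plain PHP array with the same keys as the YAML; how the bundle exposes its resolved config is not covered here, and the function name is hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * Flatten the `subscriptions:` entries of one connection into
 * clientHandle => nodeId. The array shape mirrors the YAML keys;
 * it is an assumption, not a documented bundle structure.
 *
 * @param list<array<string, mixed>> $subscriptions
 * @return array<int, string>
 */
function handleMapFromSubscriptions(array $subscriptions): array
{
    $map = [];
    foreach ($subscriptions as $subscription) {
        // Data-change items and event items share the same handle space here.
        $items = array_merge(
            $subscription['monitored_items'] ?? [],
            $subscription['event_monitored_items'] ?? [],
        );
        foreach ($items as $item) {
            $handle = (int) $item['client_handle'];
            if (isset($map[$handle])) {
                // A reused handle would make events ambiguous in listeners.
                throw new InvalidArgumentException("Duplicate client_handle $handle");
            }
            $map[$handle] = (string) $item['node_id'];
        }
    }

    return $map;
}
```

Rejecting duplicate handles is a design choice of this sketch: listeners on this page identify the source by clientHandle alone, so a handle reused across subscriptions could not be resolved unambiguously.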

Async listeners via Messenger

For listeners that do non-trivial work (DB writes, broadcasts, HTTP), route through Messenger to avoid blocking the dispatcher:

php async listener
namespace App\EventListener;

use App\Message\StoreReading;
use PhpOpcua\Client\Event\DataChangeReceived;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Symfony\Component\Messenger\MessageBusInterface;

final class DispatchDataChange
{
    public function __construct(private MessageBusInterface $bus) {}

    #[AsEventListener]
    public function __invoke(DataChangeReceived $event): void
    {
        $this->bus->dispatch(new StoreReading(
            clientHandle: $event->clientHandle,
            value:        $event->dataValue->getValue(),
            statusCode:   $event->dataValue->statusCode,
            at:           $event->dataValue->sourceTimestamp,
        ));
    }
}

See Events · Async listeners with Messenger.
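The listener above dispatches App\Message\StoreReading, which is not shown on this page. A minimal sketch of the message and a matching handler follows; the property types are inferred from the named arguments in the listener, and the handler stores into an in-memory list so the example runs standalone. In a Symfony app the handler would presumably carry #[AsMessageHandler] and persist through Doctrine instead.

```php
<?php
declare(strict_types=1);

/** Hypothetical message matching the named arguments used by the listener. */
final class StoreReading
{
    public function __construct(
        public readonly int $clientHandle,
        public readonly mixed $value,
        public readonly int $statusCode,
        public readonly ?DateTimeInterface $at,
    ) {}
}

/**
 * Hypothetical handler. It appends to an array here; a real one would
 * build an entity and flush it, outside the event dispatch path.
 */
final class StoreReadingHandler
{
    /** @var list<array{handle: int, value: mixed, status: int, at: ?string}> */
    public array $stored = [];

    public function __invoke(StoreReading $message): void
    {
        $this->stored[] = [
            'handle' => $message->clientHandle,
            'value'  => $message->value,
            'status' => $message->statusCode,
            'at'     => $message->at?->format('c'),
        ];
    }
}
```

Messages routed to an asynchronous transport are serialized, so the value carried by the message has to be serializable; scalar readings are safe, while richer value objects may need converting first.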

Broadcasting via Mercure

Pair with Symfony's Mercure component for real-time browser updates:

php Mercure broadcaster
namespace App\EventListener;

use App\Service\HandleToNodeMap;
use PhpOpcua\Client\Event\DataChangeReceived;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Symfony\Component\Mercure\HubInterface;
use Symfony\Component\Mercure\Update;

final class BroadcastDataChange
{
    public function __construct(
        private HubInterface $hub,
        private HandleToNodeMap $map,
    ) {}

    #[AsEventListener]
    public function __invoke(DataChangeReceived $event): void
    {
        $nodeId = $this->map->resolve($event->clientHandle);

        $this->hub->publish(new Update(
            topics: ['/plc/' . $nodeId, '/plc/all'],
            data: json_encode([
                'node_id'   => $nodeId,
                'value'     => $event->dataValue->getValue(),
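                // Severity is the top two bits of an OPC UA status code; 00 means Good,
                // which also covers non-zero Good sub-codes.
                'good'      => ($event->dataValue->statusCode & 0xC0000000) === 0,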
                'at'        => $event->dataValue->sourceTimestamp?->format('c'),
            ]),
        ));
    }
}

See Integrations · Mercure and Recipes · Mercure real-time dashboard.
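When deriving a quality flag for the browser, it can help to know that an OPC UA status code carries its severity in the two most significant bits of the 32-bit value: 00 is Good, 01 is Uncertain, 10 is Bad. The helper below is a sketch, not a function of the client library; its name and the string labels are hypothetical, and it assumes 64-bit PHP so that the mask fits in an int.

```php
<?php
declare(strict_types=1);

/** Bits 30-31 of the 32-bit OPC UA StatusCode hold the severity. */
const OPCUA_SEVERITY_MASK = 0xC0000000;

/** Classify a raw status code as 'good', 'uncertain', or 'bad'. */
function statusSeverity(int $statusCode): string
{
    return match ($statusCode & OPCUA_SEVERITY_MASK) {
        0x00000000 => 'good',
        0x40000000 => 'uncertain',
        // 0x80000000 is Bad; the remaining bit pattern is reserved and treated as bad.
        default    => 'bad',
    };
}
```

A dashboard could publish statusSeverity(...) alongside the value so that Uncertain readings are shown differently from Bad ones.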

Performance

Workload                  What to watch
------------------------  -----------------------------------------------
10 monitored items, 1 Hz  Trivial
100 items, 1 Hz           Watch listener time; queue if it exceeds 100 ms
1000 items, 1 Hz          Queue listeners; batch DB writes
100 items at 100 Hz       Rare; queue heavily and apply a deadband

The daemon handles thousands of notifications per second easily. The bottleneck is what listeners do.
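For the "batch DB writes" advice, one possible shape is a small buffer that collects readings and hands them to a sink in groups, so the database sees one flush per batch instead of one per notification. This is a sketch under assumptions: the class name, the array-shaped readings, and the default batch size are illustrative, and the sink stands in for whatever persists the batch.

```php
<?php
declare(strict_types=1);

/** Hypothetical buffer: collects readings and emits them in fixed-size batches. */
final class ReadingBatcher
{
    /** @var list<array<string, mixed>> */
    private array $pending = [];

    /** @param \Closure(list<array<string, mixed>>): void $sink receives each full batch */
    public function __construct(private \Closure $sink, private int $batchSize = 100)
    {
        if ($batchSize < 1) {
            throw new InvalidArgumentException('batchSize must be at least 1');
        }
    }

    /** @param array<string, mixed> $reading */
    public function add(array $reading): void
    {
        $this->pending[] = $reading;
        if (count($this->pending) >= $this->batchSize) {
            $this->flush();
        }
    }

    /** Emit whatever is buffered; safe to call when the buffer is empty. */
    public function flush(): void
    {
        if ($this->pending === []) {
            return;
        }
        $batch = $this->pending;
        $this->pending = []; // clear first so a failing sink does not re-send on retry
        ($this->sink)($batch);
    }
}
```

A size-only trigger leaves the last few readings waiting until the next batch fills, so a real deployment would likely also call flush() on a timer or on shutdown.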

Recovery after daemon restart

When the daemon restarts, all subscriptions are gone. If you used declarative auto_connect + subscriptions:, they're recreated on the next startup automatically.

For programmatic subscriptions, register an ExecStartPost hook on the systemd unit that calls a Symfony command to re-subscribe:

text systemd snippet
ExecStartPost=/usr/bin/php /var/www/html/bin/console app:opcua:resubscribe

The command opens the subscriptions from a Doctrine table or a hard-coded list.
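The core of such a re-subscribe command could look like the sketch below. It groups stored rows by publishing interval and replays them through the two client calls used earlier on this page. The SubscribingClient interface is a stand-in written for this example that only mirrors those call shapes; the row layout and the function name are likewise hypothetical, and the Symfony Console wrapper is left out.

```php
<?php
declare(strict_types=1);

/**
 * Stand-in for the client, mirroring only the calls shown on this page.
 * It is not the library's interface.
 */
interface SubscribingClient
{
    /** @return object exposing a public int $subscriptionId */
    public function createSubscription(float $publishingInterval): object;

    /** @param list<array{nodeId: string, clientHandle: int}> $items */
    public function createMonitoredItems(int $subscriptionId, array $items): void;
}

/**
 * Recreate one subscription per distinct publishing interval.
 *
 * @param list<array{nodeId: string, clientHandle: int, interval: float}> $rows
 * @return int number of subscriptions created
 */
function resubscribe(SubscribingClient $client, array $rows): int
{
    $byInterval = [];
    foreach ($rows as $row) {
        // Floats cannot be used reliably as array keys, so key by a formatted string.
        $key = sprintf('%.3F', $row['interval']);
        $byInterval[$key][] = [
            'nodeId'       => $row['nodeId'],
            'clientHandle' => $row['clientHandle'],
        ];
    }

    foreach ($byInterval as $interval => $items) {
        $sub = $client->createSubscription(publishingInterval: (float) $interval);
        $client->createMonitoredItems($sub->subscriptionId, $items);
    }

    return count($byInterval);
}
```

Depending on the unit's service type, systemd may run ExecStartPost before the daemon is ready to accept connections, so the command may need a short retry loop around the first connect.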

When NOT to enable auto-publish

  • You don't use subscriptions (only on-demand reads/writes).
  • You run the daemon as a generic IPC service and don't want events leaking into Symfony.
  • You're testing — turn it off to isolate.

The setting is per daemon process, not per connection. Selective filtering happens in listeners (by clientHandle, client instance comparison, severity, etc.).
