symfony-opcua · master

Data events

DataChangeReceived — the most common subscription event. Field reference, listener patterns, persistence to Doctrine, broadcast via Mercure, and the throughput rules.

DataChangeReceived fires every time a publish response carries a monitored-item value change. In a typical real-time UI it's the event that drives everything.

The full FQN is PhpOpcua\Client\Event\DataChangeReceived (singular Event\).

Note

Publish-loop driven. The event only fires while something is calling publish() on the subscription. In managed mode with auto-publish the daemon drives the loop. In direct mode your own code drives it (e.g. a long-running console worker).

Field reference

DataChangeReceived carries these public readonly fields (opcua-client/src/Event/DataChangeReceived.php):

| Field | Type | Meaning |
| --- | --- | --- |
| $client | OpcUaClientInterface | The live client instance |
| $subscriptionId | int | Server subscription id |
| $sequenceNumber | int | Publish sequence number |
| $clientHandle | int | The handle you assigned at item creation |
| $dataValue | DataValue | The reading |

The DataValue exposes the getValue() method plus the statusCode, sourceTimestamp, serverTimestamp, and type properties.

DataChangeReceived does not carry a nodeId field. It carries clientHandle — the integer you supplied when calling createMonitoredItems(). Keep a clientHandle => nodeId map on your application side to route events back to a logical tag.
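The listeners further down this page inject an App\Service\HandleToNodeMap and call resolve() on it, but the class itself is application code. A minimal sketch of such a map, assuming the handle-to-node assignments can be supplied as a fixed array; the constructor shape and the exception type are illustrative assumptions, not part of the bundle:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical application-side lookup from clientHandle to NodeId string.
 * Not part of the bundle; in a Symfony app it would live in App\Service.
 */
final class HandleToNodeMap
{
    /** @param array<int, string> $nodes clientHandle => nodeId */
    public function __construct(private readonly array $nodes) {}

    /** Return the NodeId for a handle, or fail loudly on an unknown one. */
    public function resolve(int $clientHandle): string
    {
        return $this->nodes[$clientHandle]
            ?? throw new OutOfBoundsException("Unknown clientHandle {$clientHandle}");
    }
}

// The same handles must be the ones passed to createMonitoredItems().
$map = new HandleToNodeMap([7 => 'ns=2;s=Temperature', 8 => 'ns=2;s=Speed']);
echo $map->resolve(8), PHP_EOL; // ns=2;s=Speed
```

Building the map from configuration rather than filling it at runtime matters once Messenger is involved: a message handler usually runs in a different process from the code that created the monitored items, so both sides need to load the same assignments.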

Simple log listener

php log listener
namespace App\EventListener;

use PhpOpcua\Client\Event\DataChangeReceived;
use Psr\Log\LoggerInterface;
use Symfony\Component\DependencyInjection\Attribute\Autowire;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;

final class LogDataChange
{
    public function __construct(
        #[Autowire(service: 'monolog.logger.opcua_data')]
        private LoggerInterface $logger,
    ) {}

    #[AsEventListener]
    public function __invoke(DataChangeReceived $event): void
    {
        $this->logger->info(
            sprintf('handle %d = %s', $event->clientHandle, json_encode($event->dataValue->getValue())),
            ['status' => $event->dataValue->statusCode],
        );
    }
}

Persistence — Doctrine

For any non-trivial work, route via Messenger:

php src/Message/StoreReading.php
namespace App\Message;

final readonly class StoreReading
{
    public function __construct(
        public int    $clientHandle,
        public mixed  $value,
        public int    $statusCode,
        public ?\DateTimeImmutable $at,
    ) {}
}

php src/EventListener/DispatchReading.php
namespace App\EventListener;

use App\Message\StoreReading;
use PhpOpcua\Client\Event\DataChangeReceived;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Symfony\Component\Messenger\MessageBusInterface;

final class DispatchReading
{
    public function __construct(private MessageBusInterface $bus) {}

    #[AsEventListener]
    public function __invoke(DataChangeReceived $event): void
    {
        $this->bus->dispatch(new StoreReading(
            clientHandle: $event->clientHandle,
            value:        $event->dataValue->getValue(),
            statusCode:   $event->dataValue->statusCode,
            at:           $event->dataValue->sourceTimestamp,
        ));
    }
}

php src/MessageHandler/StoreReadingHandler.php
namespace App\MessageHandler;

use App\Entity\PlcReading;
use App\Message\StoreReading;
use App\Service\HandleToNodeMap;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
final class StoreReadingHandler
{
    public function __construct(
        private EntityManagerInterface $em,
        private HandleToNodeMap $map,
    ) {}

    public function __invoke(StoreReading $msg): void
    {
        if ($msg->statusCode !== 0) return;    // keep only status 0 (Good); drops Uncertain and Bad

        $reading = (new PlcReading())
            ->setNodeId($this->map->resolve($msg->clientHandle))
            ->setValue($msg->value)
            ->setSourceAt($msg->at);

        $this->em->persist($reading);
        $this->em->flush();
    }
}

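Routing the listener through Messenger keeps the EventDispatcher fast only if StoreReading is routed to an asynchronous transport. Without a routing entry, Messenger handles the message synchronously and the flush still runs inside the listener.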
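The handler above keeps only readings whose status code is exactly 0. OPC UA encodes severity in the top two bits of the 32-bit status code (00 Good, 01 Uncertain, 10 Bad), so a Good code with additional bits set would also be dropped by a strict comparison. A sketch of a severity check, assuming statusCode arrives as a plain non-negative integer on 64-bit PHP; the constant and function names are illustrative and not part of the client library:

```php
<?php

declare(strict_types=1);

const SEVERITY_MASK = 0xC0000000; // top two bits of the 32-bit status code

/** Good: severity bits are 00. */
function isGood(int $statusCode): bool
{
    return ($statusCode & SEVERITY_MASK) === 0;
}

/** Bad: severity bits are 10. */
function isBad(int $statusCode): bool
{
    return ($statusCode & SEVERITY_MASK) === 0x80000000;
}

var_dump(isGood(0x00000000)); // bool(true)
var_dump(isGood(0x40000000)); // bool(false), severity is Uncertain
var_dump(isBad(0x80000000));  // bool(true)
```

Whether Uncertain readings should be stored, flagged, or dropped is an application decision; the strict `=== 0` check is the most conservative choice.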

Broadcasting via Mercure

For real-time UI updates:

php src/EventListener/BroadcastTagUpdate.php
namespace App\EventListener;

use App\Service\HandleToNodeMap;
use PhpOpcua\Client\Event\DataChangeReceived;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Symfony\Component\Mercure\HubInterface;
use Symfony\Component\Mercure\Update;

final class BroadcastTagUpdate
{
    public function __construct(
        private HubInterface $hub,
        private HandleToNodeMap $map,
    ) {}

    #[AsEventListener]
    public function __invoke(DataChangeReceived $event): void
    {
        $nodeId = $this->map->resolve($event->clientHandle);

        $this->hub->publish(new Update(
            topics: [
                '/plc/' . $nodeId,
                '/plc/all',
            ],
            data: json_encode([
                'node_id'   => $nodeId,
                'value'     => $event->dataValue->getValue(),
                'good'      => $event->dataValue->statusCode === 0,
                'source_at' => $event->dataValue->sourceTimestamp?->format('c'),
            ]),
        ));
    }
}

The browser subscribes to /plc/all for a dashboard or /plc/ns=2;s=Speed for a single tile. See Integrations · Mercure.
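NodeId strings contain characters such as `=` and `;`, so a single-tile topic has to be percent-encoded when it goes into the hub URL's topic query parameter. A small sketch of building that subscription URL on the server side, for example to hand to a template; the function name and the hub URL are placeholder assumptions:

```php
<?php

declare(strict_types=1);

/** Build a Mercure subscription URL for one or more topics. */
function mercureSubscribeUrl(string $hubUrl, string ...$topics): string
{
    // One "topic" parameter per topic, each percent-encoded.
    $pairs = array_map(
        static fn (string $topic): string => 'topic=' . rawurlencode($topic),
        $topics,
    );

    return $hubUrl . '?' . implode('&', $pairs);
}

// Placeholder hub URL; substitute your own hub's public URL.
echo mercureSubscribeUrl('https://example.com/.well-known/mercure', '/plc/ns=2;s=Speed'), PHP_EOL;
// https://example.com/.well-known/mercure?topic=%2Fplc%2Fns%3D2%3Bs%3DSpeed
```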

Caching latest value

A pattern that pairs well with broadcasting — any reader can query the latest value cheaply:

php src/EventListener/CacheLatestValue.php
namespace App\EventListener;

use App\Service\HandleToNodeMap;
use PhpOpcua\Client\Event\DataChangeReceived;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Symfony\Contracts\Cache\CacheInterface;
use Symfony\Contracts\Cache\ItemInterface;

final class CacheLatestValue
{
    public function __construct(
        private CacheInterface $cache,
        private HandleToNodeMap $map,
    ) {}

    #[AsEventListener]
    public function __invoke(DataChangeReceived $event): void
    {
        $nodeId = $this->map->resolve($event->clientHandle);
        $key = 'plc.latest.' . hash('xxh3', $nodeId);

        $this->cache->delete($key);
        $this->cache->get($key, function (ItemInterface $item) use ($event) {
            $item->expiresAfter(300);
            return [
                'value'  => $event->dataValue->getValue(),
                'status' => $event->dataValue->statusCode,
                'at'     => $event->dataValue->sourceTimestamp?->format('c'),
            ];
        });
    }
}

…then in a controller:

php controller
#[Route('/api/tags/{node}/latest', methods: ['GET'], requirements: ['node' => '.+'])]
public function latest(string $node, CacheInterface $cache): JsonResponse
{
    $key = 'plc.latest.' . hash('xxh3', $node);
    $data = $cache->get($key, fn() => null);

    return $this->json($data);
}

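No OPC UA round-trip: while the publish loop is running, the cached value lags the server by roughly one publishingInterval at most. An entry expires 300 s after the last change, so a tag that stops changing eventually reads as null.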
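The listener and the controller must derive the same cache key from the NodeId. Hashing also keeps the key free of the characters PSR-6 reserves (`{}()/\@:`), which a string NodeId may contain. One way to keep both sides in step is a shared helper; the function name here is an assumption:

```php
<?php

declare(strict_types=1);

/** Cache key for a tag's latest value; the xxh3 algorithm needs PHP 8.1 or newer. */
function latestValueKey(string $nodeId): string
{
    return 'plc.latest.' . hash('xxh3', $nodeId);
}

$key = latestValueKey('ns=2;s=Speed');
var_dump($key === latestValueKey('ns=2;s=Speed')); // bool(true): same input, same key
var_dump(strlen($key));                            // int(27): 11-char prefix + 16 hex digits
```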

Threshold-based alerting

php src/EventListener/AlertHighTemperature.php
namespace App\EventListener;

use App\Notification\HighTemperatureNotification;
use PhpOpcua\Client\Event\DataChangeReceived;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Symfony\Component\Notifier\NotifierInterface;
use Symfony\Component\Notifier\Recipient\Recipient;
use Symfony\Contracts\Cache\CacheInterface;
use Symfony\Contracts\Cache\ItemInterface;

final class AlertHighTemperature
{
    private const TEMPERATURE_HANDLE = 7; // assigned at createMonitoredItems

    public function __construct(
        private NotifierInterface $notifier,
        private CacheInterface $cache,
    ) {}

    #[AsEventListener]
    public function __invoke(DataChangeReceived $event): void
    {
        if ($event->clientHandle !== self::TEMPERATURE_HANDLE) return;

        $temp = (float) $event->dataValue->getValue();
        if ($temp < 90.0) return;

        // Throttle: one alert per 5 minutes
        $key = 'alert.fired.handle.' . self::TEMPERATURE_HANDLE;
        $alreadyFired = $this->cache->get($key, function (ItemInterface $i) {
            $i->expiresAfter(300);
            return false;
        });

        if ($alreadyFired) return;

        $this->cache->delete($key);
        $this->cache->get($key, function (ItemInterface $i) {
            $i->expiresAfter(300);
            return true;
        });

        $this->notifier->send(
            new HighTemperatureNotification($temp),
            new Recipient('ops@example.com'), // placeholder address
        );
    }
}

The throttle key limits a sensor that hovers around the threshold to one alert per 5 minutes, instead of one alert per data change.

SubscriptionKeepAlive

When a subscription has nothing to report, the server answers publish requests with keep-alive messages. The client then dispatches PhpOpcua\Client\Event\SubscriptionKeepAlive, which is useful for "the connection is alive" health checks. The class carries $client and $subscriptionId.

php keep-alive listener
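namespace App\EventListener;

use PhpOpcua\Client\Event\SubscriptionKeepAlive;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;

final class TouchHeartbeat
{
    #[AsEventListener]
    public function __invoke(SubscriptionKeepAlive $event): void
    {
        // Touch a heartbeat timestamp for $event->subscriptionId
    }
}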

You usually don't need to listen for it, but it is useful if you build a custom freshness probe.

There is no KeepAliveReceived event. The real class is SubscriptionKeepAlive.

What NOT to do in a listener

| Work in the listener | Recommendation |
| --- | --- |
| Synchronous HTTP | Queue it (Messenger) |
| Synchronous DB transactions | Queue it |
| Blocking I/O | Queue it |
| Heavy computation | Queue it |
| In-memory transforms | OK synchronously |
| Single-row insert (low-volume) | OK synchronously |

The rule: if it can take more than 5 ms at p99, queue it.
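To check a listener against that budget, time it over a representative batch of events and look at the 99th percentile rather than the mean. A rough sketch using the nearest-rank method; the helper name and the stand-in workload are assumptions:

```php
<?php

declare(strict_types=1);

/**
 * Nearest-rank percentile of a list of durations in milliseconds.
 *
 * @param list<float> $samplesMs
 */
function percentile(array $samplesMs, float $p): float
{
    if ($samplesMs === []) {
        throw new InvalidArgumentException('No samples');
    }
    sort($samplesMs);
    $rank = (int) ceil($p * count($samplesMs) / 100); // 1-based rank

    return $samplesMs[max($rank, 1) - 1];
}

// Time a stand-in for the listener body 1000 times.
$samples = [];
for ($i = 0; $i < 1000; $i++) {
    $start = hrtime(true);                        // monotonic clock, nanoseconds
    json_encode(['value' => $i]);                 // replace with the real listener call
    $samples[] = (hrtime(true) - $start) / 1e6;   // convert to milliseconds
}

printf("p99 = %.3f ms\n", percentile($samples, 99.0));
```

A result above 5 ms suggests moving that work behind Messenger.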