Data events
DataChangeReceived — the most common subscription event. Field reference, listener patterns, persistence to Doctrine, broadcast via Mercure, and the throughput rules.
DataChangeReceived fires every time a publish response carries
a monitored-item value change. In a typical real-time UI it's the
event that drives everything.
The fully qualified class name is PhpOpcua\Client\Event\DataChangeReceived
(note the singular Event namespace segment).
Note
Publish-loop driven. The event only fires while something is
calling publish() on the subscription. In managed mode with
auto-publish the daemon drives the loop. In direct mode your
own code drives it (e.g. a long-running console worker).
Field reference
DataChangeReceived carries these public readonly fields
(opcua-client/src/Event/DataChangeReceived.php):
| Field | Type | Meaning |
|---|---|---|
| $client | OpcUaClientInterface | The live client instance |
| $subscriptionId | int | Server subscription id |
| $sequenceNumber | int | Publish sequence number |
| $clientHandle | int | The handle you assigned at item creation |
| $dataValue | DataValue | The reading |
The DataValue carries getValue(), statusCode,
sourceTimestamp, serverTimestamp, type.
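The examples on this page treat a reading as good only when statusCode is exactly 0. OPC UA encodes the severity in the top two bits of the 32-bit status code, so a Good code can be non-zero when it carries extra information bits. The following is a minimal sketch of a severity check, assuming statusCode is exposed as a plain unsigned 32-bit integer on 64-bit PHP. The function names are hypothetical, and the client library may already ship an equivalent helper.

```php
<?php
declare(strict_types=1);

/**
 * Returns the OPC UA severity of a status code: 'good', 'uncertain' or 'bad'.
 * Severity lives in bits 30-31 of the 32-bit code: 00 good, 01 uncertain, 10 bad.
 * The reserved pattern 11 is treated as bad here.
 */
function opcuaSeverity(int $statusCode): string
{
    return match (($statusCode >> 30) & 0x3) {
        0 => 'good',
        1 => 'uncertain',
        default => 'bad',
    };
}

/** True for every Good code, not only for the all-zero one. */
function isGoodStatus(int $statusCode): bool
{
    return opcuaSeverity($statusCode) === 'good';
}
```

Whether to accept uncertain readings is an application decision; a strict comparison with 0 remains the most conservative filter.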
DataChangeReceived does not carry a nodeId field. It carries clientHandle, the integer you supplied when calling createMonitoredItems(). Keep a clientHandle => nodeId map on your application side to route events back to a logical tag.
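The listeners further down inject an App\Service\HandleToNodeMap with a resolve() method, but the page does not define it. One possible shape is sketched here, assuming the handles are known up front (for example from configuration). The class body, the register() method and the exception choices are illustrative, not part of the library.

```php
<?php
declare(strict_types=1);

// Hypothetical helper; in the examples on this page it would live in App\Service.
final class HandleToNodeMap
{
    /** @var array<int, string> clientHandle => nodeId */
    private array $nodeIds = [];

    /** @param array<int, string> $nodeIds handles exactly as passed to createMonitoredItems() */
    public function __construct(array $nodeIds = [])
    {
        foreach ($nodeIds as $handle => $nodeId) {
            $this->register($handle, $nodeId);
        }
    }

    /** Records a handle; re-registering the same handle for a different node is rejected. */
    public function register(int $clientHandle, string $nodeId): void
    {
        $existing = $this->nodeIds[$clientHandle] ?? null;
        if ($existing !== null && $existing !== $nodeId) {
            throw new InvalidArgumentException(
                "Client handle $clientHandle is already mapped to $existing"
            );
        }
        $this->nodeIds[$clientHandle] = $nodeId;
    }

    /** Returns the node id for a handle, or throws if the handle was never registered. */
    public function resolve(int $clientHandle): string
    {
        return $this->nodeIds[$clientHandle]
            ?? throw new OutOfBoundsException("No node id registered for client handle $clientHandle");
    }
}
```

A Messenger handler may run in a different process than the listener, so the map is best built from static configuration rather than filled at runtime in one worker.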
Simple log listener
namespace App\EventListener;
use PhpOpcua\Client\Event\DataChangeReceived;
use Psr\Log\LoggerInterface;
use Symfony\Component\DependencyInjection\Attribute\Autowire;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
final class LogDataChange
{
public function __construct(
#[Autowire(service: 'monolog.logger.opcua_data')]
private LoggerInterface $logger,
) {}
#[AsEventListener]
public function __invoke(DataChangeReceived $event): void
{
$this->logger->info(
sprintf('handle %d = %s', $event->clientHandle, json_encode($event->dataValue->getValue())),
['status' => $event->dataValue->statusCode],
);
}
}
Persistence — Doctrine
For any non-trivial work, route via Messenger:
namespace App\Message;
final readonly class StoreReading
{
public function __construct(
public int $clientHandle,
public mixed $value,
public int $statusCode,
public ?\DateTimeImmutable $at,
) {}
}
namespace App\EventListener;
use App\Message\StoreReading;
use PhpOpcua\Client\Event\DataChangeReceived;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Symfony\Component\Messenger\MessageBusInterface;
final class DispatchReading
{
public function __construct(private MessageBusInterface $bus) {}
#[AsEventListener]
public function __invoke(DataChangeReceived $event): void
{
$this->bus->dispatch(new StoreReading(
clientHandle: $event->clientHandle,
value: $event->dataValue->getValue(),
statusCode: $event->dataValue->statusCode,
at: $event->dataValue->sourceTimestamp,
));
}
}
namespace App\MessageHandler;
use App\Entity\PlcReading;
use App\Message\StoreReading;
use App\Service\HandleToNodeMap;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler]
final class StoreReadingHandler
{
public function __construct(
private EntityManagerInterface $em,
private HandleToNodeMap $map,
) {}
public function __invoke(StoreReading $msg): void
{
if ($msg->statusCode !== 0) return; // skip every reading whose status is not exactly Good (0)
$reading = (new PlcReading())
->setNodeId($this->map->resolve($msg->clientHandle))
->setValue($msg->value)
->setSourceAt($msg->at);
$this->em->persist($reading);
$this->em->flush();
}
}
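Routing the work through Messenger keeps the EventDispatcher fast, provided StoreReading is routed to an asynchronous transport. Without a routing entry, Messenger handles the message synchronously inside the listener and nothing is gained.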
Broadcasting via Mercure
For real-time UI updates:
namespace App\EventListener;
use App\Service\HandleToNodeMap;
use PhpOpcua\Client\Event\DataChangeReceived;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Symfony\Component\Mercure\HubInterface;
use Symfony\Component\Mercure\Update;
final class BroadcastTagUpdate
{
public function __construct(
private HubInterface $hub,
private HandleToNodeMap $map,
) {}
#[AsEventListener]
public function __invoke(DataChangeReceived $event): void
{
$nodeId = $this->map->resolve($event->clientHandle);
$this->hub->publish(new Update(
topics: [
'/plc/' . $nodeId,
'/plc/all',
],
data: json_encode([
'node_id' => $nodeId,
'value' => $event->dataValue->getValue(),
'good' => $event->dataValue->statusCode === 0,
'source_at' => $event->dataValue->sourceTimestamp?->format('c'),
], JSON_THROW_ON_ERROR), // throw on an encoding failure instead of passing false as the payload
));
}
}
The browser subscribes to /plc/all for a dashboard or
/plc/ns=2;s=Speed for a single tile. See
Integrations · Mercure.
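Node ids such as ns=2;s=Speed contain = and ;, which must be percent-encoded when the topic travels in the hub's topic query parameter. A small sketch of building the subscription URL on the server side (for example to hand it to a template) follows. The function name is hypothetical and the hub URL is a placeholder using the default /.well-known/mercure path.

```php
<?php
declare(strict_types=1);

/**
 * Builds a Mercure subscription URL with one `topic` query parameter per topic.
 * Each topic is percent-encoded so '/', '=' and ';' survive the query string.
 *
 * @param list<string> $topics
 */
function mercureSubscribeUrl(string $hubUrl, array $topics): string
{
    $params = array_map(
        static fn (string $topic): string => 'topic=' . rawurlencode($topic),
        $topics,
    );

    return $hubUrl . '?' . implode('&', $params);
}

echo mercureSubscribeUrl('https://example.com/.well-known/mercure', ['/plc/ns=2;s=Speed']), PHP_EOL;
// https://example.com/.well-known/mercure?topic=%2Fplc%2Fns%3D2%3Bs%3DSpeed
```

The decoded topic has to match the published topic string exactly, so the same node id formatting must be used on both sides.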
Caching latest value
A pattern that pairs well with broadcasting — any reader can query the latest value cheaply:
namespace App\EventListener;
use App\Service\HandleToNodeMap;
use PhpOpcua\Client\Event\DataChangeReceived;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Symfony\Contracts\Cache\CacheInterface;
use Symfony\Contracts\Cache\ItemInterface;
final class CacheLatestValue
{
public function __construct(
private CacheInterface $cache,
private HandleToNodeMap $map,
) {}
#[AsEventListener]
public function __invoke(DataChangeReceived $event): void
{
$nodeId = $this->map->resolve($event->clientHandle);
$key = 'plc.latest.' . hash('xxh3', $nodeId);
$this->cache->delete($key);
$this->cache->get($key, function (ItemInterface $item) use ($event) {
$item->expiresAfter(300);
return [
'value' => $event->dataValue->getValue(),
'status' => $event->dataValue->statusCode,
'at' => $event->dataValue->sourceTimestamp?->format('c'),
];
});
}
}
…then in a controller:
#[Route('/api/tags/{node}/latest', methods: ['GET'], requirements: ['node' => '.+'])]
public function latest(string $node, CacheInterface $cache): JsonResponse
{
$key = 'plc.latest.' . hash('xxh3', $node);
$data = $cache->get($key, fn() => null);
return $this->json($data);
}
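No OPC UA round-trip is needed: while the publish loop is running, the
cached value lags the server by roughly one publishingInterval. Because
DataChangeReceived fires only when a change is reported, the entry for a
tag that stays constant expires after the 300-second TTL, and the endpoint
then returns null until the next change.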
Threshold-based alerting
namespace App\EventListener;
use App\Notification\HighTemperatureNotification;
use PhpOpcua\Client\Event\DataChangeReceived;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Symfony\Component\Notifier\NotifierInterface;
use Symfony\Component\Notifier\Recipient\Recipient;
use Symfony\Contracts\Cache\CacheInterface;
use Symfony\Contracts\Cache\ItemInterface;
final class AlertHighTemperature
{
private const TEMPERATURE_HANDLE = 7; // assigned at createMonitoredItems
public function __construct(
private NotifierInterface $notifier,
private CacheInterface $cache,
) {}
#[AsEventListener]
public function __invoke(DataChangeReceived $event): void
{
if ($event->clientHandle !== self::TEMPERATURE_HANDLE) return;
$temp = (float) $event->dataValue->getValue();
if ($temp < 90.0) return;
// Throttle: one alert per 5 minutes
$key = 'alert.fired.handle.' . self::TEMPERATURE_HANDLE;
$alreadyFired = $this->cache->get($key, function (ItemInterface $i) {
$i->expiresAfter(300);
return false;
});
if ($alreadyFired) return;
$this->cache->delete($key);
$this->cache->get($key, function (ItemInterface $i) {
$i->expiresAfter(300);
return true;
});
$this->notifier->send(
new HighTemperatureNotification($temp),
new Recipient('[email protected]'),
);
}
}
The throttle cache stops fluctuating sensors from firing 100 alerts per second.
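The cooldown logic can be reduced to its core and tested without a cache or a real clock. The sketch below uses a hypothetical AlertCooldown class that keeps state in memory and takes the current time as an argument. Unlike the cache-backed listener above, it is per-process, so it only illustrates the window logic.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory cooldown; state is not shared between worker processes.
final class AlertCooldown
{
    /** @var array<string, int> key => unix time of the last alert that fired */
    private array $lastFired = [];

    public function __construct(private int $windowSeconds = 300)
    {
    }

    /** Returns true, and starts a new window, only if no alert fired for $key within the window. */
    public function shouldFire(string $key, int $now): bool
    {
        $last = $this->lastFired[$key] ?? null;
        if ($last !== null && $now - $last < $this->windowSeconds) {
            return false; // still cooling down
        }

        $this->lastFired[$key] = $now;

        return true;
    }
}

$cooldown = new AlertCooldown(300);
var_dump($cooldown->shouldFire('handle.7', 1000)); // bool(true)
var_dump($cooldown->shouldFire('handle.7', 1001)); // bool(false)
var_dump($cooldown->shouldFire('handle.7', 1300)); // bool(true)
```

Passing the timestamp in keeps the window boundaries deterministic in tests; production code would pass time() or use a clock service.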
SubscriptionKeepAlive
When a subscription is idle, the server sends keep-alives. The
client dispatches PhpOpcua\Client\Event\SubscriptionKeepAlive —
useful for "the connection is alive" health checks. The class
carries $client and $subscriptionId.
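namespace App\EventListener;
use PhpOpcua\Client\Event\SubscriptionKeepAlive;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
final class TrackKeepAlive // illustrative class name
{
#[AsEventListener]
public function __invoke(SubscriptionKeepAlive $event): void
{
// Touch a heartbeat timestamp for $event->subscriptionId
}
}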
You usually don't need to listen for it, but it is useful when you build a custom freshness probe.
There is no KeepAliveReceived event. The real class is SubscriptionKeepAlive.
What NOT to do in a listener
| Work in the listener | Recommendation |
|---|---|
| Synchronous HTTP | Queue it (Messenger) |
| Synchronous DB transactions | Queue it |
| Blocking I/O | Queue it |
| Heavy computation | Queue it |
| In-memory transforms | OK sync |
| Single-row insert (low-volume) | OK sync |
The rule: if it can take more than 5 ms at p99, queue it.
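To apply the 5 ms rule you need a p99 figure for the listener body. One rough way to get it is sketched below: time repeated runs with hrtime() and take a nearest-rank percentile. Both function names are hypothetical, and the usleep() call merely stands in for real listener work.

```php
<?php
declare(strict_types=1);

/** Nearest-rank percentile (0 < $p <= 100) of a non-empty list of numeric samples. */
function percentile(array $samples, float $p): float
{
    if ($samples === []) {
        throw new InvalidArgumentException('At least one sample is required.');
    }

    sort($samples); // sorts the local copy only
    $rank = max(1, (int) ceil($p * count($samples) / 100));

    return (float) $samples[$rank - 1];
}

/**
 * Runs $listener $runs times and returns each run's wall-clock duration in milliseconds.
 *
 * @return list<float>
 */
function timeListener(callable $listener, int $runs): array
{
    $durations = [];
    for ($i = 0; $i < $runs; $i++) {
        $start = hrtime(true); // monotonic clock, nanoseconds
        $listener();
        $durations[] = (hrtime(true) - $start) / 1e6;
    }

    return $durations;
}

// Stand-in workload; replace the closure with the real listener body.
$p99 = percentile(timeListener(static fn () => usleep(200), 200), 99.0);
echo $p99 > 5.0 ? "queue it\n" : "OK sync\n";
```

Measurements taken on a development machine only approximate production behaviour, so treat the result as a first filter rather than a guarantee.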
Where to read next
- Alarm events — alarm-specific events.
- Async listeners with Messenger — scaling rules.
- Recipes · Persistent tag history — full Doctrine persistence pattern.