Async listeners with Messenger
Route OPC UA event listeners through Messenger to avoid blocking the dispatcher. Transports, retry, batching, and worker tuning for high-throughput subscriptions.
OPC UA subscriptions can produce thousands of events per minute. Synchronous listeners that do non-trivial work choke the publish loop. The fix: route through Symfony Messenger.
The pattern
A thin listener dispatches a message; a handler runs on a worker:
DataChangeReceived
│
│ (sync — microseconds)
▼
EventListener
│
│ $bus->dispatch(new StoreReading(...))
▼
Messenger transport (async)
│
│ (async — milliseconds to seconds)
▼
Worker process
│
│ MessageHandler (Doctrine, HTTP, notifier, etc.)
▼
Side effects
The listener returns in microseconds; the work happens on a worker.
The message
namespace App\Message;
final readonly class StoreReading
{
    public function __construct(
        public int $clientHandle,
        public mixed $value,
        public int $statusCode,
        public ?\DateTimeImmutable $at,
    ) {}
}
Use readonly and primitive types. Messages are serialised — no
entity references (use IDs instead). DataChangeReceived only
carries $clientHandle (the integer you assigned at item
creation); resolve back to a logical nodeId in the handler.
The listener
namespace App\EventListener;
use App\Message\StoreReading;
use PhpOpcua\Client\Event\DataChangeReceived;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Symfony\Component\Messenger\MessageBusInterface;
final class DispatchReading
{
    public function __construct(private MessageBusInterface $bus) {}
    #[AsEventListener]
    public function __invoke(DataChangeReceived $event): void
    {
        // Copy only serialisable scalars and the timestamp into the message
        $this->bus->dispatch(new StoreReading(
            clientHandle: $event->clientHandle,
            value: $event->dataValue->getValue(),
            statusCode: $event->dataValue->statusCode,
            at: $event->dataValue->sourceTimestamp,
        ));
    }
}
The handler
namespace App\MessageHandler;
use App\Entity\PlcReading;
use App\Message\StoreReading;
use App\Service\HandleToNodeMap;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler]
final class StoreReadingHandler
{
    public function __construct(
        private EntityManagerInterface $em,
        private HandleToNodeMap $map,
    ) {}
    public function __invoke(StoreReading $msg): void
    {
        // Skip readings whose status code is not 0 (Good)
        if ($msg->statusCode !== 0) {
            return;
        }
        $reading = (new PlcReading())
            ->setNodeId($this->map->resolve($msg->clientHandle))
            ->setValue($msg->value)
            ->setSourceAt($msg->at);
        $this->em->persist($reading);
        $this->em->flush();
    }
}
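The handler above depends on App\Service\HandleToNodeMap, which this page does not show. A minimal sketch of what such a service could look like, assuming the handle-to-node mapping is passed in from configuration (the worker runs in a separate process, so it cannot see a map built in memory by the subscribing process). The constructor shape and the choice of exception are assumptions, not the bundle's API:

```php
<?php

namespace App\Service;

/**
 * Hypothetical lookup from the client handle assigned at item creation
 * to a logical node ID. Assumes the map is supplied from configuration.
 */
final class HandleToNodeMap
{
    /** @param array<int, string> $nodes client handle => node ID */
    public function __construct(private readonly array $nodes) {}

    public function resolve(int $clientHandle): string
    {
        // An unknown handle is a permanent error: retrying will not help.
        return $this->nodes[$clientHandle]
            ?? throw new \InvalidArgumentException("Unknown client handle {$clientHandle}");
    }
}
```

Throwing \InvalidArgumentException for an unknown handle lets a handler treat it as a message that should not be retried.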
Transport config
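framework:
    messenger:
        transports:
            async_opcua:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                # use_notify and check_delayed_interval are Doctrine transport
                # options (PostgreSQL); remove them when the DSN points at Redis or AMQP.
                options:
                    use_notify: true
                    check_delayed_interval: 60000
                retry_strategy:
                    max_retries: 3
                    multiplier: 2
        routing:
            App\Message\StoreReading: async_opcua
            App\Message\RecordAlarm: async_opcua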
.env:
# Redis (recommended for OPC UA-driven traffic)
MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
# Doctrine table (simpler, lower throughput)
# MESSENGER_TRANSPORT_DSN=doctrine://default
# RabbitMQ
# MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
Redis is the sweet spot for OPC UA throughput — sub-millisecond enqueue, easy to monitor.
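The retry_strategy in the transport config sets max_retries: 3 and multiplier: 2 but leaves delay unset. As a rough sketch of what that means in practice, the function below computes the wait before each retry, assuming Messenger's documented default delay of 1000 ms and the usual delay × multiplier^n growth. The function name is hypothetical, and any random jitter that some Symfony versions add is ignored:

```php
<?php

/**
 * Waits (in ms) before each retry for a multiplier-based strategy:
 * delayMs * multiplier^n for n = 0 .. maxRetries - 1,
 * capped at $maxDelayMs when that is non-zero.
 *
 * @return list<int>
 */
function retryDelays(int $maxRetries, float $multiplier, int $delayMs = 1000, int $maxDelayMs = 0): array
{
    $delays = [];
    for ($n = 0; $n < $maxRetries; $n++) {
        $delay = (int) ceil($delayMs * $multiplier ** $n);
        if ($maxDelayMs > 0 && $delay > $maxDelayMs) {
            $delay = $maxDelayMs;
        }
        $delays[] = $delay;
    }

    return $delays;
}

// retryDelays(3, 2.0) returns [1000, 2000, 4000]
```

Under these assumptions a failing message is retried after 1 s, 2 s, and 4 s before it is given up on, so a database outage longer than about 7 seconds pushes messages to the failure transport.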
Running the worker
php bin/console messenger:consume async_opcua \
--time-limit=3600 \
--memory-limit=512M \
--limit=10000
| Flag | Purpose |
|---|---|
| --time-limit | Restart after N seconds |
| --memory-limit | Restart after N MB |
| --limit | Restart after N messages |
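The worker finishes the message it is handling and exits cleanly; the process supervisor (systemd or Supervisor) then restarts it. Together the three limits cap how long any single PHP process lives, so a slow memory leak cannot grow without bound.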
Per-handler retry
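use App\Message\StoreReading;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
#[AsMessageHandler]
final class StoreReadingHandler
{
    public function __invoke(StoreReading $msg): void
    {
        try {
            // ...
        } catch (\PDOException $e) {
            // Transient database failure: ask Messenger to retry.
            // Note: Doctrine DBAL wraps PDO errors in Doctrine\DBAL\Exception,
            // so catch that type instead when the work goes through Doctrine.
            throw new RecoverableMessageHandlingException(
                'DB transient error', 0, $e
            );
        } catch (\InvalidArgumentException $e) {
            // Malformed message: retrying cannot succeed.
            throw new UnrecoverableMessageHandlingException(
                'Bad message', 0, $e
            );
        }
    }
}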
| Exception | Effect |
|---|---|
| RecoverableMessageHandlingException | Always retried with the strategy's delays; max_retries is ignored |
| UnrecoverableMessageHandlingException | Never retried; sent to the failure transport immediately |
| Any other | Retried per retry strategy, then sent to the failure transport |
Failed messages
framework:
    messenger:
        failure_transport: failed
        transports:
            failed: 'doctrine://default?queue_name=failed'
        routing:
            App\Message\StoreReading: async_opcua
Inspect / retry:
php bin/console messenger:failed:show
php bin/console messenger:failed:retry
php bin/console messenger:failed:remove
Multiple queues per priority
Route different message types to different queues:
framework:
    messenger:
        transports:
            # queue_name is a Doctrine transport option; the Redis transport
            # selects its queue with the stream option instead.
            opcua_data: '%env(MESSENGER_TRANSPORT_DSN)%?queue_name=opcua_data'
            opcua_alarms: '%env(MESSENGER_TRANSPORT_DSN)%?queue_name=opcua_alarms'
            opcua_history: '%env(MESSENGER_TRANSPORT_DSN)%?queue_name=opcua_history'
        routing:
            App\Message\StoreReading: opcua_data
            App\Message\RecordAlarm: opcua_alarms
            App\Message\FetchDailyHistory: opcua_history
Run separate workers per queue:
php bin/console messenger:consume opcua_data --time-limit=3600
php bin/console messenger:consume opcua_alarms --time-limit=3600
php bin/console messenger:consume opcua_history --time-limit=3600
Heavy history reads can't starve fast data inserts.
Worker scaling — multiple processes per queue
Run N processes against the same queue:
for i in 1 2 3 4; do
php bin/console messenger:consume opcua_data --time-limit=3600 &
done
wait
For systemd, use a template unit opcua-worker@.service and run systemctl enable --now opcua-worker@{1..4}.
Throughput tuning
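| Subscription rate | Per-job time | Workers per queue |
|---|---|---|
| 100 events / sec | 5 ms | 1-2 |
| 1000 events / sec | 10 ms | 10-15 |
| 5000 events / sec | 20 ms | 100+ (batch instead) |
A worker handles one message at a time, so the floor is rate × per-job time (1000 events / sec × 10 ms = 10 busy workers); the ranges add headroom. Watch the queue depth via messenger:stats or the Redis CLI to size correctly.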
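The sizing logic can be sketched as a small calculation: each worker is busy for rate × per-job time seconds out of every second, so that product is the minimum number of workers needed to keep the queue from growing. The function name and the 1.5 headroom factor are assumptions for illustration, not measured values:

```php
<?php

/**
 * Rough lower bound on workers for one queue.
 * One worker handles one message at a time, so the number of
 * permanently busy workers is eventsPerSecond * jobMilliseconds / 1000.
 * $headroom scales that up to absorb bursts (assumed factor).
 */
function estimateWorkers(float $eventsPerSecond, float $jobMilliseconds, float $headroom = 1.5): int
{
    $busyWorkers = $eventsPerSecond * $jobMilliseconds / 1000.0;

    return max(1, (int) ceil($busyWorkers * $headroom));
}

// estimateWorkers(100, 5)   returns 1
// estimateWorkers(1000, 10) returns 15
// estimateWorkers(5000, 20) returns 150
```

When the estimate reaches three digits, reducing the per-message cost (for example by batching inserts) is usually cheaper than adding processes.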
Batched handlers
For very high throughput, buffer in Redis and flush periodically:
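#[AsMessageHandler]
final class StoreReadingBatchedHandler
{
    private const BATCH_SIZE = 100;
    // Assumes the phpredis \Redis client and Doctrine's EntityManagerInterface are injected
    public function __construct(
        private \Redis $redis,
        private EntityManagerInterface $em,
    ) {}
    public function __invoke(StoreReading $msg): void
    {
        // Push the serialised reading onto a Redis list
        $this->redis->rpush('opcua.batch', json_encode($msg));
        $length = $this->redis->llen('opcua.batch');
        if ($length >= self::BATCH_SIZE) {
            $this->flush();
        }
    }
    private function flush(): void
    {
        // Read and clear the list in one MULTI/EXEC so concurrent workers never flush the same items
        $items = $this->redis->multi()
            ->lrange('opcua.batch', 0, -1)
            ->del('opcua.batch')
            ->exec()[0];
        // Another worker may have drained the list already
        if ($items === []) {
            return;
        }
        $rows = array_map(fn(string $j) => json_decode($j, true), $items);
        $this->em->getConnection()->executeStatement(
            "INSERT INTO plc_readings (node_id, value, source_at) VALUES " . /* ... */
        );
    }
}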
…with a scheduled command to drain the buffer every 5 seconds even if the threshold isn't hit. See Recipes · Persistent tag history.
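The handler above leaves the VALUES list of the bulk INSERT elided. One way to build it is sketched below, as a standalone helper that returns the SQL and a flat parameter list for positional placeholders. The function name and the row shape (node_id, value, source_at keys) are assumptions; the buffered JSON carries clientHandle and at, so rows would need mapping to this shape first:

```php
<?php

/**
 * Build one multi-row INSERT with positional placeholders.
 *
 * @param list<array{node_id: string, value: mixed, source_at: ?string}> $rows
 * @return array{0: string, 1: list<mixed>} SQL string and flattened parameters
 */
function buildBulkInsert(array $rows): array
{
    if ($rows === []) {
        throw new \InvalidArgumentException('No rows to insert');
    }

    // One "(?, ?, ?)" group per row, joined with commas
    $placeholders = implode(', ', array_fill(0, count($rows), '(?, ?, ?)'));

    // Parameters in the same column order as the INSERT column list
    $params = [];
    foreach ($rows as $row) {
        $params[] = $row['node_id'];
        $params[] = $row['value'];
        $params[] = $row['source_at'];
    }

    return [
        "INSERT INTO plc_readings (node_id, value, source_at) VALUES {$placeholders}",
        $params,
    ];
}
```

The result can be passed to the DBAL connection as executeStatement($sql, $params). Using placeholders instead of string concatenation keeps values from the PLC out of the SQL text.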
Idempotency
If a handler can be retried, it must be idempotent. Use natural keys for upserts:
$this->em->getConnection()->executeStatement(
    "INSERT INTO plc_readings (node_id, source_at, value)
     VALUES (:n, :a, :v)
     ON CONFLICT (node_id, source_at) DO UPDATE SET value = EXCLUDED.value",
    [
        'n' => $this->map->resolve($msg->clientHandle),
        // Bind the timestamp as a string; a DateTimeImmutable cannot be bound without a type hint
        'a' => $msg->at?->format('Y-m-d H:i:s.uP'),
        'v' => $msg->value,
    ],
);
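The upsert treats source_at as unique per node and relies on a unique constraint or index on (node_id, source_at), so a redelivered message overwrites its own row instead of adding a duplicate. Readings without a source timestamp (at is nullable) never conflict in PostgreSQL, so skip them or fall back to another timestamp. The ON CONFLICT clause above is PostgreSQL syntax; SQLite accepts the same ON CONFLICT (…) DO UPDATE form, and MySQL uses ON DUPLICATE KEY UPDATE.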
Monitoring
| Metric | Where | Alert threshold |
|---|---|---|
| Queue depth | messenger:stats or Redis | > 1000 messages |
| Failed messages | messenger:failed:show, or the messenger_messages table (queue_name = failed) | > 10 / hour |
| Worker memory | systemd MemoryCurrent | More than 50% above your normal baseline |
| Worker restart count | systemd journal | > 5 in 10 minutes |
Wire into Prometheus / Datadog / your monitoring of choice.
Where to read next
- Integrations · Messenger — Messenger configuration in depth.
- Logging — what to log about worker runtime.
- Recipes · Persistent tag history — the canonical batched-persistence pattern.