Async listeners with Messenger

Route OPC UA event listeners through Messenger to avoid blocking the dispatcher. Transports, retry, batching, and worker tuning for high-throughput subscriptions.

OPC UA subscriptions can produce thousands of events per minute. Synchronous listeners that do non-trivial work choke the publish loop. The fix: route through Symfony Messenger.

The pattern

A thin listener dispatches a message; a handler runs on a worker:

text flow
DataChangeReceived

    │  (sync — microseconds)

EventListener

    │  $bus->dispatch(new StoreReading(...))

Messenger transport (async)

    │  (async — milliseconds to seconds)

Worker process

    │  MessageHandler (Doctrine, HTTP, notifier, etc.)

Side effects

The listener returns in microseconds; the work happens on a worker.

The message

php src/Message/StoreReading.php
namespace App\Message;

final readonly class StoreReading
{
    public function __construct(
        public int $clientHandle,
        public mixed $value,
        public int $statusCode,
        public ?\DateTimeImmutable $at,
    ) {}
}

Keep the message readonly and limit it to scalars and simple value objects such as DateTimeImmutable. Messages are serialised, so pass IDs rather than entity references. DataChangeReceived carries only $clientHandle (the integer you assigned at item creation); resolve it back to a logical nodeId in the handler.
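To see why a flat message matters, it can help to round-trip one through PHP's native serialize()/unserialize(), which is close to what Messenger's PhpSerializer (the framework default at the time of writing, as far as I know) does when a message crosses a transport. This is a minimal sketch assuming PHP 8.2+; the class is repeated only so the snippet runs on its own, and the sample values are made up.

```php
<?php
declare(strict_types=1);

// Copy of App\Message\StoreReading, repeated so the snippet is self-contained.
final readonly class StoreReading
{
    public function __construct(
        public int $clientHandle,
        public mixed $value,
        public int $statusCode,
        public ?\DateTimeImmutable $at,
    ) {}
}

// Sample values are illustrative only.
$original = new StoreReading(
    clientHandle: 42,
    value: 21.5,
    statusCode: 0,
    at: new \DateTimeImmutable('2024-01-01T12:00:00+00:00'),
);

// Round-trip through native PHP serialization.
$restored = unserialize(serialize($original));

// The worker receives a distinct object that carries the same data.
var_dump($restored == $original);  // bool(true)
var_dump($restored === $original); // bool(false)
```

An entity reference in place of an ID would drag the entity's whole object graph through the same round trip, which is why the message carries only $clientHandle.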

The listener

php dispatch listener
namespace App\EventListener;

use App\Message\StoreReading;
use PhpOpcua\Client\Event\DataChangeReceived;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Symfony\Component\Messenger\MessageBusInterface;

final class DispatchReading
{
    public function __construct(private MessageBusInterface $bus) {}

    #[AsEventListener]
    public function __invoke(DataChangeReceived $event): void
    {
        $this->bus->dispatch(new StoreReading(
            clientHandle: $event->clientHandle,
            value:        $event->dataValue->getValue(),
            statusCode:   $event->dataValue->statusCode,
            at:           $event->dataValue->sourceTimestamp,
        ));
    }
}

The handler

php handler
namespace App\MessageHandler;

use App\Entity\PlcReading;
use App\Message\StoreReading;
use App\Service\HandleToNodeMap;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
final class StoreReadingHandler
{
    public function __construct(
        private EntityManagerInterface $em,
        private HandleToNodeMap $map,
    ) {}

    public function __invoke(StoreReading $msg): void
    {
        if ($msg->statusCode !== 0) return;

        $reading = (new PlcReading())
            ->setNodeId($this->map->resolve($msg->clientHandle))
            ->setValue($msg->value)
            ->setSourceAt($msg->at);

        $this->em->persist($reading);
        $this->em->flush();
    }
}
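The handler relies on App\Service\HandleToNodeMap, which this page does not show. One possible shape is sketched below, assuming the handle-to-node mapping is a fixed array known at boot. The class body, its constructor argument, and the sample node IDs are all hypothetical.

```php
<?php
declare(strict_types=1);

namespace App\Service;

// Hypothetical sketch: maps the integer client handle assigned at
// monitored-item creation back to a logical node ID string.
final class HandleToNodeMap
{
    /** @param array<int, string> $nodes client handle => node ID */
    public function __construct(private readonly array $nodes) {}

    public function resolve(int $clientHandle): string
    {
        // Fail loudly on unknown handles instead of storing a reading
        // under the wrong node.
        return $this->nodes[$clientHandle]
            ?? throw new \InvalidArgumentException(
                sprintf('Unknown client handle %d', $clientHandle)
            );
    }
}

// Usage with made-up handles and node IDs.
$map = new HandleToNodeMap([
    1 => 'ns=2;s=Line1.Temperature',
    2 => 'ns=2;s=Line1.Pressure',
]);

echo $map->resolve(1), PHP_EOL; // ns=2;s=Line1.Temperature
```

In a real application the array would probably come from the same configuration that creates the monitored items, so the listener side and the worker side agree on handles.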

Transport config

text config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_opcua:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
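                options:
                    # These two are Doctrine (PostgreSQL) transport options.
                    # Remove them when the DSN points at Redis or AMQP, which
                    # reject options they do not know.
                    use_notify: true
                    check_delayed_interval: 60000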
                retry_strategy:
                    max_retries: 3
                    multiplier: 2

        routing:
            App\Message\StoreReading: async_opcua
            App\Message\RecordAlarm:  async_opcua

.env:

bash DSN options
# Redis (recommended for OPC UA-driven traffic)
MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages

# Doctrine table (simpler, lower throughput)
# MESSENGER_TRANSPORT_DSN=doctrine://default

# RabbitMQ
# MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages

Redis is the sweet spot for OPC UA throughput — sub-millisecond enqueue, easy to monitor.
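To get a feel for what the retry_strategy in the transport config does to a failing message, the delays can be worked out by hand. The sketch below assumes the multiplier strategy waits delay × multiplier^attempt, and assumes a base delay of 1000 ms because the config does not set one. retryDelays() is a hypothetical helper, and newer Symfony versions may add random jitter on top of these values.

```php
<?php
declare(strict_types=1);

/**
 * Delay before each retry under a multiplier-based strategy.
 *
 * @return list<int> wait in milliseconds before retry 1, 2, ...
 */
function retryDelays(int $maxRetries, int $baseDelayMs, float $multiplier): array
{
    $delays = [];
    for ($attempt = 0; $attempt < $maxRetries; $attempt++) {
        // The first retry waits the base delay; each later one is multiplied again.
        $delays[] = (int) round($baseDelayMs * $multiplier ** $attempt);
    }

    return $delays;
}

// max_retries: 3 and multiplier: 2 from the config, base delay assumed.
print_r(retryDelays(maxRetries: 3, baseDelayMs: 1000, multiplier: 2.0));
```

Under those assumptions the waits are 1 s, 2 s, and 4 s, so a message that keeps failing is given up on after roughly seven seconds of waiting.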

Running the worker

bash terminal
php bin/console messenger:consume async_opcua \
    --time-limit=3600 \
    --memory-limit=512M \
    --limit=10000

| Flag | Purpose |
|---|---|
| --time-limit | Stop after N seconds |
| --memory-limit | Stop once memory use exceeds the limit |
| --limit | Stop after N messages |

When a limit is reached the worker finishes its current message and exits cleanly; the process manager (systemd / Supervisor) restarts it. Together the three limits cap how long a PHP process can keep accumulating leaked memory.

Per-handler retry

php retry strategy
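namespace App\MessageHandler;

use App\Message\StoreReading;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;

#[AsMessageHandler]
final class StoreReadingHandler
{
    public function __invoke(StoreReading $msg): void
    {
        try {
            // ...
        } catch (\PDOException $e) {
            // Transient failure: ask Messenger to retry
            throw new RecoverableMessageHandlingException(
                'DB transient error', 0, $e
            );
        } catch (\InvalidArgumentException $e) {
            // Retrying cannot fix a malformed message
            throw new UnrecoverableMessageHandlingException(
                'Bad message', 0, $e
            );
        }
    }
}

| Exception | Effect |
|---|---|
| RecoverableMessageHandlingException | Always retried, using the retry strategy's delays; max_retries is ignored |
| UnrecoverableMessageHandlingException | Not retried; sent to the failure transport immediately |
| Any other | Retried per retry strategy |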
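The table above can be read as a small decision function. The sketch below is an illustration, not Messenger's actual code. The two marker classes stand in for Symfony's RecoverableMessageHandlingException and UnrecoverableMessageHandlingException so the snippet runs without Symfony, shouldRetry() is a hypothetical name, and it assumes (per the Symfony documentation, as I understand it) that recoverable exceptions are retried even past max_retries.

```php
<?php
declare(strict_types=1);

// Stand-ins for Symfony's exception classes (hypothetical names).
final class RecoverableFailure extends \RuntimeException {}
final class UnrecoverableFailure extends \RuntimeException {}

/**
 * Decide whether a failed message gets another attempt.
 *
 * $retryCount is the number of retries already made.
 */
function shouldRetry(\Throwable $e, int $retryCount, int $maxRetries): bool
{
    return match (true) {
        // Never retried, whatever the strategy says.
        $e instanceof UnrecoverableFailure => false,
        // Assumed: retried regardless of max_retries.
        $e instanceof RecoverableFailure => true,
        // Everything else follows the configured retry budget.
        default => $retryCount < $maxRetries,
    };
}

var_dump(shouldRetry(new UnrecoverableFailure('bad message'), 0, 3)); // bool(false)
var_dump(shouldRetry(new RecoverableFailure('db down'), 5, 3));       // bool(true)
var_dump(shouldRetry(new \LogicException('other'), 3, 3));            // bool(false)
```

The practical consequence is that a handler should only throw the recoverable variant for failures that really are expected to clear up, since nothing bounds the number of attempts.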

Failed messages

text failure transport
framework:
    messenger:
        failure_transport: failed

        transports:
            failed: 'doctrine://default?queue_name=failed'

        routing:
            App\Message\StoreReading: async_opcua

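Inspect, retry, or remove failed messages:

bash terminal
# List messages in the failure transport
php bin/console messenger:failed:show
# Retry them one by one (interactive)
php bin/console messenger:failed:retry
# Remove a message without retrying it (20 is an example message ID)
php bin/console messenger:failed:remove 20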

Multiple queues per priority

Route different message types to different queues:

text multi-queue
framework:
    messenger:
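        transports:
            # queue_name is a Doctrine transport option; other transports name
            # their queues differently (Redis, for example, uses a stream).
            opcua_data:    '%env(MESSENGER_TRANSPORT_DSN)%?queue_name=opcua_data'
            opcua_alarms:  '%env(MESSENGER_TRANSPORT_DSN)%?queue_name=opcua_alarms'
            opcua_history: '%env(MESSENGER_TRANSPORT_DSN)%?queue_name=opcua_history'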

        routing:
            App\Message\StoreReading:        opcua_data
            App\Message\RecordAlarm:         opcua_alarms
            App\Message\FetchDailyHistory:   opcua_history

Run separate workers per queue:

bash separate workers
php bin/console messenger:consume opcua_data    --time-limit=3600
php bin/console messenger:consume opcua_alarms  --time-limit=3600
php bin/console messenger:consume opcua_history --time-limit=3600

Heavy history reads can't starve fast data inserts.

Worker scaling — multiple processes per queue

Run N processes against the same queue:

bash parallel workers
for i in 1 2 3 4; do
    php bin/console messenger:consume opcua_data --time-limit=3600 &
done
wait

For systemd, use a template unit opcua-worker@.service and run systemctl enable --now opcua-worker@{1..4}.

Throughput tuning

| Subscription rate | Per-job time | Workers per queue |
|---|---|---|
| 100 events / sec | 5 ms | 1-2 |
| 1000 events / sec | 10 ms | 4-8 |
| 5000 events / sec | 20 ms | 16-32 (consider batching) |

Watch the queue depth via messenger:stats or Redis CLI to size correctly.
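As a sanity check on any sizing, a lower bound on the number of busy workers is arrival rate × time per job. The sketch below applies that arithmetic to the three rows of the table. minWorkers() is a hypothetical helper, and it assumes every event becomes one job that takes the stated time with no batching.

```php
<?php
declare(strict_types=1);

/**
 * Lower bound on workers needed to keep up: events/sec × seconds/job,
 * rounded up, and never less than one.
 */
function minWorkers(int $eventsPerSecond, int $perJobMs): int
{
    // Integer ceiling of (events/sec × ms/job) / 1000 ms.
    return max(1, intdiv($eventsPerSecond * $perJobMs + 999, 1000));
}

foreach ([[100, 5], [1000, 10], [5000, 20]] as [$rate, $ms]) {
    printf(
        "%d events/sec at %d ms/job: at least %d worker(s)\n",
        $rate,
        $ms,
        minWorkers($rate, $ms)
    );
}
```

For one-message-per-job handlers this bound comes out at 1, 10, and 100 workers, which is above the table's figures for the two busier rows. Treat those rows as starting points, confirm against real queue depth, and expect to need batching at the higher rates.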

Batched handlers

For very high throughput, buffer in Redis and flush periodically:

php batched handler
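use App\Message\StoreReading;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
final class StoreReadingBatchedHandler
{
    private const BATCH_SIZE = 100;

    // Assumes the phpredis extension (\Redis); the multi() chain below is its API.
    public function __construct(
        private \Redis $redis,
        private EntityManagerInterface $em,
    ) {}

    public function __invoke(StoreReading $msg): void
    {
        // Push into a Redis list (public properties are JSON-encoded)
        $this->redis->rpush('opcua.batch', json_encode($msg));

        $length = $this->redis->llen('opcua.batch');
        if ($length >= self::BATCH_SIZE) {
            $this->flush();
        }
    }

    private function flush(): void
    {
        // Read and clear the buffer in one MULTI/EXEC transaction
        $items = $this->redis->multi()
            ->lrange('opcua.batch', 0, -1)
            ->del('opcua.batch')
            ->exec()[0];

        if (!$items) {
            return; // nothing buffered, e.g. another worker already flushed
        }

        $rows = array_map(fn(string $j) => json_decode($j, true), $items);
        $this->em->getConnection()->executeStatement(
            "INSERT INTO plc_readings (node_id, value, source_at) VALUES " . /* ... */
        );
    }
}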

Pair this with a scheduled command that drains the buffer every 5 seconds, so readings are still flushed when the threshold isn't hit. See Recipes · Persistent tag history.
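The batched handler leaves the VALUES part of the INSERT elided. One way to build a multi-row statement with positional placeholders is sketched below. buildBulkInsert() is a hypothetical helper, and it assumes each buffered row has already been normalised to node_id, value, and source_at keys (the decoded messages themselves carry a client handle, not a node ID).

```php
<?php
declare(strict_types=1);

/**
 * Build a multi-row INSERT with positional placeholders.
 *
 * @param list<array{node_id: string, value: mixed, source_at: string}> $rows
 * @return array{0: string, 1: list<mixed>} SQL and flat parameter list
 */
function buildBulkInsert(array $rows): array
{
    if ($rows === []) {
        throw new \InvalidArgumentException('Nothing to insert');
    }

    // One "(?, ?, ?)" group per row.
    $groups = implode(', ', array_fill(0, count($rows), '(?, ?, ?)'));
    $sql = "INSERT INTO plc_readings (node_id, value, source_at) VALUES {$groups}";

    // Flatten the rows in the same column order as the statement.
    $params = [];
    foreach ($rows as $row) {
        array_push($params, $row['node_id'], $row['value'], $row['source_at']);
    }

    return [$sql, $params];
}

// Made-up sample rows.
[$sql, $params] = buildBulkInsert([
    ['node_id' => 'ns=2;s=Line1.Temperature', 'value' => 21.5, 'source_at' => '2024-01-01 12:00:00'],
    ['node_id' => 'ns=2;s=Line1.Pressure',    'value' => 1.02, 'source_at' => '2024-01-01 12:00:00'],
]);

echo $sql, PHP_EOL;
```

The resulting pair can be passed to the connection as executeStatement($sql, $params). Placeholders keep values out of the SQL string, at the cost of three bound parameters per row, so very large batches may need to be split to stay under the database's parameter limit.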

Idempotency

If a handler can be retried, it must be idempotent. Use natural keys for upserts:

php upsert (PostgreSQL dialect)
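use Doctrine\DBAL\Types\Types;

$em->getConnection()->executeStatement(
    "INSERT INTO plc_readings (node_id, source_at, value)
     VALUES (:n, :a, :v)
     ON CONFLICT (node_id, source_at) DO UPDATE SET value = EXCLUDED.value",
    ['n' => $this->map->resolve($msg->clientHandle), 'a' => $msg->at, 'v' => $msg->value],
    // Tell DBAL how to convert the DateTimeImmutable parameter
    ['a' => Types::DATETIME_IMMUTABLE],
);

This relies on a unique index (or primary key) on (node_id, source_at): a node reports at most one value per source timestamp, so the pair works as a natural key. The ON CONFLICT clause above is PostgreSQL syntax; for MySQL use ON DUPLICATE KEY UPDATE, for SQLite use ON CONFLICT (…) DO UPDATE.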
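The effect of the upsert can be checked locally without a full stack. The sketch below runs the same statement twice against an in-memory SQLite database through PDO, simulating a retried message. It assumes the pdo_sqlite extension and an SQLite version with upsert support; the table definition and sample values are made up for the demonstration.

```php
<?php
declare(strict_types=1);

// In-memory SQLite database; requires the pdo_sqlite extension.
$pdo = new \PDO('sqlite::memory:');
$pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);

// The composite primary key is what ON CONFLICT targets.
$pdo->exec(
    'CREATE TABLE plc_readings (
        node_id   TEXT NOT NULL,
        source_at TEXT NOT NULL,
        value     REAL,
        PRIMARY KEY (node_id, source_at)
    )'
);

$upsert = $pdo->prepare(
    'INSERT INTO plc_readings (node_id, source_at, value)
     VALUES (:n, :a, :v)
     ON CONFLICT (node_id, source_at) DO UPDATE SET value = excluded.value'
);

// Handle the same reading twice, as a retry would.
$reading = ['n' => 'ns=2;s=Line1.Temperature', 'a' => '2024-01-01 12:00:00', 'v' => 21.5];
$upsert->execute($reading);
$upsert->execute($reading);

$count = (int) $pdo->query('SELECT COUNT(*) FROM plc_readings')->fetchColumn();
echo "rows: {$count}", PHP_EOL; // rows: 1
```

With a plain INSERT the second execution would fail on the primary key, which in a worker means one more failed attempt and another retry.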

Monitoring

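| Metric | Where | Alert threshold |
|---|---|---|
| Queue depth | messenger:stats or Redis | > 1000 messages |
| Failed messages | messenger:failed:show, or the failure transport's table (messenger_messages by default, with queue_name = 'failed') | > 10 / hour |
| Worker memory | systemd MemoryCurrent | More than 50% above your normal baseline |
| Worker restart count | systemd journal | > 5 in 10 minutes |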

Wire into Prometheus / Datadog / your monitoring of choice.
