Messenger
Symfony Messenger for OPC UA work — transports, async listeners, retry strategies, worker tuning. Complete end-to-end fleet-sampling pattern.
OPC UA work that's bursty (fleet sampling, large history reads, recipe loads) belongs on Messenger. Async transports, retries, and worker tuning are the standard tools.
Transport topology
Different OPC UA workloads have different SLAs. Separate them:
| Transport | What's on it | Priority |
|---|---|---|
opcua_control |
Setpoint writes, method calls | High — operator-visible |
opcua_data |
Tag reads, periodic samples | Normal |
opcua_history |
History reads, bulk samples | Low |
opcua_alarms |
Alarm-event processing | High |
Separating prevents slow history reads from blocking setpoint changes.
Config
framework:
messenger:
default_bus: messenger.bus.default
failure_transport: failed
transports:
opcua_control:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%?queue_name=opcua_control'
retry_strategy:
max_retries: 1 # control ops aren't idempotent
opcua_data:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%?queue_name=opcua_data'
retry_strategy:
max_retries: 3
multiplier: 2
opcua_history:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%?queue_name=opcua_history'
retry_strategy:
max_retries: 3
delay: 5000
opcua_alarms:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%?queue_name=opcua_alarms'
retry_strategy:
max_retries: 5
failed:
dsn: 'doctrine://default?queue_name=failed'
routing:
App\Message\WriteSetpoint: opcua_control
App\Message\LoadRecipe: opcua_control
App\Message\SamplePlc: opcua_data
App\Message\StoreReading: opcua_data
App\Message\FetchDailyHistory: opcua_history
App\Message\RecordAlarm: opcua_alarms
.env:
# Redis — recommended
MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
# Or Doctrine
# MESSENGER_TRANSPORT_DSN=doctrine://default
End-to-end — fleet sampler
A scheduled job dispatches one sample message per PLC; handlers
run on the opcua_data queue.
The PlcUnit entity (Doctrine)
namespace App\Entity;
use Doctrine\ORM\Mapping as ORM;
#[ORM\Entity]
class PlcUnit
{
#[ORM\Id, ORM\GeneratedValue, ORM\Column(type: 'integer')]
public ?int $id = null;
#[ORM\Column(type: 'string', length: 100, unique: true)]
public string $serial;
#[ORM\Column(type: 'string', length: 255)]
public string $endpoint;
#[ORM\Column(type: 'string', length: 50, nullable: true)]
public ?string $username = null;
#[ORM\Column(type: 'boolean')]
public bool $active = true;
public function toConnectionConfig(): array
{
return [
'endpoint' => $this->endpoint,
'username' => $this->username,
// Password fetched from vault by serial
'timeout' => 8.0,
];
}
}
The message
namespace App\Message;
final readonly class SamplePlc
{
/**
* @param string[] $nodeIds
*/
public function __construct(
public string $serial,
public array $nodeIds,
) {}
}
The handler
namespace App\MessageHandler;
use App\Entity\PlcReading;
use App\Entity\PlcUnit;
use App\Message\SamplePlc;
use App\Repository\PlcUnitRepository;
use Doctrine\ORM\EntityManagerInterface;
use PhpOpcua\Client\Exception\ConnectionException;
use PhpOpcua\SymfonyOpcua\OpcuaManager;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;
#[AsMessageHandler]
final class SamplePlcHandler
{
public function __construct(
private OpcuaManager $opcua,
private PlcUnitRepository $repo,
private EntityManagerInterface $em,
) {}
public function __invoke(SamplePlc $message): void
{
$unit = $this->repo->findOneBy(['serial' => $message->serial]);
if ($unit === null || !$unit->active) {
return;
}
try {
$client = $this->opcua->connectTo(
$unit->endpoint,
$unit->toConnectionConfig(),
as: "fleet:{$message->serial}",
);
$builder = $client->readMulti(null);
foreach ($message->nodeIds as $node) {
$builder->node($node);
}
$results = $builder->execute();
} catch (ConnectionException $e) {
throw new RecoverableMessageHandlingException(
"PLC {$message->serial} unreachable", 0, $e,
);
}
foreach ($message->nodeIds as $i => $node) {
$reading = (new PlcReading())
->setPlcSerial($message->serial)
->setNodeId($node)
->setValue($results[$i]->getValue())
->setStatusCode($results[$i]->statusCode)
->setSourceAt($results[$i]->sourceTimestamp ?? new \DateTimeImmutable());
$this->em->persist($reading);
}
$this->em->flush();
}
}
The handler decides what's retryable: transport errors throw
RecoverableMessageHandlingException (retry per strategy);
logic errors propagate as-is (sent to failure transport).
Scheduling the dispatch
namespace App\Scheduler;
use App\Message\SamplePlc;
use App\Repository\PlcUnitRepository;
use Symfony\Component\Scheduler\Attribute\AsSchedule;
use Symfony\Component\Scheduler\RecurringMessage;
use Symfony\Component\Scheduler\Schedule;
use Symfony\Component\Scheduler\ScheduleProviderInterface;
#[AsSchedule('plc-fleet')]
final class PlcFleetSchedule implements ScheduleProviderInterface
{
public function __construct(private PlcUnitRepository $repo) {}
public function getSchedule(): Schedule
{
$schedule = new Schedule();
$nodes = ['ns=2;s=Speed', 'ns=2;s=Temperature', 'ns=2;s=Pressure'];
foreach ($this->repo->findActive() as $i => $unit) {
// Stagger by serial-hash modulo 60 — spreads over the minute
$offset = abs(crc32($unit->serial)) % 60;
$schedule->add(
RecurringMessage::cron("$offset * * * *", new SamplePlc($unit->serial, $nodes))
);
}
return $schedule;
}
}
Each PLC gets sampled once a minute, spread over the 60-second window.
Running workers
# /etc/systemd/system/[email protected]
[Service]
ExecStart=/usr/bin/php /var/www/html/bin/console messenger:consume opcua_data \
--time-limit=3600 --memory-limit=512M --limit=10000
[Install]
WantedBy=multi-user.target
sudo systemctl enable --now messenger-opcua-data@{1..4}
4 parallel workers on opcua_data.
Running the scheduler
php bin/console messenger:consume scheduler_plc_fleet --time-limit=3600
Dispatches messages at the scheduled cadence.
Watching Messenger
php bin/console messenger:stats
php bin/console messenger:failed:show
php bin/console messenger:failed:retry
For a UI, install
symfony/messenger-monitor —
a Symfony bundle that adds a dashboard.
Per-message retry tuning
Override the global retry strategy in the message:
use Symfony\Component\Messenger\Stamp\TransportConfigurationStamp;
$bus->dispatch(
new SamplePlc($serial, $nodes),
[new TransportConfigurationStamp([
'retry_strategy.max_retries' => 5,
])],
);
Useful for known-flaky destinations.
Failure handling
| Failure | Effect |
|---|---|
RecoverableMessageHandlingException |
Retried per strategy |
UnrecoverableMessageHandlingException |
Sent to failed immediately |
| Other exception | Retried per strategy |
| Retry exhausted | Sent to failed |
For OPC UA writes — don't retry. Set max_retries: 0 on
opcua_control.
Per-handler logging
use Monolog\Attribute\WithMonologChannel;
#[AsMessageHandler]
#[WithMonologChannel('opcua_data')]
final class SamplePlcHandler
{
public function __construct(
private OpcuaManager $opcua,
private LoggerInterface $logger, // Auto-injected from opcua_data channel
) {}
}
The handler's own logs go to the opcua_data Monolog channel.
Tuning throughput
| Subscription rate | Per-job time | Workers per queue |
|---|---|---|
| 100 msg / sec | 5 ms | 1-2 |
| 1 000 msg / sec | 10 ms | 4-8 |
| 5 000 msg / sec | 20 ms | 16-32 (consider batching) |
Watch messenger:stats and Redis INFO for backlog growth.
Where to read next
- Mercure — pushing handler results to the browser.
- Doctrine — the persistence side.
- Recipes · Persistent tag history — full canonical pattern.