Auto-Publish & Monitoring
Getting Started
Introduction InstallationUsage
Usage Connections Session-manager Logging-caching SecurityReference
Testing Examples Auto-publishAuto-Publish & Monitoring
Overview
Auto-publish eliminates the need for manual publish() loops when using OPC UA subscriptions with the session manager daemon. When enabled, the daemon automatically calls publish() for sessions that have active subscriptions and dispatches PSR-14 events (DataChangeReceived, EventNotificationReceived, AlarmActivated, etc.) through Symfony's event system. Combined with per-connection auto_connect, you can define your entire monitoring setup declaratively in YAML config — no application code needed.
How It Works
┌──────────────────────────────────────────────────────────────────────────┐
│ Symfony Application │
│ │
│ Event Listeners (via #[AsEventListener] or services.yaml) │
│ DataChangeReceived → StoreSensorReading │
│ AlarmActivated → HandleAlarm │
│ │
│ config/packages/php_opcua_symfony_opcua.yaml │
│ auto_publish: true │
│ connections: │
│ plc-1: { auto_connect: true, subscriptions: [...] } │
│ historian: { } ← on-demand only │
│ │
├──────────────────────────────────────────────────────────────────────────┤
│ php bin/console opcua:session │
│ │
│ Daemon (ReactPHP event loop) │
│ ├── Auto-connect: plc-1 → TCP → OPC UA Server │
│ ├── Create subscriptions + monitored items │
│ ├── AutoPublisher: │
│ │ ├── Timer → publish() → events dispatched │
│ │ ├── Acknowledgements tracked │
│ │ └── Recovery on connection errors │
│ └── IPC socket for runtime requests │
│ │
│ Events dispatched: │
│ DataChangeReceived → Symfony listener → DB / notification / etc. │
│ AlarmActivated → Symfony listener → alert operators │
│ SubscriptionKeepAlive → (optional logging) │
└──────────────────────────────────────────────────────────────────────────┘- The daemon starts with
auto_publish: trueand a PSR-14 event dispatcher (resolved from Symfony's container) - Connections with
auto_connect: trueare established on the first event loop tick - Subscriptions and monitored items defined in
subscriptionsare created - The daemon's
AutoPublisherstarts a self-rescheduling timer for each session with subscriptions - On each publish cycle, the OPC UA client calls
publish()which internally dispatches PSR-14 events - Your Symfony event listeners handle the notifications (store in DB, send alerts, broadcast to frontend, etc.)
Configuration
Enable auto-publish
# config/packages/php_opcua_symfony_opcua.yaml
php_opcua_symfony_opcua:
session_manager:
auto_publish: trueOr via environment variable:
OPCUA_AUTO_PUBLISH=true
php_opcua_symfony_opcua:
session_manager:
auto_publish: '%env(bool:OPCUA_AUTO_PUBLISH)%'Define auto-connect connections
# config/packages/php_opcua_symfony_opcua.yaml
php_opcua_symfony_opcua:
session_manager:
enabled: true
socket_path: '%kernel.project_dir%/var/opcua-session-manager.sock'
timeout: 600
auth_token: '%env(OPCUA_AUTH_TOKEN)%'
auto_publish: true
connections:
plc-linea-1:
endpoint: '%env(PLC1_ENDPOINT)%'
username: '%env(PLC1_USER)%'
password: '%env(PLC1_PASS)%'
timeout: 3.0
auto_retry: 3
# Per-connection auto-connect (requires auto_publish to be enabled)
auto_connect: true
# Subscriptions to create on daemon startup
subscriptions:
- publishing_interval: 500.0 # ms
max_keep_alive_count: 5 # reduces max publish blocking to 2.5s
monitored_items:
- { node_id: 'ns=2;s=Temperature', client_handle: 1 }
- { node_id: 'ns=2;s=Pressure', client_handle: 2 }
- { node_id: 'ns=2;s=MachineState', client_handle: 3 }
event_monitored_items:
- node_id: 'i=2253' # Server object
client_handle: 10
select_fields:
- EventId
- EventType
- SourceName
- Time
- Message
- Severity
- ActiveState
- AckedState
- ConfirmedState
# Connection without auto_connect — used on-demand from application code
historian:
endpoint: '%env(HISTORIAN_ENDPOINT)%'
timeout: 10.0Configuration Reference
Session manager keys
| Key | Env Variable | Default | Description |
|---|---|---|---|
auto_publish |
OPCUA_AUTO_PUBLISH |
false |
Enable automatic publishing for sessions with subscriptions |
Per-connection keys
| Key | Default | Description |
|---|---|---|
auto_connect |
false |
Auto-connect this endpoint when the daemon starts (requires auto_publish) |
subscriptions |
(none) | Array of subscription definitions with monitored_items and event_monitored_items |
Subscription definition keys
| Key | Default | Description |
|---|---|---|
publishing_interval |
500.0 |
Publishing interval in milliseconds |
lifetime_count |
2400 |
Subscription lifetime count |
max_keep_alive_count |
10 |
Max keep-alive count (lower = less publish blocking) |
max_notifications_per_publish |
0 |
Max notifications per publish (0 = unlimited) |
priority |
0 |
Subscription priority |
Monitored item keys
| Key | Default | Description |
|---|---|---|
node_id |
(required) | Node ID string ('ns=2;s=Temperature', 'i=2259') |
client_handle |
0 |
Client-assigned handle (returned in notifications) |
sampling_interval |
250.0 |
Sampling interval in milliseconds |
queue_size |
1 |
Queue size for buffered notifications |
attribute_id |
13 |
OPC UA attribute to monitor (13 = Value) |
Event monitored item keys
| Key | Default | Description |
|---|---|---|
node_id |
(required) | Node ID of the event source ('i=2253' for Server object) |
client_handle |
1 |
Client-assigned handle |
select_fields |
['EventId', 'EventType', 'SourceName', 'Time', 'Message', 'Severity'] |
Event fields to select |
Event Listeners
Using the #[AsEventListener] Attribute
Register listeners using Symfony's #[AsEventListener] attribute:
namespace App\EventListener;
use PhpOpcua\Client\Event\DataChangeReceived;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
#[AsEventListener]
class StoreSensorReading
{
public function __invoke(DataChangeReceived $event): void
{
// $event->subscriptionId — subscription that generated this notification
// $event->sequenceNumber — sequence number for acknowledgement tracking
// $event->clientHandle — the handle you assigned in monitored_items config
// $event->dataValue — DataValue with getValue(), statusCode, sourceTimestamp, serverTimestamp
}
}namespace App\EventListener;
use PhpOpcua\Client\Event\EventNotificationReceived;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
#[AsEventListener]
class HandleEventNotification
{
public function __invoke(EventNotificationReceived $event): void
{
// $event->subscriptionId
// $event->clientHandle
// $event->eventFields — Variant[] with values for each select_field
}
}namespace App\EventListener;
use PhpOpcua\Client\Event\AlarmActivated;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
#[AsEventListener]
class HandleAlarm
{
public function __invoke(AlarmActivated $event): void
{
// $event->subscriptionId, $event->clientHandle
// $event->sourceName, $event->message, $event->severity, $event->eventType, $event->time
}
}Using services.yaml Registration
Alternatively, register listeners in config/services.yaml:
services:
App\EventListener\StoreSensorReading:
tags:
- { name: kernel.event_listener, event: PhpOpcua\Client\Event\DataChangeReceived }
App\EventListener\HandleAlarm:
tags:
- { name: kernel.event_listener, event: PhpOpcua\Client\Event\AlarmActivated }Available Events
| Event | Description |
|---|---|
DataChangeReceived |
A monitored item's value changed |
EventNotificationReceived |
An OPC UA event was received |
AlarmActivated |
An alarm condition became active |
AlarmDeactivated |
An alarm condition was deactivated |
AlarmAcknowledged |
An alarm was acknowledged |
SubscriptionKeepAlive |
Server confirms the subscription is alive (no data) |
Runtime Subscriptions
Auto-publish also works for subscriptions created at runtime via the OpcuaManager. Any session that gains a subscription is automatically published by the daemon:
use PhpOpcua\SymfonyOpcua\OpcuaManager;
class RuntimeSubscriptionService
{
public function __construct(private OpcuaManager $opcua) {}
public function startMonitoring(): void
{
$client = $this->opcua->connect('historian');
$sub = $client->createSubscription(publishingInterval: 2000.0);
$client->createMonitoredItems($sub->subscriptionId, [
['nodeId' => 'ns=4;s=HistorianStatus', 'clientHandle' => 200],
]);
// No publish() loop needed — daemon handles it automatically.
// DataChangeReceived events dispatched to your listeners.
}
}Blocking Behavior
Client::publish() is a synchronous call that blocks the daemon's ReactPHP event loop until the OPC UA server responds. The maximum blocking time depends on the subscription's maxKeepAliveCount x publishingInterval:
max_keep_alive_count |
publishing_interval |
Max block time |
|---|---|---|
| 10 (default) | 500ms | 5.0s |
| 5 | 500ms | 2.5s |
| 3 | 500ms | 1.5s |
| 5 | 250ms | 1.25s |
During the block, incoming IPC requests from your application queue up but are not lost (30s IPC timeout). In practice, when data is changing frequently, publish() returns almost immediately (~5ms).
Recommendation: use max_keep_alive_count: 5 or lower to reduce maximum blocking time.
Error Handling
The AutoPublisher handles errors automatically:
| Scenario | Behavior |
|---|---|
| Connection lost | Attempts reconnection + subscription transfer. Reschedules after 1s |
| Recovery failed | Auto-publish stops for that session. Logged as error |
| Transient error | Retries with 5s backoff |
| 5 consecutive errors | Auto-publish stops for that session. Logged as error |
| Handler exception | Logged, but auto-publish continues (handler bug should not kill monitoring) |
Connection recovery includes automatic transferSubscriptions() and republish() for missed notifications.
Real-World Use Case: Industrial Production Line Monitoring
This example demonstrates a complete industrial monitoring setup for a bottling plant with two PLC-controlled production lines and a historian server.
Scenario
- Line 1 (PLC at
192.168.1.10:4840): monitors temperature, pressure, fill level, and machine state. Needs alarms for over-temperature and over-pressure. - Line 2 (PLC at
192.168.1.11:4840): monitors motor speed and vibration. Needs alerts on excessive vibration. - Historian (at
192.168.1.20:4840): queried on-demand for historical reports — not auto-connected.
Step 1: Configuration
# .env
OPCUA_AUTO_PUBLISH=true
OPCUA_AUTH_TOKEN=your-secret-token-here
PLC1_ENDPOINT=opc.tcp://192.168.1.10:4840
PLC1_USER=operator
PLC1_PASS=secret123
PLC2_ENDPOINT=opc.tcp://192.168.1.11:4840
HISTORIAN_ENDPOINT=opc.tcp://192.168.1.20:4840
# config/packages/php_opcua_symfony_opcua.yaml
php_opcua_symfony_opcua:
default: line-1
session_manager:
enabled: true
socket_path: '%kernel.project_dir%/var/opcua-session-manager.sock'
timeout: 600
auth_token: '%env(OPCUA_AUTH_TOKEN)%'
auto_publish: '%env(bool:OPCUA_AUTO_PUBLISH)%'
connections:
line-1:
endpoint: '%env(PLC1_ENDPOINT)%'
username: '%env(PLC1_USER)%'
password: '%env(PLC1_PASS)%'
timeout: 3.0
auto_retry: 3
auto_connect: true
subscriptions:
- publishing_interval: 500.0
max_keep_alive_count: 5
monitored_items:
- { node_id: 'ns=2;s=Line1.Temperature', client_handle: 1 }
- { node_id: 'ns=2;s=Line1.Pressure', client_handle: 2 }
- { node_id: 'ns=2;s=Line1.FillLevel', client_handle: 3 }
- { node_id: 'ns=2;s=Line1.MachineState', client_handle: 4 }
event_monitored_items:
- node_id: 'i=2253'
client_handle: 100
select_fields:
- EventId
- EventType
- SourceName
- Time
- Message
- Severity
- ActiveState
- AckedState
- ConfirmedState
line-2:
endpoint: '%env(PLC2_ENDPOINT)%'
timeout: 3.0
auto_retry: 3
auto_connect: true
subscriptions:
- publishing_interval: 250.0
max_keep_alive_count: 3
monitored_items:
- { node_id: 'ns=3;s=Line2.MotorSpeed', client_handle: 10, sampling_interval: 100.0 }
- { node_id: 'ns=3;s=Line2.Vibration', client_handle: 11, sampling_interval: 100.0 }
historian:
endpoint: '%env(HISTORIAN_ENDPOINT)%'
timeout: 10.0Step 2: Doctrine Entity and Migration
// src/Entity/SensorReading.php
namespace App\Entity;
use Doctrine\ORM\Mapping as ORM;
#[ORM\Entity]
#[ORM\Table(name: 'sensor_readings')]
#[ORM\Index(columns: ['connection', 'client_handle', 'created_at'])]
class SensorReading
{
#[ORM\Id]
#[ORM\GeneratedValue]
#[ORM\Column]
private ?int $id = null;
#[ORM\Column]
private int $subscriptionId;
#[ORM\Column]
private int $clientHandle;
#[ORM\Column(length: 50)]
private string $connection;
#[ORM\Column(nullable: true)]
private ?float $value = null;
#[ORM\Column]
private int $statusCode;
#[ORM\Column(type: 'datetime_immutable', nullable: true)]
private ?\DateTimeImmutable $sourceTime = null;
#[ORM\Column(type: 'datetime_immutable', nullable: true)]
private ?\DateTimeImmutable $serverTime = null;
#[ORM\Column(type: 'datetime_immutable')]
private \DateTimeImmutable $createdAt;
public function __construct()
{
$this->createdAt = new \DateTimeImmutable();
}
// ... getters and setters
}// src/Entity/AlarmLog.php
namespace App\Entity;
use Doctrine\ORM\Mapping as ORM;
#[ORM\Entity]
#[ORM\Table(name: 'alarm_log')]
class AlarmLog
{
#[ORM\Id]
#[ORM\GeneratedValue]
#[ORM\Column]
private ?int $id = null;
#[ORM\Column(length: 255)]
private string $source;
#[ORM\Column(length: 255)]
private string $message;
#[ORM\Column]
private int $severity;
#[ORM\Column(length: 50)]
private string $state;
#[ORM\Column(type: 'datetime_immutable', nullable: true)]
private ?\DateTimeImmutable $eventTime = null;
#[ORM\Column(type: 'datetime_immutable')]
private \DateTimeImmutable $createdAt;
public function __construct()
{
$this->createdAt = new \DateTimeImmutable();
}
// ... getters and setters
}Generate and run the migration:
php bin/console doctrine:migrations:diff
php bin/console doctrine:migrations:migrateStep 3: Event Listeners
// src/EventListener/StoreSensorReading.php
namespace App\EventListener;
use App\Entity\SensorReading;
use Doctrine\ORM\EntityManagerInterface;
use PhpOpcua\Client\Event\DataChangeReceived;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
#[AsEventListener]
class StoreSensorReading
{
private const HANDLE_TO_CONNECTION = [
1 => 'line-1', 2 => 'line-1', 3 => 'line-1', 4 => 'line-1',
10 => 'line-2', 11 => 'line-2',
];
public function __construct(private EntityManagerInterface $em) {}
public function __invoke(DataChangeReceived $event): void
{
$reading = new SensorReading();
$reading->setSubscriptionId($event->subscriptionId);
$reading->setClientHandle($event->clientHandle);
$reading->setConnection(self::HANDLE_TO_CONNECTION[$event->clientHandle] ?? 'unknown');
$reading->setValue($event->dataValue->getValue());
$reading->setStatusCode($event->dataValue->statusCode);
$reading->setSourceTime($event->dataValue->sourceTimestamp);
$reading->setServerTime($event->dataValue->serverTimestamp);
$this->em->persist($reading);
$this->em->flush();
}
}// src/EventListener/HandleAlarm.php
namespace App\EventListener;
use App\Entity\AlarmLog;
use Doctrine\ORM\EntityManagerInterface;
use PhpOpcua\Client\Event\AlarmActivated;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
#[AsEventListener]
class HandleAlarm
{
public function __construct(
private EntityManagerInterface $em,
private LoggerInterface $logger,
) {}
public function __invoke(AlarmActivated $event): void
{
$alarm = new AlarmLog();
$alarm->setSource($event->sourceName);
$alarm->setMessage($event->message);
$alarm->setSeverity($event->severity);
$alarm->setState('active');
$alarm->setEventTime($event->time);
$this->em->persist($alarm);
$this->em->flush();
$this->logger->warning('OPC UA Alarm: {source} — {message}', [
'source' => $event->sourceName,
'message' => $event->message,
'severity' => $event->severity,
]);
}
}// src/EventListener/DetectVibrationAnomaly.php
namespace App\EventListener;
use PhpOpcua\Client\Event\DataChangeReceived;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Symfony\Contracts\Cache\CacheInterface;
#[AsEventListener]
class DetectVibrationAnomaly
{
private const VIBRATION_HANDLE = 11;
private const THRESHOLD = 5.0;
public function __construct(
private LoggerInterface $logger,
private CacheInterface $cache,
) {}
public function __invoke(DataChangeReceived $event): void
{
if ($event->clientHandle !== self::VIBRATION_HANDLE) {
return;
}
$vibration = $event->dataValue->getValue();
if ($vibration > self::THRESHOLD) {
$key = "vibration_alert_{$event->clientHandle}";
$this->cache->get($key, function () use ($vibration) {
$this->logger->alert('High vibration detected on Line 2: {value} mm/s', [
'value' => $vibration,
]);
return true;
});
}
}
}Step 4: Register Listeners
When using #[AsEventListener] attributes (shown above), Symfony auto-registers the listeners if autoconfigure: true is set in config/services.yaml (the default). No additional registration is needed.
If you prefer explicit registration via services.yaml:
# config/services.yaml
services:
App\EventListener\StoreSensorReading:
tags:
- { name: kernel.event_listener, event: PhpOpcua\Client\Event\DataChangeReceived }
App\EventListener\HandleAlarm:
tags:
- { name: kernel.event_listener, event: PhpOpcua\Client\Event\AlarmActivated }
App\EventListener\DetectVibrationAnomaly:
tags:
- { name: kernel.event_listener, event: PhpOpcua\Client\Event\DataChangeReceived }Step 5: Start the Daemon
php bin/console opcua:sessionOutput:
Starting OPC UA Session Manager...
+---------------------+----------------------------------------------+
| Setting | Value |
+---------------------+----------------------------------------------+
| Socket | var/opcua-session-manager.sock |
| Timeout | 600s |
| Auth Token | configured |
| Auto-publish | enabled |
+---------------------+----------------------------------------------+
Auto-connecting 2 connection(s): line-1, line-2
Auto-connected "line-1" (session: a1b2c3d4...)
Auto-connected "line-2" (session: e5f6g7h8...)
Auto-publish enabled
OPC UA Session Manager started on var/opcua-session-manager.sockThe daemon:
- Connects to both PLCs with the configured credentials
- Creates subscriptions with the specified publishing intervals
- Registers all monitored items and event monitors
- Auto-publishes and dispatches events to your Symfony listeners
StoreSensorReadingstores every data change to the databaseHandleAlarmlogs alarms and persists them to the alarm log tableDetectVibrationAnomalywatches vibration levels and alerts on threshold breach
The historian connection is not auto-connected — it can be used from a controller or service whenever historical data is needed:
use PhpOpcua\SymfonyOpcua\OpcuaManager;
class HistoryReportService
{
public function __construct(private OpcuaManager $opcua) {}
public function getTemperatureHistory(): array
{
$client = $this->opcua->connect('historian');
$history = $client->historyReadRaw(
'ns=4;s=Line1.Temperature',
new \DateTimeImmutable('-1 hour'),
new \DateTimeImmutable('now'),
numValuesPerNode: 1000,
);
$client->disconnect();
return $history;
}
}Step 6: Production Deployment with systemd
# /etc/systemd/system/opcua-session.service
[Unit]
Description=OPC UA Session Manager Daemon
After=network.target
[Service]
Type=simple
User=www-data
Group=www-data
WorkingDirectory=/var/www/app
ExecStart=/usr/bin/php /var/www/app/bin/console opcua:session
Restart=always
RestartSec=5
Environment=APP_ENV=prod
EnvironmentFile=/var/www/app/.env.local
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable opcua-session
sudo systemctl start opcua-session
sudo systemctl status opcua-session