
Auto-publish pattern

Run the OPC UA publish loop in the daemon, dispatch PSR-14 events, push notifications onto a queue. Application workers consume the queue without ever talking to OPC UA directly.

The session manager's AutoPublisher drives the OPC UA publish loop on the daemon side. Application workers listen to a queue that the daemon feeds, so no application code touches the OPC UA protocol and nobody hand-codes a publish loop.

This recipe builds the full pipeline: custom daemon launcher, PSR-14 listener, queue producer, worker consumer.

When this earns its keep

  • Many subscriptions, one daemon, many consumers. The daemon is the single OPC UA fan-in; the queue is the single fan-out.
  • Reactive architectures. You want notifications to land in a Symfony Messenger / Laravel Horizon worker, not in a while (true) { publish(); } script.
  • Cross-language consumers. The queue is the integration boundary — a Python worker can consume the same data the daemon pushed.

1 — Custom daemon launcher

The bin script does not expose auto-publish. Write a small launcher that constructs the daemon with both clientEventDispatcher and autoPublish: true:

php bin/opcua-daemon-with-pub
#!/usr/bin/env php
<?php
require __DIR__ . '/../vendor/autoload.php';

use PhpOpcua\Client\Cache\InMemoryCache;
use PhpOpcua\Client\Event\AlarmActivated;
use PhpOpcua\Client\Event\DataChangeReceived;
use PhpOpcua\Client\Event\EventNotificationReceived;
use PhpOpcua\SessionManager\Daemon\SessionManagerDaemon;
use PhpOpcua\SessionManager\Logging\StreamLogger;
use Predis\Client as Redis;
use Symfony\Component\EventDispatcher\EventDispatcher;

// 1. PSR-14 dispatcher
$dispatcher = new EventDispatcher();

// 2. Redis as the queue boundary
$redis = new Redis(['scheme' => 'tcp', 'host' => '127.0.0.1', 'port' => 6379]);

// 3. Listeners — turn OPC UA events into queue messages
$dispatcher->addListener(DataChangeReceived::class, function (DataChangeReceived $e) use ($redis) {
    $redis->rpush('opcua:data', json_encode([
        'subscriptionId' => $e->subscriptionId,
        'clientHandle'   => $e->clientHandle,
        'value'          => $e->dataValue->getValue(),
        'statusCode'     => $e->dataValue->statusCode,
        'at'             => $e->dataValue->sourceTimestamp?->format('c'),
    ]));
});

$dispatcher->addListener(AlarmActivated::class, function (AlarmActivated $e) use ($redis) {
    $redis->rpush('opcua:alarms', json_encode([
        'sourceName' => $e->sourceName,
        'severity'   => $e->severity,
        'at'         => date('c'),
    ]));
});

$dispatcher->addListener(EventNotificationReceived::class, function (EventNotificationReceived $e) use ($redis) {
    $redis->rpush('opcua:events', json_encode([
        'clientHandle' => $e->clientHandle,
        'eventFields'  => $e->eventFields,
    ]));
});

// 4. Daemon — wire dispatcher + autoPublish
$daemon = new SessionManagerDaemon(
    socketPath:            getenv('OPCUA_SOCKET_PATH') ?: '/var/run/opcua/sessions.sock',
    timeout:               1800,
    cleanupInterval:       60,
    authToken:             getenv('OPCUA_AUTH_TOKEN') ?: null,
    maxSessions:           200,
    socketMode:            0660,
    logger:                new StreamLogger(getenv('OPCUA_LOG_FILE') ?: 'php://stderr', 'info'),
    clientCache:           new InMemoryCache(600),
    clientEventDispatcher: $dispatcher,
    autoPublish:           true,
);

// 5. Pre-register the subscriptions
$daemon->autoConnect(require __DIR__ . '/../config/opcua-endpoints.php');

// 6. Run
$daemon->run();

2 — Configuration file with subscriptions

php config/opcua-endpoints.php
<?php
return [
    'plc1' => [
        'endpoint' => 'opc.tcp://plc-1.plant.local:4840',
        'config'   => [
            'securityPolicy' => 'http://opcfoundation.org/UA/SecurityPolicy#Basic256Sha256',
            'securityMode'   => 3,
            'username'       => 'integrations',
            'password'       => getenv('PLC1_PASSWORD'),
        ],
        'subscriptions' => [
            [
                'publishing_interval' => 500.0,
                'monitored_items' => [
                    ['node_id' => 'ns=2;s=Devices/PLC1/Speed',  'sampling_interval' => 500.0],
                    ['node_id' => 'ns=2;s=Devices/PLC1/Mode',   'sampling_interval' => 1000.0],
                    ['node_id' => 'ns=2;s=Devices/PLC1/Health', 'sampling_interval' => 1000.0],
                ],
            ],
        ],
    ],
];
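Because the launcher passes this array straight to autoConnect(), a typo in a key only surfaces once the daemon starts. The following is a minimal sketch of a pre-flight check, assuming only the keys shown in the file above. summarizeEndpoints() is a hypothetical helper, not part of the library, and it does not replace whatever validation the daemon performs itself.

```php
<?php

/**
 * Count subscriptions and monitored items per endpoint, failing fast on
 * entries that lack the keys used in config/opcua-endpoints.php.
 * Hypothetical helper for illustration only.
 */
function summarizeEndpoints(array $endpoints): array
{
    $summary = [];
    foreach ($endpoints as $name => $entry) {
        if (!is_array($entry) || !is_string($entry['endpoint'] ?? null)) {
            throw new InvalidArgumentException("Endpoint '{$name}' has no 'endpoint' URL");
        }

        $subscriptions = $entry['subscriptions'] ?? [];
        $items = 0;
        foreach ($subscriptions as $i => $sub) {
            if (($sub['publishing_interval'] ?? 0) <= 0) {
                throw new InvalidArgumentException(
                    "Endpoint '{$name}', subscription {$i}: publishing_interval must be > 0"
                );
            }
            $items += count($sub['monitored_items'] ?? []);
        }

        $summary[$name] = [
            'subscriptions'  => count($subscriptions),
            'monitoredItems' => $items,
        ];
    }

    return $summary;
}
```

Calling it on the result of the require in the launcher, before autoConnect(), gives a quick count to log at startup.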

3 — Application-side worker

The application consumes from Redis — no OPC UA dependency at all in the worker:

php examples/redis-worker.php
<?php
require __DIR__ . '/../vendor/autoload.php';

use Predis\Client as Redis;

$redis = new Redis(['scheme' => 'tcp', 'host' => '127.0.0.1', 'port' => 6379]);

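while (true) {
    // Block until a message arrives on any of the three lists (timeout 0 = wait forever).
    [$queue, $payload] = $redis->blpop(['opcua:data', 'opcua:alarms', 'opcua:events'], 0);
    $data = json_decode($payload, true);

    // persistTagValue(), triggerPagerduty() and recordAuditLog() are application-defined handlers.
    match ($queue) {
        'opcua:data'   => persistTagValue($data),
        'opcua:alarms' => triggerPagerduty($data),
        'opcua:events' => recordAuditLog($data),
    };
}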

The worker is small, stateless, restartable, and has no idea OPC UA exists. The daemon is the abstraction boundary.
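One way to keep the worker restartable and also hard to crash is to separate message routing from the Redis loop. The sketch below assumes the queue names used in this recipe. routeMessage() and the handler map are hypothetical names introduced for illustration. It returns false for malformed JSON or an unknown queue instead of throwing, so the loop can log the message and carry on.

```php
<?php

/**
 * Decode one queue message and hand it to the handler registered for its queue.
 * Returns false (instead of throwing) for malformed JSON, a non-array payload,
 * or an unknown queue, so the caller can log the message and keep looping.
 *
 * @param array<string, callable> $handlers queue name => handler taking the decoded array
 */
function routeMessage(string $queue, string $payload, array $handlers): bool
{
    if (!isset($handlers[$queue])) {
        return false;
    }

    try {
        $data = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
    } catch (JsonException $e) {
        return false;
    }

    if (!is_array($data)) {
        return false;
    }

    $handlers[$queue]($data);

    return true;
}
```

Inside the worker loop, the match expression could then become a single routeMessage($queue, $payload, $handlers) call, with $handlers mapping 'opcua:data', 'opcua:alarms' and 'opcua:events' to the application's own functions.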

4 — Service unit for the custom launcher

text /etc/systemd/system/opcua-daemon-with-pub.service
[Unit]
Description=OPC UA Session Manager (with auto-publish)
After=network.target redis-server.service
Requires=redis-server.service

[Service]
Type=simple
User=opcua
EnvironmentFile=/etc/opcua/daemon.env
ExecStart=/opt/myapp/bin/opcua-daemon-with-pub
Restart=on-failure
RestartSec=5
TimeoutStopSec=30

[Install]
WantedBy=multi-user.target

/etc/opcua/daemon.env carries the env vars the launcher reads — OPCUA_AUTH_TOKEN, PLC1_PASSWORD, OPCUA_SOCKET_PATH, OPCUA_LOG_FILE.

Listener best practices

The listener runs inside the daemon process, so keep it cheap and safe:

Do

Push to a queue, return immediately. The queue is the boundary between the daemon's event loop and your application's processing. A $redis->rpush() is ~0.1 ms; everything slower than that should not happen in the listener.

Don't

Don't block the listener on a synchronous HTTP call, a slow database write, or an expensive computation. Every millisecond the listener spends blocks the publish loop for every session on the daemon — back-pressure that surfaces as missed notifications.
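To notice when a listener drifts past its time budget, it can be wrapped in a small timing decorator. This is a sketch under assumptions: timedListener() and the $onSlow callback are hypothetical names, and the budget value is yours to choose. The wrapper only measures and reports. It cannot make a slow listener faster.

```php
<?php

/**
 * Wrap a listener and report every call that exceeds a time budget.
 * Hypothetical helper; $onSlow receives the elapsed milliseconds and the event.
 */
function timedListener(callable $listener, float $budgetMs, callable $onSlow): Closure
{
    return static function (object $event) use ($listener, $budgetMs, $onSlow): void {
        $start = hrtime(true);                        // monotonic clock, nanoseconds
        $listener($event);
        $elapsedMs = (hrtime(true) - $start) / 1e6;   // nanoseconds to milliseconds

        if ($elapsedMs > $budgetMs) {
            $onSlow($elapsedMs, $event);
        }
    };
}
```

The returned closure can be passed to addListener() in place of the original callable. Keep $onSlow itself cheap, for instance incrementing a counter or writing one log line.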

Also:

  • Wrap listener bodies in try/catch. Unhandled exceptions in a PSR-14 listener can cascade into the daemon's loop. Log and swallow.
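  • Idempotent payloads. The plain BLPOP worker above is at-most-once: a message popped just before a crash is lost. If you move to a reliable-queue pattern or a broker with acknowledgements, a crash mid-processing leads to redelivery, so design your worker to tolerate duplicates.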
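The "log and swallow" rule can be applied once rather than repeated in every listener body. The following is a minimal sketch. safeListener() and the $onError callback are hypothetical names, not part of the session manager or of PSR-14.

```php
<?php

/**
 * Wrap a listener so that anything it throws is reported and swallowed
 * instead of propagating into the caller's loop.
 * Hypothetical helper; $onError receives the Throwable and the event.
 */
function safeListener(callable $listener, callable $onError): Closure
{
    return static function (object $event) use ($listener, $onError): void {
        try {
            $listener($event);
        } catch (Throwable $e) {
            // Report only; never rethrow from inside the daemon's dispatch path.
            $onError($e, $event);
        }
    };
}
```

In the launcher, each closure passed to addListener() could be wrapped this way, with $onError forwarding to the logger the daemon already uses.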

Capacity planning

Each active subscription triggers one publish() call per publishing interval. Worked example:

  • 50 sessions, each with 5 subscriptions, each at 500 ms publishing interval
  • 50 × 5 × 2 = 500 publish round-trips per second (worst case, no notification batching)
  • Each round-trip ~2-5 ms OPC UA + ~0.1 ms Redis enqueue
  • Listener CPU: ~50-200 ms/s total — comfortable on a single core
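The arithmetic above can be rerun for your own numbers with a few lines of PHP. This sketch assumes, as the example does, one publish round-trip per subscription per publishing interval and a fixed per-event listener cost. Both function names are hypothetical.

```php
<?php

/** Publish round-trips per second, assuming one publish per subscription per interval. */
function publishRatePerSecond(int $sessions, int $subscriptionsPerSession, float $publishingIntervalMs): float
{
    return $sessions * $subscriptionsPerSession * (1000.0 / $publishingIntervalMs);
}

/** Milliseconds of listener time consumed per wall-clock second at a given per-event cost. */
function listenerLoadMsPerSecond(float $ratePerSecond, float $costPerEventMs): float
{
    return $ratePerSecond * $costPerEventMs;
}

// The worked example: 50 sessions, 5 subscriptions each, 500 ms interval, ~0.1 ms per enqueue.
$rate = publishRatePerSecond(50, 5, 500.0);
printf(
    "%.0f round-trips/s, %.0f ms/s of enqueue time\n",
    $rate,
    listenerLoadMsPerSecond($rate, 0.1)
);
```

With the example's inputs this reproduces the 500 round-trips per second and the 50 ms/s lower bound quoted above. A heavier listener scales the second figure linearly.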

The daemon's event loop is single-threaded. If your listener load exceeds what a single core can handle, the architecture needs to change: shard subscriptions across multiple daemons, or enqueue earlier and do no per-event work in the listener.

Comparison: drive publish() yourself

The alternative — every consumer running its own publish loop — is documented in opcua-client — subscriptions. Use it when:

  • The consumer is the only one needing the data (no fan-out)
  • The publish cadence is low (≤ 1 Hz) and the worker is already long-lived
  • You need per-call back-pressure that auto-publish cannot provide

For everything fan-out shaped, auto-publish + queue is the right tool.