opcua-client · v4.3.x
Docs · Operations

Client-side aggregates

Compute OPC UA aggregate functions (Average, Min, Max, Count, Interpolate) client-side from a raw DataValue buffer — useful when the server lacks HistoryRead Processed or you need bucket boundaries the server will not honour.

The AggregateModule (added in v4.4.0, registered by default) computes OPC UA Part 13 aggregate functions on the client, from a buffer of raw DataValues you supply — or from raw history it fetches for you in one call.

Use it when:

  • The server does not implement HistoryRead Processed (Bad_ServiceUnsupported / Bad_HistoryOperationUnsupported).
  • You already have raw samples in memory (from a subscription, a CSV import, a test fixture) and want bucketed aggregates without a server round-trip.
  • You need bucket boundaries the server would not honour exactly — e.g. precisely-aligned 5-minute windows independent of what the historian decides to deliver.

Five aggregate functions ship out of the box:

AggregateFunction case Part 13 NodeId Semantics
Interpolate i=2341 Interpolated value at the start of each bucket
Minimum i=2346 Minimum raw value within the bucket
Maximum i=2345 Maximum raw value within the bucket
Average i=2342 Arithmetic mean of raw values within the bucket
Count i=2352 Number of raw values within the bucket

The Part 13 NodeIds are only used for reference here — the calculators are pure PHP and never round-trip through the server.

Method surface

Both methods are exposed via Client::__call() (not on OpcUaClientInterface) — they are added by the module at boot time. Static analysers won't see them; the runtime dispatcher will.

aggregate() is the in-memory call: you supply the DataValue[] ascending by sourceTimestamp, the module slices the [startTime, endTime] range into windows of processingIntervalMs and emits one DataValue per window. A processingIntervalMs of 0 collapses the whole range into a single window.

historyAggregate() is a convenience wrapper that calls historyReadRaw($nodeId, $startTime, $endTime, 0, true) first and feeds the result through aggregate(). It requires the server to support HistoryRead Raw and the node to be historicising; the same caveats from History reads apply.

Quick example — in-memory buffer

php aggregate live samples
use PhpOpcua\Client\Module\Aggregate\AggregateFunction;
use PhpOpcua\Client\Module\Aggregate\AggregateOptions;

$samples = $buffer->drainLastFiveMinutes();   // DataValue[], sourceTimestamp asc

$averages = $client->aggregate(
    $samples,
    new DateTimeImmutable('-5 minutes'),
    new DateTimeImmutable(),
    processingIntervalMs: 30_000.0,           // 30 s windows
    function:             AggregateFunction::Average,
    options:              new AggregateOptions(treatUncertainAsBad: true),
);

foreach ($averages as $dv) {
    echo $dv->sourceTimestamp->format('H:i:s') . " avg=" . $dv->getValue() . "\n";
}

The output array is one DataValue per window, in order. Each entry carries the aggregated value, the window's start sourceTimestamp, and a statusCode that combines the per-window severity with Part 11 Historian InfoBits (Calculated, Interpolated, Partial, ExtraData, MultiValue) where applicable.

Quick example — fetch-and-aggregate

php historyAggregate
$daily = $client->historyAggregate(
    nodeId:               'ns=2;s=Tank42/Level',
    startTime:            new DateTimeImmutable('-7 days'),
    endTime:              new DateTimeImmutable(),
    processingIntervalMs: 24 * 3_600_000.0,   // 1-day buckets
    function:             AggregateFunction::Maximum,
);

For high-cardinality ranges this can be expensive — historyReadRaw follows continuation points until the server reports done. Bound the range or use aggregate() directly with a pre-paginated buffer.

AggregateOptions

php default options
new AggregateOptions(
    stepped:                 false,
    treatUncertainAsBad:     true,
    useSlopedExtrapolation:  false,
    percentDataBad:          100,
    percentDataGood:         100,
)
Field Default Effect
stepped false Interpolate uses stepped interpolation instead of linear
treatUncertainAsBad true Uncertain* raw values are excluded from selection and averaging
useSlopedExtrapolation false Interpolate extrapolates past the last raw sample by slope
percentDataBad 100 Bad-data threshold (0–100) above which the result statusCode is Bad
percentDataGood 100 Good-data threshold (0–100) below which the result statusCode is Bad

AggregateOptions::default() returns the constructor defaults. All fields are readonly; build a new instance to tweak a single value.

When to prefer historyReadProcessed

The server-side historyReadProcessed is still the right call when:

  • The server supports it (most commercial historians do).
  • You need an aggregate this module does not ship (TotalDuration, TimeAverage, StandardDeviation, …).
  • The buckets are wide and the network is the bottleneck — the server returns one value per bucket; this module fetches every raw sample.

Reach for AggregateModule when the server is the bottleneck (missing service set, slow aggregates) or when you already hold the raw values in memory.

Status codes the module emits

Each output DataValue carries a statusCode composed of:

  • A severity code (Good, Uncertain*, Bad*) reflecting whether the bucket had enough good samples per percentDataGood/Bad.
  • Historian InfoBits OR-ed in via StatusCode::withDataValueInfoBits():
    • HistorianCalculated — value was computed (not a stored sample)
    • HistorianInterpolated — value was interpolated between samples
    • HistorianPartial — bucket only partly covered by raw data
    • HistorianExtraData — extra raw data is available in the bucket
    • HistorianMultiValue — multiple raw samples contributed

Three Part 13 status codes also appear when input validation fails:

Status code Cause
BadAggregateInvalidInputs Window range is invalid, raw buffer is malformed
BadAggregateNotSupported The requested AggregateFunction is not registered
BadAggregateConfigurationRejected AggregateOptions combination is rejected

These travel inside the result's statusCode (the call itself returns successfully). Inspect each output DataValue::$statusCode before consuming the value.

Events emitted by the aggregator

Both aggregate() and historyAggregate() dispatch a single PSR-14 event after computing their result. Wire a listener to instrument bucket counts, measure raw-buffer sizes, or push metrics:

Event Fires when Key fields
AggregateComputed aggregate() or historyAggregate() returned function (AggregateFunction), rawInputCount, intervalCount, nodeId (?NodeId, null for in-memory)

$nodeId is set only when the event came from historyAggregate() (the wrapper that first calls historyReadRaw) — for direct in-memory aggregation via aggregate(), it stays null. See Observability · Event reference.

Extending the module — custom calculators

AggregateModule::setCalculator(AggregateFunction $fn, AggregateCalculatorInterface $impl) swaps the implementation behind a function case. Useful in tests (stub the calculator) or to ship a vendor-specific aggregate without forking the module.

php swap a calculator
use PhpOpcua\Client\Module\Aggregate\AggregateFunction;
use PhpOpcua\Client\Module\Aggregate\AggregateModule;

/** @var AggregateModule $module */
$module = $client->getModule(AggregateModule::class);
$module->setCalculator(AggregateFunction::Average, new MedianCalculator());

AggregateCalculatorInterface::compute(Interval $window, AggregateOptions $opts, DataValue[] $rawValues): DataValue is the contract. Implementations live under src/Module/Aggregate/Calculator/; AbstractAggregateCalculator gives you the bucket-status helpers for percentDataBad/Good.

  • History reads — fetch raw or server-side processed history.
  • Modules — how AggregateModule fits alongside the other built-in modules.