Doctrine
Persisting OPC UA data with Doctrine ORM. Entity shapes, batched inserts, partitioning, and repository patterns for tag history and alarms.
Most OPC UA-driven apps persist some subset of the data — readings, alarms, audit logs, fleet metadata. Doctrine ORM is the standard Symfony persistence layer; this page covers practical patterns.
A tag-history entity
The canonical shape:
namespace App\Entity;

use Doctrine\ORM\Mapping as ORM;

#[ORM\Entity]
#[ORM\Table(name: 'plc_readings', indexes: [
    new ORM\Index(name: 'idx_node_sourceAt', columns: ['node_id', 'source_at']),
    new ORM\Index(name: 'idx_connection_at', columns: ['connection', 'source_at']),
])]
class PlcReading
{
    #[ORM\Id, ORM\GeneratedValue, ORM\Column(type: 'bigint')]
    public ?int $id = null;

    #[ORM\Column(type: 'string', length: 64)]
    public string $connection = 'default';

    #[ORM\Column(type: 'string', length: 255)]
    public string $nodeId;

    #[ORM\Column(type: 'decimal', precision: 20, scale: 6, nullable: true)]
    public ?string $valueNumeric = null;

    #[ORM\Column(type: 'string', nullable: true)]
    public ?string $valueText = null;

    // OPC UA status codes are unsigned 32-bit values. Bad codes (0x80000000 and up)
    // overflow a signed 32-bit INTEGER column, so store them as BIGINT.
    #[ORM\Column(type: 'bigint')]
    public int $statusCode;

    #[ORM\Column(type: 'datetime_immutable')]
    public \DateTimeImmutable $sourceAt;

    #[ORM\Column(type: 'datetime_immutable', options: ['default' => 'CURRENT_TIMESTAMP'])]
    public \DateTimeImmutable $createdAt;
}
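The two value columns split readings by kind: value_numeric holds
numbers that fit DECIMAL(20,6) (up to 14 integer digits and 6 decimal
places), and value_text holds everything else, such as strings or
numbers outside that range. The (node_id, source_at) composite index
serves the dominant query: one node's history over a time range.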
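The routing between the two columns can be sketched as a small pure function. This is a minimal sketch, not part of any library: the name splitReadingValue is hypothetical, it assumes the DECIMAL(20,6) column above, and it deliberately treats strings as text even when they look numeric, unlike a plain is_numeric() check.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: decide which value column a decoded OPC UA value
 * belongs in. Returns [valueNumeric, valueText]; at most one is non-null.
 *
 * Assumes DECIMAL(20,6), i.e. at most 14 digits before the decimal point.
 *
 * @return array{0: ?string, 1: ?string}
 */
function splitReadingValue(mixed $value): array
{
    if ($value === null) {
        return [null, null];
    }
    if (is_bool($value)) {
        // Booleans become 0/1 so they can be aggregated and charted.
        return [$value ? '1' : '0', null];
    }
    if (is_int($value) || is_float($value)) {
        // NaN and infinities cannot be stored in a DECIMAL column.
        if (is_float($value) && !is_finite($value)) {
            return [null, is_nan($value) ? 'NaN' : ($value > 0 ? 'Infinity' : '-Infinity')];
        }
        // Magnitudes of 1e14 and above exceed the 14 integer digits available.
        if (abs($value) >= 1e14) {
            return [null, (string) $value];
        }
        return [number_format((float) $value, 6, '.', ''), null];
    }
    if (is_string($value)) {
        return [null, $value];
    }
    // Arrays and structured values are kept as JSON text.
    return [null, json_encode($value, JSON_THROW_ON_ERROR)];
}
```

A caller would assign the two elements to $valueNumeric and $valueText. Rounding to six decimal places is a property of the column definition; widen the scale if a tag needs more precision.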
Migration
php bin/console make:migration
php bin/console doctrine:migrations:migrate
For PostgreSQL, consider partitioning large tag tables by
source_at — see Partitioning below.
Repository
namespace App\Repository;

use App\Entity\PlcReading;
use Doctrine\Bundle\DoctrineBundle\Repository\ServiceEntityRepository;
use Doctrine\Persistence\ManagerRegistry;

/**
 * @extends ServiceEntityRepository<PlcReading>
 */
final class PlcReadingRepository extends ServiceEntityRepository
{
    public function __construct(ManagerRegistry $registry)
    {
        parent::__construct($registry, PlcReading::class);
    }

    /** @return PlcReading[] */
    public function findForNode(string $nodeId, \DateTimeInterface $from, \DateTimeInterface $to): array
    {
        return $this->createQueryBuilder('r')
            ->andWhere('r.nodeId = :node')
            ->andWhere('r.sourceAt BETWEEN :from AND :to')
            ->setParameter('node', $nodeId)
            ->setParameter('from', $from)
            ->setParameter('to', $to)
            ->orderBy('r.sourceAt', 'ASC')
            ->getQuery()
            ->getResult();
    }

    public function latestFor(string $nodeId): ?PlcReading
    {
        return $this->createQueryBuilder('r')
            ->andWhere('r.nodeId = :node')
            ->setParameter('node', $nodeId)
            ->orderBy('r.sourceAt', 'DESC')
            ->setMaxResults(1)
            ->getQuery()
            ->getOneOrNullResult();
    }
}
Batched insert via Messenger
Per-event insert is fine for low volume. For high-frequency subscriptions, batch:
namespace App\MessageHandler;

use App\Message\StoreReadingBatch;
use Doctrine\DBAL\Connection;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
final class StoreReadingBatchHandler
{
    public function __construct(private Connection $db) {}

    public function __invoke(StoreReadingBatch $message): void
    {
        // An empty batch would produce "VALUES" with no tuples, which is invalid SQL.
        if (count($message->readings) === 0) {
            return;
        }

        // One "(?,?,?,?,?,?,NOW())" tuple per reading, sent as a single statement.
        $sql = "INSERT INTO plc_readings
            (connection, node_id, value_numeric, value_text, status_code, source_at, created_at)
            VALUES " . implode(',', array_fill(0, count($message->readings), '(?,?,?,?,?,?,NOW())'));

        $params = [];
        foreach ($message->readings as $r) {
            // Store booleans as 0/1; a plain (string) cast would turn false into ''.
            $value = is_bool($r['value']) ? (int) $r['value'] : $r['value'];
            $isNumeric = is_numeric($value);

            $params[] = $r['connection'];
            $params[] = $r['nodeId'];
            $params[] = $isNumeric ? $value : null;
            // Keep NULL as NULL instead of casting it to an empty string.
            $params[] = ($isNumeric || $value === null) ? null : (string) $value;
            $params[] = $r['statusCode'];
            $params[] = $r['sourceAt']->format('Y-m-d H:i:s.u');
        }

        $this->db->executeStatement($sql, $params);
    }
}
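For batches of 100+ rows, a single multi-row INSERT through DBAL is
much faster than persist + flush: it is one statement and one round
trip, and it skips the ORM's unit-of-work change tracking.
See Recipes · Persistent tag history.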
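How the batches get formed is not shown above. One possible approach, sketched here under assumptions, is a small in-memory buffer in the subscription worker that flushes when it reaches a size limit or when its oldest row exceeds an age limit. The ReadingBuffer class and its parameters are hypothetical, not part of Symfony or Doctrine.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical buffer: collects readings and passes them to a callback
 * once $maxSize rows are queued or the oldest row is $maxAgeSeconds old.
 */
final class ReadingBuffer
{
    /** @var list<array<string, mixed>> */
    private array $rows = [];

    /** Timestamp of the oldest buffered row, or null when empty. */
    private ?float $firstAt = null;

    public function __construct(
        private \Closure $onFlush,
        private int $maxSize = 500,
        private float $maxAgeSeconds = 1.0,
    ) {}

    /** @param array<string, mixed> $row */
    public function add(array $row, ?float $now = null): void
    {
        $now ??= microtime(true);
        $this->firstAt ??= $now;
        $this->rows[] = $row;

        // The age check only runs when a row arrives; see the note below.
        if (count($this->rows) >= $this->maxSize
            || $now - $this->firstAt >= $this->maxAgeSeconds) {
            $this->flush();
        }
    }

    public function flush(): void
    {
        if ($this->rows === []) {
            return;
        }
        // Reset state before invoking the callback so a failing callback
        // does not cause the same rows to be delivered twice.
        $batch = $this->rows;
        $this->rows = [];
        $this->firstAt = null;
        ($this->onFlush)($batch);
    }
}
```

In a worker, the callback would typically dispatch a StoreReadingBatch message with the collected rows (the message's constructor is an assumption here). Because the age limit is only checked on add(), call flush() on shutdown and during idle periods. Keeping maxSize well below the driver's bind-parameter limit matters too: PostgreSQL's wire protocol allows 65,535 parameters per statement, roughly 10,000 rows at six parameters each.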
Aggregation
For long retention without exploding storage, aggregate to 1-minute (or 1-hour) buckets:
namespace App\Entity;

use Doctrine\ORM\Mapping as ORM;

#[ORM\Entity]
#[ORM\Table(name: 'plc_reading_aggregates_1m')]
#[ORM\UniqueConstraint(name: 'uniq_node_bucket', columns: ['node_id', 'bucket_at'])]
class PlcReadingAggregate1m
{
    #[ORM\Id, ORM\GeneratedValue, ORM\Column(type: 'bigint')]
    public ?int $id = null;

    #[ORM\Column(type: 'string', length: 255)]
    public string $nodeId;

    #[ORM\Column(type: 'datetime_immutable')]
    public \DateTimeImmutable $bucketAt;

    #[ORM\Column(type: 'decimal', precision: 20, scale: 6)]
    public string $avgValue;

    #[ORM\Column(type: 'decimal', precision: 20, scale: 6)]
    public string $minValue;

    #[ORM\Column(type: 'decimal', precision: 20, scale: 6)]
    public string $maxValue;

    #[ORM\Column(type: 'integer')]
    public int $sampleCount;
}
Aggregation job (runs every minute):
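namespace App\Command;

use Doctrine\DBAL\Connection;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(name: 'app:plc:aggregate-1m')]
final class Aggregate1mCommand extends Command
{
    public function __construct(private Connection $db) { parent::__construct(); }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        // The window is aligned to minute boundaries so each run covers exactly one
        // complete bucket. An unaligned "NOW() - 2 minutes" window would straddle two
        // buckets, and the upsert would overwrite a bucket with a partial aggregate.
        $this->db->executeStatement(<<<SQL
            INSERT INTO plc_reading_aggregates_1m
                (node_id, bucket_at, avg_value, min_value, max_value, sample_count)
            SELECT
                node_id,
                DATE_TRUNC('minute', source_at) AS bucket_at,
                AVG(value_numeric),
                MIN(value_numeric),
                MAX(value_numeric),
                COUNT(*)
            FROM plc_readings
            WHERE source_at >= DATE_TRUNC('minute', NOW()) - INTERVAL '2 minutes'
              AND source_at <  DATE_TRUNC('minute', NOW()) - INTERVAL '1 minute'
              AND value_numeric IS NOT NULL
            GROUP BY node_id, bucket_at
            ON CONFLICT (node_id, bucket_at) DO UPDATE
                SET avg_value = EXCLUDED.avg_value,
                    min_value = EXCLUDED.min_value,
                    max_value = EXCLUDED.max_value,
                    sample_count = EXCLUDED.sample_count
            SQL);

        return Command::SUCCESS;
    }
}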
PostgreSQL-flavoured SQL — adapt for MySQL/MariaDB.
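An aggregation run should cover whole buckets only, since a partial window would upsert a partial aggregate. A portable way to get that, sketched below, is to compute the window in PHP and bind it as parameters instead of relying on database-specific date functions. The completedBucket function is hypothetical; it assumes timestamps after 1970 and bucket sizes that divide evenly into an hour or a day.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: return the half-open window [from, to) of the most
 * recent complete bucket that ended at least $lagSeconds before $now.
 * The lag leaves room for readings that arrive late.
 *
 * @return array{0: \DateTimeImmutable, 1: \DateTimeImmutable}
 */
function completedBucket(
    \DateTimeImmutable $now,
    int $bucketSeconds = 60,
    int $lagSeconds = 60,
): array {
    $ts = $now->getTimestamp() - $lagSeconds;

    // Floor to the bucket boundary; Unix time is aligned to UTC minutes and hours.
    $to = $ts - ($ts % $bucketSeconds);
    $from = $to - $bucketSeconds;

    $utc = new \DateTimeZone('UTC');

    return [
        (new \DateTimeImmutable('@' . $from))->setTimezone($utc),
        (new \DateTimeImmutable('@' . $to))->setTimezone($utc),
    ];
}

// Example: a run at 12:05:30 UTC aggregates the bucket 12:03:00 to 12:04:00.
[$from, $to] = completedBucket(new \DateTimeImmutable('2026-05-01 12:05:30 UTC'));
echo $from->format('H:i:s'), ' .. ', $to->format('H:i:s'), PHP_EOL;
```

Binding $from and $to as `source_at >= ? AND source_at < ?` also makes reruns and backfills deterministic, because the window no longer depends on when the statement happens to execute.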
Retention — pruning
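namespace App\Command;

use Doctrine\DBAL\Connection;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(name: 'app:plc:prune-readings')]
final class PruneReadingsCommand extends Command
{
    public function __construct(private Connection $db) { parent::__construct(); }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $cutoff = (new \DateTimeImmutable('-7 days'))->format('Y-m-d H:i:s');

        do {
            // PostgreSQL has no DELETE ... LIMIT, so select one chunk of ids in a subquery.
            // On MySQL/MariaDB use: DELETE FROM plc_readings WHERE source_at < ? LIMIT 10000
            $deleted = $this->db->executeStatement(
                'DELETE FROM plc_readings WHERE id IN (
                    SELECT id FROM plc_readings WHERE source_at < ? LIMIT 10000
                )',
                [$cutoff],
            );

            if ($deleted > 0) {
                sleep(1); // pause between chunks so other writers get through
            }
        } while ($deleted > 0);

        return Command::SUCCESS;
    }
}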
Deleting in chunks keeps each statement short, so locks are held only briefly and concurrent inserts are not stalled.
Schedule daily:
RecurringMessage::cron('0 2 * * *', new RunCommandMessage('app:plc:prune-readings'))
Partitioning
For very high-volume tag tables (>10 M rows), partition by month:
CREATE TABLE plc_readings (
    id BIGSERIAL,
    ...
    source_at TIMESTAMP NOT NULL,
    PRIMARY KEY (id, source_at)
) PARTITION BY RANGE (source_at);

CREATE TABLE plc_readings_2026_05 PARTITION OF plc_readings
    FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');
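Doctrine's schema tool does not generate PARTITION BY clauses, so
write this DDL by hand in a migration. Pre-create partitions for the
next 12 months. Retention then becomes cheap: dropping an old
partition with DROP TABLE is fast, whereas removing the same rows
with DELETE FROM is slow.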
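Pre-creating partitions is easy to automate. The following is a minimal sketch that only builds the DDL strings; the function name monthlyPartitionDdl is hypothetical, and it assumes the plc_readings_YYYY_MM naming used above and a trusted table name, since the name is interpolated into the SQL.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: build CREATE TABLE ... PARTITION OF statements for
 * $months consecutive monthly partitions, starting with the month of $start.
 *
 * @return list<string>
 */
function monthlyPartitionDdl(
    \DateTimeImmutable $start,
    int $months = 12,
    string $table = 'plc_readings',
): array {
    // Normalize to midnight on the first day of the starting month.
    $from = $start->modify('first day of this month')->setTime(0, 0);

    $ddl = [];
    for ($i = 0; $i < $months; $i++) {
        $to = $from->modify('+1 month');

        // Range bounds are inclusive at FROM and exclusive at TO.
        $ddl[] = sprintf(
            "CREATE TABLE IF NOT EXISTS %s_%s PARTITION OF %s FOR VALUES FROM ('%s') TO ('%s')",
            $table,
            $from->format('Y_m'),
            $table,
            $from->format('Y-m-d'),
            $to->format('Y-m-d'),
        );

        $from = $to;
    }

    return $ddl;
}

foreach (monthlyPartitionDdl(new \DateTimeImmutable('2026-05-17'), 2) as $sql) {
    echo $sql, ';', PHP_EOL;
}
```

Each statement can be passed to the DBAL connection's executeStatement() from a migration or a scheduled command. IF NOT EXISTS makes the job safe to rerun, so it can run monthly and always keep a year of partitions ahead.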
Multi-tenant scope
For per-tenant data, add a tenant_id column and use a global
Doctrine filter:
namespace App\Doctrine\Filter;

use Doctrine\ORM\Mapping\ClassMetadata;
use Doctrine\ORM\Query\Filter\SQLFilter;

final class TenantFilter extends SQLFilter
{
    public function addFilterConstraint(ClassMetadata $target, string $alias): string
    {
        // Entities without a tenantId field are left unfiltered.
        if (!$target->hasField('tenantId')) {
            return '';
        }

        // getParameter() returns the value already quoted as an SQL literal
        // (e.g. '42'), so embed it as-is. Casting that string to int yields 0.
        return sprintf('%s.tenant_id = %s', $alias, $this->getParameter('tenantId'));
    }
}
doctrine:
    orm:
        filters:
            tenant:
                class: App\Doctrine\Filter\TenantFilter
                enabled: false # enabled per-request
In a request listener:
$filter = $em->getFilters()->enable('tenant');
$filter->setParameter('tenantId', $user->getTenantId());
See Recipes · Multi-plant tenant.
Where to read next
- Twig — rendering OPC UA data.
- Recipes · Persistent tag history — full pipeline.
- Recipes · Alarm routing — the alarm-table sibling.