Skip to content

Commit 485aeb2

Browse files
committed
Add recorded_at, dispatched_at columns
1 parent 0e9c0ef commit 485aeb2

File tree

3 files changed

+17
-13
lines changed

3 files changed

+17
-13
lines changed

composer.lock

Lines changed: 7 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/PostgresStorage.php

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,10 @@ public function setup(): void
3333
(
3434
endpoint text not null,
3535
incoming_message_id text not null,
36+
recorded_at timestamptz not null default now(),
3637
commands bytea not null,
3738
events bytea not null,
38-
dispatched bool not null default false,
39+
dispatched_at timestamptz default null,
3940
primary key (incoming_message_id, endpoint)
4041
)
4142
SQL,
@@ -59,7 +60,7 @@ public function findOutbox(string $endpoint, string $incomingMessageId): ?Outbox
5960
->postgres
6061
->execute(
6162
<<<SQL
62-
select commands, events, dispatched
63+
select commands, events, dispatched_at is not null as dispatched
6364
from {$this->outboxTable}
6465
where endpoint = ? and incoming_message_id = ?
6566
SQL,
@@ -88,8 +89,8 @@ public function markOutboxDispatched(string $endpoint, string $incomingMessageId
8889
$this->postgres->execute(
8990
<<<SQL
9091
update {$this->outboxTable}
91-
set dispatched = true
92-
where endpoint = ? and incoming_message_id = ? and not dispatched
92+
set dispatched_at = now()
93+
where endpoint = ? and incoming_message_id = ? and dispatched_at is null
9394
SQL,
9495
[
9596
$endpoint,

src/PostgresTransaction.php

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,22 @@ public function __construct(
3030
private string $incomingMessageId,
3131
) {}
3232

33-
public function insertOutbox(Outbox $outbox): void
33+
public function recordOutbox(Outbox $outbox): void
3434
{
35+
$dispatched = $outbox->dispatched ? 'now()' : 'null';
36+
3537
try {
3638
$result = $this->wrappedTransaction->execute(
3739
<<<SQL
38-
insert into {$this->outboxTable} (endpoint, incoming_message_id, commands, events, dispatched)
39-
values (?, ?, ?, ?, ?)
40+
insert into {$this->outboxTable} (endpoint, incoming_message_id, commands, events, dispatched_at)
41+
values (?, ?, ?, ?, {$dispatched})
4042
on conflict (endpoint, incoming_message_id) do nothing
4143
SQL,
4244
[
4345
$this->endpoint,
4446
$this->incomingMessageId,
4547
new PostgresByteA(serialize($outbox->commands)),
4648
new PostgresByteA(serialize($outbox->events)),
47-
$outbox->dispatched,
4849
],
4950
);
5051
} catch (SqlTransactionError $exception) {

0 commit comments

Comments
 (0)