Skip to content

Commit 87d2ce2

Browse files
committed
POC of prelock.
1 parent 1f751cf commit 87d2ce2

File tree

9 files changed

+126
-43
lines changed

9 files changed

+126
-43
lines changed

packages/driver/src/driver-api.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ export interface SqliteDriverConnection {
3535
getLastChanges(): Promise<SqliteChanges>;
3636

3737
close(): Promise<void>;
38+
39+
lock?(mode: 'exclusive' | 'shared' | 'deferred'): Promise<void>;
40+
release?(): void;
3841
}
3942

4043
export type SqliteParameterBinding =

packages/driver/src/util/SingleConnectionPool.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@ export class SingleConnectionPool implements SqliteDriverConnectionPool {
2626
await this.connection.close();
2727
}
2828

29-
reserveConnection(
29+
async reserveConnection(
3030
options?: ReserveConnectionOptions
3131
): Promise<ReservedConnection> {
32+
console.log('single reserveConnection', this.connection.lock);
3233
if (options?.signal?.aborted) {
3334
throw new Error('Aborted');
3435
}
@@ -37,6 +38,7 @@ export class SingleConnectionPool implements SqliteDriverConnectionPool {
3738
async () => {
3839
// TODO: sync
3940
if (this.inUse === reserved) {
41+
reserved.connection.release?.();
4042
this.inUse = null;
4143
Promise.resolve().then(() => this.next());
4244
}
@@ -45,7 +47,10 @@ export class SingleConnectionPool implements SqliteDriverConnectionPool {
4547

4648
if (this.inUse == null) {
4749
this.inUse = reserved;
48-
return Promise.resolve(reserved);
50+
await reserved.connection.lock?.(
51+
options?.readonly ? 'shared' : 'exclusive'
52+
);
53+
return reserved;
4954
} else {
5055
const promise = new Promise<ReservedConnection>((resolve, reject) => {
5156
const item: QueuedItem = {
@@ -64,8 +69,11 @@ export class SingleConnectionPool implements SqliteDriverConnectionPool {
6469
);
6570
});
6671

67-
return promise.then((r) => {
72+
return promise.then(async (r) => {
6873
this.inUse = reserved;
74+
await reserved.connection.lock?.(
75+
options?.readonly ? 'shared' : 'exclusive'
76+
);
6977
return r;
7078
});
7179
}

packages/wa-sqlite-driver/src/OPFSCoopSyncVFS2.ts

Lines changed: 51 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ interface FileSystemSyncAccessHandle extends FileSystemHandle {
3434
}
3535

3636
class PersistentFile {
37+
path: string;
3738
fileHandle: FileSystemFileHandle;
3839
accessHandle: null | FileSystemSyncAccessHandle = null;
3940

@@ -47,7 +48,8 @@ class PersistentFile {
4748
handleRequestChannel: BroadcastChannel;
4849
isHandleRequested = false;
4950

50-
constructor(fileHandle) {
51+
constructor(path: string, fileHandle: FileSystemFileHandle) {
52+
this.path = path;
5153
this.fileHandle = fileHandle;
5254
}
5355
}
@@ -88,6 +90,20 @@ export class OPFSCoopSyncVFS2 extends FacadeVFS {
8890
return (this as unknown as WithModule)._module;
8991
}
9092

93+
async prelock(fileName: string): Promise<Disposable> {
94+
const file = this.persistentFiles.get('/' + fileName);
95+
this.log?.('prelock', fileName, file);
96+
await this.#requestAccessHandle(file);
97+
this.log?.('prelocked', fileName);
98+
const self = this;
99+
return {
100+
[Symbol.dispose]() {
101+
this.log?.('prelock release', fileName);
102+
self.#releaseAccessHandle(file);
103+
}
104+
};
105+
}
106+
91107
async #initialize(nTemporaryFiles) {
92108
// Delete temporary directories no longer in use.
93109
const root = await navigator.storage.getDirectory();
@@ -183,11 +199,11 @@ export class OPFSCoopSyncVFS2 extends FacadeVFS {
183199
// Get access handles for the files.
184200
const file = new File(path, flags);
185201
file.persistentFile = this.persistentFiles.get(path);
186-
await this.#requestAccessHandle(file);
202+
await this.#requestAccessHandle(file.persistentFile);
187203
} catch (e) {
188204
// Use an invalid persistent file to signal this error
189205
// for the retried open.
190-
const persistentFile = new PersistentFile(null);
206+
const persistentFile = new PersistentFile(path, null);
191207
this.persistentFiles.set(path, persistentFile);
192208
console.error(e);
193209
}
@@ -205,7 +221,7 @@ export class OPFSCoopSyncVFS2 extends FacadeVFS {
205221
(async () => {
206222
const file = new File(path, flags);
207223
file.persistentFile = this.persistentFiles.get(path);
208-
await this.#requestAccessHandle(file);
224+
await this.#requestAccessHandle(file.persistentFile);
209225
})()
210226
);
211227
return VFS.SQLITE_BUSY;
@@ -282,7 +298,7 @@ export class OPFSCoopSyncVFS2 extends FacadeVFS {
282298

283299
if (file?.flags & VFS.SQLITE_OPEN_MAIN_DB) {
284300
if (file.persistentFile?.handleLockReleaser) {
285-
this.#releaseAccessHandle(file);
301+
this.#releaseAccessHandle(file.persistentFile);
286302
}
287303
} else if (file?.flags & VFS.SQLITE_OPEN_DELETEONCLOSE) {
288304
file.accessHandle.truncate(0);
@@ -315,7 +331,7 @@ export class OPFSCoopSyncVFS2 extends FacadeVFS {
315331
file.flags & VFS.SQLITE_OPEN_MAIN_DB &&
316332
!file.persistentFile.isFileLocked
317333
) {
318-
this.#releaseAccessHandle(file);
334+
this.#releaseAccessHandle(file.persistentFile);
319335
}
320336

321337
if (bytesRead < pData.byteLength) {
@@ -408,12 +424,12 @@ export class OPFSCoopSyncVFS2 extends FacadeVFS {
408424
file.persistentFile.isHandleRequested = true;
409425
} else {
410426
// Release the access handles immediately.
411-
this.#releaseAccessHandle(file);
427+
this.#releaseAccessHandle(file.persistentFile);
412428
}
413429
file.persistentFile.handleRequestChannel.onmessage = null;
414430
};
415431

416-
this.#requestAccessHandle(file);
432+
this.#requestAccessHandle(file.persistentFile);
417433
this.log?.('returning SQLITE_BUSY');
418434
file.persistentFile.isLockBusy = true;
419435
return VFS.SQLITE_BUSY;
@@ -430,7 +446,7 @@ export class OPFSCoopSyncVFS2 extends FacadeVFS {
430446
if (!file.persistentFile.isLockBusy) {
431447
if (file.persistentFile.isHandleRequested) {
432448
// Another connection wants the access handle.
433-
this.#releaseAccessHandle(file);
449+
this.#releaseAccessHandle(file.persistentFile);
434450
file.persistentFile.isHandleRequested = false;
435451
}
436452
file.persistentFile.isFileLocked = false;
@@ -490,10 +506,10 @@ export class OPFSCoopSyncVFS2 extends FacadeVFS {
490506
async #createPersistentFile(
491507
fileHandle: FileSystemFileHandle
492508
): Promise<PersistentFile> {
493-
const persistentFile = new PersistentFile(fileHandle);
494509
const root = await navigator.storage.getDirectory();
495510
const relativePath = await root.resolve(fileHandle);
496511
const path = `/${relativePath.join('/')}`;
512+
const persistentFile = new PersistentFile(path, fileHandle);
497513
persistentFile.handleRequestChannel = new BroadcastChannel(`ahp:${path}`);
498514
this.persistentFiles.set(path, persistentFile);
499515

@@ -504,52 +520,54 @@ export class OPFSCoopSyncVFS2 extends FacadeVFS {
504520
return persistentFile;
505521
}
506522

507-
#requestAccessHandle(file: File): Promise<void> {
508-
console.assert(!file.persistentFile.handleLockReleaser);
509-
if (!file.persistentFile.isRequestInProgress) {
510-
file.persistentFile.isRequestInProgress = true;
523+
#requestAccessHandle(persistentFile: PersistentFile): Promise<void> {
524+
console.assert(!persistentFile.handleLockReleaser);
525+
if (!persistentFile.isRequestInProgress) {
526+
persistentFile.isRequestInProgress = true;
527+
this.log?.('Requesting lock for', persistentFile.path);
511528
this.#module.retryOps.push(
512529
(async () => {
513530
// Acquire the Web Lock.
514-
file.persistentFile.handleLockReleaser = await this.#acquireLock(
515-
file.persistentFile
516-
);
531+
persistentFile.handleLockReleaser =
532+
await this.#acquireLock(persistentFile);
517533

518534
// Get access handles for the database and releated files in parallel.
519-
this.log?.(`creating access handles for ${file.path}`);
535+
this.log?.(`creating access handles for ${persistentFile.path}`);
520536
await Promise.all(
521537
DB_RELATED_FILE_SUFFIXES.map(async (suffix) => {
522-
const persistentFile = this.persistentFiles.get(
523-
file.path + suffix
538+
const subPersistentFile = this.persistentFiles.get(
539+
persistentFile.path + suffix
524540
);
525-
if (persistentFile) {
526-
persistentFile.accessHandle = await (
527-
persistentFile.fileHandle as any
541+
if (subPersistentFile) {
542+
subPersistentFile.accessHandle = await (
543+
subPersistentFile.fileHandle as any
528544
).createSyncAccessHandle();
529545
}
530546
})
531547
);
532-
file.persistentFile.isRequestInProgress = false;
548+
persistentFile.isRequestInProgress = false;
533549
})()
534550
);
535551
return this.#module.retryOps.at(-1);
536552
}
537553
return Promise.resolve();
538554
}
539555

540-
async #releaseAccessHandle(file: File): Promise<void> {
556+
async #releaseAccessHandle(persistentFile: PersistentFile): Promise<void> {
541557
DB_RELATED_FILE_SUFFIXES.forEach(async (suffix) => {
542-
const persistentFile = this.persistentFiles.get(file.path + suffix);
543-
if (persistentFile) {
544-
persistentFile.accessHandle?.close();
545-
persistentFile.accessHandle = null;
558+
const subPersistentFile = this.persistentFiles.get(
559+
persistentFile.path + suffix
560+
);
561+
if (subPersistentFile) {
562+
subPersistentFile.accessHandle?.close();
563+
subPersistentFile.accessHandle = null;
546564
}
547565
});
548-
this.log?.(`access handles closed for ${file.path}`);
566+
this.log?.(`access handles closed for ${persistentFile.path}`);
549567

550-
file.persistentFile.handleLockReleaser?.();
551-
file.persistentFile.handleLockReleaser = null;
552-
this.log?.(`lock released for ${file.path}`);
568+
persistentFile.handleLockReleaser?.();
569+
persistentFile.handleLockReleaser = null;
570+
this.log?.(`lock released for ${persistentFile.path}`);
553571
}
554572

555573
#acquireLock(persistentFile: PersistentFile): Promise<() => void> {

packages/wa-sqlite-driver/src/wa-sqlite-driver.ts

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
UpdateListener
1515
} from '@sqlite-js/driver';
1616
import * as mutex from 'async-mutex';
17+
import { OPFSCoopSyncVFS2 } from './OPFSCoopSyncVFS2';
1718

1819
// Initialize SQLite.
1920
export const module = await SQLiteESMFactory();
@@ -31,6 +32,7 @@ class StatementImpl implements SqliteDriverStatement {
3132

3233
constructor(
3334
private db: number,
35+
private con: WaSqliteConnection,
3436
public source: string,
3537
public options: PrepareOptions
3638
) {
@@ -251,17 +253,36 @@ class StatementImpl implements SqliteDriverStatement {
251253

252254
export class WaSqliteConnection implements SqliteDriverConnection {
253255
db: number;
256+
vfs: OPFSCoopSyncVFS2;
254257

255258
statements = new Set<StatementImpl>();
259+
lockDisposer: Disposable | null = null;
256260

257-
static async open(filename: string): Promise<WaSqliteConnection> {
261+
static async open(
262+
filename: string,
263+
vfs: OPFSCoopSyncVFS2
264+
): Promise<WaSqliteConnection> {
258265
// Open the database.
259266
const db = await sqlite3.open_v2(filename);
260-
return new WaSqliteConnection(db);
267+
return new WaSqliteConnection(db, vfs, filename);
261268
}
262269

263-
constructor(db: number) {
270+
constructor(
271+
db: number,
272+
vfs: OPFSCoopSyncVFS2,
273+
public path: string
274+
) {
264275
this.db = db;
276+
this.vfs = vfs;
277+
}
278+
279+
async lock() {
280+
this.lockDisposer = await this.vfs.prelock(this.path);
281+
}
282+
283+
release() {
284+
this.lockDisposer[Symbol.dispose]();
285+
this.lockDisposer = null;
265286
}
266287

267288
async close() {
@@ -285,7 +306,7 @@ export class WaSqliteConnection implements SqliteDriverConnection {
285306
}
286307

287308
prepare(sql: string, options?: PrepareOptions): StatementImpl {
288-
const st = new StatementImpl(this.db, sql, options ?? {});
309+
const st = new StatementImpl(this.db, this, sql, options ?? {});
289310
// TODO: cleanup on finalize
290311
this.statements.add(st);
291312
return st;

packages/wa-sqlite-driver/src/wa-sqlite-worker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,6 @@ sqlite3.vfs_register(vfs as any, true);
99

1010
setupDriverWorker({
1111
async openConnection(options) {
12-
return await WaSqliteConnection.open(options.path);
12+
return await WaSqliteConnection.open(options.path, vfs);
1313
}
1414
});

packages/wa-sqlite-driver/src/worker_threads/WorkerDriverAdapter.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ export class WorkerConnectionAdapter implements WorkerDriver {
112112
return this._parse(command);
113113
case SqliteCommandType.changes:
114114
return this.connnection.getLastChanges();
115+
case SqliteCommandType.lock:
116+
return this.connnection.lock?.(command.mode);
117+
case SqliteCommandType.release:
118+
return this.connnection.release?.();
115119
default:
116120
throw new Error(`Unknown command: ${command.type}`);
117121
}

packages/wa-sqlite-driver/src/worker_threads/async-commands.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ export enum SqliteCommandType {
1414
sync = 6,
1515
parse = 7,
1616
run = 8,
17-
changes = 9
17+
changes = 9,
18+
lock = 10,
19+
release = 11
1820
}
1921

2022
export type SqliteDriverError = SerializedDriverError;
@@ -89,6 +91,15 @@ export interface SqliteGetChanges {
8991
type: SqliteCommandType.changes;
9092
}
9193

94+
export interface SqliteLock {
95+
type: SqliteCommandType.lock;
96+
mode: 'exclusive' | 'shared' | 'deferred';
97+
}
98+
99+
export interface SqliteRelease {
100+
type: SqliteCommandType.release;
101+
}
102+
92103
export type SqliteCommand =
93104
| SqlitePrepare
94105
| SqliteBind
@@ -98,7 +109,9 @@ export type SqliteCommand =
98109
| SqliteFinalize
99110
| SqliteSync
100111
| SqliteParse
101-
| SqliteGetChanges;
112+
| SqliteGetChanges
113+
| SqliteLock
114+
| SqliteRelease;
102115

103116
export type InferCommandResult<T extends SqliteCommand> = T extends SqliteRun
104117
? SqliteChanges

packages/wa-sqlite-driver/src/worker_threads/setup.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ export function setupDriverPort(config: WorkerDriverConfig) {
5454
} catch (e: any) {
5555
port.postMessage({ id, value: { error: { message: e.message } } });
5656
}
57+
} else if (message == 'lock') {
58+
try {
59+
await opened;
60+
port.postMessage({ id });
61+
} catch (e: any) {
62+
port.postMessage({ id, value: { error: { message: e.message } } });
63+
}
5764
} else if (message == 'execute') {
5865
try {
5966
await opened;

0 commit comments

Comments
 (0)