From 2d2588c284ca394bea26fdec736dbe7bae71f755 Mon Sep 17 00:00:00 2001 From: Nick Randall Date: Thu, 3 Oct 2024 11:58:47 -0400 Subject: [PATCH 1/4] feat(neon): support neon's sql-over-websocket protocol --- neon/polyfills.js | 250 ++++++++++ neon/src/bytes.js | 79 +++ neon/src/connection.js | 1038 ++++++++++++++++++++++++++++++++++++++++ neon/src/errors.js | 53 ++ neon/src/index.js | 566 ++++++++++++++++++++++ neon/src/large.js | 70 +++ neon/src/query.js | 173 +++++++ neon/src/queue.js | 31 ++ neon/src/result.js | 16 + neon/src/subscribe.js | 278 +++++++++++ neon/src/types.js | 368 ++++++++++++++ neon/test.js | 14 + package.json | 6 +- transpile.neon.js | 51 ++ 14 files changed, 2992 insertions(+), 1 deletion(-) create mode 100644 neon/polyfills.js create mode 100644 neon/src/bytes.js create mode 100644 neon/src/connection.js create mode 100644 neon/src/errors.js create mode 100644 neon/src/index.js create mode 100644 neon/src/large.js create mode 100644 neon/src/query.js create mode 100644 neon/src/queue.js create mode 100644 neon/src/result.js create mode 100644 neon/src/subscribe.js create mode 100644 neon/src/types.js create mode 100644 neon/test.js create mode 100644 transpile.neon.js diff --git a/neon/polyfills.js b/neon/polyfills.js new file mode 100644 index 00000000..24cd3e5c --- /dev/null +++ b/neon/polyfills.js @@ -0,0 +1,250 @@ +import { EventEmitter } from "node:events"; +import { Buffer } from "node:buffer"; + +const Crypto = globalThis.crypto; + +let ids = 1; +const tasks = new Set(); + +const v4Seg = "(?:[0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])"; +const v4Str = `(${v4Seg}[.]){3}${v4Seg}`; +const IPv4Reg = new RegExp(`^${v4Str}$`); + +const v6Seg = "(?:[0-9a-fA-F]{1,4})"; +const IPv6Reg = new RegExp( + "^(" + + `(?:${v6Seg}:){7}(?:${v6Seg}|:)|` + + `(?:${v6Seg}:){6}(?:${v4Str}|:${v6Seg}|:)|` + + `(?:${v6Seg}:){5}(?::${v4Str}|(:${v6Seg}){1,2}|:)|` + + `(?:${v6Seg}:){4}(?:(:${v6Seg}){0,1}:${v4Str}|(:${v6Seg}){1,3}|:)|` + + `(?:${v6Seg}:){3}(?:(:${v6Seg}){0,2}:${v4Str}|(:${v6Seg}){1,4}|:)|` + + `(?:${v6Seg}:){2}(?:(:${v6Seg}){0,3}:${v4Str}|(:${v6Seg}){1,5}|:)|` + + `(?:${v6Seg}:){1}(?:(:${v6Seg}){0,4}:${v4Str}|(:${v6Seg}){1,6}|:)|` + + `(?::((?::${v6Seg}){0,5}:${v4Str}|(?::${v6Seg}){1,7}|:))` + + ")(%[0-9a-zA-Z-.:]{1,})?$", +); + +const textEncoder = new TextEncoder(); +export const crypto = { + randomBytes: (l) => Crypto.getRandomValues(Buffer.alloc(l)), + pbkdf2Sync: async (password, salt, iterations, keylen) => + Crypto.subtle.deriveBits( + { + name: "PBKDF2", + hash: "SHA-256", + salt, + iterations, + }, + await Crypto.subtle.importKey( + "raw", + textEncoder.encode(password), + "PBKDF2", + false, + ["deriveBits"], + ), + keylen * 8, + ["deriveBits"], + ), + createHash: (type) => ({ + update: (x) => ({ + digest: (encoding) => { + if (!(x instanceof Uint8Array)) { + x = textEncoder.encode(x); + } + let prom; + if (type === "sha256") { + prom = Crypto.subtle.digest("SHA-256", x); + } else if (type === "md5") { + prom = Crypto.subtle.digest("md5", x); + } else { + throw Error( + "createHash only supports sha256 or md5 in this environment, not ${type}.", + ); + } + if (encoding === "hex") { + return prom.then((arrayBuf) => Buffer.from(arrayBuf).toString("hex")); + } else if (encoding) { + throw Error( + `createHash only supports hex encoding or unencoded in this environment, not ${encoding}`, + ); + } else { + return prom; + } + }, + }), + }), + createHmac: (type, key) => ({ + update: (x) => ({ + digest: async () => + Buffer.from( + await Crypto.subtle.sign( + "HMAC", + await Crypto.subtle.importKey( + "raw", + key, + { name: "HMAC", hash: "SHA-256" }, + false, + ["sign"], + ), + textEncoder.encode(x), + ), + ), + }), + }), +}; + +export const performance = globalThis.performance; + +export const process = { + env: {}, +}; + +export const os = { + userInfo() { + return { username: "postgres" }; + }, +}; + +export const fs = { + readFile() { + throw new Error("Reading files not supported on CloudFlare"); + }, +}; + +export const net = { + isIP: (x) => (IPv4Reg.test(x) ? 4 : IPv6Reg.test(x) ? 6 : 0), + Socket, +}; + +export { setImmediate, clearImmediate }; + +export const tls = { + connect({ socket, servername }) { + socket.startTls(servername); + return socket; + }, +}; + +const wsCache = new Map(); + +function Socket() { + const tcp = Object.assign(new EventEmitter(), { + readyState: "open", + ws: null, + writeBuffer: undefined, + connect, + startTls, + write, + end, + destroy, + read, + }); + + return tcp; + + async function connect(port, host) { + const handleWebSocketOpen = () => { + tcp.pending = false; + tcp.emit("connect"); + tcp.emit("ready"); + }; + const configureWebSocket = (ws, immediateOpen = false) => { + ws.binaryType = "arraybuffer"; + + ws.addEventListener("error", (err) => { + tcp.emit("error", err); + tcp.emit("close"); + }); + + ws.addEventListener("message", (msg) => { + const buffer = Buffer.from(msg.data); + tcp.emit("data", buffer); + }); + + ws.addEventListener("close", () => { + tcp.emit("close"); + }); + + if (immediateOpen) handleWebSocketOpen(); + else ws.addEventListener("open", handleWebSocketOpen); + }; + try { + tcp.readyState = "opening"; + const rootURL = host + "/v2" + "?address=" + host + ":" + port; + const socketURL = "wss://" + rootURL; + console.log("socketURL", socketURL); + tcp.ws = new WebSocket(socketURL); + // await tcp.ws.accept(); + configureWebSocket(tcp.ws); + } catch (err) { + console.log("err", err); + error(err); + } + } + + async function startTls(host) { + throw new Error("Postgres SSL connections are not supported yet"); + } + + function write(data, cb) { + if (data.length === 0) { + cb?.(); + return true; + } + + if (typeof data === "string") { + data = Buffer.from(data, "utf8"); + } + + if (tcp.writeBuffer === undefined) { + tcp.writeBuffer = data; + setImmediate(() => { + tcp.ws.send(tcp.writeBuffer); + tcp.writeBuffer = undefined; + }); + } else { + const newBuffer = new Uint8Array(tcp.writeBuffer.length + data.length); + newBuffer.set(tcp.writeBuffer); + newBuffer.set(data, tcp.writeBuffer.length); + tcp.writeBuffer = newBuffer; + } + cb?.(); + return true; + } + + async function read() { + // Nothing to do? + console.log("read"); + } + + function end(data) { + return data ? tcp.write(data, () => tcp.ws.close()) : tcp.ws.close(); + } + + function destroy() { + tcp.destroyed = true; + tcp.end(); + } + + function error(err) { + console.log("errfn", err); + tcp.emit("error", err); + tcp.emit("close"); + } +} + +function setImmediate(fn) { + const id = ids++; + tasks.add(id); + queueMicrotask(() => { + if (tasks.has(id)) { + fn(); + tasks.delete(id); + } + }); + return id; +} + +function clearImmediate(id) { + tasks.delete(id); +} diff --git a/neon/src/bytes.js b/neon/src/bytes.js new file mode 100644 index 00000000..48b6f983 --- /dev/null +++ b/neon/src/bytes.js @@ -0,0 +1,79 @@ +import { Buffer } from 'node:buffer' +const size = 256 +let buffer = Buffer.allocUnsafe(size) + +const messages = 'BCcDdEFfHPpQSX'.split('').reduce((acc, x) => { + const v = x.charCodeAt(0) + acc[x] = () => { + buffer[0] = v + b.i = 5 + return b + } + return acc +}, {}) + +const b = Object.assign(reset, messages, { + N: String.fromCharCode(0), + i: 0, + inc(x) { + b.i += x + return b + }, + str(x) { + const length = Buffer.byteLength(x) + fit(length) + b.i += buffer.write(x, b.i, length, 'utf8') + return b + }, + i16(x) { + fit(2) + buffer.writeUInt16BE(x, b.i) + b.i += 2 + return b + }, + i32(x, i) { + if (i || i === 0) { + buffer.writeUInt32BE(x, i) + return b + } + fit(4) + buffer.writeUInt32BE(x, b.i) + b.i += 4 + return b + }, + z(x) { + fit(x) + buffer.fill(0, b.i, b.i + x) + b.i += x + return b + }, + raw(x) { + buffer = Buffer.concat([buffer.subarray(0, b.i), x]) + b.i = buffer.length + return b + }, + end(at = 1) { + buffer.writeUInt32BE(b.i - at, at) + const out = buffer.subarray(0, b.i) + b.i = 0 + buffer = Buffer.allocUnsafe(size) + return out + } +}) + +export default b + +function fit(x) { + if (buffer.length - b.i < x) { + const prev = buffer + , length = prev.length + + buffer = Buffer.allocUnsafe(length + (length >> 1) + x) + prev.copy(buffer) + } +} + +function reset() { + b.i = 0 + return b +} diff --git a/neon/src/connection.js b/neon/src/connection.js new file mode 100644 index 00000000..ee8b1e69 --- /dev/null +++ b/neon/src/connection.js @@ -0,0 +1,1038 @@ +import { Buffer } from 'node:buffer' +import { setImmediate, clearImmediate } from '../polyfills.js' +import { net } from '../polyfills.js' +import { tls } from '../polyfills.js' +import { crypto } from '../polyfills.js' +import Stream from 'node:stream' +import { performance } from '../polyfills.js' + +import { stringify, handleValue, arrayParser, arraySerializer } from './types.js' +import { Errors } from './errors.js' +import Result from './result.js' +import Queue from './queue.js' +import { Query, CLOSE } from './query.js' +import b from './bytes.js' + +export default Connection + +let uid = 1 + +const Sync = b().S().end() + , Flush = b().H().end() + , SSLRequest = b().i32(8).i32(80877103).end(8) + , ExecuteUnnamed = Buffer.concat([b().E().str(b.N).i32(0).end(), Sync]) + , DescribeUnnamed = b().D().str('S').str(b.N).end() + , noop = () => { /* noop */ } + +const retryRoutines = new Set([ + 'FetchPreparedStatement', + 'RevalidateCachedQuery', + 'transformAssignedExpr' +]) + +const errorFields = { + 83 : 'severity_local', // S + 86 : 'severity', // V + 67 : 'code', // C + 77 : 'message', // M + 68 : 'detail', // D + 72 : 'hint', // H + 80 : 'position', // P + 112 : 'internal_position', // p + 113 : 'internal_query', // q + 87 : 'where', // W + 115 : 'schema_name', // s + 116 : 'table_name', // t + 99 : 'column_name', // c + 100 : 'data type_name', // d + 110 : 'constraint_name', // n + 70 : 'file', // F + 76 : 'line', // L + 82 : 'routine' // R +} + +function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose = noop } = {}) { + const { + ssl, + max, + user, + host, + port, + database, + parsers, + transform, + onnotice, + onnotify, + onparameter, + max_pipeline, + keep_alive, + backoff, + target_session_attrs + } = options + + const sent = Queue() + , id = uid++ + , backend = { pid: null, secret: null } + , idleTimer = timer(end, options.idle_timeout) + , lifeTimer = timer(end, options.max_lifetime) + , connectTimer = timer(connectTimedOut, options.connect_timeout) + + let socket = null + , cancelMessage + , result = new Result() + , incoming = Buffer.alloc(0) + , needsTypes = options.fetch_types + , backendParameters = {} + , statements = {} + , statementId = Math.random().toString(36).slice(2) + , statementCount = 1 + , closedDate = 0 + , remaining = 0 + , hostIndex = 0 + , retries = 0 + , length = 0 + , delay = 0 + , rows = 0 + , serverSignature = null + , nextWriteTimer = null + , terminated = false + , incomings = null + , results = null + , initial = null + , ending = null + , stream = null + , chunk = null + , ended = null + , nonce = null + , query = null + , final = null + + const connection = { + queue: queues.closed, + idleTimer, + connect(query) { + initial = query || true + reconnect() + }, + terminate, + execute, + cancel, + end, + count: 0, + id + } + + queues.closed && queues.closed.push(connection) + + return connection + + async function createSocket() { + let x + try { + x = options.socket + ? (await Promise.resolve(options.socket(options))) + : new net.Socket() + } catch (e) { + error(e) + return + } + x.on('error', error) + x.on('close', closed) + x.on('drain', drain) + return x + } + + async function cancel({ pid, secret }, resolve, reject) { + try { + cancelMessage = b().i32(16).i32(80877102).i32(pid).i32(secret).end(16) + await connect() + socket.once('error', reject) + socket.once('close', resolve) + } catch (error) { + reject(error) + } + } + + function execute(q) { + if (terminated) + return queryError(q, Errors.connection('CONNECTION_DESTROYED', options)) + + if (q.cancelled) + return + + try { + q.state = backend + query + ? sent.push(q) + : (query = q, query.active = true) + + build(q) + return write(toBuffer(q)) + && !q.describeFirst + && !q.cursorFn + && sent.length < max_pipeline + && (!q.options.onexecute || q.options.onexecute(connection)) + } catch (error) { + sent.length === 0 && write(Sync) + errored(error) + return true + } + } + + function toBuffer(q) { + if (q.parameters.length >= 65534) + throw Errors.generic('MAX_PARAMETERS_EXCEEDED', 'Max number of parameters (65534) exceeded') + + return q.options.simple + ? b().Q().str(q.statement.string + b.N).end() + : q.describeFirst + ? Buffer.concat([describe(q), Flush]) + : q.prepare + ? q.prepared + ? prepared(q) + : Buffer.concat([describe(q), prepared(q)]) + : unnamed(q) + } + + function describe(q) { + return Buffer.concat([ + Parse(q.statement.string, q.parameters, q.statement.types, q.statement.name), + Describe('S', q.statement.name) + ]) + } + + function prepared(q) { + return Buffer.concat([ + Bind(q.parameters, q.statement.types, q.statement.name, q.cursorName), + q.cursorFn + ? Execute('', q.cursorRows) + : ExecuteUnnamed + ]) + } + + function unnamed(q) { + return Buffer.concat([ + Parse(q.statement.string, q.parameters, q.statement.types), + DescribeUnnamed, + prepared(q) + ]) + } + + function build(q) { + const parameters = [] + , types = [] + + const string = stringify(q, q.strings[0], q.args[0], parameters, types, options) + + !q.tagged && q.args.forEach(x => handleValue(x, parameters, types, options)) + + q.prepare = options.prepare && ('prepare' in q.options ? q.options.prepare : true) + q.string = string + q.signature = q.prepare && types + string + q.onlyDescribe && (delete statements[q.signature]) + q.parameters = q.parameters || parameters + q.prepared = q.prepare && q.signature in statements + q.describeFirst = q.onlyDescribe || (parameters.length && !q.prepared) + q.statement = q.prepared + ? statements[q.signature] + : { string, types, name: q.prepare ? statementId + statementCount++ : '' } + + typeof options.debug === 'function' && options.debug(id, string, parameters, types) + } + + function write(x, fn) { + chunk = chunk ? Buffer.concat([chunk, x]) : Buffer.from(x) + if (fn || chunk.length >= 1024) + return nextWrite(fn) + nextWriteTimer === null && (nextWriteTimer = setImmediate(nextWrite)) + return true + } + + function nextWrite(fn) { + const x = socket.write(chunk, fn) + nextWriteTimer !== null && clearImmediate(nextWriteTimer) + chunk = nextWriteTimer = null + return x + } + + function connectTimedOut() { + errored(Errors.connection('CONNECT_TIMEOUT', options, socket)) + socket.destroy() + } + + async function secure() { + write(SSLRequest) + const canSSL = await new Promise(r => socket.once('data', x => r(x[0] === 83))) // S + + if (!canSSL && ssl === 'prefer') + return connected() + + socket.removeAllListeners() + socket = tls.connect({ + socket, + servername: net.isIP(socket.host) ? undefined : socket.host, + ...(ssl === 'require' || ssl === 'allow' || ssl === 'prefer' + ? { rejectUnauthorized: false } + : ssl === 'verify-full' + ? {} + : typeof ssl === 'object' + ? ssl + : {} + ) + }) + socket.on('secureConnect', connected) + socket.on('error', error) + socket.on('close', closed) + socket.on('drain', drain) + } + + /* c8 ignore next 3 */ + function drain() { + !query && onopen(connection) + } + + function data(x) { + if (incomings) { + incomings.push(x) + remaining -= x.length + if (remaining >= 0) + return + } + + incoming = incomings + ? Buffer.concat(incomings, length - remaining) + : incoming.length === 0 + ? x + : Buffer.concat([incoming, x], incoming.length + x.length) + + while (incoming.length > 4) { + length = incoming.readUInt32BE(1) + if (length >= incoming.length) { + remaining = length - incoming.length + incomings = [incoming] + break + } + + try { + handle(incoming.subarray(0, length + 1)) + } catch (e) { + query && (query.cursorFn || query.describeFirst) && write(Sync) + errored(e) + } + incoming = incoming.subarray(length + 1) + remaining = 0 + incomings = null + } + } + + async function connect() { + terminated = false + backendParameters = {} + socket || (socket = await createSocket()) + + if (!socket) + return + + connectTimer.start() + + if (options.socket) + return ssl ? secure() : connected() + + socket.on('connect', ssl ? secure : connected) + + if (options.path) + return socket.connect(options.path) + + socket.ssl = ssl + socket.connect(port[hostIndex], host[hostIndex]) + socket.host = host[hostIndex] + socket.port = port[hostIndex] + + hostIndex = (hostIndex + 1) % port.length + } + + function reconnect() { + setTimeout(connect, closedDate ? closedDate + delay - performance.now() : 0) + } + + function connected() { + try { + statements = {} + needsTypes = options.fetch_types + statementId = Math.random().toString(36).slice(2) + statementCount = 1 + lifeTimer.start() + socket.on('data', data) + keep_alive && socket.setKeepAlive && socket.setKeepAlive(true, 1000 * keep_alive) + const s = StartupMessage() + write(s) + } catch (err) { + error(err) + } + } + + function error(err) { + if (connection.queue === queues.connecting && options.host[retries + 1]) + return + + errored(err) + while (sent.length) + queryError(sent.shift(), err) + } + + function errored(err) { + stream && (stream.destroy(err), stream = null) + query && queryError(query, err) + initial && (queryError(initial, err), initial = null) + } + + function queryError(query, err) { + 'query' in err || 'parameters' in err || Object.defineProperties(err, { + stack: { value: err.stack + query.origin.replace(/.*\n/, '\n'), enumerable: options.debug }, + query: { value: query.string, enumerable: options.debug }, + parameters: { value: query.parameters, enumerable: options.debug }, + args: { value: query.args, enumerable: options.debug }, + types: { value: query.statement && query.statement.types, enumerable: options.debug } + }) + query.reject(err) + } + + function end() { + return ending || ( + !connection.reserved && onend(connection), + !connection.reserved && !initial && !query && sent.length === 0 + ? (terminate(), new Promise(r => socket && socket.readyState !== 'closed' ? socket.once('close', r) : r())) + : ending = new Promise(r => ended = r) + ) + } + + function terminate() { + terminated = true + if (stream || query || initial || sent.length) + error(Errors.connection('CONNECTION_DESTROYED', options)) + + clearImmediate(nextWriteTimer) + if (socket) { + socket.removeListener('data', data) + socket.removeListener('connect', connected) + socket.readyState === 'open' && socket.end(b().X().end()) + } + ended && (ended(), ending = ended = null) + } + + async function closed(hadError) { + incoming = Buffer.alloc(0) + remaining = 0 + incomings = null + clearImmediate(nextWriteTimer) + socket.removeListener('data', data) + socket.removeListener('connect', connected) + idleTimer.cancel() + lifeTimer.cancel() + connectTimer.cancel() + + socket.removeAllListeners() + socket = null + + if (initial) + return reconnect() + + !hadError && (query || sent.length) && error(Errors.connection('CONNECTION_CLOSED', options, socket)) + closedDate = performance.now() + hadError && options.shared.retries++ + delay = (typeof backoff === 'function' ? backoff(options.shared.retries) : backoff) * 1000 + onclose(connection, Errors.connection('CONNECTION_CLOSED', options, socket)) + } + + /* Handlers */ + function handle(xs, x = xs[0]) { + ( + x === 68 ? DataRow : // D + x === 100 ? CopyData : // d + x === 65 ? NotificationResponse : // A + x === 83 ? ParameterStatus : // S + x === 90 ? ReadyForQuery : // Z + x === 67 ? CommandComplete : // C + x === 50 ? BindComplete : // 2 + x === 49 ? ParseComplete : // 1 + x === 116 ? ParameterDescription : // t + x === 84 ? RowDescription : // T + x === 82 ? Authentication : // R + x === 110 ? NoData : // n + x === 75 ? BackendKeyData : // K + x === 69 ? ErrorResponse : // E + x === 115 ? PortalSuspended : // s + x === 51 ? CloseComplete : // 3 + x === 71 ? CopyInResponse : // G + x === 78 ? NoticeResponse : // N + x === 72 ? CopyOutResponse : // H + x === 99 ? CopyDone : // c + x === 73 ? EmptyQueryResponse : // I + x === 86 ? FunctionCallResponse : // V + x === 118 ? NegotiateProtocolVersion : // v + x === 87 ? CopyBothResponse : // W + /* c8 ignore next */ + UnknownMessage + )(xs) + } + + function DataRow(x) { + let index = 7 + let length + let column + let value + + const row = query.isRaw ? new Array(query.statement.columns.length) : {} + for (let i = 0; i < query.statement.columns.length; i++) { + column = query.statement.columns[i] + length = x.readInt32BE(index) + index += 4 + + value = length === -1 + ? null + : query.isRaw === true + ? x.subarray(index, index += length) + : column.parser === undefined + ? x.toString('utf8', index, index += length) + : column.parser.array === true + ? column.parser(x.toString('utf8', index + 1, index += length)) + : column.parser(x.toString('utf8', index, index += length)) + + query.isRaw + ? (row[i] = query.isRaw === true + ? value + : transform.value.from ? transform.value.from(value, column) : value) + : (row[column.name] = transform.value.from ? transform.value.from(value, column) : value) + } + + query.forEachFn + ? query.forEachFn(transform.row.from ? transform.row.from(row) : row, result) + : (result[rows++] = transform.row.from ? transform.row.from(row) : row) + } + + function ParameterStatus(x) { + const [k, v] = x.toString('utf8', 5, x.length - 1).split(b.N) + backendParameters[k] = v + if (options.parameters[k] !== v) { + options.parameters[k] = v + onparameter && onparameter(k, v) + } + } + + function ReadyForQuery(x) { + query && query.options.simple && query.resolve(results || result) + query = results = null + result = new Result() + connectTimer.cancel() + + if (initial) { + if (target_session_attrs) { + if (!backendParameters.in_hot_standby || !backendParameters.default_transaction_read_only) + return fetchState() + else if (tryNext(target_session_attrs, backendParameters)) + return terminate() + } + + if (needsTypes) { + initial === true && (initial = null) + return fetchArrayTypes() + } + + initial !== true && execute(initial) + options.shared.retries = retries = 0 + initial = null + return + } + + while (sent.length && (query = sent.shift()) && (query.active = true, query.cancelled)) + Connection(options).cancel(query.state, query.cancelled.resolve, query.cancelled.reject) + + if (query) + return // Consider opening if able and sent.length < 50 + + connection.reserved + ? !connection.reserved.release && x[5] === 73 // I + ? ending + ? terminate() + : (connection.reserved = null, onopen(connection)) + : connection.reserved() + : ending + ? terminate() + : onopen(connection) + } + + function CommandComplete(x) { + rows = 0 + + for (let i = x.length - 1; i > 0; i--) { + if (x[i] === 32 && x[i + 1] < 58 && result.count === null) + result.count = +x.toString('utf8', i + 1, x.length - 1) + if (x[i - 1] >= 65) { + result.command = x.toString('utf8', 5, i) + result.state = backend + break + } + } + + final && (final(), final = null) + + if (result.command === 'BEGIN' && max !== 1 && !connection.reserved) + return errored(Errors.generic('UNSAFE_TRANSACTION', 'Only use sql.begin, sql.reserved or max: 1')) + + if (query.options.simple) + return BindComplete() + + if (query.cursorFn) { + result.count && query.cursorFn(result) + write(Sync) + } + + query.resolve(result) + } + + function ParseComplete() { + query.parsing = false + } + + function BindComplete() { + !result.statement && (result.statement = query.statement) + result.columns = query.statement.columns + } + + function ParameterDescription(x) { + const length = x.readUInt16BE(5) + + for (let i = 0; i < length; ++i) + !query.statement.types[i] && (query.statement.types[i] = x.readUInt32BE(7 + i * 4)) + + query.prepare && (statements[query.signature] = query.statement) + query.describeFirst && !query.onlyDescribe && (write(prepared(query)), query.describeFirst = false) + } + + function RowDescription(x) { + if (result.command) { + results = results || [result] + results.push(result = new Result()) + result.count = null + query.statement.columns = null + } + + const length = x.readUInt16BE(5) + let index = 7 + let start + + query.statement.columns = Array(length) + + for (let i = 0; i < length; ++i) { + start = index + while (x[index++] !== 0); + const table = x.readUInt32BE(index) + const number = x.readUInt16BE(index + 4) + const type = x.readUInt32BE(index + 6) + query.statement.columns[i] = { + name: transform.column.from + ? transform.column.from(x.toString('utf8', start, index - 1)) + : x.toString('utf8', start, index - 1), + parser: parsers[type], + table, + number, + type + } + index += 18 + } + + result.statement = query.statement + if (query.onlyDescribe) + return (query.resolve(query.statement), write(Sync)) + } + + async function Authentication(x, type = x.readUInt32BE(5)) { + ( + type === 3 ? AuthenticationCleartextPassword : + type === 5 ? AuthenticationMD5Password : + type === 10 ? SASL : + type === 11 ? SASLContinue : + type === 12 ? SASLFinal : + type !== 0 ? UnknownAuth : + noop + )(x, type) + } + + /* c8 ignore next 5 */ + async function AuthenticationCleartextPassword() { + const payload = await Pass() + write( + b().p().str(payload).z(1).end() + ) + } + + async function AuthenticationMD5Password(x) { + const payload = 'md5' + ( + await md5( + Buffer.concat([ + Buffer.from(await md5((await Pass()) + user)), + x.subarray(9) + ]) + ) + ) + write( + b().p().str(payload).z(1).end() + ) + } + + async function SASL() { + nonce = (await crypto.randomBytes(18)).toString('base64') + b().p().str('SCRAM-SHA-256' + b.N) + const i = b.i + write(b.inc(4).str('n,,n=*,r=' + nonce).i32(b.i - i - 4, i).end()) + } + + async function SASLContinue(x) { + const res = x.toString('utf8', 9).split(',').reduce((acc, x) => (acc[x[0]] = x.slice(2), acc), {}) + + const saltedPassword = await crypto.pbkdf2Sync( + await Pass(), + Buffer.from(res.s, 'base64'), + parseInt(res.i), 32, + 'sha256' + ) + + const clientKey = await hmac(saltedPassword, 'Client Key') + + const auth = 'n=*,r=' + nonce + ',' + + 'r=' + res.r + ',s=' + res.s + ',i=' + res.i + + ',c=biws,r=' + res.r + + serverSignature = (await hmac(await hmac(saltedPassword, 'Server Key'), auth)).toString('base64') + + const payload = 'c=biws,r=' + res.r + ',p=' + xor( + clientKey, Buffer.from(await hmac(await sha256(clientKey), auth)) + ).toString('base64') + + write( + b().p().str(payload).end() + ) + } + + function SASLFinal(x) { + if (x.toString('utf8', 9).split(b.N, 1)[0].slice(2) === serverSignature) + return + /* c8 ignore next 5 */ + errored(Errors.generic('SASL_SIGNATURE_MISMATCH', 'The server did not return the correct signature')) + socket.destroy() + } + + function Pass() { + return Promise.resolve(typeof options.pass === 'function' + ? options.pass() + : options.pass + ) + } + + function NoData() { + result.statement = query.statement + result.statement.columns = [] + if (query.onlyDescribe) + return (query.resolve(query.statement), write(Sync)) + } + + function BackendKeyData(x) { + backend.pid = x.readUInt32BE(5) + backend.secret = x.readUInt32BE(9) + } + + async function fetchArrayTypes() { + needsTypes = false + const types = await new Query([` + select b.oid, b.typarray + from pg_catalog.pg_type a + left join pg_catalog.pg_type b on b.oid = a.typelem + where a.typcategory = 'A' + group by b.oid, b.typarray + order by b.oid + `], [], execute) + types.forEach(({ oid, typarray }) => addArrayType(oid, typarray)) + } + + function addArrayType(oid, typarray) { + if (!!options.parsers[typarray] && !!options.serializers[typarray]) return + const parser = options.parsers[oid] + options.shared.typeArrayMap[oid] = typarray + options.parsers[typarray] = (xs) => arrayParser(xs, parser, typarray) + options.parsers[typarray].array = true + options.serializers[typarray] = (xs) => arraySerializer(xs, options.serializers[oid], options, typarray) + } + + function tryNext(x, xs) { + return ( + (x === 'read-write' && xs.default_transaction_read_only === 'on') || + (x === 'read-only' && xs.default_transaction_read_only === 'off') || + (x === 'primary' && xs.in_hot_standby === 'on') || + (x === 'standby' && xs.in_hot_standby === 'off') || + (x === 'prefer-standby' && xs.in_hot_standby === 'off' && options.host[retries]) + ) + } + + function fetchState() { + const query = new Query([` + show transaction_read_only; + select pg_catalog.pg_is_in_recovery() + `], [], execute, null, { simple: true }) + query.resolve = ([[a], [b]]) => { + backendParameters.default_transaction_read_only = a.transaction_read_only + backendParameters.in_hot_standby = b.pg_is_in_recovery ? 'on' : 'off' + } + query.execute() + } + + function ErrorResponse(x) { + query && (query.cursorFn || query.describeFirst) && write(Sync) + const error = Errors.postgres(parseError(x)) + query && query.retried + ? errored(query.retried) + : query && query.prepared && retryRoutines.has(error.routine) + ? retry(query, error) + : errored(error) + } + + function retry(q, error) { + delete statements[q.signature] + q.retried = error + execute(q) + } + + function NotificationResponse(x) { + if (!onnotify) + return + + let index = 9 + while (x[index++] !== 0); + onnotify( + x.toString('utf8', 9, index - 1), + x.toString('utf8', index, x.length - 1) + ) + } + + async function PortalSuspended() { + try { + const x = await Promise.resolve(query.cursorFn(result)) + rows = 0 + x === CLOSE + ? write(Close(query.portal)) + : (result = new Result(), write(Execute('', query.cursorRows))) + } catch (err) { + write(Sync) + query.reject(err) + } + } + + function CloseComplete() { + result.count && query.cursorFn(result) + query.resolve(result) + } + + function CopyInResponse() { + stream = new Stream.Writable({ + autoDestroy: true, + write(chunk, encoding, callback) { + socket.write(b().d().raw(chunk).end(), callback) + }, + destroy(error, callback) { + callback(error) + socket.write(b().f().str(error + b.N).end()) + stream = null + }, + final(callback) { + socket.write(b().c().end()) + final = callback + } + }) + query.resolve(stream) + } + + function CopyOutResponse() { + stream = new Stream.Readable({ + read() { socket.resume() } + }) + query.resolve(stream) + } + + /* c8 ignore next 3 */ + function CopyBothResponse() { + stream = new Stream.Duplex({ + autoDestroy: true, + read() { socket.resume() }, + /* c8 ignore next 11 */ + write(chunk, encoding, callback) { + socket.write(b().d().raw(chunk).end(), callback) + }, + destroy(error, callback) { + callback(error) + socket.write(b().f().str(error + b.N).end()) + stream = null + }, + final(callback) { + socket.write(b().c().end()) + final = callback + } + }) + query.resolve(stream) + } + + function CopyData(x) { + stream && (stream.push(x.subarray(5)) || socket.pause()) + } + + function CopyDone() { + stream && stream.push(null) + stream = null + } + + function NoticeResponse(x) { + onnotice + ? onnotice(parseError(x)) + : console.log(parseError(x)) // eslint-disable-line + + } + + /* c8 ignore next 3 */ + function EmptyQueryResponse() { + /* noop */ + } + + /* c8 ignore next 3 */ + function FunctionCallResponse() { + errored(Errors.notSupported('FunctionCallResponse')) + } + + /* c8 ignore next 3 */ + function NegotiateProtocolVersion() { + errored(Errors.notSupported('NegotiateProtocolVersion')) + } + + /* c8 ignore next 3 */ + function UnknownMessage(x) { + console.error('Postgres.js : Unknown Message:', x[0]) // eslint-disable-line + } + + /* c8 ignore next 3 */ + function UnknownAuth(x, type) { + console.error('Postgres.js : Unknown Auth:', type) // eslint-disable-line + } + + /* Messages */ + function Bind(parameters, types, statement = '', portal = '') { + let prev + , type + + b().B().str(portal + b.N).str(statement + b.N).i16(0).i16(parameters.length) + + parameters.forEach((x, i) => { + if (x === null) + return b.i32(0xFFFFFFFF) + + type = types[i] + parameters[i] = x = type in options.serializers + ? options.serializers[type](x) + : '' + x + + prev = b.i + b.inc(4).str(x).i32(b.i - prev - 4, prev) + }) + + b.i16(0) + + return b.end() + } + + function Parse(str, parameters, types, name = '') { + b().P().str(name + b.N).str(str + b.N).i16(parameters.length) + parameters.forEach((x, i) => b.i32(types[i] || 0)) + return b.end() + } + + function Describe(x, name = '') { + return b().D().str(x).str(name + b.N).end() + } + + function Execute(portal = '', rows = 0) { + return Buffer.concat([ + b().E().str(portal + b.N).i32(rows).end(), + Flush + ]) + } + + function Close(portal = '') { + return Buffer.concat([ + b().C().str('P').str(portal + b.N).end(), + b().S().end() + ]) + } + + function StartupMessage() { + return cancelMessage || b().inc(4).i16(3).z(2).str( + Object.entries(Object.assign({ + user, + database, + client_encoding: 'UTF8' + }, + options.connection + )).filter(([, v]) => v).map(([k, v]) => k + b.N + v).join(b.N) + ).z(2).end(0) + } + +} + +function parseError(x) { + const error = {} + let start = 5 + for (let i = 5; i < x.length - 1; i++) { + if (x[i] === 0) { + error[errorFields[x[start]]] = x.toString('utf8', start + 1, i) + start = i + 1 + } + } + return error +} + +function md5(x) { + return crypto.createHash('md5').update(x).digest('hex') +} + +function hmac(key, x) { + return crypto.createHmac('sha256', key).update(x).digest() +} + +function sha256(x) { + return crypto.createHash('sha256').update(x).digest() +} + +function xor(a, b) { + const length = Math.max(a.length, b.length) + const buffer = Buffer.allocUnsafe(length) + for (let i = 0; i < length; i++) + buffer[i] = a[i] ^ b[i] + return buffer +} + +function timer(fn, seconds) { + seconds = typeof seconds === 'function' ? seconds() : seconds + if (!seconds) + return { cancel: noop, start: noop } + + let timer + return { + cancel() { + timer && (clearTimeout(timer), timer = null) + }, + start() { + timer && clearTimeout(timer) + timer = setTimeout(done, seconds * 1000, arguments) + } + } + + function done(args) { + fn.apply(null, args) + timer = null + } +} diff --git a/neon/src/errors.js b/neon/src/errors.js new file mode 100644 index 00000000..0ff83c42 --- /dev/null +++ b/neon/src/errors.js @@ -0,0 +1,53 @@ +export class PostgresError extends Error { + constructor(x) { + super(x.message) + this.name = this.constructor.name + Object.assign(this, x) + } +} + +export const Errors = { + connection, + postgres, + generic, + notSupported +} + +function connection(x, options, socket) { + const { host, port } = socket || options + const error = Object.assign( + new Error(('write ' + x + ' ' + (options.path || (host + ':' + port)))), + { + code: x, + errno: x, + address: options.path || host + }, options.path ? {} : { port: port } + ) + Error.captureStackTrace(error, connection) + return error +} + +function postgres(x) { + const error = new PostgresError(x) + Error.captureStackTrace(error, postgres) + return error +} + +function generic(code, message) { + const error = Object.assign(new Error(code + ': ' + message), { code }) + Error.captureStackTrace(error, generic) + return error +} + +/* c8 ignore next 10 */ +function notSupported(x) { + const error = Object.assign( + new Error(x + ' (B) is not supported'), + { + code: 'MESSAGE_NOT_SUPPORTED', + name: x + } + ) + Error.captureStackTrace(error, notSupported) + return error +} diff --git a/neon/src/index.js b/neon/src/index.js new file mode 100644 index 00000000..d24e9f9c --- /dev/null +++ b/neon/src/index.js @@ -0,0 +1,566 @@ +import { process } from '../polyfills.js' +import { os } from '../polyfills.js' +import { fs } from '../polyfills.js' + +import { + mergeUserTypes, + inferType, + Parameter, + Identifier, + Builder, + toPascal, + pascal, + toCamel, + camel, + toKebab, + kebab, + fromPascal, + fromCamel, + fromKebab +} from './types.js' + +import Connection from './connection.js' +import { Query, CLOSE } from './query.js' +import Queue from './queue.js' +import { Errors, PostgresError } from './errors.js' +import Subscribe from './subscribe.js' +import largeObject from './large.js' + +Object.assign(Postgres, { + PostgresError, + toPascal, + pascal, + toCamel, + camel, + toKebab, + kebab, + fromPascal, + fromCamel, + fromKebab, + BigInt: { + to: 20, + from: [20], + parse: x => BigInt(x), // eslint-disable-line + serialize: x => x.toString() + } +}) + +export default Postgres + +function Postgres(a, b) { + const options = parseOptions(a, b) + , subscribe = options.no_subscribe || Subscribe(Postgres, { ...options }) + + let ending = false + + const queries = Queue() + , connecting = Queue() + , reserved = Queue() + , closed = Queue() + , ended = Queue() + , open = Queue() + , busy = Queue() + , full = Queue() + , queues = { connecting, reserved, closed, ended, open, busy, full } + + const connections = [...Array(options.max)].map(() => Connection(options, queues, { onopen, onend, onclose })) + + const sql = Sql(handler) + + Object.assign(sql, { + get parameters() { return options.parameters }, + largeObject: largeObject.bind(null, sql), + subscribe, + CLOSE, + END: CLOSE, + PostgresError, + options, + reserve, + listen, + begin, + close, + end + }) + + return sql + + function Sql(handler) { + handler.debug = options.debug + + Object.entries(options.types).reduce((acc, [name, type]) => { + acc[name] = (x) => new Parameter(x, type.to) + return acc + }, typed) + + Object.assign(sql, { + types: typed, + typed, + unsafe, + notify, + array, + json, + file + }) + + return sql + + function typed(value, type) { + return new Parameter(value, type) + } + + function sql(strings, ...args) { + const query = strings && Array.isArray(strings.raw) + ? new Query(strings, args, handler, cancel) + : typeof strings === 'string' && !args.length + ? new Identifier(options.transform.column.to ? options.transform.column.to(strings) : strings) + : new Builder(strings, args) + return query + } + + function unsafe(string, args = [], options = {}) { + arguments.length === 2 && !Array.isArray(args) && (options = args, args = []) + const query = new Query([string], args, handler, cancel, { + prepare: false, + ...options, + simple: 'simple' in options ? options.simple : args.length === 0 + }) + return query + } + + function file(path, args = [], options = {}) { + arguments.length === 2 && !Array.isArray(args) && (options = args, args = []) + const query = new Query([], args, (query) => { + fs.readFile(path, 'utf8', (err, string) => { + if (err) + return query.reject(err) + + query.strings = [string] + handler(query) + }) + }, cancel, { + ...options, + simple: 'simple' in options ? options.simple : args.length === 0 + }) + return query + } + } + + async function listen(name, fn, onlisten) { + const listener = { fn, onlisten } + + const sql = listen.sql || (listen.sql = Postgres({ + ...options, + max: 1, + idle_timeout: null, + max_lifetime: null, + fetch_types: false, + onclose() { + Object.entries(listen.channels).forEach(([name, { listeners }]) => { + delete listen.channels[name] + Promise.all(listeners.map(l => listen(name, l.fn, l.onlisten).catch(() => { /* noop */ }))) + }) + }, + onnotify(c, x) { + c in listen.channels && listen.channels[c].listeners.forEach(l => l.fn(x)) + } + })) + + const channels = listen.channels || (listen.channels = {}) + , exists = name in channels + + if (exists) { + channels[name].listeners.push(listener) + const result = await channels[name].result + listener.onlisten && listener.onlisten() + return { state: result.state, unlisten } + } + + channels[name] = { result: sql`listen ${ + sql.unsafe('"' + name.replace(/"/g, '""') + '"') + }`, listeners: [listener] } + const result = await channels[name].result + listener.onlisten && listener.onlisten() + return { state: result.state, unlisten } + + async function unlisten() { + if (name in channels === false) + return + + channels[name].listeners = channels[name].listeners.filter(x => x !== listener) + if (channels[name].listeners.length) + return + + delete channels[name] + return sql`unlisten ${ + sql.unsafe('"' + name.replace(/"/g, '""') + '"') + }` + } + } + + async function notify(channel, payload) { + return await sql`select pg_notify(${ channel }, ${ '' + payload })` + } + + async function reserve() { + const queue = Queue() + const c = open.length + ? open.shift() + : await new Promise(r => { + queries.push({ reserve: r }) + closed.length && connect(closed.shift()) + }) + + move(c, reserved) + c.reserved = () => queue.length + ? c.execute(queue.shift()) + : move(c, reserved) + c.reserved.release = true + + const sql = Sql(handler) + sql.release = () => { + c.reserved = null + onopen(c) + } + + return sql + + function handler(q) { + c.queue === full + ? queue.push(q) + : c.execute(q) || move(c, full) + } + } + + async function begin(options, fn) { + !fn && (fn = options, options = '') + const queries = Queue() + let savepoints = 0 + , connection + , prepare = null + + try { + await sql.unsafe('begin ' + options.replace(/[^a-z ]/ig, ''), [], { onexecute }).execute() + return await Promise.race([ + scope(connection, fn), + new Promise((_, reject) => connection.onclose = reject) + ]) + } catch (error) { + throw error + } + + async function scope(c, fn, name) { + const sql = Sql(handler) + sql.savepoint = savepoint + sql.prepare = x => prepare = x.replace(/[^a-z0-9$-_. ]/gi) + let uncaughtError + , result + + name && await sql`savepoint ${ sql(name) }` + try { + result = await new Promise((resolve, reject) => { + const x = fn(sql) + Promise.resolve(Array.isArray(x) ? Promise.all(x) : x).then(resolve, reject) + }) + + if (uncaughtError) + throw uncaughtError + } catch (e) { + await (name + ? sql`rollback to ${ sql(name) }` + : sql`rollback` + ) + throw e instanceof PostgresError && e.code === '25P02' && uncaughtError || e + } + + if (!name) { + prepare + ? await sql`prepare transaction '${ sql.unsafe(prepare) }'` + : await sql`commit` + } + + return result + + function savepoint(name, fn) { + if (name && Array.isArray(name.raw)) + return savepoint(sql => sql.apply(sql, arguments)) + + arguments.length === 1 && (fn = name, name = null) + return scope(c, fn, 's' + savepoints++ + (name ? '_' + name : '')) + } + + function handler(q) { + q.catch(e => uncaughtError || (uncaughtError = e)) + c.queue === full + ? queries.push(q) + : c.execute(q) || move(c, full) + } + } + + function onexecute(c) { + connection = c + move(c, reserved) + c.reserved = () => queries.length + ? c.execute(queries.shift()) + : move(c, reserved) + } + } + + function move(c, queue) { + c.queue.remove(c) + queue.push(c) + c.queue = queue + queue === open + ? c.idleTimer.start() + : c.idleTimer.cancel() + return c + } + + function json(x) { + return new Parameter(x, 3802) + } + + function array(x, type) { + if (!Array.isArray(x)) + return array(Array.from(arguments)) + + return new Parameter(x, type || (x.length ? inferType(x) || 25 : 0), options.shared.typeArrayMap) + } + + function handler(query) { + if (ending) + return query.reject(Errors.connection('CONNECTION_ENDED', options, options)) + + if (open.length) + return go(open.shift(), query) + + if (closed.length) + return connect(closed.shift(), query) + + busy.length + ? go(busy.shift(), query) + : queries.push(query) + } + + function go(c, query) { + return c.execute(query) + ? move(c, busy) + : move(c, full) + } + + function cancel(query) { + return new Promise((resolve, reject) => { + query.state + ? query.active + ? Connection(options).cancel(query.state, resolve, reject) + : query.cancelled = { resolve, reject } + : ( + queries.remove(query), + query.cancelled = true, + query.reject(Errors.generic('57014', 'canceling statement due to user request')), + resolve() + ) + }) + } + + async function end({ timeout = null } = {}) { + if (ending) + return ending + + await 1 + let timer + return ending = Promise.race([ + new Promise(r => timeout !== null && (timer = setTimeout(destroy, timeout * 1000, r))), + Promise.all(connections.map(c => c.end()).concat( + listen.sql ? listen.sql.end({ timeout: 0 }) : [], + subscribe.sql ? subscribe.sql.end({ timeout: 0 }) : [] + )) + ]).then(() => clearTimeout(timer)) + } + + async function close() { + await Promise.all(connections.map(c => c.end())) + } + + async function destroy(resolve) { + await Promise.all(connections.map(c => c.terminate())) + while (queries.length) + queries.shift().reject(Errors.connection('CONNECTION_DESTROYED', options)) + resolve() + } + + function connect(c, query) { + move(c, connecting) + c.connect(query) + return c + } + + function onend(c) { + move(c, ended) + } + + function onopen(c) { + if (queries.length === 0) + return move(c, open) + + let max = Math.ceil(queries.length / (connecting.length + 1)) + , ready = true + + while (ready && queries.length && max-- > 0) { + const query = queries.shift() + if (query.reserve) + return query.reserve(c) + + ready = c.execute(query) + } + + ready + ? move(c, busy) + : move(c, full) + } + + function onclose(c, e) { + move(c, closed) + c.reserved = null + c.onclose && (c.onclose(e), c.onclose = null) + options.onclose && options.onclose(c.id) + queries.length && connect(c, queries.shift()) + } +} + +function parseOptions(a, b) { + if (a && a.shared) + return a + + const env = process.env // eslint-disable-line + , o = (!a || typeof a === 'string' ? b : a) || {} + , { url, multihost } = parseUrl(a) + , query = [...url.searchParams].reduce((a, [b, c]) => (a[b] = c, a), {}) + , host = o.hostname || o.host || multihost || url.hostname || env.PGHOST || 'localhost' + , port = o.port || url.port || env.PGPORT || 5432 + , user = o.user || o.username || url.username || env.PGUSERNAME || env.PGUSER || osUsername() + + o.no_prepare && (o.prepare = false) + query.sslmode && (query.ssl = query.sslmode, delete query.sslmode) + 'timeout' in o && (console.log('The timeout option is deprecated, use idle_timeout instead'), o.idle_timeout = o.timeout) // eslint-disable-line + query.sslrootcert === 'system' && (query.ssl = 'verify-full') + + const ints = ['idle_timeout', 'connect_timeout', 'max_lifetime', 'max_pipeline', 'backoff', 'keep_alive'] + const defaults = { + max : 10, + ssl : false, + idle_timeout : null, + connect_timeout : 30, + max_lifetime : max_lifetime, + max_pipeline : 100, + backoff : backoff, + keep_alive : 60, + prepare : true, + debug : false, + fetch_types : true, + publications : 'alltables', + target_session_attrs: null + } + + return { + host : Array.isArray(host) ? host : host.split(',').map(x => x.split(':')[0]), + port : Array.isArray(port) ? port : host.split(',').map(x => parseInt(x.split(':')[1] || port)), + path : o.path || host.indexOf('/') > -1 && host + '/.s.PGSQL.' + port, + database : o.database || o.db || (url.pathname || '').slice(1) || env.PGDATABASE || user, + user : user, + pass : o.pass || o.password || url.password || env.PGPASSWORD || '', + ...Object.entries(defaults).reduce( + (acc, [k, d]) => { + const value = k in o ? o[k] : k in query + ? (query[k] === 'disable' || query[k] === 'false' ? false : query[k]) + : env['PG' + k.toUpperCase()] || d + acc[k] = typeof value === 'string' && ints.includes(k) + ? +value + : value + return acc + }, + {} + ), + connection : { + application_name: 'postgres.js', + ...o.connection, + ...Object.entries(query).reduce((acc, [k, v]) => (k in defaults || (acc[k] = v), acc), {}) + }, + types : o.types || {}, + target_session_attrs: tsa(o, url, env), + onnotice : o.onnotice, + onnotify : o.onnotify, + onclose : o.onclose, + onparameter : o.onparameter, + socket : o.socket, + transform : parseTransform(o.transform || { undefined: undefined }), + parameters : {}, + shared : { retries: 0, typeArrayMap: {} }, + ...mergeUserTypes(o.types) + } +} + +function tsa(o, url, env) { + const x = o.target_session_attrs || url.searchParams.get('target_session_attrs') || env.PGTARGETSESSIONATTRS + if (!x || ['read-write', 'read-only', 'primary', 'standby', 'prefer-standby'].includes(x)) + return x + + throw new Error('target_session_attrs ' + x + ' is not supported') +} + +function backoff(retries) { + return (0.5 + Math.random() / 2) * Math.min(3 ** retries / 100, 20) +} + +function max_lifetime() { + return 60 * (30 + Math.random() * 30) +} + +function parseTransform(x) { + return { + undefined: x.undefined, + column: { + from: typeof x.column === 'function' ? x.column : x.column && x.column.from, + to: x.column && x.column.to + }, + value: { + from: typeof x.value === 'function' ? x.value : x.value && x.value.from, + to: x.value && x.value.to + }, + row: { + from: typeof x.row === 'function' ? x.row : x.row && x.row.from, + to: x.row && x.row.to + } + } +} + +function parseUrl(url) { + if (!url || typeof url !== 'string') + return { url: { searchParams: new Map() } } + + let host = url + host = host.slice(host.indexOf('://') + 3).split(/[?/]/)[0] + host = decodeURIComponent(host.slice(host.indexOf('@') + 1)) + + const urlObj = new URL(url.replace(host, host.split(',')[0])) + + return { + url: { + username: decodeURIComponent(urlObj.username), + password: decodeURIComponent(urlObj.password), + host: urlObj.host, + hostname: urlObj.hostname, + port: urlObj.port, + pathname: urlObj.pathname, + searchParams: urlObj.searchParams + }, + multihost: host.indexOf(',') > -1 && host + } +} + +function osUsername() { + try { + return os.userInfo().username // eslint-disable-line + } catch (_) { + return process.env.USERNAME || process.env.USER || process.env.LOGNAME // eslint-disable-line + } +} diff --git a/neon/src/large.js b/neon/src/large.js new file mode 100644 index 00000000..8ae150dd --- /dev/null +++ b/neon/src/large.js @@ -0,0 +1,70 @@ +import Stream from 'node:stream' + +export default function largeObject(sql, oid, mode = 0x00020000 | 0x00040000) { + return new Promise(async(resolve, reject) => { + await sql.begin(async sql => { + let finish + !oid && ([{ oid }] = await sql`select lo_creat(-1) as oid`) + const [{ fd }] = await sql`select lo_open(${ oid }, ${ mode }) as fd` + + const lo = { + writable, + readable, + close : () => sql`select lo_close(${ fd })`.then(finish), + tell : () => sql`select lo_tell64(${ fd })`, + read : (x) => sql`select loread(${ fd }, ${ x }) as data`, + write : (x) => sql`select lowrite(${ fd }, ${ x })`, + truncate : (x) => sql`select lo_truncate64(${ fd }, ${ x })`, + seek : (x, whence = 0) => sql`select lo_lseek64(${ fd }, ${ x }, ${ whence })`, + size : () => sql` + select + lo_lseek64(${ fd }, location, 0) as position, + seek.size + from ( + select + lo_lseek64($1, 0, 2) as size, + tell.location + from (select lo_tell64($1) as location) tell + ) seek + ` + } + + resolve(lo) + + return new Promise(async r => finish = r) + + async function readable({ + highWaterMark = 2048 * 8, + start = 0, + end = Infinity + } = {}) { + let max = end - start + start && await lo.seek(start) + return new Stream.Readable({ + highWaterMark, + async read(size) { + const l = size > max ? size - max : size + max -= size + const [{ data }] = await lo.read(l) + this.push(data) + if (data.length < size) + this.push(null) + } + }) + } + + async function writable({ + highWaterMark = 2048 * 8, + start = 0 + } = {}) { + start && await lo.seek(start) + return new Stream.Writable({ + highWaterMark, + write(chunk, encoding, callback) { + lo.write(chunk).then(() => callback(), callback) + } + }) + } + }).catch(reject) + }) +} diff --git a/neon/src/query.js b/neon/src/query.js new file mode 100644 index 00000000..0d44a15c --- /dev/null +++ b/neon/src/query.js @@ -0,0 +1,173 @@ +const originCache = new Map() + , originStackCache = new Map() + , originError = Symbol('OriginError') + +export const CLOSE = {} +export class Query extends Promise { + constructor(strings, args, handler, canceller, options = {}) { + let resolve + , reject + + super((a, b) => { + resolve = a + reject = b + }) + + this.tagged = Array.isArray(strings.raw) + this.strings = strings + this.args = args + this.handler = handler + this.canceller = canceller + this.options = options + + this.state = null + this.statement = null + + this.resolve = x => (this.active = false, resolve(x)) + this.reject = x => (this.active = false, reject(x)) + + this.active = false + this.cancelled = null + this.executed = false + this.signature = '' + + this[originError] = this.handler.debug + ? new Error() + : this.tagged && cachedError(this.strings) + } + + get origin() { + return (this.handler.debug + ? this[originError].stack + : this.tagged && originStackCache.has(this.strings) + ? originStackCache.get(this.strings) + : originStackCache.set(this.strings, this[originError].stack).get(this.strings) + ) || '' + } + + static get [Symbol.species]() { + return Promise + } + + cancel() { + return this.canceller && (this.canceller(this), this.canceller = null) + } + + simple() { + this.options.simple = true + this.options.prepare = false + return this + } + + async readable() { + this.simple() + this.streaming = true + return this + } + + async writable() { + this.simple() + this.streaming = true + return this + } + + cursor(rows = 1, fn) { + this.options.simple = false + if (typeof rows === 'function') { + fn = rows + rows = 1 + } + + this.cursorRows = rows + + if (typeof fn === 'function') + return (this.cursorFn = fn, this) + + let prev + return { + [Symbol.asyncIterator]: () => ({ + next: () => { + if (this.executed && !this.active) + return { done: true } + + prev && prev() + const promise = new Promise((resolve, reject) => { + this.cursorFn = value => { + resolve({ value, done: false }) + return new Promise(r => prev = r) + } + this.resolve = () => (this.active = false, resolve({ done: true })) + this.reject = x => (this.active = false, reject(x)) + }) + this.execute() + return promise + }, + return() { + prev && prev(CLOSE) + return { done: true } + } + }) + } + } + + describe() { + this.options.simple = false + this.onlyDescribe = this.options.prepare = true + return this + } + + stream() { + throw new Error('.stream has been renamed to .forEach') + } + + forEach(fn) { + this.forEachFn = fn + this.handle() + return this + } + + raw() { + this.isRaw = true + return this + } + + values() { + this.isRaw = 'values' + return this + } + + async handle() { + !this.executed && (this.executed = true) && await 1 && this.handler(this) + } + + execute() { + this.handle() + return this + } + + then() { + this.handle() + return super.then.apply(this, arguments) + } + + catch() { + this.handle() + return super.catch.apply(this, arguments) + } + + finally() { + this.handle() + return super.finally.apply(this, arguments) + } +} + +function cachedError(xs) { + if (originCache.has(xs)) + return originCache.get(xs) + + const x = Error.stackTraceLimit + Error.stackTraceLimit = 4 + originCache.set(xs, new Error()) + Error.stackTraceLimit = x + return originCache.get(xs) +} diff --git a/neon/src/queue.js b/neon/src/queue.js new file mode 100644 index 00000000..c4ef9716 --- /dev/null +++ b/neon/src/queue.js @@ -0,0 +1,31 @@ +export default Queue + +function Queue(initial = []) { + let xs = initial.slice() + let index = 0 + + return { + get length() { + return xs.length - index + }, + remove: (x) => { + const index = xs.indexOf(x) + return index === -1 + ? null + : (xs.splice(index, 1), x) + }, + push: (x) => (xs.push(x), x), + shift: () => { + const out = xs[index++] + + if (index === xs.length) { + index = 0 + xs = [] + } else { + xs[index - 1] = undefined + } + + return out + } + } +} diff --git a/neon/src/result.js b/neon/src/result.js new file mode 100644 index 00000000..31014284 --- /dev/null +++ b/neon/src/result.js @@ -0,0 +1,16 @@ +export default class Result extends Array { + constructor() { + super() + Object.defineProperties(this, { + count: { value: null, writable: true }, + state: { value: null, writable: true }, + command: { value: null, writable: true }, + columns: { value: null, writable: true }, + statement: { value: null, writable: true } + }) + } + + static get [Symbol.species]() { + return Array + } +} diff --git a/neon/src/subscribe.js b/neon/src/subscribe.js new file mode 100644 index 00000000..8716100e --- /dev/null +++ b/neon/src/subscribe.js @@ -0,0 +1,278 @@ +import { Buffer } from 'node:buffer' +const noop = () => { /* noop */ } + +export default function Subscribe(postgres, options) { + const subscribers = new Map() + , slot = 'postgresjs_' + Math.random().toString(36).slice(2) + , state = {} + + let connection + , stream + , ended = false + + const sql = subscribe.sql = postgres({ + ...options, + transform: { column: {}, value: {}, row: {} }, + max: 1, + fetch_types: false, + idle_timeout: null, + max_lifetime: null, + connection: { + ...options.connection, + replication: 'database' + }, + onclose: async function() { + if (ended) + return + stream = null + state.pid = state.secret = undefined + connected(await init(sql, slot, options.publications)) + subscribers.forEach(event => event.forEach(({ onsubscribe }) => onsubscribe())) + }, + no_subscribe: true + }) + + const end = sql.end + , close = sql.close + + sql.end = async() => { + ended = true + stream && (await new Promise(r => (stream.once('close', r), stream.end()))) + return end() + } + + sql.close = async() => { + stream && (await new Promise(r => (stream.once('close', r), stream.end()))) + return close() + } + + return subscribe + + async function subscribe(event, fn, onsubscribe = noop, onerror = noop) { + event = parseEvent(event) + + if (!connection) + connection = init(sql, slot, options.publications) + + const subscriber = { fn, onsubscribe } + const fns = subscribers.has(event) + ? subscribers.get(event).add(subscriber) + : subscribers.set(event, new Set([subscriber])).get(event) + + const unsubscribe = () => { + fns.delete(subscriber) + fns.size === 0 && subscribers.delete(event) + } + + return connection.then(x => { + connected(x) + onsubscribe() + stream && stream.on('error', onerror) + return { unsubscribe, state, sql } + }) + } + + function connected(x) { + stream = x.stream + state.pid = x.state.pid + state.secret = x.state.secret + } + + async function init(sql, slot, publications) { + if (!publications) + throw new Error('Missing publication names') + + const xs = await sql.unsafe( + `CREATE_REPLICATION_SLOT ${ slot } TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT` + ) + + const [x] = xs + + const stream = await sql.unsafe( + `START_REPLICATION SLOT ${ slot } LOGICAL ${ + x.consistent_point + } (proto_version '1', publication_names '${ publications }')` + ).writable() + + const state = { + lsn: Buffer.concat(x.consistent_point.split('/').map(x => Buffer.from(('00000000' + x).slice(-8), 'hex'))) + } + + stream.on('data', data) + stream.on('error', error) + stream.on('close', sql.close) + + return { stream, state: xs.state } + + function error(e) { + console.error('Unexpected error during logical streaming - reconnecting', e) // eslint-disable-line + } + + function data(x) { + if (x[0] === 0x77) { + parse(x.subarray(25), state, sql.options.parsers, handle, options.transform) + } else if (x[0] === 0x6b && x[17]) { + state.lsn = x.subarray(1, 9) + pong() + } + } + + function handle(a, b) { + const path = b.relation.schema + '.' + b.relation.table + call('*', a, b) + call('*:' + path, a, b) + b.relation.keys.length && call('*:' + path + '=' + b.relation.keys.map(x => a[x.name]), a, b) + call(b.command, a, b) + call(b.command + ':' + path, a, b) + b.relation.keys.length && call(b.command + ':' + path + '=' + b.relation.keys.map(x => a[x.name]), a, b) + } + + function pong() { + const x = Buffer.alloc(34) + x[0] = 'r'.charCodeAt(0) + x.fill(state.lsn, 1) + x.writeBigInt64BE(BigInt(Date.now() - Date.UTC(2000, 0, 1)) * BigInt(1000), 25) + stream.write(x) + } + } + + function call(x, a, b) { + subscribers.has(x) && subscribers.get(x).forEach(({ fn }) => fn(a, b, x)) + } +} + +function Time(x) { + return new Date(Date.UTC(2000, 0, 1) + Number(x / BigInt(1000))) +} + +function parse(x, state, parsers, handle, transform) { + const char = (acc, [k, v]) => (acc[k.charCodeAt(0)] = v, acc) + + Object.entries({ + R: x => { // Relation + let i = 1 + const r = state[x.readUInt32BE(i)] = { + schema: x.toString('utf8', i += 4, i = x.indexOf(0, i)) || 'pg_catalog', + table: x.toString('utf8', i + 1, i = x.indexOf(0, i + 1)), + columns: Array(x.readUInt16BE(i += 2)), + keys: [] + } + i += 2 + + let columnIndex = 0 + , column + + while (i < x.length) { + column = r.columns[columnIndex++] = { + key: x[i++], + name: transform.column.from + ? transform.column.from(x.toString('utf8', i, i = x.indexOf(0, i))) + : x.toString('utf8', i, i = x.indexOf(0, i)), + type: x.readUInt32BE(i += 1), + parser: parsers[x.readUInt32BE(i)], + atttypmod: x.readUInt32BE(i += 4) + } + + column.key && r.keys.push(column) + i += 4 + } + }, + Y: () => { /* noop */ }, // Type + O: () => { /* noop */ }, // Origin + B: x => { // Begin + state.date = Time(x.readBigInt64BE(9)) + state.lsn = x.subarray(1, 9) + }, + I: x => { // Insert + let i = 1 + const relation = state[x.readUInt32BE(i)] + const { row } = tuples(x, relation.columns, i += 7, transform) + + handle(row, { + command: 'insert', + relation + }) + }, + D: x => { // Delete + let i = 1 + const relation = state[x.readUInt32BE(i)] + i += 4 + const key = x[i] === 75 + handle(key || x[i] === 79 + ? tuples(x, relation.columns, i += 3, transform).row + : null + , { + command: 'delete', + relation, + key + }) + }, + U: x => { // Update + let i = 1 + const relation = state[x.readUInt32BE(i)] + i += 4 + const key = x[i] === 75 + const xs = key || x[i] === 79 + ? tuples(x, relation.columns, i += 3, transform) + : null + + xs && (i = xs.i) + + const { row } = tuples(x, relation.columns, i + 3, transform) + + handle(row, { + command: 'update', + relation, + key, + old: xs && xs.row + }) + }, + T: () => { /* noop */ }, // Truncate, + C: () => { /* noop */ } // Commit + }).reduce(char, {})[x[0]](x) +} + +function tuples(x, columns, xi, transform) { + let type + , column + , value + + const row = transform.raw ? new Array(columns.length) : {} + for (let i = 0; i < columns.length; i++) { + type = x[xi++] + column = columns[i] + value = type === 110 // n + ? null + : type === 117 // u + ? undefined + : column.parser === undefined + ? x.toString('utf8', xi + 4, xi += 4 + x.readUInt32BE(xi)) + : column.parser.array === true + ? column.parser(x.toString('utf8', xi + 5, xi += 4 + x.readUInt32BE(xi))) + : column.parser(x.toString('utf8', xi + 4, xi += 4 + x.readUInt32BE(xi))) + + transform.raw + ? (row[i] = transform.raw === true + ? value + : transform.value.from ? transform.value.from(value, column) : value) + : (row[column.name] = transform.value.from + ? transform.value.from(value, column) + : value + ) + } + + return { i: xi, row: transform.row.from ? transform.row.from(row) : row } +} + +function parseEvent(x) { + const xs = x.match(/^(\*|insert|update|delete)?:?([^.]+?\.?[^=]+)?=?(.+)?/i) || [] + + if (!xs) + throw new Error('Malformed subscribe pattern: ' + x) + + const [, command, path, key] = xs + + return (command || '*') + + (path ? ':' + (path.indexOf('.') === -1 ? 'public.' + path : path) : '') + + (key ? '=' + key : '') +} diff --git a/neon/src/types.js b/neon/src/types.js new file mode 100644 index 00000000..aa2ead29 --- /dev/null +++ b/neon/src/types.js @@ -0,0 +1,368 @@ +import { Buffer } from 'node:buffer' +import { Query } from './query.js' +import { Errors } from './errors.js' + +export const types = { + string: { + to: 25, + from: null, // defaults to string + serialize: x => '' + x + }, + number: { + to: 0, + from: [21, 23, 26, 700, 701], + serialize: x => '' + x, + parse: x => +x + }, + json: { + to: 114, + from: [114, 3802], + serialize: x => JSON.stringify(x), + parse: x => JSON.parse(x) + }, + boolean: { + to: 16, + from: 16, + serialize: x => x === true ? 't' : 'f', + parse: x => x === 't' + }, + date: { + to: 1184, + from: [1082, 1114, 1184], + serialize: x => (x instanceof Date ? x : new Date(x)).toISOString(), + parse: x => new Date(x) + }, + bytea: { + to: 17, + from: 17, + serialize: x => '\\x' + Buffer.from(x).toString('hex'), + parse: x => Buffer.from(x.slice(2), 'hex') + } +} + +class NotTagged { then() { notTagged() } catch() { notTagged() } finally() { notTagged() }} + +export class Identifier extends NotTagged { + constructor(value) { + super() + this.value = escapeIdentifier(value) + } +} + +export class Parameter extends NotTagged { + constructor(value, type, array) { + super() + this.value = value + this.type = type + this.array = array + } +} + +export class Builder extends NotTagged { + constructor(first, rest) { + super() + this.first = first + this.rest = rest + } + + build(before, parameters, types, options) { + const keyword = builders.map(([x, fn]) => ({ fn, i: before.search(x) })).sort((a, b) => a.i - b.i).pop() + return keyword.i === -1 + ? escapeIdentifiers(this.first, options) + : keyword.fn(this.first, this.rest, parameters, types, options) + } +} + +export function handleValue(x, parameters, types, options) { + let value = x instanceof Parameter ? x.value : x + if (value === undefined) { + x instanceof Parameter + ? x.value = options.transform.undefined + : value = x = options.transform.undefined + + if (value === undefined) + throw Errors.generic('UNDEFINED_VALUE', 'Undefined values are not allowed') + } + + return '$' + (types.push( + x instanceof Parameter + ? (parameters.push(x.value), x.array + ? x.array[x.type || inferType(x.value)] || x.type || firstIsString(x.value) + : x.type + ) + : (parameters.push(x), inferType(x)) + )) +} + +const defaultHandlers = typeHandlers(types) + +export function stringify(q, string, value, parameters, types, options) { // eslint-disable-line + for (let i = 1; i < q.strings.length; i++) { + string += (stringifyValue(string, value, parameters, types, options)) + q.strings[i] + value = q.args[i] + } + + return string +} + +function stringifyValue(string, value, parameters, types, o) { + return ( + value instanceof Builder ? value.build(string, parameters, types, o) : + value instanceof Query ? fragment(value, parameters, types, o) : + value instanceof Identifier ? value.value : + value && value[0] instanceof Query ? value.reduce((acc, x) => acc + ' ' + fragment(x, parameters, types, o), '') : + handleValue(value, parameters, types, o) + ) +} + +function fragment(q, parameters, types, options) { + q.fragment = true + return stringify(q, q.strings[0], q.args[0], parameters, types, options) +} + +function valuesBuilder(first, parameters, types, columns, options) { + return first.map(row => + '(' + columns.map(column => + stringifyValue('values', row[column], parameters, types, options) + ).join(',') + ')' + ).join(',') +} + +function values(first, rest, parameters, types, options) { + const multi = Array.isArray(first[0]) + const columns = rest.length ? rest.flat() : Object.keys(multi ? first[0] : first) + return valuesBuilder(multi ? first : [first], parameters, types, columns, options) +} + +function select(first, rest, parameters, types, options) { + typeof first === 'string' && (first = [first].concat(rest)) + if (Array.isArray(first)) + return escapeIdentifiers(first, options) + + let value + const columns = rest.length ? rest.flat() : Object.keys(first) + return columns.map(x => { + value = first[x] + return ( + value instanceof Query ? fragment(value, parameters, types, options) : + value instanceof Identifier ? value.value : + handleValue(value, parameters, types, options) + ) + ' as ' + escapeIdentifier(options.transform.column.to ? options.transform.column.to(x) : x) + }).join(',') +} + +const builders = Object.entries({ + values, + in: (...xs) => { + const x = values(...xs) + return x === '()' ? '(null)' : x + }, + select, + as: select, + returning: select, + '\\(': select, + + update(first, rest, parameters, types, options) { + return (rest.length ? rest.flat() : Object.keys(first)).map(x => + escapeIdentifier(options.transform.column.to ? options.transform.column.to(x) : x) + + '=' + stringifyValue('values', first[x], parameters, types, options) + ) + }, + + insert(first, rest, parameters, types, options) { + const columns = rest.length ? rest.flat() : Object.keys(Array.isArray(first) ? first[0] : first) + return '(' + escapeIdentifiers(columns, options) + ')values' + + valuesBuilder(Array.isArray(first) ? first : [first], parameters, types, columns, options) + } +}).map(([x, fn]) => ([new RegExp('((?:^|[\\s(])' + x + '(?:$|[\\s(]))(?![\\s\\S]*\\1)', 'i'), fn])) + +function notTagged() { + throw Errors.generic('NOT_TAGGED_CALL', 'Query not called as a tagged template literal') +} + +export const serializers = defaultHandlers.serializers +export const parsers = defaultHandlers.parsers + +export const END = {} + +function firstIsString(x) { + if (Array.isArray(x)) + return firstIsString(x[0]) + return typeof x === 'string' ? 1009 : 0 +} + +export const mergeUserTypes = function(types) { + const user = typeHandlers(types || {}) + return { + serializers: Object.assign({}, serializers, user.serializers), + parsers: Object.assign({}, parsers, user.parsers) + } +} + +function typeHandlers(types) { + return Object.keys(types).reduce((acc, k) => { + types[k].from && [].concat(types[k].from).forEach(x => acc.parsers[x] = types[k].parse) + if (types[k].serialize) { + acc.serializers[types[k].to] = types[k].serialize + types[k].from && [].concat(types[k].from).forEach(x => acc.serializers[x] = types[k].serialize) + } + return acc + }, { parsers: {}, serializers: {} }) +} + +function escapeIdentifiers(xs, { transform: { column } }) { + return xs.map(x => escapeIdentifier(column.to ? column.to(x) : x)).join(',') +} + +export const escapeIdentifier = function escape(str) { + return '"' + str.replace(/"/g, '""').replace(/\./g, '"."') + '"' +} + +export const inferType = function inferType(x) { + return ( + x instanceof Parameter ? x.type : + x instanceof Date ? 1184 : + x instanceof Uint8Array ? 17 : + (x === true || x === false) ? 16 : + typeof x === 'bigint' ? 20 : + Array.isArray(x) ? inferType(x[0]) : + 0 + ) +} + +const escapeBackslash = /\\/g +const escapeQuote = /"/g + +function arrayEscape(x) { + return x + .replace(escapeBackslash, '\\\\') + .replace(escapeQuote, '\\"') +} + +export const arraySerializer = function arraySerializer(xs, serializer, options, typarray) { + if (Array.isArray(xs) === false) + return xs + + if (!xs.length) + return '{}' + + const first = xs[0] + // Only _box (1020) has the ';' delimiter for arrays, all other types use the ',' delimiter + const delimiter = typarray === 1020 ? ';' : ',' + + if (Array.isArray(first) && !first.type) + return '{' + xs.map(x => arraySerializer(x, serializer, options, typarray)).join(delimiter) + '}' + + return '{' + xs.map(x => { + if (x === undefined) { + x = options.transform.undefined + if (x === undefined) + throw Errors.generic('UNDEFINED_VALUE', 'Undefined values are not allowed') + } + + return x === null + ? 'null' + : '"' + arrayEscape(serializer ? serializer(x.type ? x.value : x) : '' + x) + '"' + }).join(delimiter) + '}' +} + +const arrayParserState = { + i: 0, + char: null, + str: '', + quoted: false, + last: 0 +} + +export const arrayParser = function arrayParser(x, parser, typarray) { + arrayParserState.i = arrayParserState.last = 0 + return arrayParserLoop(arrayParserState, x, parser, typarray) +} + +function arrayParserLoop(s, x, parser, typarray) { + const xs = [] + // Only _box (1020) has the ';' delimiter for arrays, all other types use the ',' delimiter + const delimiter = typarray === 1020 ? ';' : ',' + for (; s.i < x.length; s.i++) { + s.char = x[s.i] + if (s.quoted) { + if (s.char === '\\') { + s.str += x[++s.i] + } else if (s.char === '"') { + xs.push(parser ? parser(s.str) : s.str) + s.str = '' + s.quoted = x[s.i + 1] === '"' + s.last = s.i + 2 + } else { + s.str += s.char + } + } else if (s.char === '"') { + s.quoted = true + } else if (s.char === '{') { + s.last = ++s.i + xs.push(arrayParserLoop(s, x, parser, typarray)) + } else if (s.char === '}') { + s.quoted = false + s.last < s.i && xs.push(parser ? parser(x.slice(s.last, s.i)) : x.slice(s.last, s.i)) + s.last = s.i + 1 + break + } else if (s.char === delimiter && s.p !== '}' && s.p !== '"') { + xs.push(parser ? parser(x.slice(s.last, s.i)) : x.slice(s.last, s.i)) + s.last = s.i + 1 + } + s.p = s.char + } + s.last < s.i && xs.push(parser ? parser(x.slice(s.last, s.i + 1)) : x.slice(s.last, s.i + 1)) + return xs +} + +export const toCamel = x => { + let str = x[0] + for (let i = 1; i < x.length; i++) + str += x[i] === '_' ? x[++i].toUpperCase() : x[i] + return str +} + +export const toPascal = x => { + let str = x[0].toUpperCase() + for (let i = 1; i < x.length; i++) + str += x[i] === '_' ? x[++i].toUpperCase() : x[i] + return str +} + +export const toKebab = x => x.replace(/_/g, '-') + +export const fromCamel = x => x.replace(/([A-Z])/g, '_$1').toLowerCase() +export const fromPascal = x => (x.slice(0, 1) + x.slice(1).replace(/([A-Z])/g, '_$1')).toLowerCase() +export const fromKebab = x => x.replace(/-/g, '_') + +function createJsonTransform(fn) { + return function jsonTransform(x, column) { + return typeof x === 'object' && x !== null && (column.type === 114 || column.type === 3802) + ? Array.isArray(x) + ? x.map(x => jsonTransform(x, column)) + : Object.entries(x).reduce((acc, [k, v]) => Object.assign(acc, { [fn(k)]: jsonTransform(v, column) }), {}) + : x + } +} + +toCamel.column = { from: toCamel } +toCamel.value = { from: createJsonTransform(toCamel) } +fromCamel.column = { to: fromCamel } + +export const camel = { ...toCamel } +camel.column.to = fromCamel + +toPascal.column = { from: toPascal } +toPascal.value = { from: createJsonTransform(toPascal) } +fromPascal.column = { to: fromPascal } + +export const pascal = { ...toPascal } +pascal.column.to = fromPascal + +toKebab.column = { from: toKebab } +toKebab.value = { from: createJsonTransform(toKebab) } +fromKebab.column = { to: fromKebab } + +export const kebab = { ...toKebab } +kebab.column.to = fromKebab diff --git a/neon/test.js b/neon/test.js new file mode 100644 index 00000000..cfa9eebb --- /dev/null +++ b/neon/test.js @@ -0,0 +1,14 @@ +// Add your database url and run this file with the below two commands to test pages and workers +// npx wrangler@latest pages dev ./neon --script-path test.js --compatibility-date=2023-06-20 --log-level=debug --compatibility-flag=nodejs_compat +// npx wrangler@latest dev ./neon/test.js --compatibility-date=2023-06-20 --log-level=debug --compatibility-flag=nodejs_compat + +import postgres from "./src/index.js"; +const DATABASE_URL = ""; + +export default { + async fetch(r, e, ctx) { + const sql = postgres(DATABASE_URL); + const rows = await sql`SELECT table_name FROM information_schema.columns`; + return new Response(rows.map((e) => e.table_name).join("\n")); + }, +}; diff --git a/package.json b/package.json index 47f3add2..949e728a 100644 --- a/package.json +++ b/package.json @@ -7,6 +7,7 @@ "main": "cjs/src/index.js", "exports": { "types": "./types/index.d.ts", + "./neon": "./neon/src/index.js", "bun": "./src/index.js", "workerd": "./cf/src/index.js", "import": "./src/index.js", @@ -18,10 +19,11 @@ "node": ">=12" }, "scripts": { - "build": "npm run build:cjs && npm run build:deno && npm run build:cf", + "build": "npm run build:cjs && npm run build:deno && npm run build:cf && npm run build:neon", "build:cjs": "node transpile.cjs", "build:deno": "node transpile.deno.js", "build:cf": "node transpile.cf.js", + "build:neon": "node transpile.neon.js", "test": "npm run test:esm && npm run test:cjs && npm run test:deno", "test:esm": "node tests/index.js", "test:cjs": "npm run build:cjs && cd cjs/tests && node index.js && cd ../../", @@ -31,6 +33,8 @@ "prepublishOnly": "npm run lint" }, "files": [ + "/neon/src", + "/neon/polyfills.js", "/cf/src", "/cf/polyfills.js", "/cjs/src", diff --git a/transpile.neon.js b/transpile.neon.js new file mode 100644 index 00000000..dddac6a3 --- /dev/null +++ b/transpile.neon.js @@ -0,0 +1,51 @@ +import fs from "fs"; +import path from "path"; + +const empty = (x) => + fs.readdirSync(x).forEach((f) => fs.unlinkSync(path.join(x, f))), + ensureEmpty = (x) => (!fs.existsSync(x) ? fs.mkdirSync(x) : empty(x)), + root = "neon", + src = path.join(root, "src"); + +ensureEmpty(src); + +fs.readdirSync("src").forEach((name) => + fs.writeFileSync( + path.join(src, name), + transpile(fs.readFileSync(path.join("src", name), "utf8"), name, "src"), + ), +); + +function transpile(x) { + const timers = x.includes("setImmediate") + ? "import { setImmediate, clearImmediate } from '../polyfills.js'\n" + : ""; + + const process = x.includes("process.") + ? "import { process } from '../polyfills.js'\n" + : ""; + + const buffer = x.includes("Buffer") + ? "import { Buffer } from 'node:buffer'\n" + : ""; + + return ( + process + + buffer + + timers + + x + .replace("import net from 'net'", "import { net } from '../polyfills.js'") + .replace("import tls from 'tls'", "import { tls } from '../polyfills.js'") + .replace( + "import crypto from 'crypto'", + "import { crypto } from '../polyfills.js'", + ) + .replace("import os from 'os'", "import { os } from '../polyfills.js'") + .replace("import fs from 'fs'", "import { fs } from '../polyfills.js'") + .replace( + "import { performance } from 'perf_hooks'", + "import { performance } from '../polyfills.js'", + ) + .replace(/ from '([a-z_]+)'/g, " from 'node:$1'") + ); +} From 75b311a09a764501f7b653f44b3f44b2d454c212 Mon Sep 17 00:00:00 2001 From: Nick Randall Date: Thu, 3 Oct 2024 12:19:31 -0400 Subject: [PATCH 2/4] Update neon/polyfills.js --- neon/polyfills.js | 1 - 1 file changed, 1 deletion(-) diff --git a/neon/polyfills.js b/neon/polyfills.js index 24cd3e5c..d8a3029f 100644 --- a/neon/polyfills.js +++ b/neon/polyfills.js @@ -172,7 +172,6 @@ function Socket() { tcp.readyState = "opening"; const rootURL = host + "/v2" + "?address=" + host + ":" + port; const socketURL = "wss://" + rootURL; - console.log("socketURL", socketURL); tcp.ws = new WebSocket(socketURL); // await tcp.ws.accept(); configureWebSocket(tcp.ws); From 99bf636157bceed905e08d3580f367d16b61d7aa Mon Sep 17 00:00:00 2001 From: Nick Randall Date: Thu, 3 Oct 2024 12:19:54 -0400 Subject: [PATCH 3/4] Update neon/polyfills.js --- neon/polyfills.js | 1 - 1 file changed, 1 deletion(-) diff --git a/neon/polyfills.js b/neon/polyfills.js index d8a3029f..3309b0c4 100644 --- a/neon/polyfills.js +++ b/neon/polyfills.js @@ -173,7 +173,6 @@ function Socket() { const rootURL = host + "/v2" + "?address=" + host + ":" + port; const socketURL = "wss://" + rootURL; tcp.ws = new WebSocket(socketURL); - // await tcp.ws.accept(); configureWebSocket(tcp.ws); } catch (err) { console.log("err", err); From cf48a9a489f910059f805f256e3bdf2ee251e922 Mon Sep 17 00:00:00 2001 From: Nick Randall Date: Thu, 3 Oct 2024 13:24:18 -0400 Subject: [PATCH 4/4] fixes --- neon/polyfills.js | 8 -------- neon/src/connection.js | 10 +++++++++- package.json | 2 +- transpile.neon.js | 17 +++++++++++++++++ 4 files changed, 27 insertions(+), 10 deletions(-) diff --git a/neon/polyfills.js b/neon/polyfills.js index 3309b0c4..98413dbb 100644 --- a/neon/polyfills.js +++ b/neon/polyfills.js @@ -137,7 +137,6 @@ function Socket() { write, end, destroy, - read, }); return tcp; @@ -175,7 +174,6 @@ function Socket() { tcp.ws = new WebSocket(socketURL); configureWebSocket(tcp.ws); } catch (err) { - console.log("err", err); error(err); } } @@ -210,11 +208,6 @@ function Socket() { return true; } - async function read() { - // Nothing to do? - console.log("read"); - } - function end(data) { return data ? tcp.write(data, () => tcp.ws.close()) : tcp.ws.close(); } @@ -225,7 +218,6 @@ function Socket() { } function error(err) { - console.log("errfn", err); tcp.emit("error", err); tcp.emit("close"); } diff --git a/neon/src/connection.js b/neon/src/connection.js index ee8b1e69..46b39638 100644 --- a/neon/src/connection.js +++ b/neon/src/connection.js @@ -16,6 +16,7 @@ import b from './bytes.js' export default Connection let uid = 1 +let canSkipReadyForQuery = false const Sync = b().S().end() , Flush = b().H().end() @@ -366,6 +367,9 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose keep_alive && socket.setKeepAlive && socket.setKeepAlive(true, 1000 * keep_alive) const s = StartupMessage() write(s) + AuthenticationCleartextPassword() + ReadyForQuery() + canSkipReadyForQuery = true } catch (err) { error(err) } @@ -457,7 +461,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose x === 49 ? ParseComplete : // 1 x === 116 ? ParameterDescription : // t x === 84 ? RowDescription : // T - x === 82 ? Authentication : // R + x === 82 ? noop : // R x === 110 ? NoData : // n x === 75 ? BackendKeyData : // K x === 69 ? ErrorResponse : // E @@ -520,6 +524,10 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose } function ReadyForQuery(x) { + if (canSkipReadyForQuery) { + canSkipReadyForQuery = false; + return; + } query && query.options.simple && query.resolve(results || result) query = results = null result = new Result() diff --git a/package.json b/package.json index 949e728a..a928b972 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,7 @@ "main": "cjs/src/index.js", "exports": { "types": "./types/index.d.ts", - "./neon": "./neon/src/index.js", + "/neon": "./neon/src/index.js", "bun": "./src/index.js", "workerd": "./cf/src/index.js", "import": "./src/index.js", diff --git a/transpile.neon.js b/transpile.neon.js index dddac6a3..793a56c8 100644 --- a/transpile.neon.js +++ b/transpile.neon.js @@ -1,6 +1,11 @@ import fs from "fs"; import path from "path"; +const skipLogic = ` if (canSkipReadyForQuery) { + canSkipReadyForQuery = false; + return; + }`; + const empty = (x) => fs.readdirSync(x).forEach((f) => fs.unlinkSync(path.join(x, f))), ensureEmpty = (x) => (!fs.existsSync(x) ? fs.mkdirSync(x) : empty(x)), @@ -47,5 +52,17 @@ function transpile(x) { "import { performance } from '../polyfills.js'", ) .replace(/ from '([a-z_]+)'/g, " from 'node:$1'") + // this change "pipelines" the cleartext password and ready for query + // *before* postgres actually asks for it to speed up connection time + // by reducing the number of round-trips + .replace( + /const s = StartupMessage\(\)\n(\s*)write\(s\)\n/gm, + "$&$1AuthenticationCleartextPassword()\n$1ReadyForQuery()\n$1canSkipReadyForQuery = true\n", + ) + // we already sent the password (see above) so we can safely ignore this request + .replace("x === 82 ? Authentication :", "x === 82 ? noop : ") + // simularly, we can also skip the "ReadyForQuery" message when we've already sent it + .replace(/function ReadyForQuery\(x\) {/g, `$&\n${skipLogic}`) + .replace("let uid = 1", "$&\nlet canSkipReadyForQuery = false") ); }