diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 939c8ff..3860f8a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,64 +1,69 @@ name: ci -on: - push: - branches: - - master - pull_request: - branches: - - master - -env: - DENO_VERSION: vx.x.x +on: [push, pull_request] jobs: - check: - name: Check format and lint - runs-on: ubuntu-latest - - steps: - - name: Clone repo - uses: actions/checkout@v4 - - - name: Install deno - uses: denoland/setup-deno@v1 - with: - deno-version: ${{env.DENO_VERSION}} - - - name: Check - run: deno task check - - tests: - name: Run tests + fmt: runs-on: ubuntu-latest - + continue-on-error: true steps: - - name: Clone repo - uses: actions/checkout@v4 - - - name: Install deno + - uses: actions/checkout@v1 + - name: Install Deno 1.x uses: denoland/setup-deno@v1 with: - deno-version: ${{env.DENO_VERSION}} - - - name: Test - run: deno task test:ga - - publish: + deno-version: v1.x + - name: Check fmt + run: deno fmt --check + test: runs-on: ubuntu-latest - - permissions: - contents: read - id-token: write + strategy: + fail-fast: false + matrix: + DENO_VERSION: + - v1.x + DB_VERSION: + - mysql:5.5 + - mysql:5.6 + - mysql:5.7 + - mysql:8 + - mysql:latest + - mariadb:5.5 + - mariadb:10.0 + - mariadb:10.1 + - mariadb:10.2 + - mariadb:10.3 + - mariadb:10.4 +# - mariadb:latest steps: - - name: Checkout - uses: actions/checkout@v4 - - - name: Install deno + - uses: actions/checkout@v1 + - name: Install Deno ${{ matrix.DENO_VERSION }} uses: denoland/setup-deno@v1 with: - deno-version: ${{env.DENO_VERSION}} - - - name: Publish (dry run) - run: deno publish --dry-run + deno-version: ${{ matrix.DENO_VERSION }} + - name: Show Deno version + run: deno --version + - name: Start ${{ matrix.DB_VERSION }} + run: | + sudo mkdir -p /var/run/mysqld/tmp + sudo chmod -R 777 /var/run/mysqld + docker container run --name mysql --rm -d -p 3306:3306 \ + -v /var/run/mysqld:/var/run/mysqld \ + -v /var/run/mysqld/tmp:/tmp \ + -e MYSQL_ROOT_PASSWORD=root \ + ${{ matrix.DB_VERSION }} + ./.github/workflows/wait-for-mysql.sh + - name: Run tests (TCP) + run: | + deno test --allow-env --allow-net=127.0.0.1:3306 ./test.ts + - name: Run tests (--unstable) (UNIX domain socket) + run: | + SOCKPATH=/var/run/mysqld/mysqld.sock + if [[ "${{ matrix.DB_VERSION }}" == "mysql:5.5" ]]; then + SOCKPATH=/var/run/mysqld/tmp/mysql.sock + fi + echo "DROP USER 'root'@'localhost';" | docker exec -i mysql mysql -proot + DB_SOCKPATH=$SOCKPATH TEST_METHODS=unix \ + deno test --unstable --allow-env \ + --allow-read=/var/run/mysqld/ --allow-write=/var/run/mysqld/ \ + ./test.ts diff --git a/.github/workflows/publish-to-nest.land.yml b/.github/workflows/publish-to-nest.land.yml new file mode 100644 index 0000000..87cc581 --- /dev/null +++ b/.github/workflows/publish-to-nest.land.yml @@ -0,0 +1,23 @@ +name: "publish current release to https://nest.land" + +on: + release: + types: + - published + +jobs: + publishToNestDotLand: + runs-on: ubuntu-latest + + steps: + - name: Setup repo + uses: actions/checkout@v2 + + - name: "setup" # check: https://github.com/actions/virtual-environments/issues/1777 + uses: denolib/setup-deno@v2 + with: + deno-version: v1.4.6 + + - name: "check nest.land" + run: | + deno run --allow-net --allow-read --allow-run https://deno.land/x/cicd/publish-on-nest.land.ts ${{ secrets.GITHUB_TOKEN }} ${{ secrets.NESTAPIKEY }} ${{ github.repository }} diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml deleted file mode 100644 index 58837de..0000000 --- a/.github/workflows/publish.yml +++ /dev/null @@ -1,29 +0,0 @@ -name: Publish - -on: - release: - types: [published] - -env: - DENO_VERSION: vx.x.x - -jobs: - publish: - runs-on: ubuntu-latest - - permissions: - contents: read - id-token: write - - steps: - - name: Checkout - uses: actions/checkout@v4 - - - name: Set up Deno - uses: denoland/setup-deno@v1 - with: - deno-version: ${{env.DENO_VERSION}} - - - name: Publish - if: github.event_name == 'release' - run: deno publish diff --git a/.github/workflows/wait-for-mysql.sh b/.github/workflows/wait-for-mysql.sh new file mode 100755 index 0000000..13302dd --- /dev/null +++ b/.github/workflows/wait-for-mysql.sh @@ -0,0 +1,11 @@ +#!/bin/sh + +echo "Waiting for MySQL" +for i in `seq 1 30`; +do + echo '\q' | mysql -h 127.0.0.1 -uroot --password=root -P 3306 && exit 0 + >&2 echo "MySQL is waking up" + sleep 1 +done + +echo "Failed waiting for MySQL" && exit 1 diff --git a/.gitignore b/.gitignore index 17b78fb..520cb2b 100644 --- a/.gitignore +++ b/.gitignore @@ -5,5 +5,4 @@ mysql.log docs .DS_Store .idea -dbtmp -tmp_test + diff --git a/cipher b/cipher new file mode 100644 index 0000000..356b2f8 Binary files /dev/null and b/cipher differ diff --git a/compose.yml b/compose.yml deleted file mode 100644 index 0b984f8..0000000 --- a/compose.yml +++ /dev/null @@ -1,86 +0,0 @@ -services: - mysql: - image: mysql:latest - ports: - - 3313:3306 - pull_policy: always - restart: always - environment: - MYSQL_ALLOW_EMPTY_PASSWORD: true - MYSQL_DATABASE: testdb - healthcheck: - test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "--user", "root"] - interval: 3s - timeout: 3s - retries: 10 - mysql5: - image: mysql:5 - platform: linux/amd64 - ports: - - 3311:3306 - pull_policy: always - restart: always - environment: - MYSQL_ALLOW_EMPTY_PASSWORD: true - MYSQL_DATABASE: testdb - healthcheck: - test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "--user", "root"] - interval: 3s - timeout: 3s - retries: 10 - mysql8: - image: mysql:8 - ports: - - 3312:3306 - pull_policy: always - restart: always - environment: - MYSQL_ALLOW_EMPTY_PASSWORD: true - MYSQL_DATABASE: testdb - healthcheck: - test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "--user", "root"] - interval: 3s - timeout: 3s - retries: 10 - mariadb: - image: mariadb:latest - ports: - - 3316:3306 - pull_policy: always - restart: always - environment: - MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: true - MARIADB_DATABASE: testdb - healthcheck: - test: ["CMD", "mariadb-admin", "ping", "-h", "127.0.0.1"] - interval: 3s - timeout: 3s - retries: 10 - mariadb10: - image: mariadb:10 - ports: - - 3314:3306 - pull_policy: always - restart: always - environment: - MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: true - MARIADB_DATABASE: testdb - healthcheck: - test: ["CMD", "mariadb-admin", "ping", "-h", "127.0.0.1"] - interval: 3s - timeout: 3s - retries: 10 - mariadb11: - image: mariadb:11 - ports: - - 3315:3306 - pull_policy: always - restart: always - environment: - MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: true - MARIADB_DATABASE: testdb - healthcheck: - test: ["CMD", "mariadb-admin", "ping", "-h", "127.0.0.1"] - interval: 3s - timeout: 3s - retries: 10 diff --git a/deno.json b/deno.json deleted file mode 100644 index 5f40acb..0000000 --- a/deno.json +++ /dev/null @@ -1,35 +0,0 @@ -{ - "name": "@db/mysql", - "version": "2.12.2", - "exports": "./mod.ts", - "lock": false, - "tasks": { - "check": "deno task format:check && deno task lint:check && deno task type:check", - "lint:check": "deno lint", - "format:check": "deno fmt --check", - "type:check": "deno check mod.ts", - "doc:check": "deno doc --lint src", - "test": "deno task db:restart && deno test -A; deno task db:stop", - "test:ga": "deno task db:start && deno test -A && deno task db:stop", - "db:restart": "deno task db:stop && deno task db:start", - "db:start": "docker compose up -d --remove-orphans --wait && sleep 2", - "db:stop": "docker compose down --remove-orphans --volumes" - }, - "imports": { - "@halvardm/sqlx": "jsr:@halvardm/sqlx@0.0.0-13", - "@std/assert": "jsr:@std/assert@^0.221.0", - "@std/async": "jsr:@std/async@^0.221.0", - "@std/crypto": "jsr:@std/crypto@^0.221.0", - "@std/encoding": "jsr:@std/encoding@^0.221.0", - "@std/flags": "jsr:@std/flags@^0.221.0", - "@std/fmt": "jsr:@std/fmt@^0.221.0", - "@std/fs": "jsr:@std/fs@^0.222.1", - "@std/log": "jsr:@std/log@^0.221.0", - "@std/path": "jsr:@std/path@^0.222.1", - "@std/semver": "jsr:@std/semver@^0.220.1", - "@std/testing": "jsr:@std/testing@^0.221.0", - "@std/text": "jsr:@std/text@^0.222.1", - "@std/yaml": "jsr:@std/yaml@^0.223.0", - "@stdext/encoding": "jsr:@stdext/encoding@^0.0.2" - } -} diff --git a/deps.ts b/deps.ts new file mode 100644 index 0000000..93a948b --- /dev/null +++ b/deps.ts @@ -0,0 +1,10 @@ +export type { Deferred } from "https://deno.land/std@0.104.0/async/mod.ts"; +export { deferred, delay } from "https://deno.land/std@0.104.0/async/mod.ts"; +export { format as byteFormat } from "https://deno.land/x/bytes_formater@v1.4.0/mod.ts"; +export { createHash } from "https://deno.land/std@0.104.0/hash/mod.ts"; +export { decode as base64Decode } from "https://deno.land/std@0.104.0/encoding/base64.ts"; +export type { + SupportedAlgorithm, +} from "https://deno.land/std@0.104.0/hash/mod.ts"; +export { replaceParams } from "https://deno.land/x/sql_builder@v1.9.1/util.ts"; +export * as log from "https://deno.land/std@0.104.0/log/mod.ts"; diff --git a/egg.json b/egg.json new file mode 100644 index 0000000..e78ef5a --- /dev/null +++ b/egg.json @@ -0,0 +1,10 @@ +{ + "name": "mysql", + "description": "MySQL driver for Deno", + "homepage": "https://github.com/manyuanrong/deno_mysql", + "files": [ + "./**/*.ts", + "README.md" + ], + "entry": "./mod.ts" +} diff --git a/lib/auth_plugins/caching_sha2_password.test.ts b/lib/auth_plugins/caching_sha2_password.test.ts deleted file mode 100644 index dac791a..0000000 --- a/lib/auth_plugins/caching_sha2_password.test.ts +++ /dev/null @@ -1,106 +0,0 @@ -import { assertEquals } from "@std/assert"; -import { PacketReader } from "../packets/packet.ts"; -import { - AuthPluginCachingSha2Password, - AuthStatusFlags, -} from "./caching_sha2_password.ts"; -import { ComQueryResponsePacket } from "../constant/packet.ts"; -import { BufferReader } from "../utils/buffer.ts"; - -Deno.test("AuthPluginCachingSha2Password", async (t) => { - await t.step("statusFlag FastPath", async () => { - const scramble = new Uint8Array([1, 2, 3]); - const password = "password"; - const authPlugin = new AuthPluginCachingSha2Password(scramble, password); - - assertEquals(authPlugin.scramble, scramble); - assertEquals(authPlugin.password, password); - assertEquals(authPlugin.done, false); - assertEquals(authPlugin.quickRead, false); - assertEquals(authPlugin.data, undefined); - - const bodyReader = new BufferReader( - new Uint8Array([0x00, AuthStatusFlags.FastPath]), - ); - await authPlugin.next( - new PacketReader( - { size: 2, no: 0 }, - bodyReader, - ComQueryResponsePacket.OK_Packet, - ), - ); - - assertEquals(authPlugin.done, false); - assertEquals(authPlugin.data, undefined); - assertEquals(authPlugin.quickRead, true); - - await authPlugin.next( - new PacketReader( - { size: 2, no: 0 }, - bodyReader, - ComQueryResponsePacket.OK_Packet, - ), - ); - - assertEquals(authPlugin.done, true); - }); - - await t.step("statusFlag FullAuth", async () => { - const scramble = new Uint8Array([1, 2, 3]); - const password = "password"; - const authPlugin = new AuthPluginCachingSha2Password(scramble, password); - - assertEquals(authPlugin.scramble, scramble); - assertEquals(authPlugin.password, password); - assertEquals(authPlugin.done, false); - assertEquals(authPlugin.quickRead, false); - assertEquals(authPlugin.data, undefined); - - let bodyReader = new BufferReader( - new Uint8Array([0x00, AuthStatusFlags.FullAuth]), - ); - await authPlugin.next( - new PacketReader( - { size: 2, no: 0 }, - bodyReader, - ComQueryResponsePacket.OK_Packet, - ), - ); - - assertEquals(authPlugin.done, false); - assertEquals(authPlugin.data, new Uint8Array([0x02])); - assertEquals(authPlugin.quickRead, false); - - const publicKey = `-----BEGIN PUBLIC KEY----- -MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCkFF85HndOJoTVsuYBNOu4N63s -bVMWfMVZ/ZXFVYeFE7H6Vp0jhu2d6JUUx9WCXV5JOt/mXoCirywhz2LM+f7kaBCh -0YIFh5JKS43a3COC9BJupj2dco/iWEmOFqRvCn/ErQNdmataqQlePq3SitusJwuj -PQogsoytp/nSKLsTLwIDA/+/ ------END PUBLIC KEY-----`; - - const encodedPublicKey = new TextEncoder().encode(publicKey); - - bodyReader = new BufferReader(new Uint8Array([0x00, ...encodedPublicKey])); - await authPlugin.next( - new PacketReader( - { size: 2, no: 0 }, - bodyReader, - ComQueryResponsePacket.OK_Packet, - ), - ); - - assertEquals(authPlugin.done, false); - assertEquals(authPlugin.data?.length, 128); - assertEquals(authPlugin.quickRead, false); - - await authPlugin.next( - new PacketReader( - { size: 2, no: 0 }, - bodyReader, - ComQueryResponsePacket.OK_Packet, - ), - ); - - assertEquals(authPlugin.done, true); - }); -}); diff --git a/lib/auth_plugins/caching_sha2_password.ts b/lib/auth_plugins/caching_sha2_password.ts deleted file mode 100644 index 9a35c2b..0000000 --- a/lib/auth_plugins/caching_sha2_password.ts +++ /dev/null @@ -1,84 +0,0 @@ -import { xor } from "../utils/bytes.ts"; -import type { PacketReader } from "../packets/packet.ts"; -import { encryptWithPublicKey } from "../utils/crypto.ts"; - -export const enum AuthStatusFlags { - FullAuth = 0x04, - FastPath = 0x03, -} - -export class AuthPluginCachingSha2Password { - readonly scramble: Uint8Array; - readonly password: string; - done: boolean = false; - quickRead: boolean = false; - data: Uint8Array | undefined = undefined; - - next: (packet: PacketReader) => Promise = this.authMoreResponse.bind( - this, - ); - - constructor(scramble: Uint8Array, password: string) { - this.scramble = scramble; - this.password = password; - } - - protected terminate() { - this.done = true; - return Promise.resolve(this); - } - - protected authMoreResponse(packet: PacketReader): Promise { - const REQUEST_PUBLIC_KEY = 0x02; - const statusFlag = packet.body.skip(1).readUint8(); - - switch (statusFlag) { - case AuthStatusFlags.FullAuth: { - this.data = new Uint8Array([REQUEST_PUBLIC_KEY]); - this.next = this.encryptWithKey.bind(this); - break; - } - case AuthStatusFlags.FastPath: { - this.quickRead = true; - this.next = this.terminate.bind(this); - break; - } - default: - this.done = true; - } - - return Promise.resolve(this); - } - - protected async encryptWithKey(packet: PacketReader): Promise { - const publicKey = this.parsePublicKey(packet); - const len = this.password.length; - const passwordBuffer: Uint8Array = new Uint8Array(len + 1); - for (let n = 0; n < len; n++) { - passwordBuffer[n] = this.password.charCodeAt(n); - } - passwordBuffer[len] = 0x00; - - const encryptedPassword = await this.encrypt( - passwordBuffer, - this.scramble, - publicKey, - ); - this.next = this.terminate.bind(this); - this.data = new Uint8Array(encryptedPassword); - return this; - } - - protected parsePublicKey(packet: PacketReader): string { - return packet.body.skip(1).readNullTerminatedString(); - } - - async encrypt( - password: Uint8Array, - scramble: Uint8Array, - key: string, - ): Promise { - const stage1 = xor(password, scramble); - return await encryptWithPublicKey(key, stage1); - } -} diff --git a/lib/auth_plugins/mod.ts b/lib/auth_plugins/mod.ts deleted file mode 100644 index 58a459d..0000000 --- a/lib/auth_plugins/mod.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { AuthPluginCachingSha2Password } from "./caching_sha2_password.ts"; - -export { AuthPluginCachingSha2Password }; - -export const AuthPluginName = { - CachingSha2Password: "caching_sha2_password", -} as const; -export type AuthPluginName = typeof AuthPluginName[keyof typeof AuthPluginName]; - -export const AuthPlugins = { - caching_sha2_password: AuthPluginCachingSha2Password, -} as const; diff --git a/lib/client.test.ts b/lib/client.test.ts deleted file mode 100644 index ef1bfc6..0000000 --- a/lib/client.test.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { MysqlClient } from "./client.ts"; -import { QUERIES, services } from "./utils/testing.ts"; -import { clientTest } from "@halvardm/sqlx/testing"; - -Deno.test("Client Test", async (t) => { - for (const service of services) { - await t.step(`Testing ${service.name}`, async (t) => { - await t.step(`TCP`, async (t) => { - await clientTest({ - t, - Client: MysqlClient, - connectionUrl: service.url, - connectionOptions: {}, - queries: QUERIES, - }); - }); - - // Enable once socket connection issue is fixed - // - // await t.step(`UNIX Socket`, async (t) => { - // await implementationTest({ - // t, - // Client: MysqlClient, - // // deno-lint-ignore no-explicit-any - // PoolClient: MysqlClientPool as any, - // connectionUrl: service.urlSocket, - // connectionOptions: {}, - // queries: { - // createTable: - // "CREATE TABLE IF NOT EXISTS sqlxtesttable (testcol TEXT)", - // dropTable: "DROP TABLE IF EXISTS sqlxtesttable", - // insertOneToTable: "INSERT INTO sqlxtesttable (testcol) VALUES (?)", - // insertManyToTable: - // "INSERT INTO sqlxtesttable (testcol) VALUES (?),(?),(?)", - // selectOneFromTable: - // "SELECT * FROM sqlxtesttable WHERE testcol = ? LIMIT 1", - // selectByMatchFromTable: - // "SELECT * FROM sqlxtesttable WHERE testcol = ?", - // selectManyFromTable: "SELECT * FROM sqlxtesttable", - // select1AsString: "SELECT '1' as result", - // select1Plus1AsNumber: "SELECT 1+1 as result", - // deleteByMatchFromTable: - // "DELETE FROM sqlxtesttable WHERE testcol = ?", - // deleteAllFromTable: "DELETE FROM sqlxtesttable", - // }, - // }); - // }); - }); - } -}); diff --git a/lib/client.ts b/lib/client.ts deleted file mode 100644 index 1a29909..0000000 --- a/lib/client.ts +++ /dev/null @@ -1,64 +0,0 @@ -import { SqlxClient } from "@halvardm/sqlx"; -import { MysqlConnection, type MysqlConnectionOptions } from "./connection.ts"; -import type { MysqlParameterType } from "./packets/parsers/result.ts"; -import { - MysqlClientCloseEvent, - MysqlClientEventTarget, -} from "./utils/events.ts"; -import { MysqlClientConnectEvent } from "../mod.ts"; -import { - type MysqlPrepared, - type MysqlQueryOptions, - type MySqlTransaction, - MysqlTransactionable, - type MysqlTransactionOptions, -} from "./sqlx.ts"; - -export interface MysqlClientOptions extends MysqlConnectionOptions { -} - -/** - * MySQL client - */ -export class MysqlClient extends MysqlTransactionable implements - SqlxClient< - MysqlClientEventTarget, - MysqlConnectionOptions, - MysqlConnection, - MysqlParameterType, - MysqlQueryOptions, - MysqlPrepared, - MysqlTransactionOptions, - MySqlTransaction - > { - eventTarget: MysqlClientEventTarget; - connectionUrl: string; - connectionOptions: MysqlConnectionOptions; - - constructor( - connectionUrl: string | URL, - connectionOptions: MysqlClientOptions = {}, - ) { - const conn = new MysqlConnection(connectionUrl, connectionOptions); - super(conn); - this.connectionUrl = connectionUrl.toString(); - this.connectionOptions = connectionOptions; - this.eventTarget = new MysqlClientEventTarget(); - } - async connect(): Promise { - await this.connection.connect(); - this.eventTarget.dispatchEvent( - new MysqlClientConnectEvent({ connectable: this }), - ); - } - async close(): Promise { - this.eventTarget.dispatchEvent( - new MysqlClientCloseEvent({ connectable: this }), - ); - await this.connection.close(); - } - - async [Symbol.asyncDispose](): Promise { - await this.close(); - } -} diff --git a/lib/connection.test.ts b/lib/connection.test.ts deleted file mode 100644 index 25c1ff9..0000000 --- a/lib/connection.test.ts +++ /dev/null @@ -1,460 +0,0 @@ -import { assertEquals, assertInstanceOf } from "@std/assert"; -import { emptyDir } from "@std/fs"; -import { join } from "@std/path"; -import { MysqlConnection } from "./connection.ts"; -import { DIR_TMP_TEST } from "./utils/testing.ts"; -import { buildQuery } from "./packets/builders/query.ts"; -import { URL_TEST_CONNECTION } from "./utils/testing.ts"; -import { connectionConstructorTest } from "@halvardm/sqlx/testing"; - -Deno.test("Connection", async (t) => { - await emptyDir(DIR_TMP_TEST); - - const PATH_PEM_CA = join(DIR_TMP_TEST, "ca.pem"); - const PATH_PEM_CA2 = join(DIR_TMP_TEST, "ca2.pem"); - const PATH_PEM_CERT = join(DIR_TMP_TEST, "cert.pem"); - const PATH_PEM_KEY = join(DIR_TMP_TEST, "key.pem"); - - await Deno.writeTextFile(PATH_PEM_CA, "ca"); - await Deno.writeTextFile(PATH_PEM_CA2, "ca2"); - await Deno.writeTextFile(PATH_PEM_CERT, "cert"); - await Deno.writeTextFile(PATH_PEM_KEY, "key"); - - await t.step("can construct", async (t) => { - const connection = new MysqlConnection(URL_TEST_CONNECTION); - - assertInstanceOf(connection, MysqlConnection); - assertEquals(connection.connectionUrl, URL_TEST_CONNECTION); - - await t.step("can parse connection config simple", () => { - const url = new URL("mysql://user:pass@127.0.0.1:3306/db"); - - const c = new MysqlConnection(url.toString()); - - assertEquals(c.config, { - protocol: "mysql", - username: "user", - password: "pass", - hostname: "127.0.0.1", - port: 3306, - schema: "db", - socket: undefined, - tls: undefined, - parameters: {}, - }); - }); - await t.step("can parse connection config full", () => { - const url = new URL("mysql://user:pass@127.0.0.1:3306/db"); - url.searchParams.set("socket", "/tmp/mysql.sock"); - url.searchParams.set("ssl-mode", "VERIFY_IDENTITY"); - url.searchParams.set("ssl-ca", PATH_PEM_CA); - url.searchParams.set("ssl-capath", DIR_TMP_TEST); - url.searchParams.set("ssl-cert", PATH_PEM_CERT); - url.searchParams.set("ssl-cipher", "cipher"); - url.searchParams.set("ssl-crl", "crl.pem"); - url.searchParams.set("ssl-crlpath", "crlpath.pem"); - url.searchParams.set("ssl-key", PATH_PEM_KEY); - url.searchParams.set("tls-version", "TLSv1.2,TLSv1.3"); - url.searchParams.set("tls-versions", "[TLSv1.2,TLSv1.3]"); - url.searchParams.set("tls-ciphersuites", "ciphersuites"); - url.searchParams.set("auth-method", "AUTO"); - url.searchParams.set("get-server-public-key", "true"); - url.searchParams.set("server-public-key-path", "key.pem"); - url.searchParams.set("ssh", "usr@host:port"); - url.searchParams.set("uri", "mysql://user@127.0.0.1:3306"); - url.searchParams.set("ssh-password", "pass"); - url.searchParams.set("ssh-config-file", "config"); - url.searchParams.set("ssh-config-file", "config"); - url.searchParams.set("ssh-identity-file", "identity"); - url.searchParams.set("ssh-identity-pass", "identitypass"); - url.searchParams.set("connect-timeout", "10"); - url.searchParams.set("compression", "preferred"); - url.searchParams.set("compression-algorithms", "algo"); - url.searchParams.set("compression-level", "level"); - url.searchParams.set("connection-attributes", "true"); - - const c = new MysqlConnection(url.toString()); - - assertEquals(c.config, { - protocol: "mysql", - username: "user", - password: "pass", - hostname: "127.0.0.1", - port: 3306, - socket: "/tmp/mysql.sock", - schema: "db", - tls: { - mode: "VERIFY_IDENTITY", - caCerts: [ - "ca", - "ca2", - "cert", - "key", - ], - cert: "cert", - hostname: "127.0.0.1", - key: "key", - port: 3306, - }, - parameters: { - socket: "/tmp/mysql.sock", - sslMode: "VERIFY_IDENTITY", - sslCa: [PATH_PEM_CA], - sslCapath: [DIR_TMP_TEST], - sslCert: PATH_PEM_CERT, - sslCipher: "cipher", - sslCrl: "crl.pem", - sslCrlpath: "crlpath.pem", - sslKey: PATH_PEM_KEY, - tlsVersion: "TLSv1.2,TLSv1.3", - tlsVersions: "[TLSv1.2,TLSv1.3]", - tlsCiphersuites: "ciphersuites", - authMethod: "AUTO", - getServerPublicKey: true, - serverPublicKeyPath: "key.pem", - ssh: "usr@host:port", - uri: "mysql://user@127.0.0.1:3306", - sshPassword: "pass", - sshConfigFile: "config", - sshIdentityFile: "identity", - sshIdentityPass: "identitypass", - connectTimeout: 10, - compression: "preferred", - compressionAlgorithms: "algo", - compressionLevel: "level", - connectionAttributes: "true", - }, - }); - }); - - await connection.close(); - }); - - await connectionConstructorTest({ - t, - Connection: MysqlConnection, - connectionUrl: URL_TEST_CONNECTION, - connectionOptions: {}, - }); - - await t.step("can query database", async (t) => { - await using connection = new MysqlConnection(URL_TEST_CONNECTION); - await connection.connect(); - await t.step("can sendData", async () => { - const data = buildQuery("SELECT 1+1 AS result;"); - for await (const result1 of connection.sendData(data)) { - assertEquals(result1, { - row: [2], - fields: [ - { - catalog: "def", - decimals: 0, - defaultVal: "", - encoding: 63, - fieldFlag: 129, - fieldLen: 3, - fieldType: 8, - name: "result", - originName: "", - originTable: "", - schema: "", - table: "", - }, - ], - }); - } - }); - - await t.step("can parse time", async () => { - const data = buildQuery(`SELECT CAST("09:04:10" AS time) as time`); - for await (const result1 of connection.sendData(data)) { - assertEquals(result1, { - row: ["09:04:10"], - fields: [ - { - catalog: "def", - decimals: 0, - defaultVal: "", - encoding: 63, - fieldFlag: 128, - fieldLen: 10, - fieldType: 11, - name: "time", - originName: "", - originTable: "", - schema: "", - table: "", - }, - ], - }); - } - }); - - await t.step("can parse date", async () => { - const data = buildQuery( - `SELECT CAST("2024-04-15 09:04:10" AS date) as date`, - ); - for await (const result1 of connection.sendData(data)) { - assertEquals(result1, { - row: [new Date("2024-04-15T00:00:00.000Z")], - fields: [ - { - catalog: "def", - decimals: 0, - defaultVal: "", - encoding: 63, - fieldFlag: 128, - fieldLen: 10, - fieldType: 10, - name: "date", - originName: "", - originTable: "", - schema: "", - table: "", - }, - ], - }); - } - }); - - await t.step("can parse bigint", async () => { - const data = buildQuery(`SELECT 9223372036854775807 as result`); - for await (const result1 of connection.sendData(data)) { - assertEquals(result1, { - row: [9223372036854775807n], - fields: [ - { - catalog: "def", - decimals: 0, - defaultVal: "", - encoding: 63, - fieldFlag: 129, - fieldLen: 20, - fieldType: 8, - name: "result", - originName: "", - originTable: "", - schema: "", - table: "", - }, - ], - }); - } - }); - - await t.step("can parse decimal", async () => { - const data = buildQuery( - `SELECT 0.012345678901234567890123456789 as result`, - ); - for await (const result1 of connection.sendData(data)) { - assertEquals(result1, { - row: ["0.012345678901234567890123456789"], - fields: [ - { - catalog: "def", - decimals: 30, - defaultVal: "", - encoding: 63, - fieldFlag: 129, - fieldLen: 33, - fieldType: 246, - name: "result", - originName: "", - originTable: "", - schema: "", - table: "", - }, - ], - }); - } - }); - - await t.step("can parse empty string", async () => { - const data = buildQuery(`SELECT '' as result`); - for await (const result1 of connection.sendData(data)) { - assertEquals(result1, { - row: [""], - fields: [ - { - catalog: "def", - decimals: 31, - defaultVal: "", - encoding: 33, - fieldFlag: 1, - fieldLen: 0, - fieldType: 253, - name: "result", - originName: "", - originTable: "", - schema: "", - table: "", - }, - ], - }); - } - }); - - await t.step("can drop and create table", async () => { - const dropTableSql = buildQuery("DROP TABLE IF EXISTS test;"); - const dropTableReturned = connection.sendData(dropTableSql); - assertEquals(await dropTableReturned.next(), { - done: true, - value: { affectedRows: 0, lastInsertId: 0 }, - }); - const createTableSql = buildQuery( - "CREATE TABLE IF NOT EXISTS test (id INT);", - ); - const createTableReturned = connection.sendData(createTableSql); - assertEquals(await createTableReturned.next(), { - done: true, - value: { affectedRows: 0, lastInsertId: 0 }, - }); - const result = await Array.fromAsync(createTableReturned); - assertEquals(result, []); - }); - - await t.step("can insert to table", async () => { - const data = buildQuery("INSERT INTO test (id) VALUES (1),(2),(3);"); - const returned = connection.sendData(data); - assertEquals(await returned.next(), { - done: true, - value: { affectedRows: 3, lastInsertId: 0 }, - }); - const result = await Array.fromAsync(returned); - assertEquals(result, []); - }); - - await t.step("can select from table using sendData", async () => { - const data = buildQuery("SELECT * FROM test;"); - const returned = connection.sendData(data); - const result = await Array.fromAsync(returned); - assertEquals(result, [ - { - fields: [ - { - catalog: "def", - decimals: 0, - defaultVal: "", - encoding: 63, - fieldFlag: 0, - fieldLen: 11, - fieldType: 3, - name: "id", - originName: "id", - originTable: "test", - schema: "testdb", - table: "test", - }, - ], - row: [ - 1, - ], - }, - { - fields: [ - { - catalog: "def", - decimals: 0, - defaultVal: "", - encoding: 63, - fieldFlag: 0, - fieldLen: 11, - fieldType: 3, - name: "id", - originName: "id", - originTable: "test", - schema: "testdb", - table: "test", - }, - ], - row: [ - 2, - ], - }, - { - fields: [ - { - catalog: "def", - decimals: 0, - defaultVal: "", - encoding: 63, - fieldFlag: 0, - fieldLen: 11, - fieldType: 3, - name: "id", - originName: "id", - originTable: "test", - schema: "testdb", - table: "test", - }, - ], - row: [ - 3, - ], - }, - ]); - }); - - await t.step("can insert to table using executeRaw", async () => { - const data = buildQuery("INSERT INTO test (id) VALUES (4);"); - const result = await connection.executeRaw(data); - assertEquals(result, 1); - }); - - await t.step("can select from table using executeRaw", async () => { - const data = buildQuery("SELECT * FROM test;"); - const result = await connection.executeRaw(data); - assertEquals(result, undefined); - }); - - await t.step("can insert to table using queryManyObjectRaw", async () => { - const data = buildQuery("INSERT INTO test (id) VALUES (5);"); - const result = await Array.fromAsync(connection.queryManyObjectRaw(data)); - assertEquals(result, []); - }); - - await t.step("can select from table using queryManyObjectRaw", async () => { - const data = buildQuery("SELECT * FROM test;"); - const result = await Array.fromAsync(connection.queryManyObjectRaw(data)); - assertEquals(result, [ - { id: 1 }, - { id: 2 }, - { id: 3 }, - { id: 4 }, - { id: 5 }, - ]); - }); - - await t.step("can insert to table using queryManyArrayRaw", async () => { - const data = buildQuery("INSERT INTO test (id) VALUES (6);"); - const result = await Array.fromAsync(connection.queryManyArrayRaw(data)); - assertEquals(result, []); - }); - - await t.step("can select from table using queryManyArrayRaw", async () => { - const data = buildQuery("SELECT * FROM test;"); - const result = await Array.fromAsync(connection.queryManyArrayRaw(data)); - assertEquals(result, [ - [1], - [2], - [3], - [4], - [5], - [6], - ]); - }); - - await t.step("can drop table", async () => { - const data = buildQuery("DROP TABLE IF EXISTS test;"); - const returned = connection.sendData(data); - assertEquals(await returned.next(), { - done: true, - value: { affectedRows: 0, lastInsertId: 0 }, - }); - const result = await Array.fromAsync(returned); - assertEquals(result, []); - }); - }); - - await emptyDir(DIR_TMP_TEST); -}); diff --git a/lib/connection.ts b/lib/connection.ts deleted file mode 100644 index 03c6157..0000000 --- a/lib/connection.ts +++ /dev/null @@ -1,623 +0,0 @@ -import { - MysqlConnectionError, - MysqlProtocolError, - MysqlReadError, - MysqlResponseTimeoutError, -} from "./utils/errors.ts"; -import { buildAuth } from "./packets/builders/auth.ts"; -import { PacketReader, PacketWriter } from "./packets/packet.ts"; -import { parseError } from "./packets/parsers/err.ts"; -import { - AuthResult, - parseAuth, - parseHandshake, -} from "./packets/parsers/handshake.ts"; -import { - type FieldInfo, - getRowObject, - type MysqlParameterType, - parseField, - parseRowArray, -} from "./packets/parsers/result.ts"; -import { ComQueryResponsePacket } from "./constant/packet.ts"; -import { AuthPlugins } from "./auth_plugins/mod.ts"; -import { parseAuthSwitch } from "./packets/parsers/authswitch.ts"; -import auth from "./utils/hash.ts"; -import { ServerCapabilities } from "./constant/capabilities.ts"; -import { buildSSLRequest } from "./packets/builders/tls.ts"; -import { logger } from "./utils/logger.ts"; -import { - type ArrayRow, - type Row, - SqlxBase, - type SqlxConnection, - type SqlxConnectionOptions, - type SqlxQueryOptions, -} from "@halvardm/sqlx"; -import { resolve } from "@std/path"; -import { toCamelCase } from "@std/text"; -import { AuthPluginName } from "./auth_plugins/mod.ts"; - -/** - * Connection state - */ -export enum ConnectionState { - CONNECTING, - CONNECTED, - CLOSING, - CLOSED, -} - -export type ConnectionSendDataNext = { - row: ArrayRow; - fields: FieldInfo[]; -}; -export type ConnectionSendDataResult = { - affectedRows: number | undefined; - lastInsertId: number | undefined; -}; - -/** - * Tls mode for mysql connection - * - * @see {@link https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-mode} - */ -export const TlsMode = { - Preferred: "PREFERRED", - Disabled: "DISABLED", - Required: "REQUIRED", - VerifyCa: "VERIFY_CA", - VerifyIdentity: "VERIFY_IDENTITY", -} as const; -export type TlsMode = typeof TlsMode[keyof typeof TlsMode]; - -export interface TlsOptions extends Deno.ConnectTlsOptions { - mode: TlsMode; -} - -/** - * Aditional connection parameters - * - * @see {@link https://dev.mysql.com/doc/refman/8.0/en/connecting-using-uri-or-key-value-pairs.html#connecting-using-uri} - */ -export interface ConnectionParameters { - socket?: string; - sslMode?: TlsMode; - sslCa?: string[]; - sslCapath?: string[]; - sslCert?: string; - sslCipher?: string; - sslCrl?: string; - sslCrlpath?: string; - sslKey?: string; - tlsVersion?: string; - tlsVersions?: string; - tlsCiphersuites?: string; - authMethod?: string; - getServerPublicKey?: boolean; - serverPublicKeyPath?: string; - ssh?: string; - uri?: string; - sshPassword?: string; - sshConfigFile?: string; - sshIdentityFile?: string; - sshIdentityPass?: string; - connectTimeout?: number; - compression?: string; - compressionAlgorithms?: string; - compressionLevel?: string; - connectionAttributes?: string; -} - -export interface ConnectionConfig { - protocol: string; - username: string; - password?: string; - hostname: string; - port: number; - socket?: string; - schema?: string; - /** - * Tls options - */ - tls?: Partial; - /** - * Aditional connection parameters - */ - parameters: ConnectionParameters; -} - -export interface MysqlConnectionOptions extends SqlxConnectionOptions { - tls?: Partial; -} - -/** Connection for mysql */ -export class MysqlConnection extends SqlxBase implements - SqlxConnection< - MysqlConnectionOptions - > { - state: ConnectionState = ConnectionState.CONNECTING; - capabilities: number = 0; - serverVersion: string = ""; - - protected _conn: Deno.Conn | null = null; - private _timedOut = false; - - readonly connectionUrl: string; - readonly connectionOptions: MysqlConnectionOptions; - readonly config: ConnectionConfig; - - get conn(): Deno.Conn { - if (!this._conn) { - throw new MysqlConnectionError("Not connected"); - } - if (this.state != ConnectionState.CONNECTED) { - if (this.state == ConnectionState.CLOSED) { - throw new MysqlConnectionError("Connection is closed"); - } else { - throw new MysqlConnectionError("Must be connected first"); - } - } - return this._conn; - } - - set conn(conn: Deno.Conn | null) { - this._conn = conn; - } - get connected(): boolean { - return this.state === ConnectionState.CONNECTED; - } - constructor( - connectionUrl: string | URL, - connectionOptions: MysqlConnectionOptions = {}, - ) { - super(); - this.connectionUrl = connectionUrl.toString().split("?")[0]; - this.connectionOptions = connectionOptions; - this.config = this.#parseConnectionConfig( - connectionUrl, - connectionOptions, - ); - } - - async connect(): Promise { - // TODO: implement connect timeout - if ( - this.config.tls?.mode && - this.config.tls?.mode !== TlsMode.Disabled && - this.config.tls?.mode !== TlsMode.VerifyIdentity - ) { - throw new Error("unsupported tls mode"); - } - - logger().info( - `connecting ${this.connectionUrl},${JSON.stringify(this.config)}`, - ); - - if (this.config.socket) { - this.conn = await Deno.connect({ - transport: "unix", - path: this.config.socket, - }); - } else { - this.conn = await Deno.connect({ - transport: "tcp", - hostname: this.config.hostname, - port: this.config.port, - }); - } - - try { - let receive = await this.#nextPacket(); - const handshakePacket = parseHandshake(receive.body); - - let handshakeSequenceNumber = receive.header.no; - - // Deno.startTls() only supports VERIFY_IDENTITY now. - let isSSL = false; - if ( - this.config.tls?.mode === TlsMode.VerifyIdentity - ) { - if ( - (handshakePacket.serverCapabilities & - ServerCapabilities.CLIENT_SSL) === 0 - ) { - throw new Error("Server does not support TLS"); - } - if ( - (handshakePacket.serverCapabilities & - ServerCapabilities.CLIENT_SSL) !== 0 - ) { - const tlsData = buildSSLRequest(handshakePacket, { - db: this.config.schema, - }); - await PacketWriter.write( - this.conn, - tlsData, - ++handshakeSequenceNumber, - ); - this.conn = await Deno.startTls(this.conn as Deno.TcpConn, { - hostname: this.config.hostname, - caCerts: this.config.tls?.caCerts, - }); - } - isSSL = true; - } - - const data = await buildAuth(handshakePacket, { - username: this.config.username, - password: this.config.password, - db: this.config.schema, - ssl: isSSL, - }); - - await PacketWriter.write(this._conn!, data, ++handshakeSequenceNumber); - - this.state = ConnectionState.CONNECTING; - this.serverVersion = handshakePacket.serverVersion; - this.capabilities = handshakePacket.serverCapabilities; - - receive = await this.#nextPacket(); - - const authResult = parseAuth(receive); - let authPlugin: AuthPluginName | undefined = undefined; - - switch (authResult) { - case AuthResult.AuthMoreRequired: { - authPlugin = handshakePacket.authPluginName as AuthPluginName; - break; - } - case AuthResult.MethodMismatch: { - const authSwitch = parseAuthSwitch(receive.body); - // If CLIENT_PLUGIN_AUTH capability is not supported, no new cipher is - // sent and we have to keep using the cipher sent in the init packet. - if ( - authSwitch.authPluginData === undefined || - authSwitch.authPluginData.length === 0 - ) { - authSwitch.authPluginData = handshakePacket.seed; - } - - let authData; - if (this.config.password) { - authData = await auth( - authSwitch.authPluginName, - this.config.password, - authSwitch.authPluginData, - ); - } else { - authData = Uint8Array.from([]); - } - - await PacketWriter.write( - this.conn, - authData, - receive.header.no + 1, - ); - - receive = await this.#nextPacket(); - const authSwitch2 = parseAuthSwitch(receive.body); - if (authSwitch2.authPluginName !== "") { - throw new Error( - "Do not allow to change the auth plugin more than once!", - ); - } - } - } - - if (authPlugin) { - switch (authPlugin) { - case AuthPluginName.CachingSha2Password: { - const plugin = new AuthPlugins[authPlugin]( - handshakePacket.seed, - this.config.password!, - ); - - while (!plugin.done) { - if (plugin.data) { - const sequenceNumber = receive.header.no + 1; - await PacketWriter.write( - this.conn, - plugin.data, - sequenceNumber, - ); - receive = await this.#nextPacket(); - } - if (plugin.quickRead) { - await this.#nextPacket(); - } - - await plugin.next(receive); - } - break; - } - default: - throw new Error("Unsupported auth plugin"); - } - } - - const header = receive.body.readUint8(); - if (header === 0xff) { - const error = parseError(receive.body, this); - logger().error(`connect error(${error.code}): ${error.message}`); - this.close(); - throw new Error(error.message); - } else { - logger().info(`connected to ${this.connectionUrl}`); - this.state = ConnectionState.CONNECTED; - } - } catch (error) { - // Call close() to avoid leaking socket. - this.close(); - throw error; - } - } - - close(): Promise { - if (this.state != ConnectionState.CLOSED) { - logger().info("close connection"); - this._conn?.close(); - this.state = ConnectionState.CLOSED; - } - return Promise.resolve(); - } - - /** - * Parses the connection url and options into a connection config - */ - #parseConnectionConfig( - connectionUrl: string | URL, - connectionOptions: MysqlConnectionOptions, - ): ConnectionConfig { - function parseParameters(url: URL): ConnectionParameters { - const parameters: ConnectionParameters = {}; - for (const [key, value] of url.searchParams) { - const pKey = toCamelCase(key); - if (pKey === "sslCa") { - if (!parameters.sslCa) { - parameters.sslCa = []; - } - parameters.sslCa.push(value); - } else if (pKey === "sslCapath") { - if (!parameters.sslCapath) { - parameters.sslCapath = []; - } - parameters.sslCapath.push(value); - } else if (pKey === "getServerPublicKey") { - parameters.getServerPublicKey = value === "true"; - } else if (pKey === "connectTimeout") { - parameters.connectTimeout = parseInt(value); - } else { - // deno-lint-ignore no-explicit-any - parameters[pKey as keyof ConnectionParameters] = value as any; - } - } - return parameters; - } - - function parseTlsOptions(config: ConnectionConfig): TlsOptions | undefined { - const baseTlsOptions: TlsOptions = { - port: config.port, - hostname: config.hostname, - mode: TlsMode.Preferred, - }; - - if (connectionOptions.tls) { - return { - ...baseTlsOptions, - ...connectionOptions.tls, - }; - } - - if (config.parameters.sslMode) { - const tlsOptions: TlsOptions = { - ...baseTlsOptions, - mode: config.parameters.sslMode, - }; - - const caCertPaths = new Set(); - - if (config.parameters.sslCa?.length) { - for (const caCert of config.parameters.sslCa) { - caCertPaths.add(resolve(caCert)); - } - } - - if (config.parameters.sslCapath?.length) { - for (const caPath of config.parameters.sslCapath) { - for (const f of Deno.readDirSync(caPath)) { - if (f.isFile && f.name.endsWith(".pem")) { - caCertPaths.add(resolve(caPath, f.name)); - } - } - } - } - - if (caCertPaths.size) { - tlsOptions.caCerts = []; - for (const caCert of caCertPaths) { - const content = Deno.readTextFileSync(caCert); - tlsOptions.caCerts.push(content); - } - // Due to some random bug in CI, we need to sort this for the test to pass consistently. - tlsOptions.caCerts.sort(); - } - - if (config.parameters.sslKey) { - tlsOptions.key = Deno.readTextFileSync( - resolve(config.parameters.sslKey), - ); - } - - if (config.parameters.sslCert) { - tlsOptions.cert = Deno.readTextFileSync( - resolve(config.parameters.sslCert), - ); - } - - return tlsOptions; - } - return undefined; - } - - const url = new URL(connectionUrl); - const parameters = parseParameters(url); - const config: ConnectionConfig = { - protocol: url.protocol.slice(0, -1), - username: url.username, - password: url.password || undefined, - hostname: url.hostname, - port: parseInt(url.port || "3306"), - schema: url.pathname.slice(1), - parameters: parameters, - socket: parameters.socket, - }; - - config.tls = parseTlsOptions(config); - - return config; - } - - async #nextPacket(): Promise { - if (!this._conn) { - throw new MysqlConnectionError("Not connected"); - } - - const timeoutTimer = this.config.parameters.connectTimeout - ? setTimeout( - this.#timeoutCallback, - this.config.parameters.connectTimeout, - ) - : null; - let packet: PacketReader | null; - try { - packet = await PacketReader.read(this._conn); - } catch (error) { - if (this._timedOut) { - // Connection has been closed by timeoutCallback. - throw new MysqlResponseTimeoutError("Connection read timed out"); - } - timeoutTimer && clearTimeout(timeoutTimer); - this.close(); - throw error; - } - timeoutTimer && clearTimeout(timeoutTimer); - - if (!packet) { - // Connection is half-closed by the remote host. - // Call close() to avoid leaking socket. - this.close(); - throw new MysqlReadError("Connection closed unexpectedly"); - } - if (packet.type === ComQueryResponsePacket.ERR_Packet) { - packet.body.skip(1); - const error = parseError(packet.body, this); - throw new Error(error.message); - } - return packet; - } - - #timeoutCallback = () => { - logger().info("connection read timed out"); - this._timedOut = true; - this.close(); - }; - - async *sendData( - data: Uint8Array, - options?: SqlxQueryOptions, - ): AsyncGenerator< - ConnectionSendDataNext, - ConnectionSendDataResult | undefined - > { - try { - await PacketWriter.write(this.conn, data, 0); - let receive = await this.#nextPacket(); - logger().debug(`packet type: ${receive.type.toString()}`); - if (receive.type === ComQueryResponsePacket.OK_Packet) { - receive.body.skip(1); - return { - affectedRows: receive.body.readEncodedLen(), - lastInsertId: receive.body.readEncodedLen(), - }; - } else if (receive.type !== ComQueryResponsePacket.Result) { - throw new MysqlProtocolError(receive.type.toString()); - } - let fieldCount = receive.body.readEncodedLen(); - const fields: FieldInfo[] = []; - while (fieldCount--) { - const packet = await this.#nextPacket(); - if (packet) { - const field = parseField(packet.body); - fields.push(field); - } - } - - if (!(this.capabilities & ServerCapabilities.CLIENT_DEPRECATE_EOF)) { - // EOF(mysql < 5.7 or mariadb < 10.2) - receive = await this.#nextPacket(); - if (receive.type !== ComQueryResponsePacket.EOF_Packet) { - throw new MysqlProtocolError(receive.type.toString()); - } - } - - receive = await this.#nextPacket(); - - while (receive.type !== ComQueryResponsePacket.EOF_Packet) { - const row = parseRowArray(receive.body, fields, options); - yield { - row, - fields, - }; - receive = await this.#nextPacket(); - } - } catch (error) { - this.close(); - throw error; - } - } - - async executeRaw( - data: Uint8Array, - options?: SqlxQueryOptions, - ): Promise { - const gen = this.sendData(data, options); - let result = await gen.next(); - if (result.done) { - return result.value?.affectedRows; - } - - const debugRest = []; - debugRest.push(result); - while (!result.done) { - result = await gen.next(); - debugRest.push(result); - logger().debug(`executeRaw overflow: ${JSON.stringify(debugRest)}`); - } - logger().debug(`executeRaw overflow: ${JSON.stringify(debugRest)}`); - return undefined; - } - - async *queryManyObjectRaw = Row>( - data: Uint8Array, - options?: SqlxQueryOptions, - ): AsyncIterableIterator { - for await (const res of this.sendData(data, options)) { - yield getRowObject(res.fields, res.row) as T; - } - } - - async *queryManyArrayRaw = ArrayRow>( - data: Uint8Array, - options?: SqlxQueryOptions, - ): AsyncIterableIterator { - for await (const res of this.sendData(data, options)) { - const row = res.row as T; - yield row as T; - } - } - - async [Symbol.asyncDispose](): Promise { - await this.close(); - } -} diff --git a/lib/constant/mysql_types.ts b/lib/constant/mysql_types.ts deleted file mode 100644 index 997fba8..0000000 --- a/lib/constant/mysql_types.ts +++ /dev/null @@ -1,35 +0,0 @@ -/** - * MySQL data types - */ -export const MysqlDataType = { - Decimal: 0x00, - Tiny: 0x01, - Short: 0x02, - Long: 0x03, - Float: 0x04, - Double: 0x05, - Null: 0x06, - Timestamp: 0x07, - LongLong: 0x08, - Int24: 0x09, - Date: 0x0a, - Time: 0x0b, - DateTime: 0x0c, - Year: 0x0d, - NewDate: 0x0e, - VarChar: 0x0f, - Bit: 0x10, - Timestamp2: 0x11, - DateTime2: 0x12, - Time2: 0x13, - NewDecimal: 0xf6, - Enum: 0xf7, - Set: 0xf8, - TinyBlob: 0xf9, - MediumBlob: 0xfa, - LongBlob: 0xfb, - Blob: 0xfc, - VarString: 0xfd, - String: 0xfe, - Geometry: 0xff, -} as const; diff --git a/lib/packets/builders/query.ts b/lib/packets/builders/query.ts deleted file mode 100644 index 0ba9d75..0000000 --- a/lib/packets/builders/query.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { replaceParams } from "../../utils/query.ts"; -import { BufferWriter } from "../../utils/buffer.ts"; -import { encode } from "../../utils/encoding.ts"; -import type { MysqlParameterType } from "../parsers/result.ts"; - -/** @ignore */ -export function buildQuery( - sql: string, - params: MysqlParameterType[] = [], -): Uint8Array { - const data = encode(replaceParams(sql, params)); - const writer = new BufferWriter(new Uint8Array(data.length + 1)); - writer.write(0x03); - writer.writeBuffer(data); - return writer.buffer; -} diff --git a/lib/packets/packet.ts b/lib/packets/packet.ts deleted file mode 100644 index 9af5894..0000000 --- a/lib/packets/packet.ts +++ /dev/null @@ -1,155 +0,0 @@ -import { dump } from "@stdext/encoding/hex"; -import { BufferReader, BufferWriter } from "../utils/buffer.ts"; -import { MysqlWriteError } from "../utils/errors.ts"; -import { logger } from "../utils/logger.ts"; -import { ComQueryResponsePacket } from "../constant/packet.ts"; - -/** @ignore */ -interface PacketHeader { - size: number; - no: number; -} - -/** - * Helper for sending a packet through the connection - */ -export class PacketWriter { - header: PacketHeader; - body: Uint8Array; - - constructor(body: Uint8Array, no: number) { - this.body = body; - this.header = { size: body.length, no }; - } - - /** - * Send the packet through the connection - * - * @param conn The connection - */ - async write(conn: Deno.Conn) { - const body = this.body; - - const data = new BufferWriter(new Uint8Array(4 + body.length)); - data.writeUints(3, this.header.size); - data.write(this.header.no); - data.writeBuffer(body); - logger().debug(`send: ${data.length}B \n${dump(data.buffer)}\n`); - try { - let wrote = 0; - do { - wrote += await conn.write(data.buffer.subarray(wrote)); - } while (wrote < data.length); - } catch (error) { - throw new MysqlWriteError(error.message); - } - } - - /** - * Send a packet through the connection - * - * @param conn The connection - * @param body The packet body - * @param no The packet number - * @returns SendPacket instance - */ - static async write( - conn: Deno.Conn, - body: Uint8Array, - no: number, - ): Promise { - const packet = new PacketWriter(body, no); - await packet.write(conn); - return packet; - } -} - -/** - * Helper for receiving a packet through the connection - */ -export class PacketReader { - header: PacketHeader; - body: BufferReader; - type: ComQueryResponsePacket; - - constructor( - header: PacketHeader, - body: BufferReader, - type: ComQueryResponsePacket, - ) { - this.header = header; - this.body = body; - this.type = type; - } - - /** - * Read a subarray from the connection - * - * @param conn The connection - * @param buffer The buffer to read into - * @returns The number of bytes read - */ - static async #readSubarray( - conn: Deno.Conn, - buffer: Uint8Array, - ): Promise { - const size = buffer.length; - let haveRead = 0; - while (haveRead < size) { - const nread = await conn.read(buffer.subarray(haveRead)); - if (nread === null) return null; - haveRead += nread; - } - return haveRead; - } - - /** - * Read a subarray from the connection - * - * @param conn - * @returns The PacketReader instance or null if nothing could be read - */ - static async read(conn: Deno.Conn): Promise { - const headerReader = new BufferReader(new Uint8Array(4)); - let readCount = 0; - let nread = await this.#readSubarray(conn, headerReader.buffer); - if (nread === null) return null; - readCount = nread; - const bodySize = headerReader.readUints(3); - const header = { - size: bodySize, - no: headerReader.readUint8(), - }; - const bodyReader = new BufferReader(new Uint8Array(bodySize)); - nread = await this.#readSubarray(conn, bodyReader.buffer); - if (nread === null) return null; - readCount += nread; - - let type: ComQueryResponsePacket; - switch (bodyReader.buffer[0]) { - case ComQueryResponsePacket.OK_Packet: - type = ComQueryResponsePacket.OK_Packet; - break; - case ComQueryResponsePacket.ERR_Packet: - type = ComQueryResponsePacket.ERR_Packet; - break; - case ComQueryResponsePacket.EOF_Packet: - type = ComQueryResponsePacket.EOF_Packet; - break; - default: - type = ComQueryResponsePacket.Result; - break; - } - - logger().debug(() => { - const data = new Uint8Array(readCount); - data.set(headerReader.buffer); - data.set(bodyReader.buffer, 4); - return `receive: ${readCount}B, size = ${header.size}, no = ${header.no} \n${ - dump(data) - }\n`; - }); - - return new PacketReader(header, bodyReader, type); - } -} diff --git a/lib/packets/parsers/result.ts b/lib/packets/parsers/result.ts deleted file mode 100644 index 2b889e1..0000000 --- a/lib/packets/parsers/result.ts +++ /dev/null @@ -1,159 +0,0 @@ -import type { BufferReader } from "../../utils/buffer.ts"; -import { MysqlDataType } from "../../constant/mysql_types.ts"; -import type { ArrayRow, Row, SqlxQueryOptions } from "@halvardm/sqlx"; - -export type MysqlParameterType = - | null - | string - | number - | boolean - | bigint - | Date - // deno-lint-ignore no-explicit-any - | Array - | object - | undefined; - -/** - * Field information - */ -export interface FieldInfo { - catalog: string; - schema: string; - table: string; - originTable: string; - name: string; - originName: string; - encoding: number; - fieldLen: number; - fieldType: number; - fieldFlag: number; - decimals: number; - defaultVal: string; -} - -/** - * Parses the field - */ -export function parseField(reader: BufferReader): FieldInfo { - const catalog = reader.readLenCodeString()!; - const schema = reader.readLenCodeString()!; - const table = reader.readLenCodeString()!; - const originTable = reader.readLenCodeString()!; - const name = reader.readLenCodeString()!; - const originName = reader.readLenCodeString()!; - reader.skip(1); - const encoding = reader.readUint16()!; - const fieldLen = reader.readUint32()!; - const fieldType = reader.readUint8()!; - const fieldFlag = reader.readUint16()!; - const decimals = reader.readUint8()!; - reader.skip(1); - const defaultVal = reader.readLenCodeString()!; - return { - catalog, - schema, - table, - originName, - fieldFlag, - originTable, - fieldLen, - name, - fieldType, - encoding, - decimals, - defaultVal, - }; -} - -/** - * Parse the row as an array - */ -export function parseRowArray( - reader: BufferReader, - fields: FieldInfo[], - options?: SqlxQueryOptions, -): ArrayRow { - const row: MysqlParameterType[] = []; - for (const field of fields) { - const val = reader.readLenCodeString(); - const parsedVal = val === null ? null : convertType(field, val, options); - row.push(parsedVal); - } - return row; -} - -/** - * Parses the row as an object - */ -export function parseRowObject( - reader: BufferReader, - fields: FieldInfo[], -): Row { - const rowArray = parseRowArray(reader, fields); - return getRowObject(fields, rowArray); -} - -export function getRowObject( - fields: FieldInfo[], - row: ArrayRow, -): Row { - const obj: Row = {}; - for (const [i, field] of fields.entries()) { - const name = field.name; - obj[name] = row[i]; - } - return obj; -} - -/** - * Converts the value to the correct type - */ -function convertType( - field: FieldInfo, - val: string, - options?: SqlxQueryOptions, -): MysqlParameterType { - if (options?.transformOutput) { - // deno-lint-ignore no-explicit-any - return options.transformOutput(val) as any; - } - const { fieldType } = field; - switch (fieldType) { - case MysqlDataType.Decimal: - case MysqlDataType.Double: - case MysqlDataType.Float: - case MysqlDataType.DateTime2: - return parseFloat(val); - case MysqlDataType.NewDecimal: - return val; // #42 MySQL's decimal type cannot be accurately represented by the Number. - case MysqlDataType.Tiny: - case MysqlDataType.Short: - case MysqlDataType.Long: - case MysqlDataType.Int24: - return parseInt(val); - case MysqlDataType.LongLong: - if ( - Number(val) < Number.MIN_SAFE_INTEGER || - Number(val) > Number.MAX_SAFE_INTEGER - ) { - return BigInt(val); - } else { - return parseInt(val); - } - case MysqlDataType.VarChar: - case MysqlDataType.VarString: - case MysqlDataType.String: - case MysqlDataType.Time: - case MysqlDataType.Time2: - return val; - case MysqlDataType.Date: - case MysqlDataType.Timestamp: - case MysqlDataType.DateTime: - case MysqlDataType.NewDate: - case MysqlDataType.Timestamp2: - return new Date(val); - default: - return val; - } -} diff --git a/lib/pool.test.ts b/lib/pool.test.ts deleted file mode 100644 index 0309a62..0000000 --- a/lib/pool.test.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { MysqlClientPool } from "./pool.ts"; -import { QUERIES, services } from "./utils/testing.ts"; -import { clientPoolTest } from "@halvardm/sqlx/testing"; - -Deno.test("Pool Test", async (t) => { - for (const service of services) { - await t.step(`Testing ${service.name}`, async (t) => { - await t.step(`TCP`, async (t) => { - await clientPoolTest({ - t, - Client: MysqlClientPool, - connectionUrl: service.url, - connectionOptions: {}, - queries: QUERIES, - }); - }); - - // Enable once socket connection issue is fixed - // - // await t.step(`UNIX Socket`, async (t) => { - // await implementationTest({ - // t, - // Client: MysqlClient, - // // deno-lint-ignore no-explicit-any - // PoolClient: MysqlClientPool as any, - // connectionUrl: service.urlSocket, - // connectionOptions: {}, - // queries: { - // createTable: - // "CREATE TABLE IF NOT EXISTS sqlxtesttable (testcol TEXT)", - // dropTable: "DROP TABLE IF EXISTS sqlxtesttable", - // insertOneToTable: "INSERT INTO sqlxtesttable (testcol) VALUES (?)", - // insertManyToTable: - // "INSERT INTO sqlxtesttable (testcol) VALUES (?),(?),(?)", - // selectOneFromTable: - // "SELECT * FROM sqlxtesttable WHERE testcol = ? LIMIT 1", - // selectByMatchFromTable: - // "SELECT * FROM sqlxtesttable WHERE testcol = ?", - // selectManyFromTable: "SELECT * FROM sqlxtesttable", - // select1AsString: "SELECT '1' as result", - // select1Plus1AsNumber: "SELECT 1+1 as result", - // deleteByMatchFromTable: - // "DELETE FROM sqlxtesttable WHERE testcol = ?", - // deleteAllFromTable: "DELETE FROM sqlxtesttable", - // }, - // }); - // }); - }); - } -}); diff --git a/lib/pool.ts b/lib/pool.ts deleted file mode 100644 index 0228ccf..0000000 --- a/lib/pool.ts +++ /dev/null @@ -1,159 +0,0 @@ -import { - SqlxBase, - type SqlxClientPool, - type SqlxClientPoolOptions, - SqlxDeferredStack, - SqlxError, - type SqlxPoolClient, -} from "@halvardm/sqlx"; -import { - type MysqlPrepared, - type MysqlQueryOptions, - type MySqlTransaction, - MysqlTransactionable, - type MysqlTransactionOptions, -} from "./sqlx.ts"; -import { MysqlConnection, type MysqlConnectionOptions } from "./connection.ts"; -import type { MysqlParameterType } from "./packets/parsers/result.ts"; -import { - MysqlPoolAcquireEvent, - MysqlPoolCloseEvent, - MysqlPoolConnectEvent, - MysqlPoolReleaseEvent, -} from "./utils/events.ts"; -import { MysqlClientEventTarget } from "./utils/events.ts"; - -export interface MysqlClientPoolOptions - extends MysqlConnectionOptions, SqlxClientPoolOptions { -} - -export class MysqlPoolClient extends MysqlTransactionable - implements - SqlxPoolClient< - MysqlConnectionOptions, - MysqlConnection, - MysqlParameterType, - MysqlQueryOptions, - MysqlPrepared, - MysqlTransactionOptions, - MySqlTransaction - > { - /** - * Must be set by the client pool on creation - * @inheritdoc - */ - release(): Promise { - throw new Error("Method not implemented."); - } - - async [Symbol.asyncDispose](): Promise { - await this.release(); - } -} - -export class MysqlClientPool extends SqlxBase implements - SqlxClientPool< - MysqlConnectionOptions, - MysqlConnection, - MysqlParameterType, - MysqlQueryOptions, - MysqlPrepared, - MysqlTransactionOptions, - MySqlTransaction, - MysqlPoolClient, - SqlxDeferredStack - > { - readonly connectionUrl: string; - readonly connectionOptions: MysqlClientPoolOptions; - readonly eventTarget: EventTarget; - readonly deferredStack: SqlxDeferredStack; - readonly queryOptions: MysqlQueryOptions; - - #connected: boolean = false; - - get connected(): boolean { - return this.#connected; - } - - constructor( - connectionUrl: string | URL, - connectionOptions: MysqlClientPoolOptions = {}, - ) { - super(); - this.connectionUrl = connectionUrl.toString(); - this.connectionOptions = connectionOptions; - this.queryOptions = connectionOptions; - this.eventTarget = new MysqlClientEventTarget(); - this.deferredStack = new SqlxDeferredStack( - connectionOptions, - ); - } - - async connect(): Promise { - for (let i = 0; i < this.deferredStack.maxSize; i++) { - const conn = new MysqlConnection( - this.connectionUrl, - this.connectionOptions, - ); - const client = new MysqlPoolClient( - conn, - this.queryOptions, - ); - client.release = () => this.release(client); - - if (!this.connectionOptions.lazyInitialization) { - await client.connection.connect(); - this.eventTarget.dispatchEvent( - new MysqlPoolConnectEvent({ connectable: client }), - ); - } - - this.deferredStack.push(client); - } - - this.#connected = true; - } - - async close(): Promise { - this.#connected = false; - - for (const client of this.deferredStack.elements) { - this.eventTarget.dispatchEvent( - new MysqlPoolCloseEvent({ connectable: client }), - ); - await client.connection.close(); - } - } - - async acquire(): Promise { - const client = await this.deferredStack.pop(); - if (!client.connected) { - await client.connection.connect(); - } - - this.eventTarget.dispatchEvent( - new MysqlPoolAcquireEvent({ connectable: client }), - ); - return client; - } - - async release(client: MysqlPoolClient): Promise { - this.eventTarget.dispatchEvent( - new MysqlPoolReleaseEvent({ connectable: client }), - ); - try { - this.deferredStack.push(client); - } catch (e) { - if (e instanceof SqlxError && e.message === "Max pool size reached") { - await client.connection.close(); - throw e; - } else { - throw e; - } - } - } - - async [Symbol.asyncDispose](): Promise { - await this.close(); - } -} diff --git a/lib/sqlx.ts b/lib/sqlx.ts deleted file mode 100644 index 0dc5679..0000000 --- a/lib/sqlx.ts +++ /dev/null @@ -1,377 +0,0 @@ -import { - type ArrayRow, - type Row, - SqlxBase, - SqlxPreparable, - SqlxPreparedQueriable, - SqlxQueriable, - type SqlxQueryOptions, - SqlxTransactionable, - type SqlxTransactionOptions, - SqlxTransactionQueriable, -} from "@halvardm/sqlx"; -import type { MysqlConnection, MysqlConnectionOptions } from "./connection.ts"; -import { buildQuery } from "./packets/builders/query.ts"; -import type { MysqlParameterType } from "./packets/parsers/result.ts"; -import { MysqlTransactionError } from "./utils/errors.ts"; - -export interface MysqlQueryOptions extends SqlxQueryOptions { -} - -export interface MysqlTransactionOptions extends SqlxTransactionOptions { - beginTransactionOptions: { - withConsistentSnapshot?: boolean; - readWrite?: "READ WRITE" | "READ ONLY"; - }; - commitTransactionOptions: { - chain?: boolean; - release?: boolean; - }; - rollbackTransactionOptions: { - chain?: boolean; - release?: boolean; - savepoint?: string; - }; -} - -export class MysqlQueriable extends SqlxBase implements - SqlxQueriable< - MysqlConnectionOptions, - MysqlConnection, - MysqlParameterType, - MysqlQueryOptions - > { - readonly connection: MysqlConnection; - readonly queryOptions: MysqlQueryOptions; - - get connected(): boolean { - return this.connection.connected; - } - - constructor( - connection: MysqlConnection, - queryOptions: MysqlQueryOptions = {}, - ) { - super(); - this.connection = connection; - this.queryOptions = queryOptions; - } - - execute( - sql: string, - params?: MysqlParameterType[] | undefined, - _options?: MysqlQueryOptions | undefined, - ): Promise { - const data = buildQuery(sql, params); - return this.connection.executeRaw(data); - } - query = Row>( - sql: string, - params?: MysqlParameterType[] | undefined, - options?: MysqlQueryOptions | undefined, - ): Promise { - return Array.fromAsync(this.queryMany(sql, params, options)); - } - async queryOne = Row>( - sql: string, - params?: MysqlParameterType[] | undefined, - options?: MysqlQueryOptions | undefined, - ): Promise { - const res = await this.query(sql, params, options); - return res[0]; - } - async *queryMany = Row>( - sql: string, - params?: MysqlParameterType[], - options?: MysqlQueryOptions | undefined, - ): AsyncGenerator { - const data = buildQuery(sql, params); - for await ( - const res of this.connection.queryManyObjectRaw(data, options) - ) { - yield res; - } - } - - queryArray< - T extends ArrayRow = ArrayRow, - >( - sql: string, - params?: MysqlParameterType[] | undefined, - options?: MysqlQueryOptions | undefined, - ): Promise { - return Array.fromAsync(this.queryManyArray(sql, params, options)); - } - async queryOneArray< - T extends ArrayRow = ArrayRow, - >( - sql: string, - params?: MysqlParameterType[] | undefined, - options?: MysqlQueryOptions | undefined, - ): Promise { - const res = await this.queryArray(sql, params, options); - return res[0]; - } - async *queryManyArray< - T extends ArrayRow = ArrayRow, - >( - sql: string, - params?: MysqlParameterType[] | undefined, - options?: MysqlQueryOptions | undefined, - ): AsyncGenerator { - const data = buildQuery(sql, params); - for await ( - const res of this.connection.queryManyArrayRaw(data, options) - ) { - yield res; - } - } - sql = Row>( - strings: TemplateStringsArray, - ...parameters: MysqlParameterType[] - ): Promise { - return this.query(strings.join("?"), parameters); - } - sqlArray< - T extends ArrayRow = ArrayRow, - >( - strings: TemplateStringsArray, - ...parameters: MysqlParameterType[] - ): Promise { - return this.queryArray(strings.join("?"), parameters); - } -} - -/** - * Prepared statement - * - * @todo implement prepared statements properly - */ -export class MysqlPrepared extends SqlxBase implements - SqlxPreparedQueriable< - MysqlConnectionOptions, - MysqlConnection, - MysqlParameterType, - MysqlQueryOptions - > { - readonly sql: string; - readonly queryOptions: MysqlQueryOptions; - - #queriable: MysqlQueriable; - - connection: MysqlConnection; - - get connected(): boolean { - return this.connection.connected; - } - - constructor( - connection: MysqlConnection, - sql: string, - options: MysqlQueryOptions = {}, - ) { - super(); - this.connection = connection; - this.sql = sql; - this.queryOptions = options; - this.#queriable = new MysqlQueriable(connection, this.queryOptions); - } - - execute( - params?: MysqlParameterType[] | undefined, - _options?: MysqlQueryOptions | undefined, - ): Promise { - return this.#queriable.execute(this.sql, params); - } - query = Row>( - params?: MysqlParameterType[] | undefined, - options?: MysqlQueryOptions | undefined, - ): Promise { - return this.#queriable.query(this.sql, params, options); - } - queryOne = Row>( - params?: MysqlParameterType[] | undefined, - options?: MysqlQueryOptions | undefined, - ): Promise { - return this.#queriable.queryOne(this.sql, params, options); - } - queryMany = Row>( - params?: MysqlParameterType[] | undefined, - options?: MysqlQueryOptions | undefined, - ): AsyncGenerator { - return this.#queriable.queryMany(this.sql, params, options); - } - queryArray< - T extends ArrayRow = ArrayRow, - >( - params?: MysqlParameterType[] | undefined, - options?: MysqlQueryOptions | undefined, - ): Promise { - return this.#queriable.queryArray(this.sql, params, options); - } - queryOneArray< - T extends ArrayRow = ArrayRow, - >( - params?: MysqlParameterType[] | undefined, - options?: MysqlQueryOptions | undefined, - ): Promise { - return this.#queriable.queryOneArray(this.sql, params, options); - } - queryManyArray< - T extends ArrayRow = ArrayRow, - >( - params?: MysqlParameterType[] | undefined, - options?: MysqlQueryOptions | undefined, - ): AsyncGenerator { - return this.#queriable.queryManyArray(this.sql, params, options); - } -} - -export class MysqlPreparable extends MysqlQueriable implements - SqlxPreparable< - MysqlConnectionOptions, - MysqlConnection, - MysqlParameterType, - MysqlQueryOptions, - MysqlPrepared - > { - prepare(sql: string, options?: MysqlQueryOptions | undefined): MysqlPrepared { - return new MysqlPrepared(this.connection, sql, options); - } -} - -export class MySqlTransaction extends MysqlPreparable - implements - SqlxTransactionQueriable< - MysqlConnectionOptions, - MysqlConnection, - MysqlParameterType, - MysqlQueryOptions, - MysqlTransactionOptions - > { - #inTransaction: boolean = true; - get inTransaction(): boolean { - return this.connected && this.#inTransaction; - } - - get connected(): boolean { - if (!this.#inTransaction) { - throw new MysqlTransactionError( - "Transaction is not active, create a new one using beginTransaction", - ); - } - - return super.connected; - } - - async commitTransaction( - options?: MysqlTransactionOptions["commitTransactionOptions"], - ): Promise { - try { - let sql = "COMMIT"; - - if (options?.chain === true) { - sql += " AND CHAIN"; - } else if (options?.chain === false) { - sql += " AND NO CHAIN"; - } - - if (options?.release === true) { - sql += " RELEASE"; - } else if (options?.release === false) { - sql += " NO RELEASE"; - } - await this.execute(sql); - } catch (e) { - this.#inTransaction = false; - throw e; - } - } - async rollbackTransaction( - options?: MysqlTransactionOptions["rollbackTransactionOptions"], - ): Promise { - try { - let sql = "ROLLBACK"; - - if (options?.savepoint) { - sql += ` TO ${options.savepoint}`; - await this.execute(sql); - return; - } - - if (options?.chain === true) { - sql += " AND CHAIN"; - } else if (options?.chain === false) { - sql += " AND NO CHAIN"; - } - - if (options?.release === true) { - sql += " RELEASE"; - } else if (options?.release === false) { - sql += " NO RELEASE"; - } - - await this.execute(sql); - } catch (e) { - this.#inTransaction = false; - throw e; - } - } - async createSavepoint(name: string = `\t_bm.\t`): Promise { - await this.execute(`SAVEPOINT ${name}`); - } - async releaseSavepoint(name: string = `\t_bm.\t`): Promise { - await this.execute(`RELEASE SAVEPOINT ${name}`); - } -} - -/** - * Represents a queriable class that can be used to run transactions. - */ -export class MysqlTransactionable extends MysqlPreparable - implements - SqlxTransactionable< - MysqlConnectionOptions, - MysqlConnection, - MysqlParameterType, - MysqlQueryOptions, - MysqlTransactionOptions, - MySqlTransaction - > { - async beginTransaction( - options?: MysqlTransactionOptions["beginTransactionOptions"], - ): Promise { - let sql = "START TRANSACTION"; - if (options?.withConsistentSnapshot) { - sql += ` WITH CONSISTENT SNAPSHOT`; - } - - if (options?.readWrite) { - sql += ` ${options.readWrite}`; - } - - await this.execute(sql); - - return new MySqlTransaction(this.connection, this.queryOptions); - } - - async transaction( - fn: (t: MySqlTransaction) => Promise, - options?: MysqlTransactionOptions, - ): Promise { - const transaction = await this.beginTransaction( - options?.beginTransactionOptions, - ); - - try { - const result = await fn(transaction); - await transaction.commitTransaction(options?.commitTransactionOptions); - return result; - } catch (error) { - await transaction.rollbackTransaction( - options?.rollbackTransactionOptions, - ); - throw error; - } - } -} diff --git a/lib/utils/encoding.ts b/lib/utils/encoding.ts deleted file mode 100644 index c535c0a..0000000 --- a/lib/utils/encoding.ts +++ /dev/null @@ -1,16 +0,0 @@ -const encoder = new TextEncoder(); -const decoder = new TextDecoder(); - -/** - * Shorthand for `new TextEncoder().encode(input)`. - */ -export function encode(input: string) { - return encoder.encode(input); -} - -/** - * Shorthand for `new TextDecoder().decode(input)`. - */ -export function decode(input: BufferSource) { - return decoder.decode(input); -} diff --git a/lib/utils/errors.ts b/lib/utils/errors.ts deleted file mode 100644 index be73e89..0000000 --- a/lib/utils/errors.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { isSqlxError, SqlxError } from "@halvardm/sqlx"; - -export class MysqlError extends SqlxError { - constructor(msg: string) { - super(msg); - } -} - -export class MysqlConnectionError extends MysqlError { - constructor(msg: string) { - super(msg); - } -} - -export class MysqlWriteError extends MysqlError { - constructor(msg: string) { - super(msg); - } -} - -export class MysqlReadError extends MysqlError { - constructor(msg: string) { - super(msg); - } -} - -export class MysqlResponseTimeoutError extends MysqlError { - constructor(msg: string) { - super(msg); - } -} - -export class MysqlProtocolError extends MysqlError { - constructor(msg: string) { - super(msg); - } -} - -export class MysqlTransactionError extends MysqlError { - constructor(msg: string) { - super(msg); - } -} - -/** - * Check if an error is a MysqlError - */ -export function isMysqlError(err: unknown): err is MysqlError { - return isSqlxError(err) && err instanceof MysqlError; -} diff --git a/lib/utils/events.ts b/lib/utils/events.ts deleted file mode 100644 index ba89720..0000000 --- a/lib/utils/events.ts +++ /dev/null @@ -1,68 +0,0 @@ -import { - type SqlxClientEventType, - SqlxConnectableCloseEvent, - SqlxConnectableConnectEvent, - type SqlxConnectableEventInit, - SqlxEventTarget, - SqlxPoolConnectableAcquireEvent, - SqlxPoolConnectableReleaseEvent, - type SqlxPoolConnectionEventType, -} from "@halvardm/sqlx"; -import type { MysqlConnectionOptions } from "../connection.ts"; -import type { MysqlConnection } from "../connection.ts"; -import type { MysqlClient } from "../client.ts"; -import type { MysqlPoolClient } from "../pool.ts"; - -export class MysqlClientEventTarget extends SqlxEventTarget< - MysqlConnectionOptions, - MysqlConnection, - SqlxClientEventType, - MysqlClientEventInit, - MysqlClientEvents -> { -} -export class MysqlPoolClientEventTarget extends SqlxEventTarget< - MysqlConnectionOptions, - MysqlConnection, - SqlxPoolConnectionEventType, - MysqlPoolEventInit, - MysqlPoolEvents -> { -} - -export type MysqlClientEventInit = SqlxConnectableEventInit< - MysqlClient ->; - -export type MysqlPoolEventInit = SqlxConnectableEventInit< - MysqlPoolClient ->; - -export class MysqlClientConnectEvent - extends SqlxConnectableConnectEvent {} - -export class MysqlClientCloseEvent - extends SqlxConnectableCloseEvent {} -export class MysqlPoolConnectEvent - extends SqlxConnectableConnectEvent {} - -export class MysqlPoolCloseEvent - extends SqlxConnectableCloseEvent {} - -export class MysqlPoolAcquireEvent - extends SqlxPoolConnectableAcquireEvent { -} - -export class MysqlPoolReleaseEvent - extends SqlxPoolConnectableReleaseEvent { -} - -export type MysqlClientEvents = - | MysqlClientConnectEvent - | MysqlClientCloseEvent; - -export type MysqlPoolEvents = - | MysqlClientConnectEvent - | MysqlClientCloseEvent - | MysqlPoolAcquireEvent - | MysqlPoolReleaseEvent; diff --git a/lib/utils/hash.ts b/lib/utils/hash.ts deleted file mode 100644 index b3f8aa5..0000000 --- a/lib/utils/hash.ts +++ /dev/null @@ -1,53 +0,0 @@ -import { crypto, type DigestAlgorithm } from "@std/crypto"; -import { xor } from "./bytes.ts"; -import { MysqlError } from "./errors.ts"; -import { encode } from "./encoding.ts"; - -async function hash( - algorithm: DigestAlgorithm, - data: Uint8Array, -): Promise { - return new Uint8Array(await crypto.subtle.digest(algorithm, data)); -} - -async function mysqlNativePassword( - password: string, - seed: Uint8Array, -): Promise { - const pwd1 = await hash("SHA-1", encode(password)); - const pwd2 = await hash("SHA-1", pwd1); - - let seedAndPwd2 = new Uint8Array(seed.length + pwd2.length); - seedAndPwd2.set(seed); - seedAndPwd2.set(pwd2, seed.length); - seedAndPwd2 = await hash("SHA-1", seedAndPwd2); - - return xor(seedAndPwd2, pwd1); -} - -async function cachingSha2Password( - password: string, - seed: Uint8Array, -): Promise { - const stage1 = await hash("SHA-256", encode(password)); - const stage2 = await hash("SHA-256", stage1); - const stage3 = await hash("SHA-256", Uint8Array.from([...stage2, ...seed])); - return xor(stage1, stage3); -} - -export default function auth( - authPluginName: string, - password: string, - seed: Uint8Array, -) { - switch (authPluginName) { - case "mysql_native_password": - // Native password authentication only need and will need 20-byte challenge. - return mysqlNativePassword(password, seed.slice(0, 20)); - - case "caching_sha2_password": - return cachingSha2Password(password, seed); - default: - throw new MysqlError("Not supported"); - } -} diff --git a/lib/utils/logger.ts b/lib/utils/logger.ts deleted file mode 100644 index 28830cd..0000000 --- a/lib/utils/logger.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { getLogger, type Logger } from "@std/log"; -import { MODULE_NAME } from "./meta.ts"; - -/** - * Used for internal module logging, - * do not import this directly outside of this module. - * - * @see {@link https://deno.land/std/log/mod.ts} - */ -export function logger(): Logger { - return getLogger(MODULE_NAME); -} diff --git a/lib/utils/meta.ts b/lib/utils/meta.ts deleted file mode 100644 index 4b71791..0000000 --- a/lib/utils/meta.ts +++ /dev/null @@ -1,4 +0,0 @@ -import meta from "../../deno.json" with { type: "json" }; - -export const MODULE_NAME = meta.name; -export const VERSION = meta.version; diff --git a/lib/utils/query.ts b/lib/utils/query.ts deleted file mode 100644 index 1d4e5fc..0000000 --- a/lib/utils/query.ts +++ /dev/null @@ -1,110 +0,0 @@ -import type { MysqlParameterType } from "../packets/parsers/result.ts"; - -/** - * Replaces parameters in a SQL query with the given values. - * - * Taken from https://github.com/manyuanrong/sql-builder/blob/master/util.ts - */ -export function replaceParams( - sql: string, - params: MysqlParameterType[], -): string { - if (!params) return sql; - let paramIndex = 0; - sql = sql.replace( - /('[^'\\]*(?:\\.[^'\\]*)*')|("[^"\\]*(?:\\.[^"\\]*)*")|(\?\?)|(\?)/g, - (str) => { - if (paramIndex >= params.length) return str; - // ignore - if (/".*"/g.test(str) || /'.*'/g.test(str)) { - return str; - } - // identifier - if (str === "??") { - const val = params[paramIndex++]; - if (val instanceof Array) { - return `(${ - val.map((item) => replaceParams("??", [item])).join(",") - })`; - } else if (val === "*") { - return val; - } else if (typeof val === "string" && val.includes(".")) { - // a.b => `a`.`b` - const _arr = val.split("."); - return replaceParams(_arr.map(() => "??").join("."), _arr); - } else if ( - typeof val === "string" && - (val.includes(" as ") || val.includes(" AS ")) - ) { - // a as b => `a` AS `b` - const newVal = val.replace(" as ", " AS "); - const _arr = newVal.split(" AS "); - return replaceParams(_arr.map(() => "??").join(" AS "), _arr); - } else { - return ["`", val, "`"].join(""); - } - } - // value - const val = params[paramIndex++]; - if (val === null) return "NULL"; - switch (typeof val) { - // deno-lint-ignore no-fallthrough - case "object": - if (val instanceof Date) return `"${formatDate(val)}"`; - if ((val as unknown) instanceof Array) { - return `(${ - (val as Array).map((item) => replaceParams("?", [item])) - .join(",") - })`; - } - case "string": - return `"${escapeString(val as string)}"`; - case "undefined": - return "NULL"; - case "number": - case "boolean": - default: - return val.toString(); - } - }, - ); - return sql; -} - -/** - * Formats date to a 'YYYY-MM-DD HH:MM:SS.SSS' string. - */ -function formatDate(date: Date) { - date.toISOString(); - const year = date.getFullYear(); - const month = (date.getMonth() + 1).toString().padStart(2, "0"); - const days = date - .getDate() - .toString() - .padStart(2, "0"); - const hours = date - .getHours() - .toString() - .padStart(2, "0"); - const minutes = date - .getMinutes() - .toString() - .padStart(2, "0"); - const seconds = date - .getSeconds() - .toString() - .padStart(2, "0"); - // Date does not support microseconds precision, so we only keep the milliseconds part. - const milliseconds = date - .getMilliseconds() - .toString() - .padStart(3, "0"); - return `${year}-${month}-${days} ${hours}:${minutes}:${seconds}.${milliseconds}`; -} - -/** - * Escapes a string for use in a SQL query. - */ -function escapeString(str: string) { - return str.replaceAll("\\", "\\\\").replaceAll('"', '\\"'); -} diff --git a/lib/utils/testing.ts b/lib/utils/testing.ts deleted file mode 100644 index 013ceaf..0000000 --- a/lib/utils/testing.ts +++ /dev/null @@ -1,87 +0,0 @@ -import { resolve } from "@std/path"; -import { ConsoleHandler, setup } from "@std/log"; -import { MODULE_NAME } from "./meta.ts"; -import { parse } from "@std/yaml"; -import type { BaseQueriableTestOptions } from "@halvardm/sqlx/testing"; - -type DockerCompose = { - services: { - [key: string]: { - image: string; - ports: string[]; - environment: Record; - volumes: string[]; - }; - }; -}; - -type ServiceParsed = { - name: string; - port: string; - database: string; - // socket: string; - url: string; - // urlSocket: string; -}; - -setup({ - handlers: { - console: new ConsoleHandler("DEBUG"), - }, - loggers: { - // configure default logger available via short-hand methods above - default: { - level: "WARN", - handlers: ["console"], - }, - [MODULE_NAME]: { - level: "WARN", - handlers: ["console"], - }, - }, -}); - -export const DIR_TMP_TEST = resolve(Deno.cwd(), "tmp_test"); - -const composeParsed = parse( - Deno.readTextFileSync(resolve(Deno.cwd(), "compose.yml")), - { "onWarning": console.warn }, -) as DockerCompose; - -export const services: ServiceParsed[] = Object.entries(composeParsed.services) - .map( - ([key, value]) => { - const port = value.ports[0].split(":")[0]; - const database = Object.entries(value.environment).find(([e]) => - e.includes("DATABASE") - )?.[1] as string; - // const socket = resolve(value.volumes[0].split(":")[0])+"/mysqld.sock"; - const url = `mysql://root@0.0.0.0:${port}/${database}`; - // const urlSocket = `${url}?socket=${socket}`; - return { - name: key, - port, - database, - // socket, - url, - // urlSocket, - }; - }, - ); - -export const URL_TEST_CONNECTION = services.find((s) => s.name === "mysql") - ?.url as string; - -export const QUERIES: BaseQueriableTestOptions["queries"] = { - createTable: "CREATE TABLE IF NOT EXISTS sqlxtesttable (testcol TEXT)", - dropTable: "DROP TABLE IF EXISTS sqlxtesttable", - insertOneToTable: "INSERT INTO sqlxtesttable (testcol) VALUES (?)", - insertManyToTable: "INSERT INTO sqlxtesttable (testcol) VALUES (?),(?),(?)", - selectOneFromTable: "SELECT * FROM sqlxtesttable WHERE testcol = ? LIMIT 1", - selectByMatchFromTable: "SELECT * FROM sqlxtesttable WHERE testcol = ?", - selectManyFromTable: "SELECT * FROM sqlxtesttable", - select1AsString: "SELECT '1' as result", - select1Plus1AsNumber: "SELECT 1+1 as result", - deleteByMatchFromTable: "DELETE FROM sqlxtesttable WHERE testcol = ?", - deleteAllFromTable: "DELETE FROM sqlxtesttable", -}; diff --git a/mod.ts b/mod.ts index 97922ec..193240d 100644 --- a/mod.ts +++ b/mod.ts @@ -1,6 +1,12 @@ -export * from "./lib/client.ts"; -export * from "./lib/connection.ts"; -export * from "./lib/pool.ts"; -export * from "./lib/utils/errors.ts"; -export * from "./lib/utils/events.ts"; -export * from "./lib/utils/meta.ts"; +export type { ClientConfig } from "./src/client.ts"; +export { Client } from "./src/client.ts"; +export type { TLSConfig } from "./src/client.ts"; +export { TLSMode } from "./src/client.ts"; + +export type { ExecuteResult } from "./src/connection.ts"; +export { Connection } from "./src/connection.ts"; + +export type { LoggerConfig } from "./src/logger.ts"; +export { configLogger } from "./src/logger.ts"; + +export { log } from "./deps.ts"; diff --git a/package.json b/package.json new file mode 100644 index 0000000..1a9fcac --- /dev/null +++ b/package.json @@ -0,0 +1,23 @@ +{ + "name": "deno_mysql", + "version": "1.0.0", + "description": "[![Build Status](https://www.travis-ci.org/manyuanrong/deno_mysql.svg?branch=master)](https://www.travis-ci.org/manyuanrong/deno_mysql)", + "main": "index.js", + "scripts": { + "docs": "typedoc --theme minimal --ignoreCompilerErrors --excludePrivate --excludeExternals --entryPoint client.ts --mode file ./src --out ./docs" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/manyuanrong/deno_mysql.git" + }, + "keywords": [], + "author": "", + "license": "ISC", + "bugs": { + "url": "https://github.com/manyuanrong/deno_mysql/issues" + }, + "homepage": "https://github.com/manyuanrong/deno_mysql#readme", + "devDependencies": { + "typedoc": "^0.14.2" + } +} diff --git a/src/auth.ts b/src/auth.ts new file mode 100644 index 0000000..deafa1d --- /dev/null +++ b/src/auth.ts @@ -0,0 +1,43 @@ +import { createHash, SupportedAlgorithm } from "../deps.ts"; +import { xor } from "./util.ts"; +import { encode } from "./buffer.ts"; + +function hash(algorithm: SupportedAlgorithm, data: Uint8Array): Uint8Array { + return new Uint8Array(createHash(algorithm).update(data).digest()); +} + +function mysqlNativePassword(password: string, seed: Uint8Array): Uint8Array { + const pwd1 = hash("sha1", encode(password)); + const pwd2 = hash("sha1", pwd1); + + let seedAndPwd2 = new Uint8Array(seed.length + pwd2.length); + seedAndPwd2.set(seed); + seedAndPwd2.set(pwd2, seed.length); + seedAndPwd2 = hash("sha1", seedAndPwd2); + + return xor(seedAndPwd2, pwd1); +} + +function cachingSha2Password(password: string, seed: Uint8Array): Uint8Array { + const stage1 = hash("sha256", encode(password)); + const stage2 = hash("sha256", stage1); + const stage3 = hash("sha256", Uint8Array.from([...stage2, ...seed])); + return xor(stage1, stage3); +} + +export default function auth( + authPluginName: string, + password: string, + seed: Uint8Array, +) { + switch (authPluginName) { + case "mysql_native_password": + // Native password authentication only need and will need 20-byte challenge. + return mysqlNativePassword(password, seed.slice(0, 20)); + + case "caching_sha2_password": + return cachingSha2Password(password, seed); + default: + throw new Error("Not supported"); + } +} diff --git a/src/auth_plugin/caching_sha2_password.ts b/src/auth_plugin/caching_sha2_password.ts new file mode 100644 index 0000000..1e8cbbe --- /dev/null +++ b/src/auth_plugin/caching_sha2_password.ts @@ -0,0 +1,78 @@ +import { xor } from "../util.ts"; +import { ReceivePacket } from "../packets/packet.ts"; +import { encryptWithPublicKey } from "./crypt.ts"; + +interface handler { + done: boolean; + quickRead?: boolean; + next?: (packet: ReceivePacket) => any; + data?: Uint8Array; +} + +let scramble: Uint8Array, password: string; + +async function start( + scramble_: Uint8Array, + password_: string, +): Promise { + scramble = scramble_; + password = password_; + return { done: false, next: authMoreResponse }; +} + +async function authMoreResponse(packet: ReceivePacket): Promise { + const enum AuthStatusFlags { + FullAuth = 0x04, + FastPath = 0x03, + } + const REQUEST_PUBLIC_KEY = 0x02; + const statusFlag = packet.body.skip(1).readUint8(); + let authMoreData, done = true, next, quickRead = false; + if (statusFlag === AuthStatusFlags.FullAuth) { + authMoreData = new Uint8Array([REQUEST_PUBLIC_KEY]); + done = false; + next = encryptWithKey; + } + if (statusFlag === AuthStatusFlags.FastPath) { + done = false; + quickRead = true; + next = terminate; + } + return { done, next, quickRead, data: authMoreData }; +} + +async function encryptWithKey(packet: ReceivePacket): Promise { + const publicKey = parsePublicKey(packet); + const len = password.length; + const passwordBuffer: Uint8Array = new Uint8Array(len + 1); + for (let n = 0; n < len; n++) { + passwordBuffer[n] = password.charCodeAt(n); + } + passwordBuffer[len] = 0x00; + + const encryptedPassword = await encrypt(passwordBuffer, scramble, publicKey); + return { + done: false, + next: terminate, + data: new Uint8Array(encryptedPassword), + }; +} + +function parsePublicKey(packet: ReceivePacket): string { + return packet.body.skip(1).readNullTerminatedString(); +} + +async function encrypt( + password: Uint8Array, + scramble: Uint8Array, + key: string, +): Promise { + const stage1 = xor(password, scramble); + return await encryptWithPublicKey(key, stage1); +} + +function terminate() { + return { done: true }; +} + +export { start }; diff --git a/lib/utils/crypto.ts b/src/auth_plugin/crypt.ts similarity index 78% rename from lib/utils/crypto.ts rename to src/auth_plugin/crypt.ts index af4f947..8eb2339 100644 --- a/lib/utils/crypto.ts +++ b/src/auth_plugin/crypt.ts @@ -1,6 +1,6 @@ -import { decodeBase64 } from "@std/encoding/base64"; +import { base64Decode } from "../../deps.ts"; -export async function encryptWithPublicKey( +async function encryptWithPublicKey( key: string, data: Uint8Array, ): Promise { @@ -10,7 +10,7 @@ export async function encryptWithPublicKey( key = key.substring(pemHeader.length, key.length - pemFooter.length); const importedKey = await crypto.subtle.importKey( "spki", - decodeBase64(key), + base64Decode(key), { name: "RSA-OAEP", hash: "SHA-256" }, false, ["encrypt"], @@ -24,3 +24,5 @@ export async function encryptWithPublicKey( data, ); } + +export { encryptWithPublicKey }; diff --git a/src/auth_plugin/index.ts b/src/auth_plugin/index.ts new file mode 100644 index 0000000..198e023 --- /dev/null +++ b/src/auth_plugin/index.ts @@ -0,0 +1,4 @@ +import * as caching_sha2_password from "./caching_sha2_password.ts"; +export default { + caching_sha2_password, +}; diff --git a/lib/utils/buffer.ts b/src/buffer.ts similarity index 87% rename from lib/utils/buffer.ts rename to src/buffer.ts index d1ab5ed..5c3e48b 100644 --- a/lib/utils/buffer.ts +++ b/src/buffer.ts @@ -1,8 +1,17 @@ -import { decode, encode } from "./encoding.ts"; +const encoder = new TextEncoder(); +const decoder = new TextDecoder(); -/** - * Buffer reader utility class - */ +/** @ignore */ +export function encode(input: string) { + return encoder.encode(input); +} + +/** @ignore */ +export function decode(input: BufferSource) { + return decoder.decode(input); +} + +/** @ignore */ export class BufferReader { private pos: number = 0; constructor(readonly buffer: Uint8Array) {} @@ -87,9 +96,7 @@ export class BufferReader { } } -/** - * Buffer writer utility class - */ +/** @ignore */ export class BufferWriter { private pos: number = 0; constructor(readonly buffer: Uint8Array) {} @@ -125,6 +132,14 @@ export class BufferWriter { return this; } + writeInt16LE(num: number) {} + + writeIntLE(num: number, len: number) { + const int = new Int32Array(1); + int[0] = 40; + console.log(int); + } + writeUint16(num: number): BufferWriter { return this.writeUints(2, num); } diff --git a/src/client.ts b/src/client.ts new file mode 100644 index 0000000..7d91489 --- /dev/null +++ b/src/client.ts @@ -0,0 +1,165 @@ +import { Connection, ConnectionState, ExecuteResult } from "./connection.ts"; +import { ConnectionPool, PoolConnection } from "./pool.ts"; +import { log } from "./logger.ts"; + +/** + * Client Config + */ +export interface ClientConfig { + /** Database hostname */ + hostname?: string; + /** Database UNIX domain socket path. When used, `hostname` and `port` are ignored. */ + socketPath?: string; + /** Database username */ + username?: string; + /** Database password */ + password?: string; + /** Database port */ + port?: number; + /** Database name */ + db?: string; + /** Whether to display packet debugging information */ + debug?: boolean; + /** Connection read timeout (default: 30 seconds) */ + timeout?: number; + /** Connection pool size (default: 1) */ + poolSize?: number; + /** Connection pool idle timeout in microseconds (default: 4 hours) */ + idleTimeout?: number; + /** charset */ + charset?: string; + /** tls config */ + tls?: TLSConfig; +} + +export enum TLSMode { + DISABLED = "disabled", + VERIFY_IDENTITY = "verify_identity", +} +/** + * TLS Config + */ +export interface TLSConfig { + /** mode of tls. only support disabled and verify_identity now*/ + mode?: TLSMode; + /** A list of root certificates (must be PEM format) that will be used in addition to the + * default root certificates to verify the peer's certificate. */ + caCerts?: string[]; +} + +/** Transaction processor */ +export interface TransactionProcessor { + (connection: Connection): Promise; +} + +/** + * MySQL client + */ +export class Client { + config: ClientConfig = {}; + private _pool?: ConnectionPool; + + private async createConnection(): Promise { + let connection = new PoolConnection(this.config); + await connection.connect(); + return connection; + } + + /** get pool info */ + get pool() { + return this._pool?.info; + } + + /** + * connect to database + * @param config config for client + * @returns Client instance + */ + async connect(config: ClientConfig): Promise { + this.config = { + hostname: "127.0.0.1", + username: "root", + port: 3306, + poolSize: 1, + timeout: 30 * 1000, + idleTimeout: 4 * 3600 * 1000, + ...config, + }; + Object.freeze(this.config); + this._pool = new ConnectionPool( + this.config.poolSize || 10, + this.createConnection.bind(this), + ); + return this; + } + + /** + * execute query sql + * @param sql query sql string + * @param params query params + */ + async query(sql: string, params?: any[]): Promise { + return await this.useConnection(async (connection) => { + return await connection.query(sql, params); + }); + } + + /** + * execute sql + * @param sql sql string + * @param params query params + */ + async execute(sql: string, params?: any[]): Promise { + return await this.useConnection(async (connection) => { + return await connection.execute(sql, params); + }); + } + + async useConnection(fn: (conn: Connection) => Promise) { + if (!this._pool) { + throw new Error("Unconnected"); + } + const connection = await this._pool.pop(); + try { + return await fn(connection); + } finally { + if (connection.state == ConnectionState.CLOSED) { + connection.removeFromPool(); + } else { + connection.returnToPool(); + } + } + } + + /** + * Execute a transaction process, and the transaction successfully + * returns the return value of the transaction process + * @param processor transation processor + */ + async transaction(processor: TransactionProcessor): Promise { + return await this.useConnection(async (connection) => { + try { + await connection.execute("BEGIN"); + const result = await processor(connection); + await connection.execute("COMMIT"); + return result; + } catch (error) { + if (connection.state == ConnectionState.CONNECTED) { + log.info(`ROLLBACK: ${error.message}`); + await connection.execute("ROLLBACK"); + } + throw error; + } + }); + } + + /** + * close connection + */ + async close() { + if (this._pool) { + this._pool.close(); + this._pool = undefined; + } + } +} diff --git a/src/connection.ts b/src/connection.ts new file mode 100644 index 0000000..3f2b23f --- /dev/null +++ b/src/connection.ts @@ -0,0 +1,388 @@ +import { ClientConfig, TLSMode } from "./client.ts"; +import { + ConnnectionError, + ProtocolError, + ReadError, + ResponseTimeoutError, +} from "./constant/errors.ts"; +import { log } from "./logger.ts"; +import { buildAuth } from "./packets/builders/auth.ts"; +import { buildQuery } from "./packets/builders/query.ts"; +import { ReceivePacket, SendPacket } from "./packets/packet.ts"; +import { parseError } from "./packets/parsers/err.ts"; +import { + AuthResult, + parseAuth, + parseHandshake, +} from "./packets/parsers/handshake.ts"; +import { FieldInfo, parseField, parseRow } from "./packets/parsers/result.ts"; +import { PacketType } from "./constant/packet.ts"; +import authPlugin from "./auth_plugin/index.ts"; +import { parseAuthSwitch } from "./packets/parsers/authswitch.ts"; +import auth from "./auth.ts"; +import ServerCapabilities from "./constant/capabilities.ts"; +import { buildSSLRequest } from "./packets/builders/tls.ts"; + +/** + * Connection state + */ +export enum ConnectionState { + CONNECTING, + CONNECTED, + CLOSING, + CLOSED, +} + +/** + * Result for execute sql + */ +export type ExecuteResult = { + affectedRows?: number; + lastInsertId?: number; + fields?: FieldInfo[]; + rows?: any[]; + iterator?: any; +}; + +/** Connection for mysql */ +export class Connection { + state: ConnectionState = ConnectionState.CONNECTING; + capabilities: number = 0; + serverVersion: string = ""; + + private conn?: Deno.Conn = undefined; + private _timedOut = false; + + get remoteAddr(): string { + return this.config.socketPath + ? `unix:${this.config.socketPath}` + : `${this.config.hostname}:${this.config.port}`; + } + + constructor(readonly config: ClientConfig) {} + + private async _connect() { + // TODO: implement connect timeout + if ( + this.config.tls?.mode && + this.config.tls.mode !== TLSMode.DISABLED && + this.config.tls.mode !== TLSMode.VERIFY_IDENTITY + ) { + throw new Error("unsupported tls mode"); + } + const { hostname, port = 3306, socketPath, username = "", password } = + this.config; + log.info(`connecting ${this.remoteAddr}`); + this.conn = !socketPath + ? await Deno.connect({ + transport: "tcp", + hostname, + port, + }) + : await Deno.connect({ + transport: "unix", + path: socketPath, + } as any); + + try { + let receive = await this.nextPacket(); + const handshakePacket = parseHandshake(receive.body); + + let handshakeSequenceNumber = receive.header.no; + + // Deno.startTls() only supports VERIFY_IDENTITY now. + let isSSL = false; + if ( + this.config.tls?.mode === TLSMode.VERIFY_IDENTITY + ) { + if ( + (handshakePacket.serverCapabilities & + ServerCapabilities.CLIENT_SSL) === 0 + ) { + throw new Error("Server does not support TLS"); + } + if ( + (handshakePacket.serverCapabilities & + ServerCapabilities.CLIENT_SSL) !== 0 + ) { + const tlsData = buildSSLRequest(handshakePacket, { + db: this.config.db, + }); + await new SendPacket(tlsData, ++handshakeSequenceNumber).send( + this.conn, + ); + this.conn = await Deno.startTls(this.conn, { + hostname, + caCerts: this.config.tls?.caCerts, + }); + } + isSSL = true; + } + + const data = buildAuth(handshakePacket, { + username, + password, + db: this.config.db, + ssl: isSSL, + }); + + await new SendPacket(data, ++handshakeSequenceNumber).send(this.conn); + + this.state = ConnectionState.CONNECTING; + this.serverVersion = handshakePacket.serverVersion; + this.capabilities = handshakePacket.serverCapabilities; + + receive = await this.nextPacket(); + + const authResult = parseAuth(receive); + let handler; + + switch (authResult) { + case AuthResult.AuthMoreRequired: + const adaptedPlugin = + (authPlugin as any)[handshakePacket.authPluginName]; + handler = adaptedPlugin; + break; + case AuthResult.MethodMismatch: + const authSwitch = parseAuthSwitch(receive.body); + // If CLIENT_PLUGIN_AUTH capability is not supported, no new cipher is + // sent and we have to keep using the cipher sent in the init packet. + if ( + authSwitch.authPluginData === undefined || + authSwitch.authPluginData.length === 0 + ) { + authSwitch.authPluginData = handshakePacket.seed; + } + + let authData; + if (password) { + authData = auth( + authSwitch.authPluginName, + password, + authSwitch.authPluginData, + ); + } else { + authData = Uint8Array.from([]); + } + + await new SendPacket(authData, receive.header.no + 1).send(this.conn); + + receive = await this.nextPacket(); + const authSwitch2 = parseAuthSwitch(receive.body); + if (authSwitch2.authPluginName !== "") { + throw new Error( + "Do not allow to change the auth plugin more than once!", + ); + } + } + + let result; + if (handler) { + result = await handler.start(handshakePacket.seed, password!); + while (!result.done) { + if (result.data) { + const sequenceNumber = receive.header.no + 1; + await new SendPacket(result.data, sequenceNumber).send(this.conn); + receive = await this.nextPacket(); + } + if (result.quickRead) { + await this.nextPacket(); + } + if (result.next) { + result = await result.next(receive); + } + } + } + + const header = receive.body.readUint8(); + if (header === 0xff) { + const error = parseError(receive.body, this); + log.error(`connect error(${error.code}): ${error.message}`); + this.close(); + throw new Error(error.message); + } else { + log.info(`connected to ${this.remoteAddr}`); + this.state = ConnectionState.CONNECTED; + } + + if (this.config.charset) { + await this.execute(`SET NAMES ${this.config.charset}`); + } + } catch (error) { + // Call close() to avoid leaking socket. + this.close(); + throw error; + } + } + + /** Connect to database */ + async connect(): Promise { + await this._connect(); + } + + private async nextPacket(): Promise { + if (!this.conn) { + throw new ConnnectionError("Not connected"); + } + + const timeoutTimer = this.config.timeout + ? setTimeout( + this._timeoutCallback, + this.config.timeout, + ) + : null; + let packet: ReceivePacket | null; + try { + packet = await new ReceivePacket().parse(this.conn!); + } catch (error) { + if (this._timedOut) { + // Connection has been closed by timeoutCallback. + throw new ResponseTimeoutError("Connection read timed out"); + } + timeoutTimer && clearTimeout(timeoutTimer); + this.close(); + throw error; + } + timeoutTimer && clearTimeout(timeoutTimer); + + if (!packet) { + // Connection is half-closed by the remote host. + // Call close() to avoid leaking socket. + this.close(); + throw new ReadError("Connection closed unexpectedly"); + } + if (packet.type === PacketType.ERR_Packet) { + packet.body.skip(1); + const error = parseError(packet.body, this); + throw new Error(error.message); + } + return packet!; + } + + private _timeoutCallback = () => { + log.info("connection read timed out"); + this._timedOut = true; + this.close(); + }; + + /** Close database connection */ + close(): void { + if (this.state != ConnectionState.CLOSED) { + log.info("close connection"); + this.conn?.close(); + this.state = ConnectionState.CLOSED; + } + } + + /** + * excute query sql + * @param sql query sql string + * @param params query params + */ + async query(sql: string, params?: any[]): Promise { + const result = await this.execute(sql, params); + if (result && result.rows) { + return result.rows; + } else { + return result; + } + } + + /** + * execute sql + * @param sql sql string + * @param params query params + * @param iterator whether to return an ExecuteIteratorResult or ExecuteResult + */ + async execute( + sql: string, + params?: any[], + iterator = false, + ): Promise { + if (this.state != ConnectionState.CONNECTED) { + if (this.state == ConnectionState.CLOSED) { + throw new ConnnectionError("Connection is closed"); + } else { + throw new ConnnectionError("Must be connected first"); + } + } + const data = buildQuery(sql, params); + try { + await new SendPacket(data, 0).send(this.conn!); + let receive = await this.nextPacket(); + if (receive.type === PacketType.OK_Packet) { + receive.body.skip(1); + return { + affectedRows: receive.body.readEncodedLen(), + lastInsertId: receive.body.readEncodedLen(), + }; + } else if (receive.type !== PacketType.Result) { + throw new ProtocolError(); + } + let fieldCount = receive.body.readEncodedLen(); + const fields: FieldInfo[] = []; + while (fieldCount--) { + const packet = await this.nextPacket(); + if (packet) { + const field = parseField(packet.body); + fields.push(field); + } + } + + const rows = []; + if (!(this.capabilities & ServerCapabilities.CLIENT_DEPRECATE_EOF)) { + // EOF(mysql < 5.7 or mariadb < 10.2) + receive = await this.nextPacket(); + if (receive.type !== PacketType.EOF_Packet) { + throw new ProtocolError(); + } + } + + if (!iterator) { + while (true) { + receive = await this.nextPacket(); + if (receive.type === PacketType.EOF_Packet) { + break; + } else { + const row = parseRow(receive.body, fields); + rows.push(row); + } + } + return { rows, fields }; + } + + return { + fields, + iterator: this.buildIterator(fields), + }; + } catch (error) { + this.close(); + throw error; + } + } + + private buildIterator(fields: FieldInfo[]): any { + const next = async () => { + const receive = await this.nextPacket(); + + if (receive.type === PacketType.EOF_Packet) { + return { done: true }; + } + + const value = parseRow(receive.body, fields); + + return { + done: false, + value, + }; + }; + + return { + [Symbol.asyncIterator]: () => { + return { + next, + }; + }, + }; + } +} diff --git a/lib/constant/capabilities.ts b/src/constant/capabilities.ts similarity index 92% rename from lib/constant/capabilities.ts rename to src/constant/capabilities.ts index d5cab54..a411d79 100644 --- a/lib/constant/capabilities.ts +++ b/src/constant/capabilities.ts @@ -1,7 +1,4 @@ -/** - * MySQL Server Capabilities - */ -export enum ServerCapabilities { +enum ServerCapabilities { CLIENT_LONG_PASSWORD = 0x00000001, CLIENT_FOUND_ROWS = 0x00000002, CLIENT_LONG_FLAG = 0x00000004, @@ -26,3 +23,5 @@ export enum ServerCapabilities { CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA = 0x00200000, CLIENT_DEPRECATE_EOF = 0x01000000, } + +export default ServerCapabilities; diff --git a/lib/constant/charset.ts b/src/constant/charset.ts similarity index 99% rename from lib/constant/charset.ts rename to src/constant/charset.ts index f450832..40447c9 100644 --- a/lib/constant/charset.ts +++ b/src/constant/charset.ts @@ -1,6 +1,3 @@ -/** - * MySQL Charset - */ export enum Charset { BIG5_CHINESE_CI = 1, LATIN2_CZECH_CS = 2, diff --git a/src/constant/errors.ts b/src/constant/errors.ts new file mode 100644 index 0000000..bd79fdb --- /dev/null +++ b/src/constant/errors.ts @@ -0,0 +1,29 @@ +export class ConnnectionError extends Error { + constructor(msg?: string) { + super(msg); + } +} + +export class WriteError extends ConnnectionError { + constructor(msg?: string) { + super(msg); + } +} + +export class ReadError extends ConnnectionError { + constructor(msg?: string) { + super(msg); + } +} + +export class ResponseTimeoutError extends ConnnectionError { + constructor(msg?: string) { + super(msg); + } +} + +export class ProtocolError extends ConnnectionError { + constructor(msg?: string) { + super(msg); + } +} diff --git a/src/constant/mysql_types.ts b/src/constant/mysql_types.ts new file mode 100644 index 0000000..dd6a62c --- /dev/null +++ b/src/constant/mysql_types.ts @@ -0,0 +1,60 @@ +/** @ignore */ +export const MYSQL_TYPE_DECIMAL = 0x00; +/** @ignore */ +export const MYSQL_TYPE_TINY = 0x01; +/** @ignore */ +export const MYSQL_TYPE_SHORT = 0x02; +/** @ignore */ +export const MYSQL_TYPE_LONG = 0x03; +/** @ignore */ +export const MYSQL_TYPE_FLOAT = 0x04; +/** @ignore */ +export const MYSQL_TYPE_DOUBLE = 0x05; +/** @ignore */ +export const MYSQL_TYPE_NULL = 0x06; +/** @ignore */ +export const MYSQL_TYPE_TIMESTAMP = 0x07; +/** @ignore */ +export const MYSQL_TYPE_LONGLONG = 0x08; +/** @ignore */ +export const MYSQL_TYPE_INT24 = 0x09; +/** @ignore */ +export const MYSQL_TYPE_DATE = 0x0a; +/** @ignore */ +export const MYSQL_TYPE_TIME = 0x0b; +/** @ignore */ +export const MYSQL_TYPE_DATETIME = 0x0c; +/** @ignore */ +export const MYSQL_TYPE_YEAR = 0x0d; +/** @ignore */ +export const MYSQL_TYPE_NEWDATE = 0x0e; +/** @ignore */ +export const MYSQL_TYPE_VARCHAR = 0x0f; +/** @ignore */ +export const MYSQL_TYPE_BIT = 0x10; +/** @ignore */ +export const MYSQL_TYPE_TIMESTAMP2 = 0x11; +/** @ignore */ +export const MYSQL_TYPE_DATETIME2 = 0x12; +/** @ignore */ +export const MYSQL_TYPE_TIME2 = 0x13; +/** @ignore */ +export const MYSQL_TYPE_NEWDECIMAL = 0xf6; +/** @ignore */ +export const MYSQL_TYPE_ENUM = 0xf7; +/** @ignore */ +export const MYSQL_TYPE_SET = 0xf8; +/** @ignore */ +export const MYSQL_TYPE_TINY_BLOB = 0xf9; +/** @ignore */ +export const MYSQL_TYPE_MEDIUM_BLOB = 0xfa; +/** @ignore */ +export const MYSQL_TYPE_LONG_BLOB = 0xfb; +/** @ignore */ +export const MYSQL_TYPE_BLOB = 0xfc; +/** @ignore */ +export const MYSQL_TYPE_VAR_STRING = 0xfd; +/** @ignore */ +export const MYSQL_TYPE_STRING = 0xfe; +/** @ignore */ +export const MYSQL_TYPE_GEOMETRY = 0xff; diff --git a/lib/constant/packet.ts b/src/constant/packet.ts similarity index 55% rename from lib/constant/packet.ts rename to src/constant/packet.ts index 0cd49fe..715e411 100644 --- a/lib/constant/packet.ts +++ b/src/constant/packet.ts @@ -1,7 +1,4 @@ -/** - * PacketType - */ -export enum ComQueryResponsePacket { +export enum PacketType { OK_Packet = 0x00, EOF_Packet = 0xfe, ERR_Packet = 0xff, diff --git a/lib/constant/server_status.ts b/src/constant/server_status.ts similarity index 93% rename from lib/constant/server_status.ts rename to src/constant/server_status.ts index d38c5d2..146889f 100644 --- a/lib/constant/server_status.ts +++ b/src/constant/server_status.ts @@ -1,6 +1,4 @@ -/** - * Server status flags - */ +/** @ignore */ export enum ServerStatus { IN_TRANSACTION = 0x0001, AUTO_COMMIT = 0x0002, diff --git a/src/deferred.ts b/src/deferred.ts new file mode 100644 index 0000000..0b3e95b --- /dev/null +++ b/src/deferred.ts @@ -0,0 +1,73 @@ +import { Deferred, deferred } from "../deps.ts"; + +/** @ignore */ +export class DeferredStack { + private _queue: Deferred[] = []; + private _size = 0; + + constructor( + readonly _maxSize: number, + private _array: T[] = [], + private readonly creator: () => Promise, + ) { + this._size = _array.length; + } + + get size(): number { + return this._size; + } + + get maxSize(): number { + return this._maxSize; + } + + get available(): number { + return this._array.length; + } + + async pop(): Promise { + if (this._array.length) { + return this._array.pop()!; + } else if (this._size < this._maxSize) { + this._size++; + let item: T; + try { + item = await this.creator(); + } catch (err) { + this._size--; + throw err; + } + return item; + } + const defer = deferred(); + this._queue.push(defer); + return await defer; + } + + /** Returns false if the item is consumed by a deferred pop */ + push(item: T): boolean { + if (this._queue.length) { + this._queue.shift()!.resolve(item); + return false; + } else { + this._array.push(item); + return true; + } + } + + tryPopAvailable() { + return this._array.pop(); + } + + remove(item: T): boolean { + const index = this._array.indexOf(item); + if (index < 0) return false; + this._array.splice(index, 1); + this._size--; + return true; + } + + reduceSize() { + this._size--; + } +} diff --git a/src/logger.ts b/src/logger.ts new file mode 100644 index 0000000..dad062a --- /dev/null +++ b/src/logger.ts @@ -0,0 +1,51 @@ +import { log } from "../deps.ts"; + +let logger = log.getLogger(); + +export { logger as log }; + +let isDebug = false; + +/** @ignore */ +export function debug(func: Function) { + if (isDebug) { + func(); + } +} + +export interface LoggerConfig { + /** Enable logging (default: true) */ + enable?: boolean; + /** The minimal level to print (default: "INFO") */ + level?: log.LevelName; + /** A deno_std/log.Logger instance to be used as logger. When used, `level` is ignored. */ + logger?: log.Logger; +} + +export async function configLogger(config: LoggerConfig) { + let { enable = true, level = "INFO" } = config; + if (config.logger) level = config.logger.levelName; + isDebug = level == "DEBUG"; + + if (!enable) { + logger = new log.Logger("fakeLogger", "NOTSET", {}); + logger.level = 0; + } else { + if (!config.logger) { + await log.setup({ + handlers: { + console: new log.handlers.ConsoleHandler(level), + }, + loggers: { + default: { + level: "DEBUG", + handlers: ["console"], + }, + }, + }); + logger = log.getLogger(); + } else { + logger = config.logger; + } + } +} diff --git a/lib/packets/builders/auth.ts b/src/packets/builders/auth.ts similarity index 85% rename from lib/packets/builders/auth.ts rename to src/packets/builders/auth.ts index 3cff7ac..194c485 100644 --- a/lib/packets/builders/auth.ts +++ b/src/packets/builders/auth.ts @@ -1,15 +1,15 @@ -import auth from "../../utils/hash.ts"; -import { BufferWriter } from "../../utils/buffer.ts"; -import { ServerCapabilities } from "../../constant/capabilities.ts"; +import auth from "../../auth.ts"; +import { BufferWriter } from "../../buffer.ts"; +import ServerCapabilities from "../../constant/capabilities.ts"; import { Charset } from "../../constant/charset.ts"; import type { HandshakeBody } from "../parsers/handshake.ts"; import { clientCapabilities } from "./client_capabilities.ts"; /** @ignore */ -export async function buildAuth( +export function buildAuth( packet: HandshakeBody, - params: { username: string; password?: string; db?: string; ssl: boolean }, -): Promise { + params: { username: string; password?: string; db?: string; ssl?: boolean }, +): Uint8Array { const clientParam: number = clientCapabilities(packet, params); if (packet.serverCapabilities & ServerCapabilities.CLIENT_PLUGIN_AUTH) { @@ -21,7 +21,7 @@ export async function buildAuth( .skip(23) .writeNullTerminatedString(params.username); if (params.password) { - const authData = await auth( + const authData = auth( packet.authPluginName, params.password, packet.seed, diff --git a/lib/packets/builders/client_capabilities.ts b/src/packets/builders/client_capabilities.ts similarity index 92% rename from lib/packets/builders/client_capabilities.ts rename to src/packets/builders/client_capabilities.ts index b03125e..842fdcb 100644 --- a/lib/packets/builders/client_capabilities.ts +++ b/src/packets/builders/client_capabilities.ts @@ -1,4 +1,4 @@ -import { ServerCapabilities } from "../../constant/capabilities.ts"; +import ServerCapabilities from "../../constant/capabilities.ts"; import type { HandshakeBody } from "../parsers/handshake.ts"; export function clientCapabilities( diff --git a/src/packets/builders/query.ts b/src/packets/builders/query.ts new file mode 100644 index 0000000..8882f06 --- /dev/null +++ b/src/packets/builders/query.ts @@ -0,0 +1,11 @@ +import { replaceParams } from "../../../deps.ts"; +import { BufferWriter, encode } from "../../buffer.ts"; + +/** @ignore */ +export function buildQuery(sql: string, params: any[] = []): Uint8Array { + const data = encode(replaceParams(sql, params)); + const writer = new BufferWriter(new Uint8Array(data.length + 1)); + writer.write(0x03); + writer.writeBuffer(data); + return writer.buffer; +} diff --git a/lib/packets/builders/tls.ts b/src/packets/builders/tls.ts similarity index 91% rename from lib/packets/builders/tls.ts rename to src/packets/builders/tls.ts index 5963c01..487301a 100644 --- a/lib/packets/builders/tls.ts +++ b/src/packets/builders/tls.ts @@ -1,4 +1,4 @@ -import { BufferWriter } from "../../utils/buffer.ts"; +import { BufferWriter } from "../../buffer.ts"; import { Charset } from "../../constant/charset.ts"; import type { HandshakeBody } from "../parsers/handshake.ts"; import { clientCapabilities } from "./client_capabilities.ts"; diff --git a/src/packets/packet.ts b/src/packets/packet.ts new file mode 100644 index 0000000..d58c41c --- /dev/null +++ b/src/packets/packet.ts @@ -0,0 +1,106 @@ +import { byteFormat } from "../../deps.ts"; +import { BufferReader, BufferWriter } from "../buffer.ts"; +import { WriteError } from "../constant/errors.ts"; +import { debug, log } from "../logger.ts"; +import { PacketType } from "../../src/constant/packet.ts"; + +/** @ignore */ +interface PacketHeader { + size: number; + no: number; +} + +/** @ignore */ +export class SendPacket { + header: PacketHeader; + + constructor(readonly body: Uint8Array, no: number) { + this.header = { size: body.length, no }; + } + + async send(conn: Deno.Conn) { + const body = this.body as Uint8Array; + const data = new BufferWriter(new Uint8Array(4 + body.length)); + data.writeUints(3, this.header.size); + data.write(this.header.no); + data.writeBuffer(body); + debug(() => { + log.debug(`send: ${data.length}B \n${byteFormat(data.buffer)}\n`); + }); + try { + let wrote = 0; + do { + wrote += await conn.write(data.buffer.subarray(wrote)); + } while (wrote < data.length); + } catch (error) { + throw new WriteError(error.message); + } + } +} + +/** @ignore */ +export class ReceivePacket { + header!: PacketHeader; + body!: BufferReader; + type!: PacketType; + + async parse(reader: Deno.Reader): Promise { + const header = new BufferReader(new Uint8Array(4)); + let readCount = 0; + let nread = await this.read(reader, header.buffer); + if (nread === null) return null; + readCount = nread; + const bodySize = header.readUints(3); + this.header = { + size: bodySize, + no: header.readUint8(), + }; + this.body = new BufferReader(new Uint8Array(bodySize)); + nread = await this.read(reader, this.body.buffer); + if (nread === null) return null; + readCount += nread; + + const { OK_Packet, ERR_Packet, EOF_Packet, Result } = PacketType; + switch (this.body.buffer[0]) { + case OK_Packet: + this.type = OK_Packet; + break; + case 0xff: + this.type = ERR_Packet; + break; + case 0xfe: + this.type = EOF_Packet; + break; + default: + this.type = Result; + break; + } + + debug(() => { + const data = new Uint8Array(readCount); + data.set(header.buffer); + data.set(this.body.buffer, 4); + log.debug( + `receive: ${readCount}B, size = ${this.header.size}, no = ${this.header.no} \n${ + byteFormat(data) + }\n`, + ); + }); + + return this; + } + + private async read( + reader: Deno.Reader, + buffer: Uint8Array, + ): Promise { + const size = buffer.length; + let haveRead = 0; + while (haveRead < size) { + const nread = await reader.read(buffer.subarray(haveRead)); + if (nread === null) return null; + haveRead += nread; + } + return haveRead; + } +} diff --git a/lib/packets/parsers/authswitch.ts b/src/packets/parsers/authswitch.ts similarity index 88% rename from lib/packets/parsers/authswitch.ts rename to src/packets/parsers/authswitch.ts index 698b186..ac8b728 100644 --- a/lib/packets/parsers/authswitch.ts +++ b/src/packets/parsers/authswitch.ts @@ -1,4 +1,4 @@ -import type { BufferReader } from "../../utils/buffer.ts"; +import { BufferReader } from "../../buffer.ts"; /** @ignore */ export interface authSwitchBody { diff --git a/lib/packets/parsers/err.ts b/src/packets/parsers/err.ts similarity index 72% rename from lib/packets/parsers/err.ts rename to src/packets/parsers/err.ts index dac14ef..8589446 100644 --- a/lib/packets/parsers/err.ts +++ b/src/packets/parsers/err.ts @@ -1,6 +1,6 @@ -import type { BufferReader } from "../../utils/buffer.ts"; -import type { MysqlConnection } from "../../connection.ts"; -import { ServerCapabilities } from "../../constant/capabilities.ts"; +import type { BufferReader } from "../../buffer.ts"; +import type { Connection } from "../../connection.ts"; +import ServerCapabilities from "../../constant/capabilities.ts"; /** @ignore */ export interface ErrorPacket { @@ -13,7 +13,7 @@ export interface ErrorPacket { /** @ignore */ export function parseError( reader: BufferReader, - conn: MysqlConnection, + conn: Connection, ): ErrorPacket { const code = reader.readUint16(); const packet: ErrorPacket = { diff --git a/lib/packets/parsers/handshake.ts b/src/packets/parsers/handshake.ts similarity index 81% rename from lib/packets/parsers/handshake.ts rename to src/packets/parsers/handshake.ts index c89ef1a..959e028 100644 --- a/lib/packets/parsers/handshake.ts +++ b/src/packets/parsers/handshake.ts @@ -1,7 +1,7 @@ -import { type BufferReader, BufferWriter } from "../../utils/buffer.ts"; -import { ServerCapabilities } from "../../constant/capabilities.ts"; -import { ComQueryResponsePacket } from "../../constant/packet.ts"; -import type { PacketReader } from "../packet.ts"; +import { BufferReader, BufferWriter } from "../../buffer.ts"; +import ServerCapabilities from "../../constant/capabilities.ts"; +import { PacketType } from "../../constant/packet.ts"; +import { ReceivePacket } from "../packet.ts"; /** @ignore */ export interface HandshakeBody { @@ -73,13 +73,13 @@ export enum AuthResult { MethodMismatch, AuthMoreRequired, } -export function parseAuth(packet: PacketReader): AuthResult { +export function parseAuth(packet: ReceivePacket): AuthResult { switch (packet.type) { - case ComQueryResponsePacket.EOF_Packet: + case PacketType.EOF_Packet: return AuthResult.MethodMismatch; - case ComQueryResponsePacket.Result: + case PacketType.Result: return AuthResult.AuthMoreRequired; - case ComQueryResponsePacket.OK_Packet: + case PacketType.OK_Packet: return AuthResult.AuthPassed; default: return AuthResult.AuthPassed; diff --git a/src/packets/parsers/result.ts b/src/packets/parsers/result.ts new file mode 100644 index 0000000..83adab6 --- /dev/null +++ b/src/packets/parsers/result.ts @@ -0,0 +1,125 @@ +import type { BufferReader } from "../../buffer.ts"; +import { + MYSQL_TYPE_DATE, + MYSQL_TYPE_DATETIME, + MYSQL_TYPE_DATETIME2, + MYSQL_TYPE_DECIMAL, + MYSQL_TYPE_DOUBLE, + MYSQL_TYPE_FLOAT, + MYSQL_TYPE_INT24, + MYSQL_TYPE_LONG, + MYSQL_TYPE_LONGLONG, + MYSQL_TYPE_NEWDATE, + MYSQL_TYPE_NEWDECIMAL, + MYSQL_TYPE_SHORT, + MYSQL_TYPE_STRING, + MYSQL_TYPE_TIME, + MYSQL_TYPE_TIME2, + MYSQL_TYPE_TIMESTAMP, + MYSQL_TYPE_TIMESTAMP2, + MYSQL_TYPE_TINY, + MYSQL_TYPE_VAR_STRING, + MYSQL_TYPE_VARCHAR, +} from "../../constant/mysql_types.ts"; + +/** @ignore */ +export interface FieldInfo { + catalog: string; + schema: string; + table: string; + originTable: string; + name: string; + originName: string; + encoding: number; + fieldLen: number; + fieldType: number; + fieldFlag: number; + decimals: number; + defaultVal: string; +} + +/** @ignore */ +export function parseField(reader: BufferReader): FieldInfo { + const catalog = reader.readLenCodeString()!; + const schema = reader.readLenCodeString()!; + const table = reader.readLenCodeString()!; + const originTable = reader.readLenCodeString()!; + const name = reader.readLenCodeString()!; + const originName = reader.readLenCodeString()!; + reader.skip(1); + const encoding = reader.readUint16()!; + const fieldLen = reader.readUint32()!; + const fieldType = reader.readUint8()!; + const fieldFlag = reader.readUint16()!; + const decimals = reader.readUint8()!; + reader.skip(1); + const defaultVal = reader.readLenCodeString()!; + return { + catalog, + schema, + table, + originName, + fieldFlag, + originTable, + fieldLen, + name, + fieldType, + encoding, + decimals, + defaultVal, + }; +} + +/** @ignore */ +export function parseRow(reader: BufferReader, fields: FieldInfo[]): any { + const row: any = {}; + for (const field of fields) { + const name = field.name; + const val = reader.readLenCodeString(); + row[name] = val === null ? null : convertType(field, val); + } + return row; +} + +/** @ignore */ +function convertType(field: FieldInfo, val: string): any { + const { fieldType, fieldLen } = field; + switch (fieldType) { + case MYSQL_TYPE_DECIMAL: + case MYSQL_TYPE_DOUBLE: + case MYSQL_TYPE_FLOAT: + case MYSQL_TYPE_DATETIME2: + return parseFloat(val); + case MYSQL_TYPE_NEWDECIMAL: + return val; // #42 MySQL's decimal type cannot be accurately represented by the Number. + case MYSQL_TYPE_TINY: + case MYSQL_TYPE_SHORT: + case MYSQL_TYPE_LONG: + case MYSQL_TYPE_INT24: + return parseInt(val); + case MYSQL_TYPE_LONGLONG: + if ( + Number(val) < Number.MIN_SAFE_INTEGER || + Number(val) > Number.MAX_SAFE_INTEGER + ) { + return BigInt(val); + } else { + return parseInt(val); + } + case MYSQL_TYPE_VARCHAR: + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_STRING: + case MYSQL_TYPE_TIME: + case MYSQL_TYPE_TIME2: + return val; + case MYSQL_TYPE_DATE: + case MYSQL_TYPE_TIMESTAMP: + case MYSQL_TYPE_DATETIME: + case MYSQL_TYPE_NEWDATE: + case MYSQL_TYPE_TIMESTAMP2: + case MYSQL_TYPE_DATETIME2: + return new Date(val); + default: + return val; + } +} diff --git a/src/pool.ts b/src/pool.ts new file mode 100644 index 0000000..f1de757 --- /dev/null +++ b/src/pool.ts @@ -0,0 +1,128 @@ +import { DeferredStack } from "./deferred.ts"; +import { Connection } from "./connection.ts"; +import { log } from "./logger.ts"; + +/** @ignore */ +export class PoolConnection extends Connection { + _pool?: ConnectionPool = undefined; + + private _idleTimer?: number = undefined; + private _idle = false; + + /** + * Should be called by the pool. + */ + enterIdle() { + this._idle = true; + if (this.config.idleTimeout) { + this._idleTimer = setTimeout(() => { + log.info("connection idle timeout"); + this._pool!.remove(this); + try { + this.close(); + } catch (error) { + log.warning(`error closing idle connection`, error); + } + }, this.config.idleTimeout); + try { + // Don't block the event loop from finishing + Deno.unrefTimer(this._idleTimer); + } catch (_error) { + // unrefTimer() is unstable API in older version of Deno + } + } + } + + /** + * Should be called by the pool. + */ + exitIdle() { + this._idle = false; + if (this._idleTimer !== undefined) { + clearTimeout(this._idleTimer); + } + } + + /** + * Remove the connection from the pool permanently, when the connection is not usable. + */ + removeFromPool() { + this._pool!.reduceSize(); + this._pool = undefined; + } + + returnToPool() { + this._pool?.push(this); + } +} + +/** @ignore */ +export class ConnectionPool { + _deferred: DeferredStack; + _connections: PoolConnection[] = []; + _closed: boolean = false; + + constructor(maxSize: number, creator: () => Promise) { + this._deferred = new DeferredStack(maxSize, this._connections, async () => { + const conn = await creator(); + conn._pool = this; + return conn; + }); + } + + get info() { + return { + size: this._deferred.size, + maxSize: this._deferred.maxSize, + available: this._deferred.available, + }; + } + + push(conn: PoolConnection) { + if (this._closed) { + conn.close(); + this.reduceSize(); + } + if (this._deferred.push(conn)) { + conn.enterIdle(); + } + } + + async pop(): Promise { + if (this._closed) { + throw new Error("Connection pool is closed"); + } + let conn = this._deferred.tryPopAvailable(); + if (conn) { + conn.exitIdle(); + } else { + conn = await this._deferred.pop(); + } + return conn; + } + + remove(conn: PoolConnection) { + return this._deferred.remove(conn); + } + + /** + * Close the pool and all connections in the pool. + * + * After closing, pop() will throw an error, + * push() will close the connection immediately. + */ + close() { + this._closed = true; + + let conn: PoolConnection | undefined; + while (conn = this._deferred.tryPopAvailable()) { + conn.exitIdle(); + conn.close(); + this.reduceSize(); + } + } + + reduceSize() { + this._deferred.reduceSize(); + } +} diff --git a/lib/utils/bytes.ts b/src/util.ts similarity index 100% rename from lib/utils/bytes.ts rename to src/util.ts diff --git a/test.deps.ts b/test.deps.ts new file mode 100644 index 0000000..c48f9b8 --- /dev/null +++ b/test.deps.ts @@ -0,0 +1,6 @@ +export { + assertEquals, + assertThrowsAsync, +} from "https://deno.land/std@0.104.0/testing/asserts.ts"; +export * as semver from "https://deno.land/x/semver@v1.4.0/mod.ts"; +export { parse } from "https://deno.land/std@0.104.0/flags/mod.ts"; diff --git a/test.ts b/test.ts new file mode 100644 index 0000000..e030e8c --- /dev/null +++ b/test.ts @@ -0,0 +1,390 @@ +import { assertEquals, assertThrowsAsync, semver } from "./test.deps.ts"; +import { + ConnnectionError, + ResponseTimeoutError, +} from "./src/constant/errors.ts"; +import { + createTestDB, + delay, + isMariaDB, + registerTests, + testWithClient, +} from "./test.util.ts"; +import { log as stdlog } from "./deps.ts"; +import { log } from "./src/logger.ts"; +import { configLogger } from "./mod.ts"; + +testWithClient(async function testCreateDb(client) { + await client.query(`CREATE DATABASE IF NOT EXISTS enok`); +}); + +testWithClient(async function testCreateTable(client) { + await client.query(`DROP TABLE IF EXISTS users`); + await client.query(` + CREATE TABLE users ( + id int(11) NOT NULL AUTO_INCREMENT, + name varchar(100) NOT NULL, + is_top tinyint(1) default 0, + created_at timestamp not null default current_timestamp, + PRIMARY KEY (id) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + `); +}); + +testWithClient(async function testInsert(client) { + let result = await client.execute(`INSERT INTO users(name) values(?)`, [ + "manyuanrong", + ]); + assertEquals(result, { affectedRows: 1, lastInsertId: 1 }); + result = await client.execute(`INSERT INTO users ?? values ?`, [ + ["id", "name"], + [2, "MySQL"], + ]); + assertEquals(result, { affectedRows: 1, lastInsertId: 2 }); +}); + +testWithClient(async function testUpdate(client) { + let result = await client.execute( + `update users set ?? = ?, ?? = ? WHERE id = ?`, + ["name", "MYR🦕", "created_at", new Date(), 1], + ); + assertEquals(result, { affectedRows: 1, lastInsertId: 0 }); +}); + +testWithClient(async function testQuery(client) { + let result = await client.query( + "select ??,`is_top`,`name` from ?? where id = ?", + ["id", "users", 1], + ); + assertEquals(result, [{ id: 1, name: "MYR🦕", is_top: 0 }]); +}); + +testWithClient(async function testQueryErrorOccurred(client) { + assertEquals(client.pool, { + size: 0, + maxSize: client.config.poolSize, + available: 0, + }); + await assertThrowsAsync( + () => client.query("select unknownfield from `users`"), + Error, + ); + await client.query("select 1"); + assertEquals(client.pool, { + size: 1, + maxSize: client.config.poolSize, + available: 1, + }); +}); + +testWithClient(async function testQueryList(client) { + const sql = "select ??,?? from ??"; + let result = await client.query(sql, ["id", "name", "users"]); + assertEquals(result, [ + { id: 1, name: "MYR🦕" }, + { id: 2, name: "MySQL" }, + ]); +}); + +testWithClient(async function testQueryTime(client) { + const sql = `SELECT CAST("09:04:10" AS time) as time`; + let result = await client.query(sql); + assertEquals(result, [{ time: "09:04:10" }]); +}); + +testWithClient(async function testQueryBigint(client) { + await client.query(`DROP TABLE IF EXISTS test_bigint`); + await client.query(`CREATE TABLE test_bigint ( + id int(11) NOT NULL AUTO_INCREMENT, + bigint_column bigint NOT NULL, + PRIMARY KEY (id) + ) ENGINE=MEMORY DEFAULT CHARSET=utf8mb4`); + + const value = "9223372036854775807"; + await client.execute( + "INSERT INTO test_bigint(bigint_column) VALUES (?)", + [value], + ); + + const result = await client.query("SELECT bigint_column FROM test_bigint"); + assertEquals(result, [{ bigint_column: BigInt(value) }]); +}); + +testWithClient(async function testQueryDecimal(client) { + await client.query(`DROP TABLE IF EXISTS test_decimal`); + await client.query(`CREATE TABLE test_decimal ( + id int(11) NOT NULL AUTO_INCREMENT, + decimal_column decimal(65,30) NOT NULL, + PRIMARY KEY (id) + ) ENGINE=MEMORY DEFAULT CHARSET=utf8mb4`); + + const value = "0.012345678901234567890123456789"; + await client.execute( + "INSERT INTO test_decimal(decimal_column) VALUES (?)", + [value], + ); + + const result = await client.query("SELECT decimal_column FROM test_decimal"); + assertEquals(result, [{ decimal_column: value }]); +}); + +testWithClient(async function testQueryDatetime(client) { + await client.useConnection(async (connection) => { + if (isMariaDB(connection) || semver.lt(connection.serverVersion, "5.6.0")) { + return; + } + + await client.query(`DROP TABLE IF EXISTS test_datetime`); + await client.query(`CREATE TABLE test_datetime ( + id int(11) NOT NULL AUTO_INCREMENT, + datetime datetime(6) NOT NULL, + PRIMARY KEY (id) + ) ENGINE=MEMORY DEFAULT CHARSET=utf8mb4`); + const datetime = new Date(); + await client.execute( + ` + INSERT INTO test_datetime (datetime) + VALUES (?)`, + [datetime], + ); + + const [row] = await client.query("SELECT datetime FROM test_datetime"); + assertEquals(row.datetime.toISOString(), datetime.toISOString()); // See https://github.com/denoland/deno/issues/6643 + }); +}); + +testWithClient(async function testDelete(client) { + let result = await client.execute(`delete from users where ?? = ?`, [ + "id", + 1, + ]); + assertEquals(result, { affectedRows: 1, lastInsertId: 0 }); +}); + +testWithClient(async function testPool(client) { + assertEquals(client.pool, { + maxSize: client.config.poolSize, + available: 0, + size: 0, + }); + const expect = new Array(10).fill([{ "1": 1 }]); + const result = await Promise.all(expect.map(() => client.query(`select 1`))); + + assertEquals(client.pool, { + maxSize: client.config.poolSize, + available: 3, + size: 3, + }); + assertEquals(result, expect); +}); + +testWithClient(async function testQueryOnClosed(client) { + for (const i of [0, 0, 0]) { + await assertThrowsAsync(async () => { + await client.transaction(async (conn) => { + conn.close(); + await conn.query("SELECT 1"); + }); + }, ConnnectionError); + } + assertEquals(client.pool?.size, 0); + await client.query("select 1"); +}); + +testWithClient(async function testTransactionSuccess(client) { + const success = await client.transaction(async (connection) => { + await connection.execute("insert into users(name) values(?)", [ + "transaction1", + ]); + await connection.execute("delete from users where id = ?", [2]); + return true; + }); + assertEquals(true, success); + const result = await client.query("select name,id from users"); + assertEquals([{ name: "transaction1", id: 3 }], result); +}); + +testWithClient(async function testTransactionRollback(client) { + let success; + await assertThrowsAsync(async () => { + success = await client.transaction(async (connection) => { + // Insert an existing id + await connection.execute("insert into users(name,id) values(?,?)", [ + "transaction2", + 3, + ]); + return true; + }); + }); + assertEquals(undefined, success); + const result = await client.query("select name from users"); + assertEquals([{ name: "transaction1" }], result); +}); + +testWithClient(async function testIdleTimeout(client) { + assertEquals(client.pool, { + maxSize: 3, + available: 0, + size: 0, + }); + await Promise.all(new Array(10).fill(0).map(() => client.query("select 1"))); + assertEquals(client.pool, { + maxSize: 3, + available: 3, + size: 3, + }); + await delay(500); + assertEquals(client.pool, { + maxSize: 3, + available: 3, + size: 3, + }); + await client.query("select 1"); + await delay(500); + assertEquals(client.pool, { + maxSize: 3, + available: 1, + size: 1, + }); + await delay(500); + assertEquals(client.pool, { + maxSize: 3, + available: 0, + size: 0, + }); +}, { + idleTimeout: 750, +}); + +testWithClient(async function testReadTimeout(client) { + await client.execute("select sleep(0.3)"); + + await assertThrowsAsync(async () => { + await client.execute("select sleep(0.7)"); + }, ResponseTimeoutError); + + assertEquals(client.pool, { + maxSize: 3, + available: 0, + size: 0, + }); +}, { + timeout: 500, +}); + +testWithClient(async function testLargeQueryAndResponse(client) { + function buildLargeString(len: number) { + let str = ""; + for (let i = 0; i < len; i++) { + str += i % 10; + } + return str; + } + const largeString = buildLargeString(512 * 1024); + assertEquals( + await client.query(`select "${largeString}" as str`), + [{ str: largeString }], + ); +}); + +testWithClient(async function testExecuteIterator(client) { + await client.useConnection(async (conn) => { + await conn.execute(`DROP TABLE IF EXISTS numbers`); + await conn.execute(`CREATE TABLE numbers (num INT NOT NULL)`); + await conn.execute( + `INSERT INTO numbers (num) VALUES ${ + new Array(64).fill(0).map((v, idx) => `(${idx})`).join(",") + }`, + ); + const r = await conn.execute(`SELECT num FROM numbers`, [], true); + let count = 0; + for await (const row of r.iterator) { + assertEquals(row.num, count); + count++; + } + assertEquals(count, 64); + }); +}); + +// For MySQL 8, the default auth plugin is `caching_sha2_password`. Create user +// using `mysql_native_password` to test Authentication Method Mismatch. +testWithClient(async function testCreateUserWithMysqlNativePassword(client) { + const { version } = (await client.query(`SELECT VERSION() as version`))[0]; + if (version.startsWith("8.")) { + // MySQL 8 does not have `PASSWORD()` function + await client.execute( + `CREATE USER 'testuser'@'%' IDENTIFIED WITH mysql_native_password BY 'testpassword'`, + ); + } else { + await client.execute( + `CREATE USER 'testuser'@'%' IDENTIFIED WITH mysql_native_password`, + ); + await client.execute( + `SET PASSWORD FOR 'testuser'@'%' = PASSWORD('testpassword')`, + ); + } + await client.execute(`GRANT ALL ON test.* TO 'testuser'@'%'`); +}); + +testWithClient(async function testConnectWithMysqlNativePassword(client) { + assertEquals( + await client.query(`SELECT CURRENT_USER() AS user`), + [{ user: "testuser@%" }], + ); +}, { username: "testuser", password: "testpassword" }); + +testWithClient(async function testDropUserWithMysqlNativePassword(client) { + await client.execute(`DROP USER 'testuser'@'%'`); +}); + +testWithClient(async function testSelectEmptyString(client) { + assertEquals( + await client.query(`SELECT '' AS a`), + [{ a: "" }], + ); + assertEquals( + await client.query(`SELECT '' AS a, '' AS b, '' AS c`), + [{ a: "", b: "", c: "" }], + ); + assertEquals( + await client.query(`SELECT '' AS a, 'b' AS b, '' AS c`), + [{ a: "", b: "b", c: "" }], + ); +}); + +registerTests(); + +Deno.test("configLogger()", async () => { + let logCount = 0; + const fakeHandler = new class extends stdlog.handlers.BaseHandler { + constructor() { + super("INFO"); + } + log(msg: string) { + logCount++; + } + }(); + + await stdlog.setup({ + handlers: { + fake: fakeHandler, + }, + loggers: { + mysql: { + handlers: ["fake"], + }, + }, + }); + await configLogger({ logger: stdlog.getLogger("mysql") }); + log.info("Test log"); + assertEquals(logCount, 1); + + await configLogger({ enable: false }); + log.info("Test log"); + assertEquals(logCount, 1); +}); + +await createTestDB(); + +await new Promise((r) => setTimeout(r, 0)); +// Workaround to https://github.com/denoland/deno/issues/7844 diff --git a/test.util.ts b/test.util.ts new file mode 100644 index 0000000..985fb7f --- /dev/null +++ b/test.util.ts @@ -0,0 +1,98 @@ +import { Client, ClientConfig, Connection } from "./mod.ts"; +import { assertEquals, parse } from "./test.deps.ts"; + +const { DB_PORT, DB_NAME, DB_PASSWORD, DB_USER, DB_HOST, DB_SOCKPATH } = Deno + .env.toObject(); +const port = DB_PORT ? parseInt(DB_PORT) : 3306; +const db = DB_NAME || "test"; +const password = DB_PASSWORD || "root"; +const username = DB_USER || "root"; +const hostname = DB_HOST || "127.0.0.1"; +const sockPath = DB_SOCKPATH || "/var/run/mysqld/mysqld.sock"; +const testMethods = + Deno.env.get("TEST_METHODS")?.split(",") as ("tcp" | "unix")[] || ["tcp"]; +const unixSocketOnly = testMethods.length === 1 && testMethods[0] === "unix"; + +const config: ClientConfig = { + timeout: 10000, + poolSize: 3, + debug: true, + hostname, + username, + port, + db, + charset: "utf8mb4", + password, +}; + +const tests: (Parameters)[] = []; + +export function testWithClient( + fn: (client: Client) => void | Promise, + overrideConfig?: ClientConfig, +): void { + tests.push([fn, overrideConfig]); +} + +export function registerTests(methods: ("tcp" | "unix")[] = testMethods) { + if (methods!.includes("tcp")) { + tests.forEach(([fn, overrideConfig]) => { + Deno.test({ + name: fn.name + " (TCP)", + async fn() { + await test({ ...config, ...overrideConfig }, fn); + }, + }); + }); + } + if (methods!.includes("unix")) { + tests.forEach(([fn, overrideConfig]) => { + Deno.test({ + name: fn.name + " (UNIX domain socket)", + async fn() { + await test( + { ...config, socketPath: sockPath, ...overrideConfig }, + fn, + ); + }, + }); + }); + } +} + +async function test( + config: ClientConfig, + fn: (client: Client) => void | Promise, +) { + const resources = Deno.resources(); + const client = await new Client().connect(config); + try { + await fn(client); + } finally { + await client.close(); + } + assertEquals( + Deno.resources(), + resources, + "The client is leaking resources", + ); +} + +export async function createTestDB() { + const client = await new Client().connect({ + ...config, + poolSize: 1, + db: undefined, + socketPath: unixSocketOnly ? sockPath : undefined, + }); + await client.execute(`CREATE DATABASE IF NOT EXISTS ${db}`); + await client.close(); +} + +export function isMariaDB(connection: Connection): boolean { + return connection.serverVersion.includes("MariaDB"); +} + +export function delay(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)); +}