diff --git a/.github/workflows/test_sqlite3x.yml b/.github/workflows/test_sqllex.yml
similarity index 75%
rename from .github/workflows/test_sqlite3x.yml
rename to .github/workflows/test_sqllex.yml
index 8e19cf1..265c76d 100644
--- a/.github/workflows/test_sqlite3x.yml
+++ b/.github/workflows/test_sqllex.yml
@@ -1,6 +1,6 @@
# This workflow run tests for sqllex.SQLite3x class
-name: Test Sqlite3x
+name: Test Sqllex
on:
push:
@@ -35,13 +35,11 @@ jobs:
- name: Moving tests to current dir
run: |
- mv ./tests/sqlite3x/* ./
+ mv ./tests/* ./
- - name: Main test - tests/new_test_all.py
+ - name: Running unit tests (tests/test_sqllex.py)
run: |
- python new_test_all.py
- python old_test_all_1.py
- python old_test_all_2.py
+ python -m unittest test_sqllex.py
python-3-8:
@@ -60,10 +58,8 @@ jobs:
- name: Moving tests to current dir
run: |
- mv ./tests/sqlite3x/* ./
+ mv ./tests/* ./
- - name: Main test - tests/new_test_all.py
+ - name: Running unit tests (tests/test_sqllex.py)
run: |
- python new_test_all.py
- python old_test_all_1.py
- python old_test_all_2.py
+ python -m unittest test_sqllex.py
diff --git a/README.md b/README.md
index 8d391e1..d95fb81 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@
-# SQLLEX ORM v0.2.0.5
+# SQLLEX ORM v0.2.0.6
![python-3-9]
[![lgtm-quality-img]][lgtm-quality-src]
@@ -28,7 +28,7 @@ pip install sqllex
| Version | Status | Tests, and actions |
| :--------: | :----------------------------: | :---: |
-| `==0.2.0.5` | ✔️ supported
✔️ stable | [![code-ql-img]][code-ql-src]
[![sqlite3x-test-img]][sqlite3x-test-src]
[![pypi-upload-img]][pypi-upload-img] |
+| `==0.2.0.6` | ✔️ supported
✔️ stable | [![code-ql-img]][code-ql-src]
[![sqlite3x-test-img]][sqlite3x-test-src]
[![pypi-upload-img]][pypi-upload-img] |
| `<=0.2.0.4` | ⚠️ outdated
⚠️ Security issue
CVE-2022-0329| ⚠️ Mostly passing |
| `<=0.1.10.4` | ❌️ outdated | ❌ |
diff --git a/docs/README.md b/docs/README.md
index 7e35407..21e6cc3 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -1,4 +1,4 @@
-
+
Welcome to the Sqllex Documentation! 👋
Here you can find some explanations and examples for Sqllex ORM
@@ -166,5 +166,31 @@ print(
WHERE=(column_age > 40) & (column_name |LIKE| 'kin%')
)
) # [(3, 'kingabzpro', 44)]
+```
+
+### Also, you can do crazy things like this
+
+```python
+self.db['employee'].select(
+ SELECT=[
+ db['employee']['id'],
+ db['employee']['firstName'],
+ db['position']['name']
+ ],
+ JOIN=(
+ (
+ LEFT_JOIN, db['position'],
+ ON, db['position']['id'] == db['employee']['positionID']
+ ),
+ (
+ INNER_JOIN, self.db['payments'],
+ ON, db['employee']['id'] == db['payments']['employeeID']
+ )
+ ),
+ ORDER_BY=(
+ db['payments']['amount'],
+ 'DESC'
+ )
+)
+```
-```
\ No newline at end of file
diff --git a/docs/all-parameters.md b/docs/all-parameters.md
index 4d1d113..e89e67e 100644
--- a/docs/all-parameters.md
+++ b/docs/all-parameters.md
@@ -1,9 +1,11 @@
-## How to use this document
+# All Parameters
+
+### How to use this document
Here you can find all possible parameter for sqllex databases methods.
```markdown
-## TABLE <-- parameter
+### TABLE <-- parameter
TABLE: Union[str, List[str], SQLite3xTable] <-- expected data types
@@ -15,7 +17,7 @@ TABLE = "my_table", # string
<-- the end -->
```
-### Usage
+#### Usage
```python
from sqllex.classes import AbstractDatabase
from sqllex.constants import TEXT, INTEGER
@@ -35,7 +37,7 @@ db.select(
---
-# All Parameters
+# Parameters
## TABLE
```python
@@ -293,35 +295,44 @@ from sqllex.constants import AS, ON, CROSS_JOIN, INNER_JOIN
db: AbstractDatabase = ...
users: AbstractTable = db['users']
+# Old and simple way
db.select(
TABLE='users',
SELECT=['username', 'group_name', 'description'],
- JOIN=[
- ['groups', AS, 'gr', ON, 'users.group_id == gr.group_id'], # INNER JOIN by default
- [CROSS_JOIN, 'about', 'ab', ON, 'ab.group_id == gr.group_id']
- ],
+ JOIN=(
+ ('groups', AS, 'gr', ON, 'users.group_id == gr.group_id'),
+ (INNER_JOIN, 'about', 'ab', ON, 'ab.group_id == gr.group_id')
+ ),
WHERE= (users['username'] != 'user_1') & (users['username'] != 'user_2')
)
+# Old and simple way
+JOIN=(
+ (INNER_JOIN, 'groups', AS, 'gr', ON, 'users.group_id == gr.group_id'),
+ (CROSS_JOIN, 'about', 'ab', ON, 'ab.group_id == gr.group_id')
+ ),
+
+# Modern and most stable way
+JOIN=(
+ (INNER_JOIN, db['groups'], ON, db['users']['group_id'] == db['groups']['group_id']),
+ (CROSS_JOIN, db['about'], ON, db['about']['group_id'] == db['users']['group_id'])
+ ),
+
+JOIN=(
+ ('groups', 'gr', ON, 'users.group_id == gr.group_id'), # INNER JOIN by default
+ ('about', 'ab', ON, 'ab.group_id == gr.group_id') # INNER JOIN by default
+ ),
-JOIN=[
- [INNER_JOIN, 'groups', AS, 'gr', ON, 'users.group_id == gr.group_id'],
- [CROSS_JOIN, 'about', 'ab', ON, 'ab.group_id == gr.group_id']
- ],
+JOIN=(
+ ('groups', ON, 'users.group_id == groups.group_id'), # INNER JOIN by default
+ ('about', ON, 'about.group_id == groups.group_id') # INNER JOIN by default
+ ),
-JOIN=[
- ['groups', 'gr', ON, 'users.group_id == gr.group_id'], # INNER JOIN by default
- ['about', 'ab', ON, 'ab.group_id == gr.group_id'] # INNER JOIN by default
- ],
+JOIN=(
+ ('groups', ON, 'users.group_id == groups.group_id'), # INNER JOIN by default
+ ),
-JOIN=[
- ['groups', ON, 'users.group_id == groups.group_id'], # INNER JOIN by default
- ['about', ON, 'about.group_id == groups.group_id'] # INNER JOIN by default
- ],
-JOIN=[
- ['groups', ON, 'users.group_id == groups.group_id'], # INNER JOIN by default
- ],
```
---
diff --git a/docs/database-select.md b/docs/database-select.md
index 4c48726..131aab3 100644
--- a/docs/database-select.md
+++ b/docs/database-select.md
@@ -144,32 +144,170 @@ db.select(
LIMIT=50,
OFFSET=20
)
+```
+
+
+# JOIN EXAMPLES
+
+## 1. Simples but not the best way
+
+```python
+# Old and simple way
+some_table.select(
+ SELECT=[ # SELECT username, group_name, description
+ 'username',
+ 'group_name',
+ 'description'
+ ],
+ JOIN=( # JOIN
+ ('groups', AS, 'gr', ON, 'users.group_id == gr.group_id'), # INNER JOIN groups AS gr ON us.group_id == gr.group_id
+ (LEFT_JOIN, 'about', 'ab', ON, 'ab.group_id == gr.group_id') # LEFT JOIN about ab ON ab.group_id == gr.group_id
+ ),
+ WHERE= (users['username'] != 'user_1') & (users['username'] != 'user_2'), # WHERE (users.username<>'user_1') AND (users.username<>'user_2')
+ ORDER_BY='age DESC', # ORDER BY age DESC
+ LIMIT=50, # LIMIT 50
+ OFFSET=20 # OFFSET 20
+)
+```
+### SQL script
+
+```shell
+SELECT username, group_name, description
+FROM x_table
+INNER JOIN groups AS gr ON us.group_id == gr.group_id
+LEFT JOIN about ab ON ab.group_id == gr.group_id
+WHERE (users.username<>'user_1') AND (users.username<>'user_2')
+ORDER BY age DESC
+LIMIT 50
+OFFSET 20
+```
-# For some another table
-x_table.select(
- SELECT=['username', 'group_name', 'description'], # SELECT username, group_name, description
- JOIN=[ # JOIN
- ['groups', AS, 'gr', ON, 'users.group_id == gr.group_id'], # INNER JOIN groups AS gr ON us.group_id == gr.group_id
- [CROSS_JOIN, 'about', 'ab', ON, 'ab.group_id == gr.group_id'] # CROSS JOIN about ab ON ab.group_id == gr.group_id
+## 2. Better way
+
+```python
+# DATABASE SCHEMA
+# {
+# 'position': {
+# 'id': [INTEGER, PRIMARY_KEY, AUTOINCREMENT],
+# 'name': TEXT,
+# 'description': [TEXT, DEFAULT, NULL],
+# },
+# 'employee': {
+# 'id': [INTEGER, PRIMARY_KEY, AUTOINCREMENT],
+# 'firstName': TEXT,
+# 'surname': TEXT,
+# 'age': [INTEGER, NOT_NULL],
+# 'positionID': INTEGER,
+#
+# FOREIGN_KEY: {
+# 'positionID': ['position', 'id']
+# }
+# },
+# 'payments': {
+# 'date': [TEXT],
+# 'employeeID': INTEGER,
+# 'amount': [INTEGER, NOT_NULL],
+#
+# FOREIGN_KEY: {
+# 'positionID': ['employee', 'id']
+# },
+# }
+# }
+
+db['employee'].select(
+ SELECT=[
+ db['employee']['id'],
+ db['employee']['firstName'],
+ db['position']['name']
],
- WHERE= (x_table['username'] != 'user_1') & (x_table['username'] != 'user_2'), # WHERE (users.username<>'user_1') AND (users.username<>'user_2')
- ORDER_BY='age DESC', # order by age DESC
- LIMIT=50, # limit = 50
- OFFSET=20 # offset = 20
+ JOIN=(
+ INNER_JOIN, self.db['position'],
+ ON, db['position']['id'] == db['employee']['positionID']
+ ),
+ ORDER_BY=(
+ db['position']['id'],
+ 'DESC'
+ )
)
+```
-# Same as SQL script like
-# SELECT username, group_name, description
-# FROM x_table
-# INNER JOIN groups AS gr ON us.group_id == gr.group_id
-# CROSS JOIN about ab ON ab.group_id == gr.group_id
-# WHERE (users.username<>'user_1') AND (users.username<>'user_2')
-# ORDER BY age DESC
-# LIMIT 50
-# OFFSET 20
+### SQL script
+
+```shell
+SELECT e.id, e.firstName, p.name
+FROM employee e
+INNER JOIN position p
+ON e.positionID == p.id
+ORDER BY e.positionID DESC
```
+## 3. More than one JOIN
+
+```python
+# DATABASE SCHEMA
+# {
+# 'position': {
+# 'id': [INTEGER, PRIMARY_KEY, AUTOINCREMENT],
+# 'name': TEXT,
+# 'description': [TEXT, DEFAULT, NULL],
+# },
+# 'employee': {
+# 'id': [INTEGER, PRIMARY_KEY, AUTOINCREMENT],
+# 'firstName': TEXT,
+# 'surname': TEXT,
+# 'age': [INTEGER, NOT_NULL],
+# 'positionID': INTEGER,
+#
+# FOREIGN_KEY: {
+# 'positionID': ['position', 'id']
+# }
+# },
+# 'payments': {
+# 'date': [TEXT],
+# 'employeeID': INTEGER,
+# 'amount': [INTEGER, NOT_NULL],
+#
+# FOREIGN_KEY: {
+# 'positionID': ['employee', 'id']
+# },
+# }
+# }
+
+self.db['employee'].select(
+ SELECT=[
+ db['employee']['id'],
+ db['employee']['firstName'],
+ db['position']['name']
+ ],
+ JOIN=(
+ (
+ LEFT_JOIN, db['position'],
+ ON, db['position']['id'] == db['employee']['positionID']
+ ),
+ (
+ INNER_JOIN, self.db['payments'],
+ ON, db['employee']['id'] == db['payments']['employeeID']
+ )
+ ),
+ ORDER_BY=(
+ db['payments']['amount'],
+ 'DESC'
+ )
+)
+```
+
+### SQL script
+
+```shell
+SELECT e.id, e.firstName, p.name
+FROM employee e
+LEFT JOIN position p
+ON e.positionID == p.id
+INNER JOIN payments
+ON e.id == payments.employeeID
+ORDER BY payments.amount DESC
+```
### [Back to home](README.md)
\ No newline at end of file
diff --git a/docs/examples/sqlite3x-aex-1.md b/docs/examples/sqlite3x-aex-1.md
index 1a1b46f..d69724f 100644
--- a/docs/examples/sqlite3x-aex-1.md
+++ b/docs/examples/sqlite3x-aex-1.md
@@ -179,31 +179,39 @@ print(users_group_1) # [('User_0',), ('User_3',), ('User_6',), ('User_9',)]
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
#
+# Old and simple way
users.select(
- SELECT=['username', 'group_name', 'description'], # SELECT username, group_name, description
- JOIN=[ # JOIN
- ['groups', AS, 'gr', ON, 'users.group_id == gr.group_id'], # INNER JOIN groups AS gr ON us.group_id == gr.group_id
- [CROSS_JOIN, 'about', 'ab', ON, 'ab.group_id == gr.group_id'] # CROSS JOIN about ab ON ab.group_id == gr.group_id
- ],
+ SELECT=[ # SELECT username, group_name, description
+ 'username',
+ 'group_name',
+ 'description'
+ ],
+ JOIN=( # JOIN
+ ('groups', AS, 'gr', ON, 'users.group_id == gr.group_id'), # INNER JOIN groups AS gr ON us.group_id == gr.group_id
+ (CROSS_JOIN, 'about', 'ab', ON, 'ab.group_id == gr.group_id') # CROSS JOIN about ab ON ab.group_id == gr.group_id
+ ),
WHERE= (users['username'] != 'user_1') & (users['username'] != 'user_2'), # WHERE (users.username<>'user_1') AND (users.username<>'user_2')
- ORDER_BY='age DESC', # order by age ASC
- LIMIT=50, # limit = 50
- OFFSET=20 # offset = 20
+ ORDER_BY='age DESC', # ORDER BY age DESC
+ LIMIT=50, # LIMIT 50
+ OFFSET=20 # OFFSET 20
)
-# Same as SQL script like
-# SELECT username, group_name, description
-# FROM users
-# INNER JOIN groups AS gr ON us.group_id == gr.group_id
-# CROSS JOIN about ab ON ab.group_id == gr.group_id
-# WHERE (users.username<>'user_1') AND (users.username<>'user_2')
-# ORDER BY age DESC
-# LIMIT 50
-# OFFSET 20
-
db.disconnect() # unlock your database and save all changes
```
+### SQL script
+
+```shell
+SELECT username, group_name, description
+FROM x_table
+INNER JOIN groups AS gr ON us.group_id == gr.group_id
+CROSS JOIN about ab ON ab.group_id == gr.group_id
+WHERE (users.username<>'user_1') AND (users.username<>'user_2')
+ORDER BY age DESC
+LIMIT 50
+OFFSET 20
+```
+
Code without comments
@@ -277,11 +285,15 @@ users_group_1 = users.select(
print(users_group_1)
users.select(
- SELECT=['username', 'group_name', 'description'],
- JOIN=[
- ['groups', AS, 'gr', ON, 'users.group_id == gr.group_id'],
- [CROSS_JOIN, 'about', 'ab', ON, 'ab.group_id == gr.group_id']
- ],
+ SELECT=[
+ 'username',
+ 'group_name',
+ 'description'
+ ],
+ JOIN=(
+ ('groups', AS, 'gr', ON, 'users.group_id == gr.group_id'),
+ (LEFT_JOIN, 'about', 'ab', ON, 'ab.group_id == gr.group_id')
+ ),
WHERE= (users['username'] != 'user_1') & (users['username'] != 'user_2'),
ORDER_BY='age DESC',
LIMIT=50,
@@ -292,6 +304,131 @@ db.disconnect()
```
+## 🔥 Recommended way to use JOIN in sqllex version <0.2.0.5
+
+```python
+# DATABASE SCHEMA
+# {
+# 'position': {
+# 'id': [INTEGER, PRIMARY_KEY, AUTOINCREMENT],
+# 'name': TEXT,
+# 'description': [TEXT, DEFAULT, NULL],
+# },
+# 'employee': {
+# 'id': [INTEGER, PRIMARY_KEY, AUTOINCREMENT],
+# 'firstName': TEXT,
+# 'surname': TEXT,
+# 'age': [INTEGER, NOT_NULL],
+# 'positionID': INTEGER,
+#
+# FOREIGN_KEY: {
+# 'positionID': ['position', 'id']
+# }
+# },
+# 'payments': {
+# 'date': [TEXT],
+# 'employeeID': INTEGER,
+# 'amount': [INTEGER, NOT_NULL],
+#
+# FOREIGN_KEY: {
+# 'positionID': ['employee', 'id']
+# },
+# }
+# }
+
+db['employee'].select(
+ SELECT=[
+ db['employee']['id'],
+ db['employee']['firstName'],
+ db['position']['name']
+ ],
+ JOIN=(
+ INNER_JOIN, self.db['position'],
+ ON, db['position']['id'] == db['employee']['positionID']
+ ),
+ ORDER_BY=(
+ db['position']['id'],
+ 'DESC'
+ )
+)
+```
+
+### SQL script
+
+```shell
+SELECT e.id, e.firstName, p.name
+FROM employee e
+INNER JOIN position p
+ON e.positionID == p.id
+ORDER BY e.positionID DESC
+```
+
+## More than one JOIN example
+
+```python
+# DATABASE SCHEMA
+# {
+# 'position': {
+# 'id': [INTEGER, PRIMARY_KEY, AUTOINCREMENT],
+# 'name': TEXT,
+# 'description': [TEXT, DEFAULT, NULL],
+# },
+# 'employee': {
+# 'id': [INTEGER, PRIMARY_KEY, AUTOINCREMENT],
+# 'firstName': TEXT,
+# 'surname': TEXT,
+# 'age': [INTEGER, NOT_NULL],
+# 'positionID': INTEGER,
+#
+# FOREIGN_KEY: {
+# 'positionID': ['position', 'id']
+# }
+# },
+# 'payments': {
+# 'date': [TEXT],
+# 'employeeID': INTEGER,
+# 'amount': [INTEGER, NOT_NULL],
+#
+# FOREIGN_KEY: {
+# 'positionID': ['employee', 'id']
+# },
+# }
+# }
+
+self.db['employee'].select(
+ SELECT=[
+ db['employee']['id'],
+ db['employee']['firstName'],
+ db['position']['name']
+ ],
+ JOIN=(
+ (
+ LEFT_JOIN, db['position'],
+ ON, db['position']['id'] == db['employee']['positionID']
+ ),
+ (
+ INNER_JOIN, self.db['payments'],
+ ON, db['employee']['id'] == db['payments']['employeeID']
+ )
+ ),
+ ORDER_BY=(
+ db['payments']['amount'],
+ 'DESC'
+ )
+)
+```
+
+### SQL script
+
+```shell
+SELECT e.id, e.firstName, p.name
+FROM employee e
+LEFT JOIN position p
+ON e.positionID == p.id
+INNER JOIN payments
+ON e.id == payments.employeeID
+ORDER BY payments.amount DESC
+```
### Congratulation, now you know basic SQLite3x methods! Explore more features and method on the links down below.
diff --git a/sqllex/__init__.py b/sqllex/__init__.py
index a4a700a..2646b9d 100644
--- a/sqllex/__init__.py
+++ b/sqllex/__init__.py
@@ -17,7 +17,7 @@
# "\033[0m"
# "\n")
-__version__ = '0.2.0.5'
+__version__ = '0.2.0.6'
__all__ = [
# classes
diff --git a/sqllex/core/entities/abc/script_gens.py b/sqllex/core/entities/abc/script_gens.py
index 131b0ae..56ffad7 100644
--- a/sqllex/core/entities/abc/script_gens.py
+++ b/sqllex/core/entities/abc/script_gens.py
@@ -64,7 +64,11 @@ def create(temp: str, if_not_exist: bool, name: str, content: str, without_rowid
f"TABLE " \
f"{'IF NOT EXISTS' if if_not_exist else ''} " \
f'"{name}" ' \
- f" (\n{content}\n) " \
+ f" {'(' if content else ''}" \
+ f"\n" \
+ f"{content}" \
+ f"\n" \
+ f"{')' if content else ''} " \
f"{'WITHOUT ROWID' if without_rowid else ''};"
diff --git a/sqllex/core/entities/abc/sql_column.py b/sqllex/core/entities/abc/sql_column.py
index 74449ce..d371887 100644
--- a/sqllex/core/entities/abc/sql_column.py
+++ b/sqllex/core/entities/abc/sql_column.py
@@ -52,7 +52,7 @@ def _str_gen(self, value, operator: str) -> SearchCondition:
)
elif isinstance(value, AbstractColumn):
return SearchCondition(
- f"({self}{operator}{value}",
+ f"({self}{operator}{value})",
placeholder=self.placeholder
)
else:
diff --git a/sqllex/core/entities/abc/sql_database.py b/sqllex/core/entities/abc/sql_database.py
index 8cbf040..fbb5c62 100644
--- a/sqllex/core/entities/abc/sql_database.py
+++ b/sqllex/core/entities/abc/sql_database.py
@@ -510,6 +510,7 @@ def __init__(self, placeholder):
"""
Init-ing database, connection, creating connection, setting parameters
"""
+
self.__connection = None
self.__placeholder = placeholder
@@ -683,6 +684,9 @@ def content_gen(parameters, column=None) -> str:
raise TypeError(f'Incorrect column "{column}" initialisation, parameters type {type(parameters)}, '
f'expected tuple, list or str')
+ if not columns:
+ raise ValueError("Zero-column tables aren't supported in SQLite")
+
content = ""
values = ()
@@ -976,7 +980,7 @@ def execute(
def executemany(
self,
script: AnyStr = None,
- values: Tuple[Tuple] = None,
+ values: Tuple = None,
) -> Union[Tuple, List, None]:
"""
Execute any SQL-script for many values sets
diff --git a/sqllex/core/entities/sqlite3x/sqlite3x.py b/sqllex/core/entities/sqlite3x/sqlite3x.py
index ae31bf0..40343a7 100644
--- a/sqllex/core/entities/sqlite3x/sqlite3x.py
+++ b/sqllex/core/entities/sqlite3x/sqlite3x.py
@@ -111,9 +111,13 @@ def __init__(
super(SQLite3x, self).__init__(placeholder='?')
- self.__path = path
self.__connection = None # init connection
+ if not path:
+ raise ValueError("Path can't be empty or undefined")
+ else:
+ self.__path = path
+
if init_connection:
self.connect() # creating connection with db
diff --git a/sqllex/core/tools/parsers/parsers.py b/sqllex/core/tools/parsers/parsers.py
index 2d5df3e..7602496 100644
--- a/sqllex/core/tools/parsers/parsers.py
+++ b/sqllex/core/tools/parsers/parsers.py
@@ -158,7 +158,7 @@ def where_wrapper(*args, **kwargs):
else:
operator = "="
- __script += f"({f' {operator} {placeholder} OR '.join(key for _ in values)} {operator} {placeholder} OR " # spaaces need for [LIKE, regexp]
+ __script += f"({f' {operator} {placeholder} OR '.join(str(key) for _ in values)} {operator} {placeholder} OR " # spaaces need for [LIKE, regexp]
__script = f"{__script[:-3].strip()}) " + "AND "
if __values:
@@ -192,7 +192,7 @@ def where_wrapper(*args, **kwargs):
raise TypeError
__script = (
- f"{__script.strip()}) " # .strip() removing spaces around
+ f"{__script}) " # .strip() removing spaces around
)
return __script, __values
@@ -238,9 +238,8 @@ def add_join_to_script(joins: tuple, base_script: str) -> str:
join_method = INNER_JOIN
# Adding JOIN to script
- base_script += (
- f"{join_method} {' '.join(j_arg for j_arg in _join)} "
- )
+ base_script += f" {join_method} {' '.join(str(j_arg) for j_arg in _join)} "
+
return base_script
if "JOIN" in kwargs:
@@ -334,9 +333,18 @@ def order_by_wrapper(*args, **kwargs):
if isinstance(order_by, (str, int)):
__script = f"{__script} ORDER BY {order_by} "
elif isinstance(order_by, (list, tuple)):
- __script = (
- f"{__script} ORDER BY {', '.join(str(item_ob) for item_ob in order_by)} "
- )
+ __script = f"{__script} ORDER BY"
+
+ for i, ord_ob in enumerate(order_by):
+
+ __script = f"{__script}" \
+ f"{', ' if i!=0 and ord_ob.lower() not in ['asc', 'desc'] else ''}" \
+ f" " \
+ f"{ord_ob}"
+
+ # __script = (
+ # f"{__script} ORDER BY {', '.join(str(item_ob) for item_ob in order_by)} "
+ # )
else:
raise TypeError(f"Unexpected type of ORDER_BY parameter, "
f"expected str or tuple, got {type(order_by)} instead")
diff --git a/sqllex/types/types.py b/sqllex/types/types.py
index 19e1115..c9bdd96 100644
--- a/sqllex/types/types.py
+++ b/sqllex/types/types.py
@@ -93,7 +93,7 @@
# Type for parameter of WHERE argument
WhereType = Union[
AnyStr,
- Mapping[AnyStr, Union[NumStr, List]],
+ Mapping,
bool, # temporary fix
]
@@ -127,23 +127,8 @@
JoinArgType = Union[
- Tuple[
- Tuple[
- Union[
- JoinMethod,
- NumStr,
- ConstantType,
- ]
- ]
- ],
- Tuple[
- Union[
- JoinMethod,
- NumStr,
- ConstantType,
- AnyStr
- ]
- ],
+ Tuple[Tuple],
+ Tuple,
AnyStr
]
diff --git a/tests/sqlite3x/new_test_all.py b/tests/sqlite3x/new_test_all.py
deleted file mode 100644
index 3715c17..0000000
--- a/tests/sqlite3x/new_test_all.py
+++ /dev/null
@@ -1,464 +0,0 @@
-from sqllex import *
-from sqllex.debug import debug_mode
-from sqllex.types import *
-from os import remove
-from time import sleep, time
-
-DB_NAME = "temp_table.db"
-
-DB_TEMPLATE: DBTemplateType = {
- "t1": {
- "text_t": TEXT,
- "num_t": NUMERIC,
- "int_t": INTEGER,
- "real_t": REAL,
- "none_t": NONE,
- "blob_t": BLOB,
- }
-}
-
-db = SQLite3x(path=DB_NAME)
-
-debug_mode(True, log_file='sqllex-test.log')
-
-
-def remove_db():
- for tab in ['t1', 't2', 't3', 't4', 't5', 't6', 't7', 'users', 'groups']:
- db.drop(tab)
-
- logger.stop()
- print("Logger stopped")
-
- remove(f"{DB_NAME}")
- print(f"{db} removed")
-
- remove("sqllex-test.log")
- print("Log removed")
-
-
-def tables_test():
- db = SQLite3x(path=DB_NAME, template=DB_TEMPLATE)
-
- db.markup(
- {
- "groups": {
- "group_id": [PRIMARY_KEY, UNIQUE, INTEGER],
- "group_name": [TEXT, NOT_NULL, DEFAULT, "GroupName"],
- },
-
- "users": {
- "user_id": [INTEGER, PRIMARY_KEY, UNIQUE],
- "user_name": TEXT,
- "group_id": INTEGER,
-
- FOREIGN_KEY: {
- "group_id": ["groups", "group_id"]
- },
- }
- }
- )
-
- db.create_table(
- "remove_me",
- {
- "xxx": [AUTOINCREMENT, INTEGER, PRIMARY_KEY],
- "yyy": [INTEGER],
- },
- IF_NOT_EXIST=True
- )
-
- for x in db.tables_names:
- if not (x in ('t1', 'groups', 'users', 'remove_me', 'sqlite_sequence')):
- print(db.tables_names)
- print('t1', 'groups', 'users', 'remove_me', 'sqlite_sequence')
- raise MemoryError
-
- db.drop('remove_me')
-
- for x in db.tables_names:
- if not (x in ('t1', 'groups', 'users', 'sqlite_sequence')):
- print(db.tables_names)
- raise MemoryError
-
-
-def insert_test():
- # just arg values
- db.insert("t1", 'asdf', 10.0, 1, 3.14, None, 2)
- db["t1"].insert('asdf', 10.0, 1, 3.14, None, 2)
-
- # arg list
- db.insert("t1", ['asdf', 10.0, 1, 3.14, None, 2])
- db["t1"].insert(['asdf', 10.0, 1, 3.14, None, 2])
-
- # arg tuple
- db.insert("t1", ('asdf', 10.0, 1, 3.14, None, 2))
- db["t1"].insert(('asdf', 10.0, 1, 3.14, None, 2))
-
- # arg tuple
- db.insert("t1", {"text_t": 'asdf', "num_t": 10.0, "int_t": 1, "real_t": 3.14, "none_t": None, "blob_t": 2})
- db["t1"].insert({"text_t": 'asdf', "num_t": 10.0, "int_t": 1, "real_t": 3.14, "none_t": None, "blob_t": 2})
-
- # kwargs
- db.insert("t1", text_t='asdf', num_t=10.0, int_t=1, real_t=3.14, none_t=None, blob_t=2)
- db["t1"].insert(text_t='asdf', num_t=10.0, int_t=1, real_t=3.14, none_t=None, blob_t=2)
-
- if db.select_all('t1') == \
- db.select('t1') == \
- db.select('t1', ALL) == \
- db.select('t1', '*') == \
- db['t1'].select_all() == \
- db['t1'].select() == \
- db['t1'].select(ALL):
- sel_all = db.select_all('t1')
- else:
- raise MemoryError
-
- if not sel_all == [('asdf', 10.0, 1, 3.14, None, 2)] * 10:
- print(sel_all)
- print([('asdf', 10, 1, 3.14, None, 2)] * 10)
- raise MemoryError
-
-
-def select_test():
- if not db.select('t1', 'text_t') == [('asdf',)] * 10: # <------------- HERE [] to () have to be changes!!!!!!!
- print(db.select('t1', 'text_t'))
- raise MemoryError
-
- if not db.select('t1', ['text_t', 'num_t']) == [('asdf', 10.0)] * 10:
- print(db.select('t1', ['text_t', 'num_t']))
- raise MemoryError
-
- db.insert('t1', ['qwerty1', 11.1, 2, 4.14, None, 5])
- db.insert('t1', ['qwerty2', 11.1, 2, 4.14, None, 6])
-
- # WHERE as dict
- if not db.select('t1', ['text_t', 'num_t'], WHERE={'num_t': 11.1}) == [('qwerty1', 11.1), ('qwerty2', 11.1)]:
- print(db.select('t1', ['text_t', 'num_t'], WHERE={'num_t': 11.1}))
- raise MemoryError
-
- # WHERE as dict
- if not db.select('t1', ['text_t', 'num_t'], WHERE={'num_t': ['=', 11.1], 'blob_t': ['<=', 5]}) == [
- ('qwerty1', 11.1)]:
- print(db.select('t1', ['text_t', 'num_t'], WHERE={'num_t': 11.1, 'blob_t': 5}))
- raise MemoryError
-
- # WHERE as kwarg
- if not db.select('t1', ['text_t', 'num_t'], num_t=11.1) == [('qwerty1', 11.1), ('qwerty2', 11.1)]:
- print(db.select('t1', ['text_t', 'num_t'], num_t=11.1))
- raise MemoryError
-
- # WHERE as kwargs
- if not db.select('t1', ['text_t', 'num_t'], num_t=11.1, blob_t=6) == [('qwerty2', 11.1)]:
- print(db.select('t1', ['text_t', 'num_t'], num_t=11.1, blob_t=6))
- raise MemoryError
-
- # LIMIT test
- if not db.select('t1', text_t='asdf', LIMIT=5) == [('asdf', 10.0, 1, 3.14, None, 2)] * 5:
- print(db.select('t1', text_t='asdf', LIMIT=5))
- raise MemoryError
-
- # OFFSET
- if not db.select('t1', text_t='asdf', LIMIT=5, OFFSET=6) == [('asdf', 10.0, 1, 3.14, None, 2)] * 4:
- print(db.select('t1', text_t='asdf', LIMIT=5, OFFSET=6))
- raise MemoryError
-
- if not db.select('t1', ['text_t', 'num_t'], WHERE={'num_t': ['>=', 11.1, 10], 'text_t': 'qwerty1'}) == \
- [('qwerty1', 11.1)]:
- print(db.select('t1', ['text_t', 'num_t'], WHERE={'num_t': ['>=', 11.1, 10], 'text_t': 'qwerty1'}))
- raise MemoryError
-
- db.create_table(
- "t2",
- {
- "id": [AUTOINCREMENT, INTEGER, PRIMARY_KEY],
- "value": [INTEGER, DEFAULT, 8],
- },
- )
-
- db.insertmany('t2', [[1], [2], [3], [4]])
-
- # ORDER_BY ASC
- if not db.select('t2', 'id', ORDER_BY='id ASC') == [(1,), (2,), (3,), (4,)]:
- print(db.select('t2', 'id', ORDER_BY='id ASC'))
- raise MemoryError
-
- # ORDER_BY DESC
- if not db.select('t2', ['id'], ORDER_BY='id DESC') == [(4,), (3,), (2,), (1,)]:
- print(db.select('t2', 'id', ORDER_BY='id DESC'))
- raise MemoryError
-
- if not db.select(
- 't2',
- WHERE={
- 'id': ['<', 3]
- },
- ) == [(1, 8), (2, 8)]:
- print(db.select('t2', WHERE={'id': ['<', 3]}))
- raise MemoryError
-
- # JOIN
- if not db.select(
- 't2',
- 'id',
- JOIN = (CROSS_JOIN, 't1', AS, 't', ON, 't.num_t > t2.value')
- ) == [
- (1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,),
- (2,), (2,), (2,), (2,), (2,), (2,), (2,), (2,), (2,), (2,), (2,), (2,),
- (3,), (3,), (3,), (3,), (3,), (3,), (3,), (3,), (3,), (3,), (3,), (3,),
- (4,), (4,), (4,), (4,), (4,), (4,), (4,), (4,), (4,), (4,), (4,), (4,)
-
- ]:
- print(db.select('t2', 'id', JOIN=[[CROSS_JOIN, 't1', AS, 't', ON, 't.num_t > t2.value']]))
- raise MemoryError
-
-
-def insertmany_test():
- db.create_table(
- 't3',
- {
- 'id': [INTEGER, NOT_NULL],
- 'val': [TEXT, DEFAULT, 'NO']
- }
- )
-
- db.insertmany('t3', [[1, 'hi']] * 10)
- db.insertmany('t3', [[1]] * 10)
- db.insertmany('t3', ((1, 'hi'),) * 10)
- db.insertmany('t3', ((1,),) * 10)
- db.insertmany('t3', id=[2] * 10)
- db.insertmany('t3', id=(2,) * 10)
-
- if not db.select_all('t3') == [
- (1, 'hi',), (1, 'hi',), (1, 'hi',), (1, 'hi',), (1, 'hi',), (1, 'hi',), (1, 'hi',),
- (1, 'hi',), (1, 'hi',), (1, 'hi',), (1, 'NO',), (1, 'NO',), (1, 'NO',), (1, 'NO',),
- (1, 'NO',), (1, 'NO',), (1, 'NO',), (1, 'NO',), (1, 'NO',), (1, 'NO',), (1, 'hi',),
- (1, 'hi',), (1, 'hi',), (1, 'hi',), (1, 'hi',), (1, 'hi',), (1, 'hi',), (1, 'hi',),
- (1, 'hi',), (1, 'hi',), (1, 'NO',), (1, 'NO',), (1, 'NO',), (1, 'NO',), (1, 'NO',),
- (1, 'NO',), (1, 'NO',), (1, 'NO',), (1, 'NO',), (1, 'NO',), (2, 'NO',), (2, 'NO',),
- (2, 'NO',), (2, 'NO',), (2, 'NO',), (2, 'NO',), (2, 'NO',), (2, 'NO',), (2, 'NO',),
- (2, 'NO',), (2, 'NO',), (2, 'NO',), (2, 'NO',), (2, 'NO',), (2, 'NO',), (2, 'NO',),
- (2, 'NO',), (2, 'NO',), (2, 'NO',), (2, 'NO',)
- ]:
- print(db.select_all('t3'))
- raise MemoryError
-
- db.create_table(
- 't6',
- {
- 'id': [INTEGER, UNIQUE, NOT_NULL],
- 'val': [TEXT, DEFAULT, 'def_val']
- }
- )
-
- db.insertmany('t6', [[x, 'hi'] for x in range(100)])
-
- if not db.select_all('t6') == [(x, 'hi') for x in range(100)]:
- raise MemoryError
-
- db.insertmany('t6', [(x, 'bye') for x in range(100)], OR=REPLACE)
-
- if not db.select_all('t6') == [(x, 'bye') for x in range(100)]:
- raise MemoryError
-
- db.updatemany('t6', [[], [], []])
-
- if not db.select_all('t6') == [(x, 'bye') for x in range(100)]:
- raise MemoryError
-
-
-def update_test():
- db.create_table(
- 't4',
- {
- 'id': [INTEGER, NOT_NULL, UNIQUE],
- 'val': [TEXT, DEFAULT, 'NO']
- }
- )
-
- db.insertmany('t4', [[x, bin(x)] for x in range(100)])
-
- db.update(
- 't4',
- {'val': 'NEW_VAL'},
- WHERE={
- 'id': ['<', 50]
- }
- )
-
- if not db.select('t4', 'id', WHERE={"val": 'NEW_VAL'}) == [(x,) for x in range(50)]:
- print(db.select('t4', 'id', WHERE={"val": 'NEW_VAL'}))
- raise MemoryError
-
-
-def delete_test():
- db.delete('t4', id=['<', 50])
-
- if not db.select('t4', 'id', WHERE={"val": 'NEW_VAL'}) == []:
- print(db.select_all('t4'))
- raise MemoryError
-
-
-def replace_test():
- db.create_table(
- 't5',
- {
- 'id': [INTEGER, UNIQUE, NOT_NULL],
- 'val': [TEXT, DEFAULT, '_x_']
- }
- )
-
- db.insertmany('t5', [[x, ] for x in range(100)])
-
- db.replace('t5', [99, 'O_O'])
-
- if not db.select('t5', val='O_O') == [(99, 'O_O')]:
- print(db.select('t5', val='O_O'))
- raise MemoryError
-
-
-def get_tables_test():
- if " None:
+ cls.tests_counter = 0
+
+ @classmethod
+ def tearDownClass(cls) -> None:
+ pass
+
+ def setUp(self) -> None:
+ self.tests_counter += 1
+ self.db_name = f"test_sqllex_db_{self.tests_counter}"
+
+ self.db = SQLite3x(path=self.db_name)
+
+ def tearDown(self) -> None:
+ self.db.disconnect()
+ os.remove(self.db_name)
+
+ def test_sqlite_db_crating_db(self):
+ """
+ Testing SQLite database init
+ """
+
+ db_name = f"{self.db_name}_{1}"
+ SQLite3x(db_name)
+ self.assertTrue(os.path.isfile(db_name))
+ os.remove(db_name)
+
+ db_name = f"{self.db_name}_{2}"
+ SQLite3x(path=db_name)
+ self.assertTrue(os.path.isfile(db_name))
+ os.remove(db_name)
+
+ db_name = f"{self.db_name}_{3}"
+ SQLite3x(db_name, init_connection=False)
+ self.assertFalse(os.path.isfile(db_name))
+
+ db_name = f""
+ self.assertRaises(ValueError, SQLite3x, db_name)
+
+ db_name = f""
+ self.assertRaises(ValueError, SQLite3x, db_name)
+
+ db_name = f""
+ self.assertRaises(ValueError, SQLite3x, path=db_name)
+
+ def test_connection(self):
+ """
+ Testing connection with class object init
+ """
+
+ db_name = f"{self.db_name}_{1}"
+ self.assertIsInstance(SQLite3x(db_name).connection, sqlite3.Connection)
+ os.remove(db_name)
+
+ db_name = f"{self.db_name}_{1}"
+ self.assertIsInstance(SQLite3x(db_name, init_connection=True).connection, sqlite3.Connection)
+ os.remove(db_name)
+
+ db_name = f"{self.db_name}_{1}"
+ self.assertIs(SQLite3x(db_name, init_connection=False).connection, None)
+
+ def test_create_table_1(self):
+ """
+ Testing table creating
+ """
+
+ self.assertRaises(ValueError, self.db.create_table, 'test_table_1', {})
+ self.assertRaises(ValueError, self.db.create_table, 'test_table_2', '')
+
+ def test_create_table_basic(self):
+ """
+ Testing table creating
+ """
+ columns = {'id': INTEGER}
+
+ self.db.create_table(
+ 'test_table_1',
+ columns
+ )
+ self.assertEqual(self.db.tables_names, ('test_table_1',))
+
+ self.db.create_table(
+ 'test_table_2',
+ columns
+ )
+ self.assertEqual(self.db.tables_names, ('test_table_1', 'test_table_2'))
+
+ def test_create_table_columns(self):
+ """
+ Testing table creating
+ """
+
+ self.db.create_table(
+ name='test_table',
+ columns={
+ 'id': [INTEGER, AUTOINCREMENT, PRIMARY_KEY],
+ 'user': [TEXT, UNIQUE, NOT_NULL],
+ 'about': [TEXT, DEFAULT, NULL],
+ 'status': [TEXT, DEFAULT, 'offline']
+ }
+ )
+ self.assertEqual(self.db.tables_names, ('test_table', 'sqlite_sequence'))
+
+ def test_create_table_inx(self):
+ """
+ Testing if not exist kwarg
+ """
+ columns = {'id': INTEGER}
+
+ self.db.create_table('test_table_1', columns, IF_NOT_EXIST=True)
+ self.db.create_table('test_table_2', columns, IF_NOT_EXIST=True)
+ self.db.create_table('test_table_1', columns, IF_NOT_EXIST=True)
+
+ self.assertEqual(self.db.tables_names, ('test_table_1', 'test_table_2'))
+ self.assertRaises(sqlite3.OperationalError, self.db.create_table, 'test_table_1', columns, IF_NOT_EXIST=False)
+
+ def test_markup(self):
+ """
+ Markup table
+ """
+
+ self.db.markup(
+ {
+ "tt_groups": {
+ "group_id": [PRIMARY_KEY, UNIQUE, INTEGER],
+ "group_name": [TEXT, NOT_NULL, DEFAULT, "GroupName"],
+ },
+
+ "tt_users": {
+ "user_id": [INTEGER, PRIMARY_KEY, UNIQUE],
+ "user_name": TEXT,
+ "group_id": INTEGER,
+
+ FOREIGN_KEY: {
+ "group_id": ["tt_groups", "group_id"]
+ },
+ }
+ }
+ )
+
+ self.assertEqual(self.db.tables_names, ('tt_groups', 'tt_users'))
+
+ def test_drop_and_create_table(self):
+ """
+ Create and remove table
+ """
+ self.db.create_table('test_table', {'id': INTEGER})
+ self.assertEqual(self.db.tables_names, ('test_table',))
+
+ self.db.drop('test_table')
+ self.assertEqual(self.db.tables_names, tuple())
+
+ def test_insert(self):
+ """
+ All kind of possible inserts without extra arguments (OR, WITH)
+ """
+
+ def get_all_records():
+ return self.db.execute(f'SELECT * FROM "{table_name}"')
+
+ def count_all_records():
+ return len(get_all_records())
+
+ def count_records(step: int = None):
+ """
+ This decorator adding counting
+ with every run of decorated function it increases count_records.counter at value of step
+
+ @count_records(step=1)
+ def func(*args, **kwargs):
+ return None
+
+ func()
+ func()
+
+ count_records.counter == 2
+ """
+
+ def wrapper_(func: callable):
+ def wrapper(*args, **kwargs):
+ # one run == + step records
+ count_records.counter += func.step
+
+ func(*args, **kwargs)
+
+ # checking was it inserted or not
+ self.assertEqual(
+ count_all_records(),
+ count_records.counter,
+ msg=f"Incorrect records amount\n args={args}, kwargs={kwargs}"
+ )
+
+ func.step = step
+
+ return wrapper
+
+ if step is None:
+ step = 1
+
+ count_records.counter = 0
+
+ return wrapper_
+
+ @count_records(step=2) # warning - magic number!
+ def insert_process(*args, **kwargs):
+ self.db.insert(table_name, *args, **kwargs)
+ self.db[table_name].insert(*args, **kwargs)
+
+ @count_records(step=10) # warning - magic number!
+ def insert_many_process(*args, **kwargs):
+ self.db.insertmany(table_name, *args, **kwargs)
+ self.db[table_name].insertmany(*args, **kwargs)
+
+ table_name = 'test_table'
+ columns = ("text_c", "num_c", "int_c", "real_c", 'none_c', 'blob_c')
+ data = ('asdf', 10.0, 1, 3.14, None, 2)
+
+ self.db.execute(
+ """
+ CREATE TABLE IF NOT EXISTS "test_table" (
+ "text_c" TEXT,
+ "num_c" NUMERIC,
+ "int_c" INTEGER,
+ "real_c" REAL,
+ "none_c" NONE,
+ "blob_c" BLOB
+ );
+ """
+ )
+
+ # columns_types = (TEXT, NUMERIC, INTEGER, REAL, NONE, BLOB)
+ #
+ # self.db.markup(
+ # {
+ # table_name: dict(zip(columns, columns_types))
+ # }
+ # )
+
+ # just arg values
+ # 1, 2, 3
+ insert_process(*data)
+
+ # arg list
+ # [1, 2, 3]
+ insert_process(list(data))
+
+ # arg tuple
+ # (1, 2, 3)
+ insert_process(tuple(data))
+
+ # arg tuple
+ # {'col1': 1, 'col2': 2}
+ insert_process(dict(zip(columns, data)))
+
+ # kwargs
+ # col1=1, col2=2
+ insert_process(**dict(zip(columns, data)))
+
+ # not full tuple
+ insert_process(('asdf', 10.0))
+
+ # not full tuple
+ insert_process(['asdf', 10.0])
+
+ # insert_many args
+ # (1, 2), (3, 4) ...
+ insert_many_process(*((data,) * 5))
+
+ # insert_many one arg
+ # ((1, 2), (3, 4) ... )
+ insert_many_process((data,) * 5)
+
+ # Manually SQL script execution
+ all_data = get_all_records()
+
+ self.assertEqual(all_data, self.db.select(table_name))
+ self.assertEqual(all_data, self.db.select(table_name, ALL))
+ self.assertEqual(all_data, self.db.select(table_name, '*'))
+ self.assertEqual(all_data, self.db[table_name].select_all())
+ self.assertEqual(all_data, self.db[table_name].select())
+ self.assertEqual(all_data, self.db[table_name].select(ALL))
+
+ def test_insert_xa_and_update(self):
+ """
+ Insert with extra args and Update
+ """
+
+ def re_init_database():
+ self.db.executescript(
+ """
+ DROP TABLE IF EXISTS "salt";
+ DROP TABLE IF EXISTS "hashes";
+
+ CREATE TABLE IF NOT EXISTS "hashes" (
+ "id" INTEGER PRIMARY KEY AUTOINCREMENT,
+ "value" TEXT
+ );
+
+ CREATE TABLE "salt" (
+ "hashID" INTEGER PRIMARY KEY AUTOINCREMENT,
+ "value" TEXT,
+
+ FOREIGN KEY (hashID) REFERENCES hashes (id)
+ );
+
+ INSERT INTO "hashes" (id, value) VALUES (1, '6432642426695757642'), (2, '3259279587463616469'),
+ (3, '4169263184167314937'), (4, '-8758758971870855856'), (5, '-2558087477551224077');
+
+ INSERT INTO "salt" (hashID, value) VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5');
+ """
+ )
+
+ # sqlite3.IntegrityError: UNIQUE constraint failed: hashes.id
+ re_init_database()
+ self.assertRaises(
+ sqlite3.IntegrityError,
+ self.db.insert, 'hashes', (1, 'newHash'),
+ )
+
+ # INSERT OR FAIL
+ # sqlite3.IntegrityError: UNIQUE constraint failed: hashes.id
+ re_init_database()
+ self.assertRaises(
+ sqlite3.IntegrityError,
+ self.db.insert,
+ 'hashes', (1, 'newHash'), OR=FAIL
+ )
+
+ # INSERT OR ABORT
+ # sqlite3.IntegrityError: UNIQUE constraint failed: hashes.id
+ re_init_database()
+ self.assertRaises(
+ sqlite3.IntegrityError,
+ self.db.insert,
+ 'hashes', (1, 'newHash'), OR=ABORT
+ )
+
+ # INSERT OR ROLLBACK
+ # sqlite3.IntegrityError: UNIQUE constraint failed: hashes.id
+ re_init_database()
+ self.assertRaises(
+ sqlite3.IntegrityError,
+ self.db.insert,
+ 'hashes', (1, 'newHash'), OR=ROLLBACK
+ )
+
+ # INSERT OR REPLACE
+ re_init_database()
+ self.db.insert(
+ 'hashes',
+ (1, 'newHash'),
+ OR=REPLACE
+ )
+ self.assertEqual(
+ [
+ (1, 'newHash'),
+ (2, '3259279587463616469'),
+ (3, '4169263184167314937'),
+ (4, '-8758758971870855856'),
+ (5, '-2558087477551224077')
+ ],
+ self.db.execute("SELECT * FROM hashes"),
+ )
+
+ # INSERT OR IGNORE
+ re_init_database()
+ self.db.insert(
+ 'hashes',
+ (1, 'newHash'),
+ (2, 'anotherNewHash'),
+ OR=IGNORE
+ )
+ self.assertEqual(
+ [
+ (1, '6432642426695757642'),
+ (2, '3259279587463616469'),
+ (3, '4169263184167314937'),
+ (4, '-8758758971870855856'),
+ (5, '-2558087477551224077')
+ ],
+ self.db.execute("SELECT * FROM hashes"),
+ )
+
+ # INSERT many OR REPLACE
+ re_init_database()
+ self.db.insertmany(
+ 'hashes',
+ (
+ (1, 'newHash'),
+ (6, 'anotherNewHash'),
+ ),
+ OR=REPLACE
+ )
+ self.assertEqual(
+ [
+ (1, 'newHash'),
+ (2, '3259279587463616469'),
+ (3, '4169263184167314937'),
+ (4, '-8758758971870855856'),
+ (5, '-2558087477551224077'),
+ (6, 'anotherNewHash'),
+ ],
+ self.db.execute("SELECT * FROM hashes"),
+ )
+
+ # INSERT many OR IGNORE
+ re_init_database()
+ self.db.insertmany(
+ 'hashes',
+ (
+ (1, 'newHash'),
+ (6, 'anotherNewHash'),
+ ),
+ OR=IGNORE
+ )
+ self.assertEqual(
+ [
+ (1, '6432642426695757642'),
+ (2, '3259279587463616469'),
+ (3, '4169263184167314937'),
+ (4, '-8758758971870855856'),
+ (5, '-2558087477551224077'),
+ (6, 'anotherNewHash'),
+ ],
+ self.db.execute("SELECT * FROM hashes")
+ )
+
+ expected = [
+ (1, 'newHash'),
+ (2, '3259279587463616469'),
+ (3, '4169263184167314937'),
+ (4, '-8758758971870855856'),
+ (5, '-2558087477551224077')
+ ]
+
+ # UPDATE
+ re_init_database()
+ self.db.update(
+ 'hashes',
+ SET={'value': 'newHash'},
+ WHERE={'id': 1}
+ )
+ self.assertEqual(
+ expected,
+ self.db.execute("SELECT * FROM hashes"),
+ )
+
+ # UPDATE
+ re_init_database()
+ self.db.update(
+ 'hashes',
+ SET={
+ self.db['hashes']['value']: 'newHash'
+ },
+ WHERE={
+ self.db['hashes']['id']: 1
+ }
+ )
+ self.assertEqual(
+ expected,
+ self.db.execute("SELECT * FROM hashes"),
+ )
+
+ # UPDATE
+ re_init_database()
+ self.db.update(
+ 'hashes',
+ SET={
+ self.db['hashes']['value']: 'newHash'
+ },
+ WHERE=self.db['hashes']['id'] == 1
+ )
+ self.assertEqual(
+ expected,
+ self.db.execute("SELECT * FROM hashes"),
+ )
+
+ # UPDATE
+ re_init_database()
+ self.db.update(
+ 'hashes',
+ SET={
+ self.db['hashes']['value']: 'newHash'
+ },
+ WHERE=(1 < self.db['hashes']['id']) & (self.db['hashes']['id'] < 5)
+ )
+ self.assertEqual(
+ self.db.execute("SELECT * FROM hashes"),
+ [
+ (1, '6432642426695757642'),
+ (2, 'newHash'),
+ (3, 'newHash'),
+ (4, 'newHash'),
+ (5, '-2558087477551224077')
+ ]
+ )
+
+ # UPDATE with impossible WHERE condition
+ re_init_database()
+ self.db.update(
+ 'hashes',
+ SET={
+ self.db['hashes']['value']: 'newHash'
+ },
+ WHERE=(self.db['hashes']['id'] > 9999)
+ )
+ self.assertEqual(
+ self.db.execute("SELECT * FROM hashes"),
+ [
+ (1, '6432642426695757642'),
+ (2, '3259279587463616469'),
+ (3, '4169263184167314937'),
+ (4, '-8758758971870855856'),
+ (5, '-2558087477551224077')
+ ]
+ )
+
+ def test_select(self):
+ """
+ All kind of selects (WHERE, ORDER BY, JOIN)
+ """
+
+ self.db.executescript(
+ """
+ CREATE TABLE IF NOT EXISTS "position" (
+ "id" INTEGER PRIMARY KEY AUTOINCREMENT,
+ "name" TEXT,
+ "description" TEXT DEFAULT NULL
+ );
+
+ CREATE TABLE IF NOT EXISTS "employee" (
+ "id" INTEGER PRIMARY KEY AUTOINCREMENT,
+ "firstName" TEXT,
+ "surname" TEXT,
+ "age" INTEGER NOT NULL,
+ "positionID" INTEGER,
+ FOREIGN KEY (positionID) REFERENCES position (id)
+ );
+
+ CREATE TABLE IF NOT EXISTS "payments" (
+ "date" TEXT,
+ "employeeID" INTEGER,
+ "amount" INTEGER NOT NULL
+ );
+ """
+ )
+
+ # self.db.markup(
+ # {
+ # 'position': {
+ # 'id': [INTEGER, PRIMARY_KEY, AUTOINCREMENT],
+ # 'name': TEXT,
+ # 'description': [TEXT, DEFAULT, NULL],
+ # },
+ # 'employee': {
+ # 'id': [INTEGER, PRIMARY_KEY, AUTOINCREMENT],
+ # 'firstName': TEXT,
+ # 'surname': TEXT,
+ # 'age': [INTEGER, NOT_NULL],
+ # 'positionID': INTEGER,
+ #
+ # FOREIGN_KEY: {
+ # 'positionID': ['position', 'id']
+ # }
+ # },
+ # 'payments': {
+ # 'date': [TEXT],
+ # 'employeeID': INTEGER,
+ # 'amount': [INTEGER, NOT_NULL],
+ #
+ # FOREIGN_KEY: {
+ # 'positionID': ['employee', 'id']
+ # },
+ # }
+ # }
+ # )
+
+ self.db.executemany(
+ 'INSERT INTO "position" (id, name, description) VALUES (?, ?, ?)',
+ (
+ (0, 'Assistant', 'Novice developer'),
+ (1, 'Junior', 'Junior developer'),
+ (2, 'Middle', 'Middle developer'),
+ (3, 'Senior', 'senior developer'),
+ (4, 'DevOps', 'DevOps engineer')
+ )
+ )
+
+ self.db.executemany(
+ 'INSERT INTO "employee" (id, firstName, surname, age, positionID) VALUES (?, ?, ?, ?, ?)',
+ (
+ (None, 'Alis', 'A', 11, 1),
+ (None, 'Bob', 'B', 22, 1),
+ (None, 'Carl', 'C', 33, 2),
+ (None, 'Alis', 'B', 44, 3),
+ (None, 'Dexter', 'B', 55, 1),
+ (None, 'Elis', 'A', 22, 1),
+ (None, 'Frank', 'B', 33, 1),
+ (None, 'Georgy', 'D', 22, 2),
+ (None, 'FoxCpp', 'M', 22, 1),
+ (None, 'Ira', 'D', 22, 2)
+ )
+ )
+
+ self.db.executemany(
+ 'INSERT INTO "payments" (date, employeeID, amount) VALUES (?, ?, ?)',
+ (
+ ('01.01.2022', 2, 2000),
+ ('01.01.2022', 3, 3000),
+ ('01.01.2022', 7, 2000),
+ ('01.02.2022', 1, 4000),
+ ('01.02.2022', 2, 2000),
+ ('01.02.2022', 3, 4000),
+ ('01.02.2022', 5, 2000),
+ ('01.02.2022', 6, 4000),
+ ('01.02.2022', 7, 2000),
+ )
+ )
+
+ # SELECT all
+ expected = self.db.execute('SELECT * FROM "employee"')
+
+ self.assertEqual(
+ expected, self.db['employee'].select(ALL)
+ )
+ self.assertEqual(
+ expected, self.db['employee'].select_all()
+ )
+
+ # SELECT one column
+ expected = self.db.execute('SELECT id FROM "employee"')
+
+ self.assertEqual(
+ expected, self.db.select('employee', 'id')
+ )
+ self.assertEqual(
+ expected, self.db['employee']['id']
+ )
+ self.assertEqual(
+ expected, self.db['employee'].select('id')
+ )
+ self.assertEqual(
+ expected, self.db['employee'].select(self.db['employee']['id'])
+ )
+ self.assertEqual(
+ expected, self.db['employee'].select([self.db['employee']['id']])
+ )
+ self.assertEqual(
+ expected, self.db['employee'].select((self.db['employee']['id'],))
+ )
+
+ # SELECT 2 columns
+ expected = self.db.execute('SELECT id, firstName FROM "employee"')
+
+ self.assertEqual(
+ expected, self.db['employee'].select('id, firstName')
+ )
+ self.assertEqual(
+ expected, self.db['employee'].select(['id', 'firstName'])
+ )
+ self.assertEqual(
+ expected, self.db['employee'].select(('id', 'firstName'))
+ )
+ self.assertEqual(
+ expected, self.db['employee'].select(
+ [
+ self.db['employee']['id'],
+ self.db['employee']['firstName']
+ ]
+ )
+ )
+
+ # SELECT 2 columns WHERE (condition)
+ expected = self.db.execute('SELECT id, firstName FROM "employee" WHERE id > 2')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ WHERE='id > 2',
+ )
+ )
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ WHERE='id > 2'
+ )
+ )
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ WHERE=(self.db['employee']['id'] > 2)
+ )
+ )
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=[self.db['employee']['id'], self.db['employee']['firstName']],
+ WHERE=(self.db['employee']['id'] > 2)
+ )
+ )
+
+ # SELECT 2 columns WHERE (condition) AND (condition)
+ expected = self.db.execute('SELECT id, firstName FROM "employee" WHERE (age > 11) AND (positionID <> 2)')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ WHERE=(self.db['employee']['age'] > 11) & (self.db['employee']['positionID'] != 2)
+ )
+ )
+
+ # SELECT 2 columns WHERE (condition) AND (condition)
+ expected = self.db.execute('SELECT id, firstName FROM "employee" WHERE (age == 11) AND (positionID == 2)')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ WHERE=(self.db['employee']['age'] == 11) & (self.db['employee']['positionID'] == 2)
+ )
+ )
+
+ # SELECT 2 columns WHERE (condition) OR (condition)
+ expected = self.db.execute('SELECT id, firstName FROM "employee" WHERE (age == 11) OR (positionID == 2)')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ WHERE=(self.db['employee']['age'] == 11) | (self.db['employee']['positionID'] == 2)
+ )
+ )
+
+ # SELECT 3 columns ORDERED BY column
+ expected = self.db.execute('SELECT id, firstName, positionID FROM "employee" ORDER BY positionID')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName', 'positionID'],
+ ORDER_BY='positionID'
+ )
+ )
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName', 'positionID'],
+ ORDER_BY=3
+ )
+ )
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName', 'positionID'],
+ ORDER_BY=['positionID']
+ )
+ )
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName', 'positionID'],
+ ORDER_BY=('positionID',)
+ )
+ )
+
+ # SELECT 2 columns ORDERED BY column1, column2
+ expected = self.db.execute('SELECT id, firstName FROM "employee" ORDER BY firstName, surname')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ ORDER_BY=['firstName', 'surname']
+ )
+ )
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ ORDER_BY=('firstName', 'surname')
+ )
+ )
+
+ # SELECT 2 columns ORDERED BY column ASC
+ expected = self.db.execute('SELECT id, firstName FROM "employee" ORDER BY firstName ASC')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ ORDER_BY='firstName ASC'
+ )
+ )
+
+ # SELECT 2 columns ORDERED BY column DESC
+ expected = self.db.execute('SELECT id, firstName FROM "employee" ORDER BY firstName DESC')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ ORDER_BY='firstName DESC'
+ )
+ )
+
+ # Issue #59 (fixed)
+ # self.assertRaises(
+ # sqlite3.OperationalError,
+ # self.db['employee'].select,
+ # SELECT=['id', 'firstName'],
+ # ORDER_BY=['firstName', 'ASC', 'surname', 'DESC']
+ # )
+ # self.assertRaises(
+ # sqlite3.OperationalError,
+ # self.db['employee'].select,
+ # SELECT=['id', 'firstName'],
+ # ORDER_BY=('firstName', 'ASC', 'surname', 'DESC')
+ # )
+
+ # When issue #59 will be fixed, this code have to work fine
+ #
+ # SELECT 2 columns ORDERED BY column1 ASC, column DESC
+ expected = self.db.execute('SELECT id, firstName FROM "employee" ORDER BY firstName ASC, surname DESC')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ ORDER_BY=['firstName', 'ASC', 'surname', 'DESC']
+ )
+ )
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=['id', 'firstName'],
+ ORDER_BY=('firstName', 'ASC', 'surname', 'DESC')
+ )
+ )
+
+ # SELECT with one INNER JOIN
+ expected = self.db.execute(''
+ 'SELECT e.id, e.firstName, p.name '
+ 'FROM employee e '
+ 'INNER JOIN position p '
+ 'ON e.positionID == p.id '
+ 'ORDER BY e.positionID DESC')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=[
+ self.db['employee']['id'],
+ self.db['employee']['firstName'],
+ self.db['position']['name']
+ ],
+ JOIN=(
+ INNER_JOIN, self.db['position'],
+ ON, self.db['position']['id'] == self.db['employee']['positionID']
+ ),
+ ORDER_BY=(
+ self.db['position']['id'],
+ 'DESC'
+ )
+ )
+ )
+
+ # SELECT with two INNER JOINS
+ expected = self.db.execute(''
+ 'SELECT e.id, e.firstName, p.name '
+ 'FROM employee e '
+ 'INNER JOIN position p '
+ 'ON e.positionID == p.id '
+ 'INNER JOIN payments '
+ 'ON e.id == payments.employeeID '
+ 'ORDER BY payments.amount DESC')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=[
+ self.db['employee']['id'],
+ self.db['employee']['firstName'],
+ self.db['position']['name']
+ ],
+ JOIN=(
+ (
+ INNER_JOIN, self.db['position'],
+ ON, self.db['position']['id'] == self.db['employee']['positionID']
+ ),
+ (
+ INNER_JOIN, self.db['payments'],
+ ON, self.db['employee']['id'] == self.db['payments']['employeeID']
+ )
+ ),
+ ORDER_BY=(
+ self.db['payments']['amount'],
+ 'DESC'
+ )
+ )
+ )
+
+ # SELECT with two FULL JOINS
+ expected = self.db.execute(''
+ 'SELECT e.id, e.firstName, p.name '
+ 'FROM employee e '
+ 'LEFT JOIN position p '
+ 'ON e.positionID == p.id '
+ 'LEFT JOIN payments '
+ 'ON e.id == payments.employeeID '
+ 'ORDER BY payments.amount DESC')
+
+ self.assertEqual(
+ expected,
+ self.db['employee'].select(
+ SELECT=[
+ self.db['employee']['id'],
+ self.db['employee']['firstName'],
+ self.db['position']['name']
+ ],
+ JOIN=(
+ (
+ LEFT_JOIN, self.db['position'],
+ ON, self.db['position']['id'] == self.db['employee']['positionID']
+ ),
+ (
+ LEFT_JOIN, self.db['payments'],
+ ON, self.db['employee']['id'] == self.db['payments']['employeeID']
+ )
+ ),
+ ORDER_BY=(
+ self.db['payments']['amount'],
+ 'DESC'
+ )
+ )
+ )
+
+ def test_select_or(self):
+ """
+ OR kwarg with different values
+ """
+
+ def test_extra_features(self):
+ """
+ Special extra features with no other options to test
+ """
+
+ self.db.markup(
+ {
+ 'test_table_1': {
+ 'id': INTEGER
+ },
+ 'test_table_2': {
+ 'id': INTEGER,
+ 'col_1': INTEGER,
+ 'col_2': INTEGER,
+ 'col_3': INTEGER,
+ },
+ }
+ )
+
+ # Database
+ #
+ self.assertEqual(self.db.path, self.db_name)
+
+ # Tables
+ #
+ self.assertEqual(self.db.tables_names, ('test_table_1', 'test_table_2'))
+
+ # self.db['notExistingTable']
+ self.assertRaises(KeyError, self.db.__getitem__, 'notExistingTable')
+
+ # self.db.get_table('notExistingTable')
+ self.assertRaises(KeyError, self.db.get_table, 'notExistingTable')
+
+ self.assertEqual(self.db['test_table_1'].name, 'test_table_1')
+ self.assertEqual(self.db['test_table_2'].name, 'test_table_2')
+
+ # Columns
+ #
+ self.assertEqual(self.db['test_table_1'].columns_names, ('id',))
+ self.assertEqual(self.db['test_table_2'].columns_names, ('id', 'col_1', 'col_2', 'col_3'))
+
+ # self.db['notExistingTable']
+ self.assertRaises(KeyError, self.db['test_table_1'].__getitem__, 'notExistingColumn')
+
+ self.assertFalse(self.db['test_table_1'].has_column('notExistingColumn'))
+ self.assertFalse(self.db['test_table_2'].has_column('notExistingColumn'))
+
+ self.assertEqual(self.db['test_table_1']['id'].name, 'id')
+ self.assertEqual(self.db['test_table_2']['id'].name, 'id')
+
+ self.assertEqual(self.db['test_table_1']['id'].table, 'test_table_1')
+ self.assertEqual(self.db['test_table_2']['id'].table, 'test_table_2')
+
+ @unittest.skipUnless(importlib.util.find_spec('numpy'), "Module numpy not found")
+ def test_numpy(self):
+ from numpy import array, nan
+
+ data = [
+ ('World', 2415712510, 318.1, '9.7%', nan),
+ ('United States', 310645827, 949.5, '44.3%', 'Johnson&Johnson, Moderna, Pfizer/BioNTech'),
+ ('India', 252760364, 186.9, '3.5%', 'Covaxin, Covishield, Oxford/AstraZeneca'),
+ ('Brazil', 78906225, 376.7, '11.3%', 'Oxford / AstraZeneca, Oxford/AstraZeneca, Pfizer/BioNTech'),
+ ]
+
+ self.db.execute(
+ """
+ CREATE TABLE "test_table_numpy" (
+ "Country" TEXT UNIQUE,
+ "Doses_Administered" INTEGER,
+ "Doses_per_1000" REAL,
+ "Fully_Vaccinated_Population" TEXT,
+ "Vaccine_Type_Used" TEXT
+ );
+ """
+ )
+
+ self.db.updatemany("test_table_numpy", array(data))
+ self.db.updatemany("test_table_numpy", array([[], []]))
+
+
+def save_prof(func: callable):
+ def wrapper(*args, **kwargs):
+ import pstats
+ import cProfile
+
+ with cProfile.Profile() as pr:
+ func(*args, **kwargs)
+
+ stat = pstats.Stats(pr)
+ stat.sort_stats(pstats.SortKey.TIME)
+ stat.dump_stats(filename=f'time_{func.__name__}.prof')
+
+ return wrapper
+
+
+# @unittest.skip("Turned off manually")
+@unittest.skipUnless(importlib.util.find_spec('cProfile'), "Module cProfile not found")
+@unittest.skipUnless(importlib.util.find_spec('pstats'), "Module pstats not found")
+class TimeTestsSqllexSQLite(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls) -> None:
+ cls.complexity = 1000
+ cls.db_name = f"test_sqllex_db"
+ cls.db = SQLite3x(path=cls.db_name)
+
+ @classmethod
+ def tearDownClass(cls) -> None:
+ cls.db.disconnect()
+ os.remove(cls.db_name)
+
+ @save_prof
+ def test_create_table(self):
+ self.db.create_table(
+ 'main',
+ {
+ 'id': [INTEGER, PRIMARY_KEY, UNIQUE],
+ 'name': [TEXT],
+ 'age': [INTEGER, DEFAULT, 33]
+ }
+ )
+
+ @save_prof
+ def test_insert_fast(self):
+ for _ in range(self.complexity):
+ self.db.insert('main', (None, f'Alex', 33))
+
+ @save_prof
+ def test_insert_slow(self):
+ for _ in range(self.complexity):
+ self.db.insert('main', (None, 'Alex'))
+
+ @save_prof
+ def test_insert_many_fast(self):
+ data = [(None, 'Alex', 33) for _ in range(self.complexity)]
+
+ self.db.insertmany('main', data)
+
+ @save_prof
+ def test_insert_many_slow(self):
+ data = [(None, 'Alex') for _ in range(self.complexity)]
+
+ self.db.insertmany('main', data)
+
+ @save_prof
+ def test_select_all(self):
+ self.db.select_all('main', LIMIT=self.complexity)
+
+ @save_prof
+ def test_select_where_1(self):
+ """
+ Select where (something)
+ """
+ self.db.select(
+ 'main', 'id',
+ WHERE={
+ 'name': 'Alex'
+ },
+ LIMIT=self.complexity
+ )
+
+ @save_prof
+ def test_select_where_2(self):
+ """
+ Modern new way for WHERE (SearchConditions)
+ """
+ main_tab = self.db['main']
+ id_col = main_tab['id']
+ name_col = main_tab['name']
+
+ self.db.select(
+ main_tab, id_col,
+ WHERE=(name_col == 'Alex'),
+ LIMIT=self.complexity
+ )
+
+
+if __name__ == '__main__':
+ unittest.main()