
TypeError: can not serialize 'Series' object while trying to read remote intake_sql.intake_sql.SQLSource #13

Open
emericauger opened this issue Aug 20, 2021 · 9 comments


@emericauger

Using intake-sql.
It seems to happen because msgpack receives invalid data types in the response.

Traceback (server side):

Traceback (most recent call last):
  File "/opt/conda/envs/intake/lib/python3.9/site-packages/tornado/web.py", line 1704, in _execute
    result = await result
  File "/opt/conda/envs/intake/lib/python3.9/site-packages/tornado/gen.py", line 216, in wrapper
    result = ctx_run(func, *args, **kwargs)
  File "/opt/conda/envs/intake/lib/python3.9/site-packages/intake/cli/server/server.py", line 329, in post
    self.write(msgpack.packb(response, **pack_kwargs))
  File "/opt/conda/envs/intake/lib/python3.9/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Series' object
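
For context, a minimal client-side sketch of the call that triggers this (my illustration; it assumes an intake server on localhost:5000 whose catalog has an entry backed by intake_sql.intake_sql.SQLSource, with 'mytable' as a placeholder entry name):

import intake

# Open a catalog served by a remote intake server
cat = intake.open_catalog('intake://localhost:5000')
# Instantiating the entry sends POST /v1/source; the server-side
# msgpack TypeError surfaces on the client as a 500 response
src = cat.mytable()  # 'mytable' is a placeholder entry name
df = src.read()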
@martindurant
Member

I think, essentially, you will need to have arrow installed for both the server and client environments (intake/intake#462). The branch that is getting activated should be giving you a meaningful error instead of the traceback you got.
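
A quick way to verify, in both environments (a sanity sketch only, not intake's actual detection logic):

try:
    import pyarrow  # Apache Arrow's Python package
    print("pyarrow", pyarrow.__version__)
except ImportError:
    print("pyarrow is missing in this environment")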

@martindurant
Member

@danielballan, I know tiled takes a more directly principled view on how to serialise data over HTTP. Do you handle dataframes yet? Can you imagine making an example where the server is channeling the catalogue and data from a SQL connection?

@martindurant
Member

@emericauger, I will investigate further next week, and may move this issue to intake-sql.

@emericauger
Author

> I think, essentially, you will need to have arrow installed for both the server and client environments (intake/intake#462). The branch that is getting activated should be giving you a meaningful error instead of the traceback you got.

Even with arrow installed, the traceback is the same.

martindurant transferred this issue from intake/intake Aug 24, 2021
@martindurant
Member

I transferred this issue to intake-sql.

I also created #12 (and updated the CI with it, which is why I merged it immediately). That PR shows a server-based workflow like yours, but it passes. I am not sure how you hit the error you came across. Perhaps you can recreate it with a test function working against a sqlite file?

@emericauger
Author

I made two test functions: one with a sqlite file and another with the Postgres database as a source. They both failed with the same issue.
When the catalog is opened locally, the intake_sql.SQLSource is fully accessible with its parameters.
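
For comparison, a sketch of the local open that works (the catalog file and entry names follow the server log below):

import intake

# Opening the same catalog file locally: the SQLSource entry resolves fine
cat = intake.open_catalog('cat.yml')
src = cat.indices_communes(date='2021-08-24')
print(src.describe())  # driver, args and user parameters are all accessible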

Test output:
    def test_with_pg():
        cat = intake.open_catalog('intake://localhost:5000')
>       s = cat.indices_communes(date = '2021-08-24').temp()

test_sql_cat.py:80:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\ProgramData\Anaconda3\envs\intake-latest\lib\site-packages\intake\catalog\base.py:350: in __getattr__
    return self[item]  # triggers reload_on_change
C:\ProgramData\Anaconda3\envs\intake-latest\lib\site-packages\intake\catalog\base.py:400: in __getitem__
    return e()
C:\ProgramData\Anaconda3\envs\intake-latest\lib\site-packages\intake\catalog\entry.py:77: in __call__
    s = self.get(**kwargs)
C:\ProgramData\Anaconda3\envs\intake-latest\lib\site-packages\intake\catalog\remote.py:450: in get
    return open_remote(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

url = 'http://localhost:5000/', entry = 'indices_communes', container = 'dataframe'
user_parameters = {'date': datetime.datetime(1970, 1, 1, 0, 0)}
description = "", http_args = {'headers': {}}, page_size = None
auth = <intake.auth.base.BaseClientAuth object at 0x00000181CEF16FD0>, getenv = True, getshell = True

    def open_remote(url, entry, container, user_parameters, description, http_args,
                    page_size=None, auth=None, getenv=None, getshell=None):
        """Create either local direct data source or remote streamed source"""
        from intake.container import container_map
        import msgpack
        import requests
        from requests.compat import urljoin

        if url.startswith('intake://'):
            url = url[len('intake://'):]
        payload = dict(action='open',
                       name=entry,
                       parameters=user_parameters,
                       available_plugins=list(plugin_registry.keys()))
        req = requests.post(urljoin(url, '/v1/source'),
                            data=msgpack.packb(payload, **pack_kwargs),
                            **http_args)
        if req.ok:
            response = msgpack.unpackb(req.content, **unpack_kwargs)

            if 'plugin' in response:
                pl = response['plugin']
                pl = [pl] if isinstance(pl, str) else pl
                # Direct access
                for p in pl:
                    if p in plugin_registry:
                        source = plugin_registry[p](**response['args'])
                        proxy = False
                        break
                else:
                    proxy = True
            else:
                proxy = True
            if proxy:
                response.pop('container')
                response.update({'name': entry, 'parameters': user_parameters})
                if container == 'catalog':
                    response.update({'auth': auth,
                                     'getenv': getenv,
                                     'getshell': getshell,
                                     'page_size': page_size
                                     # TODO ttl?
                                     # TODO storage_options?
                                     })
                source = container_map[container](url, http_args, **response)
            source.description = description
            return source

        else:
>           raise Exception('Server error: %d, %s' % (req.status_code, req.reason))
E           Exception: Server error: 500, Internal Server Error

Full debug log on the server side:
2021-08-25 16:48:44,123 - intake - DEBUG - config.py:<module>:L125 - Intake logger set to debug
2021-08-25 16:48:44,420 - intake - INFO - __main__.py:main:L53 - Creating catalog from:
2021-08-25 16:48:44,420 - intake - INFO - __main__.py:main:L55 -   - cat.yml
2021-08-25 16:48:44,910 - intake - INFO - __main__.py:main:L62 - catalog_args: cat.yml
2021-08-25 16:48:44,910 - intake - INFO - __main__.py:main:L70 - Listening on localhost:5000
2021-08-25 16:48:44,911 - intake - DEBUG - server.py:__init__:L32 - auth: {'cls': 'intake.auth.base.BaseAuth'}
2021-08-25 16:49:23,230 - intake - DEBUG - server.py:post:L241 - Source POST: {'action': 'open', 'name': 'indices_communes', 'parameters': {'date': datetime.datetime(1970, 1, 1, 0, 0)}, 'available_plugins': ['yaml_file_cat', 'yaml_files_cat', 'alias', 'catalog', 'csv', 'intake_remote', 'ndzarr', 'numpy', 'textfiles', 'zarr_cat', 'parquet', 'sql', 'sql_auto', 'sql_cat', 'sql_manual', 'netcdf', 'opendap', 'rasterio', 'remote-xarray', 'xarray_image', 'zarr']}
DEBUG:intake:Source POST: {'action': 'open', 'name': 'indices_communes', 'parameters': {'date': datetime.datetime(1970, 1, 1, 0, 0)}, 'available_plugins': ['yaml_file_cat', 'yaml_files_cat', 'alias', 'catalog', 'csv', 'intake_remote', 'ndzarr', 'numpy', 'textfiles', 'zarr_cat', 'parquet', 'sql', 'sql_auto', 'sql_cat', 'sql_manual', 'netcdf', 'opendap', 'rasterio', 'remote-xarray', 'xarray_image', 'zarr']}
2021-08-25 16:49:23,232 - intake - DEBUG - server.py:post:L302 - Opening entry sources:
  indices_communes:
    args:
      sql_expr: select * from schema.table where date = '1970-01-01 00:00:00'
      uri: postgresql://xxxxx:xxxxx@dbserver:5432/db
    description: "test"
    driver: intake_sql.intake_sql.SQLSource
    metadata:
      catalog_dir: C:/Users/xxxxx/
      nom: "test"

DEBUG:intake:Opening entry sources:
  indices_communes:
    args:
      sql_expr: select * from schema.table where date = '1970-01-01 00:00:00'
      uri: postgresql://xxxxx:xxxxx@dbserver:5432/db
    description: "test"
    driver: intake_sql.intake_sql.SQLSource
    metadata:
      catalog_dir: C:/Users/xxxxx/
      nom: "test"

2021-08-25 16:49:23,694 - intake - DEBUG - server.py:add:L146 - Adding sources:
  indices_communes:
    args:
      sql_expr: select * from schema.table where date = '1970-01-01 00:00:00'
      uri: postgresql://xxxxx:xxxxx@dbserver:5432/db
    description: "test"
    driver: intake_sql.intake_sql.SQLSource
    metadata:
      catalog_dir: C:/Users/xxxxx/
      nom: "test"
 to cache, uuid 7b8ee32e-23b1-47fe-a77b-cfc535260640
DEBUG:intake:Adding sources:
  indices_communes:
    args:
      sql_expr: select * from schema.table where date = '1970-01-01 00:00:00'
      uri: postgresql://xxxxx:xxxxx@dbserver:5432/db
    description: "test"
    driver: intake_sql.intake_sql.SQLSource
    metadata:
      catalog_dir: C:/Users/xxxxx/
      nom: "test"
 to cache, uuid 7b8ee32e-23b1-47fe-a77b-cfc535260640
2021-08-25 16:49:23,696 - intake - DEBUG - server.py:post:L314 - Container: dataframe, ID: 7b8ee32e-23b1-47fe-a77b-cfc535260640
DEBUG:intake:Container: dataframe, ID: 7b8ee32e-23b1-47fe-a77b-cfc535260640
ERROR:tornado.application:Uncaught exception POST /v1/source (::1)
HTTPServerRequest(protocol='http', host='localhost:5000', method='POST', uri='/v1/source', version='HTTP/1.1', remote_ip='::1')
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\envs\intake-latest\lib\site-packages\tornado\web.py", line 1704, in _execute
    result = await result
  File "C:\ProgramData\Anaconda3\envs\intake-latest\lib\site-packages\tornado\gen.py", line 216, in wrapper
    result = ctx_run(func, *args, **kwargs)
  File "C:\ProgramData\Anaconda3\envs\intake-latest\lib\site-packages\intake\cli\server\server.py", line 320, in post
    self.write(msgpack.packb(response, **pack_kwargs))
  File "C:\ProgramData\Anaconda3\envs\intake-latest\lib\site-packages\msgpack\__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Series' object
ERROR:tornado.access:500 POST /v1/source (::1) 470.58ms

@martindurant
Member

Sorry to ask for more, but can you paste the sqlite test (no need for the traceback), including the code to generate the file and run the server?

@emericauger
Author

I finally found what causes the error: direct_access was not specified in the catalog entries, and the default seems to be forbid.
Setting direct_access to allow for the SQLSource and SQLSourceAutoPartition drivers does the job. I haven't tried SQLSourceManualPartition.
I found this while using the SQLCatalog driver (used in the test workflow), which could read the entries without the error (in that case direct_access does not have to be specified, though).
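
For reference, a catalog entry with this workaround might look like the following (a sketch based on the entry from the server log above; the uri and query are placeholders):

sources:
  indices_communes:
    driver: intake_sql.intake_sql.SQLSource
    direct_access: allow  # default appears to be 'forbid'; 'allow' lets the client run the driver locally
    args:
      uri: postgresql://user:password@dbserver:5432/db
      sql_expr: select * from schema.table where date = '1970-01-01 00:00:00'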

If that's helpful, I made a test function that tries another sqlite file:

Add to utils.py:

import os

import pytest
import requests


@pytest.fixture(scope='module')
def temp_db_from_web():
    """
    Test against a sqlite file downloaded from the web
    """
    url = 'https://github.com/lerocha/chinook-database/raw/master/ChinookDatabase/DataSources/Chinook_Sqlite.sqlite'
    r = requests.get(url)
    f = 'chinook.db'
    with open(f, 'wb') as fh:  # close the handle so sqlite can open the file
        fh.write(r.content)
    uri = 'sqlite:///' + f
    try:
        yield 'tables', uri
    finally:
        if os.path.isfile(f):
            os.remove(f)

with some modifications in test_sql_cat.py:

import os
import random
import subprocess
import time

import intake
import pytest
import requests

here = os.path.dirname(os.path.abspath(__file__))


@pytest.fixture()
def remote_sql(temp_db_from_web):
    pytest.importorskip("tornado")
    pytest.importorskip("msgpack")
    pytest.importorskip("requests")
    port = random.randint(1025, 64000)
    table, uri = temp_db_from_web
    os.environ['TEST_SQLITE_URI'] = uri  # used in catalog default
    fn = os.path.join(here, 'cat.yaml')
    cmd = ["intake-server", fn, "-p", f"{port}"]
    proc = subprocess.Popen(cmd, env=os.environ)
    timeout = 5
    while True:
        try:
            # poll until the server responds
            r = requests.get(f"http://localhost:{port}/v1/info")
            if r.ok:
                print(r.content)
                break
        except requests.exceptions.ConnectionError:
            pass
        time.sleep(0.1)
        timeout -= 0.1
        assert timeout > 0, "Server did not come up"
    yield f"intake://localhost:{port}"
    proc.terminate()
    proc.wait()


def test_with_server(remote_sql):
    cat = intake.open_catalog(remote_sql)
    print(f"test + {list(cat.tables)}")
    s = cat.tables.Employee()
    data = s.read()
    print(data)
    assert len(data) > 0
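
For completeness, the cat.yaml referenced by the fixture could look something like this (my reconstruction, not the actual test catalog; it assumes TEST_SQLITE_URI is picked up through a parameter default with intake's env() templating):

sources:
  tables:
    driver: intake_sql.intake_sql.SQLCatalog
    direct_access: allow
    parameters:
      uri:
        description: sqlite connection string
        type: str
        default: '{{ env(TEST_SQLITE_URI) }}'
    args:
      uri: '{{ uri }}'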

Thank you very much for your guidance!

@martindurant
Member

I'm glad you solved it with direct access, but it is supposed to work via streaming too, so I'll get around to fixing the issue. Thanks, too, for providing the test case; it should be helpful.
