Add partitioned support to Sequence and Table sources #8

Draft: wants to merge 6 commits into master (changes shown from 5 commits)
3 changes: 0 additions & 3 deletions intake_solr/__init__.py
@@ -1,7 +1,4 @@
from ._version import get_versions
__version__ = get_versions()['version']
del get_versions

import intake # Import this first to avoid circular imports during discovery.
del intake
Author:
I got exceptions when running with dask distributed. Removing these lines fixed the issue.

Member:
With the move to entrypoints to declare the drivers, I hope this is no longer needed.

from .source import SOLRSequenceSource, SOLRTableSource
120 changes: 86 additions & 34 deletions intake_solr/source.py
@@ -1,3 +1,5 @@
import math

from intake.source import base
import pandas as pd
import pysolr
@@ -28,18 +30,25 @@ class SOLRSequenceSource(base.DataSource):
    zoocollection: bool or str
        If using Zookeeper to orchestrate SOLR, this is the name of the
        collection to connect to.
    partition_len: int or None
        The desired partition size. [default: 1024]
Author:
We add a new parameter that limits the number of rows that are returned per partition.

Author (@sodre, Jul 17, 2020):
I looked at intake-es and it seems they use npartitions as an input instead of partition_len.

❓ Let me know if you prefer that option.

Member:
I don't have a strong preference, but consistency might be good
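For context, a minimal sketch of how the new parameter might be passed when opening the source directly. The URL and core name are placeholders, and intake.open_solr assumes the driver is registered so that intake exposes that convenience function:

```python
import intake

# Hypothetical connection details; partition_len is the new argument.
source = intake.open_solr(
    query="*:*",
    base_url="http://localhost:8983",
    core="mycore",
    partition_len=1024,  # rows per partition; None keeps the single-partition behavior
)
print(source.discover()["npartitions"])
```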

"""
container = 'python'
name = 'solr'
version = __version__
partition_access = False
partition_access = True

def __init__(self, query, base_url, core, qargs=None, metadata=None,
auth=None, cert=None, zoocollection=False):
auth=None, cert=None, zoocollection=False,
partition_len=1024):
self.query = query
self.qargs = qargs or {}
self.metadata = metadata or {}
self._schema = None
self.partition_len = partition_len

if partition_len and partition_len <= 0:
raise ValueError(f"partition_len must be None or positive, got {partition_len}")
Author:
When partition_len is None, we get the old behavior of not setting the row count.

We should verify that the old behavior was actually working. On my system, if we don't set the number of rows to return, then we only get back the first ten records of the dataset.

Member:
A test would be good. Does setting partition_len -> +inf (very large number) work for one-partition output?
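A rough sketch of such a test, assuming a fixture that yields the base URL of a running test core; the fixture and core names are hypothetical, not the ones in this repository's test suite:

```python
from intake_solr.source import SOLRSequenceSource


def test_huge_partition_len_gives_single_partition(solr_url):  # hypothetical fixture
    source = SOLRSequenceSource(
        query="*:*",
        base_url=solr_url,    # e.g. "http://localhost:9200"
        core="testcore",      # placeholder core name
        partition_len=10**9,  # far larger than any test dataset
    )
    info = source.discover()
    assert info["npartitions"] == 1
    # All records should come back in that single partition.
    assert len(list(source.read())) == info["shape"][0]
```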

        if auth == 'kerberos':
            from requests_kerberos import HTTPKerberosAuth, OPTIONAL
            auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL,
@@ -64,24 +73,57 @@ def __init__(self, query, base_url, core, qargs=None, metadata=None,
        super(SOLRSequenceSource, self).__init__(metadata=metadata)

    def _get_schema(self):
        return base.Schema(datashape=None,
                           dtype=None,
                           shape=None,
                           npartitions=1,
                           extra_metadata={})
        """Do a 0 row query and get the number of hits from the response"""
        qargs = self.qargs.copy()
        qargs["rows"] = 0
        start = qargs.get("start", 0)
Author:
The user may want to start the query at a different position, so we take that into account.

Author:
📜 If we add support for Cursors, then we can't use the start option, according to SOLR documentation.

Member:
I don't know what people would normally use.
Does offsetting with start cause the server to scan the whole table, or is solr smart here?
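For reference, deep start offsets are known to get more expensive in SOLR as the offset grows, which is what the cursor API addresses. A rough sketch of cursor-based paging with pysolr, using a placeholder URL, sort field, and page size; cursors require a stable sort on the unique key and cannot be combined with start:

```python
import pysolr

solr = pysolr.Solr("http://localhost:8983/solr/mycore")  # placeholder URL

cursor = "*"
docs = []
while True:
    # Cursor paging needs a deterministic sort that includes the uniqueKey field.
    results = solr.search("*:*", **{"cursorMark": cursor, "sort": "id asc", "rows": 1024})
    docs.extend(results.docs)
    if results.nextCursorMark == cursor:  # the same cursor back means no more pages
        break
    cursor = results.nextCursorMark
```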

        results = self.solr.search(self.query, **qargs)

        if self.partition_len is None:
            npartitions = 1
        else:
            npartitions = math.ceil((results.hits - start) / self.partition_len)

        return base.Schema(
            datashape=None,
            dtype=None,
            shape=(results.hits - start,),
Comment on lines +88 to +90
Author (@sodre, Jul 17, 2020):
❓ What is the difference between datashape and shape?

Member:
datashape isn't used; it was meant for forward compatibility with complex types (struct, nested list)

            npartitions=npartitions,
            extra_metadata={},
        )

    def _do_query(self):
    def _do_query(self, index):
        qargs = self.qargs.copy()
        if self.partition_len is not None:
            qargs["start"] = qargs.get("start", 0) + index * self.partition_len
            qargs["rows"] = self.partition_len
        return self.solr.search(self.query, **qargs)
Author (@sodre, Jul 17, 2020):
Let's return the raw results of the query at this point. There are other valuable fields, like facets, that can be used in subclasses.

Member:
ok.
Are facets another useful way to partition? Are there shards too?
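As a small illustration of the extra fields on the raw result, this is roughly how facet counts could be read from a pysolr response; the facet field name and URL are placeholders:

```python
import pysolr

solr = pysolr.Solr("http://localhost:8983/solr/mycore")  # placeholder URL
results = solr.search("*:*", **{
    "rows": 0,
    "facet": "true",
    "facet.field": "category",  # hypothetical field to facet on
})
# pysolr surfaces the facet section of the response on the Results object.
category_counts = results.facets.get("facet_fields", {}).get("category", [])
```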


    def _get_partition(self, index):
        """Downloads all data in query response"""
        solr_rv = self._do_query(index)
        out = []
        data = self.solr.search(self.query, **self.qargs).docs
        for d in data:
        for d in solr_rv.docs:
            out.append({k: (v[0] if isinstance(v, (tuple, list)) else v)
                        for k, v in d.items()})
        return out

    def _get_partition(self, _):
        """Downloads all data
        """
        return self._do_query()
    def _close(self):
        pass

    def read(self):
        self._load_metadata()
        from itertools import chain
        return chain(*(self._get_partition(index) for index in range(self.npartitions)))

    def to_dask(self):
        from dask import delayed
        import dask.bag

        self._load_metadata()
        return dask.bag.from_delayed(
            [delayed(self.read_partition)(i) for i in range(self.npartitions)]
        )
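A brief usage sketch of the partitioned sequence source; connection details are placeholders and intake.open_solr again assumes the registered driver name:

```python
import intake

source = intake.open_solr("*:*", "http://localhost:8983", "mycore", partition_len=1024)
bag = source.to_dask()  # one dask.bag partition per SOLR partition
print(bag.npartitions)
print(bag.take(3))      # pull a few records without reading everything
```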


class SOLRTableSource(SOLRSequenceSource):
@@ -108,32 +150,42 @@ class SOLRTableSource(SOLRSequenceSource):
    zoocollection: bool or str
        If using Zookeeper to orchestrate SOLR, this is the name of the
        collection to connect to.
    partition_len: int or None
        The desired partition size. [default: 1024]
    """

    name = 'solrtab'
    container = 'dataframe'
    partition_access = True

    def _get_schema(self, retry=2):
        """Get schema from first 10 hits or cached dataframe"""
        if not hasattr(self, '_dataframe'):
            self._get_partition(0)
        dtype = {k: str(v)
                 for k, v in self._dataframe.dtypes.to_dict().items()}
        return base.Schema(datashape=None,
                           dtype=dtype,
                           shape=self._dataframe.shape,
                           npartitions=1,
                           extra_metadata={})

    def _get_partition(self, _):
        """Downloads all data
        schema = super()._get_schema()
Author:
We get the Schema from SOLRSequenceSource. This contains the number of partitions and the total number of records, but not the dtype.

Member:
Is it not worth grabbing the result head to figure this out (on request)?


        df = self._get_partition(0)
Author (@sodre, Jul 17, 2020):
This loads the first partition into a dataframe and uses it to discover the returned schema.
Note that the schema might be different from the overall SOLR core schema because the user can select a subset of fields using the fl qarg.
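For illustration, a sketch of how an fl entry in qargs narrows the discovered dtype under this change; field names and connection details are placeholders:

```python
from intake_solr.source import SOLRTableSource

source = SOLRTableSource(
    query="*:*",
    base_url="http://localhost:8983",  # placeholder
    core="mycore",                     # placeholder
    qargs={"fl": ["id", "price"]},     # only these fields are returned and typed
)
print(source.discover()["dtype"])      # e.g. {'id': 'object', 'price': 'float64'}
```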

schema["dtype"] = {k: str(v)
for k, v in df.dtypes.to_dict().items()}
schema["shape"] = (schema["shape"][0], *df.shape[1:])
return schema

def _get_partition(self, index):
"""Downloads all data in the partition
"""
if not hasattr(self, '_dataframe'):
df = pd.DataFrame(self._do_query())
self._dataframe = df
self._schema = None
self.discover()
return self._dataframe
seq = super()._get_partition(index)
# Columns are sorted unless the user defines the field list (fl)
columns = self.qargs["fl"] if "fl" in self.qargs else sorted(seq[0].keys())
return pd.DataFrame(seq, columns=columns)

def read(self):
self._load_metadata()
return pd.concat(self._get_partition(index) for index in range(self.npartitions))

def to_dask(self):
from dask import delayed
import dask.dataframe

self._load_metadata()
return dask.dataframe.from_delayed(
[delayed(self.read_partition)(i) for i in range(self.npartitions)]
Member:
There is also bag.to_dataframe, which may be less code and would reuse the sequence partitions.

        )

    def _close(self):
        self._dataframe = None
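For comparison, a rough sketch of the bag.to_dataframe route mentioned above, reusing the sequence source's partitions instead of a separate from_delayed path; it assumes every partition yields dicts with the same keys so dask can infer the columns:

```python
def table_to_dask(seq_source):
    """Hypothetical alternative: build the dask dataframe from the sequence bag."""
    bag = seq_source.to_dask()  # dask.bag of record dicts, one partition per SOLR page
    return bag.to_dataframe()   # dask infers columns and dtypes from the first records
```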
2 changes: 1 addition & 1 deletion tests/test_intake_solr.py
@@ -9,7 +9,7 @@
from .util import start_solr, stop_docker, TEST_CORE

CONNECT = {'host': 'localhost', 'port': 9200}
TEST_DATA_DIR = 'tests'
TEST_DATA_DIR = os.path.abspath(os.path.dirname(__file__))
Author:
I was having issues running the tests from within PyCharm; this fixed the problem for me.

Member:
seems good practice - should not require a particular CWD

TEST_DATA = 'sample1.csv'
df = pd.read_csv(os.path.join(TEST_DATA_DIR, TEST_DATA))
