
Add partitioned support to Sequence and Table sources #8

Draft · wants to merge 6 commits into master

Conversation

@sodre commented Jul 17, 2020

This PR adds support for partitioned access to a Solr collection. By default, the partition_len is 1024 records. During the metadata lookup we query the total number of records in the Solr collection; the number of partitions is then ceil(num_records / partition_len).

📜 Note:

  • The current implementation uses Solr's simple pagination parameters (start, rows). This becomes problematic with deep pagination.
  • We could implement cursors as an option, as long as the user understands that:
    • there is an additional cost the first time partitions are accessed out of order;
    • with my current knowledge of Dask, there is an all-upfront cost when we call to_dask;
    • the user provides us with the pre-computed cursors. I suggest creating a SOLRCursorSource to help create this content.

The problem is that the cursors can only be obtained by iterating one page at a time (this can't be parallelized). Fortunately, we only need the document IDs and not the entire SOLR response, so the network transfer cost is small.
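The start/rows arithmetic described above can be sketched as a small helper. The name plan_partitions and its signature are hypothetical, not part of this PR:

```python
import math

def plan_partitions(num_hits, partition_len=1024, start=0):
    """Return the (start, rows) query arguments for each partition,
    mirroring the ceil(num_records / partition_len) rule above."""
    remaining = num_hits - start
    npartitions = math.ceil(remaining / partition_len)
    return [
        (start + i * partition_len, min(partition_len, remaining - i * partition_len))
        for i in range(npartitions)
    ]
```

For 2500 hits this yields three partitions, the last holding the 452-record remainder.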

@sodre changed the title from "Add partitioned support to SOLRSequenceSource" to "Add partitioned support to Sequence and Table sources" on Jul 17, 2020
@sodre (Author) left a comment:

@martindurant, let me know if you think this approach works for handling partitions in intake-solr.

@@ -28,18 +30,25 @@ class SOLRSequenceSource(base.DataSource):
zoocollection: bool or str
If using Zookeeper to orchestrate SOLR, this is the name of the
collection to connect to.
partition_len: int or None
The desired partition size. [default: 1024]
@sodre (Author):

We add a new parameter that limits the number of rows that are returned per partition.

@sodre (Author), Jul 17, 2020:

I looked at intake-es and it seems they use npartitions as an input instead of partition_len.

❓ Let me know if you prefer that option.

@martindurant (Member):

I don't have a strong preference, but consistency might be good

self.partition_len = partition_len

if partition_len is not None and partition_len <= 0:
    raise ValueError(f"partition_len must be None or positive, got {partition_len}")
@sodre (Author):

When partition_len is None, we get the old behavior of not setting the row count.

We should verify that the old behavior actually worked: on my system, if we don't set the number of rows to return, we only get back the first ten records of the dataset (Solr's default rows is 10).

@martindurant (Member):

A test would be good. Does setting partition_len -> +inf (very large number) work for one-partition output?
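One edge case worth covering in that test: a guard written as `if partition_len and partition_len <= 0` silently accepts 0, because 0 is falsy. A minimal sketch of the intended validation (validate_partition_len is a hypothetical helper, not the PR's code):

```python
def validate_partition_len(partition_len):
    """None disables partitioning (the legacy behavior); any other value
    must be positive.  Testing `is not None` rather than truthiness
    ensures that 0 is rejected too."""
    if partition_len is not None and partition_len <= 0:
        raise ValueError(f"partition_len must be None or positive, got {partition_len}")
    return partition_len
```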

"""Do a 0 row query and get the number of hits from the response"""
qargs = self.qargs.copy()
qargs["rows"] = 0
start = qargs.get("start", 0)
@sodre (Author):

The user may want to start the query at a different position, so we take that into account.

@sodre (Author):

📜 If we add support for Cursors, then we can't use the start option, according to SOLR documentation.

@martindurant (Member):

I don't know what people would normally use.
Does offsetting with start cause the server to scan the whole table, or is solr smart here?
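For reference, the one-page-at-a-time cursor walk discussed earlier could look roughly like this. Here `search` stands in for a pysolr-style callable returning a dict; the `nextCursorMark` key follows Solr's cursor protocol, and everything else is a hypothetical sketch:

```python
def collect_cursor_marks(search, partition_len):
    """Walk Solr cursorMark pagination once, recording the cursor that
    starts each non-empty partition.  Cursors require a stable sort
    (e.g. on the uniqueKey) and cannot be combined with `start`."""
    marks = []
    cursor = "*"  # Solr's initial cursor mark
    while True:
        response = search(rows=partition_len, cursorMark=cursor)
        if response["docs"]:
            marks.append(cursor)  # this cursor starts a non-empty partition
        next_cursor = response["nextCursorMark"]
        if next_cursor == cursor:  # Solr signals the end by repeating the mark
            break
        cursor = next_cursor
    return marks
```

The resulting marks list is exactly the pre-computed-cursor content a SOLRCursorSource could hand back to the user.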

Comment on lines +88 to +90
datashape=None,
dtype=None,
shape=(results.hits - start,),
@sodre (Author), Jul 17, 2020:

❓ What is the difference between datashape and shape?

@martindurant (Member):

datashape isn't used; it was meant for forward compatibility with complex types (struct, nested list)

if self.partition_len is not None:
qargs["start"] = qargs.get("start", 0) + index * self.partition_len
qargs["rows"] = self.partition_len
return self.solr.search(self.query, **qargs)
@sodre (Author), Jul 17, 2020:

Let's return the raw results of the query at this point. There are other valuable fields, like facets, that can be used in subclasses.

@martindurant (Member):

ok.
Are facets another useful way to partition? Are there shards too?


def _get_partition(self, _):
    """Downloads all data"""
schema = super()._get_schema()
@sodre (Author):

We get the Schema from SOLRSequenceSource. This contains the number of partitions and the total number of records, but not the dtype.

@martindurant (Member):

Is it not worth grabbing the result head to figure this out (on request)?

"""Downloads all data"""
schema = super()._get_schema()

df = self._get_partition(0)
@sodre (Author), Jul 17, 2020:

This loads the first partition into a dataframe and uses it to discover the returned schema.
Note that the schema might differ from the overall SOLR core schema, because the user can select a subset of fields using the fl qarg.
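Discovering dtypes from the first partition, as described, can be sketched with plain pandas. The field names below are made up for illustration:

```python
import pandas as pd

# Stand-in for the first partition returned by the source; in practice this
# would come from _get_partition(0) with whatever fields the fl qarg selects.
df = pd.DataFrame({"id": ["doc-1", "doc-2"], "price": [9.99, 12.50]})

# The inferred schema covers only the selected fields, not the whole core.
dtypes = {name: str(dtype) for name, dtype in df.dtypes.items()}
```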

@@ -9,7 +9,7 @@
from .util import start_solr, stop_docker, TEST_CORE

CONNECT = {'host': 'localhost', 'port': 9200}
TEST_DATA_DIR = 'tests'
TEST_DATA_DIR = os.path.abspath(os.path.dirname(__file__))
@sodre (Author):

I was having issues running the tests from within PyCharm; this fixed the problem for me.

@martindurant (Member):

seems good practice - should not require a particular CWD

@@ -1,7 +1,4 @@
from ._version import get_versions
__version__ = get_versions()['version']
del get_versions

import intake # Import this first to avoid circular imports during discovery.
del intake
@sodre (Author):

I got exceptions when running with dask distributed. Removing these lines fixed the issue.

@martindurant (Member):

With the move to entrypoints to declare the drivers, I hope this is no longer needed

@martindurant (Member) left a comment:

Looking pretty good.
I think some tests will clarify usage and correctness.
The current code clearly demonstrates my ignorance of how SOLR really works - thank you for this!


self._load_metadata()
return dask.dataframe.from_delayed(
    [delayed(self.read_partition)(i) for i in range(self.npartitions)]
)
@martindurant (Member):

There is also bag.to_dataframe, which may be less code and would reuse the sequence partitions.


@sodre commented Jul 23, 2020

> Looking pretty good.
> I think some tests will clarify usage and correctness.
> The current code clearly demonstrates my ignorance of how SOLR really works - thank you for this!

Hi @martindurant, I was pulled into a different task at work and won't be able to come back to this for about a month. If it's okay, I would like to keep this open as a draft.

@martindurant (Member):

No problem - ping me when you want me to have a look.
