diff --git a/go.sum b/go.sum new file mode 100644 index 000000000..fb9da4ff3 --- /dev/null +++ b/go.sum @@ -0,0 +1,97 @@ +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/RoaringBitmap/roaring v0.4.21 h1:WJ/zIlNX4wQZ9x8Ey33O1UaD9TCTakYsdLFSBcTwH+8= +github.com/RoaringBitmap/roaring v0.4.21/go.mod h1:D0gp8kJQgE1A4LQ5wFLggQEyvDi06Mq5mKs52e1TwOo= +github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/blevesearch/bleve v1.0.9/go.mod h1:tb04/rbU29clbtNgorgFd8XdJea4x3ybYaOjWKr+UBU= +github.com/blevesearch/blevex v0.0.0-20190916190636-152f0fe5c040/go.mod h1:WH+MU2F4T0VmSdaPX+Wu5GYoZBrYWdOZWSjzvYcDmqQ= +github.com/blevesearch/go-porterstemmer v1.0.3 h1:GtmsqID0aZdCSNiY8SkuPJ12pD4jI+DdXTAn4YRcHCo= +github.com/blevesearch/go-porterstemmer v1.0.3/go.mod h1:angGc5Ht+k2xhJdZi511LtmxuEf0OVpvUUNrwmM1P7M= +github.com/blevesearch/mmap-go v1.0.2 h1:JtMHb+FgQCTTYIhtMvimw15dJwu1Y5lrZDMOFXVWPk0= +github.com/blevesearch/mmap-go v1.0.2/go.mod h1:ol2qBqYaOUsGdm7aRMRrYGgPvnwLe6Y+7LMvAB5IbSA= +github.com/blevesearch/segment v0.9.0 h1:5lG7yBCx98or7gK2cHMKPukPZ/31Kag7nONpoBt22Ac= +github.com/blevesearch/segment v0.9.0/go.mod h1:9PfHYUdQCgHktBgvtUOF4x+pc4/l8rdH0u5spnW85UQ= +github.com/blevesearch/snowballstem v0.9.0 h1:lMQ189YspGP6sXvZQ4WZ+MLawfV8wOmPoD/iWeNXm8s= +github.com/blevesearch/snowballstem v0.9.0/go.mod h1:PivSj3JMc8WuaFkTSRDW2SlrulNWPl4ABg1tC/hlgLs= +github.com/blevesearch/zap/v11 v11.0.9 h1:wlSrDBeGN1G4M51NQHIXca23ttwUfQpWaK7uhO5lRSo= +github.com/blevesearch/zap/v11 v11.0.9/go.mod h1:47hzinvmY2EvvJruzsSCJpro7so8L1neseaGjrtXHOY= +github.com/blevesearch/zap/v12 v12.0.9 h1:PpatkY+BLVFZf0Ok3/fwgI/I4RU0z5blXFGuQANmqXk= +github.com/blevesearch/zap/v12 v12.0.9/go.mod h1:paQuvxy7yXor+0Mx8p2KNmJgygQbQNN+W6HRfL5Hvwc= +github.com/blevesearch/zap/v13 v13.0.1 h1:NSCM6uKu77Vn/x9nlPp4pE1o/bftqcOWZEHSyZVpGBQ= +github.com/blevesearch/zap/v13 v13.0.1/go.mod h1:XmyNLMvMf8Z5FjLANXwUeDW3e1+o77TTGUWrth7T9WI= +github.com/blevesearch/zap/v14 v14.0.0 h1:HF8Ysjm13qxB0jTGaKLlatNXmJbQD8bY+PrPxm5v4hE= +github.com/blevesearch/zap/v14 v14.0.0/go.mod h1:sUc/gPGJlFbSQ2ZUh/wGRYwkKx+Dg/5p+dd+eq6QMXk= +github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= +github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/couchbase/ghistogram v0.1.0/go.mod h1:s1Jhy76zqfEecpNWJfWUiKZookAFaiGOEoyzgHt9i7k= +github.com/couchbase/moss v0.1.0/go.mod h1:9MaHIaRuy9pvLPUJxB8sh8OrLfyDczECVL37grCIubs= +github.com/couchbase/vellum v1.0.1 h1:qrj9ohvZedvc51S5KzPfJ6P6z0Vqzv7Lx7k3mVc2WOk= +github.com/couchbase/vellum v1.0.1/go.mod h1:FcwrEivFpNi24R3jLOs3n+fs5RnuQnQqCLBJ1uAg1W4= +github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2 h1:Ujru1hufTHVb++eG6OuNDKMxZnGIvF6o/u8q/8h2+I4= +github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE= +github.com/glycerine/goconvey 
v0.0.0-20190410193231-58a59202ab31/go.mod h1:Ogl1Tioa0aV7gstGFO7KhffUsb9M4ydbEbbxpcEDc24= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/gopherjs/gopherjs v0.0.0-20190910122728-9d188e94fb99/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/kljensen/snowball v0.6.0/go.mod h1:27N7E8fVU5H68RlUmnWwZCfxgt4POBJfENGMvNRhldw= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae/go.mod h1:qAyveg+e4CE+eKJXWVjKXM4ck2QobLqTDytGJbLLhJg= +github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ= +github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= +github.com/steveyen/gtreap v0.1.0 h1:CjhzTa274PyJLJuMZwIzCO1PfC00oRa8d1Kc78bFXJM= +github.com/steveyen/gtreap v0.1.0/go.mod h1:kl/5J7XbrOmlIbYIXdRHDDE5QxHqpk0cmkT7Z4dM9/Y= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/syndtr/goleveldb 
v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= +github.com/tinylib/msgp v1.1.0 h1:9fQd+ICuRIu/ue4vxJZu6/LzxN0HwMds2nq/0cFvxHU= +github.com/tinylib/msgp v1.1.0/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= +github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= +github.com/willf/bitset v1.1.10 h1:NotGKqX0KwQ72NUzqrjZq5ipPNDQex9lo3WpaS8L2sc= +github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= +github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +go.etcd.io/bbolt v1.3.4 h1:hi1bXHMVrlQh6WwxAy+qZCV/SYIlqo+Ushwdpa4tAKg= +go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= +golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 h1:LfCXLvNmTYH9kEmVgqbnsWfruoXZIrh4YBgqVHtDvw0= +golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/index/scorch/README.md b/index/scorch/README.md index 9794aed70..ff6cf65e2 100644 --- a/index/scorch/README.md +++ b/index/scorch/README.md @@ -1,109 +1,12 @@ -# scorch +# Scorch +The Scorch object is the root object of the index. For all intents and purposes, it is the index. It is created with NewScorch and includes version info, configuration for the scorch index, and some other metadata. Notably, it includes the AnalysisQueue and a segment plugin. -## Definitions +When a client Opens a Scorch index with Scorch.Open(), the index spawns three goroutine loops... the introducer, the persister, and the merger. It also opens an embedded Bolt DB for metadata management and initializes a bunch of channels. The index keeps a count of the number of asynchronous tasks. -Batch -- A collection of Documents to mutate in the index. -Document -- Has a unique identifier (arbitrary bytes). -- Is comprised of a list of fields. - -Field -- Has a name (string). -- Has a type (text, number, date, geopoint). -- Has a value (depending on type). -- Can be indexed, stored, or both. -- If indexed, can be analyzed. --m If indexed, can optionally store term vectors. 
- -## Scope - -Scorch *MUST* implement the bleve.index API without requiring any changes to this API. - -Scorch *MAY* introduce new interfaces, which can be discovered to allow use of new capabilities not in the current API. - -## Implementation - -The scorch implementation starts with the concept of a segmented index. - -A segment is simply a slice, subset, or portion of the entire index. A segmented index is one which is composed of one or more segments. Although segments are created in a particular order, knowing this ordering is not required to achieve correct semantics when querying. Because there is no ordering, this means that when searching an index, you can (and should) search all the segments concurrently. - -### Internal Wrapper - -In order to accommodate the existing APIs while also improving the implementation, the scorch implementation includes some wrapper functionality that must be described. - -#### \_id field - -In scorch, field 0 is prearranged to be named \_id. All documents have a value for this field, which is the documents external identifier. In this version the field *MUST* be both indexed AND stored. The scorch wrapper adds this field, as it will not be present in the Document from the calling bleve code. - -NOTE: If a document already contains a field \_id, it will be replaced. If this is problematic, the caller must ensure such a scenario does not happen. - -### Proposed Structures - -``` -type Segment interface { - - Dictionary(field string) TermDictionary - -} - -type TermDictionary interface { - - PostingsList(term string, excluding PostingsList) PostingsList - -} - -type PostingsList interface { - - Next() Posting - - And(other PostingsList) PostingsList - Or(other PostingsList) PostingsList - -} - -type Posting interface { - Number() uint64 +## Mutation : Summary - Frequency() uint64 - Norm() float64 - - Locations() Locations -} - -type Locations interface { - Start() uint64 - End() uint64 - Pos() uint64 - ArrayPositions() ... -} - -type DeletedDocs { - -} - -type SegmentSnapshot struct { - segment Segment - deleted PostingsList -} - -type IndexSnapshot struct { - segment []SegmentSnapshot -} -``` -**What about errors?** -**What about memory mgmnt or context?** -**Postings List separate iterator to separate stateful from stateless** -### Mutating the Index - -The bleve.index API has methods for directly making individual mutations (Update/Delete/SetInternal/DeleteInternal), however for this first implementation, we assume that all of these calls can simply be turned into a Batch of size 1. This may be highly inefficient, but it will be correct. This decision is made based on the fact that Couchbase FTS always uses Batches. - -NOTE: As a side-effect of this decision, it should be clear that performance tuning may depend on the batch size, which may in-turn require changes in FTS. - -From this point forward, only Batch mutations will be discussed. - -Sequence of Operations: +Clients call Scorch.Batch() to get data into the index. Here is a summary of the sequence of operation: 1. For each document in the batch, search through all existing segments. The goal is to build up a per-segment bitset which tells us which documents in that segment are obsoleted by the addition of the new segment we're currently building. NOTE: we're not ready for this change to take effect yet, so rather than this operation mutating anything, they simply return bitsets, which we can apply later. 
Logically, this is something like: @@ -197,13 +100,155 @@ An ASCII art example: **is there opportunity to stop early when doc is found in one segment** **also, more efficient way to find bits for long lists of ids?**
-### Searching
+## Mutation: Details
+
+### Batches
+A Batch is a collection of Documents to mutate (create / delete) in the index. Scorch enforces the use of the `_id` field (see Internal Wrapper below) and adds an `_id` field to each document, populating it from doc.ID.
+
+Batches can include inserts and deletes. If the batch includes inserts... Batch spawns a goroutine to loop through the documents to insert, creating an AnalysisWork object for each.
+
+### AnalysisWork and AnalysisQueue
+AnalysisWork is work that is done when adding documents to an index. Bleve supports asynchronous / parallel analysis during batch insert operations and the AnalysisQueue coordinates parallel completion of the work. Batch spawns another goroutine to send the AnalysisWork objects to the AnalysisQueue via the AnalysisQueue.queue channel. This is done in a separate goroutine presumably because the AnalysisQueue.queue channel has no buffer and there is no sense in predefining the buffer since we never know how many documents might be in a batch.
+
+The AnalysisQueue is created with a number of workers and each worker runs in its own goroutine to pull jobs off the queue and then call upon the index to do the analysis. The results of the analysis are passed back to Batch through a result channel on the AnalysisWork object. Batch creates each AnalysisWork object with the same result channel so it listens for results from just the one channel.
+
+Once analysis for the batch is complete, Batch generates some stats.
+
+### Segments
+Every batch operation introduces a new **Segment**. A segment is simply a slice, subset, or portion of the entire index. A segmented index is one which is composed of one or more segments. Although segments are created in a particular order, knowing this ordering is not required to achieve correct semantics when querying. Because there is no ordering, this means that when searching an index, you can (and should) search all the segments concurrently.
+
+The segment plugin manages the persistence of a Segment and can create a new segment from a set of AnalysisResults or from a file on disk. The plugin also manages merging Segments. Since each batch produces a segment, the segments must merge eventually to reduce the number of segments that must be queried in parallel. While not exactly equivalent... it may be helpful to see https://en.wikipedia.org/wiki/Log-structured_merge-tree to better understand Scorch segments.
+
+The only segment plugin used in scorch is the `zapv` plugin. Multiple versions of zapv are registered at the time of this writing... The current default is `zapv11` and the latest available is `zapv14`.
+
+### Segment Preparation and Obsolescence
+Scorch.Batch() now `prepares` the segment by calling Scorch.prepareSegment, which creates a `segmentIntroduction`. When a new segment is introduced to the index... it may mutate documents that are already in the index. Remember that each batch produces a segment to be merged at some point after the batch commits... so at any given moment... more than one segment may contain data for a document. Scorch.prepareSegment loops through all segments that have been committed to the index... the root IndexSnapshot's SegmentSnapshots... and passes the set of ids for the documents about to be introduced into the segment.DocNumbers() function, which returns the intersection of ids that are found in the existing segment. This intersection is the list of documents that will become *obsolete* once the current segment is introduced, and the list is cached with the `segmentIntroduction` object.
+
+Note that DocNumbers returns a `roaring.Bitmap`. See http://roaringbitmap.org/about/ for more info. In short, it's a data structure that represents a set of integers, performs set operations efficiently, and also compresses well.
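+
+To get a feel for the bitmap arithmetic involved, here is a small standalone sketch... plain Go against the same roaring library, not scorch code, and the doc numbers are made up... of how an obsoletion set for one segment might be computed and later folded into that segment's deleted set:
+
+```
+package main
+
+import (
+	"fmt"
+
+	"github.com/RoaringBitmap/roaring"
+)
+
+func main() {
+	// pretend an existing segment holds doc numbers 0..9 and that
+	// doc numbers 3 and 7 were already deleted by earlier batches
+	deleted := roaring.BitmapOf(3, 7)
+
+	// pretend DocNumbers() reported that the incoming batch re-indexes
+	// the documents living at doc numbers 2, 3 and 5 in this segment
+	delta := roaring.BitmapOf(2, 3, 5)
+
+	// like SegmentSnapshot.DocNumbers, ignore hits that are already deleted
+	obsolete := roaring.AndNot(delta, deleted)
+	fmt.Println("newly obsolete:", obsolete.ToArray()) // [2 5]
+
+	// like introduceSegment, fold the obsoletions into the deleted set
+	// carried forward on the new SegmentSnapshot
+	newDeleted := roaring.Or(deleted, obsolete)
+	fmt.Println("deleted after introduction:", newDeleted.ToArray()) // [2 3 5 7]
+	fmt.Println("live docs left:", 10-int(newDeleted.GetCardinality())) // 6
+}
+```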
+
+Finally, the `segmentIntroduction` is queued on the Scorch.introductions channel and prepareSegment blocks on the introduction.applied channel, which signals that the introduction has been applied to the index. Note that this does not mean persisted. If Scorch.unsafeBatch is true, which is the default, Batch completes even though the data has not yet been written to disk. If Scorch.unsafeBatch is false, Batch will then block until the data is on disk.
+
+An event is fired upon introduction of the new segment, but it seems that only test code listens for this. Batch calls upon Scorch.segPlugin to create the segment from the analysisResults.
+
+
+### Introductions: IndexSnapshots and SegmentSnapshots
+
+Remember the introducer loop. Once a `segmentIntroduction` is queued on the introductions channel, the introducer loop will pick it up and invoke `Scorch.introduceSegment`.
+
+`Scorch.introduceSegment` creates a new IndexSnapshot to eventually replace the Scorch.root IndexSnapshot. This happens at the conclusion of each introduction. As batches settle, the Scorch.root reflects their changes and represents a snapshot of the index at that point in time.
+
+`IndexSnapshots` consist, in part, of `SegmentSnapshots`. A `SegmentSnapshot` has some metadata, but most importantly references a Segment (which... for unmerged segments... is the Segment created in the batch) and the list of computed obsoletions. The new SegmentSnapshot... if it has any documents in it at all after obsoletions have been computed, is added to the new IndexSnapshot. The underlying segment includes a count of references from SegmentSnapshots so that as SegmentSnapshots are merged and deleted, their underlying segments can be garbage collected. When the SegmentSnapshot is added to the IndexSnapshot, an offset is recorded alongside it; the offset is the first snapshot-global document number that falls within that `SegmentSnapshot` (i.e. the running total of the document counts of the segments before it).
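+
+As a rough standalone sketch of how such an offset table gets used... the real lookup lives in IndexSnapshot.segmentIndexAndLocalDocNumFromGlobal and may differ in detail... a snapshot-global document number can be mapped back to a segment index and a segment-local document number like this:
+
+```
+package main
+
+import (
+	"fmt"
+	"sort"
+)
+
+// offsets[i] is the snapshot-global doc number at which segment i begins,
+// i.e. the running total of the underlying segment Count()s before it.
+func globalToLocal(offsets []uint64, docNum uint64) (segmentIndex int, localDocNum uint64) {
+	// find the first segment that starts beyond docNum, then step back one
+	segmentIndex = sort.Search(len(offsets), func(i int) bool {
+		return offsets[i] > docNum
+	}) - 1
+	return segmentIndex, docNum - offsets[segmentIndex]
+}
+
+func main() {
+	// three segments whose underlying Count()s are 4, 2 and 5
+	offsets := []uint64{0, 4, 6}
+	for _, global := range []uint64{0, 3, 4, 6, 10} {
+		seg, local := globalToLocal(offsets, global)
+		fmt.Printf("global %2d -> segment %d, local doc %d\n", global, seg, local)
+	}
+}
+```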
+
+`Scorch.introduceSegment` loops through the root IndexSnapshot's SegmentSnapshots again and checks the introduction for pre-computed obsoletes / deletions on that segment. Note that delete and obsolete are quite different: a document is obsolete in a segment if it is updated or deleted in a segment that is added to the index later. If there is no precomputed set of obsoletions, it is computed again. This can happen because the root snapshot can have changed between the optimistic computation and this introduction.
+
+Now that `Scorch.introduceSegment` has processed the existing SegmentSnapshots, it adds the new one. This is fairly straightforward because it looks a lot like the other introductions, but without the computation for obsolescence.
+
+### Introductions: Internal Data
+
+IndexSnapshot.internal is data that is used for... Internal Storage (see the section on internal storage below). At this point, it is copied to the new IndexSnapshot and new internal data introduced by the batch is added / overwritten.
+
+### Introductions: Wrap Up
+
+The finale of `Scorch.introduceSegment` is to register the `introduction.persisted` channel and the `introduction.persistedCallback` on the scorch index. These are signaled / invoked when the Snapshot and underlying segments have actually gone to disk. Note that the index accumulates these... they are not replaced merely because the root snapshot is updated. Thus they are valid for all Snapshots that have been applied so far.
+
+Finally, the introduction.applied channel is closed... Batch, which has been blocking on the introduction.applied channel, moves forward.
+
+## Documents and Fields
+
+Document
+- Has a unique identifier (arbitrary bytes).
+- Is comprised of a list of fields.
+
+Field
+- Has a name (string).
+- Has a type (text, number, date, geopoint).
+- Has a value (depending on type).
+- Can be indexed, stored, or both.
+- If indexed, can be analyzed.
+- If indexed, can optionally store term vectors.
+
+## Internal Wrapper
+
+In order to accommodate the existing APIs while also improving the implementation, the scorch implementation includes some wrapper functionality that must be described.
+
+### \_id field
+
+In scorch, field 0 is prearranged to be named \_id. All documents have a value for this field, which is the document's external identifier. In this version the field *MUST* be both indexed AND stored. The scorch wrapper adds this field, as it will not be present in the Document from the calling bleve code.
+
+NOTE: If a document already contains a field \_id, it will be replaced. If this is problematic, the caller must ensure such a scenario does not happen.
+
+## Proposed Structures
+
+``` +type Segment interface { + + Dictionary(field string) TermDictionary + +} + +type TermDictionary interface { + + PostingsList(term string, excluding PostingsList) PostingsList + +} + +type PostingsList interface { + + Next() Posting + + And(other PostingsList) PostingsList + Or(other PostingsList) PostingsList + +} + +type Posting interface { + Number() uint64 + + Frequency() uint64 + Norm() float64 + + Locations() Locations +} + +type Locations interface { + Start() uint64 + End() uint64 + Pos() uint64 + ArrayPositions() ... +} + +type DeletedDocs { + +} + +type SegmentSnapshot struct { + segment Segment + deleted PostingsList +} + +type IndexSnapshot struct { + segment []SegmentSnapshot +} +```
+**What about errors?**
+**What about memory mgmnt or context?**
+**Postings List separate iterator to separate stateful from stateless**
+
+## Backward Compatibility
+
+The bleve.index API has methods for directly making individual mutations (Update/Delete/SetInternal/DeleteInternal); however, for this first implementation, we assume that all of these calls can simply be turned into a Batch of size 1. This may be highly inefficient, but it will be correct. This decision is made based on the fact that Couchbase FTS always uses Batches.
+
+NOTE: As a side-effect of this decision, it should be clear that performance tuning may depend on the batch size, which may in turn require changes in FTS.
+
+From this point forward, only Batch mutations will be discussed.
+
+## Searching
In the bleve.index API all searching starts by getting an IndexReader, which represents a snapshot of the index at a point in time. As described in the section above, our index implementation maintains a pointer to the current IndexSnapshot.
When a caller gets an IndexReader, they get a copy of this pointer, and can use it as long as they like. The IndexSnapshot contains SegmentSnapshots, which only contain pointers to immutable segments. The deleted posting lists associated with a segment change over time, but the particular deleted posting list in YOUR snapshot is immutable. This gives a stable view of the data. -#### Term Search +### Term Search Term search is the only searching primitive exposed in today's bleve.index API. This ultimately could limit our ability to take advantage of the indexing improvements, but it also means it will be easier to get a first version of this working. @@ -314,20 +359,20 @@ Caller could then ask to get term locations, stored fields, external doc ID for ``` -#### Future improvements +### Future improvements In the future, interfaces to detect these non-serially operating TermFieldReaders could expose their own And() and Or() up to the higher level Conjunction/Disjunction searchers. Doing this alone offers some win, but also means there would be greater burden on the Searcher code rewriting logical expressions for maximum performance. Another related topic is that of peak memory usage. With serially operating TermFieldReaders it was necessary to start them all at the same time and operate in unison. However, with these non-serially operating TermFieldReaders we have the option of doing a few at a time, consolidating them, dispoting the intermediaries, and then doing a few more. For very complex queries with many clauses this could reduce peak memory usage. -### Memory Tracking +## Memory Tracking All segments must be able to produce two statistics, an estimate of their explicit memory usage, and their actual size on disk (if any). For in-memory segments, disk usage could be zero, and the memory usage represents the entire information content. For mmap-based disk segments, the memory could be as low as the size of tracking structure itself (say just a few pointers). This would allow the implementation to throttle or block incoming mutations when a threshold memory usage has (or would be) exceeded. -### Persistence +## Persistence Obviously, we want to support (but maybe not require) asynchronous persistence of segments. My expectation is that segments are initially built in memory. At some point they are persisted to disk. This poses some interesting challenges. @@ -336,14 +381,14 @@ At runtime, the state of an index (it's IndexSnapshot) is not only the contents This also relates to the topic rollback, addressed next... -### Rollback +## Rollback One desirable property in the Couchbase ecosystem is the ability to rollback to some previous (though typically not long ago) state. One idea for keeping this property in this design is to protect some of the most recent segments from merging. Then, if necessary, they could be "undone" to reveal previous states of the system. In these scenarios "undone" has to properly undo the deleted bitmasks on the other segments. Again, the current thinking is that rather than "undo" anything, it could be work that was deferred in the first place, thus making it easier to logically undo. Another possibly related approach would be to tie this into our existing snapshot mechanism. Perhaps simulating a slow reader (holding onto index snapshots) for some period of time, can be the mechanism to achieve the desired end goal. -### Internal Storage +## Internal Storage The bleve.index API has support for "internal storage". 
The ability to store information under a separate name space. @@ -351,7 +396,7 @@ This is not used for high volume storage, so it is tempting to think we could ju More thought is required here. -### Merging +## Merging The segmented index approach requires merging to prevent the number of segments from growing too large. diff --git a/index/scorch/builder.go b/index/scorch/builder.go index 1f4b41d63..23bf784ca 100644 --- a/index/scorch/builder.go +++ b/index/scorch/builder.go @@ -285,10 +285,10 @@ func (o *Builder) Close() error { segment: seg, } is := &IndexSnapshot{ - epoch: 3, // chosen to match scorch behavior when indexing a single batch - segment: []*SegmentSnapshot{ss}, - creator: "scorch-builder", - internal: o.internal, + epoch: 3, // chosen to match scorch behavior when indexing a single batch + segmentSnapshots: []*SegmentSnapshot{ss}, + creator: "scorch-builder", + internal: o.internal, } // create the root bolt diff --git a/index/scorch/introducer.go b/index/scorch/introducer.go index 7770c41c5..f858c6f5d 100644 --- a/index/scorch/introducer.go +++ b/index/scorch/introducer.go @@ -25,7 +25,7 @@ import ( type segmentIntroduction struct { id uint64 - data segment.Segment + segment segment.Segment obsoletes map[uint64]*roaring.Bitmap ids []string internal map[string][]byte @@ -92,7 +92,7 @@ OUTER: s.asyncTasks.Done() } -func (s *Scorch) introduceSegment(next *segmentIntroduction) error { +func (s *Scorch) introduceSegment(introduction *segmentIntroduction) error { atomic.AddUint64(&s.stats.TotIntroduceSegmentBeg, 1) defer atomic.AddUint64(&s.stats.TotIntroduceSegmentEnd, 1) @@ -103,62 +103,66 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error { defer func() { _ = root.DecRef() }() - nsegs := len(root.segment) + nsegs := len(root.segmentSnapshots) // prepare new index snapshot - newSnapshot := &IndexSnapshot{ - parent: s, - segment: make([]*SegmentSnapshot, 0, nsegs+1), - offsets: make([]uint64, 0, nsegs+1), - internal: make(map[string][]byte, len(root.internal)), - refs: 1, - creator: "introduceSegment", + newIndexSnapshot := &IndexSnapshot{ + parent: s, + segmentSnapshots: make([]*SegmentSnapshot, 0, nsegs+1), + offsets: make([]uint64, 0, nsegs+1), + internal: make(map[string][]byte, len(root.internal)), + refs: 1, + creator: "introduceSegment", } // iterate through current segments var running uint64 var docsToPersistCount, memSegments, fileSegments uint64 - for i := range root.segment { + for i := range root.segmentSnapshots { // see if optimistic work included this segment - delta, ok := next.obsoletes[root.segment[i].id] + delta, ok := introduction.obsoletes[root.segmentSnapshots[i].id] if !ok { var err error - delta, err = root.segment[i].segment.DocNumbers(next.ids) + delta, err = root.segmentSnapshots[i].segment.DocNumbers(introduction.ids) if err != nil { - next.applied <- fmt.Errorf("error computing doc numbers: %v", err) - close(next.applied) - _ = newSnapshot.DecRef() + introduction.applied <- fmt.Errorf("error computing doc numbers: %v", err) + close(introduction.applied) + _ = newIndexSnapshot.DecRef() return err } } - newss := &SegmentSnapshot{ - id: root.segment[i].id, - segment: root.segment[i].segment, - cachedDocs: root.segment[i].cachedDocs, - creator: root.segment[i].creator, + newSegmentSnapshot := &SegmentSnapshot{ + id: root.segmentSnapshots[i].id, + segment: root.segmentSnapshots[i].segment, + cachedDocs: root.segmentSnapshots[i].cachedDocs, + creator: root.segmentSnapshots[i].creator, } // apply new obsoletions - if 
root.segment[i].deleted == nil { - newss.deleted = delta + if root.segmentSnapshots[i].obsoleted == nil { + newSegmentSnapshot.obsoleted = delta } else { - newss.deleted = roaring.Or(root.segment[i].deleted, delta) + newSegmentSnapshot.obsoleted = roaring.Or(root.segmentSnapshots[i].obsoleted, delta) } - if newss.deleted.IsEmpty() { - newss.deleted = nil + if newSegmentSnapshot.obsoleted.IsEmpty() { + newSegmentSnapshot.obsoleted = nil } // check for live size before copying - if newss.LiveSize() > 0 { - newSnapshot.segment = append(newSnapshot.segment, newss) - root.segment[i].segment.AddRef() - newSnapshot.offsets = append(newSnapshot.offsets, running) - running += newss.segment.Count() + // this is number of docs in the segment minus number deleted + // if there are none, we don't need to add this to the IndexSnapshot + if newSegmentSnapshot.LiveSize() > 0 { + newIndexSnapshot.segmentSnapshots = append(newIndexSnapshot.segmentSnapshots, newSegmentSnapshot) + root.segmentSnapshots[i].segment.AddRef() + newIndexSnapshot.offsets = append(newIndexSnapshot.offsets, running) + running += newSegmentSnapshot.segment.Count() } - if isMemorySegment(root.segment[i]) { - docsToPersistCount += root.segment[i].Count() + // The segment isn't necessarily on the disk when we wrap up the introduction. + // So recalculate stats for the new IndexSnapshot as we add SegmentSnapshots + if isMemorySegment(root.segmentSnapshots[i]) { + docsToPersistCount += root.segmentSnapshots[i].Count() memSegments++ } else { fileSegments++ @@ -170,47 +174,48 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error { atomic.StoreUint64(&s.stats.TotFileSegmentsAtRoot, fileSegments) // append new segment, if any, to end of the new index snapshot - if next.data != nil { + if introduction.segment != nil { newSegmentSnapshot := &SegmentSnapshot{ - id: next.id, - segment: next.data, // take ownership of next.data's ref-count + id: introduction.id, + segment: introduction.segment, // take ownership of introduction.data's ref-count cachedDocs: &cachedDocs{cache: nil}, creator: "introduceSegment", } - newSnapshot.segment = append(newSnapshot.segment, newSegmentSnapshot) - newSnapshot.offsets = append(newSnapshot.offsets, running) + newIndexSnapshot.segmentSnapshots = append(newIndexSnapshot.segmentSnapshots, newSegmentSnapshot) + newIndexSnapshot.offsets = append(newIndexSnapshot.offsets, running) - // increment numItemsIntroduced which tracks the number of items - // queued for persistence. + // increment numItemsIntroduced which tracks the number of items queued for persistence. 
atomic.AddUint64(&s.stats.TotIntroducedItems, newSegmentSnapshot.Count()) atomic.AddUint64(&s.stats.TotIntroducedSegmentsBatch, 1) } + // copy old values for key, oldVal := range root.internal { - newSnapshot.internal[key] = oldVal + newIndexSnapshot.internal[key] = oldVal } + // set new values and apply deletes - for key, newVal := range next.internal { + for key, newVal := range introduction.internal { if newVal != nil { - newSnapshot.internal[key] = newVal + newIndexSnapshot.internal[key] = newVal } else { - delete(newSnapshot.internal, key) + delete(newIndexSnapshot.internal, key) } } - newSnapshot.updateSize() + newIndexSnapshot.updateSize() s.rootLock.Lock() - if next.persisted != nil { - s.rootPersisted = append(s.rootPersisted, next.persisted) + if introduction.persisted != nil { + s.rootPersisted = append(s.rootPersisted, introduction.persisted) } - if next.persistedCallback != nil { - s.persistedCallbacks = append(s.persistedCallbacks, next.persistedCallback) + if introduction.persistedCallback != nil { + s.persistedCallbacks = append(s.persistedCallbacks, introduction.persistedCallback) } // swap in new index snapshot - newSnapshot.epoch = s.nextSnapshotEpoch + newIndexSnapshot.epoch = s.nextSnapshotEpoch s.nextSnapshotEpoch++ rootPrev := s.root - s.root = newSnapshot + s.root = newIndexSnapshot atomic.StoreUint64(&s.stats.CurRootEpoch, s.root.epoch) // release lock s.rootLock.Unlock() @@ -219,7 +224,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error { _ = rootPrev.DecRef() } - close(next.applied) + close(introduction.applied) return nil } @@ -238,27 +243,27 @@ func (s *Scorch) introducePersist(persist *persistIntroduction) { defer func() { _ = root.DecRef() }() newIndexSnapshot := &IndexSnapshot{ - parent: s, - epoch: nextSnapshotEpoch, - segment: make([]*SegmentSnapshot, len(root.segment)), - offsets: make([]uint64, len(root.offsets)), - internal: make(map[string][]byte, len(root.internal)), - refs: 1, - creator: "introducePersist", + parent: s, + epoch: nextSnapshotEpoch, + segmentSnapshots: make([]*SegmentSnapshot, len(root.segmentSnapshots)), + offsets: make([]uint64, len(root.offsets)), + internal: make(map[string][]byte, len(root.internal)), + refs: 1, + creator: "introducePersist", } var docsToPersistCount, memSegments, fileSegments uint64 - for i, segmentSnapshot := range root.segment { + for i, segmentSnapshot := range root.segmentSnapshots { // see if this segment has been replaced if replacement, ok := persist.persisted[segmentSnapshot.id]; ok { newSegmentSnapshot := &SegmentSnapshot{ id: segmentSnapshot.id, segment: replacement, - deleted: segmentSnapshot.deleted, + obsoleted: segmentSnapshot.obsoleted, cachedDocs: segmentSnapshot.cachedDocs, creator: "introducePersist", } - newIndexSnapshot.segment[i] = newSegmentSnapshot + newIndexSnapshot.segmentSnapshots[i] = newSegmentSnapshot delete(persist.persisted, segmentSnapshot.id) // update items persisted incase of a new segment snapshot @@ -266,11 +271,11 @@ func (s *Scorch) introducePersist(persist *persistIntroduction) { atomic.AddUint64(&s.stats.TotPersistedSegments, 1) fileSegments++ } else { - newIndexSnapshot.segment[i] = root.segment[i] - newIndexSnapshot.segment[i].segment.AddRef() + newIndexSnapshot.segmentSnapshots[i] = root.segmentSnapshots[i] + newIndexSnapshot.segmentSnapshots[i].segment.AddRef() - if isMemorySegment(root.segment[i]) { - docsToPersistCount += root.segment[i].Count() + if isMemorySegment(root.segmentSnapshots[i]) { + docsToPersistCount += 
root.segmentSnapshots[i].Count() memSegments++ } else { fileSegments++ @@ -323,16 +328,16 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { // iterate through current segments newSegmentDeleted := roaring.NewBitmap() var running, docsToPersistCount, memSegments, fileSegments uint64 - for i := range root.segment { - segmentID := root.segment[i].id + for i := range root.segmentSnapshots { + segmentID := root.segmentSnapshots[i].id if segSnapAtMerge, ok := nextMerge.old[segmentID]; ok { // this segment is going away, see if anything else was deleted since we started the merge - if segSnapAtMerge != nil && root.segment[i].deleted != nil { + if segSnapAtMerge != nil && root.segmentSnapshots[i].obsoleted != nil { // assume all these deletes are new - deletedSince := root.segment[i].deleted + deletedSince := root.segmentSnapshots[i].obsoleted // if we already knew about some of them, remove - if segSnapAtMerge.deleted != nil { - deletedSince = roaring.AndNot(root.segment[i].deleted, segSnapAtMerge.deleted) + if segSnapAtMerge.obsoleted != nil { + deletedSince = roaring.AndNot(root.segmentSnapshots[i].obsoleted, segSnapAtMerge.obsoleted) } deletedSinceItr := deletedSince.Iterator() for deletedSinceItr.HasNext() { @@ -346,21 +351,21 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { // segments left behind in old map after processing // the root segments would be the obsolete segment set delete(nextMerge.old, segmentID) - } else if root.segment[i].LiveSize() > 0 { + } else if root.segmentSnapshots[i].LiveSize() > 0 { // this segment is staying - newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{ - id: root.segment[i].id, - segment: root.segment[i].segment, - deleted: root.segment[i].deleted, - cachedDocs: root.segment[i].cachedDocs, - creator: root.segment[i].creator, + newSnapshot.segmentSnapshots = append(newSnapshot.segmentSnapshots, &SegmentSnapshot{ + id: root.segmentSnapshots[i].id, + segment: root.segmentSnapshots[i].segment, + obsoleted: root.segmentSnapshots[i].obsoleted, + cachedDocs: root.segmentSnapshots[i].cachedDocs, + creator: root.segmentSnapshots[i].creator, }) - root.segment[i].segment.AddRef() + root.segmentSnapshots[i].segment.AddRef() newSnapshot.offsets = append(newSnapshot.offsets, running) - running += root.segment[i].segment.Count() + running += root.segmentSnapshots[i].segment.Count() - if isMemorySegment(root.segment[i]) { - docsToPersistCount += root.segment[i].Count() + if isMemorySegment(root.segmentSnapshots[i]) { + docsToPersistCount += root.segmentSnapshots[i].Count() memSegments++ } else { fileSegments++ @@ -389,10 +394,10 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) { if nextMerge.new != nil && nextMerge.new.Count() > newSegmentDeleted.GetCardinality() { // put new segment at end - newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{ + newSnapshot.segmentSnapshots = append(newSnapshot.segmentSnapshots, &SegmentSnapshot{ id: nextMerge.id, segment: nextMerge.new, // take ownership for nextMerge.new's ref-count - deleted: newSegmentDeleted, + obsoleted: newSegmentDeleted, cachedDocs: &cachedDocs{cache: nil}, creator: "introduceMerge", }) diff --git a/index/scorch/merge.go b/index/scorch/merge.go index 56c0953f4..5ab8e6409 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -243,7 +243,7 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context, options *mergeplan.MergePlanOptions, ourSnapshot *IndexSnapshot) error { // build list of persisted segments in this snapshot var 
onlyPersistedSnapshots []mergeplan.Segment - for _, segmentSnapshot := range ourSnapshot.segment { + for _, segmentSnapshot := range ourSnapshot.segmentSnapshots { if _, ok := segmentSnapshot.segment.(segment.PersistedSegment); ok { onlyPersistedSnapshots = append(onlyPersistedSnapshots, segmentSnapshot) } @@ -296,7 +296,7 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context, oldMap[segSnapshot.id] = nil } else { segmentsToMerge = append(segmentsToMerge, segSnapshot.segment) - docsToDrop = append(docsToDrop, segSnapshot.deleted) + docsToDrop = append(docsToDrop, segSnapshot.obsoleted) } // track the files getting merged for unsetting the // removal ineligibility. This helps to unflip files @@ -469,7 +469,7 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot, } for i, idx := range sbsIndexes { - ss := snapshot.segment[idx] + ss := snapshot.segmentSnapshots[idx] sm.old[ss.id] = ss sm.oldNewDocNums[ss.id] = newDocNums[i] } diff --git a/index/scorch/optimize.go b/index/scorch/optimize.go index 658354cd7..012c17246 100644 --- a/index/scorch/optimize.go +++ b/index/scorch/optimize.go @@ -16,10 +16,11 @@ package scorch import ( "fmt" + "sync/atomic" + "github.com/RoaringBitmap/roaring" "github.com/blevesearch/bleve/index" "github.com/blevesearch/bleve/index/scorch/segment" - "sync/atomic" ) var OptimizeConjunction = true @@ -78,7 +79,7 @@ func (o *OptimizeTFRConjunction) Finish() (index.Optimized, error) { return nil, nil } - for i := range o.snapshot.segment { + for i := range o.snapshot.segmentSnapshots { itr0, ok := o.tfrs[0].iterators[i].(segment.OptimizablePostingsIterator) if !ok || itr0.ActualBitmap() == nil { continue @@ -165,9 +166,8 @@ func (o *OptimizeTFRConjunctionUnadorned) Finish() (rv index.Optimized, err erro OptimizeTFRConjunctionUnadornedTerm, OptimizeTFRConjunctionUnadornedField) var actualBMs []*roaring.Bitmap // Collected from regular posting lists. - OUTER: - for i := range o.snapshot.segment { + for i := range o.snapshot.segmentSnapshots { actualBMs = actualBMs[:0] var docNum1HitLast uint64 @@ -175,7 +175,7 @@ OUTER: for _, tfr := range o.tfrs { if _, ok := tfr.iterators[i].(*segment.EmptyPostingsIterator); ok { - // An empty postings iterator means the entire AND is empty. + // An empty postings itsegmentSnapshotsmeans the entire AND is empty. oTFR.iterators[i] = segment.AnEmptyPostingsIterator continue OUTER } @@ -307,7 +307,7 @@ func (o *OptimizeTFRDisjunctionUnadorned) Finish() (rv index.Optimized, err erro return nil, nil } - for i := range o.snapshot.segment { + for i := range o.snapshot.segmentSnapshots { var cMax uint64 for _, tfr := range o.tfrs { @@ -333,7 +333,7 @@ func (o *OptimizeTFRDisjunctionUnadorned) Finish() (rv index.Optimized, err erro var docNums []uint32 // Collected docNum's from 1-hit posting lists. var actualBMs []*roaring.Bitmap // Collected from regular posting lists. 
- for i := range o.snapshot.segment { + for i := range o.snapshot.segmentSnapshots { docNums = docNums[:0] actualBMs = actualBMs[:0] @@ -386,7 +386,7 @@ func (i *IndexSnapshot) unadornedTermFieldReader( term: term, field: field, snapshot: i, - iterators: make([]segment.PostingsIterator, len(i.segment)), + iterators: make([]segment.PostingsIterator, len(i.segmentSnapshots)), segmentOffset: 0, includeFreq: false, includeNorm: false, diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 498378a4f..da7d36aa1 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -361,10 +361,10 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) ( var sbsDrops []*roaring.Bitmap var sbsIndexes []int - for i, segmentSnapshot := range snapshot.segment { + for i, segmentSnapshot := range snapshot.segmentSnapshots { if _, ok := segmentSnapshot.segment.(segment.PersistedSegment); !ok { sbs = append(sbs, segmentSnapshot.segment) - sbsDrops = append(sbsDrops, segmentSnapshot.deleted) + sbsDrops = append(sbsDrops, segmentSnapshot.obsoleted) sbsIndexes = append(sbsIndexes, i) } } @@ -388,33 +388,33 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) ( mergedSegmentIDs := map[uint64]struct{}{} for _, idx := range sbsIndexes { - mergedSegmentIDs[snapshot.segment[idx].id] = struct{}{} + mergedSegmentIDs[snapshot.segmentSnapshots[idx].id] = struct{}{} } // construct a snapshot that's logically equivalent to the input // snapshot, but with merged segments replaced by the new segment equiv := &IndexSnapshot{ - parent: snapshot.parent, - segment: make([]*SegmentSnapshot, 0, len(snapshot.segment)), - internal: snapshot.internal, - epoch: snapshot.epoch, - creator: "persistSnapshotMaybeMerge", + parent: snapshot.parent, + segmentSnapshots: make([]*SegmentSnapshot, 0, len(snapshot.segmentSnapshots)), + internal: snapshot.internal, + epoch: snapshot.epoch, + creator: "persistSnapshotMaybeMerge", } // copy to the equiv the segments that weren't replaced - for _, segment := range snapshot.segment { + for _, segment := range snapshot.segmentSnapshots { if _, wasMerged := mergedSegmentIDs[segment.id]; !wasMerged { - equiv.segment = append(equiv.segment, segment) + equiv.segmentSnapshots = append(equiv.segmentSnapshots, segment) } } // append to the equiv the new segment - for _, segment := range newSnapshot.segment { + for _, segment := range newSnapshot.segmentSnapshots { if segment.id == newSegmentID { - equiv.segment = append(equiv.segment, &SegmentSnapshot{ - id: newSegmentID, - segment: segment.segment, - deleted: nil, // nil since merging handled deletions + equiv.segmentSnapshots = append(equiv.segmentSnapshots, &SegmentSnapshot{ + id: newSegmentID, + segment: segment.segment, + obsoleted: nil, // nil since merging handled deletions }) break } @@ -473,7 +473,7 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, newSegmentPaths := make(map[uint64]string) // first ensure that each segment in this snapshot has been persisted - for _, segmentSnapshot := range snapshot.segment { + for _, segmentSnapshot := range snapshot.segmentSnapshots { snapshotSegmentKey := segment.EncodeUvarintAscending(nil, segmentSnapshot.id) snapshotSegmentBucket, err := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey) if err != nil { @@ -507,8 +507,8 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, } // store current deleted bits var roaringBuf bytes.Buffer - if segmentSnapshot.deleted != nil { - _, err = 
segmentSnapshot.deleted.WriteTo(&roaringBuf) + if segmentSnapshot.obsoleted != nil { + _, err = segmentSnapshot.obsoleted.WriteTo(&roaringBuf) if err != nil { return nil, nil, fmt.Errorf("error persisting roaring bytes: %v", err) } @@ -749,7 +749,7 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) { _ = rv.DecRef() return nil, fmt.Errorf("failed to decode segment id: %v", err) } - rv.segment = append(rv.segment, segmentSnapshot) + rv.segmentSnapshots = append(rv.segmentSnapshots, segmentSnapshot) rv.offsets = append(rv.offsets, running) running += segmentSnapshot.segment.Count() } @@ -782,7 +782,7 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro return nil, fmt.Errorf("error reading deleted bytes: %v", err) } if !deletedBitmap.IsEmpty() { - rv.deleted = deletedBitmap + rv.obsoleted = deletedBitmap } } diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index ba98a460d..45d83d39e 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -232,7 +232,7 @@ func (s *Scorch) openBolt() error { } } - atomic.StoreUint64(&s.stats.TotFileSegmentsAtRoot, uint64(len(s.root.segment))) + atomic.StoreUint64(&s.stats.TotFileSegmentsAtRoot, uint64(len(s.root.segmentSnapshots))) s.introductions = make(chan *segmentIntroduction) s.persists = make(chan *persistIntroduction) @@ -404,7 +404,7 @@ func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string, // new introduction introduction := &segmentIntroduction{ id: atomic.AddUint64(&s.nextSegmentID, 1), - data: newSegment, + segment: newSegment, ids: ids, obsoletes: make(map[uint64]*roaring.Bitmap), internal: internalOps, @@ -416,7 +416,9 @@ func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string, introduction.persisted = make(chan error, 1) } - // optimistically prepare obsoletes outside of rootLock + // prepare obsoletes outside of rootLock + // other segments could be introduced outside of rootLock that contain mutations on documents + // that are included in this batch. thus this is an optimistic locking approach. 
s.rootLock.RLock() root := s.root root.AddRef() @@ -424,7 +426,7 @@ func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string, defer func() { _ = root.DecRef() }() - for _, seg := range root.segment { + for _, seg := range root.segmentSnapshots { delta, err := seg.segment.DocNumbers(ids) if err != nil { return err @@ -516,8 +518,8 @@ func (s *Scorch) diskFileStats(rootSegmentPaths map[string]struct{}) (uint64, } func (s *Scorch) rootDiskSegmentsPaths() map[string]struct{} { - rv := make(map[string]struct{}, len(s.root.segment)) - for _, segmentSnapshot := range s.root.segment { + rv := make(map[string]struct{}, len(s.root.segmentSnapshots)) + for _, segmentSnapshot := range s.root.segmentSnapshots { if seg, ok := segmentSnapshot.segment.(segment.PersistedSegment); ok { rv[seg.Path()] = struct{}{} } diff --git a/index/scorch/scorch_test.go b/index/scorch/scorch_test.go index 5f0ec1b75..f21c4e430 100644 --- a/index/scorch/scorch_test.go +++ b/index/scorch/scorch_test.go @@ -845,7 +845,7 @@ func TestIndexInternalCRUD(t *testing.T) { t.Error(err) } - if len(indexReader.(*IndexSnapshot).segment) != 0 { + if len(indexReader.(*IndexSnapshot).segmentSnapshots) != 0 { t.Errorf("expected 0 segments") } @@ -874,7 +874,7 @@ func TestIndexInternalCRUD(t *testing.T) { t.Error(err) } - if len(indexReader2.(*IndexSnapshot).segment) != 0 { + if len(indexReader2.(*IndexSnapshot).segmentSnapshots) != 0 { t.Errorf("expected 0 segments") } @@ -903,7 +903,7 @@ func TestIndexInternalCRUD(t *testing.T) { t.Error(err) } - if len(indexReader3.(*IndexSnapshot).segment) != 0 { + if len(indexReader3.(*IndexSnapshot).segmentSnapshots) != 0 { t.Errorf("expected 0 segments") } @@ -1001,7 +1001,7 @@ func TestIndexBatch(t *testing.T) { } }() - numSegments := len(indexReader.(*IndexSnapshot).segment) + numSegments := len(indexReader.(*IndexSnapshot).segmentSnapshots) if numSegments <= 0 { t.Errorf("expected some segments, got: %d", numSegments) } diff --git a/index/scorch/snapshot_index.go b/index/scorch/snapshot_index.go index 9d17bcb2c..1424a1ab5 100644 --- a/index/scorch/snapshot_index.go +++ b/index/scorch/snapshot_index.go @@ -63,13 +63,13 @@ func init() { } type IndexSnapshot struct { - parent *Scorch - segment []*SegmentSnapshot - offsets []uint64 - internal map[string][]byte - epoch uint64 - size uint64 - creator string + parent *Scorch + segmentSnapshots []*SegmentSnapshot + offsets []uint64 + internal map[string][]byte + epoch uint64 + size uint64 + creator string m sync.Mutex // Protects the fields that follow. 
refs int64 @@ -79,7 +79,7 @@ type IndexSnapshot struct { } func (i *IndexSnapshot) Segments() []*SegmentSnapshot { - return i.segment + return i.segmentSnapshots } func (i *IndexSnapshot) Internal() map[string][]byte { @@ -96,7 +96,7 @@ func (i *IndexSnapshot) DecRef() (err error) { i.m.Lock() i.refs-- if i.refs == 0 { - for _, s := range i.segment { + for _, s := range i.segmentSnapshots { if s != nil { err2 := s.segment.DecRef() if err == nil { @@ -122,7 +122,7 @@ func (i *IndexSnapshot) Size() int { func (i *IndexSnapshot) updateSize() { i.size += uint64(reflectStaticSizeIndexSnapshot) - for _, s := range i.segment { + for _, s := range i.segmentSnapshots { i.size += uint64(s.Size()) } } @@ -132,7 +132,7 @@ func (i *IndexSnapshot) newIndexSnapshotFieldDict(field string, randomLookup bool) (*IndexSnapshotFieldDict, error) { results := make(chan *asynchSegmentResult) - for index, segment := range i.segment { + for index, segment := range i.segmentSnapshots { go func(index int, segment *SegmentSnapshot) { dict, err := segment.segment.Dictionary(field) if err != nil { @@ -150,9 +150,9 @@ func (i *IndexSnapshot) newIndexSnapshotFieldDict(field string, var err error rv := &IndexSnapshotFieldDict{ snapshot: i, - cursors: make([]*segmentDictCursor, 0, len(i.segment)), + cursors: make([]*segmentDictCursor, 0, len(i.segmentSnapshots)), } - for count := 0; count < len(i.segment); count++ { + for count := 0; count < len(i.segmentSnapshots); count++ { asr := <-results if asr.err != nil && err == nil { err = asr.err @@ -264,7 +264,7 @@ func (i *IndexSnapshot) FieldDictContains(field string) (index.FieldDictContains func (i *IndexSnapshot) DocIDReaderAll() (index.DocIDReader, error) { results := make(chan *asynchSegmentResult) - for index, segment := range i.segment { + for index, segment := range i.segmentSnapshots { go func(index int, segment *SegmentSnapshot) { results <- &asynchSegmentResult{ index: index, @@ -278,7 +278,7 @@ func (i *IndexSnapshot) DocIDReaderAll() (index.DocIDReader, error) { func (i *IndexSnapshot) DocIDReaderOnly(ids []string) (index.DocIDReader, error) { results := make(chan *asynchSegmentResult) - for index, segment := range i.segment { + for index, segment := range i.segmentSnapshots { go func(index int, segment *SegmentSnapshot) { docs, err := segment.DocNumbers(ids) if err != nil { @@ -298,10 +298,10 @@ func (i *IndexSnapshot) DocIDReaderOnly(ids []string) (index.DocIDReader, error) func (i *IndexSnapshot) newDocIDReader(results chan *asynchSegmentResult) (index.DocIDReader, error) { rv := &IndexSnapshotDocIDReader{ snapshot: i, - iterators: make([]roaring.IntIterable, len(i.segment)), + iterators: make([]roaring.IntIterable, len(i.segmentSnapshots)), } var err error - for count := 0; count < len(i.segment); count++ { + for count := 0; count < len(i.segmentSnapshots); count++ { asr := <-results if asr.err != nil { if err == nil { @@ -324,7 +324,7 @@ func (i *IndexSnapshot) Fields() ([]string, error) { // FIXME not making this concurrent for now as it's not used in hot path // of any searches at the moment (just a debug aid) fieldsMap := map[string]struct{}{} - for _, segment := range i.segment { + for _, segment := range i.segmentSnapshots { fields := segment.Fields() for _, field := range fields { fieldsMap[field] = struct{}{} @@ -343,7 +343,7 @@ func (i *IndexSnapshot) GetInternal(key []byte) ([]byte, error) { func (i *IndexSnapshot) DocCount() (uint64, error) { var rv uint64 - for _, segment := range i.segment { + for _, segment := range i.segmentSnapshots { rv += 
segment.Count() } return rv, nil @@ -378,7 +378,7 @@ func (i *IndexSnapshot) Document(id string) (rv *document.Document, err error) { segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum) rv = document.NewDocument(id) - err = i.segment[segmentIndex].VisitDocument(localDocNum, func(name string, typ byte, val []byte, pos []uint64) bool { + err = i.segmentSnapshots[segmentIndex].VisitDocument(localDocNum, func(name string, typ byte, val []byte, pos []uint64) bool { if name == "_id" { return true } @@ -426,7 +426,7 @@ func (i *IndexSnapshot) ExternalID(id index.IndexInternalID) (string, error) { } segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum) - v, err := i.segment[segmentIndex].DocID(localDocNum) + v, err := i.segmentSnapshots[segmentIndex].DocID(localDocNum) if err != nil { return "", err } @@ -465,10 +465,10 @@ func (i *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq, rv.field = field rv.snapshot = i if rv.postings == nil { - rv.postings = make([]segment.PostingsList, len(i.segment)) + rv.postings = make([]segment.PostingsList, len(i.segmentSnapshots)) } if rv.iterators == nil { - rv.iterators = make([]segment.PostingsIterator, len(i.segment)) + rv.iterators = make([]segment.PostingsIterator, len(i.segmentSnapshots)) } rv.segmentOffset = 0 rv.includeFreq = includeFreq @@ -478,8 +478,8 @@ func (i *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq, rv.currID = rv.currID[:0] if rv.dicts == nil { - rv.dicts = make([]segment.TermDictionary, len(i.segment)) - for i, segment := range i.segment { + rv.dicts = make([]segment.TermDictionary, len(i.segmentSnapshots)) + for i, segment := range i.segmentSnapshots { dict, err := segment.segment.Dictionary(field) if err != nil { return nil, err @@ -488,8 +488,8 @@ func (i *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq, } } - for i, segment := range i.segment { - pl, err := rv.dicts[i].PostingsList(term, segment.deleted, rv.postings[i]) + for i, segment := range i.segmentSnapshots { + pl, err := rv.dicts[i].PostingsList(term, segment.obsoleted, rv.postings[i]) if err != nil { return nil, err } @@ -578,7 +578,7 @@ func (i *IndexSnapshot) documentVisitFieldTerms(id index.IndexInternalID, } segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum) - if segmentIndex >= len(i.segment) { + if segmentIndex >= len(i.segmentSnapshots) { return nil, nil } @@ -592,7 +592,7 @@ func (i *IndexSnapshot) documentVisitFieldTermsOnSegment( segmentIndex int, localDocNum uint64, fields []string, cFields []string, visitor index.DocumentFieldTermVisitor, dvs segment.DocVisitState) ( cFieldsOut []string, dvsOut segment.DocVisitState, err error) { - ss := i.segment[segmentIndex] + ss := i.segmentSnapshots[segmentIndex] var vFields []string // fields that are visitable via the segment @@ -669,7 +669,7 @@ func (dvr *DocValueReader) VisitDocValues(id index.IndexInternalID, } segmentIndex, localDocNum := dvr.i.segmentIndexAndLocalDocNumFromGlobal(docNum) - if segmentIndex >= len(dvr.i.segment) { + if segmentIndex >= len(dvr.i.segmentSnapshots) { return nil } diff --git a/index/scorch/snapshot_index_tfr.go b/index/scorch/snapshot_index_tfr.go index 239f68fbe..264bbe29d 100644 --- a/index/scorch/snapshot_index_tfr.go +++ b/index/scorch/snapshot_index_tfr.go @@ -143,9 +143,9 @@ func (i *IndexSnapshotTermFieldReader) Advance(ID index.IndexInternalID, preAllo return nil, fmt.Errorf("error converting to doc number % x - %v", ID, err) } segIndex, ldocNum := 
i.snapshot.segmentIndexAndLocalDocNumFromGlobal(num) - if segIndex >= len(i.snapshot.segment) { + if segIndex >= len(i.snapshot.segmentSnapshots) { return nil, fmt.Errorf("computed segment index %d out of bounds %d", - segIndex, len(i.snapshot.segment)) + segIndex, len(i.snapshot.segmentSnapshots)) } // skip directly to the target segment i.segmentOffset = segIndex diff --git a/index/scorch/snapshot_segment.go b/index/scorch/snapshot_segment.go index 96742b4f9..7f11c3d19 100644 --- a/index/scorch/snapshot_segment.go +++ b/index/scorch/snapshot_segment.go @@ -30,10 +30,10 @@ var TermSeparator byte = 0xff var TermSeparatorSplitSlice = []byte{TermSeparator} type SegmentSnapshot struct { - id uint64 - segment segment.Segment - deleted *roaring.Bitmap - creator string + id uint64 + segment segment.Segment + obsoleted *roaring.Bitmap + creator string cachedDocs *cachedDocs } @@ -43,7 +43,7 @@ func (s *SegmentSnapshot) Segment() segment.Segment { } func (s *SegmentSnapshot) Deleted() *roaring.Bitmap { - return s.deleted + return s.obsoleted } func (s *SegmentSnapshot) Id() uint64 { @@ -72,8 +72,8 @@ func (s *SegmentSnapshot) DocID(num uint64) ([]byte, error) { func (s *SegmentSnapshot) Count() uint64 { rv := s.segment.Count() - if s.deleted != nil { - rv -= s.deleted.GetCardinality() + if s.obsoleted != nil { + rv -= s.obsoleted.GetCardinality() } return rv } @@ -83,8 +83,8 @@ func (s *SegmentSnapshot) DocNumbers(docIDs []string) (*roaring.Bitmap, error) { if err != nil { return nil, err } - if s.deleted != nil { - rv.AndNot(s.deleted) + if s.obsoleted != nil { + rv.AndNot(s.obsoleted) } return rv, nil } @@ -93,8 +93,8 @@ func (s *SegmentSnapshot) DocNumbers(docIDs []string) (*roaring.Bitmap, error) { func (s *SegmentSnapshot) DocNumbersLive() *roaring.Bitmap { rv := roaring.NewBitmap() rv.AddRange(0, s.segment.Count()) - if s.deleted != nil { - rv.AndNot(s.deleted) + if s.obsoleted != nil { + rv.AndNot(s.obsoleted) } return rv } @@ -105,8 +105,8 @@ func (s *SegmentSnapshot) Fields() []string { func (s *SegmentSnapshot) Size() (rv int) { rv = s.segment.Size() - if s.deleted != nil { - rv += int(s.deleted.GetSizeInBytes()) + if s.obsoleted != nil { + rv += int(s.obsoleted.GetSizeInBytes()) } rv += s.cachedDocs.Size() return