This repository has been archived by the owner on Jan 17, 2023. It is now read-only.
forked from contribsys/faktory_worker_go
-
Notifications
You must be signed in to change notification settings - Fork 1
/
pool.go
219 lines (183 loc) · 5.85 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
/*
The MIT License (MIT)
Copyright (c) 2013 Fatih Arslan
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package faktory_worker
import (
"errors"
"fmt"
"sync"
)
var (
// ErrClosed is the error resulting if the pool is closed via pool.Close().
ErrClosed = errors.New("pool is closed")
)
// Closeable interface describes a closable implementation. The underlying procedure of the Close() function is determined by its implementation
type Closeable interface {
// Close closes the object
Close() error
}
// Pool interface describes a pool implementation. A pool should have maximum
// capacity. An ideal pool is threadsafe and easy to use.
type Pool interface {
// Get returns a new connection from the pool. Closing the connections puts
// it back to the Pool. Closing it when the pool is destroyed or full will
// be counted as an error.
Get() (Closeable, error)
// Close closes the pool and all its connections. After Close() the pool is
// no longer usable.
Close()
// Len returns the current number of connections of the pool.
Len() int
}
// PoolConn is a wrapper around net.Conn to modify the the behavior of
// net.Conn's Close() method.
type PoolConn struct {
Closeable
mu sync.RWMutex
c *channelPool
unusable bool
}
// Close puts the given connects back to the pool instead of closing it.
func (p *PoolConn) Close() error {
p.mu.RLock()
defer p.mu.RUnlock()
if p.unusable {
if p.Closeable != nil {
return p.Closeable.Close()
}
return nil
}
return p.c.put(p.Closeable)
}
// MarkUnusable marks the connection not usable any more, to let the pool close it instead of returning it to pool.
func (p *PoolConn) MarkUnusable() {
p.mu.Lock()
p.unusable = true
p.mu.Unlock()
}
// wrapConn wraps a standard net.Conn to a poolConn net.Conn.
func (c *channelPool) wrapConn(conn Closeable) Closeable {
p := &PoolConn{c: c}
p.Closeable = conn
return p
}
// channelPool implements the Pool interface based on buffered channels.
type channelPool struct {
// storage for our net.Conn connections
mu sync.Mutex
conns chan Closeable
// net.Conn generator
factory Factory
}
// Factory is a function to create new connections.
type Factory func() (Closeable, error)
// NewChannelPool returns a new pool based on buffered channels with an initial
// capacity and maximum capacity. Factory is used when initial capacity is
// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool
// until a new Get() is called. During a Get(), If there is no new connection
// available in the pool, a new connection will be created via the Factory()
// method.
func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) {
if initialCap < 0 || maxCap <= 0 || initialCap > maxCap {
return nil, errors.New("invalid capacity settings")
}
c := &channelPool{
conns: make(chan Closeable, maxCap),
factory: factory,
}
// create initial connections, if something goes wrong,
// just close the pool error out.
for i := 0; i < initialCap; i++ {
conn, err := factory()
if err != nil {
c.Close()
return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
}
c.conns <- conn
}
return c, nil
}
func (c *channelPool) getConns() chan Closeable {
c.mu.Lock()
conns := c.conns
c.mu.Unlock()
return conns
}
// Get implements the Pool interfaces Get() method. If there is no new
// connection available in the pool, a new connection will be created via the
// Factory() method.
func (c *channelPool) Get() (Closeable, error) {
conns := c.getConns()
if conns == nil {
return nil, ErrClosed
}
// wrap our connections with out custom net.Conn implementation (wrapConn
// method) that puts the connection back to the pool if it's closed.
select {
case conn := <-conns:
if conn == nil {
return nil, ErrClosed
}
return c.wrapConn(conn), nil
default:
conn, err := c.factory()
if err != nil {
return nil, err
}
return c.wrapConn(conn), nil
}
}
// put puts the connection back to the pool. If the pool is full or closed,
// conn is simply closed. A nil conn will be rejected.
func (c *channelPool) put(conn Closeable) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}
c.mu.Lock()
defer c.mu.Unlock()
if c.conns == nil {
// pool is closed, close passed connection
return conn.Close()
}
// put the resource back into the pool. If the pool is full, this will
// block and the default case will be executed.
select {
case c.conns <- conn:
return nil
default:
// pool is full, close passed connection
return conn.Close()
}
}
// Close shuts down all of the Closeable objects in the Pool
func (c *channelPool) Close() {
c.mu.Lock()
conns := c.conns
c.conns = nil
c.factory = nil
c.mu.Unlock()
if conns == nil {
return
}
close(conns)
for conn := range conns {
conn.Close()
}
}
// Len returns the number of Closeable objects in the Pool
func (c *channelPool) Len() int { return len(c.getConns()) }