Skip to content

Commit 90826ee

Browse files
committed
Simplify custom type autoloading with pgxpool
Provide some backwards-compatible configuration options for pgxpool which streamlines the use of the bulk loading of custom types: - AutoLoadTypes: a list of type (or class) names to automatically load for each connection, automatically also loading any other types these depend on. - ReuseTypeMaps: if enabled, pgxpool will cache the typemap information, avoiding the need to perform any further queries as new connections are created. ReuseTypeMaps is disabled by default as in some situations, a connection string might resolve to a pool of servers which do not share the same type name -> OID mapping.
1 parent 06c0451 commit 90826ee

File tree

2 files changed

+84
-9
lines changed

2 files changed

+84
-9
lines changed

pgxpool/pool.go

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,17 @@ import (
1212

1313
"github.com/jackc/pgx/v5"
1414
"github.com/jackc/pgx/v5/pgconn"
15+
"github.com/jackc/pgx/v5/pgtype"
1516
"github.com/jackc/puddle/v2"
1617
)
1718

18-
var defaultMaxConns = int32(4)
19-
var defaultMinConns = int32(0)
20-
var defaultMaxConnLifetime = time.Hour
21-
var defaultMaxConnIdleTime = time.Minute * 30
22-
var defaultHealthCheckPeriod = time.Minute
19+
var (
20+
defaultMaxConns = int32(4)
21+
defaultMinConns = int32(0)
22+
defaultMaxConnLifetime = time.Hour
23+
defaultMaxConnIdleTime = time.Minute * 30
24+
defaultHealthCheckPeriod = time.Minute
25+
)
2326

2427
type connResource struct {
2528
conn *pgx.Conn
@@ -100,6 +103,11 @@ type Pool struct {
100103

101104
closeOnce sync.Once
102105
closeChan chan struct{}
106+
107+
autoLoadTypes []string
108+
reuseTypeMap bool
109+
autoLoadMutex *sync.Mutex
110+
autoLoadTypeInfos []*pgtype.DerivedTypeInfo
103111
}
104112

105113
// Config is the configuration struct for creating a pool. It must be created by [ParseConfig] and then it can be
@@ -147,6 +155,15 @@ type Config struct {
147155
// HealthCheckPeriod is the duration between checks of the health of idle connections.
148156
HealthCheckPeriod time.Duration
149157

158+
// AutoLoadTypes is a list of user-defined types which should automatically be loaded
159+
// as each new connection is created. This will also load any related types, directly
160+
// or indirectly required to handle these types.
161+
AutoLoadTypes []string
162+
163+
// ReuseTypeMaps, if enabled, will reuse the typemap information being used by AutoLoadTypes.
164+
// This removes the need to query the database each time a new connection is created.
165+
ReuseTypeMaps bool
166+
150167
createdByParseConfig bool // Used to enforce created by ParseConfig rule.
151168
}
152169

@@ -185,6 +202,8 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
185202
config: config,
186203
beforeConnect: config.BeforeConnect,
187204
afterConnect: config.AfterConnect,
205+
autoLoadTypes: config.AutoLoadTypes,
206+
reuseTypeMap: config.ReuseTypeMaps,
188207
beforeAcquire: config.BeforeAcquire,
189208
afterRelease: config.AfterRelease,
190209
beforeClose: config.BeforeClose,
@@ -196,6 +215,7 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
196215
healthCheckPeriod: config.HealthCheckPeriod,
197216
healthCheckChan: make(chan struct{}, 1),
198217
closeChan: make(chan struct{}),
218+
autoLoadMutex: new(sync.Mutex),
199219
}
200220

201221
if t, ok := config.ConnConfig.Tracer.(AcquireTracer); ok {
@@ -237,6 +257,19 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
237257
}
238258
}
239259

260+
if len(p.autoLoadTypes) > 0 {
261+
types, err := p.loadTypes(ctx, conn, p.autoLoadTypes)
262+
if err != nil {
263+
conn.Close(ctx)
264+
panic(err)
265+
}
266+
if err = conn.TypeMap().RegisterDerivedTypes(types); err != nil {
267+
conn.Close(ctx)
268+
269+
panic(err)
270+
}
271+
}
272+
240273
jitterSecs := rand.Float64() * config.MaxConnLifetimeJitter.Seconds()
241274
maxAgeTime := time.Now().Add(config.MaxConnLifetime).Add(time.Duration(jitterSecs) * time.Second)
242275

@@ -388,6 +421,27 @@ func (p *Pool) Close() {
388421
})
389422
}
390423

424+
// loadTypes is used internally to autoload the custom types for a connection,
425+
// potentially reusing previously-loaded typemap information.
426+
func (p *Pool) loadTypes(ctx context.Context, conn *pgx.Conn, typeNames []string) ([]*pgtype.DerivedTypeInfo, error) {
427+
if p.reuseTypeMap {
428+
p.autoLoadMutex.Lock()
429+
defer p.autoLoadMutex.Unlock()
430+
if p.autoLoadTypeInfos != nil {
431+
return p.autoLoadTypeInfos, nil
432+
}
433+
types, err := pgx.LoadDerivedTypes(ctx, conn, typeNames)
434+
if err != nil {
435+
return nil, err
436+
}
437+
p.autoLoadTypeInfos = types
438+
return types, err
439+
}
440+
// Avoid needing to acquire the mutex and allow connections to initialise in parallel
441+
// if we have chosen to not reuse the type mapping
442+
return pgx.LoadDerivedTypes(ctx, conn, typeNames)
443+
}
444+
391445
func (p *Pool) isExpired(res *puddle.Resource[*connResource]) bool {
392446
return time.Now().After(res.Value().maxAgeTime)
393447
}
@@ -482,7 +536,6 @@ func (p *Pool) checkMinConns() error {
482536
func (p *Pool) createIdleResources(parentCtx context.Context, targetResources int) error {
483537
ctx, cancel := context.WithCancel(parentCtx)
484538
defer cancel()
485-
486539
errs := make(chan error, targetResources)
487540

488541
for i := 0; i < targetResources; i++ {
@@ -495,7 +548,6 @@ func (p *Pool) createIdleResources(parentCtx context.Context, targetResources in
495548
errs <- err
496549
}()
497550
}
498-
499551
var firstError error
500552
for i := 0; i < targetResources; i++ {
501553
err := <-errs

pgxpool/pool_test.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,31 @@ func TestPoolBeforeConnect(t *testing.T) {
261261
assert.EqualValues(t, "pgx", str)
262262
}
263263

264+
func TestAutoLoadTypes(t *testing.T) {
265+
t.Parallel()
266+
267+
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
268+
defer cancel()
269+
270+
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
271+
require.NoError(t, err)
272+
273+
db1, err := pgxpool.NewWithConfig(ctx, config)
274+
require.NoError(t, err)
275+
defer db1.Close()
276+
db1.Exec(ctx, "DROP DOMAIN IF EXISTS autoload_uint64; CREATE DOMAIN autoload_uint64 as numeric(20,0)")
277+
defer db1.Exec(ctx, "DROP DOMAIN autoload_uint64")
278+
279+
config.AutoLoadTypes = []string{"autoload_uint64"}
280+
db2, err := pgxpool.NewWithConfig(ctx, config)
281+
require.NoError(t, err)
282+
283+
var n uint64
284+
err = db2.QueryRow(ctx, "select 12::autoload_uint64").Scan(&n)
285+
require.NoError(t, err)
286+
assert.EqualValues(t, uint64(12), n)
287+
}
288+
264289
func TestPoolAfterConnect(t *testing.T) {
265290
t.Parallel()
266291

@@ -676,7 +701,6 @@ func TestPoolQuery(t *testing.T) {
676701
stats = pool.Stat()
677702
assert.EqualValues(t, 0, stats.AcquiredConns())
678703
assert.EqualValues(t, 1, stats.TotalConns())
679-
680704
}
681705

682706
func TestPoolQueryRow(t *testing.T) {
@@ -1104,7 +1128,6 @@ func TestConnectEagerlyReachesMinPoolSize(t *testing.T) {
11041128
}
11051129

11061130
t.Fatal("did not reach min pool size")
1107-
11081131
}
11091132

11101133
func TestPoolSendBatchBatchCloseTwice(t *testing.T) {

0 commit comments

Comments
 (0)