@@ -2,7 +2,11 @@ import { AppError, CommonLogger, KeyValueTuple, pMap } from '@naturalcycles/js-l
22import { deflateString , inflateToString , ReadableTyped } from '@naturalcycles/nodejs-lib'
33import { CommonDaoLogLevel } from '../commondao/common.dao.model'
44import { CommonDBCreateOptions } from '../db.model'
5- import { CommonKeyValueDB , KeyValueDBTuple } from './commonKeyValueDB'
5+ import {
6+ CommonKeyValueDB ,
7+ CommonKeyValueDBSaveBatchOptions ,
8+ KeyValueDBTuple ,
9+ } from './commonKeyValueDB'
610
711export interface CommonKeyValueDaoCfg < T > {
812 db : CommonKeyValueDB
@@ -44,6 +48,8 @@ export interface CommonKeyValueDaoCfg<T> {
4448 deflatedJsonValue ?: boolean
4549}
4650
51+ export type CommonKeyValueDaoSaveOptions = CommonKeyValueDBSaveBatchOptions
52+
4753// todo: logging
4854// todo: readonly
4955
@@ -133,13 +139,13 @@ export class CommonKeyValueDao<T> {
133139 } as T
134140 }
135141
136- async patch ( id : string , patch : Partial < T > ) : Promise < T > {
142+ async patch ( id : string , patch : Partial < T > , opt ?: CommonKeyValueDaoSaveOptions ) : Promise < T > {
137143 const v : T = {
138144 ...( await this . getByIdOrEmpty ( id ) ) ,
139145 ...patch ,
140146 }
141147
142- await this . save ( id , v )
148+ await this . save ( id , v , opt )
143149
144150 return v
145151 }
@@ -158,31 +164,35 @@ export class CommonKeyValueDao<T> {
158164 return await this . cfg . db . getByIds ( this . cfg . table , ids )
159165 }
160166
161- async save ( id : string , value : T ) : Promise < void > {
162- await this . saveBatch ( [ [ id , value ] ] )
167+ async save ( id : string , value : T , opt ?: CommonKeyValueDaoSaveOptions ) : Promise < void > {
168+ await this . saveBatch ( [ [ id , value ] ] , opt )
163169 }
164170
165- async saveAsBuffer ( id : string , value : Buffer ) : Promise < void > {
166- await this . cfg . db . saveBatch ( this . cfg . table , [ [ id , value ] ] )
171+ async saveAsBuffer ( id : string , value : Buffer , opt ?: CommonKeyValueDaoSaveOptions ) : Promise < void > {
172+ await this . cfg . db . saveBatch ( this . cfg . table , [ [ id , value ] ] , opt )
167173 }
168174
169- async saveBatch ( entries : KeyValueTuple < string , T > [ ] ) : Promise < void > {
175+ async saveBatch (
176+ entries : KeyValueTuple < string , T > [ ] ,
177+ opt ?: CommonKeyValueDaoSaveOptions ,
178+ ) : Promise < void > {
179+ const { mapValueToBuffer } = this . cfg . hooks
170180 let bufferEntries : KeyValueDBTuple [ ]
171181
172- if ( ! this . cfg . hooks . mapValueToBuffer ) {
182+ if ( ! mapValueToBuffer ) {
173183 bufferEntries = entries as any
174184 } else {
175- bufferEntries = await pMap ( entries , async ( [ id , v ] ) => [
176- id ,
177- await this . cfg . hooks . mapValueToBuffer ! ( v ) ,
178- ] )
185+ bufferEntries = await pMap ( entries , async ( [ id , v ] ) => [ id , await mapValueToBuffer ( v ) ] )
179186 }
180187
181- await this . cfg . db . saveBatch ( this . cfg . table , bufferEntries )
188+ await this . cfg . db . saveBatch ( this . cfg . table , bufferEntries , opt )
182189 }
183190
184- async saveBatchAsBuffer ( entries : KeyValueDBTuple [ ] ) : Promise < void > {
185- await this . cfg . db . saveBatch ( this . cfg . table , entries )
191+ async saveBatchAsBuffer (
192+ entries : KeyValueDBTuple [ ] ,
193+ opt ?: CommonKeyValueDaoSaveOptions ,
194+ ) : Promise < void > {
195+ await this . cfg . db . saveBatch ( this . cfg . table , entries , opt )
186196 }
187197
188198 async deleteByIds ( ids : string [ ] ) : Promise < void > {
@@ -204,19 +214,19 @@ export class CommonKeyValueDao<T> {
204214 return this . cfg . db . streamValues ( this . cfg . table , limit ) as ReadableTyped < T >
205215 }
206216
207- const stream : ReadableTyped < T > = this . cfg . db
208- . streamValues ( this . cfg . table , limit )
209- // .on('error', err => stream.emit('error', err))
210- . flatMap ( async buf => {
217+ return this . cfg . db . streamValues ( this . cfg . table , limit ) . flatMap (
218+ async buf => {
211219 try {
212220 return [ await mapBufferToValue ( buf ) ]
213221 } catch ( err ) {
214222 this . cfg . logger . error ( err )
215223 return [ ] // SKIP
216224 }
217- } )
218-
219- return stream
225+ } ,
226+ {
227+ concurrency : 16 ,
228+ } ,
229+ )
220230 }
221231
222232 streamEntries ( limit ?: number ) : ReadableTyped < KeyValueTuple < string , T > > {
@@ -228,18 +238,18 @@ export class CommonKeyValueDao<T> {
228238 >
229239 }
230240
231- const stream : ReadableTyped < KeyValueTuple < string , T > > = this . cfg . db
232- . streamEntries ( this . cfg . table , limit )
233- // .on('error', err => stream.emit('error', err))
234- . flatMap ( async ( [ id , buf ] ) => {
241+ return this . cfg . db . streamEntries ( this . cfg . table , limit ) . flatMap (
242+ async ( [ id , buf ] ) => {
235243 try {
236244 return [ [ id , await mapBufferToValue ( buf ) ] ]
237245 } catch ( err ) {
238246 this . cfg . logger . error ( err )
239247 return [ ] // SKIP
240248 }
241- } )
242-
243- return stream
249+ } ,
250+ {
251+ concurrency : 16 ,
252+ } ,
253+ )
244254 }
245255}
0 commit comments