This repository was archived by the owner on May 25, 2025. It is now read-only.
File tree Expand file tree Collapse file tree 2 files changed +31
-6
lines changed Expand file tree Collapse file tree 2 files changed +31
-6
lines changed Original file line number Diff line number Diff line change 1- import { Readable } from 'node:stream'
1+ import { Readable , Transform } from 'node:stream'
22import { promisify } from 'node:util'
33import {
44 BaseCommonDB ,
@@ -277,10 +277,21 @@ export class MysqlDB extends BaseCommonDB implements CommonDB {
277277
278278 if ( this . cfg . logSQL ) this . cfg . logger . log ( `stream: ${ sql } ` )
279279
280- // return this.streamSQL(sql, opt)
281- return ( this . pool ( ) . query ( sql ) . stream ( ) as ReadableTyped < ROW > ) . map ( row =>
282- _filterUndefinedValues ( row , true ) ,
283- )
280+ // todo: this is nice, but `mysql` package uses `readable-stream@2` which is not compatible with `node:stream` iterable helpers
281+ // return (this.pool().query(sql).stream() as ReadableTyped<ROW>).map(row =>
282+ // _filterUndefinedValues(row, true),
283+ // )
284+ return this . pool ( )
285+ . query ( sql )
286+ . stream ( )
287+ . pipe (
288+ new Transform ( {
289+ objectMode : true ,
290+ transform ( row : ROW , _encoding , cb ) {
291+ cb ( null , _filterUndefinedValues ( row , true ) )
292+ } ,
293+ } ) ,
294+ )
284295 }
285296
286297 // SAVE
Original file line number Diff line number Diff line change 1+ import { Transform } from 'node:stream'
12import {
23 CommonDBCreateOptions ,
34 CommonKeyValueDB ,
@@ -88,7 +89,20 @@ export class MySQLKeyValueDB implements CommonKeyValueDB {
8889 if ( limit ) sql += ` LIMIT ${ limit } `
8990 if ( this . cfg . logSQL ) this . db . cfg . logger . log ( `stream: ${ sql } ` )
9091
91- return ( this . db . pool ( ) . query ( sql ) . stream ( ) as ReadableTyped < ObjectWithId > ) . map ( row => row . id )
92+ // todo: this is nice, but `mysql` package uses `readable-stream@2` which is not compatible with `node:stream` iterable helpers
93+ // return (this.db.pool().query(sql).stream() as ReadableTyped<ObjectWithId>).map(row => row.id)
94+ return this . db
95+ . pool ( )
96+ . query ( sql )
97+ . stream ( )
98+ . pipe (
99+ new Transform ( {
100+ objectMode : true ,
101+ transform ( row : ObjectWithId , _encoding , cb ) {
102+ cb ( null , row . id )
103+ } ,
104+ } ) ,
105+ )
92106 }
93107
94108 streamValues ( table : string , limit ?: number ) : ReadableTyped < Buffer > {
You can’t perform that action at this time.
0 commit comments