@@ -169,85 +169,6 @@ def merge(
169169 rsuffix = suffixes [1 ],
170170 )
171171
172- def join (self , other , how = "left" , lsuffix = "" , rsuffix = "" ):
173- """Join two datatframes
174-
175- *on* is not supported.
176- """
177- if how == "right" :
178- return other .join (other = self , how = "left" , lsuffix = rsuffix , rsuffix = lsuffix )
179-
180- same_names = set (self .columns ) & set (other .columns )
181- if same_names and not (lsuffix or rsuffix ):
182- raise ValueError (
183- "there are overlapping columns but "
184- "lsuffix and rsuffix are not defined"
185- )
186-
187- left , leftuniques = self ._align_divisions ()
188- right , rightuniques = other ._align_to_indices (leftuniques )
189-
190- leftparts = left .to_delayed ()
191- rightparts = right .to_delayed ()
192-
193- @delayed
194- def part_join (left , right , how ):
195- return left .join (
196- right , how = how , sort = True , lsuffix = lsuffix , rsuffix = rsuffix
197- )
198-
199- def inner_selector ():
200- pivot = 0
201- for i in range (len (leftparts )):
202- for j in range (pivot , len (rightparts )):
203- if leftuniques [i ] & rightuniques [j ]:
204- yield leftparts [i ], rightparts [j ]
205- pivot = j + 1
206- break
207-
208- def left_selector ():
209- pivot = 0
210- for i in range (len (leftparts )):
211- for j in range (pivot , len (rightparts )):
212- if leftuniques [i ] & rightuniques [j ]:
213- yield leftparts [i ], rightparts [j ]
214- pivot = j + 1
215- break
216- else :
217- yield leftparts [i ], None
218-
219- selector = {"left" : left_selector , "inner" : inner_selector }[how ]
220-
221- rhs_dtypes = [(k , other ._meta .dtypes [k ]) for k in other ._meta .columns ]
222-
223- @delayed
224- def fix_column (lhs ):
225- df = cudf .DataFrame ()
226- for k in lhs .columns :
227- df [k + lsuffix ] = lhs [k ]
228-
229- for k , dtype in rhs_dtypes :
230- data = np .zeros (len (lhs ), dtype = dtype )
231- mask_size = cudf .utils .utils .calc_chunk_size (
232- data .size , cudf .utils .utils .mask_bitsize
233- )
234- mask = np .zeros (mask_size , dtype = cudf .utils .utils .mask_dtype )
235- sr = cudf .Series .from_masked_array (
236- data = data , mask = mask , null_count = data .size
237- )
238-
239- df [k + rsuffix ] = sr .set_index (df .index )
240-
241- return df
242-
243- joinedparts = [
244- (part_join (lhs , rhs , how = how ) if rhs is not None else fix_column (lhs ))
245- for lhs , rhs in selector ()
246- ]
247-
248- meta = self ._meta .join (other ._meta , how = how , lsuffix = lsuffix , rsuffix = rsuffix )
249- return from_delayed (joinedparts , meta = meta )
250-
251172 def _align_divisions (self ):
252173 """Align so that the values do not split across partitions
253174 """
0 commit comments