|
16 | 16 | brotli = None |
17 | 17 |
|
18 | 18 |
|
19 | | -AVAILABLE_DECOMPRESSORS = {"gzip", "deflate"} |
| 19 | +def decompress_deflate(body): |
| 20 | + try: |
| 21 | + return zlib.decompress(body) |
| 22 | + except zlib.error: |
| 23 | + # Assume the response was already decompressed |
| 24 | + return body |
| 25 | + |
| 26 | + |
| 27 | +def decompress_gzip(body): |
| 28 | + # To (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16. |
| 29 | + try: |
| 30 | + return zlib.decompress(body, zlib.MAX_WBITS | 16) |
| 31 | + except zlib.error: |
| 32 | + # Assume the response was already decompressed |
| 33 | + return body |
| 34 | + |
| 35 | + |
| 36 | +AVAILABLE_DECOMPRESSORS = { |
| 37 | + "deflate": decompress_deflate, |
| 38 | + "gzip": decompress_gzip, |
| 39 | +} |
| 40 | + |
20 | 41 | if brotli is not None: |
21 | | - AVAILABLE_DECOMPRESSORS.add("br") |
| 42 | + |
| 43 | + def decompress_brotli(body): |
| 44 | + try: |
| 45 | + return brotli.decompress(body) |
| 46 | + except brotli.error: |
| 47 | + # Assume the response was already decompressed |
| 48 | + return body |
| 49 | + |
| 50 | + AVAILABLE_DECOMPRESSORS["br"] = decompress_brotli |
22 | 51 |
|
23 | 52 |
|
24 | 53 | def replace_headers(request, replacements): |
@@ -157,45 +186,23 @@ def decode_response(response): |
157 | 186 | 3. update content-length header to decompressed length |
158 | 187 | """ |
159 | 188 |
|
160 | | - def is_decompressable(headers): |
161 | | - encoding = headers.get("content-encoding", []) |
162 | | - return encoding and encoding[0] in AVAILABLE_DECOMPRESSORS |
163 | | - |
164 | | - def decompress_body(body, encoding): |
165 | | - """Returns decompressed body according to encoding using zlib. |
166 | | - to (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16 |
167 | | - """ |
168 | | - if not body: |
169 | | - return "" |
170 | | - if encoding == "gzip": |
171 | | - try: |
172 | | - return zlib.decompress(body, zlib.MAX_WBITS | 16) |
173 | | - except zlib.error: |
174 | | - return body # assumes that the data was already decompressed |
175 | | - elif encoding == 'deflate': |
176 | | - try: |
177 | | - return zlib.decompress(body) |
178 | | - except zlib.error: |
179 | | - return body # assumes that the data was already decompressed |
180 | | - else: # encoding == 'br' |
181 | | - try: |
182 | | - return brotli.decompress(body) |
183 | | - except brotli.error: |
184 | | - return body # assumes that the data was already decompressed |
185 | | - |
186 | | - |
187 | 189 | # Deepcopy here in case `headers` contain objects that could |
188 | 190 | # be mutated by a shallow copy and corrupt the real response. |
189 | 191 | response = copy.deepcopy(response) |
190 | 192 | headers = CaseInsensitiveDict(response["headers"]) |
191 | | - if is_decompressable(headers): |
192 | | - encoding = headers["content-encoding"][0] |
193 | | - headers["content-encoding"].remove(encoding) |
194 | | - if not headers["content-encoding"]: |
195 | | - del headers["content-encoding"] |
196 | | - |
197 | | - new_body = decompress_body(response["body"]["string"], encoding) |
198 | | - response["body"]["string"] = new_body |
199 | | - headers["content-length"] = [str(len(new_body))] |
200 | | - response["headers"] = dict(headers) |
| 193 | + content_encoding = headers.get("content-encoding") |
| 194 | + if not content_encoding: |
| 195 | + return response |
| 196 | + decompressor = AVAILABLE_DECOMPRESSORS.get(content_encoding[0]) |
| 197 | + if not decompressor: |
| 198 | + return response |
| 199 | + |
| 200 | + headers["content-encoding"].remove(content_encoding[0]) |
| 201 | + if not headers["content-encoding"]: |
| 202 | + del headers["content-encoding"] |
| 203 | + |
| 204 | + new_body = decompressor(response["body"]["string"]) |
| 205 | + response["body"]["string"] = new_body |
| 206 | + headers["content-length"] = [str(len(new_body))] |
| 207 | + response["headers"] = dict(headers) |
201 | 208 | return response |
0 commit comments