Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions kafka/conn.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ def _dns_lookup(self):
return True

def _next_afi_sockaddr(self):
if self._socks5_proxy.use_remote_lookup():
return (socket.AF_UNSPEC, (self.host, self.port))
if not self._gai:
if not self._dns_lookup():
return
Expand Down Expand Up @@ -366,6 +368,9 @@ def connect_blocking(self, timeout=float('inf')):

def connect(self):
"""Attempt to connect and return ConnectionState"""
if self.config["socks5_proxy"] is not None and self._socks5_proxy is None:
self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi)

if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out():
self.state = ConnectionStates.CONNECTING
self.last_attempt = time.time()
Expand All @@ -379,7 +384,6 @@ def connect(self):
self._sock_afi, self._sock_addr = next_lookup
try:
if self.config["socks5_proxy"] is not None:
self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering whether I could avoid this change. But next_lookup = self._next_afi_sockaddr() on line 377 will fail then because it uses the initialized self._socks5_proxy to check if it should look up hostnames remotely.

Let me know if you prefer this to be handled differently than it's now.

self._sock = self._socks5_proxy.socket(self._sock_afi, socket.SOCK_STREAM)
else:
self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)
Expand Down Expand Up @@ -862,7 +866,7 @@ def connection_delay(self):
large number to handle slow/stalled connections.
"""
if self.disconnected() or self.connecting():
if len(self._gai) > 0:
if len(self._gai) > 0 or (self._socks5_proxy is not None and self._socks5_proxy.use_remote_lookup()):
return 0
else:
time_waited = time.time() - self.last_attempt
Expand Down
38 changes: 28 additions & 10 deletions kafka/socks5_wrapper.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ def dns_lookup(cls, host, port, afi=socket.AF_UNSPEC):
log.warning("DNS lookup failed for proxy %s:%d, %r", host, port, ex)
return []

def use_remote_lookup(self):
return self._proxy_url.scheme == 'socks5h'

def socket(self, family, sock_type):
"""Open and record a socket.

Expand Down Expand Up @@ -187,7 +190,10 @@ def connect_ex(self, addr):
return errno.ECONNREFUSED

if self._state == ProxyConnectionStates.REQUEST_SUBMIT:
if self._target_afi == socket.AF_INET:
if self.use_remote_lookup():
addr_type = 3
addr_len = len(addr[0])
elif self._target_afi == socket.AF_INET:
addr_type = 1
addr_len = 4
elif self._target_afi == socket.AF_INET6:
Expand All @@ -199,15 +205,27 @@ def connect_ex(self, addr):
self._sock.close()
return errno.ECONNREFUSED

self._buffer_out = struct.pack(
"!bbbb{}sh".format(addr_len),
5, # version
1, # command: connect
0, # reserved
addr_type, # 1 for ipv4, 4 for ipv6 address
socket.inet_pton(self._target_afi, addr[0]), # either 4 or 16 bytes of actual address
addr[1], # port
)
if self.use_remote_lookup():
self._buffer_out = struct.pack(
"!bbbbb{}sh".format(addr_len),
5, # version
1, # command: connect
0, # reserved
addr_type, # 1 for ipv4, 4 for ipv6 address, 3 for domain name
addr_len,
addr[0].encode('ascii'), # host
addr[1], # port
)
else:
self._buffer_out = struct.pack(
"!bbbb{}sh".format(addr_len),
5, # version
1, # command: connect
0, # reserved
addr_type, # 1 for ipv4, 4 for ipv6 address, 3 for domain name
socket.inet_pton(self._target_afi, addr[0]), # either 4 or 16 bytes of actual address
addr[1], # port
)
self._state = ProxyConnectionStates.REQUESTING

if self._state == ProxyConnectionStates.REQUESTING:
Expand Down