From 7ac6c6e29099ccba4d50f5b842972dd7332d0e58 Mon Sep 17 00:00:00 2001 From: Andrew Zhang Date: Thu, 2 Mar 2023 15:25:13 -0500 Subject: [PATCH 01/30] Allow disabling thread wakeup in send_request_to_node (#2335) --- kafka/admin/client.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index fd4d66110..8eb7504a7 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -355,13 +355,14 @@ def _find_coordinator_ids(self, group_ids): } return groups_coordinators - def _send_request_to_node(self, node_id, request): + def _send_request_to_node(self, node_id, request, wakeup=True): """Send a Kafka protocol message to a specific broker. Returns a future that may be polled for status and results. :param node_id: The broker id to which to send the message. :param request: The message to send. + :param wakeup: Optional flag to disable thread-wakeup. :return: A future object that may be polled for status and results. :exception: The exception if the message could not be sent. """ @@ -369,7 +370,7 @@ def _send_request_to_node(self, node_id, request): # poll until the connection to broker is ready, otherwise send() # will fail with NodeNotReadyError self._client.poll() - return self._client.send(node_id, request) + return self._client.send(node_id, request, wakeup) def _send_request_to_controller(self, request): """Send a Kafka protocol message to the cluster controller. From f8a7e9b8b4b6db298a43d9fe5d427e6439d5824a Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 4 Aug 2023 17:57:27 -0400 Subject: [PATCH 02/30] Transition CI/CD to GitHub Workflows (#2378) * Create GH workflows to test code * Update tests for future Python versions --- .github/dependabot.yml | 7 + .github/workflows/codeql-analysis.yml | 67 ++++++++++ .github/workflows/python-package.yml | 179 ++++++++++++++++++++++++++ Makefile | 6 +- README.rst | 10 +- requirements-dev.txt | 34 ++--- setup.py | 9 +- test/test_assignors.py | 2 +- tox.ini | 21 +-- travis_java_install.sh | 0 10 files changed, 303 insertions(+), 32 deletions(-) create mode 100644 .github/dependabot.yml create mode 100644 .github/workflows/codeql-analysis.yml create mode 100644 .github/workflows/python-package.yml mode change 100644 => 100755 travis_java_install.sh diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 000000000..2c7d17083 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,7 @@ +version: 2 +updates: + # Maintain dependencies for GitHub Actions + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "daily" diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml new file mode 100644 index 000000000..43427fab9 --- /dev/null +++ b/.github/workflows/codeql-analysis.yml @@ -0,0 +1,67 @@ +--- +# For most projects, this workflow file will not need changing; you simply need +# to commit it to your repository. +# +# You may wish to alter this file to override the set of languages analyzed, +# or to provide custom queries or build logic. +# +# ******** NOTE ******** +# We have attempted to detect the languages in your repository. Please check +# the `language` matrix defined below to confirm you have the correct set of +# supported CodeQL languages. 
+# +name: CodeQL +on: + push: + branches: [master] + pull_request: + # The branches below must be a subset of the branches above + branches: [master] + schedule: + - cron: 19 10 * * 6 +jobs: + analyze: + name: Analyze + runs-on: ubuntu-latest + permissions: + actions: read + contents: read + security-events: write + strategy: + fail-fast: false + matrix: + language: [python] + # CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ] + # Learn more: + # https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + # Initializes the CodeQL tools for scanning. + - name: Initialize CodeQL + uses: github/codeql-action/init@v2 + with: + languages: ${{ matrix.language }} + # If you wish to specify custom queries, you can do so here or in a config file. + # By default, queries listed here will override any specified in a config file. + # Prefix the list here with "+" to use these queries and those in the config file. + # queries: ./path/to/local/query, your-org/your-repo/queries@main + + # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). + # If this step fails, then you should remove it and run the build manually (see below) + - name: Autobuild + uses: github/codeql-action/autobuild@v2 + + # â„šī¸ Command-line programs to run using the OS shell. + # 📚 https://git.io/JvXDl + + # âœī¸ If the Autobuild fails above, remove it and uncomment the following three lines + # and modify them (or add more) to build your code if your project + # uses a compiled language + + #- run: | + # make bootstrap + # make release + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v2 diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml new file mode 100644 index 000000000..9ef4846bd --- /dev/null +++ b/.github/workflows/python-package.yml @@ -0,0 +1,179 @@ +name: CI/CD + +on: + push: + branches: ["master"] + pull_request: + branches: ["master"] + release: + types: [created] + branches: + - 'master' + workflow_dispatch: + +env: + FORCE_COLOR: "1" # Make tools pretty. + PIP_DISABLE_PIP_VERSION_CHECK: "1" + PIP_NO_PYTHON_VERSION_WARNING: "1" + PYTHON_LATEST: "3.11" + KAFKA_LATEST: "2.6.0" + + # For re-actors/checkout-python-sdist + sdist-artifact: python-package-distributions + +jobs: + + build-sdist: + name: đŸ“Ļ Build the source distribution + runs-on: ubuntu-latest + steps: + - name: Checkout project + uses: actions/checkout@v3 + with: + fetch-depth: 0 + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: ${{ env.PYTHON_LATEST }} + cache: pip + - run: python -m pip install build + name: Install core libraries for build and install + - name: Build artifacts + run: python -m build + - name: Upload built artifacts for testing + uses: actions/upload-artifact@v3 + with: + name: ${{ env.sdist-artifact }} + # NOTE: Exact expected file names are specified here + # NOTE: as a safety measure — if anything weird ends + # NOTE: up being in this dir or not all dists will be + # NOTE: produced, this will fail the workflow. 
+ path: dist/${{ env.sdist-name }} + retention-days: 15 + + test-python: + name: Tests on ${{ matrix.python-version }} + needs: + - build-sdist + runs-on: ubuntu-latest + continue-on-error: ${{ matrix.experimental }} + strategy: + fail-fast: false + matrix: + python-version: + - "3.8" + - "3.9" + - "3.10" + - "3.11" + experimental: [ false ] + include: + - python-version: "pypy3.9" + experimental: true +# - python-version: "~3.12.0-0" +# experimental: true + steps: + - name: Checkout the source code + uses: actions/checkout@v3 + with: + fetch-depth: 0 + - name: Setup java + uses: actions/setup-java@v3 + with: + distribution: temurin + java-version: 11 + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + cache: pip + cache-dependency-path: | + requirements-dev.txt + - name: Check Java installation + run: source travis_java_install.sh + - name: Pull Kafka releases + run: ./build_integration.sh + env: + PLATFORM: ${{ matrix.platform }} + KAFKA_VERSION: ${{ env.KAFKA_LATEST }} + # TODO: Cache releases to expedite testing + - name: Install dependencies + run: | + sudo apt install -y libsnappy-dev libzstd-dev + python -m pip install --upgrade pip + python -m pip install tox tox-gh-actions + pip install . + pip install -r requirements-dev.txt + - name: Test with tox + run: tox + env: + PLATFORM: ${{ matrix.platform }} + KAFKA_VERSION: ${{ env.KAFKA_LATEST }} + + test-kafka: + name: Tests for Kafka ${{ matrix.kafka-version }} + needs: + - build-sdist + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + kafka-version: + - "0.8.2.2" + - "0.9.0.1" + - "0.10.2.2" + - "0.11.0.2" + - "0.11.0.3" + - "1.1.1" + - "2.4.0" + - "2.5.0" + - "2.6.0" + steps: + - name: Checkout the source code + uses: actions/checkout@v3 + with: + fetch-depth: 0 + - name: Setup java + uses: actions/setup-java@v3 + with: + distribution: temurin + java-version: 8 + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: ${{ env.PYTHON_LATEST }} + cache: pip + cache-dependency-path: | + requirements-dev.txt + - name: Pull Kafka releases + run: ./build_integration.sh + env: + # This is fast enough as long as you pull only one release at a time, + # no need to worry about caching + PLATFORM: ${{ matrix.platform }} + KAFKA_VERSION: ${{ matrix.kafka-version }} + - name: Install dependencies + run: | + sudo apt install -y libsnappy-dev libzstd-dev + python -m pip install --upgrade pip + python -m pip install tox tox-gh-actions + pip install . + pip install -r requirements-dev.txt + - name: Test with tox + run: tox + env: + PLATFORM: ${{ matrix.platform }} + KAFKA_VERSION: ${{ matrix.kafka-version }} + + check: # This job does nothing and is only used for the branch protection + name: ✅ Ensure the required checks passing + if: always() + needs: + - build-sdist + - test-python + - test-kafka + runs-on: ubuntu-latest + steps: + - name: Decide whether the needed jobs succeeded or failed + uses: re-actors/alls-green@release/v1 + with: + jobs: ${{ toJSON(needs) }} diff --git a/Makefile b/Makefile index b4dcbffc9..fc8fa5b21 100644 --- a/Makefile +++ b/Makefile @@ -20,14 +20,14 @@ test37: build-integration test27: build-integration KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) tox -e py27 -- $(FLAGS) -# Test using py.test directly if you want to use local python. Useful for other +# Test using pytest directly if you want to use local python. Useful for other # platforms that require manual installation for C libraries, ie. Windows. 
test-local: build-integration - KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) py.test \ + KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) pytest \ --pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF $(FLAGS) kafka test cov-local: build-integration - KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) py.test \ + KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) pytest \ --pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka \ --cov-config=.covrc --cov-report html $(FLAGS) kafka test @echo "open file://`pwd`/htmlcov/index.html" diff --git a/README.rst b/README.rst index 5f834442c..78a92a884 100644 --- a/README.rst +++ b/README.rst @@ -7,10 +7,16 @@ Kafka Python client :target: https://pypi.python.org/pypi/kafka-python .. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github :target: https://coveralls.io/github/dpkp/kafka-python?branch=master -.. image:: https://travis-ci.org/dpkp/kafka-python.svg?branch=master - :target: https://travis-ci.org/dpkp/kafka-python .. image:: https://img.shields.io/badge/license-Apache%202-blue.svg :target: https://github.com/dpkp/kafka-python/blob/master/LICENSE +.. image:: https://img.shields.io/pypi/dw/kafka-python.svg + :target: https://pypistats.org/packages/kafka-python +.. image:: https://img.shields.io/pypi/v/kafka-python.svg + :target: https://pypi.org/project/kafka-python +.. image:: https://img.shields.io/pypi/implementation/kafka-python + :target: https://github.com/dpkp/kafka-python/blob/master/setup.py + + Python client for the Apache Kafka distributed stream processing system. kafka-python is designed to function much like the official java client, with a diff --git a/requirements-dev.txt b/requirements-dev.txt index 00ad68c22..1fa933da2 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,17 +1,17 @@ -coveralls==2.1.2 -crc32c==2.1 -docker-py==1.10.6 -flake8==3.8.3 -lz4==3.1.0 -mock==4.0.2 -py==1.9.0 -pylint==2.6.0 -pytest==6.0.2 -pytest-cov==2.10.1 -pytest-mock==3.3.1 -pytest-pylint==0.17.0 -python-snappy==0.5.4 -Sphinx==3.2.1 -sphinx-rtd-theme==0.5.0 -tox==3.20.0 -xxhash==2.0.0 +coveralls +crc32c +docker-py +flake8 +lz4 +mock +py +pylint +pytest +pytest-cov +pytest-mock +pytest-pylint +python-snappy +Sphinx +sphinx-rtd-theme +tox +xxhash diff --git a/setup.py b/setup.py index fe8a594f3..2b5ca380f 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,10 @@ def run(cls): license="Apache License 2.0", description="Pure Python client for Apache Kafka", long_description=README, - keywords="apache kafka", + keywords=[ + "apache kafka", + "kafka", + ], classifiers=[ "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", @@ -64,6 +67,10 @@ def run(cls): "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: PyPy", "Topic :: Software Development :: Libraries :: Python Modules", ] diff --git a/test/test_assignors.py b/test/test_assignors.py index 67e91e131..858ef426d 100644 --- a/test/test_assignors.py +++ b/test/test_assignors.py @@ -655,7 +655,7 @@ def test_conflicting_previous_assignments(mocker): 'execution_number,n_topics,n_consumers', [(i, randint(10, 20), randint(20, 40)) for i in 
range(100)] ) def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_number, n_topics, n_consumers): - all_topics = set(['t{}'.format(i) for i in range(1, n_topics + 1)]) + all_topics = sorted(['t{}'.format(i) for i in range(1, n_topics + 1)]) partitions = dict([(t, set(range(1, i + 1))) for i, t in enumerate(all_topics)]) cluster = create_cluster(mocker, topics=all_topics, topic_partitions_lambda=lambda t: partitions[t]) diff --git a/tox.ini b/tox.ini index 10e9911dc..7a38ee4a8 100644 --- a/tox.ini +++ b/tox.ini @@ -1,17 +1,25 @@ [tox] -envlist = py{26,27,34,35,36,37,38,py}, docs +envlist = py{38,39,310,311,py}, docs [pytest] testpaths = kafka test addopts = --durations=10 log_format = %(created)f %(filename)-23s %(threadName)s %(message)s +[gh-actions] +python = + 3.8: py38 + 3.9: py39 + 3.10: py310 + 3.11: py311 + pypy-3.9: pypy + [testenv] deps = pytest pytest-cov - py{27,34,35,36,37,38,py}: pylint - py{27,34,35,36,37,38,py}: pytest-pylint + pylint + pytest-pylint pytest-mock mock python-snappy @@ -20,19 +28,16 @@ deps = xxhash crc32c commands = - py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc} + pytest {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc} setenv = CRC32C_SW_MODE = auto PROJECT_ROOT = {toxinidir} passenv = KAFKA_VERSION -[testenv:py26] -# pylint doesn't support python2.6 -commands = py.test {posargs:--cov=kafka --cov-config=.covrc} [testenv:pypy] # pylint is super slow on pypy... -commands = py.test {posargs:--cov=kafka --cov-config=.covrc} +commands = pytest {posargs:--cov=kafka --cov-config=.covrc} [testenv:docs] deps = diff --git a/travis_java_install.sh b/travis_java_install.sh old mode 100644 new mode 100755 From 94901bb1b3a7322c778d60edb90156c9cc27e1f9 Mon Sep 17 00:00:00 2001 From: Majeed Dourandeesh Date: Sat, 5 Aug 2023 00:58:20 +0300 Subject: [PATCH 03/30] Update usage.rst (#2334) add imort json and msgpack into consumer and producer --- docs/usage.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/usage.rst b/docs/usage.rst index 1cf1aa414..fb58509a7 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -8,6 +8,8 @@ KafkaConsumer .. code:: python from kafka import KafkaConsumer + import json + import msgpack # To consume latest messages and auto-commit offsets consumer = KafkaConsumer('my-topic', @@ -57,6 +59,8 @@ KafkaProducer from kafka import KafkaProducer from kafka.errors import KafkaError + import msgpack + import json producer = KafkaProducer(bootstrap_servers=['broker1:1234']) From 46473bacafd759bc6cd072876327c6a7a5415007 Mon Sep 17 00:00:00 2001 From: Tim Gates Date: Sat, 5 Aug 2023 07:58:37 +1000 Subject: [PATCH 04/30] docs: Fix a few typos (#2319) * docs: Fix a few typos There are small typos in: - kafka/codec.py - kafka/coordinator/base.py - kafka/record/abc.py - kafka/record/legacy_records.py Fixes: - Should read `timestamp` rather than `typestamp`. - Should read `minimum` rather than `miniumum`. - Should read `encapsulated` rather than `incapsulates`. - Should read `callback` rather than `callbak`. 
* Update abc.py --- kafka/codec.py | 2 +- kafka/coordinator/base.py | 2 +- kafka/record/abc.py | 2 +- kafka/record/legacy_records.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka/codec.py b/kafka/codec.py index 917400e74..c740a181c 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -187,7 +187,7 @@ def _detect_xerial_stream(payload): The version is the version of this format as written by xerial, in the wild this is currently 1 as such we only support v1. - Compat is there to claim the miniumum supported version that + Compat is there to claim the minimum supported version that can read a xerial block stream, presently in the wild this is 1. """ diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 5e41309df..e71984108 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -952,7 +952,7 @@ def _run_once(self): # disable here to prevent propagating an exception to this # heartbeat thread # must get client._lock, or maybe deadlock at heartbeat - # failure callbak in consumer poll + # failure callback in consumer poll self.coordinator._client.poll(timeout_ms=0) with self.coordinator._lock: diff --git a/kafka/record/abc.py b/kafka/record/abc.py index d5c172aaa..8509e23e5 100644 --- a/kafka/record/abc.py +++ b/kafka/record/abc.py @@ -85,7 +85,7 @@ def build(self): class ABCRecordBatch(object): - """ For v2 incapsulates a RecordBatch, for v0/v1 a single (maybe + """ For v2 encapsulates a RecordBatch, for v0/v1 a single (maybe compressed) message. """ __metaclass__ = abc.ABCMeta diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py index e2ee5490c..2f8523fcb 100644 --- a/kafka/record/legacy_records.py +++ b/kafka/record/legacy_records.py @@ -263,7 +263,7 @@ def __iter__(self): # When magic value is greater than 0, the timestamp # of a compressed message depends on the - # typestamp type of the wrapper message: + # timestamp type of the wrapper message: if timestamp_type == self.LOG_APPEND_TIME: timestamp = self._timestamp From b7a9be6c48f82a957dce0a12e4070aa612eb82f9 Mon Sep 17 00:00:00 2001 From: Atheer Abdullatif <42766508+athlatif@users.noreply.github.com> Date: Sat, 5 Aug 2023 00:59:06 +0300 Subject: [PATCH 05/30] Update usage.rst (#2308) Adding [ClusterMetadata] and [KafkaAdminClient] --- docs/usage.rst | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/docs/usage.rst b/docs/usage.rst index fb58509a7..047bbad77 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -112,3 +112,52 @@ KafkaProducer # configure multiple retries producer = KafkaProducer(retries=5) + + +ClusterMetadata +============= +.. code:: python + + from kafka.cluster import ClusterMetadata + + clusterMetadata = ClusterMetadata(bootstrap_servers=['broker1:1234']) + + # get all brokers metadata + print(clusterMetadata.brokers()) + + # get specific broker metadata + print(clusterMetadata.broker_metadata('bootstrap-0')) + + # get all partitions of a topic + print(clusterMetadata.partitions_for_topic("topic")) + + # list topics + print(clusterMetadata.topics()) + + +KafkaAdminClient +============= +.. 
code:: python + from kafka import KafkaAdminClient + from kafka.admin import NewTopic + + admin = KafkaAdminClient(bootstrap_servers=['broker1:1234']) + + # create a new topic + topics_list = [] + topics_list.append(NewTopic(name="testtopic", num_partitions=1, replication_factor=1)) + admin.create_topics(topics_list,timeout_ms=None, validate_only=False) + + # delete a topic + admin.delete_topics(['testtopic']) + + # list consumer groups + print(admin.list_consumer_groups()) + + # get consumer group details + print(admin.describe_consumer_groups('cft-plt-qa.connect')) + + # get consumer group offset + print(admin.list_consumer_group_offsets('cft-plt-qa.connect')) + + From 57d833820bf20c84618954108767da08ea22f853 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 4 Aug 2023 18:51:28 -0400 Subject: [PATCH 06/30] Enable testing for Python 3.12 (#2379) I don't expect this to work yet since I know 3.12 is in an incomplete state, but here goes nothing. --- .github/workflows/python-package.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 9ef4846bd..37875fb9f 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -69,8 +69,8 @@ jobs: include: - python-version: "pypy3.9" experimental: true -# - python-version: "~3.12.0-0" -# experimental: true + - python-version: "~3.12.0-0" + experimental: true steps: - name: Checkout the source code uses: actions/checkout@v3 From 7e87a014a9e47a7da3af73c76b31e802e208e7a3 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sun, 6 Aug 2023 15:23:04 -0400 Subject: [PATCH 07/30] Add py312 to tox.ini (#2382) --- tox.ini | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 7a38ee4a8..d9b1e36d4 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py{38,39,310,311,py}, docs +envlist = py{38,39,310,311,312,py}, docs [pytest] testpaths = kafka test @@ -12,6 +12,7 @@ python = 3.9: py39 3.10: py310 3.11: py311 + 3.12: py312 pypy-3.9: pypy [testenv] From f98498411caabcf60894c536ce8fc9e83bd43241 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sun, 6 Aug 2023 18:28:52 -0400 Subject: [PATCH 08/30] Update fixtures.py to use "127.0.0.1" for local ports (#2384) --- test/fixtures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/fixtures.py b/test/fixtures.py index 26fb5e89d..d9c072b86 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -25,7 +25,7 @@ def get_open_port(): sock = socket.socket() - sock.bind(("", 0)) + sock.bind(("127.0.0.1", 0)) port = sock.getsockname()[1] sock.close() return port From d9201085f021aaa376b6ef429f9afc2cc4d29439 Mon Sep 17 00:00:00 2001 From: Felix B Date: Tue, 8 Aug 2023 15:33:53 +0200 Subject: [PATCH 09/30] use isinstance in builtin crc32 (#2329) --- kafka/record/_crc32c.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/record/_crc32c.py b/kafka/record/_crc32c.py index ecff48f5e..9b51ad8a9 100644 --- a/kafka/record/_crc32c.py +++ b/kafka/record/_crc32c.py @@ -105,7 +105,7 @@ def crc_update(crc, data): Returns: 32-bit updated CRC-32C as long. 
""" - if type(data) != array.array or data.itemsize != 1: + if not isinstance(data, array.array) or data.itemsize != 1: buf = array.array("B", data) else: buf = data From a33fcf4d22bdf34e9660e394a7a6f84225411325 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Wed, 9 Aug 2023 12:44:53 -0400 Subject: [PATCH 10/30] Update setup.py to install zstandard instead of python-zstandard (#2387) Closes https://github.com/dpkp/kafka-python/issues/2350, since it's a valid security concern. --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 2b5ca380f..483d7ab60 100644 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ def run(cls): "crc32c": ["crc32c"], "lz4": ["lz4"], "snappy": ["python-snappy"], - "zstd": ["python-zstandard"], + "zstd": ["zstandard"], }, cmdclass={"test": Tox}, packages=find_packages(exclude=['test']), From d894e9aac0f5154b62f5cd08cc769a1f955d3eb7 Mon Sep 17 00:00:00 2001 From: shifqu Date: Thu, 2 Nov 2023 04:58:38 +0100 Subject: [PATCH 11/30] build: update vendored six from 1.11.0 to 1.16.0 (#2398) In this commit, the del X is still commented out due to the fact that upstream https://github.com/benjaminp/six/pull/176 is not merged. --- kafka/vendor/six.py | 149 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 128 insertions(+), 21 deletions(-) diff --git a/kafka/vendor/six.py b/kafka/vendor/six.py index 3621a0ab4..319821353 100644 --- a/kafka/vendor/six.py +++ b/kafka/vendor/six.py @@ -1,6 +1,6 @@ # pylint: skip-file -# Copyright (c) 2010-2017 Benjamin Peterson +# Copyright (c) 2010-2020 Benjamin Peterson # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -31,7 +31,7 @@ import types __author__ = "Benjamin Peterson " -__version__ = "1.11.0" +__version__ = "1.16.0" # Useful for very coarse version differentiation. 
@@ -77,6 +77,11 @@ def __len__(self): # https://github.com/dpkp/kafka-python/pull/979#discussion_r100403389 # del X +if PY34: + from importlib.util import spec_from_loader +else: + spec_from_loader = None + def _add_doc(func, doc): """Add documentation to a function.""" @@ -192,6 +197,11 @@ def find_module(self, fullname, path=None): return self return None + def find_spec(self, fullname, path, target=None): + if fullname in self.known_modules: + return spec_from_loader(fullname, self) + return None + def __get_module(self, fullname): try: return self.known_modules[fullname] @@ -229,6 +239,12 @@ def get_code(self, fullname): return None get_source = get_code # same as get_code + def create_module(self, spec): + return self.load_module(spec.name) + + def exec_module(self, module): + pass + _importer = _SixMetaPathImporter(__name__) @@ -253,7 +269,7 @@ class _MovedItems(_LazyModule): MovedAttribute("reduce", "__builtin__", "functools"), MovedAttribute("shlex_quote", "pipes", "shlex", "quote"), MovedAttribute("StringIO", "StringIO", "io"), - MovedAttribute("UserDict", "UserDict", "collections"), + MovedAttribute("UserDict", "UserDict", "collections", "IterableUserDict", "UserDict"), MovedAttribute("UserList", "UserList", "collections"), MovedAttribute("UserString", "UserString", "collections"), MovedAttribute("xrange", "__builtin__", "builtins", "xrange", "range"), @@ -261,9 +277,11 @@ class _MovedItems(_LazyModule): MovedAttribute("zip_longest", "itertools", "itertools", "izip_longest", "zip_longest"), MovedModule("builtins", "__builtin__"), MovedModule("configparser", "ConfigParser"), + MovedModule("collections_abc", "collections", "collections.abc" if sys.version_info >= (3, 3) else "collections"), MovedModule("copyreg", "copy_reg"), MovedModule("dbm_gnu", "gdbm", "dbm.gnu"), - MovedModule("_dummy_thread", "dummy_thread", "_dummy_thread"), + MovedModule("dbm_ndbm", "dbm", "dbm.ndbm"), + MovedModule("_dummy_thread", "dummy_thread", "_dummy_thread" if sys.version_info < (3, 9) else "_thread"), MovedModule("http_cookiejar", "cookielib", "http.cookiejar"), MovedModule("http_cookies", "Cookie", "http.cookies"), MovedModule("html_entities", "htmlentitydefs", "html.entities"), @@ -643,13 +661,16 @@ def u(s): import io StringIO = io.StringIO BytesIO = io.BytesIO + del io _assertCountEqual = "assertCountEqual" if sys.version_info[1] <= 1: _assertRaisesRegex = "assertRaisesRegexp" _assertRegex = "assertRegexpMatches" + _assertNotRegex = "assertNotRegexpMatches" else: _assertRaisesRegex = "assertRaisesRegex" _assertRegex = "assertRegex" + _assertNotRegex = "assertNotRegex" else: def b(s): return s @@ -671,6 +692,7 @@ def indexbytes(buf, i): _assertCountEqual = "assertItemsEqual" _assertRaisesRegex = "assertRaisesRegexp" _assertRegex = "assertRegexpMatches" + _assertNotRegex = "assertNotRegexpMatches" _add_doc(b, """Byte literal""") _add_doc(u, """Text literal""") @@ -687,6 +709,10 @@ def assertRegex(self, *args, **kwargs): return getattr(self, _assertRegex)(*args, **kwargs) +def assertNotRegex(self, *args, **kwargs): + return getattr(self, _assertNotRegex)(*args, **kwargs) + + if PY3: exec_ = getattr(moves.builtins, "exec") @@ -722,16 +748,7 @@ def exec_(_code_, _globs_=None, _locs_=None): """) -if sys.version_info[:2] == (3, 2): - exec_("""def raise_from(value, from_value): - try: - if from_value is None: - raise value - raise value from from_value - finally: - value = None -""") -elif sys.version_info[:2] > (3, 2): +if sys.version_info[:2] > (3,): exec_("""def raise_from(value, from_value): try: 
raise value from from_value @@ -811,13 +828,33 @@ def print_(*args, **kwargs): _add_doc(reraise, """Reraise an exception.""") if sys.version_info[0:2] < (3, 4): + # This does exactly the same what the :func:`py3:functools.update_wrapper` + # function does on Python versions after 3.2. It sets the ``__wrapped__`` + # attribute on ``wrapper`` object and it doesn't raise an error if any of + # the attributes mentioned in ``assigned`` and ``updated`` are missing on + # ``wrapped`` object. + def _update_wrapper(wrapper, wrapped, + assigned=functools.WRAPPER_ASSIGNMENTS, + updated=functools.WRAPPER_UPDATES): + for attr in assigned: + try: + value = getattr(wrapped, attr) + except AttributeError: + continue + else: + setattr(wrapper, attr, value) + for attr in updated: + getattr(wrapper, attr).update(getattr(wrapped, attr, {})) + wrapper.__wrapped__ = wrapped + return wrapper + _update_wrapper.__doc__ = functools.update_wrapper.__doc__ + def wraps(wrapped, assigned=functools.WRAPPER_ASSIGNMENTS, updated=functools.WRAPPER_UPDATES): - def wrapper(f): - f = functools.wraps(wrapped, assigned, updated)(f) - f.__wrapped__ = wrapped - return f - return wrapper + return functools.partial(_update_wrapper, wrapped=wrapped, + assigned=assigned, updated=updated) + wraps.__doc__ = functools.wraps.__doc__ + else: wraps = functools.wraps @@ -830,7 +867,15 @@ def with_metaclass(meta, *bases): class metaclass(type): def __new__(cls, name, this_bases, d): - return meta(name, bases, d) + if sys.version_info[:2] >= (3, 7): + # This version introduced PEP 560 that requires a bit + # of extra care (we mimic what is done by __build_class__). + resolved_bases = types.resolve_bases(bases) + if resolved_bases is not bases: + d['__orig_bases__'] = bases + else: + resolved_bases = bases + return meta(name, resolved_bases, d) @classmethod def __prepare__(cls, name, this_bases): @@ -850,13 +895,75 @@ def wrapper(cls): orig_vars.pop(slots_var) orig_vars.pop('__dict__', None) orig_vars.pop('__weakref__', None) + if hasattr(cls, '__qualname__'): + orig_vars['__qualname__'] = cls.__qualname__ return metaclass(cls.__name__, cls.__bases__, orig_vars) return wrapper +def ensure_binary(s, encoding='utf-8', errors='strict'): + """Coerce **s** to six.binary_type. + + For Python 2: + - `unicode` -> encoded to `str` + - `str` -> `str` + + For Python 3: + - `str` -> encoded to `bytes` + - `bytes` -> `bytes` + """ + if isinstance(s, binary_type): + return s + if isinstance(s, text_type): + return s.encode(encoding, errors) + raise TypeError("not expecting type '%s'" % type(s)) + + +def ensure_str(s, encoding='utf-8', errors='strict'): + """Coerce *s* to `str`. + + For Python 2: + - `unicode` -> encoded to `str` + - `str` -> `str` + + For Python 3: + - `str` -> `str` + - `bytes` -> decoded to `str` + """ + # Optimization: Fast return for the common case. + if type(s) is str: + return s + if PY2 and isinstance(s, text_type): + return s.encode(encoding, errors) + elif PY3 and isinstance(s, binary_type): + return s.decode(encoding, errors) + elif not isinstance(s, (text_type, binary_type)): + raise TypeError("not expecting type '%s'" % type(s)) + return s + + +def ensure_text(s, encoding='utf-8', errors='strict'): + """Coerce *s* to six.text_type. 
+ + For Python 2: + - `unicode` -> `unicode` + - `str` -> `unicode` + + For Python 3: + - `str` -> `str` + - `bytes` -> decoded to `str` + """ + if isinstance(s, binary_type): + return s.decode(encoding, errors) + elif isinstance(s, text_type): + return s + else: + raise TypeError("not expecting type '%s'" % type(s)) + + def python_2_unicode_compatible(klass): """ - A decorator that defines __unicode__ and __str__ methods under Python 2. + A class decorator that defines __unicode__ and __str__ methods under Python 2. Under Python 3 it does nothing. To support Python 2 and 3 with a single code base, define a __str__ method From 779a23c81755b763a5fd90194d12b997889f9f8c Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 2 Nov 2023 00:04:55 -0400 Subject: [PATCH 12/30] Bump actions/checkout from 3 to 4 (#2392) Bumps [actions/checkout](https://github.com/actions/checkout) from 3 to 4. - [Release notes](https://github.com/actions/checkout/releases) - [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md) - [Commits](https://github.com/actions/checkout/compare/v3...v4) --- updated-dependencies: - dependency-name: actions/checkout dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/codeql-analysis.yml | 2 +- .github/workflows/python-package.yml | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 43427fab9..0d5078b39 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -36,7 +36,7 @@ jobs: # https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed steps: - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 # Initializes the CodeQL tools for scanning. 
- name: Initialize CodeQL diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 37875fb9f..50ade7486 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -28,7 +28,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout project - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 0 - name: Set up Python @@ -73,7 +73,7 @@ jobs: experimental: true steps: - name: Checkout the source code - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 0 - name: Setup java @@ -129,7 +129,7 @@ jobs: - "2.6.0" steps: - name: Checkout the source code - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 0 - name: Setup java From 4861bee15458effd30e69f9ad0b6373d6f8417e0 Mon Sep 17 00:00:00 2001 From: Hirotaka Wakabayashi Date: Fri, 3 Nov 2023 11:40:34 +0900 Subject: [PATCH 13/30] Uses assert_called_with instead of called_with (#2375) --- test/test_client_async.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_client_async.py b/test/test_client_async.py index 74da66a36..66b227aa9 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -220,12 +220,12 @@ def test_send(cli, conn): request = ProduceRequest[0](0, 0, []) assert request.expect_response() is False ret = cli.send(0, request) - assert conn.send.called_with(request) + conn.send.assert_called_with(request, blocking=False) assert isinstance(ret, Future) request = MetadataRequest[0]([]) cli.send(0, request) - assert conn.send.called_with(request) + conn.send.assert_called_with(request, blocking=False) def test_poll(mocker): From 0362b87ab47ac198b5348936e9c89f3c454e20f1 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Thu, 2 Nov 2023 23:02:08 -0400 Subject: [PATCH 14/30] Update python-package.yml to expect 3.12 tests to pass and extend experimental tests (#2406) --- .github/workflows/python-package.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 50ade7486..6f9ef58a1 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -65,11 +65,12 @@ jobs: - "3.9" - "3.10" - "3.11" + - "3.12" experimental: [ false ] include: - python-version: "pypy3.9" experimental: true - - python-version: "~3.12.0-0" + - python-version: "~3.13.0-0" experimental: true steps: - name: Checkout the source code From 0dbf74689bb51dd517b6b8c8035c2370f2b8dd3a Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Thu, 2 Nov 2023 23:02:45 -0400 Subject: [PATCH 15/30] Update setup.py to indicate 3.12 support --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 483d7ab60..77043da04 100644 --- a/setup.py +++ b/setup.py @@ -70,6 +70,7 @@ def run(cls): "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: PyPy", "Topic :: Software Development :: Libraries :: Python Modules", From 38e8d045e33b894bad30f55c212f8ff497a5a513 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 3 Nov 2023 00:20:15 -0400 Subject: [PATCH 16/30] Update conn.py to catch OSError in case of failed import (#2407) Closes #2399. 
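
For context, a minimal sketch of the failure mode this guards against
(the guard is the same one shown in the diff below): importing gssapi
can raise OSError rather than ImportError, e.g. when the underlying
Kerberos shared libraries fail to load, so catching ImportError alone
is not enough.

    try:
        import gssapi
        from gssapi.raw.misc import GSSError
    except (ImportError, OSError):
        # no gssapi available, disable the GSSAPI SASL mechanism
        gssapi = None
        GSSError = None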
---
 kafka/conn.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kafka/conn.py b/kafka/conn.py
index cac354875..1efb8a0a1 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -78,7 +78,7 @@ class SSLWantWriteError(Exception):
 try:
     import gssapi
     from gssapi.raw.misc import GSSError
-except ImportError:
+except (ImportError, OSError):
     #no gssapi available, will disable gssapi mechanism
     gssapi = None
     GSSError = None

From a1d268a95f34ed9d1b42b2e5dfc36dab6fbbc1e5 Mon Sep 17 00:00:00 2001
From: William Barnhart
Date: Fri, 3 Nov 2023 00:30:54 -0400
Subject: [PATCH 17/30] Update PYTHON_LATEST in python-package.yml to 3.12

---
 .github/workflows/python-package.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 6f9ef58a1..5829d899a 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -15,7 +15,7 @@ env:
   FORCE_COLOR: "1" # Make tools pretty.
   PIP_DISABLE_PIP_VERSION_CHECK: "1"
   PIP_NO_PYTHON_VERSION_WARNING: "1"
-  PYTHON_LATEST: "3.11"
+  PYTHON_LATEST: "3.12"
   KAFKA_LATEST: "2.6.0"
 
   # For re-actors/checkout-python-sdist

From 364397c1b32ab3b8440d315516f46edfcfb7efbb Mon Sep 17 00:00:00 2001
From: rootlulu <110612150+rootlulu@users.noreply.github.com>
Date: Sat, 4 Nov 2023 10:11:52 +0800
Subject: [PATCH 18/30] [FIX] Make vendored selectors34 work on newer Python
 versions (#2394)

* [FIX] Make vendored selectors34 work on newer Python versions.

Python 3.11 can no longer import Mapping from collections. Tested
working from Python 3.6 to 3.11.2.

* Update selectors34.py to have conditional importing of Mapping from
collections

---------

Co-authored-by: William Barnhart
---
 kafka/vendor/selectors34.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/kafka/vendor/selectors34.py b/kafka/vendor/selectors34.py
index ebf5d515e..787490340 100644
--- a/kafka/vendor/selectors34.py
+++ b/kafka/vendor/selectors34.py
@@ -15,7 +15,11 @@
 from __future__ import absolute_import

 from abc import ABCMeta, abstractmethod
-from collections import namedtuple, Mapping
+from collections import namedtuple
+try:
+    from collections.abc import Mapping
+except ImportError:
+    from collections import Mapping
 from errno import EINTR
 import math
 import select

From 0864817de97549ad71e7bc2432c53108c5806cf1 Mon Sep 17 00:00:00 2001
From: William Barnhart
Date: Mon, 6 Nov 2023 23:20:17 -0500
Subject: [PATCH 19/30] Update python-package.yml to publish to PyPi for every
 release (#2381)

I know that the typical release is uploaded to PyPi manually, however I
figure I'd draft a PR with these changes because having the option to
start doing this is worthwhile. More info can be found on
https://github.com/pypa/gh-action-pypi-publish.
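
For reference, uploads in the new publish job are gated on release
creation with:

    if: github.event_name == 'release' && github.event.action == 'created'

so ordinary pushes and pull requests still run the test jobs without
attempting an upload.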
--- .github/workflows/python-package.yml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 5829d899a..f60926c0e 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -178,3 +178,21 @@ jobs: uses: re-actors/alls-green@release/v1 with: jobs: ${{ toJSON(needs) }} + publish: + name: đŸ“Ļ Publish to PyPI + runs-on: ubuntu-latest + needs: [build-sdist] + permissions: + id-token: write + environment: pypi + if: github.event_name == 'release' && github.event.action == 'created' + steps: + - name: Download the sdist artifact + uses: actions/download-artifact@v3 + with: + name: artifact + path: dist + - name: Publish package to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 + with: + password: ${{ secrets.PYPI_API_TOKEN }} From 43822d05749b308ae638e0485bfc24a91583411f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 13 Dec 2023 20:32:50 -0500 Subject: [PATCH 20/30] Bump github/codeql-action from 2 to 3 (#2419) Bumps [github/codeql-action](https://github.com/github/codeql-action) from 2 to 3. - [Release notes](https://github.com/github/codeql-action/releases) - [Changelog](https://github.com/github/codeql-action/blob/main/CHANGELOG.md) - [Commits](https://github.com/github/codeql-action/compare/v2...v3) --- updated-dependencies: - dependency-name: github/codeql-action dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/codeql-analysis.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 0d5078b39..4f6360b71 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -40,7 +40,7 @@ jobs: # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL - uses: github/codeql-action/init@v2 + uses: github/codeql-action/init@v3 with: languages: ${{ matrix.language }} # If you wish to specify custom queries, you can do so here or in a config file. @@ -51,7 +51,7 @@ jobs: # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). # If this step fails, then you should remove it and run the build manually (see below) - name: Autobuild - uses: github/codeql-action/autobuild@v2 + uses: github/codeql-action/autobuild@v3 # â„šī¸ Command-line programs to run using the OS shell. # 📚 https://git.io/JvXDl @@ -64,4 +64,4 @@ jobs: # make bootstrap # make release - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v2 + uses: github/codeql-action/analyze@v3 From e9dfaf9c48d898ea3e24538cb3d189d479898bfe Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 13 Dec 2023 20:32:59 -0500 Subject: [PATCH 21/30] Bump actions/setup-python from 4 to 5 (#2418) Bumps [actions/setup-python](https://github.com/actions/setup-python) from 4 to 5. - [Release notes](https://github.com/actions/setup-python/releases) - [Commits](https://github.com/actions/setup-python/compare/v4...v5) --- updated-dependencies: - dependency-name: actions/setup-python dependency-type: direct:production update-type: version-update:semver-major ... 
Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/python-package.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index f60926c0e..9e0c2007c 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -32,7 +32,7 @@ jobs: with: fetch-depth: 0 - name: Set up Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ env.PYTHON_LATEST }} cache: pip @@ -83,7 +83,7 @@ jobs: distribution: temurin java-version: 11 - name: Set up Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} cache: pip @@ -139,7 +139,7 @@ jobs: distribution: temurin java-version: 8 - name: Set up Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ env.PYTHON_LATEST }} cache: pip From b68f61d49556377bf111bebb82f8f2bd360cc6f7 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 13 Dec 2023 21:11:04 -0500 Subject: [PATCH 22/30] Bump actions/setup-java from 3 to 4 (#2417) Bumps [actions/setup-java](https://github.com/actions/setup-java) from 3 to 4. - [Release notes](https://github.com/actions/setup-java/releases) - [Commits](https://github.com/actions/setup-java/compare/v3...v4) --- updated-dependencies: - dependency-name: actions/setup-java dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/python-package.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 9e0c2007c..59ad718cf 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -78,7 +78,7 @@ jobs: with: fetch-depth: 0 - name: Setup java - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: distribution: temurin java-version: 11 @@ -134,7 +134,7 @@ jobs: with: fetch-depth: 0 - name: Setup java - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: distribution: temurin java-version: 8 From ce7d853198b4207fa52d6bbf4ef0f5426ae102bf Mon Sep 17 00:00:00 2001 From: Orange Kao Date: Tue, 4 Aug 2020 01:42:47 +0000 Subject: [PATCH 23/30] Avoid 100% CPU usage while socket is closed (sleep) After stop/start kafka service, kafka-python may use 100% CPU caused by busy-retry while the socket was closed. This fix the issue by sleep 0.1 second if the fd is negative. --- kafka/client_async.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kafka/client_async.py b/kafka/client_async.py index 58f22d4ec..f32cd3a36 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -637,6 +637,9 @@ def _poll(self, timeout): self._sensors.select_time.record((end_select - start_select) * 1000000000) for key, events in ready: + if key.fileobj.fileno() < 0: + time.sleep(0.1) + if key.fileobj is self._wake_r: self._clear_wake_fd() continue From 6efff5251b887306f46c1aeff8984d6292390378 Mon Sep 17 00:00:00 2001 From: Heikki Nousiainen Date: Tue, 29 Sep 2020 21:45:27 +0300 Subject: [PATCH 24/30] Support connecting through SOCKS5 proxies (#2169) Implement support for SOCKS5 proxies. 
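
As described below, the proxy is configured with a new keyword argument
`socks5_proxy`; a minimal usage sketch (broker address, proxy host and
credentials are placeholders):

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'my-topic',
        bootstrap_servers=['broker1:1234'],
        socks5_proxy='socks5://user:secret@proxy.example.com:10800',
    )
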
Implement a new proxy wrapper that handles SOCKS5 connection, authentication and requesting connections to the actual Kafka broker endpoint. The proxy can be configured via a new keyword argument `socks5_proxy` to consumers, producers or admin client. The value is URL with optional username and password. E.g. `socks5://user:secret@proxy.example.com:10800` The implementation is done in state machine that emulates repeated calls to connect_ex. The rationale with this design is minimal changes to the actual BrokerConnection object. --- kafka/admin/client.py | 2 + kafka/client_async.py | 4 +- kafka/conn.py | 17 ++- kafka/consumer/group.py | 2 + kafka/producer/kafka.py | 2 + kafka/socks5_wrapper.py | 248 ++++++++++++++++++++++++++++++++++++++++ 6 files changed, 271 insertions(+), 4 deletions(-) create mode 100644 kafka/socks5_wrapper.py diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 8eb7504a7..8cfbfc535 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -147,6 +147,7 @@ class KafkaAdminClient(object): sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None kafka_client (callable): Custom class / callable for creating KafkaClient instances + socks5_proxy (str): Socks5 proxy url. Default: None """ DEFAULT_CONFIG = { @@ -182,6 +183,7 @@ class KafkaAdminClient(object): 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, 'sasl_oauth_token_provider': None, + 'socks5_proxy': None, # metrics configs 'metric_reporters': [], diff --git a/kafka/client_async.py b/kafka/client_async.py index f32cd3a36..354a44fbc 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -154,6 +154,7 @@ class KafkaClient(object): sasl mechanism handshake. Default: one of bootstrap servers sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None + socks5_proxy (str): Socks5 proxy URL. Default: None """ DEFAULT_CONFIG = { @@ -192,7 +193,8 @@ class KafkaClient(object): 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, - 'sasl_oauth_token_provider': None + 'sasl_oauth_token_provider': None, + 'socks5_proxy': None, } def __init__(self, **configs): diff --git a/kafka/conn.py b/kafka/conn.py index 1efb8a0a1..df903f264 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -33,6 +33,7 @@ from kafka.protocol.parser import KafkaProtocol from kafka.protocol.types import Int32, Int8 from kafka.scram import ScramClient +from kafka.socks5_wrapper import Socks5Wrapper from kafka.version import __version__ @@ -191,6 +192,7 @@ class BrokerConnection(object): sasl mechanism handshake. Default: one of bootstrap servers sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None + socks5_proxy (str): Socks5 proxy url. 
Default: None """ DEFAULT_CONFIG = { @@ -224,7 +226,8 @@ class BrokerConnection(object): 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, - 'sasl_oauth_token_provider': None + 'sasl_oauth_token_provider': None, + 'socks5_proxy': None, } SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL') SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512") @@ -236,6 +239,7 @@ def __init__(self, host, port, afi, **configs): self._sock_afi = afi self._sock_addr = None self._api_versions = None + self._socks5_proxy = None self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: @@ -368,7 +372,11 @@ def connect(self): log.debug('%s: creating new socket', self) assert self._sock is None self._sock_afi, self._sock_addr = next_lookup - self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM) + if self.config["socks5_proxy"] is not None: + self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi) + self._sock = self._socks5_proxy.socket(self._sock_afi, socket.SOCK_STREAM) + else: + self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM) for option in self.config['socket_options']: log.debug('%s: setting socket option %s', self, option) @@ -385,7 +393,10 @@ def connect(self): # to check connection status ret = None try: - ret = self._sock.connect_ex(self._sock_addr) + if self._socks5_proxy: + ret = self._socks5_proxy.connect_ex(self._sock_addr) + else: + ret = self._sock.connect_ex(self._sock_addr) except socket.error as err: ret = err.errno diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index a1d1dfa37..969969932 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -245,6 +245,7 @@ class KafkaConsumer(six.Iterator): sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None kafka_client (callable): Custom class / callable for creating KafkaClient instances + socks5_proxy (str): Socks5 proxy URL. Default: None Note: Configuration parameters are described in more detail at @@ -308,6 +309,7 @@ class KafkaConsumer(six.Iterator): 'sasl_oauth_token_provider': None, 'legacy_iterator': False, # enable to revert to < 1.4.7 iterator 'kafka_client': KafkaClient, + 'socks5_proxy': None, } DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000 diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index dd1cc508c..431642776 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -281,6 +281,7 @@ class KafkaProducer(object): sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None kafka_client (callable): Custom class / callable for creating KafkaClient instances + socks5_proxy (str): Socks5 proxy URL. 
Default: None
 
     Note:
         Configuration parameters are described in more detail at
@@ -335,6 +336,7 @@ class KafkaProducer(object):
         'sasl_kerberos_domain_name': None,
         'sasl_oauth_token_provider': None,
         'kafka_client': KafkaClient,
+        'socks5_proxy': None,
     }
 
     _COMPRESSORS = {
diff --git a/kafka/socks5_wrapper.py b/kafka/socks5_wrapper.py
new file mode 100644
index 000000000..18bea7c8d
--- /dev/null
+++ b/kafka/socks5_wrapper.py
@@ -0,0 +1,248 @@
+try:
+    from urllib.parse import urlparse
+except ImportError:
+    from urlparse import urlparse
+
+import errno
+import logging
+import random
+import socket
+import struct
+
+log = logging.getLogger(__name__)
+
+
+class ProxyConnectionStates:
+    DISCONNECTED = '<disconnected>'
+    CONNECTING = '<connecting>'
+    NEGOTIATE_PROPOSE = '<negotiate_propose>'
+    NEGOTIATING = '<negotiating>'
+    AUTHENTICATING = '<authenticating>'
+    REQUEST_SUBMIT = '<request_submit>'
+    REQUESTING = '<requesting>'
+    READ_ADDRESS = '<read_address>'
+    COMPLETE = '<complete>'
+
+
+class Socks5Wrapper:
+    """Socks5 proxy wrapper
+
+    Manages connection through socks5 proxy with support for username/password
+    authentication.
+    """
+
+    def __init__(self, proxy_url, afi):
+        self._buffer_in = b''
+        self._buffer_out = b''
+        self._proxy_url = urlparse(proxy_url)
+        self._sock = None
+        self._state = ProxyConnectionStates.DISCONNECTED
+        self._target_afi = socket.AF_UNSPEC
+
+        proxy_addrs = self.dns_lookup(self._proxy_url.hostname, self._proxy_url.port, afi)
+        # TODO raise error on lookup failure
+        self._proxy_addr = random.choice(proxy_addrs)
+
+    @classmethod
+    def is_inet_4_or_6(cls, gai):
+        """Given a getaddrinfo struct, return True iff ipv4 or ipv6"""
+        return gai[0] in (socket.AF_INET, socket.AF_INET6)
+
+    @classmethod
+    def dns_lookup(cls, host, port, afi=socket.AF_UNSPEC):
+        """Returns a list of getaddrinfo structs, optionally filtered to an afi (ipv4 / ipv6)"""
+        # XXX: all DNS functions in Python are blocking. If we really
+        # want to be non-blocking here, we need to use a 3rd-party
+        # library like python-adns, or move resolution onto its
+        # own thread. This will be subject to the default libc
+        # name resolution timeout (5s on most Linux boxes)
+        try:
+            return list(filter(cls.is_inet_4_or_6,
+                               socket.getaddrinfo(host, port, afi,
+                                                  socket.SOCK_STREAM)))
+        except socket.gaierror as ex:
+            log.warning("DNS lookup failed for proxy %s:%d, %r", host, port, ex)
+            return []
+
+    def socket(self, family, sock_type):
+        """Open and record a socket.
+
+        Returns the actual underlying socket
+        object to ensure e.g. selects and ssl wrapping works as expected.
+        """
+        self._target_afi = family  # Store the address family of the target
+        afi, _, _, _, _ = self._proxy_addr
+        self._sock = socket.socket(afi, sock_type)
+        return self._sock
+
+    def _flush_buf(self):
+        """Send out all data that is stored in the outgoing buffer.
+
+        It is expected that the caller handles error handling, including non-blocking
+        as well as connection failure exceptions.
+        """
+        while self._buffer_out:
+            sent_bytes = self._sock.send(self._buffer_out)
+            self._buffer_out = self._buffer_out[sent_bytes:]
+
+    def _peek_buf(self, datalen):
+        """Ensure local inbound buffer has enough data, and return that data without
+        consuming the local buffer
+
+        It's expected that the caller handles e.g.
blocking exceptions""" + while True: + bytes_remaining = datalen - len(self._buffer_in) + if bytes_remaining <= 0: + break + data = self._sock.recv(bytes_remaining) + if not data: + break + self._buffer_in = self._buffer_in + data + + return self._buffer_in[:datalen] + + def _read_buf(self, datalen): + """Read and consume bytes from socket connection + + It's expected that the caller handles e.g. blocking exceptions""" + buf = self._peek_buf(datalen) + if buf: + self._buffer_in = self._buffer_in[len(buf):] + return buf + + def connect_ex(self, addr): + """Runs a state machine through connection to authentication to + proxy connection request. + + The somewhat strange setup is to facilitate non-intrusive use from + BrokerConnection state machine. + + This function is called with a socket in non-blocking mode. Both + send and receive calls can return in EWOULDBLOCK/EAGAIN which we + specifically avoid handling here. These are handled in main + BrokerConnection connection loop, which then would retry calls + to this function.""" + + if self._state == ProxyConnectionStates.DISCONNECTED: + self._state = ProxyConnectionStates.CONNECTING + + if self._state == ProxyConnectionStates.CONNECTING: + _, _, _, _, sockaddr = self._proxy_addr + ret = self._sock.connect_ex(sockaddr) + if not ret or ret == errno.EISCONN: + self._state = ProxyConnectionStates.NEGOTIATE_PROPOSE + else: + return ret + + if self._state == ProxyConnectionStates.NEGOTIATE_PROPOSE: + if self._proxy_url.username and self._proxy_url.password: + # Propose username/password + self._buffer_out = b"\x05\x01\x02" + else: + # Propose no auth + self._buffer_out = b"\x05\x01\x00" + self._state = ProxyConnectionStates.NEGOTIATING + + if self._state == ProxyConnectionStates.NEGOTIATING: + self._flush_buf() + buf = self._read_buf(2) + if buf[0:1] != b"\x05": + log.error("Unrecognized SOCKS version") + self._state = ProxyConnectionStates.DISCONNECTED + self._sock.close() + return errno.ECONNREFUSED + + if buf[1:2] == b"\x00": + # No authentication required + self._state = ProxyConnectionStates.REQUEST_SUBMIT + elif buf[1:2] == b"\x02": + # Username/password authentication selected + userlen = len(self._proxy_url.username) + passlen = len(self._proxy_url.password) + self._buffer_out = struct.pack( + "!bb{}sb{}s".format(userlen, passlen), + 1, # version + userlen, + self._proxy_url.username.encode(), + passlen, + self._proxy_url.password.encode(), + ) + self._state = ProxyConnectionStates.AUTHENTICATING + else: + log.error("Unrecognized SOCKS authentication method") + self._state = ProxyConnectionStates.DISCONNECTED + self._sock.close() + return errno.ECONNREFUSED + + if self._state == ProxyConnectionStates.AUTHENTICATING: + self._flush_buf() + buf = self._read_buf(2) + if buf == b"\x01\x00": + # Authentication succesful + self._state = ProxyConnectionStates.REQUEST_SUBMIT + else: + log.error("Socks5 proxy authentication failure") + self._state = ProxyConnectionStates.DISCONNECTED + self._sock.close() + return errno.ECONNREFUSED + + if self._state == ProxyConnectionStates.REQUEST_SUBMIT: + if self._target_afi == socket.AF_INET: + addr_type = 1 + addr_len = 4 + elif self._target_afi == socket.AF_INET6: + addr_type = 4 + addr_len = 16 + else: + log.error("Unknown address family, %r", self._target_afi) + self._state = ProxyConnectionStates.DISCONNECTED + self._sock.close() + return errno.ECONNREFUSED + + self._buffer_out = struct.pack( + "!bbbb{}sh".format(addr_len), + 5, # version + 1, # command: connect + 0, # reserved + addr_type, # 1 for ipv4, 4 
From a927ff28a2d78fdf45f8892ad7b67c66ce6276fc Mon Sep 17 00:00:00 2001
From: "Augusto F. Hack"
Date: Thu, 14 Jan 2021 15:18:05 +0100
Subject: [PATCH 25/30] bugfix: fix infinite loop on KafkaAdminClient (#2194)

An infinite loop may happen with the following pattern:

    self._send_request_to_node(self._client.least_loaded_node(), request)

The problem happens when `self._client`'s cluster metadata is
out-of-date, and the result of `least_loaded_node()` is a node that has
been removed from the cluster but the client is unaware of it. When this
happens `_send_request_to_node` will loop forever waiting for the chosen
node to become available, which will never happen.

This commit introduces a new method named
`_send_request_to_least_loaded_node` which handles the case above. This
is done by regularly checking if the target node is available in the
cluster metadata, and if not, a new node is chosen.

Notes:

- This does not yet cover every call site of `_send_request_to_node`;
  there are some other places where similar race conditions may happen.
- The code above does not guarantee that the request itself will be
  successful, since it is still possible for the target node to exit.
  However, it does remove the infinite loop, which can render client
  code unusable.
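The failure mode, distilled into a minimal sketch (`client` stands in for the
underlying KafkaClient instance; this illustrates the pre-patch control flow and is
not code from the patch):

    def send_request_pre_patch(client, request):
        # least_loaded_node() may return an id that only exists in stale
        # metadata. ready() then never becomes True for that id, poll()
        # makes no progress towards it, and the loop spins forever.
        node_id = client.least_loaded_node()
        while not client.ready(node_id):
            client.poll()
        return client.send(node_id, request)

The fix below breaks the loop by re-consulting the cluster metadata on every
iteration and re-selecting a node once the current one disappears.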
---
 kafka/admin/client.py | 52 ++++++++++++++++++++++++++++++-------------
 1 file changed, 37 insertions(+), 15 deletions(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 8cfbfc535..06004d69d 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -274,7 +274,7 @@ def _refresh_controller_id(self):
         version = self._matching_api_version(MetadataRequest)
         if 1 <= version <= 6:
             request = MetadataRequest[version]()
-            future = self._send_request_to_node(self._client.least_loaded_node(), request)
+            future = self._send_request_to_least_loaded_node(request)
 
             self._wait_for_futures([future])
 
@@ -312,7 +312,7 @@ def _find_coordinator_id_send_request(self, group_id):
             raise NotImplementedError(
                 "Support for GroupCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
                 .format(version))
-        return self._send_request_to_node(self._client.least_loaded_node(), request)
+        return self._send_request_to_least_loaded_node(request)
 
 @@ -357,9 +357,36 @@ def _find_coordinator_ids(self, group_ids):
         }
         return groups_coordinators
 
-    def _send_request_to_node(self, node_id, request, wakeup=True):
+    def _send_request_to_least_loaded_node(self, request):
+        """Send a Kafka protocol message to the least loaded broker.
+
+        Returns a future that may be polled for status and results.
+
+        :param request: The message to send.
+        :return: A future object that may be polled for status and results.
+        :exception: The exception if the message could not be sent.
+        """
+        node_id = self._client.least_loaded_node()
+        while not self._client.ready(node_id):
+            # poll until the connection to broker is ready, otherwise send()
+            # will fail with NodeNotReadyError
+            self._client.poll()
+
+            # node_id is not part of the cluster anymore, choose a new broker
+            # to connect to
+            if self._client.cluster.broker_metadata(node_id) is None:
+                node_id = self._client.least_loaded_node()
+
+        return self._client.send(node_id, request)
+
+    def _send_request_to_node(self, node_id, request):
         """Send a Kafka protocol message to a specific broker.
 
+        .. note::
+
+            This function will enter an infinite loop if `node_id` is
+            removed from the cluster.
+
         Returns a future that may be polled for status and results.
 
         :param node_id: The broker id to which to send the message.
@@ -509,10 +536,7 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
             allow_auto_topic_creation=auto_topic_creation
         )
 
-        future = self._send_request_to_node(
-            self._client.least_loaded_node(),
-            request
-        )
+        future = self._send_request_to_least_loaded_node(request)
 
         self._wait_for_futures([future])
         return future.value
@@ -604,7 +628,7 @@ def describe_acls(self, acl_filter):
                 .format(version)
             )
 
-        future = self._send_request_to_node(self._client.least_loaded_node(), request)
+        future = self._send_request_to_least_loaded_node(request)
         self._wait_for_futures([future])
 
         response = future.value
@@ -695,7 +719,7 @@ def create_acls(self, acls):
                 .format(version)
             )
 
-        future = self._send_request_to_node(self._client.least_loaded_node(), request)
+        future = self._send_request_to_least_loaded_node(request)
         self._wait_for_futures([future])
 
         response = future.value
@@ -789,7 +813,7 @@ def delete_acls(self, acl_filters):
                 .format(version)
             )
 
-        future = self._send_request_to_node(self._client.least_loaded_node(), request)
+        future = self._send_request_to_least_loaded_node(request)
         self._wait_for_futures([future])
 
         response = future.value
@@ -849,8 +873,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
             ))
 
         if len(topic_resources) > 0:
-            futures.append(self._send_request_to_node(
-                self._client.least_loaded_node(),
+            futures.append(self._send_request_to_least_loaded_node(
                 DescribeConfigsRequest[version](resources=topic_resources)
             ))
 
@@ -870,8 +893,7 @@
             ))
 
         if len(topic_resources) > 0:
-            futures.append(self._send_request_to_node(
-                self._client.least_loaded_node(),
+            futures.append(self._send_request_to_least_loaded_node(
                 DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms)
             ))
         else:
@@ -918,7 +940,7 @@ def alter_configs(self, config_resources):
         #   // a single request that may be sent to any broker.
         #
         # So this is currently broken as it always sends to the least_loaded_node()
-        future = self._send_request_to_node(self._client.least_loaded_node(), request)
+        future = self._send_request_to_least_loaded_node(request)
 
         self._wait_for_futures([future])
 
         response = future.value

From 6985761d2f981ace1943e728a33781afc454552c Mon Sep 17 00:00:00 2001
From: "Augusto F. Hack"
Date: Thu, 14 Jan 2021 17:50:28 +0100
Subject: [PATCH 26/30] bugfix: infinite loop when sending msgs to the controller (#2194)

If the cached `_controller_id` is out-of-date and the node was removed
from the cluster, `_send_request_to_node` would enter an infinite loop.
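The shape of the fix, as a standalone sketch (`admin` stands in for the
KafkaAdminClient instance; the helper mirrors the patched control flow but is
illustrative, not the committed code):

    def send_to_controller(admin, request, tries=2):
        # Bounded retry: if the send fails and the cached controller id no
        # longer appears in the cluster metadata, refresh the id and retry
        # once instead of blocking forever on a broker that is gone.
        future = None
        while tries:
            tries -= 1
            future = admin._client.send(admin._controller_id, request)
            admin._wait_for_futures([future])
            if future.exception is not None:
                stale = admin._client.cluster.broker_metadata(admin._controller_id) is None
                if stale:
                    admin._refresh_controller_id()
                    continue
            return future
        return future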
---
 kafka/admin/client.py | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 06004d69d..62d487543 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -412,10 +412,23 @@ def _send_request_to_controller(self, request):
         tries = 2  # in case our cached self._controller_id is outdated
         while tries:
             tries -= 1
-            future = self._send_request_to_node(self._controller_id, request)
+            future = self._client.send(self._controller_id, request)
 
             self._wait_for_futures([future])
 
+            if future.exception is not None:
+                log.error(
+                    "Sending request to controller_id %s failed with %s",
+                    self._controller_id,
+                    future.exception,
+                )
+                is_outdated_controller = (
+                    self._client.cluster.broker_metadata(self._controller_id) is None
+                )
+                if is_outdated_controller:
+                    self._refresh_controller_id()
+                    continue
+
             response = future.value
             # In Java, the error field name is inconsistent:
             #  - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors

From 4ca2f45994ff71da76211ca28b8978fbc2278b17 Mon Sep 17 00:00:00 2001
From: "Augusto F. Hack"
Date: Tue, 5 Jan 2021 13:20:38 +0100
Subject: [PATCH 27/30] bugfix: race among _connecting and cluster metadata (#2189)

A call to `maybe_connect` can be performed while the cluster metadata
is being updated. If that happens, the assumption that every entry in
`_connecting` has metadata won't hold. The existing assert will then
raise on every subsequent call to `poll`, rendering the client instance
unusable.

This fixes the issue by ignoring connection requests to nodes that do
not have metadata available anymore.
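The defensive pattern, reduced to a sketch (illustrative, not the committed code;
`client` stands in for the KafkaClient instance): cluster metadata is treated as
advisory and re-checked at use time, so a missing entry is handled as an expected
outcome rather than asserted on as an invariant.

    def maybe_connect_sketch(client, node_id):
        broker_metadata = client.cluster.broker_metadata(node_id)
        if broker_metadata is None:
            # The node vanished between scheduling the connection and
            # servicing it; drop the request instead of raising.
            client._connecting.discard(node_id)
            return False
        # ...otherwise proceed to create the BrokerConnection as before...
        return True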
---
 kafka/client_async.py     | 32 ++++++++++++++++++++------------
 test/test_client_async.py |  9 ++-------
 2 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/kafka/client_async.py b/kafka/client_async.py
index 354a44fbc..2e3a680ff 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -370,18 +370,26 @@ def _maybe_connect(self, node_id):
         conn = self._conns.get(node_id)
 
         if conn is None:
-            broker = self.cluster.broker_metadata(node_id)
-            assert broker, 'Broker id %s not in current metadata' % (node_id,)
-
-            log.debug("Initiating connection to node %s at %s:%s",
-                      node_id, broker.host, broker.port)
-            host, port, afi = get_ip_port_afi(broker.host)
-            cb = WeakMethod(self._conn_state_change)
-            conn = BrokerConnection(host, broker.port, afi,
-                                    state_change_callback=cb,
-                                    node_id=node_id,
-                                    **self.config)
-            self._conns[node_id] = conn
+            broker_metadata = self.cluster.broker_metadata(node_id)
+
+            # The broker may have been removed from the cluster after the
+            # call to `maybe_connect`. At this point there is no way to
+            # recover, so just ignore the connection
+            if broker_metadata is None:
+                log.debug("Node %s is not available anymore, discarding connection", node_id)
+                if node_id in self._connecting:
+                    self._connecting.remove(node_id)
+                return False
+            else:
+                log.debug("Initiating connection to node %s at %s:%s",
+                          node_id, broker_metadata.host, broker_metadata.port)
+                host, port, afi = get_ip_port_afi(broker_metadata.host)
+                cb = WeakMethod(self._conn_state_change)
+                conn = BrokerConnection(host, broker_metadata.port, afi,
+                                        state_change_callback=cb,
+                                        node_id=node_id,
+                                        **self.config)
+                self._conns[node_id] = conn
 
         # Check if existing connection should be recreated because host/port changed
         elif self._should_recycle_connection(conn):

diff --git a/test/test_client_async.py b/test/test_client_async.py
index 66b227aa9..b0592086a 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -71,13 +71,8 @@ def test_can_connect(cli, conn):
 
 def test_maybe_connect(cli, conn):
-    try:
-        # Node not in metadata, raises AssertionError
-        cli._maybe_connect(2)
-    except AssertionError:
-        pass
-    else:
-        assert False, 'Exception not raised'
+    # Node not in metadata should be ignored
+    cli._maybe_connect(2)
 
     # New node_id creates a conn object
     assert 0 not in cli._conns

From 919f61da9efdfe1cf760b94585d72b16d414cdda Mon Sep 17 00:00:00 2001
From: Jonas Keeling
Date: Thu, 22 Feb 2024 23:00:34 +0100
Subject: [PATCH 28/30] bugfix: raise error in check_version if broker is unavailable

This fixes an issue in check_version where a KeyError is raised if the
broker is unavailable or an invalid node_id is used. Instead it will
raise BrokerNotAvailableError.
---
 kafka/client_async.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/kafka/client_async.py b/kafka/client_async.py
index 2e3a680ff..a9dfd166b 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -911,7 +911,8 @@ def check_version(self, node_id=None, timeout=2, strict=False):
             if try_node is None:
                 self._lock.release()
                 raise Errors.NoBrokersAvailable()
-            self._maybe_connect(try_node)
+            if not self._maybe_connect(try_node):
+                raise Errors.BrokerNotAvailableError()
             conn = self._conns[try_node]
 
             # We will intentionally cause socket failures

From 1a6bf016fe5c47c597afa081c6e103ca0d283d14 Mon Sep 17 00:00:00 2001
From: Jonas Keeling
Date: Mon, 26 Feb 2024 10:29:17 +0100
Subject: [PATCH 29/30] Revert "bugfix: raise error in check_version if broker is unavailable"
---
 kafka/client_async.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/kafka/client_async.py b/kafka/client_async.py
index a9dfd166b..2e3a680ff 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -911,8 +911,7 @@ def check_version(self, node_id=None, timeout=2, strict=False):
             if try_node is None:
                 self._lock.release()
                 raise Errors.NoBrokersAvailable()
-            if not self._maybe_connect(try_node):
-                raise Errors.BrokerNotAvailableError()
+            self._maybe_connect(try_node)
             conn = self._conns[try_node]
 
             # We will intentionally cause socket failures

From c662d941b13f11620f60fc48580f2f6643051ca6 Mon Sep 17 00:00:00 2001
From: Jonas Keeling
Date: Mon, 26 Feb 2024 14:36:42 +0100
Subject: [PATCH 30/30] bugfix: catch KeyError in check_version and retry
---
 kafka/client_async.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/kafka/client_async.py b/kafka/client_async.py
index 2e3a680ff..9a720112d 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -912,7 +912,13 @@ def check_version(self, node_id=None, timeout=2, strict=False):
                 self._lock.release()
                 raise Errors.NoBrokersAvailable()
             self._maybe_connect(try_node)
-            conn = self._conns[try_node]
+            try:
+                conn = self._conns[try_node]
+            except KeyError:
+                if node_id is not None:
+                    self._lock.release()
+                    raise Errors.NodeNotReadyError()
+                continue
 
             # We will intentionally cause socket failures
             # These should not trigger metadata refresh
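The same defensive idiom in isolation (a hedged sketch, not the committed code;
`conns` and `candidates` are stand-ins): a failed dict lookup means the candidate
broker went away between selection and use, which is fatal only when the caller
pinned a specific node.

    def first_usable_conn(conns, candidates, pinned=None):
        # Illustrative version of the retry added to check_version: a
        # KeyError means the candidate disappeared after being selected.
        for node_id in candidates:
            try:
                return conns[node_id]
            except KeyError:
                if pinned is not None:
                    raise  # the caller asked for this exact node
        raise LookupError("no usable connection")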