Kafka consumer. Метод poll

Поділитися
Вставка
  • Опубліковано 26 гру 2021
  • Начинаем изучать детали работы Kafka.
    Сегодня поговорим про некоторые особенности метода poll.
    Код примеров:
    github.com/petrelevich/jvm-di...
    Чат в телеге для вопросов и обсуждений:
    t.me/jvm_home
  • Наука та технологія

КОМЕНТАРІ • 11

  • @user-eg5uc6xu8u
    @user-eg5uc6xu8u 2 роки тому +2

    Огромное вам спасибо, благодаря вашему уроку мне удалось побить задачу, которую мне поставили на работе☺☺☺☺☺

  • @user-eg5uc6xu8u
    @user-eg5uc6xu8u 2 роки тому

    Добрый день, как вытащить определенное количество сообщений с конца или с начала с помощью консьюмера?

    • @petrelevich
      @petrelevich  2 роки тому

      чтобы вытащить определенное кол-во сообщений с конца, используйте параметр MAX_POLL_RECORDS_CONFIG. Как в примере в видео. А сначала читать нельзя, это очередь, но можно offset сместить на начало и все перечитать.

    • @user-eg5uc6xu8u
      @user-eg5uc6xu8u 2 роки тому

      @@petrelevich большое спасибо)

  • @ekaterinagalkina7303
    @ekaterinagalkina7303 7 місяців тому

    Смущает фраза из доки "that max.poll.records does not impact the underlying fetching behavior". Т.е. звучит так, что сетевые запросы - это одно, а poll() - другое, и на сетевые запросы к кафке настройка не влияет. Т.е. запрошенный объем данных где-то еще кэшируется, а that max.poll.records - всего лишь размер массива.
    max.poll.records
    The maximum number of records returned in a single call to poll(). Note, that max.poll.records does not impact the underlying fetching behavior. The consumer will cache the records from each fetch request and returns them incrementally from each poll.
    А для fetch другие настройки есть - максимальный размер в байтах и т.д.
    Но тогда непонятно, как по poll кафка определяет, что консьюмер упал. Правда, есть еще другая настройка heartbeat.interval.ms для пинга от консьюмера.

    • @petrelevich
      @petrelevich  6 місяців тому +2

      Параметры запроса к брокеру определяются конфигом:
      package org.apache.kafka.clients.consumer.internals;
      public class FetchConfig {
      final int fetchSize;
      final int maxPollRecords;
      где
      this.fetchSize = config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG);
      this.maxPollRecords = config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
      сам запрос формируется тут:
      package org.apache.kafka.clients.consumer.internals;
      public abstract class AbstractFetch
      один из его параметров
      .setMaxBytes(fetchConfig.maxBytes)
      а fetchSize
      используется в вычитывании из внутреннего буфера.
      Вот с этим буфером еще надо поразбираться.

  • @user-ks2ob1qw2q
    @user-ks2ob1qw2q Рік тому

    А есть ли способ одной командой вычитать весь топик и таким образом его очистить?

    • @petrelevich
      @petrelevich  Рік тому

      Т.е. нужен метод poll, который не сдвинет offset, а именно удалит? Причем метод consumer-а?

  • @valera3146
    @valera3146 Рік тому

    Здравствуйте. Если метод poll отдает постоянно 0. Это означает, что топик не содержит сообщения? Или например нет соединения или ошиблись с названием топика?

    • @petrelevich
      @petrelevich  Рік тому +1

      Это значит, что нечего отдавать, т.е. в топике нет сообщений.
      А вот сообщений может не быть, например, если топик не верно указан.
      Если нет соединения, то будет ошибка.

    • @valera3146
      @valera3146 Рік тому

      @@petrelevich спасибо