How to drain/delete/expire existing messages in Kafka

Sometimes you might have a bad record in a Kafka topic that you want to delete. Kafka does not provide a direct option to delete a specific record. The only way to delete records is to expire them. You can achieve this by temporarily setting the topic's data retention to a very small value, say 1 ms, which expires all the old messages, and then restoring the original value. You can follow these steps:
  1. First, check the topic to find the current value of its retention.ms config parameter
    
    kafka_2.11-0.11.0.1 spatil$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name my-topic --entity-type topics
    Configs for topic 'my-topic' are retention.ms=86400000
    
  2. Change the value of retention.ms to 1, which means all messages older than 1 ms will be expired
    
    kafka_2.11-0.11.0.1 spatil$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name my-topic --entity-type topics --add-config retention.ms=1
    Completed Updating config for entity: topic 'my-topic'
    
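  3. Wait for the messages to expire and monitor the Kafka server logs to make sure that messages from the topic are deleted. This can take longer than a few seconds, since the broker only checks retention every log.retention.check.interval.ms (5 minutes by default)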
  4. Now change the value of retention.ms back to its original value, which was 86400000 (1 day)
    
    kafka_2.11-0.11.0.1 spatil$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name my-topic --entity-type topics --add-config retention.ms=86400000
    Completed Updating config for entity: topic 'my-topic'.
    
  5. You can verify that your changes are saved by running this command
    
    kafka_2.11-0.11.0.1 spatil$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name my-topic --entity-type topics
    Configs for topic 'my-topic' are retention.ms=86400000
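
The steps above can be sketched as a small shell script. This is only a minimal sketch, assuming the same Kafka 0.11-era --zookeeper flag used in the commands above (newer Kafka releases use --bootstrap-server instead). The function names, the KAFKA_CONFIGS and ZOOKEEPER variables, and the wait time are illustrative assumptions and not part of Kafka. It also assumes the topic already has a topic-level retention.ms override, as my-topic does here.

```shell
#!/usr/bin/env bash
# Sketch: temporarily lower retention.ms on a topic, then restore it.
# KAFKA_CONFIGS and ZOOKEEPER are hypothetical variables; adjust to your setup.
KAFKA_CONFIGS="${KAFKA_CONFIGS:-bin/kafka-configs.sh}"
ZOOKEEPER="${ZOOKEEPER:-localhost:2181}"

# Convert milliseconds to whole days, to sanity-check a retention value.
ms_to_days() {
  echo $(( $1 / 86400000 ))
}

# Step 1 and 5: print the topic-level config overrides.
describe_topic() {
  "$KAFKA_CONFIGS" --zookeeper "$ZOOKEEPER" --describe \
    --entity-type topics --entity-name "$1"
}

# Step 2 and 4: set retention.ms on a topic.
set_retention() {
  "$KAFKA_CONFIGS" --zookeeper "$ZOOKEEPER" --alter \
    --entity-type topics --entity-name "$1" \
    --add-config "retention.ms=$2"
}

# Pull the retention.ms value out of the describe output.
# Splitting on commas and spaces avoids matching delete.retention.ms.
current_retention() {
  describe_topic "$1" | tr ', ' '\n\n' \
    | sed -n 's/^retention\.ms=\([0-9][0-9]*\)$/\1/p' | head -n 1
}

# Steps 1 to 4: remember the original value, expire messages, restore.
purge_topic() {
  local topic="$1" wait_seconds="${2:-60}" original
  original="$(current_retention "$topic")"
  if [ -z "$original" ]; then
    echo "no topic-level retention.ms override found for $topic" >&2
    return 1
  fi
  set_retention "$topic" 1 || return 1
  sleep "$wait_seconds"   # step 3: give the broker time to delete segments
  set_retention "$topic" "$original"
}
```

After sourcing the script from the Kafka install directory, you could run purge_topic my-topic 300 and then describe_topic my-topic to confirm the original value is back. The wait should be at least as long as the broker takes to run its retention check, so still watch the server logs before relying on it.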
    
