How to drain/delete/expire existing messages in Kafka

Some time you might have a bad record in Kafka topic that you want to delete. Kafka does not provide direct option to delete specific record. Only way to delete records is to expire them. You can achieve this by setting data retention to say 1 second that expires all the old messages. You can follow these steps
    First check the topic to find out value of retention.ms config parameter for the topic
  1. 
    kafka_2.11-0.11.0.1 spatil$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name my-topic --entity-type topics
    Configs for topic 'my-topic' are retention.ms=86400000
    
  2. Change value of retention.ms to 1 which means all messages older than 1 ms will be expired
    
    kafka_2.11-0.11.0.1 spatil$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name my-topic --entity-type topics --add-config retention.ms=1
    Completed Updating config for entity: topic 'my-topic'
    
  3. Wait for few seconds and monitor logs of the Kafka server to make sure that messages from the topic are deleted
  4. Now change the value of retention.ms back to its original value which was 86400000 (7 days)
    
    kafka_2.11-0.11.0.1 spatil$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name my-topic --entity-type topics --add-config retention.ms=86400000
    Completed Updating config for entity: topic 'my-topic'.
    
  5. You can verify that your changes are saved by running this command
    
    kafka_2.11-0.11.0.1 spatil$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name my-topic --entity-type topics
    Configs for topic 'my-topic' are retention.ms=86400000
    

Kafka how to reset number of partitions in a topics

I wanted to figure out how to reset number of partitions in my topic in Kafka and I followed these steps
  1. I did create a sample topic called my-topic with single partition
    
    spatil$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic my-topic --replication-factor 1 --partitions 1
    Created topic "my-topic".
    
  2. I used describe command to verify that my topic has actually single partition
    
    spatil$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-topic
    Topic:my-topic PartitionCount:1 ReplicationFactor:1 Configs:
     Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
    
  3. Then I did execute alter command on my partition and changed number of partitions from 1 to 3
    
    spatil$ bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --partitions 3
    WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
    Adding partitions succeeded!
    
  4. I did execute describe command on my topic to verify that it actually has 3 topics
    
    spatil$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-topic
    Topic:my-topic PartitionCount:3 ReplicationFactor:1 Configs:
     Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
     Topic: my-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
     Topic: my-topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
    

How to reset consumer group offset

First run describe on topic to check what it the current LAG its zero in this case

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group user.kafkaconsumer
Now run following command to just preview what will be the next offset if you reset

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group user.kafkaconsumer --reset-offsets --to-earliest --all-topics
Shutdown the consumer so that you can restart the consumer Now go back/reset the offset so that it goes back to first message 3bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group user.kafkaconsumer --reset-offsets --to-earliest --all-topics --execute Go back and verify that the consumer offset actually went back by executing following command

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group user.kafkaconsumer
You should be able to see the offset set back to 22000 which is start of first message in Kafka.