get started

参考:

start kafka-broker

bin/kafka-server-start.sh -daemon config/server.properties

start kafka-connect

bin/connect-standalone.sh -daemon \
    config/connect-standalone.properties \
    config/debezium-mysql.properties

list topic

bin/kafka-topics.sh --zookeeper localhost:2181 \
    --list
# --zookeeper is deprecated (removed in Kafka 3.0): pass the broker address via --bootstrap-server instead. The same applies to all commands below
bin/kafka-topics.sh --bootstrap-server <kafkahost:port> \
    --list

desc topic

bin/kafka-topics.sh --bootstrap-server <kafkahost:port> \
    --topic $1 \
    --describe

create topic

bin/kafka-topics.sh --bootstrap-server <kafkahost:port> \
    --topic $1 \
    --partitions $2 \
    --create

remove topic

bin/kafka-topics.sh --bootstrap-server <kafkahost:port> \
    --topic $1 \
    --delete

produce msg to topic

echo "msg" | bin/kafka-console-producer.sh \
    --bootstrap-server <kafkahost:port> \
    --topic $1

consume msg from topic

bin/kafka-console-consumer.sh --bootstrap-server <kafkahost:port> \
    --topic $1 --from-beginning --property print.timestamp=true

search msg from topic

bin/kafka-console-consumer.sh --bootstrap-server <kafkahost:port> \
    --topic $1 --from-beginning | grep $2

describe consumer group

bin/kafka-consumer-groups.sh --bootstrap-server <kafkahost:port> \
    --group $1 \
    --describe

reset consumer offset for topic

# preview
bin/kafka-consumer-groups.sh --bootstrap-server <kafkahost:port> \
    --group <group_id> \
    --topic <topic_name> \
    --reset-offsets --to-earliest
# execute
bin/kafka-consumer-groups.sh --bootstrap-server <kafkahost:port> \
    --group <group_id> \
    --topic <topic_name> \
    --reset-offsets \
    --to-earliest \
    --execute

other reset options (run bin/kafka-consumer-groups.sh without arguments for details)

  • --shift-by
  • --to-current
  • --to-latest
  • --to-offset
  • --to-datetime
  • --by-duration

producer performance

bin/kafka-producer-perf-test.sh --producer-props bootstrap.servers=<kafkahost:port> \
    --topic <topic> \
    --num-records <msg-count> \
    --record-size <record-bytes> \
    --throughput <messages/sec> \
    --producer.config <producer-config-file>

consumer performance

bin/kafka-consumer-perf-test.sh --bootstrap-server <kafkahost:port> \
    --topic <topic> \
    --messages <msg-count> \
    --group <gid:default:perf-consumer-49221> \
    --fetch-size <fetch-size:default:1048576> \
    --date-format <timestamp-format:default:yyyy-MM-dd HH:mm:ss:SSS> \
    --consumer.config <consumer-config-file> \
    --from-latest