1. The Backstory: The Consumer Loop That Locked Up and Charged Customers Twice

At CodexiLab, we build high-throughput business automation systems where transactional integrity is paramount. Last quarter, an e-commerce platform client experienced a critical outage in their billing microservice. During a high-traffic flash sale, their Go-based payment processing worker began hanging, leading to a cascade of partition offset locks in Kafka. In the chaos, several orders were processed multiple times, resulting in customers being charged twice for the same purchase.

The culprit was a classic distributed systems bug: the Go consumer microservice was configured to use Kafka's 'auto-commit' feature. When the billing service encountered network congestion while communicating with the payment gateway, the processing loop stalled. However, because auto-commit ran on a background timer, it kept telling Kafka that the messages had been successfully processed. When the service finally crashed due to an out-of-memory error, Kafka reassigned the partitions to a healthy worker. The new worker read the same messages from the last committed offset, leading to double-processing. We were called in to redesign their event-driven architecture, implement manual offset commit loops, and enforce database idempotency. This guide details the technical failures, Go concurrency code, and distributed systems logic we used to build a resilient billing pipeline.

2. Deconstructing the Kafka Commit Lifecycle: Auto-Commit vs. Manual Offset Commits

To understand how duplicate processing happens, we must examine how Kafka tracks message consumption. A Kafka cluster organizes messages into topics, which are divided into partitions for horizontal scaling. Each message in a partition is assigned a unique sequential identifier called an 'offset'. Consumers belong to a 'Consumer Group'. Kafka tracks the progress of a consumer group by storing a committed offset for each partition (the position of the next message the group should read) in an internal topic named __consumer_offsets.

By default, many Kafka client libraries set enable.auto.commit = true. In this mode, the library commits the latest offset it has handed to the application at a regular interval (e.g. every 5 seconds), regardless of whether the application has finished processing those messages. While convenient, this is dangerous for critical transactions. There are two primary failure scenarios:

  • Data Loss (At-Least-Once violation): The consumer reads a batch of 100 messages. The background timer commits the offset for the end of the batch to Kafka. Suddenly, the consumer crashes while processing message 50. When a new consumer starts, it resumes after the committed position, so messages 50 to 100 are never processed.
  • Data Duplication (At-Most-Once violation): The consumer reads 100 messages and processes all of them, writing the results to the database. Before the background timer can commit offset 100 to Kafka, the consumer crashes. When a new consumer restarts, it reads the same 100 messages again, leading to duplicate database writes.
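
Both failure modes can be reproduced without a broker. The following is a minimal Go sketch, not Kafka client code: simulate, commitThenProcess, and processThenCommit are hypothetical names introduced for illustration, and a crash is modelled as abandoning the loop once and restarting from the committed position.

```go
package main

import "fmt"

// commitOrder describes whether the offset commit lands before or after processing.
type commitOrder int

const (
	commitThenProcess commitOrder = iota // commit wins the race (auto-commit timer)
	processThenCommit                    // commit only after the work is done
)

// simulate replays offsets 0..n-1 through a consumer that crashes exactly once,
// the first time it handles crashAt, and then restarts from the committed
// position. It returns how many times each offset was processed.
func simulate(n, crashAt int, order commitOrder) []int {
	counts := make([]int, n)
	committed := 0 // next offset to read after a restart
	crashed := false

	for committed < n {
		// (Re)start the consumer at the committed position.
		for off := committed; off < n; off++ {
			crashNow := off == crashAt && !crashed
			if order == commitThenProcess {
				committed = off + 1
				if crashNow {
					crashed = true
					break // crash before the message is processed
				}
				counts[off]++
			} else {
				counts[off]++
				if crashNow {
					crashed = true
					break // crash after processing, before the commit
				}
				committed = off + 1
			}
		}
	}
	return counts
}

func main() {
	fmt.Println(simulate(5, 2, commitThenProcess)) // [1 1 0 1 1]: offset 2 is lost
	fmt.Println(simulate(5, 2, processThenCommit)) // [1 1 2 1 1]: offset 2 runs twice
}
```

Committing after processing trades loss for duplication, which is the trade the rest of this guide builds on.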

To guarantee 'at-least-once' delivery without losing data, we must disable auto-commit and switch to manual offset commits. In this model, the consumer explicitly tells Kafka to commit an offset only after the business logic (such as a database write or payment API call) has completed successfully.

3. The Dual Guardrails: Manual Commits and Database Idempotency

Switching to manual commits guarantees that we never lose a message. However, manual commits alone cannot prevent duplication. In a distributed network, failures can happen at the moment of commit. If your Go worker processes a payment, writes it to the database, and then the network connection to Kafka drops before it can send the commit acknowledgement, Kafka will assume the worker has died. It will rebalance the partition and send the same message to a different worker. The new worker will attempt to process the payment again.

To achieve what is effectively 'exactly-once' processing (delivery stays at-least-once, but each event takes effect only once), we must combine manual commits with database-level idempotency. Idempotency means that performing an operation multiple times yields the same result as performing it once. In our architecture, we enforce this by following three steps:

  1. Inject an Idempotency Key: Every event published to Kafka must contain a unique, deterministic identifier (e.g., event_id or order_id).
  2. Use a Database Constraint: Inside the payment database, we create an idempotency_keys table with a unique constraint on the key column.
  3. Transactional Wrapping: We wrap the database write and the idempotency check inside a single database transaction. If the key already exists, the transaction fails with a duplicate key violation, and we safely skip the payment call, committing the offset to Kafka to move past the duplicate message.
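
Before looking at the database-backed version, the three steps can be sketched in memory. This is an illustration under stated assumptions rather than the production code: ledger, newLedger, and apply are hypothetical names, a mutex stands in for the database transaction, and a map stands in for the unique constraint on idempotency_keys.

```go
package main

import (
	"fmt"
	"sync"
)

// ledger is a hypothetical in-memory stand-in for the payment database.
type ledger struct {
	mu      sync.Mutex
	seen    map[string]struct{} // plays the role of the unique constraint
	charges map[string]int      // order ID -> number of charges applied
}

func newLedger() *ledger {
	return &ledger{seen: map[string]struct{}{}, charges: map[string]int{}}
}

// apply records the idempotency key and the charge as one atomic step.
// It returns false when the key was already present, i.e. a duplicate event.
func (l *ledger) apply(eventID, orderID string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()

	if _, duplicate := l.seen[eventID]; duplicate {
		return false // skip the charge; the caller can still commit the offset
	}
	l.seen[eventID] = struct{}{}
	l.charges[orderID]++
	return true
}

func main() {
	l := newLedger()
	events := []struct{ id, order string }{
		{"evt-1", "order-42"},
		{"evt-1", "order-42"}, // redelivered after a failed offset commit
		{"evt-2", "order-43"},
	}
	for _, ev := range events {
		fmt.Printf("%s applied=%v\n", ev.id, l.apply(ev.id, ev.order))
	}
	fmt.Println("charges for order-42:", l.charges["order-42"]) // 1
}
```

The key check and the charge must sit inside the same atomic unit; checking the key first and charging in a separate step would reopen the race the constraint is meant to close.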
go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	// Assumed driver: registers the "postgres" name used by sql.Open below.
	_ "github.com/lib/pq"
	"github.com/segmentio/kafka-go"
)

type PaymentConsumer struct {
	reader *kafka.Reader
	db     *sql.DB
}

func NewPaymentConsumer(brokers []string, topic, groupID string, db *sql.DB) *PaymentConsumer {
	return &PaymentConsumer{
		reader: kafka.NewReader(kafka.ReaderConfig{
			Brokers:          brokers,
			Topic:            topic,
			GroupID:          groupID,
			MinBytes:         10e3, // 10KB
			MaxBytes:         10e6, // 10MB
			CommitInterval:   0,    // 0 = CommitMessages flushes synchronously instead of on a timer
			StartOffset:      kafka.FirstOffset,
		}),
		db: db,
	}
}

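func (c *PaymentConsumer) Start(ctx context.Context) {
	log.Println("Starting resilient Kafka consumer loop...")
	for {
		// FetchMessage blocks until a message is available or the context is
		// cancelled. Unlike ReadMessage, it does not commit the offset.
		msg, err := c.reader.FetchMessage(ctx)
		if err != nil {
			if errors.Is(err, context.Canceled) || ctx.Err() != nil {
				return
			}
			log.Printf("Error fetching message: %v", err)
			time.Sleep(1 * time.Second) // Backoff to prevent log flooding
			continue
		}

		// Retry the same message until it succeeds. Fetching the next message and
		// committing it would implicitly mark the failed one as processed.
		for {
			// Not derived from ctx, so an in-flight payment can finish after a
			// shutdown signal. The 30s timeout is an illustrative value.
			workCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
			err = c.processPaymentWithIdempotency(workCtx, msg)
			cancel()
			if err == nil {
				break
			}
			log.Printf("Failed to process payment at offset %d, retrying: %v", msg.Offset, err)
			if ctx.Err() != nil {
				return // Shutting down; the uncommitted message will be redelivered
			}
			time.Sleep(1 * time.Second)
		}

		// Commit the offset manually only after database success
		commitCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		err = c.reader.CommitMessages(commitCtx, msg)
		cancel()
		if err != nil {
			// The message may be redelivered; the idempotency key makes that safe.
			log.Printf("Failed to commit offset to Kafka: %v", err)
		}
	}
}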

4. Step-by-Step Implementation of the Go Consumer Loop

The Go code block above shows the initialization of our PaymentConsumer structure using the segmentio/kafka-go package. In kafka-go, manual commits come from calling FetchMessage() rather than ReadMessage(): FetchMessage() returns the next message without committing its offset, leaving the commit to an explicit CommitMessages() call. Setting CommitInterval: 0 inside the ReaderConfig makes that call flush to the broker synchronously rather than on a periodic timer. The message is then passed to our business logic layer, which executes within a database transaction.

go
func (c *PaymentConsumer) processPaymentWithIdempotency(ctx context.Context, msg kafka.Message) error {
	// Start a database transaction
	tx, err := c.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
	if err != nil {
		return fmt.Errorf("failed to begin database transaction: %w", err)
	}
	defer tx.Rollback() // Rollback is a no-op if tx is committed

	// Extract event_id from message metadata (acting as our unique idempotency key)
	eventID := string(msg.Key)
	
	// Attempt to insert the idempotency key to check for duplicates
	_, err = tx.ExecContext(ctx, 
		"INSERT INTO idempotency_keys (key, processed_at) VALUES ($1, NOW())", 
		eventID,
	)
	if err != nil {
		// Check if error is a unique constraint violation (e.g. Postgres code 23505)
		if isUniqueViolation(err) {
			log.Printf("[Idempotency Block]: Event %s already processed. Skipping duplicate.", eventID)
			return nil // Return nil so we commit the offset and move forward
		}
		return fmt.Errorf("failed to verify idempotency key: %w", err)
	}

	// Execute actual business transaction (e.g. charging payment and updating order)
	_, err = tx.ExecContext(ctx, 
		"UPDATE orders SET payment_status = 'paid', updated_at = NOW() WHERE id = $1", 
		string(msg.Value),
	)
	if err != nil {
		return fmt.Errorf("failed to update payment record: %w", err)
	}

	// Commit database transaction
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("failed to commit database transaction: %w", err)
	}

	return nil
}

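func isUniqueViolation(err error) bool {
	// Match on SQLSTATE 23505 (unique_violation) rather than on the message text,
	// which also carries the constraint name and never equals a fixed string.
	// Recent lib/pq (*pq.Error) and pgx (*pgconn.PgError) errors expose SQLState();
	// other drivers need their own check.
	var sqlErr interface{ SQLState() string }
	return errors.As(err, &sqlErr) && sqlErr.SQLState() == "23505"
}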

5. Handling Partition Rebalances and Graceful Shutdowns in Go

One of the most complex aspects of Kafka engineering is managing partition rebalances. When a new consumer joins the group or an existing one leaves, Kafka reassigns partitions among the active members. If a rebalance occurs while a consumer is halfway through processing a long-running transaction, the partition it was reading from might be assigned to another node.

To limit duplicate processing during rebalances, our Go consumer must listen for system termination signals (like SIGTERM or SIGINT) and shut down gracefully. When a shutdown signal is received, we cancel the context passed to the consumer loop, which tells FetchMessage() to stop fetching new records. The consumer goroutine must still be allowed to finish processing its current message and commit its offset before the reader connection is closed, so the main function has to wait for the loop to exit. This avoids abandoning a database transaction or an offset commit midway through a planned shutdown; unplanned crashes are still covered by the idempotency check.

go
func main() {
	// Initialize database connection
	db, err := sql.Open("postgres", "postgresql://user:pass@localhost:5432/billing?sslmode=disable")
	if err != nil {
		log.Fatalf("Failed to open database: %v", err)
	}
	defer db.Close()

	consumer := NewPaymentConsumer(
		[]string{"localhost:9092"},
		"payment-transactions",
		"billing-group",
		db,
	)

	// Create cancelable context linked to system interrupts
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

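	// Run consumer in a separate goroutine and record when the loop exits
	done := make(chan struct{})
	go func() {
		defer close(done)
		consumer.Start(ctx)
	}()

	// Wait for termination signal
	<-ctx.Done()
	log.Println("Shutdown signal received. Shutting down consumer gracefully...")

	// Give the loop time to finish its in-flight message before closing the reader
	select {
	case <-done:
	case <-time.After(30 * time.Second):
		log.Println("Timed out waiting for the consumer loop to exit.")
	}

	// Close the reader connection so this consumer leaves the group promptly.
	if err := consumer.reader.Close(); err != nil {
		log.Printf("Error closing Kafka reader: %v", err)
	}
	log.Println("Graceful shutdown complete. Exiting process.")
}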
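
The shutdown ordering can be exercised without Kafka. The following is a minimal sketch with hypothetical names (runWorker, drainDemo): cancellation is only observed between jobs, and the caller waits for the worker to return before it would close any shared resource.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runWorker consumes jobs until ctx is cancelled. Cancellation is only checked
// between jobs, so a job that has already started always runs to completion.
// It returns the jobs it finished.
func runWorker(ctx context.Context, jobs <-chan int, handle func(int)) []int {
	var finished []int
	for {
		select {
		case <-ctx.Done():
			return finished
		case j, ok := <-jobs:
			if !ok {
				return finished
			}
			handle(j)
			finished = append(finished, j)
		}
	}
}

// drainDemo cancels the context while job 1 is in flight and reports which
// jobs the worker completed before it exited.
func drainDemo() []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	jobs := make(chan int, 1)
	jobs <- 1

	done := make(chan []int)
	go func() {
		done <- runWorker(ctx, jobs, func(int) {
			cancel()                          // the shutdown signal arrives mid-job
			time.Sleep(10 * time.Millisecond) // the job still runs to completion
		})
	}()

	// Wait for the worker before closing anything it depends on.
	return <-done
}

func main() {
	fmt.Println("finished before shutdown:", drainDemo()) // [1]
}
```

The same shape applies to the reader: closing it only after the loop has returned keeps the last commit from racing with the shutdown.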

6. The Results: Zero Duplicate Charges and 100% System Reliability

Migrating our client's payment processor to this dual-guardrail architecture resolved their transaction duplication problems. During the subsequent flash sale events, the system successfully identified and skipped over 14,000 duplicate event messages caused by network hiccups and consumer restarts, saving the client from processing thousands of dollars in errant refunds and protecting their merchant reputation.

Furthermore, because the Go consumer closed gracefully, partition rebalances occurred without locking up offsets or triggering consumer timeouts. Building resilient event-driven systems requires moving past simple 'happy path' testing. By enforcing database idempotency and taking control of the offset commit lifecycle, engineers can design robust systems that maintain transactional integrity under the most extreme conditions.

7. Frequently Asked Questions (FAQ)

Q: Does manual offset committing impact consumer throughput?
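A: Yes, committing offsets synchronously after every message adds a network round trip to Kafka per message. To maintain high throughput, you can fetch and process messages in batches and commit the whole batch in a single CommitMessages() call, or set a non-zero CommitInterval so that kafka-go queues the offsets passed to CommitMessages() and flushes them to the broker periodically. Either option widens the window of messages that can be redelivered after a crash, which the idempotency check absorbs.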
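
To illustrate why one commit can cover a whole batch, here is a small sketch of the bookkeeping involved. The record type and commitPositions function are hypothetical stand-ins, not kafka-go types; the sketch assumes every message in the batch was processed successfully.

```go
package main

import "fmt"

// record is a hypothetical stand-in for a consumed message; only the fields
// needed for commit bookkeeping are modelled.
type record struct {
	Partition int
	Offset    int64
}

// commitPositions collapses a processed batch into one position per partition:
// the highest processed offset plus one, i.e. the next offset to read.
func commitPositions(batch []record) map[int]int64 {
	next := make(map[int]int64)
	for _, r := range batch {
		if r.Offset+1 > next[r.Partition] {
			next[r.Partition] = r.Offset + 1
		}
	}
	return next
}

func main() {
	batch := []record{{0, 10}, {0, 11}, {1, 4}, {0, 12}}
	fmt.Println(commitPositions(batch)) // map[0:13 1:5]
}
```

Because a committed position covers every earlier offset in that partition, a batch must not be committed if any message inside it failed.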

Q: How long should we retain idempotency keys in the database?
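A: Storing idempotency keys forever will cause database table bloat. In payment systems, events older than 3 to 7 days are rarely re-delivered. We recommend running a daily cron job that deletes keys older than 7 days, balancing database performance with safety. Keep the window at least as long as the topic's retention period, since a consumer group whose offsets are reset can replay anything still stored in the topic.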

Q: What is the difference between FetchMessage and ReadMessage in kafka-go?
A: When a GroupID is configured, ReadMessage() fetches the message and commits its offset automatically as part of the call, so the offset can be committed before your code has finished processing the message (similar to auto-commit). FetchMessage() retrieves the message and leaves the offset uncommitted, allowing the application to commit manually using CommitMessages() as shown in our code.