Optimizing Concurrent Data Fetching in Go with singleflight

In high-throughput applications, efficiency isn’t just a nicety; it’s a necessity. When many requests for the same resource arrive at once, how you handle them can make or break your system’s performance. I’ve run into this problem many times, and one elegant solution in Go is the singleflight package.

Let’s dive into how singleflight can optimize concurrent data fetching, using the example of fetching currency exchange rates in a financial application.

All code examples from this essay can be found at this repository.


The Problem: Duplicate Work and Resource Contention

Imagine a financial service that provides real-time currency exchange rates to thousands of users. Exchange rates are fetched from an external API, which has strict rate limits and charges per request. When multiple users request the same currency pair at the same time, naively forwarding all these requests to the external API leads to:

  • Redundant Work: The same data is fetched multiple times.
  • Increased Costs: More API calls mean higher expenses.
  • Potential Throttling: Exceeding rate limits can result in being temporarily blocked.
  • Inefficient Resource Utilization: Unnecessary load on both your system and the external API.

Data Flow without singleflight

sequenceDiagram
    participant User1
    participant User2
    participant Service
    participant ExternalAPI

    User1->>Service: Request exchange rate (USD/EUR)
    User2->>Service: Request exchange rate (USD/EUR)
    Note over Service: Service handles requests concurrently

    Service->>ExternalAPI: Fetch exchange rate (USD/EUR) [Call 1]
    Service->>ExternalAPI: Fetch exchange rate (USD/EUR) [Call 2]
    ExternalAPI-->>Service: Exchange rate data [Response 1]
    ExternalAPI-->>Service: Exchange rate data [Response 2]
    Service-->>User1: Return exchange rate
    Service-->>User2: Return exchange rate

Enter singleflight

Go’s singleflight package (golang.org/x/sync/singleflight, part of the extended x/sync module rather than the standard library) provides a mechanism to suppress duplicate function calls. It ensures that only one execution of a given function is in flight for a particular key at a time. If multiple goroutines call the function with the same key, the later callers wait for the first call to complete and receive the same result.

Data Flow with singleflight

sequenceDiagram
    participant User1
    participant User2
    participant Service
    participant ExternalAPI

    User1->>Service: Request exchange rate (USD/EUR)
    User2->>Service: Request exchange rate (USD/EUR)
    Note over Service: Service uses singleflight

    Service->>ExternalAPI: Fetch exchange rate (USD/EUR) [Single Call]
    ExternalAPI-->>Service: Exchange rate data
    Service-->>User1: Return exchange rate
    Service-->>User2: Return exchange rate

How singleflight Works

At its core, singleflight maintains a map of keys to in-flight calls:

  • When a function is called with a key, singleflight checks if there’s an ongoing call for that key.
  • If there is, it waits for the result of the in-flight call.
  • If not, it starts a new call and registers it.
  • Once the call completes, all waiting goroutines receive the result.

This mechanism effectively collapses multiple concurrent calls into a single call, sharing the result among all callers.

Implementing singleflight (for our example)

In our currency exchange rate service, we can use singleflight to prevent redundant API calls. Here’s how:

  1. Create a singleflight.Group: This group will manage in-flight calls.
  2. Use the Do Method: Wrap the API call in the Do method, using the currency pair as the key.
  3. Handle Contexts: The Do method doesn’t accept a context.Context, so we’ll manually check for context cancellation before and inside the call.
  4. Implement Caching: Store the fetched exchange rates to serve future requests without hitting the API.

func (s *ExchangeRateService) FetchExchangeRate(ctx context.Context, from string, to string) (float64, error) {
	key := fmt.Sprintf("%s_%s", from, to)

	// Check cache first
	if rate, ok := s.cache.Load(key); ok {
		return rate.(float64), nil
	}

	// Check if context is already cancelled
	select {
	case <-ctx.Done():
		return 0, ctx.Err()
	default:
	}

	// Use singleflight to prevent duplicate fetches
	v, err, _ := s.sfGroup.Do(key, func() (any, error) {
		// Re-check context inside function
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
		}

		// Simulate fetching from external API
		rate, err := s.fetchExchangeRateFromAPI(ctx, from, to)
		if err != nil {
			return nil, err
		}
		s.cache.Store(key, rate)
		return rate, nil
	})

	if err != nil {
		return 0, err
	}

	return v.(float64), nil
}

Benchmarks

Handling High Concurrency and Resource Limits

To simulate high concurrency and external API limitations, we:

  • Increased the Number of Goroutines: Simulating thousands of concurrent requests.
  • Introduced a Semaphore: Limiting the number of concurrent API calls to mimic rate limits.
  • Added an API Call Counter: Tracking the number of actual API calls made.

Two scenarios have been benchmarked:

  1. With singleflight: Only one API call is made, regardless of the number of concurrent requests.
  2. Without singleflight: An API call is made for each request, leading to excessive API calls and increased latency due to semaphore blocking.

func BenchmarkFetchExchangeRate_Singleflight(b *testing.B) {
	service := NewExchangeRateService(50)
	ctx := context.Background()
	from := "USD"
	to := "EUR"

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		service.Reset()

		var wg sync.WaitGroup
		numGoroutines := 1000
		wg.Add(numGoroutines)

		startTime := time.Now()
		for j := 0; j < numGoroutines; j++ {
			go func() {
				defer wg.Done()
				_, _ = service.FetchExchangeRate(ctx, from, to)
			}()
		}
		wg.Wait()
		elapsed := time.Since(startTime)
		b.ReportMetric(float64(service.GetAPICallCount()), "api_calls")
		b.ReportMetric(float64(elapsed.Milliseconds()), "ms_per_iter")
	}
}

func BenchmarkFetchExchangeRate_NoSingleflight(b *testing.B) {
	service := NewExchangeRateService(50)
	ctx := context.Background()
	from := "USD"
	to := "EUR"

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		service.Reset()

		var wg sync.WaitGroup
		numGoroutines := 1000
		wg.Add(numGoroutines)

		startTime := time.Now()
		for j := 0; j < numGoroutines; j++ {
			go func() {
				defer wg.Done()
				_, _ = service.FetchExchangeRateNoSingleflight(ctx, from, to)
			}()
		}
		wg.Wait()
		elapsed := time.Since(startTime)
		b.ReportMetric(float64(service.GetAPICallCount()), "api_calls")
		b.ReportMetric(float64(elapsed.Milliseconds()), "ms_per_iter")
	}
}

Results

go test -bench=. -benchmem

| Benchmark | Iterations | Time (ns/op) | API Calls | Time per Iteration | Memory (B/op) | Allocations (allocs/op) |
|---|---|---|---|---|---|---|
| FetchExchangeRate_Singleflight-10 | 5 | 202,096,742 | 1.000 | 202.0 ms | 154,904 | 4,240 |
| FetchExchangeRate_NoSingleflight-10 | 1 | 4,023,039,416 | 1000 | 4,023 ms | 636,304 | 13,393 |

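The benchmark results show that, in this setup, using singleflight:

  1. Is ~20x faster per operation (about 202 ms vs. 4,023 ms)
  2. Uses ~4x less memory (154,904 vs. 636,304 B/op)
  3. Has ~3x fewer allocations (4,240 vs. 13,393 allocs/op)
  4. Makes 1000x fewer API calls (1 vs. 1000)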

Testing

I took the opportunity to cover the different scenarios with both table-driven and case-by-case tests, mainly to show that the table-driven approach is easier to maintain. For a more detailed overview, take a look at all the tests in this repo.

To run and parse the tests, you can use the following command:

go test -json -race ./... | tparse -all

And if you don’t have tparse installed, you can do so by running:

go install github.com/mfridman/tparse@latest

Table-Driven tests for our example

package exchangerates

import (
	"context"
	"errors"
	"sync"
	"testing"
)

func TestExchangeRateService(t *testing.T) {
	service := NewExchangeRateService(50)
	const mixedConcurrencyTest = "Mixed Currencies Concurrent Fetch"
	tests := []struct {
		name               string
		setup              func()
		from               string
		to                 string
		numGoroutines      int
		useSingleflight    bool
		cancelContext      bool
		expectedAPICalls   int64
		expectError        bool
		expectedError      error
		expectSameRates    bool
		expectRateMismatch bool // For tests where rates may differ
	}{
		{
			name:             "Single Fetch",
			setup:            func() { service.Reset() },
			from:             "USD",
			to:               "EUR",
			numGoroutines:    1,
			useSingleflight:  true,
			expectedAPICalls: 1,
			expectError:      false,
			expectSameRates:  true,
		},
		{
			name:             "Concurrent Fetch with Singleflight",
			setup:            func() { service.Reset() },
			from:             "USD",
			to:               "EUR",
			numGoroutines:    100,
			useSingleflight:  true,
			expectedAPICalls: 1,
			expectError:      false,
			expectSameRates:  true,
		},
		{
			name:             "Concurrent Fetch without Singleflight",
			setup:            func() { service.Reset() },
			from:             "USD",
			to:               "EUR",
			numGoroutines:    100,
			useSingleflight:  false,
			expectedAPICalls: 100,
			expectError:      false,
			expectSameRates:  false,
		},
		{
			name:             "Cancellation Before Fetch",
			setup:            func() { service.Reset() },
			from:             "USD",
			to:               "EUR",
			numGoroutines:    1,
			useSingleflight:  true,
			cancelContext:    true,
			expectedAPICalls: 0,
			expectError:      true,
			expectedError:    context.Canceled,
		},
		{
			name:             mixedConcurrencyTest,
			setup:            func() { service.Reset() },
			numGoroutines:    100,
			useSingleflight:  true,
			expectedAPICalls: 5, // Number of unique currency pairs
			expectError:      false,
		},
	}

	// Define currency pairs for the mixed currencies test
	currencyPairs := []struct {
		from string
		to   string
	}{
		{"USD", "EUR"},
		{"USD", "JPY"},
		{"EUR", "GBP"},
		{"AUD", "NZD"},
		{"CAD", "CHF"},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			tt.setup()

			ctx := context.Background()
			if tt.cancelContext {
				var cancel context.CancelFunc
				ctx, cancel = context.WithCancel(ctx)
				cancel()
			}

			var wg sync.WaitGroup
			wg.Add(tt.numGoroutines)

			rates := make([]float64, tt.numGoroutines)
			errs := make([]error, tt.numGoroutines)

			for i := range tt.numGoroutines {
				go func(i int) {
					defer wg.Done()
					var rate float64
					var err error
					if tt.name == mixedConcurrencyTest {
						// Cycle through predefined currency pairs
						pair := currencyPairs[i%len(currencyPairs)]
						rate, err = service.FetchExchangeRate(ctx, pair.from, pair.to)
					} else {
						if tt.useSingleflight {
							rate, err = service.FetchExchangeRate(ctx, tt.from, tt.to)
						} else {
							rate, err = service.FetchExchangeRateNoSingleflight(ctx, tt.from, tt.to)
						}
					}
					rates[i] = rate
					errs[i] = err
				}(i)
			}

			wg.Wait()

			for i, err := range errs {
				if tt.expectError {
					if err == nil {
						t.Logf("Expected error, but got nil. This may occur due to singleflight behavior.")
					} else if !errors.Is(err, tt.expectedError) {
						t.Errorf("Expected error %v, got %v", tt.expectedError, err)
					}
				} else {
					if err != nil {
						t.Errorf("Goroutine %d failed: %v", i, err)
					}
				}
			}

			// Check API call count
			if apiCalls := service.GetAPICallCount(); apiCalls != tt.expectedAPICalls {
				t.Errorf("Expected API call count to be %d, got %d", tt.expectedAPICalls, apiCalls)
			}

			// Check if rates are the same when expected
			if tt.expectSameRates && tt.numGoroutines > 1 {
				firstRate := rates[0]
				for i, rate := range rates {
					if rate != firstRate {
						t.Errorf("Rate mismatch at goroutine %d: expected %v, got %v", i, firstRate, rate)
					}
				}
			}

			if tt.name == mixedConcurrencyTest {
				uniqueRates := make(map[float64]struct{})
				for _, rate := range rates {
					uniqueRates[rate] = struct{}{}
				}
				if len(uniqueRates) != len(currencyPairs) {
					t.Errorf("Expected %d unique rates, got %d", len(currencyPairs), len(uniqueRates))
				}
			}
		})
	}
}

When to Use singleflight

singleflight is ideal when:

  • Multiple Requests for Identical Data: Your system frequently receives concurrent requests for the same resource.
  • Expensive Operations: The function you’re calling is resource-intensive or has significant latency.
  • External Rate Limits: You’re interacting with services that limit the number of requests or charge per request.

Conclusion

Optimizing concurrent data fetching is critical in building efficient, scalable applications. Go’s singleflight package provides a powerful yet simple mechanism to suppress duplicate function calls, reducing redundant work and improving performance.

In our currency exchange rate service, implementing singleflight led to a dramatic reduction in API calls and response times under high concurrency. By combining singleflight with proper caching and context handling, we built a robust solution that scales gracefully.


Key Takeaways:

  • Understand Your Concurrency Patterns: Identify where redundant work occurs and how it impacts your system.
  • Leverage Go’s Concurrency Tools: Packages like singleflight can significantly simplify concurrency management.
  • Benchmark and Test Thoroughly: Real-world performance gains are validated through careful benchmarking and testing.
  • Handle Contexts Appropriately: Always ensure that your functions respect cancellations and timeouts.

Special shoutout to Amaury Brisou for the inspiring questions that led me to reconsider how cancellation is handled with wait groups.
