Discussion at https://golang.org/issue/18802
As it stands, Go programs do not have a good way to avoid contention when combining highly concurrent code with shared memory locations that frequently require mutation. This proposal describes a new package and type to satisfy this need.
There are several scenarios in which a Go program might want to have shared data that is mutated frequently. We will briefly discuss some of these scenarios, such that we can evaluate the proposed API against these concrete use-cases.
In RPC servers or in scientific computing code, there is often a need for global
counters. For instance, in an RPC server, a global counter might count the
number of requests received by the server, or the number of bytes received by
the server. Go makes it easy to write RPC servers which are inherently
concurrent, often processing each connection and each request on a concurrent
goroutine. This means that in the context of a multicore machine, several
goroutines can be incrementing the same global counter in parallel. Using an API
like atomic.AddInt64
will ensure that such a counter is lock-free, but
parallel goroutines will contend for the same cache line, so the counter will
not scale linearly as the number of cores increases. (Indeed, one might even
see throughput decrease as cores are added, due to the increased core-to-core
communication.)
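To make the contention concrete, here is a small illustrative example (not part
of the proposal; it assumes the usual sync and sync/atomic imports) in which
many goroutines increment a single global counter, so every increment touches
the same cache line:
// requestCount is a global counter shared by all request-handling goroutines.
var requestCount int64

func handleRequest() {
    // Lock-free, but every core writes the same cache line, so the line
    // bounces between cores on each increment.
    atomic.AddInt64(&requestCount, 1)
}

func serve() {
    var wg sync.WaitGroup
    for i := 0; i < 64; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 1000000; j++ {
                handleRequest()
            }
        }()
    }
    wg.Wait()
}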
It's probably also worth noting that there are other similar use-cases in this space (e.g. types that record distributions rather than just sums, max-value trackers, etc).
It is common in programs to have data that is read frequently, but written very
rarely. In these cases, a common synchronization primitive is sync.RWMutex,
which offers an RLock/RUnlock API for readers. When no writers are interacting
with an RWMutex, an arbitrary number of goroutines can use the read-side of the
RWMutex without blocking.
However, in order to correctly pair calls to RLock with calls to RUnlock,
RWMutex internally maintains a counter, which is incremented during RLock and
decremented during RUnlock. There is also other shared mutable state that is
atomically updated inside the RWMutex during these calls (and during calls to
Lock and Unlock). For reasons similar to the previous example, if many
goroutines are acquiring and releasing read-locks concurrently and the program
is running on a multicore machine, then it is likely that the performance of
RWMutex.RLock/RWMutex.RUnlock will not scale linearly with the number of cores
given to the program.
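For illustration, a typical read-mostly structure looks something like the
following (Config and its fields are made up for this example); note that every
call to Get still mutates the RWMutex's internal reader count:
// Config holds settings that are read on every request but updated rarely.
type Config struct {
    mu       sync.RWMutex
    settings map[string]string
}

func NewConfig() *Config {
    return &Config{settings: make(map[string]string)}
}

func (c *Config) Get(key string) string {
    c.mu.RLock() // atomically bumps the RWMutex's internal reader count
    defer c.mu.RUnlock()
    return c.settings[key]
}

func (c *Config) Set(key, value string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.settings[key] = value
}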
In programs that make many RPC calls in parallel, there can be contention on
shared mutable state stored inside the RPC or HTTP clients. For instance, an
RPC client might support connecting to a pool of servers, and implement a
configurable load-balancing policy to select a connection to use for a given
RPC; these load-balancing policies often need to maintain state for each
connection in the pool of managed connections. For instance, an implementation
of the "Least-Loaded" policy needs to maintain a counter of active requests per
connection, so that a new request can select the connection with the fewest
active requests. In scenarios where a client is performing a large number of
requests in parallel (perhaps enqueueing many RPCs before finally waiting on
them at a later point in the program), contention on this internal state can
start to affect the rate at which the requests can be dispatched.
In data-processing pipelines, code running in a particular stage may want to
'batch' its output, such that it only sends data downstream in N-element
batches, rather than sending single elements through the pipeline at a time. In
the single-goroutine case where the element type is 'byte', the familiar
bufio.Writer type implements this pattern. Indeed, one option for the general
data-processing pipeline is to have a single goroutine
run every stage in the pipeline from end-to-end, and then instantiate a small
number of parallel pipeline instances. This strategy effectively handles
pipelines composed solely of stages dominated by CPU-time. However, if a
pipeline contains any IO (e.g. initially reading the input from a distributed
file system, making RPCs, or writing the result back to a distributed file
system), then this setup will not be efficient, as a single stall in IO will
take out a significant chunk of your throughput.
To mitigate this problem, IO-bound stages need to run many goroutines. Indeed, a clever framework (like Apache Beam) can detect these sorts of situations dynamically, by measuring the rate of stage input compared to the rate of stage output; it can even reactively increase or decrease the "concurrency level" of a stage in response to these measurements. In Beam's case, it might do this by dynamically changing the number of threads-per-binary, or number of workers-per-stage.
When stages have varying concurrency levels, but are connected to each other in a pipeline structure, it is important to place a concurrency-safe abstraction between the stages to buffer elements waiting to be processed by the next stage. Ideally, this structure would minimize the contention experienced by the caller.
To solve these problems, we propose an API with a single new type,
percpu.Sharded. Here is an outline of the proposed API.
// Package percpu provides low-level utilities for avoiding contention on
// multicore machines.
package percpu
// A Sharded is a container of homogeneously typed values.
//
// On a best effort basis, the runtime will strongly associate a given value
// with a CPU core. That is to say, retrieving a value twice on the same CPU
// core will return the same value with high probability. Note that the runtime
// cannot guarantee this fact, and clients must assume that retrieved values
// can be shared between concurrently executing goroutines.
//
// Once a value is placed in a Sharded, the Sharded will retain a reference to
// this value permanently. Clients can control the maximum number of distinct
// values created using the SetMaxShards API.
//
// A Sharded must not be copied after first use.
//
// All methods are safe to call from multiple goroutines.
type Sharded struct {
// contains unexported fields
}
// SetMaxShards sets a limit on the maximum number of elements stored in the
// Sharded.
//
// It will not apply retroactively; any elements already created will remain
// inside the Sharded.
//
// If maxShards is less than 1, Sharded will panic.
func (s *Sharded) SetMaxShards(maxShards int)
// GetOrCreate retrieves a value roughly associated with the current CPU. If
// there is no such value, then createFn is called to create a value, store it
// in the Sharded, and return it.
//
// All calls to createFn are serialized; this means that one must complete
// before the next one is started.
//
// createFn should not return nil, or Sharded will panic.
//
// If createFn is called with a ShardInfo.ShardIndex equal to X, no future call
// to GetOrCreate will call createFn again with a ShardInfo.ShardIndex equal to
// X.
func (s *Sharded) GetOrCreate(createFn func(ShardInfo) interface{}) interface{}
// Get retrieves any preexisting value associated with the current CPU. If
// there is no such value, nil is returned.
func (s *Sharded) Get() interface{}
// Do iterates over a snapshot of all elements stored in the Sharded, and calls
// fn once for each element.
//
// If more elements are created during the iteration itself, they may be
// visible to the iteration, but this is not guaranteed. For stronger
// guarantees, see DoLocked.
func (s *Sharded) Do(fn func(interface{}))
// DoLocked iterates over all the elements stored in the Sharded, and calls fn
// once for each element.
//
// DoLocked will observe a consistent snapshot of the elements in the Sharded;
// any previous creations will complete before the iteration begins, and all
// subsequent creations will wait until the iteration ends.
func (s *Sharded) DoLocked(fn func(interface{}))
// ShardInfo contains information about a CPU core.
type ShardInfo struct {
// ShardIndex is strictly less than the value passed to any prior call to
// SetMaxShards.
ShardIndex int
}
Here, we evaluate the proposed API in light of the use-cases described above.
A counter API can be fairly easily built on top of percpu.Sharded.
Specifically, it would offer two methods: IncrementBy(int64) and Sum() int64.
The former would only allow positive increments (if required, clients can
emulate decrements by composing two counters, one for additions and one for
subtractions).
The implementation of IncrementBy would call GetOrCreate, passing a function
that returns an *int64. To avoid false sharing between cache lines, it would
probably return it as an interior pointer into a struct with appropriate
padding. Once the pointer is retrieved from GetOrCreate, the function would
then use atomic.AddInt64 on that pointer with the value passed to IncrementBy.
The implementation of Sum would call Do to retrieve a snapshot of all
previously created values, then sum them up using atomic.LoadInt64.
If the application is managing many long-lived counters, then one possible
optimization would be to implement the Counter type in terms of a counterBatch
(which logically encapsulates N independent counters). This can drastically
limit the padding required to avoid false sharing between cache lines.
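To make this concrete, here is a rough sketch of such a Counter built on the
proposed API (all names are illustrative, and the padding assumes 64-byte cache
lines):
// paddedInt64 embeds the counter value in a cache-line sized struct so that
// counters on different shards do not falsely share a cache line.
type paddedInt64 struct {
    v int64
    _ [56]byte
}

// A Counter is a scalable, monotonically increasing counter.
type Counter struct {
    shards percpu.Sharded
}

// IncrementBy adds delta to the counter; delta must be non-negative.
func (c *Counter) IncrementBy(delta int64) {
    p := c.shards.GetOrCreate(func(percpu.ShardInfo) interface{} {
        padded := new(paddedInt64)
        return &padded.v // an *int64 interior to a padded struct
    }).(*int64)
    atomic.AddInt64(p, delta)
}

// Sum returns the sum of all shard-local values.
func (c *Counter) Sum() int64 {
    var total int64
    c.shards.Do(func(v interface{}) {
        total += atomic.LoadInt64(v.(*int64))
    })
    return total
}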
It is a little tricky to implement a drop-in replacement for sync.RWMutex on
top of percpu.Sharded. Naively, one could imagine a sharded lock composed of
many internal sync.RWMutex instances. Calling RLock() on the aggregate lock
would grab a single sync.RWMutex instance using GetOrCreate and then call
RLock() on that instance. Unfortunately, because there is no state passed
between RLock() and RUnlock() (something we should probably consider fixing for
Go 2), we cannot implement RUnlock() efficiently: by the time RUnlock() is
called, the goroutine might have migrated to a different shard, and we have
lost the association to the instance chosen by the original RLock().
That said, since such a sharded lock would be considerably more memory-hungry
than a normal sync.RWMutex, callers should only replace truly contended mutexes
with a sharded lock, so requiring them to make minor API changes should not be
too onerous (particularly for mutexes, which should always be private
implementation details, and therefore not cross API boundaries). In particular,
one could have RLock() on the sharded lock return an RLockHandle type, which
has an RUnlock() method. This RLockHandle could keep an internal pointer to the
sync.RWMutex that was initially chosen, and it can then RUnlock() that
specific instance.
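A sketch of the read side of such a lock, assuming the percpu API above
(shardedRWMutex and RLockHandle are illustrative names; the write side, which
must also coordinate with shard creation, is omitted):
type shardedRWMutex struct {
    shards percpu.Sharded
}

// RLockHandle remembers which underlying sync.RWMutex was read-locked, so that
// RUnlock releases that same instance even if the goroutine has since moved to
// a different CPU.
type RLockHandle struct {
    mu *sync.RWMutex
}

func (s *shardedRWMutex) RLock() RLockHandle {
    mu := s.shards.GetOrCreate(func(percpu.ShardInfo) interface{} {
        return new(sync.RWMutex)
    }).(*sync.RWMutex)
    mu.RLock()
    return RLockHandle{mu: mu}
}

func (h RLockHandle) RUnlock() {
    h.mu.RUnlock()
}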
It's worth noting that it's also possible to drastically change the standard
library's sync.RWMutex implementation itself to be scalable by default using
percpu.Sharded; this is why the implementation sketch below is careful not to
use the sync package, so as to avoid circular dependencies. See Facebook's
SharedMutex class to get a sense of how this could be done. However, that
requires significant research and deserves a proposal of its own.
It's straightforward to use percpu.Sharded to implement a sharded RPC client.
This is a case where it's likely that the default implementation will continue
to be unsharded, and a program will need to explicitly say something like
grpc.Dial("some-server", grpc.ShardedClient(4)) (where the "4" might come from
an application flag). This kind of client-controllable sharding is one place
where the SetMaxShards API can be useful.
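As a rough sketch (shardedClient, subClient and dial are made-up names, not
part of any existing RPC library):
// subClient is a single underlying connection with its own mutable state
// (e.g. an active-request counter used for load balancing).
type subClient struct{}

func dial(target string) *subClient { return &subClient{} }

func (c *subClient) call(req string) (string, error) { return "", nil }

// shardedClient fans requests out over a small pool of subClients, one per
// shard.
type shardedClient struct {
    target string
    shards percpu.Sharded
}

func newShardedClient(target string, maxShards int) *shardedClient {
    c := &shardedClient{target: target}
    // Bound the number of underlying connections; the value might come from
    // an application flag.
    c.shards.SetMaxShards(maxShards)
    return c
}

func (c *shardedClient) Call(req string) (string, error) {
    sub := c.shards.GetOrCreate(func(percpu.ShardInfo) interface{} {
        return dial(c.target) // each shard gets its own connection
    }).(*subClient)
    return sub.call(req)
}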
The order-independent accumulator described in the pipelines use-case above can
be implemented using percpu.Sharded. For instance, a writer would call
GetOrCreate to retrieve a shard-local buffer, acquire a lock on that buffer,
and insert the element into it. If the buffer became full, the writer would
flush it downstream.
A watchdog goroutine could walk the buffers periodically using Do, and flush
partially-full buffers to ensure that elements are flushed fairly promptly. If
it finds no elements to flush, it could start incrementing a counter of
"useless" scans, and stop scanning after it reaches a threshold. If a writer is
enqueuing the first element in a buffer, and it sees the counter over the
threshold, it could reset the counter and wake the watchdog.
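A rough sketch of such an accumulator, assuming the percpu API above (batchSize,
shardBuffer and sink are illustrative, and the watchdog wake-up logic is
omitted):
const batchSize = 128

// shardBuffer is the per-shard element buffer.
type shardBuffer struct {
    mu    sync.Mutex
    elems []interface{}
}

type accumulator struct {
    shards percpu.Sharded
    sink   func(batch []interface{}) // sends a batch downstream
}

// Add appends elem to the buffer for the current shard, flushing the buffer
// downstream once it reaches batchSize.
func (a *accumulator) Add(elem interface{}) {
    buf := a.shards.GetOrCreate(func(percpu.ShardInfo) interface{} {
        return new(shardBuffer)
    }).(*shardBuffer)
    buf.mu.Lock()
    buf.elems = append(buf.elems, elem)
    if len(buf.elems) < batchSize {
        buf.mu.Unlock()
        return
    }
    batch := buf.elems
    buf.elems = nil
    buf.mu.Unlock()
    a.sink(batch)
}

// flushAll is what the watchdog goroutine would call periodically to push out
// partially full buffers.
func (a *accumulator) flushAll() {
    a.shards.Do(func(v interface{}) {
        buf := v.(*shardBuffer)
        buf.mu.Lock()
        batch := buf.elems
        buf.elems = nil
        buf.mu.Unlock()
        if len(batch) > 0 {
            a.sink(batch)
        }
    })
}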
What follows is a rough sketch of an implementation of percpu.Sharded. This is
to show that this is implementable, and to give some context to the discussion
of performance below.
First, a sketch of an implementation for percpu.sharder, an internal helper
type for percpu.Sharded.
const (
    defaultUserDefinedMaxShards = 32
)

type sharder struct {
    maxShards int32
}

func (s *sharder) SetMaxShards(maxShards int) {
    if maxShards < 1 {
        panic("maxShards < 1")
    }
    atomic.StoreInt32(&s.maxShards, roundDownToPowerOf2(int32(maxShards)))
}

func (s *sharder) userDefinedMaxShards() int32 {
    max := atomic.LoadInt32(&s.maxShards)
    if max == 0 {
        return defaultUserDefinedMaxShards
    }
    return max
}

func (s *sharder) shardInfo() ShardInfo {
    shardId := runtime_getShardIndex()
    // If we're in race mode, then all bets are off. Half the time, randomize the
    // shardId completely, the rest of the time, use shardId 0.
    //
    // If we're in a test but not in race mode, then we want an implementation
    // that keeps cache contention to a minimum so benchmarks work properly, but
    // we still want to flush out any assumption of a stable mapping to shardId.
    // So half the time, we double the id. This catches fewer problems than what
    // we get in race mode, but it should still catch one class of issue (clients
    // assuming that two sequential calls to Get() will return the same value).
    if raceEnabled {
        rnd := runtime_fastRand()
        if rnd%2 == 0 {
            shardId = 0
        } else {
            shardId += rnd / 2
        }
    } else if testing {
        if runtime_fastRand()%2 == 0 {
            shardId *= 2
        }
    }
    shardId &= runtimeDefinedMaxShards() - 1
    shardId &= s.userDefinedMaxShards() - 1
    return ShardInfo{ShardIndex: int(shardId)}
}

func runtimeDefinedMaxShards() int32 {
    max := runtime_getMaxShards()
    if (testing || raceEnabled) && max < 4 {
        max = 4
    }
    return max
}

// Implemented in the runtime, should effectively be
// roundUpToPowerOf2(min(GOMAXPROCS, NumCPU))
// (maybe caching that periodically in the P).
func runtime_getMaxShards() int32 {
    return 4
}

// Implemented in the runtime, should effectively be the result of the getcpu
// syscall, or similar. The returned index should be densified if possible
// (i.e. if the binary is locked to cores 2 and 4, those cores should map to
// indices 0 and 1 respectively, not 2 and 4).
//
// Densification can be best-effort, and done with a process-wide mapping table
// maintained by sysmon periodically.
//
// Does not have to be bounded by runtime_getMaxShards(), or indeed by
// anything.
func runtime_getShardIndex() int32 {
    return 0
}

// Implemented in the runtime. Only technically needs an implementation for
// raceEnabled and tests. Should be scalable (e.g. using a per-P seed and
// state).
func runtime_fastRand() int32 {
    return 0
}
Next, a sketch of percpu.Sharded itself.
type Sharded struct {
    sharder
    lock uintptr
    data atomic.Value // *shardedData
    typ  unsafe.Pointer
}

func (s *Sharded) loadData() *shardedData {
    data, _ := s.data.Load().(*shardedData)
    return data
}

func (s *Sharded) getFastPath(createFn func(ShardInfo) interface{}) (out interface{}) {
    shardInfo := s.shardInfo()
    curData := s.loadData()
    if curData == nil || shardInfo.ShardIndex >= len(curData.elems) {
        if createFn == nil {
            return nil
        }
        return s.getSlowPath(shardInfo, createFn)
    }
    existing := curData.load(shardInfo.ShardIndex)
    if existing == nil {
        if createFn == nil {
            return nil
        }
        return s.getSlowPath(shardInfo, createFn)
    }
    outp := (*ifaceWords)(unsafe.Pointer(&out))
    outp.typ = s.typ
    outp.data = existing
    return
}

func (s *Sharded) getSlowPath(shardInfo ShardInfo, createFn func(ShardInfo) interface{}) (out interface{}) {
    runtime_lock(&s.lock)
    defer runtime_unlock(&s.lock)
    curData := s.loadData()
    if curData == nil || shardInfo.ShardIndex >= len(curData.elems) {
        curData = allocShardedData(curData, shardInfo)
        s.data.Store(curData)
    }
    existing := curData.load(shardInfo.ShardIndex)
    if existing != nil {
        outp := (*ifaceWords)(unsafe.Pointer(&out))
        outp.typ = s.typ
        outp.data = existing
        return
    }
    newElem := createFn(shardInfo)
    if newElem == nil {
        panic("createFn returned nil value")
    }
    newElemP := *(*ifaceWords)(unsafe.Pointer(&newElem))
    // If this is the first call to createFn, then stash the type-pointer for
    // later verification.
    //
    // Otherwise, verify it's the same as the previous one.
    if s.typ == nil {
        s.typ = newElemP.typ
    } else if s.typ != newElemP.typ {
        panic("percpu: GetOrCreate was called with a function that returned an inconsistently typed value")
    }
    // Store back the new value.
    curData.store(shardInfo.ShardIndex, newElemP.data)
    // Return it.
    outp := (*ifaceWords)(unsafe.Pointer(&out))
    outp.typ = s.typ
    outp.data = newElemP.data
    return
}

func (s *Sharded) GetOrCreate(createFn func(ShardInfo) interface{}) interface{} {
    if createFn == nil {
        panic("createFn nil")
    }
    return s.getFastPath(createFn)
}

func (s *Sharded) Get() interface{} {
    return s.getFastPath(nil)
}

func (s *Sharded) Do(fn func(interface{})) {
    curData := s.loadData()
    if curData == nil {
        return
    }
    for i := range curData.elems {
        elem := curData.load(i)
        if elem == nil {
            continue
        }
        var next interface{}
        nextP := (*ifaceWords)(unsafe.Pointer(&next))
        nextP.typ = s.typ
        nextP.data = elem
        fn(next)
    }
}

func (s *Sharded) DoLocked(fn func(interface{})) {
    runtime_lock(&s.lock)
    defer runtime_unlock(&s.lock)
    s.Do(fn)
}

type shardedData struct {
    elems []unsafe.Pointer
}
Performance of percpu.sharder
As presented, calling shardInfo
on a percpu.sharder
makes two calls to the
runtime, and does a single atomic load.
However, both of the calls to the runtime would be satisfied with stale values.
So, an obvious avenue of optimization is to squeeze these two pieces of
information (effectively "current shard", and "max shards") into a single word,
and cache it on the P
when first calling the shardInfo
API. To accommodate
changes in the underlying values, a P
can store a timestamp when it last
computed these values, and clear the cache when the value is older than X
and
the P
is in the process of switching goroutines.
This means that effectively, shardInfo
will consist of 2 atomic loads, and a
little bit of math on the resulting values.
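For example, the cached pair could be packed into a single word along the
following lines (the field widths here are arbitrary):
// packShardCache stores the current shard index in the low 16 bits and the
// max-shards value in the high 16 bits of a single word.
func packShardCache(shard, maxShards int32) uint32 {
    return uint32(shard)&0xffff | uint32(maxShards)<<16
}

func unpackShardCache(w uint32) (shard, maxShards int32) {
    return int32(w & 0xffff), int32(w >> 16)
}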
Performance of percpu.Sharded
In the get-for-current-shard path, percpu.Sharded will call shardInfo, and
then perform 2 atomic loads (to retrieve the list of elements and to retrieve
the specific element for the current shard, respectively). If either of these
loads fails, it might fall back to a much-slower slow path. In the fast path,
there's no allocation.
In the get-all path, percpu.Sharded will perform a single atomic load followed
by O(n) atomic loads, proportional to the number of elements stored in the
percpu.Sharded. It will not allocate.
Garbage collection of elements in a percpu.Sharded
With the given API, if GOMAXPROCS is temporarily increased (or the set of CPUs
assigned to the program grows), and then decreased to its original value, a
percpu.Sharded might have allocated additional elements to satisfy the
additional CPUs. These additional elements would not be eligible for garbage
collection, as the percpu.Sharded would retain an internal reference.
First, it's worth noting that we cannot unilaterally shrink the number of
elements stored in a percpu.Sharded, because this might affect program
correctness. For instance, this could result in counters losing values, or in
breaking the invariants of sharded locks.
The presented solution just sidesteps this problem by defaulting to a fairly
low value of MaxShards. This can be overridden by the user with explicit action
(though the runtime has the freedom to bound the number more strictly than the
user's value, e.g. to limit the size of internal data-structures to reasonable
levels).
One thing to keep in mind: clients who require garbage collection of stale
values can build this on top of percpu.Sharded. For instance, one could imagine
a design where clients maintain a counter recording each use. A watchdog
goroutine can then scan the elements and, if a particular value has not been
used for some period of time, swap in a nil pointer, and then gracefully tear
down the value (potentially transferring the logical data it encapsulates to
other elements in the percpu.Sharded).
Requiring clients to implement their own GC in this way seems kinda gross, but on the other hand, it's unclear to me how to generically solve this problem without knowledge of the client use-case. One could imagine some sort of reference-counting design, but again, without knowing the semantics of the use-case, it's hard to know whether it's safe to clear the reference to the value.
Also, for a performance-oriented type like percpu.Sharded, it seems
unfortunate to add unnecessary synchronization to the fast path of the type (and
I don't see how to implement something in this direction without adding
synchronization).
Why is ShardInfo a struct and not just an int?
This is mostly to retain the ability to extend the API in a compatible manner. One concrete avenue is to add additional details to allow clients to optimize their code for the NUMA architecture of the machine. For instance, for a sharded buffering scheme (i.e. the "Order-independent accumulator" above), it might make sense to have multiple levels of buffering in play, with another level at the NUMA-node layer.
Is ShardInfo.ShardIndex returning an id for the CPU, or for the P executing the goroutine?
This is left unspecified, but the name of the package seems to imply the former. In practice, I think we want a combination.
That is to say, we would prefer that a program running on a 2-core machine with
GOMAXPROCS
set to 100 should use 2 shards, not 100. On the other hand, we
would also prefer that a program running on a 100-core machine with GOMAXPROCS
set to 2 should also use 2 shards, not 100.
This ideal state should be achievable on systems that provide reasonable APIs to retrieve the id of the current CPU.
That said, any implementation effort would likely start with a simple portable
implementation which uses the id of the local P. This will allow us to get a
sense of the performance of the type, and to serve as a fallback implementation
for platforms where the necessary APIs are either not available, or require
privileged execution.
Do we want percpu.Sharded to behave differently during tests?
This is a good question; I am not certain of the answer here. I am confident
that during race mode, we should definitely randomize the behaviour of
percpu.Sharded significantly (and the implementation sketch above does that).
However, for tests, the answer seems less clear to me.
As presented, the implementation sketch above randomizes the value by flipping
randomly between two values for every CPU. That seems like it will catch bugs
where the client assumes that sequential calls to Get/GetOrCreate will return
the same values. That amount of randomness seems warranted to me, though I'd
understand if folks would prefer to avoid it in favor of keeping non-test code
and test code behaving identically.
On a more mundane note: I'm not entirely sure if this is implementable with
zero-cost. One fairly efficient strategy would be an internal package that
exposes an "IsTesting bool", which is set by the testing
package and read by
the percpu
package. But ideally, this could be optimized away at compile time;
I don't believe we have any mechanism to do this now.
Do we need ShardInfo.ShardIndex at all?
I think so. Even if we don't, clients can retrieve an effectively equivalent
value by just incrementing an atomic integer inside the createFn
passed to
GetOrCreate. For pre-allocated use-cases (e.g. see the Facebook SharedMutex
mentioned above), it seems important to let clients index into pre-allocated
memory.
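For example, without ShardInfo a client could still recover a dense index along
these lines (a sketch; nextIndex and shard are illustrative names):
// nextIndex hands out dense indices to newly created shards. Calls to createFn
// are serialized, but an atomic keeps the sketch obviously safe.
var nextIndex int64

type shard struct {
    index int // which pre-allocated slot this shard owns
}

func getShard(s *percpu.Sharded) *shard {
    return s.GetOrCreate(func(percpu.ShardInfo) interface{} {
        idx := atomic.AddInt64(&nextIndex, 1) - 1
        return &shard{index: int(idx)}
    }).(*shard)
}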
We could define GetOrCreate
to behave like Get
if the passed createFn
is
nil. This is less API (and might be more efficient, until mid-stack inlining
works), but seems less semantically clean to me. It seems better to just have
clients say what they want explicitly.
If we had to choose one of those, then I would say we should expose Do
. This
is because it is the higher performance, minimal-synchronization version, and
DoLocked
can be implemented on top. That said, I do think we should just
provide both. The implementation is simple, and implementing it on top feels
odd.
Of the 4 use-cases presented above, 2 would probably use Do
(counters and
order-independent accumulators), and 2 would probably use DoLocked
(read-write
locks, and RPC clients (for the latter, probably just for implementing
Close()
)).
I'm not particularly wedded to any of the names in the API sketch above, so I'm happy to see it changed to whatever people prefer.
The API presented above is straightforward to implement without any runtime
support; in particular, this could be implemented as a thin wrapper around a
sync.Once. This will not effectively reduce contention, but it would still be
a correct implementation. It's probably a good idea to implement such a shim,
and put it in the x/sync
repo, with appropriate build tags and type-aliases to
allow clients to immediately start using the new type.
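A sketch of what such a shim could look like (this version uses a plain mutex
around a single element rather than literally a sync.Once, which amounts to the
same thing for GetOrCreate):
// Sharded is a portable fallback: a single element guarded by a mutex. It
// preserves the semantics of the API but does nothing to reduce contention.
type Sharded struct {
    mu   sync.Mutex
    elem interface{}
}

// ShardInfo matches the proposed API; the fallback only ever uses index 0.
type ShardInfo struct {
    ShardIndex int
}

func (s *Sharded) SetMaxShards(maxShards int) {
    if maxShards < 1 {
        panic("maxShards < 1")
    }
    // Otherwise a no-op: the fallback never creates more than one element.
}

func (s *Sharded) GetOrCreate(createFn func(ShardInfo) interface{}) interface{} {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.elem == nil {
        elem := createFn(ShardInfo{ShardIndex: 0})
        if elem == nil {
            panic("createFn returned nil value")
        }
        s.elem = elem
    }
    return s.elem
}

func (s *Sharded) Get() interface{} {
    s.mu.Lock()
    defer s.mu.Unlock()
    return s.elem
}

func (s *Sharded) Do(fn func(interface{})) {
    if v := s.Get(); v != nil {
        fn(v)
    }
}

func (s *Sharded) DoLocked(fn func(interface{})) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.elem != nil {
        fn(s.elem)
    }
}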