Exploring Prometheus Code - part 1 (personal notes)
I recently started learning the Go language, and part of the process was exploring the code of popular open-source projects. Prometheus is one of the projects I selected. It's a popular project with 848 contributors and more than 51k stars.
The objectives of this article:
understand how the code is organized in the filesystem,
explore some of the popular packages used on this project,
understand how the different modules are loaded,
understand how the communication between the modules is implemented.
I started with the `/cmd` folder, as it stores the main function that will be called when we run Prometheus from the CLI.
Before reading that function, it'll be useful to learn about the following packages:
kingpin: to parse the command line arguments
run: a universal mechanism to manage goroutine lifecycles (from the repo description). check reference
context: to carry cancellation signals, deadlines, and request-scoped values between functions and goroutines. check reference
The main function does the following things:
- parsing the arguments, loading the configuration, and populating the `flagConfig` data structure (using the kingpin package)
a.Flag("config.file", "Prometheus configuration file path.").
Default("prometheus.yml").StringVar(&cfg.configFile)
a.Flag("web.listen-address", "Address to listen on for UI, API, and telemetry.").
Default("0.0.0.0:9090").StringVar(&cfg.web.ListenAddress)
- checking the consistency between the configurations
if agentMode && len(serverOnlyFlags) > 0 {
fmt.Fprintf(os.Stderr, "The following flag(s) can not be used in agent mode: %q", serverOnlyFlags)
os.Exit(3)
}
- launching the different modules (using the run package)
var g run.Group
...
{
// Scrape manager.
g.Add(
func() error {
<-reloadReady.C
err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping scrape manager...")
scrapeManager.Stop()
},
)
}
...
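To make the execute/interrupt pairing passed to `g.Add` more concrete, here is a minimal sketch of the actor pattern written with the standard library only. The `group`, `actor`, `add`, `run`, and `demo` names are hypothetical stand-ins and not the run package's API; the sketch assumes the behavior that package describes: all actors start together, and as soon as the first one returns, every actor is interrupted.

```go
package main

import (
	"errors"
	"fmt"
)

// actor pairs a blocking execute function with an interrupt function
// that must make execute return. (Hypothetical type for illustration.)
type actor struct {
	execute   func() error
	interrupt func(error)
}

// group is a simplified stand-in for run.Group.
type group struct {
	actors []actor
}

func (g *group) add(execute func() error, interrupt func(error)) {
	g.actors = append(g.actors, actor{execute, interrupt})
}

// run starts every actor, waits for the first one to return,
// interrupts all of them, and then waits for the rest to finish.
func (g *group) run() error {
	if len(g.actors) == 0 {
		return nil
	}
	errs := make(chan error, len(g.actors))
	for _, a := range g.actors {
		go func(a actor) { errs <- a.execute() }(a)
	}
	first := <-errs
	for _, a := range g.actors {
		a.interrupt(first)
	}
	for i := 1; i < len(g.actors); i++ {
		<-errs
	}
	return first
}

var errShutdown = errors.New("shutdown requested")

// demo wires two actors: one that returns immediately and one that
// blocks until interrupted, like a long-running manager would.
func demo() (stopped bool, err error) {
	var g group
	stop := make(chan struct{})

	g.add(
		func() error { return errShutdown },
		func(error) {},
	)
	g.add(
		func() error {
			<-stop // blocks until the interrupt function runs
			stopped = true
			return nil
		},
		func(error) { close(stop) },
	)
	err = g.run()
	return stopped, err
}

func main() {
	stopped, err := demo()
	fmt.Println(stopped, err) // true shutdown requested
}
```

This is why every module in the main function comes with a stop function: when one module exits, the group uses those functions to bring the others down.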
The project is composed of several modules, each stored in a folder at the root level of the project. (internal architecture)
I started with two modules: `discovery` and `scrape`.
`discovery` implements the service discovery manager (`ScrapeDiscoveryManager`), which collects the target groups and sends them to `scrape`.
`scrape` implements the scrape manager (`ScrapeManager`), which controls the scraper pools (check internal architecture for details).
Each of those two managers runs in its own goroutine. `ScrapeDiscoveryManager` sends the target groups to the `ScrapeManager` through a sync channel (`SyncCh`). This channel is passed to `scrapeManager.Run` in the main function.
g.Add(
func() error {
// When the scrape manager receives a new targets list
// it needs to read a valid config for each job.
// It depends on the config being in sync with the discovery manager so
// we wait until the config is fully loaded.
<-reloadReady.C
err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
return err
},
The `ScrapeManager`'s `Run` method runs a loop that watches for updates from the `ScrapeDiscoveryManager` and updates the scraper pools accordingly.
// tsets is the channel returned by discoveryManagerScrape.SyncCh()
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
go m.reloader()
for {
select {
case ts := <-tsets: // New Updates!!
m.updateTsets(ts)
select {
case m.triggerReload <- struct{}{}: // trigger reload
default:
}
case <-m.graceShut:
return nil
}
}
}
On every tick, the `reloader` waits for a pending trigger and then calls `reload`, which creates or updates the scraper pools.
func (m *Manager) reloader() {
...
ticker := time.NewTicker(time.Duration(reloadIntervalDuration))
defer ticker.Stop()
for {
select {
case <-m.graceShut:
return
case <-ticker.C:
select {
case <-m.triggerReload:
m.reload() // Update the scraper pools
case <-m.graceShut:
return
}
}
}
}
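The non-blocking send on `triggerReload` in `Run` is what lets a burst of updates collapse into a single reload. Here is a minimal sketch of that idea, assuming the channel is buffered with a capacity of 1 (the buffer size is not shown in the excerpts above). The `notify` and `drain` helpers are hypothetical names used only for this illustration.

```go
package main

import "fmt"

// notify performs the same kind of non-blocking send used in Run:
// if a signal is already pending, the new one is dropped.
func notify(trigger chan<- struct{}) bool {
	select {
	case trigger <- struct{}{}:
		return true
	default:
		return false
	}
}

// drain counts the pending signals without blocking.
func drain(trigger <-chan struct{}) int {
	n := 0
	for {
		select {
		case <-trigger:
			n++
		default:
			return n
		}
	}
}

func main() {
	trigger := make(chan struct{}, 1) // assumed capacity of 1

	queued := 0
	for i := 0; i < 5; i++ { // five updates arrive in a burst
		if notify(trigger) {
			queued++
		}
	}
	fmt.Println("signals queued:", queued)         // signals queued: 1
	fmt.Println("reloads needed:", drain(trigger)) // reloads needed: 1
}
```

Combined with the ticker in `reloader`, this means the scraper pools are rebuilt at most once per interval, no matter how many updates arrived in between.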
Now, let's return to `ScrapeDiscoveryManager`. To understand how it works, we need to explore its data structures (some details are omitted).
//discovery/manager.go
type Manager struct {
targets map[poolKey]map[string]*targetgroup.Group // contains the list of all the targets
providers []*Provider
syncCh chan map[string][]*targetgroup.Group // the communication channel to send targets to ScrapeManager
}
type Provider struct {
name string
d Discoverer
//....
}
type Discoverer interface {
// Run hands a channel to the discovery provider (Consul, DNS, etc.) through which
// it can send updated target groups.
Run(ctx context.Context, up chan<- []*targetgroup.Group)
}
The manager has a list of providers, each one includes a discoverer that'll periodically create lists of targets and send them to the discovery manager.
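As an illustration of that contract, here is a minimal sketch of a discoverer. It is self-contained, so it uses a local `Group` type as a simplified stand-in for `targetgroup.Group`; `staticDiscoverer` and `discoverOnce` are hypothetical names, not part of Prometheus. This toy provider announces its groups once instead of periodically, then idles until its context is cancelled.

```go
package main

import (
	"context"
	"fmt"
)

// Group is a simplified stand-in for targetgroup.Group.
type Group struct {
	Source  string
	Targets []string
}

// Discoverer mirrors the shape of the interface shown above,
// using the local Group type instead of the Prometheus one.
type Discoverer interface {
	Run(ctx context.Context, up chan<- []*Group)
}

// staticDiscoverer is a hypothetical provider that announces a fixed
// list of groups once and then idles until it is cancelled.
type staticDiscoverer struct {
	groups []*Group
}

func (d staticDiscoverer) Run(ctx context.Context, up chan<- []*Group) {
	select {
	case up <- d.groups:
	case <-ctx.Done():
		return
	}
	<-ctx.Done()
}

// discoverOnce starts the discoverer, reads its first update,
// then cancels the context and waits for Run to return.
func discoverOnce(d Discoverer) []*Group {
	ctx, cancel := context.WithCancel(context.Background())
	updates := make(chan []*Group)
	done := make(chan struct{})
	go func() {
		defer close(done)
		d.Run(ctx, updates)
	}()
	tgs := <-updates
	cancel()
	<-done
	return tgs
}

func main() {
	d := staticDiscoverer{groups: []*Group{
		{Source: "static/0", Targets: []string{"localhost:9090"}},
	}}
	for _, g := range discoverOnce(d) {
		fmt.Println(g.Source, g.Targets) // static/0 [localhost:9090]
	}
}
```

The send-only channel type (`chan<-`) keeps the direction of the data flow explicit: a discoverer can only push updates and never read them back.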
We can see that in the `ScrapeDiscoveryManager`'s `startProvider`, an `updates` channel is created and passed to both the discoverer and the `updater`.
func (m *Manager) startProvider(ctx context.Context, p *Provider) {
level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
ctx, cancel := context.WithCancel(ctx)
updates := make(chan []*targetgroup.Group)
p.cancel = cancel
go p.d.Run(ctx, updates)
go m.updater(ctx, p, updates)
}
Each discoverer implements a `refresh` function that collects the data to be sent. (Some discoverers interact with the provider through `refresh/refresh`.)
The discoverer's logic can be summarized as follows:
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
ticker := time.NewTicker(d.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
tgs, err := d.refresh(ctx)
if err != nil {
if !errors.Is(ctx.Err(), context.Canceled) {
level.Error(d.logger).Log("msg", "Unable to refresh target groups", "err", err.Error())
}
continue
}
select {
case ch <- tgs:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}
Back in `ScrapeDiscoveryManager`, when the `updater` receives new updates from the discoverer, it will:
- update the statistics,
- update the list of targets,
- trigger a send: unblock the `sender` loop (check `ScrapeDiscoveryManager`'s `Run` below) so that it sends the update to the `ScrapeManager`.
func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targetgroup.Group) {
// Ensure targets from this provider are cleaned up.
defer m.cleaner(p)
for {
select {
case <-ctx.Done():
return
case tgs, ok := <-updates:
receivedUpdates.WithLabelValues(m.name).Inc()
//...
p.mu.RLock()
for s := range p.subs {
m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
}
p.mu.RUnlock()
select {
case m.triggerSend <- struct{}{}:
default:
}
}
}
}
The `ScrapeDiscoveryManager`'s `Run` method starts the `sender` function, which is responsible for sending updates to the `ScrapeManager`.
func (m *Manager) Run() error {
go m.sender() // send updates to ScrapeManager
<-m.ctx.Done()
m.cancelDiscoverers()
return m.ctx.Err()
}
func (m *Manager) sender() {
ticker := time.NewTicker(m.updatert)
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
return
case <-ticker.C: // Some discoverers send updates too often, so we throttle these with the ticker.
select {
case <-m.triggerSend:
sentUpdates.WithLabelValues(m.name).Inc()
select {
case m.syncCh <- m.allGroups():
//...
default:
}
}
}
}
}
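To see the whole path in one place, here is a toy end-to-end sketch of the flow: a discoverer pushes an update, the updater stores it and triggers a send, and the sender forwards a snapshot to the scrape manager side. Everything in it is a simplifying assumption: the `manager` type, `snapshot`, and `runPipeline` are hypothetical, target groups are reduced to address lists keyed by job name, there are no providers, and the sender is not throttled by a ticker and blocks until the receiver is ready.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// manager is a toy stand-in for the discovery manager.
type manager struct {
	mu          sync.Mutex
	targets     map[string][]string
	triggerSend chan struct{}
	syncCh      chan map[string][]string
}

func newManager() *manager {
	return &manager{
		targets:     make(map[string][]string),
		triggerSend: make(chan struct{}, 1),
		syncCh:      make(chan map[string][]string),
	}
}

// updater stores each update and signals the sender without blocking.
func (m *manager) updater(ctx context.Context, job string, updates <-chan []string) {
	for {
		select {
		case <-ctx.Done():
			return
		case addrs := <-updates:
			m.mu.Lock()
			m.targets[job] = addrs
			m.mu.Unlock()
			select {
			case m.triggerSend <- struct{}{}:
			default:
			}
		}
	}
}

// sender forwards a snapshot of all targets whenever it is triggered.
func (m *manager) sender(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-m.triggerSend:
			select {
			case m.syncCh <- m.snapshot():
			case <-ctx.Done():
				return
			}
		}
	}
}

// snapshot copies the current targets so the receiver owns its data.
func (m *manager) snapshot() map[string][]string {
	m.mu.Lock()
	defer m.mu.Unlock()
	out := make(map[string][]string, len(m.targets))
	for job, addrs := range m.targets {
		out[job] = append([]string(nil), addrs...)
	}
	return out
}

// runPipeline plays both ends: the discoverer and the scrape manager.
func runPipeline(job string, addrs []string) map[string][]string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	m := newManager()
	updates := make(chan []string)
	go m.updater(ctx, job, updates)
	go m.sender(ctx)

	updates <- addrs  // discoverer side
	return <-m.syncCh // scrape manager side
}

func main() {
	fmt.Println(runPipeline("node", []string{"10.0.0.1:9100"})) // map[node:[10.0.0.1:9100]]
}
```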
The diagram below summarizes the communication flow from the discoverer to the scrape manager.