OpenTelemetry Collector を開発する

カスタムコンポーネントの開発

Open Telemetry Collectorのためのコンポーネントを構築するには、以下の3つの主要な部分が必要です:

  1. Configuration - ユーザーが設定できる値は何か
  2. Factory - 提供された値を使ってコンポーネントを作成する
  3. Business Logic - コンポーネントが実行する必要があること

これについて、プロジェクトの重要なDevOpsメトリクスを追跡するためにJenkinsと連携するコンポーネントを構築する例を考えていきます。

測定しようとしているメトリクスは次のとおりです:

  1. 変更に対するリードタイム - 「コミットが本番環境に入るまでにかかる時間」
  2. 変更失敗率 - 「本番環境での障害を引き起こすデプロイの割合」
  3. デプロイ頻度 - 「[チーム]が本番環境に成功してリリースする頻度」
  4. 平均復旧時間 - 「[チーム]が本番環境の障害から復旧するのにかかる時間」

これらの指標は Google の DevOps Research and Assessment (DORA) チームによって特定されたもので、ソフトウェア開発チームのパフォーマンスを示すのに役立ちます。Jenkins CI を選択した理由は、私たちが同じオープンソースソフトウェアエコシステムに留まり、将来的にベンダー管理のCIツールが採用する例となることができるためです。

計装 🆚 コンポーネント

組織内でオブザーバビリティを向上させる際には、トレードオフが発生するため、考慮する点があります。

長所短所
(自動)計装1システムを観測するために外部APIが不要計装を変更するにはプロジェクトの変更が必要
システム所有者/開発者は可観測性の変更が可能ランタイムへの追加の依存が必要
システムの文脈を理解し、Exemplar とキャプチャされたデータを関連付けることが可能システムのパフォーマンスに影響を与える可能性がある
コンポーネントデータ名や意味の変更をシステムのリリースサイクルから独立した展開が可能APIの破壊的な変更の可能性があり、システムとコレクター間でリリースの調整が必要
その後の利用に合わせて収集されるデータの更新/拡張が容易キャプチャされたデータの意味がシステムリリースと一致せず、予期せず壊れる可能性がある

  1. 計装(instrument, インストゥルメント)とは、アプリケーションなどのシステムコンポーネントに対して、トレースやメトリクス、ログなどのテレメトリーデータを出力させる実装。計装ライブラリを最低限セットアップするだけで一通りのトレースやメトリクスなどを出力できるような対応を「自動計装」と呼びます。 ↩︎

Last Modified 2023/12/01

8. Developのサブセクション

OpenTelemetry Collector を開発する

プロジェクトのセットアップ Ninja

メモ

このワークショップのセクションを完了する時間は経験によって異なる場合があります。

完成したものはこちらにあります。詰まった場合や講師と一緒に進めたい場合に利用してください。

新しい Jenkins CI レシーバーの開発を始めるため、まずは Go プロジェクトのセットアップから始めていきます。 新しい Go プロジェクトを作成する手順は以下の通りです:

  1. ${HOME}/go/src/jenkinscireceiver という名前の新しいディレクトリを作成し、そのディレクトリに移動します。
    1. 実際のディレクトリ名や場所は厳密ではありません。自分の開発ディレクトリを自由に選ぶことができます。
  2. go mod init splunk.conf/workshop/example/jenkinscireceiver を実行して、Go のモジュールを初期化します。
    1. 依存関係を追跡するために使用される go.mod というファイルが作成されます。
    2. インポートされている依存関係のチェックサム値が go.sum として保存されます。

`` text module splunk.conf/workshop/example/jenkinscireceiver

go 1.20

Last Modified 2023/12/01

OpenTelemetry Collector を開発する

Configuration の構築

コンポーネントの Configuration 部分は、ユーザーがコンポーネントに対する入力を行う方法であり、設定に使用される値は以下のようである必要があります:

  1. そのフィールドが何を制御するのか、ユーザーが直感的に理解できる
  2. 必須項目とオプション項目が明確である
  3. 共通の名前とフィールドを再利用する
  4. オプションをシンプルに保つ
---
# Required Values
endpoint: http://my-jenkins-server:8089
auth:
    authenticator: basicauth/jenkins
# Optional Values
collection_interval: 10m
metrics:
    example.metric.1:
        enabled: true
    example.metric.2:
        enabled: true
    example.metric.3:
        enabled: true
    example.metric.4:
        enabled: true
---
jenkins_server_addr: hostname
jenkins_server_api_port: 8089
interval: 10m
filter_builds_by:
    - name: my-awesome-build
      status: amber
track:
    values:
        example.metric.1: yes
        example.metric.2: yes
        example.metric.3: no
        example.metric.4: no

悪い例では、Configuration のベストプラクティスに反するとコンポーネントが使いにくくなってしまうことが理解できるはずです。 フィールドの値が何であるべきかを明確ではなく、既存のプロセッサーに移譲できる機能を含み、コレクター内の他のコンポーネントと比較してフィールドの命名に一貫性がありません。

良い例では、必要な値をシンプルに保ち、他のコンポーネントからのフィールド名を再利用し、コンポーネントが Jenkins とコレクター間の相互作用にのみ焦点を当てています。

設定値の中には、このコンポーネントで独自に追加するものと、コレクター内部の共有ライブラリによって提供されているものがあります。これらはビジネスロジックに取り組む際にさらに詳しく説明します。Configuration は小さく始めるべきで、ビジネスロジックに追加の機能が必要になったら、設定も追加していきましょう。

コードを書く

Configuration に必要なコードを実装するために、config.go という名前の新しいファイルを以下の内容で作成します:

package jenkinscireceiver

import (
    "go.opentelemetry.io/collector/config/confighttp"
    "go.opentelemetry.io/collector/receiver/scraperhelper"

    "splunk.conf/workshop/example/jenkinscireceiver/internal/metadata"
)

type Config struct {
    // HTTPClientSettings contains all the values
    // that are commonly shared across all HTTP interactions
    // performed by the collector.
    confighttp.HTTPClientSettings `mapstructure:",squash"`
    // ScraperControllerSettings will allow us to schedule 
    // how often to check for updates to builds.
    scraperhelper.ScraperControllerSettings `mapstructure:",squash"`
    // MetricsBuilderConfig contains all the metrics
    // that can be configured.
    metadata.MetricsBuilderConfig `mapstructure:",squash"`
}
Last Modified 2023/12/01

OpenTelemetry Collector を開発する

コンポーネントを検討する

Jenkinsからメトリクスを取得するために必要なコンポーネントの種類をおさらいしましょう:

エクステンションが解決するビジネスユースケースは以下の通りです:

  1. 実行時の設定が必要な共有機能を持つ
  2. コレクターの実行時間の観察に間接的に役立つ

詳細については、エクステンションの概要を参照してください。

レシーバーが解決するビジネスユースケースは以下の通りです:

  • リモートソースからのデータの取得
  • リモートソースからのデータの受信

これらは一般的に pullpush ベースのデータ収集と呼ばれ、詳細についてはレシーバーの概要で読むことができます。

プロセッサーが解決するビジネスユースケースは以下の通りです:

  • データ、フィールド、または値の追加または削除
  • データの観察と意思決定
  • バッファリング、キューイング、および並べ替え

プロセッサーを通過するデータタイプは、下流のコンポーネントに同じデータタイプを転送する必要があることを覚えておいてください。 詳細については、プロセッサーの概要をご覧ください。

エクスポーターが解決するビジネスユースケースは以下の通りです:

  • データをツール、サービス、またはストレージに送信する

OpenTelemetryコレクターは「バックエンド」、すべてを一元化した観測可能性スイートを目指すのではなく、OpenTelemetryの創設原則に忠実であり続けることを目指しています。つまり、ベンダーに依存しない全ての人のための観測可能性です。詳細については、エクスポーターの概要をお読みください。

コネクターは比較的新しいコンポーネントで、このワークショップではあまり触れていません。 コネクターは、異なるテレメトリタイプやパイプラインをまたいで使用できるプロセッサーのようなものだといえます。たとえば、コネクターはログとしてデータを受け取り、メトリクスとして出力したり、あるパイプラインからメトリクスを受け取り、テレメトリーデータに関するメトリクスを提供したりすることができます。

コネクターが解決するビジネスケースは以下の通りです:

  • 異なるテレメトリタイプ間の変換
    • ログからメトリクスへ
    • トレースからメトリクスへ
    • メトリクスからログへ
  • 受信したデータを観察し、自身のデータを生成する
    • メトリクスを受け取り、データの分析メトリクスを生成する。

Ninjaセクションの一部としてプロセッサーの概要内で簡単に概要が説明されています。

これらのコンポーネントについて考えると、Jenkins に対応する場合はプルベースのレシーバーを開発する必要があることがわかります。

Last Modified 2023/12/01

OpenTelemetry Collector を開発する

メトリクスを設計する

レシーバーによってキャプチャされるメトリクスを定義し、エクスポートするために、コレクターのために開発された mdatagen を使って、yaml で定義したメトリクスをコードに変換していきます。

---
# Type defines the name to reference the component
# in the configuration file
type: jenkins

# Status defines the component type and the stability level
status:
  class: receiver
  stability:
    development: [metrics]

# Attributes are the expected fields reported
# with the exported values.
attributes:
  job.name:
    description: The name of the associated Jenkins job
    type: string
  job.status:
    description: Shows if the job had passed, or failed
    type: string
    enum:
    - failed
    - success
    - unknown

# Metrics defines all the pontentially exported values from this receiver. 
metrics:
  jenkins.jobs.count:
    enabled: true
    description: Provides a count of the total number of configured jobs
    unit: "{Count}"
    gauge:
      value_type: int
  jenkins.job.duration:
    enabled: true
    description: Show the duration of the job
    unit: "s"
    gauge:
      value_type: int
    attributes:
    - job.name
    - job.status
  jenkins.job.commit_delta:
    enabled: true
    description: The calculation difference of the time job was finished minus commit timestamp
    unit: "s"
    gauge:
      value_type: int
    attributes:
    - job.name
    - job.status
// To generate the additional code needed to capture metrics, 
// the following command to be run from the shell:
//  go generate -x ./...

//go:generate go run github.com/open-telemetry/opentelemetry-collector-contrib/cmd/mdatagen@v0.80.0 metadata.yaml
package jenkinscireceiver

// There is no code defined within this file.

次のセクションに進む前に、これらのファイルをプロジェクトフォルダ内に作成してください。

Factory の構築

Factory はソフトウェアデザインパターンの一種で、提供された Configuration を使って、動的にオブジェクト(この場合は jenkinscireceiver)を作成するものです。現実的な例では、携帯電話店に行って、あなたの正確な説明に合った携帯電話を求め、それを提供されるようなものです。

コマンド go generate -x ./... を実行すると、定義されたメトリクスをエクスポートするために必要なすべてのコードを含む新しいフォルダ jenkinscireceiver/internal/metadata が作成されます。生成されるコードは以下の通りです:

package jenkinscireceiver

import (
    "errors"

    "go.opentelemetry.io/collector/component"
    "go.opentelemetry.io/collector/config/confighttp"
    "go.opentelemetry.io/collector/receiver"
    "go.opentelemetry.io/collector/receiver/scraperhelper"

    "splunk.conf/workshop/example/jenkinscireceiver/internal/metadata"
)

func NewFactory() receiver.Factory {
    return receiver.NewFactory(
        metadata.Type,
        newDefaultConfig,
        receiver.WithMetrics(newMetricsReceiver, metadata.MetricsStability),
    )
}

func newMetricsReceiver(_ context.Context, set receiver.CreateSettings, cfg component.Config, consumer consumer.Metrics) (receiver.Metrics, error) {
    // Convert the configuration into the expected type
    conf, ok := cfg.(*Config)
    if !ok {
        return nil, errors.New("can not convert config")
    }
    sc, err := newScraper(conf, set)
    if err != nil {
        return nil, err
    }
    return scraperhelper.NewScraperControllerReceiver(
        &conf.ScraperControllerSettings,
        set,
        consumer,
        scraperhelper.AddScraper(sc),
    )
}
package jenkinscireceiver

import (
    "go.opentelemetry.io/collector/config/confighttp"
    "go.opentelemetry.io/collector/receiver/scraperhelper"

    "splunk.conf/workshop/example/jenkinscireceiver/internal/metadata"
)

type Config struct {
    // HTTPClientSettings contains all the values
    // that are commonly shared across all HTTP interactions
    // performed by the collector.
    confighttp.HTTPClientSettings `mapstructure:",squash"`
    // ScraperControllerSettings will allow us to schedule 
    // how often to check for updates to builds.
    scraperhelper.ScraperControllerSettings `mapstructure:",squash"`
    // MetricsBuilderConfig contains all the metrics
    // that can be configured.
    metadata.MetricsBuilderConfig `mapstructure:",squash"`
}

func newDefaultConfig() component.Config {
    return &Config{
        ScraperControllerSettings: scraperhelper.NewDefaultScraperControllerSettings(metadata.Type),
        HTTPClientSettings:        confighttp.NewDefaultHTTPClientSettings(),
        MetricsBuilderConfig:      metadata.DefaultMetricsBuilderConfig(),
    }
}
package jenkinscireceiver

type scraper struct {}

func newScraper(cfg *Config, set receiver.CreateSettings) (scraperhelper.Scraper, error) {
    // Create a our scraper with our values 
    s := scraper{
        // To be filled in later
    }
    return scraperhelper.NewScraper(metadata.Type, s.scrape)
}

func (scraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
    // To be filled in
    return pmetrics.NewMetrics(), nil
}
---
dist:
  name: otelcol
  description: "Conf workshop collector"
  output_path: ./dist
  version: v0.0.0-experimental

extensions:
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/extension/basicauthextension v0.80.0
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension v0.80.0

receivers:
  - gomod: go.opentelemetry.io/collector/receiver/otlpreceiver v0.80.0
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.80.0
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.80.0
  - gomod: splunk.conf/workshop/example/jenkinscireceiver v0.0.0
    path: ./jenkinscireceiver

processors:
  - gomod: go.opentelemetry.io/collector/processor/batchprocessor v0.80.0

exporters:
  - gomod: go.opentelemetry.io/collector/exporter/loggingexporter v0.80.0
  - gomod: go.opentelemetry.io/collector/exporter/otlpexporter v0.80.0
  - gomod: go.opentelemetry.io/collector/exporter/otlphttpexporter v0.80.0

# This replace is a go directive that allows for redefine
# where to fetch the code to use since the default would be from a remote project.
replaces:
- splunk.conf/workshop/example/jenkinscireceiver => ./jenkinscireceiver
├── build-config.yaml
└── jenkinscireceiver
    ├── go.mod
    ├── config.go
    ├── factory.go
    ├── scraper.go
    └── internal
      └── metadata

これらのファイルがプロジェクトに作成されたら、go mod tidy を実行します。すると、すべての依存ライブラリが取得され、go.mod が更新されます。

Last Modified 2023/12/01

OpenTelemetry Collector を開発する

ビジネスロジックを作る

この時点では、何も行っていないカスタムコンポーネントが作成されています。ここから、Jenkins からデータを取得するための必要なロジックを追加していきましょう。

ここからのステップは以下の通りです:

  1. Jenkinsに接続するクライアントを作成する
  2. 設定されたすべてのジョブをキャプチャする
  3. 設定されたジョブの最後のビルドのステータスを報告する
  4. コミットタイムスタンプとジョブ完了の時間差を計算する

変更を scraper.go に加えていきます。

Jenkinsサーバーに接続するために、パッケージ “github.com/yosida95/golang-jenkins” を使用します。これには、Jenkinsサーバーからデータを読み取るために必要な機能が提供されています。

次に、“go.opentelemetry.io/collector/receiver/scraperhelper” ライブラリのいくつかのヘルパー関数を利用して、コンポーネントの起動が完了したらJenkinsサーバーに接続できるようにするスタート関数を作成します。

package jenkinscireceiver

import (
    "context"

    jenkins "github.com/yosida95/golang-jenkins"
    "go.opentelemetry.io/collector/component"
    "go.opentelemetry.io/collector/pdata/pmetric"
    "go.opentelemetry.io/collector/receiver"
    "go.opentelemetry.io/collector/receiver/scraperhelper"

    "splunk.conf/workshop/example/jenkinscireceiver/internal/metadata"
)

type scraper struct {
    mb     *metadata.MetricsBuilder
    client *jenkins.Jenkins
}

func newScraper(cfg *Config, set receiver.CreateSettings) (scraperhelper.Scraper, error) {
    s := &scraper{
        mb : metadata.NewMetricsBuilder(cfg.MetricsBuilderConfig, set),
    }
    
    return scraperhelper.NewScraper(
        metadata.Type,
        s.scrape,
        scraperhelper.WithStart(func(ctx context.Context, h component.Host) error {
            client, err := cfg.ToClient(h, set.TelemetrySettings)
            if err != nil {
                return err
            }
            // The collector provides a means of injecting authentication
            // on our behalf, so this will ignore the libraries approach
            // and use the configured http client with authentication.
            s.client = jenkins.NewJenkins(nil, cfg.Endpoint)
            s.client.SetHTTPClient(client)
            return nil
        }),
    )
}

func (s scraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
    // To be filled in
    return pmetric.NewMetrics(), nil
}

これで、Jenkinsレシーバーを初期化するために必要なすべてのコードが完成しました。

ここから先は、実装が必要な scrape メソッドに焦点を当てます。このメソッドは、設定された間隔(デフォルトでは1分)ごとに実行されます。

Jenkins サーバーの負荷状況や、どの程度のプロジェクトが実行されているかを測定するために、Jenkins で設定されているジョブの数をキャプチャしたいと考えています。これを行うために、Jenkins クライアントを呼び出してすべてのジョブをリスト化し、エラーが報告された場合はメトリクスなしでそれを返し、そうでなければメトリクスビルダーからのデータを発行します。

func (s scraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
    jobs, err := s.client.GetJobs()
    if err != nil {
        return pmetric.Metrics{}, err
    }

    // Recording the timestamp to ensure
    // all captured data points within this scrape have the same value. 
    now := pcommon.NewTimestampFromTime(time.Now())
    
    // Casting to an int64 to match the expected type
    s.mb.RecordJenkinsJobsCountDataPoint(now, int64(len(jobs)))
    
    // To be filled in

    return s.mb.Emit(), nil
}

前のステップにより、すべてのジョブをキャプチャしてジョブの数をレポートできるようになりました。 このステップでは、それぞれのジョブを調査し、レポートされた値を使用してメトリクスをキャプチャしていきます。

func (s scraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
    jobs, err := s.client.GetJobs()
    if err != nil {
        return pmetric.Metrics{}, err
    }

    // Recording the timestamp to ensure
    // all captured data points within this scrape have the same value. 
    now := pcommon.NewTimestampFromTime(time.Now())
    
    // Casting to an int64 to match the expected type
    s.mb.RecordJenkinsJobsCountDataPoint(now, int64(len(jobs)))
    
    for _, job := range jobs {
        // Ensure we have valid results to start off with
        var (
            build  = job.LastCompletedBuild
            status = metadata.AttributeJobStatusUnknown
        )

        // This will check the result of the job, however,
        // since the only defined attributes are 
        // `success`, `failure`, and `unknown`. 
        // it is assume that anything did not finish 
        // with a success or failure to be an unknown status.

        switch build.Result {
        case "aborted", "not_built", "unstable":
            status = metadata.AttributeJobStatusUnknown
        case "success":
            status = metadata.AttributeJobStatusSuccess
        case "failure":
            status = metadata.AttributeJobStatusFailed
        }

        s.mb.RecordJenkinsJobDurationDataPoint(
            now,
            int64(job.LastCompletedBuild.Duration),
            job.Name,
            status,
        )
    }

    return s.mb.Emit(), nil
}

最後のステップでは、コミットからジョブ完了までにかかった時間を計算して、DORA メトリクス を推測するのに役立てていきます。

func (s scraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
    jobs, err := s.client.GetJobs()
    if err != nil {
        return pmetric.Metrics{}, err
    }

    // Recording the timestamp to ensure
    // all captured data points within this scrape have the same value. 
    now := pcommon.NewTimestampFromTime(time.Now())
    
    // Casting to an int64 to match the expected type
    s.mb.RecordJenkinsJobsCountDataPoint(now, int64(len(jobs)))
    
    for _, job := range jobs {
        // Ensure we have valid results to start off with
        var (
            build  = job.LastCompletedBuild
            status = metadata.AttributeJobStatusUnknown
        )

        // Previous step here

        // Ensure that the `ChangeSet` has values
        // set so there is a valid value for us to reference
        if len(build.ChangeSet.Items) == 0 {
            continue
        }

        // Making the assumption that the first changeset
        // item is the most recent change.
        change := build.ChangeSet.Items[0]

        // Record the difference from the build time
        // compared against the change timestamp.
        s.mb.RecordJenkinsJobCommitDeltaDataPoint(
            now,
            int64(build.Timestamp-change.Timestamp),
            job.Name,
            status,
        )
    }

    return s.mb.Emit(), nil
}

これらのステップがすべて完了すると、Jenkins CI レシーバーが完成します!

次は何をするの?

コンポーネントに必要な機能は、おそらく他にもたくさん思いつくでしょう。例えば:

  • ジョブで使用されたブランチ名を含めることはできますか?
  • ジョブのプロジェクト名を含めることはできますか?
  • プロジェクトのジョブの総持続時間をどのように計算しますか?
  • 変更が機能するかどうかをどのように検証しますか?

この時間を使って遊んでみたり、壊してみたり、変更してみたり、ビルドからのログをキャプチャしてみるなどしてください。