OpenTelemetry Collector 開発

カスタムコンポーネントの開発

OpenTelemetry Collector 用のコンポーネントを構築するには、3つの主要な部分が必要です

  1. Configuration - ユーザーに公開される設定値
  2. Factory - 提供された値を使用してコンポーネントを作成する
  3. Business Logic - コンポーネントが実行する必要がある処理

ここでは、プロジェクトの重要な DevOps メトリクスを追跡できるように、Jenkins と連携するコンポーネントを構築する例を使用します。

測定しようとしているメトリクスは以下の通りです

  1. Lead time for changes - “コミットが本番環境にデプロイされるまでにかかる時間”
  2. Change failure rate - “本番環境で障害を引き起こすデプロイメントの割合”
  3. Deployment frequency - "[チーム]が本番環境に正常にリリースする頻度"
  4. Mean time to recover - "[チーム]が本番環境の障害から復旧するまでにかかる時間"

これらの指標は、ソフトウェア開発チームのパフォーマンスを示すために、Google の DevOps Research and Assessment(DORA)[^1] チームによって特定されました。Jenkins CI を選択した理由は、同じオープンソースソフトウェアのエコシステム内にとどまることで、将来ベンダーが管理する CI ツールが採用するための例として機能できるからです。

計装 vs コンポーネント

組織内のオブザーバビリティのレベルを向上させる際に考慮すべきことがあります。いくつかのトレードオフが生じるためです。

メリットデメリット
(自動)計装システムを監視するために外部 API を監視する必要がありません。計装を変更するにはプロジェクトへの変更が必要です。
システムオーナー/開発者がオブザーバビリティを変更する権限を持てます。追加のランタイム依存関係が必要です。
システムコンテキストを理解し、キャプチャしたデータを Exemplars と関連付けることができます。システムのパフォーマンスに影響を与える可能性があります。
コンポーネントデータ名やセマンティクスへの変更をシステムのリリースサイクルとは独立して展開できます。API の破壊的な変更には、システムと Collector の間で調整されたリリースが必要です。
収集されるデータの更新/拡張は、ユーザーにとってシームレスな変更です。キャプチャされたデータのセマンティクスが、新しいシステムリリースと一致しない形で予期せず壊れる可能性があります。
サポートチームがオブザーバビリティの実践について深い理解を持つ必要がありません。システムから外部に公開された情報のみを表面化できます。
Last Modified 2026/01/23

8. Developのサブセクション

OpenTelemetry Collector 開発

プロジェクトのセットアップ Ninja

メモ

このワークショップセクションを完了するまでの時間は、経験によって異なります。

行き詰まった場合やインストラクターに沿って進めたい場合は、こちらに完全なソリューションがあります。

新しい Jenkins CI レシーバーの開発を始めるには、まず Golang プロジェクトをセットアップする必要があります。 新しい Golang プロジェクトを作成する手順は以下の通りです

  1. ${HOME}/go/src/jenkinscireceiver という名前の新しいディレクトリを作成し、そのディレクトリに移動します
    1. 実際のディレクトリ名や場所は厳密ではなく、独自の開発ディレクトリを選択して作成できます。
  2. go mod init splunk.conf/workshop/example/jenkinscireceiver を実行して golang モジュールを初期化します
    1. これにより go.mod というファイルが作成され、直接的および間接的な依存関係を追跡するために使用されます
    2. 最終的には、インポートされる依存関係のチェックサム値である go.sum が生成されます。
Check-ingo.mod を確認する
module splunk.conf/workshop/example/jenkinscireceiver

go 1.20
Last Modified 2026/01/23

OpenTelemetry Collector 開発

Configuration の構築

コンポーネントの Configuration 部分は、ユーザーがコンポーネントに対して入力を行う方法です。そのため、設定に使用される値は以下の条件を満たす必要があります

  1. そのフィールドが何を制御するかをユーザーが直感的に理解できること
  2. 必須と任意を明確にすること
  3. 一般的な名前とフィールドを再利用すること
  4. オプションをシンプルに保つこと
---
jenkins_server_addr: hostname
jenkins_server_api_port: 8089
interval: 10m
filter_builds_by:
    - name: my-awesome-build
      status: amber
track:
    values:
        example.metric.1: yes
        example.metric.2: yes
        example.metric.3: no
        example.metric.4: no
---
# Required Values
endpoint: http://my-jenkins-server:8089
auth:
    authenticator: basicauth/jenkins
# Optional Values
collection_interval: 10m
metrics:
    example.metric.1:
        enabled: true
    example.metric.2:
        enabled: true
    example.metric.3:
        enabled: true
    example.metric.4:
        enabled: true

悪い設定例は、設定のベストプラクティスの逆を行うことがコンポーネントの使いやすさにどのように影響するかを示しています。フィールド値が何であるべきかが明確ではなく、既存のプロセッサにプッシュできる機能が含まれており、フィールド名が Collector に存在する他のコンポーネントと一貫していません。

良い設定例は、必須の値をシンプルに保ち、他のコンポーネントからフィールド名を再利用し、コンポーネントが Jenkins と Collector 間の相互作用のみに焦点を当てることを確保しています。

コードタブは、私たちが追加する必要がある量と、Collector 内の共有ライブラリによってすでに提供されているものを示しています。これらはビジネスロジックに到達したときにより詳細に説明します。Configuration は小さく始まり、追加機能が必要になるとビジネスロジックが含まれるようになると変更されます。

コードを書く

Configuration に必要なコードを実装するために、以下の内容で config.go という新しいファイルを作成します

package jenkinscireceiver

import (
    "go.opentelemetry.io/collector/config/confighttp"
    "go.opentelemetry.io/collector/receiver/scraperhelper"

    "splunk.conf/workshop/example/jenkinscireceiver/internal/metadata"
)

type Config struct {
    // HTTPClientSettings contains all the values
    // that are commonly shared across all HTTP interactions
    // performed by the collector.
    confighttp.HTTPClientSettings `mapstructure:",squash"`
    // ScraperControllerSettings will allow us to schedule
    // how often to check for updates to builds.
    scraperhelper.ScraperControllerSettings `mapstructure:",squash"`
    // MetricsBuilderConfig contains all the metrics
    // that can be configured.
    metadata.MetricsBuilderConfig `mapstructure:",squash"`
}
Last Modified 2026/01/23

OpenTelemetry Collector 開発

コンポーネントのレビュー

Jenkins からメトリクスをキャプチャするために必要なコンポーネントの種類を振り返ります

Extension が解決するビジネスユースケースは以下の通りです

  1. ランタイム設定が必要な共有機能を持つこと
  2. Collector のランタイムを観測することを間接的に支援すること

詳細は Extensions の概要 を参照してください。

Receiver が解決するビジネスユースケースは以下の通りです

  • リモートソースからデータをフェッチする
  • リモートソースからデータを受信する

これは一般的に pull 型と push 型のデータ収集と呼ばれ、詳細は Receiver の概要 で読むことができます。

Processor が解決するビジネスユースケースは以下の通りです

  • データ、フィールド、または値の追加や削除
  • データを観測し、意思決定を行う
  • バッファリング、キューイング、並び替え

留意すべき点は、Processor を流れるデータタイプは、下流のコンポーネントに同じデータタイプを転送する必要があるということです。詳細は Processor の概要 をお読みください。

Exporter が解決するビジネスユースケースは以下の通りです

  • ツール、サービス、またはストレージにデータを送信する

OpenTelemetry Collector は「バックエンド」、つまりオールインワンのオブザーバビリティスイートになることを望んでおらず、むしろ OpenTelemetry を創設した原則を維持しています。つまり、すべての人のためのベンダーに依存しないオブザーバビリティです。詳細を再確認するには、Exporter の概要 をお読みください。

これはワークショップで見逃されたコンポーネントタイプです。比較的新しい Collector への追加であるためです。Connector を考える最良の方法は、異なるテレメトリタイプとパイプライン間で使用できる Processor のようなものです。つまり、Connector はログとしてデータを受け入れ、メトリクスを出力したり、あるパイプラインからメトリクスを受け入れ、観測したデータに関するメトリクスを提供したりできます。

Connector が解決するビジネスケースは以下の通りです

  • 異なるテレメトリタイプ間の変換
    • ログからメトリクス
    • トレースからメトリクス
    • メトリクスからログ
  • 受信データを観測し、独自のデータを生成する
    • メトリクスを受け入れ、データの分析メトリクスを生成する。

Processor の概要Ninja セクションに簡単な概要がありました。新しい Connector コンポーネントの更新についてはプロジェクトを確認してください。

コンポーネントの概要から、Jenkins 用のプルベースのレシーバーを開発することが明確です。

Last Modified 2026/01/23

OpenTelemetry Collector 開発

メトリクスの設計

レシーバーでキャプチャしたメトリクスを定義およびエクスポートするために、mdatagen を使用します。これは、YAML で定義されたメトリクスをコードに変換する Collector 用に開発されたツールです。

---
# Type defines the name to reference the component
# in the configuration file
type: jenkins

# Status defines the component type and the stability level
status:
  class: receiver
  stability:
    development: [metrics]

# Attributes are the expected fields reported
# with the exported values.
attributes:
  job.name:
    description: The name of the associated Jenkins job
    type: string
  job.status:
    description: Shows if the job had passed, or failed
    type: string
    enum:
    - failed
    - success
    - unknown

# Metrics defines all the pontentially exported values from this receiver.
metrics:
  jenkins.jobs.count:
    enabled: true
    description: Provides a count of the total number of configured jobs
    unit: "{Count}"
    gauge:
      value_type: int
  jenkins.job.duration:
    enabled: true
    description: Show the duration of the job
    unit: "s"
    gauge:
      value_type: int
    attributes:
    - job.name
    - job.status
  jenkins.job.commit_delta:
    enabled: true
    description: The calculation difference of the time job was finished minus commit timestamp
    unit: "s"
    gauge:
      value_type: int
    attributes:
    - job.name
    - job.status
// To generate the additional code needed to capture metrics,
// the following command to be run from the shell:
//  go generate -x ./...

//go:generate go run github.com/open-telemetry/opentelemetry-collector-contrib/cmd/mdatagen@v0.80.0 metadata.yaml
package jenkinscireceiver

// There is no code defined within this file.

次のセクションに進む前に、これらのファイルをプロジェクトフォルダー内に作成してください。

Factory の構築

Factory は、オブジェクト(この場合は jenkinscireceiver)を提供された設定で動的に作成できるようにするソフトウェアデザインパターンです。より現実世界の例を使うと、電話ショップに行き、自分の説明と正確に一致する電話を求め、それを提供してもらうようなものです。

go generate -x ./... コマンドを実行すると、定義されたメトリクスをエクスポートするために必要なすべてのコードを含む新しいフォルダー jenkinscireceiver/internal/metadata が作成されます。必要なコードは以下の通りです

package jenkinscireceiver

import (
    "errors"

    "go.opentelemetry.io/collector/component"
    "go.opentelemetry.io/collector/config/confighttp"
    "go.opentelemetry.io/collector/receiver"
    "go.opentelemetry.io/collector/receiver/scraperhelper"

    "splunk.conf/workshop/example/jenkinscireceiver/internal/metadata"
)

func NewFactory() receiver.Factory {
    return receiver.NewFactory(
        metadata.Type,
        newDefaultConfig,
        receiver.WithMetrics(newMetricsReceiver, metadata.MetricsStability),
    )
}

func newMetricsReceiver(_ context.Context, set receiver.CreateSettings, cfg component.Config, consumer consumer.Metrics) (receiver.Metrics, error) {
    // Convert the configuration into the expected type
    conf, ok := cfg.(*Config)
    if !ok {
        return nil, errors.New("can not convert config")
    }
    sc, err := newScraper(conf, set)
    if err != nil {
        return nil, err
    }
    return scraperhelper.NewScraperControllerReceiver(
        &conf.ScraperControllerSettings,
        set,
        consumer,
        scraperhelper.AddScraper(sc),
    )
}
package jenkinscireceiver

import (
    "go.opentelemetry.io/collector/config/confighttp"
    "go.opentelemetry.io/collector/receiver/scraperhelper"

    "splunk.conf/workshop/example/jenkinscireceiver/internal/metadata"
)

type Config struct {
    // HTTPClientSettings contains all the values
    // that are commonly shared across all HTTP interactions
    // performed by the collector.
    confighttp.HTTPClientSettings `mapstructure:",squash"`
    // ScraperControllerSettings will allow us to schedule
    // how often to check for updates to builds.
    scraperhelper.ScraperControllerSettings `mapstructure:",squash"`
    // MetricsBuilderConfig contains all the metrics
    // that can be configured.
    metadata.MetricsBuilderConfig `mapstructure:",squash"`
}

func newDefaultConfig() component.Config {
    return &Config{
        ScraperControllerSettings: scraperhelper.NewDefaultScraperControllerSettings(metadata.Type),
        HTTPClientSettings:        confighttp.NewDefaultHTTPClientSettings(),
        MetricsBuilderConfig:      metadata.DefaultMetricsBuilderConfig(),
    }
}
package jenkinscireceiver

type scraper struct {}

func newScraper(cfg *Config, set receiver.CreateSettings) (scraperhelper.Scraper, error) {
    // Create a our scraper with our values
    s := scraper{
        // To be filled in later
    }
    return scraperhelper.NewScraper(metadata.Type, s.scrape)
}

func (scraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
    // To be filled in
    return pmetrics.NewMetrics(), nil
}
---
dist:
  name: otelcol
  description: "Conf workshop collector"
  output_path: ./dist
  version: v0.0.0-experimental

extensions:
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/extension/basicauthextension v0.80.0
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension v0.80.0

receivers:
  - gomod: go.opentelemetry.io/collector/receiver/otlpreceiver v0.80.0
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.80.0
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.80.0
  - gomod: splunk.conf/workshop/example/jenkinscireceiver v0.0.0
    path: ./jenkinscireceiver

processors:
  - gomod: go.opentelemetry.io/collector/processor/batchprocessor v0.80.0

exporters:
  - gomod: go.opentelemetry.io/collector/exporter/loggingexporter v0.80.0
  - gomod: go.opentelemetry.io/collector/exporter/otlpexporter v0.80.0
  - gomod: go.opentelemetry.io/collector/exporter/otlphttpexporter v0.80.0

# This replace is a go directive that allows for redefine
# where to fetch the code to use since the default would be from a remote project.
replaces:
- splunk.conf/workshop/example/jenkinscireceiver => ./jenkinscireceiver
├── build-config.yaml
└── jenkinscireceiver
    ├── go.mod
    ├── config.go
    ├── factory.go
    ├── scraper.go
    └── internal
      └── metadata

これらのファイルを期待される内容でプロジェクトに書き込んだら、go mod tidy を実行します。これにより、すべてのリモート依存関係がフェッチされ、go.mod が更新され、go.sum ファイルが生成されます。

Last Modified 2026/01/23

OpenTelemetry Collector 開発

ビジネスロジックの構築

この時点で、現在何もしないカスタムコンポーネントがあるため、Jenkins からこのデータをキャプチャするために必要なロジックを追加する必要があります。

ここから、実行する必要があるステップは以下の通りです

  1. Jenkins に接続するクライアントを作成する
  2. 設定されたすべてのジョブをキャプチャする
  3. 設定されたジョブの最後のビルドのステータスを報告する
  4. コミットのタイムスタンプとジョブ完了の時間差を計算する

変更は scraper.go に対して行われます。

Jenkins サーバーに接続できるようにするために、“github.com/yosida95/golang-jenkins” パッケージを使用します。このパッケージは、Jenkins サーバーからデータを読み取るために必要な機能を提供します。

次に、“go.opentelemetry.io/collector/receiver/scraperhelper” ライブラリのヘルパー関数を利用して、コンポーネントの起動が完了した後に Jenkins サーバーに接続できるように start 関数を作成します。

package jenkinscireceiver

import (
    "context"

    jenkins "github.com/yosida95/golang-jenkins"
    "go.opentelemetry.io/collector/component"
    "go.opentelemetry.io/collector/pdata/pmetric"
    "go.opentelemetry.io/collector/receiver"
    "go.opentelemetry.io/collector/receiver/scraperhelper"

    "splunk.conf/workshop/example/jenkinscireceiver/internal/metadata"
)

type scraper struct {
    mb     *metadata.MetricsBuilder
    client *jenkins.Jenkins
}

func newScraper(cfg *Config, set receiver.CreateSettings) (scraperhelper.Scraper, error) {
    s := &scraper{
        mb : metadata.NewMetricsBuilder(cfg.MetricsBuilderConfig, set),
    }

    return scraperhelper.NewScraper(
        metadata.Type,
        s.scrape,
        scraperhelper.WithStart(func(ctx context.Context, h component.Host) error {
            client, err := cfg.ToClient(h, set.TelemetrySettings)
            if err != nil {
                return err
            }
            // The collector provides a means of injecting authentication
            // on our behalf, so this will ignore the libraries approach
            // and use the configured http client with authentication.
            s.client = jenkins.NewJenkins(nil, cfg.Endpoint)
            s.client.SetHTTPClient(client)
            return nil
        }),
    )
}

func (s scraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
    // To be filled in
    return pmetric.NewMetrics(), nil
}

これで Jenkins レシーバーを初期化するために必要なすべてのセットアップコードが完了しました。

ここからは、入力を待っていた scrape メソッドに焦点を当てます。このメソッドは、設定で構成された間隔(デフォルトでは毎分)で実行されます。

設定されたジョブの数をキャプチャしたい理由は、Jenkins サーバーの成長を確認し、オンボードしたプロジェクトの数を測定できるようにするためです。これを行うために、Jenkins クライアントを呼び出してすべてのジョブをリストし、エラーが報告された場合はメトリクスなしでそれを返し、そうでなければメトリクスビルダーからデータを出力します。

func (s scraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
    jobs, err := s.client.GetJobs()
    if err != nil {
        return pmetric.Metrics{}, err
    }

    // Recording the timestamp to ensure
    // all captured data points within this scrape have the same value.
    now := pcommon.NewTimestampFromTime(time.Now())

    // Casting to an int64 to match the expected type
    s.mb.RecordJenkinsJobsCountDataPoint(now, int64(len(jobs)))

    // To be filled in

    return s.mb.Emit(), nil
}

前のステップでは、すべてのジョブをキャプチャし、ジョブの数を報告することができました。このステップでは、各ジョブを調べ、報告された値を使用してメトリクスをキャプチャします。

func (s scraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
    jobs, err := s.client.GetJobs()
    if err != nil {
        return pmetric.Metrics{}, err
    }

    // Recording the timestamp to ensure
    // all captured data points within this scrape have the same value.
    now := pcommon.NewTimestampFromTime(time.Now())

    // Casting to an int64 to match the expected type
    s.mb.RecordJenkinsJobsCountDataPoint(now, int64(len(jobs)))

    for _, job := range jobs {
        // Ensure we have valid results to start off with
        var (
            build  = job.LastCompletedBuild
            status = metadata.AttributeJobStatusUnknown
        )

        // This will check the result of the job, however,
        // since the only defined attributes are
        // `success`, `failure`, and `unknown`.
        // it is assume that anything did not finish
        // with a success or failure to be an unknown status.

        switch build.Result {
        case "aborted", "not_built", "unstable":
            status = metadata.AttributeJobStatusUnknown
        case "success":
            status = metadata.AttributeJobStatusSuccess
        case "failure":
            status = metadata.AttributeJobStatusFailed
        }

        s.mb.RecordJenkinsJobDurationDataPoint(
            now,
            int64(job.LastCompletedBuild.Duration),
            job.Name,
            status,
        )
    }

    return s.mb.Emit(), nil
}

最後のステップは、コミットからジョブ完了までにかかった時間を計算し、DORA メトリクスを推測するのに役立てることです。

func (s scraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
    jobs, err := s.client.GetJobs()
    if err != nil {
        return pmetric.Metrics{}, err
    }

    // Recording the timestamp to ensure
    // all captured data points within this scrape have the same value.
    now := pcommon.NewTimestampFromTime(time.Now())

    // Casting to an int64 to match the expected type
    s.mb.RecordJenkinsJobsCountDataPoint(now, int64(len(jobs)))

    for _, job := range jobs {
        // Ensure we have valid results to start off with
        var (
            build  = job.LastCompletedBuild
            status = metadata.AttributeJobStatusUnknown
        )

        // Previous step here

        // Ensure that the `ChangeSet` has values
        // set so there is a valid value for us to reference
        if len(build.ChangeSet.Items) == 0 {
            continue
        }

        // Making the assumption that the first changeset
        // item is the most recent change.
        change := build.ChangeSet.Items[0]

        // Record the difference from the build time
        // compared against the change timestamp.
        s.mb.RecordJenkinsJobCommitDeltaDataPoint(
            now,
            int64(build.Timestamp-change.Timestamp),
            job.Name,
            status,
        )
    }

    return s.mb.Emit(), nil
}

これらのステップがすべて完了すると、カスタム Jenkins CI レシーバーの構築が完了です!

次のステップ

コンポーネントから欲しい機能がおそらくもっとあるでしょう。例えば

  • ジョブが使用したブランチ名を含めることはできますか?
  • ジョブのプロジェクト名を含めることはできますか?
  • プロジェクトの累積ジョブ期間をどのように計算しますか?
  • 変更が機能することをどのように検証しますか?

この時間を使って、遊んでみたり、壊してみたり、変更したり、ビルドからログをキャプチャしてみたりしてください。