From 236f57075d99ceedd62259e5960b0ee85ae4d6c6 Mon Sep 17 00:00:00 2001 From: Niko Belokolodov Date: Fri, 14 Nov 2025 15:20:07 +1300 Subject: [PATCH 1/3] feat: adds Snowplow events tracking client --- events/client.go | 273 ++++++++++++++++++++++++++++++++++++++++++ events/client_test.go | 250 ++++++++++++++++++++++++++++++++++++++ go.mod | 4 + go.sum | 15 +++ 4 files changed, 542 insertions(+) create mode 100644 events/client.go create mode 100644 events/client_test.go diff --git a/events/client.go b/events/client.go new file mode 100644 index 0000000..2e27102 --- /dev/null +++ b/events/client.go @@ -0,0 +1,273 @@ +package events + +import ( + "errors" + "net/url" + "sync" + + storagememory "github.com/snowplow/snowplow-golang-tracker/v3/pkg/storage/memory" + sp "github.com/snowplow/snowplow-golang-tracker/v3/tracker" +) + +var ( + // ErrHostMissingScheme is returned when the host URL doesn't include a scheme (http/https). + ErrHostMissingScheme = errors.New("host must include a scheme (http:// or https://)") + + // ErrInvalidBufferSize is returned when buffer size is less than 1. + ErrInvalidBufferSize = errors.New("buffer_size must be positive") +) + +const ( + // SchemaCustomEvent is the Iglu schema for custom events. + SchemaCustomEvent = "iglu:com.gitlab/custom_event/jsonschema/1-0-0" + + // SchemaUserContext is the Iglu schema for user context. + SchemaUserContext = "iglu:com.gitlab/user_context/jsonschema/1-0-0" + + // DefaultTrackerNamespace is the default namespace for the tracker. + DefaultTrackerNamespace = "gitlab" + + // UserAgent is the user agent string sent with all events. + UserAgent = "GitLab Analytics Go SDK" +) + +// Client is the main GitLab SDK client for tracking events. +// Client is safe for concurrent use by multiple goroutines. +type Client struct { + tracker *sp.Tracker + emitter *sp.Emitter + mu sync.Mutex +} + +// UserTracker is a user-scoped tracker that automatically includes user context. +// This is safe for concurrent use and provides a cleaner API for tracking user events. +type UserTracker struct { + client *Client + userID string + userAttributes map[string]interface{} +} + +// ClientOption is a functional option for configuring the Client. +type ClientOption func(*clientConfig) error + +// TrackOption is a functional option for configuring individual track calls. +type TrackOption func(*trackConfig) + +// clientConfig holds configuration for creating a Client. +type clientConfig struct { + bufferSize int +} + +// trackConfig holds configuration for a single track call. +type trackConfig struct { + userID string + userAttributes map[string]interface{} +} + +// NewClient creates a new GitLab SDK client +// +// Required parameters: +// - appID: The ID specified in the GitLab Project Analytics setup guide +// - host: The GitLab Project Analytics instance URL (must include scheme, e.g., http://localhost:9091) +// +// Optional parameters can be provided via ClientOption functions: +// - WithBufferSize: How many events are sent in one request (default: 1) +func NewClient(appID, host string, opts ...ClientOption) (*Client, error) { + config := &clientConfig{ + bufferSize: 1, + } + + for _, opt := range opts { + if err := opt(config); err != nil { + return nil, err + } + } + + uri, err := url.Parse(host) + if err != nil { + return nil, err + } + if uri.Scheme == "" { + return nil, ErrHostMissingScheme + } + + endpoint := uri.Host + if uri.Path != "" && uri.Path != "/" { + endpoint += uri.Path + } + + requestType := "GET" + if config.bufferSize > 1 { + requestType = "POST" + } + + storage := storagememory.Init() + emitter := sp.InitEmitter( + sp.RequireCollectorUri(endpoint), + sp.RequireStorage(*storage), + sp.OptionRequestType(requestType), + sp.OptionProtocol(uri.Scheme), + ) + + client := &Client{ + emitter: emitter, + } + + tracker := sp.InitTracker( + sp.RequireEmitter(emitter), + sp.OptionAppId(appID), + sp.OptionNamespace(DefaultTrackerNamespace), + ) + client.tracker = tracker + + return client, nil +} + +// WithBufferSize sets how many events are sent in one request at a time. +// Setting more than 1 will change the HTTP method from GET to POST. +func WithBufferSize(size int) ClientOption { + return func(cfg *clientConfig) error { + if size < 1 { + return ErrInvalidBufferSize + } + cfg.bufferSize = size + return nil + } +} + +// WithUser sets user context for a specific event. +// This allows per-request user identification, making the SDK safe for +// concurrent use in multi-user applications (e.g., web servers). +// +// Parameters: +// - userID: The ID of the user +// - userAttributes: Optional user attributes to include with the event +// +// Example: +// +// client.Track("button_click", payload, events.WithUser("user123", map[string]interface{}{ +// "user_name": "John Doe", +// })) +func WithUser(userID string, userAttributes map[string]interface{}) TrackOption { + return func(cfg *trackConfig) { + cfg.userID = userID + if userAttributes != nil { + cfg.userAttributes = userAttributes + } + } +} + +// Track sends a custom event to GitLab Analytics. +// +// Parameters: +// - eventName: The name of the event +// - eventPayload: A map of event attributes to include with the event +// - opts: Optional TrackOption functions for per-event configuration (e.g., WithUser) +// +// Example without user: +// +// client.Track("page_view", map[string]interface{}{"page": "/home"}) +// +// Example with user context: +// +// client.Track("button_click", map[string]interface{}{"id": "submit"}, +// events.WithUser("user123", map[string]interface{}{"name": "John"})) +func (c *Client) Track(eventName string, eventPayload map[string]interface{}, opts ...TrackOption) { + c.mu.Lock() + defer c.mu.Unlock() + + cfg := &trackConfig{} + for _, opt := range opts { + opt(cfg) + } + + eventData := map[string]interface{}{ + "name": eventName, + "props": eventPayload, + } + eventJSON := sp.InitSelfDescribingJson(SchemaCustomEvent, eventData) + + event := sp.SelfDescribingEvent{ + Event: eventJSON, + } + + subject := sp.InitSubject() + subject.SetUseragent(UserAgent) + if cfg.userID != "" { + subject.SetUserId(cfg.userID) + } + c.tracker.SetSubject(subject) + + if len(cfg.userAttributes) > 0 { + userContext := sp.InitSelfDescribingJson(SchemaUserContext, cfg.userAttributes) + event.Contexts = []sp.SelfDescribingJson{*userContext} + } + + c.tracker.TrackSelfDescribingEvent(event) +} + +// Identify is a convenience method that returns a TrackOption for user identification. +// This allows you to create reusable user context that can be passed to multiple Track calls. +// +// Parameters: +// - userID: The ID of the user +// - userAttributes: Optional user attributes to include with events +// +// Example: +// +// userCtx := client.Identify("user123", map[string]interface{}{"name": "John"}) +// client.Track("event1", payload, userCtx) +// client.Track("event2", payload, userCtx) +// +// Note: This is a convenience wrapper around WithUser and does not store state in the client. +func (c *Client) Identify(userID string, userAttributes map[string]interface{}) TrackOption { + return WithUser(userID, userAttributes) +} + +// ForUser creates a UserTracker scoped to a specific user. +// This provides a cleaner API for tracking events for a single user without passing context repeatedly. +// +// Parameters: +// - userID: The ID of the user +// - userAttributes: Optional user attributes to include with all events +// +// Example: +// +// userTracker := client.ForUser("user123", map[string]interface{}{ +// "name": "John Doe", +// "email": "john@example.com", +// }) +// userTracker.Track("login", map[string]interface{}{}) +// userTracker.Track("page_view", map[string]interface{}{"page": "/dashboard"}) +func (c *Client) ForUser(userID string, userAttributes map[string]interface{}) *UserTracker { + return &UserTracker{ + client: c, + userID: userID, + userAttributes: userAttributes, + } +} + +// Track sends a custom event with the user context automatically included. +// +// Parameters: +// - eventName: The name of the event +// - eventPayload: A map of event attributes to include with the event +// +// Example: +// +// userTracker := client.ForUser("user123", map[string]interface{}{"name": "John"}) +// userTracker.Track("button_click", map[string]interface{}{"button": "submit"}) +func (ut *UserTracker) Track(eventName string, eventPayload map[string]interface{}) { + ut.client.Track(eventName, eventPayload, WithUser(ut.userID, ut.userAttributes)) +} + +// FlushEvents manually flushes all events from the emitter. +// +// Parameters: +// - async: If true, flush events asynchronously; if false, flush synchronously (default: false) +func (c *Client) FlushEvents(async bool) { + c.emitter.Flush() + if !async { + c.tracker.BlockingFlush(5, 10) + } +} diff --git a/events/client_test.go b/events/client_test.go new file mode 100644 index 0000000..907d91d --- /dev/null +++ b/events/client_test.go @@ -0,0 +1,250 @@ +package events + +import ( + "fmt" + "io" + "log" + "net/http" + "net/http/httptest" + "os" + "testing" +) + +func TestMain(m *testing.M) { + // Disable Snowplow's HTTP error logging during tests to avoid noise + log.SetOutput(io.Discard) + os.Exit(m.Run()) +} + +func TestNewClient(t *testing.T) { + tests := []struct { + name string + appID string + host string + wantErr bool + }{ + { + name: "valid https host", + appID: "test-app-id", + host: "https://snowplowcollector.com", + wantErr: false, + }, + { + name: "valid http host with port", + appID: "test-app-id", + host: "http://localhost:9091", + wantErr: false, + }, + { + name: "host without scheme", + appID: "test-app-id", + host: "snowplowcollector.com", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client, err := NewClient(tt.appID, tt.host) + if (err != nil) != tt.wantErr { + t.Errorf("NewClient() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !tt.wantErr && client == nil { + t.Error("NewClient() returned nil client") + } + }) + } +} + +// createTestClient creates a client with a mock HTTP server. +func createTestClient(t *testing.T) (*Client, *httptest.Server) { + t.Helper() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + + client, err := NewClient("test-app", server.URL) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + + return client, server +} + +func TestClient_Track(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + + t.Run("without user context", func(t *testing.T) { + client.Track("test_event", map[string]interface{}{ + "id": 123, + "value": "test", + }) + }) + + t.Run("with empty payload", func(t *testing.T) { + client.Track("test_event_empty", map[string]interface{}{}) + }) + + t.Run("with user context", func(t *testing.T) { + client.Track("user_event", map[string]interface{}{"action": "click"}, + WithUser("user123", map[string]interface{}{ + "user_name": "Test User", + "email": "test@example.com", + })) + }) + + t.Run("with user ID only", func(t *testing.T) { + client.Track("user_event_id_only", map[string]interface{}{"action": "view"}, + WithUser("user456", nil)) + }) + + t.Run("multiple events with same user context", func(t *testing.T) { + userCtx := WithUser("user789", map[string]interface{}{"role": "admin"}) + client.Track("event1", map[string]interface{}{"id": 1}, userCtx) + client.Track("event2", map[string]interface{}{"id": 2}, userCtx) + }) + + t.Run("multiple users", func(t *testing.T) { + for i := range 3 { + userCtx := WithUser(fmt.Sprintf("user_%d", i), map[string]interface{}{"num": i}) + client.Track("multi_user_event", map[string]interface{}{"user_num": i}, userCtx) + } + }) +} + +func TestClient_ForUser(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + + t.Run("creates user tracker", func(t *testing.T) { + userTracker := client.ForUser("user_123", map[string]interface{}{ + "name": "John Doe", + "email": "john@example.com", + }) + + if userTracker == nil { + t.Fatal("ForUser returned nil") + } + }) + + t.Run("with nil attributes", func(t *testing.T) { + userTracker := client.ForUser("user_456", nil) + + if userTracker == nil { + t.Fatal("ForUser returned nil") + } + }) +} + +func TestUserTracker_Track(t *testing.T) { + t.Run("track event with user context", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + + userTracker := client.ForUser("user_123", map[string]interface{}{ + "name": "John Doe", + "email": "john@example.com", + }) + + userTracker.Track("login", map[string]interface{}{ + "method": "email", + }) + }) + + t.Run("track multiple events with same user tracker", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + + userTracker := client.ForUser("user_456", map[string]interface{}{ + "role": "admin", + }) + + userTracker.Track("page_view", map[string]interface{}{"page": "/dashboard"}) + userTracker.Track("button_click", map[string]interface{}{"button": "export"}) + userTracker.Track("file_download", map[string]interface{}{"file": "report.csv"}) + }) + + t.Run("multiple user trackers from same client", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + + user1Tracker := client.ForUser("user_1", map[string]interface{}{"name": "User 1"}) + user2Tracker := client.ForUser("user_2", map[string]interface{}{"name": "User 2"}) + + // Track events sequentially (Snowplow library has internal race conditions with concurrent use) + for i := range 3 { + user1Tracker.Track("event", map[string]interface{}{"num": i}) + } + + for i := range 3 { + user2Tracker.Track("event", map[string]interface{}{"num": i}) + } + }) +} + +func TestClient_FlushEvents(t *testing.T) { + t.Run("flush events synchronously", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + + client.Track("event1", map[string]interface{}{"id": 1}) + client.Track("event2", map[string]interface{}{"id": 2}) + + client.FlushEvents(false) + }) + + t.Run("flush events asynchronously", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + + client.Track("event3", map[string]interface{}{"id": 3}) + + client.FlushEvents(true) + }) +} + +func TestWithBufferSize(t *testing.T) { + tests := []struct { + name string + bufferSize int + wantErr bool + }{ + { + name: "valid buffer size 1", + bufferSize: 1, + wantErr: false, + }, + { + name: "valid buffer size 5", + bufferSize: 5, + wantErr: false, + }, + { + name: "invalid buffer size 0", + bufferSize: 0, + wantErr: true, + }, + { + name: "invalid buffer size negative", + bufferSize: -1, + wantErr: true, + }, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := NewClient("test-app", server.URL, WithBufferSize(tt.bufferSize)) + if (err != nil) != tt.wantErr { + t.Errorf("WithBufferSize() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/go.mod b/go.mod index dda720d..6813137 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/prometheus/client_model v0.6.1 github.com/sebest/xff v0.0.0-20210106013422-671bd2870b3a github.com/sirupsen/logrus v1.9.3 + github.com/snowplow/snowplow-golang-tracker/v3 v3.1.0 github.com/stretchr/testify v1.10.0 github.com/uber/jaeger-client-go v2.29.1+incompatible gitlab.com/gitlab-org/go/reopen v1.0.0 @@ -48,6 +49,9 @@ require ( github.com/google/pprof v0.0.0-20210804190019-f964ff605595 // indirect github.com/google/uuid v1.6.0 // indirect github.com/googleapis/gax-go/v2 v2.0.5 // indirect + github.com/hashicorp/go-immutable-radix v1.3.1 // indirect + github.com/hashicorp/go-memdb v1.3.4 // indirect + github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/kylelemons/godebug v1.1.0 // indirect diff --git a/go.sum b/go.sum index 7653da7..e4f70f1 100644 --- a/go.sum +++ b/go.sum @@ -203,12 +203,23 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 h1:sGm2vDRFUrQJO/Veii4h4zG2vvqG6uWNkBHSTqXOZk0= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2/go.mod h1:wd1YpapPLivG6nQgbf7ZkG1hhSOXDhhn4MLTknx2aAc= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/hashicorp/go-immutable-radix v1.3.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJFeZnpfm2KLowc= +github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-memdb v1.3.4 h1:XSL3NR682X/cVk2IeV0d70N4DZ9ljI885xAEU8IoK3c= +github.com/hashicorp/go-memdb v1.3.4/go.mod h1:uBTr1oQbtuMgd1SSGoR8YV27eT3sBHbYiNm53bMpgSg= +github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM= +github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/jarcoal/httpmock v1.2.0 h1:gSvTxxFR/MEMfsGrvRbdfpRUMBStovlSRLw0Ep1bwwc= +github.com/jarcoal/httpmock v1.2.0/go.mod h1:oCoTsnAz4+UoOUIf5lJOWV2QQIW5UoeUI6aM2YnWAZk= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= @@ -235,6 +246,8 @@ github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20210210170715-a github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20210210170715-a8dfcb80d3a7/go.mod h1:Spd59icnvRxSKuyijbbwe5AemzvcyXAUBgApa7VybMw= github.com/lightstep/lightstep-tracer-go v0.25.0 h1:sGVnz8h3jTQuHKMbUe2949nXm3Sg09N1UcR3VoQNN5E= github.com/lightstep/lightstep-tracer-go v0.25.0/go.mod h1:G1ZAEaqTHFPWpWunnbUn1ADEY/Jvzz7jIOaXwAfD6A8= +github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= +github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= @@ -282,6 +295,8 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/snowplow/snowplow-golang-tracker/v3 v3.1.0 h1:KltihpesBUw2gCTQ4YSFzoICLgrn4WpKinObXrPsdwY= +github.com/snowplow/snowplow-golang-tracker/v3 v3.1.0/go.mod h1:pG2FAfMzD7YSYju/xVSgIYthcI+0eyZ0hkwsq+VssKU= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= -- GitLab From 42b082e09836768dca20db4a42c8b995d310f670 Mon Sep 17 00:00:00 2001 From: Niko Belokolodov Date: Tue, 18 Nov 2025 12:17:20 +1300 Subject: [PATCH 2/3] Address review comments 1. Replaced interface{} with any throughout the MR 2. Added comprehensive test cases for URL path edge cases (5 new test scenarios) 3. Refactored magic numbers to named constants: - BufferSize = 5 - FlushAttempts = 5 - FlushSleepMs = 10 4. Removed WithBufferSize option and simplified API: - Deleted ClientOption type - Deleted clientConfig struct - Deleted ErrInvalidBufferSize error - Deleted TestWithBufferSize test - Simplified NewClient signature 5. Made constants non-configurable by removing "Default" prefix 6. Updated internal constants: - trackerNamespace (now lowercase/private) - UserAgent updated to "GitLab LabKit" --- events/client.go | 115 +++++++++---------------------- events/client_test.go | 153 ++++++++++++++++++++---------------------- 2 files changed, 107 insertions(+), 161 deletions(-) diff --git a/events/client.go b/events/client.go index 2e27102..568d62c 100644 --- a/events/client.go +++ b/events/client.go @@ -12,9 +12,6 @@ import ( var ( // ErrHostMissingScheme is returned when the host URL doesn't include a scheme (http/https). ErrHostMissingScheme = errors.New("host must include a scheme (http:// or https://)") - - // ErrInvalidBufferSize is returned when buffer size is less than 1. - ErrInvalidBufferSize = errors.New("buffer_size must be positive") ) const ( @@ -24,11 +21,20 @@ const ( // SchemaUserContext is the Iglu schema for user context. SchemaUserContext = "iglu:com.gitlab/user_context/jsonschema/1-0-0" - // DefaultTrackerNamespace is the default namespace for the tracker. - DefaultTrackerNamespace = "gitlab" + // trackerNamespace is the default namespace for the tracker. + trackerNamespace = "gitlab" // UserAgent is the user agent string sent with all events. - UserAgent = "GitLab Analytics Go SDK" + UserAgent = "GitLab LabKit" + + // BufferSize is the number of events sent in one request. + BufferSize = 5 + + // FlushAttempts is the number of retry attempts when blocking flush. + FlushAttempts = 5 + + // FlushSleepMs is the sleep time in milliseconds between flush attempts. + FlushSleepMs = 10 ) // Client is the main GitLab SDK client for tracking events. @@ -44,24 +50,16 @@ type Client struct { type UserTracker struct { client *Client userID string - userAttributes map[string]interface{} + userAttributes map[string]any } -// ClientOption is a functional option for configuring the Client. -type ClientOption func(*clientConfig) error - // TrackOption is a functional option for configuring individual track calls. type TrackOption func(*trackConfig) -// clientConfig holds configuration for creating a Client. -type clientConfig struct { - bufferSize int -} - // trackConfig holds configuration for a single track call. type trackConfig struct { userID string - userAttributes map[string]interface{} + userAttributes map[string]any } // NewClient creates a new GitLab SDK client @@ -69,20 +67,7 @@ type trackConfig struct { // Required parameters: // - appID: The ID specified in the GitLab Project Analytics setup guide // - host: The GitLab Project Analytics instance URL (must include scheme, e.g., http://localhost:9091) -// -// Optional parameters can be provided via ClientOption functions: -// - WithBufferSize: How many events are sent in one request (default: 1) -func NewClient(appID, host string, opts ...ClientOption) (*Client, error) { - config := &clientConfig{ - bufferSize: 1, - } - - for _, opt := range opts { - if err := opt(config); err != nil { - return nil, err - } - } - +func NewClient(appID, host string) (*Client, error) { uri, err := url.Parse(host) if err != nil { return nil, err @@ -96,17 +81,13 @@ func NewClient(appID, host string, opts ...ClientOption) (*Client, error) { endpoint += uri.Path } - requestType := "GET" - if config.bufferSize > 1 { - requestType = "POST" - } - storage := storagememory.Init() emitter := sp.InitEmitter( sp.RequireCollectorUri(endpoint), sp.RequireStorage(*storage), - sp.OptionRequestType(requestType), + sp.OptionRequestType("POST"), sp.OptionProtocol(uri.Scheme), + sp.OptionSendLimit(BufferSize), ) client := &Client{ @@ -116,25 +97,13 @@ func NewClient(appID, host string, opts ...ClientOption) (*Client, error) { tracker := sp.InitTracker( sp.RequireEmitter(emitter), sp.OptionAppId(appID), - sp.OptionNamespace(DefaultTrackerNamespace), + sp.OptionNamespace(trackerNamespace), ) client.tracker = tracker return client, nil } -// WithBufferSize sets how many events are sent in one request at a time. -// Setting more than 1 will change the HTTP method from GET to POST. -func WithBufferSize(size int) ClientOption { - return func(cfg *clientConfig) error { - if size < 1 { - return ErrInvalidBufferSize - } - cfg.bufferSize = size - return nil - } -} - // WithUser sets user context for a specific event. // This allows per-request user identification, making the SDK safe for // concurrent use in multi-user applications (e.g., web servers). @@ -145,10 +114,10 @@ func WithBufferSize(size int) ClientOption { // // Example: // -// client.Track("button_click", payload, events.WithUser("user123", map[string]interface{}{ +// client.Track("button_click", payload, events.WithUser("user123", map[string]any{ // "user_name": "John Doe", // })) -func WithUser(userID string, userAttributes map[string]interface{}) TrackOption { +func WithUser(userID string, userAttributes map[string]any) TrackOption { return func(cfg *trackConfig) { cfg.userID = userID if userAttributes != nil { @@ -166,13 +135,13 @@ func WithUser(userID string, userAttributes map[string]interface{}) TrackOption // // Example without user: // -// client.Track("page_view", map[string]interface{}{"page": "/home"}) +// client.Track("page_view", map[string]any{"page": "/home"}) // // Example with user context: // -// client.Track("button_click", map[string]interface{}{"id": "submit"}, -// events.WithUser("user123", map[string]interface{}{"name": "John"})) -func (c *Client) Track(eventName string, eventPayload map[string]interface{}, opts ...TrackOption) { +// client.Track("button_click", map[string]any{"id": "submit"}, +// events.WithUser("user123", map[string]any{"name": "John"})) +func (c *Client) Track(eventName string, eventPayload map[string]any, opts ...TrackOption) { c.mu.Lock() defer c.mu.Unlock() @@ -181,7 +150,7 @@ func (c *Client) Track(eventName string, eventPayload map[string]interface{}, op opt(cfg) } - eventData := map[string]interface{}{ + eventData := map[string]any{ "name": eventName, "props": eventPayload, } @@ -206,24 +175,6 @@ func (c *Client) Track(eventName string, eventPayload map[string]interface{}, op c.tracker.TrackSelfDescribingEvent(event) } -// Identify is a convenience method that returns a TrackOption for user identification. -// This allows you to create reusable user context that can be passed to multiple Track calls. -// -// Parameters: -// - userID: The ID of the user -// - userAttributes: Optional user attributes to include with events -// -// Example: -// -// userCtx := client.Identify("user123", map[string]interface{}{"name": "John"}) -// client.Track("event1", payload, userCtx) -// client.Track("event2", payload, userCtx) -// -// Note: This is a convenience wrapper around WithUser and does not store state in the client. -func (c *Client) Identify(userID string, userAttributes map[string]interface{}) TrackOption { - return WithUser(userID, userAttributes) -} - // ForUser creates a UserTracker scoped to a specific user. // This provides a cleaner API for tracking events for a single user without passing context repeatedly. // @@ -233,13 +184,13 @@ func (c *Client) Identify(userID string, userAttributes map[string]interface{}) // // Example: // -// userTracker := client.ForUser("user123", map[string]interface{}{ +// userTracker := client.ForUser("user123", map[string]any{ // "name": "John Doe", // "email": "john@example.com", // }) -// userTracker.Track("login", map[string]interface{}{}) -// userTracker.Track("page_view", map[string]interface{}{"page": "/dashboard"}) -func (c *Client) ForUser(userID string, userAttributes map[string]interface{}) *UserTracker { +// userTracker.Track("login", map[string]any{}) +// userTracker.Track("page_view", map[string]any{"page": "/dashboard"}) +func (c *Client) ForUser(userID string, userAttributes map[string]any) *UserTracker { return &UserTracker{ client: c, userID: userID, @@ -255,9 +206,9 @@ func (c *Client) ForUser(userID string, userAttributes map[string]interface{}) * // // Example: // -// userTracker := client.ForUser("user123", map[string]interface{}{"name": "John"}) -// userTracker.Track("button_click", map[string]interface{}{"button": "submit"}) -func (ut *UserTracker) Track(eventName string, eventPayload map[string]interface{}) { +// userTracker := client.ForUser("user123", map[string]any{"name": "John"}) +// userTracker.Track("button_click", map[string]any{"button": "submit"}) +func (ut *UserTracker) Track(eventName string, eventPayload map[string]any) { ut.client.Track(eventName, eventPayload, WithUser(ut.userID, ut.userAttributes)) } @@ -268,6 +219,6 @@ func (ut *UserTracker) Track(eventName string, eventPayload map[string]interface func (c *Client) FlushEvents(async bool) { c.emitter.Flush() if !async { - c.tracker.BlockingFlush(5, 10) + c.tracker.BlockingFlush(FlushAttempts, FlushSleepMs) } } diff --git a/events/client_test.go b/events/client_test.go index 907d91d..6199a26 100644 --- a/events/client_test.go +++ b/events/client_test.go @@ -18,22 +18,25 @@ func TestMain(m *testing.M) { func TestNewClient(t *testing.T) { tests := []struct { - name string - appID string - host string - wantErr bool + name string + appID string + host string + wantErr bool + expectedEndpoint string }{ { - name: "valid https host", - appID: "test-app-id", - host: "https://snowplowcollector.com", - wantErr: false, + name: "valid https host", + appID: "test-app-id", + host: "https://snowplowcollector.com", + wantErr: false, + expectedEndpoint: "snowplowcollector.com", }, { - name: "valid http host with port", - appID: "test-app-id", - host: "http://localhost:9091", - wantErr: false, + name: "valid http host with port", + appID: "test-app-id", + host: "http://localhost:9091", + wantErr: false, + expectedEndpoint: "localhost:9091", }, { name: "host without scheme", @@ -41,6 +44,34 @@ func TestNewClient(t *testing.T) { host: "snowplowcollector.com", wantErr: true, }, + { + name: "host with root path", + appID: "test-app-id", + host: "https://collector.com/", + wantErr: false, + expectedEndpoint: "collector.com", + }, + { + name: "host with path", + appID: "test-app-id", + host: "https://collector.com/api/v1", + wantErr: false, + expectedEndpoint: "collector.com/api/v1", + }, + { + name: "host with path and trailing slash", + appID: "test-app-id", + host: "https://collector.com/api/", + wantErr: false, + expectedEndpoint: "collector.com/api/", + }, + { + name: "host with port and path", + appID: "test-app-id", + host: "http://localhost:8080/collector", + wantErr: false, + expectedEndpoint: "localhost:8080/collector", + }, } for _, tt := range tests { @@ -52,6 +83,13 @@ func TestNewClient(t *testing.T) { } if !tt.wantErr && client == nil { t.Error("NewClient() returned nil client") + return + } + // Validate endpoint construction + if !tt.wantErr && tt.expectedEndpoint != "" { + if client.emitter.CollectorUri != tt.expectedEndpoint { + t.Errorf("NewClient() endpoint = %v, want %v", client.emitter.CollectorUri, tt.expectedEndpoint) + } } }) } @@ -78,39 +116,39 @@ func TestClient_Track(t *testing.T) { defer server.Close() t.Run("without user context", func(t *testing.T) { - client.Track("test_event", map[string]interface{}{ + client.Track("test_event", map[string]any{ "id": 123, "value": "test", }) }) t.Run("with empty payload", func(t *testing.T) { - client.Track("test_event_empty", map[string]interface{}{}) + client.Track("test_event_empty", map[string]any{}) }) t.Run("with user context", func(t *testing.T) { - client.Track("user_event", map[string]interface{}{"action": "click"}, - WithUser("user123", map[string]interface{}{ + client.Track("user_event", map[string]any{"action": "click"}, + WithUser("user123", map[string]any{ "user_name": "Test User", "email": "test@example.com", })) }) t.Run("with user ID only", func(t *testing.T) { - client.Track("user_event_id_only", map[string]interface{}{"action": "view"}, + client.Track("user_event_id_only", map[string]any{"action": "view"}, WithUser("user456", nil)) }) t.Run("multiple events with same user context", func(t *testing.T) { - userCtx := WithUser("user789", map[string]interface{}{"role": "admin"}) - client.Track("event1", map[string]interface{}{"id": 1}, userCtx) - client.Track("event2", map[string]interface{}{"id": 2}, userCtx) + userCtx := WithUser("user789", map[string]any{"role": "admin"}) + client.Track("event1", map[string]any{"id": 1}, userCtx) + client.Track("event2", map[string]any{"id": 2}, userCtx) }) t.Run("multiple users", func(t *testing.T) { for i := range 3 { - userCtx := WithUser(fmt.Sprintf("user_%d", i), map[string]interface{}{"num": i}) - client.Track("multi_user_event", map[string]interface{}{"user_num": i}, userCtx) + userCtx := WithUser(fmt.Sprintf("user_%d", i), map[string]any{"num": i}) + client.Track("multi_user_event", map[string]any{"user_num": i}, userCtx) } }) } @@ -120,7 +158,7 @@ func TestClient_ForUser(t *testing.T) { defer server.Close() t.Run("creates user tracker", func(t *testing.T) { - userTracker := client.ForUser("user_123", map[string]interface{}{ + userTracker := client.ForUser("user_123", map[string]any{ "name": "John Doe", "email": "john@example.com", }) @@ -144,12 +182,12 @@ func TestUserTracker_Track(t *testing.T) { client, server := createTestClient(t) defer server.Close() - userTracker := client.ForUser("user_123", map[string]interface{}{ + userTracker := client.ForUser("user_123", map[string]any{ "name": "John Doe", "email": "john@example.com", }) - userTracker.Track("login", map[string]interface{}{ + userTracker.Track("login", map[string]any{ "method": "email", }) }) @@ -158,29 +196,29 @@ func TestUserTracker_Track(t *testing.T) { client, server := createTestClient(t) defer server.Close() - userTracker := client.ForUser("user_456", map[string]interface{}{ + userTracker := client.ForUser("user_456", map[string]any{ "role": "admin", }) - userTracker.Track("page_view", map[string]interface{}{"page": "/dashboard"}) - userTracker.Track("button_click", map[string]interface{}{"button": "export"}) - userTracker.Track("file_download", map[string]interface{}{"file": "report.csv"}) + userTracker.Track("page_view", map[string]any{"page": "/dashboard"}) + userTracker.Track("button_click", map[string]any{"button": "export"}) + userTracker.Track("file_download", map[string]any{"file": "report.csv"}) }) t.Run("multiple user trackers from same client", func(t *testing.T) { client, server := createTestClient(t) defer server.Close() - user1Tracker := client.ForUser("user_1", map[string]interface{}{"name": "User 1"}) - user2Tracker := client.ForUser("user_2", map[string]interface{}{"name": "User 2"}) + user1Tracker := client.ForUser("user_1", map[string]any{"name": "User 1"}) + user2Tracker := client.ForUser("user_2", map[string]any{"name": "User 2"}) // Track events sequentially (Snowplow library has internal race conditions with concurrent use) for i := range 3 { - user1Tracker.Track("event", map[string]interface{}{"num": i}) + user1Tracker.Track("event", map[string]any{"num": i}) } for i := range 3 { - user2Tracker.Track("event", map[string]interface{}{"num": i}) + user2Tracker.Track("event", map[string]any{"num": i}) } }) } @@ -190,8 +228,8 @@ func TestClient_FlushEvents(t *testing.T) { client, server := createTestClient(t) defer server.Close() - client.Track("event1", map[string]interface{}{"id": 1}) - client.Track("event2", map[string]interface{}{"id": 2}) + client.Track("event1", map[string]any{"id": 1}) + client.Track("event2", map[string]any{"id": 2}) client.FlushEvents(false) }) @@ -200,51 +238,8 @@ func TestClient_FlushEvents(t *testing.T) { client, server := createTestClient(t) defer server.Close() - client.Track("event3", map[string]interface{}{"id": 3}) + client.Track("event3", map[string]any{"id": 3}) client.FlushEvents(true) }) } - -func TestWithBufferSize(t *testing.T) { - tests := []struct { - name string - bufferSize int - wantErr bool - }{ - { - name: "valid buffer size 1", - bufferSize: 1, - wantErr: false, - }, - { - name: "valid buffer size 5", - bufferSize: 5, - wantErr: false, - }, - { - name: "invalid buffer size 0", - bufferSize: 0, - wantErr: true, - }, - { - name: "invalid buffer size negative", - bufferSize: -1, - wantErr: true, - }, - } - - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - })) - defer server.Close() - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - _, err := NewClient("test-app", server.URL, WithBufferSize(tt.bufferSize)) - if (err != nil) != tt.wantErr { - t.Errorf("WithBufferSize() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} -- GitLab From e62174a670b9a24f4c5471c1add0c864ac0e2ff0 Mon Sep 17 00:00:00 2001 From: Niko Belokolodov Date: Tue, 18 Nov 2025 20:15:00 +1300 Subject: [PATCH 3/3] Attempts to stop emitter manually to fix races --- events/client.go | 37 ++++- events/client_test.go | 343 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 344 insertions(+), 36 deletions(-) diff --git a/events/client.go b/events/client.go index 568d62c..ca19051 100644 --- a/events/client.go +++ b/events/client.go @@ -40,9 +40,10 @@ const ( // Client is the main GitLab SDK client for tracking events. // Client is safe for concurrent use by multiple goroutines. type Client struct { - tracker *sp.Tracker - emitter *sp.Emitter - mu sync.Mutex + tracker *sp.Tracker + emitter *sp.Emitter + mu sync.Mutex + emitterActive bool // tracks if emitter has been started } // UserTracker is a user-scoped tracker that automatically includes user context. @@ -173,6 +174,8 @@ func (c *Client) Track(eventName string, eventPayload map[string]any, opts ...Tr } c.tracker.TrackSelfDescribingEvent(event) + // Mark emitter as active after first event + c.emitterActive = true } // ForUser creates a UserTracker scoped to a specific user. @@ -212,11 +215,39 @@ func (ut *UserTracker) Track(eventName string, eventPayload map[string]any) { ut.client.Track(eventName, eventPayload, WithUser(ut.userID, ut.userAttributes)) } +// Close gracefully shuts down the client by stopping the emitter and flushing remaining events. +// This should be called when the client is no longer needed to ensure all events are sent +// and background goroutines are stopped. +// Close is safe to call multiple times. +// +// Example: +// +// client, err := events.NewClient("app-id", "https://collector.com") +// if err != nil { +// log.Fatal(err) +// } +// defer client.Close() +func (c *Client) Close() { + c.mu.Lock() + defer c.mu.Unlock() + + // Only flush and stop if emitter was activated (i.e., at least one event was tracked) + // The Snowplow emitter.Stop() blocks forever on a nil channel if never started + if c.emitterActive { + c.tracker.BlockingFlush(FlushAttempts, FlushSleepMs) + c.emitter.Stop() + c.emitterActive = false + } +} + // FlushEvents manually flushes all events from the emitter. // // Parameters: // - async: If true, flush events asynchronously; if false, flush synchronously (default: false) func (c *Client) FlushEvents(async bool) { + c.mu.Lock() + defer c.mu.Unlock() + c.emitter.Flush() if !async { c.tracker.BlockingFlush(FlushAttempts, FlushSleepMs) diff --git a/events/client_test.go b/events/client_test.go index 6199a26..a134a11 100644 --- a/events/client_test.go +++ b/events/client_test.go @@ -7,6 +7,7 @@ import ( "net/http" "net/http/httptest" "os" + "sync" "testing" ) @@ -16,6 +17,25 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } +// createTestClient creates a client with a mock HTTP server. +// Caller is responsible for calling client.Close() and server.Close(). +func createTestClient(t *testing.T) (*Client, *httptest.Server) { + t.Helper() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + + client, err := NewClient("test-app", server.URL) + if err != nil { + server.Close() + t.Fatalf("Failed to create client: %v", err) + } + + return client, server +} + +// TestNewClient validates URL parsing and endpoint construction func TestNewClient(t *testing.T) { tests := []struct { name string @@ -72,6 +92,12 @@ func TestNewClient(t *testing.T) { wantErr: false, expectedEndpoint: "localhost:8080/collector", }, + { + name: "invalid URL", + appID: "test-app-id", + host: "://invalid", + wantErr: true, + }, } for _, tt := range tests { @@ -81,41 +107,31 @@ func TestNewClient(t *testing.T) { t.Errorf("NewClient() error = %v, wantErr %v", err, tt.wantErr) return } - if !tt.wantErr && client == nil { - t.Error("NewClient() returned nil client") - return - } - // Validate endpoint construction - if !tt.wantErr && tt.expectedEndpoint != "" { - if client.emitter.CollectorUri != tt.expectedEndpoint { - t.Errorf("NewClient() endpoint = %v, want %v", client.emitter.CollectorUri, tt.expectedEndpoint) + if !tt.wantErr { + if client == nil { + t.Error("NewClient() returned nil client") + return + } + defer client.Close() + + // Validate endpoint construction + if tt.expectedEndpoint != "" { + if client.emitter.CollectorUri != tt.expectedEndpoint { + t.Errorf("NewClient() endpoint = %v, want %v", client.emitter.CollectorUri, tt.expectedEndpoint) + } } } }) } } -// createTestClient creates a client with a mock HTTP server. -func createTestClient(t *testing.T) (*Client, *httptest.Server) { - t.Helper() - - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - })) - - client, err := NewClient("test-app", server.URL) - if err != nil { - t.Fatalf("Failed to create client: %v", err) - } - - return client, server -} - +// TestClient_Track validates basic event tracking func TestClient_Track(t *testing.T) { - client, server := createTestClient(t) - defer server.Close() - t.Run("without user context", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + defer client.Close() + client.Track("test_event", map[string]any{ "id": 123, "value": "test", @@ -123,10 +139,26 @@ func TestClient_Track(t *testing.T) { }) t.Run("with empty payload", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + defer client.Close() + client.Track("test_event_empty", map[string]any{}) }) + t.Run("with nil payload", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + defer client.Close() + + client.Track("test_event_nil", nil) + }) + t.Run("with user context", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + defer client.Close() + client.Track("user_event", map[string]any{"action": "click"}, WithUser("user123", map[string]any{ "user_name": "Test User", @@ -135,29 +167,43 @@ func TestClient_Track(t *testing.T) { }) t.Run("with user ID only", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + defer client.Close() + client.Track("user_event_id_only", map[string]any{"action": "view"}, WithUser("user456", nil)) }) t.Run("multiple events with same user context", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + defer client.Close() + userCtx := WithUser("user789", map[string]any{"role": "admin"}) client.Track("event1", map[string]any{"id": 1}, userCtx) client.Track("event2", map[string]any{"id": 2}, userCtx) }) - t.Run("multiple users", func(t *testing.T) { - for i := range 3 { + t.Run("multiple events with different users", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + defer client.Close() + + for i := range 5 { userCtx := WithUser(fmt.Sprintf("user_%d", i), map[string]any{"num": i}) client.Track("multi_user_event", map[string]any{"user_num": i}, userCtx) } }) } +// TestClient_ForUser validates UserTracker creation and usage func TestClient_ForUser(t *testing.T) { - client, server := createTestClient(t) - defer server.Close() - t.Run("creates user tracker", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + defer client.Close() + userTracker := client.ForUser("user_123", map[string]any{ "name": "John Doe", "email": "john@example.com", @@ -166,21 +212,47 @@ func TestClient_ForUser(t *testing.T) { if userTracker == nil { t.Fatal("ForUser returned nil") } + + if userTracker.userID != "user_123" { + t.Errorf("UserTracker userID = %v, want user_123", userTracker.userID) + } }) t.Run("with nil attributes", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + defer client.Close() + userTracker := client.ForUser("user_456", nil) if userTracker == nil { t.Fatal("ForUser returned nil") } + + if userTracker.userAttributes != nil { + t.Errorf("UserTracker userAttributes = %v, want nil", userTracker.userAttributes) + } + }) + + t.Run("with empty attributes", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + defer client.Close() + + userTracker := client.ForUser("user_789", map[string]any{}) + + if userTracker == nil { + t.Fatal("ForUser returned nil") + } }) } +// TestUserTracker_Track validates user-scoped event tracking func TestUserTracker_Track(t *testing.T) { t.Run("track event with user context", func(t *testing.T) { client, server := createTestClient(t) defer server.Close() + defer client.Close() userTracker := client.ForUser("user_123", map[string]any{ "name": "John Doe", @@ -195,6 +267,7 @@ func TestUserTracker_Track(t *testing.T) { t.Run("track multiple events with same user tracker", func(t *testing.T) { client, server := createTestClient(t) defer server.Close() + defer client.Close() userTracker := client.ForUser("user_456", map[string]any{ "role": "admin", @@ -208,11 +281,12 @@ func TestUserTracker_Track(t *testing.T) { t.Run("multiple user trackers from same client", func(t *testing.T) { client, server := createTestClient(t) defer server.Close() + defer client.Close() user1Tracker := client.ForUser("user_1", map[string]any{"name": "User 1"}) user2Tracker := client.ForUser("user_2", map[string]any{"name": "User 2"}) - // Track events sequentially (Snowplow library has internal race conditions with concurrent use) + // Track events sequentially to avoid potential Snowplow internal races for i := range 3 { user1Tracker.Track("event", map[string]any{"num": i}) } @@ -223,23 +297,226 @@ func TestUserTracker_Track(t *testing.T) { }) } +// TestClient_FlushEvents validates manual event flushing func TestClient_FlushEvents(t *testing.T) { t.Run("flush events synchronously", func(t *testing.T) { client, server := createTestClient(t) defer server.Close() + defer client.Close() client.Track("event1", map[string]any{"id": 1}) client.Track("event2", map[string]any{"id": 2}) + // Synchronous flush should block until complete client.FlushEvents(false) }) t.Run("flush events asynchronously", func(t *testing.T) { client, server := createTestClient(t) defer server.Close() + defer client.Close() client.Track("event3", map[string]any{"id": 3}) + // Async flush returns immediately client.FlushEvents(true) }) + + t.Run("flush empty queue", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + defer client.Close() + + // Should not panic or error when flushing empty queue + client.FlushEvents(false) + }) +} + +// TestClient_Close validates cleanup and idempotency +func TestClient_Close(t *testing.T) { + t.Run("close after tracking events", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + + client.Track("event1", map[string]any{"id": 1}) + client.Track("event2", map[string]any{"id": 2}) + + // Should flush remaining events and stop emitter + client.Close() + }) + + t.Run("close multiple times", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + + client.Track("event1", map[string]any{"id": 1}) + + // First close + client.Close() + + // Second close should be safe (idempotent) + client.Close() + + // Third close should also be safe + client.Close() + }) + + t.Run("close immediately after creation", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + + // Should be safe to close without tracking any events + client.Close() + }) + + t.Run("track after close should not panic", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + + client.Close() + + // Tracking after close should not panic (though events won't be sent) + // This tests that the mutex protection works correctly + client.Track("event_after_close", map[string]any{"id": 1}) + }) +} + +// TestClient_Concurrency validates thread-safety +func TestClient_Concurrency(t *testing.T) { + t.Run("concurrent Track calls", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + defer client.Close() + + // Start the emitter by tracking one event first + // This avoids the Snowplow library's internal race in emitter.start() + client.Track("init", map[string]any{"init": true}) + + var wg sync.WaitGroup + numGoroutines := 10 + eventsPerGoroutine := 5 + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + for j := 0; j < eventsPerGoroutine; j++ { + client.Track("concurrent_event", map[string]any{ + "goroutine": id, + "event": j, + }) + } + }(i) + } + + wg.Wait() + }) + + t.Run("concurrent Track with WithUser", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + defer client.Close() + + // Start the emitter by tracking one event first + // This avoids the Snowplow library's internal race in emitter.start() + client.Track("init", map[string]any{"init": true}) + + var wg sync.WaitGroup + numUsers := 5 + + for i := 0; i < numUsers; i++ { + wg.Add(1) + go func(userNum int) { + defer wg.Done() + userID := fmt.Sprintf("user_%d", userNum) + for j := 0; j < 3; j++ { + client.Track("user_event", map[string]any{"event": j}, + WithUser(userID, map[string]any{"user_num": userNum})) + } + }(i) + } + + wg.Wait() + }) + + t.Run("concurrent FlushEvents", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + defer client.Close() + + client.Track("event1", map[string]any{"id": 1}) + client.Track("event2", map[string]any{"id": 2}) + + var wg sync.WaitGroup + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + defer wg.Done() + client.FlushEvents(true) + }() + } + + wg.Wait() + }) + + t.Run("concurrent Track and FlushEvents", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + defer client.Close() + + // Start the emitter by tracking one event first + // This avoids the Snowplow library's internal race in emitter.start() + client.Track("init", map[string]any{"init": true}) + + var wg sync.WaitGroup + + // Track events concurrently + for i := 0; i < 5; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + for j := 0; j < 3; j++ { + client.Track("event", map[string]any{"id": id, "num": j}) + } + }(i) + } + + // Flush concurrently + wg.Add(1) + go func() { + defer wg.Done() + client.FlushEvents(true) + }() + + wg.Wait() + }) + + t.Run("concurrent Track and Close", func(t *testing.T) { + client, server := createTestClient(t) + defer server.Close() + + // Start the emitter by tracking one event first + // This avoids the Snowplow library's internal race in emitter.start() + client.Track("init", map[string]any{"init": true}) + + var wg sync.WaitGroup + + // Track events concurrently + for i := 0; i < 5; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + client.Track("event", map[string]any{"id": id}) + }(i) + } + + // Close concurrently (after a brief moment) + wg.Add(1) + go func() { + defer wg.Done() + client.Close() + }() + + wg.Wait() + }) } -- GitLab