diff --git a/events/snowplow/README.md b/events/snowplow/README.md new file mode 100644 index 0000000000000000000000000000000000000000..4dc27558b8eabc57248900d566af574e4c3b4b32 --- /dev/null +++ b/events/snowplow/README.md @@ -0,0 +1,191 @@ +# Snowplow Events Tracking + +This module provides a lightweight Snowplow tracking implementation for Go applications, enabling asynchronous event collection and batch sending to Snowplow collectors. + +## Overview + +The implementation consists of: +- **Tracker**: High-level API for tracking events +- **Emitter**: Handles asynchronous batch sending of events to collectors +- **Storage**: In-memory event buffering with thread-safe operations +- **Internal utilities**: JSON schema handling and protocol constants + +## Tracker API + +### Creating a Tracker + +```go +import "gitlab.com/gitlab-org/labkit/events/snowplow" + +// Initialize emitter with collector URL +emitter, err := snowplow.InitEmitter("https://collector.example.com/com.snowplowanalytics.snowplow/tp2") +``` + +#### `InitTracker(appId string, emitter *Emitter) (*Tracker, error)` + +Creates a new Tracker instance. + +**Parameters:** +- `appId` (string): Application identifier (required, cannot be empty) +- `emitter` (*Emitter): Emitter instance for sending events (required, cannot be nil) + +**Returns:** +- `*Tracker`: Configured tracker instance +- `error`: Error if validation fails + +**Example:** +```go +// Create tracker with application ID and emitter + +tracker, err := snowplow.InitTracker("gitlab-web", emitter) +``` + +### Tracking Events + +#### `tracker.TrackEvent(eventName string, props ...map[string]any) error` + +Tracks a custom event with the given name and properties. Events are queued asynchronously and sent in batches to the collector. 
+
+**Parameters:**
+- `eventName` (string): Name of the event (required)
+- `props` (...map[string]any): Event properties (optional)
+
+**Returns:**
+- `error`: Error if validation fails
+
+**Supported property types:**
+- `string`
+- `int`, `int64`, `float64` (numeric types)
+- `bool`
+- `map[string]any` (nested objects)
+- `[]any` (arrays)
+- `nil`
+
+**Example:**
+```go
+// Event without properties
+err := tracker.TrackEvent("user_logout")
+
+// Simple event with properties
+err := tracker.TrackEvent("button_clicked", map[string]any{
+    "button_id": "submit",
+    "page": "/checkout",
+})
+```
+
+**Event Structure:**
+
+All tracked events are automatically enriched with:
+- `tv` (tracker version): Set to "0.0.1"
+- `aid` (application ID): Your configured app ID
+- `p` (platform): Set to "srv" (server)
+- `tna` (namespace): Set to "gitlab"
+- `eid` (event ID): Unique UUID for each event
+- `dtm` (device timestamp): Unix milliseconds when event was created
+- `e` (event type): Set to "ue" (unstructured event)
+- `cx` (context): Base64-encoded self-describing JSON with your event name and properties
+
+**Event Example:**
+
+```json
+{
+  "schema": "iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4",
+  "data": [
+    {
+      "e": "ue", //event
+      "ue_px": "eyJzY2hlbWEiOiJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy91bnN0cnVjdF9ldmVudC9qc29uc2NoZW1hLzEtMC0wIiwiZGF0YSI6eyJzY2hlbWEiOiJpZ2x1OmNvbS5naXRsYWIvY3VzdG9tX2V2ZW50L2pzb25zY2hlbWEvMS0wLTAiLCJkYXRhIjp7Im5hbWUiOiJhbl9ldmVudCIsInByb3BzIjp7ImlkIjoxMjN9fX19",
+      "p": "srv", //platform
+      "ua": "GitLab Analytics Golang Tracker",
+      "dtm": "1765830622557",
+      "tna": "gitlab", //namespace
+      "tv": "0.0.1", //version
+      "aid": "test_app", //app_id
+      "eid": "7474693b-3aa5-48f1-86f9-2927275487f3",
+      "stm": "1765830622558"
+    }
+  ]
+}
+
+// decoded context
+{
+  "schema": "iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0",
+  "data": {
+    "schema": "iglu:com.gitlab/custom_event/jsonschema/1-0-0",
+    "data": {
+      "name": "an_event",
+      "props": {
+        "id": 123
+      }
+    }
+  }
+}
+```
+
+### Managing the Emitter
+
+The emitter handles asynchronous sending and can be controlled:
+
+```go
+// Flush any buffered events immediately (non-blocking)
+emitter.Flush()
+
+// Check if emitter is currently sending
+if emitter.IsSending() {
+    log.Println("Events are being sent")
+}
+
+// Wait for in-progress sends to complete before shutdown
+emitter.Stop()
+```
+
+## Architecture
+
+### `emitter.go`
+Manages asynchronous batch sending of events to Snowplow collectors. Events are buffered in memory and sent in configurable batches (default: 5 events per batch). Handles HTTP communication and connection pooling; on an HTTP error, batch sending stops and unsent events remain buffered (there is no automatic retry).
+
+### `storage.go`
+Provides thread-safe in-memory event storage. Implements a simple buffer where events are stored with auto-incrementing IDs and can be retrieved in LIFO order (most recent first). Supports concurrent operations with proper mutex synchronization.
+
+### `tracker.go`
+High-level API for event tracking. Constructs Snowplow protocol-compliant events, generates UUIDs and timestamps, and encodes custom event data as base64-encoded self-describing JSON.
+
+### `self_describing_json.go`
+Handles self-describing JSON structures required by the Snowplow protocol. Wraps data with schema URIs for both custom events (iglu:com.gitlab/custom_event) and payload data (iglu:com.snowplowanalytics.snowplow/payload_data).
+
+### `constants.go`
+Defines internal Snowplow protocol field names and values. All constants follow Go naming conventions (unexported, camelCase) with prefixes for clarity (field*, value*, schema*).
+
+### `internal.go`
+Defines the Event type alias and internal types used across the package.
+ +## Testing + +Run tests: +```bash +go test -v -race ./events/snowplow/ +``` + +## Implementation Notes + +- Events are sent asynchronously in batches to minimize overhead +- Failed sends stop the batch processing (no automatic retry) +- Storage is in-memory only (events are lost on process restart) +- HTTP client uses connection pooling (100 max idle connections) +- Request timeout is 5 seconds +- Snowplow payload schema version: 1-0-4 (current) + +## Example Application + +A complete example is available in `example/snowplow_tracking.go` demonstrating: +- Event without properties +- Event with simple properties +- Event with nested multi-level properties +- Proper emitter lifecycle (Flush and Stop) + +Run the example: +```bash +cd events/snowplow/example +go run snowplow_tracking.go +``` + +The example sends events to `http://localhost:9093/com.snowplowanalytics.snowplow/tp2` (default Snowplow Micro url. Navigate to http://localhost:9093/micro/ui to see the events) diff --git a/events/snowplow/constants.go b/events/snowplow/constants.go new file mode 100644 index 0000000000000000000000000000000000000000..55a008f4ab0da664432b191d526f7688c761944a --- /dev/null +++ b/events/snowplow/constants.go @@ -0,0 +1,19 @@ +package snowplow + +const ( + fieldTrackerVersion = "tv" // required + + fieldAppId = "aid" + fieldNamespace = "tna" + fieldPlatform = "p" // required + fieldUserAgent = "ua" + + fieldEvent = "e" // required + fieldEid = "eid" + fieldTimestamp = "dtm" + fieldSentTimestamp = "stm" + fieldContext = "cx" + fieldUnstructedEncoded = "ue_px" + + valueUnstructEvent = "ue" +) diff --git a/events/snowplow/emitter.go b/events/snowplow/emitter.go new file mode 100644 index 0000000000000000000000000000000000000000..a12fb3c68cb3dd2eac7ce14e5b81c730d531822f --- /dev/null +++ b/events/snowplow/emitter.go @@ -0,0 +1,202 @@ +package snowplow + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "net/http" + "net/url" + "strconv" + "sync" + "time" +) 
+ +const ( + sendingAmount = 5 +) + +// SendResult represents the result of sending a batch of events. +type SendResult struct { + ids []int + status int +} + +// Emitter handles asynchronous sending of events to a Snowplow collector. +// It buffers events in storage and sends them in batches. +type Emitter struct { + httpClient *http.Client + collectorUrl *url.URL + storage EventStorage + sendMu sync.Mutex + sendChannel chan bool +} + +// InitEmitter creates a new Emitter with the given collector URL. +// Returns an error if the collector URL is invalid. +func InitEmitter(collectorUrl string) (*Emitter, error) { + if collectorUrl == "" { + return nil, errors.New("collectorUrl cannot be empty") + } + + e := &Emitter{} + + e.storage = NewStorage() + + parsedUrl, err := url.Parse(collectorUrl) + if err != nil { + return nil, fmt.Errorf("failed to parse collector URL: %w", err) + } + e.collectorUrl = parsedUrl + + defaultRoundTripper := http.DefaultTransport + defaultTransportPointer, ok := defaultRoundTripper.(*http.Transport) + if !ok { + return nil, errors.New("http.DefaultTransport is not an *http.Transport") + } + defaultTransport := defaultTransportPointer.Clone() + defaultTransport.MaxIdleConns = 100 + defaultTransport.MaxIdleConnsPerHost = 100 + timeout := 5 * time.Second + e.httpClient = &http.Client{ + Timeout: timeout, + Transport: defaultTransport, + } + + return e, nil +} + +// AddAsync adds an event to the buffer and triggers asynchronous sending. +// Events are queued in storage and sent in batches to the collector. +func (e *Emitter) AddAsync(event map[string]string) { + e.storage.Add(event) + e.start() +} + +// Flush triggers sending of any buffered events. +// This is a non-blocking operation that starts the send process if not already running. +func (e *Emitter) Flush() { + e.start() +} + +// Stop waits for any in-progress sending to complete and stops the emitter. +// This method blocks until the current send operation finishes. 
+func (e *Emitter) Stop() { + e.sendMu.Lock() + ch := e.sendChannel + e.sendMu.Unlock() + + if ch != nil { + <-ch // Wait for completion outside the lock + + e.sendMu.Lock() + e.sendChannel = nil + e.sendMu.Unlock() + } +} + +func (e *Emitter) start() { + e.sendMu.Lock() + + if e.sendChannel == nil || !e.isSending() { + e.sendChannel = make(chan bool, 1) + ch := e.sendChannel + e.sendMu.Unlock() + + go func() { + var done bool + defer func() { + ch <- done // Send to captured channel to avoid race + }() + + for { + eventRows, eventIDs := e.storage.GetWithIDs(sendingAmount) + + if len(eventRows) == 0 { + break + } + result := e.doSend(eventRows, eventIDs) + + if result.status >= 200 && result.status < 400 { + e.storage.Delete(result.ids) + } else { + break + } + } + done = true + }() + return + } + e.sendMu.Unlock() +} + +func (e *Emitter) doSend(eventRows []Event, eventIDs []int) SendResult { + stm := strconv.FormatInt(time.Now().UnixMilli(), 10) + for _, event := range eventRows { + event[fieldSentTimestamp] = stm + } + + result := <-e.sendPostRequest(eventIDs, eventRows) + return result +} + +func (e *Emitter) sendPostRequest(ids []int, payloads []Event) <-chan SendResult { + c := make(chan SendResult, 1) + go func() { + var result SendResult + defer func() { + c <- result + }() + + status := -1 + + events := SelfDescribingJson{ + schema: schemaPayloadData, + data: payloads, + } + eventsJSON, err := json.Marshal(events.Get()) + if err != nil { + log.Printf("Failed to marshal events: %v", err) + result = SendResult{ids: ids, status: status} + return + } + + req, err := http.NewRequest("POST", e.collectorUrl.String(), bytes.NewBuffer(eventsJSON)) + if err != nil { + log.Printf("Failed to create request: %v", err) + result = SendResult{ids: ids, status: status} + return + } + req.Header.Set("Content-Type", "application/json") + + resp, err := e.httpClient.Do(req) + if err != nil { + log.Printf("Failed to send request: %v", err) + result = SendResult{ids: ids, 
status: status} + return + } + defer func() { + _ = resp.Body.Close() + }() + + // Drain response body to enable connection reuse + _, _ = io.Copy(io.Discard, resp.Body) + + status = resp.StatusCode + result = SendResult{ids: ids, status: status} + }() + return c +} + +// IsSending returns true if the emitter is currently sending events. +func (e *Emitter) IsSending() bool { + e.sendMu.Lock() + defer e.sendMu.Unlock() + return e.isSending() +} + +func (e *Emitter) isSending() bool { + return e.sendChannel != nil && len(e.sendChannel) == 0 +} diff --git a/events/snowplow/example/snowplow_tracking.go b/events/snowplow/example/snowplow_tracking.go new file mode 100644 index 0000000000000000000000000000000000000000..f457faf4d2265cda2bcc4b31fa3e48c10c2af4c7 --- /dev/null +++ b/events/snowplow/example/snowplow_tracking.go @@ -0,0 +1,68 @@ +package main + +import ( + "log" + "time" + + "gitlab.com/gitlab-org/labkit/events/snowplow" +) + +func main() { + // Initialize emitter with collector URL + emitter, err := snowplow.InitEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2") + if err != nil { + log.Fatalf("Failed to create emitter: %v", err) + } + + // Create tracker with application ID + tracker, err := snowplow.InitTracker("test_golang_client", emitter) + if err != nil { + log.Fatalf("Failed to create tracker: %v", err) + } + + // Event 1: Simple event with just a name (no properties) + log.Println("Tracking event 1: simple event without properties") + err = tracker.TrackEvent("user_login") + if err != nil { + log.Printf("Failed to track event 1: %v", err) + } + + // Event 2: Event with simple properties (flat hash) + log.Println("Tracking event 2: event with simple properties") + err = tracker.TrackEvent("button_clicked", map[string]any{ + "button_id": "submit_form", + "user_id": 12345, + }) + if err != nil { + log.Printf("Failed to track event 2: %v", err) + } + + // Event 3: Event with nested properties (multi-level hash) + log.Println("Tracking event 
3: event with nested properties") + err = tracker.TrackEvent("page_view", map[string]any{ + "title": "Product Details - Premium Widget", + "duration": 4500, + "user": map[string]any{ + "id": 67890, + "username": "john_doe", + }, + }) + if err != nil { + log.Printf("Failed to track event 3: %v", err) + } + + // Give time for events to be sent asynchronously + log.Println("Waiting for events to be sent...") + time.Sleep(2 * time.Second) + + // Flush any remaining events + log.Println("Flushing remaining events...") + emitter.Flush() + time.Sleep(1 * time.Second) + + // Wait for in-progress sends to complete + log.Println("Stopping emitter...") + emitter.Stop() + + log.Println("All events tracked successfully!") +} diff --git a/events/snowplow/self_describing_json.go b/events/snowplow/self_describing_json.go new file mode 100644 index 0000000000000000000000000000000000000000..1cf39cfe0c6f8ffb748392e1054286bb4c14092c --- /dev/null +++ b/events/snowplow/self_describing_json.go @@ -0,0 +1,28 @@ +package snowplow + +import "encoding/json" + +const ( + schemaCustomEvent = "iglu:com.gitlab/custom_event/jsonschema/1-0-0" // https://gitlab.com/gitlab-org/iglu/-/blob/master/public/schemas/com.gitlab/custom_event/jsonschema/1-0-0 + schemaPayloadData = "iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4" // https://github.com/snowplow/iglu-central/blob/master/schemas/com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4 + schemaUnstructEvent = "iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0" +) + +type SelfDescribingJson struct { + schema string + data any +} + +// Get wraps the schema and data into a JSON. +func (s SelfDescribingJson) Get() map[string]interface{} { + return map[string]interface{}{ + "schema": s.schema, + "data": s.data, + } +} + +// String returns the JSON as a String. 
+func (s SelfDescribingJson) String() string { + jsonBytes, _ := json.Marshal(s.Get()) + return string(jsonBytes) +} diff --git a/events/snowplow/self_describing_json_test.go b/events/snowplow/self_describing_json_test.go new file mode 100644 index 0000000000000000000000000000000000000000..b6919ea3e74c61796e8c5a2a3b064659c5b120ce --- /dev/null +++ b/events/snowplow/self_describing_json_test.go @@ -0,0 +1,410 @@ +package snowplow + +import ( + "encoding/json" + "testing" +) + +func TestSelfDescribingJson_Get(t *testing.T) { + schema := "iglu:com.example/test/jsonschema/1-0-0" + data := map[string]any{ + "key1": "value1", + "key2": 123, + } + + sdj := SelfDescribingJson{ + schema: schema, + data: data, + } + + result := sdj.Get() + + // Check schema field + if result["schema"] != schema { + t.Errorf("Expected schema '%s', got '%s'", schema, result["schema"]) + } + + // Check data field + resultData, ok := result["data"].(map[string]any) + if !ok { + t.Errorf("Expected data to be map[string]any") + } + + if resultData["key1"] != "value1" { + t.Errorf("Expected key1 to be 'value1', got '%v'", resultData["key1"]) + } + + if resultData["key2"] != 123 { + t.Errorf("Expected key2 to be 123, got '%v'", resultData["key2"]) + } +} + +func TestSelfDescribingJson_Get_EmptyData(t *testing.T) { + schema := "iglu:com.example/test/jsonschema/1-0-0" + data := map[string]any{} + + sdj := SelfDescribingJson{ + schema: schema, + data: data, + } + + result := sdj.Get() + + if result["schema"] != schema { + t.Errorf("Expected schema '%s', got '%s'", schema, result["schema"]) + } + + resultData, ok := result["data"].(map[string]any) + if !ok { + t.Errorf("Expected data to be map[string]any") + } + + if len(resultData) != 0 { + t.Errorf("Expected empty data map, got %d items", len(resultData)) + } +} + +func TestSelfDescribingJson_Get_NestedData(t *testing.T) { + schema := "iglu:com.example/test/jsonschema/1-0-0" + data := map[string]any{ + "nested": map[string]any{ + "level2": 
map[string]any{ + "level3": "deep value", + }, + }, + "array": []string{"item1", "item2", "item3"}, + } + + sdj := SelfDescribingJson{ + schema: schema, + data: data, + } + + result := sdj.Get() + resultData := result["data"].(map[string]any) + + nested := resultData["nested"].(map[string]any) + level2 := nested["level2"].(map[string]any) + if level2["level3"] != "deep value" { + t.Errorf("Expected nested value 'deep value', got '%v'", level2["level3"]) + } + + array := resultData["array"].([]string) + if len(array) != 3 { + t.Errorf("Expected array length 3, got %d", len(array)) + } +} + +func TestSelfDescribingJson_String(t *testing.T) { + schema := "iglu:com.example/test/jsonschema/1-0-0" + data := map[string]any{ + "name": "test", + "count": 42, + } + + sdj := SelfDescribingJson{ + schema: schema, + data: data, + } + + result := sdj.String() + + // Parse the JSON string to validate structure + var parsed map[string]interface{} + err := json.Unmarshal([]byte(result), &parsed) + if err != nil { + t.Fatalf("Failed to parse JSON string: %v", err) + } + + // Verify schema field + if parsed["schema"] != schema { + t.Errorf("Expected schema '%s', got '%s'", schema, parsed["schema"]) + } + + // Verify data field + parsedData, ok := parsed["data"].(map[string]interface{}) + if !ok { + t.Errorf("Expected data to be a map") + } + + if parsedData["name"] != "test" { + t.Errorf("Expected name to be 'test', got '%v'", parsedData["name"]) + } + + if parsedData["count"].(float64) != 42 { + t.Errorf("Expected count to be 42, got '%v'", parsedData["count"]) + } +} + +func TestSelfDescribingJson_String_EmptyData(t *testing.T) { + schema := "iglu:com.example/test/jsonschema/1-0-0" + data := map[string]any{} + + sdj := SelfDescribingJson{ + schema: schema, + data: data, + } + + result := sdj.String() + + var parsed map[string]interface{} + err := json.Unmarshal([]byte(result), &parsed) + if err != nil { + t.Fatalf("Failed to parse JSON string: %v", err) + } + + parsedData := 
parsed["data"].(map[string]interface{}) + if len(parsedData) != 0 { + t.Errorf("Expected empty data map, got %d items", len(parsedData)) + } +} + +func TestSelfDescribingJson_String_JSONFormat(t *testing.T) { + schema := schemaCustomEvent + data := map[string]any{ + "event": "page_view", + "props": map[string]any{ + "url": "https://example.com", + "duration": 1500, + }, + } + + sdj := SelfDescribingJson{ + schema: schema, + data: data, + } + + result := sdj.String() + + // Verify it's valid JSON + var parsed map[string]interface{} + err := json.Unmarshal([]byte(result), &parsed) + if err != nil { + t.Fatalf("Invalid JSON format: %v", err) + } + + // Verify top-level structure + if _, hasSchema := parsed["schema"]; !hasSchema { + t.Error("JSON missing 'schema' field") + } + + if _, hasData := parsed["data"]; !hasData { + t.Error("JSON missing 'data' field") + } +} + +func TestSelfDescribingJson_WithCustomEventSchema(t *testing.T) { + data := map[string]any{ + "name": "button_clicked", + "props": map[string]any{ + "button_id": "submit", + "page": "/checkout", + }, + } + + sdj := SelfDescribingJson{ + schema: schemaCustomEvent, + data: data, + } + + result := sdj.Get() + + if result["schema"] != schemaCustomEvent { + t.Errorf("Expected schema '%s', got '%s'", schemaCustomEvent, result["schema"]) + } +} + +func TestSelfDescribingJson_WithPayloadDataSchema(t *testing.T) { + data := map[string]any{ + "data": []map[string]string{ + {"e": "pv", "url": "https://example.com"}, + {"e": "ue", "ue_pr": "{...}"}, + }, + } + + sdj := SelfDescribingJson{ + schema: schemaPayloadData, + data: data, + } + + result := sdj.Get() + + if result["schema"] != schemaPayloadData { + t.Errorf("Expected schema '%s', got '%s'", schemaPayloadData, result["schema"]) + } +} + +func TestSelfDescribingJson_String_SpecialCharacters(t *testing.T) { + schema := "iglu:com.example/test/jsonschema/1-0-0" + data := map[string]any{ + "message": "Hello \"World\" with 'quotes' and\nnewlines", + "url": 
"https://example.com/path?param=value&other=true", + "unicode": "こんにちは 🎉", + } + + sdj := SelfDescribingJson{ + schema: schema, + data: data, + } + + result := sdj.String() + + // Verify it's still valid JSON after special characters + var parsed map[string]interface{} + err := json.Unmarshal([]byte(result), &parsed) + if err != nil { + t.Fatalf("Failed to parse JSON with special characters: %v", err) + } + + parsedData := parsed["data"].(map[string]interface{}) + if parsedData["message"] != "Hello \"World\" with 'quotes' and\nnewlines" { + t.Errorf("Special characters not preserved correctly") + } + + if parsedData["unicode"] != "こんにちは 🎉" { + t.Errorf("Unicode characters not preserved correctly") + } +} + +func TestSelfDescribingJson_String_NumericTypes(t *testing.T) { + schema := "iglu:com.example/test/jsonschema/1-0-0" + data := map[string]any{ + "int": 42, + "float": 3.14, + "negative": -100, + "zero": 0, + } + + sdj := SelfDescribingJson{ + schema: schema, + data: data, + } + + result := sdj.String() + + var parsed map[string]interface{} + err := json.Unmarshal([]byte(result), &parsed) + if err != nil { + t.Fatalf("Failed to parse JSON: %v", err) + } + + parsedData := parsed["data"].(map[string]interface{}) + + if parsedData["int"].(float64) != 42 { + t.Errorf("Expected int to be 42") + } + + if parsedData["float"].(float64) != 3.14 { + t.Errorf("Expected float to be 3.14") + } + + if parsedData["negative"].(float64) != -100 { + t.Errorf("Expected negative to be -100") + } + + if parsedData["zero"].(float64) != 0 { + t.Errorf("Expected zero to be 0") + } +} + +func TestSelfDescribingJson_String_BooleanValues(t *testing.T) { + schema := "iglu:com.example/test/jsonschema/1-0-0" + data := map[string]any{ + "enabled": true, + "disabled": false, + } + + sdj := SelfDescribingJson{ + schema: schema, + data: data, + } + + result := sdj.String() + + var parsed map[string]interface{} + err := json.Unmarshal([]byte(result), &parsed) + if err != nil { + t.Fatalf("Failed 
to parse JSON: %v", err) + } + + parsedData := parsed["data"].(map[string]interface{}) + + if parsedData["enabled"] != true { + t.Errorf("Expected enabled to be true") + } + + if parsedData["disabled"] != false { + t.Errorf("Expected disabled to be false") + } +} + +func TestSelfDescribingJson_MultipleInstances(t *testing.T) { + // Test that multiple instances don't interfere with each other + sdj1 := SelfDescribingJson{ + schema: "schema1", + data: map[string]any{"key": "value1"}, + } + + sdj2 := SelfDescribingJson{ + schema: "schema2", + data: map[string]any{"key": "value2"}, + } + + result1 := sdj1.Get() + result2 := sdj2.Get() + + if result1["schema"] != "schema1" { + t.Errorf("Instance 1 schema corrupted") + } + + if result2["schema"] != "schema2" { + t.Errorf("Instance 2 schema corrupted") + } + + data1 := result1["data"].(map[string]any) + data2 := result2["data"].(map[string]any) + + if data1["key"] != "value1" { + t.Errorf("Instance 1 data corrupted") + } + + if data2["key"] != "value2" { + t.Errorf("Instance 2 data corrupted") + } +} + +func BenchmarkSelfDescribingJson_Get(b *testing.B) { + sdj := SelfDescribingJson{ + schema: schemaCustomEvent, + data: map[string]any{ + "name": "test", + "props": map[string]any{ + "key1": "value1", + "key2": 123, + }, + }, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = sdj.Get() + } +} + +func BenchmarkSelfDescribingJson_String(b *testing.B) { + sdj := SelfDescribingJson{ + schema: schemaCustomEvent, + data: map[string]any{ + "name": "test", + "props": map[string]any{ + "key1": "value1", + "key2": 123, + }, + }, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = sdj.String() + } +} diff --git a/events/snowplow/storage.go b/events/snowplow/storage.go new file mode 100644 index 0000000000000000000000000000000000000000..85fbd64b4a79329cfbfb7b00af4634bda9fb3eae --- /dev/null +++ b/events/snowplow/storage.go @@ -0,0 +1,89 @@ +package snowplow + +import "sync" + +// Event represents a Snowplow event as a map 
of key-value pairs. +type Event = map[string]string + +type EventStorage interface { + // Add stores an event and returns its unique ID. + Add(payload Event) int + + // GetWithIDs retrieves the most recent events with their IDs. + GetWithIDs(amount int) ([]Event, []int) + + // Delete removes events with the specified IDs. + Delete(ids []int) +} + +// Storage provides thread-safe in-memory storage for events. +type Storage struct { + mu sync.Mutex + items map[int]Event + nextID int +} + +// NewStorage creates a new empty Storage instance. +func NewStorage() EventStorage { + return &Storage{ + items: make(map[int]Event), + nextID: 0, + } +} + +// Add stores an event and returns its unique ID. +// The event is copied to prevent external modification. +func (s *Storage) Add(payload Event) int { + s.mu.Lock() + defer s.mu.Unlock() + + // Copy the map to prevent external modification + eventCopy := make(Event, len(payload)) + for k, v := range payload { + eventCopy[k] = v + } + + id := s.nextID + s.items[id] = eventCopy + s.nextID++ + + return id +} + +// GetWithIDs retrieves the most recent events with their IDs. +// Returns empty slices if amount <= 0. +func (s *Storage) GetWithIDs(amount int) ([]Event, []int) { + s.mu.Lock() + defer s.mu.Unlock() + + if amount <= 0 { + return []Event{}, []int{} + } + + var events []Event + var ids []int + + for i := s.nextID - 1; i >= 0 && len(events) < amount; i-- { + if event, exists := s.items[i]; exists { + // Copy the map to prevent external modification + eventCopy := make(Event, len(event)) + for k, v := range event { + eventCopy[k] = v + } + events = append(events, eventCopy) + ids = append(ids, i) + } + } + + return events, ids +} + +// Delete removes events with the specified IDs from storage. 
+func (s *Storage) Delete(ids []int) { + s.mu.Lock() + defer s.mu.Unlock() + + for _, id := range ids { + delete(s.items, id) + } +} diff --git a/events/snowplow/storage_test.go b/events/snowplow/storage_test.go new file mode 100644 index 0000000000000000000000000000000000000000..9390b1e69848f2fe83e148692f7aa64e1f6b4c97 --- /dev/null +++ b/events/snowplow/storage_test.go @@ -0,0 +1,323 @@ +package snowplow + +import ( + "sync" + "testing" +) + +func TestStorage_Add(t *testing.T) { + storage := NewStorage() + + event := Event{"key": "value"} + id := storage.Add(event) + + if id != 0 { + t.Errorf("Expected first ID to be 0, got %d", id) + } + + // Add another event + id2 := storage.Add(Event{"key2": "value2"}) + if id2 != 1 { + t.Errorf("Expected second ID to be 1, got %d", id2) + } +} + +func TestStorage_GetWithIDs(t *testing.T) { + storage := NewStorage() + + // Add events + storage.Add(Event{"event": "1"}) + storage.Add(Event{"event": "2"}) + storage.Add(Event{"event": "3"}) + + // Get last 2 events + events, ids := storage.GetWithIDs(2) + + if len(events) != 2 { + t.Errorf("Expected 2 events, got %d", len(events)) + } + + if len(ids) != 2 { + t.Errorf("Expected 2 IDs, got %d", len(ids)) + } + + // Events should be in reverse order (most recent first) + if events[0]["event"] != "3" { + t.Errorf("Expected first event to be '3', got '%s'", events[0]["event"]) + } + + if events[1]["event"] != "2" { + t.Errorf("Expected second event to be '2', got '%s'", events[1]["event"]) + } +} + +func TestStorage_Delete(t *testing.T) { + storage := NewStorage() + + // Add events + id1 := storage.Add(Event{"event": "1"}) + id2 := storage.Add(Event{"event": "2"}) + storage.Add(Event{"event": "3"}) + + // Delete first two events + storage.Delete([]int{id1, id2}) + + // Get all events + events, _ := storage.GetWithIDs(10) + + if len(events) != 1 { + t.Errorf("Expected 1 event after deletion, got %d", len(events)) + } + + if events[0]["event"] != "3" { + t.Errorf("Expected remaining 
event to be '3', got '%s'", events[0]["event"]) + } +} + +func TestStorage_GetWithIDs_EdgeCases(t *testing.T) { + storage := NewStorage() + + // Test empty storage + events, ids := storage.GetWithIDs(10) + if len(events) != 0 || len(ids) != 0 { + t.Errorf("Expected empty slices for empty storage") + } + + // Test zero amount + storage.Add(Event{"key": "value"}) + events, ids = storage.GetWithIDs(0) + if len(events) != 0 || len(ids) != 0 { + t.Errorf("Expected empty slices for zero amount") + } + + // Test negative amount + events, ids = storage.GetWithIDs(-5) + if len(events) != 0 || len(ids) != 0 { + t.Errorf("Expected empty slices for negative amount") + } + + // Test requesting more than available + events, ids = storage.GetWithIDs(100) + if len(events) != 1 { + t.Errorf("Expected 1 event when requesting more than available, got %d", len(events)) + } +} + +func TestStorage_MapAliasing(t *testing.T) { + storage := NewStorage() + + // Test Add doesn't store reference + event := Event{"key": "original"} + storage.Add(event) + + // Modify original map + event["key"] = "modified" + + // Get event and verify it wasn't modified + events, _ := storage.GetWithIDs(1) + if events[0]["key"] != "original" { + t.Errorf("Storage should copy map on Add, expected 'original', got '%s'", events[0]["key"]) + } + + // Test GetWithIDs returns copies + events, _ = storage.GetWithIDs(1) + events[0]["key"] = "changed" + + // Get again and verify original wasn't modified + events2, _ := storage.GetWithIDs(1) + if events2[0]["key"] != "original" { + t.Errorf("GetWithIDs should return copies, expected 'original', got '%s'", events2[0]["key"]) + } +} + +func TestStorage_ConcurrentAdd(t *testing.T) { + storage := NewStorage() + numGoroutines := 100 + numAddsPerGoroutine := 100 + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + // Concurrently add events + for i := 0; i < numGoroutines; i++ { + go func(routineID int) { + defer wg.Done() + for j := 0; j < numAddsPerGoroutine; j++ { + 
storage.Add(Event{"routine": string(rune(routineID)), "count": string(rune(j))}) + } + }(i) + } + + wg.Wait() + + // Verify all events were added + events, ids := storage.GetWithIDs(numGoroutines * numAddsPerGoroutine) + expectedCount := numGoroutines * numAddsPerGoroutine + + if len(events) != expectedCount { + t.Errorf("Expected %d events, got %d", expectedCount, len(events)) + } + + if len(ids) != expectedCount { + t.Errorf("Expected %d IDs, got %d", expectedCount, len(ids)) + } + + // Verify all IDs are unique + idSet := make(map[int]bool) + for _, id := range ids { + if idSet[id] { + t.Errorf("Duplicate ID found: %d", id) + } + idSet[id] = true + } +} + +func TestStorage_ConcurrentGetAndAdd(t *testing.T) { + storage := NewStorage() + numGoroutines := 50 + iterations := 100 + + var wg sync.WaitGroup + wg.Add(numGoroutines * 2) + + // Half goroutines add, half read + for i := 0; i < numGoroutines; i++ { + // Add goroutine + go func(id int) { + defer wg.Done() + for j := 0; j < iterations; j++ { + storage.Add(Event{"id": string(rune(id)), "iteration": string(rune(j))}) + } + }(i) + + // Read goroutine + go func() { + defer wg.Done() + for j := 0; j < iterations; j++ { + storage.GetWithIDs(10) + } + }() + } + + wg.Wait() +} + +func TestStorage_ConcurrentDelete(t *testing.T) { + storage := NewStorage() + + // Pre-populate storage + var ids []int + for i := 0; i < 100; i++ { + id := storage.Add(Event{"index": string(rune(i))}) + ids = append(ids, id) + } + + var wg sync.WaitGroup + wg.Add(10) + + // Concurrently delete different subsets + for i := 0; i < 10; i++ { + go func(routineID int) { + defer wg.Done() + start := routineID * 10 + end := start + 10 + storage.Delete(ids[start:end]) + }(i) + } + + wg.Wait() + + // Verify all events were deleted + events, _ := storage.GetWithIDs(100) + if len(events) != 0 { + t.Errorf("Expected 0 events after deletion, got %d", len(events)) + } +} + +func TestStorage_ConcurrentMixed(t *testing.T) { + storage := NewStorage() + 
numGoroutines := 30 + iterations := 50 + + var wg sync.WaitGroup + wg.Add(numGoroutines * 3) + + // Add goroutines + for i := 0; i < numGoroutines; i++ { + go func(id int) { + defer wg.Done() + for j := 0; j < iterations; j++ { + storage.Add(Event{"op": "add", "id": string(rune(id))}) + } + }(i) + } + + // Get goroutines + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + for j := 0; j < iterations; j++ { + storage.GetWithIDs(5) + } + }() + } + + // Delete goroutines + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + for j := 0; j < iterations; j++ { + events, ids := storage.GetWithIDs(3) + if len(ids) > 0 { + storage.Delete(ids) + } + _ = events + } + }() + } + + wg.Wait() + + // Storage should still be functional + id := storage.Add(Event{"final": "test"}) + events, _ := storage.GetWithIDs(1) + if len(events) > 0 && events[0]["final"] != "test" { + t.Errorf("Storage corrupted after concurrent operations") + } + _ = id +} + +func BenchmarkStorage_Add(b *testing.B) { + storage := NewStorage() + event := Event{"key": "value"} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + storage.Add(event) + } +} + +func BenchmarkStorage_GetWithIDs(b *testing.B) { + storage := NewStorage() + + // Pre-populate + for i := 0; i < 1000; i++ { + storage.Add(Event{"index": string(rune(i))}) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + storage.GetWithIDs(10) + } +} + +func BenchmarkStorage_ConcurrentAdd(b *testing.B) { + storage := NewStorage() + event := Event{"key": "value"} + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + storage.Add(event) + } + }) +} diff --git a/events/snowplow/tracker.go b/events/snowplow/tracker.go new file mode 100644 index 0000000000000000000000000000000000000000..4be41f1afbe54cc34c14df106ced12a231243a25 --- /dev/null +++ b/events/snowplow/tracker.go @@ -0,0 +1,84 @@ +package snowplow + +import ( + "encoding/base64" + "errors" + "strconv" + "time" + + "github.com/google/uuid" +) + +const ( + 
trackerVersionValue = "0.0.1" + platformValue = "srv" + defaultNamespace = "gitlab" + userAgentValue = "GitLab Analytics Golang Tracker" +) + +// Tracker tracks events and sends them via an Emitter. +type Tracker struct { + Emitter *Emitter + AppId string +} + +// InitTracker creates a new Tracker instance. +// Returns an error if emitter is nil or appId is empty. +func InitTracker(appId string, emitter *Emitter) (*Tracker, error) { + if emitter == nil { + return nil, errors.New("emitter cannot be nil") + } + if appId == "" { + return nil, errors.New("appId cannot be empty") + } + + return &Tracker{ + Emitter: emitter, + AppId: appId, + }, nil +} + +// TrackEvent tracks a custom event with the given name and properties. +// The properties are encoded as a self-describing JSON context. +// Props are optional - omit for events without additional properties. +// Returns an error if eventName is empty. +func (t *Tracker) TrackEvent(eventName string, props ...map[string]any) error { + if eventName == "" { + return errors.New("eventName cannot be empty") + } + + //Use provided props or default to nil + var eventProps map[string]any + if len(props) > 0 && props[0] != nil { + eventProps = props[0] + } else { + eventProps = nil + } + + payload := make(Event) + payload[fieldTrackerVersion] = trackerVersionValue + payload[fieldAppId] = t.AppId + payload[fieldPlatform] = platformValue + payload[fieldNamespace] = defaultNamespace + payload[fieldEid] = uuid.New().String() + payload[fieldTimestamp] = strconv.FormatInt(time.Now().UnixMilli(), 10) + payload[fieldUserAgent] = userAgentValue + payload[fieldEvent] = valueUnstructEvent + + customContext := &SelfDescribingJson{ + schema: schemaCustomEvent, + data: map[string]any{"name": eventName, "props": eventProps}, + } + + unstructContext := &SelfDescribingJson{ + schema: schemaUnstructEvent, + data: customContext.Get(), + } + + encodedContext := base64.StdEncoding.EncodeToString([]byte(unstructContext.String())) + 
payload[fieldUnstructedEncoded] = encodedContext + + t.Emitter.AddAsync(payload) + + return nil +} diff --git a/events/snowplow/tracker_test.go b/events/snowplow/tracker_test.go new file mode 100644 index 0000000000000000000000000000000000000000..e4e802146f341e0ff4ff28340713222d979b1427 --- /dev/null +++ b/events/snowplow/tracker_test.go @@ -0,0 +1,486 @@ +package snowplow + +import ( + "encoding/base64" + "encoding/json" + "io" + "log" + "net/http" + "net/url" + "strings" + "testing" + "time" +) + +func createTestEmitter(t *testing.T) *Emitter { + t.Helper() + return createTestEmitterWithStorage(t, NewStorage()) +} + +func createTestEmitterWithStorage(t *testing.T, storage EventStorage) *Emitter { + t.Helper() + // Parse a fake URL for testing + collectorUrl, err := url.Parse("http://localhost:9999/invalid") + if err != nil { + t.Fatalf("Failed to parse test URL: %v", err) + } + + // Create emitter with minimal setup for testing + // HTTP requests will fail but events will be stored + emitter := &Emitter{ + storage: storage, + collectorUrl: collectorUrl, + httpClient: &http.Client{Timeout: 1 * time.Second}, + } + return emitter +} + +func createBenchmarkEmitter(storage EventStorage) *Emitter { + // Parse a fake URL for benchmarking + collectorUrl, _ := url.Parse("http://localhost:9999/invalid") + + // Create emitter with minimal setup for benchmarking + // HTTP requests will fail but events will be stored + emitter := &Emitter{ + storage: storage, + collectorUrl: collectorUrl, + httpClient: &http.Client{Timeout: 1 * time.Second}, + } + return emitter +} + +func init() { + // Suppress log output during tests + log.SetOutput(io.Discard) +} + +func TestInitTracker_Success(t *testing.T) { + emitter := createTestEmitter(t) + appId := "test-app" + + tracker, err := InitTracker(appId, emitter) + + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if tracker == nil { + t.Fatal("Expected tracker to be non-nil") + } + + if tracker.AppId != appId { + 
t.Errorf("Expected AppId '%s', got '%s'", appId, tracker.AppId) + } + + if tracker.Emitter != emitter { + t.Error("Expected Emitter to be set correctly") + } +} + +func TestInitTracker_NilEmitter(t *testing.T) { + appId := "test-app" + + tracker, err := InitTracker(appId, nil) + + if err == nil { + t.Fatal("Expected error for nil emitter, got nil") + } + + if tracker != nil { + t.Error("Expected tracker to be nil on error") + } + + if !strings.Contains(err.Error(), "emitter") { + t.Errorf("Expected error message to mention 'emitter', got: %v", err) + } +} + +func TestInitTracker_EmptyAppId(t *testing.T) { + emitter := &Emitter{ + storage: NewStorage(), + } + + tracker, err := InitTracker("", emitter) + + if err == nil { + t.Fatal("Expected error for empty appId, got nil") + } + + if tracker != nil { + t.Error("Expected tracker to be nil on error") + } + + if !strings.Contains(err.Error(), "appId") { + t.Errorf("Expected error message to mention 'appId', got: %v", err) + } +} + +func TestTrackEvent_Success(t *testing.T) { + storage := NewStorage() + emitter := createTestEmitterWithStorage(t, storage) + appId := "test-app" + + tracker, _ := InitTracker(appId, emitter) + + eventName := "button_clicked" + props := map[string]any{ + "button_id": "submit", + "page": "/checkout", + } + + err := tracker.TrackEvent(eventName, props) + + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + // Verify event was added to storage + events, _ := storage.GetWithIDs(1) + if len(events) != 1 { + t.Fatalf("Expected 1 event in storage, got %d", len(events)) + } + + event := events[0] + + // Verify required fields + if event[fieldTrackerVersion] != trackerVersionValue { + t.Errorf("Expected tracker version '%s', got '%s'", trackerVersionValue, event[fieldTrackerVersion]) + } + + if event[fieldAppId] != appId { + t.Errorf("Expected app_id '%s', got '%s'", appId, event[fieldAppId]) + } + + if event[fieldPlatform] != platformValue { + t.Errorf("Expected platform '%s', got 
'%s'", platformValue, event[fieldPlatform]) + } + + if event[fieldNamespace] != defaultNamespace { + t.Errorf("Expected namespace '%s', got '%s'", defaultNamespace, event[fieldNamespace]) + } + + if event[fieldEvent] != valueUnstructEvent { + t.Errorf("Expected event type '%s', got '%s'", valueUnstructEvent, event[fieldEvent]) + } + + // Verify EID is set (UUID) + if event[fieldEid] == "" { + t.Error("Expected EID to be set") + } + + // Verify timestamp is set + if event[fieldTimestamp] == "" { + t.Error("Expected timestamp to be set") + } + + // Verify context is encoded + if event[fieldContext] == "" { + t.Error("Expected context to be encoded") + } +} + +func TestTrackEvent_EmptyEventName(t *testing.T) { + storage := NewStorage() + emitter := &Emitter{ + storage: storage, + } + tracker, _ := InitTracker("test-app", emitter) + + props := map[string]any{"key": "value"} + + err := tracker.TrackEvent("", props) + + if err == nil { + t.Fatal("Expected error for empty eventName, got nil") + } + + if !strings.Contains(err.Error(), "eventName") { + t.Errorf("Expected error message to mention 'eventName', got: %v", err) + } + + // Verify no event was added to storage + events, _ := storage.GetWithIDs(1) + if len(events) != 0 { + t.Errorf("Expected 0 events in storage, got %d", len(events)) + } +} + +func TestTrackEvent_NoProps(t *testing.T) { + storage := NewStorage() + emitter := createTestEmitterWithStorage(t, storage) + tracker, _ := InitTracker("test-app", emitter) + + // No props parameter - completely optional + err := tracker.TrackEvent("event_name") + + if err != nil { + t.Fatalf("Expected no error without props, got: %v", err) + } + + // Verify event was added to storage + events, _ := storage.GetWithIDs(1) + if len(events) != 1 { + t.Fatalf("Expected 1 event in storage, got %d", len(events)) + } + + // Verify the props in context are empty + event := events[0] + contextEnc := event[fieldContext] + contextJSON, _ := base64.StdEncoding.DecodeString(contextEnc) + 
var contextData map[string]interface{} + json.Unmarshal(contextJSON, &contextData) + + data := contextData["data"].(map[string]interface{}) + propsData := data["props"].(map[string]interface{}) + + if len(propsData) != 0 { + t.Errorf("Expected empty props, got %d items", len(propsData)) + } +} + +func TestTrackEvent_NilProps(t *testing.T) { + storage := NewStorage() + emitter := createTestEmitterWithStorage(t, storage) + tracker, _ := InitTracker("test-app", emitter) + + // Nil props should also be treated as empty map + err := tracker.TrackEvent("event_name", nil) + + if err != nil { + t.Fatalf("Expected no error for nil props, got: %v", err) + } + + // Verify event was added to storage + events, _ := storage.GetWithIDs(1) + if len(events) != 1 { + t.Fatalf("Expected 1 event in storage, got %d", len(events)) + } + + // Verify the props in context are empty + event := events[0] + contextEnc := event[fieldContext] + contextJSON, _ := base64.StdEncoding.DecodeString(contextEnc) + var contextData map[string]interface{} + json.Unmarshal(contextJSON, &contextData) + + data := contextData["data"].(map[string]interface{}) + propsData := data["props"].(map[string]interface{}) + + if len(propsData) != 0 { + t.Errorf("Expected empty props, got %d items", len(propsData)) + } +} + +func TestTrackEvent_ContextEncoding(t *testing.T) { + storage := NewStorage() + emitter := createTestEmitterWithStorage(t, storage) + tracker, _ := InitTracker("test-app", emitter) + + eventName := "page_view" + props := map[string]any{ + "url": "https://example.com", + "duration": 1500, + "success": true, + } + + tracker.TrackEvent(eventName, props) + + events, _ := storage.GetWithIDs(1) + event := events[0] + + // Decode the context + contextEnc := event[fieldContext] + contextJSON, err := base64.StdEncoding.DecodeString(contextEnc) + if err != nil { + t.Fatalf("Failed to decode context: %v", err) + } + + // Parse the JSON + var contextData map[string]interface{} + err = 
json.Unmarshal(contextJSON, &contextData) + if err != nil { + t.Fatalf("Failed to parse context JSON: %v", err) + } + + // Verify schema + if contextData["schema"] != schemaCustomEvent { + t.Errorf("Expected schema '%s', got '%v'", schemaCustomEvent, contextData["schema"]) + } + + // Verify data contains name and props + data := contextData["data"].(map[string]interface{}) + if data["name"] != eventName { + t.Errorf("Expected event name '%s', got '%v'", eventName, data["name"]) + } + + propsData := data["props"].(map[string]interface{}) + if propsData["url"] != "https://example.com" { + t.Errorf("Expected url 'https://example.com', got '%v'", propsData["url"]) + } + + if propsData["duration"].(float64) != 1500 { + t.Errorf("Expected duration 1500, got '%v'", propsData["duration"]) + } + + if propsData["success"] != true { + t.Errorf("Expected success true, got '%v'", propsData["success"]) + } +} + +func TestTrackEvent_MultipleEvents(t *testing.T) { + storage := NewStorage() + emitter := createTestEmitterWithStorage(t, storage) + tracker, _ := InitTracker("test-app", emitter) + + // Track multiple events + tracker.TrackEvent("event1", map[string]any{"key": "value1"}) + tracker.TrackEvent("event2", map[string]any{"key": "value2"}) + tracker.TrackEvent("event3", map[string]any{"key": "value3"}) + + events, _ := storage.GetWithIDs(10) + if len(events) != 3 { + t.Fatalf("Expected 3 events in storage, got %d", len(events)) + } + + // Verify all events have unique EIDs + eids := make(map[string]bool) + for _, event := range events { + eventId := event[fieldEid] + if eids[eventId] { + t.Errorf("Duplicate EID found: %s", eventId) + } + eids[eventId] = true + } +} + +func TestTrackEvent_EmptyProps(t *testing.T) { + storage := NewStorage() + emitter := createTestEmitterWithStorage(t, storage) + tracker, _ := InitTracker("test-app", emitter) + + // Empty props map (not nil) should work + props := map[string]any{} + + err := tracker.TrackEvent("event_name", props) + + if err != 
nil { + t.Fatalf("Expected no error for empty props map, got: %v", err) + } + + events, _ := storage.GetWithIDs(1) + if len(events) != 1 { + t.Errorf("Expected 1 event in storage, got %d", len(events)) + } +} + +func TestTrackEvent_ComplexProps(t *testing.T) { + storage := NewStorage() + emitter := createTestEmitterWithStorage(t, storage) + tracker, _ := InitTracker("test-app", emitter) + + props := map[string]any{ + "string": "value", + "number": 42, + "float": 3.14, + "bool": true, + "nested": map[string]any{"key": "nested_value"}, + "array": []string{"item1", "item2"}, + "null": nil, + } + + err := tracker.TrackEvent("complex_event", props) + + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + events, _ := storage.GetWithIDs(1) + if len(events) != 1 { + t.Errorf("Expected 1 event in storage, got %d", len(events)) + } + + // Decode and verify the context contains all prop types + event := events[0] + contextJSON, _ := base64.StdEncoding.DecodeString(event[fieldContext]) + var contextData map[string]interface{} + json.Unmarshal(contextJSON, &contextData) + + data := contextData["data"].(map[string]interface{}) + propsData := data["props"].(map[string]interface{}) + + if propsData["string"] != "value" { + t.Error("String prop not preserved") + } + + if propsData["number"].(float64) != 42 { + t.Error("Number prop not preserved") + } + + if propsData["bool"] != true { + t.Error("Bool prop not preserved") + } + + if propsData["null"] != nil { + t.Error("Null prop not preserved") + } +} + +func TestTrackEvent_UniqueTimestamps(t *testing.T) { + storage := NewStorage() + emitter := createTestEmitterWithStorage(t, storage) + tracker, _ := InitTracker("test-app", emitter) + + // Track events quickly + for i := 0; i < 10; i++ { + tracker.TrackEvent("event", map[string]any{"index": i}) + } + + events, _ := storage.GetWithIDs(10) + if len(events) != 10 { + t.Fatalf("Expected 10 events, got %d", len(events)) + } + + // All events should have timestamps 
(they may or may not be unique depending on timing) + for _, event := range events { + if event[fieldTimestamp] == "" { + t.Error("Event missing timestamp") + } + } +} + +func BenchmarkTrackEvent(b *testing.B) { + storage := NewStorage() + emitter := createBenchmarkEmitter(storage) + tracker, _ := InitTracker("test-app", emitter) + + props := map[string]any{ + "key1": "value1", + "key2": 123, + "key3": true, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + tracker.TrackEvent("benchmark_event", props) + } +} + +func BenchmarkTrackEvent_ComplexProps(b *testing.B) { + storage := NewStorage() + emitter := createBenchmarkEmitter(storage) + tracker, _ := InitTracker("test-app", emitter) + + props := map[string]any{ + "string": "value", + "number": 42, + "float": 3.14, + "bool": true, + "nested": map[string]any{"key1": "value1", "key2": "value2"}, + "array": []string{"item1", "item2", "item3"}, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + tracker.TrackEvent("complex_event", props) + } +} diff --git a/go.mod b/go.mod index dda720d3fd8bcb0198881de20a4568223d7c7a0a..7ef89c334beda18a2425676bab20715d8d006907 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( contrib.go.opencensus.io/exporter/stackdriver v0.13.8 github.com/getsentry/raven-go v0.2.0 github.com/getsentry/sentry-go v0.13.0 + github.com/google/uuid v1.6.0 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 github.com/lightstep/lightstep-tracer-go v0.25.0 @@ -46,7 +47,6 @@ require ( github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/pprof v0.0.0-20210804190019-f964ff605595 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/googleapis/gax-go/v2 v2.0.5 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/compress v1.17.9 // indirect