package client import ( "encoding/json" "fmt" "net/http" "net/http/httptest" "reflect" "strings" "sync" "testing" "time" ) func TestUDPClient_Query(t *testing.T) { config := UDPConfig{Addr: "localhost:8089"} c, err := NewUDPClient(config) if err != nil { t.Errorf("unexpected error. expected %v, actual %v", nil, err) } defer c.Close() query := Query{} _, err = c.Query(query) if err == nil { t.Error("Querying UDP client should fail") } } func TestUDPClient_Ping(t *testing.T) { config := UDPConfig{Addr: "localhost:8089"} c, err := NewUDPClient(config) if err != nil { t.Errorf("unexpected error. expected %v, actual %v", nil, err) } defer c.Close() rtt, version, err := c.Ping(0) if rtt != 0 || version != "" || err != nil { t.Errorf("unexpected error. expected (%v, '%v', %v), actual (%v, '%v', %v)", 0, "", nil, rtt, version, err) } } func TestUDPClient_Write(t *testing.T) { config := UDPConfig{Addr: "localhost:8089"} c, err := NewUDPClient(config) if err != nil { t.Errorf("unexpected error. expected %v, actual %v", nil, err) } defer c.Close() bp, err := NewBatchPoints(BatchPointsConfig{}) if err != nil { t.Errorf("unexpected error. expected %v, actual %v", nil, err) } fields := make(map[string]interface{}) fields["value"] = 1.0 pt, _ := NewPoint("cpu", make(map[string]string), fields) bp.AddPoint(pt) err = c.Write(bp) if err != nil { t.Errorf("unexpected error. expected %v, actual %v", nil, err) } } func TestUDPClient_BadAddr(t *testing.T) { config := UDPConfig{Addr: "foobar@wahoo"} c, err := NewUDPClient(config) if err == nil { defer c.Close() t.Error("Expected resolve error") } } func TestUDPClient_Batches(t *testing.T) { var logger writeLogger var cl udpclient cl.conn = &logger cl.payloadSize = 20 // should allow for two points per batch // expected point should look like this: "cpu a=1i" fields := map[string]interface{}{"a": 1} p, _ := NewPoint("cpu", nil, fields, time.Time{}) bp, _ := NewBatchPoints(BatchPointsConfig{}) for i := 0; i < 9; i++ { bp.AddPoint(p) } if err := cl.Write(bp); err != nil { t.Fatalf("Unexpected error during Write: %v", err) } if len(logger.writes) != 5 { t.Errorf("Mismatched write count: got %v, exp %v", len(logger.writes), 5) } } func TestUDPClient_Split(t *testing.T) { var logger writeLogger var cl udpclient cl.conn = &logger cl.payloadSize = 1 // force one field per point fields := map[string]interface{}{"a": 1, "b": 2, "c": 3, "d": 4} p, _ := NewPoint("cpu", nil, fields, time.Unix(1, 0)) bp, _ := NewBatchPoints(BatchPointsConfig{}) bp.AddPoint(p) if err := cl.Write(bp); err != nil { t.Fatalf("Unexpected error during Write: %v", err) } if len(logger.writes) != len(fields) { t.Errorf("Mismatched write count: got %v, exp %v", len(logger.writes), len(fields)) } } type writeLogger struct { writes [][]byte } func (w *writeLogger) Write(b []byte) (int, error) { w.writes = append(w.writes, append([]byte(nil), b...)) return len(b), nil } func (w *writeLogger) Close() error { return nil } func TestClient_Query(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var data Response w.WriteHeader(http.StatusOK) _ = json.NewEncoder(w).Encode(data) })) defer ts.Close() config := HTTPConfig{Addr: ts.URL} c, _ := NewHTTPClient(config) defer c.Close() query := Query{} _, err := c.Query(query) if err != nil { t.Errorf("unexpected error. expected %v, actual %v", nil, err) } } func TestClient_ChunkedQuery(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var data Response w.WriteHeader(http.StatusOK) enc := json.NewEncoder(w) _ = enc.Encode(data) _ = enc.Encode(data) })) defer ts.Close() config := HTTPConfig{Addr: ts.URL} c, err := NewHTTPClient(config) if err != nil { t.Fatalf("unexpected error. expected %v, actual %v", nil, err) } query := Query{Chunked: true} _, err = c.Query(query) if err != nil { t.Fatalf("unexpected error. expected %v, actual %v", nil, err) } } func TestClient_BoundParameters(t *testing.T) { var parameterString string ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var data Response r.ParseForm() parameterString = r.FormValue("params") w.WriteHeader(http.StatusOK) _ = json.NewEncoder(w).Encode(data) })) defer ts.Close() config := HTTPConfig{Addr: ts.URL} c, _ := NewHTTPClient(config) defer c.Close() expectedParameters := map[string]interface{}{ "testStringParameter": "testStringValue", "testNumberParameter": 12.3, } query := Query{ Parameters: expectedParameters, } _, err := c.Query(query) if err != nil { t.Errorf("unexpected error. expected %v, actual %v", nil, err) } var actualParameters map[string]interface{} err = json.Unmarshal([]byte(parameterString), &actualParameters) if err != nil { t.Errorf("unexpected error. expected %v, actual %v", nil, err) } if !reflect.DeepEqual(expectedParameters, actualParameters) { t.Errorf("unexpected parameters. expected %v, actual %v", expectedParameters, actualParameters) } } func TestClient_BasicAuth(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { u, p, ok := r.BasicAuth() if !ok { t.Errorf("basic auth error") } if u != "username" { t.Errorf("unexpected username, expected %q, actual %q", "username", u) } if p != "password" { t.Errorf("unexpected password, expected %q, actual %q", "password", p) } var data Response w.WriteHeader(http.StatusOK) _ = json.NewEncoder(w).Encode(data) })) defer ts.Close() config := HTTPConfig{Addr: ts.URL, Username: "username", Password: "password"} c, _ := NewHTTPClient(config) defer c.Close() query := Query{} _, err := c.Query(query) if err != nil { t.Errorf("unexpected error. expected %v, actual %v", nil, err) } } func TestClient_Ping(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var data Response w.WriteHeader(http.StatusNoContent) _ = json.NewEncoder(w).Encode(data) })) defer ts.Close() config := HTTPConfig{Addr: ts.URL} c, _ := NewHTTPClient(config) defer c.Close() _, _, err := c.Ping(0) if err != nil { t.Errorf("unexpected error. expected %v, actual %v", nil, err) } } func TestClient_Concurrent_Use(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte(`{}`)) })) defer ts.Close() config := HTTPConfig{Addr: ts.URL} c, _ := NewHTTPClient(config) defer c.Close() var wg sync.WaitGroup wg.Add(3) n := 1000 errC := make(chan error) go func() { defer wg.Done() bp, err := NewBatchPoints(BatchPointsConfig{}) if err != nil { errC <- fmt.Errorf("got error %v", err) return } for i := 0; i < n; i++ { if err = c.Write(bp); err != nil { errC <- fmt.Errorf("got error %v", err) return } } }() go func() { defer wg.Done() var q Query for i := 0; i < n; i++ { if _, err := c.Query(q); err != nil { errC <- fmt.Errorf("got error %v", err) return } } }() go func() { defer wg.Done() for i := 0; i < n; i++ { c.Ping(time.Second) } }() go func() { wg.Wait() close(errC) }() for err := range errC { if err != nil { t.Error(err) } } } func TestClient_Write(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var data Response w.WriteHeader(http.StatusNoContent) _ = json.NewEncoder(w).Encode(data) })) defer ts.Close() config := HTTPConfig{Addr: ts.URL} c, _ := NewHTTPClient(config) defer c.Close() bp, err := NewBatchPoints(BatchPointsConfig{}) if err != nil { t.Errorf("unexpected error. expected %v, actual %v", nil, err) } err = c.Write(bp) if err != nil { t.Errorf("unexpected error. expected %v, actual %v", nil, err) } } func TestClient_UserAgent(t *testing.T) { receivedUserAgent := "" ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { receivedUserAgent = r.UserAgent() var data Response w.WriteHeader(http.StatusOK) _ = json.NewEncoder(w).Encode(data) })) defer ts.Close() _, err := http.Get(ts.URL) if err != nil { t.Errorf("unexpected error. expected %v, actual %v", nil, err) } tests := []struct { name string userAgent string expected string }{ { name: "Empty user agent", userAgent: "", expected: "InfluxDBClient", }, { name: "Custom user agent", userAgent: "Test Influx Client", expected: "Test Influx Client", }, } for _, test := range tests { config := HTTPConfig{Addr: ts.URL, UserAgent: test.userAgent} c, _ := NewHTTPClient(config) defer c.Close() receivedUserAgent = "" query := Query{} _, err = c.Query(query) if err != nil { t.Errorf("unexpected error. expected %v, actual %v", nil, err) } if !strings.HasPrefix(receivedUserAgent, test.expected) { t.Errorf("Unexpected user agent. expected %v, actual %v", test.expected, receivedUserAgent) } receivedUserAgent = "" bp, _ := NewBatchPoints(BatchPointsConfig{}) err = c.Write(bp) if err != nil { t.Errorf("unexpected error. expected %v, actual %v", nil, err) } if !strings.HasPrefix(receivedUserAgent, test.expected) { t.Errorf("Unexpected user agent. expected %v, actual %v", test.expected, receivedUserAgent) } receivedUserAgent = "" _, err := c.Query(query) if err != nil { t.Errorf("unexpected error. expected %v, actual %v", nil, err) } if receivedUserAgent != test.expected { t.Errorf("Unexpected user agent. expected %v, actual %v", test.expected, receivedUserAgent) } } } func TestClient_PointString(t *testing.T) { const shortForm = "2006-Jan-02" time1, _ := time.Parse(shortForm, "2013-Feb-03") tags := map[string]string{"cpu": "cpu-total"} fields := map[string]interface{}{"idle": 10.1, "system": 50.9, "user": 39.0} p, _ := NewPoint("cpu_usage", tags, fields, time1) s := "cpu_usage,cpu=cpu-total idle=10.1,system=50.9,user=39 1359849600000000000" if p.String() != s { t.Errorf("Point String Error, got %s, expected %s", p.String(), s) } s = "cpu_usage,cpu=cpu-total idle=10.1,system=50.9,user=39 1359849600000" if p.PrecisionString("ms") != s { t.Errorf("Point String Error, got %s, expected %s", p.PrecisionString("ms"), s) } } func TestClient_PointWithoutTimeString(t *testing.T) { tags := map[string]string{"cpu": "cpu-total"} fields := map[string]interface{}{"idle": 10.1, "system": 50.9, "user": 39.0} p, _ := NewPoint("cpu_usage", tags, fields) s := "cpu_usage,cpu=cpu-total idle=10.1,system=50.9,user=39" if p.String() != s { t.Errorf("Point String Error, got %s, expected %s", p.String(), s) } if p.PrecisionString("ms") != s { t.Errorf("Point String Error, got %s, expected %s", p.PrecisionString("ms"), s) } } func TestClient_PointName(t *testing.T) { tags := map[string]string{"cpu": "cpu-total"} fields := map[string]interface{}{"idle": 10.1, "system": 50.9, "user": 39.0} p, _ := NewPoint("cpu_usage", tags, fields) exp := "cpu_usage" if p.Name() != exp { t.Errorf("Error, got %s, expected %s", p.Name(), exp) } } func TestClient_PointTags(t *testing.T) { tags := map[string]string{"cpu": "cpu-total"} fields := map[string]interface{}{"idle": 10.1, "system": 50.9, "user": 39.0} p, _ := NewPoint("cpu_usage", tags, fields) if !reflect.DeepEqual(tags, p.Tags()) { t.Errorf("Error, got %v, expected %v", p.Tags(), tags) } } func TestClient_PointUnixNano(t *testing.T) { const shortForm = "2006-Jan-02" time1, _ := time.Parse(shortForm, "2013-Feb-03") tags := map[string]string{"cpu": "cpu-total"} fields := map[string]interface{}{"idle": 10.1, "system": 50.9, "user": 39.0} p, _ := NewPoint("cpu_usage", tags, fields, time1) exp := int64(1359849600000000000) if p.UnixNano() != exp { t.Errorf("Error, got %d, expected %d", p.UnixNano(), exp) } } func TestClient_PointFields(t *testing.T) { tags := map[string]string{"cpu": "cpu-total"} fields := map[string]interface{}{"idle": 10.1, "system": 50.9, "user": 39.0} p, _ := NewPoint("cpu_usage", tags, fields) pfields, err := p.Fields() if err != nil { t.Fatal(err) } if !reflect.DeepEqual(fields, pfields) { t.Errorf("Error, got %v, expected %v", pfields, fields) } } func TestBatchPoints_PrecisionError(t *testing.T) { _, err := NewBatchPoints(BatchPointsConfig{Precision: "foobar"}) if err == nil { t.Errorf("Precision: foobar should have errored") } bp, _ := NewBatchPoints(BatchPointsConfig{Precision: "ns"}) err = bp.SetPrecision("foobar") if err == nil { t.Errorf("Precision: foobar should have errored") } } func TestBatchPoints_SettersGetters(t *testing.T) { bp, _ := NewBatchPoints(BatchPointsConfig{ Precision: "ns", Database: "db", RetentionPolicy: "rp", WriteConsistency: "wc", }) if bp.Precision() != "ns" { t.Errorf("Expected: %s, got %s", bp.Precision(), "ns") } if bp.Database() != "db" { t.Errorf("Expected: %s, got %s", bp.Database(), "db") } if bp.RetentionPolicy() != "rp" { t.Errorf("Expected: %s, got %s", bp.RetentionPolicy(), "rp") } if bp.WriteConsistency() != "wc" { t.Errorf("Expected: %s, got %s", bp.WriteConsistency(), "wc") } bp.SetDatabase("db2") bp.SetRetentionPolicy("rp2") bp.SetWriteConsistency("wc2") err := bp.SetPrecision("s") if err != nil { t.Errorf("Did not expect error: %s", err.Error()) } if bp.Precision() != "s" { t.Errorf("Expected: %s, got %s", bp.Precision(), "s") } if bp.Database() != "db2" { t.Errorf("Expected: %s, got %s", bp.Database(), "db2") } if bp.RetentionPolicy() != "rp2" { t.Errorf("Expected: %s, got %s", bp.RetentionPolicy(), "rp2") } if bp.WriteConsistency() != "wc2" { t.Errorf("Expected: %s, got %s", bp.WriteConsistency(), "wc2") } }