package ntfy import ( "bufio" "encoding/json" "log" "net/http" "net/url" "path" "time" ) type Message struct { Id string `json:"id"` Time int `json:"time"` Message string `json:"message"` Event string `json:"event"` } type NtfyWatcher struct { Endpoint string Topics []string } func (w *NtfyWatcher) Watch() chan Message { notifications := make(chan Message) for _, topic := range w.Topics { log.Println("subscribing to topic:", topic) go func() { retryCount := 5 retryTime := 5 * time.Second retries := retryCount sleepAndDecrementRetry := func() { log.Println("waiting 5 seconds before reconnecting. retries left:", retries, "topic:", topic, "endpoint:", w.Endpoint) time.Sleep(retryTime) retries-- } for true { if retries == 0 { log.Fatal("too many retries, exiting") } endpoint, _ := url.JoinPath(w.Endpoint, path.Join(topic, "json")) resp, err := http.Get(endpoint) if err != nil { log.Println("error connecting to endpoint:", err) sleepAndDecrementRetry() continue } defer resp.Body.Close() scanner := bufio.NewScanner(resp.Body) for scanner.Scan() { bytes := scanner.Bytes() var msg Message err := json.Unmarshal(bytes, &msg) if err != nil { log.Println("could not unmarshal message:", err) continue } if msg.Event == "keepalive" { log.Println("received keepalive message") continue } if msg.Event != "message" { log.Println("received unknown event:", msg.Event) continue } log.Println("received notification:", msg) notifications <- msg retries = retryCount // reset retries } if err := scanner.Err(); err != nil { log.Println("error reading response body:", err) sleepAndDecrementRetry() } } }() } return notifications } func MakeNtfyWatcher(endpoint string, topics []string) *NtfyWatcher { return &NtfyWatcher{ Endpoint: endpoint, Topics: topics, } }