package ntfy import ( "bufio" "log" "net/http" "net/url" "path" "time" ) type Message struct { Topic string Text string } type NtfyWatcher struct { Endpoint string Topics []string } func (w *NtfyWatcher) Watch() chan Message { notifications := make(chan Message) for _, topic := range w.Topics { log.Println("subscribing to topic:", topic) go func() { retryCount := 5 retryTime := 5 * time.Second retries := retryCount sleepAndDecrementRetry := func() { log.Println("waiting 5 seconds before reconnecting. retries left:", retries, "topic:", topic, "endpoint:", w.Endpoint) time.Sleep(retryTime) retries-- } for true { if retries == 0 { log.Fatal("too many retries, exiting") } endpoint, _ := url.JoinPath(w.Endpoint, path.Join(topic, "json")) resp, err := http.Get(endpoint) if err != nil { log.Println("error connecting to endpoint:", err) sleepAndDecrementRetry() continue } defer resp.Body.Close() scanner := bufio.NewScanner(resp.Body) for scanner.Scan() { text := scanner.Text() log.Println("received notification:", text) notifications <- Message{Topic: topic, Text: text} retries = retryCount // reset retries } if err := scanner.Err(); err != nil { log.Println("error reading response body:", err) sleepAndDecrementRetry() } } }() } return notifications } func MakeNtfyWatcher(endpoint string, topics []string) *NtfyWatcher { return &NtfyWatcher{ Endpoint: endpoint, Topics: topics, } }