backup-notify/ntfy/watcher.go

78 lines
1.5 KiB
Go
Raw Normal View History

2024-04-21 21:46:40 -04:00
package ntfy
import (
"bufio"
"log"
"net/http"
"net/url"
"path"
"time"
)
// Message is a single notification produced by NtfyWatcher.Watch.
//
// Topic names the subscription the notification arrived on, and Text is one
// raw line of the server's response body, forwarded without any parsing.
type Message struct {
	Topic, Text string
}
// NtfyWatcher holds the configuration for a set of topic subscriptions.
type NtfyWatcher struct {
	// Endpoint is the base URL of the server; Watch appends "<topic>/json"
	// to it to build the URL for each subscription.
	Endpoint string

	// Topics lists the topics to subscribe to. Watch starts one goroutine
	// per entry.
	Topics []string
}
// Watch subscribes to every topic in w.Topics and returns the channel on
// which received notifications are delivered.
//
// One goroutine is started per topic. Each goroutine streams
// <Endpoint>/<topic>/json line by line and forwards every line as a Message.
// When a stream ends cleanly the goroutine reconnects immediately; when an
// attempt fails (URL cannot be built, connection error, non-2xx status, read
// error) it waits five seconds first. Five failed attempts without a single
// notification in between terminate the whole process via log.Fatal.
//
// The returned channel is unbuffered and is never closed, so a goroutine
// blocks until the caller receives each message. The goroutines have no stop
// mechanism and run until the process exits.
func (w *NtfyWatcher) Watch() chan Message {
	const (
		retryCount = 5
		retryTime  = 5 * time.Second
	)

	notifications := make(chan Message)
	for _, topic := range w.Topics {
		log.Println("subscribing to topic:", topic)
		// topic is passed as an argument so that every goroutine owns its
		// own copy, regardless of the loop-variable semantics of the Go
		// version the module is built with.
		go func(topic string) {
			retries := retryCount
			for {
				if retries == 0 {
					log.Fatal("too many retries, exiting")
				}
				received, ok := w.streamTopic(topic, notifications)
				if received {
					// A delivered notification proves the subscription
					// worked, so the failure budget starts over.
					retries = retryCount
				}
				if ok {
					continue
				}
				log.Println("waiting 5 seconds before reconnecting. retries left:", retries, "topic:", topic, "endpoint:", w.Endpoint)
				time.Sleep(retryTime)
				retries--
			}
		}(topic)
	}
	return notifications
}

// streamTopic performs a single subscription attempt for topic and forwards
// every received line to notifications until the response stream ends.
//
// received reports whether at least one notification was delivered. ok is
// false when the attempt failed; the failure has already been logged and the
// caller should back off before trying again. The response body is closed
// before returning, so repeated attempts do not accumulate open connections.
func (w *NtfyWatcher) streamTopic(topic string, notifications chan<- Message) (received, ok bool) {
	endpoint, err := url.JoinPath(w.Endpoint, path.Join(topic, "json"))
	if err != nil {
		log.Println("error building endpoint URL:", err)
		return false, false
	}

	// The default client has no overall timeout, which is what a
	// long-lived streaming response needs.
	resp, err := http.Get(endpoint)
	if err != nil {
		log.Println("error connecting to endpoint:", err)
		return false, false
	}
	defer resp.Body.Close()

	// The body of an error response is not a notification stream; treat it
	// as a failed attempt instead of forwarding its lines.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		log.Println("unexpected response status:", resp.Status)
		return false, false
	}

	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		text := scanner.Text()
		log.Println("received notification:", text)
		notifications <- Message{Topic: topic, Text: text}
		received = true
	}
	if err := scanner.Err(); err != nil {
		log.Println("error reading response body:", err)
		return received, false
	}
	return received, true
}
// MakeNtfyWatcher returns a new NtfyWatcher configured with the given
// endpoint and topics.
//
// The topics slice is stored as passed, not copied, so later changes made by
// the caller to its elements are visible through the returned watcher.
func MakeNtfyWatcher(endpoint string, topics []string) *NtfyWatcher {
	watcher := new(NtfyWatcher)
	watcher.Endpoint = endpoint
	watcher.Topics = topics
	return watcher
}