我试图轮询API保持一个时间序列的交通数据,并保存数据到postgres时,已经发生了变化。
目前我有一个类似这样的实现
//this needs to check the api for new information every X seconds
func Poll(req *http.Request, client *http.Client) ([]byte, error) {
r := rand.New(rand.NewSource(99))
c := time.Tick(10 * time.Second)
for _ = range c {
//Download the current contents of the URL and do something with it
response, err := client.Do(req)
data, _ := io.ReadAll(response.Body)
if err != nil {
return nil, err
}
return data, nil
// add a bit of jitter
jitter := time.Duration(r.Int31n(5000)) * time.Millisecond
time.Sleep(jitter)
}
}
func main() {
client := &http.Client{
Timeout: time.Second * 60 * 60 * 600,
}
url := "https://data-exchange-api.vicroads.vic.gov.au/bluetooth_data/links"
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return err
}
req.Header.Set("Ocp-Apim-Subscription-Key", "xx")
// response, err := client.Do(req)
data, err := Poll(req, client)
fmt.Println(string(data))
}
我接下来会做一个对比功能。
基本上,我正在尝试解决如何确保循环首先调用查询并返回适当的值。
我认为这个实现可能不是很好,我只是不确定如何真正正确地实现它。我能得到一些建议吗?
您的问题呈现出典型的生产者/消费者场景,因为您的Poll()函数正在生成由您的main()函数使用的响应数据(可能是将数据保存在postgres中)。这个问题可以通过使用go例程和通道来很好地解决。
轮询工作可以在goroutine中完成,它通过通道将响应数据传递给main函数。轮询工作时也可能出现错误(响应错误或io错误),因此它也应该传递给main()函数。
首先定义一个新类型来保存轮询数据和错误:
type PollResponse struct {
Data []byte
Err error
}
在Poll()函数中,启动一个go例程来进行轮询工作,并返回一个通道来共享go例程之外的数据:
func Poll(req *http.Request, client *http.Client) (ch chan PollResponse){
ch = make(chan PollResponse) // Buffered channel is also good
go func() {
defer func() {
close(ch)
}()
r := rand.New(rand.NewSource(99))
c := time.Tick(10 * time.Second)
for range c {
res, err := client.Do(req);
pollRes := PollResponse {}
if err != nil {
pollRes.Data, pollRes.Err = nil, err
ch <- pollRes
break
}
pollRes.Data, pollRes.Err = io.ReadAll(res.Body)
ch <- pollRes
if pollRes.Err != nil {
break
}
jitter := time.Duration(r.Int31n(5000)) * time.Millisecond
time.Sleep(jitter)
}
}()
return
}
最后在main()函数中,调用Poll()并读取通道以获得轮询响应:
func main() {
client := &http.Client{
Timeout: time.Second * 60 * 60 * 600,
}
url := "https://data-exchange-api.vicroads.vic.gov.au/bluetooth_data/links"
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return
}
req.Header.Set("Ocp-Apim-Subscription-Key", "xx")
pollCh := Poll(req, client)
for item := range pollCh {
if item.Err == nil {
fmt.Println(string(item.Data)) // or save it to postgres database
}
}
}
覆盖ticker通道。在每次迭代中,获取数据,检查数据是否发生变化并处理数据。关键点是从循环内部处理数据,而不是从函数返回数据。
假设您具有以下功能:
// procesChangedData updates the database with new
// data from the API endpoint.
func processChangedData(data []byte) error {
// implement save to postgress
}
使用以下函数轮询:
func Poll(client *http.Client) error {
url := "https://data-exchange-api.vicroads.vic.gov.au/bluetooth_data/links"
// Use NewTicker instead of Tick so we can cleanup
// ticker on return from the function.
t := time.NewTicker(10 * time.Second)
defer t.Stop()
var prev []byte
for _ = range t.C {
// Create a new request objet for each request.
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return err
}
req.Header.Set("Ocp-Apim-Subscription-Key", "xx")
resp, err := client.Do(req)
if err != nil {
// Edit error handling to match application
// requirements. I return an error here. Continuing
// the loop is also an option.
return err
}
data, err := io.ReadAll(resp.Body)
// Ensure that body is closed before handling errors
// below.
resp.Body.Close()
if err != nil {
// Edit error handling to match application
// requirements. I return an error here. Continuing
// the loop is also an option.
return err
}
if resp.StatusCode != http.StatusOK {
// Edit error handling to match application
// requirements. I return an error here. Continuing
// the loop is also an option.
return fmt.Errorf("bad status %d", resp.StatusCode)
}
if bytes.Equal(data, prev) {
continue
}
prev = data
if err := processChangedData(data); err != nil {
// Edit error handling to match application
// requirements. I return an error here. Continuing
// the loop is also an option.
return err
}
}
panic("unexpected break from loop")
}