提问者:小点点

使用Go语言检查api以连续更改数据


我试图轮询API保持一个时间序列的交通数据,并保存数据到postgres时,已经发生了变化。

目前我有一个类似这样的实现

//this needs to check the api for new information every X seconds
func Poll(req *http.Request, client *http.Client) ([]byte, error) {
    r := rand.New(rand.NewSource(99))
    c := time.Tick(10 * time.Second)
    for _ = range c {
        //Download the current contents of the URL and do something with it
        response, err := client.Do(req)
        data, _ := io.ReadAll(response.Body)

        if err != nil {
            return nil, err
        }
        return data, nil
        // add a bit of jitter
        jitter := time.Duration(r.Int31n(5000)) * time.Millisecond
        time.Sleep(jitter)
    }

}



func main() {

    client := &http.Client{
        Timeout: time.Second * 60 * 60 * 600,
    }
    url := "https://data-exchange-api.vicroads.vic.gov.au/bluetooth_data/links"
    req, err := http.NewRequest("GET", url, nil)
    if err != nil {
        return err
    }
    req.Header.Set("Ocp-Apim-Subscription-Key", "xx")

    // response, err := client.Do(req)
    data, err := Poll(req, client)
    fmt.Println(string(data))

}

我接下来会做一个对比功能。

基本上,我正在尝试解决如何确保循环首先调用查询并返回适当的值。

我认为这个实现可能不是很好,我只是不确定如何真正正确地实现它。我能得到一些建议吗?


共2个答案

匿名用户

您的问题呈现出典型的生产者/消费者场景,因为您的Poll()函数正在生成由您的main()函数使用的响应数据(可能是将数据保存在postgres中)。这个问题可以通过使用go例程和通道来很好地解决。

轮询工作可以在goroutine中完成,它通过通道将响应数据传递给main函数。轮询工作时也可能出现错误(响应错误或io错误),因此它也应该传递给main()函数。

首先定义一个新类型来保存轮询数据和错误:

type PollResponse struct {
    Data []byte
    Err error
}

在Poll()函数中,启动一个go例程来进行轮询工作,并返回一个通道来共享go例程之外的数据:

func Poll(req *http.Request, client *http.Client) (ch chan PollResponse){
    ch = make(chan PollResponse) // Buffered channel is also good
    go func() {
        defer func() {
            close(ch)
        }()
        r := rand.New(rand.NewSource(99))
        c := time.Tick(10 * time.Second)

        for range c {
            res, err := client.Do(req);
            pollRes := PollResponse {}
            if err != nil {
                pollRes.Data, pollRes.Err = nil, err
                ch <- pollRes
                break
            }
            pollRes.Data, pollRes.Err = io.ReadAll(res.Body)
            ch <- pollRes
            if pollRes.Err != nil {
                break
            }
            jitter := time.Duration(r.Int31n(5000)) * time.Millisecond
            time.Sleep(jitter)
        }
    }()
    return
}

最后在main()函数中,调用Poll()并读取通道以获得轮询响应:

func main() {
    client := &http.Client{
        Timeout: time.Second * 60 * 60 * 600,
    }
    url := "https://data-exchange-api.vicroads.vic.gov.au/bluetooth_data/links"

    req, err := http.NewRequest("GET", url, nil)
    if err != nil {
        return
    }
    req.Header.Set("Ocp-Apim-Subscription-Key", "xx")

    pollCh := Poll(req, client)
    
    for item := range pollCh {
        if item.Err == nil {
            fmt.Println(string(item.Data)) // or save it to postgres database
        }       
    }
}

匿名用户

覆盖ticker通道。在每次迭代中,获取数据,检查数据是否发生变化并处理数据。关键点是从循环内部处理数据,而不是从函数返回数据。

假设您具有以下功能:

// procesChangedData updates the database with new
// data from the API endpoint.
func processChangedData(data []byte) error {
    // implement save to postgress
}

使用以下函数轮询:

func Poll(client *http.Client) error {

    url := "https://data-exchange-api.vicroads.vic.gov.au/bluetooth_data/links"

    // Use NewTicker instead of Tick so we can cleanup
    // ticker on return from the function.
    t := time.NewTicker(10 * time.Second)
    defer t.Stop()

    var prev []byte

    for _ = range t.C {

        // Create a new request objet for each request.
        req, err := http.NewRequest("GET", url, nil)
        if err != nil {
            return err
        }
        req.Header.Set("Ocp-Apim-Subscription-Key", "xx")

        resp, err := client.Do(req)
        if err != nil {
            // Edit error handling to match application 
            // requirements. I return an error here. Continuing
            // the loop is also an option.
            return err
        }

        data, err := io.ReadAll(resp.Body)

        // Ensure that body is closed before handling errors
        // below.
        resp.Body.Close()

        if err != nil {
            // Edit error handling to match application 
            // requirements. I return an error here. Continuing
            // the loop is also an option.
            return err
        }

        if resp.StatusCode != http.StatusOK {
            // Edit error handling to match application 
            // requirements. I return an error here. Continuing
            // the loop is also an option.
            return fmt.Errorf("bad status %d", resp.StatusCode)
        }

        if bytes.Equal(data, prev) {
            continue
        }
        prev = data

        if err := processChangedData(data); err != nil {
            // Edit error handling to match application 
            // requirements. I return an error here. Continuing
            // the loop is also an option.
            return err
        }
    }
    panic("unexpected break from loop")
}