Skip to content

Commit

Permalink
api: Create a couple additional API endpoints (#14)
Browse files Browse the repository at this point in the history
* api: Create GetAssetByPlaybackID API

* api: Create DeleteAsset API

* api: Add options to the ListAssets function

* api: Use ListAssets from GetAssetByPlaybackId
  • Loading branch information
victorges committed Jul 21, 2022
1 parent 88bef5d commit 95e2bb1
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 11 deletions.
90 changes: 80 additions & 10 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/golang/glog"
"github.com/livepeer/go-api-client/logs"
"github.com/livepeer/go-api-client/metrics"
"github.com/tomnomnom/linkheader"
)

const (
Expand Down Expand Up @@ -271,6 +272,7 @@ type (

Asset struct {
ID string `json:"id"`
Deleted bool `json:"deleted,omitempty"`
PlaybackID string `json:"playbackId"`
UserID string `json:"userId"`
CreatedAt int64 `json:"createdAt"`
Expand Down Expand Up @@ -360,6 +362,14 @@ type (
deactivateManyResp struct {
RowCount int `json:"rowCount"`
}

ListOptions struct {
Limit int
Cursor string
AllUsers, IncludeDeleted bool
Filters map[string]interface{}
Order map[string]bool
}
)

// NewAPIClientGeolocated creates a new Livepeer API object calling the
Expand Down Expand Up @@ -983,13 +993,65 @@ func (lapi *Client) GetAsset(id string) (*Asset, error) {
return &asset, nil
}

func (lapi *Client) ListAssets() (*[]Asset, error) {
var assets []Asset
url := fmt.Sprintf("%s/api/asset", lapi.chosenServer)
if err := lapi.getJSON(url, "asset", "", &assets); err != nil {
func (lapi *Client) ListAssets(opts ListOptions) ([]*Asset, string, error) {
if opts.Limit <= 0 {
opts.Limit = 10
}
url := fmt.Sprintf("%s/api/asset?limit=%d&cursor=%s&allUsers=%v&all=%v", lapi.chosenServer, opts.Limit, opts.Cursor, opts.AllUsers, opts.IncludeDeleted)
if len(opts.Filters) > 0 {
filtersStrs := make([]string, 0, len(opts.Filters))
for k, v := range opts.Filters {
vb, err := json.Marshal(v)
if err != nil {
return nil, "", fmt.Errorf("error marshaling filter: %w", err)
}
filtersStrs = append(filtersStrs, fmt.Sprintf(`{"id":%q,"value":%s}`, k, vb))
}
url += fmt.Sprintf(`&filters=[%s]`, strings.Join(filtersStrs, ","))
}
if len(opts.Order) > 0 {
orderStrs := make([]string, 0, len(opts.Order))
for field, descending := range opts.Order {
orderStrs = append(orderStrs, fmt.Sprintf(`%s-%v`, field, descending))
}
url += fmt.Sprintf(`&order=%s`, strings.Join(orderStrs, ","))
}

var assets []*Asset
headers, err := lapi.doRequestHeaders("GET", url, "asset", "list-assets", nil, &assets)
if err != nil {
return nil, "", err
}
next := ""
links := linkheader.ParseMultiple(headers["Link"]).FilterByRel("next")
if len(links) > 0 {
next = links[0].URL
}
return assets, next, nil
}

func (lapi *Client) GetAssetByPlaybackID(pid string, includeDeleted bool) (*Asset, error) {
assets, _, err := lapi.ListAssets(ListOptions{
Limit: 2,
AllUsers: true,
IncludeDeleted: includeDeleted,
Filters: map[string]interface{}{
"playbackId": pid,
}})
if err != nil {
return nil, err
}
return &assets, nil
if len(assets) == 0 {
return nil, ErrNotExists
} else if len(assets) > 1 {
return nil, fmt.Errorf("multiple assets found for playbackId %q", pid)
}
return assets[0], nil
}

func (lapi *Client) DeleteAsset(id string) error {
url := fmt.Sprintf("%s/api/asset/%s", lapi.chosenServer, id)
return lapi.doRequest("DELETE", url, "asset", "", nil, nil)
}

func (lapi *Client) GetObjectStore(id string) (*ObjectStore, error) {
Expand Down Expand Up @@ -1051,38 +1113,46 @@ func (lapi *Client) getJSON(url, resourceType, metricName string, output interfa
}

func (lapi *Client) doRequest(method, url, resourceType, metricName string, input, output interface{}) error {
_, err := lapi.doRequestHeaders(method, url, resourceType, metricName, input, output)
return err
}

func (lapi *Client) doRequestHeaders(method, url, resourceType, metricName string, input, output interface{}) (http.Header, error) {
if metricName == "" {
metricName = strings.ToLower(method + "_" + resourceType)
}
start := time.Now()
req, err := lapi.newRequest(method, url, input)
if err != nil {
return err
return nil, err
}

resp, err := lapi.httpClient.Do(req)
if err != nil {
glog.Errorf("Error calling Livepeer API resource=%s method=%s url=%s error=%q", resourceType, method, url, err)
lapi.metrics.APIRequest(metricName, 0, err)
return err
return nil, err
}
defer resp.Body.Close()

if err := checkResponseError(resp); err != nil {
lapi.metrics.APIRequest(metricName, 0, err)
return err
return resp.Header, err
}

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
glog.Errorf("Error reading Livepeer API response body resource=%s method=%s url=%s error=%q", resourceType, method, url, err)
lapi.metrics.APIRequest(metricName, 0, err)
return err
return resp.Header, err
}
took := time.Since(start)
lapi.metrics.APIRequest(metricName, took, nil)

return json.Unmarshal(b, output)
if output == nil {
return resp.Header, nil
}
return resp.Header, json.Unmarshal(b, output)
}

// PushSegmentR pushes a segment with retries
Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ module github.com/livepeer/go-api-client

go 1.16

require github.com/golang/glog v1.0.0
require (
github.com/golang/glog v1.0.0
github.com/tomnomnom/linkheader v0.0.0-20180905144013-02ca5825eb80
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ=
github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4=
github.com/tomnomnom/linkheader v0.0.0-20180905144013-02ca5825eb80 h1:nrZ3ySNYwJbSpD6ce9duiP+QkD3JuLCcWkdaehUS/3Y=
github.com/tomnomnom/linkheader v0.0.0-20180905144013-02ca5825eb80/go.mod h1:iFyPdL66DjUD96XmzVL3ZntbzcflLnznH0fr99w5VqE=

0 comments on commit 95e2bb1

Please sign in to comment.