How To Poll for Changes in Acquia DAM

Goal

Detect changes to assets in Acquia DAM.

Overview

This article describes the overarching concepts needed to poll Acquia DAM for changes using the List Assets by Search Query endpoint. The example program detects asset creations, asset file content updates, and asset information updates.

If you need real-time notifications, see Ronald Shaw's article about the Webhooks feature. Whether you choose Webhooks or polling depends on your use case.

The samples are written in Python using the Requests library; they can easily be translated into your preferred language and library.

The code examples only demonstrate the overarching concepts of polling the DAM. A production-grade implementation will likely look quite different, depending on its specific requirements.

Considerations: When dealing with assets that are affected by automated processes, we recommend using upload profiles that will auto-version conflicting assets. Assets that end up in the conflict queue may be missed by polling.

  1. Setting up your options

    Set up a few options before reviewing the polling code. The example uses hard-coded values, but these could instead be loaded from a configuration file.

    1. Obtain an authentication token by following the Acquia DAM API documentation.
    2. Determine how often you’d like to poll, in seconds.
    3. Determine which event to detect:
      1. new_asset - Initial creation of an asset
      2. new_asset_version - File content updates
      3. asset_update - Any action that appears in “Asset History”
    4. Set an initial start time from which to pick up events. It must be an ISO 8601 UTC timestamp in the form YYYY-MM-DDTHH:MM:SSZ, for example 2021-03-25T00:00:00Z.
    5. Set a search query that limits the poll to assets meeting certain criteria.

    AUTH_TOKEN = 'YOUR TOKEN HERE'
    POLL_INTERVAL = 300  # 5 minutes
    EVENT_NAME = 'new_asset'  # one of 'new_asset', 'new_asset_version', 'asset_update'
    START_FROM = '2021-03-25T00:00:00Z'
    FILTER_QUERY = 'ADVANCED SEARCH QUERY STRING'
  2. Setting up the initial application state

    EVENT_TIMESTAMP_MAP maps each event name to the timestamp field in the Acquia DAM API asset response that records that event.

    The state variable carries information from one poll to the next: the current search offset, whether more results remain, and the timestamps used to skip assets that were already picked up.

    The OPTIONS variable holds the fixed setup parameters for the polling function and should be treated as constant.

    from datetime import datetime

    EVENT_TIMESTAMP_MAP = {
        'new_asset': 'created_date',
        'new_asset_version': 'file_upload_date',
        'asset_update': 'last_update_date'
    }
    
    # Initial Setup
    state = {
        'offset': 0,
        'can_poll_more': False,
        'since': datetime.strptime(START_FROM, '%Y-%m-%dT%H:%M:%SZ'),
        'latest_job': datetime.strptime(START_FROM, '%Y-%m-%dT%H:%M:%SZ')
    }
    
    OPTIONS = {
        'timestamp_field': EVENT_TIMESTAMP_MAP[EVENT_NAME],
        'filter': FILTER_QUERY,
        'auth': {
            'Authorization': f"Bearer {AUTH_TOKEN}"
        }
    }
    
  3. Polling Loop

    For this example, a simple infinite loop polls for assets at a regular interval, and any jobs it finds are printed to the screen. In a practical application, you’d enqueue the jobs into a job queue or handler instead.

    The polling function retrieves at most 100 assets per request, but more than 100 events may occur within one polling interval. The loop therefore checks the 'can_poll_more' value in the state variable: if more assets remain, it polls again immediately instead of waiting for the next interval.

    import time

    while True:
        state, jobs = poll_assets(state, OPTIONS)
        
        # Printing for example purposes only.
        # This is where you'd enqueue your jobs in your job handler
        for job in jobs:
            print(f"{job[OPTIONS['timestamp_field']]}: {job['filename']}")
    
        # If there are more assets, we want to poll again right away.
        # If not, then we can wait for one poll interval
        if not state['can_poll_more']:
            time.sleep(POLL_INTERVAL)
    
  4. Polling Function

    The polling function uses the V2 List Assets by Search Query endpoint. It searches for assets that match the filter query, sorted in descending order by the timestamp field that corresponds to the selected event. It then discards any results whose timestamp is not newer than the 'since' value, which is the most recent job found by the previous polling cycle (initially START_FROM).

    The rest of the function prepares the state for the next call: whether the loop should poll again immediately to fetch more results, and which offset and 'since' values to use. Finally, the function returns two values: the new state, which should be passed into the next poll, and a list of asset objects.

    from datetime import datetime

    import requests


    def poll_assets(state, options):
        timestamp_field = options['timestamp_field']

        params = {
            'query': options['filter'],
            'limit': 100,
            'offset': state['offset'],
            'sort': f"-{timestamp_field}"
        }

        # A timeout (30 seconds here, an arbitrary choice) keeps a stalled
        # request from blocking the loop indefinitely
        response = requests.get(
            'https://api.widencollective.com/v2/assets/search',
            headers=options['auth'],
            params=params,
            timeout=30
        )
        response.raise_for_status()

        # Keep only the assets whose timestamp is newer than the last job already seen
        unfiltered_jobs = response.json()['items']
        jobs = [job for job in unfiltered_jobs
                if datetime.strptime(job[timestamp_field], '%Y-%m-%dT%H:%M:%SZ') > state['since']]
    
        # Since we are ordering our search results in descending order
        # the first item on the first page will have the time value of the most recent job
        if state['offset'] == 0 and len(jobs) > 0:
            next_latest_job = datetime.strptime(jobs[0][timestamp_field], '%Y-%m-%dT%H:%M:%SZ')
        else:
            next_latest_job = state['latest_job']
    
        # If there is more to poll, then add 100 to the offset but keep the 'since' value
        # to get the next group of results
        # Otherwise, set the offset back to 0 and advance the 'since' date 
        # to the date of the most recent job
        more_to_poll = len(jobs) == 100
        if more_to_poll:
            next_offset = state['offset'] + 100
            next_since = state['since']
        else:
            next_offset = 0
            next_since = next_latest_job
    
        next_state = {
            'offset': next_offset,
            'can_poll_more': more_to_poll,
            'since': next_since,
            'latest_job': next_latest_job
        }
    
        return next_state, jobs
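
To see how the state advances across polls without calling the live API, the logic of step 4 can be exercised against an in-memory list. This is an illustration, not part of the article's method: it moves the HTTP request into a hypothetical `fetch_page(offset, limit)` callable, and `poll_page`, `make_fake_fetch`, and the 150 fake assets are made up for the demonstration. Otherwise the state transitions mirror `poll_assets`.

```python
from datetime import datetime

TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
PAGE_SIZE = 100


def poll_page(state, timestamp_field, fetch_page):
    """Apply the same state transitions as poll_assets to one page of results.

    fetch_page(offset, limit) is a hypothetical stand-in for the HTTP request
    and must return asset dicts sorted newest-first by timestamp_field.
    """
    unfiltered_jobs = fetch_page(state['offset'], PAGE_SIZE)
    jobs = [job for job in unfiltered_jobs
            if datetime.strptime(job[timestamp_field], TIMESTAMP_FORMAT) > state['since']]

    # Only the first page of a cycle knows the newest timestamp
    if state['offset'] == 0 and jobs:
        next_latest_job = datetime.strptime(jobs[0][timestamp_field], TIMESTAMP_FORMAT)
    else:
        next_latest_job = state['latest_job']

    # A full page of new jobs means there may be more on the next page
    more_to_poll = len(jobs) == PAGE_SIZE
    next_state = {
        'offset': state['offset'] + PAGE_SIZE if more_to_poll else 0,
        'can_poll_more': more_to_poll,
        'since': state['since'] if more_to_poll else next_latest_job,
        'latest_job': next_latest_job,
    }
    return next_state, jobs


def make_fake_fetch(items):
    """Return a fetch_page function that slices a fixed, pre-sorted list."""
    def fetch_page(offset, limit):
        return items[offset:offset + limit]
    return fetch_page


# 150 hypothetical assets created one minute apart, newest first
assets = [{'id': f'asset-{i}', 'created_date': f'2021-03-25T{i // 60:02d}:{i % 60:02d}:00Z'}
          for i in range(150, 0, -1)]

start = datetime.strptime('2021-03-25T00:00:00Z', TIMESTAMP_FORMAT)
state = {'offset': 0, 'can_poll_more': False, 'since': start, 'latest_job': start}
fetch = make_fake_fetch(assets)

batch_sizes = []
for _ in range(3):
    state, jobs = poll_page(state, 'created_date', fetch)
    batch_sizes.append(len(jobs))

print(batch_sizes)     # [100, 50, 0]
print(state['since'])  # 2021-03-25 02:30:00
```

The first poll returns a full page of 100 and sets 'can_poll_more', so the loop fetches offset 100 right away. That page holds only 50 new assets, so the offset resets and 'since' advances to the newest timestamp seen (02:30). The third poll finds nothing because the filter keeps only strictly newer timestamps.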

Implementation Notes
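
Configuration: Step 1 hard-codes its options but notes that they could come from a configuration file. A minimal sketch, assuming a JSON file with hypothetical key names (auth_token, event_name, start_from, filter_query, poll_interval), that builds the same state and OPTIONS structures as step 2 and rejects an unknown event name or a start time the polling code could not parse. `load_settings` is a hypothetical helper.

```python
import json
from datetime import datetime

TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
EVENT_TIMESTAMP_MAP = {
    'new_asset': 'created_date',
    'new_asset_version': 'file_upload_date',
    'asset_update': 'last_update_date',
}


def load_settings(config_text):
    """Build the initial state, OPTIONS and poll interval from JSON config text.

    The key names read here are hypothetical; use whatever your configuration defines.
    """
    config = json.loads(config_text)

    event_name = config['event_name']
    if event_name not in EVENT_TIMESTAMP_MAP:
        raise ValueError(f"Unknown event_name {event_name!r}; "
                         f"expected one of {sorted(EVENT_TIMESTAMP_MAP)}")

    # strptime raises ValueError unless start_from has the form 2021-03-25T00:00:00Z
    start = datetime.strptime(config['start_from'], TIMESTAMP_FORMAT)

    state = {'offset': 0, 'can_poll_more': False, 'since': start, 'latest_job': start}
    options = {
        'timestamp_field': EVENT_TIMESTAMP_MAP[event_name],
        'filter': config['filter_query'],
        'auth': {'Authorization': f"Bearer {config['auth_token']}"},
    }
    poll_interval = int(config.get('poll_interval', 300))
    return state, options, poll_interval


example_config = '''{
    "auth_token": "YOUR TOKEN HERE",
    "event_name": "asset_update",
    "start_from": "2021-03-25T00:00:00Z",
    "filter_query": "image_group:{User Group Images}",
    "poll_interval": 300
}'''

state, OPTIONS, POLL_INTERVAL = load_settings(example_config)
print(OPTIONS['timestamp_field'])  # last_update_date
```

The explicit check gives a clearer error message than the bare KeyError that EVENT_TIMESTAMP_MAP[EVENT_NAME] would raise for an unknown event name.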

Deduplication: New changes can arrive while you are paging through results, shifting assets between pages so that the polling function picks up the same asset more than once. Before enqueueing assets into your handler, compare each one against the assets that have already been processed to avoid reprocessing them.
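
A minimal in-memory sketch of that check, assuming each asset object has an `id` field. `SeenJobs` is a hypothetical helper, and keying on the pair of asset ID and event timestamp is a design choice: it drops repeats caused by paging while still letting a later change to the same asset through. A production system would more likely keep this record in a persistent store.

```python
from collections import OrderedDict


class SeenJobs:
    """Remember recently enqueued (asset id, timestamp) pairs.

    Keying on the timestamp as well as the id means a later change to the same
    asset is still treated as new. The max_size cap is an arbitrary assumption
    that keeps memory bounded; once it is exceeded, the oldest keys are forgotten.
    """

    def __init__(self, max_size=10_000):
        self.max_size = max_size
        self._seen = OrderedDict()

    def is_new(self, job, timestamp_field):
        """Return True the first time a given (id, timestamp) pair is seen."""
        key = (job['id'], job[timestamp_field])
        if key in self._seen:
            return False
        self._seen[key] = None
        if len(self._seen) > self.max_size:
            self._seen.popitem(last=False)  # evict the oldest remembered key
        return True


seen = SeenJobs()
batch = [
    {'id': 'abc', 'last_update_date': '2021-03-25T10:00:00Z'},
    {'id': 'abc', 'last_update_date': '2021-03-25T10:00:00Z'},  # repeated by paging
    {'id': 'abc', 'last_update_date': '2021-03-25T11:00:00Z'},  # a later, real update
]
to_enqueue = [job for job in batch if seen.is_new(job, 'last_update_date')]
print(len(to_enqueue))  # 2
```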

PIM: To detect changes in PIM, you can follow a conceptually similar approach using the V2 List Products endpoint. Note the differences in the timestamps and the requirements for the ‘filter’ and the ‘sort’ parameters. 
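
Error handling: Because poll_assets calls response.raise_for_status(), a single failed request ends the while True loop in the worked example. One option, assuming transient failures are worth retrying, is to wrap the call with exponential backoff. `poll_with_retry` and its parameters are hypothetical; the state is only replaced after a successful call, so each retry reuses the same offset and 'since' values.

```python
import time


def poll_with_retry(poll, state, options, *, retry_on, max_attempts=5,
                    base_delay=2.0, sleep=time.sleep):
    """Call poll(state, options), retrying when it raises one of retry_on.

    Waits base_delay, 2 * base_delay, 4 * base_delay, ... between attempts and
    re-raises the last error once max_attempts calls have failed.
    """
    for attempt in range(max_attempts):
        try:
            return poll(state, options)
        except retry_on:
            if attempt == max_attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)


# Simulated poll function that fails twice before succeeding
calls = []


def flaky_poll(state, options):
    calls.append(state['offset'])
    if len(calls) < 3:
        raise ConnectionError('simulated outage')
    return state, ['job']


delays = []
new_state, jobs = poll_with_retry(flaky_poll, {'offset': 0}, {},
                                  retry_on=(ConnectionError,), sleep=delays.append)
print(jobs, delays)  # ['job'] [2.0, 4.0]
```

In the worked example, the loop's first line could become `state, jobs = poll_with_retry(poll_assets, state, OPTIONS, retry_on=(requests.RequestException,))`. RequestException is the base class of the errors Requests raises, including the HTTPError from raise_for_status(). Retrying an authentication failure such as 401 will not help, so you may want to narrow what gets retried.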

Full Worked Example

from datetime import datetime
import time

import requests

# Polling Function
def poll_assets(state, options):
    timestamp_field = options['timestamp_field']
    
    params = {
        'query': options['filter'],
        'limit': 100,
        'offset': state['offset'],
        'sort': f"-{timestamp_field}"
    }

    # A timeout (30 seconds here, an arbitrary choice) keeps a stalled
    # request from blocking the loop indefinitely
    response = requests.get(
        'https://api.widencollective.com/v2/assets/search',
        headers=options['auth'],
        params=params,
        timeout=30
    )
    response.raise_for_status()

    # Keep only the assets whose timestamp is newer than the last job already seen
    unfiltered_jobs = response.json()['items']
    jobs = [job for job in unfiltered_jobs
            if datetime.strptime(job[timestamp_field], '%Y-%m-%dT%H:%M:%SZ') > state['since']]

    # Since we are ordering our search results in descending order
    # the first item on the first page will have the time value of the most recent job
    if state['offset'] == 0 and len(jobs) > 0:
        next_latest_job = datetime.strptime(jobs[0][timestamp_field], '%Y-%m-%dT%H:%M:%SZ')
    else:
        next_latest_job = state['latest_job']

    # If there is more to poll, then add 100 to the offset but keep the 'since' value
    # to get the next group of results
    # Otherwise, set the offset back to 0 and advance the 'since' date 
    # to the date of the most recent job
    more_to_poll = len(jobs) == 100
    if more_to_poll:
        next_offset = state['offset'] + 100
        next_since = state['since']
    else:
        next_offset = 0
        next_since = next_latest_job

    next_state = {
        'offset': next_offset,
        'can_poll_more': more_to_poll,
        'since': next_since,
        'latest_job': next_latest_job
    }

    return next_state, jobs

# Constants, Environment Variables, Configuration Parameters
AUTH_TOKEN = 'YOUR TOKEN HERE'
POLL_INTERVAL = 300  # 5 minutes
EVENT_NAME = 'new_asset'  # one of the keys in EVENT_TIMESTAMP_MAP
START_FROM = '2021-03-25T00:00:00Z'
FILTER_QUERY = 'image_group:{User Group Images}'

EVENT_TIMESTAMP_MAP = {
    'new_asset': 'created_date',
    'new_asset_version': 'file_upload_date',
    'asset_update': 'last_update_date'
}

# Initial Setup
state = {
    'offset': 0,
    'can_poll_more': False,
    'since': datetime.strptime(START_FROM, '%Y-%m-%dT%H:%M:%SZ'),
    'latest_job': datetime.strptime(START_FROM, '%Y-%m-%dT%H:%M:%SZ')
}

OPTIONS = {
    'timestamp_field': EVENT_TIMESTAMP_MAP[EVENT_NAME],
    'filter': FILTER_QUERY,
    'auth': {
        'Authorization': f"Bearer {AUTH_TOKEN}"
    }
}

while True:
    state, jobs = poll_assets(state, OPTIONS)
    
    # Printing for example purposes only.
    # This is where you'd enqueue your jobs in your job handler
    for job in jobs:
        print(f"{job[OPTIONS['timestamp_field']]}: {job['filename']}")

    # If there are more assets, we want to poll again right away.
    # If not, then we can wait for one poll interval
    if not state['can_poll_more']:
        time.sleep(POLL_INTERVAL)
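
Because the worked example keeps its state in memory, a restart begins again at START_FROM and re-reports everything since then. A minimal sketch of saving and restoring the state as JSON, assuming a local file is acceptable for your deployment; `save_state`, `load_state`, and the file name are hypothetical.

```python
import json
import os
import tempfile
from datetime import datetime

TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%SZ'


def save_state(state, path):
    """Write the polling state to a JSON file, storing datetimes as strings."""
    serializable = dict(state)
    for key in ('since', 'latest_job'):
        serializable[key] = state[key].strftime(TIMESTAMP_FORMAT)
    # Write a temporary file first and swap it in, so an interruption
    # during the write leaves the previous state file intact
    tmp_path = path + '.tmp'
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(serializable, f)
    os.replace(tmp_path, path)


def load_state(path, start_from):
    """Read the saved state, or start fresh from start_from if there is none."""
    try:
        with open(path, encoding='utf-8') as f:
            saved = json.load(f)
    except FileNotFoundError:
        start = datetime.strptime(start_from, TIMESTAMP_FORMAT)
        return {'offset': 0, 'can_poll_more': False, 'since': start, 'latest_job': start}
    for key in ('since', 'latest_job'):
        saved[key] = datetime.strptime(saved[key], TIMESTAMP_FORMAT)
    return saved


state_path = os.path.join(tempfile.mkdtemp(), 'poll_state.json')
state = load_state(state_path, '2021-03-25T00:00:00Z')  # no file yet: fresh state
state['since'] = datetime(2021, 3, 26, 8, 15)
save_state(state, state_path)
restored = load_state(state_path, '2021-03-25T00:00:00Z')
print(restored['since'])  # 2021-03-26 08:15:00
```

In the worked example, load_state would replace the hard-coded initial state, and save_state would run at the end of each loop iteration, after the batch has been enqueued, so that a restart does not skip jobs that were fetched but not yet handed off.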