URL: https://dzone.com/articles/automating-twilio-recording-exports

Automating Twilio Recording Exports for Quality Purposes: Python Implementation Guidelines

Discover how to use Python to download recordings from Twilio so they can be transcribed for sentiment analysis, quality, and audit purposes.

By Jan. 07, 25 · Tutorial

Call recordings are pivotal for crucial business operations, compliance, and quality assurance. Twilio is a call management system that provides call recording capabilities, but organizations often need to download these recordings automatically and store them locally or in their preferred cloud storage. Downloading large numbers of recordings from Twilio can be challenging, however. In this article, we'll explore how to build an efficient Python solution for bulk-downloading Twilio recordings while handling pagination, parallel downloads, and queue filtering.

Use Cases

When working with call management systems like Twilio, we might need to:

  • Download thousands of call recordings for quality assurance.
  • Export call recordings while excluding specific queues.
  • Process or download recordings within specific date ranges.
  • Handle processes efficiently without overwhelming resources.

Solution Overview

Using Python, we will create a class that handles the bulk download of recordings with the following key features: 

  • Parallel downloads using ThreadPoolExecutor
  • Pagination handling for large datasets
  • Queue filtering capabilities
  • Progress tracking with tqdm
  • Error handling and retry logic
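
The pagination point in the list above can be illustrated without touching the network. The sketch below assumes the record source is a lazy iterator that fetches one page at a time; `fake_recording_stream` and `take_recordings` are hypothetical names used only for this illustration and are not part of the Twilio library.

```python
from itertools import islice


def fake_recording_stream(total, page_size, fetched_pages):
    """Stand-in for a paginated API: yields records lazily, one page at a time."""
    for start in range(0, total, page_size):
        fetched_pages.append(start // page_size)  # note that this page was "fetched"
        for i in range(start, min(start + page_size, total)):
            yield f"RE{i:04d}"


def take_recordings(stream, limit):
    """Consume at most `limit` records, so later pages are never requested."""
    return list(islice(stream, limit))


pages = []
records = take_recordings(fake_recording_stream(1000, 100, pages), 250)
print(len(records), "records from", len(pages), "pages")
```

Because the iterator is consumed lazily, only the pages needed to reach the limit are requested, which keeps both memory use and API calls bounded.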

Prerequisites

  • Python 3.8+
  • Twilio account with recordings

Required Python packages:

  • twilio
  • requests
  • tqdm
  • boto3 (only needed for the optional S3 export)
  • python-dotenv (only needed to load credentials from a .env file)
Python
import os
from datetime import datetime, timedelta
from twilio.rest import Client
import requests
from pathlib import Path
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import time


Implementation

The complete Python class is shown here:

Python
import os
from datetime import datetime, timedelta
from twilio.rest import Client
import requests
from pathlib import Path
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import time

class TwilioRecordingExporter:
    def __init__(self, account_sid, auth_token, output_dir="random_recordings"):
        """
        Initialize the exporter with Twilio credentials
        """
        self.client = Client(account_sid, auth_token)
        self.account_sid = account_sid
        self.auth_token = auth_token
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)

        # SIDs of the queues to exclude (placeholder values)
        self.excluded_queue_sids = {
            'WQ65xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',  # Example SID 1
            'WQ3xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',  # Example SID 2
            'WQexxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',  # Example SID 3
            'WQ0xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'   # Example SID 4
        }

        self.max_workers = 10  # Number of parallel downloads

    def download_recording(self, recording):
        """
        Download a single recording
        """
        try:
            date_str = recording.date_created.strftime('%Y%m%d_')
            filename = f"{date_str}{recording.sid}.wav"
            filepath = self.output_dir / filename

            # Skip recordings that were already downloaded
            if filepath.exists():
                return filepath

            wav_url = f"{recording.media_url}.wav"
            response = requests.get(wav_url, auth=(self.account_sid, self.auth_token))

            if response.status_code == 200:
                filepath.write_bytes(response.content)
                return filepath
            else:
                print(f"\nFailed to download {recording.sid}: {response.status_code}")
                return None
        except Exception as e:
            print(f"\nError downloading recording {recording.sid}: {str(e)}")
            return None

    def download_batch(self, recordings):
        """
        Download a batch of recordings in parallel
        """
        successful_downloads = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_recording = {
                executor.submit(self.download_recording, recording): recording
                for recording in recordings
            }

            # Collect results as each download finishes
            for future in as_completed(future_to_recording):
                filepath = future.result()
                if filepath:
                    successful_downloads.append(filepath)

        return successful_downloads

    def export_random_recordings(self, num_recordings=10000, days_back=180, batch_size=100):
        """
        Export random recordings while excluding specific queues
        """
        downloaded_files = []

        try:
            # Calculate date range
            end_date = datetime.utcnow()
            start_date = end_date - timedelta(days=days_back)

            print(f"Fetching recordings from {start_date} to {end_date}")
            print("Excluded queues SIDs:", ", ".join(self.excluded_queue_sids))

            # Fetch recordings with pagination. list() would load every match
            # into memory, so stream() is used: it requests pages on demand.
            all_recordings = []
            fetch_limit = num_recordings * 2  # Fetch extra to account for excluded queues
            recording_stream = self.client.recordings.stream(
                date_created_after=start_date,
                date_created_before=end_date,
                page_size=100
            )

            with tqdm(desc="Fetching recordings", unit="recording") as pbar:
                for recording in recording_stream:
                    all_recordings.append(recording)
                    pbar.update(1)
                    if len(all_recordings) >= fetch_limit:
                        break

            print(f"\nFound {len(all_recordings)} recordings")

            # Shuffle recordings
            random.shuffle(all_recordings)

            # Process in batches
            processed_count = 0

            with tqdm(total=num_recordings, desc="Downloading recordings") as pbar:
                for i in range(0, len(all_recordings), batch_size):
                    if processed_count >= num_recordings:
                        break

                    batch = all_recordings[i:i + batch_size]

                    # Filter out recordings associated with excluded queues
                    filtered_batch = [
                        recording for recording in batch
                        if not self.is_recording_in_excluded_queue(recording)
                    ]

                    downloaded_batch = self.download_batch(filtered_batch)
                    downloaded_files.extend(downloaded_batch)

                    # Never advance the progress bar past the requested total
                    new_count = min(len(downloaded_batch), num_recordings - processed_count)
                    processed_count += new_count
                    pbar.update(new_count)

        except Exception as e:
            print(f"\nError in export process: {str(e)}")

        return downloaded_files[:num_recordings]

    def is_recording_in_excluded_queue(self, recording):
        """
        Check if the recording is associated with an excluded queue based on task queue SID
        """
        # A recording without a queue_sid attribute is treated as not excluded
        task_queue_sid = getattr(recording, 'queue_sid', None)
        return task_queue_sid in self.excluded_queue_sids

def main():
    # Your Twilio credentials (placeholders; prefer environment variables)
    ACCOUNT_SID = "AC738a9a46c65dxxxxxxxxxxxxxxxxx"
    AUTH_TOKEN = "xxxxxxxxxx9ae2e4572xxxxxxxxxxxx"

    try:
        start_time = time.time()

        # Create exporter instance
        exporter = TwilioRecordingExporter(ACCOUNT_SID, AUTH_TOKEN)

        # Download random recordings
        print("Starting random recording export...")
        downloaded_files = exporter.export_random_recordings(
            num_recordings=10000,
            days_back=180,
            batch_size=100
        )

        duration = time.time() - start_time
        print("\nExport complete:")
        print(f"- Downloaded: {len(downloaded_files)} files")
        print(f"- Location: {exporter.output_dir}")
        print(f"- Time taken: {duration:.2f} seconds")

    except Exception as e:
        print(f"Error: {str(e)}")

if __name__ == "__main__":
    main()


Let's break down the above code into manageable components:

1. Basic Setup

First, we create a Python class to handle Twilio client initialization and configuration: 

Python
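class TwilioRecordingExporter:
    def __init__(self, account_sid, auth_token, output_dir="random_recordings"):
        self.client = Client(account_sid, auth_token)
        # Keep the credentials: download_recording uses them for HTTP basic auth
        self.account_sid = account_sid
        self.auth_token = auth_token
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        self.max_workers = 10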


2. Single Recording Download Implementation

The method below will handle individual recording downloads: 

Python
def download_recording(self, recording):
    try:
        date_str = recording.date_created.strftime('%Y%m%d_')
        filename = f"{date_str}{recording.sid}.wav"
        filepath = self.output_dir / filename

        # Skip recordings that were already downloaded
        if filepath.exists():
            return filepath

        wav_url = f"{recording.media_url}.wav"
        response = requests.get(wav_url,
                                auth=(self.account_sid, self.auth_token))

        if response.status_code == 200:
            filepath.write_bytes(response.content)
            return filepath
    except Exception as e:
        print(f"\nError downloading recording {recording.sid}: {str(e)}")
    # Reached on a non-200 response or after an exception
    return None
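
Because the method skips any file that already exists, a download interrupted halfway through a write could leave a truncated file that later runs treat as complete. One possible way to guard against that, sketched here under the assumption that the output directory is on a single filesystem, is to write to a temporary file and rename it once the write has finished. The helper name `save_atomically` is hypothetical and not part of the class above.

```python
import tempfile
from pathlib import Path


def save_atomically(filepath: Path, data: bytes) -> Path:
    """Write to a temporary '.part' file, then rename it into place."""
    tmp_path = filepath.with_name(filepath.name + ".part")
    tmp_path.write_bytes(data)
    tmp_path.replace(filepath)  # the final name appears only after a full write
    return filepath


with tempfile.TemporaryDirectory() as tmp_dir:
    target = Path(tmp_dir) / "20250107_RExxxx.wav"
    save_atomically(target, b"RIFF fake audio bytes")
    saved_ok = target.exists() and not Path(str(target) + ".part").exists()
    print(saved_ok)
```

With this approach, `filepath.exists()` only becomes true for fully written files, so the skip check stays reliable across restarts.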


3. Parallel Downloads

The method below downloads a batch of recordings in parallel using a thread pool, which improves throughput when downloading a large number of recordings.

Python
def download_batch(self, recordings):
    successful_downloads = []
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        future_to_recording = {
            executor.submit(self.download_recording, recording): recording
            for recording in recordings
        }

        for future in as_completed(future_to_recording):
            filepath = future.result()
            if filepath:
                successful_downloads.append(filepath)

    return successful_downloads


4. Queue Filtering

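Queue filtering lets us skip recordings from queues that are not required for QA. The check compares the recording's queue SID against the set of excluded SIDs; a recording that has no queue_sid attribute is never excluded.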

Python
def is_recording_in_excluded_queue(self, recording):
    task_queue_sid = recording.queue_sid if hasattr(recording, 'queue_sid') else None
    return task_queue_sid in self.excluded_queue_sids
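
To see how this check behaves on a batch, the sketch below applies the same logic to stand-in objects. It assumes each record exposes a `queue_sid` attribute when it belongs to a queue; the SIDs, the `is_excluded` function, and the `SimpleNamespace` records are all hypothetical test data, not real Twilio objects.

```python
from types import SimpleNamespace

EXCLUDED_QUEUE_SIDS = {"WQ_excluded_1", "WQ_excluded_2"}  # placeholder SIDs


def is_excluded(recording, excluded_sids=EXCLUDED_QUEUE_SIDS):
    """True if the record carries a queue SID that is in the excluded set."""
    return getattr(recording, "queue_sid", None) in excluded_sids


batch = [
    SimpleNamespace(sid="RE1", queue_sid="WQ_excluded_1"),
    SimpleNamespace(sid="RE2", queue_sid="WQ_sales"),
    SimpleNamespace(sid="RE3"),  # no queue_sid attribute at all
]
kept = [r.sid for r in batch if not is_excluded(r)]
print(kept)  # ['RE2', 'RE3']
```

Note that the record without a `queue_sid` is kept, so it is worth verifying that the objects being filtered actually carry that attribute before relying on the exclusion list.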


Best Practices and Optimizations

Batch Processing

To manage resources efficiently and process recordings in batches, use the following:

Python
for i in range(0, len(all_recordings), batch_size):
    batch = all_recordings[i:i + batch_size]
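
The same slicing pattern can be wrapped in a small generator so that the batching logic is reusable and easy to test. This is a minimal sketch; `chunked` is a hypothetical helper name, and plain integers stand in for recording objects.

```python
def chunked(items, batch_size):
    """Yield consecutive slices of `items`, each at most `batch_size` long."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


batches = list(chunked(list(range(250)), 100))
print([len(b) for b in batches])  # [100, 100, 50]
```

The last batch is simply shorter when the total is not a multiple of the batch size, so no record is dropped.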


Tracking Progress

Implement tqdm for tracking progress visually:

Python
with tqdm(total=num_recordings, desc="Downloading recordings") as pbar:
    # Download process
    pbar.update(new_count)


Error Handling

Error handling can be implemented at multiple levels, such as:

  1. Download failures
  2. Batch processing errors
  3. API communication issues
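
The class shown earlier catches and reports download errors but does not retry them. A minimal sketch of how retries with exponential backoff could be layered on top follows; `with_retries` and `flaky_download` are hypothetical names, and the attempt count and delays are assumptions to tune for your environment.

```python
import time


def with_retries(func, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call func(); on an exception, wait and retry with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception as exc:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the last error
            delay = base_delay * 2 ** (attempt - 1)
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.1f}s")
            sleep(delay)


calls = {"count": 0}


def flaky_download():
    """Hypothetical download that fails twice before succeeding."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary network error")
    return "recording-bytes"


delays = []  # capture the waits instead of actually sleeping
result = with_retries(flaky_download, sleep=delays.append)
print(result, delays)
```

Passing the sleep function as a parameter keeps the helper testable; in real use the default `time.sleep` applies the backoff delays.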

Resource Management

Parallel downloads can be controlled by max_workers as shown below:

Python
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
    # Parallel download logic
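
To check that max_workers really caps the number of simultaneous downloads, the sketch below replaces the network call with a short sleep and records the peak number of tasks running at once. `fake_download` and the `state` counters are hypothetical and exist only for this demonstration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

MAX_WORKERS = 4
state = {"active": 0, "peak": 0}
lock = threading.Lock()


def fake_download(item):
    """Stand-in for a network download; tracks how many run at once."""
    with lock:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
    time.sleep(0.01)  # simulate waiting on I/O
    with lock:
        state["active"] -= 1
    return item


with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures = [executor.submit(fake_download, i) for i in range(20)]
    results = sorted(f.result() for f in as_completed(futures))

print(f"peak concurrency: {state['peak']} (limit {MAX_WORKERS})")
```

The observed peak never exceeds the limit, which is what keeps the exporter from opening too many connections at the same time.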


Example Usage

The example below uses the exporter to download up to 10,000 recordings created within the last 180 days, in batches of 100:

Python
exporter = TwilioRecordingExporter(ACCOUNT_SID, AUTH_TOKEN)
downloaded_files = exporter.export_random_recordings(
 num_recordings=10000,
 days_back=180,
 batch_size=100
)


Considerations for Security

  • File safety: Use pathlib for safe file operations: 
Python
filepath = Path(output_dir) / filename


  • Credential management:
Python
ACCOUNT_SID = os.environ.get('TWILIO_ACCOUNT_SID')
AUTH_TOKEN = os.environ.get('TWILIO_AUTH_TOKEN')


  • Further improvement considerations: 
    • Call metadata can be included to know the caller name, agent name, duration, and other factors.
    • Automate export to AWS S3 buckets
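
For the credential management point in the list above, `os.environ.get` returns None when a variable is missing, which only surfaces later as an authentication failure. A small sketch that fails early instead is shown below; `load_twilio_credentials` is a hypothetical helper, and the demo values are placeholders.

```python
import os


def load_twilio_credentials(env=os.environ):
    """Return (account_sid, auth_token) or raise if either is missing."""
    account_sid = env.get("TWILIO_ACCOUNT_SID")
    auth_token = env.get("TWILIO_AUTH_TOKEN")
    missing = [
        name
        for name, value in (
            ("TWILIO_ACCOUNT_SID", account_sid),
            ("TWILIO_AUTH_TOKEN", auth_token),
        )
        if not value
    ]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return account_sid, auth_token


# Demonstration with an explicit mapping instead of the real environment
demo_env = {"TWILIO_ACCOUNT_SID": "ACxxxxxxxx", "TWILIO_AUTH_TOKEN": "secret"}
sid, token = load_twilio_credentials(demo_env)
print(sid)
```

Calling the helper with no argument reads the real process environment, so the same function works in scripts and in tests.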

Conclusion

Using the solution above, one can efficiently download bulk recordings from Twilio while maintaining best practices like error handling, performance, and resource management. This implementation can be easily extended for further use cases and can be scaled according to needs.
