Hi everyone, I am trying to ingest CSV data into a Timestream database with the AWS SDK for Python (boto3). My credentials, region, database name, and table name are all correct, but I am still unable to connect to the Timestream endpoint. Please respond if you have any ideas on how to resolve this issue. This is my Python script, which I am running in Visual Studio Code:
import csv
import time
import boto3
from botocore.exceptions import ClientError
# Configure AWS credentials and region
aws_access_key_id = ''
aws_secret_access_key = ''
region_name = ''
# Specify the path to the CSV file to be ingested
csv_filepath = 'C:/Users/.csv'
# Configure Timestream database and table
database_name = ''
table_name = ''
class CsvIngestionExample:
    def __init__(self, client):
        self.client = client

    def bulk_write_records(self, filepath):
        with open(filepath, 'r') as csvfile:
            # Creating a csv reader object
            csvreader = csv.reader(csvfile)
            records = []
            current_time = self.__current_milli_time()
            counter = 0
            # Skip the header row
            next(csvreader)
            # Extracting each data row one by one
            for row in csvreader:
                dimensions = [
                    {'Name': 'region', 'Value': ''}
                ]
                # Space the record timestamps 50 ms apart, working backwards from now
                record_time = current_time - (counter * 50)
                for i in range(1, len(row)):
                    measure_name = f"MeasureName{i+1}"
                    measure_value = str(row[i])
                    # Determine MeasureValue type based on column data type in CSV
                    measure_value_type = CsvIngestionExample.get_measure_value_type(measure_value)
                    record = {
                        'Dimensions': dimensions,
                        'MeasureName': measure_name,
                        'MeasureValue': measure_value,
                        'MeasureValueType': measure_value_type,
                        'Time': str(record_time)
                    }
                    records.append(record)
                counter = counter + 1
                # WriteRecords accepts at most 100 records per call, so flush in batches
                if len(records) == 100:
                    self.__submit_batch(records, counter)
                    records = []
            if len(records) != 0:
                self.__submit_batch(records, counter)
            print("Ingested %d records" % counter)

    def __submit_batch(self, records, counter):
        try:
            self.client.write_records(DatabaseName=database_name, TableName=table_name,
                                      Records=records, CommonAttributes={})
            print("Processed [%d] records." % counter)
        except ClientError as e:
            print("Error:", e.response['Error']['Message'])

    def __current_milli_time(self):
        return int(round(time.time() * 1000))

    @staticmethod
    def get_measure_value_type(value):
        # Timestream expects BIGINT (not INT) for integer measures
        try:
            int(value)
            return 'BIGINT'
        except ValueError:
            try:
                float(value)
                return 'DOUBLE'
            except ValueError:
                return 'VARCHAR'
# Create Timestream Write Client
session = boto3.Session(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name
)
timestream_write_client = session.client('timestream-write')
# Create an instance of CsvIngestionExample with the Timestream Write Client
csv_ingestion = CsvIngestionExample(timestream_write_client)
# Call the bulk_write_records method to ingest the CSV data
csv_ingestion.bulk_write_records(csv_filepath)
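As far as I understand, boto3 resolves the Timestream endpoint by calling the DescribeEndpoints API behind the scenes, so a minimal check like the one below should reproduce the failure in isolation. This is just my sketch (it reuses the same session; `check_client` and `endpoints` are names I made up), not part of the failing script:

# Standalone endpoint-discovery check: DescribeEndpoints is the call
# that boto3's endpoint discovery performs for Timestream under the hood
check_client = session.client('timestream-write')
endpoints = check_client.describe_endpoints()
print(endpoints['Endpoints'])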
Error:
Exception has occurred: EndpointDiscoveryRefreshFailed
Endpoint Discovery failed to refresh the required endpoints.
  File "C:\Users", line 110, in <module>
    csv_ingestion.bulk_write_records(csv_filepath)
  File "C:\Users", line 64, in bulk_write_records
    self.__submit_batch(records, counter)
    self.client.write_records(DatabaseName=database_name, TableName=table_name,
botocore.discovery.EndpointDiscoveryRefreshFailed: Endpoint Discovery failed to refresh the required endpoints.
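If it helps with debugging, I can also enable botocore debug logging to see the underlying HTTP calls, including the endpoint-discovery request. A sketch of what I would add before creating the client (assuming this is the right way to capture it):

import logging
import boto3

# Log every botocore request/response, including the DescribeEndpoints call
boto3.set_stream_logger('botocore', logging.DEBUG)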