The code I am using is:
import boto3
from collections import defaultdict
from urllib.parse import unquote_plus
# Timestamp for appending to the output file name.
from datetime import datetime

# Get the current date and time.
# (Bug fix: the explanatory lines below were bare text in the original,
# which is a SyntaxError in Python; they are proper comments now.)
current_datetime = datetime.now()

# Extract the date and the hours:minutes portions separately.
current_date = current_datetime.strftime("%Y-%m-%d")
current_hours_minutes = current_datetime.strftime("%H:%M")

# Concatenate date and time with a dash in between, e.g. "2024-01-15-13:45".
CurrentTime = str(current_date + "-" + current_hours_minutes)
def get_rows_columns_map(table_result, blocks_map):
    """Map a Textract TABLE block to its cell texts and confidence scores.

    Args:
        table_result: a Textract block of BlockType TABLE.
        blocks_map: dict mapping block Id -> block, covering all blocks
            in the Textract response.

    Returns:
        (rows, scores) where rows is {row_index: {col_index: text}} and
        scores is a list of the cells' confidence values as strings, in
        the order the cells were visited.
    """
    rows = {}
    scores = []
    for relationship in table_result['Relationships']:
        if relationship['Type'] != 'CHILD':
            continue
        for child_id in relationship['Ids']:
            cell = blocks_map[child_id]
            if cell['BlockType'] != 'CELL':
                continue
            row_index = cell['RowIndex']
            col_index = cell['ColumnIndex']
            # Create a new row dict on first sight of this row index.
            if row_index not in rows:
                rows[row_index] = {}
            # Bug fix: this append was commented out, so the returned
            # scores list was always empty and generate_table_csv's
            # "Confidence Scores" section printed nothing.
            scores.append(str(cell['Confidence']))
            # Get the text value for this cell.
            rows[row_index][col_index] = get_text(cell, blocks_map)
    return rows, scores
def get_table_csv_results(bucket, file_name):
    """Run Textract table analysis on an S3 object and return CSV text.

    Args:
        bucket: name of the S3 bucket holding the document.
        file_name: key of the document object within the bucket.

    Returns:
        CSV text for every detected table, or the literal string
        "<b> NO Table FOUND </b>" when the document contains no tables.
    """
    textract = boto3.client('textract')
    response = textract.analyze_document(
        Document={'S3Object': {'Bucket': bucket, "Name": file_name}},
        FeatureTypes=['TABLES'],
    )

    # Index every block by Id and collect the TABLE blocks.
    blocks = response['Blocks']
    print(blocks)  # debug dump of the raw Textract blocks
    blocks_map = {block['Id']: block for block in blocks}
    table_blocks = [block for block in blocks if block['BlockType'] == "TABLE"]

    if not table_blocks:
        return "<b> NO Table FOUND </b>"

    # Render each table, numbering from 1, separated by blank lines.
    csv = ''
    for index, table in enumerate(table_blocks):
        csv += generate_table_csv(table, blocks_map, index + 1)
        csv += '\n\n'
    return csv
def generate_table_csv(table_result, blocks_map, table_index):
    """Render one Textract TABLE block as comma-separated text.

    Args:
        table_result: a Textract block of BlockType TABLE.
        blocks_map: dict mapping block Id -> block.
        table_index: 1-based index used in the "Table: Table_N" header.

    Returns:
        A string containing the table header, one comma-terminated line
        per row, then a "Confidence Scores" section wrapped to the same
        column count.
    """
    rows, scores = get_rows_columns_map(table_result, blocks_map)

    table_id = 'Table_' + str(table_index)
    csv = 'Table: {0}\n\n'.format(table_id)

    # Bug fix: col_indices was assigned inside the inner loop, so it was
    # recomputed per cell and raised NameError below when the table had
    # no rows. Initialize it and set it once per row instead.
    col_indices = 0
    for row_index, cols in rows.items():
        col_indices = len(cols)
        for col_index, text in cols.items():
            csv += '{}'.format(text) + ","
        csv += '\n'

    # Emit the per-cell confidence scores, wrapping every col_indices
    # values so the score grid lines up with the table columns.
    csv += '\n\n Confidence Scores % (Table Cell) \n'
    cols_count = 0
    for score in scores:
        cols_count += 1
        csv += score + ","
        if cols_count == col_indices:
            csv += '\n'
            cols_count = 0

    csv += '\n\n\n'
    return csv
def find_value_block(key_block, value_map):
    """Resolve the VALUE block that a KEY block points at.

    Walks every 'VALUE' relationship on *key_block* and looks each value
    id up in *value_map*; the block for the last id seen is returned.
    Raises if the key block carries no VALUE relationship.
    """
    for rel in key_block['Relationships']:
        if rel['Type'] != 'VALUE':
            continue
        for value_id in rel['Ids']:
            value_block = value_map[value_id]
    return value_block
def get_text(result, blocks_map):
    """Assemble the text content of a block's CHILD blocks.

    WORD children contribute their text; SELECTION_ELEMENT children that
    are SELECTED contribute 'X'. Every token carries a trailing space,
    matching Textract sample output. Returns '' when the block has no
    Relationships entry.
    """
    parts = []
    if 'Relationships' in result:
        for rel in result['Relationships']:
            if rel['Type'] != 'CHILD':
                continue
            for child_id in rel['Ids']:
                child = blocks_map[child_id]
                kind = child['BlockType']
                if kind == 'WORD':
                    parts.append(child['Text'] + ' ')
                elif kind == 'SELECTION_ELEMENT' and child['SelectionStatus'] == 'SELECTED':
                    parts.append('X ')
    return ''.join(parts)
def lambda_handler(event, context):
    """S3-trigger Lambda entry point.

    Reads the first S3 record from the event, runs Textract table
    extraction on that object, and writes the resulting CSV (named with
    the module-load timestamp) to the 'outputbucketforcsv' bucket.
    """
    print(event)
    print(context)

    # First (and assumed only) S3 record of the triggering event;
    # keys arrive URL-encoded, so decode them.
    record = event["Records"][0]
    bucket = unquote_plus(str(record["s3"]["bucket"]["name"]))
    file_name = unquote_plus(str(record["s3"]["object"]["key"]))
    print(f'Bucket: {bucket}, file: {file_name}')

    table_csv = get_table_csv_results(bucket, file_name)

    # Timestamped output key, written to the fixed results bucket.
    output_file = CurrentTime + '_outputfile.csv'
    s3 = boto3.resource("s3")
    s3.Bucket("outputbucketforcsv").put_object(Key=output_file, Body=table_csv)
==============================================================================
What needs to be changed so that the data from all the other tables, and the form (key/value) data, is extracted as well?