222 lines
9.0 KiB
Python
222 lines
9.0 KiB
Python
# -*- coding: UTF-8 -*-
|
|
import api
|
|
import config
|
|
import logging
|
|
import utils
|
|
|
|
# Timestamped log-line layout shared by every message this module emits.
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
# Module-level side effect: configures the root logger when this file is imported.
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
|
|
|
|
def get_all_records(client: api.Client, access_token: str, app_token: str, table_id: str):
    """Fetch every record of *table_id*, following pagination until exhausted.

    Repeatedly calls ``client.get_records_list`` with the previous response's
    ``page_token`` while the response reports ``has_more``.

    Returns:
        list: All record items accumulated across pages.
    """
    results = []
    cursor = None
    more = True
    while more:
        resp = client.get_records_list(access_token, app_token, table_id, page_token=cursor)
        batch = resp.get('items', [])
        if batch:
            results.extend(batch)
        # Continue only while the API signals another page is available.
        more = bool(resp.get('has_more'))
        if more:
            cursor = resp.get('page_token')
    return results
|
|
|
|
|
|
def get_all_fields(client: api.Client, access_token: str, app_token: str, table_id: str):
    """Collect every field definition of *table_id*, walking all result pages.

    Uses the ``page_token`` / ``has_more`` pagination protocol of
    ``client.get_fields_list``.

    Returns:
        list: All field items accumulated across pages.
    """
    collected = []
    next_page = None
    while True:
        page = client.get_fields_list(access_token, app_token, table_id, page_token=next_page)
        chunk = page.get('items', [])
        if chunk:
            collected.extend(chunk)
        # Stop as soon as the API stops advertising a further page.
        if not page.get('has_more'):
            return collected
        next_page = page.get('page_token')
|
|
|
|
|
|
def merge_schema(fields1: list, fields2: list):
    """
    Merge two lists of Bitable field definitions into one unified schema list.

    Fields sharing the same ``field_name`` are assumed to be the same field;
    the definition from *fields1* wins on collision, except that select-type
    fields (SingleSelect = 3, MultiSelect = 4) get their option lists unioned
    by option name. Source-table option ``id`` keys are stripped so the target
    table can assign fresh ones, and a Text-type "Source" field is appended
    for provenance tracking.

    Args:
        fields1: Field definitions from the first source table.
        fields2: Field definitions from the second source table.

    Returns:
        list: Merged field-definition dicts, each with the keys
        ``field_name``, ``type`` and ``property``. Inputs are not mutated.
    """
    import copy  # hoisted out of the loops (was re-imported on every iteration)

    def _snapshot(f):
        # Deep-copied, normalized view of one source field so later mutation
        # (option merging, id stripping) cannot leak back into the inputs.
        return {
            'field_name': f['field_name'],
            'type': f['type'],
            'property': copy.deepcopy(f.get('property')),
        }

    merged_fields = {}

    # Process fields from the first table: it wins on name collisions.
    for f in fields1:
        merged_fields[f['field_name']] = _snapshot(f)

    # Process fields from the second table: add new ones, union select options.
    for f in fields2:
        existing = merged_fields.get(f['field_name'])
        if existing is None:
            merged_fields[f['field_name']] = _snapshot(f)
            continue
        # Merge options only when both sides are SingleSelect (3) or
        # MultiSelect (4) AND both actually carry option lists.
        # ``or {}`` guards against ``property: None`` (the key exists with a
        # None value, so a plain .get default would not apply).
        if existing['type'] in (3, 4) \
                and (existing.get('property') or {}).get('options') \
                and (f.get('property') or {}).get('options'):
            existing_names = {opt['name'] for opt in existing['property']['options']}
            for opt in f['property']['options']:
                if opt['name'] not in existing_names:
                    existing['property']['options'].append(copy.deepcopy(opt))
                    existing_names.add(opt['name'])

    # Clean up option IDs to prevent insertion errors in the target table.
    for f_def in merged_fields.values():
        prop = f_def.get('property')
        if prop and 'options' in prop:
            for opt in prop['options']:
                opt.pop('id', None)

    # Add a custom "Source" provenance field.
    # NOTE(review): this overwrites any source field literally named "Source";
    # confirm that is acceptable for the tables being merged.
    merged_fields['Source'] = {
        'field_name': 'Source',
        'type': 1,  # 1 == Text in the Bitable field-type enum
        'property': None
    }

    return list(merged_fields.values())
|
|
|
|
|
|
def merge_bitables(
    client: api.Client,
    access_token: str,
    source_app_token_1: str,
    source_table_id_1: str,
    source_app_token_2: str,
    source_table_id_2: str,
    target_app_token: str,
    target_table_name: str = "Merged Table"
):
    """
    Core function to merge two bitables into a single one.

    Reads the field schema and all records of both source tables, builds a
    unified schema via merge_schema(), creates a new table named
    *target_table_name* in *target_app_token*, recreates the unified fields
    there, then copies every record across with an extra "Source" text column
    recording which source table the record came from.

    Args:
        client: API client used for every Lark/Feishu call.
        access_token: Tenant access token authorizing the calls.
        source_app_token_1: App token of the first source bitable.
        source_table_id_1: Table id of the first source table.
        source_app_token_2: App token of the second source bitable.
        source_table_id_2: Table id of the second source table.
        target_app_token: App token of the bitable that receives the merge.
        target_table_name: Display name for the newly created table.

    Returns:
        str: The table id of the newly created merged table.
    """
    logging.info(f"Merging {source_app_token_1}/{source_table_id_1} and {source_app_token_2}/{source_table_id_2} into {target_app_token}")

    # 1. Read schema (field definitions) from both source tables.
    logging.info("Reading schema from source tables...")
    fields1 = get_all_fields(client, access_token, source_app_token_1, source_table_id_1)
    fields2 = get_all_fields(client, access_token, source_app_token_2, source_table_id_2)

    # 2. Merge both schemas into one (fields1 wins on collisions; adds "Source").
    unified_schema = merge_schema(fields1, fields2)

    # 3. Create the target table.
    logging.info(f"Creating target table '{target_table_name}'...")
    target_table_id = client.create_table(access_token, target_app_token, target_table_name)

    # The new table will have some default columns.
    # Fetch them so we can update them in place instead of duplicating them.
    # NOTE(review): single-page fetch — assumes the freshly created table's
    # default fields fit in one page; confirm against the API's page size.
    target_initial_fields = client.get_fields_list(access_token, target_app_token, target_table_id).get('items', [])
    initial_field_names = {f['field_name']: f['field_id'] for f in target_initial_fields}

    # 4. Create (or update) each unified field in the target table.
    logging.info("Creating fields in target table...")
    target_field_map = {} # Maps field name to its new field_id in the target table

    for field_def in unified_schema:
        name = field_def['field_name']
        ftype = field_def['type']
        fprop = field_def['property']

        # If table creation already produced a field with this name (e.g. the
        # default initial Text column), update it in place rather than adding.
        if name in initial_field_names:
            field_id = initial_field_names[name]
            # Update unconditionally in case the type differs (often the
            # default column is just 'Text').
            client.update_field(access_token, target_app_token, target_table_id, field_id, name, ftype, fprop)
            target_field_map[name] = field_id
        else:
            try:
                resp = client.add_field(access_token, target_app_token, target_table_id, name, ftype, fprop)
                target_field_map[name] = resp['field_id']
            except utils.LarkException as e:
                # Best effort: a field that fails to create is logged and
                # skipped; its values are dropped during record mapping below.
                logging.error(f"Failed to create field '{name}': {e}")

    # 5. Read all records from both source tables.
    logging.info("Reading records from source tables...")
    records1 = get_all_records(client, access_token, source_app_token_1, source_table_id_1)
    records2 = get_all_records(client, access_token, source_app_token_2, source_table_id_2)

    # 6. Map source records onto the target schema and batch-insert them.
    logging.info("Mapping and inserting records...")

    def process_records(source_records, source_name):
        """Project records onto the target fields and tag each with its source."""
        batch = []
        for r in source_records:
            original_fields = r.get('fields', {})
            new_fields = {}
            for fname, fvalue in original_fields.items():
                # Keep only fields that exist in the target table; values for
                # fields that failed to create (step 4) are silently dropped.
                if fname in target_field_map:
                    # In bitable API, it's safer to use the new field_names (or their ids depending on the specific endpoint).
                    # The get_records endpoint returns field names as keys. The batch_create_records also accepts field names as keys if no 'user_id_type' strictly enforces IDs.
                    # We'll use the mapped names to be safe.
                    new_fields[fname] = fvalue

            # Record which source table this row came from.
            new_fields['Source'] = source_name
            batch.append({'fields': new_fields})
        return batch

    target_records = []
    target_records.extend(process_records(records1, f"{source_app_token_1}/{source_table_id_1}"))
    target_records.extend(process_records(records2, f"{source_app_token_2}/{source_table_id_2}"))

    # Feishu batch insert has a limit of 500 per request usually, so we chunk it just in case.
    CHUNK_SIZE = 500
    for i in range(0, len(target_records), CHUNK_SIZE):
        chunk = target_records[i:i + CHUNK_SIZE]
        if chunk:
            try:
                client.batch_create_records(access_token, target_app_token, target_table_id, chunk)
            except utils.LarkException as e:
                # Best effort: a failed chunk is logged; the merge continues
                # with the remaining chunks.
                logging.error(f"Failed to insert chunk of records: {e}")

    logging.info(f"Successfully merged into {target_table_id}!")
    return target_table_id
|
|
|
|
if __name__ == "__main__":
    # Example execution wiring using values from config.py.

    client = api.Client(config.LARK_HOST)

    # Authenticate first: the tenant access token authorizes every later call.
    try:
        access_token = client.get_tenant_access_token(config.APP_ID, config.APP_SECRET)
    except Exception as e:  # top-level boundary: log, then abort the script
        logging.error(f"Could not get access token: {e}")
        # SystemExit instead of the site-module exit() helper, which is not
        # guaranteed to exist in every run mode (e.g. python -S, frozen apps).
        raise SystemExit(1)

    logging.info(f"Using App ID: {config.APP_ID}")

    # Guard against running with the unedited placeholder tokens shipped in
    # config.py — these need to be valid app_tokens/table_ids to actually work.
    is_placeholder = (config.MERGE_SOURCE_APP_TOKEN_1 == "source_app_token_1")

    if is_placeholder:
        logging.warning("Please update config.py or environment variables with valid MERGE_* tokens to execute.")
    else:
        merge_bitables(
            client=client,
            access_token=access_token,
            source_app_token_1=config.MERGE_SOURCE_APP_TOKEN_1,
            source_table_id_1=config.MERGE_SOURCE_TABLE_ID_1,
            source_app_token_2=config.MERGE_SOURCE_APP_TOKEN_2,
            source_table_id_2=config.MERGE_SOURCE_TABLE_ID_2,
            target_app_token=config.MERGE_TARGET_APP_TOKEN,
            target_table_name="Merged Data"
        )
|