first commit
This commit is contained in:
221
merge_bitables.py
Normal file
221
merge_bitables.py
Normal file
@ -0,0 +1,221 @@
|
||||
# -*- coding: UTF-8 -*-
"""Merge two Feishu/Lark bitable tables into a single new table.

Reads the schema and records of two source tables through the project's
API client, unifies the schemas, and writes everything into a target
table, tagging each row with its origin.
"""
import api      # project-local Lark/Feishu API client wrapper (api.Client)
import config   # credentials and table tokens (APP_ID, APP_SECRET, MERGE_*)
import logging
import utils    # provides utils.LarkException, raised by API calls

# Log line shape: "<timestamp> - <LEVEL> - <message>"
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
|
||||
def get_all_records(client: api.Client, access_token: str, app_token: str, table_id: str):
    """Fetch every record of a table, following pagination to the end.

    Repeatedly calls ``client.get_records_list`` with the page token from
    the previous response until the API reports ``has_more`` is falsy.

    Returns:
        list: All record dicts from the table, in API order.
    """
    collected = []
    next_page = None
    while True:
        payload = client.get_records_list(access_token, app_token, table_id, page_token=next_page)
        batch = payload.get('items', [])
        if batch:
            collected.extend(batch)
        if not payload.get('has_more'):
            break
        next_page = payload.get('page_token')
    return collected
|
||||
|
||||
|
||||
def get_all_fields(client: api.Client, access_token: str, app_token: str, table_id: str):
    """Fetch every field definition of a table, following pagination.

    Repeatedly calls ``client.get_fields_list`` with the page token from
    the previous response until the API reports ``has_more`` is falsy.

    Returns:
        list: All field-definition dicts from the table, in API order.
    """
    collected = []
    next_page = None
    while True:
        payload = client.get_fields_list(access_token, app_token, table_id, page_token=next_page)
        batch = payload.get('items', [])
        if batch:
            collected.extend(batch)
        if not payload.get('has_more'):
            break
        next_page = payload.get('page_token')
    return collected
|
||||
|
||||
|
||||
def merge_schema(fields1: list, fields2: list):
    """Merge two lists of bitable field definitions into one unified schema.

    Fields sharing a ``field_name`` are assumed to be the same field; the
    definition from ``fields1`` wins on a collision, except that option
    lists of SingleSelect (type 3) / MultiSelect (type 4) fields are
    unioned by option name.

    Args:
        fields1: Field definitions from the first source table.
        fields2: Field definitions from the second source table.

    Returns:
        list: Merged field-definition dicts (``field_name``, ``type``,
        ``property``), plus a synthetic text field named ``Source``.
        Option ``id`` keys are stripped so the target table can assign
        fresh ones on creation. Input dicts are never mutated.
    """
    # Local import: the module does not import copy at the top level.
    # (Fix: the original re-executed `import copy` inside both loops.)
    import copy

    def _clone(field):
        # Normalized deep copy of one field definition (property may be None).
        return {
            'field_name': field['field_name'],
            'type': field['type'],
            'property': copy.deepcopy(field.get('property')),
        }

    # Seed with the first table's fields; dict preserves insertion order.
    merged_fields = {f['field_name']: _clone(f) for f in fields1}

    for f in fields2:
        name = f['field_name']
        if name not in merged_fields:
            merged_fields[name] = _clone(f)
            continue
        # Collision: union select options when both definitions carry them.
        existing = merged_fields[name]
        if (existing['type'] in (3, 4)
                and existing.get('property', {}).get('options')
                and f.get('property', {}).get('options')):
            existing_names = {opt['name'] for opt in existing['property']['options']}
            for opt in f['property']['options']:
                if opt['name'] not in existing_names:
                    existing['property']['options'].append(copy.deepcopy(opt))
                    existing_names.add(opt['name'])

    # Strip option IDs: IDs copied from the source tables would cause
    # insertion errors in the target table.
    for f_def in merged_fields.values():
        if f_def.get('property') and 'options' in f_def['property']:
            for opt in f_def['property']['options']:
                opt.pop('id', None)

    # Synthetic provenance column (overwrites any source field named 'Source').
    merged_fields['Source'] = {
        'field_name': 'Source',
        'type': 1,  # 1 == Text in the bitable field-type enum
        'property': None,
    }

    return list(merged_fields.values())
|
||||
|
||||
|
||||
def merge_bitables(
    client: api.Client,
    access_token: str,
    source_app_token_1: str,
    source_table_id_1: str,
    source_app_token_2: str,
    source_table_id_2: str,
    target_app_token: str,
    target_table_name: str = "Merged Table"
):
    """
    Merge two bitable tables into a single, newly created target table.

    Pipeline: read both schemas -> unify them -> create the target table ->
    create/update its fields -> read all source records -> re-key records to
    the unified schema (tagging each with a 'Source' value) -> batch insert.

    Returns:
        str: The table_id of the newly created target table.
    """
    logging.info(f"Merging {source_app_token_1}/{source_table_id_1} and {source_app_token_2}/{source_table_id_2} into {target_app_token}")

    # 1. Pull the field definitions from both source tables.
    logging.info("Reading schema from source tables...")
    fields_a = get_all_fields(client, access_token, source_app_token_1, source_table_id_1)
    fields_b = get_all_fields(client, access_token, source_app_token_2, source_table_id_2)

    # 2. Combine them into one schema.
    unified_schema = merge_schema(fields_a, fields_b)

    # 3. Create the destination table.
    logging.info(f"Creating target table '{target_table_name}'...")
    target_table_id = client.create_table(access_token, target_app_token, target_table_name)

    # A fresh table comes with default columns; fetch them so we can reuse
    # (update) them instead of creating duplicates.
    default_fields = client.get_fields_list(access_token, target_app_token, target_table_id).get('items', [])
    default_id_by_name = {f['field_name']: f['field_id'] for f in default_fields}

    # 4. Materialize the unified schema in the target table.
    logging.info("Creating fields in target table...")
    target_field_map = {}  # field name -> field_id in the target table

    for field_def in unified_schema:
        name = field_def['field_name']
        ftype = field_def['type']
        fprop = field_def['property']

        if name in default_id_by_name:
            # The default column already exists (often a plain Text column):
            # update it in place rather than adding a duplicate.
            fid = default_id_by_name[name]
            client.update_field(access_token, target_app_token, target_table_id, fid, name, ftype, fprop)
            target_field_map[name] = fid
            continue

        try:
            created = client.add_field(access_token, target_app_token, target_table_id, name, ftype, fprop)
            target_field_map[name] = created['field_id']
        except utils.LarkException as e:
            # Best-effort: skip the field and keep merging the rest.
            logging.error(f"Failed to create field '{name}': {e}")

    # 5. Pull all records from both source tables.
    logging.info("Reading records from source tables...")
    records_a = get_all_records(client, access_token, source_app_token_1, source_table_id_1)
    records_b = get_all_records(client, access_token, source_app_token_2, source_table_id_2)

    # 6. Re-key the records onto the unified schema and insert them.
    logging.info("Mapping and inserting records...")

    def _rows_for(source_records, source_name):
        # Keep only the fields present in the target and tag each row's origin.
        # The records endpoint keys fields by name, and batch creation accepts
        # field names as keys as well, so the mapped names are used directly.
        rows = []
        for record in source_records:
            kept = {k: v for k, v in record.get('fields', {}).items() if k in target_field_map}
            kept['Source'] = source_name
            rows.append({'fields': kept})
        return rows

    target_records = []
    target_records.extend(_rows_for(records_a, f"{source_app_token_1}/{source_table_id_1}"))
    target_records.extend(_rows_for(records_b, f"{source_app_token_2}/{source_table_id_2}"))

    # Batch insert in chunks (Feishu batch insert usually caps at 500/request).
    CHUNK_SIZE = 500
    for start in range(0, len(target_records), CHUNK_SIZE):
        chunk = target_records[start:start + CHUNK_SIZE]
        if not chunk:
            continue
        try:
            client.batch_create_records(access_token, target_app_token, target_table_id, chunk)
        except utils.LarkException as e:
            # Best-effort: log the failed chunk and continue with the rest.
            logging.error(f"Failed to insert chunk of records: {e}")

    logging.info(f"Successfully merged into {target_table_id}!")
    return target_table_id
|
||||
|
||||
if __name__ == "__main__":
    # Example execution using vars from config.py
    client = api.Client(config.LARK_HOST)

    # Get tenant access token; abort with a failure code if that fails.
    try:
        access_token = client.get_tenant_access_token(config.APP_ID, config.APP_SECRET)
    except Exception as e:  # top-level boundary: log and exit
        logging.error(f"Could not get access token: {e}")
        # Fix: the builtin-site helper exit() is not guaranteed to exist
        # (e.g. under `python -S`); SystemExit is always available.
        raise SystemExit(1)

    logging.info(f"Using App ID: {config.APP_ID}")

    # These need to be valid app_tokens and table_ids for this script to
    # actually work; the default config value is a placeholder string.
    is_placeholder = (config.MERGE_SOURCE_APP_TOKEN_1 == "source_app_token_1")

    if is_placeholder:
        logging.warning("Please update config.py or environment variables with valid MERGE_* tokens to execute.")
    else:
        merge_bitables(
            client=client,
            access_token=access_token,
            source_app_token_1=config.MERGE_SOURCE_APP_TOKEN_1,
            source_table_id_1=config.MERGE_SOURCE_TABLE_ID_1,
            source_app_token_2=config.MERGE_SOURCE_APP_TOKEN_2,
            source_table_id_2=config.MERGE_SOURCE_TABLE_ID_2,
            target_app_token=config.MERGE_TARGET_APP_TOKEN,
            target_table_name="Merged Data"
        )
|
||||
Reference in New Issue
Block a user