# -*- coding: UTF-8 -*-
import copy
import logging

import api
import config
import utils

LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)


def get_all_records(client: api.Client, access_token: str, app_token: str, table_id: str):
    """Retrieve all records from a given table, handling pagination if necessary."""
    records = []
    page_token = None
    while True:
        resp = client.get_records_list(access_token, app_token, table_id, page_token=page_token)
        items = resp.get('items', [])
        if items:
            records.extend(items)
        if resp.get('has_more'):
            page_token = resp.get('page_token')
        else:
            break
    return records


def get_all_fields(client: api.Client, access_token: str, app_token: str, table_id: str):
    """Retrieve all fields from a given table, handling pagination if necessary."""
    fields = []
    page_token = None
    while True:
        resp = client.get_fields_list(access_token, app_token, table_id, page_token=page_token)
        items = resp.get('items', [])
        if items:
            fields.extend(items)
        if resp.get('has_more'):
            page_token = resp.get('page_token')
        else:
            break
    return fields


def merge_schema(fields1: list, fields2: list):
    """
    Merge two lists of field definitions into one unified schema list.
    Fields with the same name are assumed to be the same field; on a
    collision the definition from fields1 wins.
    """
    merged_fields = {}

    # Process fields from the first table. Deep-copy the property dicts,
    # since we mutate them (merging options, stripping option IDs) below.
    for f in fields1:
        merged_fields[f['field_name']] = {
            'field_name': f['field_name'],
            'type': f['type'],
            'property': copy.deepcopy(f.get('property'))
        }

    # Process fields from the second table (add them if not already present)
    for f in fields2:
        if f['field_name'] not in merged_fields:
            merged_fields[f['field_name']] = {
                'field_name': f['field_name'],
                'type': f['type'],
                'property': copy.deepcopy(f.get('property'))
            }
        else:
            # Merge options if both are SingleSelect (3) or MultiSelect (4).
            # Guard with `or {}` because 'property' may be present but None.
            existing = merged_fields[f['field_name']]
            if existing['type'] in (3, 4) \
                    and (existing.get('property') or {}).get('options') \
                    and (f.get('property') or {}).get('options'):
                existing_names = {opt['name'] for opt in existing['property']['options']}
                for opt in f['property']['options']:
                    if opt['name'] not in existing_names:
                        existing['property']['options'].append(copy.deepcopy(opt))
                        existing_names.add(opt['name'])

    # Clean up option IDs to prevent insertion errors in the target table
    for f_name, f_def in merged_fields.items():
        if f_def.get('property') and 'options' in f_def['property']:
            for opt in f_def['property']['options']:
                if 'id' in opt:
                    del opt['id']

    # Add a custom "Source" field so merged records keep their provenance
    merged_fields['Source'] = {
        'field_name': 'Source',
        'type': 1,  # Text type
        'property': None
    }
    return list(merged_fields.values())


def merge_bitables(
    client: api.Client,
    access_token: str,
    source_app_token_1: str,
    source_table_id_1: str,
    source_app_token_2: str,
    source_table_id_2: str,
    target_app_token: str,
    target_table_name: str = "Merged Table"
):
    """
    Core function to merge two bitables into a single one.
    """
    logging.info(f"Merging {source_app_token_1}/{source_table_id_1} and "
                 f"{source_app_token_2}/{source_table_id_2} into {target_app_token}")

    # 1. Read the schema from both source tables
    logging.info("Reading schema from source tables...")
    fields1 = get_all_fields(client, access_token, source_app_token_1, source_table_id_1)
    fields2 = get_all_fields(client, access_token, source_app_token_2, source_table_id_2)

    # 2. Merge the two schemas
    unified_schema = merge_schema(fields1, fields2)
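    # Illustrative only -- one merged entry might look like (hypothetical names):
    #   {'field_name': 'Status', 'type': 3,
    #    'property': {'options': [{'name': 'Open'}, {'name': 'Done'}]}}
    # with option 'id' keys stripped so the target app assigns fresh ones.
    logging.debug(f"Unified schema contains {len(unified_schema)} fields")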
    # 3. Create the target table
    logging.info(f"Creating target table '{target_table_name}'...")
    target_table_id = client.create_table(access_token, target_app_token, target_table_name)

    # A new table comes with some default columns; fetch them so we can
    # update (or skip) them instead of creating duplicates.
    target_initial_fields = client.get_fields_list(access_token, target_app_token, target_table_id).get('items', [])
    initial_field_names = {f['field_name']: f['field_id'] for f in target_initial_fields}

    # 4. Create fields in the target table
    logging.info("Creating fields in target table...")
    target_field_map = {}  # Maps field name to its new field_id in the target table
    for field_def in unified_schema:
        name = field_def['field_name']
        ftype = field_def['type']
        fprop = field_def['property']
        if name in initial_field_names:
            # Table creation already produced this field (e.g. the initial
            # '多行文本' / multiline-text column); update it in case the type
            # differs, rather than creating a duplicate.
            field_id = initial_field_names[name]
            client.update_field(access_token, target_app_token, target_table_id, field_id, name, ftype, fprop)
            target_field_map[name] = field_id
        else:
            try:
                resp = client.add_field(access_token, target_app_token, target_table_id, name, ftype, fprop)
                target_field_map[name] = resp['field_id']
            except utils.LarkException as e:
                logging.error(f"Failed to create field '{name}': {e}")

    # 5. Read the records from both source tables
    logging.info("Reading records from source tables...")
    records1 = get_all_records(client, access_token, source_app_token_1, source_table_id_1)
    records2 = get_all_records(client, access_token, source_app_token_2, source_table_id_2)
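    # For reference, each item returned by get_records_list looks roughly like
    # (values vary by field type; this shape is illustrative):
    #   {'record_id': 'recXXXXXXXX', 'fields': {'Name': 'foo', 'Status': 'Open'}}
    # Only the 'fields' payload is carried over; record IDs are never reused.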
    # 6. Map and batch-insert the records
    logging.info("Mapping and inserting records...")

    def process_records(source_records, source_name):
        batch = []
        for r in source_records:
            original_fields = r.get('fields', {})
            new_fields = {}
            for fname, fvalue in original_fields.items():
                # get_records_list keys fields by field name, and
                # batch_create_records accepts field names as keys too, so we
                # map by name and keep only names that exist in the target.
                if fname in target_field_map:
                    new_fields[fname] = fvalue
            # Tag each record with the table it came from
            new_fields['Source'] = source_name
            batch.append({'fields': new_fields})
        return batch

    target_records = []
    target_records.extend(process_records(records1, f"{source_app_token_1}/{source_table_id_1}"))
    target_records.extend(process_records(records2, f"{source_app_token_2}/{source_table_id_2}"))

    # The batch-insert endpoint usually caps a request at 500 records, so
    # chunk the payload just in case.
    CHUNK_SIZE = 500
    for i in range(0, len(target_records), CHUNK_SIZE):
        chunk = target_records[i:i + CHUNK_SIZE]
        if chunk:
            try:
                client.batch_create_records(access_token, target_app_token, target_table_id, chunk)
            except utils.LarkException as e:
                logging.error(f"Failed to insert chunk of records: {e}")

    logging.info(f"Successfully merged into {target_table_id}!")
    return target_table_id


if __name__ == "__main__":
    # Example execution using values from config.py
    client = api.Client(config.LARK_HOST)

    # Get a tenant access token
    try:
        access_token = client.get_tenant_access_token(config.APP_ID, config.APP_SECRET)
    except Exception as e:
        logging.error(f"Could not get access token: {e}")
        exit(1)

    logging.info(f"Using App ID: {config.APP_ID}")

    # These need to be valid app_tokens and table_ids for the script to actually work
    is_placeholder = (config.MERGE_SOURCE_APP_TOKEN_1 == "source_app_token_1")
    if is_placeholder:
        logging.warning("Please update config.py or environment variables with valid MERGE_* tokens to execute.")
    else:
        merge_bitables(
            client=client,
            access_token=access_token,
            source_app_token_1=config.MERGE_SOURCE_APP_TOKEN_1,
            source_table_id_1=config.MERGE_SOURCE_TABLE_ID_1,
            source_app_token_2=config.MERGE_SOURCE_APP_TOKEN_2,
            source_table_id_2=config.MERGE_SOURCE_TABLE_ID_2,
            target_app_token=config.MERGE_TARGET_APP_TOKEN,
            target_table_name="Merged Data"
        )
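# Configuration notes (a sketch, not part of the merge logic): config.py is
# expected to define LARK_HOST, APP_ID, APP_SECRET and the MERGE_* values used
# above. On Feishu/Lark, the app_token and table_id can usually be read off a
# bitable's URL, which looks roughly like
#   https://example.feishu.cn/base/<app_token>?table=<table_id>&view=<view_id>
# though the exact host and URL shape depend on the tenant.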