rl-warmup-plugin/includes/class-rl-mailwarmer-message-handler.php
ruben de40085318 Implement AI-powered conversation generation and improved campaign timeline management
- Add OpenAI integration for realistic email conversation generation
- Enhance campaign timeline algorithm with natural distribution patterns
- Improve message handling with Symfony Mailer components
- Add conversation blueprint system for structured email threads
- Implement visual timeline with heatmap for campaign tracking

🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
2025-03-07 01:49:49 -06:00

534 lines
23 KiB
PHP

<?php
/**
* Helper functions for the conversation messages
*/
use Symfony\Component\Mime\Email;
use Symfony\Component\Mime\Address;
use Symfony\Component\Mailer\Mailer;
use Symfony\Component\Mailer\Transport\Transport;
use Symfony\Component\Mailer\Exception\TransportExceptionInterface;
use PhpImap\Mailbox;
use phpseclib3\Net\SSH2;
use phpseclib3\Crypt\PublicKeyLoader;
// Stop immediately unless the ABSPATH constant is defined (i.e. the file was not loaded through WordPress).
defined('ABSPATH') || exit;
class RL_MailWarmer_Message_Handler {
/**
 * Send every message that is due and record the outcome of each attempt.
 *
 * Loads up to 10 rows from the `rl_mailwarmer_messages` table whose status is
 * 'scheduled' and whose `scheduled_for_timestamp` is earlier than
 * current_time('mysql'), oldest first, and passes each row to send_message().
 * A row is marked 'sent' when send_message() returns true and 'failed' when it
 * returns false or throws.
 *
 * @return array Counters in the form ['success' => int, 'failure' => int].
 */
public static function process_pending_messages() {
    global $wpdb;

    $results = [
        'success' => 0,
        'failure' => 0,
    ];

    // Fetch the next 10 pending messages with scheduled timestamps in the past.
    $table_name = $wpdb->prefix . 'rl_mailwarmer_messages';
    $messages = $wpdb->get_results(
        $wpdb->prepare(
            "SELECT * FROM $table_name WHERE status = %s AND scheduled_for_timestamp < %s ORDER BY scheduled_for_timestamp ASC LIMIT 10",
            'scheduled',
            current_time('mysql')
        ),
        ARRAY_A
    );

    // Nothing due (or the query produced no usable result): report zero counters without logging.
    if (empty($messages)) {
        return $results;
    }

    $messages_count = count($messages);
    action_log("process_pending_messages - Processing {$messages_count} messages" );

    foreach ($messages as $message) {
        log_to_file("==========================================================");
        try {
            log_to_file("process_pending_messages - trying send_message for {$message['id']} from {$message['from_email']} to ", $message['to_email']);

            if (self::send_message($message)) {
                // Update message status to 'sent' on success
                self::update_message_status($message['id'], 'sent');
                $results['success']++;
                log_to_file("process_pending_messages - Success sending message: {$message['id']}");
                action_log("process_pending_messages - Sent email {$message['id']} from {$message['from_email']} to ", $message['to_email']);
            } else {
                self::update_message_status($message['id'], 'failed');
                log_to_file("process_pending_messages - Error sending message: {$message['id']}");
                $results['failure']++;
            }
        } catch (\Throwable $e) {
            // Catch Throwable, not just Exception: a PHP Error (e.g. a TypeError caused by a
            // malformed row) would otherwise abort the run and leave the row 'scheduled', so the
            // same row would head every later batch and block the queue.
            log_to_file('process_pending_messages - Error processing message ID ' . $message['id'] . ': ' . $e->getMessage());
            self::update_message_status($message['id'], 'failed');
            $results['failure']++;
        }
    }

    log_to_file("process_pending_messages - Results: ", $results);
    action_log("process_pending_messages - Finished processing {$messages_count} messages with {$results['success']} sent and {$results['failure']} failures");

    return $results;
}
/**
 * Send one message over SMTP.
 *
 * Builds the email from prepare_email_data(), authenticates against the
 * account's SMTP server using the sender address as the username, tags the
 * message with an X-MFTID header holding the tracking id, and hands it to
 * Symfony Mailer.
 *
 * @param array $message The message row; see prepare_email_data() for the keys that are read.
 * @return bool True if the mailer accepted the message, false if the SMTP transport reported an error.
 * @throws Exception If required fields (recipients, sender, subject, body) are missing,
 *                   or if prepare_email_data() rejects the row.
 */
public static function send_message($message) {
    // Prepare email data and connection info
    $email_data      = self::prepare_email_data($message);
    $connection_info = $email_data['connection_info'];

    // Prefer the SMTP-specific password and fall back to the general mailbox password.
    $password = !empty($connection_info['smtp_password'])
        ? $connection_info['smtp_password']
        : $connection_info['mail_password'];

    // Recipients normally arrive as an array; accept a JSON-encoded list as well.
    $to_addresses = $email_data['to'];
    if (!is_array($to_addresses)) {
        $to_addresses = json_decode((string) $to_addresses, true);
    }
    // Reindex so the argument spread below only ever sees list-style integer keys.
    $to_addresses = is_array($to_addresses) ? array_values($to_addresses) : [];

    // text_body always ends with the appended tracking id and is therefore never empty,
    // so the body check has to look at the raw message body instead.
    $has_body = isset($message['body']) && trim((string) $message['body']) !== '';

    // Check required fields
    if (empty($to_addresses) || empty($email_data['from']) || empty($email_data['subject']) || !$has_body) {
        throw new Exception(__('Missing required fields for sending the email.', 'rl-mailwarmer'));
    }

    try {
        // Create the SMTP transport; the port is cast because it may be stored as a string.
        $transport = new \Symfony\Component\Mailer\Transport\Smtp\EsmtpTransport(
            $connection_info['smtp_server'],
            (int) $connection_info['smtp_port']
        );

        // Set authentication details
        $transport->setUsername($email_data['from']);
        $transport->setPassword($password);

        $mailer = new Mailer($transport);

        $email_message = (new Email())
            ->from(new Address($email_data['from'], $email_data['name']))
            ->to(...$to_addresses)
            ->subject($email_data['subject'])
            ->text($email_data['text_body'])
            ->html($email_data['html_body']);

        // Tag the message so it can be located later by its tracking id.
        $email_message->getHeaders()->addTextHeader('X-MFTID', $email_data['campaign_tracking_id']);

        $mailer->send($email_message);

        return true;
    } catch (TransportExceptionInterface $e) {
        log_to_file("send_message - Error sending email {$message['id']}: " . $e->getMessage());
        return false;
    }
}
/**
 * Look through every folder of an IMAP account for a message containing the tracking id.
 *
 * Folders are visited in the order imap_list() returns them; empty folders and
 * folders that cannot be selected are skipped. The search stops at the first
 * folder with a hit. Note that the stream is left selected on the last folder
 * that was opened, so on a hit the returned UIDs refer to the selected folder.
 *
 * @param resource|\IMAP\Connection $imap_stream          Open stream returned by imap_open().
 * @param string                    $imap_server          Server reference passed to imap_list(), e.g. "{host:993/imap/ssl}".
 * @param string                    $campaign_tracking_id Tracking id to search for.
 * @return array ['folder' => string|null, 'email' => int[]|null]; 'folder' is the UTF-7-decoded
 *               mailbox name and 'email' the matching UIDs, both null when nothing was found.
 */
public static function search_email_by_x_mftid($imap_stream, $imap_server, $campaign_tracking_id) {
    $result = ['folder' => null, 'email' => null];

    $folders = imap_list($imap_stream, $imap_server, '*');
    if (!is_array($folders)) {
        // imap_list() returns false on failure; there is nothing to iterate.
        log_to_file("search_email_by_x_mftid - Unable to list folders: " . imap_last_error());
        return $result;
    }

    $criteria = 'TEXT "' . $campaign_tracking_id . '"';
    log_to_file("search_email_by_x_mftid - search term: {$criteria}");

    foreach ($folders as $folder) {
        // The decoded name is for logging/return only; IMAP calls need the name exactly as listed.
        $decoded_folder = imap_utf7_decode($folder);
        if ($decoded_folder === false) {
            $decoded_folder = $folder;
        }

        $status = imap_status($imap_stream, $folder, SA_MESSAGES);
        if (!$status || $status->messages <= 0) {
            continue;
        }

        // imap_search() only examines the currently selected mailbox, so select this folder first.
        if (!imap_reopen($imap_stream, $folder)) {
            continue;
        }

        log_to_file("search_email_by_x_mftid - Searching {$decoded_folder}");
        $emails = imap_search($imap_stream, $criteria, SE_UID);
        if ($emails) {
            $result['folder'] = $decoded_folder;
            $result['email'] = $emails;
            break;
        }
    }

    return $result;
}
/**
 * Prepare email data and fetch connection info.
 *
 * Looks up the email-account post for the sender, merges the provider defaults
 * with the account's own SMTP/IMAP settings, normalises the recipient lists and
 * appends the tracking id ("<campaign_tracking_id>-<message id>") to both bodies.
 *
 * @param array $message The message row. Reads: id, from_email, to_email, cc, subject,
 *                       body, campaign_tracking_id, previous_message_id.
 * @return array Keys: id, connection_info, to, cc, subject, text_body, html_body,
 *               from, name, campaign_tracking_id, previous_message_id.
 * @throws Exception If the sender address is missing or no email account matches it.
 */
private static function prepare_email_data($message) {
    // Ensure 'from' is valid and fetch connection info
    if (empty($message['from_email'])) {
        throw new Exception(__('Missing "from" address in the message.', 'rl-mailwarmer'));
    }

    // Find the email-account post matching the 'from' address.
    // A WP_Error object is truthy, so it has to be tested for explicitly.
    $from_post_id = self::find_email_account_by_address($message['from_email']);
    if (!$from_post_id || is_wp_error($from_post_id)) {
        throw new Exception(__('No matching email account found for "from" address.', 'rl-mailwarmer'));
    }

    // Fetch connection details
    $full_name         = get_post_meta($from_post_id, 'full_name', true);
    $mail_password     = get_post_meta($from_post_id, 'mail_password', true);
    $email_provider_id = get_post_meta($from_post_id, 'email_provider', true);

    $connection_info = RL_MailWarmer_Email_Helper::get_provider_defaults($email_provider_id);
    if (!is_array($connection_info)) {
        $connection_info = [];
    }
    $connection_info['username']      = $message['from_email'];
    $connection_info['mail_password'] = $mail_password;

    // Override provider defaults with account-specific settings
    foreach (['smtp_server', 'smtp_port', 'smtp_password', 'imap_server', 'imap_port', 'imap_password'] as $key) {
        $meta_value = get_post_meta($from_post_id, $key, true);
        if (!empty($meta_value)) {
            $connection_info[$key] = $meta_value;
        }
    }

    $campaign_tracking_id = $message['campaign_tracking_id'] . '-' . $message['id'];

    return [
        'id' => $message['id'],
        'connection_info' => $connection_info,
        'to' => self::normalize_address_list($message['to_email'] ?? null),
        'cc' => self::normalize_address_list($message['cc'] ?? null),
        'subject' => $message['subject'],
        'text_body' => $message['body'] . "\n\n" . $campaign_tracking_id,
        'html_body' => $message['body'] . "<br /><br /><span style='font-size:0.5rem;'>{$campaign_tracking_id}</span>",
        'from' => $message['from_email'],
        'name' => $full_name,
        'campaign_tracking_id' => $campaign_tracking_id,
        'previous_message_id' => $message['previous_message_id'] ?? null,
    ];
}

/**
 * Turn a stored recipient value into a clean list of address strings.
 *
 * Accepts an array or a JSON-encoded value. Anything else (null, empty string,
 * invalid JSON) yields an empty list instead of reaching array_map() as null,
 * which is a TypeError on PHP 8.
 *
 * @param mixed $raw Array of addresses, JSON string, or null.
 * @return array Unslashed, trimmed, non-empty addresses with sequential keys.
 */
private static function normalize_address_list($raw): array {
    if (is_string($raw) && $raw !== '') {
        $raw = json_decode($raw, true);
    }

    // A JSON string literal such as "a@b.c" decodes to a single address.
    if (is_string($raw)) {
        $raw = [$raw];
    }

    if (!is_array($raw)) {
        return [];
    }

    $cleaned = array_map(
        static function ($entry) {
            return trim(stripslashes($entry));
        },
        array_filter($raw, 'is_string')
    );

    return array_values(array_filter($cleaned));
}
/**
 * Find the email-account post ID by email address.
 *
 * Queries published posts of type 'email-account' whose title equals the address.
 *
 * @param string $email_address The email address to search for.
 * @return int|null The post ID of the first match, or null when no account matches.
 */
private static function find_email_account_by_address($email_address) {
    $query = new WP_Query([
        'post_type'      => 'email-account',
        'post_status'    => 'publish',
        'title'          => $email_address,
        'fields'         => 'ids',
        // Only the first match is used, so do not load or count the rest.
        'posts_per_page' => 1,
        'no_found_rows'  => true,
    ]);

    // Return null (not a WP_Error, which is truthy) so the caller's falsy check
    // and the documented int|null contract both hold.
    if (empty($query->posts)) {
        return null;
    }

    return (int) $query->posts[0];
}
/**
 * Write a new status onto a single row of the messages table.
 *
 * @param int    $message_id Row id of the message to change.
 * @param string $status     Status value to store (this class writes 'sent' and 'failed').
 * @return void
 */
private static function update_message_status($message_id, $status) {
    global $wpdb;

    $new_values = ['status' => $status];
    $row_filter = ['id' => $message_id];

    // Status is stored as a string (%s); the row is matched by its integer id (%d).
    $wpdb->update(
        $wpdb->prefix . 'rl_mailwarmer_messages',
        $new_values,
        $row_filter,
        ['%s'],
        ['%d']
    );
}
// /**
// * Reply to an email message.
// *
// * @param array $message The message details.
// * @return bool True if the message is replied to successfully, false otherwise.
// * @throws Exception If required fields are missing or an error occurs.
// */
// public static function reply_message($message) {
// // Prepare email data and connection info
// $email_data = self::prepare_email_data($message);
// log_to_file("reply_message - Email Data: ", $email_data);
// // Validate required fields
// if (empty($email_data['to']) || empty($email_data['from']) || empty($email_data['subject']) || empty($email_data['text_body'])) {
// throw new Exception(__('Missing required fields for replying to the email.', 'rl-mailwarmer'));
// }
// // Extract connection info
// $connection_info = $email_data['connection_info'];
// $emails = '';
// try {
// // log_to_file("reply_message - Trying to reply via IMAP {$connection_info['imap_server']}:{$connection_info['imap_port']}");
// // $imap = new \PhpImap\Mailbox(
// // sprintf('{%s:%s/imap/ssl}', $connection_info['imap_server'], $connection_info['imap_port']),
// // $connection_info['username'],
// // $connection_info['mail_password'],
// // null,
// // 'UTF-8'
// // );
// // $imap->checkMailbox();
// // // Search for the email with the matching subject
// // log_to_file("reply_message - Searching for message with X-MFTID");
// // $emails = $imap->searchMailbox('HEADER X-MFTID "' . addslashes($email_data['campaign_tracking_id']) . '"');
// $password = $connection_info['mail_password'];
// $email = $connection_info['username'];
// log_to_file("reply_message - Trying to reply via IMAP for {$email} using: ", $connection_info);
// $imap_server = '{' . $connection_info['imap_server'] . ':' . $connection_info['imap_port'] . '/imap/ssl}';
// log_to_file("reply_message - Trying to open stream for {$email} : {$password} @ {$imap_server}");
// // Try connecting to the IMAP server
// $imap_stream = @imap_open(
// $imap_server,
// $email,
// $password
// );
// if ($imap_stream) {
// log_to_file("reply_message - IMAP stream opened.");
// $imap_search_result = self::search_email_by_x_mftid($imap_stream, $imap_server, $email_data['campaign_tracking_id']);
// if ($imap_search_result['folder']) {
// log_to_file("reply_message - Email with X-MFTID found in folder: {$imap_search_result['folder']}");
// log_to_file("reply_message - Email: ", $imap_search_result['email']);
// // $emails = $imap_search_result['email'];
// // Continue with reply logic
// } else {
// log_to_file("No email found with X-MFTID: " . $email_data['campaign_tracking_id']);
// }
// imap_close($imap_stream);
// }
// if (!empty($emails)) {
// // Fetch the email data
// $original_email = $imap->getMail($emails[0]);
// log_to_file("reply_message - Message found!");
// // log_to_file("reply_message - IMAP Message: ", $original_email);
// // Step 2: Send the reply via SMTP
// $transport = new Symfony\Component\Mailer\Transport\Smtp\EsmtpTransport(
// $connection_info['smtp_server'],
// $connection_info['smtp_port']
// );
// // Set authentication details
// $transport->setUsername($connection_info['username']);
// $transport->setPassword($connection_info['mail_password']);
// $to_addresses = $email_data['to'];
// if (!is_array($to_addresses)) {
// $to_addresses = json_decode($to_addresses, true);
// }
// $mailer = new Mailer($transport);
// $reply_email = (new Email())
// ->from(new Address($email_data['from'], $email_data['name']))
// ->to(...$to_addresses)
// ->subject('Re: ' . $original_email->subject)
// ->text($email_data['text_body'])
// ->html($email_data['html_body']);
// // Add headers
// $campaign_tracking_id = $email_data['campaign_tracking_id'];
// $reply_email->getHeaders()->addTextHeader('X-MFTID', $campaign_tracking_id);
// // Add headers for threading
// $headers = $reply_email->getHeaders();
// $headers->addTextHeader('In-Reply-To', $original_email->messageId);
// $headers->addTextHeader('References', trim($original_email->headers->references . ' ' . $original_email->messageId));
// // ->addHeader('In-Reply-To', $original_email->messageId)
// // ->addHeader('References', trim($original_email->headers->references . ' ' . $original_email->messageId));
// // if (!empty($email_data['cc'])) {
// // $reply_email->cc(...$email_data['cc']);
// // }
// $mailer->send($reply_email);
// log_to_file("reply_message - Successfully sent IMAP/SMTP reply from ", $email_data['from']);
// // Step 3: Upload the reply to the Sent folder
// $imap_stream = imap_open(
// sprintf('{%s:%d/imap/ssl}', $connection_info['imap_server'], $connection_info['imap_port']),
// $connection_info['username'],
// $connection_info['mail_password']
// );
// $raw_message = $reply_email->toString(); // Convert the Email object to raw MIME format
// imap_append($imap_stream, sprintf('{%s}/Sent', $connection_info['imap_server']), $raw_message);
// imap_close($imap_stream);
// // Create the reply headers
// // $reply_headers = [
// // 'In-Reply-To' => $original_email->messageId,
// // 'References' => trim($original_email->headers->references . ' ' . $original_email->messageId),
// // ];
// // // Construct the reply body
// // $reply_body = $email_data['body'] . "\n\n" .
// // 'On ' . $original_email->date . ', ' . $original_email->fromName . ' <' . $original_email->fromAddress . '> wrote:' . "\n" .
// // $original_email->textPlain;
// // // Send the reply via IMAP
// // log_to_file("reply_message - Sending message via IMAP");
// // $imap->addMessageToSentFolder(
// // 'To: ' . implode(', ', $email_data['to']) . "\r\n" .
// // 'Cc: ' . implode(', ', $email_data['cc']) . "\r\n" .
// // 'Subject: Re: ' . $original_email->subject . "\r\n" .
// // 'From: ' . $email_data['from'] . "\r\n" .
// // 'In-Reply-To: ' . $reply_headers['In-Reply-To'] . "\r\n" .
// // 'References: ' . $reply_headers['References'] . "\r\n" .
// // "\r\n" .
// // $reply_body
// // );
// // log_to_file("reply_message - Done message via IMAP");
// // $mailer = new Mailer($transport);
// // $mailer->send($reply);
// return true; // Reply sent successfully
// } else {
// log_to_file("reply_message - Unable to reply via IMAP. Falling back to SMTP");
// }
// } catch (Exception $e) {
// log_to_file('reply_message - IMAP Error: ' . $e->getMessage());
// }
// // Fallback to SMTP if IMAP fails
// try {
// log_to_file("reply_message - Falling back to SMTP");
// $result = self::send_message($message);
// return $result;
// // $to_addresses = $email_data['to'];
// // if (!is_array($to_addresses)) {
// // $to_addresses = json_decode($to_addresses, true);
// // }
// // log_to_file("reply_message - Creating SMTP message");
// // $smtp_reply = (new Email())
// // ->from(new Address($email_data['from'], $email_data['name']))
// // ->to(...$to_addresses)
// // ->subject($email_data['subject'])
// // ->text($email_data['text_body'])
// // ->html($email_data['html_body']);
// // // Add headers
// // $campaign_tracking_id = $email_data['campaign_tracking_id'];
// // $smtp_reply->getHeaders()->addTextHeader('X-MFTID', $campaign_tracking_id);
// // // Add CCs if present
// // // if (!empty($email_data['cc'])) {
// // // $smtp_reply->cc(...$email_data['cc']);
// // // }
// // // Create the SMTP transport
// // log_to_file("reply_message - Creating SMTP transport");
// // $transport = new Symfony\Component\Mailer\Transport\Smtp\EsmtpTransport(
// // $connection_info['smtp_server'],
// // $connection_info['smtp_port']
// // );
// // // Set authentication details
// // $transport->setUsername($connection_info['username']);
// // $transport->setPassword($connection_info['mail_password']);
// // // Create the mailer
// // log_to_file("reply_message - Creating SMTP mailer");
// // $mailer = new Symfony\Component\Mailer\Mailer($transport);
// // $smtp_result = $mailer->send($smtp_reply);
// // log_to_file("reply_message - Sent reply via fallback SMTP from {$email_data['from']}:", $smtp_result);
// // // log_to_file('reply_message - SMTP Send Success (?)');
// // Fallback SMTP reply sent successfully
// } catch (Exception $e) {
// log_to_file('reply_message - SMTP Error: ' . $e->getMessage());
// return false; // Reply failed
// }
// }
}